use std::cell::Cell;
use std::fmt::Debug;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

use crate::queue::{SharedQueueChannels, SharedQueueThreaded};
use crate::shard::{SenderError, Shard};

pub struct MeshBuilder<T> {
    #[allow(dead_code)]
    nr_peers: usize,
    /// One shared queue per peer; peer `i` receives on `channels[i]`.
    pub(crate) channels: Vec<Arc<SharedQueueThreaded<T>>>,
    /// Count of peers that have joined the mesh so far.
    pub(crate) shared_joined: Arc<AtomicUsize>,
}

impl<T> Debug for MeshBuilder<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "MeshBuilder")
    }
}

impl<T> MeshBuilder<T> {
    /// Builds a mesh for `nr_peers` peers, sizing the per-peer queues to
    /// the parallelism reported by the OS.
    pub fn new(nr_peers: usize) -> std::io::Result<Self> {
        let nb_cpu = std::thread::available_parallelism()?.get();

        MeshBuilder::with_cpu(nr_peers, nb_cpu)
    }

    /// Returns the number of peers that have joined the mesh so far.
    pub fn members(&self) -> usize {
        self.shared_joined
            .load(std::sync::atomic::Ordering::Acquire)
    }

    /// Like [`MeshBuilder::new`], but with an explicit CPU count for the
    /// per-peer queues.
    pub fn with_cpu(nr_peers: usize, nb_cpu: usize) -> std::io::Result<Self> {
        let mut channels = Vec::with_capacity(nr_peers);

        for _ in 0..nr_peers {
            channels.push(SharedQueueThreaded::<T>::new(nb_cpu)?);
        }

        Ok(Self {
            nr_peers,
            channels,
            shared_joined: Arc::new(AtomicUsize::new(0)),
        })
    }

    /// Sends `item` to peer `pos`, returning [`SenderError::WrongShard`]
    /// if the index is out of range.
    #[doc(hidden)]
    pub fn send_to(&self, pos: usize, item: T) -> Result<(), SenderError> {
        self.channels
            .get(pos)
            .ok_or(SenderError::WrongShard)?
            .sender()
            .send(item);
        Ok(())
    }

    /// Joins the mesh as peer `peer`, returning the [`Shard`] that owns that
    /// peer's receiver along with a sender to every peer in the mesh.
    pub fn join_with(&self, peer: usize) -> std::io::Result<Shard<T>> {
        // Validate the peer index before bumping the join counter, so a bad
        // index cannot leave `shared_joined` permanently inflated.
        assert!(peer < self.channels.len());

        self.shared_joined
            .fetch_add(1, std::sync::atomic::Ordering::Acquire);

        let senders = self
            .channels
            .iter()
            .map(SharedQueueChannels::sender)
            .collect();
        let (_, receiver) = self.channels[peer].unbounded();

        Ok(Shard {
            receiver: Cell::new(Some(receiver)),
            senders,
            max_shard: self.shared_joined.clone(),
            shard_id: peer,
        })
    }
}
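
// A minimal usage sketch under test, relying only on the builder API in this
// file; the peer count and `u64` payload are illustrative, and receiving on
// the returned shards is not exercised because `Shard`'s receive API lives in
// `crate::shard`.
#[cfg(test)]
mod builder_usage {
    use super::*;

    #[test]
    fn join_and_send() -> std::io::Result<()> {
        // Two-peer mesh sized from the machine's available parallelism.
        let mesh = MeshBuilder::<u64>::new(2)?;

        // Each `join_with` hands back that peer's shard and bumps the
        // shared membership counter.
        let _shard0 = mesh.join_with(0)?;
        let _shard1 = mesh.join_with(1)?;
        assert_eq!(mesh.members(), 2);

        // Enqueue an item for peer 1; an out-of-range index would yield
        // `SenderError::WrongShard` instead.
        mesh.send_to(1, 42).expect("peer 1 has a queue");
        Ok(())
    }
}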