sharded_thread/
mesh.rs

1//! The idea of the mesh is to be able to connected threads together through
2//! channels.
3
4use std::cell::Cell;
5use std::fmt::Debug;
6use std::sync::atomic::AtomicUsize;
7use std::sync::Arc;
8
9use crate::queue::{SharedQueueChannels, SharedQueueThreaded};
10use crate::shard::{SenderError, Shard};
11
12/// A Mesh is a structure which can be shared in every thread by reference to
13/// allow threads to join the Mesh and talk to each others.
14pub struct MeshBuilder<T> {
15    #[allow(dead_code)]
16    nr_peers: usize,
17    pub(crate) channels: Vec<Arc<SharedQueueThreaded<T>>>,
18    pub(crate) shared_joined: Arc<AtomicUsize>,
19}
20
21impl<T> Debug for MeshBuilder<T> {
22    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23        write!(f, "MeshBuilder")
24    }
25}
26
27impl<T> MeshBuilder<T> {
28    /// Create a new mesh between a number of peers.
29    pub fn new(nr_peers: usize) -> std::io::Result<Self> {
30        let nb_cpu = std::thread::available_parallelism()?.get();
31
32        MeshBuilder::with_cpu(nr_peers, nb_cpu)
33    }
34
35    ///
36    pub fn members(&self) -> usize {
37        self.shared_joined
38            .load(std::sync::atomic::Ordering::Acquire)
39    }
40
41    pub fn with_cpu(nr_peers: usize, nb_cpu: usize) -> std::io::Result<Self> {
42        let mut channels = Vec::with_capacity(nr_peers);
43
44        for _i in 0..nr_peers {
45            channels.push(SharedQueueThreaded::<T>::new(nb_cpu)?);
46        }
47
48        Ok(Self {
49            nr_peers,
50            channels,
51            shared_joined: Arc::new(AtomicUsize::new(0)),
52        })
53    }
54
55    /// Try to send an item directly to a shard, you must know the id of the
56    /// shard you want to send the item to.
57    ///
58    /// Fail if the shard is not registered.
59    #[doc(hidden)]
60    pub fn send_to(&self, pos: usize, item: T) -> Result<(), SenderError> {
61        self.channels
62            .get(pos)
63            .ok_or(SenderError::WrongShard)?
64            .sender()
65            .send(item);
66        Ok(())
67    }
68
69    /// Join the mesh means you can talk to other peer and peer can talk to you.
70    ///
71    /// You must assign yourself an id so other Shard will be able to talk with
72    /// you using this ID
73    pub fn join_with(&self, peer: usize) -> std::io::Result<Shard<T>> {
74        self.shared_joined
75            .fetch_add(1, std::sync::atomic::Ordering::Acquire);
76
77        assert!(peer < self.channels.len());
78
79        let senders = self
80            .channels
81            .iter()
82            .map(SharedQueueChannels::sender)
83            .collect();
84        let (_, receiver) = self.channels[peer].unbounded();
85
86        Ok(Shard {
87            receiver: Cell::new(Some(receiver)),
88            senders,
89            max_shard: self.shared_joined.clone(),
90            shard_id: peer,
91        })
92    }
93}