sharded_thread/
shard.rs

1use std::cell::Cell;
2use std::fmt::Debug;
3use std::sync::atomic::AtomicUsize;
4use std::sync::Arc;
5
6use crate::queue::{Receiver, Sender};
7
8#[derive(Debug, thiserror::Error)]
9pub enum SenderError {
10    #[error("You can't send the value to a shard that doesn't exist.")]
11    WrongShard,
12}
13
14/// The structure which is used to communicate with other peers from the Mesh.
15pub struct Shard<T> {
16    pub(crate) receiver: Cell<Option<Receiver<T>>>,
17    pub(crate) senders: Vec<Sender<T>>,
18    /// Number of shard available
19    pub(crate) max_shard: Arc<AtomicUsize>,
20    /// Actual shard id
21    #[allow(dead_code)]
22    pub(crate) shard_id: usize,
23}
24
25impl<T> Debug for Shard<T> {
26    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27        write!(f, "Shard")
28    }
29}
30
31impl<T> Shard<T> {
32    /// Take the receiver of this shard.
33    /// Shard are implemented using `mpsc` channels, so only one Receiver can
34    /// receiving values from the other shards.
35    pub fn receiver(&self) -> Option<Receiver<T>> {
36        self.receiver.take()
37    }
38
39    /// Send a value to the proper shard
40    ///
41    /// Fail if this Shard did not join yet.
42    pub fn send_to(&self, val: T, shard: usize) -> Result<(), SenderError> {
43        let max_shard =
44            self.max_shard.load(std::sync::atomic::Ordering::Acquire);
45
46        if shard >= max_shard {
47            return Err(SenderError::WrongShard);
48        }
49
50        let sender = self
51            .senders
52            .get(shard)
53            .expect("the sender should have been here but he is not.");
54
55        sender.send(val);
56        Ok(())
57    }
58
59    /// Send a value to a shard
60    pub fn send_to_unchecked(&self, val: T, shard: usize) {
61        let sender = self
62            .senders
63            .get(shard)
64            .expect("the sender should have been here but he is not.");
65
66        sender.send(val);
67    }
68}