1use std::cell::Cell;
2use std::fmt::Debug;
3use std::sync::atomic::AtomicUsize;
4use std::sync::Arc;
5
6use crate::queue::{Receiver, Sender};
7
8#[derive(Debug, thiserror::Error)]
9pub enum SenderError {
10 #[error("You can't send the value to a shard that doesn't exist.")]
11 WrongShard,
12}
13
14pub struct Shard<T> {
16 pub(crate) receiver: Cell<Option<Receiver<T>>>,
17 pub(crate) senders: Vec<Sender<T>>,
18 pub(crate) max_shard: Arc<AtomicUsize>,
20 #[allow(dead_code)]
22 pub(crate) shard_id: usize,
23}
24
25impl<T> Debug for Shard<T> {
26 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27 write!(f, "Shard")
28 }
29}
30
31impl<T> Shard<T> {
32 pub fn receiver(&self) -> Option<Receiver<T>> {
36 self.receiver.take()
37 }
38
39 pub fn send_to(&self, val: T, shard: usize) -> Result<(), SenderError> {
43 let max_shard =
44 self.max_shard.load(std::sync::atomic::Ordering::Acquire);
45
46 if shard >= max_shard {
47 return Err(SenderError::WrongShard);
48 }
49
50 let sender = self
51 .senders
52 .get(shard)
53 .expect("the sender should have been here but he is not.");
54
55 sender.send(val);
56 Ok(())
57 }
58
59 pub fn send_to_unchecked(&self, val: T, shard: usize) {
61 let sender = self
62 .senders
63 .get(shard)
64 .expect("the sender should have been here but he is not.");
65
66 sender.send(val);
67 }
68}