use std::cell::Cell;
use std::fmt::Debug;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use crate::queue::{SharedQueueChannels, SharedQueueThreaded};
use crate::shard::{SenderError, Shard};
/// Builder holding one shared queue per peer, from which individual shards
/// are handed out via `join_with`.
///
/// `T` is the type of the items sent through the queues.
pub struct MeshBuilder<T> {
/// Number of peers requested at construction time.
// Kept as a record of the requested size; no code visible in this file reads
// it back, which is why the dead-code lint is silenced.
#[allow(dead_code)]
nr_peers: usize,
/// One queue per peer, indexed by peer position (`with_cpu` pushes exactly
/// `nr_peers` entries).
pub(crate) channels: Vec<Arc<SharedQueueThreaded<T>>>,
/// Counter bumped by `join_with`; a clone of this handle is stored in every
/// shard it returns (as `max_shard`).
pub(crate) shared_joined: Arc<AtomicUsize>,
}
impl<T> Debug for MeshBuilder<T> {
    /// Writes the fixed label `MeshBuilder`.
    ///
    /// No field is printed, so this impl places no `Debug` bound on `T`.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter.write_str("MeshBuilder")
    }
}
impl<T> MeshBuilder<T> {
    /// Creates a mesh for `nr_peers` peers, using the value reported by
    /// [`std::thread::available_parallelism`] as the `nb_cpu` argument of
    /// [`MeshBuilder::with_cpu`].
    ///
    /// # Errors
    ///
    /// Returns an error if the available parallelism cannot be queried, or
    /// if [`MeshBuilder::with_cpu`] fails.
    pub fn new(nr_peers: usize) -> std::io::Result<Self> {
        let nb_cpu = std::thread::available_parallelism()?.get();
        Self::with_cpu(nr_peers, nb_cpu)
    }

    /// Returns the current value of the shared join counter.
    ///
    /// [`MeshBuilder::join_with`] increments the counter once per accepted
    /// peer. The counter handle is also shared with every returned shard.
    // NOTE(review): whether shards modify the counter is not visible here.
    pub fn members(&self) -> usize {
        self.shared_joined
            .load(std::sync::atomic::Ordering::Acquire)
    }

    /// Creates a mesh with one queue per peer, passing `nb_cpu` to the
    /// constructor of each queue.
    ///
    /// The join counter starts at zero.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error produced while constructing a
    /// queue.
    pub fn with_cpu(nr_peers: usize, nb_cpu: usize) -> std::io::Result<Self> {
        let mut channels = Vec::with_capacity(nr_peers);
        for _ in 0..nr_peers {
            channels.push(SharedQueueThreaded::<T>::new(nb_cpu)?);
        }

        Ok(Self {
            nr_peers,
            channels,
            shared_joined: Arc::new(AtomicUsize::new(0)),
        })
    }

    /// Sends `item` to the queue at position `pos`.
    ///
    /// # Errors
    ///
    /// Returns [`SenderError::WrongShard`] when `pos` does not designate a
    /// queue of this mesh.
    #[doc(hidden)]
    pub fn send_to(&self, pos: usize, item: T) -> Result<(), SenderError> {
        let channel = self.channels.get(pos).ok_or(SenderError::WrongShard)?;
        // NOTE(review): whatever `send` returns is discarded here, as in the
        // previous revision — confirm that ignoring it is intended.
        channel.sender().send(item);
        Ok(())
    }

    /// Joins the mesh as peer number `peer` and returns the matching shard.
    ///
    /// The shard receives from the queue at index `peer` and holds one
    /// sender for every queue of the mesh, its own included.
    ///
    /// # Panics
    ///
    /// Panics if `peer` is not smaller than the number of queues in the
    /// mesh. The join counter is left untouched in that case.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`; the `io::Result`
    /// return type is kept for interface stability.
    pub fn join_with(&self, peer: usize) -> std::io::Result<Shard<T>> {
        // Validate first: incrementing before this check would leave the
        // counter permanently over-counting after a rejected peer.
        assert!(peer < self.channels.len());

        // `AcqRel` rather than `Acquire`: an acquire-only read-modify-write
        // has a relaxed store half, which gives the `Acquire` load in
        // `members` nothing to synchronise with.
        self.shared_joined
            .fetch_add(1, std::sync::atomic::Ordering::AcqRel);

        let senders = self
            .channels
            .iter()
            .map(SharedQueueChannels::sender)
            .collect();
        // Only the receiving half of this peer's own queue is needed; the
        // sending half is already part of `senders`.
        let (_, receiver) = self.channels[peer].unbounded();

        Ok(Shard {
            receiver: Cell::new(Some(receiver)),
            senders,
            max_shard: Arc::clone(&self.shared_joined),
            shard_id: peer,
        })
    }
}