use core::{mem::MaybeUninit, num::NonZeroUsize};
use crate::{
queue::ShardOwnership,
spsc::{self, shards::ShardsPtr},
};
pub struct Sender<T> {
inner: spsc::Sender<T, ShardOwnership>,
shards: ShardsPtr<T>,
max_shards: usize,
shard: usize,
}
impl<T> Sender<T> {
pub fn try_clone(&self) -> Option<Self> {
Self::init(
self.shards.clone(),
self.max_shards,
self.shard.wrapping_add(1),
)
}
pub(super) fn new(shards: ShardsPtr<T>, max_shards: NonZeroUsize) -> Self {
Self::init(shards, max_shards.get(), 0).unwrap()
}
fn init(shards: ShardsPtr<T>, max_shards: usize, start_shard: usize) -> Option<Self> {
for offset in 0..max_shards {
let shard = start_shard.wrapping_add(offset) % max_shards;
if let Some(shard_ptr) = shards.claim_producer_queue_ptr(shard) {
let inner = spsc::Sender::from_current(shard_ptr);
return Some(Self {
inner,
shards,
max_shards,
shard,
});
}
}
None
}
pub fn send(&mut self, value: T) {
self.inner.send(value)
}
pub fn try_send(&mut self, value: T) -> Result<(), T> {
self.inner.try_send(value)
}
pub fn write_buffer(&mut self) -> &mut [MaybeUninit<T>] {
self.inner.write_buffer()
}
pub unsafe fn commit(&mut self, len: usize) {
unsafe { self.inner.commit(len) }
}
}
unsafe impl<T> Send for Sender<T> {}
#[cfg(all(test, not(feature = "loom")))]
mod test {
use core::num::NonZeroUsize;
#[test]
fn try_clone_does_not_claim_live_sender_shard_after_receiver_drop() {
let (tx0, rx) = super::super::channel::<usize>(
NonZeroUsize::new(2).unwrap(),
NonZeroUsize::new(4).unwrap(),
);
let tx1 = tx0.try_clone().unwrap();
assert!(tx1.try_clone().is_none());
drop(rx);
assert!(tx1.try_clone().is_none());
}
}