use super::mp_publisher::MpPublisher;
use super::publisher::Publisher;
use super::subscribable::Subscribable;
use crate::pod::Pod;
use crate::ring::SharedRing;
use alloc::sync::Arc;
/// Builds a connected [`Publisher`] / [`Subscribable`] pair that share one
/// ring created with `SharedRing::new(capacity)`.
///
/// The publisher receives a copy of the ring's index parameters (`capacity`,
/// `mask`, `reciprocal`, `is_pow2`) together with the ring's slot and cursor
/// pointers, so it carries those values itself instead of re-reading them
/// from the ring. It starts with `seq` and `cached_slowest` both at `0`, and
/// `has_backpressure` records whether the ring's `backpressure` field is set.
///
/// Any validation of `capacity` happens inside `SharedRing::new`, which is
/// not visible from this function.
pub fn channel<T: Pod>(capacity: usize) -> (Publisher<T>, Subscribable<T>) {
    let shared = Arc::new(SharedRing::new(capacity));
    // Snapshot of the index parameters, copied out of the shared ring.
    let layout = shared.index;

    let publisher = Publisher {
        has_backpressure: shared.backpressure.is_some(),
        // The publisher holds its own handle to the ring; the original handle
        // is handed to the subscribable side below.
        ring: Arc::clone(&shared),
        slots_ptr: shared.slots_ptr(),
        capacity: layout.capacity,
        mask: layout.mask,
        reciprocal: layout.reciprocal,
        is_pow2: layout.is_pow2,
        cursor_ptr: shared.cursor_ptr(),
        seq: 0,
        cached_slowest: 0,
    };

    (publisher, Subscribable { ring: shared })
}
/// Builds a connected [`Publisher`] / [`Subscribable`] pair that share one
/// ring created with `SharedRing::new_bounded(capacity, watermark)`.
///
/// Apart from the ring constructor, the publisher is assembled exactly as in
/// the unbounded variant: it gets a copy of the ring's index parameters, the
/// slot and cursor pointers, and starts with `seq` and `cached_slowest` at
/// `0`. `has_backpressure` is taken from whether the ring's `backpressure`
/// field is set after construction.
///
/// NOTE(review): the meaning of `watermark` is defined by
/// `SharedRing::new_bounded` and is not visible here — presumably it tunes
/// the backpressure threshold; confirm against that constructor.
pub fn channel_bounded<T: Pod>(
    capacity: usize,
    watermark: usize,
) -> (Publisher<T>, Subscribable<T>) {
    let shared = Arc::new(SharedRing::new_bounded(capacity, watermark));
    // Snapshot of the index parameters, copied out of the shared ring.
    let layout = shared.index;

    let publisher = Publisher {
        has_backpressure: shared.backpressure.is_some(),
        // Separate handle for the publishing side; the subscribable side
        // takes the original one.
        ring: Arc::clone(&shared),
        slots_ptr: shared.slots_ptr(),
        capacity: layout.capacity,
        mask: layout.mask,
        reciprocal: layout.reciprocal,
        is_pow2: layout.is_pow2,
        cursor_ptr: shared.cursor_ptr(),
        seq: 0,
        cached_slowest: 0,
    };

    (publisher, Subscribable { ring: shared })
}
/// Builds a connected [`MpPublisher`] / [`Subscribable`] pair that share one
/// ring created with `SharedRing::new_mpmc(capacity)`.
///
/// The publisher receives a copy of the ring's index parameters (`capacity`,
/// `mask`, `reciprocal`, `is_pow2`), the slot and cursor pointers, and a raw
/// pointer to the ring's `next_seq` atomic counter.
///
/// # Panics
///
/// Panics with "MPMC ring must have next_seq" if the ring returned by
/// `SharedRing::new_mpmc` has no `next_seq` value.
pub fn channel_mpmc<T: Pod>(capacity: usize) -> (MpPublisher<T>, Subscribable<T>) {
    use core::sync::atomic::AtomicU64;

    let shared = Arc::new(SharedRing::new_mpmc(capacity));
    // Snapshot of the index parameters, copied out of the shared ring.
    let layout = shared.index;

    let next_seq = shared
        .next_seq
        .as_ref()
        .expect("MPMC ring must have next_seq");
    // Raw pointer to the atomic stored inside the ring. The publisher also
    // stores its own `Arc` handle to the same ring (see `ring` below).
    let next_seq_ptr: *const AtomicU64 = &next_seq.0;

    let publisher = MpPublisher {
        ring: Arc::clone(&shared),
        slots_ptr: shared.slots_ptr(),
        capacity: layout.capacity,
        mask: layout.mask,
        reciprocal: layout.reciprocal,
        is_pow2: layout.is_pow2,
        cursor_ptr: shared.cursor_ptr(),
        next_seq_ptr,
    };

    (publisher, Subscribable { ring: shared })
}