use super::group::SubscriberGroup;
use super::subscriber::Subscriber;
use crate::pod::Pod;
use crate::ring::{Padded, SharedRing};
use alloc::sync::Arc;
use core::sync::atomic::{AtomicU64, Ordering};
/// Handle to a shared ring from which new subscribers can be created.
///
/// The handle owns nothing but a reference-counted pointer to the ring, so
/// copies of it all refer to the same underlying `SharedRing`.
pub struct Subscribable<T: Pod> {
/// Shared ring that every subscriber created from this handle reads from.
pub(super) ring: Arc<SharedRing<T>>,
}
impl<T: Pod> Clone for Subscribable<T> {
fn clone(&self) -> Self {
Subscribable {
ring: self.ring.clone(),
}
}
}
// SAFETY: NOTE(review) — the only field of `Subscribable` is an
// `Arc<SharedRing<T>>`, so these impls are sound exactly when `SharedRing<T>`
// may be shared and accessed from multiple threads. That justification lives in
// `SharedRing`'s implementation, which is not visible here; confirm it there.
unsafe impl<T: Pod> Send for Subscribable<T> {}
unsafe impl<T: Pod> Sync for Subscribable<T> {}
impl<T: Pod> Subscribable<T> {
    /// Creates a subscriber positioned just past the ring's current cursor.
    ///
    /// The start position is the cursor value plus one, or `0` while the cursor
    /// still holds the `u64::MAX` sentinel. The subscriber's tracker is whatever
    /// `register_tracker` returns for that position, which may be `None`.
    pub fn subscribe(&self) -> Subscriber<T> {
        let start = self.start_after_head();
        self.subscriber_at(start, false)
    }

    /// Creates a `SubscriberGroup` of `N` logical subscribers sharing one cursor.
    ///
    /// The group starts at the same position [`subscribe`](Self::subscribe)
    /// would use and carries the tracker returned by `register_tracker`.
    ///
    /// # Panics
    ///
    /// Panics if `N == 0`.
    pub fn subscribe_group<const N: usize>(&self) -> SubscriberGroup<T, N> {
        // Validate before touching the ring so a bad `N` registers nothing.
        assert!(N > 0, "SubscriberGroup requires at least 1 subscriber");
        let start = self.start_after_head();
        let tracker = self.ring.register_tracker(start);
        let slots_ptr = self.ring.slots_ptr();
        let idx = self.ring.index;
        SubscriberGroup {
            ring: self.ring.clone(),
            slots_ptr,
            capacity: idx.capacity,
            mask: idx.mask,
            reciprocal: idx.reciprocal,
            is_pow2: idx.is_pow2,
            cursor: start,
            total_lagged: 0,
            total_received: 0,
            tracker,
        }
    }

    /// Creates a subscriber positioned as far back as one ring capacity allows.
    ///
    /// With cursor value `head` and capacity `cap`, the start position is
    /// `head - cap + 1` when `head >= cap`, and `0` otherwise (including while
    /// the cursor still holds the `u64::MAX` sentinel).
    pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
        let head = self.ring.cursor.0.load(Ordering::Acquire);
        let cap = self.ring.capacity();
        // The sentinel must be tested first: `u64::MAX >= cap` would otherwise
        // send it down the subtraction path.
        let start = if head == u64::MAX || head < cap {
            0
        } else {
            head - cap + 1
        };
        self.subscriber_at(start, false)
    }

    /// Creates a subscriber like [`subscribe`](Self::subscribe) that always
    /// carries a tracker.
    ///
    /// When `register_tracker` returns `None`, a fresh standalone counter
    /// initialised to the start position is attached instead.
    pub fn subscribe_tracked(&self) -> Subscriber<T> {
        let start = self.start_after_head();
        self.subscriber_at(start, true)
    }

    /// Returns the position just past the ring's cursor, or `0` while the
    /// cursor still holds the `u64::MAX` sentinel.
    fn start_after_head(&self) -> u64 {
        match self.ring.cursor.0.load(Ordering::Acquire) {
            // Adding one to the sentinel would overflow, so map it to 0.
            u64::MAX => 0,
            head => head + 1,
        }
    }

    /// Registers a tracker at `start` and builds a `Subscriber` reading from
    /// that position with zeroed counters.
    ///
    /// When `standalone_fallback` is `true` and the ring hands back no tracker,
    /// a private counter initialised to `start` is used in its place.
    fn subscriber_at(&self, start: u64, standalone_fallback: bool) -> Subscriber<T> {
        let registered = self.ring.register_tracker(start);
        let tracker = if standalone_fallback {
            registered.or_else(|| Some(Arc::new(Padded(AtomicU64::new(start)))))
        } else {
            registered
        };
        let slots_ptr = self.ring.slots_ptr();
        // Copy the indexing parameters out of the ring into the subscriber.
        let idx = self.ring.index;
        Subscriber {
            ring: self.ring.clone(),
            slots_ptr,
            capacity: idx.capacity,
            mask: idx.mask,
            reciprocal: idx.reciprocal,
            is_pow2: idx.is_pow2,
            cursor: start,
            tracker,
            total_lagged: 0,
            total_received: 0,
        }
    }
}