Struct Subscriber 

pub struct Subscriber<T: Pod> { /* private fields */ }

The read side of a Photon SPMC channel.

Each subscriber has its own cursor, so consumers never contend with one another.

Implementations

impl<T: Pod> Subscriber<T>

pub fn try_recv(&mut self) -> Result<T, TryRecvError>

Try to receive the next message without blocking.

pub fn recv(&mut self) -> T

Spin until the next message is available and return it.

Uses a two-phase spin strategy. The first 64 iterations are a bare spin, which gives the lowest wakeup latency (near-zero reaction time). After that, the loop switches to a PAUSE-based spin, which saves power and yields execution resources to the SMT sibling. On Skylake and later, PAUSE adds roughly 140 cycles of delay per iteration; the bare-spin phase avoids that penalty when the message arrives quickly, which is typical for cross-thread pub/sub.
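The two-phase idea can be sketched with the standard library alone. This is a minimal illustration, not the crate's implementation: `two_phase_spin`, `BARE_SPIN_ITERS`, and the `poll` closure are hypothetical names, and the closure stands in for whatever check detects a new message. On x86 targets, `std::hint::spin_loop()` emits the PAUSE instruction.

```rust
use std::hint;

/// Hypothetical constant mirroring the "first 64 iterations" described above.
const BARE_SPIN_ITERS: u32 = 64;

/// Polls `poll` until it yields a value (illustrative helper, not a crate API).
pub fn two_phase_spin<T>(mut poll: impl FnMut() -> Option<T>) -> T {
    // Phase 1: bare spin. No pause hint, so a value that shows up quickly
    // is observed on the very next poll.
    for _ in 0..BARE_SPIN_ITERS {
        if let Some(value) = poll() {
            return value;
        }
    }
    // Phase 2: keep polling, but issue a spin-loop hint between attempts
    // to save power and give the SMT sibling a chance to run.
    loop {
        if let Some(value) = poll() {
            return value;
        }
        hint::spin_loop();
    }
}
```

A caller would pass a closure that returns `Some(message)` once a message is visible and `None` otherwise.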

pub fn recv_with(&mut self, strategy: WaitStrategy) -> T

Block until the next message using the given WaitStrategy.

Unlike recv(), which hard-codes a two-phase spin, this method delegates idle behaviour to the strategy, which enables yield-based, park-based, or adaptive waiting.

Example
use photon_ring::{channel, WaitStrategy};

let (mut p, s) = channel::<u64>(64);
let mut sub = s.subscribe();
p.publish(7);
assert_eq!(sub.recv_with(WaitStrategy::BusySpin), 7);
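To show what delegating idle behaviour to a strategy can look like, here is a standard-library sketch. Everything in it is an assumption made for illustration: `IdleStrategy`, its variants, and `wait_with` are hypothetical and are not the crate's `WaitStrategy` (only the `BusySpin` variant is confirmed by the example above).

```rust
use std::{hint, thread, time::Duration};

/// Hypothetical stand-in for a wait strategy; not the crate's `WaitStrategy`.
#[derive(Clone, Copy, Debug)]
pub enum IdleStrategy {
    /// Spin with a CPU spin-loop hint between polls.
    BusySpin,
    /// Give up the rest of the time slice between polls.
    Yield,
    /// Park the thread for at most the given duration between polls.
    Park(Duration),
}

impl IdleStrategy {
    /// Performs one idle step according to the chosen strategy.
    fn idle(self) {
        match self {
            IdleStrategy::BusySpin => hint::spin_loop(),
            IdleStrategy::Yield => thread::yield_now(),
            IdleStrategy::Park(timeout) => thread::park_timeout(timeout),
        }
    }
}

/// Polls until a value is available, idling via `strategy` between attempts.
pub fn wait_with<T>(strategy: IdleStrategy, mut poll: impl FnMut() -> Option<T>) -> T {
    loop {
        if let Some(value) = poll() {
            return value;
        }
        strategy.idle();
    }
}
```

The trade-off in this sketch is the usual one: spinning reacts fastest but burns a core, while yielding or parking frees the CPU at the cost of wakeup latency.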

pub fn latest(&mut self) -> Option<T>

Skips to the latest published message, discarding any intermediate ones.

Returns None only if nothing has been published yet. Under heavy producer load, retries internally if the target slot is mid-write.

pub fn pending(&self) -> u64

Returns the number of messages available to read, capped at the ring capacity.
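The cap can be illustrated with a small arithmetic sketch. It assumes, purely for illustration, that the publisher's position and the subscriber's cursor are both monotonically increasing sequence numbers; the function name and parameters are hypothetical and do not describe the crate's internals.

```rust
/// Hypothetical model: messages readable by a cursor, capped at ring capacity.
///
/// `head` is the sequence number the next publish would use, `cursor` is the
/// next sequence number this subscriber would read.
pub fn pending_model(head: u64, cursor: u64, capacity: u64) -> u64 {
    // Anything further back than `capacity` has been overwritten, so the
    // readable backlog can never exceed the size of the ring.
    head.saturating_sub(cursor).min(capacity)
}
```

Under these assumptions a subscriber that is 3 messages behind on a 64-slot ring sees 3 pending, while one that is 100 behind sees 64.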

pub fn total_received(&self) -> u64

Total messages successfully received by this subscriber.

pub fn total_lagged(&self) -> u64

Total messages lost due to lag (consumer fell behind the ring).

pub fn receive_ratio(&self) -> f64

Returns the ratio of received messages to total messages, where total is received + lagged. Returns 0.0 if no messages have been processed.
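The ratio described above is simple enough to write out. The free function below is a hypothetical restatement of the documented formula, taking the two counters as plain arguments rather than reading them from a subscriber.

```rust
/// Hypothetical restatement of the documented ratio: received / (received + lagged).
pub fn receive_ratio_model(received: u64, lagged: u64) -> f64 {
    let total = received.saturating_add(lagged);
    if total == 0 {
        // Nothing processed yet: documented to be 0.0 rather than NaN.
        0.0
    } else {
        received as f64 / total as f64
    }
}
```

For example, 3 received and 1 lagged gives 0.75, and a subscriber that never lagged gets 1.0.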

pub fn recv_batch(&mut self, buf: &mut [T]) -> usize

Receive up to buf.len() messages in a single call.

Messages are written into the provided slice starting at index 0. Returns the number of messages received. On lag, the cursor is advanced and filling continues from the oldest available message.
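The lag behaviour can be pictured with a single-threaded toy model. `ModelRing` and `ModelCursor` are hypothetical types invented for this sketch; they ignore concurrency entirely and only show how a cursor that has fallen behind jumps forward to the oldest surviving message before the buffer is filled.

```rust
/// Toy, single-threaded ring that keeps only the last `capacity` values.
pub struct ModelRing {
    slots: Vec<u64>,
    head: u64, // sequence number the next publish will use
}

impl ModelRing {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Self { slots: vec![0; capacity], head: 0 }
    }

    /// Overwrites the oldest slot once the ring is full.
    pub fn publish(&mut self, value: u64) {
        let idx = (self.head % self.slots.len() as u64) as usize;
        self.slots[idx] = value;
        self.head += 1;
    }
}

/// Toy per-subscriber cursor with a lag counter.
pub struct ModelCursor {
    pub next: u64,
    pub lagged: u64,
}

impl ModelCursor {
    /// Fills `buf` from index 0 and returns how many values were written.
    pub fn recv_batch(&mut self, ring: &ModelRing, buf: &mut [u64]) -> usize {
        let cap = ring.slots.len() as u64;
        let oldest = ring.head.saturating_sub(cap);
        if self.next < oldest {
            // Lag: skip everything that was overwritten and count it as lost.
            self.lagged += oldest - self.next;
            self.next = oldest;
        }
        let mut written = 0;
        while written < buf.len() && self.next < ring.head {
            buf[written] = ring.slots[(self.next % cap) as usize];
            self.next += 1;
            written += 1;
        }
        written
    }
}
```

In this model, publishing six values into a four-slot ring and then reading with a fresh cursor yields the last four values and records two as lagged.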

pub fn drain(&mut self) -> Drain<'_, T>

Returns an iterator that drains all currently available messages. Stops when no more messages are available. Handles lag transparently by retrying after cursor advancement.
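One way to picture a draining iterator is as a loop over a non-blocking receive that retries on lag and stops on empty. The sketch below uses only the standard library. `ModelTryRecvError` and `drain_with` are hypothetical, and the `Lagged` variant in particular is an assumption: this page only confirms that `TryRecvError::Empty` exists.

```rust
use std::iter;

/// Hypothetical error type for the sketch; the `Lagged` variant is assumed.
#[derive(Debug, PartialEq)]
pub enum ModelTryRecvError {
    Empty,
    Lagged,
}

/// Builds an iterator that yields values until the source reports `Empty`.
pub fn drain_with<T>(
    mut try_recv: impl FnMut() -> Result<T, ModelTryRecvError>,
) -> impl Iterator<Item = T> {
    iter::from_fn(move || loop {
        match try_recv() {
            Ok(value) => return Some(value),
            // The cursor is assumed to have been advanced already; retry.
            Err(ModelTryRecvError::Lagged) => continue,
            // Nothing more is available right now: end the iteration.
            Err(ModelTryRecvError::Empty) => return None,
        }
    })
}
```

The iterator in this sketch ends at the first `Empty`, so messages published afterwards are left for a later call.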

pub fn tracker(&self) -> Option<Arc<Padded<AtomicU64>>>

Get this subscriber’s cursor tracker for use in a DependencyBarrier.

Returns None if the subscriber was created on a lossy channel without subscribe_tracked(). Use subscribe_tracked() to ensure a tracker is always present.

pub fn try_recv_gated(&mut self, barrier: &DependencyBarrier) -> Result<T, TryRecvError>

Try to receive the next message, but only if all upstream subscribers in the barrier have already processed it.

Returns TryRecvError::Empty if the upstream barrier has not yet advanced past this subscriber’s cursor, or if no new message is available from the ring.

Example
use photon_ring::{channel, DependencyBarrier, TryRecvError};

let (mut pub_, subs) = channel::<u64>(64);
let mut upstream = subs.subscribe_tracked();
let barrier = DependencyBarrier::from_subscribers(&[&upstream]);
let mut downstream = subs.subscribe();

pub_.publish(42);

// Downstream can't read — upstream hasn't consumed it yet
assert_eq!(downstream.try_recv_gated(&barrier), Err(TryRecvError::Empty));

upstream.try_recv().unwrap();

// Now downstream can proceed
assert_eq!(downstream.try_recv_gated(&barrier), Ok(42));
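The gating rule itself can be modelled with shared atomic cursors. This is a sketch under stated assumptions, not the crate's `DependencyBarrier`: `ModelBarrier` is a hypothetical type, each cursor is assumed to hold the next sequence number its subscriber will read, and an empty barrier is assumed to allow everything.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical barrier over upstream cursors (not the crate's type).
pub struct ModelBarrier {
    upstream: Vec<Arc<AtomicU64>>,
}

impl ModelBarrier {
    pub fn new(upstream: Vec<Arc<AtomicU64>>) -> Self {
        Self { upstream }
    }

    /// Position of the slowest upstream subscriber.
    pub fn slowest(&self) -> u64 {
        self.upstream
            .iter()
            .map(|cursor| cursor.load(Ordering::Acquire))
            .min()
            // Assumption: with no upstream subscribers nothing is gated.
            .unwrap_or(u64::MAX)
    }

    /// True once every upstream cursor has moved past `seq`.
    pub fn allows(&self, seq: u64) -> bool {
        self.slowest() > seq
    }
}
```

A downstream reader in this model would check `allows(next_seq)` before reading and report "empty" when the check fails, which matches the behaviour shown in the example above.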

pub fn recv_gated(&mut self, barrier: &DependencyBarrier) -> T

Blocking receive gated by a dependency barrier.

Spins until all upstream subscribers in the barrier have processed the next message, then reads and returns it. On lag, the cursor is advanced and the method retries.

Example
use photon_ring::{channel, DependencyBarrier};

let (mut pub_, subs) = channel::<u64>(64);
let mut upstream = subs.subscribe_tracked();
let barrier = DependencyBarrier::from_subscribers(&[&upstream]);
let mut downstream = subs.subscribe();

pub_.publish(99);
upstream.try_recv().unwrap();

assert_eq!(downstream.recv_gated(&barrier), 99);

Trait Implementations

impl<T: Pod> Drop for Subscriber<T>

fn drop(&mut self)

Executes the destructor for this type.

impl<T: Pod> Send for Subscriber<T>

Auto Trait Implementations

impl<T> Freeze for Subscriber<T>

impl<T> !RefUnwindSafe for Subscriber<T>

impl<T> !Sync for Subscriber<T>

impl<T> Unpin for Subscriber<T>

impl<T> UnsafeUnpin for Subscriber<T>

impl<T> !UnwindSafe for Subscriber<T>

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.