Skip to main content

Consumer

Struct Consumer 

Source
pub struct Consumer<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> { /* private fields */ }

Implementations§

Source§

impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> Consumer<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>

Source

pub fn new( mcache: &'a MCache<MCACHE_DEPTH>, dcache: &'a DCache<CHUNK_COUNT, CHUNK_SIZE>, initial_seq: u64, ) -> Self

Create a consumer starting at the given sequence number.

Source

pub fn with_flow_control( mcache: &'a MCache<MCACHE_DEPTH>, dcache: &'a DCache<CHUNK_COUNT, CHUNK_SIZE>, fctl: &'a Fctl, initial_seq: u64, ) -> Self

Create a consumer with credit-based flow control.

The consumer will release a credit after consuming each fragment. Use the same fctl instance as the producer.

Source

pub fn with_metrics(self, metrics: &'a Metrics) -> Self

Attach metrics tracking to this consumer.

Returns a new consumer with the same configuration plus metrics.

Source

pub fn poll(&mut self) -> Result<Option<Fragment<'a, CHUNK_SIZE>>, TangoError>

Poll for the next fragment without blocking.

Returns:

  • Ok(Some(fragment)) if a fragment was available
  • Ok(None) if the sequence is not ready yet
  • Err(TangoError::Overrun) if the consumer was lapped
Source

pub fn wait(&mut self) -> Result<Option<Fragment<'a, CHUNK_SIZE>>, TangoError>

Busy-wait for the next fragment.

Returns:

  • Ok(Some(fragment)) if a fragment was received
  • Ok(None) if the mcache was stopped
  • Err(TangoError::Overrun) if the consumer was lapped
Source

pub fn next_seq(&self) -> u64

Return the next sequence number the consumer expects.

Source

pub fn release_credits(&self, count: u64)

Manually release credits (useful for batch processing).

Call this after you’re done processing a batch of fragments if you want to delay credit release for better throughput.

Source

pub fn poll_batch( &mut self, max_count: usize, ) -> Result<Vec<Fragment<'a, CHUNK_SIZE>>, TangoError>

Poll for multiple fragments at once, up to max_count.

Returns a vector of fragments (up to max_count) that were available. Stops on the first NotReady or error.

This is more efficient than calling poll() in a loop when you expect multiple messages to be available.

Only available with the std feature.

Trait Implementations§

Source§

impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> Debug for Consumer<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> IntoIterator for Consumer<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>

Source§

fn into_iter(self) -> Self::IntoIter

Convert this consumer into an iterator that busy-waits for messages.

The iterator yields Result<Fragment, TangoError> and will:

  • Yield Ok(fragment) for each successfully consumed message
  • Yield Err(TangoError::Overrun) if the consumer was lapped
  • Return None when the MCache is stopped
§Example
for result in consumer {
    match result {
        Ok(fragment) => println!("Got: {:?}", fragment.payload.as_slice()),
        Err(e) => eprintln!("Error: {}", e),
    }
}
Source§

type Item = Result<Fragment<'a, CHUNK_SIZE>, TangoError>

The type of the elements being iterated over.
Source§

type IntoIter = ConsumerIter<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>

Which kind of iterator are we turning this into?

Auto Trait Implementations§

§

impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> Freeze for Consumer<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>

§

impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> !RefUnwindSafe for Consumer<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>

§

impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> Send for Consumer<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>

§

impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> Sync for Consumer<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>

§

impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> Unpin for Consumer<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>

§

impl<'a, const MCACHE_DEPTH: usize, const CHUNK_COUNT: usize, const CHUNK_SIZE: usize> !UnwindSafe for Consumer<'a, MCACHE_DEPTH, CHUNK_COUNT, CHUNK_SIZE>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.