Trait ChannelProducer

Source
pub trait ChannelProducer<'a, ItemType: 'a + Debug + Send + Sync, DerivedItemType: 'a + Debug> {
    // Required methods
    fn send(&self, item: ItemType) -> RetryConsumerResult<(), ItemType, ()>;
    fn send_with<F: FnOnce(&mut ItemType)>(
        &self,
        setter: F,
    ) -> RetryConsumerResult<(), F, ()>;
    fn send_with_async<F: FnOnce(&'a mut ItemType) -> Fut + Send, Fut: Future<Output = &'a mut ItemType> + Send>(
        &'a self,
        setter: F,
    ) -> impl Future<Output = RetryConsumerResult<(), F, ()>> + Send;
    fn reserve_slot(&self) -> Option<&mut ItemType>;
    fn try_send_reserved(&self, reserved_slot: &mut ItemType) -> bool;
    fn try_cancel_slot_reserve(&self, reserved_slot: &mut ItemType) -> bool;

    // Provided method
    fn send_derived(&self, _derived_item: &DerivedItemType) -> bool { ... }
}
Expand description

Defines how to send events (to a [Uni] or [Multi]).

Required Methods§

Source

fn send(&self, item: ItemType) -> RetryConsumerResult<(), ItemType, ()>

Similar to Self::send_with(), but for sending the already-built item.
See there for how to deal with the returned type.
IMPLEMENTORS: #[inline(always)]

Source

fn send_with<F: FnOnce(&mut ItemType)>( &self, setter: F, ) -> RetryConsumerResult<(), F, ()>

Calls setter, passing a slot so the payload may be filled there, then sends the event through this channel asynchronously.
The returned type is conversible to Result<(), F> by calling .into() on it, returning Err<setter> when the buffer is full, to allow the caller to try again; otherwise you may add any retrying logic using the keen-retry crate’s API like in:

    xxxx.send_with(|slot| *slot = 42)
        .retry_with(|setter| xxxx.send_with(setter))
        .spinning_until_timeout(Duration::from_millis(300), ())     // go see the other options
        .map_errors(|_, setter| (setter, _), |e| e)                 // map the unconsumed `setter` payload into `Err(setter)` when converting to `Result` ahead
        .into()?;

NOTE: this type may allow the compiler some extra optimization steps when compared to Self::send(). When tuning for performance, it is advisable to try this method.
IMPLEMENTORS: #[inline(always)]

Source

fn send_with_async<F: FnOnce(&'a mut ItemType) -> Fut + Send, Fut: Future<Output = &'a mut ItemType> + Send>( &'a self, setter: F, ) -> impl Future<Output = RetryConsumerResult<(), F, ()>> + Send

Similar to [Self::send_with(), but accepts an async setter. This method is useful for sending operations that depend on data acquired by async blocks, allowing select loops (like the following) to be built:

    tokio::select! {
        _ => async {
            channel_producer.send_with_async(|slot| async {
                let data = data_source.read().await;
                fill_slot(data, &mut slot);
                slot
            }).await
        },
       (...other select arms that may execute concurrently with the above arm...)
    }

IMPLEMENTORS: #[inline(always)]

Source

fn reserve_slot(&self) -> Option<&mut ItemType>

Proxy to crate::prelude::advanced::BoundedOgreAllocator::alloc_ref() from the underlying allocator, allowing caller to fill in the data as they wish – in a non-blocking prone API.
See also [Self::send_reserved()] and [Self::cancel_slot_reserve()].

Source

fn try_send_reserved(&self, reserved_slot: &mut ItemType) -> bool

Attempts to send an item previously reserved by Self::reserve_slot(). Failure to do so (when false is returned) might be part of the normal channel operation, so retrying is advised. More: some channel implementations are optimized (or even only accept) sending the slots in the same order they were reserved.

Source

fn try_cancel_slot_reserve(&self, reserved_slot: &mut ItemType) -> bool

Attempts to give up sending an item previously reserved by Self::reserve_slot(), freeing it / setting its resources for reuse. Two important things to note:

  1. Failure (when false is returned) might be part of the normal channel operation, so retrying is advised;
  2. Some channel implementations are optimized (or even only accept) cancelling the slots in the same order they were reserved;
  3. These, more restricted & more optimized channels, might not allow publishing any reserved slots if there are cancelled slots in-between – in which case, publishing will only be done when the missing slots are, eventually, published. So, be careful when using the cancellation semantics: ideally, it should only be allowed for the last slot and when no sending occurs in-between.

Provided Methods§

Source

fn send_derived(&self, _derived_item: &DerivedItemType) -> bool

For channels that stores the DerivedItemType instead of the ItemType, this method may be useful – for instance: if the Stream consumes OgreArc<Type> (the derived item type) and the channel is for Type, with this method one may send an OgreArc directly.
IMPLEMENTORS: #[inline(always)]

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§

Source§

impl<'a, ItemType: 'a + Debug + Send + Sync, OgreAllocatorType: 'a + BoundedOgreAllocator<ItemType> + Send + Sync, const BUFFER_SIZE: usize, const MAX_STREAMS: usize> ChannelProducer<'a, ItemType, OgreUnique<ItemType, OgreAllocatorType>> for reactive_mutiny::uni::channels::zero_copy::full_sync::FullSync<'a, ItemType, OgreAllocatorType, BUFFER_SIZE, MAX_STREAMS>

Source§

impl<'a, ItemType: 'a + Send + Sync + Debug + Default, const BUFFER_SIZE: usize, const MAX_STREAMS: usize> ChannelProducer<'a, ItemType, Arc<ItemType>> for reactive_mutiny::multi::channels::arc::atomic::Atomic<'a, ItemType, BUFFER_SIZE, MAX_STREAMS>

Source§

impl<'a, ItemType: 'a + Send + Sync + Debug + Default, const BUFFER_SIZE: usize, const MAX_STREAMS: usize> ChannelProducer<'a, ItemType, Arc<ItemType>> for reactive_mutiny::multi::channels::arc::full_sync::FullSync<'a, ItemType, BUFFER_SIZE, MAX_STREAMS>

Source§

impl<'a, ItemType: 'a + Send + Sync + Debug + Default, const BUFFER_SIZE: usize, const MAX_STREAMS: usize> ChannelProducer<'a, ItemType, ItemType> for reactive_mutiny::uni::channels::movable::atomic::Atomic<'a, ItemType, BUFFER_SIZE, MAX_STREAMS>

Source§

impl<'a, ItemType: 'a + Send + Sync + Debug, OgreAllocatorType: BoundedOgreAllocator<ItemType> + 'a + Send + Sync, const BUFFER_SIZE: usize, const MAX_STREAMS: usize> ChannelProducer<'a, ItemType, OgreUnique<ItemType, OgreAllocatorType>> for reactive_mutiny::uni::channels::zero_copy::atomic::Atomic<'a, ItemType, OgreAllocatorType, BUFFER_SIZE, MAX_STREAMS>

Source§

impl<'a, ItemType: 'a + Send + Sync + Debug, OgreAllocatorType: BoundedOgreAllocator<ItemType> + 'a + Sync + Send, const BUFFER_SIZE: usize, const MAX_STREAMS: usize> ChannelProducer<'a, ItemType, OgreArc<ItemType, OgreAllocatorType>> for reactive_mutiny::multi::channels::ogre_arc::atomic::Atomic<'a, ItemType, OgreAllocatorType, BUFFER_SIZE, MAX_STREAMS>

Source§

impl<'a, ItemType: 'a + Send + Sync + Debug, OgreAllocatorType: BoundedOgreAllocator<ItemType> + 'a + Sync + Send, const BUFFER_SIZE: usize, const MAX_STREAMS: usize> ChannelProducer<'a, ItemType, OgreArc<ItemType, OgreAllocatorType>> for reactive_mutiny::multi::channels::ogre_arc::full_sync::FullSync<'a, ItemType, OgreAllocatorType, BUFFER_SIZE, MAX_STREAMS>

Source§

impl<'a, ItemType: 'a + Send + Sync + Debug, const BUFFER_SIZE: usize, const MAX_STREAMS: usize> ChannelProducer<'a, ItemType, Arc<ItemType>> for reactive_mutiny::multi::channels::arc::crossbeam::Crossbeam<'a, ItemType, BUFFER_SIZE, MAX_STREAMS>

Source§

impl<'a, ItemType: 'a + Send + Sync + Debug, const BUFFER_SIZE: usize, const MAX_STREAMS: usize> ChannelProducer<'a, ItemType, ItemType> for reactive_mutiny::uni::channels::movable::crossbeam::Crossbeam<'a, ItemType, BUFFER_SIZE, MAX_STREAMS>

Source§

impl<'a, ItemType: 'a + Send + Sync + Debug, const MAX_STREAMS: usize> ChannelProducer<'a, ItemType, &'static ItemType> for MmapLog<'a, ItemType, MAX_STREAMS>

Source§

impl<'a, ItemType: Send + Sync + Debug + Default + 'a, const BUFFER_SIZE: usize, const MAX_STREAMS: usize> ChannelProducer<'a, ItemType, ItemType> for reactive_mutiny::uni::channels::movable::full_sync::FullSync<'a, ItemType, BUFFER_SIZE, MAX_STREAMS>