Trait reactive_mutiny::types::ChannelProducer
pub trait ChannelProducer<'a, ItemType: 'a + Debug + Send + Sync, DerivedItemType: 'a + Debug> {
    // Required methods
    fn send(&self, item: ItemType) -> RetryConsumerResult<(), ItemType, ()>;
    fn send_with<F: FnOnce(&mut ItemType)>(
        &self,
        setter: F,
    ) -> RetryConsumerResult<(), F, ()>;
    fn send_with_async<F: FnOnce(&'a mut ItemType) -> Fut, Fut: Future<Output = &'a mut ItemType>>(
        &'a self,
        setter: F,
    ) -> impl Future<Output = RetryConsumerResult<(), F, ()>>;
    fn reserve_slot(&'a self) -> Option<&'a mut ItemType>;
    fn try_send_reserved(&self, reserved_slot: &mut ItemType) -> bool;
    fn try_cancel_slot_reserve(&self, reserved_slot: &mut ItemType) -> bool;

    // Provided method
    fn send_derived(&self, _derived_item: &DerivedItemType) -> bool { ... }
}
Defines how to send events (to a Uni or Multi).
Required Methods
fn send(&self, item: ItemType) -> RetryConsumerResult<(), ItemType, ()>
Similar to Self::send_with(), but for sending the already-built item.
See there for how to deal with the returned type.
IMPLEMENTORS: #[inline(always)]
fn send_with<F: FnOnce(&mut ItemType)>(&self, setter: F) -> RetryConsumerResult<(), F, ()>
Calls setter, passing a slot so the payload may be filled there, then sends the event through this channel asynchronously.
The returned type is convertible to Result<(), F> by calling .into() on it, which yields Err(setter) when the buffer is full
so the caller may try again. Alternatively, you may add any retrying logic using the keen-retry crate's API, as in:
    xxxx.send_with(|slot| *slot = 42)
        .retry_with(|setter| xxxx.send_with(setter))
        .spinning_until_timeout(Duration::from_millis(300), ()) // go see the other options
        .map_errors(|_, setter| (setter, _), |e| e) // map the unconsumed `setter` payload into `Err(setter)` when converting to `Result` ahead
        .into()?;
NOTE: this method may allow the compiler some extra optimization steps when compared to Self::send(). When tuning for performance,
it is advisable to try this method.
IMPLEMENTORS: #[inline(always)]
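The retry flow described above can be sketched without the crate. The following is a minimal, self-contained illustration and not the reactive-mutiny implementation: ToyChannel, consume() and send_with_retry() are hypothetical names, and a plain Result<(), F> stands in for what RetryConsumerResult is said to convert to through .into(). It only shows why handing the unconsumed setter back on failure lets the caller retry without rebuilding the payload.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;

/// Hypothetical stand-in for a bounded channel (NOT a reactive-mutiny type).
struct ToyChannel {
    capacity: usize,
    queue: RefCell<VecDeque<u32>>,
}

impl ToyChannel {
    fn new(capacity: usize) -> Self {
        Self { capacity, queue: RefCell::new(VecDeque::new()) }
    }

    /// Fills a fresh slot through `setter` and enqueues it.
    /// When the buffer is full, `setter` is handed back, uncalled, in `Err`.
    fn send_with<F: FnOnce(&mut u32)>(&self, setter: F) -> Result<(), F> {
        let mut queue = self.queue.borrow_mut();
        if queue.len() >= self.capacity {
            return Err(setter);
        }
        let mut slot = 0;
        setter(&mut slot);
        queue.push_back(slot);
        Ok(())
    }

    /// Simulates the consumer side freeing one slot.
    fn consume(&self) -> Option<u32> {
        self.queue.borrow_mut().pop_front()
    }
}

/// Retries with the returned setter until it is accepted, pretending the
/// consumer frees a slot between attempts. Returns the number of attempts.
fn send_with_retry(channel: &ToyChannel, value: u32) -> usize {
    let mut attempts = 1;
    let mut outcome = channel.send_with(move |slot| *slot = value);
    while let Err(setter) = outcome {
        channel.consume();
        attempts += 1;
        outcome = channel.send_with(setter);
    }
    attempts
}
```

In real code the keen-retry combinators shown earlier would replace the hand-written loop; the loop here only makes the ownership hand-off of the setter visible.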
fn send_with_async<F: FnOnce(&'a mut ItemType) -> Fut, Fut: Future<Output = &'a mut ItemType>>(&'a self, setter: F) -> impl Future<Output = RetryConsumerResult<(), F, ()>>
Similar to Self::send_with(), but accepts an async setter.
This method is useful for sending operations that depend on data acquired by async blocks, allowing
select loops (like the following) to be built:
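    tokio::select! {
        // `select!` arms take the form `<pattern> = <future> => <handler>`
        _ = channel_producer.send_with_async(|slot| async {
            let data = data_source.read().await;
            // reborrow the slot so it can still be returned afterwards
            fill_slot(data, &mut *slot);
            slot
        }) => {},
        // (...other select arms that may execute concurrently with the above arm...)
    }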
IMPLEMENTORS: #[inline(always)]
fn reserve_slot(&'a self) -> Option<&'a mut ItemType>
Proxy to crate::prelude::advanced::BoundedOgreAllocator::alloc_ref() from the underlying allocator,
allowing the caller to fill in the data as they wish, through a non-blocking API.
See also Self::try_send_reserved() and Self::try_cancel_slot_reserve().
fn try_send_reserved(&self, reserved_slot: &mut ItemType) -> bool
Attempts to send an item previously reserved by Self::reserve_slot().
Failure to do so (when false is returned) might be part of the normal channel operation,
so retrying is advised.
Also note that some channel implementations are optimized for (or even only accept) sending the slots
in the same order they were reserved.
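To illustrate why a false return can be part of normal operation, here is a minimal sketch of an order-restricted channel. It is an assumption-laden model, not the crate's code: ToyOrderedChannel is a hypothetical type, and slots are identified by integer tickets instead of the &mut ItemType references the real trait hands out, which keeps the sketch free of unsafe code.

```rust
use std::cell::{Cell, RefCell};

/// Hypothetical channel that only publishes slots in reservation order
/// (NOT a reactive-mutiny type; tickets replace `&mut ItemType` slots).
struct ToyOrderedChannel {
    capacity: usize,
    next_reserve: Cell<usize>, // ticket handed out by the next reservation
    next_publish: Cell<usize>, // the only ticket that may be published now
    published: RefCell<Vec<u32>>,
}

impl ToyOrderedChannel {
    fn new(capacity: usize) -> Self {
        Self {
            capacity,
            next_reserve: Cell::new(0),
            next_publish: Cell::new(0),
            published: RefCell::new(Vec::new()),
        }
    }

    /// Returns `None` when too many reserved slots are still unpublished.
    fn reserve_slot(&self) -> Option<usize> {
        let ticket = self.next_reserve.get();
        if ticket - self.next_publish.get() >= self.capacity {
            return None;
        }
        self.next_reserve.set(ticket + 1);
        Some(ticket)
    }

    /// Publishes `value` only if `ticket` is the oldest unpublished one.
    fn try_send_reserved(&self, ticket: usize, value: u32) -> bool {
        if ticket != self.next_publish.get() {
            return false; // an earlier slot is still pending: retry later
        }
        self.published.borrow_mut().push(value);
        self.next_publish.set(ticket + 1);
        true
    }
}

/// Reserves two slots and tries to send the second one first.
fn demo_out_of_order_send() -> (bool, bool, bool) {
    let channel = ToyOrderedChannel::new(4);
    let first = channel.reserve_slot().expect("capacity available");
    let second = channel.reserve_slot().expect("capacity available");
    let too_early = channel.try_send_reserved(second, 20); // refused
    let in_order = channel.try_send_reserved(first, 10); // accepted
    let retried = channel.try_send_reserved(second, 20); // accepted on retry
    (too_early, in_order, retried)
}
```

The refused send loses nothing: the caller still holds the reservation and simply retries once the earlier slot has been published.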
fn try_cancel_slot_reserve(&self, reserved_slot: &mut ItemType) -> bool
Attempts to give up sending an item previously reserved by Self::reserve_slot(), freeing it / setting its resources for reuse. Important things to note:
- Failure (when false is returned) might be part of the normal channel operation, so retrying is advised;
- Some channel implementations are optimized for (or even only accept) cancelling the slots in the same order they were reserved;
- These more restricted and more optimized channels might not allow publishing any reserved slots if there are cancelled slots in-between, in which case publishing will only be done when the missing slots are, eventually, published. So, be careful when using the cancellation semantics: ideally, cancellation should only be used for the last reserved slot and when no sending occurs in-between.
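The advice to cancel only the last reserved slot can be pictured with a small model. This is a sketch under stated assumptions, not how any reactive-mutiny channel is implemented: ToyCancellableChannel is a hypothetical type, tickets stand in for the reserved &mut ItemType slots, and the rule "only the most recent reservation may be cancelled" is one possible restriction chosen for illustration.

```rust
use std::cell::Cell;

/// Hypothetical channel that only lets the most recent reservation be
/// cancelled (NOT a reactive-mutiny type; tickets replace `&mut` slots).
struct ToyCancellableChannel {
    next_reserve: Cell<usize>, // ticket handed out by the next reservation
}

impl ToyCancellableChannel {
    fn new() -> Self {
        Self { next_reserve: Cell::new(0) }
    }

    /// Hands out the next ticket (capacity checks omitted for brevity).
    fn reserve_slot(&self) -> usize {
        let ticket = self.next_reserve.get();
        self.next_reserve.set(ticket + 1);
        ticket
    }

    /// Succeeds only when `ticket` is the latest reservation, so that no
    /// hole is ever left between slots that are still waiting to be sent.
    fn try_cancel_slot_reserve(&self, ticket: usize) -> bool {
        if ticket + 1 != self.next_reserve.get() {
            return false; // a later reservation exists: retry after it is resolved
        }
        self.next_reserve.set(ticket); // the ticket becomes available again
        true
    }
}

/// Reserves two slots and tries to cancel the older one first.
fn demo_cancel_order() -> (bool, bool, bool) {
    let channel = ToyCancellableChannel::new();
    let first = channel.reserve_slot();
    let second = channel.reserve_slot();
    let middle = channel.try_cancel_slot_reserve(first); // refused
    let last = channel.try_cancel_slot_reserve(second); // accepted
    let retried = channel.try_cancel_slot_reserve(first); // accepted on retry
    (middle, last, retried)
}
```

Here the first cancellation attempt fails because a newer reservation is still outstanding, and succeeds on retry once that newer slot has been given up.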
Provided Methods
fn send_derived(&self, _derived_item: &DerivedItemType) -> bool
For channels that store the DerivedItemType instead of the ItemType, this method may be useful.
For instance, if the Stream consumes Arc<String> (the derived item type) and the channel is for Strings, this method allows sending an Arc directly.
The default implementation, though, is made for types that don’t have a derived item type.
IMPLEMENTORS: #[inline(always)]
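The Arc<String> case mentioned above can be sketched as follows. Everything here is illustrative: ToyProducer, ArcStringChannel and PlainChannel are hypothetical names, and the default send_derived() returning false is an assumption, since this page does not say what the crate's default implementation actually does.

```rust
use std::cell::RefCell;
use std::sync::Arc;

/// Hypothetical, simplified producer trait (NOT `ChannelProducer` itself).
trait ToyProducer<Item, Derived> {
    fn send(&self, item: Item) -> bool;

    /// ASSUMPTION: the default simply refuses, modelling channels that
    /// have no distinct derived item type.
    fn send_derived(&self, _derived: &Derived) -> bool {
        false
    }
}

/// A channel for `String`s that internally stores `Arc<String>`.
struct ArcStringChannel {
    stored: RefCell<Vec<Arc<String>>>,
}

impl ToyProducer<String, Arc<String>> for ArcStringChannel {
    fn send(&self, item: String) -> bool {
        self.stored.borrow_mut().push(Arc::new(item)); // wraps the item
        true
    }

    fn send_derived(&self, derived: &Arc<String>) -> bool {
        // Only bumps the reference count: the string data is not copied.
        self.stored.borrow_mut().push(Arc::clone(derived));
        true
    }
}

/// A channel without a derived type: it keeps the default `send_derived`.
struct PlainChannel {
    stored: RefCell<Vec<u32>>,
}

impl ToyProducer<u32, u32> for PlainChannel {
    fn send(&self, item: u32) -> bool {
        self.stored.borrow_mut().push(item);
        true
    }
}
```

A caller that already holds an Arc<String> would use send_derived(&shared) to share the allocation, while send(String) remains available for freshly built items.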