reactive_mutiny/
types.rs

//! Common types across this module

pub use crate::instruments::Instruments;
use crate::mutiny_stream::MutinyStream;
use std::{
    time::Duration,
    task::Waker,
    fmt::Debug,
};
use std::future::Future;
use std::sync::Arc;


/// Defines common abstractions on how [Uni]s receive produced events and deliver them to `Stream`s.\
/// Implementors should also implement one of [ChannelProducer] or [UniZeroCopyChannel].\
/// NOTE: all async functions are out of the hot path, so the `async_trait` won't impose performance penalties.
pub trait ChannelCommon<ItemType:        Debug + Send + Sync,
                        DerivedItemType: Debug> {

    /// Creates a new instance of this channel, to be referred to (in logs) as `name`
    fn new<IntoString: Into<String>>(name: IntoString) -> Arc<Self>;

    /// Waits until all pending items are taken from this channel, up until `timeout` elapses.\
    /// Returns the number of still unconsumed items -- which is 0 if it was not interrupted by the timeout
    #[must_use = "Returns 0 if all elements could be flushed within the given `timeout` or the number of elements still pending otherwise"]
    fn flush(&self, timeout: Duration) -> impl Future<Output=u32> + Send;

    /// Tells whether this channel is still enabled to process elements
    /// (true before calling the "end stream" / "cancel stream" functions)
    fn is_channel_open(&self) -> bool;

    /// Flushes & signals that the given `stream_id` should cease its activities when there are no more elements left
    /// to process, waiting for the operation to complete for up to `timeout`.\
    /// Returns `true` if the stream ended within the given `timeout` or `false` if it is still processing elements.
    #[must_use = "Returns true if the stream could be ended within the given time"]
    fn gracefully_end_stream(&self, stream_id: u32, timeout: Duration) -> impl Future<Output=bool> + Send;

    /// Flushes & signals that all streams should cease their activities when there are no more elements left
    /// to process, waiting for the operation to complete for up to `timeout`.\
    /// Returns the number of un-ended streams -- which is 0 if it was not interrupted by the timeout.\
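    /// A minimal shutdown sketch (the `channel` handle and the timeout value below are illustrative only):
    /// ```nocompile
    ///     // asks all consumer `Stream`s to finish once the pending events are drained,
    ///     // waiting up to 5 seconds for that to happen
    ///     let unended_streams = channel.gracefully_end_all_streams(Duration::from_secs(5)).await;
    ///     if unended_streams > 0 {
    ///         eprintln!("{unended_streams} streams were still running after the timeout");
    ///     }
    /// ```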
    #[must_use = "Returns 0 if all streams could be ended within the given `timeout` or the number of still-running streams otherwise"]
    fn gracefully_end_all_streams(&self, timeout: Duration) -> impl Future<Output=u32> + Send;

    /// Sends a signal to all streams, urging them to cease their operations.\
    /// Unlike [Self::gracefully_end_all_streams()], this method does not wait for any confirmation,
    /// nor does it care if there are remaining elements to be processed.
    fn cancel_all_streams(&self);

    /// Informs the caller how many active streams are currently managed by this channel.\
    /// IMPLEMENTORS: #[inline(always)]
    fn running_streams_count(&self) -> u32;

    /// Tells how many events are waiting to be taken out of this channel.\
    /// IMPLEMENTORS: #[inline(always)]
    fn pending_items_count(&self) -> u32;

    /// Tells how many events may be produced ahead of the consumers.\
    /// IMPLEMENTORS: #[inline(always)]
    fn buffer_size(&self) -> u32;
}

/// Defines abstractions specific to [Uni] channels
pub trait ChannelUni<'a, ItemType:        Debug + Send + Sync,
                         DerivedItemType: Debug> {

    /// Returns a `Stream` (and its `stream_id`) able to receive elements sent through this channel.\
    /// If called more than once, each `Stream` will receive a different element -- "consumer pattern".\
    /// Currently `panic`s if called more times than allowed by [Uni]'s `MAX_STREAMS`.\
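    /// A minimal consumption sketch (the `channel` handle and the `process()` call below are illustrative only):
    /// ```nocompile
    ///     let (mut stream, _stream_id) = channel.create_stream();
    ///     tokio::spawn(async move {
    ///         while let Some(event) = stream.next().await {
    ///             // each event is delivered to exactly one of the created `Stream`s
    ///             process(event);
    ///         }
    ///     });
    /// ```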
    fn create_stream(self: &Arc<Self>) -> (MutinyStream<'a, ItemType, Self, DerivedItemType>, u32)
                                          where Self: ChannelConsumer<'a, DerivedItemType>;

}

/// Defines abstractions specific to [Multi] channels
pub trait ChannelMulti<'a, ItemType:        Debug + Send + Sync,
                           DerivedItemType: Debug> {

    /// Implemented only for a few [Multi] channels, returns a `Stream` (and its `stream_id`) able to receive elements
    /// that were sent through this channel *before the call to this method*.\
    /// It is up to each implementor to define how far back in the past those events may go, but it is known that `mmap log`
    /// based channels are able to see all past events.\
    /// If called more than once, every stream will see all the past events available.\
    /// Currently `panic`s if called more times than allowed by [Multi]'s `MAX_STREAMS`
    fn create_stream_for_old_events(self: &Arc<Self>) -> (MutinyStream<'a, ItemType, Self, DerivedItemType>, u32)
                                                         where Self: ChannelConsumer<'a, DerivedItemType>;

    /// Returns a `Stream` (and its `stream_id`) able to receive elements sent through this channel *after the call to this method*.\
    /// If called more than once, each `Stream` will see all new elements -- "listener pattern".\
    /// Currently `panic`s if called more times than allowed by [Multi]'s `MAX_STREAMS`.\
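    /// A minimal "listener pattern" sketch (the `channel` handle below is illustrative only):
    /// ```nocompile
    ///     let (mut listener_1, _id_1) = channel.create_stream_for_new_events();
    ///     let (mut listener_2, _id_2) = channel.create_stream_for_new_events();
    ///     // from this point on, every produced event is seen by both `listener_1` and `listener_2`
    /// ```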
    fn create_stream_for_new_events(self: &Arc<Self>) -> (MutinyStream<'a, ItemType, Self, DerivedItemType>, u32)
                                                         where Self: ChannelConsumer<'a, DerivedItemType>;

    /// Implemented only for a few [Multi] channels, returns two `Stream`s (and their `stream_id`s):
    ///   - one for the past events (that, once exhausted, won't see any of the forthcoming events)
    ///   - another for the forthcoming events.
    ///
    /// The split is guaranteed not to miss any events: no events will be lost between the last of the "past" and
    /// the first of the "forthcoming" events.\
    /// It is up to each implementor to define how far back in the past those events may go, but it is known that `mmap log`
    /// based channels are able to see all past events.\
    /// If called more than once, every stream will see all the past events available, as well as all future events after this method call.\
    /// Currently `panic`s if called more times than allowed by [Multi]'s `MAX_STREAMS`.\
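    /// A minimal split-consumption sketch (the `channel` handle below is illustrative only):
    /// ```nocompile
    ///     let ((mut old_events, _old_id), (mut new_events, _new_id)) = channel.create_streams_for_old_and_new_events();
    ///     // `old_events` ends once the historical events are exhausted;
    ///     // `new_events` carries on with the forthcoming ones, with no gap between the two
    /// ```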
    fn create_streams_for_old_and_new_events(self: &Arc<Self>) -> ((MutinyStream<'a, ItemType, Self, DerivedItemType>, u32),
                                                                   (MutinyStream<'a, ItemType, Self, DerivedItemType>, u32))
                                                                  where Self: ChannelConsumer<'a, DerivedItemType>;

    /// Implemented only for a few [Multi] channels, returns a single `Stream` (and its `stream_id`) able to receive elements
    /// that were sent through this channel both *before and after the call to this method*.\
    /// It is up to each implementor to define how far back in the past those events may go, but it is known that `mmap log`
    /// based channels are able to see all past events.\
    /// Notice that, with this method, there is no way of discriminating where the "old" events end and where the "new" events start.\
    /// If called more than once, every stream will see all the past events available, as well as all future events after this method call.\
    /// Currently `panic`s if called more times than allowed by [Multi]'s `MAX_STREAMS`
    fn create_stream_for_old_and_new_events(self: &Arc<Self>) -> (MutinyStream<'a, ItemType, Self, DerivedItemType>, u32)
                                                                 where Self: ChannelConsumer<'a, DerivedItemType>;

}

/// Defines how to send events (to a [Uni] or [Multi]).
pub trait ChannelProducer<'a, ItemType:        'a + Debug + Send + Sync,
                              DerivedItemType: 'a + Debug> {

    /// Similar to [Self::send_with()], but for sending the already-built `item`.\
    /// See there for how to deal with the returned type.\
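    /// A retrying sketch, mirroring the [Self::send_with()] example below (the `xxxx` channel handle and `my_event` are illustrative only):
    /// ```nocompile
    ///     let result = xxxx.send(my_event)
    ///         .retry_with(|item| xxxx.send(item))
    ///         .spinning_until_timeout(Duration::from_millis(300), ());
    ///     // then examine / map `result` into a `Result<(), ItemType>`, as shown in [Self::send_with()]
    /// ```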
    /// IMPLEMENTORS: #[inline(always)]
    #[must_use = "The return type should be examined in case retrying is needed -- or call map(...).into() to transform it into a `Result<(), ItemType>`"]
    fn send(&self, item: ItemType) -> keen_retry::RetryConsumerResult<(), ItemType, ()>;

    /// Calls `setter`, passing a slot so the payload may be filled there, then sends the event through this channel asynchronously.\
    /// The returned type is convertible to `Result<(), F>` by calling .into() on it, returning `Err<setter>` when the buffer is full,
    /// to allow the caller to try again; alternatively, you may add any retrying logic using the `keen-retry` crate's API, like in:
    /// ```nocompile
    ///     xxxx.send_with(|slot| *slot = 42)
    ///         .retry_with(|setter| xxxx.send_with(setter))
    ///         .spinning_until_timeout(Duration::from_millis(300), ())     // go see the other options
    ///         .map_errors(|_, setter| (setter, _), |e| e)                 // map the unconsumed `setter` payload into `Err(setter)` when converting to `Result` ahead
    ///         .into()?;
    /// ```
    /// NOTE: this method may allow the compiler some extra optimization steps when compared to [Self::send()]. When tuning for performance,
    /// it is advisable to try this method.\
    /// IMPLEMENTORS: #[inline(always)]
    #[must_use = "The return type should be examined in case retrying is needed -- or call map(...).into() to transform it into a `Result<(), F>`"]
    fn send_with<F: FnOnce(&mut ItemType)>
                (&self,
                 setter: F)
                -> keen_retry::RetryConsumerResult<(), F, ()>;

    /// Similar to [Self::send_with()], but accepts an async `setter`.
    /// This method is useful for sending operations that depend on data acquired by async blocks, allowing
    /// select loops (like the following) to be built:
    /// ```nocompile
    ///     tokio::select! {
    ///         result = channel_producer.send_with_async(|slot| async {
    ///             let data = data_source.read().await;
    ///             fill_slot(data, &mut slot);
    ///             slot
    ///         }) => { /* examine `result` for retrying, if needed */ },
    ///         // ...other select arms that may execute concurrently with the above arm...
    ///     }
    /// ```
    /// IMPLEMENTORS: #[inline(always)]
    fn send_with_async<F:   FnOnce(&'a mut ItemType) -> Fut + Send,
                       Fut: Future<Output=&'a mut ItemType> + Send>
                      (&'a self,
                       setter: F) -> impl Future<Output=keen_retry::RetryConsumerResult<(), F, ()>> + Send;

    // TODO: 2024-03-04: this is to be filled in by **(f21)**. Possibly an extra dependency on the allocator will be needed for the `BoundedOgreAllocator::OwnedSlotType`
    // fn send_with_external_alloc();

    /// For channels that store the `DerivedItemType` instead of the `ItemType`, this method may be useful
    /// -- for instance: if the Stream consumes `OgreArc<Type>` (the derived item type) and the channel is for `Type`, with this method one may send an `OgreArc` directly.\
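    /// A minimal sketch (the `channel` handle and the `shared` value below are illustrative only):
    /// ```nocompile
    ///     let shared: OgreArc<String> = /* previously obtained */;
    ///     if !channel.send_derived(&shared) {
    ///         // the buffer was full -- retry later with a suitable backoff
    ///     }
    /// ```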
    /// IMPLEMENTORS: #[inline(always)]
    #[inline(always)]
    #[must_use = "The return type should be examined in case retrying is needed"]
    fn send_derived(&self, _derived_item: &DerivedItemType) -> bool {
        todo!("The default `ChannelProducer.send_derived()` was not re-implemented, meaning it is not available for this channel -- it is only available for channels whose `Stream` items will see different types than the produced one -- example: send(`string`) / Stream<Item=OgreArc<String>>")
    }

    /// Proxy to [crate::prelude::advanced::BoundedOgreAllocator::alloc_ref()] from the underlying allocator,
    /// allowing the caller to fill in the data as they wish -- through a non-blocking API.\
    /// See also [Self::try_send_reserved()] and [Self::try_cancel_slot_reserve()].\
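    /// A minimal zero-copy sending sketch (the `channel` handle and the payload-filling call below are illustrative only):
    /// ```nocompile
    ///     if let Some(slot) = channel.reserve_slot() {
    ///         fill_in_the_payload(slot);
    ///         while !channel.try_send_reserved(slot) {
    ///             tokio::task::yield_now().await;     // retry, as advised in [Self::try_send_reserved()]
    ///         }
    ///     }
    /// ```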
    fn reserve_slot(&self) -> Option<&mut ItemType>;

    /// Attempts to send an item previously reserved by [Self::reserve_slot()].\
    /// Failure to do so (when `false` is returned) might be part of the normal channel operation,
    /// so retrying is advised.\
    /// Note: some channel implementations are optimized for (or even only accept) sending the slots
    ///       in the same order they were reserved.
    #[must_use = "You need the returned value to retry the operation until it succeeds, implementing a suitable spin loop logic (like tokio's yield_now())"]
    fn try_send_reserved(&self, reserved_slot: &mut ItemType) -> bool;

    /// Attempts to give up sending an item previously reserved by [Self::reserve_slot()], freeing it / making its resources available for reuse.\
    /// A few important things to note:
    ///   1. Failure (when `false` is returned) might be part of the normal channel operation,
    ///      so retrying is advised;
    ///   2. Some channel implementations are optimized for (or even only accept) cancelling the slots
    ///      in the same order they were reserved;
    ///   3. Those more restricted & more optimized channels might not allow publishing any reserved
    ///      slots while there are cancelled slots in-between -- in which case, publishing will only happen
    ///      once the missing slots are, eventually, published. So, be careful when using the cancellation
    ///      semantics: ideally, it should only be applied to the last reserved slot, with no sending happening in-between.
    #[must_use = "You need the returned value to retry the operation until it succeeds, implementing a suitable spin loop logic (like tokio's yield_now())"]
    fn try_cancel_slot_reserve(&self, reserved_slot: &mut ItemType) -> bool;

}

/// Source of events for [MutinyStream].
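/// A simplified sketch of how a `Stream` poller might drive this trait (illustrative only -- the real wiring lives in [MutinyStream]):
/// ```nocompile
///     match channel.consume(stream_id) {
///         Some(event)                                    => Poll::Ready(Some(event)),
///         None if channel.keep_stream_running(stream_id) => {
///             channel.register_stream_waker(stream_id, cx.waker());
///             Poll::Pending
///         },
///         None                                           => Poll::Ready(None),    // the stream was signaled to end
///     }
/// ```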
pub trait ChannelConsumer<'a, DerivedItemType: 'a + Debug> {

    /// Delivers the next event, whenever the Stream wants it.\
    /// IMPLEMENTORS: use #[inline(always)]
    fn consume(&self, stream_id: u32) -> Option<DerivedItemType>;

    /// Returns `false` if the `Stream` has been signaled to end its operations, causing it to report "out-of-elements" as soon as possible.\
    /// IMPLEMENTORS: use #[inline(always)]
    fn keep_stream_running(&self, stream_id: u32) -> bool;

    /// Shares with the implementor the `Waker` through which `stream_id` may be awoken.\
    /// IMPLEMENTORS: use #[inline(always)]
    fn register_stream_waker(&self, stream_id: u32, waker: &Waker);

    /// Reports that no more elements will be required through [Self::consume()].\
    /// IMPLEMENTORS: use #[inline(always)]
    fn drop_resources(&self, stream_id: u32);
}


/// Defines a fully fledged `Uni` channel, which has both the producer and consumer parts.\
/// Also, leverages generic programming by allowing simpler generic parameters:
/// ```nocompile
///     struct MyGenericStruct<T: FullDuplexUniChannel> { the_channel: T }
///     let the_channel = uni::channels::xxx<Lots,And,Lots<Of,Generic,Arguments>>::new();
///     let my_struct = MyGenericStruct { the_channel };
///     // see more at `tests/use_cases.rs`
/// ```
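/// A generic function may then be written once and reused with any compatible channel (the function below is illustrative only):
/// ```nocompile
///     async fn pump_events<ChannelType: FullDuplexUniChannel>(channel: &Arc<ChannelType>) {
///         // `ChannelType::ItemType`, `ChannelType::BUFFER_SIZE`, etc. are all available here
///     }
/// ```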
pub trait FullDuplexUniChannel:
              ChannelCommon<Self::ItemType, Self::DerivedItemType> +
              ChannelUni<'static, Self::ItemType, Self::DerivedItemType> +
              ChannelProducer<'static, Self::ItemType, Self::DerivedItemType> +
              ChannelConsumer<'static, Self::DerivedItemType> {

    const MAX_STREAMS: usize;
    const BUFFER_SIZE: usize;
    type ItemType: 'static + Debug + Send + Sync;
    type DerivedItemType: 'static + Debug + Send + Sync;

    /// Returns this channel's name
    fn name(&self) -> &str;
}

/// A fully fledged `Multi` channel, which has both the producer and consumer parts.\
/// Also, leverages generic programming by allowing simpler generic parameters:
/// ```nocompile
///     struct MyGenericStruct<T: FullDuplexMultiChannel> { the_channel: T }
///     let the_channel = multi::channels::xxx<Lots,And,Lots<Of,Generic,Arguments>>::new();
///     let my_struct = MyGenericStruct { the_channel };
///     // see more at `tests/use_cases.rs`
/// ```
pub trait FullDuplexMultiChannel:
              ChannelCommon<Self::ItemType, Self::DerivedItemType> +
              ChannelMulti<'static, Self::ItemType, Self::DerivedItemType> +
              ChannelProducer<'static, Self::ItemType, Self::DerivedItemType> +
              ChannelConsumer<'static, Self::DerivedItemType> {

    const MAX_STREAMS: usize;
    const BUFFER_SIZE: usize;
    type ItemType: 'static + Debug + Send + Sync;
    type DerivedItemType: 'static + Debug + Send + Sync;
}