reactive_mutiny/types.rs
1//! Common types across this module
2
3pub use crate::instruments::Instruments;
4use crate::mutiny_stream::MutinyStream;
5use std::{
6 time::Duration,
7 task::Waker,
8 fmt::Debug,
9};
10use std::future::Future;
11use std::sync::Arc;
12
13
14/// Defines the common abstractions on how [Uni]s receive produced events and deliver them to `Stream`s.\
15/// Implementors should also implement one of [ChannelProducer] or [UniZeroCopyChannel].\
16/// NOTE: all async functions are out of the hot path, so their asynchronous nature is not expected to impose performance penalties -- they are declared as plain `fn`s returning `impl Future + Send` (no `async_trait` attribute is used in this trait)
17pub trait ChannelCommon<ItemType: Debug + Send + Sync,
18 DerivedItemType: Debug> {
19
20 /// Creates a new instance of this channel (handed back wrapped in an `Arc`), to be referred to (in logs) as `name`
21 fn new<IntoString: Into<String>>(name: IntoString) -> Arc<Self>;
22
23 /// Waits until all pending items are taken from this channel, up until `timeout` elapses.\
24 /// Returns the number of still unconsumed items -- which is 0 if the wait was not interrupted by the timeout
25 #[must_use = "Returns 0 if all elements could be flushed within the given `timeout` or the number of elements yet flushing"]
26 fn flush(&self, timeout: Duration) -> impl Future<Output=u32> + Send;
27
28 /// Tells whether this channel is still enabled to process elements
29 /// (`true` before calling the "end stream" / "cancel stream" functions)
30 fn is_channel_open(&self) -> bool;
31
32 /// Flushes & signals that the given `stream_id` should cease its activities when there are no more elements left
33 /// to process, waiting for the operation to complete for up to `timeout`.\
34 /// Returns `true` if the stream ended within the given `timeout` or `false` if it is still processing elements.
35 #[must_use = "Returns true if the Channel could be closed within the given time"]
36 fn gracefully_end_stream(&self, stream_id: u32, timeout: Duration) -> impl Future<Output=bool> + Send;
37
38 /// Flushes & signals that all streams should cease their activities when there are no more elements left
39 /// to process, waiting for the operation to complete for up to `timeout`.\
40 /// Returns the number of un-ended streams -- which is 0 if the wait was not interrupted by the timeout (NOTE(review): the `must_use` message below speaks of unsent *elements* instead of un-ended *streams* -- confirm which one is right)
41 #[must_use = "Returns 0 if all elements could be flushed within the given `timeout` or the number of elements that got unsent after the channel closing"]
42 fn gracefully_end_all_streams(&self, timeout: Duration) -> impl Future<Output=u32> + Send;
43
44 /// Sends a signal to all streams, urging them to cease their operations.\
45 /// In opposition to [Self::gracefully_end_all_streams()], this method does not wait for any confirmation,
46 /// nor cares if there are remaining elements to be processed.
47 fn cancel_all_streams(&self);
48
49 /// Informs the caller how many active streams are currently managed by this channel.\
50 /// IMPLEMENTORS: #[inline(always)]
51 fn running_streams_count(&self) -> u32;
52
53 /// Tells how many events are waiting to be taken out of this channel.\
54 /// IMPLEMENTORS: #[inline(always)]
55 fn pending_items_count(&self) -> u32;
56
57 /// Tells how many events may be produced ahead of the consumers.\
58 /// IMPLEMENTORS: #[inline(always)]
59 fn buffer_size(&self) -> u32;
60}
61
62/// Defines the abstractions specific to [Uni] channels: the creation of the `Stream` through which the sent elements are consumed
63pub trait ChannelUni<'a, ItemType: Debug + Send + Sync,
64 DerivedItemType: Debug> {
65
66 /// Returns a `Stream` (and its `stream_id`) able to receive elements sent through this channel.\
67 /// If called more than once, each `Stream` will receive a different element -- "consumer pattern".\
68 /// Currently `panic`s if called more times than allowed by [Uni]'s `MAX_STREAMS`. Only available when `Self` also implements [ChannelConsumer]
69 fn create_stream(self: &Arc<Self>) -> (MutinyStream<'a, ItemType, Self, DerivedItemType>, u32)
70 where Self: ChannelConsumer<'a, DerivedItemType>;
71
72}
73
74/// Defines the abstractions specific to [Multi] channels: the several ways of creating `Stream`s for past and/or forthcoming events
75pub trait ChannelMulti<'a, ItemType: Debug + Send + Sync,
76 DerivedItemType: Debug> {
77
78 /// Implemented only for a few [Multi] channels, returns a `Stream` (and its `stream_id`) able to receive elements
79 /// that were sent through this channel *before the call to this method*.\
80 /// It is up to each implementor to define how far back in the past those events may go, but it is known that `mmap log`
81 /// based channels are able to see all past events.\
82 /// If called more than once, every stream will see all the past events available.\
83 /// Currently `panic`s if called more times than allowed by [Multi]'s `MAX_STREAMS`
84 fn create_stream_for_old_events(self: &Arc<Self>) -> (MutinyStream<'a, ItemType, Self, DerivedItemType>, u32)
85 where Self: ChannelConsumer<'a, DerivedItemType>;
86
87 /// Returns a `Stream` (and its `stream_id`) able to receive elements sent through this channel *after the call to this method*.\
88 /// If called more than once, each `Stream` will see all new elements -- "listener pattern".\
89 /// Currently `panic`s if called more times than allowed by [Multi]'s `MAX_STREAMS`
90 fn create_stream_for_new_events(self: &Arc<Self>) -> (MutinyStream<'a, ItemType, Self, DerivedItemType>, u32)
91 where Self: ChannelConsumer<'a, DerivedItemType>;
92
93 /// Implemented only for a few [Multi] channels, returns two `Stream`s (and their `stream_id`s):
94 /// - one for the past events (that, once exhausted, won't see any of the forthcoming events)
95 /// - another for the forthcoming events.
96 ///
97 /// The split is guaranteed not to miss any events: no events will be lost between the last of the "past" and
98 /// the first of the "forthcoming" events.\
99 /// It is up to each implementor to define how far back in the past those events may go, but it is known that `mmap log`
100 /// based channels are able to see all past events.\
101 /// If called more than once, every stream will see all the past events available, as well as all future events after this method call.\
102 /// Currently `panic`s if called more times than allowed by [Multi]'s `MAX_STREAMS`
103 fn create_streams_for_old_and_new_events(self: &Arc<Self>) -> ((MutinyStream<'a, ItemType, Self, DerivedItemType>, u32),
104 (MutinyStream<'a, ItemType, Self, DerivedItemType>, u32))
105 where Self: ChannelConsumer<'a, DerivedItemType>;
106
107 /// Implemented only for a few [Multi] channels, returns a single `Stream` (and its `stream_id`) able to receive elements
108 /// that were sent through this channel both *before and after the call to this method*.\
109 /// It is up to each implementor to define how far back in the past those events may go, but it is known that `mmap log`
110 /// based channels are able to see all past events.\
111 /// Notice that, with this method, there is no way of discriminating where the "old" events end and where the "new" events start.\
112 /// If called more than once, every stream will see all the past events available, as well as all future events after this method call.\
113 /// Currently `panic`s if called more times than allowed by [Multi]'s `MAX_STREAMS`
114 fn create_stream_for_old_and_new_events(self: &Arc<Self>) -> (MutinyStream<'a, ItemType, Self, DerivedItemType>, u32)
115 where Self: ChannelConsumer<'a, DerivedItemType>;
116
117}
118
119/// Defines how to send events (to a [Uni] or [Multi]).
120pub trait ChannelProducer<'a, ItemType: 'a + Debug + Send + Sync,
121 DerivedItemType: 'a + Debug> {
122
123 /// Similar to [Self::send_with()], but for sending the already-built `item`.\
124 /// See there for how to deal with the returned type.\
125 /// IMPLEMENTORS: #[inline(always)]
126 #[must_use = "The return type should be examined in case retrying is needed -- or call map(...).into() to transform it into a `Result<(), ItemType>`"]
127 fn send(&self, item: ItemType) -> keen_retry::RetryConsumerResult<(), ItemType, ()>;
128
129 /// Calls `setter`, passing a slot so the payload may be filled there, then sends the event through this channel asynchronously.\
130 /// The returned type is convertible to `Result<(), F>` by calling .into() on it, returning `Err(setter)` when the buffer is full,
131 /// to allow the caller to try again; otherwise you may add any retrying logic using the `keen-retry` crate's API like in:
132 /// ```nocompile
133 /// xxxx.send_with(|slot| *slot = 42)
134 /// .retry_with(|setter| xxxx.send_with(setter))
135 /// .spinning_until_timeout(Duration::from_millis(300), ()) // go see the other options
136 /// .map_errors(|_, setter| (setter, _), |e| e) // map the unconsumed `setter` payload into `Err(setter)` when converting to `Result` ahead
137 /// .into()?;
138 /// ```
139 /// NOTE: this method may allow the compiler some extra optimization steps when compared to [Self::send()]. When tuning for performance,
140 /// it is advisable to try this method.\
141 /// IMPLEMENTORS: #[inline(always)]
142 #[must_use = "The return type should be examined in case retrying is needed -- or call map(...).into() to transform it into a `Result<(), F>`"]
143 fn send_with<F: FnOnce(&mut ItemType)>
144 (&self,
145 setter: F)
146 -> keen_retry::RetryConsumerResult<(), F, ()>;
147
148 /// Similar to [Self::send_with()], but accepts an async `setter` -- a closure receiving the slot and returning a `Future` resolving to a `&mut ItemType` (the example below returns the `slot` it received).\
149 /// This method is useful for sending operations that depend on data acquired by async blocks, allowing
150 /// select loops (like the following) to be built:
151 /// ```nocompile
152 /// tokio::select! {
153 /// _ => async {
154 /// channel_producer.send_with_async(|slot| async {
155 /// let data = data_source.read().await;
156 /// fill_slot(data, &mut slot);
157 /// slot
158 /// }).await
159 /// },
160 /// (...other select arms that may execute concurrently with the above arm...)
161 /// }
162 /// ```
163 /// IMPLEMENTORS: #[inline(always)]
164 fn send_with_async<F: FnOnce(&'a mut ItemType) -> Fut + Send,
165 Fut: Future<Output=&'a mut ItemType> + Send>
166 (&'a self,
167 setter: F) -> impl Future<Output=keen_retry::RetryConsumerResult<(), F, ()>> + Send;
168
169 // TODO: 2024-03-04: this is to be filled in by **(f21)**. Possibly an extra dependency on the allocator will be needed for the `BoundedOgreAllocator::OwnedSlotType`
170 // fn send_with_external_alloc();
171
172 /// For channels that store the `DerivedItemType` instead of the `ItemType`, this method may be useful
173 /// -- for instance: if the Stream consumes `OgreArc<Type>` (the derived item type) and the channel is for `Type`, with this method one may send an `OgreArc` directly.\
174 /// IMPLEMENTORS: #[inline(always)] -- and notice the default implementation provided here panics (through `todo!()`), so it must be overridden by the channels supporting this operation
175 #[inline(always)]
176 #[must_use = "The return type should be examined in case retrying is needed"]
177 fn send_derived(&self, _derived_item: &DerivedItemType) -> bool {
178 todo!("The default `ChannelProducer.send_derived()` was not re-implemented, meaning it is not available for this channel -- is only available for channels whose `Stream` items will see different types than the produced one -- example: send(`string`) / Stream<Item=OgreArc<String>>")
179 }
180
181 /// Proxy to [crate::prelude::advanced::BoundedOgreAllocator::alloc_ref()] from the underlying allocator,
182 /// allowing the caller to fill in the data as they wish -- in a non-blocking prone API.\
183 /// See also [Self::try_send_reserved()] and [Self::try_cancel_slot_reserve()].
184 fn reserve_slot(&self) -> Option<&mut ItemType>;
185
186 /// Attempts to send an item previously reserved by [Self::reserve_slot()].
187 /// Failure to do so (when `false` is returned) might be part of the normal channel operation,
188 /// so retrying is advised.
189 /// More: some channel implementations are optimized for (or even only accept) sending the slots
190 /// in the same order they were reserved.
191 #[must_use = "You need the returned value to retry the operation until it succeeds, implementing a suitable spin loop logic (like tokio's yield_now())"]
192 fn try_send_reserved(&self, reserved_slot: &mut ItemType) -> bool;
193
194 /// Attempts to give up sending an item previously reserved by [Self::reserve_slot()], freeing it / setting its resources for reuse.
195 /// Three important things to note:
196 /// 1. Failure (when `false` is returned) might be part of the normal channel operation,
197 /// so retrying is advised;
198 /// 2. Some channel implementations are optimized for (or even only accept) cancelling the slots
199 /// in the same order they were reserved;
200 /// 3. These, more restricted & more optimized channels, might not allow publishing any reserved
201 /// slots if there are cancelled slots in-between -- in which case, publishing will only be done
202 /// when the missing slots are, eventually, published. So, be careful when using the cancellation
203 /// semantics: ideally, it should only be allowed for the last slot and when no sending occurs in-between.
204 #[must_use = "You need the returned value to retry the operation until it succeeds, implementing a suitable spin loop logic (like tokio's yield_now())"]
205 fn try_cancel_slot_reserve(&self, reserved_slot: &mut ItemType) -> bool;
206
207}
208
209/// Source of events for [MutinyStream] -- every method here is addressed to a particular stream, identified by its `stream_id`.
210pub trait ChannelConsumer<'a, DerivedItemType: 'a + Debug> {
211
212 /// Delivers the next event (or `None`), whenever the Stream identified by `stream_id` wants it.\
213 /// IMPLEMENTORS: use #[inline(always)]
214 fn consume(&self, stream_id: u32) -> Option<DerivedItemType>;
215
216 /// Returns `false` if the `Stream` has been signaled to end its operations, causing it to report "out-of-elements" as soon as possible.\
217 /// IMPLEMENTORS: use #[inline(always)]
218 fn keep_stream_running(&self, stream_id: u32) -> bool;
219
220 /// Shares, with the implementor, the `waker` through which the Stream identified by `stream_id` may be awakened.\
221 /// IMPLEMENTORS: use #[inline(always)]
222 fn register_stream_waker(&self, stream_id: u32, waker: &Waker);
223
224 /// Reports that no more elements will be required by `stream_id` through [Self::consume()] (NOTE(review): the original text referenced a `provide()` method, which is not declared in this trait -- confirm `consume()` is the one meant).\
225 /// IMPLEMENTORS: use #[inline(always)]
226 fn drop_resources(&self, stream_id: u32);
227}
228
229
230/// Defines a fully fledged `Uni` channel, that has both the producer and consumer parts.\
231/// Also, leverages generic programming by allowing simpler generic parameters:
232/// ```nocompile
233/// struct MyGenericStruct<T: FullDuplexUniChannel> { the_channel: T }
234/// let the_channel = uni::channels::xxx<Lots,And,Lots<Of,Generic,Arguments>>::new();
235/// let my_struct = MyGenericStruct { the_channel }; // see more at `tests/use_cases.rs`
236/// ```
237pub trait FullDuplexUniChannel:
238 ChannelCommon<Self::ItemType, Self::DerivedItemType> +
239 ChannelUni<'static, Self::ItemType, Self::DerivedItemType> +
240 ChannelProducer<'static, Self::ItemType, Self::DerivedItemType> +
241 ChannelConsumer<'static, Self::DerivedItemType> {
242
243 const MAX_STREAMS: usize; // presumably the limit of streams referred to by `create_stream()`'s panic note -- TODO confirm
244 const BUFFER_SIZE: usize; // presumably related to what `buffer_size()` reports -- TODO confirm
245 type ItemType: 'static + Debug + Send + Sync; // fed as the `ItemType` parameter of the super traits above
246 type DerivedItemType: 'static + Debug + Send + Sync; // fed as the `DerivedItemType` parameter of the super traits above
247
248 /// Returns this channel's name
249 fn name(&self) -> &str;
250}
251
252/// A fully fledged `Multi` channel, that has both the producer and consumer parts.\
253/// Also, leverages generic programming by allowing simpler generic parameters:
254/// ```nocompile
255/// struct MyGenericStruct<T: FullDuplexMultiChannel> { the_channel: T }
256/// let the_channel = uni::channels::xxx<Lots,And,Lots<Of,Generic,Arguments>>::new(); // NOTE(review): presumably a `multi` channel path is meant here -- confirm
257/// let my_struct = MyGenericStruct { the_channel }; // see more at `tests/use_cases.rs`
258/// ```
259pub trait FullDuplexMultiChannel:
260 ChannelCommon<Self::ItemType, Self::DerivedItemType> +
261 ChannelMulti<'static, Self::ItemType, Self::DerivedItemType> +
262 ChannelProducer<'static, Self::ItemType, Self::DerivedItemType> +
263 ChannelConsumer<'static, Self::DerivedItemType> {
264
265 const MAX_STREAMS: usize; // presumably the limit of streams referred to by the `create_stream*()` panic notes -- TODO confirm
266 const BUFFER_SIZE: usize; // presumably related to what `buffer_size()` reports -- TODO confirm
267 type ItemType: 'static + Debug + Send + Sync; // fed as the `ItemType` parameter of the super traits above
268 type DerivedItemType: 'static + Debug + Send + Sync; // fed as the `DerivedItemType` parameter of the super traits above
269}
269}