reactive_mutiny/multi/
multi.rs

1//! See [super]
2
3use super::super::{
4    stream_executor::StreamExecutor,
5    mutiny_stream::MutinyStream,
6    types::FullDuplexMultiChannel,
7};
8use std::{
9    sync::Arc,
10    fmt::Debug,
11    time::Duration,
12    future::Future,
13    marker::PhantomData,
14};
15use indexmap::IndexMap;
16use futures::Stream;
17use tokio::sync::RwLock;
18
19
20/// `Multi` is an event handler capable of having several "listeners" -- all of which receives all events.\
21/// With this struct, it is possible to:
22///   - produce events
23///   - spawn new `Stream`s & executors
24///   - close `Stream`s (and executors)
25/// 
26/// Example:
27/// ```nocompile
28/// {reactive_mutiny::Instruments::MetricsWithoutLogs.into()}
29pub struct Multi<ItemType:          Debug + Sync + Send + 'static,
30                 MultiChannelType:  FullDuplexMultiChannel<ItemType=ItemType, DerivedItemType=DerivedItemType> + Sync + Send + 'static,
31                 const INSTRUMENTS: usize,
32                 DerivedItemType:   Debug + Sync + Send + 'static> {
33    pub multi_name:     String,
34    pub channel:        Arc<MultiChannelType>,
35    pub executor_infos: RwLock<IndexMap<String, ExecutorInfo>>,
36        _phantom:       PhantomData<(ItemType, MultiChannelType, DerivedItemType)>,
37}
38
39impl<ItemType:          Debug + Send + Sync + 'static,
40     MultiChannelType:  FullDuplexMultiChannel<ItemType=ItemType, DerivedItemType=DerivedItemType> + Sync + Send + 'static,
41     const INSTRUMENTS: usize,
42     DerivedItemType:   Debug + Sync + Send + 'static>
43GenericMulti<INSTRUMENTS> for
44Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType> {
45    const INSTRUMENTS: usize = INSTRUMENTS;
46    type ItemType            = ItemType;
47    type MultiChannelType    = MultiChannelType;
48    type DerivedItemType     = DerivedItemType;
49    type MutinyStreamType    = MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>;
50
51    fn new<IntoString: Into<String>>(multi_name: IntoString) -> Self {
52        Self::new(multi_name)
53    }
54    fn to_multi(self) -> Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType> {
55        self
56    }
57}
58
59impl<ItemType:          Debug + Send + Sync + 'static,
60     MultiChannelType:  FullDuplexMultiChannel<ItemType=ItemType, DerivedItemType=DerivedItemType> + Sync + Send + 'static,
61     const INSTRUMENTS: usize,
62     DerivedItemType:   Debug + Sync + Send + 'static>
63Multi<ItemType, MultiChannelType, INSTRUMENTS, DerivedItemType> {
64
65    /// Creates a [Multi], which implements the `listener pattern`, capable of:
66    ///   - creating `Stream`s;
67    ///   - applying a user-provided `processor` to the `Stream`s and executing them to depletion --
68    ///     the final `Stream`s may produce a combination of fallible/non-fallible &
69    ///     futures/non-futures events;
70    ///   - producing events that are sent to those `Stream`s.
71    /// 
72    /// `multi_name` is used for instrumentation purposes, depending on the `INSTRUMENT` generic
73    /// argument passed to the [Multi] struct.
74    pub fn new<IntoString: Into<String>>(multi_name: IntoString) -> Self {
75        let multi_name = multi_name.into();
76        Multi {
77            multi_name:     multi_name.clone(),
78            channel:        MultiChannelType::new(multi_name.clone()),
79            executor_infos: RwLock::new(IndexMap::new()),
80            _phantom:       PhantomData,
81        }
82    }
83
84    /// Returns this Multi's name
85    pub fn name(&self) -> &str {
86        &self.multi_name
87    }
88
89    #[inline(always)]
90    #[must_use = "The return type should be examined in case retrying is needed -- or call map(...).into() to transform it into a `Result<(), ItemType>`"]
91    pub fn send(&self, item: ItemType) -> keen_retry::RetryConsumerResult<(), ItemType, ()> {
92        self.channel.send(item)
93    }
94
95    #[inline(always)]
96    #[must_use = "The return type should be examined in case retrying is needed -- or call map(...).into() to transform it into a `Result<(), F>`"]
97    pub fn send_with<F: FnOnce(&mut ItemType)>(&self, setter: F) -> keen_retry::RetryConsumerResult<(), F, ()> {
98        self.channel.send_with(setter)
99    }
100
101    #[inline(always)]
102    #[must_use = "The return type should be examined in case retrying is needed"]
103    pub fn send_derived(&self, arc_item: &DerivedItemType) -> bool {
104        self.channel.send_derived(arc_item)
105    }
106
107    #[inline(always)]
108    #[must_use]
109    pub fn buffer_size(&self) -> u32 {
110        self.channel.buffer_size()
111    }
112
113    #[inline(always)]
114    #[must_use]
115    pub fn pending_items_count(&self) -> u32 {
116        self.channel.pending_items_count()
117    }
118
119    /// Spawns a new listener of all subsequent events sent to this `Multi`, processing them through the `Stream` returned by `pipeline_builder()`,
120    /// which generates events that are Futures & Fallible.
121    pub async fn spawn_executor<IntoString:             Into<String>,
122                                OutItemType:            Send + Debug,
123                                OutStreamType:          Stream<Item=OutType> + Send + 'static,
124                                OutType:                Future<Output=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send,
125                                ErrVoidAsyncType:       Future<Output=()> + Send + 'static,
126                                CloseVoidAsyncType:     Future<Output=()> + Send + 'static>
127
128                               (&self,
129                                concurrency_limit:         u32,
130                                futures_timeout:           Duration,
131                                pipeline_name:             IntoString,
132                                pipeline_builder:          impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType,
133                                on_err_callback:           impl Fn(Box<dyn std::error::Error + Send + Sync>)                               -> ErrVoidAsyncType   + Send + Sync + 'static,
134                                on_close_callback:         impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>)                         -> CloseVoidAsyncType + Send + Sync + 'static)
135
136                               -> Result<(), Box<dyn std::error::Error>> {
137
138        let (in_stream, in_stream_id) = self.channel.create_stream_for_new_events();
139        let out_stream = pipeline_builder(in_stream);
140        self.spawn_executor_from_stream(concurrency_limit, futures_timeout, pipeline_name, in_stream_id, out_stream, on_err_callback, on_close_callback).await
141    }
142
143    /// For channels that allow it (like [channels::reference::mmap_log::MmapLog]), spawns two listeners for events sent to this `Multi`:
144    ///   1) One for past events -- to be processed by the stream returned by `oldies_pipeline_builder()`;
145    ///   2) Another one for subsequent events -- to be processed by the stream returned by `newies_pipeline_builder()`.
146    /// 
147    /// By using this method, it is assumed that both pipeline builders returns `Future<Result>` events. If this is not so, see one of the sibling methods.\
148    /// The stream splitting is guaranteed not to drop any events and `sequential_transition` may be used to indicate if old events should be processed first or if both old and new events
149    /// may be processed simultaneously (in an inevitable out-of-order fashion).
150    pub async fn spawn_oldies_executor<IntoString:               Into<String>,
151                                       OutItemType:              Send + Debug,
152                                       OldiesOutStreamType:      Stream<Item=OldiesOutType> + Sync + Send + 'static,
153                                       NewiesOutStreamType:      Stream<Item=NewiesOutType> + Sync + Send + 'static,
154                                       OldiesOutType:            Future<Output=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send,
155                                       NewiesOutType:            Future<Output=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send,
156                                       ErrVoidAsyncType:         Future<Output=()>   + Send + 'static,
157                                       OldiesCloseVoidAsyncType: Future<Output=()>   + Send + 'static,
158                                       NewiesCloseVoidAsyncType: Future<Output=()>   + Send + 'static>
159
160                                      (self:                      &Arc<Self>,
161                                       concurrency_limit:         u32,
162                                       sequential_transition:     bool,
163                                       futures_timeout:           Duration,
164                                       oldies_pipeline_name:      IntoString,
165                                       oldies_pipeline_builder:   impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType,
166                                       oldies_on_close_callback:  impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>)                         -> OldiesCloseVoidAsyncType + Send + Sync + 'static,
167                                       newies_pipeline_name:      IntoString,
168                                       newies_pipeline_builder:   impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType      + Send + Sync + 'static,
169                                       newies_on_close_callback:  impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>)                         -> NewiesCloseVoidAsyncType + Send + Sync + 'static,
170                                       on_err_callback:           impl Fn(Box<dyn std::error::Error + Send + Sync>)                               -> ErrVoidAsyncType         + Send + Sync + 'static)
171
172                                      -> Result<(), Box<dyn std::error::Error>> {
173
174        let ((oldies_in_stream, oldies_in_stream_id),
175             (newies_in_stream, newies_in_stream_id)) = self.channel.create_streams_for_old_and_new_events();
176
177        let cloned_self = Arc::clone(self);
178        let oldies_pipeline_name = oldies_pipeline_name.into();
179        let newies_pipeline_name = Arc::new(newies_pipeline_name.into());
180        let on_err_callback_ref1 = Arc::new(on_err_callback);
181        let on_err_callback_ref2 = Arc::clone(&on_err_callback_ref1);
182        let oldies_out_stream = oldies_pipeline_builder(oldies_in_stream);
183        let newies_out_stream = newies_pipeline_builder(newies_in_stream);
184
185        match sequential_transition {
186            true => {
187                self.spawn_executor_from_stream(concurrency_limit, futures_timeout, oldies_pipeline_name, oldies_in_stream_id, oldies_out_stream,
188                                                move |err| on_err_callback_ref1(err),
189                                                move |executor| {
190                                                    let cloned_self = Arc::clone(&cloned_self);
191                                                    let on_err_callback_ref2 = Arc::clone(&on_err_callback_ref2);
192                                                    let newies_pipeline_name = Arc::clone(&newies_pipeline_name);
193                                                    async move {
194                                                        cloned_self.spawn_executor_from_stream(concurrency_limit, futures_timeout, newies_pipeline_name.as_str(), newies_in_stream_id, newies_out_stream,
195                                                                                              move |err| on_err_callback_ref2(err),
196                                                                                              newies_on_close_callback).await
197                                                            .map_err(|err| format!("Multi::spawn_oldies_executor(): could not start `newies`/sequential executor: {:?}", err))
198                                                            .expect("CANNOT SPAWN NEWIES EXECUTOR AFTER OLDIES HAD COMPLETE");
199                                                        oldies_on_close_callback(executor).await;
200                                                    }
201                                                } ).await
202                    .map_err(|err| format!("Multi::spawn_oldies_executor(): could not start `oldies`/sequential executor: {:?}", err))?;
203            },
204            false => {
205                self.spawn_executor_from_stream(concurrency_limit, futures_timeout, oldies_pipeline_name, oldies_in_stream_id, oldies_out_stream,
206                                                move |err| on_err_callback_ref1(err),
207                                                oldies_on_close_callback).await
208                    .map_err(|err| format!("Multi::spawn_oldies_executor(): could not start `oldies` executor: {:?}", err))?;
209                self.spawn_executor_from_stream(concurrency_limit, futures_timeout, newies_pipeline_name.as_str(), newies_in_stream_id, newies_out_stream,
210                                                move |err| on_err_callback_ref2(err),
211                                                newies_on_close_callback).await
212                    .map_err(|err| format!("Multi::spawn_oldies_executor(): could not start `newies` executor: {:?}", err))?;
213            },
214        }
215        Ok(())
216    }
217
218    /// Internal method with common code for [Self::spawn_executor()] & [Self::spawn_oldies_executor()].
219    async fn spawn_executor_from_stream<IntoString:             Into<String>,
220                                        OutItemType:            Send + Debug,
221                                        OutStreamType:          Stream<Item=OutType> + Send + 'static,
222                                        OutType:                Future<Output=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send,
223                                        ErrVoidAsyncType:       Future<Output=()>   + Send + 'static,
224                                        CloseVoidAsyncType:     Future<Output=()>   + Send + 'static>
225
226                                       (&self,
227                                        concurrency_limit:         u32,
228                                        futures_timeout:           Duration,
229                                        pipeline_name:             IntoString,
230                                        stream_id:                 u32,
231                                        pipelined_stream:          OutStreamType,
232                                        on_err_callback:           impl Fn(Box<dyn std::error::Error + Send + Sync>)       -> ErrVoidAsyncType   + Send + Sync + 'static,
233                                        on_close_callback:         impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
234
235                                       -> Result<(), Box<dyn std::error::Error>> {
236
237        let executor = StreamExecutor::<INSTRUMENTS>::with_futures_timeout(format!("{}: {}", self.name(), pipeline_name.into()), futures_timeout);
238        self.add_executor(executor.clone(), stream_id).await?;
239        executor
240            .spawn_executor::<_, _, _, _>(
241                concurrency_limit,
242                on_err_callback,
243                on_close_callback,
244                pipelined_stream
245            );
246        Ok(())
247    }
248
249    /// Spawns a new listener of all subsequent events sent to this `Multi`, processing them through the `Stream` returned by `pipeline_builder()`,
250    /// which generates events that are Futures.
251    pub async fn spawn_futures_executor<IntoString:             Into<String>,
252                                        OutItemType:            Send + Debug,
253                                        OutStreamType:          Stream<Item=OutType> + Send + 'static,
254                                        OutType:                Future<Output=OutItemType> + Send,
255                                        CloseVoidAsyncType:     Future<Output=()> + Send + 'static>
256
257                                       (&self,
258                                        concurrency_limit:         u32,
259                                        futures_timeout:           Duration,
260                                        pipeline_name:             IntoString,
261                                        pipeline_builder:          impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType,
262                                        on_close_callback:         impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>)                         -> CloseVoidAsyncType + Send + Sync + 'static)
263
264                                       -> Result<(), Box<dyn std::error::Error>> {
265
266        let (in_stream, in_stream_id) = self.channel.create_stream_for_new_events();
267        let out_stream = pipeline_builder(in_stream);
268        self.spawn_futures_executor_from_stream(concurrency_limit, futures_timeout, pipeline_name, in_stream_id, out_stream, on_close_callback).await
269    }
270
271    /// For channels that allow it (like [channels::reference::mmap_log::MmapLog]), spawns two listeners for events sent to this `Multi`:
272    ///   1) One for past events -- to be processed by the stream returned by `oldies_pipeline_builder()`;
273    ///   2) Another one for subsequent events -- to be processed by the stream returned by `newies_pipeline_builder()`.
274    /// 
275    /// By using this method, it is assumed that both pipeline builders returns `Future` events. If this is not so, see one of the sibling methods.\
276    /// The stream splitting is guaranteed not to drop any events and `sequential_transition` may be used to indicate if old events should be processed first or if both old and new events
277    /// may be processed simultaneously (in an inevitable out-of-order fashion).
278    pub async fn spawn_futures_oldies_executor<IntoString:               Into<String>,
279                                               OutItemType:              Send + Debug,
280                                               OldiesOutStreamType:      Stream<Item=OldiesOutType> + Sync + Send + 'static,
281                                               NewiesOutStreamType:      Stream<Item=NewiesOutType> + Sync + Send + 'static,
282                                               OldiesOutType:            Future<Output=OutItemType> + Send,
283                                               NewiesOutType:            Future<Output=OutItemType> + Send,
284                                               OldiesCloseVoidAsyncType: Future<Output=()> + Send + 'static,
285                                               NewiesCloseVoidAsyncType: Future<Output=()> + Send + 'static>
286
287                                              (self:                      &Arc<Self>,
288                                               concurrency_limit:         u32,
289                                               sequential_transition:     bool,
290                                               futures_timeout:           Duration,
291                                               oldies_pipeline_name:      IntoString,
292                                               oldies_pipeline_builder:   impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType,
293                                               oldies_on_close_callback:  impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>)                         -> OldiesCloseVoidAsyncType + Send + Sync + 'static,
294                                               newies_pipeline_name:      IntoString,
295                                               newies_pipeline_builder:   impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType       + Send + Sync + 'static,
296                                               newies_on_close_callback:  impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>)                         -> NewiesCloseVoidAsyncType  + Send + Sync + 'static)
297
298                                              -> Result<(), Box<dyn std::error::Error>> {
299
300        let ((oldies_in_stream, oldies_in_stream_id),
301             (newies_in_stream, newies_in_stream_id)) = self.channel.create_streams_for_old_and_new_events();
302
303        let cloned_self = Arc::clone(self);
304        let oldies_pipeline_name = oldies_pipeline_name.into();
305        let newies_pipeline_name = Arc::new(newies_pipeline_name.into());
306        let oldies_out_stream = oldies_pipeline_builder(oldies_in_stream);
307        let newies_out_stream = newies_pipeline_builder(newies_in_stream);
308
309        match sequential_transition {
310            true => {
311                self.spawn_futures_executor_from_stream(concurrency_limit, futures_timeout, oldies_pipeline_name, oldies_in_stream_id, oldies_out_stream,
312                                                        move |executor| {
313                                                            let cloned_self = Arc::clone(&cloned_self);
314                                                            let newies_pipeline_name = Arc::clone(&newies_pipeline_name);
315                                                            async move {
316                                                                cloned_self.spawn_futures_executor_from_stream(concurrency_limit, futures_timeout, newies_pipeline_name.as_str(), newies_in_stream_id, newies_out_stream,
317                                                                                                               newies_on_close_callback).await
318                                                                    .map_err(|err| format!("Multi::spawn_oldies_executor(): could not start `newies`/sequential executor: {:?}", err))
319                                                                    .expect("CANNOT SPAWN NEWIES EXECUTOR AFTER OLDIES HAD COMPLETE");
320                                                                oldies_on_close_callback(executor).await;
321                                                            }
322                                                        } ).await
323                    .map_err(|err| format!("Multi::spawn_oldies_executor(): could not start `oldies`/sequential executor: {:?}", err))?;
324            },
325            false => {
326                self.spawn_futures_executor_from_stream(concurrency_limit, futures_timeout, oldies_pipeline_name, oldies_in_stream_id, oldies_out_stream,
327                                                        oldies_on_close_callback).await
328                    .map_err(|err| format!("Multi::spawn_oldies_executor(): could not start `oldies` executor: {:?}", err))?;
329                self.spawn_futures_executor_from_stream(concurrency_limit, futures_timeout, newies_pipeline_name.as_str(), newies_in_stream_id, newies_out_stream,
330                                                        newies_on_close_callback).await
331                    .map_err(|err| format!("Multi::spawn_oldies_executor(): could not start `newies` executor: {:?}", err))?;
332            },
333        }
334        Ok(())
335    }
336
337    /// Internal method with common code for [Self::spawn_futures_executor()] & [Self::spawn_futures_oldies_executor()].
338    async fn spawn_futures_executor_from_stream<IntoString:             Into<String>,
339                                                OutItemType:            Send + Debug,
340                                                OutStreamType:          Stream<Item=OutType>       + Send + 'static,
341                                                OutType:                Future<Output=OutItemType> + Send,
342                                                CloseVoidAsyncType:     Future<Output=()>          + Send + 'static>
343
344                                               (&self,
345                                                concurrency_limit:         u32,
346                                                futures_timeout:           Duration,
347                                                pipeline_name:             IntoString,
348                                                stream_id:                 u32,
349                                                pipelined_stream:          OutStreamType,
350                                                on_close_callback:         impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
351
352                                               -> Result<(), Box<dyn std::error::Error>> {
353
354        let executor = StreamExecutor::<INSTRUMENTS>::with_futures_timeout(format!("{}: {}", self.name(), pipeline_name.into()), futures_timeout);
355        self.add_executor(executor.clone(), stream_id).await?;
356        executor
357            .spawn_futures_executor::<_, _, _>(
358                concurrency_limit,
359                on_close_callback,
360                pipelined_stream
361            );
362        Ok(())
363    }
364
365    /// Spawns a new listener of all subsequent events sent to this `Multi`, processing them through the `Stream` returned by `pipeline_builder()`,
366    /// which generates events that are Fallible.
367    pub async fn spawn_fallibles_executor<IntoString:             Into<String>,
368                                          OutItemType:            Send + Debug,
369                                          OutStreamType:          Stream<Item=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send + 'static,
370                                          CloseVoidAsyncType:     Future<Output=()> + Send + 'static>
371
372                                         (&self,
373                                          concurrency_limit:         u32,
374                                          pipeline_name:             IntoString,
375                                          pipeline_builder:          impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType,
376                                          on_err_callback:           impl Fn(Box<dyn std::error::Error + Send + Sync>)                                                     + Send + Sync + 'static,
377                                          on_close_callback:         impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>)                         -> CloseVoidAsyncType + Send + Sync + 'static)
378
379                                         -> Result<(), Box<dyn std::error::Error>> {
380
381        let (in_stream, in_stream_id) = self.channel.create_stream_for_new_events();
382        let out_stream = pipeline_builder(in_stream);
383        self.spawn_fallibles_executor_from_stream(concurrency_limit, pipeline_name, in_stream_id, out_stream, on_err_callback, on_close_callback).await
384    }
385
386    /// For channels that allow it (like [channels::reference::mmap_log::MmapLog]), spawns two listeners for events sent to this `Multi`:
387    ///   1) One for past events -- to be processed by the stream returned by `oldies_pipeline_builder()`;
388    ///   2) Another one for subsequent events -- to be processed by the stream returned by `newies_pipeline_builder()`.
389    /// 
390    /// By using this method, it is assumed that both pipeline builders returns Fallible events. If this is not so, see one of the sibling methods.\
391    /// The stream splitting is guaranteed not to drop any events and `sequential_transition` may be used to indicate if old events should be processed first or if both old and new events
392    /// may be processed simultaneously (in an inevitable out-of-order fashion).
393    pub async fn spawn_fallibles_oldies_executor<IntoString:               Into<String>,
394                                                 OutItemType:              Send + Debug,
395                                                 OldiesOutStreamType:      Stream<Item=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Sync + Send + 'static,
396                                                 NewiesOutStreamType:      Stream<Item=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Sync + Send + 'static,
397                                                 OldiesCloseVoidAsyncType: Future<Output=()> + Send + 'static,
398                                                 NewiesCloseVoidAsyncType: Future<Output=()> + Send + 'static>
399
400                                                (self:                      &Arc<Self>,
401                                                 concurrency_limit:         u32,
402                                                 sequential_transition:     bool,
403                                                 oldies_pipeline_name:      IntoString,
404                                                 oldies_pipeline_builder:   impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType,
405                                                 oldies_on_close_callback:  impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>)                         -> OldiesCloseVoidAsyncType + Send + Sync + 'static,
406                                                 newies_pipeline_name:      IntoString,
407                                                 newies_pipeline_builder:   impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType      + Send + Sync + 'static,
408                                                 newies_on_close_callback:  impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>)                         -> NewiesCloseVoidAsyncType + Send + Sync + 'static,
409                                                 on_err_callback:           impl Fn(Box<dyn std::error::Error + Send + Sync>)                                                           + Send + Sync + 'static)
410
411                                                -> Result<(), Box<dyn std::error::Error>> {
412
413        let ((oldies_in_stream, oldies_in_stream_id),
414             (newies_in_stream, newies_in_stream_id)) = self.channel.create_streams_for_old_and_new_events();
415
416        let cloned_self = Arc::clone(self);
417        let oldies_pipeline_name = oldies_pipeline_name.into();
418        let newies_pipeline_name = Arc::new(newies_pipeline_name.into());
419        let on_err_callback_ref1 = Arc::new(on_err_callback);
420        let on_err_callback_ref2 = Arc::clone(&on_err_callback_ref1);
421        let oldies_out_stream = oldies_pipeline_builder(oldies_in_stream);
422        let newies_out_stream = newies_pipeline_builder(newies_in_stream);
423
424        match sequential_transition {
425            true => {
426                self.spawn_fallibles_executor_from_stream(concurrency_limit, oldies_pipeline_name, oldies_in_stream_id, oldies_out_stream,
427                                                         move |err| on_err_callback_ref1(err),
428                                                         move |executor| {
429                                                             let cloned_self = Arc::clone(&cloned_self);
430                                                             let on_err_callback_ref2 = Arc::clone(&on_err_callback_ref2);
431                                                             let newies_pipeline_name = Arc::clone(&newies_pipeline_name);
432                                                             async move {
433                                                                 cloned_self.spawn_fallibles_executor_from_stream(concurrency_limit, newies_pipeline_name.as_str(), newies_in_stream_id, newies_out_stream,
434                                                                                                                  move |err| on_err_callback_ref2(err),
435                                                                                                                  newies_on_close_callback).await
436                                                                     .map_err(|err| format!("Multi::spawn_oldies_executor(): could not start `newies`/sequential executor: {:?}", err))
437                                                                     .expect("CANNOT SPAWN NEWIES EXECUTOR AFTER OLDIES HAD COMPLETE");
438                                                                 oldies_on_close_callback(executor).await;
439                                                             }
440                                                         } ).await
441                    .map_err(|err| format!("Multi::spawn_oldies_executor(): could not start `oldies`/sequential executor: {:?}", err))?;
442            },
443            false => {
444                self.spawn_fallibles_executor_from_stream(concurrency_limit, oldies_pipeline_name, oldies_in_stream_id, oldies_out_stream,
445                                                          move |err| on_err_callback_ref1(err),
446                                                          oldies_on_close_callback).await
447                    .map_err(|err| format!("Multi::spawn_oldies_executor(): could not start `oldies` executor: {:?}", err))?;
448                self.spawn_fallibles_executor_from_stream(concurrency_limit, newies_pipeline_name.as_str(), newies_in_stream_id, newies_out_stream,
449                                                          move |err| on_err_callback_ref2(err),
450                                                          newies_on_close_callback).await
451                    .map_err(|err| format!("Multi::spawn_oldies_executor(): could not start `newies` executor: {:?}", err))?;
452            },
453        }
454        Ok(())
455    }
456
457    /// Internal method with common code for [Self::spawn_fallibles_executor()] & [Self::spawn_oldies_fallibles_executor()].
458    async fn spawn_fallibles_executor_from_stream<IntoString:             Into<String>,
459                                                  OutItemType:            Send + Debug,
460                                                  OutStreamType:          Stream<Item=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send + 'static,
461                                                  CloseVoidAsyncType:     Future<Output=()> + Send + 'static>
462
463                                       (&self,
464                                        concurrency_limit:         u32,
465                                        pipeline_name:             IntoString,
466                                        stream_id:                 u32,
467                                        pipelined_stream:          OutStreamType,
468                                        on_err_callback:           impl Fn(Box<dyn std::error::Error + Send + Sync>)                             + Send + Sync + 'static,
469                                        on_close_callback:         impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
470
471                                       -> Result<(), Box<dyn std::error::Error>> {
472
473        let executor = StreamExecutor::<INSTRUMENTS>::new(format!("{}: {}", self.name(), pipeline_name.into()));
474        self.add_executor(executor.clone(), stream_id).await?;
475        executor
476            .spawn_fallibles_executor::<_, _>(
477                concurrency_limit,
478                on_err_callback,
479                on_close_callback,
480                pipelined_stream
481            );
482        Ok(())
483    }
484
485    /// Spawns a new listener of all subsequent events sent to this `Multi`, processing them through the `Stream` returned by `pipeline_builder()`,
486    /// which generates events that are Non-Futures & Non-Fallible.
487    pub async fn spawn_non_futures_non_fallible_executor<IntoString:         Into<String>,
488                                                         OutItemType:        Send + Debug,
489                                                         OutStreamType:      Stream<Item=OutItemType> + Send + 'static,
490                                                         CloseVoidAsyncType: Future<Output=()>        + Send + 'static>
491
492                                                        (&self,
493                                                         concurrency_limit:        u32,
494                                                         pipeline_name:            IntoString,
495                                                         pipeline_builder:         impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OutStreamType,
496                                                         on_close_callback:        impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>)                         -> CloseVoidAsyncType + Send + Sync + 'static)
497
498                                                        -> Result<(), Box<dyn std::error::Error>> {
499
500        let (in_stream, in_stream_id) = self.channel.create_stream_for_new_events();
501        let out_stream = pipeline_builder(in_stream);
502        self.spawn_non_futures_non_fallible_executor_from_stream(concurrency_limit, pipeline_name, in_stream_id, out_stream, on_close_callback).await
503    }
504
505    /// For channels that allow it (like [channels::reference::mmap_log::MmapLog]), spawns two listeners for events sent to this `Multi`:
506    ///   1) One for past events -- to be processed by the stream returned by `oldies_pipeline_builder()`;
507    ///   2) Another one for subsequent events -- to be processed by the stream returned by `newies_pipeline_builder()`.
508    /// 
509    /// By using this method, it is assumed that both pipeline builders returns non-Futures & non-Fallible events. If this is not so, see [spawn_oldies_executor].\
510    /// The stream splitting is guaranteed not to drop any events and `sequential_transition` may be used to indicate if old events should be processed first or if both old and new events
511    /// may be processed simultaneously (in an inevitable out-of-order fashion).
512    pub async fn spawn_non_futures_non_fallible_oldies_executor<IntoString:               Into<String>,
513                                                                OldiesOutItemType:        Send + Debug,
514                                                                NewiesOutItemType:        Send + Debug,
515                                                                OldiesOutStreamType:      Stream<Item=OldiesOutItemType> + Sync + Send + 'static,
516                                                                NewiesOutStreamType:      Stream<Item=NewiesOutItemType> + Sync + Send + 'static,
517                                                                OldiesCloseVoidAsyncType: Future<Output=()> + Send + 'static,
518                                                                NewiesCloseVoidAsyncType: Future<Output=()> + Send + 'static>
519
520                                                               (self:                     &Arc<Self>,
521                                                                concurrency_limit:        u32,
522                                                                sequential_transition:    bool,
523                                                                oldies_pipeline_name:     IntoString,
524                                                                oldies_pipeline_builder:  impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> OldiesOutStreamType,
525                                                                oldies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>)                         -> OldiesCloseVoidAsyncType + Send + Sync + 'static,
526                                                                newies_pipeline_name:     IntoString,
527                                                                newies_pipeline_builder:  impl FnOnce(MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>) -> NewiesOutStreamType,
528                                                                newies_on_close_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>)                         -> NewiesCloseVoidAsyncType + Send + Sync + 'static)
529
530                                                               -> Result<(), Box<dyn std::error::Error>> {
531
532        let ((oldies_in_stream, oldies_in_stream_id),
533             (newies_in_stream, newies_in_stream_id)) = self.channel.create_streams_for_old_and_new_events();
534
535        let cloned_self = Arc::clone(self);
536        let oldies_pipeline_name = oldies_pipeline_name.into();
537        let newies_pipeline_name = Arc::new(newies_pipeline_name.into());
538        let oldies_out_stream = oldies_pipeline_builder(oldies_in_stream);
539        let newies_out_stream = newies_pipeline_builder(newies_in_stream);
540
541        match sequential_transition {
542            true => {
543                self.spawn_non_futures_non_fallible_executor_from_stream(concurrency_limit, oldies_pipeline_name, oldies_in_stream_id, oldies_out_stream,
544                                                                         move |executor| {
545                                                                             let cloned_self = Arc::clone(&cloned_self);
546                                                                             let newies_pipeline_name = Arc::clone(&newies_pipeline_name);
547                                                                             async move {
548                                                                                 cloned_self.spawn_non_futures_non_fallible_executor_from_stream(concurrency_limit, newies_pipeline_name.as_str(), newies_in_stream_id, newies_out_stream,
549                                                                                                                                                newies_on_close_callback).await
550                                                                                     .map_err(|err| format!("Multi::spawn_non_futures_non_fallible_oldies_executor(): could not start `newies` executor: {:?}", err))
551                                                                                     .expect("CANNOT SPAWN NEWIES EXECUTOR AFTER OLDIES HAD COMPLETE");
552                                                                                 oldies_on_close_callback(executor).await;
553                                                                             }
554                                                                         }).await
555                    .map_err(|err| format!("Multi::spawn_non_futures_non_fallible_oldies_executor(): could not start `oldies`/sequential executor: {:?}", err))?;
556
557            },
558            false => {
559                self.spawn_non_futures_non_fallible_executor_from_stream(concurrency_limit, oldies_pipeline_name, oldies_in_stream_id, oldies_out_stream,
560                                                                         oldies_on_close_callback).await
561                    .map_err(|err| format!("Multi::spawn_non_futures_non_fallible_oldies_executor(): could not start `oldies` executor: {:?}", err))?;
562                self.spawn_non_futures_non_fallible_executor_from_stream(concurrency_limit, newies_pipeline_name.as_str(), newies_in_stream_id, newies_out_stream,
563                                                                         newies_on_close_callback).await
564                    .map_err(|err| format!("Multi::spawn_non_futures_non_fallible_oldies_executor(): could not start `newies` executor: {:?}", err))?;
565            },
566        }
567        Ok(())
568    }
569
570    /// Internal method with common code for [spawn_non_futures_non_fallible_executor()] & [spawn_non_futures_non_fallible_oldies_executor()].
571    async fn spawn_non_futures_non_fallible_executor_from_stream<IntoString:             Into<String>,
572                                                                 OutItemType:            Send + Debug,
573                                                                 OutStreamType:          Stream<Item=OutItemType> + Send + 'static,
574                                                                 CloseVoidAsyncType:     Future<Output=()> + Send + 'static>
575
576                                                                (&self,
577                                                                 concurrency_limit:         u32,
578                                                                 pipeline_name:             IntoString,
579                                                                 stream_id:                 u32,
580                                                                 pipelined_stream:          OutStreamType,
581                                                                 on_close_callback:         impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static)
582
583                                                                -> Result<(), Box<dyn std::error::Error>> {
584
585        let executor = StreamExecutor::<INSTRUMENTS>::new(format!("{}: {}", self.name(), pipeline_name.into()));
586        self.add_executor(executor.clone(), stream_id).await?;
587        executor
588            .spawn_non_futures_non_fallibles_executor::<_, _>(
589                concurrency_limit,
590                on_close_callback,
591                pipelined_stream
592            );
593        Ok(())
594    }
595
596    /// Closes this `Multi`, in isolation -- flushing pending events, closing the producers,
597    /// waiting for all events to be fully processed and calling all executor's "on close" callbacks.\
598    /// If this `Multi` share resources with another one (which will get dumped by the "on close"
599    /// callback), most probably you want to close them atomically -- see [multis_close_async!()].\
600    /// Returns `true` if all events could be flushed within the given `timeout`.
601    pub async fn close(&self, timeout: Duration) -> bool {
602        self.channel.gracefully_end_all_streams(timeout).await == 0
603    }
604
605    /// Asynchronously blocks until all resources associated with the executor responsible for `pipeline_name` are freed:
606    ///   1) immediately causes `pipeline_name` to cease receiving new elements by removing it from the active list
607    ///   2) wait for all pending elements to be processed
608    ///   3) remove the queue/channel and wake the Stream to see that it has ended
609    ///   4) waits for the executor to inform it ceased its execution
610    ///   5) return, dropping all resources.
611    /// 
612    /// Note it might make sense to spawn this operation by a `Tokio task`, for it may block indefinitely if the Stream has no timeout.\
613    /// Also note that timing out this operation is not advisable, for resources won't be freed until it reaches the last step.\
614    /// Returns false if there was no executor associated with `pipeline_name`.
615    #[must_use = "futures do nothing unless you `.await` or poll them"]
616    pub async fn flush_and_cancel_executor<IntoString: Into<String>>
617                                          (&self,
618                                           pipeline_name: IntoString,
619                                           timeout:       Duration) -> bool {
620
621        let executor_name = format!("{}: {}", self.multi_name, pipeline_name.into());
622        // remove the pipeline from the active list
623        let mut executor_infos = self.executor_infos.write().await;
624        let executor_info = match executor_infos.swap_remove(&executor_name) {
625            Some(executor) => executor,
626            None => return false,
627        };
628        drop(executor_infos);
629
630        // wait until all elements are taken out from the queue
631        executor_info.executor_stats.report_scheduled_to_finish();
632        self.channel.gracefully_end_stream(executor_info.stream_id, timeout).await;
633        true
634    }
635
636    /// Registers an executor within this `Multi` so it can be managed -- closed, inquired for stats, etc
637    async fn add_executor(&self, stream_executor: Arc<dyn StreamExecutorStats + Send + Sync>, stream_id: u32) -> Result<(), Box<dyn std::error::Error>> {
638        let mut internal_multis = self.executor_infos.write().await;
639        if internal_multis.contains_key(stream_executor.executor_name()) {
640            Err(Box::from(format!("an executor with the same name is already present: '{}'", stream_executor.executor_name())))
641        } else {
642            internal_multis.insert(stream_executor.executor_name().to_owned(), ExecutorInfo { executor_stats: stream_executor, stream_id });
643            Ok(())
644        }
645    }
646
647}
648
649
/// This trait exists to allow simplifying generic declarations of concrete [Multi] types.
/// See also [GenericUni].\
/// Usage (illustrative pseudo-code -- generic arguments are abbreviated):
/// ```nocompile
///     struct MyGenericStruct<T: GenericMulti> { the_multi: T }
///     let the_multi = Multi<Lots,And,Lots<Of,Generic,Arguments>>::new();
///     let my_struct = MyGenericStruct { the_multi };
/// ```
pub trait GenericMulti<const INSTRUMENTS: usize> {
    /// The instruments this Multi will collect/report
    const INSTRUMENTS: usize;
    /// The payload type this Multi's producers will receive
    type ItemType: Debug + Sync + Send + 'static;
    /// The payload type this [Multi]'s `Stream`s will yield
    type DerivedItemType: Debug + Sync + Send + 'static;
    /// The channel through which payloads will travel from producers to listeners (see [Multi] for more info)
    type MultiChannelType: FullDuplexMultiChannel<ItemType=Self::ItemType, DerivedItemType=Self::DerivedItemType> + Sync + Send;
    /// Defined as `MutinyStream<'static, ItemType, MultiChannelType, DerivedItemType>`,\
    /// the concrete type for the `Stream` of `DerivedItemType`s to be given to listeners
    type MutinyStreamType;

    /// See [Multi::new()]
    fn new<IntoString: Into<String>>(multi_name: IntoString) -> Self;
    /// Consumes `self`, yielding the concrete [Multi] type described by this trait's associated types
    /// and by the `INSTRUMENTS` const parameter.
    fn to_multi(self) -> Multi<Self::ItemType, Self::MultiChannelType, INSTRUMENTS, Self::DerivedItemType>;
}
674
675
/// Macro to close, atomically-ish, all [Multi]s passed in as parameters.
///
/// Takes a timeout expression followed by one or more `Multi` expressions and expands to a block
/// that works in two phases, each one joined through `tokio::join!`:
///   1) `.channel.flush($timeout)` is called on every given `Multi`;
///   2) `.channel.gracefully_end_all_streams($timeout)` is called on every given `Multi`.
///
/// Notes:
///   - the results of both phases are discarded;
///   - as `tokio::join!` is used, this macro must be invoked from an async context and `tokio` must be
///     nameable from the calling crate;
///   - the `$timeout` and `$multi` expressions are repeated in the expansion (once per phase, for each `Multi`),
///     so they are evaluated more than once -- prefer passing plain variables.
#[macro_export]
macro_rules! multis_close_async {
    ($timeout: expr,
     $($multi: expr),+) => {
        {
            // phase 1: flush every `Multi`'s channel
            tokio::join!( $( $multi.channel.flush($timeout), )+ );
            // phase 2: only after all flushes are done, end the streams of every `Multi`
            tokio::join!( $( $multi.channel.gracefully_end_all_streams($timeout), )+ );
        }
    }
}
687pub use multis_close_async;
688use crate::stream_executor::StreamExecutorStats;
689pub use crate::types::ChannelCommon;
690
/// Keeps track of the `stream_executor` associated to each `stream_id`.\
/// Instances are stored in [Multi::executor_infos], keyed by the executor's name.
pub struct ExecutorInfo {
    /// Handle to the registered executor, seen through its [StreamExecutorStats] interface
    /// (used, for instance, to report it is scheduled to finish when the executor is cancelled)
    pub executor_stats: Arc<dyn StreamExecutorStats + Send + Sync>,
    /// The id of the channel's stream consumed by the executor -- the one given back to the channel
    /// when that stream should be gracefully ended
    pub stream_id:      u32,
}