reactive_mutiny/multi/
mod.rs

//! Allows creating `multi`s, which represent pairs of (`producer`, `event pipeline`) that may be used to
//! `produce()` asynchronous payloads to be processed by multiple `event pipeline` Streams -- and executed
//! by async tasks.
4//!
5//! Usage:
6//! ```nocompile
//!    fn local_on_event(stream: impl Stream<Item=Arc<String>>) -> impl Stream<Item=Arc<String>> {
//!        stream.inspect(|message| println!("To Zeta: '{}'", message))
//!    }
//!    fn zeta_on_event(stream: impl Stream<Item=Arc<String>>) -> impl Stream<Item=Arc<String>> {
//!        stream.inspect(|message| println!("ZETA: Received a message: '{}'", message))
//!    }
//!    fn earth_on_event(stream: impl Stream<Item=Arc<String>>) -> impl Stream<Item=Arc<String>> {
//!        stream.inspect(|sneak_peeked_message| println!("EARTH: Sneak peeked a message to Zeta Reticuli: '{}'", sneak_peeked_message))
//!    }
16//!    let multi = MultiBuilder::new()
17//!        .on_stream_close(|_| async {})
18//!        .into_executable()
19//!        .spawn_non_futures_non_fallible_executor("doc_test() local onEvent()", local_on_event).await
20//!        .spawn_non_futures_non_fallible_executor("doc_test() zeta  onEvent()", zeta_on_event).await
21//!        .spawn_non_futures_non_fallible_executor("doc_test() earth onEvent()", earth_on_event).await
22//!        .handle();
23//!    let producer = multi.producer_closure();
24//!    producer("I've just arrived!".to_string()).await;
25//!    producer("Nothing really interesting here... heading back home!".to_string()).await;
26//!    multi.close().await;
27//! ```
28
29mod multi;
30pub use multi::*;
31
32pub mod channels;
33
34
/// Tests & enforces the requisites, exposes good practices & exercises the API of the [multi](self) module
36#[cfg(any(test,doc))]
37mod tests {
38    use super::*;
39    use super::super::{
40        instruments::Instruments,
41        mutiny_stream::MutinyStream,
42        types::FullDuplexMultiChannel,
43    };
44    use std::{
45        future::Future,
46        sync::{
47            Arc,
48            atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering::Relaxed},
49        },
50        time::{
51            Duration,
52            SystemTime,
53        },
54        io::Write,
55    };
56    use futures::stream::{self, Stream, StreamExt};
57    use minstant::Instant;
58    use tokio::sync::Mutex;
59
60
61    type MultiChannelType<ItemType, const BUFFER_SIZE: usize, const MAX_STREAMS: usize> = channels::arc::crossbeam::Crossbeam<'static, ItemType, BUFFER_SIZE, MAX_STREAMS>;
62
63
64    #[ctor::ctor]
65    fn suite_setup() {
66        simple_logger::SimpleLogger::new().with_utc_timestamps().init().unwrap_or_else(|_| eprintln!("--> LOGGER WAS ALREADY STARTED"));
67    }
68
69    /// exercises the code present on the documentation
70    #[cfg_attr(not(doc),tokio::test)]
71    async fn doc_tests() -> Result<(), Box<dyn std::error::Error>> {
72        fn local_on_event(stream: impl Stream<Item=Arc<String>>) -> impl Stream<Item=Arc<String>> {
73            stream.inspect(|message| println!("To Zeta: '{}'", message))
74        }
75        fn zeta_on_event(stream: impl Stream<Item=Arc<String>>) -> impl Stream<Item=Arc<String>> {
76            stream.inspect(|message| println!("ZETA: Received a message: '{}'", message))
77        }
78        fn earth_on_event(stream: impl Stream<Item=Arc<String>>) -> impl Stream<Item=Arc<String>> {
79            stream.inspect(|sneak_peeked_message| println!("EARTH: Sneak peeked a message to Zeta Reticuli: '{}'", sneak_peeked_message))
80        }
81        let multi: Multi<String, MultiChannelType<String, 1024, 4>, {Instruments::LogsWithMetrics.into()}, Arc<String>> = Multi::new("doc_test() event");
82        multi.spawn_non_futures_non_fallible_executor(1, "local screen", local_on_event, |_| async {}).await?;
83        multi.spawn_non_futures_non_fallible_executor(1, "zeta receiver", zeta_on_event, |_| async {}).await?;
84        multi.spawn_non_futures_non_fallible_executor(1, "earth snapper", earth_on_event, |_| async {}).await?;
85        let producer = |item: &str| multi.send(item.to_string());
86        producer("I've just arrived!");
87        producer("Nothing really interesting here... heading back home!");
88        multi.close(Duration::ZERO).await;
89        Ok(())
90    }
91
92    /// guarantees that one of the simplest possible testable 'multi' pipelines will get executed all the way through
93    #[cfg_attr(not(doc),tokio::test)]
94    async fn simple_pipelines() -> Result<(), Box<dyn std::error::Error>> {
95        const EXPECTED_SUM: u32 = 17;
96        const PARTS: &[u32] = &[9, 8];
97
98        // two pipelines are set for this test -- these are the variables they change
99        let observed_sum_1 = Arc::new(AtomicU32::new(0));
100        let observed_sum_2 = Arc::new(AtomicU32::new(0));
101
102        let multi: Multi<u32, MultiChannelType<u32, 1024, 2>, {Instruments::LogsWithMetrics.into()}, Arc<u32>> = Multi::new("Simple Event");
103            // #1 -- event pipeline
104        multi.spawn_non_futures_non_fallible_executor(1, "Pipeline #1",
105                                                      |stream| {
106                                                          let observed_sum = Arc::clone(&observed_sum_1);
107                                                          stream.map(move |number: Arc<u32>| observed_sum.fetch_add(*number, Relaxed))
108                                                      },
109                                                      |_| async {}).await?;
110            // #2 -- event pipeline
111        multi.spawn_non_futures_non_fallible_executor(1, "Pipeline #2",
112                                                      |stream| {
113                                                           let observed_sum = Arc::clone(&observed_sum_2);
114                                                           stream.map(move |number| observed_sum.fetch_add(*number, Relaxed))
115                                                       },
116                                                      |_| async {}).await?;
117        let producer = |item| multi.send_with(move |slot| *slot = item).expect_ok("couldn't send");
118
119        // produces some events concurrently -- they will be shared with all executable pipelines
120        let shared_producer = &producer;
121        stream::iter(PARTS)
122            .for_each_concurrent(1, |number| async move {
123                shared_producer(*number).expect_ok("Couldn't produce");
124            }).await;
125
126        multi.close(Duration::ZERO).await;
127        assert_eq!(observed_sum_1.load(Relaxed), EXPECTED_SUM, "not all events passed through our pipeline #1");
128        assert_eq!(observed_sum_2.load(Relaxed), EXPECTED_SUM, "not all events passed through our pipeline #2");
129        Ok(())
130    }
131
132    /// shows how pipelines / executors may be cancelled / deleted / unsubscribed
133    /// from the main event producer
134    #[cfg_attr(not(doc),tokio::test)]
135    async fn delete_pipelines() {
136        const PIPELINE_1: &str     = "Pipeline #1";
137        const PIPELINE_2: &str     = "Pipeline #2";
138        const TIMEOUT:    Duration = Duration::ZERO;
139
140        let multi: Multi<u32, MultiChannelType<u32, 1024, 2>, {Instruments::LogsWithMetrics.into()}, Arc<u32>> = Multi::new("Event with come and go pipelines");
141
142        // correct creating & cancelling executors
143        multi.spawn_non_futures_non_fallible_executor(1, PIPELINE_1, |s| s, |_| async {}).await
144            .expect("Single instance of PIPELINE_1 should have been created");
145        multi.spawn_non_futures_non_fallible_executor(1, PIPELINE_2, |s| s, |_| async {}).await
146            .expect("Single instance of PIPELINE_2 should have been created");
147        assert!(multi.flush_and_cancel_executor(PIPELINE_1, TIMEOUT).await, "'{}' was spawned, therefore it should have been cancelled", PIPELINE_1);
148        assert!(multi.flush_and_cancel_executor(PIPELINE_2, TIMEOUT).await, "'{}' was spawned, therefore it should have been cancelled", PIPELINE_2);
149
150        // attempt to double create -- which is an error condition
151        multi.spawn_non_futures_non_fallible_executor(1, PIPELINE_1, |s| s, |_| async {}).await
152            .expect("Single instance of PIPELINE_1 should have been created");
153        let result = multi.spawn_non_futures_non_fallible_executor(1, PIPELINE_1, |s| s, |_| async {}).await;
154        assert!(result.is_err(), "Second attempt to insert PIPELINE_1 should have failed");
155
156            // attempt to double cancel
157        assert!(multi.flush_and_cancel_executor(PIPELINE_1, TIMEOUT).await, "'{}' was spawned once, therefore it should have been cancelled", PIPELINE_1);
158        assert!(!multi.flush_and_cancel_executor(PIPELINE_1, TIMEOUT).await, "A pipeline cannot be cancelled twice");
159
160        // attempt to cancel non-existing
161        assert!(!multi.flush_and_cancel_executor(PIPELINE_2, TIMEOUT).await, "An unexisting pipeline cannot be reported as having been cancelled");
162
163        // stress test
164        // (maybe this may be improved by detecting any memory leaks)
165        for _ in 0..128 {
166            multi.spawn_non_futures_non_fallible_executor(1, PIPELINE_1, |s| s, |_| async {}).await
167                .expect("Single instance of PIPELINE_1 should have been created");
168            multi.spawn_non_futures_non_fallible_executor(1, PIPELINE_2, |s| s, |_| async {}).await
169                .expect("Single instance of PIPELINE_2 should have been created");
170            assert!(multi.flush_and_cancel_executor(PIPELINE_1, TIMEOUT).await, "'{}' was spawned, therefore it should have been cancelled", PIPELINE_1);
171            assert!(multi.flush_and_cancel_executor(PIPELINE_2, TIMEOUT).await, "'{}' was spawned, therefore it should have been cancelled", PIPELINE_2);
172        }
173
174        // finally, produce an event to check that all is fine
175        let last_message = Arc::new(Mutex::new(0u32));
176        let last_message_ref = Arc::clone(&last_message);
177        multi.spawn_non_futures_non_fallible_executor(1, PIPELINE_2,
178                                                      |s| s.inspect(move |n| *last_message_ref.try_lock().unwrap() = *(n as &u32)),
179                                                      |_| async {} ).await
180            .expect("Single instance of PIPELINE_2 should have been created");
181
182        multi.send_with(|slot| *slot = 97).expect_ok("couldn't send");
183        multi.close(Duration::ZERO).await;
184
185        assert_eq!(*last_message.try_lock().unwrap(), 97, "event didn't complete");
186
187        // executor will be a tokio task -- join handle -- which we may simply cancel
188        // multi_builder might have a copy of it? or we should let the caller to hold it & pass it on? I should write a realistic test to check on that
189        // depending on the decision above, the key would be the pipeline name... and I must assure no two pipelines are added with the same name, I should assert they won't be deleted twice, etc...
190        // maybe we do have to key the tokio join handles anyway... because deleting an executor / pipeline means it should be taken out of the pipelines vector -- or, now, the pipelines hashmap
191    }
192
193    /// shows how we may call async functions inside `multi` pipelines
194    /// and work with "future" elements
195    #[cfg_attr(not(doc),tokio::test)]
196    async fn async_elements() -> Result<(), Box<dyn std::error::Error>> {
197        const EXPECTED_SUM: u32 = 30;
198        const PARTS: &[u32] = &[9, 8, 7, 6];
199
200        // each pipeline will report their execution to the following variables
201        let observed_sum_1 = Arc::new(AtomicU32::new(0));
202        let observed_sum_2 = Arc::new(AtomicU32::new(0));
203
204        // now goes the stream builder functions -- or pipeline builders.
205        // notice how to transform a regular event into a future event &
206        // how to pass it down the pipeline. Also notice the required (as of Rust 1.63)
207        // moving of Arc local variables so they will be accessible
208
209        let pipeline1_builder = |stream: MutinyStream<'static, u32, _, Arc<u32>>| {
210            let observed_sum = Arc::clone(&observed_sum_1);
211            stream
212                .map(|number| async move {
213                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
214                    number
215                })
216                .map(move |number| {
217                    let observed_sum = Arc::clone(&observed_sum);
218                    async move {
219                        let number = number.await;
220                        observed_sum.fetch_add(*number, Relaxed);
221                        number
222                    }
223                })
224                .map(|number| async move {
225                    let number = number.await;
226                    println!("PIPELINE 1: Just added # {}", number);
227                    Ok(number)
228                })
229        };
230        let pipeline2_builder = |stream: MutinyStream<'static, u32, _, Arc<u32>>| {
231            let observed_sum = Arc::clone(&observed_sum_2);
232            stream
233                .map(|number| async move {
234                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
235                    number
236                })
237                .map(move |number| {
238                    let observed_sum = Arc::clone(&observed_sum);
239                    async move {
240                        let number = number.await;
241                        observed_sum.fetch_add(*number, Relaxed);
242                        number
243                    }
244                })
245                .map(|number| async move {
246                    let number = number.await;
247                    println!("PIPELINE 2: Just added # {}", number);
248                    Ok(number)
249                })
250        };
251
252        let multi: Multi<u32, MultiChannelType<u32, 1024, 2>, {Instruments::LogsWithMetrics.into()}, Arc<u32>> = Multi::new("async event");
253        multi.spawn_executor(PARTS.len() as u32, Duration::from_secs(2), "Stream Pipeline #1", pipeline1_builder, |_| async {}, |_| async {}).await?;
254        multi.spawn_executor(PARTS.len() as u32, Duration::from_secs(2), "Stream Pipeline #2", pipeline2_builder, |_| async {}, |_| async {}).await?;
255
256        let producer = |item| multi.send_with(move |slot| *slot = item).expect_ok("couldn't send");
257
258        let shared_producer = &producer;
259        stream::iter(PARTS)
260            .for_each_concurrent(1, |number| async move {
261                shared_producer(*number).expect_ok("Couldn't produce");
262            }).await;
263
264        multi.close(Duration::ZERO).await;
265        assert_eq!(observed_sum_1.load(Relaxed), EXPECTED_SUM, "not all events passed through our async pipeline #1");
266        assert_eq!(observed_sum_2.load(Relaxed), EXPECTED_SUM, "not all events passed through our async pipeline #2");
267        Ok(())
268    }
269
270    /// assures stats are computed appropriately for every executor,
271    /// according to the right instrumentation specifications
272    #[cfg_attr(not(doc),tokio::test(flavor="multi_thread", worker_threads=2))]
273    #[ignore]   // flaky if ran in multi-thread: timeout measurements go south
274    async fn stats() -> Result<(), Box<dyn std::error::Error>> {
275        #[cfg(not(debug_assertions))]
276        const N_PIPELINES: usize = 256;
277        #[cfg(debug_assertions)]
278        const N_PIPELINES: usize = 128;
279
280        // asserts spawn_non_futures_non_fallible_executor() register statistics appropriately:
281        // with counters, but without average futures resolution time measurements
282        let event_name = "non_future/non_fallible event";
283        let multi: Multi<String, MultiChannelType<String, 256, N_PIPELINES>, {Instruments::LogsWithMetrics.into()}, Arc<String>> = Multi::new(event_name);
284        for i in 0..N_PIPELINES {
285            multi.spawn_non_futures_non_fallible_executor(1, format!("Pipeline #{} for {}", i, event_name), |stream| stream, |_| async {}).await?;
286        }
287        let producer = |item: &str| multi.send(item.to_string()).expect_ok("couldn't send");
288        producer("'only count successes' payload");
289        multi.close(Duration::from_secs(5)).await;
290        assert_eq!(N_PIPELINES, multi.executor_infos.read().await.len(), "Number of created pipelines doesn't match");
291        for (i, executor_info) in multi.executor_infos.read().await.values().enumerate() {
292            let (ok_counter, ok_avg_futures_resolution_duration) = executor_info.executor_stats.ok_events_avg_future_duration().lightweight_probe();
293            assert_eq!(ok_counter,                               1,    "counter of successful '{event_name}' events is wrong for pipeline #{i}");
294            assert_eq!(ok_avg_futures_resolution_duration,       -1.0, "avg futures resolution time of successful '{event_name}' events is wrong  for pipeline #{i} -- since it is a non-future, avg times should be always -1.0");
295            let (failures_counter, failures_avg_futures_resolution_duration) = executor_info.executor_stats.failed_events_avg_future_duration().lightweight_probe();
296            assert_eq!(failures_counter,                         0,    "counter of unsuccessful '{event_name}' events is wrong  for pipeline #{i} -- since it is a non-fallible event, failures should always be 0");
297            assert_eq!(failures_avg_futures_resolution_duration, 0.0,  "avg futures resolution time of unsuccessful '{event_name}' events is wrong  for pipeline #{i} -- since it is a non-fallible event,, avg times should be always 0.0");
298            let (timeouts_counter, timeouts_avg_futures_resolution_duration) = executor_info.executor_stats.timed_out_events_avg_future_duration().lightweight_probe();
299            assert_eq!(timeouts_counter,                         0,    "counter of timed out '{event_name}' events is wrong  for pipeline #{i} -- since it is a non-future event, timeouts should always be 0");
300            assert_eq!(timeouts_avg_futures_resolution_duration, 0.0,  "avg futures resolution time of timed out '{event_name}' events is wrong  for pipeline #{i} -- since it is a non-future event,, avg timeouts should be always 0.0");
301        }
302
303        // asserts spawn_executor() register statistics appropriately:
304        // with counters & with average futures resolution time measurements
305        let event_name = "future & fallible event";
306        let multi: Multi<String, MultiChannelType<String, 256, N_PIPELINES>, {Instruments::LogsWithMetrics.into()}, Arc<String>> = Multi::new(event_name);
307        for i in 0..N_PIPELINES {
308            multi.spawn_executor(1, Duration::from_millis(150), format!("Pipeline #{} for {}", i, event_name),
309                                 |stream| {
310                                         stream.map(|payload| async move {
311                                             if payload.contains("unsuccessful") {
312                                                 tokio::time::sleep(Duration::from_millis(50)).await;
313                                                 Err(Box::from(String::from("failing the pipeline, as requested")))
314                                             } else if payload.contains("timeout") {
315                                                 tokio::time::sleep(Duration::from_millis(200)).await;
316                                                 Ok(Arc::new(String::from("this answer will never make it -- stream executor times out after 100ms")))
317                                             } else {
318                                                 tokio::time::sleep(Duration::from_millis(100)).await;
319                                                 Ok(payload)
320                                             }
321                                         })
322                                     },
323                                 |_| async {},
324                                 |_| async {}
325            ).await?;
326        }
327        let producer = |item: &str| multi.send(item.to_string()).expect_ok("couldn't send");
328        // for this test, produce each event twice
329        for _ in 0..2 {
330            producer("'successful' payload");
331            producer("'unsuccessful' payload");
332            producer("'timeout' payload");
333        }
334        multi.close(Duration::from_secs(5)).await;
335        assert_eq!(N_PIPELINES, multi.executor_infos.read().await.len(), "Number of created pipelines doesn't match");
336        for (i, executor_info) in multi.executor_infos.read().await.values().enumerate() {
337            let (ok_counter, ok_avg_futures_resolution_duration) = executor_info.executor_stats.ok_events_avg_future_duration().lightweight_probe();
338            assert_eq!(ok_counter,                                             2,    "counter of successful '{event_name}' events is wrong for pipeline #{i}");
339            assert!((ok_avg_futures_resolution_duration-0.100).abs()        < 15e-2, "avg futures resolution time of successful '{event_name}' events is wrong for pipeline #{i} -- it should be 0.1s, but was {ok_avg_futures_resolution_duration}s");
340            let (failures_counter, failures_avg_futures_resolution_duration) = executor_info.executor_stats.failed_events_avg_future_duration().lightweight_probe();
341            assert_eq!(failures_counter,                                      2,    "counter of unsuccessful '{event_name}' events is wrong for pipeline #{i}");
342            assert!((failures_avg_futures_resolution_duration-0.050).abs() < 15e-2, "avg futures resolution time of unsuccessful '{event_name}' events is wrong for pipeline #{i} -- it should be 0.05s, but was {failures_avg_futures_resolution_duration}s");
343            let (timeouts_counter, timeouts_avg_futures_resolution_duration) = executor_info.executor_stats.timed_out_events_avg_future_duration().lightweight_probe();
344            assert_eq!(timeouts_counter,                                      2,    "counter of timed out '{event_name}' events is wrong for pipeline #{i}");
345            assert!((timeouts_avg_futures_resolution_duration-0.150).abs() < 15e-2, "avg futures resolution time of timed out '{event_name}' events is wrong for pipeline #{i} -- it should be 0.150s, but was {timeouts_avg_futures_resolution_duration}s");
346        }
347        Ok(())
348    }
349
350
351    /// shows how to fuse multiple `multi`s, triggering payloads for another `multi` when certain conditions are met:
352    /// events TWO and FOUR will set a shared state between them, firing SIX.
353    /// NOTE: every 'on_X_event()' function will be called twice, since we're setting 2 pipelines for each `multi`
354    #[cfg_attr(not(doc),tokio::test)]
355    async fn demux() -> Result<(), Box<dyn std::error::Error>> {
356        let shared_state = Arc::new(AtomicU32::new(0));
357        let two_fire_count = Arc::new(AtomicU32::new(0));
358        let four_fire_count = Arc::new(AtomicU32::new(0));
359        let six_fire_count = Arc::new(AtomicU32::new(0));
360
361        // SIX event
362        let on_six_event = |stream: MutinyStream<'static, bool, _, Arc<bool>>| {
363            let six_fire_count = Arc::clone(&six_fire_count);
364            stream.inspect(move |_| {
365                six_fire_count.fetch_add(1, Relaxed);
366            })
367        };
368        let six_multi = Multi::<bool, MultiChannelType<bool, 1024, 2>, {Instruments::LogsWithMetrics.into()}, Arc<bool>>::new("SIX");
369        six_multi.spawn_non_futures_non_fallible_executor(1, "Pipeline #1", on_six_event, |_| async {}).await?;
370        six_multi.spawn_non_futures_non_fallible_executor(1, "Pipeline #2", on_six_event, |_| async {}).await?;
371        let six_multi = Arc::new(six_multi);
372        // assures we'll close SIX only once
373        let can_six_be_closed = Arc::new(AtomicBool::new(true));
374        let six_multi_ref = Arc::clone(&six_multi);
375        let six_closer = Arc::new(move || {
376            let can_six_be_closed = Arc::clone(&can_six_be_closed);
377            let six_multi = Arc::clone(&six_multi_ref);
378            async move {
379                if can_six_be_closed.swap(false, Relaxed) {
380                    six_multi.close(Duration::ZERO).await;
381                }
382            }
383        });
384
385        // TWO event
386        let on_two_event = |stream: MutinyStream<'static, u32, _, Arc<u32>>| {
387            let two_fire_count = Arc::clone(&two_fire_count);
388            let shared_state = Arc::clone(&shared_state);
389            let six_multi = Arc::clone(&six_multi);
390            stream
391                .map(move |payload| {
392                    let two_fire_count = Arc::clone(&two_fire_count);
393                    let shared_state = Arc::clone(&shared_state);
394                    let six_multi = Arc::clone(&six_multi);
395                    async move {
396                        two_fire_count.fetch_add(1, Relaxed);
397                        if *payload & 2 == 2 {
398                            let previous_state = shared_state.fetch_or(2, Relaxed);
399                            if previous_state & 6 == 6 {
400                                shared_state.store(0, Relaxed); // reset the triggering state
401                                six_multi.send_with(|slot| *slot = true).expect_ok("couldn't send");
402                            }
403                        } else if *payload == 97 {
404                            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
405                        }
406                        payload
407                    }
408            })
409            .buffer_unordered(1)
410        };
411        let six_closer_for_two = Arc::clone(&six_closer);
412        let on_two_close_builder = || {
413            let six_closer_for_two = Arc::clone(&six_closer_for_two);
414            move |_| {
415                let six_closer_for_two = Arc::clone(&six_closer_for_two);
416                async move {
417                    six_closer_for_two().await;
418                }
419            }
420        };
421        let two_multi: Multi<u32, MultiChannelType<u32, 1024, 2>, {Instruments::LogsWithMetrics.into()}, Arc<u32>> = Multi::new("TWO");
422        two_multi.spawn_non_futures_non_fallible_executor(1, "Pipeline #1", on_two_event, on_two_close_builder()).await?;
423        two_multi.spawn_non_futures_non_fallible_executor(1, "Pipeline #2", on_two_event, on_two_close_builder()).await?;
424        let two_multi = Arc::new(two_multi);
425        let two_producer = |item| two_multi.send_with(move |slot| *slot = item).expect_ok("couldn't send `two`");
426
427        // FOUR event
428        let on_four_event = |stream: MutinyStream<'static, u32, _, Arc<u32>>| {
429            let four_fire_count = Arc::clone(&four_fire_count);
430            let shared_state = Arc::clone(&shared_state);
431            let six_multi = Arc::clone(&six_multi);
432            stream
433                .map(move |payload| {
434                    let four_fire_count = Arc::clone(&four_fire_count);
435                    let shared_state = Arc::clone(&shared_state);
436                    let six_multi = Arc::clone(&six_multi);
437                    async move {
438                        four_fire_count.fetch_add(1, Relaxed);
439                        if *payload & 4 == 4 {
440                            let previous_state = shared_state.fetch_or(4, Relaxed);
441                            if previous_state & 6 == 6 {
442                                shared_state.store(0, Relaxed); // reset the triggering state
443                                six_multi.send_with(|slot| *slot = true).expect_ok("couldn't send `six`");
444                            }
445                        } else if *payload == 97 {
446                            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
447                        }
448                        payload
449                    }
450                })
451                .buffer_unordered(1)
452        };
453        let six_closer_for_four = Arc::clone(&six_closer);
454        let on_four_close_builder = || {
455            let six_closer_for_four = Arc::clone(&six_closer_for_four);
456            move |_| {
457                let six_closer_for_four = Arc::clone(&six_closer_for_four);
458                async move {
459                    six_closer_for_four().await;
460                }
461            }
462        };
463        let four_multi: Multi<u32, MultiChannelType<u32, 1024, 2>, {Instruments::LogsWithMetrics.into()}, Arc<u32>> = Multi::new("FOUR");
464        four_multi.spawn_non_futures_non_fallible_executor(1, "Pipeline #1", on_four_event, on_four_close_builder()).await?;
465        four_multi.spawn_non_futures_non_fallible_executor(1, "Pipeline #2", on_four_event, on_four_close_builder()).await?;
466        let four_multi = Arc::new(four_multi);
467        let four_producer = |item| four_multi.send_with(move |slot| *slot = item).expect_ok("couldn't send `four`");
468
469        // NOTE: the special value of 97 causes a sleep on both TWO and FOUR pipelines
470        //       so we can test race conditions for the 'close producer' functions
471        two_producer(1);
472        two_producer(2);
473        four_producer(97);    // sleeps, forcing any bugs regarding racing conditions to blow up
474        four_producer(1);
475        four_producer(2);
476        four_producer(3);
477        four_producer(4);
478        two_producer(3);
479        two_producer(4);
480        four_producer(5);
481        // closing TWO (and, therefore, SIX) before all elements of FOUR are processed would cause the later consumer to try to publish to SIX (when it is already closed) --
482        // this is why both events should be closed atomically in this case -- both share the closeable resource SIX -- which happens to be another multi, but could be any other resource
483        multis_close_async!(Duration::ZERO, two_multi, four_multi);  // notice SIX is closed here as well -- when either TWO or FOUR are closed
484
485        assert_eq!(two_fire_count.load(Relaxed),  4 * 2, "Wrong number of events processed for TWO");
486        assert_eq!(four_fire_count.load(Relaxed), 6 * 2, "Wrong number of events processed for FOUR");
487        assert_eq!(six_fire_count.load(Relaxed),  1 * 2, "Wrong number of events processed for SIX");
488        Ok(())
489    }
490
491    /// shows how to handle errors when they happen anywhere down the pipeline
492    /// -- and what happens when they are not handled.
493    /// + tests meaningful messages are produced
494    #[cfg_attr(not(doc),tokio::test)]
495    async fn error_handling() -> Result<(), Box<dyn std::error::Error>> {
496
497        let on_err_count = Arc::new(AtomicU32::new(0));
498
499        fn on_fail_when_odd_event(stream: impl Stream<Item=Arc<u32>>) -> impl Stream<Item = impl Future<Output = Result<u32, Box<dyn std::error::Error + Send + Sync>> > + Send> {
500            stream
501                .map(|payload| async move {
502                    if *payload % 2 == 0 {
503                        Ok(*payload)
504                    } else if *payload % 79 == 0 {
505                        Err(format!("BLOW CODE received: {}", payload))
506                    } else {
507                        Err(format!("ODD payload received: {}", payload))
508                    }
509                })
510                // treat known errors
511                .filter_map(|payload| async {
512                    let payload = payload.await;
513                    match &payload {
514                        Ok(ok_payload ) => {
515                            println!("Payload {} ACCURATELY PROCESSED!", ok_payload);
516                            Some(payload)
517                        },
518                        Err(ref err) => {
519                            if err.contains("ODD") {
520                                println!("Payload {} ERROR LOG -- this error is tolerable and this event will be skipped for the rest of the pipeline", err);
521                                None
522                            } else {
523                                // other errors are "unknown" -- therefore, not tolerable nor treated nor recovered from... and will explode down the pipeline, causing the stream to close
524                                Some(payload)
525                            }
526                        }
527                        //unknown_error => Some(unknown_error),
528                    }
529                })
530                .map(|payload| async {
531                    let payload = payload?;
532                    // if this is executed, the payload had no errors OR the error was handled and the failed event was filtered out
533                    println!("Payload {} continued down the pipe ", payload);
534                    Ok(payload)
535                })
536        }
537        let on_err_count_clone_1 = Arc::clone(&on_err_count);
538        let on_err_count_clone_2 = Arc::clone(&on_err_count);
539        let multi: Multi<u32, MultiChannelType<u32, 1024, 2>, {Instruments::LogsWithMetrics.into()}, Arc<u32>> = Multi::new("Event with error handling");
540        multi.spawn_executor(1,
541                             Duration::from_millis(100),
542                             "Pipeline #1",
543                             on_fail_when_odd_event,
544                             move |err| {
545                                 let on_err_count_clone = Arc::clone(&on_err_count_clone_1);
546                                 async move {
547                                     on_err_count_clone.fetch_add(1, Relaxed);
548                                     println!("Pipeline #1: ERROR CALLBACK WAS CALLED: '{:?}'", err);
549                                 }
550                             },
551                             |_| async {}
552            ).await?;
553        multi.spawn_executor(1,
554                             Duration::from_millis(100),
555                             "Pipeline #2",
556                             on_fail_when_odd_event,
557                             move |err| {
558                                 let on_err_count_clone = Arc::clone(&on_err_count_clone_2);
559                                 async move {
560                                     on_err_count_clone.fetch_add(1, Relaxed);
561                                     println!("Pipeline #2: ERROR CALLBACK WAS CALLED: '{:?}'", err);
562                                 }
563                             },
564                             |_| async {}
565            ).await?;
566        let producer = |item| multi.send_with(move |slot| *slot = item).expect_ok("couldn't send");
567        producer(0);
568        producer(1);
569        producer(2);
570        producer(79);
571        producer(80);
572        multi.close(Duration::ZERO).await;
573
574        assert_eq!(on_err_count.load(Relaxed), 1 * 2, "'on_err()' callback contract broken: events with handled errors should not call on_err(), the ones not 'caught', should -- twice, since we have 2 pipelines");
575        Ok(())
576    }
577
578    /// verifies that our executors (each one on their own Tokio task) don't blow the latencies up -- when we have many of them in the waiting state.
579    /// Two [multi]s are set: SIMPLE (with a single pipeline) and BLOATED (with several thousands)
580    /// 1) Time is measured to produce & consume a SIMPLE event
581    /// 2) BLOATED is created and a single payload is produced to get all of them activated -- a check is done that all got processed
582    /// 3) (1) is repeated -- the production-to-consumption time should be (nearly?) unaffected
583    #[cfg_attr(not(doc),tokio::test)]
584    async fn undegradable_latencies() -> Result<(), Box<dyn std::error::Error>> {
585        const BLOATED_PIPELINES_COUNT: usize = 256;
586
587        let simple_count = Arc::new(AtomicU32::new(0));
588        let simple_last_elapsed_nanos = Arc::new(AtomicU64::new(0));
589        let bloated_count = Arc::new(AtomicU32::new(0));
590        let bloated_last_elapsed_nanos = Arc::new(AtomicU64::new(0));
591
592        let simple_multi: Multi<SystemTime, MultiChannelType<SystemTime, 1024, 1>, {Instruments::LogsWithMetrics.into()}, Arc<SystemTime>> = Multi::new("SIMPLE");
593        simple_multi.spawn_non_futures_non_fallible_executor(1, "solo pipeline",
594                                                             |stream| {
595                                                                 let simple_count = Arc::clone(&simple_count);
596                                                                 let simple_last_elapsed_nanos = Arc::clone(&simple_last_elapsed_nanos);
597                                                                 stream.map(move |start: Arc<SystemTime>| {
598                                                                     simple_last_elapsed_nanos.store(start.elapsed().unwrap().as_nanos() as u64, Relaxed);
599                                                                     simple_count.fetch_add(1, Relaxed)
600                                                                 })
601                                                             },
602                                                             |_| async {}).await?;
603        let simple_producer = |item| simple_multi.send_with(move |slot| *slot = item).expect_ok("couldn't send");
604
605        // 1) Measure the time to produce & consume a SIMPLE event -- No other multi tokio tasks are available
606        tokio::time::sleep(Duration::from_millis(10)).await;
607        simple_producer(SystemTime::now());
608        // waits for the event to be processed
609        while simple_count.load(Relaxed) != 1 {
610            tokio::task::yield_now().await;
611        }
612        let first_simple_duration = Duration::from_nanos(simple_last_elapsed_nanos.load(Relaxed));
613        println!("1. Time to produce & consume a SIMPLE event (no other Multi Tokio tasks): {:?}", first_simple_duration);
614
615        let bloated_multi: Multi<SystemTime, MultiChannelType<SystemTime, 16, BLOATED_PIPELINES_COUNT>, {Instruments::LogsWithMetrics.into()}, Arc<SystemTime>> = Multi::new("BLOATED");
616        for i in 0..BLOATED_PIPELINES_COUNT {
617            bloated_multi.spawn_non_futures_non_fallible_executor(1, format!("#{i})"),
618                                                                  |stream| {
619                                                                          let bloated_count = Arc::clone(&bloated_count);
620                                                                          let bloated_last_elapsed_nanos = Arc::clone(&bloated_last_elapsed_nanos);
621                                                                          stream.map(move |start| {
622                                                                              bloated_last_elapsed_nanos.store(start.elapsed().unwrap().as_nanos() as u64, Relaxed);
623                                                                              bloated_count.fetch_add(1, Relaxed)
624                                                                          })
625                                                                      },
626                                                                  |_| async {}).await?;
627        }
628        let bloated_producer = |item| bloated_multi.send_with(move |slot| *slot = item).expect_ok("couldn't send");
629
630        // 2) Bloat the Tokio Runtime with a lot of tasks -- multi executors in our case -- verifying all of them will run once
631        tokio::time::sleep(Duration::from_millis(10)).await;
632        bloated_producer(SystemTime::now());
633        // waits for the event to be processed
634        while bloated_count.load(Relaxed) != BLOATED_PIPELINES_COUNT as u32 {
635            tokio::task::yield_now().await;
636        }
637        let bloated_duration = Duration::from_nanos(bloated_last_elapsed_nanos.load(Relaxed));
638        println!("2. Tokio Runtime is now BLOATED with {BLOATED_PIPELINES_COUNT} tasks -- all of them are multi executors. Time to produce + time for all pipelines to consume it: {:?}", bloated_duration);
639
640        // 3) Measure (1) again. All BLOATED tasks are sleeping... so there should be no latency
641        tokio::time::sleep(Duration::from_millis(10)).await;    // give a little time for all Streams to settle
642        simple_producer(SystemTime::now());
643        // waits for the event to be processed
644        while simple_count.load(Relaxed) != 2 {
645            tokio::task::yield_now().await;
646        }
647        let second_simple_duration = Duration::from_nanos(simple_last_elapsed_nanos.load(Relaxed));
648        println!("3. Time to produce & consume another SIMPLE event (with lots of -- {BLOATED_PIPELINES_COUNT} -- sleeping Multi Tokio tasks): {:?}", second_simple_duration);
649
650        const TOLERANCE_MICROS: u128 = 10;
651        assert!(second_simple_duration < first_simple_duration || second_simple_duration.as_micros() - first_simple_duration.as_micros() < TOLERANCE_MICROS,
652                "Tokio tasks' latency must be unaffected by whatever number of sleeping tasks there are (tasks executing our multi stream pipelines) -- task execution latencies exceeded the {TOLERANCE_MICROS}µs tolerance: with 0 sleeping: {:?}; with {BLOATED_PIPELINES_COUNT} sleeping: {:?}",
653                first_simple_duration,
654                second_simple_duration);
655
656        simple_multi.close(Duration::ZERO).await;
657        bloated_multi.close(Duration::ZERO).await;
658        Ok(())
659    }
660
661    /// assures we're able to chain multiple multis while reusing the `Arc<T>` without any overhead
662    #[cfg_attr(not(doc),tokio::test)]
663    async fn chained_multis() -> Result<(), Box<dyn std::error::Error>> {
664        let expected_msgs = vec![
665            "Hello, beautiful world!",
666            "Oh me, oh my, this will never do... Goodbye, cruel world!"
667        ];
668        let first_multi_msgs = Arc::new(std::sync::Mutex::new(vec![]));
669        let first_multi_msgs_ref = Arc::clone(&first_multi_msgs);
670        let second_multi_msgs = Arc::new(std::sync::Mutex::new(vec![]));
671        let second_multi_msgs_ref = Arc::clone(&second_multi_msgs);
672
673        let second_multi: Multi<String, MultiChannelType<String, 1024, 4>, {Instruments::LogsWithMetrics.into()}, Arc<String>> = Multi::new("second chained multi, receiving the Arc-wrapped event -- with no copying (and no additional Arc cloning)");
674        second_multi.spawn_non_futures_non_fallible_executor(1, "second executor", move |stream| {
675                stream.map(move |message| {
676                    println!("`second_multi` received '{:?}'", message);
677                    second_multi_msgs_ref
678                        .lock().unwrap()
679                        .push(message);
680                })
681            }, |_| async {}).await?;
682        let second_multi = Arc::new(second_multi);
683        let second_multi_ref = Arc::clone(&second_multi);
684        let first_multi: Multi<String, MultiChannelType<String, 1024, 4>, {Instruments::LogsWithMetrics.into()}, Arc<String>> = Multi::new("first chained multi, receiving the original events");
685        first_multi.spawn_non_futures_non_fallible_executor(1, "first executor", move |stream| {
686                stream.map(move |message: Arc<String>| {
687                    println!("`first_multi` received '{:?}'", message);
688                    assert!(second_multi_ref.send_derived(&message), "couldn't send derived");
689                    first_multi_msgs_ref
690                        .lock().unwrap()
691                        .push(message);
692                })
693            }, |_| async {}).await?;
694        let producer = |item: &str| first_multi.send(item.to_string()).expect_ok("couldn't send");
695        expected_msgs.iter().for_each(|&msg| { producer(msg); });
696        multis_close_async!(Duration::ZERO, first_multi, second_multi);
697        let expected_msgs: Vec<Arc<String>> = expected_msgs.into_iter()
698            .map(|msg| Arc::new(msg.to_string()))
699            .collect();
700        assert_eq!(*first_multi_msgs.lock().unwrap(), expected_msgs, "First multi didn't receive the expected messages");
701        assert_eq!(*second_multi_msgs.lock().unwrap(), expected_msgs, "Second multi didn't receive the expected messages");
702        Ok(())
703
704    }
705
706        /// assures performance won't be degraded when we make changes
707    #[cfg_attr(not(doc),tokio::test(flavor="multi_thread", worker_threads=2))]
708    #[ignore]   // must run in a single thread for accurate measurements
709    async fn performance_measurements() -> Result<(), Box<dyn std::error::Error>> {
710
711        #[cfg(not(debug_assertions))]
712        const FACTOR: u32 = 4096;
713        #[cfg(debug_assertions)]
714        const FACTOR: u32 = 32;
715
716        /// measure how long it takes to stream a certain number of elements through the given `multi`
717        async fn profile_multi<MultiChannelType:  FullDuplexMultiChannel<ItemType=u32, DerivedItemType=Arc<u32>> + Sync + Send + 'static,
718                               const INSTRUMENTS: usize>
719                              (multi:          &Multi<u32, MultiChannelType, INSTRUMENTS, Arc<u32>>,
720                               profiling_name: &str,
721                               count:          u32) {
722            print!("{profiling_name} "); std::io::stdout().flush().unwrap();
723            let start = Instant::now();
724            let mut e = 0;
725            while e < count {
726                let buffer_entries_left = multi.buffer_size() - multi.pending_items_count();
727                for _ in 0..buffer_entries_left {
728                    multi.send_with(|slot| *slot = e)
729                        .retry_with(|setter| multi.send_with(setter))
730                        .spinning_forever();
731                    e += 1;
732                }
733                std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop();
734                std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop();
735                std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop();
736                std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop(); std::hint::spin_loop();
737            }
738            multi.close(Duration::from_secs(5)).await;
739            let elapsed = start.elapsed();
740            println!("{:10.2}/s -- {} items processed in {:?}",
741                     count as f64 / elapsed.as_secs_f64(),
742                     count,
743                     elapsed);
744        }
745
746        println!();
747
748        type MultiChannelType = channels::arc::crossbeam::Crossbeam<'static, u32, 8192, 1>;
749        type DerivedType      = Arc<u32>;
750
751        let profiling_name = "metricfull_non_futures_non_fallible_multi:    ";
752        let multi: Multi<u32, MultiChannelType, {Instruments::MetricsWithoutLogs.into()}, DerivedType> = Multi::new(profiling_name.trim());
753        multi.spawn_non_futures_non_fallible_executor(1, "", |stream| stream, |_| async {}).await?;
754        profile_multi(&multi, profiling_name, 1024*FACTOR).await;
755
756        let profiling_name = "metricless_non_futures_non_fallible_multi:    ";
757        let multi: Multi<u32, MultiChannelType, {Instruments::NoInstruments.into()}, Arc<u32>> = Multi::new(profiling_name.trim());
758        multi.spawn_non_futures_non_fallible_executor(1, "", |stream| stream, |_| async {}).await?;
759        profile_multi(&multi, profiling_name, 1024*FACTOR).await;
760
761        let profiling_name = "par_metricless_non_futures_non_fallible_multi:";
762        let multi: Multi<u32, MultiChannelType, {Instruments::NoInstruments.into()}, Arc<u32>> = Multi::new(profiling_name.trim());
763        multi.spawn_non_futures_non_fallible_executor(12, "", |stream| stream, |_| async {}).await?;
764        profile_multi(&multi, profiling_name, 1024*FACTOR).await;
765
766        let profiling_name = "metricfull_futures_fallible_multi:            ";
767        let multi: Multi<u32, MultiChannelType, {Instruments::MetricsWithoutLogs.into()}, Arc<u32>> = Multi::new(profiling_name.trim());
768        multi.spawn_executor(1, Duration::ZERO, "",
769                             |stream| {
770                                 stream.map(|number| async move {
771                                     Ok(number)
772                                 })
773                             },
774                             |_err| async {},
775                             |_| async {}).await?;
776        profile_multi(&multi, profiling_name, 1024*FACTOR).await;
777
778        let profiling_name = "metricless_futures_fallible_multi:            ";
779        let multi: Multi<u32, MultiChannelType, {Instruments::NoInstruments.into()}, Arc<u32>> = Multi::new(profiling_name.trim());
780        multi.spawn_executor(1, Duration::ZERO, "",
781                             |stream| {
782                                 stream.map(|number| async move {
783                                     Ok(number)
784                                 })
785                             },
786                             |_err| async {},
787                             |_| async {}).await?;
788        profile_multi(&multi, profiling_name, 1024*FACTOR).await;
789
790        let profiling_name = "timeoutable_metricfull_futures_fallible_multi:";
791        let multi: Multi<u32, MultiChannelType, {Instruments::MetricsWithoutLogs.into()}, Arc<u32>> = Multi::new(profiling_name.trim());
792        multi.spawn_executor(1, Duration::from_millis(100), "",
793                             |stream| {
794                                  stream.map(|number| async move {
795                                      Ok(number)
796                                  })
797                              },
798                             |_err| async {},
799                             |_| async {}).await?;
800        profile_multi(&multi, profiling_name, 768*FACTOR).await;
801
802        let profiling_name = "timeoutable_metricless_futures_fallible_multi:";
803        let multi: Multi<u32, MultiChannelType, {Instruments::NoInstruments.into()}, Arc<u32>> = Multi::new(profiling_name.trim());
804        multi.spawn_executor(1, Duration::from_millis(100), "",
805                             |stream| {
806                                 stream.map(|number| async move {
807                                     Ok(number)
808                                 })
809                             },
810                             |_err| async {},
811                             |_| async {}).await?;
812        profile_multi(&multi, profiling_name, 768*FACTOR).await;
813
814        Ok(())
815        /*
816
817        As of Sept, 22th (after using multi-threaded tokio tests):
818
819        RUSTFLAGS="-C target-cpu=native" cargo test --release performance_measurements -- --test-threads 1 --nocapture
820
821        test mutiny::multi::tests::performance_measurements ...
822        metricfull_non_futures_non_fallible_multi:     1062998.22/s -- 1048576 items processed in 986.432507ms
823        metricless_non_futures_non_fallible_multi:     1122625.49/s -- 1048576 items processed in 934.039005ms
824        par_metricless_non_futures_non_fallible_multi: 1018904.36/s -- 1048576 items processed in 1.029121125s
825        metricfull_futures_fallible_multi:              918609.20/s -- 1048576 items processed in 1.141482139s
826        metricless_futures_fallible_multi:              934254.75/s -- 1048576 items processed in 1.122366245s
827        timeoutable_metricfull_futures_fallible_multi:  739489.91/s -- 786432 items processed in 1.063479014s
828        timeoutable_metricless_futures_fallible_multi:  786373.07/s -- 786432 items processed in 1.000074935s
829
830        */
831    }
832
833}