1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
//! Contains the logic for executing [Stream] pipelines on their own Tokio tasks with zero-cost [Instruments] options for gathering stats,
//! logging and so on.\
//! Four executors are provided to attend to different Stream output item types:
//!   1. Items that are non-futures & non-fallible. For instance, `Stream::Item = String`
//!   2. Items that are futures, but are non-fallible: `Stream::Item = Future<Output=DataType>` -- DataType may be, for instance, String
//!   3. Items that are non-futures, but are fallible: `Stream::Item = Result<DataType, Box<dyn std::error::Error>>`
//!   4. Items that are fallible futures: `Stream::Item = Future<Output=Result<DataType, Box<dyn std::error::Error>>>` -- allowing futures to time out
//!
//! Apart from handling the specific return types, logging & filling in the available metrics, all executors do the same things:
//!   1. Register start & finish metrics
//!   2. Log start (info), items (trace) and finish (warn)
//!
//! Specific executors do additional work:
//!   1. Streams that return a `Result` compute how many errors occurred and call the provided `on_err_callback()`, as well as
//!      logging errors that made it all the way through the pipeline and showed up on the executor (untreated errors)
//!   2. Streams that return a `Future` will, optionally, compute the time for each future to complete and, also optionally,
//!      register a timeout for each future to be completed


use super::{
    instruments::Instruments,
    incremental_averages::AtomicIncrementalAverage64,
};
use std::{
    sync::{
        Arc,
        atomic::{
            AtomicU64,
            Ordering::{Relaxed},
        },
    },
    future::Future,
    fmt::Debug,
    time::{Duration},
    error::Error,
    future,
};
use atomic_enum::atomic_enum;
use futures::{
    stream::{Stream,StreamExt}
};
use tokio::{
    time::{
        timeout,
    },
};
use log::{trace,info,warn,error};
// using this instead of Tokio's Instant -- or even std's Instant -- saves a system call (and a context switch), avoiding a huge performance penalty
use minstant::Instant;


/// Holds the state & metrics of a `Stream` executor spawned by one of the `spawn_*()` methods of this type.\
/// The `INSTRUMENTS` const generic is converted with `Instruments::from(INSTRUMENTS)` to decide which
/// logging / metrics code paths are active -- see [Instruments]
#[derive(Debug)]
pub struct StreamExecutor<const INSTRUMENTS: usize> {
    /// name included in every log line emitted on behalf of this executor
    executor_name:                 String,
    /// timeout applied to each item's `Future` -- `Duration::ZERO` means "no timeout"
    futures_timeout:               Duration,
    /// reference instant against which the `*_delta_nanos` fields are measured
    creation_time:                 Instant,
    /// lifecycle state of this executor (starts as `ExecutorStatus::NotStarted`)
    executor_status:               AtomicExecutorStatus,
    /// nanoseconds elapsed between `creation_time` and the execution start -- `u64::MAX` until registered
    execution_start_delta_nanos:   AtomicU64,
    /// nanoseconds elapsed between `creation_time` and the execution finish -- `u64::MAX` until registered
    execution_finish_delta_nanos:  AtomicU64,

    // data for the fields below depends on the corresponding computations being enabled when instantiating this struct

    /// computes counter of successful events & average times (seconds) for the future resolution
    pub ok_events_avg_future_duration: AtomicIncrementalAverage64,

    /// computes counter of timed out events & average times (seconds) for the failed future resolution (even if it should be ~ constant)
    /// -- events computed here are NOT computed at [failed_events_avg_future_duration]
    pub timed_out_events_avg_future_duration: AtomicIncrementalAverage64,

    /// computes counter of all *other* failed events & average times (seconds) for the failed future resolution
    /// (timed out events, computed in [timed_out_events_avg_future_duration], are NOT computed here)
    pub failed_events_avg_future_duration: AtomicIncrementalAverage64,

    // currently, we're only able to measure how long it took to execute the future returned by the stream.
    // to automatically measure the time it took for the pipeline to process the event (time taken to build & return the future),
    // a new type has to be introduced -- which would wrap around the payload before passing to the pipeline... and it should
    // be available at the end, even if the actual "return" type changes along the way...
    // this may be done transparently if I have my own Stream implementation doing this... but it seems too much...
    // if needed, applications should do it by their own for now.
}


/// Marks the given executor as started and -- when logging is enabled in `INSTRUMENTS` -- emits an `info!`
/// line summarizing the capabilities it was spawned with.\
///   - `executor`: the executor instance. Example: `cloned_self`
///   - `yields_futures`: `true` when the stream items are of type `Future<Output=ItemType>`
///   - `yields_results`: `true` when the stream items are of type `Result<DataType, ErrType>`
///   - `timeout`: when different from `Duration::ZERO`, a timeout is enforced while resolving each item's `Future`
macro_rules! on_executor_start {
    ($executor: expr, $yields_futures: expr, $yields_results: expr, $timeout: expr) => {{
        $executor.register_execution_start();
        if Instruments::from(INSTRUMENTS).logging() {
            // the pieces are built inside the `info!` call so nothing is formatted when the log level is off
            info!("✓✓✓✓ Stream Executor '{}' started: {}Futures{} / {}Fallible Items & {}Metrics",
                  $executor.executor_name,
                  if !$yields_futures {"Non-"} else {""},
                  if !$yields_futures {
                      String::new()
                  } else if $timeout == Duration::ZERO {
                      " (NO timeouts)".to_string()
                  } else {
                      format!(" (with timeouts of {:?})", $timeout)
                  },
                  if !$yields_results {"Non-"} else {""},
                  if Instruments::from(INSTRUMENTS).metrics() {""} else {"NO "});
        }
    }}
}

/// Accounts for (and traces, if opted in) an `Ok` item for which no resolution time is available --
/// either the item is not a `Future` or the profiling instruments are disabled.\
///   - `executor`: the executor instance. Example: `cloned_self`
///   - `yielded`: the `Ok` item that was just yielded
macro_rules! on_non_timed_ok_item {
    ($executor: expr, $yielded: expr) => {{
        if Instruments::from(INSTRUMENTS).cheap_profiling() {
            // no duration was measured for this item: by convention, -1.0 is fed to the average
            $executor.ok_events_avg_future_duration.inc(-1.0);
        }
        if Instruments::from(INSTRUMENTS).tracing() {
            trace!("✓✓✓✓ Executor '{}' yielded '{:?}'", $executor.executor_name, $yielded);
        }
    }}
}

/// Accounts for (and traces, if opted in) an `Ok` item whose `Future` resolution time was measured.
///   - `executor`: the executor instance. Example: `cloned_self`
///   - `yielded`: the `Ok` item that was just yielded
///   - `elapsed`: the `Duration` taken to resolve this item's `Future`
///
/// Panics when the profiling instruments are disabled: `on_non_timed_ok_item!(...)` must be used in that case.
macro_rules! on_timed_ok_item {
    ($executor: expr, $yielded: expr, $elapsed: expr) => {{
        // guard: timings only make sense when profiling is on
        if !Instruments::from(INSTRUMENTS).cheap_profiling() {
            panic!("\nThis macro can only be used if at least one of the Instruments' PROFILING are enabled -- otherwise you should use `on_non_timed_ok_item!(...)` instead");
        }
        $executor.ok_events_avg_future_duration.inc($elapsed.as_secs_f32());
        if Instruments::from(INSTRUMENTS).tracing() {
            trace!("✓✓✓✓ Executor '{}' yielded '{:?}' in {:?}", $executor.executor_name, $yielded, $elapsed);
        }
    }}
}

/// Accounts for (and logs, if opted in) an `Err` item for which no resolution time is available --
/// either the item is not a `Future` or the profiling instruments are disabled.\
///   - `executor`: the executor instance. Example: `cloned_self`
///   - `failure`: the `Err` payload that was just yielded
macro_rules! on_non_timed_err_item {
    ($executor: expr, $failure: expr) => {{
        if Instruments::from(INSTRUMENTS).cheap_profiling() {
            // no duration was measured for this item: by convention, -1.0 is fed to the average
            $executor.failed_events_avg_future_duration.inc(-1.0);
        }
        if Instruments::from(INSTRUMENTS).logging() {
            error!("✗✗✗✗ Executor '{}' yielded ERROR '{:?}'", $executor.executor_name, $failure);
        }
    }}
}

/// Accounts for (and logs, if opted in) an `Err` item whose `Future` resolution time was measured.
///   - `executor`: the executor instance. Example: `cloned_self`
///   - `failure`: the `Err` payload that was just yielded
///   - `elapsed`: the `Duration` taken to resolve this item's `Future`
///
/// Panics when the profiling instruments are disabled: `on_non_timed_err_item!(...)` must be used in that case.
macro_rules! on_timed_err_item {
    ($executor: expr, $failure: expr, $elapsed: expr) => {{
        // guard: timings only make sense when profiling is on
        if !Instruments::from(INSTRUMENTS).cheap_profiling() {
            panic!("This macro can only be used if at least one of the Instruments' PROFILING are enabled -- otherwise you should use `on_non_timed_err_item!(...)` instead");
        }
        $executor.failed_events_avg_future_duration.inc($elapsed.as_secs_f32());
        if Instruments::from(INSTRUMENTS).logging() {
            error!("✗✗✗✗ Executor '{}' yielded ERROR '{:?}' in {:?}", $executor.executor_name, $failure, $elapsed);
        }
    }}
}

/// registers metrics & logs (if opted in) that either the stream or the executor ended.\
/// The finish is always registered; the `warn!` summary is only produced when logging is enabled and
/// it only includes statistics when profiling is enabled as well.
///   - `self`: this executor's instance. Example: `cloned_self`
///   - `future`: true if items yielded by this stream were of type `Future<Output=ItemType>`
///   - `fallible`: true if items yielded by this stream were of type `Result<DataType, ErrType>`
///   - `futures_timeout`: if different than `Duration::ZERO`, means time out stats could have been collected and may be logged
macro_rules! on_executor_end {
    ($self: expr, $future: expr, $fallible: expr, $futures_timeout: expr) => {
        $self.register_execution_finish();
        // `StreamEnded` means the stream finished by itself; any other status is reported as the "Executor" having ended
        let stream_ended = $self.executor_status.load(Relaxed) == ExecutorStatus::StreamEnded;
        // both deltas are measured against the executor's `creation_time`, so their difference is the running time
        let execution_nanos = $self.execution_finish_delta_nanos.load(Relaxed) - $self.execution_start_delta_nanos.load(Relaxed);
        if Instruments::from(INSTRUMENTS).logging() && Instruments::from(INSTRUMENTS).cheap_profiling() {
            let (ok_counter, ok_avg_seconds) = $self.ok_events_avg_future_duration.probe();
            let (timed_out_counter, timed_out_avg_seconds) = $self.timed_out_events_avg_future_duration.probe();
            let (failed_counter, failed_avg_seconds) = $self.failed_events_avg_future_duration.probe();
            let execution_secs: f64 = Duration::from_nanos(execution_nanos).as_secs_f64() + f64::MIN_POSITIVE /* dirty way to avoid silly /0 divisions */;
            // NOTE(review): `Duration::from_secs_f32()` panics on negative or non-finite input -- confirm `probe()` can never
            //               return such an average (e.g. with zero events) on the branches below that format averages
            // averages & rates are only meaningful when items were futures (timed); otherwise only the counter is shown
            let ok_stats = if $future {
                               format!("ok: {} events; avg {:?} - {:.5}/sec", ok_counter, Duration::from_secs_f32(ok_avg_seconds), ok_counter as f64 / execution_secs)
                           } else {
                               format!("ok: {} events", ok_counter)
                           };
            // time out stats only exist for futures executed with a timeout
            let timed_out_stats = if $future && $futures_timeout != Duration::ZERO {
                                      format!(" | time out: {} events; avg {:?} - {:.5}/sec", timed_out_counter, Duration::from_secs_f32(timed_out_avg_seconds), timed_out_counter as f64 / execution_secs)
                                  } else {
                                      format!("")
                                  };
            // failure stats only exist for fallible items -- with timings if they were also futures
            let failed_stats = if $future && $fallible {
                                   format!(" | failed: {} events; avg {:?} - {:.5}/sec", failed_counter, Duration::from_secs_f32(failed_avg_seconds), failed_counter as f64 / execution_secs)
                               } else if $fallible {
                                   format!(" | failed: {} events", failed_counter)
                               } else {
                                   format!("")
                               };
            warn!("✓✓✓✓ {} '{}' ended after running for {:?} -- stats: | {}{}{}",
                  if stream_ended {"Stream"} else {"Executor"},
                  $self.executor_name,
                  Duration::from_nanos(execution_nanos),
                  ok_stats,
                  timed_out_stats,
                  failed_stats);
        } else if Instruments::from(INSTRUMENTS).logging() {
            // logging without profiling: there are no stats to show
            warn!("✓✓✓✓ {} '{}' ended after running for {:?} -- metrics were disabled",
                  if stream_ended {"Stream"} else {"Executor"},
                  $self.executor_name,
                  Duration::from_nanos(execution_nanos));
        }
    }
}


impl<const INSTRUMENTS: usize> StreamExecutor<INSTRUMENTS> {

    /// Builds an executor that never times out the futures yielded by the `Stream`
    /// (a `Duration::ZERO` timeout is what denotes "no timeout").
    pub fn new<IntoString: Into<String>>(executor_name: IntoString) -> Arc<Self> {
        let no_timeout = Duration::ZERO;
        Self::with_futures_timeout(executor_name, no_timeout)
    }

    /// Builds an executor that enforces `futures_timeout` on each future yielded by the `Stream`.\
    /// The executor starts as `NotStarted`, with both start & finish deltas unset (`u64::MAX`) and empty averages.
    pub fn with_futures_timeout<IntoString: Into<String>>(executor_name: IntoString, futures_timeout: Duration) -> Arc<Self> {
        let executor_name = executor_name.into();
        let executor = Self {
            executor_name,
            futures_timeout,
            creation_time:                        Instant::now(),
            executor_status:                      AtomicExecutorStatus::new(ExecutorStatus::NotStarted),
            execution_start_delta_nanos:          AtomicU64::new(u64::MAX),
            execution_finish_delta_nanos:         AtomicU64::new(u64::MAX),
            ok_events_avg_future_duration:        AtomicIncrementalAverage64::new(),
            failed_events_avg_future_duration:    AtomicIncrementalAverage64::new(),
            timed_out_events_avg_future_duration: AtomicIncrementalAverage64::new(),
        };
        Arc::new(executor)
    }


    /// Returns an owned copy of the name this executor was created with.
    pub fn executor_name(self: &Arc<Self>) -> String {
        self.executor_name.to_owned()
    }

    /// Flags this executor as `Running` and records how long after its creation the execution started.
    fn register_execution_start(self: &Arc<Self>) {
        self.executor_status.store(ExecutorStatus::Running, Relaxed);
        let nanos_since_creation = self.creation_time.elapsed().as_nanos() as u64;
        self.execution_start_delta_nanos.store(nanos_since_creation, Relaxed);
    }

    /// Moves the status to its final state and records how long after the executor's creation the execution finished:
    ///   * `Running`           --> `StreamEnded`
    ///   * `ScheduledToFinish` --> `ProgrammaticallyEnded`
    ///
    /// NOTE(review): the loop only exits once one of the two transitions above succeeds -- if the status is anything
    ///               else when this is called, it keeps spinning until another thread changes it. Confirm callers
    ///               always register the start first.
    fn register_execution_finish(self: &Arc<Self>) {
        let status = &self.executor_status;
        loop {
            let ended_by_stream = status.compare_exchange(ExecutorStatus::Running, ExecutorStatus::StreamEnded, Relaxed, Relaxed);
            if ended_by_stream.is_ok() {
                break
            }
            // the status may have been switched to `ScheduledToFinish` concurrently -- try that transition as well
            let ended_programmatically = status.compare_exchange(ExecutorStatus::ScheduledToFinish, ExecutorStatus::ProgrammaticallyEnded, Relaxed, Relaxed);
            if ended_programmatically.is_ok() {
                break
            }
        }
        let nanos_since_creation = self.creation_time.elapsed().as_nanos() as u64;
        self.execution_finish_delta_nanos.store(nanos_since_creation, Relaxed);
    }

    /// Informs this executor that its stream is about to be ended artificially, so that the upcoming
    /// finish gets registered as `ProgrammaticallyEnded` rather than `StreamEnded`.
    pub fn report_scheduled_to_finish(self: &Arc<Self>) {
        let scheduled = ExecutorStatus::ScheduledToFinish;
        self.executor_status.store(scheduled, Relaxed);
    }

    /// spawns, on its own Tokio task, an optimized executor for a Stream of `ItemType`s which are:
    ///   * Futures  -- ItemType = Future<Output=InnerFallibleType>
    ///   * Fallible -- InnerFallibleType = Result<InnerType, Box<dyn std::error::Error + Send + Sync>>.\
    /// NOTE: special (optimized) versions are spawned depending if we should or not enforce each item's `Future` a resolution timeout
    ///
    /// Parameters:
    ///   - `concurrency_limit`: `1` resolves items one at a time (`for_each()`); any other value is handed to `for_each_concurrent()`
    ///   - `on_err_callback`: awaited for every item that resolves to `Err` -- it is NOT called for items that time out
    ///   - `stream_ended_callback`: awaited once, after the stream is exhausted and the finish metrics / logs were registered
    ///   - `stream`: the stream to execute
    ///
    /// The task's `JoinHandle` is dropped: this method returns right after spawning.
    pub fn spawn_executor<OutItemType:        Send + Debug,
                          FutureItemType:     Future<Output=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send,
                          CloseVoidAsyncType: Future<Output=()> + Send + 'static,
                          ErrVoidAsyncType:   Future<Output=()> + Send + 'static>
                         (self:                  Arc<Self>,
                          concurrency_limit:     u32,
                          on_err_callback:       impl Fn(Box<dyn Error + Send + Sync>) -> ErrVoidAsyncType   + Send + Sync + 'static,
                          stream_ended_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> CloseVoidAsyncType + Send + Sync + 'static,
                          stream:                impl Stream<Item=FutureItemType> + 'static + Send) {
        let cloned_self = Arc::clone(&self);
        // shared through an `Arc` so every item's future may hold its own handle to the callback
        let on_err_callback = Arc::new(on_err_callback);

        match self.futures_timeout {

            // spawns an optimized executor that do not track `Future`s timeouts
            Duration::ZERO => {
                tokio::spawn(async move {
                    on_executor_start!(cloned_self, true, true, cloned_self.futures_timeout);
                    // NOTE(review): each `async move` block below appears to work on its own copy of `start`
                    //               (which requires `Instant` to be `Copy`) -- confirm
                    let mut start = Instant::now();     // item's Future resolution time -- declared here to allow optimizations when METRICS=false
                    let item_processor = |future_element| {
                        let cloned_self = Arc::clone(&cloned_self);
                        let on_err_callback = Arc::clone(&on_err_callback);
                        async move {
                            if Instruments::from(INSTRUMENTS).cheap_profiling() {
                                start = Instant::now();
                            }
                            match future_element.await {
                                Ok(yielded_item) => {
                                    if Instruments::from(INSTRUMENTS).cheap_profiling() {
                                        let elapsed = start.elapsed();
                                        on_timed_ok_item!(cloned_self, yielded_item, elapsed);
                                    } else {
                                        on_non_timed_ok_item!(cloned_self, yielded_item);
                                    }
                                },
                                Err(err) => {
                                    if Instruments::from(INSTRUMENTS).cheap_profiling() {
                                        let elapsed = start.elapsed();
                                        on_timed_err_item!(cloned_self, err, elapsed);
                                    } else {
                                        on_non_timed_err_item!(cloned_self, err);
                                    }
                                    // the error is only handed over to the callback after being accounted for & logged
                                    on_err_callback(err).await;
                                },
                            }
                        }
                    };
                    match concurrency_limit {
                        1 => stream.for_each(item_processor).await,     // faster in `futures 0.3` -- may be useless in the future
                        _ => stream.for_each_concurrent(concurrency_limit as usize, item_processor).await,
                    }
                    on_executor_end!(cloned_self, true, true, Duration::ZERO);
                    stream_ended_callback(cloned_self).await;
                });
            },

            // spawns an optimized executor that tracks `Future`s timeouts
            _ => {
                tokio::spawn(async move {
                    on_executor_start!(cloned_self, true, true, cloned_self.futures_timeout);
                    // NOTE(review): each `async move` block below appears to work on its own copy of `start`
                    //               (which requires `Instant` to be `Copy`) -- confirm
                    let mut start = Instant::now();     // item's Future resolution time -- declared here to allow optimizations when METRICS=false
                    let item_processor = |future_element| {
                        let cloned_self = Arc::clone(&cloned_self);
                        let on_err_callback = Arc::clone(&on_err_callback);
                        async move {
                            if Instruments::from(INSTRUMENTS).cheap_profiling() {
                                start = Instant::now();
                            }
                            // outer `Result`: timed out or not; inner `Result`: the item's own `Ok` / `Err`
                            match timeout(cloned_self.futures_timeout, future_element).await {
                                Ok(non_timed_out_result) => match non_timed_out_result {
                                                                                          Ok(yielded_item) => {
                                                                                              if Instruments::from(INSTRUMENTS).cheap_profiling() {
                                                                                                  let elapsed = start.elapsed();
                                                                                                  on_timed_ok_item!(cloned_self, yielded_item, elapsed);
                                                                                              } else {
                                                                                                  on_non_timed_ok_item!(cloned_self, yielded_item);
                                                                                              }
                                                                                          },
                                                                                          Err(err) => {
                                                                                              if Instruments::from(INSTRUMENTS).cheap_profiling() {
                                                                                                  let elapsed = start.elapsed();
                                                                                                  on_timed_err_item!(cloned_self, err, elapsed);
                                                                                              } else {
                                                                                                  on_non_timed_err_item!(cloned_self, err);
                                                                                              }
                                                                                              // the error is only handed over to the callback after being accounted for & logged
                                                                                              on_err_callback(err).await;
                                                                                          },
                                                                                      },
                                // timed out items are accounted for separately and do not reach `on_err_callback`
                                Err(_time_out_err) => {
                                    if Instruments::from(INSTRUMENTS).cheap_profiling() {
                                        let elapsed = start.elapsed();
                                        cloned_self.timed_out_events_avg_future_duration.inc(elapsed.as_secs_f32());
                                        if Instruments::from(INSTRUMENTS).logging() {
                                            error!("🕝🕝🕝🕝 Executor '{}' TIMED OUT after {:?}", cloned_self.executor_name, elapsed);
                                        }
                                    } else if Instruments::from(INSTRUMENTS).logging() {
                                        error!("🕝🕝🕝🕝 Executor '{}' TIMED OUT", cloned_self.executor_name);
                                    }
                                }
                            }
                        }
                    };
                    match concurrency_limit {
                        1 => stream.for_each(item_processor).await,     // faster in `futures 0.3` -- may be useless in other versions
                        _ => stream.for_each_concurrent(concurrency_limit as usize, item_processor).await,
                    }
                    on_executor_end!(cloned_self, true, true, cloned_self.futures_timeout);
                    stream_ended_callback(cloned_self).await;
                });
            },

        }

    }

    /// Spawns, on its own Tokio task, an executor for a Stream whose items are not Futures, but are fallible:
    ///   * `Stream::Item = Result<ItemType, Box<dyn std::error::Error + Send + Sync>>`
    ///
    /// Every `Ok` / `Err` item is accounted for & logged according to `INSTRUMENTS` (no timings are available here).
    /// Once the stream is exhausted, the finish is registered and `stream_ended_callback` is awaited.\
    /// `concurrency_limit`: `1` consumes the stream with `for_each()`; any other value is handed to `for_each_concurrent()`.
    pub fn spawn_non_futures_executor<ItemType:      Send + Debug,
                                      VoidAsyncType: Future<Output=()> + Send + 'static>
                                     (self:                      Arc<Self>,
                                      concurrency_limit:         u32,
                                      stream_ended_callback:     impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> VoidAsyncType + Send + Sync + 'static,
                                      stream:                    impl Stream<Item=Result<ItemType, Box<dyn std::error::Error + Send + Sync>>> + 'static + Send) {
        tokio::spawn(async move {
            on_executor_start!(self, false, true, Duration::ZERO);
            // accounts for a single stream item, be it a success or a failure
            let account_for = |outcome| match outcome {
                Ok(yielded_item) => on_non_timed_ok_item!(self, yielded_item),
                Err(err) => on_non_timed_err_item!(self, err),
            };
            if concurrency_limit == 1 {
                // faster in `futures 0.3` -- may be useless in other versions
                stream.for_each(|outcome| future::ready(account_for(outcome))).await;
            } else {
                stream.for_each_concurrent(concurrency_limit as usize, |outcome| future::ready(account_for(outcome))).await;
            }
            on_executor_end!(self, false, true, Duration::ZERO);
            stream_ended_callback(self).await;
        });
    }

    /// Spawns, on its own Tokio task, an optimized executor for a Stream whose items are neither Futures nor fallible.
    ///
    /// Every item is accounted for & traced according to `INSTRUMENTS` (no timings are available here).
    /// Once the stream is exhausted, the finish is registered and `stream_ended_callback` is awaited.\
    /// `concurrency_limit`: `1` consumes the stream with `for_each()`; any other value is handed to `for_each_concurrent()`.
    pub fn spawn_non_futures_non_fallible_executor<OutItemType:   Send + Debug,
                                                   VoidAsyncType: Future<Output=()> + Send + 'static>
                                                  (self:                  Arc<Self>,
                                                   concurrency_limit:     u32,
                                                   stream_ended_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> VoidAsyncType + Send + Sync + 'static,
                                                   stream:                impl Stream<Item=OutItemType> + 'static + Send) {

        tokio::spawn(async move {
            on_executor_start!(self, false, false, Duration::ZERO);
            // accounts for a single (always successful) stream item
            let account_for = |yielded_item| on_non_timed_ok_item!(self, yielded_item);
            if concurrency_limit == 1 {
                // faster in `futures 0.3` -- may be useless in the near future?
                stream.for_each(|element| future::ready(account_for(element))).await;
            } else {
                stream.for_each_concurrent(concurrency_limit as usize, |element| future::ready(account_for(element))).await;
            }
            on_executor_end!(self, false, false, Duration::ZERO);
            stream_ended_callback(self).await;
        });
    }

}

/// Unit tests that also enforce the requirements of the [stream_executor](self) module.\
/// Tests here mix manual & automated assertions -- you should manually inspect the output of each one and check that the log outputs make sense
#[cfg(any(test,doc))]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicU32;
    use futures::{
        stream::{self, StreamExt},
        channel::mpsc,
        SinkExt,
    };


    /// Test suite initialization (registered through `ctor`): starts the `simple_logger` with UTC timestamps
    /// -- a failure to start it is only reported on stderr -- and logs whether `minstant` reports the TSC as available.
    #[ctor::ctor]
    fn suite_setup() {
        let logger_init = simple_logger::SimpleLogger::new()
            .with_utc_timestamps()
            .init();
        if logger_init.is_err() {
            eprintln!("--> LOGGER WAS ALREADY STARTED");
        }
        info!("minstant: is TSC / RDTSC instruction available for time measurement? {}", minstant::is_tsc_available());
    }

    /// Runs the futures / fallible assertions on an executor created through `new()`, instrumented with logs & metrics
    #[cfg_attr(not(doc),tokio::test)]
    async fn spawn_non_timeout_futures_fallible_executor_with_logs_and_metrics() {
        let executor = StreamExecutor::<{Instruments::LogsWithMetrics.into()}>::new("executor with logs & metrics");
        assert_spawn_futures_fallible_executor(executor).await;
    }

    /// Runs the futures / fallible assertions on an executor created through `new()`, instrumented with metrics but no logs
    #[cfg_attr(not(doc),tokio::test)]
    async fn spawn_non_timeout_futures_fallible_executor_with_metrics_and_no_logs() {
        let executor = StreamExecutor::<{Instruments::MetricsWithoutLogs.into()}>::new("executor with metrics & NO logs");
        assert_spawn_futures_fallible_executor(executor).await;
    }

    /// Runs the futures / fallible assertions on an executor created through `new()`, instrumented with logs but no metrics
    #[cfg_attr(not(doc),tokio::test)]
    async fn spawn_non_timeout_futures_fallible_executor_with_logs_and_no_metrics() {
        let executor = StreamExecutor::<{Instruments::LogsWithoutMetrics.into()}>::new("executor with logs & NO metrics");
        assert_spawn_futures_fallible_executor(executor).await;
    }

    /// Runs the futures / fallible assertions on an executor created through `new()`, with neither logs nor metrics
    #[cfg_attr(not(doc),tokio::test)]
    async fn spawn_non_timeout_futures_fallible_executor_with_no_logs_and_no_metrics() {
        let executor = StreamExecutor::<{Instruments::NoInstruments.into()}>::new("executor with NO logs & NO metrics");
        assert_spawn_futures_fallible_executor(executor).await;
    }

    /// Runs the futures / fallible assertions on an executor created through `with_futures_timeout()` (100ms),
    /// instrumented with logs & metrics
    #[cfg_attr(not(doc),tokio::test)]
    async fn spawn_timeout_futures_fallible_executor_with_logs_and_metrics() {
        let futures_timeout = Duration::from_millis(100);
        let executor = StreamExecutor::<{Instruments::LogsWithMetrics.into()}>::with_futures_timeout("executor with logs & metrics", futures_timeout);
        assert_spawn_futures_fallible_executor(executor).await;
    }

    /// Runs the futures / fallible assertions on an executor created through `with_futures_timeout()` (100ms),
    /// instrumented with metrics but no logs
    #[cfg_attr(not(doc),tokio::test)]
    async fn spawn_timeout_futures_fallible_executor_with_metrics_and_no_logs() {
        let futures_timeout = Duration::from_millis(100);
        let executor = StreamExecutor::<{Instruments::MetricsWithoutLogs.into()}>::with_futures_timeout("executor with metrics & NO logs", futures_timeout);
        assert_spawn_futures_fallible_executor(executor).await;
    }

    /// Runs the futures / fallible assertions on an executor created through `with_futures_timeout()` (100ms),
    /// instrumented with logs but no metrics
    #[cfg_attr(not(doc),tokio::test)]
    async fn spawn_timeout_futures_fallible_executor_with_logs_and_no_metrics() {
        let futures_timeout = Duration::from_millis(100);
        let executor = StreamExecutor::<{Instruments::LogsWithoutMetrics.into()}>::with_futures_timeout("executor with logs & NO metrics", futures_timeout);
        assert_spawn_futures_fallible_executor(executor).await;
    }

    /// Runs the futures / fallible assertions on an executor created through `with_futures_timeout()` (100ms),
    /// with neither logs nor metrics
    #[cfg_attr(not(doc),tokio::test)]
    async fn spawn_timeout_futures_fallible_executor_with_no_logs_and_no_metrics() {
        let futures_timeout = Duration::from_millis(100);
        let executor = StreamExecutor::<{Instruments::NoInstruments.into()}>::with_futures_timeout("executor with NO logs & NO metrics", futures_timeout);
        assert_spawn_futures_fallible_executor(executor).await;
    }

    /// Executes assertions on the given `executor` by spawning it over a fixed stream of six fallible futures.
    ///
    /// The stream yields, in order: `Ok(17)`, `Err("17")`, `Ok(170)`, `Ok(19)`, `Err("19")` and `Ok(190)`.
    /// Futures resolving to `Ok` values above 100 sleep for 150ms first, so they are expected to time out
    /// only when `executor.futures_timeout` is greater than zero.
    ///
    /// Assertions performed:
    ///   - the stream-ended callback must send `true` through the channel created here;
    ///   - the error callback must have been called exactly 2 times;
    ///   - [assert_metrics()] is called expecting `4 - timeouts` OK items, `timeouts` timed out items and 2 failed items,
    ///     where `timeouts` is 2 when the timeout is enabled and 0 otherwise.
    async fn assert_spawn_futures_fallible_executor<const INSTRUMENTS: usize>
                                                    (executor: Arc<StreamExecutor<INSTRUMENTS>>) {

        /// wraps `item` in a Future -- one that sleeps before resolving if `item` is `Ok` and holds a value above 100
        async fn to_future(item: Result<u32, Box<dyn Error + Send + Sync>>) -> Result<u32, Box<dyn Error + Send + Sync>> {
            // OK items > 100 will sleep for a while -- intended to cause a timeout, provided the given executor is appropriately configured
            if matches!(item, Ok(value) if value > 100) {
                tokio::time::sleep(Duration::from_millis(150)).await;
            }
            item
        }

        // channel through which the stream-ended callback reports back to this test
        let (tx, mut rx) = mpsc::channel::<bool>(10);
        let error_counter = Arc::new(AtomicU32::new(0));
        let error_counter_ref = Arc::clone(&error_counter);
        // `executor` is consumed by `spawn_executor()` -- this clone is kept for the metrics assertions
        let cloned_executor = Arc::clone(&executor);
        let timeout_enabled = executor.futures_timeout > Duration::ZERO;
        // the two `Ok` items above 100 are the only ones that sleep longer than the timeout used by the tests
        let expected_timeout_count = if timeout_enabled {2} else {0};
        executor.spawn_executor(1,
                                move |_| { let error_counter = Arc::clone(&error_counter_ref); async move {error_counter.fetch_add(1, Relaxed);} },
                                move |_| { let mut tx = tx.clone(); async move {tx.send(true).await.unwrap()} },
                                stream::iter(vec![to_future(Ok(17)),
                                                            to_future(Err(Box::from("17"))),
                                                            to_future(Ok(170)),     // times out
                                                            to_future(Ok(19)),
                                                            to_future(Err(Box::from("19"))),
                                                            to_future(Ok(190))]     // times out
                                ));
        assert!(rx.next().await.expect("consumption_done_reporter() wasn't called"), "consumption_done_reporter yielded the wrong value");
        assert_eq!(error_counter.load(Relaxed), 2, "Error callback wasn't called the right number of times");
        assert_metrics(cloned_executor, 4 - expected_timeout_count, expected_timeout_count, 2);

    }

    /// Runs the non-futures / non-fallible assertions on an executor instrumented with logs & metrics
    #[cfg_attr(not(doc),tokio::test)]
    async fn spawn_non_futures_non_fallible_executor_with_logs_and_metrics() {
        let executor = StreamExecutor::<{Instruments::LogsWithMetrics.into()}>::new("executor with logs & metrics");
        assert_spawn_non_futures_non_fallible_executor(executor).await;
    }

    /// Runs the non-futures / non-fallible assertions on an executor instrumented with metrics but no logs
    #[cfg_attr(not(doc),tokio::test)]
    async fn spawn_non_futures_non_fallible_executor_with_metrics_and_no_logs() {
        let executor = StreamExecutor::<{Instruments::MetricsWithoutLogs.into()}>::new("executor with metrics & NO logs");
        assert_spawn_non_futures_non_fallible_executor(executor).await;
    }

    /// Runs the non-futures / non-fallible assertions on an executor instrumented with logs but no metrics
    #[cfg_attr(not(doc),tokio::test)]
    async fn spawn_non_futures_non_fallible_executor_with_logs_and_no_metrics() {
        let executor = StreamExecutor::<{Instruments::LogsWithoutMetrics.into()}>::new("executor with logs & NO metrics");
        assert_spawn_non_futures_non_fallible_executor(executor).await;
    }

    /// Runs the non-futures / non-fallible assertions on an executor with neither logs nor metrics
    #[cfg_attr(not(doc),tokio::test)]
    async fn spawn_non_futures_non_fallible_executor_with_no_logs_and_no_metrics() {
        let executor = StreamExecutor::<{Instruments::NoInstruments.into()}>::new("executor with NO logs & NO metrics");
        assert_spawn_non_futures_non_fallible_executor(executor).await;
    }

    /// Spawns the given `executor` (concurrency limit of 1) over a stream of two plain values -- `17` and `19` -- then
    /// waits for the stream-ended callback to send `true` through a channel and, finally, checks the metrics
    /// expecting 2 OK items, no timed out items and no failed items.
    async fn assert_spawn_non_futures_non_fallible_executor<const INSTRUMENTS: usize>
                                                           (executor: Arc<StreamExecutor<INSTRUMENTS>>) {
        let (mut done_sender, mut done_receiver) = mpsc::channel::<bool>(10);
        // `executor` is consumed when spawning -- keep a handle for the metrics assertions
        let executor_for_metrics = Arc::clone(&executor);
        let items = stream::iter(vec![17, 19]);
        let report_stream_end = move |_: Arc<StreamExecutor<INSTRUMENTS>>| async move {done_sender.send(true).await.unwrap()};
        executor.spawn_non_futures_non_fallible_executor(1, report_stream_end, items);
        let reported_value = done_receiver.next().await.expect("consumption_done_reporter() wasn't called");
        assert!(reported_value, "consumption_done_reporter yielded the wrong value");
        assert_metrics(executor_for_metrics, 2, 0, 0);

    }

    /// Prints the figures collected by the given `executor` and applies assertions on them.
    ///
    /// Always asserted: both `execution_start_delta_nanos` & `execution_finish_delta_nanos` differ from `u64::MAX`
    /// and the finish delta is not smaller than the start delta.\
    /// If metrics are enabled for `INSTRUMENTS`, the OK / TIMED OUT / FAILED counters must match `expected_ok_counter`,
    /// `expected_timed_out_counter` & `expected_failed_counter`; otherwise, all three counters must be zero.
    fn assert_metrics<const INSTRUMENTS: usize>
                     (executor: Arc<StreamExecutor<INSTRUMENTS>>,
                      expected_ok_counter:         u32,
                      expected_timed_out_counter:  u32,
                      expected_failed_counter:     u32) {

        let logs_enabled    = Instruments::from(INSTRUMENTS).logging();
        let metrics_enabled = Instruments::from(INSTRUMENTS).metrics();
        // suffixes shared by the three "elements count" lines below
        let metrics_note = if metrics_enabled {""} else {" -- metrics are DISABLED"};
        let logs_note    = if logs_enabled {" -- verify these values against the \"executor closed\" message"} else {" -- logs are DISABLED"};

        println!("### Stats assertions for Stream pipeline executor named '{}' (Logs? {}; Metrics? {}) ####",
                 executor.executor_name, logs_enabled, metrics_enabled);

        // collect everything first, then report
        let age = executor.creation_time.elapsed();
        let start_delta_nanos  = executor.execution_start_delta_nanos.load(Relaxed);
        let finish_delta_nanos = executor.execution_finish_delta_nanos.load(Relaxed);
        let (ok_counter, ok_average) = executor.ok_events_avg_future_duration.lightweight_probe();
        let (timed_out_counter, timed_out_average) = executor.timed_out_events_avg_future_duration.lightweight_probe();
        let (failed_counter, failed_average) = executor.failed_events_avg_future_duration.lightweight_probe();

        println!("Creation time:    {:?} ago", age);
        println!("Execution Start:  {:?} after creation", Duration::from_nanos(start_delta_nanos));
        println!("Execution Finish: {:?} after creation", Duration::from_nanos(finish_delta_nanos));
        println!("OK elements count: {ok_counter}; OK elements average Future resolution time: {ok_average}s{}{}",
                 metrics_note, logs_note);
        println!("TIMED OUT elements count: {timed_out_counter}; TIMED OUT elements average Future resolution time: {timed_out_average}s{}{}",
                 metrics_note, logs_note);
        println!("FAILED elements count: {failed_counter}; FAILED elements average Future resolution time: {failed_average}s{}{}",
                 metrics_note, logs_note);

        // execution times
        assert_ne!(start_delta_nanos,  u64::MAX, "'execution_start_delta_nanos' wasn't set");
        assert_ne!(finish_delta_nanos, u64::MAX, "'execution_finish_delta_nanos' wasn't set");
        assert!(finish_delta_nanos >= start_delta_nanos, "INSTRUMENTATION ERROR: 'execution_start_delta_nanos' was set after 'execution_finish_delta_nanos'");

        // counters
        if !metrics_enabled {
            assert_eq!(ok_counter,        0, "Metrics are DISABLED, so the reported OK elements should be ZERO");
            assert_eq!(timed_out_counter, 0, "Metrics are DISABLED, so the reported TIMED OUT elements should be ZERO");
            assert_eq!(failed_counter,    0, "Metrics are DISABLED, so the reported FAILED elements should be ZERO");
        } else {
            assert_eq!(ok_counter,        expected_ok_counter,        "OK elements counter doesn't match -- Metrics are ENABLED");
            assert_eq!(timed_out_counter, expected_timed_out_counter, "TIMED OUT elements counter doesn't match -- Metrics are ENABLED");
            assert_eq!(failed_counter,    expected_failed_counter,    "FAILED elements counter doesn't match -- Metrics are ENABLED");
        }

        // placeholder kept from the original: assertions for when `executor.instruments.measure_time` is set
        // ...
    }
}

/// Status values for an executor -- through `#[atomic_enum]`, this declaration will also derive `AtomicExecutorStatus`.\
/// NOTE(review): the per-variant remarks below are inferred from the variant names only -- confirm against the usages
#[atomic_enum]
#[derive(PartialEq)]
enum ExecutorStatus {
    // presumably the state before the executor is spawned
    NotStarted,
    // presumably set while the Stream is being consumed
    Running,
    // presumably a finish was requested but not yet carried out -- TODO confirm
    ScheduledToFinish,
    // presumably ended by request rather than by the Stream running out of items -- TODO confirm
    ProgrammaticallyEnded,
    // presumably the Stream yielded all of its items
    StreamEnded,
}