use super::{
    incremental_averages::AtomicIncrementalAverage64,
    instruments::Instruments,
};
use std::{
    error::Error,
    fmt::Debug,
    future::{self, Future},
    sync::{
        Arc,
        atomic::{AtomicU64, Ordering::Relaxed},
    },
    time::Duration,
};
use atomic_enum::atomic_enum;
use futures::stream::{Stream, StreamExt};
use tokio::time::timeout;
use log::{trace, info, warn, error};
use minstant::Instant;
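/// Executes [Stream]s inside a Tokio task, optionally enforcing a timeout on each future and
/// collecting counters & average resolution times for OK, timed-out and failed events --
/// which instruments are active is decided at compile time by the `INSTRUMENTS` const generic
/// (see [Instruments]).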
#[derive(Debug)]
pub struct StreamExecutor<const INSTRUMENTS: usize> {
    /// Name shown in logs & metrics reports
    executor_name: String,
    /// Maximum time each future may take to resolve -- [Duration::ZERO] means no timeout enforcement
    futures_timeout: Duration,
    /// Instant this executor was created -- the reference point for the deltas below
    creation_time: Instant,
    /// Current lifecycle state -- see [ExecutorStatus]
    executor_status: AtomicExecutorStatus,
    /// Nanoseconds after `creation_time` at which execution started -- `u64::MAX` while unset
    execution_start_delta_nanos: AtomicU64,
    /// Nanoseconds after `creation_time` at which execution finished -- `u64::MAX` while unset
    execution_finish_delta_nanos: AtomicU64,
    /// Counter & average future resolution time for successfully yielded items
    pub ok_events_avg_future_duration: AtomicIncrementalAverage64,
    /// Counter & average future resolution time for futures that timed out
    pub timed_out_events_avg_future_duration: AtomicIncrementalAverage64,
    /// Counter & average future resolution time for items that yielded errors
    pub failed_events_avg_future_duration: AtomicIncrementalAverage64,
}
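/// Registers the execution start time and, when logging is enabled, logs a startup banner stating
/// whether the items are futures, whether they are fallible, the timeout setting and whether metrics are on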
macro_rules! on_executor_start {
($self: expr, $future: expr, $fallible: expr, $futures_timeout: expr) => {
$self.register_execution_start();
if Instruments::from(INSTRUMENTS).logging() {
info!("✓✓✓✓ Stream Executor '{}' started: {}Futures{} / {}Fallible Items & {}Metrics",
$self.executor_name,
if $future {""} else {"Non-"},
if $future {
if $futures_timeout != Duration::ZERO {format!(" (with timeouts of {:?})", $futures_timeout)} else {" (NO timeouts)".to_string()}
} else {
"".to_string()
},
if $fallible {""} else {"Non-"},
if !Instruments::from(INSTRUMENTS).metrics() {"NO "} else {""});
}
}
}
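/// Accounts for a successfully yielded item whose resolution time wasn't measured
/// (the average is fed with -1.0, so only the counter is meaningful) and traces it, if tracing is enabled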
macro_rules! on_non_timed_ok_item {
($self: expr, $item: expr) => {
{
if Instruments::from(INSTRUMENTS).cheap_profiling() {
    $self.ok_events_avg_future_duration.inc(-1.0);
}
if Instruments::from(INSTRUMENTS).tracing() {
trace!("✓✓✓✓ Executor '{}' yielded '{:?}'", $self.executor_name, $item);
}
}
}
}
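/// Accounts for a successfully yielded item along with its future's resolution time and traces it, if enabled.
/// Requires one of the Instruments' PROFILING options -- otherwise, use `on_non_timed_ok_item!()` instead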
macro_rules! on_timed_ok_item {
($self: expr, $item: expr, $elapsed: expr) => {
{
if Instruments::from(INSTRUMENTS).cheap_profiling() {
$self.ok_events_avg_future_duration.inc($elapsed.as_secs_f32());
} else {
panic!("\nThis macro can only be used if at least one of the Instruments' PROFILING are enabled -- otherwise you should use `on_non_timed_ok_item!(...)` instead");
}
if Instruments::from(INSTRUMENTS).tracing() {
trace!("✓✓✓✓ Executor '{}' yielded '{:?}' in {:?}", $self.executor_name, $item, $elapsed);
}
}
}
}
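/// Accounts for a failed item whose resolution time wasn't measured
/// (the average is fed with -1.0, so only the counter is meaningful) and logs the error, if logging is enabled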
macro_rules! on_non_timed_err_item {
($self: expr, $err: expr) => {
{
if Instruments::from(INSTRUMENTS).cheap_profiling() {
    $self.failed_events_avg_future_duration.inc(-1.0);
}
if Instruments::from(INSTRUMENTS).logging() {
error!("✗✗✗✗ Executor '{}' yielded ERROR '{:?}'", $self.executor_name, $err);
}
}
}
}
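/// Accounts for a failed item along with its future's resolution time and logs the error, if enabled.
/// Requires one of the Instruments' PROFILING options -- otherwise, use `on_non_timed_err_item!()` instead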
macro_rules! on_timed_err_item {
($self: expr, $err: expr, $elapsed: expr) => {
{
if Instruments::from(INSTRUMENTS).cheap_profiling() {
$self.failed_events_avg_future_duration.inc($elapsed.as_secs_f32());
} else {
panic!("This macro can only be used if at least one of the Instruments' PROFILING are enabled -- otherwise you should use `on_non_timed_err_item!(...)` instead");
}
if Instruments::from(INSTRUMENTS).logging() {
error!("✗✗✗✗ Executor '{}' yielded ERROR '{:?}' in {:?}", $self.executor_name, $err, $elapsed);
}
}
}
}
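/// Registers the execution finish time and, when logging is enabled, reports the total run time plus --
/// when profiling is also enabled -- the per-outcome counters, average resolution times and events/sec rates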
macro_rules! on_executor_end {
($self: expr, $future: expr, $fallible: expr, $futures_timeout: expr) => {
$self.register_execution_finish();
let stream_ended = $self.executor_status.load(Relaxed) == ExecutorStatus::StreamEnded;
let execution_nanos = $self.execution_finish_delta_nanos.load(Relaxed) - $self.execution_start_delta_nanos.load(Relaxed);
if Instruments::from(INSTRUMENTS).logging() && Instruments::from(INSTRUMENTS).cheap_profiling() {
let (ok_counter, ok_avg_seconds) = $self.ok_events_avg_future_duration.probe();
let (timed_out_counter, timed_out_avg_seconds) = $self.timed_out_events_avg_future_duration.probe();
let (failed_counter, failed_avg_seconds) = $self.failed_events_avg_future_duration.probe();
// f64::MIN_POSITIVE is added to avoid divisions by zero in the rates below
let execution_secs: f64 = Duration::from_nanos(execution_nanos).as_secs_f64() + f64::MIN_POSITIVE;
let ok_stats = if $future {
format!("ok: {} events; avg {:?} - {:.5}/sec", ok_counter, Duration::from_secs_f32(ok_avg_seconds), ok_counter as f64 / execution_secs)
} else {
format!("ok: {} events", ok_counter)
};
let timed_out_stats = if $future && $futures_timeout != Duration::ZERO {
format!(" | time out: {} events; avg {:?} - {:.5}/sec", timed_out_counter, Duration::from_secs_f32(timed_out_avg_seconds), timed_out_counter as f64 / execution_secs)
} else {
format!("")
};
let failed_stats = if $future && $fallible {
format!(" | failed: {} events; avg {:?} - {:.5}/sec", failed_counter, Duration::from_secs_f32(failed_avg_seconds), failed_counter as f64 / execution_secs)
} else if $fallible {
format!(" | failed: {} events", failed_counter)
} else {
format!("")
};
warn!("✓✓✓✓ {} '{}' ended after running for {:?} -- stats: | {}{}{}",
if stream_ended {"Stream"} else {"Executor"},
$self.executor_name,
Duration::from_nanos(execution_nanos),
ok_stats,
timed_out_stats,
failed_stats);
} else if Instruments::from(INSTRUMENTS).logging() {
warn!("✓✓✓✓ {} '{}' ended after running for {:?} -- metrics were disabled",
if stream_ended {"Stream"} else {"Executor"},
$self.executor_name,
Duration::from_nanos(execution_nanos));
}
}
}
impl<const INSTRUMENTS: usize> StreamExecutor<INSTRUMENTS> {
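/// Creates an executor that won't enforce timeouts on the futures it resolves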
pub fn new<IntoString: Into<String>>(executor_name: IntoString) -> Arc<Self> {
Self::with_futures_timeout(executor_name, Duration::ZERO)
}
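/// Creates an executor that gives each future up to `futures_timeout` to resolve
/// ([Duration::ZERO] disables timeout enforcement), accounting for timed-out futures separately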
pub fn with_futures_timeout<IntoString: Into<String>>(executor_name: IntoString, futures_timeout: Duration) -> Arc<Self> {
Arc::new(Self {
executor_name: executor_name.into(),
futures_timeout,
creation_time: Instant::now(),
executor_status: AtomicExecutorStatus::new(ExecutorStatus::NotStarted),
execution_start_delta_nanos: AtomicU64::new(u64::MAX),
execution_finish_delta_nanos: AtomicU64::new(u64::MAX),
ok_events_avg_future_duration: AtomicIncrementalAverage64::new(),
failed_events_avg_future_duration: AtomicIncrementalAverage64::new(),
timed_out_events_avg_future_duration: AtomicIncrementalAverage64::new(),
})
}
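/// Returns the name given to this executor at construction time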
pub fn executor_name(self: &Arc<Self>) -> String {
self.executor_name.clone()
}
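/// Sets the status to [ExecutorStatus::Running] and records the execution start delta (in ns since creation)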
fn register_execution_start(self: &Arc<Self>) {
self.executor_status.store(ExecutorStatus::Running, Relaxed);
self.execution_start_delta_nanos.store(self.creation_time.elapsed().as_nanos() as u64, Relaxed);
}
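/// Records the execution finish delta and settles the final status: [ExecutorStatus::StreamEnded] if the
/// stream finished while [ExecutorStatus::Running], or [ExecutorStatus::ProgrammaticallyEnded] if it had
/// been [ExecutorStatus::ScheduledToFinish] -- retrying until one of the two transitions succeeds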
fn register_execution_finish(self: &Arc<Self>) {
loop {
if self.executor_status.compare_exchange(ExecutorStatus::Running, ExecutorStatus::StreamEnded, Relaxed, Relaxed).is_ok() ||
self.executor_status.compare_exchange(ExecutorStatus::ScheduledToFinish, ExecutorStatus::ProgrammaticallyEnded, Relaxed, Relaxed).is_ok() {
break
}
}
self.execution_finish_delta_nanos.store(self.creation_time.elapsed().as_nanos() as u64, Relaxed);
}
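/// Tells this executor that its stream is being asked to close, so the end-of-execution report
/// can tell a programmatic shutdown apart from the stream simply ending on its own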
pub fn report_scheduled_to_finish(self: &Arc<Self>) {
self.executor_status.store(ExecutorStatus::ScheduledToFinish, Relaxed);
}
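/// Spawns a Tokio task to consume `stream` -- whose items are fallible futures -- to completion,
/// resolving up to `concurrency_limit` futures at once and enforcing `futures_timeout` on each of them
/// (when it is not [Duration::ZERO]). `on_err_callback` is called for every item that resolves to `Err`;
/// `stream_ended_callback` is called once, after the stream is exhausted.
///
/// A rough usage sketch (not compiled here; it mirrors the tests at the end of this file):
/// ```ignore
/// let executor = StreamExecutor::<{Instruments::LogsWithMetrics.into()}>::new("demo executor");
/// executor.spawn_executor(
///     1,                                                      // concurrency_limit
///     |err| async move { eprintln!("item failed: {err}") },   // on_err_callback
///     |_executor| async {},                                   // stream_ended_callback
///     stream::iter(vec![async { Ok::<u32, Box<dyn Error + Send + Sync>>(17) }]),
/// );
/// ```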
pub fn spawn_executor<OutItemType: Send + Debug,
FutureItemType: Future<Output=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send,
CloseVoidAsyncType: Future<Output=()> + Send + 'static,
ErrVoidAsyncType: Future<Output=()> + Send + 'static>
(self: Arc<Self>,
concurrency_limit: u32,
on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static,
stream_ended_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> CloseVoidAsyncType + Send + Sync + 'static,
stream: impl Stream<Item=FutureItemType> + 'static + Send) {
let cloned_self = Arc::clone(&self);
let on_err_callback = Arc::new(on_err_callback);
match self.futures_timeout {
Duration::ZERO => {
tokio::spawn(async move {
on_executor_start!(cloned_self, true, true, cloned_self.futures_timeout);
// `start` is `Copy`: each `async move` block below captures its own copy, re-timing it when profiling is enabled
let mut start = Instant::now();
let item_processor = |future_element| {
let cloned_self = Arc::clone(&cloned_self);
let on_err_callback = Arc::clone(&on_err_callback);
async move {
if Instruments::from(INSTRUMENTS).cheap_profiling() {
start = Instant::now();
}
match future_element.await {
Ok(yielded_item) => {
if Instruments::from(INSTRUMENTS).cheap_profiling() {
let elapsed = start.elapsed();
on_timed_ok_item!(cloned_self, yielded_item, elapsed);
} else {
on_non_timed_ok_item!(cloned_self, yielded_item);
}
},
Err(err) => {
if Instruments::from(INSTRUMENTS).cheap_profiling() {
let elapsed = start.elapsed();
on_timed_err_item!(cloned_self, err, elapsed);
} else {
on_non_timed_err_item!(cloned_self, err);
}
on_err_callback(err).await;
},
}
}
};
match concurrency_limit {
1 => stream.for_each(item_processor).await,
_ => stream.for_each_concurrent(concurrency_limit as usize, item_processor).await,
}
on_executor_end!(cloned_self, true, true, Duration::ZERO);
stream_ended_callback(cloned_self).await;
});
},
_ => {
tokio::spawn(async move {
on_executor_start!(cloned_self, true, true, cloned_self.futures_timeout);
let mut start = Instant::now();
let item_processor = |future_element| {
let cloned_self = Arc::clone(&cloned_self);
let on_err_callback = Arc::clone(&on_err_callback);
async move {
if Instruments::from(INSTRUMENTS).cheap_profiling() {
start = Instant::now();
}
match timeout(cloned_self.futures_timeout, future_element).await {
Ok(non_timed_out_result) => match non_timed_out_result {
Ok(yielded_item) => {
if Instruments::from(INSTRUMENTS).cheap_profiling() {
let elapsed = start.elapsed();
on_timed_ok_item!(cloned_self, yielded_item, elapsed);
} else {
on_non_timed_ok_item!(cloned_self, yielded_item);
}
},
Err(err) => {
if Instruments::from(INSTRUMENTS).cheap_profiling() {
let elapsed = start.elapsed();
on_timed_err_item!(cloned_self, err, elapsed);
} else {
on_non_timed_err_item!(cloned_self, err);
}
on_err_callback(err).await;
},
},
Err(_time_out_err) => {
if Instruments::from(INSTRUMENTS).cheap_profiling() {
let elapsed = start.elapsed();
cloned_self.timed_out_events_avg_future_duration.inc(elapsed.as_secs_f32());
if Instruments::from(INSTRUMENTS).logging() {
error!("🕝🕝🕝🕝 Executor '{}' TIMED OUT after {:?}", cloned_self.executor_name, elapsed);
}
} else if Instruments::from(INSTRUMENTS).logging() {
error!("🕝🕝🕝🕝 Executor '{}' TIMED OUT", cloned_self.executor_name);
}
}
}
}
};
match concurrency_limit {
1 => stream.for_each(item_processor).await,
_ => stream.for_each_concurrent(concurrency_limit as usize, item_processor).await,
}
on_executor_end!(cloned_self, true, true, cloned_self.futures_timeout);
stream_ended_callback(cloned_self).await;
});
},
}
}
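/// Spawns a Tokio task to consume `stream` -- whose items are plain (non-future) `Result`s -- to completion,
/// counting OK & failed items. Unlike [Self::spawn_executor()], errors are only counted & logged (there is
/// no error callback). `stream_ended_callback` is called once the stream is exhausted.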
pub fn spawn_non_futures_executor<ItemType: Send + Debug,
VoidAsyncType: Future<Output=()> + Send + 'static>
(self: Arc<Self>,
concurrency_limit: u32,
stream_ended_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> VoidAsyncType + Send + Sync + 'static,
stream: impl Stream<Item=Result<ItemType, Box<dyn std::error::Error + Send + Sync>>> + 'static + Send) {
tokio::spawn(async move {
on_executor_start!(self, false, true, Duration::ZERO);
let item_processor = |fallible_element| {
match fallible_element {
Ok(yielded_item) => on_non_timed_ok_item!(self, yielded_item),
Err(err) => on_non_timed_err_item!(self, err),
}
};
match concurrency_limit {
1 => stream.for_each(|fallible_item| future::ready(item_processor(fallible_item))).await,
_ => stream.for_each_concurrent(concurrency_limit as usize, |fallible_item| future::ready(item_processor(fallible_item))).await,
}
on_executor_end!(self, false, true, Duration::ZERO);
stream_ended_callback(self).await;
});
}
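/// Spawns a Tokio task to consume `stream` -- whose items are neither futures nor fallible -- to completion,
/// counting the yielded items and calling `stream_ended_callback` once the stream is exhausted.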
pub fn spawn_non_futures_non_fallible_executor<OutItemType: Send + Debug,
VoidAsyncType: Future<Output=()> + Send + 'static>
(self: Arc<Self>,
concurrency_limit: u32,
stream_ended_callback: impl FnOnce(Arc<StreamExecutor<INSTRUMENTS>>) -> VoidAsyncType + Send + Sync + 'static,
stream: impl Stream<Item=OutItemType> + 'static + Send) {
tokio::spawn(async move {
on_executor_start!(self, false, false, Duration::ZERO);
let item_processor = |yielded_item| on_non_timed_ok_item!(self, yielded_item);
match concurrency_limit {
1 => stream.for_each(|item| future::ready(item_processor(item))).await,
_ => stream.for_each_concurrent(concurrency_limit as usize, |item| future::ready(item_processor(item))).await,
}
on_executor_end!(self, false, false, Duration::ZERO);
stream_ended_callback(self).await;
});
}
}
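/// Unit tests for [StreamExecutor], exercising every logs / metrics instrumentation combination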
#[cfg(any(test,doc))]
mod tests {
use super::*;
use std::sync::atomic::AtomicU32;
use futures::{
stream::{self, StreamExt},
channel::mpsc,
SinkExt,
};
#[ctor::ctor]
fn suite_setup() {
simple_logger::SimpleLogger::new().with_utc_timestamps().init().unwrap_or_else(|_| eprintln!("--> LOGGER WAS ALREADY STARTED"));
info!("minstant: is TSC / RDTSC instruction available for time measurement? {}", minstant::is_tsc_available());
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_non_timeout_futures_fallible_executor_with_logs_and_metrics() {
assert_spawn_futures_fallible_executor(StreamExecutor::<{Instruments::LogsWithMetrics.into()}>::new("executor with logs & metrics")).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_non_timeout_futures_fallible_executor_with_metrics_and_no_logs() {
assert_spawn_futures_fallible_executor(StreamExecutor::<{Instruments::MetricsWithoutLogs.into()}>::new("executor with metrics & NO logs")).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_non_timeout_futures_fallible_executor_with_logs_and_no_metrics() {
assert_spawn_futures_fallible_executor(StreamExecutor::<{Instruments::LogsWithoutMetrics.into()}>::new("executor with logs & NO metrics")).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_non_timeout_futures_fallible_executor_with_no_logs_and_no_metrics() {
assert_spawn_futures_fallible_executor(StreamExecutor::<{Instruments::NoInstruments.into()}>::new("executor with NO logs & NO metrics")).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_timeout_futures_fallible_executor_with_logs_and_metrics() {
assert_spawn_futures_fallible_executor(StreamExecutor::<{Instruments::LogsWithMetrics.into()}>::with_futures_timeout("executor with logs & metrics", Duration::from_millis(100))).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_timeout_futures_fallible_executor_with_metrics_and_no_logs() {
assert_spawn_futures_fallible_executor(StreamExecutor::<{Instruments::MetricsWithoutLogs.into()}>::with_futures_timeout("executor with metrics & NO logs", Duration::from_millis(100))).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_timeout_futures_fallible_executor_with_logs_and_no_metrics() {
assert_spawn_futures_fallible_executor(StreamExecutor::<{Instruments::LogsWithoutMetrics.into()}>::with_futures_timeout("executor with logs & NO metrics", Duration::from_millis(100))).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_timeout_futures_fallible_executor_with_no_logs_and_no_metrics() {
assert_spawn_futures_fallible_executor(StreamExecutor::<{Instruments::NoInstruments.into()}>::with_futures_timeout("executor with NO logs & NO metrics", Duration::from_millis(100))).await;
}
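/// Feeds `executor` a stream of fallible futures -- items greater than 100 sleep for 150ms, so they
/// time out when the executor enforces its 100ms timeout -- then asserts the error callback count,
/// the stream-ended callback and the collected metrics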
async fn assert_spawn_futures_fallible_executor<const INSTRUMENTS: usize>
(executor: Arc<StreamExecutor<INSTRUMENTS>>) {
async fn to_future(item: Result<u32, Box<dyn Error + Send + Sync>>) -> Result<u32, Box<dyn Error + Send + Sync>> {
if matches!(&item, Ok(n) if *n > 100) {
tokio::time::sleep(Duration::from_millis(150)).await;
}
item
}
let (tx, mut rx) = mpsc::channel::<bool>(10);
let error_counter = Arc::new(AtomicU32::new(0));
let error_counter_ref = Arc::clone(&error_counter);
let cloned_executor = Arc::clone(&executor);
let timeout_enabled = executor.futures_timeout > Duration::ZERO;
let expected_timeout_count = if timeout_enabled {2} else {0};
executor.spawn_executor(1,
move |_| { let error_counter = Arc::clone(&error_counter_ref); async move {error_counter.fetch_add(1, Relaxed);} },
move |_| { let mut tx = tx.clone(); async move {tx.send(true).await.unwrap()} },
stream::iter(vec![to_future(Ok(17)),
to_future(Err(Box::from("17"))),
to_future(Ok(170)),
to_future(Ok(19)),
to_future(Err(Box::from("19"))),
to_future(Ok(190))]));
assert!(rx.next().await.expect("consumption_done_reporter() wasn't called"), "consumption_done_reporter yielded the wrong value");
assert_eq!(error_counter.load(Relaxed), 2, "Error callback wasn't called the right number of times");
assert_metrics(cloned_executor, 4 - expected_timeout_count, expected_timeout_count, 2);
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_non_futures_non_fallible_executor_with_logs_and_metrics() {
assert_spawn_non_futures_non_fallible_executor(StreamExecutor::<{Instruments::LogsWithMetrics.into()}>::new("executor with logs & metrics")).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_non_futures_non_fallible_executor_with_metrics_and_no_logs() {
assert_spawn_non_futures_non_fallible_executor(StreamExecutor::<{Instruments::MetricsWithoutLogs.into()}>::new("executor with metrics & NO logs")).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_non_futures_non_fallible_executor_with_logs_and_no_metrics() {
assert_spawn_non_futures_non_fallible_executor(StreamExecutor::<{Instruments::LogsWithoutMetrics.into()}>::new("executor with logs & NO metrics")).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_non_futures_non_fallible_executor_with_no_logs_and_no_metrics() {
assert_spawn_non_futures_non_fallible_executor(StreamExecutor::<{Instruments::NoInstruments.into()}>::new("executor with NO logs & NO metrics")).await;
}
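/// Feeds `executor` a plain, non-fallible stream and asserts the stream-ended callback and the collected metrics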
async fn assert_spawn_non_futures_non_fallible_executor<const INSTRUMENTS: usize>
(executor: Arc<StreamExecutor<INSTRUMENTS>>) {
let (mut tx, mut rx) = mpsc::channel::<bool>(10);
let cloned_executor = Arc::clone(&executor);
executor.spawn_non_futures_non_fallible_executor(1, move |_| async move {tx.send(true).await.unwrap()}, stream::iter(vec![17, 19]));
assert!(rx.next().await.expect("consumption_done_reporter() wasn't called"), "consumption_done_reporter yielded the wrong value");
assert_metrics(cloned_executor, 2, 0, 0);
}
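/// Prints the collected stats for `executor` and asserts the OK / TIMED OUT / FAILED counters match the
/// expectations when metrics are enabled -- or are all zero when they are disabled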
fn assert_metrics<const INSTRUMENTS: usize>
(executor: Arc<StreamExecutor<INSTRUMENTS>>,
expected_ok_counter: u32,
expected_timed_out_counter: u32,
expected_failed_counter: u32) {
println!("### Stats assertions for Stream pipeline executor named '{}' (Logs? {}; Metrics? {}) ####",
executor.executor_name, Instruments::from(INSTRUMENTS).logging(), Instruments::from(INSTRUMENTS).metrics());
let creation_duration = executor.creation_time.elapsed();
let execution_start_delta_nanos = executor.execution_start_delta_nanos.load(Relaxed);
let execution_finish_delta_nanos = executor.execution_finish_delta_nanos.load(Relaxed);
let (ok_counter, ok_average) = executor.ok_events_avg_future_duration.lightweight_probe();
let (timed_out_counter, timed_out_average) = executor.timed_out_events_avg_future_duration.lightweight_probe();
let (failed_counter, failed_average) = executor.failed_events_avg_future_duration.lightweight_probe();
println!("Creation time: {:?} ago", creation_duration);
println!("Execution Start: {:?} after creation", Duration::from_nanos(execution_start_delta_nanos));
println!("Execution Finish: {:?} after creation", Duration::from_nanos(execution_finish_delta_nanos));
println!("OK elements count: {ok_counter}; OK elements average Future resolution time: {ok_average}s{}{}",
if Instruments::from(INSTRUMENTS).metrics() {""} else {" -- metrics are DISABLED"},
if Instruments::from(INSTRUMENTS).logging() {" -- verify these values against the \"executor closed\" message"} else {" -- logs are DISABLED"});
println!("TIMED OUT elements count: {timed_out_counter}; TIMED OUT elements average Future resolution time: {timed_out_average}s{}{}",
if Instruments::from(INSTRUMENTS).metrics() {""} else {" -- metrics are DISABLED"},
if Instruments::from(INSTRUMENTS).logging() {" -- verify these values against the \"executor closed\" message"} else {" -- logs are DISABLED"});
println!("FAILED elements count: {failed_counter}; FAILED elements average Future resolution time: {failed_average}s{}{}",
if Instruments::from(INSTRUMENTS).metrics() {""} else {" -- metrics are DISABLED"},
if Instruments::from(INSTRUMENTS).logging() {" -- verify these values against the \"executor closed\" message"} else {" -- logs are DISABLED"});
assert_ne!(execution_start_delta_nanos, u64::MAX, "'execution_start_delta_nanos' wasn't set");
assert_ne!(execution_finish_delta_nanos, u64::MAX, "'execution_finish_delta_nanos' wasn't set");
assert!(execution_finish_delta_nanos >= execution_start_delta_nanos, "INSTRUMENTATION ERROR: 'execution_start_delta_nanos' was set after 'execution_finish_delta_nanos'");
if Instruments::from(INSTRUMENTS).metrics() {
assert_eq!(ok_counter, expected_ok_counter, "OK elements counter doesn't match -- Metrics are ENABLED");
assert_eq!(timed_out_counter, expected_timed_out_counter, "TIMED OUT elements counter doesn't match -- Metrics are ENABLED");
assert_eq!(failed_counter, expected_failed_counter, "FAILED elements counter doesn't match -- Metrics are ENABLED");
} else {
assert_eq!(ok_counter, 0, "Metrics are DISABLED, so the reported OK elements should be ZERO");
assert_eq!(timed_out_counter, 0, "Metrics are DISABLED, so the reported TIMED OUT elements should be ZERO");
assert_eq!(failed_counter, 0, "Metrics are DISABLED, so the reported FAILED elements should be ZERO");
}
}
}
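/// The lifecycle states of a [StreamExecutor] -- wrapped by [atomic_enum] into `AtomicExecutorStatus`,
/// so the status can be inspected & updated across tasks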
#[atomic_enum]
#[derive(PartialEq)]
enum ExecutorStatus {
    /// Created, but no stream execution has started yet
    NotStarted,
    /// Currently executing a stream
    Running,
    /// Informed (via [StreamExecutor::report_scheduled_to_finish()]) that the stream is about to be closed
    ScheduledToFinish,
    /// Finished after having been scheduled to finish
    ProgrammaticallyEnded,
    /// Finished because the stream was exhausted on its own
    StreamEnded,
}