use super::{
    instruments::Instruments,
    incremental_averages::AtomicIncrementalAverage64,
};
use std::{
    sync::{
        Arc,
        atomic::{
            AtomicU64,
            Ordering::Relaxed,
        },
    },
    future::{self, Future},
    fmt::Debug,
    time::Duration,
    error::Error,
};
use atomic_enum::atomic_enum;
use futures::stream::{Stream, StreamExt};
use tokio::time::timeout;
use minstant::Instant;
use log::{trace, info, warn, error};
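/// Status and metrics exposed by a [StreamExecutor], regardless of its compile-time `INSTRUMENTS` configuration:
/// time deltas are nanoseconds counted from [Self::creation_time()], the `*_avg_future_duration` incremental
/// averages are kept in seconds and [Self::report_scheduled_to_finish()] flags that the executor was
/// programmatically asked to stop.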
pub trait StreamExecutorStats: Debug {
fn executor_name(&self) -> &String;
fn futures_timeout(&self) -> &Duration;
fn creation_time(&self) -> &Instant;
fn executor_status(&self) -> &AtomicExecutorStatus;
fn execution_start_delta_nanos(&self) -> u64;
fn execution_finish_delta_nanos(&self) -> u64;
fn ok_events_avg_future_duration(&self) -> &AtomicIncrementalAverage64;
fn timed_out_events_avg_future_duration(&self) -> &AtomicIncrementalAverage64;
fn failed_events_avg_future_duration(&self) -> &AtomicIncrementalAverage64;
fn report_scheduled_to_finish(&self);
}
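/// Executes `Stream`s on Tokio tasks, optionally enforcing a per-future timeout, while collecting the
/// status and metrics exposed through [StreamExecutorStats]. The `INSTRUMENTS_USIZE` const generic
/// selects, at compile time, which instruments (logging, tracing, metrics, profiling) are active --
/// see [Instruments].
///
/// Illustrative usage -- a sketch distilled from this module's tests (names and paths here are
/// assumptions; adjust them to your crate layout):
/// ```ignore
/// let executor = StreamExecutor::<{Instruments::LogsWithMetrics.into()}>::new("demo executor");
/// executor.spawn_non_futures_non_fallibles_executor(
///     1,                                      // concurrency limit
///     |_stats| async {},                      // called once the stream ends
///     futures::stream::iter(vec![17, 19]));   // the stream to execute
/// ```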
#[derive(Debug)]
pub struct StreamExecutor<const INSTRUMENTS_USIZE: usize> {
executor_name: String,
futures_timeout: Duration,
creation_time: Instant,
executor_status: AtomicExecutorStatus,
execution_start_delta_nanos: AtomicU64,
execution_finish_delta_nanos: AtomicU64,
pub ok_events_avg_future_duration: AtomicIncrementalAverage64,
pub timed_out_events_avg_future_duration: AtomicIncrementalAverage64,
pub failed_events_avg_future_duration: AtomicIncrementalAverage64,
}
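/// Registers the execution start on `$self` and, if logging is enabled, announces the executor's
/// characteristics: whether items are futures, whether they are fallible and whether timeouts & metrics apply.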
macro_rules! on_executor_start {
($self: expr, $future: expr, $fallible: expr, $futures_timeout: expr, $INSTRUMENTS: expr) => {
$self.register_execution_start();
if $INSTRUMENTS.logging() {
info!("✓✓✓✓ Stream Executor '{}' started: {}Futures{} / {}Fallible Items & {}Metrics",
$self.executor_name,
if $future {""} else {"Non-"},
if $future {
if $futures_timeout != Duration::ZERO {format!(" (with timeouts of {:?})", $futures_timeout)} else {" (NO timeouts)".to_string()}
} else {
"".to_string()
},
if $fallible {""} else {"Non-"},
if !$INSTRUMENTS.metrics() {"NO "} else {""});
}
}
}
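/// Accounts for an OK event whose resolution time was not measured, tracing it if enabled.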
macro_rules! on_non_timed_ok_item {
($self: expr, $item: expr, $INSTRUMENTS: expr) => {
{
if $INSTRUMENTS.cheap_profiling() {
    // counts the event; -1.0 stands in for the (unmeasured) future resolution time
    $self.ok_events_avg_future_duration.inc(-1.0);
}
if $INSTRUMENTS.tracing() {
trace!("✓✓✓✓ Executor '{}' yielded '{:?}'", $self.executor_name, $item);
}
}
}
}
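/// Accounts for an OK event along with its measured resolution time -- only usable when profiling is enabled.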
macro_rules! on_timed_ok_item {
($self: expr, $item: expr, $elapsed: expr, $INSTRUMENTS: expr) => {
{
if $INSTRUMENTS.cheap_profiling() {
$self.ok_events_avg_future_duration.inc($elapsed.as_secs_f32());
} else {
panic!("\nThis macro can only be used if at least one of the Instruments' PROFILING are enabled -- otherwise you should use `on_non_timed_ok_item!(...)` instead");
}
if $INSTRUMENTS.tracing() {
trace!("✓✓✓✓ Executor '{}' yielded '{:?}' in {:?}", $self.executor_name, $item, $elapsed);
}
}
}
}
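/// Accounts for a failed event whose resolution time was not measured, logging the error if enabled.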
macro_rules! on_non_timed_err_item {
($self: expr, $err: expr, $INSTRUMENTS: expr) => {
{
if $INSTRUMENTS.cheap_profiling() {
    // counts the event; -1.0 stands in for the (unmeasured) future resolution time
    $self.failed_events_avg_future_duration.inc(-1.0);
}
if $INSTRUMENTS.logging() {
error!("✗✗✗✗ Executor '{}' yielded ERROR '{:?}'", $self.executor_name, $err);
}
}
}
}
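/// Accounts for a failed event along with its measured resolution time -- only usable when profiling is enabled.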
macro_rules! on_timed_err_item {
($self: expr, $err: expr, $elapsed: expr, $INSTRUMENTS: expr) => {
{
if $INSTRUMENTS.cheap_profiling() {
$self.failed_events_avg_future_duration.inc($elapsed.as_secs_f32());
} else {
panic!("This macro can only be used if at least one of the Instruments' PROFILING are enabled -- otherwise you should use `on_non_timed_err_item!(...)` instead");
}
if $INSTRUMENTS.logging() {
error!("✗✗✗✗ Executor '{}' yielded ERROR '{:?}' in {:?}", $self.executor_name, $err, $elapsed);
}
}
}
}
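/// Registers the execution finish on `$self` and, if logging is enabled, reports the run time and the
/// collected per-outcome stats (or just the run time, when metrics are disabled).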
macro_rules! on_executor_end {
($self: expr, $future: expr, $fallible: expr, $futures_timeout: expr, $INSTRUMENTS: expr) => {
$self.register_execution_finish();
let stream_ended = $self.executor_status.load(Relaxed) == ExecutorStatus::StreamEnded;
let execution_nanos = $self.execution_finish_delta_nanos.load(Relaxed) - $self.execution_start_delta_nanos.load(Relaxed);
if $INSTRUMENTS.logging() && $INSTRUMENTS.cheap_profiling() {
let (ok_counter, ok_avg_seconds) = $self.ok_events_avg_future_duration.probe();
let (timed_out_counter, timed_out_avg_seconds) = $self.timed_out_events_avg_future_duration.probe();
let (failed_counter, failed_avg_seconds) = $self.failed_events_avg_future_duration.probe();
// `f64::MIN_POSITIVE` is added to avoid divisions by zero in the per-second rates computed below
let execution_secs: f64 = Duration::from_nanos(execution_nanos).as_secs_f64() + f64::MIN_POSITIVE;
let ok_stats = if $future {
format!("ok: {} events; avg {:?} - {:.5}/sec", ok_counter, Duration::from_secs_f32(ok_avg_seconds), ok_counter as f64 / execution_secs)
} else {
format!("ok: {} events", ok_counter)
};
let timed_out_stats = if $future && $futures_timeout != Duration::ZERO {
format!(" | time out: {} events; avg {:?} - {:.5}/sec", timed_out_counter, Duration::from_secs_f32(timed_out_avg_seconds), timed_out_counter as f64 / execution_secs)
} else {
format!("")
};
let failed_stats = if $future && $fallible {
format!(" | failed: {} events; avg {:?} - {:.5}/sec", failed_counter, Duration::from_secs_f32(failed_avg_seconds), failed_counter as f64 / execution_secs)
} else if $fallible {
format!(" | failed: {} events", failed_counter)
} else {
format!("")
};
warn!("✓✓✓✓ {} '{}' ended after running for {:?} -- stats: | {}{}{}",
if stream_ended {"Stream"} else {"Executor"},
$self.executor_name,
Duration::from_nanos(execution_nanos),
ok_stats,
timed_out_stats,
failed_stats);
} else if $INSTRUMENTS.logging() {
warn!("✓✓✓✓ {} '{}' ended after running for {:?} -- metrics were disabled",
if stream_ended {"Stream"} else {"Executor"},
$self.executor_name,
Duration::from_nanos(execution_nanos));
}
}
}
impl<const INSTRUMENTS_USIZE: usize>
StreamExecutor<INSTRUMENTS_USIZE> {
const INSTRUMENTS: Instruments = Instruments::from(INSTRUMENTS_USIZE);
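/// Creates a new executor named `executor_name` that imposes no timeout on the futures it executes.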
pub fn new<IntoString: Into<String>>(executor_name: IntoString) -> Arc<Self> {
Self::with_futures_timeout(executor_name, Duration::ZERO)
}
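/// Creates a new executor named `executor_name` whose futures will each be subject to `futures_timeout`
/// ([Duration::ZERO] disables the timeout).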
pub fn with_futures_timeout<IntoString: Into<String>>(executor_name: IntoString, futures_timeout: Duration) -> Arc<Self> {
Arc::new(Self {
executor_name: executor_name.into(),
futures_timeout,
creation_time: Instant::now(),
executor_status: AtomicExecutorStatus::new(ExecutorStatus::NotStarted),
execution_start_delta_nanos: AtomicU64::new(u64::MAX),
execution_finish_delta_nanos: AtomicU64::new(u64::MAX),
ok_events_avg_future_duration: AtomicIncrementalAverage64::new(),
failed_events_avg_future_duration: AtomicIncrementalAverage64::new(),
timed_out_events_avg_future_duration: AtomicIncrementalAverage64::new(),
})
}
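/// Marks this executor as [ExecutorStatus::Running] and records the delta, from creation, at which execution started.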
fn register_execution_start(&self) {
self.executor_status.store(ExecutorStatus::Running, Relaxed);
self.execution_start_delta_nanos.store(self.creation_time.elapsed().as_nanos() as u64, Relaxed);
}
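/// Records the delta, from creation, at which execution finished, spinning on a CAS until the status
/// transitions either from [ExecutorStatus::Running] to [ExecutorStatus::StreamEnded] or from
/// [ExecutorStatus::ScheduledToFinish] to [ExecutorStatus::ProgrammaticallyEnded].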
fn register_execution_finish(&self) {
loop {
if self.executor_status.compare_exchange(ExecutorStatus::Running, ExecutorStatus::StreamEnded, Relaxed, Relaxed).is_ok() ||
self.executor_status.compare_exchange(ExecutorStatus::ScheduledToFinish, ExecutorStatus::ProgrammaticallyEnded, Relaxed, Relaxed).is_ok() {
break
}
}
self.execution_finish_delta_nanos.store(self.creation_time.elapsed().as_nanos() as u64, Relaxed);
}
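/// Spawns, on a Tokio task, an executor for a `Stream` of fallible futures: each item is awaited
/// (honoring `futures_timeout`, if set), OK and failed resolutions are accounted for, failures are
/// handed to `on_err_callback`, timed out futures are accounted for separately (without invoking the
/// error callback) and `stream_ended_callback` is called once the stream is exhausted.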
pub fn spawn_executor<OutItemType: Send + Debug,
FutureItemType: Future<Output=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send,
CloseVoidAsyncType: Future<Output=()> + Send + 'static,
ErrVoidAsyncType: Future<Output=()> + Send + 'static>
(self: Arc<Self>,
concurrency_limit: u32,
on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) -> ErrVoidAsyncType + Send + Sync + 'static,
stream_ended_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static,
stream: impl Stream<Item=FutureItemType> + Send + 'static) {
match self.futures_timeout {
Duration::ZERO => {
tokio::spawn(async move {
let self_ref: &Self = &self;
let on_err_callback_ref = &on_err_callback;
on_executor_start!(self_ref, true, true, self_ref.futures_timeout, Self::INSTRUMENTS);
let mut start = Instant::now();
let item_processor = |future_element| {
async move {
if Self::INSTRUMENTS.cheap_profiling() {
start = Instant::now();
}
match future_element.await {
Ok(yielded_item) => {
if Self::INSTRUMENTS.cheap_profiling() {
let elapsed = start.elapsed();
on_timed_ok_item!(self_ref, yielded_item, elapsed, Self::INSTRUMENTS);
} else {
on_non_timed_ok_item!(self_ref, yielded_item, Self::INSTRUMENTS);
}
},
Err(err) => {
if Self::INSTRUMENTS.cheap_profiling() {
let elapsed = start.elapsed();
on_timed_err_item!(self_ref, err, elapsed, Self::INSTRUMENTS);
} else {
on_non_timed_err_item!(self_ref, err, Self::INSTRUMENTS);
}
on_err_callback_ref(err).await;
},
}
}
};
match concurrency_limit {
1 => stream.for_each(item_processor).await,
_ => stream.for_each_concurrent(concurrency_limit as usize, item_processor).await,
}
on_executor_end!(self_ref, true, true, Duration::ZERO, Self::INSTRUMENTS);
stream_ended_callback(self).await;
});
},
_ => {
tokio::spawn(async move {
let self_ref: &Self = &self;
let on_err_callback_ref = &on_err_callback;
on_executor_start!(self_ref, true, true, self_ref.futures_timeout, Self::INSTRUMENTS);
let mut start = Instant::now();
let item_processor = |future_element| {
async move {
if Self::INSTRUMENTS.cheap_profiling() {
start = Instant::now();
}
match timeout(self_ref.futures_timeout, future_element).await {
Ok(non_timed_out_result) => match non_timed_out_result {
Ok(yielded_item) => {
if Self::INSTRUMENTS.cheap_profiling() {
let elapsed = start.elapsed();
on_timed_ok_item!(self_ref, yielded_item, elapsed, Self::INSTRUMENTS);
} else {
on_non_timed_ok_item!(self_ref, yielded_item, Self::INSTRUMENTS);
}
},
Err(err) => {
if Self::INSTRUMENTS.cheap_profiling() {
let elapsed = start.elapsed();
on_timed_err_item!(self_ref, err, elapsed, Self::INSTRUMENTS);
} else {
on_non_timed_err_item!(self_ref, err, Self::INSTRUMENTS);
}
on_err_callback_ref(err).await;
},
},
Err(_time_out_err) => {
if Self::INSTRUMENTS.cheap_profiling() {
let elapsed = start.elapsed();
self_ref.timed_out_events_avg_future_duration.inc(elapsed.as_secs_f32());
if Self::INSTRUMENTS.logging() {
error!("🕝🕝🕝🕝 Executor '{}' TIMED OUT after {:?}", self_ref.executor_name, elapsed);
}
} else if Self::INSTRUMENTS.logging() {
error!("🕝🕝🕝🕝 Executor '{}' TIMED OUT", self_ref.executor_name);
}
}
}
}
};
match concurrency_limit {
1 => stream.for_each(item_processor).await,
_ => stream.for_each_concurrent(concurrency_limit as usize, item_processor).await,
}
on_executor_end!(self_ref, true, true, self_ref.futures_timeout, Self::INSTRUMENTS);
stream_ended_callback(self).await;
});
},
}
}
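/// Spawns, on a Tokio task, an executor for a `Stream` of non-fallible futures: each item is awaited
/// (honoring `futures_timeout`, if set), timed out futures are accounted for separately and
/// `stream_ended_callback` is called once the stream is exhausted.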
pub fn spawn_futures_executor<OutItemType: Send + Debug,
FutureItemType: Future<Output=OutItemType> + Send,
CloseVoidAsyncType: Future<Output=()> + Send + 'static>
(self: Arc<Self>,
concurrency_limit: u32,
stream_ended_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static,
stream: impl Stream<Item=FutureItemType> + Send + 'static) {
match self.futures_timeout {
Duration::ZERO => {
tokio::spawn(async move {
let self_ref: &Self = &self;
on_executor_start!(self_ref, true, false, self_ref.futures_timeout, Self::INSTRUMENTS);
let mut start = Instant::now();
let item_processor = |future_element| {
async move {
if Self::INSTRUMENTS.cheap_profiling() {
start = Instant::now();
}
let yielded_item = future_element.await;
if Self::INSTRUMENTS.cheap_profiling() {
let elapsed = start.elapsed();
on_timed_ok_item!(self_ref, yielded_item, elapsed, Self::INSTRUMENTS);
} else {
on_non_timed_ok_item!(self_ref, yielded_item, Self::INSTRUMENTS);
}
}
};
match concurrency_limit {
1 => stream.for_each(item_processor).await,
_ => stream.for_each_concurrent(concurrency_limit as usize, item_processor).await,
}
on_executor_end!(self_ref, true, false, Duration::ZERO, Self::INSTRUMENTS);
stream_ended_callback(self).await;
});
},
_ => {
tokio::spawn(async move {
let self_ref: &Self = &self;
on_executor_start!(self_ref, true, false, self_ref.futures_timeout, Self::INSTRUMENTS);
let mut start = Instant::now();
let item_processor = |future_element| {
async move {
if Self::INSTRUMENTS.cheap_profiling() {
start = Instant::now();
}
match timeout(self_ref.futures_timeout, future_element).await {
Ok(non_timed_out_result) => {
if Self::INSTRUMENTS.cheap_profiling() {
let elapsed = start.elapsed();
on_timed_ok_item!(self_ref, non_timed_out_result, elapsed, Self::INSTRUMENTS);
} else {
on_non_timed_ok_item!(self_ref, non_timed_out_result, Self::INSTRUMENTS);
}
}
Err(_time_out_err) => {
if Self::INSTRUMENTS.cheap_profiling() {
let elapsed = start.elapsed();
self_ref.timed_out_events_avg_future_duration.inc(elapsed.as_secs_f32());
if Self::INSTRUMENTS.logging() {
error!("🕝🕝🕝🕝 Executor '{}' TIMED OUT after {:?}", self_ref.executor_name, elapsed);
}
} else if Self::INSTRUMENTS.logging() {
error!("🕝🕝🕝🕝 Executor '{}' TIMED OUT", self_ref.executor_name);
}
}
}
}
};
match concurrency_limit {
1 => stream.for_each(item_processor).await,
_ => stream.for_each_concurrent(concurrency_limit as usize, item_processor).await,
}
on_executor_end!(self_ref, true, false, self_ref.futures_timeout, Self::INSTRUMENTS);
stream_ended_callback(self).await;
});
},
}
}
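/// Spawns, on a Tokio task, an executor for a `Stream` of fallible, non-future items: errors are
/// handed to the synchronous `on_err_callback` and `stream_ended_callback` is called once the
/// stream is exhausted.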
pub fn spawn_fallibles_executor<OutItemType: Send + Debug,
CloseVoidAsyncType: Future<Output=()> + Send + 'static>
(self: Arc<Self>,
concurrency_limit: u32,
on_err_callback: impl Fn(Box<dyn Error + Send + Sync>) + Send + Sync + 'static,
stream_ended_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> CloseVoidAsyncType + Send + Sync + 'static,
stream: impl Stream<Item=Result<OutItemType, Box<dyn std::error::Error + Send + Sync>>> + Send + 'static) {
tokio::spawn(async move {
on_executor_start!(self, false, true, Duration::ZERO, Self::INSTRUMENTS);
let item_processor = |element| {
match element {
Ok(yielded_item) => {
on_non_timed_ok_item!(self, yielded_item, Self::INSTRUMENTS);
},
Err(err) => {
on_non_timed_err_item!(self, err, Self::INSTRUMENTS);
on_err_callback(err);
}
}
};
match concurrency_limit {
1 => stream.for_each(|item| future::ready(item_processor(item))).await,
_ => stream.for_each_concurrent(concurrency_limit as usize, |item| future::ready(item_processor(item))).await,
}
on_executor_end!(self, false, true, Duration::ZERO, Self::INSTRUMENTS);
stream_ended_callback(self).await;
});
}
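/// Spawns, on a Tokio task, an executor for a `Stream` of fallible, non-future items, accounting for
/// OK and failed items without an error callback; `stream_ended_callback` is called once the stream
/// is exhausted.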
pub fn spawn_non_futures_executor<ItemType: Send + Debug,
VoidAsyncType: Future<Output=()> + Send + 'static>
(self: Arc<Self>,
concurrency_limit: u32,
stream_ended_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> VoidAsyncType + Send + Sync + 'static,
stream: impl Stream<Item=Result<ItemType, Box<dyn std::error::Error + Send + Sync>>> + Send + 'static) {
tokio::spawn(async move {
on_executor_start!(self, false, true, Duration::ZERO, Self::INSTRUMENTS);
let item_processor = |fallible_element| {
match fallible_element {
Ok(yielded_item) => on_non_timed_ok_item!(self, yielded_item, Self::INSTRUMENTS),
Err(err) => on_non_timed_err_item!(self, err, Self::INSTRUMENTS),
}
};
match concurrency_limit {
1 => stream.for_each(|fallible_item| future::ready(item_processor(fallible_item))).await,
_ => stream.for_each_concurrent(concurrency_limit as usize, |fallible_item| future::ready(item_processor(fallible_item))).await,
}
on_executor_end!(self, false, true, Duration::ZERO, Self::INSTRUMENTS);
stream_ended_callback(self).await;
});
}
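/// Spawns, on a Tokio task, an executor for a `Stream` of plain items (neither futures nor fallible);
/// `stream_ended_callback` is called once the stream is exhausted.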
pub fn spawn_non_futures_non_fallibles_executor<OutItemType: Send + Debug,
VoidAsyncType: Future<Output=()> + Send + 'static>
(self: Arc<Self>,
concurrency_limit: u32,
stream_ended_callback: impl FnOnce(Arc<dyn StreamExecutorStats + Send + Sync>) -> VoidAsyncType + Send + Sync + 'static,
stream: impl Stream<Item=OutItemType> + Send + 'static) {
tokio::spawn(async move {
on_executor_start!(self, false, false, Duration::ZERO, Self::INSTRUMENTS);
let item_processor = |yielded_item| on_non_timed_ok_item!(self, yielded_item, Self::INSTRUMENTS);
match concurrency_limit {
1 => stream.for_each(|item| future::ready(item_processor(item))).await,
_ => stream.for_each_concurrent(concurrency_limit as usize, |item| future::ready(item_processor(item))).await,
}
on_executor_end!(self, false, false, Duration::ZERO, Self::INSTRUMENTS);
stream_ended_callback(self).await;
});
}
}
impl<const INSTRUMENTS_USIZE: usize> StreamExecutorStats for
StreamExecutor<INSTRUMENTS_USIZE> {
fn executor_name(&self) -> &String {
&self.executor_name
}
fn futures_timeout(&self) -> &Duration {
&self.futures_timeout
}
fn creation_time(&self) -> &Instant {
&self.creation_time
}
fn executor_status(&self) -> &AtomicExecutorStatus {
&self.executor_status
}
fn execution_start_delta_nanos(&self) -> u64 {
self.execution_start_delta_nanos.load(Relaxed)
}
fn execution_finish_delta_nanos(&self) -> u64 {
self.execution_finish_delta_nanos.load(Relaxed)
}
fn ok_events_avg_future_duration(&self) -> &AtomicIncrementalAverage64 {
&self.ok_events_avg_future_duration
}
fn timed_out_events_avg_future_duration(&self) -> &AtomicIncrementalAverage64 {
&self.timed_out_events_avg_future_duration
}
fn failed_events_avg_future_duration(&self) -> &AtomicIncrementalAverage64 {
&self.failed_events_avg_future_duration
}
fn report_scheduled_to_finish(&self) {
self.executor_status.store(ExecutorStatus::ScheduledToFinish, Relaxed);
}
}
#[cfg(any(test,doc))]
mod tests {
use super::*;
use std::sync::atomic::AtomicU32;
use futures::{
stream::{self, StreamExt},
channel::mpsc,
SinkExt,
};
#[ctor::ctor]
fn suite_setup() {
simple_logger::SimpleLogger::new().with_utc_timestamps().init().unwrap_or_else(|_| eprintln!("--> LOGGER WAS ALREADY STARTED"));
info!("minstant: is TSC / RDTSC instruction available for time measurement? {}", minstant::is_tsc_available());
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_non_timeout_futures_fallible_executor_with_logs_and_metrics() {
assert_spawn_futures_fallible_executor::<{Instruments::LogsWithMetrics.into()}>(StreamExecutor::new("executor with logs & metrics")).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_non_timeout_futures_fallible_executor_with_metrics_and_no_logs() {
assert_spawn_futures_fallible_executor::<{Instruments::MetricsWithoutLogs.into()}>(StreamExecutor::new("executor with metrics & NO logs")).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_non_timeout_futures_fallible_executor_with_logs_and_no_metrics() {
assert_spawn_futures_fallible_executor::<{Instruments::LogsWithoutMetrics.into()}>(StreamExecutor::new("executor with logs & NO metrics")).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_non_timeout_futures_fallible_executor_with_no_logs_and_no_metrics() {
assert_spawn_futures_fallible_executor::<{Instruments::NoInstruments.into()}>(StreamExecutor::new("executor with NO logs & NO metrics")).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_timeout_futures_fallible_executor_with_logs_and_metrics() {
assert_spawn_futures_fallible_executor::<{Instruments::LogsWithMetrics.into()}>(StreamExecutor::with_futures_timeout("executor with logs & metrics", Duration::from_millis(100))).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_timeout_futures_fallible_executor_with_metrics_and_no_logs() {
assert_spawn_futures_fallible_executor::<{Instruments::MetricsWithoutLogs.into()}>(StreamExecutor::with_futures_timeout("executor with metrics & NO logs", Duration::from_millis(100))).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_timeout_futures_fallible_executor_with_logs_and_no_metrics() {
assert_spawn_futures_fallible_executor::<{Instruments::LogsWithoutMetrics.into()}>(StreamExecutor::with_futures_timeout("executor with logs & NO metrics", Duration::from_millis(100))).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_timeout_futures_fallible_executor_with_no_logs_and_no_metrics() {
assert_spawn_futures_fallible_executor::<{Instruments::NoInstruments.into()}>(StreamExecutor::with_futures_timeout("executor with NO logs & NO metrics", Duration::from_millis(100))).await;
}
async fn assert_spawn_futures_fallible_executor<const INSTRUMENTS: usize>
(executor: Arc<StreamExecutor<INSTRUMENTS>>) {
async fn to_future(item: Result<u32, Box<dyn Error + Send + Sync>>) -> Result<u32, Box<dyn Error + Send + Sync>> {
if item.is_ok() && item.as_ref().unwrap() > &100 {
tokio::time::sleep(Duration::from_millis(150)).await;
}
item
}
let (tx, mut rx) = mpsc::channel::<bool>(10);
let error_counter = Arc::new(AtomicU32::new(0));
let error_counter_ref = Arc::clone(&error_counter);
let cloned_executor = Arc::clone(&executor);
let timeout_enabled = *executor.futures_timeout() > Duration::ZERO;
let expected_timeout_count = if timeout_enabled {2} else {0};
executor.spawn_executor::<_, _, _, _>
(1,
move |_| { let error_counter = Arc::clone(&error_counter_ref); async move {error_counter.fetch_add(1, Relaxed);} },
move |_| { let mut tx = tx.clone(); async move {tx.send(true).await.unwrap()} },
stream::iter(vec![to_future(Ok(17)),
to_future(Err(Box::from("17"))),
to_future(Ok(170)), to_future(Ok(19)),
to_future(Err(Box::from("19"))),
to_future(Ok(190))] ));
assert!(rx.next().await.expect("consumption_done_reporter() wasn't called"), "consumption_done_reporter yielded the wrong value");
assert_eq!(error_counter.load(Relaxed), 2, "Error callback wasn't called the right number of times");
assert_metrics::<INSTRUMENTS>
(cloned_executor, 4 - expected_timeout_count, expected_timeout_count, 2);
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_non_timeout_futures_executor_with_logs_and_metrics() {
assert_spawn_futures_executor::<{Instruments::LogsWithMetrics.into()}>(StreamExecutor::new("executor with logs & metrics")).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_non_timeout_futures_executor_with_metrics_and_no_logs() {
assert_spawn_futures_executor::<{Instruments::MetricsWithoutLogs.into()}>(StreamExecutor::new("executor with metrics & NO logs")).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_non_timeout_futures_executor_with_logs_and_no_metrics() {
assert_spawn_futures_executor::<{Instruments::LogsWithoutMetrics.into()}>(StreamExecutor::new("executor with logs & NO metrics")).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_non_timeout_futures_executor_with_no_logs_and_no_metrics() {
assert_spawn_futures_executor::<{Instruments::NoInstruments.into()}>(StreamExecutor::new("executor with NO logs & NO metrics")).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_timeout_futures_executor_with_logs_and_metrics() {
assert_spawn_futures_executor::<{Instruments::LogsWithMetrics.into()}>(StreamExecutor::with_futures_timeout("executor with logs & metrics", Duration::from_millis(100))).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_timeout_futures_executor_with_metrics_and_no_logs() {
assert_spawn_futures_executor::<{Instruments::MetricsWithoutLogs.into()}>(StreamExecutor::with_futures_timeout("executor with metrics & NO logs", Duration::from_millis(100))).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_timeout_futures_executor_with_logs_and_no_metrics() {
assert_spawn_futures_executor::<{Instruments::LogsWithoutMetrics.into()}>(StreamExecutor::with_futures_timeout("executor with logs & NO metrics", Duration::from_millis(100))).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_timeout_futures_executor_with_no_logs_and_no_metrics() {
assert_spawn_futures_executor::<{Instruments::NoInstruments.into()}>(StreamExecutor::with_futures_timeout("executor with NO logs & NO metrics", Duration::from_millis(100))).await;
}
async fn assert_spawn_futures_executor<const INSTRUMENTS: usize>
(executor: Arc<StreamExecutor<INSTRUMENTS>>) {
async fn to_future(item: u32) -> u32 {
if item > 100 {
tokio::time::sleep(Duration::from_millis(150)).await;
}
item
}
let (tx, mut rx) = mpsc::channel::<bool>(10);
let cloned_executor = Arc::clone(&executor);
let timeout_enabled = executor.futures_timeout > Duration::ZERO;
let expected_timeout_count = if timeout_enabled {2} else {0};
executor.spawn_futures_executor::<_, _, _>
(1,
move |_| { let mut tx = tx.clone(); async move {tx.send(true).await.unwrap()} },
stream::iter(vec![to_future(17),
to_future(170), to_future(19),
to_future(190)] ));
assert!(rx.next().await.expect("consumption_done_reporter() wasn't called"), "consumption_done_reporter yielded the wrong value");
assert_metrics::<INSTRUMENTS>
(cloned_executor, 4 - expected_timeout_count, expected_timeout_count, 0);
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_fallibles_executor_with_logs_and_metrics() {
assert_spawn_fallibles_executor::<{Instruments::LogsWithMetrics.into()}>(StreamExecutor::new("executor with logs & metrics")).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_fallibles_executor_with_metrics_and_no_logs() {
assert_spawn_fallibles_executor::<{Instruments::MetricsWithoutLogs.into()}>(StreamExecutor::new("executor with metrics & NO logs")).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_fallibles_executor_with_logs_and_no_metrics() {
assert_spawn_fallibles_executor::<{Instruments::LogsWithoutMetrics.into()}>(StreamExecutor::new("executor with logs & NO metrics")).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_fallibles_executor_with_no_logs_and_no_metrics() {
assert_spawn_fallibles_executor::<{Instruments::NoInstruments.into()}>(StreamExecutor::new("executor with NO logs & NO metrics")).await;
}
async fn assert_spawn_fallibles_executor<const INSTRUMENTS: usize>
(executor: Arc<StreamExecutor<INSTRUMENTS>>) {
let error_count = Arc::new(AtomicU32::new(0));
let error_count_ref = Arc::clone(&error_count);
let (mut tx, mut rx) = mpsc::channel::<bool>(10);
let cloned_executor = Arc::clone(&executor);
executor.spawn_fallibles_executor::<_, _>
(1,
move |_err| {
error_count_ref.fetch_add(1, Relaxed);
},
move |_| async move {
tx.send(true).await.unwrap()
},
stream::iter(vec![Ok(17), Ok(19), Err(Box::from(String::from("Error on 20th")))]) );
assert!(rx.next().await.expect("consumption_done_reporter() wasn't called"), "consumption_done_reporter yielded the wrong value");
assert_eq!(error_count.load(Relaxed), 1, "Error count is wrong, as computed by the error callback");
assert_metrics::<INSTRUMENTS>
(cloned_executor, 2, 0, 1);
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_non_futures_non_fallibles_executor_with_logs_and_metrics() {
assert_spawn_non_futures_non_fallibles_executor::<{Instruments::LogsWithMetrics.into()}>(StreamExecutor::new("executor with logs & metrics")).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_non_futures_non_fallibles_executor_with_metrics_and_no_logs() {
assert_spawn_non_futures_non_fallibles_executor::<{Instruments::MetricsWithoutLogs.into()}>(StreamExecutor::new("executor with metrics & NO logs")).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_non_futures_non_fallibles_executor_with_logs_and_no_metrics() {
assert_spawn_non_futures_non_fallibles_executor::<{Instruments::LogsWithoutMetrics.into()}>(StreamExecutor::new("executor with logs & NO metrics")).await;
}
#[cfg_attr(not(doc),tokio::test)]
async fn spawn_non_futures_non_fallibles_executor_with_no_logs_and_no_metrics() {
assert_spawn_non_futures_non_fallibles_executor::<{Instruments::NoInstruments.into()}>(StreamExecutor::new("executor with NO logs & NO metrics")).await;
}
async fn assert_spawn_non_futures_non_fallibles_executor<const INSTRUMENTS: usize>
(executor: Arc<StreamExecutor<INSTRUMENTS>>) {
let (mut tx, mut rx) = mpsc::channel::<bool>(10);
let cloned_executor = Arc::clone(&executor);
executor.spawn_non_futures_non_fallibles_executor::<_, _>
(1,
move |_| async move {tx.send(true).await.unwrap()},
stream::iter(vec![17, 19]) );
assert!(rx.next().await.expect("consumption_done_reporter() wasn't called"), "consumption_done_reporter yielded the wrong value");
assert_metrics::<INSTRUMENTS>
(cloned_executor, 2, 0, 0);
}
fn assert_metrics<const INSTRUMENTS: usize>
(executor: Arc<StreamExecutor<INSTRUMENTS>>,
expected_ok_counter: u32,
expected_timed_out_counter: u32,
expected_failed_counter: u32) {
println!("### Stats assertions for Stream pipeline executor named '{}' (Logs? {}; Metrics? {}) ####",
executor.executor_name, Instruments::from(INSTRUMENTS).logging(), Instruments::from(INSTRUMENTS).metrics());
let creation_duration = executor.creation_time.elapsed();
let execution_start_delta_nanos = executor.execution_start_delta_nanos.load(Relaxed);
let execution_finish_delta_nanos = executor.execution_finish_delta_nanos.load(Relaxed);
let (ok_counter, ok_average) = executor.ok_events_avg_future_duration.lightweight_probe();
let (timed_out_counter, timed_out_average) = executor.timed_out_events_avg_future_duration.lightweight_probe();
let (failed_counter, failed_average) = executor.failed_events_avg_future_duration.lightweight_probe();
println!("Creation time: {:?} ago", creation_duration);
println!("Execution Start: {:?} after creation", Duration::from_nanos(execution_start_delta_nanos));
println!("Execution Finish: {:?} after creation", Duration::from_nanos(execution_finish_delta_nanos));
println!("OK elements count: {ok_counter}; OK elements average Future resolution time: {ok_average}s{}{}",
if Instruments::from(INSTRUMENTS).metrics() {""} else {" -- metrics are DISABLED"},
if Instruments::from(INSTRUMENTS).logging() {" -- verify these values against the \"executor closed\" message"} else {" -- logs are DISABLED"});
println!("TIMED OUT elements count: {timed_out_counter}; TIMED OUT elements average Future resolution time: {timed_out_average}s{}{}",
if Instruments::from(INSTRUMENTS).metrics() {""} else {" -- metrics are DISABLED"},
if Instruments::from(INSTRUMENTS).logging() {" -- verify these values against the \"executor closed\" message"} else {" -- logs are DISABLED"});
println!("FAILED elements count: {failed_counter}; FAILED elements average Future resolution time: {failed_average}s{}{}",
if Instruments::from(INSTRUMENTS).metrics() {""} else {" -- metrics are DISABLED"},
if Instruments::from(INSTRUMENTS).logging() {" -- verify these values against the \"executor closed\" message"} else {" -- logs are DISABLED"});
assert_ne!(execution_start_delta_nanos, u64::MAX, "'execution_start_delta_nanos' wasn't set");
assert_ne!(execution_finish_delta_nanos, u64::MAX, "'execution_finish_delta_nanos' wasn't set");
assert!(execution_finish_delta_nanos >= execution_start_delta_nanos, "INSTRUMENTATION ERROR: 'execution_start_delta_nanos' was set after 'execution_finish_delta_nanos'");
if Instruments::from(INSTRUMENTS).metrics() {
assert_eq!(ok_counter, expected_ok_counter, "OK elements counter doesn't match -- Metrics are ENABLED");
assert_eq!(timed_out_counter, expected_timed_out_counter, "TIMED OUT elements counter doesn't match -- Metrics are ENABLED");
assert_eq!(failed_counter, expected_failed_counter, "FAILED elements counter doesn't match -- Metrics are ENABLED");
} else {
assert_eq!(ok_counter, 0, "Metrics are DISABLED, so the reported OK elements should be ZERO");
assert_eq!(timed_out_counter, 0, "Metrics are DISABLED, so the reported TIMED OUT elements should be ZERO");
assert_eq!(failed_counter, 0, "Metrics are DISABLED, so the reported FAILED elements should be ZERO");
}
}
}
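/// The lifecycle states a [StreamExecutor] goes through.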
#[atomic_enum]
#[derive(PartialEq)]
pub enum ExecutorStatus {
NotStarted,
Running,
ScheduledToFinish,
ProgrammaticallyEnded,
StreamEnded,
}