use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use anyhow::Context;
use tokio::sync::{broadcast, mpsc};
use crate::{
measurement::MeasurementBuffer,
metrics::online::MetricReader,
pipeline::{error::PipelineError, naming::TransformName},
};
use super::{error::TransformError, Transform, TransformContext};
pub async fn run_all_in_order(
mut transforms: Vec<(TransformName, Box<dyn Transform>)>,
mut rx: mpsc::Receiver<MeasurementBuffer>,
tx: broadcast::Sender<MeasurementBuffer>,
active_flags: Arc<AtomicU64>,
metrics_reader: MetricReader,
) -> Result<(), PipelineError> {
log::trace!(
"Running transforms: {}",
transforms
.iter()
.map(|(name, _)| name.to_string())
.collect::<Vec<_>>()
.join(", ")
);
loop {
if let Some(mut measurements) = rx.recv().await {
let current_flags = active_flags.load(Ordering::Relaxed);
log::trace!("current 'enabled' bitset: {current_flags}");
let metrics = &metrics_reader.read().await;
let ctx = TransformContext { metrics };
for (i, (name, t)) in &mut transforms.iter_mut().enumerate() {
let t_flag = 1 << i;
if current_flags & t_flag != 0 {
match t.apply(&mut measurements, &ctx) {
Ok(()) => (),
Err(TransformError::UnexpectedInput(e)) => {
log::error!("Transform {name} received unexpected measurements: {e:#}");
}
Err(TransformError::Fatal(e)) => {
log::error!("Fatal error in transform {name} (this breaks the transform task!): {e:?}");
return Err(PipelineError::for_element(name.to_owned(), e));
}
}
}
}
tx.send(measurements)
.context("could not send the measurements from transforms to the outputs")?;
} else {
log::debug!("The channel connected to the transform step has been closed, the transforms will stop.");
break;
}
}
Ok(())
}