use std::{cell::RefCell, ops::Deref, rc::Rc, time::Duration};
use fxhash::FxHashMap;
use tokio::sync::RwLockReadGuard;
use wrapped_output::{OutputDone, SetOutputOutputCheck, WrappedOutput};
use wrapped_source::{SetSourceCheck, SourceDone, WrappedManagedSource};
use wrapped_transform::{SetTransformOutputCheck, TransformDone, WrappedTransform};
use crate::{
agent::builder::TestExpectations,
measurement::MeasurementBuffer,
metrics::registry::MetricRegistry,
pipeline::{
control::{
request::{self, any::AnyAnonymousControlRequest},
AnonymousControlHandle,
},
elements::{
output::builder::{BlockingOutputBuilder, OutputBuilder},
source::{
builder::{ManagedSourceBuilder, SourceBuilder},
trigger,
},
transform::builder::TransformBuilder,
},
matching::{OutputNamePattern, SourceNamePattern, TransformNamePattern},
naming::{OutputName, PluginName, SourceName, TransformName},
Output,
},
};
mod pretty;
mod wrapped_output;
mod wrapped_source;
mod wrapped_transform;
/// A set of checks to run against the sources, transforms and outputs of a running pipeline.
///
/// Checks are registered per element name with the `test_source`, `test_transform` and
/// `test_output` builder methods, and are installed on an agent by the
/// `TestExpectations::setup` implementation.
///
/// Note: the derived `Default` leaves `auto_shutdown` set to `false`, whereas
/// `RuntimeExpectations::new` sets it to `true`.
#[derive(Default)]
pub struct RuntimeExpectations {
/// Whether the test task requests a pipeline shutdown once all the checks have been processed.
auto_shutdown: bool,
/// Checks to run for each source, in registration order.
sources: FxHashMap<SourceName, Vec<SourceCheck>>,
/// Checks to run for each transform, in registration order.
transforms: FxHashMap<TransformName, Vec<TransformCheck>>,
/// Checks to run for each output, in registration order.
outputs: FxHashMap<OutputName, Vec<OutputCheck>>,
}
/// Name of the autonomous source that `setup` adds in order to inject test measurements into the pipeline.
pub(super) const TESTER_SOURCE_NAME: &str = "_tester";
/// Name of the no-op output that `setup` adds and enables before running the checks.
pub(super) const TESTER_OUTPUT_NAME: &str = "_keep_alive";
/// Name of the plugin under which the tester source and the tester output are registered.
pub(super) const TESTER_PLUGIN_NAME: &str = "_test_runtime_expectations";
/// Timeout passed along with every control request sent by the test task.
const CONTROL_TIMEOUT: Duration = Duration::from_millis(100);
/// Map of test controllers indexed by element name.
///
/// It is reference-counted and interior-mutable (`Rc<RefCell<_>>`) because it is filled by the
/// wrapped element builders and later taken by the `after_operation_begin` hook.
type TestControllerMap<N, C> = Rc<RefCell<FxHashMap<N, C>>>;
/// Test-side handle used to drive one wrapped managed source.
struct SourceTestController {
/// Checks to run on the source, in order.
checks: Vec<SourceCheck>,
/// Sends the next check to the wrapped source.
set_tx: tokio::sync::mpsc::Sender<SetSourceCheck>,
/// Receives a `SourceDone` message from the wrapped source; the test task waits for one
/// message after each trigger. The exact meaning of the message is defined in
/// `wrapped_source` (not visible here).
done_rx: tokio::sync::mpsc::Receiver<SourceDone>,
}
/// Test-side handle used to drive one wrapped transform.
struct TransformTestController {
/// Checks to run on the transform, in order.
checks: Vec<TransformCheck>,
/// Sends the output check of the next test to the wrapped transform.
set_tx: tokio::sync::mpsc::Sender<SetTransformOutputCheck>,
/// Receives a `TransformDone` message from the wrapped transform; the test task waits for one
/// message after each injected input. The exact meaning of the message is defined in
/// `wrapped_transform` (not visible here).
done_rx: tokio::sync::mpsc::Receiver<TransformDone>,
}
/// Test-side handle used to drive one wrapped blocking output.
struct OutputTestController {
/// Checks to run on the output, in order.
checks: Vec<OutputCheck>,
/// Sends the output check of the next test to the wrapped output.
set_tx: tokio::sync::mpsc::Sender<SetOutputOutputCheck>,
/// Receives an `OutputDone` message from the wrapped output; the test task waits for one
/// message after each injected input. The exact meaning of the message is defined in
/// `wrapped_output` (not visible here).
done_rx: tokio::sync::mpsc::Receiver<OutputDone>,
}
impl TestExpectations for RuntimeExpectations {
    /// Installs the runtime checks on the agent described by `builder` and returns the builder.
    ///
    /// Two hooks are registered on the builder:
    ///
    /// - **Before the pipeline starts**: every managed source and every transform is wrapped
    ///   (even when it has no check), and every blocking output that has at least one check is
    ///   wrapped. Autonomous sources, async outputs and outputs without checks are left
    ///   untouched. A test-controlled autonomous source ([`TESTER_SOURCE_NAME`]) and a no-op
    ///   blocking output ([`TESTER_OUTPUT_NAME`]) are added to the pipeline.
    /// - **After the pipeline has started**: a task is spawned on the pipeline's async runtime.
    ///   It disables all the elements (except the no-op output), then enables and exercises the
    ///   wrapped sources, transforms and outputs one at a time, and finally requests a shutdown
    ///   if `auto_shutdown` is set.
    ///
    /// The simplified pipeline is also disallowed on the builder.
    ///
    /// # Panics
    ///
    /// The hooks panic if the tester source or output cannot be added to the pipeline.
    /// The spawned task panics if a control request fails or if a channel to a wrapped
    /// element is closed while a message is being sent to it.
    fn setup(mut self, mut builder: crate::agent::Builder) -> crate::agent::Builder {
        // The controllers are inserted in these maps when the pipeline calls the wrapped
        // builders, and are taken out by the `after_operation_begin` hook.
        let source_tests: TestControllerMap<SourceName, SourceTestController> =
            Rc::new(RefCell::new(FxHashMap::default()));
        let transform_tests: TestControllerMap<TransformName, TransformTestController> =
            Rc::new(RefCell::new(FxHashMap::default()));
        let output_tests: TestControllerMap<OutputName, OutputTestController> =
            Rc::new(RefCell::new(FxHashMap::default()));

        // Channel used by the test task to push measurements into the tester source.
        let (tester_tx, mut tester_rx) = tokio::sync::mpsc::channel(2);

        /// Wraps a managed source builder so that the built source is triggered manually
        /// and reports to a [`SourceTestController`] registered in `controllers`.
        fn wrap_managed_source_builder(
            name: SourceName,
            checks: Vec<SourceCheck>,
            builder: Box<dyn ManagedSourceBuilder>,
            controllers: TestControllerMap<SourceName, SourceTestController>,
        ) -> Box<dyn ManagedSourceBuilder> {
            Box::new(move |ctx| {
                let mut source = builder(ctx)?;

                // Replace the trigger so that the source only runs when the test task asks for it.
                source.trigger_spec = trigger::builder::manual()
                    .flush_rounds(1)
                    .update_rounds(1)
                    .build()?;
                log::trace!("trigger of {name} replaced by: {:?}", source.trigger_spec);

                let (set_tx, set_rx) = tokio::sync::mpsc::channel(1);
                let (done_tx, done_rx) = tokio::sync::mpsc::channel(1);
                controllers.borrow_mut().insert(
                    name,
                    SourceTestController {
                        checks,
                        set_tx,
                        done_rx,
                    },
                );

                source.source = Box::new(WrappedManagedSource {
                    source: source.source,
                    in_rx: set_rx,
                    out_tx: done_tx,
                });
                Ok(source)
            })
        }

        /// Wraps a transform builder so that the built transform reports to a
        /// [`TransformTestController`] registered in `controllers`.
        fn wrap_transform_builder(
            name: TransformName,
            checks: Vec<TransformCheck>,
            builder: Box<dyn TransformBuilder>,
            controllers: TestControllerMap<TransformName, TransformTestController>,
        ) -> Box<dyn TransformBuilder> {
            Box::new(move |ctx| {
                let transform = builder(ctx)?;

                let (set_tx, set_rx) = tokio::sync::mpsc::channel(1);
                let (done_tx, done_rx) = tokio::sync::mpsc::channel(1);
                controllers.borrow_mut().insert(
                    name,
                    TransformTestController {
                        checks,
                        set_tx,
                        done_rx,
                    },
                );

                let transform = Box::new(WrappedTransform {
                    transform,
                    set_rx,
                    done_tx,
                });
                Ok(transform)
            })
        }

        /// Wraps a blocking output builder so that the built output reports to an
        /// [`OutputTestController`] registered in `controllers`.
        fn wrap_blocking_output_builder(
            name: OutputName,
            checks: Vec<OutputCheck>,
            builder: Box<dyn BlockingOutputBuilder>,
            controllers: TestControllerMap<OutputName, OutputTestController>,
        ) -> Box<dyn BlockingOutputBuilder> {
            Box::new(move |ctx| {
                let output = builder(ctx)?;

                let (set_tx, set_rx) = tokio::sync::mpsc::channel(1);
                let (done_tx, done_rx) = tokio::sync::mpsc::channel(1);
                controllers.borrow_mut().insert(
                    name,
                    OutputTestController {
                        checks,
                        set_tx,
                        done_rx,
                    },
                );

                let output = Box::new(WrappedOutput {
                    output,
                    set_rx,
                    done_tx,
                });
                Ok(output)
            })
        }

        // Handles moved into the first hook; the originals are kept for the second hook.
        let source_tests_before = source_tests.clone();
        let transform_tests_before = transform_tests.clone();
        let output_tests_before = output_tests.clone();

        builder = builder.before_operation_begin(move |pipeline| {
            // Wrap the managed sources. Autonomous sources cannot be triggered and are kept as is.
            pipeline.replace_sources(|name, builder| {
                log::debug!("preparing {name} for testing");
                let checks = self.sources.remove(&name).unwrap_or_default();
                match builder {
                    SourceBuilder::Managed(builder) => {
                        let wrapped = wrap_managed_source_builder(
                            name,
                            checks,
                            builder,
                            source_tests_before.clone(),
                        );
                        SourceBuilder::Managed(wrapped)
                    },
                    a @ SourceBuilder::Autonomous(_) => a,
                }
            });

            // Add a source that forwards whatever the test task sends through `tester_tx`.
            log::debug!("adding test-controlled source {TESTER_SOURCE_NAME}");
            pipeline
                .add_source_builder(
                    PluginName(TESTER_PLUGIN_NAME.to_owned()),
                    TESTER_SOURCE_NAME,
                    SourceBuilder::Autonomous(Box::new(|_ctx, cancel, tx| {
                        Ok(Box::pin(async move {
                            loop {
                                tokio::select! {
                                    // Check the cancellation first.
                                    biased;
                                    _ = cancel.cancelled() => {
                                        log::debug!("{TESTER_SOURCE_NAME} has been cancelled");
                                        break;
                                    },
                                    m = tester_rx.recv() => {
                                        if let Some(measurements) = m {
                                            log::debug!("{TESTER_SOURCE_NAME} sends new measurements: {measurements:?}");
                                            tx.send(measurements)
                                                .await
                                                .expect("the tester source failed to send measurements to the pipeline");
                                        } else {
                                            log::debug!("{TESTER_SOURCE_NAME} channel sender has been closed");
                                            break;
                                        }
                                    }
                                }
                            }
                            Ok(())
                        }))
                    })),
                )
                .expect("failed to add the tester source to the pipeline");

            // Wrap every transform, with or without checks.
            pipeline.replace_transforms(|name, builder| {
                log::debug!("preparing {name} for testing");
                let checks = self.transforms.remove(&name).unwrap_or_default();
                wrap_transform_builder(name, checks, builder, transform_tests_before.clone())
            });

            // Wrap the blocking outputs that have checks; leave the others untouched.
            pipeline.replace_outputs(|name, builder| {
                log::debug!("preparing {name} for testing");
                match self.outputs.remove(&name) {
                    Some(checks) => match builder {
                        OutputBuilder::Blocking(b) => OutputBuilder::Blocking(wrap_blocking_output_builder(
                            name,
                            checks,
                            b,
                            output_tests_before.clone(),
                        )),
                        a @ OutputBuilder::Async(_) => a,
                    },
                    None => builder,
                }
            });

            // Add an output that accepts everything and does nothing.
            log::debug!("adding test-controlled output {TESTER_OUTPUT_NAME}");
            pipeline
                .add_output_builder(
                    PluginName(TESTER_PLUGIN_NAME.to_owned()),
                    TESTER_OUTPUT_NAME,
                    OutputBuilder::Blocking(Box::new(|_ctx| {
                        use crate::pipeline::elements::error::WriteError;
                        use crate::pipeline::elements::output::OutputContext;

                        struct DummyOutput;
                        impl Output for DummyOutput {
                            fn write(
                                &mut self,
                                _measurements: &MeasurementBuffer,
                                _ctx: &OutputContext,
                            ) -> Result<(), WriteError> {
                                Ok(())
                            }
                        }
                        Ok(Box::new(DummyOutput))
                    })),
                )
                .expect("failed to add the tester output to the pipeline");
        });

        *builder.pipeline().allow_simplified_pipeline() = false;

        builder.after_operation_begin(move |pipeline| {
            let control = pipeline.control_handle();
            let mr = pipeline.metrics_reader().clone();

            // Take the controllers out of the shared maps: the `Rc`s must not end up in the
            // spawned task.
            let source_tests = source_tests.take();
            let transform_tests = transform_tests.take();
            let output_tests = output_tests.take();

            log::debug!(
                "source_tests: {}",
                source_tests
                    .keys()
                    .map(|n| n.to_string())
                    .collect::<Vec<_>>()
                    .join(", ")
            );
            log::debug!(
                "transform_tests: {}",
                transform_tests
                    .keys()
                    .map(|n| n.to_string())
                    .collect::<Vec<_>>()
                    .join(", ")
            );
            log::debug!(
                "output_tests: {}",
                output_tests
                    .keys()
                    .map(|n| n.to_string())
                    .collect::<Vec<_>>()
                    .join(", ")
            );

            /// Sends the requests one after the other, waiting for each one to complete.
            async fn send_requests(control: &AnonymousControlHandle, requests: Vec<AnyAnonymousControlRequest>) {
                for r in requests {
                    control
                        .send_wait(r, CONTROL_TIMEOUT)
                        .await
                        .expect("control request failed");
                }
            }

            let task = async move {
                // Start from a quiet pipeline: everything disabled except the no-op output.
                let requests: Vec<AnyAnonymousControlRequest> = vec![
                    request::source(SourceNamePattern::wildcard()).disable().into(),
                    request::transform(TransformNamePattern::wildcard()).disable().into(),
                    request::output(OutputNamePattern::wildcard()).disable().into(),
                    request::output(OutputNamePattern::exact(TESTER_PLUGIN_NAME, TESTER_OUTPUT_NAME))
                        .enable()
                        .into(),
                ];
                send_requests(&control, requests).await;

                // Sources: enable, run each check by triggering the source, disable.
                for (name, controller) in source_tests {
                    let SourceTestController {
                        checks,
                        set_tx,
                        mut done_rx,
                    } = controller;
                    if !checks.is_empty() {
                        log::debug!("Checking {name}...");
                    }

                    control
                        .send_wait(request::source(name.clone()).enable(), CONTROL_TIMEOUT)
                        .await
                        .expect("failed to enable the source under test");

                    for check in checks {
                        set_tx
                            .send(SetSourceCheck(check))
                            .await
                            .expect("failed to send the check to the wrapped source");

                        let trigger = request::source(name.clone()).trigger_now();
                        control
                            .dispatch(trigger, CONTROL_TIMEOUT)
                            .await
                            .expect("failed to trigger the source under test");

                        if done_rx.recv().await.is_none() {
                            log::warn!("done_tx has been dropped");
                            break;
                        }
                    }

                    control
                        .send_wait(request::source(name).disable(), CONTROL_TIMEOUT)
                        .await
                        .expect("failed to disable the source under test");
                }

                // Transforms: enable, feed each check's input through the tester source, disable.
                for (name, controller) in transform_tests {
                    let TransformTestController {
                        checks,
                        set_tx,
                        mut done_rx,
                    } = controller;
                    if !checks.is_empty() {
                        log::debug!("Checking {name}...");
                    }

                    control
                        .send_wait(request::transform(name.clone()).enable(), CONTROL_TIMEOUT)
                        .await
                        .expect("failed to enable the transform under test");

                    for check in checks {
                        set_tx
                            .send(SetTransformOutputCheck(check.check_output))
                            .await
                            .expect("failed to send the check to the wrapped transform");

                        let lock = mr.read().await;
                        let mut ctx = TransformCheckInputContext { metrics: lock };
                        let test_data = (check.make_input)(&mut ctx);
                        // Release the registry lock before waiting on the pipeline: the guard is
                        // only needed to build the input.
                        drop(ctx);

                        tester_tx
                            .send(test_data)
                            .await
                            .expect("failed to send the test input to the tester source");

                        if done_rx.recv().await.is_none() {
                            log::warn!("done_tx has been dropped");
                            break;
                        }
                    }

                    control
                        .send_wait(request::transform(name).disable(), CONTROL_TIMEOUT)
                        .await
                        .expect("failed to disable the transform under test");
                }

                // Outputs: enable (discarding pending data), feed each check's input, disable.
                for (name, controller) in output_tests {
                    let OutputTestController {
                        checks,
                        set_tx,
                        mut done_rx,
                    } = controller;
                    if !checks.is_empty() {
                        log::debug!("Checking {name}...");
                    }

                    control
                        .send_wait(request::output(name.clone()).enable_discard(), CONTROL_TIMEOUT)
                        .await
                        .expect("failed to enable the output under test");

                    for check in checks {
                        set_tx
                            .send(SetOutputOutputCheck(check.check_output))
                            .await
                            .expect("failed to send the check to the wrapped output");

                        let lock = mr.read().await;
                        let mut ctx = OutputCheckInputContext { metrics: lock };
                        let test_data = (check.make_input)(&mut ctx);
                        // Release the registry lock before waiting on the pipeline: the guard is
                        // only needed to build the input.
                        drop(ctx);

                        tester_tx
                            .send(test_data)
                            .await
                            .expect("failed to send the test input to the tester source");

                        if done_rx.recv().await.is_none() {
                            log::warn!("done_tx has been dropped");
                            break;
                        }
                    }

                    control
                        .send_wait(request::output(name).disable(), CONTROL_TIMEOUT)
                        .await
                        .expect("failed to disable the output under test");
                }

                if self.auto_shutdown {
                    log::debug!("Requesting shutdown...");
                    control.shutdown();
                } else {
                    log::debug!("Not requesting shutdown. Do you shutdown the pipeline yourself?");
                }
            };
            pipeline.async_runtime().spawn(task);
        })
    }
}
impl RuntimeExpectations {
    /// Creates an empty set of expectations, with automatic shutdown enabled.
    pub fn new() -> Self {
        Self {
            auto_shutdown: true,
            ..Default::default()
        }
    }

    /// Chooses whether a pipeline shutdown is requested after all the checks have been processed.
    ///
    /// When set to `false`, no shutdown is requested by the test task.
    pub fn auto_shutdown(mut self, auto_shutdown: bool) -> Self {
        self.auto_shutdown = auto_shutdown;
        self
    }

    /// Registers a check for the source named `source`.
    ///
    /// Several checks can be registered for the same source; they are kept in registration order.
    ///
    /// - `make_input` takes no argument and returns nothing.
    /// - `check_output` receives a reference to a `MeasurementBuffer`.
    ///
    /// Both closures are handed over to the wrapped source; when exactly they are called is
    /// decided by the `wrapped_source` module (not visible here).
    pub fn test_source<Fi, Fo>(mut self, source: SourceName, make_input: Fi, check_output: Fo) -> Self
    where
        Fi: Fn() + Send + 'static,
        Fo: Fn(&MeasurementBuffer) + Send + 'static,
    {
        // `source` is already owned: it can be used as the key directly.
        self.sources.entry(source).or_default().push(SourceCheck {
            make_input: Box::new(make_input),
            check_output: Box::new(check_output),
        });
        self
    }

    /// Registers a check for the transform named `transform`.
    ///
    /// Several checks can be registered for the same transform; they are kept in registration order.
    ///
    /// - `make_input` builds the measurements that are injected into the pipeline for this check.
    ///   It receives a context giving access to the metric registry.
    /// - `check_output` receives a reference to a `MeasurementBuffer`. It is handed over to the
    ///   wrapped transform; when exactly it is called is decided by the `wrapped_transform`
    ///   module (not visible here).
    pub fn test_transform<Fi, Fo>(mut self, transform: TransformName, make_input: Fi, check_output: Fo) -> Self
    where
        Fi: Fn(&mut TransformCheckInputContext) -> MeasurementBuffer + Send + 'static,
        Fo: Fn(&MeasurementBuffer) + Send + 'static,
    {
        // `transform` is already owned: it can be used as the key directly.
        self.transforms.entry(transform).or_default().push(TransformCheck {
            make_input: Box::new(make_input),
            check_output: Box::new(check_output),
        });
        self
    }

    /// Registers a check for the output named `output`.
    ///
    /// Several checks can be registered for the same output; they are kept in registration order.
    ///
    /// - `make_input` builds the measurements that are injected into the pipeline for this check.
    ///   It receives a context giving access to the metric registry.
    /// - `check_output` takes no argument. It is handed over to the wrapped output; when exactly
    ///   it is called is decided by the `wrapped_output` module (not visible here).
    pub fn test_output<Fi, Fo>(mut self, output: OutputName, make_input: Fi, check_output: Fo) -> Self
    where
        Fi: Fn(&mut OutputCheckInputContext) -> MeasurementBuffer + Send + 'static,
        Fo: Fn() + Send + 'static,
    {
        // `output` is already owned: it can be used as the key directly.
        self.outputs.entry(output).or_default().push(OutputCheck {
            make_input: Box::new(make_input),
            check_output: Box::new(check_output),
        });
        self
    }
}
/// One check to run on a source, made of two closures registered with `test_source`.
///
/// The whole check is sent to the wrapped source, which decides when to call the closures
/// (see the `wrapped_source` module, not visible here).
pub struct SourceCheck {
/// Closure without argument nor result — presumably prepares the environment that the
/// source will measure; confirm against `wrapped_source`.
make_input: Box<dyn Fn() + Send>,
/// Closure that receives a reference to a `MeasurementBuffer` — presumably the output of
/// the source; confirm against `wrapped_source`.
check_output: Box<dyn Fn(&MeasurementBuffer) + Send>,
}
/// One check to run on a transform, made of two closures registered with `test_transform`.
pub struct TransformCheck {
/// Builds the measurements that the test task injects into the pipeline through the tester source.
make_input: Box<dyn Fn(&mut TransformCheckInputContext) -> MeasurementBuffer + Send>,
/// Closure that receives a reference to a `MeasurementBuffer`. It is sent to the wrapped
/// transform before the input is injected — presumably called on the transform's output;
/// confirm against `wrapped_transform`.
check_output: Box<dyn Fn(&MeasurementBuffer) + Send>,
}
/// One check to run on an output, made of two closures registered with `test_output`.
pub struct OutputCheck {
/// Builds the measurements that the test task injects into the pipeline through the tester source.
make_input: Box<dyn Fn(&mut OutputCheckInputContext) -> MeasurementBuffer + Send>,
/// Closure without argument. It is sent to the wrapped output before the input is injected —
/// presumably called after the output has written the measurements; confirm against
/// `wrapped_output`.
check_output: Box<dyn Fn() + Send>,
}
/// Context passed to the `make_input` closure of a transform check.
///
/// It holds a read guard on the metric registry for as long as it is alive.
pub struct TransformCheckInputContext<'a> {
    metrics: RwLockReadGuard<'a, MetricRegistry>,
}

impl TransformCheckInputContext<'_> {
    /// Returns the metric registry protected by the guard held in this context.
    ///
    /// The returned reference only borrows `self` for as long as it is used; it is not tied
    /// to the (possibly longer) lifetime of the underlying lock guard.
    pub fn metrics(&self) -> &MetricRegistry {
        self.metrics.deref()
    }
}
/// Context passed to the `make_input` closure of an output check.
///
/// It holds a read guard on the metric registry for as long as it is alive.
pub struct OutputCheckInputContext<'a> {
    metrics: RwLockReadGuard<'a, MetricRegistry>,
}

impl OutputCheckInputContext<'_> {
    /// Returns the metric registry protected by the guard held in this context.
    ///
    /// The returned reference only borrows `self` for as long as it is used; it is not tied
    /// to the (possibly longer) lifetime of the underlying lock guard.
    pub fn metrics(&self) -> &MetricRegistry {
        self.metrics.deref()
    }
}