use std::{collections::HashMap, ops::DerefMut, time::Duration};
use anyhow::{anyhow, Context};
use thiserror::Error;
use crate::agent::plugin::PluginInfo;
use crate::pipeline::error::PipelineError;
use crate::plugin::phases::PreStartAction;
use crate::plugin::{AlumetPluginStart, AlumetPostStart, ConfigTable, Plugin};
use crate::{
pipeline::{self, naming::PluginName},
plugin::{phases::PostStartAction, AlumetPreStart},
};
use super::plugin::PluginSet;
pub struct RunningAgent {
pub pipeline: pipeline::MeasurementPipeline,
pub initialized_plugins: Vec<Box<dyn Plugin>>,
}
pub struct Builder {
plugins: PluginSet,
pipeline_builder: pipeline::Builder,
callbacks: Callbacks,
}
struct Callbacks {
after_plugins_init: Box<dyn FnOnce(&mut Vec<Box<dyn Plugin>>)>,
after_plugins_start: Box<dyn FnOnce(&mut pipeline::Builder)>,
before_operation_begin: Box<dyn FnOnce(&mut pipeline::Builder)>,
after_operation_begin: Box<dyn FnOnce(&mut pipeline::MeasurementPipeline)>,
}
#[derive(Debug, Error)]
pub enum AgentShutdownError {
#[error("error detected while shutting down the pipeline")]
Pipeline(#[source] PipelineError),
#[error("error while shutting down plugin {0}")]
Plugin(String),
#[error("agent shutdown timeout expired")]
TimeoutExpired,
}
#[derive(Debug, Error)]
#[error("{} errors while shutting the agent down", errors.len())]
pub struct ShutdownError {
pub errors: Vec<AgentShutdownError>,
}
#[cfg(feature = "test")]
pub trait TestExpectations {
fn setup(self, builder: Builder) -> Builder;
}
impl Default for Callbacks {
fn default() -> Self {
Self {
after_plugins_init: Box::new(|_| ()),
after_plugins_start: Box::new(|_| ()),
before_operation_begin: Box::new(|_| ()),
after_operation_begin: Box::new(|_| ()),
}
}
}
impl Builder {
pub fn new(plugins: PluginSet) -> Self {
Self::from_pipeline(plugins, pipeline::Builder::new())
}
pub fn from_pipeline(plugins: PluginSet, pipeline_builder: pipeline::Builder) -> Self {
Self {
plugins,
pipeline_builder,
callbacks: Callbacks::default(),
}
}
pub fn after_plugins_init<F: FnOnce(&mut Vec<Box<dyn Plugin>>) + 'static>(mut self, f: F) -> Self {
self.callbacks.after_plugins_init = Box::new(f);
self
}
pub fn after_plugins_start<F: FnOnce(&mut pipeline::Builder) + 'static>(mut self, f: F) -> Self {
self.callbacks.after_plugins_start = Box::new(f);
self
}
pub fn before_operation_begin<F: FnOnce(&mut pipeline::Builder) + 'static>(mut self, f: F) -> Self {
self.callbacks.before_operation_begin = Box::new(f);
self
}
pub fn after_operation_begin<F: FnOnce(&mut pipeline::MeasurementPipeline) + 'static>(mut self, f: F) -> Self {
self.callbacks.after_operation_begin = Box::new(f);
self
}
pub fn build_and_start(self) -> anyhow::Result<RunningAgent> {
fn init_plugin(p: PluginInfo) -> anyhow::Result<Box<dyn Plugin>> {
let name = p.metadata.name;
let version = p.metadata.version;
let config = ConfigTable(p.config.unwrap_or_default());
log::debug!("Initializing plugin {name} v{version} with config {config:?}...");
let initialized = (p.metadata.init)(config)
.with_context(|| format!("plugin failed to initialize: {} v{}", name, version))?;
if (initialized.name(), initialized.version()) != (&name, &version) {
return Err(anyhow!("invalid plugin: metadata is '{name}' v{version} but the plugin's methods return '{name}' v{version}"));
}
Ok(initialized)
}
fn start_plugin(
p: &mut dyn Plugin,
pipeline_builder: &mut pipeline::Builder,
pre_start_actions: &mut Vec<(PluginName, Box<dyn PreStartAction>)>,
post_start_actions: &mut Vec<(PluginName, Box<dyn PostStartAction>)>,
) -> anyhow::Result<()> {
let name = p.name().to_owned();
let version = p.version().to_owned();
log::debug!("Starting plugin {name} v{version}...");
let mut ctx = AlumetPluginStart {
current_plugin: PluginName(name.clone()),
pipeline_builder,
pre_start_actions,
post_start_actions,
};
p.start(&mut ctx)
.with_context(|| format!("plugin failed to start: {name} v{version}"))
}
fn pre_pipeline_start(
p: &mut dyn Plugin,
pipeline_builder: &mut pipeline::Builder,
actions: &mut HashMap<PluginName, Vec<Box<dyn PreStartAction>>>,
) -> anyhow::Result<()> {
let name = p.name().to_owned();
let version = p.version().to_owned();
log::debug!("Running pre-pipeline-start hook for plugin {name} v{version}...");
let pname = PluginName(name.clone());
let mut ctx = AlumetPreStart {
current_plugin: pname.clone(),
pipeline_builder,
};
p.pre_pipeline_start(&mut ctx)
.with_context(|| format!("plugin pre_pipeline_start failed: {} v{}", p.name(), p.version()))?;
if let Some(actions) = actions.remove(&pname) {
for f in actions {
(f)(&mut ctx)
.with_context(|| format!("plugin post-pipeline-start action failed: {name} v{version}"))?;
}
}
Ok(())
}
fn post_pipeline_start(
p: &mut dyn Plugin,
pipeline: &mut pipeline::MeasurementPipeline,
actions: &mut HashMap<PluginName, Vec<Box<dyn PostStartAction>>>,
) -> anyhow::Result<()> {
let name = p.name().to_owned();
let version = p.version().to_owned();
log::debug!("Running post-pipeline-start hook for plugin {name} v{version}...");
let pname = PluginName(name.clone());
let mut ctx = AlumetPostStart {
current_plugin: pname.clone(),
pipeline,
};
p.post_pipeline_start(&mut ctx)
.with_context(|| format!("plugin post_pipeline_start method failed: {name} v{version}"))?;
if let Some(actions) = actions.remove(&pname) {
for f in actions {
(f)(&mut ctx)
.with_context(|| format!("plugin post-pipeline-start action failed: {name} v{version}"))?;
}
}
Ok(())
}
fn group_plugin_actions<BoxedAction>(
post_start_actions: Vec<(PluginName, BoxedAction)>,
n_plugins: usize,
) -> HashMap<PluginName, Vec<BoxedAction>> {
let mut res = HashMap::with_capacity(n_plugins);
for (plugin, action) in post_start_actions {
let plugin_actions: &mut Vec<_> = res.entry(plugin).or_default();
plugin_actions.push(action);
}
res
}
log::info!("Initializing the plugins...");
let (enabled_plugins, disabled_plugins): (Vec<PluginInfo>, Vec<PluginInfo>) = self.plugins.into_partition();
let initialized_plugins: anyhow::Result<Vec<Box<dyn Plugin>>> =
enabled_plugins.into_iter().map(init_plugin).collect();
let mut initialized_plugins = initialized_plugins?;
let n_plugins = initialized_plugins.len();
match n_plugins {
0 if disabled_plugins.is_empty() => log::warn!("No plugin has been initialized, there may be a problem with your agent implementation. Please check your builder."),
0 => log::warn!("No plugin has been initialized because they were all disabled in the config. Please check your configuration."),
1 => log::info!("1 plugin initialized."),
n => log::info!("{n} plugins initialized."),
};
(self.callbacks.after_plugins_init)(&mut initialized_plugins);
log::info!("Starting the plugins...");
let mut pipeline_builder = self.pipeline_builder;
let mut pre_start_actions = Vec::new();
let mut post_start_actions = Vec::new();
for plugin in initialized_plugins.iter_mut() {
start_plugin(
plugin.deref_mut(),
&mut pipeline_builder,
&mut pre_start_actions,
&mut post_start_actions,
)?;
}
print_stats(&mut pipeline_builder, &initialized_plugins, &disabled_plugins);
(self.callbacks.after_plugins_start)(&mut pipeline_builder);
log::info!("Running pre-pipeline-start hooks...");
let mut pre_actions_per_plugin = group_plugin_actions(pre_start_actions, n_plugins);
for plugin in initialized_plugins.iter_mut() {
pre_pipeline_start(plugin.deref_mut(), &mut pipeline_builder, &mut pre_actions_per_plugin)?;
}
(self.callbacks.before_operation_begin)(&mut pipeline_builder);
log::info!("Starting the measurement pipeline...");
let mut pipeline = pipeline_builder.build().context("Pipeline failed to build")?;
log::info!("🔥 ALUMET measurement pipeline has started.");
log::info!("Running post-pipeline-start hooks...");
let mut post_actions_per_plugin = group_plugin_actions(post_start_actions, n_plugins);
for plugin in initialized_plugins.iter_mut() {
post_pipeline_start(plugin.deref_mut(), &mut pipeline, &mut post_actions_per_plugin)?;
}
(self.callbacks.after_operation_begin)(&mut pipeline);
log::info!("🔥 ALUMET agent is ready.");
let agent = RunningAgent {
pipeline,
initialized_plugins,
};
Ok(agent)
}
#[cfg(feature = "test")]
pub fn with_expectations<E: TestExpectations>(self, expectations: E) -> Self {
expectations.setup(self)
}
#[cfg(feature = "test")]
pub fn pipeline(&mut self) -> &mut pipeline::Builder {
&mut self.pipeline_builder
}
}
impl RunningAgent {
pub fn wait_for_shutdown(self, timeout: Duration) -> Result<(), ShutdownError> {
use std::panic::{catch_unwind, AssertUnwindSafe};
let mut errors = Vec::new();
let timeout = Some(timeout).filter(|d| *d != Duration::MAX);
if let Err(e) = self.pipeline.wait_for_shutdown(timeout) {
match e {
pipeline::builder::ShutdownError::Pipeline(e) => errors.push(AgentShutdownError::Pipeline(e)),
pipeline::builder::ShutdownError::TimeoutExpired => errors.push(AgentShutdownError::TimeoutExpired),
}
}
log::info!("Stopping the plugins...");
for mut plugin in self.initialized_plugins {
let name = plugin.name().to_owned();
let version = plugin.version().to_owned();
log::info!("Stopping plugin {name} v{version}");
match catch_unwind(AssertUnwindSafe(move || {
plugin.stop()
})) {
Ok(Ok(())) => (),
Ok(Err(e)) => {
log::error!("Error while stopping plugin {name} v{version}. {e:?}");
errors.push(AgentShutdownError::Plugin(name));
}
Err(panic_payload) => {
log::error!(
"PANIC while stopping plugin {name} v{version}. There is probably a bug in the plugin!
Please check the implementation of stop (and drop if Drop is implemented for the plugin type)."
);
errors.push(AgentShutdownError::Plugin(name.clone()));
let dropped = catch_unwind(AssertUnwindSafe(move || {
drop(panic_payload);
}));
if let Err(panic2) = dropped {
log::error!(
"PANIC while dropping panic payload generated while stopping plugin {name} v{version}."
);
std::mem::forget(panic2);
}
}
}
}
log::info!("All plugins have stopped.");
if errors.is_empty() {
Ok(())
} else {
Err(ShutdownError { errors })
}
}
}
fn print_stats(
pipeline_builder: &mut pipeline::Builder,
enabled_plugins: &[Box<dyn Plugin>],
disabled_plugins: &[PluginInfo],
) {
macro_rules! pluralize {
($count:expr, $str:expr) => {
if $count > 1 {
concat!($str, "s")
} else {
$str
}
};
}
let enabled_list: String = enabled_plugins
.iter()
.map(|p| format!(" - {} v{}", p.name(), p.version()))
.collect::<Vec<_>>()
.join("\n");
let disabled_list: String = disabled_plugins
.iter()
.map(|p| format!(" - {} v {}", p.metadata.name, p.metadata.version))
.collect::<Vec<_>>()
.join("\n");
let n_enabled = enabled_plugins.len();
let n_disabled = disabled_plugins.len();
let enabled_str = pluralize!(n_enabled, "plugin");
let disabled_str = pluralize!(n_disabled, "plugin");
let metrics = &pipeline_builder.metrics;
let metric_list = if metrics.is_empty() {
String::from(" ∅")
} else {
let mut m = metrics
.iter()
.map(|(id, m)| (id, format!(" - {}: {} ({})", m.name, m.value_type, m.unit)))
.collect::<Vec<_>>();
m.sort_by_key(|(id, _)| id.0);
m.into_iter()
.map(|(_, metric_str)| metric_str)
.collect::<Vec<_>>()
.join("\n")
};
let stats = pipeline_builder.inspect().stats();
let n_sources = stats.sources;
let n_transforms = stats.transforms;
let n_outputs = stats.outputs;
let n_metric_listeners = stats.metric_listeners;
let source_str = pluralize!(n_sources, "source");
let transform_str = pluralize!(n_transforms, "transform");
let output_str = pluralize!(n_outputs, "output");
let metric_listener_str = pluralize!(n_metric_listeners, "metric listener");
let n_metrics = stats.metrics;
let str_metric = pluralize!(n_metrics, "metric");
let msg = indoc::formatdoc! {"
Plugin startup complete.
🧩 {n_enabled} {enabled_str} started:
{enabled_list}
⭕ {n_disabled} {disabled_str} disabled:
{disabled_list}
📏 {n_metrics} {str_metric} registered:
{metric_list}
📥 {n_sources} {source_str}, 🔀 {n_transforms} {transform_str} and 📝 {n_outputs} {output_str} registered.
🔔 {n_metric_listeners} {metric_listener_str} registered.
"
};
log::info!("{msg}");
}