use super::{
config::DefaultMetricsConfig,
fsm::{MetricsAggregatorContext, MetricsAggregatorEvent, MetricsAggregatorState},
inputs::MetricsInputs,
supervisor::MetricsAggregatorSupervisor,
};
use crate::supervised_base::{
BuilderError, ChannelBuilder, HandleBuilder, SelfSupervisedExt, StandardHandle,
SupervisorBuilder, SupervisorTaskBuilder,
};
use obzenflow_core::{
event::SystemEvent,
journal::Journal,
metrics::{MetricsExporter, StageMetadata},
StageId,
};
use std::collections::HashMap;
use std::sync::Arc;
pub struct MetricsAggregatorBuilder {
inputs: MetricsInputs,
system_journal: Arc<dyn Journal<SystemEvent>>,
exporter: Arc<dyn MetricsExporter>,
stage_metadata: HashMap<StageId, StageMetadata>,
config: DefaultMetricsConfig,
export_interval_secs: u64,
}
impl MetricsAggregatorBuilder {
pub fn new(
inputs: MetricsInputs,
system_journal: Arc<dyn Journal<SystemEvent>>,
exporter: Arc<dyn MetricsExporter>,
) -> Self {
Self {
inputs,
system_journal,
exporter,
stage_metadata: HashMap::new(),
config: DefaultMetricsConfig::default(),
export_interval_secs: 10, }
}
pub fn with_config(mut self, config: DefaultMetricsConfig) -> Self {
self.config = config;
self
}
pub fn with_export_interval(mut self, seconds: u64) -> Self {
self.export_interval_secs = seconds;
self
}
pub fn with_stage_metadata(mut self, metadata: HashMap<StageId, StageMetadata>) -> Self {
self.stage_metadata = metadata;
self
}
}
#[async_trait::async_trait]
impl SupervisorBuilder for MetricsAggregatorBuilder {
type Handle = StandardHandle<MetricsAggregatorEvent, MetricsAggregatorState>;
type Error = BuilderError;
async fn build(self) -> Result<Self::Handle, Self::Error> {
let system_id = obzenflow_core::id::SystemId::new();
let (metrics_context, metrics_io) = MetricsAggregatorContext::new(
self.inputs.clone(),
self.system_journal.clone(),
Some(self.exporter),
self.export_interval_secs,
system_id,
self.stage_metadata,
)
.await
.map_err(BuilderError::Other)?;
let (event_sender, _event_receiver, state_watcher) =
ChannelBuilder::<MetricsAggregatorEvent, MetricsAggregatorState>::new()
.with_event_buffer(10) .build(MetricsAggregatorState::Initializing);
let supervisor = MetricsAggregatorSupervisor {
name: "metrics_aggregator".to_string(),
system_journal: self.system_journal.clone(),
system_id,
data_subscription: Some(metrics_io.data_subscription),
error_subscription: metrics_io.error_subscription,
system_subscription: Some(metrics_io.system_subscription),
export_timer: None,
state_watcher: state_watcher.clone(),
last_state: Some(MetricsAggregatorState::Initializing),
};
let supervisor_task = SupervisorTaskBuilder::<MetricsAggregatorSupervisor>::new(
"metrics_aggregator",
)
.spawn(move || async move {
SelfSupervisedExt::run(
supervisor,
MetricsAggregatorState::Initializing,
metrics_context,
)
.await
});
HandleBuilder::new()
.with_event_sender(event_sender)
.with_state_watcher(state_watcher)
.with_supervisor_task(supervisor_task)
.build_standard()
.map_err(|e| BuilderError::Other(e.to_string()))
}
}