obzenflow_runtime 0.1.2

Runtime services for ObzenFlow - execution and coordination business logic
Documentation
// SPDX-License-Identifier: MIT OR Apache-2.0
// SPDX-FileCopyrightText: 2025-2026 ObzenFlow Contributors
// https://obzenflow.dev

//! Builder for creating MetricsAggregator with proper FSM lifecycle
//!
//! This builder ensures the metrics aggregator is created and started correctly
//! according to the FSM architecture patterns, returning only a handle for control.

use super::{
    config::DefaultMetricsConfig,
    fsm::{MetricsAggregatorContext, MetricsAggregatorEvent, MetricsAggregatorState},
    inputs::MetricsInputs,
    supervisor::MetricsAggregatorSupervisor,
};
use crate::supervised_base::{
    BuilderError, ChannelBuilder, HandleBuilder, SelfSupervisedExt, StandardHandle,
    SupervisorBuilder, SupervisorTaskBuilder,
};
use obzenflow_core::{
    event::SystemEvent,
    journal::Journal,
    metrics::{MetricsExporter, StageMetadata},
    StageId,
};
use std::collections::HashMap;
use std::sync::Arc;

/// Builder for creating a metrics aggregator with proper FSM lifecycle
pub struct MetricsAggregatorBuilder {
    /// Metrics inputs containing stage and system journals
    inputs: MetricsInputs,

    /// System journal for reporting
    system_journal: Arc<dyn Journal<SystemEvent>>,

    /// Metrics exporter
    exporter: Arc<dyn MetricsExporter>,

    /// Stage metadata for display and categorization
    stage_metadata: HashMap<StageId, StageMetadata>,

    config: DefaultMetricsConfig,
    export_interval_secs: u64,
}

impl MetricsAggregatorBuilder {
    /// Create a new metrics aggregator builder with MetricsInputs
    pub fn new(
        inputs: MetricsInputs,
        system_journal: Arc<dyn Journal<SystemEvent>>,
        exporter: Arc<dyn MetricsExporter>,
    ) -> Self {
        Self {
            inputs,
            system_journal,
            exporter,
            stage_metadata: HashMap::new(),
            config: DefaultMetricsConfig::default(),
            export_interval_secs: 10, // Default to 10 seconds
        }
    }

    /// Set a custom configuration
    pub fn with_config(mut self, config: DefaultMetricsConfig) -> Self {
        self.config = config;
        self
    }

    /// Set the export interval in seconds
    pub fn with_export_interval(mut self, seconds: u64) -> Self {
        self.export_interval_secs = seconds;
        self
    }

    /// Set stage metadata for display and categorization
    pub fn with_stage_metadata(mut self, metadata: HashMap<StageId, StageMetadata>) -> Self {
        self.stage_metadata = metadata;
        self
    }
}

#[async_trait::async_trait]
impl SupervisorBuilder for MetricsAggregatorBuilder {
    type Handle = StandardHandle<MetricsAggregatorEvent, MetricsAggregatorState>;
    type Error = BuilderError;

    async fn build(self) -> Result<Self::Handle, Self::Error> {
        // Create system ID for metrics aggregator
        let system_id = obzenflow_core::id::SystemId::new();

        // Create metrics context with all mutable state
        let (metrics_context, metrics_io) = MetricsAggregatorContext::new(
            self.inputs.clone(),
            self.system_journal.clone(),
            Some(self.exporter),
            self.export_interval_secs,
            system_id,
            self.stage_metadata,
        )
        .await
        .map_err(BuilderError::Other)?;

        // Create channels for supervisor communication
        // Even though metrics runs autonomously, we still create channels
        // for consistency and potential future use
        let (event_sender, _event_receiver, state_watcher) =
            ChannelBuilder::<MetricsAggregatorEvent, MetricsAggregatorState>::new()
                .with_event_buffer(10) // Small buffer, rarely used
                .build(MetricsAggregatorState::Initializing);

        // Create supervisor (private struct)
        let supervisor = MetricsAggregatorSupervisor {
            name: "metrics_aggregator".to_string(),
            system_journal: self.system_journal.clone(),
            system_id,
            data_subscription: Some(metrics_io.data_subscription),
            error_subscription: metrics_io.error_subscription,
            system_subscription: Some(metrics_io.system_subscription),
            export_timer: None,
            state_watcher: state_watcher.clone(),
            last_state: Some(MetricsAggregatorState::Initializing),
        };

        // Spawn the supervisor task
        let supervisor_task = SupervisorTaskBuilder::<MetricsAggregatorSupervisor>::new(
            "metrics_aggregator",
        )
        .spawn(move || async move {
            // Run the supervisor directly. Metrics does not use external events.
            SelfSupervisedExt::run(
                supervisor,
                MetricsAggregatorState::Initializing,
                metrics_context,
            )
            .await
        });

        // Build and return the standard handle
        HandleBuilder::new()
            .with_event_sender(event_sender)
            .with_state_watcher(state_watcher)
            .with_supervisor_task(supervisor_task)
            .build_standard()
            .map_err(|e| BuilderError::Other(e.to_string()))
    }
}