Skip to main content

loa_core/core/metrics/
mod.rs

//! Metrics Supervisor - manages metrics collection workers.
//!
//! This supervisor:
//! - Requests an `Arc<dyn Storage>` from the Database actor during `post_start`
//! - Spawns the SystemCollector (always enabled)
//! - Spawns the NativeSink for cloud metrics export when it receives `SpawnNativeSink`
7
8pub mod downsample;
9pub mod native_sink;
10pub mod system;
11
12use ractor::{async_trait, registry, Actor, ActorCell, ActorProcessingErr, ActorRef, SpawnErr};
13use std::sync::Arc;
14use tsink::Storage;
15
16// ============================================================================
17// STATE
18// ============================================================================
19
20pub struct MetricsSupervisorState {
21    storage: Option<Arc<dyn Storage>>,
22    system_collector_spawned: bool,
23    native_sink_spawned: bool,
24}
25
26// ============================================================================
27// MESSAGES
28// ============================================================================
29
30pub enum MetricsMessage {
31    /// Initialize with storage and spawn collectors
32    Initialize { storage: Arc<dyn Storage> },
33    /// Spawn NativeSink with config-based parameters
34    SpawnNativeSink {
35        interval_secs: u64,
36        lttb_threshold: usize,
37    },
38}
39
40impl std::fmt::Debug for MetricsMessage {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        match self {
43            Self::Initialize { .. } => write!(f, "MetricsMessage::Initialize {{ .. }}"),
44            Self::SpawnNativeSink { interval_secs, lttb_threshold } => {
45                write!(f, "MetricsMessage::SpawnNativeSink {{ interval_secs: {}, lttb_threshold: {} }}", interval_secs, lttb_threshold)
46            }
47        }
48    }
49}
50
51impl Clone for MetricsMessage {
52    fn clone(&self) -> Self {
53        match self {
54            Self::Initialize { storage } => Self::Initialize {
55                storage: storage.clone(),
56            },
57            Self::SpawnNativeSink { interval_secs, lttb_threshold } => Self::SpawnNativeSink {
58                interval_secs: *interval_secs,
59                lttb_threshold: *lttb_threshold,
60            },
61        }
62    }
63}
64
65// ============================================================================
66// ACTOR IMPLEMENTATION
67// ============================================================================
68
/// Zero-sized actor type for the metrics supervisor; all mutable data lives
/// in `MetricsSupervisorState`.
pub struct MetricsSupervisor;
70
71#[async_trait]
72impl Actor for MetricsSupervisor {
73    type Msg = MetricsMessage;
74    type State = MetricsSupervisorState;
75    type Arguments = ();
76
77    async fn pre_start(
78        &self,
79        _myself: ActorRef<Self::Msg>,
80        _args: Self::Arguments,
81    ) -> Result<Self::State, ActorProcessingErr> {
82        tracing::debug!("MetricsSupervisor starting");
83
84        Ok(MetricsSupervisorState {
85            storage: None,
86            system_collector_spawned: false,
87            native_sink_spawned: false,
88        })
89    }
90
91    async fn post_start(
92        &self,
93        _myself: ActorRef<Self::Msg>,
94        state: &mut Self::State,
95    ) -> Result<(), ActorProcessingErr> {
96        tracing::debug!("MetricsSupervisor starting, requesting storage from Database");
97
98        // Look up Database actor
99        let database_cell = registry::where_is("Database".to_string())
100            .ok_or_else(|| ActorProcessingErr::from("Database actor not found in registry"))?;
101
102        let database_ref: ActorRef<crate::core::database::DatabaseMessage> = database_cell.into();
103
104        // Request Arc<Storage> from Database
105        let (tx, rx) = tokio::sync::oneshot::channel();
106        database_ref
107            .cast(crate::core::database::DatabaseMessage::GetStorage { reply: tx })
108            .map_err(|e| ActorProcessingErr::from(e.to_string()))?;
109
110        // Wait for storage
111        let storage = rx
112            .await
113            .map_err(|e| ActorProcessingErr::from(format!("Failed to receive storage: {}", e)))?;
114
115        tracing::debug!("MetricsSupervisor received Arc<Storage>, spawning collectors");
116
117        state.storage = Some(storage.clone());
118
119        // Spawn SystemCollector (always enabled)
120        if !state.system_collector_spawned {
121            system::spawn_system_collector(storage.clone())
122                .await
123                .map_err(|e| ActorProcessingErr::from(e.to_string()))?;
124            state.system_collector_spawned = true;
125            tracing::debug!("SystemCollector spawned (1-second collection interval)");
126        }
127
128        // Spawn NativeSink conditionally based on config
129        // Note: NativeSink will be spawned by Config actor after receiving remote config
130        if !state.native_sink_spawned {
131            tracing::debug!("NativeSink spawn will wait for remote config (tier-based)");
132        }
133
134        tracing::info!("Metrics collection started (SystemCollector: 1s interval)");
135        Ok(())
136    }
137
138    async fn handle(
139        &self,
140        _myself: ActorRef<Self::Msg>,
141        message: Self::Msg,
142        state: &mut Self::State,
143    ) -> Result<(), ActorProcessingErr> {
144        match message {
145            MetricsMessage::Initialize { storage: _ } => {
146                // No-op: MetricsSupervisor now auto-initializes in post_start()
147                tracing::debug!("MetricsMessage::Initialize received (deprecated - auto-initializes in post_start)");
148            }
149
150            MetricsMessage::SpawnNativeSink { interval_secs, lttb_threshold } => {
151                if state.native_sink_spawned {
152                    tracing::warn!("NativeSink already spawned, ignoring duplicate spawn request");
153                    return Ok(());
154                }
155
156                if let Some(storage) = &state.storage {
157                    tracing::info!(
158                        "Spawning NativeSink with interval: {}s, lttb_threshold: {}",
159                        interval_secs,
160                        lttb_threshold
161                    );
162
163                    // Spawn NativeSink with config-based parameters
164                    native_sink::spawn_native_sink(storage.clone(), interval_secs, lttb_threshold)
165                        .await
166                        .map_err(|e| ActorProcessingErr::from(e.to_string()))?;
167
168                    state.native_sink_spawned = true;
169                    tracing::info!("NativeSink spawned successfully");
170                } else {
171                    tracing::warn!("Cannot spawn NativeSink: storage not initialized");
172                }
173            }
174        }
175
176        Ok(())
177    }
178
179    async fn post_stop(
180        &self,
181        _myself: ActorRef<Self::Msg>,
182        _state: &mut Self::State,
183    ) -> Result<(), ActorProcessingErr> {
184        tracing::info!("MetricsSupervisor stopped");
185        Ok(())
186    }
187}
188
189// ============================================================================
190// SPAWN HELPER
191// ============================================================================
192
193/// Spawn the MetricsSupervisor actor
194///
195/// Returns an ActorCell for the supervisor to manage.
196pub async fn spawn_metrics(
197    supervisor_cell: ActorCell,
198    child_id: String,
199) -> Result<ActorCell, SpawnErr> {
200    tracing::debug!("Spawning MetricsSupervisor as child '{}'", child_id);
201
202    let (actor_ref, _handle) =
203        Actor::spawn(Some("MetricsSupervisor".to_string()), MetricsSupervisor, ()).await?;
204
205    // Link to supervisor
206    actor_ref.link(supervisor_cell.clone());
207
208    tracing::debug!("✓ MetricsSupervisor spawned successfully");
209
210    Ok(actor_ref.get_cell())
211}