loa_core/core/metrics/
mod.rs1pub mod downsample;
9pub mod native_sink;
10pub mod system;
11
12use ractor::{async_trait, registry, Actor, ActorCell, ActorProcessingErr, ActorRef, SpawnErr};
13use std::sync::Arc;
14use tsink::Storage;
15
16pub struct MetricsSupervisorState {
21 storage: Option<Arc<dyn Storage>>,
22 system_collector_spawned: bool,
23 native_sink_spawned: bool,
24}
25
26pub enum MetricsMessage {
31 Initialize { storage: Arc<dyn Storage> },
33 SpawnNativeSink {
35 interval_secs: u64,
36 lttb_threshold: usize,
37 },
38}
39
40impl std::fmt::Debug for MetricsMessage {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 match self {
43 Self::Initialize { .. } => write!(f, "MetricsMessage::Initialize {{ .. }}"),
44 Self::SpawnNativeSink { interval_secs, lttb_threshold } => {
45 write!(f, "MetricsMessage::SpawnNativeSink {{ interval_secs: {}, lttb_threshold: {} }}", interval_secs, lttb_threshold)
46 }
47 }
48 }
49}
50
51impl Clone for MetricsMessage {
52 fn clone(&self) -> Self {
53 match self {
54 Self::Initialize { storage } => Self::Initialize {
55 storage: storage.clone(),
56 },
57 Self::SpawnNativeSink { interval_secs, lttb_threshold } => Self::SpawnNativeSink {
58 interval_secs: *interval_secs,
59 lttb_threshold: *lttb_threshold,
60 },
61 }
62 }
63}
64
/// Stateless actor handle for the metrics supervisor.
///
/// All mutable data lives in `MetricsSupervisorState`; this unit type only
/// carries the `Actor` implementation.
pub struct MetricsSupervisor;
70
71#[async_trait]
72impl Actor for MetricsSupervisor {
73 type Msg = MetricsMessage;
74 type State = MetricsSupervisorState;
75 type Arguments = ();
76
77 async fn pre_start(
78 &self,
79 _myself: ActorRef<Self::Msg>,
80 _args: Self::Arguments,
81 ) -> Result<Self::State, ActorProcessingErr> {
82 tracing::debug!("MetricsSupervisor starting");
83
84 Ok(MetricsSupervisorState {
85 storage: None,
86 system_collector_spawned: false,
87 native_sink_spawned: false,
88 })
89 }
90
91 async fn post_start(
92 &self,
93 _myself: ActorRef<Self::Msg>,
94 state: &mut Self::State,
95 ) -> Result<(), ActorProcessingErr> {
96 tracing::debug!("MetricsSupervisor starting, requesting storage from Database");
97
98 let database_cell = registry::where_is("Database".to_string())
100 .ok_or_else(|| ActorProcessingErr::from("Database actor not found in registry"))?;
101
102 let database_ref: ActorRef<crate::core::database::DatabaseMessage> = database_cell.into();
103
104 let (tx, rx) = tokio::sync::oneshot::channel();
106 database_ref
107 .cast(crate::core::database::DatabaseMessage::GetStorage { reply: tx })
108 .map_err(|e| ActorProcessingErr::from(e.to_string()))?;
109
110 let storage = rx
112 .await
113 .map_err(|e| ActorProcessingErr::from(format!("Failed to receive storage: {}", e)))?;
114
115 tracing::debug!("MetricsSupervisor received Arc<Storage>, spawning collectors");
116
117 state.storage = Some(storage.clone());
118
119 if !state.system_collector_spawned {
121 system::spawn_system_collector(storage.clone())
122 .await
123 .map_err(|e| ActorProcessingErr::from(e.to_string()))?;
124 state.system_collector_spawned = true;
125 tracing::debug!("SystemCollector spawned (1-second collection interval)");
126 }
127
128 if !state.native_sink_spawned {
131 tracing::debug!("NativeSink spawn will wait for remote config (tier-based)");
132 }
133
134 tracing::info!("Metrics collection started (SystemCollector: 1s interval)");
135 Ok(())
136 }
137
138 async fn handle(
139 &self,
140 _myself: ActorRef<Self::Msg>,
141 message: Self::Msg,
142 state: &mut Self::State,
143 ) -> Result<(), ActorProcessingErr> {
144 match message {
145 MetricsMessage::Initialize { storage: _ } => {
146 tracing::debug!("MetricsMessage::Initialize received (deprecated - auto-initializes in post_start)");
148 }
149
150 MetricsMessage::SpawnNativeSink { interval_secs, lttb_threshold } => {
151 if state.native_sink_spawned {
152 tracing::warn!("NativeSink already spawned, ignoring duplicate spawn request");
153 return Ok(());
154 }
155
156 if let Some(storage) = &state.storage {
157 tracing::info!(
158 "Spawning NativeSink with interval: {}s, lttb_threshold: {}",
159 interval_secs,
160 lttb_threshold
161 );
162
163 native_sink::spawn_native_sink(storage.clone(), interval_secs, lttb_threshold)
165 .await
166 .map_err(|e| ActorProcessingErr::from(e.to_string()))?;
167
168 state.native_sink_spawned = true;
169 tracing::info!("NativeSink spawned successfully");
170 } else {
171 tracing::warn!("Cannot spawn NativeSink: storage not initialized");
172 }
173 }
174 }
175
176 Ok(())
177 }
178
179 async fn post_stop(
180 &self,
181 _myself: ActorRef<Self::Msg>,
182 _state: &mut Self::State,
183 ) -> Result<(), ActorProcessingErr> {
184 tracing::info!("MetricsSupervisor stopped");
185 Ok(())
186 }
187}
188
189pub async fn spawn_metrics(
197 supervisor_cell: ActorCell,
198 child_id: String,
199) -> Result<ActorCell, SpawnErr> {
200 tracing::debug!("Spawning MetricsSupervisor as child '{}'", child_id);
201
202 let (actor_ref, _handle) =
203 Actor::spawn(Some("MetricsSupervisor".to_string()), MetricsSupervisor, ()).await?;
204
205 actor_ref.link(supervisor_cell.clone());
207
208 tracing::debug!("✓ MetricsSupervisor spawned successfully");
209
210 Ok(actor_ref.get_cell())
211}