1use ractor::factory::{FactoryMessage, Job, JobOptions};
9use ractor::{async_trait, registry, Actor, ActorProcessingErr, ActorRef, SpawnErr};
10use serde::{Deserialize, Serialize};
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::task::JoinHandle;
14
15#[derive(Debug, Clone)]
20pub enum ConfigMessage {
21 FetchConfig,
23 ApplyConfig(serde_json::Value),
25 SpawnMetricsCollectors,
27 CheckTier(u32),
29}
30
31pub struct ConfigState {
36 #[allow(dead_code)]
37 identity: Arc<crate::AgentIdentity>,
38 peer_id: String,
39 api_url: String,
40 heartbeat_interval_secs: Option<u64>,
41 heartbeat_task: Option<tokio::task::JoinHandle<()>>,
42 tier_id: Option<u32>,
44 pub metrics_config: Option<MetricsConfig>,
46 http_client: reqwest_middleware::ClientWithMiddleware,
48}
49
50pub struct ConfigArgs {
52 pub identity: Arc<crate::AgentIdentity>,
53 pub api_url: Option<String>,
54}
55
56#[derive(Debug, Clone, Deserialize, Serialize)]
62pub struct NativeSinkConfig {
63 pub enabled: bool,
64 pub interval_secs: u64,
65 pub lttb_threshold: usize,
66}
67
68#[derive(Debug, Clone, Deserialize, Serialize)]
70pub struct MetricsConfig {
71 pub native: Option<NativeSinkConfig>,
72 }
74
75#[derive(Debug, Deserialize, Serialize)]
77struct ConfigResponse {
78 config: TierConfig,
79}
80
81#[derive(Debug, Deserialize, Serialize)]
83struct TierConfig {
84 tier_id: u32,
85 heartbeat: serde_json::Value,
86 metrics: MetricsConfig,
87 #[serde(rename = "boostMin")]
90 #[allow(dead_code)]
91 boost_min: u32,
92 #[serde(rename = "boostMax")]
93 #[allow(dead_code)]
94 boost_max: Option<u32>,
95}
96
/// Actor that fetches and applies the agent's tier config and owns the
/// heartbeat loop. It carries no data itself; all state lives in [`ConfigState`].
pub struct Config;
102
103#[async_trait]
104impl Actor for Config {
105 type Msg = ConfigMessage;
106 type State = ConfigState;
107 type Arguments = ConfigArgs;
108
109 async fn pre_start(
110 &self,
111 _myself: ActorRef<Self::Msg>,
112 args: Self::Arguments,
113 ) -> Result<Self::State, ActorProcessingErr> {
114 tracing::debug!("Config actor starting");
115 tracing::debug!(" PeerId: {}", args.identity.peer_id());
116
117 let peer_id = args.identity.peer_id().to_string();
118 let api_url = args.api_url.unwrap_or_else(crate::constants::api_url);
119
120 tracing::debug!(" API URL: {}", api_url);
121
122 Ok(ConfigState {
123 identity: args.identity,
124 peer_id,
125 api_url,
126 heartbeat_interval_secs: None,
127 heartbeat_task: None,
128 tier_id: None,
129 metrics_config: None,
130 http_client: crate::http::build_http_client(),
131 })
132 }
133
134 async fn post_start(
135 &self,
136 myself: ActorRef<Self::Msg>,
137 state: &mut Self::State,
138 ) -> Result<(), ActorProcessingErr> {
139 tracing::info!("Config actor ready - starting heartbeat loop");
140
141 let default_interval = crate::constants::DEFAULT_HEARTBEAT_INTERVAL_SECS;
145 state.heartbeat_interval_secs = Some(default_interval);
146 let task = tokio::spawn(schedule_heartbeat_loop(default_interval));
147 state.heartbeat_task = Some(task);
148
149 tracing::info!("Started heartbeat loop (default: {}s)", default_interval);
150
151 if let Err(e) = myself.cast(ConfigMessage::FetchConfig) {
154 tracing::warn!("Config: Failed to trigger initial config fetch: {}", e);
155 }
156
157 Ok(())
158 }
159
160 async fn handle(
161 &self,
162 myself: ActorRef<Self::Msg>,
163 message: Self::Msg,
164 state: &mut Self::State,
165 ) -> Result<(), ActorProcessingErr> {
166 match message {
167 ConfigMessage::FetchConfig => {
168 let api_url = state.api_url.clone();
170 let peer_id = state.peer_id.clone();
171 let config_url = format!("{}/v1/agents/{}/config", api_url, peer_id);
172 let http_client = state.http_client.clone();
173
174 tracing::debug!("Config: Fetching config from {}", config_url);
175
176 let myself_clone = myself.clone();
177 tokio::spawn(async move {
178 match http_client.get(&config_url).send().await {
179 Ok(response) => {
180 if response.status().is_success() {
181 match response.json::<serde_json::Value>().await {
182 Ok(config) => {
183 tracing::debug!("Config: Successfully fetched config");
184 let _ =
185 myself_clone.cast(ConfigMessage::ApplyConfig(config));
186 }
187 Err(e) => {
188 tracing::warn!(
189 "Config: Failed to parse config response: {}",
190 e
191 );
192 }
193 }
194 } else {
195 tracing::warn!(
196 "Config: HTTP {} from config endpoint",
197 response.status()
198 );
199 }
200 }
201 Err(e) => {
202 tracing::warn!("Config: Failed to fetch config: {}", e);
203 }
204 }
205 });
206 }
207
208 ConfigMessage::ApplyConfig(config) => {
209 tracing::debug!("Config: Applying config update");
210
211 match serde_json::from_value::<ConfigResponse>(config) {
213 Ok(config_response) => {
214 tracing::debug!("Config: Parsed config successfully");
215 tracing::debug!(" Tier ID: {}", config_response.config.tier_id);
216
217 state.tier_id = Some(config_response.config.tier_id);
219
220 state.metrics_config = Some(config_response.config.metrics.clone());
222
223 let metrics_status =
225 if let Some(native) = &config_response.config.metrics.native {
226 if native.enabled {
227 format!("enabled ({}s interval)", native.interval_secs)
228 } else {
229 "disabled".to_string()
230 }
231 } else {
232 "disabled".to_string()
233 };
234
235 let interval_secs: u64 = config_response
237 .config
238 .heartbeat
239 .get("interval_secs")
240 .and_then(|v| v.as_u64())
241 .unwrap_or_else(|| {
242 tracing::warn!(
243 "Config: No interval_secs found in heartbeat config, using default {}s",
244 crate::constants::DEFAULT_HEARTBEAT_INTERVAL_SECS
245 );
246 crate::constants::DEFAULT_HEARTBEAT_INTERVAL_SECS
247 });
248
249 if let Some(task) = state.heartbeat_task.take() {
251 task.abort();
252 tracing::debug!("Config: Cancelled previous heartbeat task");
253 }
254
255 state.heartbeat_interval_secs = Some(interval_secs);
257 let task = tokio::spawn(async move {
258 schedule_heartbeat_loop(interval_secs).await;
259 });
260 state.heartbeat_task = Some(task);
261
262 tracing::info!(
264 "Loaded tier {} config (heartbeat: {}s, metrics: {})",
265 config_response.config.tier_id,
266 interval_secs,
267 metrics_status
268 );
269
270 if let Err(e) = myself.cast(ConfigMessage::SpawnMetricsCollectors) {
272 tracing::warn!("Config: Failed to trigger metrics spawn: {}", e);
273 }
274 }
275 Err(e) => {
276 tracing::warn!("Config: Failed to parse config response: {}", e);
277 tracing::info!(
278 "Config: Falling back to default heartbeat interval ({}s)",
279 crate::constants::DEFAULT_HEARTBEAT_INTERVAL_SECS
280 );
281
282 let interval_secs = crate::constants::DEFAULT_HEARTBEAT_INTERVAL_SECS;
284
285 if let Some(task) = state.heartbeat_task.take() {
287 task.abort();
288 tracing::debug!("Config: Cancelled previous heartbeat task");
289 }
290
291 state.heartbeat_interval_secs = Some(interval_secs);
293 let task = tokio::spawn(async move {
294 schedule_heartbeat_loop(interval_secs).await;
295 });
296 state.heartbeat_task = Some(task);
297 }
298 }
299 }
300
301 ConfigMessage::CheckTier(backend_tier_id) => {
302 tracing::debug!(
303 "Config: Checking tier - backend: {}, local: {:?}",
304 backend_tier_id,
305 state.tier_id
306 );
307
308 let should_fetch = match state.tier_id {
310 None => {
311 tracing::info!("Config: No local tier - fetching config");
312 true
313 }
314 Some(local_tier_id) => {
315 if backend_tier_id != local_tier_id {
316 tracing::info!(
317 "Config: Tier changed (local: {}, backend: {}) - fetching updated config",
318 local_tier_id,
319 backend_tier_id
320 );
321 true
322 } else {
323 tracing::debug!("Config: Tier {} up-to-date", local_tier_id);
324 false
325 }
326 }
327 };
328
329 if should_fetch {
330 if let Err(e) = myself.cast(ConfigMessage::FetchConfig) {
331 tracing::warn!("Config: Failed to trigger config fetch: {}", e);
332 }
333 }
334 }
335
336 ConfigMessage::SpawnMetricsCollectors => {
337 tracing::info!("Config: Spawning metrics collectors based on config");
338
339 if let Some(metrics_config) = &state.metrics_config {
341 if let Some(native) = &metrics_config.native {
342 if native.enabled {
343 tracing::info!(
344 "Config: Native sink enabled - interval: {}s, lttb_threshold: {}",
345 native.interval_secs,
346 native.lttb_threshold
347 );
348
349 if let Some(metrics_cell) =
351 registry::where_is("MetricsSupervisor".to_string())
352 {
353 let metrics_ref: ActorRef<crate::core::metrics::MetricsMessage> =
354 metrics_cell.into();
355
356 if let Err(e) = metrics_ref.cast(
358 crate::core::metrics::MetricsMessage::SpawnNativeSink {
359 interval_secs: native.interval_secs,
360 lttb_threshold: native.lttb_threshold,
361 },
362 ) {
363 tracing::warn!(
364 "Config: Failed to send SpawnNativeSink message: {}",
365 e
366 );
367 }
368 } else {
369 tracing::warn!("Config: MetricsSupervisor not found in registry");
370 }
371 } else {
372 tracing::info!("Config: Native sink disabled (enabled = false)");
373 }
374 } else {
375 tracing::info!("Config: Native sink disabled (no native config)");
376 }
377 } else {
378 tracing::warn!("Config: No metrics config available");
379 }
380 }
381 }
382
383 Ok(())
384 }
385
386 async fn post_stop(
387 &self,
388 _myself: ActorRef<Self::Msg>,
389 state: &mut Self::State,
390 ) -> Result<(), ActorProcessingErr> {
391 if let Some(task) = state.heartbeat_task.take() {
393 task.abort();
394 tracing::debug!("Config: Cancelled heartbeat task on shutdown");
395 }
396
397 tracing::info!("Config actor stopped");
398 Ok(())
399 }
400}
401
402async fn schedule_heartbeat_loop(interval_secs: u64) {
408 let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
409 loop {
412 interval.tick().await;
413
414 match registry::where_is("AlertsFactory".to_string()) {
416 Some(factory_cell) => {
417 let factory_ref: ActorRef<FactoryMessage<(), crate::core::alerts::AlertMessage>> =
418 factory_cell.into();
419
420 if let Err(e) = factory_ref.cast(FactoryMessage::Dispatch(Job {
422 key: (),
423 msg: crate::core::alerts::AlertMessage::Heartbeat,
424 options: JobOptions::default(),
425 accepted: None,
426 })) {
427 tracing::error!(
428 "Config: Failed to dispatch heartbeat to AlertsFactory: {}",
429 e
430 );
431 } else {
432 tracing::debug!("Config: Dispatched heartbeat to AlertsFactory");
433 }
434 }
435 None => {
436 tracing::warn!("Config: AlertsFactory not found in registry");
437 }
438 }
439 }
440}
441
442pub async fn spawn_config(
450 identity: Arc<crate::AgentIdentity>,
451 api_url: Option<String>,
452) -> Result<(ActorRef<ConfigMessage>, JoinHandle<()>), SpawnErr> {
453 tracing::debug!("Spawning Config actor...");
454
455 let args = ConfigArgs { identity, api_url };
456 let (actor_ref, actor_handle) =
457 Actor::spawn(Some("Config".to_string()), Config, args).await?;
458
459 tracing::debug!("✓ Config actor spawned successfully");
460
461 Ok((actor_ref, actor_handle))
462}
463
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A complete backend payload deserializes into `ConfigResponse` with
    /// every field (including the renamed boost keys) mapped correctly.
    #[test]
    fn test_config_response_deserialization() {
        let config_json = json!({
            "config": {
                "tier_id": 2,
                "boostMin": 0,
                "boostMax": 3,
                "heartbeat": {
                    "interval_secs": 30
                },
                "metrics": {
                    "native": {
                        "enabled": true,
                        "interval_secs": 300,
                        "lttb_threshold": 200
                    }
                }
            }
        });

        let config: ConfigResponse = serde_json::from_value(config_json)
            .expect("well-formed config payload must deserialize");

        assert_eq!(config.config.tier_id, 2);
        assert_eq!(config.config.boost_min, 0);
        assert_eq!(config.config.boost_max, Some(3));
        assert_eq!(
            config
                .config
                .heartbeat
                .get("interval_secs")
                .and_then(|v| v.as_u64()),
            Some(30)
        );

        let native = config
            .config
            .metrics
            .native
            .as_ref()
            .expect("native sink config should be present");
        assert!(native.enabled);
        assert_eq!(native.interval_secs, 300);
        assert_eq!(native.lttb_threshold, 200);
    }

    /// The heartbeat interval can be read out of the free-form JSON value.
    #[test]
    fn test_tier_config_heartbeat_extraction() {
        let tier_config = TierConfig {
            tier_id: 2,
            heartbeat: json!({"interval_secs": 60}),
            metrics: MetricsConfig { native: None },
            boost_min: 0,
            boost_max: None,
        };

        let interval = tier_config
            .heartbeat
            .get("interval_secs")
            .and_then(|v| v.as_u64());
        assert_eq!(interval, Some(60));
    }

    /// Every message variant can be built, and carries its payload intact.
    #[test]
    fn test_config_message_types() {
        assert_eq!(format!("{:?}", ConfigMessage::FetchConfig), "FetchConfig");
        assert_eq!(
            format!("{:?}", ConfigMessage::SpawnMetricsCollectors),
            "SpawnMetricsCollectors"
        );
        assert_eq!(format!("{:?}", ConfigMessage::CheckTier(2)), "CheckTier(2)");

        let payload = json!({"config": {}});
        match ConfigMessage::ApplyConfig(payload.clone()) {
            ConfigMessage::ApplyConfig(carried) => assert_eq!(carried, payload),
            other => panic!("expected ApplyConfig, got {:?}", other),
        }
    }
}