Skip to main content

loa_core/core/
config.rs

1//! Config Actor - Manages agent configuration via HTTP polling
2//!
3//! This actor:
4//! - Fetches configuration from backend via HTTP when notified by heartbeat
5//! - Applies config changes to agent state (heartbeat interval, metrics collectors)
6//! - Maintains local-first fallback config (Tier 0)
7
8use ractor::factory::{FactoryMessage, Job, JobOptions};
9use ractor::{async_trait, registry, Actor, ActorProcessingErr, ActorRef, SpawnErr};
10use serde::{Deserialize, Serialize};
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::task::JoinHandle;
14
15// ============================================================================
16// MESSAGES
17// ============================================================================
18
19#[derive(Debug, Clone)]
20pub enum ConfigMessage {
21    /// Fetch configuration from backend via HTTP
22    FetchConfig,
23    /// Apply config update (called after HTTP fetch completes)
24    ApplyConfig(serde_json::Value),
25    /// Spawn metrics collectors based on config (called after config applied)
26    SpawnMetricsCollectors,
27    /// Check if tier has changed (called from heartbeat response)
28    CheckTier(u32),
29}
30
31// ============================================================================
32// STATE
33// ============================================================================
34
35pub struct ConfigState {
36    #[allow(dead_code)]
37    identity: Arc<crate::AgentIdentity>,
38    peer_id: String,
39    api_url: String,
40    heartbeat_interval_secs: Option<u64>,
41    heartbeat_task: Option<tokio::task::JoinHandle<()>>,
42    /// Current tier ID (used to detect tier changes from heartbeat)
43    tier_id: Option<u32>,
44    /// Metrics configuration from remote
45    pub metrics_config: Option<MetricsConfig>,
46    /// HTTP client with retry middleware for config fetches
47    http_client: reqwest_middleware::ClientWithMiddleware,
48}
49
50/// Arguments for spawning the Config actor
51pub struct ConfigArgs {
52    pub identity: Arc<crate::AgentIdentity>,
53    pub api_url: Option<String>,
54}
55
56// ============================================================================
57// CONFIGURATION
58// ============================================================================
59
60/// Metrics configuration - native sink settings
61#[derive(Debug, Clone, Deserialize, Serialize)]
62pub struct NativeSinkConfig {
63    pub enabled: bool,
64    pub interval_secs: u64,
65    pub lttb_threshold: usize,
66}
67
68/// Metrics configuration - controls which sinks are enabled
69#[derive(Debug, Clone, Deserialize, Serialize)]
70pub struct MetricsConfig {
71    pub native: Option<NativeSinkConfig>,
72    // Future: prometheus, grafana, etc.
73}
74
75/// Configuration response from HTTP endpoint
76#[derive(Debug, Deserialize, Serialize)]
77struct ConfigResponse {
78    config: TierConfig,
79}
80
81/// Tier configuration (heartbeat, metrics)
82#[derive(Debug, Deserialize, Serialize)]
83struct TierConfig {
84    tier_id: u32,
85    heartbeat: serde_json::Value,
86    metrics: MetricsConfig,
87    // Boost thresholds (not used by agent, but included in API response)
88    // Backend sends these in camelCase
89    #[serde(rename = "boostMin")]
90    #[allow(dead_code)]
91    boost_min: u32,
92    #[serde(rename = "boostMax")]
93    #[allow(dead_code)]
94    boost_max: Option<u32>,
95}
96
97// ============================================================================
98// ACTOR IMPLEMENTATION
99// ============================================================================
100
/// Stateless handle type for the config actor; all runtime data lives in
/// [`ConfigState`].
pub struct Config;
102
103#[async_trait]
104impl Actor for Config {
105    type Msg = ConfigMessage;
106    type State = ConfigState;
107    type Arguments = ConfigArgs;
108
109    async fn pre_start(
110        &self,
111        _myself: ActorRef<Self::Msg>,
112        args: Self::Arguments,
113    ) -> Result<Self::State, ActorProcessingErr> {
114        tracing::debug!("Config actor starting");
115        tracing::debug!("  PeerId: {}", args.identity.peer_id());
116
117        let peer_id = args.identity.peer_id().to_string();
118        let api_url = args.api_url.unwrap_or_else(crate::constants::api_url);
119
120        tracing::debug!("  API URL: {}", api_url);
121
122        Ok(ConfigState {
123            identity: args.identity,
124            peer_id,
125            api_url,
126            heartbeat_interval_secs: None,
127            heartbeat_task: None,
128            tier_id: None,
129            metrics_config: None,
130            http_client: crate::http::build_http_client(),
131        })
132    }
133
134    async fn post_start(
135        &self,
136        myself: ActorRef<Self::Msg>,
137        state: &mut Self::State,
138    ) -> Result<(), ActorProcessingErr> {
139        tracing::info!("Config actor ready - starting heartbeat loop");
140
141        // Start heartbeat loop FIRST with default Tier 0 interval
142        // This ensures heartbeats run even if config fetch fails
143        // Heartbeat-first design: if 404, alerts worker will register then heartbeat
144        let default_interval = crate::constants::DEFAULT_HEARTBEAT_INTERVAL_SECS;
145        state.heartbeat_interval_secs = Some(default_interval);
146        let task = tokio::spawn(schedule_heartbeat_loop(default_interval));
147        state.heartbeat_task = Some(task);
148
149        tracing::info!("Started heartbeat loop (default: {}s)", default_interval);
150
151        // Trigger initial config fetch (non-blocking)
152        // If this fails, heartbeats still run and will trigger config fetch on success
153        if let Err(e) = myself.cast(ConfigMessage::FetchConfig) {
154            tracing::warn!("Config: Failed to trigger initial config fetch: {}", e);
155        }
156
157        Ok(())
158    }
159
160    async fn handle(
161        &self,
162        myself: ActorRef<Self::Msg>,
163        message: Self::Msg,
164        state: &mut Self::State,
165    ) -> Result<(), ActorProcessingErr> {
166        match message {
167            ConfigMessage::FetchConfig => {
168                // Fetch config via HTTP GET using shared client with retry middleware
169                let api_url = state.api_url.clone();
170                let peer_id = state.peer_id.clone();
171                let config_url = format!("{}/v1/agents/{}/config", api_url, peer_id);
172                let http_client = state.http_client.clone();
173
174                tracing::debug!("Config: Fetching config from {}", config_url);
175
176                let myself_clone = myself.clone();
177                tokio::spawn(async move {
178                    match http_client.get(&config_url).send().await {
179                        Ok(response) => {
180                            if response.status().is_success() {
181                                match response.json::<serde_json::Value>().await {
182                                    Ok(config) => {
183                                        tracing::debug!("Config: Successfully fetched config");
184                                        let _ =
185                                            myself_clone.cast(ConfigMessage::ApplyConfig(config));
186                                    }
187                                    Err(e) => {
188                                        tracing::warn!(
189                                            "Config: Failed to parse config response: {}",
190                                            e
191                                        );
192                                    }
193                                }
194                            } else {
195                                tracing::warn!(
196                                    "Config: HTTP {} from config endpoint",
197                                    response.status()
198                                );
199                            }
200                        }
201                        Err(e) => {
202                            tracing::warn!("Config: Failed to fetch config: {}", e);
203                        }
204                    }
205                });
206            }
207
208            ConfigMessage::ApplyConfig(config) => {
209                tracing::debug!("Config: Applying config update");
210
211                // Parse configuration response
212                match serde_json::from_value::<ConfigResponse>(config) {
213                    Ok(config_response) => {
214                        tracing::debug!("Config: Parsed config successfully");
215                        tracing::debug!("  Tier ID: {}", config_response.config.tier_id);
216
217                        // Store tier_id in state (used to detect tier changes)
218                        state.tier_id = Some(config_response.config.tier_id);
219
220                        // Store metrics config in state
221                        state.metrics_config = Some(config_response.config.metrics.clone());
222
223                        // Determine metrics status for summary
224                        let metrics_status =
225                            if let Some(native) = &config_response.config.metrics.native {
226                                if native.enabled {
227                                    format!("enabled ({}s interval)", native.interval_secs)
228                                } else {
229                                    "disabled".to_string()
230                                }
231                            } else {
232                                "disabled".to_string()
233                            };
234
235                        // Extract heartbeat interval, fallback to default if missing
236                        let interval_secs: u64 = config_response
237                            .config
238                            .heartbeat
239                            .get("interval_secs")
240                            .and_then(|v| v.as_u64())
241                            .unwrap_or_else(|| {
242                                tracing::warn!(
243                                    "Config: No interval_secs found in heartbeat config, using default {}s",
244                                    crate::constants::DEFAULT_HEARTBEAT_INTERVAL_SECS
245                                );
246                                crate::constants::DEFAULT_HEARTBEAT_INTERVAL_SECS
247                            });
248
249                        // Cancel existing heartbeat task if any
250                        if let Some(task) = state.heartbeat_task.take() {
251                            task.abort();
252                            tracing::debug!("Config: Cancelled previous heartbeat task");
253                        }
254
255                        // Schedule new heartbeat task
256                        state.heartbeat_interval_secs = Some(interval_secs);
257                        let task = tokio::spawn(async move {
258                            schedule_heartbeat_loop(interval_secs).await;
259                        });
260                        state.heartbeat_task = Some(task);
261
262                        // Single summary log
263                        tracing::info!(
264                            "Loaded tier {} config (heartbeat: {}s, metrics: {})",
265                            config_response.config.tier_id,
266                            interval_secs,
267                            metrics_status
268                        );
269
270                        // Trigger metrics collectors spawn based on config
271                        if let Err(e) = myself.cast(ConfigMessage::SpawnMetricsCollectors) {
272                            tracing::warn!("Config: Failed to trigger metrics spawn: {}", e);
273                        }
274                    }
275                    Err(e) => {
276                        tracing::warn!("Config: Failed to parse config response: {}", e);
277                        tracing::info!(
278                            "Config: Falling back to default heartbeat interval ({}s)",
279                            crate::constants::DEFAULT_HEARTBEAT_INTERVAL_SECS
280                        );
281
282                        // Fallback to default Tier 0 interval
283                        let interval_secs = crate::constants::DEFAULT_HEARTBEAT_INTERVAL_SECS;
284
285                        // Cancel existing heartbeat task if any
286                        if let Some(task) = state.heartbeat_task.take() {
287                            task.abort();
288                            tracing::debug!("Config: Cancelled previous heartbeat task");
289                        }
290
291                        // Schedule heartbeat with default interval
292                        state.heartbeat_interval_secs = Some(interval_secs);
293                        let task = tokio::spawn(async move {
294                            schedule_heartbeat_loop(interval_secs).await;
295                        });
296                        state.heartbeat_task = Some(task);
297                    }
298                }
299            }
300
301            ConfigMessage::CheckTier(backend_tier_id) => {
302                tracing::debug!(
303                    "Config: Checking tier - backend: {}, local: {:?}",
304                    backend_tier_id,
305                    state.tier_id
306                );
307
308                // If we don't have a local tier yet, or tier changed, fetch config
309                let should_fetch = match state.tier_id {
310                    None => {
311                        tracing::info!("Config: No local tier - fetching config");
312                        true
313                    }
314                    Some(local_tier_id) => {
315                        if backend_tier_id != local_tier_id {
316                            tracing::info!(
317                                "Config: Tier changed (local: {}, backend: {}) - fetching updated config",
318                                local_tier_id,
319                                backend_tier_id
320                            );
321                            true
322                        } else {
323                            tracing::debug!("Config: Tier {} up-to-date", local_tier_id);
324                            false
325                        }
326                    }
327                };
328
329                if should_fetch {
330                    if let Err(e) = myself.cast(ConfigMessage::FetchConfig) {
331                        tracing::warn!("Config: Failed to trigger config fetch: {}", e);
332                    }
333                }
334            }
335
336            ConfigMessage::SpawnMetricsCollectors => {
337                tracing::info!("Config: Spawning metrics collectors based on config");
338
339                // Check if we have metrics config with native sink enabled
340                if let Some(metrics_config) = &state.metrics_config {
341                    if let Some(native) = &metrics_config.native {
342                        if native.enabled {
343                            tracing::info!(
344                                "Config: Native sink enabled - interval: {}s, lttb_threshold: {}",
345                                native.interval_secs,
346                                native.lttb_threshold
347                            );
348
349                            // Look up MetricsSupervisor and tell it to spawn NativeSink
350                            if let Some(metrics_cell) =
351                                registry::where_is("MetricsSupervisor".to_string())
352                            {
353                                let metrics_ref: ActorRef<crate::core::metrics::MetricsMessage> =
354                                    metrics_cell.into();
355
356                                // Send message to spawn NativeSink with config
357                                if let Err(e) = metrics_ref.cast(
358                                    crate::core::metrics::MetricsMessage::SpawnNativeSink {
359                                        interval_secs: native.interval_secs,
360                                        lttb_threshold: native.lttb_threshold,
361                                    },
362                                ) {
363                                    tracing::warn!(
364                                        "Config: Failed to send SpawnNativeSink message: {}",
365                                        e
366                                    );
367                                }
368                            } else {
369                                tracing::warn!("Config: MetricsSupervisor not found in registry");
370                            }
371                        } else {
372                            tracing::info!("Config: Native sink disabled (enabled = false)");
373                        }
374                    } else {
375                        tracing::info!("Config: Native sink disabled (no native config)");
376                    }
377                } else {
378                    tracing::warn!("Config: No metrics config available");
379                }
380            }
381        }
382
383        Ok(())
384    }
385
386    async fn post_stop(
387        &self,
388        _myself: ActorRef<Self::Msg>,
389        state: &mut Self::State,
390    ) -> Result<(), ActorProcessingErr> {
391        // Cancel heartbeat task on shutdown
392        if let Some(task) = state.heartbeat_task.take() {
393            task.abort();
394            tracing::debug!("Config: Cancelled heartbeat task on shutdown");
395        }
396
397        tracing::info!("Config actor stopped");
398        Ok(())
399    }
400}
401
402// ============================================================================
403// HEARTBEAT SCHEDULING
404// ============================================================================
405
406/// Periodic heartbeat loop that dispatches heartbeats to AlertsFactory
407async fn schedule_heartbeat_loop(interval_secs: u64) {
408    let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
409    // First tick fires immediately, triggering registration if needed
410
411    loop {
412        interval.tick().await;
413
414        // Look up AlertsFactory from registry
415        match registry::where_is("AlertsFactory".to_string()) {
416            Some(factory_cell) => {
417                let factory_ref: ActorRef<FactoryMessage<(), crate::core::alerts::AlertMessage>> =
418                    factory_cell.into();
419
420                // Dispatch heartbeat job to factory
421                if let Err(e) = factory_ref.cast(FactoryMessage::Dispatch(Job {
422                    key: (),
423                    msg: crate::core::alerts::AlertMessage::Heartbeat,
424                    options: JobOptions::default(),
425                    accepted: None,
426                })) {
427                    tracing::error!(
428                        "Config: Failed to dispatch heartbeat to AlertsFactory: {}",
429                        e
430                    );
431                } else {
432                    tracing::debug!("Config: Dispatched heartbeat to AlertsFactory");
433                }
434            }
435            None => {
436                tracing::warn!("Config: AlertsFactory not found in registry");
437            }
438        }
439    }
440}
441
442// ============================================================================
443// SPAWN HELPER
444// ============================================================================
445
446/// Spawn the Config actor
447///
448/// Returns an ActorRef and its join handle.
449pub async fn spawn_config(
450    identity: Arc<crate::AgentIdentity>,
451    api_url: Option<String>,
452) -> Result<(ActorRef<ConfigMessage>, JoinHandle<()>), SpawnErr> {
453    tracing::debug!("Spawning Config actor...");
454
455    let args = ConfigArgs { identity, api_url };
456    let (actor_ref, actor_handle) =
457        Actor::spawn(Some("Config".to_string()), Config, args).await?;
458
459    tracing::debug!("✓ Config actor spawned successfully");
460
461    Ok((actor_ref, actor_handle))
462}
463
464// ============================================================================
465// UNIT TESTS
466// ============================================================================
467
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A complete backend payload deserializes into `ConfigResponse` and the
    /// fields the agent relies on come through intact.
    #[test]
    fn test_config_response_deserialization() {
        let config_json = json!({
            "config": {
                "tier_id": 2,
                "boostMin": 0,
                "boostMax": 3,
                "heartbeat": {
                    "interval_secs": 30
                },
                "metrics": {
                    "native": {
                        "enabled": true,
                        "interval_secs": 300,
                        "lttb_threshold": 200
                    }
                }
            }
        });

        // `expect` reports the serde error on failure, unlike `is_ok()` + `unwrap()`.
        let parsed: ConfigResponse = serde_json::from_value(config_json)
            .expect("well-formed config response should deserialize");

        assert_eq!(parsed.config.tier_id, 2);
        assert_eq!(
            parsed
                .config
                .heartbeat
                .get("interval_secs")
                .and_then(|v| v.as_u64()),
            Some(30)
        );

        let native = parsed
            .config
            .metrics
            .native
            .expect("native sink config should be present");
        assert!(native.enabled);
        assert_eq!(native.interval_secs, 300);
        assert_eq!(native.lttb_threshold, 200);
    }

    /// `interval_secs` can be read out of the free-form heartbeat value.
    #[test]
    fn test_tier_config_heartbeat_extraction() {
        let tier_config = TierConfig {
            tier_id: 2,
            heartbeat: json!({"interval_secs": 60}),
            metrics: MetricsConfig { native: None },
            boost_min: 0,
            boost_max: None,
        };

        let interval = tier_config
            .heartbeat
            .get("interval_secs")
            .and_then(|v| v.as_u64());
        assert_eq!(interval, Some(60));
    }

    /// Every `ConfigMessage` variant can be built, cloned and Debug-formatted.
    #[test]
    fn test_config_message_types() {
        let fetch = ConfigMessage::FetchConfig;
        let apply = ConfigMessage::ApplyConfig(json!({"config": {}}));
        let spawn = ConfigMessage::SpawnMetricsCollectors;
        let check = ConfigMessage::CheckTier(2);

        // Assert on real output instead of a vacuous `assert!(true)`.
        assert_eq!(format!("{:?}", fetch.clone()), "FetchConfig");
        assert!(format!("{:?}", apply.clone()).starts_with("ApplyConfig("));
        assert_eq!(format!("{:?}", spawn.clone()), "SpawnMetricsCollectors");
        assert_eq!(format!("{:?}", check.clone()), "CheckTier(2)");
    }
}