Skip to main content

orca_control/
alerts.rs

1//! AI alert pipeline integration for the control plane.
2//!
3//! Builds a `ConversationEngine` from `cluster.toml` `[ai]`, implements
4//! `ContextProvider` against `AppState`, and spawns the `AiMonitor` as
5//! a background task. The engine is held in `Option<SharedAlertEngine>`
6//! on `AppState` so handlers can mutate it for replies / dismiss / etc.
7//!
8//! Conversation persistence is in-memory only — a server restart wipes
9//! active alerts. The TUI (PR3) re-fetches via the HTTP API on startup.
10
11use std::sync::Arc;
12
13use async_trait::async_trait;
14use chrono::{Duration, Utc};
15use tokio::sync::RwLock;
16use tracing::info;
17
18use orca_ai::backend::{LlmBackend, OpenAiCompatibleBackend};
19use orca_ai::channels::Dispatcher;
20use orca_ai::context::{ClusterContext, NodeSummary, ServiceSummary};
21use orca_ai::conversation::ConversationEngine;
22use orca_ai::monitor::{AiMonitor, ContextProvider};
23use orca_core::config::AiConfig;
24use orca_core::types::{RuntimeKind, WorkloadStatus};
25
26use crate::state::AppState;
27
/// Conversation engine used by the alert pipeline, with its LLM backend
/// boxed as a trait object.
pub type AlertEngine = ConversationEngine<Box<dyn LlmBackend>>;
/// An [`AlertEngine`] behind `Arc<RwLock<..>>` (tokio's async `RwLock`), the
/// form in which it is handed to the monitor task and stored on `AppState`.
pub type SharedAlertEngine = Arc<RwLock<AlertEngine>>;
30
31/// Build the alert engine if `[ai]` is configured with endpoint + model.
32/// Returns `None` when AI is unconfigured so the caller can degrade
33/// gracefully without erroring out the whole server startup.
34pub fn try_build_alert_engine(cfg: Option<&AiConfig>) -> Option<SharedAlertEngine> {
35    let ai = cfg?;
36    let endpoint = ai.endpoint.as_ref()?;
37    let model = ai.model.as_ref()?;
38
39    let backend: Box<dyn LlmBackend> = Box::new(OpenAiCompatibleBackend::new(
40        endpoint.clone(),
41        model.clone(),
42        ai.api_key.clone(),
43    ));
44
45    let dispatcher = ai
46        .alerts
47        .as_ref()
48        .and_then(|a| a.channels.as_ref())
49        .map(Dispatcher::from_config)
50        .unwrap_or_default();
51    let names = dispatcher.channel_names();
52    if !names.is_empty() {
53        info!("Alert delivery channels configured: {names:?}");
54    }
55
56    Some(Arc::new(RwLock::new(ConversationEngine::with_dispatcher(
57        backend, dispatcher,
58    ))))
59}
60
61/// Spawn `AiMonitor::run` as a background task. No-op when alerts are
62/// disabled or the engine isn't built.
63pub fn spawn_alert_monitor(state: Arc<AppState>) -> Option<tokio::task::JoinHandle<()>> {
64    let engine = state.alerts.as_ref()?.clone();
65    let ai = state.cluster_config.ai.as_ref()?;
66    let alerts_cfg = ai.alerts.as_ref()?;
67    if !alerts_cfg.enabled {
68        return None;
69    }
70    let interval = alerts_cfg.analysis_interval_secs;
71    let provider: Arc<dyn ContextProvider> =
72        Arc::new(StateContextProvider::for_state(state.clone()));
73    info!("Spawning AI alert monitor (interval: {interval}s)");
74    Some(tokio::spawn(async move {
75        let monitor = AiMonitor::new(engine, interval);
76        monitor.run(provider).await;
77    }))
78}
79
/// Reads `AppState` snapshots into a `ClusterContext` the AI can reason about.
/// Kept deliberately lean: services (replica counts, recent failure and
/// restart counts) and nodes (CPU/mem) are the signal that matters for the
/// current monitor heuristics. Logs, GPU summaries, recent events and active
/// alerts are left empty for now and can be enriched later as the heuristics
/// need them.
pub struct StateContextProvider {
    /// Shared control-plane state that every snapshot is read from.
    state: Arc<AppState>,
}
87
88impl StateContextProvider {
89    pub fn for_state(state: Arc<AppState>) -> Self {
90        Self { state }
91    }
92}
93
94#[async_trait]
95impl ContextProvider for StateContextProvider {
96    async fn snapshot(&self) -> anyhow::Result<ClusterContext> {
97        let cluster_name = self.state.cluster_config.cluster.name.clone();
98
99        let services = self.state.services.read().await;
100        let events = self.state.instance_events.read().await;
101        let now = Utc::now();
102        let one_hour = Duration::hours(1);
103        let day = Duration::hours(24);
104        let services: Vec<ServiceSummary> = services
105            .values()
106            .map(|svc| {
107                let running = svc
108                    .instances
109                    .iter()
110                    .filter(|i| matches!(i.status, WorkloadStatus::Running))
111                    .count() as u32;
112                let status = if svc.instances.is_empty() {
113                    "stopped".into()
114                } else if running == svc.desired_replicas {
115                    "healthy".into()
116                } else {
117                    "degraded".into()
118                };
119                let (errors_1h, restarts_24h) = events
120                    .get(&svc.config.name)
121                    .map(|log| (log.failures_in(now, one_hour), log.restarts_in(now, day)))
122                    .unwrap_or((0, 0));
123                ServiceSummary {
124                    name: svc.config.name.clone(),
125                    runtime: match svc.config.runtime {
126                        RuntimeKind::Container => "container".into(),
127                        RuntimeKind::Wasm => "wasm".into(),
128                    },
129                    replicas_running: running,
130                    replicas_desired: svc.desired_replicas,
131                    status,
132                    uses_gpu: false,
133                    recent_logs: Vec::new(),
134                    error_count_1h: errors_1h,
135                    restart_count_24h: restarts_24h,
136                }
137            })
138            .collect();
139
140        let nodes = self.state.registered_nodes.read().await;
141        let nodes: Vec<NodeSummary> = nodes
142            .values()
143            .map(|n| NodeSummary {
144                id: n.node_id.to_string(),
145                address: n.address.clone(),
146                status: if n.drain {
147                    "draining".into()
148                } else {
149                    "healthy".into()
150                },
151                cpu_percent: n.cpu_percent,
152                memory_percent: if n.memory_total > 0 {
153                    (n.memory_bytes as f64 / n.memory_total as f64) * 100.0
154                } else {
155                    0.0
156                },
157                gpu_summary: Vec::new(),
158            })
159            .collect();
160
161        Ok(ClusterContext {
162            cluster_name,
163            nodes,
164            services,
165            recent_events: Vec::new(),
166            active_alerts: Vec::new(),
167        })
168    }
169}
170
#[cfg(test)]
mod tests {
    use super::*;
    use orca_core::config::ClusterConfig;

    /// Minimal "ollama" `[ai]` section with a model set and the endpoint
    /// supplied by the caller; every other optional field is left unset.
    fn ollama_config(endpoint: Option<&str>) -> AiConfig {
        AiConfig {
            provider: "ollama".into(),
            endpoint: endpoint.map(Into::into),
            model: Some("llama3".into()),
            api_key: None,
            alerts: None,
            auto_remediate: None,
        }
    }

    #[test]
    fn try_build_returns_none_when_no_ai_config() {
        let engine = try_build_alert_engine(None);
        assert!(engine.is_none());
    }

    #[test]
    fn try_build_returns_none_when_endpoint_missing() {
        let ai = ollama_config(None);
        let engine = try_build_alert_engine(Some(&ai));
        assert!(engine.is_none());
    }

    #[test]
    fn try_build_returns_engine_when_minimum_config_set() {
        let ai = ollama_config(Some("http://127.0.0.1:11434"));
        let engine = try_build_alert_engine(Some(&ai));
        assert!(engine.is_some());
    }

    // Checks that a default `ClusterConfig` carries no `[ai]` section and
    // therefore yields no engine. `spawn_alert_monitor` is not exercised
    // here: it needs an `Arc<AppState>` with a configured AI backend, which
    // is not easy to fake in a unit test.
    #[test]
    fn default_cluster_config_has_no_ai() {
        let cluster = ClusterConfig::default();
        assert!(cluster.ai.is_none());
        let engine = try_build_alert_engine(cluster.ai.as_ref());
        assert!(engine.is_none());
    }
}
216}