1use std::sync::Arc;
12
13use async_trait::async_trait;
14use chrono::{Duration, Utc};
15use tokio::sync::RwLock;
16use tracing::info;
17
18use orca_ai::backend::{LlmBackend, OpenAiCompatibleBackend};
19use orca_ai::channels::Dispatcher;
20use orca_ai::context::{ClusterContext, NodeSummary, ServiceSummary};
21use orca_ai::conversation::ConversationEngine;
22use orca_ai::monitor::{AiMonitor, ContextProvider};
23use orca_core::config::AiConfig;
24use orca_core::types::{RuntimeKind, WorkloadStatus};
25
26use crate::state::AppState;
27
/// Conversation engine used for AI alerting. The LLM backend is boxed as a
/// `dyn LlmBackend` trait object, so any backend implementation can be plugged in.
pub type AlertEngine = ConversationEngine<Box<dyn LlmBackend>>;
/// [`AlertEngine`] shared between tasks behind a `tokio::sync::RwLock`.
pub type SharedAlertEngine = Arc<RwLock<AlertEngine>>;
30
31pub fn try_build_alert_engine(cfg: Option<&AiConfig>) -> Option<SharedAlertEngine> {
35 let ai = cfg?;
36 let endpoint = ai.endpoint.as_ref()?;
37 let model = ai.model.as_ref()?;
38
39 let backend: Box<dyn LlmBackend> = Box::new(OpenAiCompatibleBackend::new(
40 endpoint.clone(),
41 model.clone(),
42 ai.api_key.clone(),
43 ));
44
45 let dispatcher = ai
46 .alerts
47 .as_ref()
48 .and_then(|a| a.channels.as_ref())
49 .map(Dispatcher::from_config)
50 .unwrap_or_default();
51 let names = dispatcher.channel_names();
52 if !names.is_empty() {
53 info!("Alert delivery channels configured: {names:?}");
54 }
55
56 Some(Arc::new(RwLock::new(ConversationEngine::with_dispatcher(
57 backend, dispatcher,
58 ))))
59}
60
61pub fn spawn_alert_monitor(state: Arc<AppState>) -> Option<tokio::task::JoinHandle<()>> {
64 let engine = state.alerts.as_ref()?.clone();
65 let ai = state.cluster_config.ai.as_ref()?;
66 let alerts_cfg = ai.alerts.as_ref()?;
67 if !alerts_cfg.enabled {
68 return None;
69 }
70 let interval = alerts_cfg.analysis_interval_secs;
71 let provider: Arc<dyn ContextProvider> =
72 Arc::new(StateContextProvider::for_state(state.clone()));
73 info!("Spawning AI alert monitor (interval: {interval}s)");
74 Some(tokio::spawn(async move {
75 let monitor = AiMonitor::new(engine, interval);
76 monitor.run(provider).await;
77 }))
78}
79
80pub struct StateContextProvider {
85 state: Arc<AppState>,
86}
87
88impl StateContextProvider {
89 pub fn for_state(state: Arc<AppState>) -> Self {
90 Self { state }
91 }
92}
93
94#[async_trait]
95impl ContextProvider for StateContextProvider {
96 async fn snapshot(&self) -> anyhow::Result<ClusterContext> {
97 let cluster_name = self.state.cluster_config.cluster.name.clone();
98
99 let services = self.state.services.read().await;
100 let events = self.state.instance_events.read().await;
101 let now = Utc::now();
102 let one_hour = Duration::hours(1);
103 let day = Duration::hours(24);
104 let services: Vec<ServiceSummary> = services
105 .values()
106 .map(|svc| {
107 let running = svc
108 .instances
109 .iter()
110 .filter(|i| matches!(i.status, WorkloadStatus::Running))
111 .count() as u32;
112 let status = if svc.instances.is_empty() {
113 "stopped".into()
114 } else if running == svc.desired_replicas {
115 "healthy".into()
116 } else {
117 "degraded".into()
118 };
119 let (errors_1h, restarts_24h) = events
120 .get(&svc.config.name)
121 .map(|log| (log.failures_in(now, one_hour), log.restarts_in(now, day)))
122 .unwrap_or((0, 0));
123 ServiceSummary {
124 name: svc.config.name.clone(),
125 runtime: match svc.config.runtime {
126 RuntimeKind::Container => "container".into(),
127 RuntimeKind::Wasm => "wasm".into(),
128 },
129 replicas_running: running,
130 replicas_desired: svc.desired_replicas,
131 status,
132 uses_gpu: false,
133 recent_logs: Vec::new(),
134 error_count_1h: errors_1h,
135 restart_count_24h: restarts_24h,
136 }
137 })
138 .collect();
139
140 let nodes = self.state.registered_nodes.read().await;
141 let nodes: Vec<NodeSummary> = nodes
142 .values()
143 .map(|n| NodeSummary {
144 id: n.node_id.to_string(),
145 address: n.address.clone(),
146 status: if n.drain {
147 "draining".into()
148 } else {
149 "healthy".into()
150 },
151 cpu_percent: n.cpu_percent,
152 memory_percent: if n.memory_total > 0 {
153 (n.memory_bytes as f64 / n.memory_total as f64) * 100.0
154 } else {
155 0.0
156 },
157 gpu_summary: Vec::new(),
158 })
159 .collect();
160
161 Ok(ClusterContext {
162 cluster_name,
163 nodes,
164 services,
165 recent_events: Vec::new(),
166 active_alerts: Vec::new(),
167 })
168 }
169}
170
#[cfg(test)]
mod tests {
    use super::*;
    use orca_core::config::ClusterConfig;

    /// Minimal Ollama-flavoured `AiConfig` in which only `endpoint` and `model`
    /// vary. The API key, alerts and auto-remediation sections are all unset.
    fn ai_config(endpoint: Option<&str>, model: Option<&str>) -> AiConfig {
        AiConfig {
            provider: "ollama".into(),
            endpoint: endpoint.map(Into::into),
            model: model.map(Into::into),
            api_key: None,
            alerts: None,
            auto_remediate: None,
        }
    }

    #[test]
    fn try_build_returns_none_when_no_ai_config() {
        assert!(try_build_alert_engine(None).is_none());
    }

    #[test]
    fn try_build_returns_none_when_endpoint_missing() {
        let ai = ai_config(None, Some("llama3"));
        assert!(try_build_alert_engine(Some(&ai)).is_none());
    }

    #[test]
    fn try_build_returns_none_when_model_missing() {
        let ai = ai_config(Some("http://127.0.0.1:11434"), None);
        assert!(try_build_alert_engine(Some(&ai)).is_none());
    }

    #[test]
    fn try_build_returns_engine_when_minimum_config_set() {
        let ai = ai_config(Some("http://127.0.0.1:11434"), Some("llama3"));
        assert!(try_build_alert_engine(Some(&ai)).is_some());
    }

    #[test]
    fn default_cluster_config_has_no_ai() {
        let cfg = ClusterConfig::default();
        assert!(cfg.ai.is_none());
        assert!(try_build_alert_engine(cfg.ai.as_ref()).is_none());
    }
}