Skip to main content

orca_control/
state.rs

1//! Shared application state for the control plane.
2
3use std::collections::{HashMap, VecDeque};
4use std::sync::Arc;
5
6use chrono::{DateTime, Duration, Utc};
7use tokio::sync::RwLock;
8
9use orca_core::config::{ClusterConfig, ServiceConfig};
10use orca_core::runtime::{Runtime, WorkloadHandle};
11use orca_core::types::{HealthState, Replicas, WorkloadStatus};
12
13use crate::stats::ContainerStats;
14use crate::webhook::WebhookStore;
15
16pub use orca_proxy::{RouteTarget, SharedWasmTriggers, WasmTrigger};
17
18/// Shared route table type, compatible with [`orca_proxy::run_proxy`].
19pub type SharedRouteTable = Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>;
20
21/// Shared state for the control plane, accessible by the API server and reconciler.
22pub struct AppState {
23    /// Cluster configuration.
24    pub cluster_config: ClusterConfig,
25    /// Container runtime (Docker).
26    pub container_runtime: Arc<dyn Runtime>,
27    /// Wasm runtime (wasmtime). Trait object to avoid coupling to concrete type.
28    pub wasm_runtime: Option<Arc<dyn Runtime>>,
29    /// Current service state, keyed by service name.
30    pub services: RwLock<HashMap<String, ServiceState>>,
31    /// Routing table for container workloads, shared with the reverse proxy.
32    pub route_table: SharedRouteTable,
33    /// Wasm HTTP triggers, shared with the reverse proxy.
34    pub wasm_triggers: SharedWasmTriggers,
35    /// Registered cluster nodes (M2 in-memory, will move to Raft store).
36    pub registered_nodes: RwLock<HashMap<u64, RegisteredNode>>,
37    /// Webhook configurations for push-triggered deploys.
38    pub webhooks: WebhookStore,
39    /// API bearer tokens for authentication (empty = allow all).
40    pub api_tokens: Vec<String>,
41    /// Pending commands for agent nodes, keyed by node_id.
42    /// Uses serde_json::Value to avoid circular dependency on orca-agent types.
43    pub pending_commands: RwLock<HashMap<u64, Vec<serde_json::Value>>>,
44    /// Deploy history for rollback support.
45    pub deploy_history: RwLock<crate::deploy_history::DeployHistory>,
46    /// ACME manager for hot cert provisioning (None if no TLS).
47    pub acme_manager: Option<orca_proxy::acme::AcmeManager>,
48    /// Dynamic cert resolver shared with the HTTPS listener.
49    pub cert_resolver: Option<orca_proxy::SharedCertResolver>,
50    /// Cached container stats, keyed by service name.
51    pub container_stats: RwLock<HashMap<String, ContainerStats>>,
52    /// Persistent cluster store (redb). None in tests without persistence.
53    pub store: Option<Arc<crate::store::ClusterStore>>,
54    /// WebSocket senders for connected agent nodes, keyed by node_id.
55    pub ws_agents: RwLock<HashMap<u64, crate::ws_handler::AgentSender>>,
56    /// Log stream listeners: request_id → (data, done) sender.
57    pub log_listeners: RwLock<HashMap<String, tokio::sync::mpsc::Sender<(String, bool)>>>,
58    /// Backup status listeners: request_id → report sender. Used by the
59    /// `/api/v1/cluster/backups` handler to collect reports dispatched in
60    /// parallel to every connected agent.
61    pub backup_listeners: RwLock<
62        HashMap<String, tokio::sync::mpsc::Sender<orca_core::ws_types::BackupStatusReportData>>,
63    >,
64    /// Network status listeners: request_id → report sender. Same pattern as
65    /// `backup_listeners`, used by the `/api/v1/cluster/networks` handler.
66    pub network_listeners: RwLock<
67        HashMap<String, tokio::sync::mpsc::Sender<orca_core::ws_types::NetworkStatusReportData>>,
68    >,
69    /// Last completed `BackupResult` per agent node, recorded as the result
70    /// arrives over WS. Surfaced alongside the snapshot listing so the
71    /// dashboard can show a node's last-failure message without having to
72    /// scrape logs. Master has its own field — its backups are subprocess-
73    /// driven, not WS-dispatched.
74    pub last_backup_results: RwLock<HashMap<u64, LastBackupResult>>,
75    /// Last completed master backup result, recorded when `run_master_backup`
76    /// finishes. Separate from `last_backup_results` because the master has
77    /// no `node_id` and runs its backups via subprocess rather than WS.
78    pub master_last_backup_result: RwLock<Option<LastBackupResult>>,
79    /// Recent webhook invocations, keyed by `service_name`. Bounded ring
80    /// buffer of the last 10 deliveries per webhook so the TUI can render a
81    /// history view without scraping logs. Lost on restart by design — this
82    /// is operator-visible recent activity, not durable audit.
83    pub webhook_invocations: RwLock<
84        HashMap<String, std::collections::VecDeque<orca_core::api_types::WebhookInvocation>>,
85    >,
86    /// Active exec sessions: session_id → output bytes sender (agent → CLI WS).
87    pub exec_sessions: RwLock<HashMap<String, tokio::sync::mpsc::Sender<Vec<u8>>>>,
88    /// Pending deploy result waiters: service_name → oneshot sender.
89    /// Inserted by queue_remote_deploy, resolved by ws_handler on DeployResult.
90    pub pending_deploys: RwLock<HashMap<String, tokio::sync::oneshot::Sender<Result<(), String>>>>,
91    /// AI conversational-alert engine. `None` when `[ai]` is not configured.
92    /// `AiMonitor::run` (spawned in `lib::run_server_with_acme`) feeds it
93    /// `open_alert` calls; the HTTP `/api/v1/alerts/...` handlers mutate it
94    /// in response to operator actions.
95    pub alerts: Option<crate::alerts::SharedAlertEngine>,
96    /// Per-service log of restart triggers and instance failures.
97    /// Read by the AI alert monitor to populate `restart_count_24h` and
98    /// `error_count_1h` in the cluster context. In-memory only; restart
99    /// wipes the log (the AI monitor reconstructs alerts as conditions
100    /// reappear, so a brief gap after restart is acceptable).
101    pub instance_events: RwLock<HashMap<String, InstanceEventLog>>,
102}
103
104/// Per-service ring of recent failure / restart timestamps. Pruning happens
105/// on every append: entries older than [`InstanceEventLog::WINDOW`] are
106/// dropped so the queues stay bounded to ~24h of activity regardless of
107/// service churn.
108#[derive(Debug, Default)]
109pub struct InstanceEventLog {
110    /// Timestamps of operator/watchdog-triggered restarts.
111    pub restarts: VecDeque<DateTime<Utc>>,
112    /// Timestamps of observed instance failures — non-zero exits and
113    /// individual health-check failures.
114    pub failures: VecDeque<DateTime<Utc>>,
115}
116
117impl InstanceEventLog {
118    /// Longest window any consumer needs; entries older than this are
119    /// pruned eagerly on append.
120    pub const WINDOW: Duration = Duration::hours(24);
121
122    pub fn record_restart(&mut self, now: DateTime<Utc>) {
123        self.restarts.push_back(now);
124        prune_older_than(&mut self.restarts, now - Self::WINDOW);
125    }
126
127    pub fn record_failure(&mut self, now: DateTime<Utc>) {
128        self.failures.push_back(now);
129        prune_older_than(&mut self.failures, now - Self::WINDOW);
130    }
131
132    /// Number of failure events recorded in the last `window` from `now`.
133    pub fn failures_in(&self, now: DateTime<Utc>, window: Duration) -> u64 {
134        let cutoff = now - window;
135        self.failures.iter().filter(|t| **t >= cutoff).count() as u64
136    }
137
138    /// Number of restart events recorded in the last `window` from `now`.
139    pub fn restarts_in(&self, now: DateTime<Utc>, window: Duration) -> u32 {
140        let cutoff = now - window;
141        self.restarts.iter().filter(|t| **t >= cutoff).count() as u32
142    }
143}
144
/// Pop entries off the front of `deque` for as long as they compare strictly
/// below `cutoff`.
///
/// Scanning stops at the first entry that is not below the cutoff, so an
/// entry equal to `cutoff` is kept, and a stale entry sitting behind a
/// fresher one is left in place. The deque is therefore only fully pruned
/// when it is ordered oldest-first.
///
/// Generic over the element type so it works for timestamps as well as any
/// other ordered value.
fn prune_older_than<T: PartialOrd>(deque: &mut VecDeque<T>, cutoff: T) {
    while matches!(deque.front(), Some(oldest) if *oldest < cutoff) {
        deque.pop_front();
    }
}
154
155pub use orca_core::api_types::LastBackupResult;
156
157/// A node registered in the cluster.
158#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
159pub struct RegisteredNode {
160    /// Node ID.
161    pub node_id: u64,
162    /// Node address (ip:port).
163    pub address: String,
164    /// Node labels.
165    pub labels: HashMap<String, String>,
166    /// Last heartbeat time.
167    pub last_heartbeat: chrono::DateTime<chrono::Utc>,
168    /// Whether the node is in drain mode (no new workloads scheduled).
169    #[serde(default)]
170    pub drain: bool,
171    /// Latest CPU / memory / disk / network sample reported in a heartbeat.
172    /// Populated for both the master (via its local collector) and joined
173    /// nodes (via their heartbeat body).
174    #[serde(default)]
175    pub cpu_percent: f64,
176    #[serde(default)]
177    pub memory_bytes: u64,
178    #[serde(default)]
179    pub memory_total: u64,
180    #[serde(default)]
181    pub disk_used: u64,
182    #[serde(default)]
183    pub disk_total: u64,
184    #[serde(default)]
185    pub net_rx: u64,
186    #[serde(default)]
187    pub net_tx: u64,
188}
189
190/// State of a deployed service.
191#[derive(Debug)]
192pub struct ServiceState {
193    /// The service configuration.
194    pub config: ServiceConfig,
195    /// Desired number of replicas.
196    pub desired_replicas: u32,
197    /// Running instances.
198    pub instances: Vec<InstanceState>,
199}
200
201/// State of a single workload instance (one replica).
202#[derive(Debug)]
203pub struct InstanceState {
204    /// Handle to the running workload.
205    pub handle: WorkloadHandle,
206    /// Current status.
207    pub status: WorkloadStatus,
208    /// Host port mapped to the container's primary port (containers only).
209    pub host_port: Option<u16>,
210    /// Container address on Docker network (ip:port) for direct proxy routing.
211    pub container_address: Option<String>,
212    /// Health check state.
213    pub health: HealthState,
214    /// Whether this instance is a canary (new version during canary deploy).
215    pub is_canary: bool,
216    /// When this instance was created (for initial_delay_secs).
217    pub started_at: std::time::Instant,
218}
219
220impl AppState {
221    /// Create with shared route table and Wasm triggers (for sharing with the proxy).
222    pub fn new(
223        cluster_config: ClusterConfig,
224        container_runtime: Arc<dyn Runtime>,
225        wasm_runtime: Option<Arc<dyn Runtime>>,
226        route_table: SharedRouteTable,
227        wasm_triggers: SharedWasmTriggers,
228    ) -> Self {
229        let api_tokens = cluster_config.api_tokens.clone();
230        Self {
231            cluster_config,
232            container_runtime,
233            wasm_runtime,
234            services: RwLock::new(HashMap::new()),
235            route_table,
236            wasm_triggers,
237            registered_nodes: RwLock::new(HashMap::new()),
238            pending_commands: RwLock::new(HashMap::new()),
239            webhooks: crate::webhook::new_store(),
240            api_tokens,
241            deploy_history: RwLock::new(crate::deploy_history::DeployHistory::new()),
242            acme_manager: None,
243            cert_resolver: None,
244            container_stats: RwLock::new(HashMap::new()),
245            store: None,
246            ws_agents: RwLock::new(HashMap::new()),
247            log_listeners: RwLock::new(HashMap::new()),
248            backup_listeners: RwLock::new(HashMap::new()),
249            network_listeners: RwLock::new(HashMap::new()),
250            last_backup_results: RwLock::new(HashMap::new()),
251            master_last_backup_result: RwLock::new(None),
252            webhook_invocations: RwLock::new(HashMap::new()),
253            exec_sessions: RwLock::new(HashMap::new()),
254            pending_deploys: RwLock::new(HashMap::new()),
255            alerts: None,
256            instance_events: RwLock::new(HashMap::new()),
257        }
258    }
259
260    /// Append a restart event for `service` and prune old entries.
261    /// Cheap (one `now()` + a few VecDeque ops); safe to call from hot paths.
262    pub async fn record_instance_restart(&self, service: &str) {
263        let mut log = self.instance_events.write().await;
264        log.entry(service.to_string())
265            .or_default()
266            .record_restart(Utc::now());
267    }
268
269    /// Append a failure event for `service` (non-zero exit or health-check
270    /// failure) and prune old entries.
271    pub async fn record_instance_failure(&self, service: &str) {
272        let mut log = self.instance_events.write().await;
273        log.entry(service.to_string())
274            .or_default()
275            .record_failure(Utc::now());
276    }
277
278    /// Set persistent store for service state.
279    pub fn with_store(mut self, store: Arc<crate::store::ClusterStore>) -> Self {
280        self.store = Some(store);
281        self
282    }
283
284    /// Attach the AI alert engine. Builder-style so startup code can wire it
285    /// only when `[ai]` is configured.
286    pub fn with_alerts(mut self, engine: crate::alerts::SharedAlertEngine) -> Self {
287        self.alerts = Some(engine);
288        self
289    }
290
291    /// Set ACME manager and cert resolver for hot cert provisioning.
292    pub fn with_acme(
293        mut self,
294        manager: orca_proxy::acme::AcmeManager,
295        resolver: orca_proxy::SharedCertResolver,
296    ) -> Self {
297        self.acme_manager = Some(manager);
298        self.cert_resolver = Some(resolver);
299        self
300    }
301}
302
303impl ServiceState {
304    /// Create from a service config.
305    pub fn from_config(config: ServiceConfig) -> Self {
306        let desired_replicas = match &config.replicas {
307            Replicas::Fixed(n) => *n,
308            Replicas::Auto => 1,
309        };
310        Self {
311            config,
312            desired_replicas,
313            instances: Vec::new(),
314        }
315    }
316
317    /// Count how many instances are currently running.
318    pub fn running_count(&self) -> u32 {
319        self.instances
320            .iter()
321            .filter(|i| i.status == WorkloadStatus::Running)
322            .count() as u32
323    }
324}
325
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    use orca_core::config::ServiceConfig;
    use orca_core::runtime::WorkloadHandle;
    use orca_core::types::{Replicas, WorkloadStatus};

    /// Smallest `ServiceConfig` the tests need; only `replicas` varies.
    fn minimal_config(replicas: Replicas) -> ServiceConfig {
        ServiceConfig {
            name: "test-svc".to_string(),
            project: None,
            runtime: Default::default(),
            image: Some("nginx:latest".to_string()),
            module: None,
            replicas,
            port: Some(8080),
            domain: None,
            health: None,
            readiness: None,
            liveness: None,
            env: HashMap::new(),
            resources: None,
            volume: None,
            deploy: None,
            placement: None,
            network: None,
            aliases: Vec::new(),
            mounts: Vec::new(),
            routes: Vec::new(),
            host_port: None,
            triggers: Vec::new(),
            assets: None,
            build: None,
            tls_cert: None,
            tls_key: None,
            internal: false,
            depends_on: Vec::new(),
            cmd: Vec::new(),
            extra_ports: Vec::new(),
            strip_prefix: None,
            pull_policy: Default::default(),
            backup: None,
        }
    }

    /// Instance with the given status and otherwise neutral fields.
    fn make_instance(status: WorkloadStatus) -> InstanceState {
        let handle = WorkloadHandle {
            runtime_id: "test-id".to_string(),
            name: "test-instance".to_string(),
            metadata: HashMap::new(),
        };
        InstanceState {
            handle,
            status,
            host_port: None,
            container_address: None,
            health: HealthState::Unknown,
            is_canary: false,
            started_at: std::time::Instant::now(),
        }
    }

    #[test]
    fn from_config_fixed_sets_desired_replicas() {
        let config = minimal_config(Replicas::Fixed(3));
        assert_eq!(ServiceState::from_config(config).desired_replicas, 3);
    }

    #[test]
    fn from_config_auto_defaults_to_one() {
        let config = minimal_config(Replicas::Auto);
        assert_eq!(ServiceState::from_config(config).desired_replicas, 1);
    }

    #[test]
    fn running_count_with_mixed_statuses() {
        let mut state = ServiceState::from_config(minimal_config(Replicas::Fixed(4)));
        state.instances = vec![
            WorkloadStatus::Running,
            WorkloadStatus::Stopped,
            WorkloadStatus::Running,
            WorkloadStatus::Failed,
        ]
        .into_iter()
        .map(make_instance)
        .collect();
        assert_eq!(state.running_count(), 2);
    }

    #[test]
    fn instance_event_log_counts_inside_window() {
        // Fill the queues by hand so entries sit on both sides of each
        // window boundary; going through `record_*` would prune the stale
        // ones before they could be counted.
        let now = Utc::now();
        let mut log = InstanceEventLog::default();

        let failure_ages = [
            Duration::minutes(30),
            Duration::minutes(45),
            Duration::hours(2),
        ];
        log.failures.extend(failure_ages.iter().map(|age| now - *age));

        let restart_ages = [
            Duration::hours(48),
            Duration::hours(20),
            Duration::minutes(10),
        ];
        log.restarts.extend(restart_ages.iter().map(|age| now - *age));

        let one_hour = Duration::hours(1);
        let one_day = Duration::hours(24);
        assert_eq!(log.failures_in(now, one_hour), 2);
        assert_eq!(log.failures_in(now, one_day), 3);
        assert_eq!(log.restarts_in(now, one_hour), 1);
        assert_eq!(log.restarts_in(now, one_day), 2);
    }

    #[test]
    fn instance_event_log_prunes_on_append() {
        // Appending at `now` must evict whatever is older than 24h.
        let now = Utc::now();
        let mut log = InstanceEventLog::default();
        // Seed one stale entry (48h) and one still inside the window (20h).
        for age in [Duration::hours(48), Duration::hours(20)].iter() {
            log.restarts.push_back(now - *age);
        }

        log.record_restart(now);

        assert_eq!(
            log.restarts.len(),
            2,
            "the 48h-old entry must be pruned when a fresh `now` arrives"
        );
    }
}