Skip to main content

orca_control/
state.rs

1//! Shared application state for the control plane.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use tokio::sync::RwLock;
7
8use orca_core::config::{ClusterConfig, ServiceConfig};
9use orca_core::runtime::{Runtime, WorkloadHandle};
10use orca_core::types::{HealthState, Replicas, WorkloadStatus};
11
12use crate::stats::ContainerStats;
13use crate::webhook::WebhookStore;
14
15pub use orca_proxy::{RouteTarget, SharedWasmTriggers, WasmTrigger};
16
17/// Shared route table type, compatible with [`orca_proxy::run_proxy`].
18pub type SharedRouteTable = Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>;
19
20/// Shared state for the control plane, accessible by the API server and reconciler.
21pub struct AppState {
22    /// Cluster configuration.
23    pub cluster_config: ClusterConfig,
24    /// Container runtime (Docker).
25    pub container_runtime: Arc<dyn Runtime>,
26    /// Wasm runtime (wasmtime). Trait object to avoid coupling to concrete type.
27    pub wasm_runtime: Option<Arc<dyn Runtime>>,
28    /// Current service state, keyed by service name.
29    pub services: RwLock<HashMap<String, ServiceState>>,
30    /// Routing table for container workloads, shared with the reverse proxy.
31    pub route_table: SharedRouteTable,
32    /// Wasm HTTP triggers, shared with the reverse proxy.
33    pub wasm_triggers: SharedWasmTriggers,
34    /// Registered cluster nodes (M2 in-memory, will move to Raft store).
35    pub registered_nodes: RwLock<HashMap<u64, RegisteredNode>>,
36    /// Webhook configurations for push-triggered deploys.
37    pub webhooks: WebhookStore,
38    /// API bearer tokens for authentication (empty = allow all).
39    pub api_tokens: Vec<String>,
40    /// Pending commands for agent nodes, keyed by node_id.
41    /// Uses serde_json::Value to avoid circular dependency on orca-agent types.
42    pub pending_commands: RwLock<HashMap<u64, Vec<serde_json::Value>>>,
43    /// Deploy history for rollback support.
44    pub deploy_history: RwLock<crate::deploy_history::DeployHistory>,
45    /// ACME manager for hot cert provisioning (None if no TLS).
46    pub acme_manager: Option<orca_proxy::acme::AcmeManager>,
47    /// Dynamic cert resolver shared with the HTTPS listener.
48    pub cert_resolver: Option<orca_proxy::SharedCertResolver>,
49    /// Cached container stats, keyed by service name.
50    pub container_stats: RwLock<HashMap<String, ContainerStats>>,
51    /// Persistent cluster store (redb). None in tests without persistence.
52    pub store: Option<Arc<crate::store::ClusterStore>>,
53    /// WebSocket senders for connected agent nodes, keyed by node_id.
54    pub ws_agents: RwLock<HashMap<u64, crate::ws_handler::AgentSender>>,
55    /// Log stream listeners: request_id → (data, done) sender.
56    pub log_listeners: RwLock<HashMap<String, tokio::sync::mpsc::Sender<(String, bool)>>>,
57    /// Backup status listeners: request_id → report sender. Used by the
58    /// `/api/v1/cluster/backups` handler to collect reports dispatched in
59    /// parallel to every connected agent.
60    pub backup_listeners: RwLock<
61        HashMap<String, tokio::sync::mpsc::Sender<orca_core::ws_types::BackupStatusReportData>>,
62    >,
63    /// Network status listeners: request_id → report sender. Same pattern as
64    /// `backup_listeners`, used by the `/api/v1/cluster/networks` handler.
65    pub network_listeners: RwLock<
66        HashMap<String, tokio::sync::mpsc::Sender<orca_core::ws_types::NetworkStatusReportData>>,
67    >,
68    /// Last completed `BackupResult` per agent node, recorded as the result
69    /// arrives over WS. Surfaced alongside the snapshot listing so the
70    /// dashboard can show a node's last-failure message without having to
71    /// scrape logs. Master has its own field — its backups are subprocess-
72    /// driven, not WS-dispatched.
73    pub last_backup_results: RwLock<HashMap<u64, LastBackupResult>>,
74    /// Last completed master backup result, recorded when `run_master_backup`
75    /// finishes. Separate from `last_backup_results` because the master has
76    /// no `node_id` and runs its backups via subprocess rather than WS.
77    pub master_last_backup_result: RwLock<Option<LastBackupResult>>,
78    /// Recent webhook invocations, keyed by `service_name`. Bounded ring
79    /// buffer of the last 10 deliveries per webhook so the TUI can render a
80    /// history view without scraping logs. Lost on restart by design — this
81    /// is operator-visible recent activity, not durable audit.
82    pub webhook_invocations: RwLock<
83        HashMap<String, std::collections::VecDeque<orca_core::api_types::WebhookInvocation>>,
84    >,
85    /// Active exec sessions: session_id → output bytes sender (agent → CLI WS).
86    pub exec_sessions: RwLock<HashMap<String, tokio::sync::mpsc::Sender<Vec<u8>>>>,
87    /// Pending deploy result waiters: service_name → oneshot sender.
88    /// Inserted by queue_remote_deploy, resolved by ws_handler on DeployResult.
89    pub pending_deploys: RwLock<HashMap<String, tokio::sync::oneshot::Sender<Result<(), String>>>>,
90}
91
92pub use orca_core::api_types::LastBackupResult;
93
94/// A node registered in the cluster.
95#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
96pub struct RegisteredNode {
97    /// Node ID.
98    pub node_id: u64,
99    /// Node address (ip:port).
100    pub address: String,
101    /// Node labels.
102    pub labels: HashMap<String, String>,
103    /// Last heartbeat time.
104    pub last_heartbeat: chrono::DateTime<chrono::Utc>,
105    /// Whether the node is in drain mode (no new workloads scheduled).
106    #[serde(default)]
107    pub drain: bool,
108    /// Latest CPU / memory / disk / network sample reported in a heartbeat.
109    /// Populated for both the master (via its local collector) and joined
110    /// nodes (via their heartbeat body).
111    #[serde(default)]
112    pub cpu_percent: f64,
113    #[serde(default)]
114    pub memory_bytes: u64,
115    #[serde(default)]
116    pub memory_total: u64,
117    #[serde(default)]
118    pub disk_used: u64,
119    #[serde(default)]
120    pub disk_total: u64,
121    #[serde(default)]
122    pub net_rx: u64,
123    #[serde(default)]
124    pub net_tx: u64,
125}
126
127/// State of a deployed service.
128#[derive(Debug)]
129pub struct ServiceState {
130    /// The service configuration.
131    pub config: ServiceConfig,
132    /// Desired number of replicas.
133    pub desired_replicas: u32,
134    /// Running instances.
135    pub instances: Vec<InstanceState>,
136}
137
138/// State of a single workload instance (one replica).
139#[derive(Debug)]
140pub struct InstanceState {
141    /// Handle to the running workload.
142    pub handle: WorkloadHandle,
143    /// Current status.
144    pub status: WorkloadStatus,
145    /// Host port mapped to the container's primary port (containers only).
146    pub host_port: Option<u16>,
147    /// Container address on Docker network (ip:port) for direct proxy routing.
148    pub container_address: Option<String>,
149    /// Health check state.
150    pub health: HealthState,
151    /// Whether this instance is a canary (new version during canary deploy).
152    pub is_canary: bool,
153    /// When this instance was created (for initial_delay_secs).
154    pub started_at: std::time::Instant,
155}
156
157impl AppState {
158    /// Create with shared route table and Wasm triggers (for sharing with the proxy).
159    pub fn new(
160        cluster_config: ClusterConfig,
161        container_runtime: Arc<dyn Runtime>,
162        wasm_runtime: Option<Arc<dyn Runtime>>,
163        route_table: SharedRouteTable,
164        wasm_triggers: SharedWasmTriggers,
165    ) -> Self {
166        let api_tokens = cluster_config.api_tokens.clone();
167        Self {
168            cluster_config,
169            container_runtime,
170            wasm_runtime,
171            services: RwLock::new(HashMap::new()),
172            route_table,
173            wasm_triggers,
174            registered_nodes: RwLock::new(HashMap::new()),
175            pending_commands: RwLock::new(HashMap::new()),
176            webhooks: crate::webhook::new_store(),
177            api_tokens,
178            deploy_history: RwLock::new(crate::deploy_history::DeployHistory::new()),
179            acme_manager: None,
180            cert_resolver: None,
181            container_stats: RwLock::new(HashMap::new()),
182            store: None,
183            ws_agents: RwLock::new(HashMap::new()),
184            log_listeners: RwLock::new(HashMap::new()),
185            backup_listeners: RwLock::new(HashMap::new()),
186            network_listeners: RwLock::new(HashMap::new()),
187            last_backup_results: RwLock::new(HashMap::new()),
188            master_last_backup_result: RwLock::new(None),
189            webhook_invocations: RwLock::new(HashMap::new()),
190            exec_sessions: RwLock::new(HashMap::new()),
191            pending_deploys: RwLock::new(HashMap::new()),
192        }
193    }
194
195    /// Set persistent store for service state.
196    pub fn with_store(mut self, store: Arc<crate::store::ClusterStore>) -> Self {
197        self.store = Some(store);
198        self
199    }
200
201    /// Set ACME manager and cert resolver for hot cert provisioning.
202    pub fn with_acme(
203        mut self,
204        manager: orca_proxy::acme::AcmeManager,
205        resolver: orca_proxy::SharedCertResolver,
206    ) -> Self {
207        self.acme_manager = Some(manager);
208        self.cert_resolver = Some(resolver);
209        self
210    }
211}
212
213impl ServiceState {
214    /// Create from a service config.
215    pub fn from_config(config: ServiceConfig) -> Self {
216        let desired_replicas = match &config.replicas {
217            Replicas::Fixed(n) => *n,
218            Replicas::Auto => 1,
219        };
220        Self {
221            config,
222            desired_replicas,
223            instances: Vec::new(),
224        }
225    }
226
227    /// Count how many instances are currently running.
228    pub fn running_count(&self) -> u32 {
229        self.instances
230            .iter()
231            .filter(|i| i.status == WorkloadStatus::Running)
232            .count() as u32
233    }
234}
235
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    use orca_core::config::ServiceConfig;
    use orca_core::runtime::WorkloadHandle;
    use orca_core::types::{Replicas, WorkloadStatus};

    /// Bare-bones service config: only the name, image and port are filled
    /// in, plus the replica setting under test.
    fn minimal_config(replicas: Replicas) -> ServiceConfig {
        ServiceConfig {
            name: String::from("test-svc"),
            project: None,
            runtime: Default::default(),
            image: Some(String::from("nginx:latest")),
            module: None,
            replicas,
            port: Some(8080),
            domain: None,
            health: None,
            readiness: None,
            liveness: None,
            env: HashMap::new(),
            resources: None,
            volume: None,
            deploy: None,
            placement: None,
            network: None,
            aliases: Vec::new(),
            mounts: Vec::new(),
            routes: Vec::new(),
            host_port: None,
            triggers: Vec::new(),
            assets: None,
            build: None,
            tls_cert: None,
            tls_key: None,
            internal: false,
            depends_on: Vec::new(),
            cmd: Vec::new(),
            extra_ports: Vec::new(),
            strip_prefix: None,
            pull_policy: Default::default(),
            backup: None,
        }
    }

    /// Non-canary instance in the given status, with no port or address and
    /// an unknown health state.
    fn make_instance(status: WorkloadStatus) -> InstanceState {
        let handle = WorkloadHandle {
            runtime_id: String::from("test-id"),
            name: String::from("test-instance"),
            metadata: HashMap::new(),
        };
        InstanceState {
            handle,
            status,
            host_port: None,
            container_address: None,
            health: HealthState::Unknown,
            is_canary: false,
            started_at: std::time::Instant::now(),
        }
    }

    #[test]
    fn from_config_fixed_sets_desired_replicas() {
        let config = minimal_config(Replicas::Fixed(3));
        let state = ServiceState::from_config(config);
        assert_eq!(state.desired_replicas, 3);
    }

    #[test]
    fn from_config_auto_defaults_to_one() {
        let config = minimal_config(Replicas::Auto);
        let state = ServiceState::from_config(config);
        assert_eq!(state.desired_replicas, 1);
    }

    #[test]
    fn running_count_with_mixed_statuses() {
        let mut state = ServiceState::from_config(minimal_config(Replicas::Fixed(4)));
        // `from_config` starts with no instances, so extending yields exactly
        // these four.
        state.instances.extend(
            vec![
                WorkloadStatus::Running,
                WorkloadStatus::Stopped,
                WorkloadStatus::Running,
                WorkloadStatus::Failed,
            ]
            .into_iter()
            .map(make_instance),
        );
        assert_eq!(state.running_count(), 2);
    }
}
322}