Skip to main content

orca_control/
state.rs

1//! Shared application state for the control plane.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use tokio::sync::RwLock;
7
8use orca_core::config::{ClusterConfig, ServiceConfig};
9use orca_core::runtime::{Runtime, WorkloadHandle};
10use orca_core::types::{HealthState, Replicas, WorkloadStatus};
11
12use crate::stats::ContainerStats;
13use crate::webhook::WebhookStore;
14
15pub use orca_proxy::{RouteTarget, SharedWasmTriggers, WasmTrigger};
16
17/// Shared route table type, compatible with [`orca_proxy::run_proxy`].
18pub type SharedRouteTable = Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>;
19
20/// Shared state for the control plane, accessible by the API server and reconciler.
21pub struct AppState {
22    /// Cluster configuration.
23    pub cluster_config: ClusterConfig,
24    /// Container runtime (Docker).
25    pub container_runtime: Arc<dyn Runtime>,
26    /// Wasm runtime (wasmtime). Trait object to avoid coupling to concrete type.
27    pub wasm_runtime: Option<Arc<dyn Runtime>>,
28    /// Current service state, keyed by service name.
29    pub services: RwLock<HashMap<String, ServiceState>>,
30    /// Routing table for container workloads, shared with the reverse proxy.
31    pub route_table: SharedRouteTable,
32    /// Wasm HTTP triggers, shared with the reverse proxy.
33    pub wasm_triggers: SharedWasmTriggers,
34    /// Registered cluster nodes (M2 in-memory, will move to Raft store).
35    pub registered_nodes: RwLock<HashMap<u64, RegisteredNode>>,
36    /// Webhook configurations for push-triggered deploys.
37    pub webhooks: WebhookStore,
38    /// API bearer tokens for authentication (empty = allow all).
39    pub api_tokens: Vec<String>,
40    /// Pending commands for agent nodes, keyed by node_id.
41    /// Uses serde_json::Value to avoid circular dependency on orca-agent types.
42    pub pending_commands: RwLock<HashMap<u64, Vec<serde_json::Value>>>,
43    /// Deploy history for rollback support.
44    pub deploy_history: RwLock<crate::deploy_history::DeployHistory>,
45    /// ACME manager for hot cert provisioning (None if no TLS).
46    pub acme_manager: Option<orca_proxy::acme::AcmeManager>,
47    /// Dynamic cert resolver shared with the HTTPS listener.
48    pub cert_resolver: Option<orca_proxy::SharedCertResolver>,
49    /// Cached container stats, keyed by service name.
50    pub container_stats: RwLock<HashMap<String, ContainerStats>>,
51    /// Persistent cluster store (redb). None in tests without persistence.
52    pub store: Option<Arc<crate::store::ClusterStore>>,
53    /// WebSocket senders for connected agent nodes, keyed by node_id.
54    pub ws_agents: RwLock<HashMap<u64, crate::ws_handler::AgentSender>>,
55    /// Log stream listeners: request_id → (data, done) sender.
56    pub log_listeners: RwLock<HashMap<String, tokio::sync::mpsc::Sender<(String, bool)>>>,
57}
58
59/// A node registered in the cluster.
60#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
61pub struct RegisteredNode {
62    /// Node ID.
63    pub node_id: u64,
64    /// Node address (ip:port).
65    pub address: String,
66    /// Node labels.
67    pub labels: HashMap<String, String>,
68    /// Last heartbeat time.
69    pub last_heartbeat: chrono::DateTime<chrono::Utc>,
70    /// Whether the node is in drain mode (no new workloads scheduled).
71    #[serde(default)]
72    pub drain: bool,
73    /// Latest CPU / memory / disk / network sample reported in a heartbeat.
74    /// Populated for both the master (via its local collector) and joined
75    /// nodes (via their heartbeat body).
76    #[serde(default)]
77    pub cpu_percent: f64,
78    #[serde(default)]
79    pub memory_bytes: u64,
80    #[serde(default)]
81    pub memory_total: u64,
82    #[serde(default)]
83    pub disk_used: u64,
84    #[serde(default)]
85    pub disk_total: u64,
86    #[serde(default)]
87    pub net_rx: u64,
88    #[serde(default)]
89    pub net_tx: u64,
90}
91
92/// State of a deployed service.
93#[derive(Debug)]
94pub struct ServiceState {
95    /// The service configuration.
96    pub config: ServiceConfig,
97    /// Desired number of replicas.
98    pub desired_replicas: u32,
99    /// Running instances.
100    pub instances: Vec<InstanceState>,
101}
102
103/// State of a single workload instance (one replica).
104#[derive(Debug)]
105pub struct InstanceState {
106    /// Handle to the running workload.
107    pub handle: WorkloadHandle,
108    /// Current status.
109    pub status: WorkloadStatus,
110    /// Host port mapped to the container's primary port (containers only).
111    pub host_port: Option<u16>,
112    /// Container address on Docker network (ip:port) for direct proxy routing.
113    pub container_address: Option<String>,
114    /// Health check state.
115    pub health: HealthState,
116    /// Whether this instance is a canary (new version during canary deploy).
117    pub is_canary: bool,
118    /// When this instance was created (for initial_delay_secs).
119    pub started_at: std::time::Instant,
120}
121
122impl AppState {
123    /// Create with shared route table and Wasm triggers (for sharing with the proxy).
124    pub fn new(
125        cluster_config: ClusterConfig,
126        container_runtime: Arc<dyn Runtime>,
127        wasm_runtime: Option<Arc<dyn Runtime>>,
128        route_table: SharedRouteTable,
129        wasm_triggers: SharedWasmTriggers,
130    ) -> Self {
131        let api_tokens = cluster_config.api_tokens.clone();
132        Self {
133            cluster_config,
134            container_runtime,
135            wasm_runtime,
136            services: RwLock::new(HashMap::new()),
137            route_table,
138            wasm_triggers,
139            registered_nodes: RwLock::new(HashMap::new()),
140            pending_commands: RwLock::new(HashMap::new()),
141            webhooks: crate::webhook::new_store(),
142            api_tokens,
143            deploy_history: RwLock::new(crate::deploy_history::DeployHistory::new()),
144            acme_manager: None,
145            cert_resolver: None,
146            container_stats: RwLock::new(HashMap::new()),
147            store: None,
148            ws_agents: RwLock::new(HashMap::new()),
149            log_listeners: RwLock::new(HashMap::new()),
150        }
151    }
152
153    /// Set persistent store for service state.
154    pub fn with_store(mut self, store: Arc<crate::store::ClusterStore>) -> Self {
155        self.store = Some(store);
156        self
157    }
158
159    /// Set ACME manager and cert resolver for hot cert provisioning.
160    pub fn with_acme(
161        mut self,
162        manager: orca_proxy::acme::AcmeManager,
163        resolver: orca_proxy::SharedCertResolver,
164    ) -> Self {
165        self.acme_manager = Some(manager);
166        self.cert_resolver = Some(resolver);
167        self
168    }
169}
170
171impl ServiceState {
172    /// Create from a service config.
173    pub fn from_config(config: ServiceConfig) -> Self {
174        let desired_replicas = match &config.replicas {
175            Replicas::Fixed(n) => *n,
176            Replicas::Auto => 1,
177        };
178        Self {
179            config,
180            desired_replicas,
181            instances: Vec::new(),
182        }
183    }
184
185    /// Count how many instances are currently running.
186    pub fn running_count(&self) -> u32 {
187        self.instances
188            .iter()
189            .filter(|i| i.status == WorkloadStatus::Running)
190            .count() as u32
191    }
192}
193
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    use orca_core::config::ServiceConfig;
    use orca_core::runtime::WorkloadHandle;
    use orca_core::types::{Replicas, WorkloadStatus};

    /// Build a `ServiceConfig` with the given replica setting and every other
    /// field at an empty/default value.
    fn minimal_config(replicas: Replicas) -> ServiceConfig {
        ServiceConfig {
            name: "test-svc".to_string(),
            project: None,
            runtime: Default::default(),
            image: Some("nginx:latest".to_string()),
            module: None,
            replicas,
            port: Some(8080),
            domain: None,
            health: None,
            readiness: None,
            liveness: None,
            env: HashMap::new(),
            resources: None,
            volume: None,
            deploy: None,
            placement: None,
            network: None,
            aliases: vec![],
            mounts: vec![],
            routes: vec![],
            host_port: None,
            triggers: Vec::new(),
            assets: None,
            build: None,
            tls_cert: None,
            tls_key: None,
            internal: false,
            depends_on: vec![],
            cmd: vec![],
            extra_ports: vec![],
            strip_prefix: None,
            pull_policy: Default::default(),
        }
    }

    /// Build an `InstanceState` with the given status and placeholder handle data.
    fn make_instance(status: WorkloadStatus) -> InstanceState {
        InstanceState {
            handle: WorkloadHandle {
                runtime_id: "test-id".to_string(),
                name: "test-instance".to_string(),
                metadata: HashMap::new(),
            },
            status,
            host_port: None,
            container_address: None,
            health: HealthState::Unknown,
            is_canary: false,
            started_at: std::time::Instant::now(),
        }
    }

    #[test]
    fn from_config_fixed_sets_desired_replicas() {
        let state = ServiceState::from_config(minimal_config(Replicas::Fixed(3)));
        assert_eq!(state.desired_replicas, 3);
    }

    #[test]
    fn from_config_auto_defaults_to_one() {
        let state = ServiceState::from_config(minimal_config(Replicas::Auto));
        assert_eq!(state.desired_replicas, 1);
    }

    #[test]
    fn from_config_starts_with_no_instances() {
        let state = ServiceState::from_config(minimal_config(Replicas::Fixed(0)));
        assert_eq!(state.desired_replicas, 0);
        assert!(state.instances.is_empty());
        assert_eq!(state.running_count(), 0);
    }

    #[test]
    fn running_count_with_mixed_statuses() {
        let mut state = ServiceState::from_config(minimal_config(Replicas::Fixed(4)));
        state.instances = vec![
            make_instance(WorkloadStatus::Running),
            make_instance(WorkloadStatus::Stopped),
            make_instance(WorkloadStatus::Running),
            make_instance(WorkloadStatus::Failed),
        ];
        assert_eq!(state.running_count(), 2);
    }

    #[test]
    fn running_count_none_running() {
        let mut state = ServiceState::from_config(minimal_config(Replicas::Fixed(2)));
        state.instances = vec![
            make_instance(WorkloadStatus::Stopped),
            make_instance(WorkloadStatus::Failed),
        ];
        assert_eq!(state.running_count(), 0);
    }
}