// orca_control/state.rs
//! Shared application state for the control plane.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use tokio::sync::RwLock;
7
8use orca_core::config::{ClusterConfig, ServiceConfig};
9use orca_core::runtime::{Runtime, WorkloadHandle};
10use orca_core::types::{HealthState, Replicas, WorkloadStatus};
11
12use crate::stats::ContainerStats;
13use crate::webhook::WebhookStore;
14
15pub use orca_proxy::{RouteTarget, SharedWasmTriggers, WasmTrigger};
16
17/// Shared route table type, compatible with [`orca_proxy::run_proxy`].
18pub type SharedRouteTable = Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>;
19
/// Shared state for the control plane, accessible by the API server and reconciler.
///
/// Most mutable collections live behind `tokio::sync::RwLock`, so they can be
/// read and mutated through a shared reference from async code. `route_table`
/// and `wasm_triggers` are shared handles that are also given to the reverse
/// proxy. The remaining plain fields (`cluster_config`, the runtimes,
/// `api_tokens`, `acme_manager`, `cert_resolver`, `store`) are assigned in this
/// file only by `AppState::new` and the `with_store` / `with_acme` builders.
///
/// NOTE(review): `AppState` is not `Clone`; callers presumably share it behind
/// an `Arc` — confirm at the construction site.
pub struct AppState {
    /// Cluster configuration.
    pub cluster_config: ClusterConfig,
    /// Container runtime (Docker).
    pub container_runtime: Arc<dyn Runtime>,
    /// Wasm runtime (wasmtime). Trait object to avoid coupling to concrete type.
    /// `None` when no Wasm runtime was passed to `AppState::new`.
    pub wasm_runtime: Option<Arc<dyn Runtime>>,
    /// Current service state, keyed by service name.
    pub services: RwLock<HashMap<String, ServiceState>>,
    /// Routing table for container workloads, shared with the reverse proxy.
    pub route_table: SharedRouteTable,
    /// Wasm HTTP triggers, shared with the reverse proxy.
    pub wasm_triggers: SharedWasmTriggers,
    /// Registered cluster nodes (M2 in-memory, will move to Raft store).
    pub registered_nodes: RwLock<HashMap<u64, RegisteredNode>>,
    /// Webhook configurations for push-triggered deploys.
    pub webhooks: WebhookStore,
    /// API bearer tokens for authentication (empty = allow all).
    /// Copied from `cluster_config.api_tokens` by `AppState::new`.
    pub api_tokens: Vec<String>,
    /// Pending commands for agent nodes, keyed by node_id.
    /// Uses serde_json::Value to avoid circular dependency on orca-agent types.
    pub pending_commands: RwLock<HashMap<u64, Vec<serde_json::Value>>>,
    /// Deploy history for rollback support.
    pub deploy_history: RwLock<crate::deploy_history::DeployHistory>,
    /// ACME manager for hot cert provisioning (None if no TLS).
    /// Set together with `cert_resolver` by `with_acme`.
    pub acme_manager: Option<orca_proxy::acme::AcmeManager>,
    /// Dynamic cert resolver shared with the HTTPS listener.
    /// Set together with `acme_manager` by `with_acme`.
    pub cert_resolver: Option<orca_proxy::SharedCertResolver>,
    /// Cached container stats, keyed by service name.
    pub container_stats: RwLock<HashMap<String, ContainerStats>>,
    /// Persistent cluster store (redb). None in tests without persistence.
    /// Attached with `with_store`.
    pub store: Option<Arc<crate::store::ClusterStore>>,
    /// WebSocket senders for connected agent nodes, keyed by node_id.
    pub ws_agents: RwLock<HashMap<u64, crate::ws_handler::AgentSender>>,
    /// Log stream listeners: request_id → (data, done) sender.
    pub log_listeners: RwLock<HashMap<String, tokio::sync::mpsc::Sender<(String, bool)>>>,
    /// Active exec sessions: session_id → output bytes sender (agent → CLI WS).
    pub exec_sessions: RwLock<HashMap<String, tokio::sync::mpsc::Sender<Vec<u8>>>>,
    /// Pending deploy result waiters: service_name → oneshot sender.
    /// Inserted by queue_remote_deploy, resolved by ws_handler on DeployResult.
    /// A oneshot sender can deliver exactly one `Result`, so each entry is
    /// single-use.
    pub pending_deploys: RwLock<HashMap<String, tokio::sync::oneshot::Sender<Result<(), String>>>>,
}
63
/// A node registered in the cluster.
///
/// Serializable with serde. Fields marked `#[serde(default)]` fall back to
/// their `Default` value (`false` / `0`) when absent from the serialized form;
/// the other fields are required.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RegisteredNode {
    /// Node ID.
    pub node_id: u64,
    /// Node address (ip:port).
    pub address: String,
    /// Node labels.
    pub labels: HashMap<String, String>,
    /// Last heartbeat time (UTC).
    pub last_heartbeat: chrono::DateTime<chrono::Utc>,
    /// Whether the node is in drain mode (no new workloads scheduled).
    #[serde(default)]
    pub drain: bool,
    /// Latest CPU / memory / disk / network sample reported in a heartbeat.
    /// Populated for both the master (via its local collector) and joined
    /// nodes (via their heartbeat body).
    ///
    /// CPU utilisation as a percentage.
    /// NOTE(review): unclear whether this is 0–100 overall or per-core summed — confirm.
    #[serde(default)]
    pub cpu_percent: f64,
    /// Memory currently in use (presumably bytes, per the field name).
    #[serde(default)]
    pub memory_bytes: u64,
    /// Total memory on the node (presumably bytes, matching `memory_bytes`).
    #[serde(default)]
    pub memory_total: u64,
    /// Disk space in use. TODO(review): confirm unit (presumably bytes).
    #[serde(default)]
    pub disk_used: u64,
    /// Total disk space. TODO(review): confirm unit (presumably bytes).
    #[serde(default)]
    pub disk_total: u64,
    /// Network traffic received.
    /// NOTE(review): confirm unit and whether this is a cumulative counter or per-interval.
    #[serde(default)]
    pub net_rx: u64,
    /// Network traffic transmitted.
    /// NOTE(review): confirm unit and whether this is a cumulative counter or per-interval.
    #[serde(default)]
    pub net_tx: u64,
}
96
/// State of a deployed service.
///
/// Created from a [`ServiceConfig`] by `ServiceState::from_config`, which
/// starts with an empty instance list.
#[derive(Debug)]
pub struct ServiceState {
    /// The service configuration.
    pub config: ServiceConfig,
    /// Desired number of replicas. `from_config` sets this to `n` for
    /// `Replicas::Fixed(n)` and to 1 for `Replicas::Auto`.
    pub desired_replicas: u32,
    /// Instances tracked for this service. Not all of them are necessarily
    /// running — each carries its own `status`; `running_count` counts the
    /// ones whose status is `WorkloadStatus::Running`.
    pub instances: Vec<InstanceState>,
}
107
/// State of a single workload instance (one replica).
#[derive(Debug)]
pub struct InstanceState {
    /// Handle to the running workload.
    pub handle: WorkloadHandle,
    /// Current status. `ServiceState::running_count` counts instances whose
    /// status is `WorkloadStatus::Running`.
    pub status: WorkloadStatus,
    /// Host port mapped to the container's primary port (containers only).
    pub host_port: Option<u16>,
    /// Container address on Docker network (ip:port) for direct proxy routing.
    pub container_address: Option<String>,
    /// Health check state.
    pub health: HealthState,
    /// Whether this instance is a canary (new version during canary deploy).
    pub is_canary: bool,
    /// When this instance was created (for initial_delay_secs).
    /// A monotonic `std::time::Instant`, so it is only meaningful within the
    /// current process (not a wall-clock timestamp).
    pub started_at: std::time::Instant,
}
126
127impl AppState {
128    /// Create with shared route table and Wasm triggers (for sharing with the proxy).
129    pub fn new(
130        cluster_config: ClusterConfig,
131        container_runtime: Arc<dyn Runtime>,
132        wasm_runtime: Option<Arc<dyn Runtime>>,
133        route_table: SharedRouteTable,
134        wasm_triggers: SharedWasmTriggers,
135    ) -> Self {
136        let api_tokens = cluster_config.api_tokens.clone();
137        Self {
138            cluster_config,
139            container_runtime,
140            wasm_runtime,
141            services: RwLock::new(HashMap::new()),
142            route_table,
143            wasm_triggers,
144            registered_nodes: RwLock::new(HashMap::new()),
145            pending_commands: RwLock::new(HashMap::new()),
146            webhooks: crate::webhook::new_store(),
147            api_tokens,
148            deploy_history: RwLock::new(crate::deploy_history::DeployHistory::new()),
149            acme_manager: None,
150            cert_resolver: None,
151            container_stats: RwLock::new(HashMap::new()),
152            store: None,
153            ws_agents: RwLock::new(HashMap::new()),
154            log_listeners: RwLock::new(HashMap::new()),
155            exec_sessions: RwLock::new(HashMap::new()),
156            pending_deploys: RwLock::new(HashMap::new()),
157        }
158    }
159
160    /// Set persistent store for service state.
161    pub fn with_store(mut self, store: Arc<crate::store::ClusterStore>) -> Self {
162        self.store = Some(store);
163        self
164    }
165
166    /// Set ACME manager and cert resolver for hot cert provisioning.
167    pub fn with_acme(
168        mut self,
169        manager: orca_proxy::acme::AcmeManager,
170        resolver: orca_proxy::SharedCertResolver,
171    ) -> Self {
172        self.acme_manager = Some(manager);
173        self.cert_resolver = Some(resolver);
174        self
175    }
176}
177
178impl ServiceState {
179    /// Create from a service config.
180    pub fn from_config(config: ServiceConfig) -> Self {
181        let desired_replicas = match &config.replicas {
182            Replicas::Fixed(n) => *n,
183            Replicas::Auto => 1,
184        };
185        Self {
186            config,
187            desired_replicas,
188            instances: Vec::new(),
189        }
190    }
191
192    /// Count how many instances are currently running.
193    pub fn running_count(&self) -> u32 {
194        self.instances
195            .iter()
196            .filter(|i| i.status == WorkloadStatus::Running)
197            .count() as u32
198    }
199}
200
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    use orca_core::config::ServiceConfig;
    use orca_core::runtime::WorkloadHandle;
    use orca_core::types::{Replicas, WorkloadStatus};

    /// Smallest `ServiceConfig` the tests need: an nginx image on port 8080
    /// with every optional section unset. Only `replicas` varies per test.
    fn minimal_config(replicas: Replicas) -> ServiceConfig {
        ServiceConfig {
            name: "test-svc".to_string(),
            project: None,
            runtime: Default::default(),
            image: Some("nginx:latest".to_string()),
            module: None,
            replicas,
            port: Some(8080),
            domain: None,
            health: None,
            readiness: None,
            liveness: None,
            env: HashMap::new(),
            resources: None,
            volume: None,
            deploy: None,
            placement: None,
            network: None,
            aliases: vec![],
            mounts: vec![],
            routes: vec![],
            host_port: None,
            triggers: Vec::new(),
            assets: None,
            build: None,
            tls_cert: None,
            tls_key: None,
            internal: false,
            depends_on: vec![],
            cmd: vec![],
            extra_ports: vec![],
            strip_prefix: None,
            pull_policy: Default::default(),
            backup: None,
        }
    }

    /// An instance in the given `status` with placeholder handle data,
    /// unknown health and no port/address.
    fn make_instance(status: WorkloadStatus) -> InstanceState {
        InstanceState {
            handle: WorkloadHandle {
                runtime_id: "test-id".to_string(),
                name: "test-instance".to_string(),
                metadata: HashMap::new(),
            },
            status,
            host_port: None,
            container_address: None,
            health: HealthState::Unknown,
            is_canary: false,
            started_at: std::time::Instant::now(),
        }
    }

    #[test]
    fn from_config_fixed_sets_desired_replicas() {
        let state = ServiceState::from_config(minimal_config(Replicas::Fixed(3)));
        assert_eq!(state.desired_replicas, 3);
    }

    #[test]
    fn from_config_fixed_zero_is_preserved() {
        let state = ServiceState::from_config(minimal_config(Replicas::Fixed(0)));
        assert_eq!(state.desired_replicas, 0);
    }

    #[test]
    fn from_config_auto_defaults_to_one() {
        let state = ServiceState::from_config(minimal_config(Replicas::Auto));
        assert_eq!(state.desired_replicas, 1);
    }

    #[test]
    fn from_config_starts_without_instances() {
        let state = ServiceState::from_config(minimal_config(Replicas::Fixed(2)));
        assert!(state.instances.is_empty());
        assert_eq!(state.running_count(), 0);
    }

    #[test]
    fn running_count_with_mixed_statuses() {
        let mut state = ServiceState::from_config(minimal_config(Replicas::Fixed(4)));
        state.instances = vec![
            make_instance(WorkloadStatus::Running),
            make_instance(WorkloadStatus::Stopped),
            make_instance(WorkloadStatus::Running),
            make_instance(WorkloadStatus::Failed),
        ];
        assert_eq!(state.running_count(), 2);
    }

    #[test]
    fn running_count_all_running() {
        let mut state = ServiceState::from_config(minimal_config(Replicas::Fixed(3)));
        state.instances = (0..3)
            .map(|_| make_instance(WorkloadStatus::Running))
            .collect();
        assert_eq!(state.running_count(), 3);
    }
}