Skip to main content

orca_control/
lib.rs

1pub mod api;
2pub mod auth;
3pub mod backup_scheduler;
4pub(crate) mod canary;
5pub mod cleanup_scheduler;
6pub mod cluster_api;
7pub(crate) mod cluster_handlers;
8pub mod cluster_state;
9pub mod deploy_history;
10pub mod health;
11pub(crate) mod instance;
12pub mod metrics;
13pub(crate) mod operations;
14pub mod proto;
15pub mod raft;
16pub mod reconciler;
17pub mod routes;
18pub mod scheduler;
19pub mod state;
20pub mod stats;
21pub mod store;
22pub mod topo_sort;
23pub mod watchdog;
24pub mod webhook;
25pub mod ws_handler;
26
27use std::collections::HashMap;
28use std::sync::Arc;
29
30use orca_core::config::ClusterConfig;
31use orca_core::runtime::Runtime;
32use orca_core::types::WorkloadStatus;
33use tracing::info;
34
35use crate::state::{AppState, InstanceState, SharedRouteTable, SharedWasmTriggers};
36
37/// Start the orca control plane (API server).
38///
39/// # Errors
40///
41/// Returns an error if the server fails to bind or encounters a fatal error.
42pub async fn run_server(
43    cluster_config: ClusterConfig,
44    container_runtime: Arc<dyn Runtime>,
45    wasm_runtime: Option<Arc<dyn Runtime>>,
46    route_table: SharedRouteTable,
47    wasm_triggers: SharedWasmTriggers,
48) -> anyhow::Result<()> {
49    run_server_with_acme(
50        cluster_config,
51        container_runtime,
52        wasm_runtime,
53        route_table,
54        wasm_triggers,
55        None,
56        None,
57    )
58    .await
59}
60
61/// Start the orca control plane with optional ACME hot-provisioning.
62pub async fn run_server_with_acme(
63    cluster_config: ClusterConfig,
64    container_runtime: Arc<dyn Runtime>,
65    wasm_runtime: Option<Arc<dyn Runtime>>,
66    route_table: SharedRouteTable,
67    wasm_triggers: SharedWasmTriggers,
68    acme_manager: Option<orca_proxy::acme::AcmeManager>,
69    cert_resolver: Option<orca_proxy::SharedCertResolver>,
70) -> anyhow::Result<()> {
71    let mut app_state = AppState::new(
72        cluster_config.clone(),
73        container_runtime,
74        wasm_runtime,
75        route_table,
76        wasm_triggers,
77    );
78    if let (Some(acme), Some(resolver)) = (acme_manager, cert_resolver) {
79        app_state = app_state.with_acme(acme, resolver);
80    }
81
82    // Open persistent store
83    let store_path = dirs_next::home_dir()
84        .unwrap_or_else(|| ".".into())
85        .join(".orca/cluster.db");
86    match store::ClusterStore::open(&store_path) {
87        Ok(s) => {
88            info!("Persistent store opened at {}", store_path.display());
89            app_state = app_state.with_store(Arc::new(s));
90        }
91        Err(e) => {
92            tracing::warn!("Failed to open store at {}: {e}", store_path.display());
93        }
94    }
95
96    let state = Arc::new(app_state);
97
98    // Restore persisted services, re-attaching to existing containers
99    if let Some(store) = &state.store {
100        match store.get_all_services() {
101            Ok(services) if !services.is_empty() => {
102                info!("Restoring {} persisted services", services.len());
103                for config in services.values() {
104                    if let Err(e) = restore_or_reconcile(&state, config).await {
105                        tracing::warn!(service = %config.name, "Failed to restore: {e}");
106                    }
107                }
108            }
109            Ok(_) => {}
110            Err(e) => tracing::warn!("Failed to load persisted services: {e}"),
111        }
112    }
113
114    // Register the master node so it appears in TUI/status.
115    register_master_node(&state, cluster_config.cluster.api_port).await;
116    spawn_master_heartbeat(state.clone());
117
118    // Spawn background resilience tasks.
119    watchdog::spawn_watchdog(state.clone());
120    health::spawn_health_checker(state.clone());
121    stats::spawn_stats_collector(state.clone());
122
123    // Spawn scheduled backup task if configured (needs state for agent dispatch).
124    if let Some(backup_cfg) = cluster_config.backup.clone()
125        && backup_scheduler::spawn_backup_scheduler(backup_cfg, state.clone()).is_some()
126    {
127        info!("Backup scheduler started");
128    }
129
130    if let Some(cleanup_cfg) = cluster_config.cleanup.clone()
131        && cleanup_scheduler::spawn_cleanup_scheduler(cleanup_cfg, state.clone()).is_some()
132    {
133        info!("Cleanup scheduler started");
134    }
135
136    let app = api::router(state.clone());
137
138    let addr = format!("0.0.0.0:{}", cluster_config.cluster.api_port);
139    let listener = tokio::net::TcpListener::bind(&addr).await?;
140    info!("API server listening on {addr}");
141
142    axum::serve(listener, app)
143        .with_graceful_shutdown(shutdown_signal())
144        .await?;
145
146    Ok(())
147}
148
149/// Check if Docker containers already exist for a persisted service.
150/// If they do, populate in-memory state from existing containers.
151/// Otherwise, fall back to full reconciliation.
152async fn restore_or_reconcile(
153    state: &AppState,
154    config: &orca_core::config::ServiceConfig,
155) -> anyhow::Result<()> {
156    // Remote services run on an agent node whose WS connection isn't open yet
157    // at startup. Register a placeholder so send_reconcile includes this service
158    // when the agent connects — the agent will skip deployment if the container
159    // is already running.
160    if config
161        .placement
162        .as_ref()
163        .and_then(|p| p.node.as_ref())
164        .is_some()
165    {
166        let desired = match &config.replicas {
167            orca_core::types::Replicas::Fixed(n) => *n,
168            orca_core::types::Replicas::Auto => 1,
169        };
170        let mut services = state.services.write().await;
171        let svc_state = services
172            .entry(config.name.clone())
173            .or_insert_with(|| state::ServiceState::from_config(config.clone()));
174        svc_state.config = config.clone();
175        svc_state.desired_replicas = desired;
176        info!(service = %config.name, "Registered remote service placeholder");
177        return Ok(());
178    }
179
180    // Local service: try to re-attach existing containers first.
181    let cr = state
182        .container_runtime
183        .as_any()
184        .downcast_ref::<orca_agent::docker::ContainerRuntime>();
185
186    if let Some(container_rt) = cr {
187        let existing = container_rt.find_existing(&config.name).await?;
188        if !existing.is_empty() {
189            info!(
190                service = %config.name,
191                count = existing.len(),
192                "Re-attached to existing containers, skipping reconciliation"
193            );
194            populate_state_from_existing(state, config, existing).await;
195            return Ok(());
196        }
197    }
198
199    reconciler::reconcile_service(state, config).await
200}
201
202/// Populate in-memory `ServiceState` from already-running Docker containers.
203async fn populate_state_from_existing(
204    state: &AppState,
205    config: &orca_core::config::ServiceConfig,
206    handles: Vec<orca_core::runtime::WorkloadHandle>,
207) {
208    // Re-attached containers are already running — mark Healthy so the
209    // route filter accepts them. Health checker will correct on next probe.
210    let initial_health = if config.health.is_some() || config.liveness.is_some() {
211        orca_core::types::HealthState::Healthy
212    } else {
213        orca_core::types::HealthState::NoCheck
214    };
215
216    // Resolve host_port via the runtime (more reliable than metadata extraction)
217    let runtime = state.container_runtime.as_ref();
218    let mut instances: Vec<InstanceState> = Vec::new();
219    for handle in handles {
220        // Always resolve host_port using the configured container port —
221        // metadata's first-port-binding heuristic is unreliable when extra_ports
222        // are present (e.g. gitea SSH on 22222 vs HTTP on 3000).
223        let mut host_port = if let Some(p) = config.port {
224            runtime.resolve_host_port(&handle, p).await.ok().flatten()
225        } else {
226            None
227        };
228        if host_port.is_none() {
229            host_port = handle
230                .metadata
231                .get("host_port")
232                .and_then(|p| p.parse::<u16>().ok());
233        }
234        info!(
235            service = %config.name,
236            runtime_id = %&handle.runtime_id[..12],
237            ?host_port,
238            "Restored container instance"
239        );
240        instances.push(InstanceState {
241            handle,
242            status: WorkloadStatus::Running,
243            host_port,
244            container_address: None,
245            health: initial_health,
246            is_canary: false,
247            started_at: std::time::Instant::now(),
248        });
249    }
250
251    let desired = match &config.replicas {
252        orca_core::types::Replicas::Fixed(n) => *n,
253        orca_core::types::Replicas::Auto => 1,
254    };
255
256    let mut services = state.services.write().await;
257    let svc_state = services
258        .entry(config.name.clone())
259        .or_insert_with(|| state::ServiceState::from_config(config.clone()));
260    svc_state.instances = instances;
261    svc_state.desired_replicas = desired;
262    drop(services);
263
264    // Update routing table for the restored service
265    match config.runtime {
266        orca_core::types::RuntimeKind::Container => {
267            routes::update_container_routes(state, config).await;
268        }
269        orca_core::types::RuntimeKind::Wasm => {
270            routes::update_wasm_triggers(state, config).await;
271        }
272    }
273}
274
/// Derive the master's node ID by hashing a hostname string.
///
/// The name is taken from the `HOSTNAME` environment variable, then
/// `COMPUTERNAME`, falling back to the literal `"orca-master"` when neither is
/// set or valid Unicode. The same binary always maps the same name to the same
/// ID, because `DefaultHasher::new()` uses fixed keys.
///
/// NOTE(review): `HOSTNAME` is commonly exported by interactive shells but may
/// be absent under service managers. In that case every master hashes the
/// fallback name — confirm that is acceptable.
///
/// NOTE(review): `DefaultHasher`'s algorithm is unspecified across Rust
/// releases, so the ID may change after a toolchain upgrade. Verify nothing
/// persists it.
fn master_node_id() -> u64 {
    use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};

    let hostname = ["HOSTNAME", "COMPUTERNAME"]
        .into_iter()
        .find_map(|key| std::env::var(key).ok())
        .unwrap_or_else(|| "orca-master".to_string());
    BuildHasherDefault::<DefaultHasher>::default().hash_one(&hostname)
}
285
286/// Register the master node in the cluster node map.
287async fn register_master_node(state: &state::AppState, api_port: u16) {
288    let node_id = master_node_id();
289    let mut labels = HashMap::new();
290    labels.insert("role".to_string(), "master".to_string());
291    let node = state::RegisteredNode {
292        node_id,
293        address: format!("localhost:{api_port}"),
294        labels,
295        last_heartbeat: chrono::Utc::now(),
296        drain: false,
297        cpu_percent: 0.0,
298        memory_bytes: 0,
299        memory_total: 0,
300        disk_used: 0,
301        disk_total: 0,
302        net_rx: 0,
303        net_tx: 0,
304    };
305    let mut nodes = state.registered_nodes.write().await;
306    nodes.insert(node_id, node);
307    info!(node_id, "Master node self-registered");
308}
309
310/// Spawn a periodic task that samples host stats and writes them onto the
311/// master node's entry in the cluster node map. Joined nodes push their own
312/// stats via the heartbeat; the master has no heartbeat to piggyback on so
313/// it does this in-process instead. The same loop also prunes zombie
314/// nodes — entries whose last heartbeat is older than 60s — which keeps
315/// the cluster/info endpoint clean after a joined node is restarted with
316/// a fresh id (until node.id persistence lands on every joined box).
317fn spawn_master_heartbeat(state: Arc<state::AppState>) {
318    const STALE_AFTER: chrono::Duration = chrono::Duration::seconds(60);
319    let node_id = master_node_id();
320    let collector = Arc::new(orca_agent::host_stats::HostStatsCollector::new());
321    tokio::spawn(async move {
322        loop {
323            tokio::time::sleep(std::time::Duration::from_secs(2)).await;
324            let sample = collector.sample();
325            let now = chrono::Utc::now();
326            let mut nodes = state.registered_nodes.write().await;
327            if let Some(node) = nodes.get_mut(&node_id) {
328                node.last_heartbeat = now;
329                node.cpu_percent = sample.cpu_percent;
330                node.memory_bytes = sample.memory_bytes;
331                node.memory_total = sample.memory_total;
332                node.disk_used = sample.disk_used;
333                node.disk_total = sample.disk_total;
334                node.net_rx = sample.net_rx;
335                node.net_tx = sample.net_tx;
336            }
337            nodes.retain(|id, node| {
338                if *id == node_id {
339                    return true;
340                }
341                let age = now - node.last_heartbeat;
342                age < STALE_AFTER
343            });
344        }
345    });
346}
347
348async fn shutdown_signal() {
349    tokio::signal::ctrl_c()
350        .await
351        .expect("failed to install ctrl+c handler");
352    info!("Shutdown signal received");
353}