Skip to main content

orca_control/
lib.rs

1pub mod api;
2pub mod auth;
3pub mod backup_scheduler;
4pub(crate) mod canary;
5pub mod cluster_api;
6pub(crate) mod cluster_handlers;
7pub mod cluster_state;
8pub mod deploy_history;
9pub mod health;
10pub(crate) mod instance;
11pub mod metrics;
12pub(crate) mod operations;
13pub mod proto;
14pub mod raft;
15pub mod reconciler;
16pub mod routes;
17pub mod scheduler;
18pub mod state;
19pub mod stats;
20pub mod store;
21pub mod topo_sort;
22pub mod watchdog;
23pub mod webhook;
24pub mod ws_handler;
25
26use std::collections::HashMap;
27use std::sync::Arc;
28
29use orca_core::config::ClusterConfig;
30use orca_core::runtime::Runtime;
31use orca_core::types::WorkloadStatus;
32use tracing::info;
33
34use crate::state::{AppState, InstanceState, SharedRouteTable, SharedWasmTriggers};
35
36/// Start the orca control plane (API server).
37///
38/// # Errors
39///
40/// Returns an error if the server fails to bind or encounters a fatal error.
41pub async fn run_server(
42    cluster_config: ClusterConfig,
43    container_runtime: Arc<dyn Runtime>,
44    wasm_runtime: Option<Arc<dyn Runtime>>,
45    route_table: SharedRouteTable,
46    wasm_triggers: SharedWasmTriggers,
47) -> anyhow::Result<()> {
48    run_server_with_acme(
49        cluster_config,
50        container_runtime,
51        wasm_runtime,
52        route_table,
53        wasm_triggers,
54        None,
55        None,
56    )
57    .await
58}
59
60/// Start the orca control plane with optional ACME hot-provisioning.
61pub async fn run_server_with_acme(
62    cluster_config: ClusterConfig,
63    container_runtime: Arc<dyn Runtime>,
64    wasm_runtime: Option<Arc<dyn Runtime>>,
65    route_table: SharedRouteTable,
66    wasm_triggers: SharedWasmTriggers,
67    acme_manager: Option<orca_proxy::acme::AcmeManager>,
68    cert_resolver: Option<orca_proxy::SharedCertResolver>,
69) -> anyhow::Result<()> {
70    let mut app_state = AppState::new(
71        cluster_config.clone(),
72        container_runtime,
73        wasm_runtime,
74        route_table,
75        wasm_triggers,
76    );
77    if let (Some(acme), Some(resolver)) = (acme_manager, cert_resolver) {
78        app_state = app_state.with_acme(acme, resolver);
79    }
80
81    // Open persistent store
82    let store_path = dirs_next::home_dir()
83        .unwrap_or_else(|| ".".into())
84        .join(".orca/cluster.db");
85    match store::ClusterStore::open(&store_path) {
86        Ok(s) => {
87            info!("Persistent store opened at {}", store_path.display());
88            app_state = app_state.with_store(Arc::new(s));
89        }
90        Err(e) => {
91            tracing::warn!("Failed to open store at {}: {e}", store_path.display());
92        }
93    }
94
95    let state = Arc::new(app_state);
96
97    // Restore persisted services, re-attaching to existing containers
98    if let Some(store) = &state.store {
99        match store.get_all_services() {
100            Ok(services) if !services.is_empty() => {
101                info!("Restoring {} persisted services", services.len());
102                for config in services.values() {
103                    if let Err(e) = restore_or_reconcile(&state, config).await {
104                        tracing::warn!(service = %config.name, "Failed to restore: {e}");
105                    }
106                }
107            }
108            Ok(_) => {}
109            Err(e) => tracing::warn!("Failed to load persisted services: {e}"),
110        }
111    }
112
113    // Register the master node so it appears in TUI/status.
114    register_master_node(&state, cluster_config.cluster.api_port).await;
115    spawn_master_heartbeat(state.clone());
116
117    // Spawn background resilience tasks.
118    watchdog::spawn_watchdog(state.clone());
119    health::spawn_health_checker(state.clone());
120    stats::spawn_stats_collector(state.clone());
121
122    let app = api::router(state.clone());
123
124    let addr = format!("0.0.0.0:{}", cluster_config.cluster.api_port);
125    let listener = tokio::net::TcpListener::bind(&addr).await?;
126    info!("API server listening on {addr}");
127
128    axum::serve(listener, app)
129        .with_graceful_shutdown(shutdown_signal())
130        .await?;
131
132    Ok(())
133}
134
135/// Check if Docker containers already exist for a persisted service.
136/// If they do, populate in-memory state from existing containers.
137/// Otherwise, fall back to full reconciliation.
138async fn restore_or_reconcile(
139    state: &AppState,
140    config: &orca_core::config::ServiceConfig,
141) -> anyhow::Result<()> {
142    // Try to downcast to ContainerRuntime for find_existing
143    let cr = state
144        .container_runtime
145        .as_any()
146        .downcast_ref::<orca_agent::docker::ContainerRuntime>();
147
148    if let Some(container_rt) = cr {
149        let existing = container_rt.find_existing(&config.name).await?;
150        if !existing.is_empty() {
151            info!(
152                service = %config.name,
153                count = existing.len(),
154                "Re-attached to existing containers, skipping reconciliation"
155            );
156            populate_state_from_existing(state, config, existing).await;
157            return Ok(());
158        }
159    }
160
161    reconciler::reconcile_service(state, config).await
162}
163
164/// Populate in-memory `ServiceState` from already-running Docker containers.
165async fn populate_state_from_existing(
166    state: &AppState,
167    config: &orca_core::config::ServiceConfig,
168    handles: Vec<orca_core::runtime::WorkloadHandle>,
169) {
170    // Re-attached containers are already running — mark Healthy so the
171    // route filter accepts them. Health checker will correct on next probe.
172    let initial_health = if config.health.is_some() || config.liveness.is_some() {
173        orca_core::types::HealthState::Healthy
174    } else {
175        orca_core::types::HealthState::NoCheck
176    };
177
178    // Resolve host_port via the runtime (more reliable than metadata extraction)
179    let runtime = state.container_runtime.as_ref();
180    let mut instances: Vec<InstanceState> = Vec::new();
181    for handle in handles {
182        // Always resolve host_port using the configured container port —
183        // metadata's first-port-binding heuristic is unreliable when extra_ports
184        // are present (e.g. gitea SSH on 22222 vs HTTP on 3000).
185        let mut host_port = if let Some(p) = config.port {
186            runtime.resolve_host_port(&handle, p).await.ok().flatten()
187        } else {
188            None
189        };
190        if host_port.is_none() {
191            host_port = handle
192                .metadata
193                .get("host_port")
194                .and_then(|p| p.parse::<u16>().ok());
195        }
196        info!(
197            service = %config.name,
198            runtime_id = %&handle.runtime_id[..12],
199            ?host_port,
200            "Restored container instance"
201        );
202        instances.push(InstanceState {
203            handle,
204            status: WorkloadStatus::Running,
205            host_port,
206            container_address: None,
207            health: initial_health,
208            is_canary: false,
209            started_at: std::time::Instant::now(),
210        });
211    }
212
213    let desired = match &config.replicas {
214        orca_core::types::Replicas::Fixed(n) => *n,
215        orca_core::types::Replicas::Auto => 1,
216    };
217
218    let mut services = state.services.write().await;
219    let svc_state = services
220        .entry(config.name.clone())
221        .or_insert_with(|| state::ServiceState::from_config(config.clone()));
222    svc_state.instances = instances;
223    svc_state.desired_replicas = desired;
224    drop(services);
225
226    // Update routing table for the restored service
227    match config.runtime {
228        orca_core::types::RuntimeKind::Container => {
229            routes::update_container_routes(state, config).await;
230        }
231        orca_core::types::RuntimeKind::Wasm => {
232            routes::update_wasm_triggers(state, config).await;
233        }
234    }
235}
236
/// Derive the master's node ID by hashing the machine's hostname.
///
/// The hostname is taken from the `HOSTNAME` environment variable, then
/// `COMPUTERNAME`; if neither is set (or neither is valid Unicode) the
/// literal `"orca-master"` is hashed instead.
///
/// The result is reproducible for a given hostname within one build, but
/// `DefaultHasher`'s algorithm is not specified by the standard library and
/// may differ between Rust releases.
fn master_node_id() -> u64 {
    use std::hash::{Hash, Hasher};

    // First environment variable that yields a value wins.
    let hostname = ["HOSTNAME", "COMPUTERNAME"]
        .iter()
        .find_map(|key| std::env::var(key).ok())
        .unwrap_or_else(|| "orca-master".to_string());

    let mut digest = std::hash::DefaultHasher::new();
    hostname.hash(&mut digest);
    digest.finish()
}
247
248/// Register the master node in the cluster node map.
249async fn register_master_node(state: &state::AppState, api_port: u16) {
250    let node_id = master_node_id();
251    let mut labels = HashMap::new();
252    labels.insert("role".to_string(), "master".to_string());
253    let node = state::RegisteredNode {
254        node_id,
255        address: format!("localhost:{api_port}"),
256        labels,
257        last_heartbeat: chrono::Utc::now(),
258        drain: false,
259        cpu_percent: 0.0,
260        memory_bytes: 0,
261        memory_total: 0,
262        disk_used: 0,
263        disk_total: 0,
264        net_rx: 0,
265        net_tx: 0,
266    };
267    let mut nodes = state.registered_nodes.write().await;
268    nodes.insert(node_id, node);
269    info!(node_id, "Master node self-registered");
270}
271
272/// Spawn a periodic task that samples host stats and writes them onto the
273/// master node's entry in the cluster node map. Joined nodes push their own
274/// stats via the heartbeat; the master has no heartbeat to piggyback on so
275/// it does this in-process instead. The same loop also prunes zombie
276/// nodes — entries whose last heartbeat is older than 60s — which keeps
277/// the cluster/info endpoint clean after a joined node is restarted with
278/// a fresh id (until node.id persistence lands on every joined box).
279fn spawn_master_heartbeat(state: Arc<state::AppState>) {
280    const STALE_AFTER: chrono::Duration = chrono::Duration::seconds(60);
281    let node_id = master_node_id();
282    let collector = Arc::new(orca_agent::host_stats::HostStatsCollector::new());
283    tokio::spawn(async move {
284        loop {
285            tokio::time::sleep(std::time::Duration::from_secs(2)).await;
286            let sample = collector.sample();
287            let now = chrono::Utc::now();
288            let mut nodes = state.registered_nodes.write().await;
289            if let Some(node) = nodes.get_mut(&node_id) {
290                node.last_heartbeat = now;
291                node.cpu_percent = sample.cpu_percent;
292                node.memory_bytes = sample.memory_bytes;
293                node.memory_total = sample.memory_total;
294                node.disk_used = sample.disk_used;
295                node.disk_total = sample.disk_total;
296                node.net_rx = sample.net_rx;
297                node.net_tx = sample.net_tx;
298            }
299            nodes.retain(|id, node| {
300                if *id == node_id {
301                    return true;
302                }
303                let age = now - node.last_heartbeat;
304                age < STALE_AFTER
305            });
306        }
307    });
308}
309
310async fn shutdown_signal() {
311    tokio::signal::ctrl_c()
312        .await
313        .expect("failed to install ctrl+c handler");
314    info!("Shutdown signal received");
315}