Skip to main content

orca_control/
lib.rs

1pub mod alerts;
2pub mod api;
3pub mod auth;
4pub mod backup_scheduler;
5pub(crate) mod canary;
6pub mod cleanup_scheduler;
7pub mod cluster_api;
8pub(crate) mod cluster_handlers;
9pub mod cluster_state;
10pub mod deploy_history;
11pub mod health;
12pub(crate) mod instance;
13pub mod metrics;
14pub(crate) mod operations;
15pub mod proto;
16pub mod raft;
17pub mod reconciler;
18pub mod routes;
19pub mod scheduler;
20pub mod state;
21pub mod stats;
22pub mod store;
23pub mod topo_sort;
24pub mod watchdog;
25pub mod webhook;
26pub mod webhook_invocations;
27pub mod ws_handler;
28
29use std::collections::HashMap;
30use std::sync::Arc;
31
32use orca_core::config::ClusterConfig;
33use orca_core::runtime::Runtime;
34use orca_core::types::WorkloadStatus;
35use tracing::info;
36
37use crate::state::{AppState, InstanceState, SharedRouteTable, SharedWasmTriggers};
38
39/// Start the orca control plane (API server).
40///
41/// # Errors
42///
43/// Returns an error if the server fails to bind or encounters a fatal error.
44pub async fn run_server(
45    cluster_config: ClusterConfig,
46    container_runtime: Arc<dyn Runtime>,
47    wasm_runtime: Option<Arc<dyn Runtime>>,
48    route_table: SharedRouteTable,
49    wasm_triggers: SharedWasmTriggers,
50) -> anyhow::Result<()> {
51    run_server_with_acme(
52        cluster_config,
53        container_runtime,
54        wasm_runtime,
55        route_table,
56        wasm_triggers,
57        None,
58        None,
59    )
60    .await
61}
62
63/// Start the orca control plane with optional ACME hot-provisioning.
64pub async fn run_server_with_acme(
65    cluster_config: ClusterConfig,
66    container_runtime: Arc<dyn Runtime>,
67    wasm_runtime: Option<Arc<dyn Runtime>>,
68    route_table: SharedRouteTable,
69    wasm_triggers: SharedWasmTriggers,
70    acme_manager: Option<orca_proxy::acme::AcmeManager>,
71    cert_resolver: Option<orca_proxy::SharedCertResolver>,
72) -> anyhow::Result<()> {
73    let mut app_state = AppState::new(
74        cluster_config.clone(),
75        container_runtime,
76        wasm_runtime,
77        route_table,
78        wasm_triggers,
79    );
80    if let (Some(acme), Some(resolver)) = (acme_manager, cert_resolver) {
81        app_state = app_state.with_acme(acme, resolver);
82    }
83
84    if let Some(engine) = alerts::try_build_alert_engine(cluster_config.ai.as_ref()) {
85        app_state = app_state.with_alerts(engine);
86    }
87
88    // Open persistent store
89    let store_path = dirs_next::home_dir()
90        .unwrap_or_else(|| ".".into())
91        .join(".orca/cluster.db");
92    match store::ClusterStore::open(&store_path) {
93        Ok(s) => {
94            info!("Persistent store opened at {}", store_path.display());
95            app_state = app_state.with_store(Arc::new(s));
96        }
97        Err(e) => {
98            tracing::warn!("Failed to open store at {}: {e}", store_path.display());
99        }
100    }
101
102    let state = Arc::new(app_state);
103
104    // Restore persisted services, re-attaching to existing containers
105    if let Some(store) = &state.store {
106        match store.get_all_services() {
107            Ok(services) if !services.is_empty() => {
108                info!("Restoring {} persisted services", services.len());
109                for config in services.values() {
110                    if let Err(e) = restore_or_reconcile(&state, config).await {
111                        tracing::warn!(service = %config.name, "Failed to restore: {e}");
112                    }
113                }
114            }
115            Ok(_) => {}
116            Err(e) => tracing::warn!("Failed to load persisted services: {e}"),
117        }
118    }
119
120    // Register the master node so it appears in TUI/status.
121    register_master_node(&state, cluster_config.cluster.api_port).await;
122    spawn_master_heartbeat(state.clone());
123
124    // Spawn background resilience tasks.
125    watchdog::spawn_watchdog(state.clone());
126    health::spawn_health_checker(state.clone());
127    stats::spawn_stats_collector(state.clone());
128
129    // Spawn scheduled backup task if configured (needs state for agent dispatch).
130    if let Some(backup_cfg) = cluster_config.backup.clone()
131        && backup_scheduler::spawn_backup_scheduler(backup_cfg, state.clone()).is_some()
132    {
133        info!("Backup scheduler started");
134    }
135
136    if let Some(cleanup_cfg) = cluster_config.cleanup.clone()
137        && cleanup_scheduler::spawn_cleanup_scheduler(cleanup_cfg, state.clone()).is_some()
138    {
139        info!("Cleanup scheduler started");
140    }
141
142    if alerts::spawn_alert_monitor(state.clone()).is_some() {
143        info!("AI alert monitor started");
144    }
145
146    let app = api::router(state.clone());
147
148    let addr = format!("0.0.0.0:{}", cluster_config.cluster.api_port);
149    let listener = tokio::net::TcpListener::bind(&addr).await?;
150    info!("API server listening on {addr}");
151
152    axum::serve(listener, app)
153        .with_graceful_shutdown(shutdown_signal())
154        .await?;
155
156    Ok(())
157}
158
159/// Check if Docker containers already exist for a persisted service.
160/// If they do, populate in-memory state from existing containers.
161/// Otherwise, fall back to full reconciliation.
162async fn restore_or_reconcile(
163    state: &AppState,
164    config: &orca_core::config::ServiceConfig,
165) -> anyhow::Result<()> {
166    // Remote services run on an agent node whose WS connection isn't open yet
167    // at startup. Register a placeholder so send_reconcile includes this service
168    // when the agent connects — the agent will skip deployment if the container
169    // is already running.
170    if config
171        .placement
172        .as_ref()
173        .and_then(|p| p.node.as_ref())
174        .is_some()
175    {
176        let desired = match &config.replicas {
177            orca_core::types::Replicas::Fixed(n) => *n,
178            orca_core::types::Replicas::Auto => 1,
179        };
180        let mut services = state.services.write().await;
181        let svc_state = services
182            .entry(config.name.clone())
183            .or_insert_with(|| state::ServiceState::from_config(config.clone()));
184        svc_state.config = config.clone();
185        svc_state.desired_replicas = desired;
186        info!(service = %config.name, "Registered remote service placeholder");
187        return Ok(());
188    }
189
190    // Local service: try to re-attach existing containers first.
191    let cr = state
192        .container_runtime
193        .as_any()
194        .downcast_ref::<orca_agent::docker::ContainerRuntime>();
195
196    if let Some(container_rt) = cr {
197        let existing = container_rt.find_existing(&config.name).await?;
198        if !existing.is_empty() {
199            info!(
200                service = %config.name,
201                count = existing.len(),
202                "Re-attached to existing containers, skipping reconciliation"
203            );
204            populate_state_from_existing(state, config, existing).await;
205            return Ok(());
206        }
207    }
208
209    reconciler::reconcile_service(state, config).await
210}
211
212/// Populate in-memory `ServiceState` from already-running Docker containers.
213async fn populate_state_from_existing(
214    state: &AppState,
215    config: &orca_core::config::ServiceConfig,
216    handles: Vec<orca_core::runtime::WorkloadHandle>,
217) {
218    // Re-attached containers are already running — mark Healthy so the
219    // route filter accepts them. Health checker will correct on next probe.
220    let initial_health = if config.health.is_some() || config.liveness.is_some() {
221        orca_core::types::HealthState::Healthy
222    } else {
223        orca_core::types::HealthState::NoCheck
224    };
225
226    // Resolve host_port via the runtime (more reliable than metadata extraction)
227    let runtime = state.container_runtime.as_ref();
228    let mut instances: Vec<InstanceState> = Vec::new();
229    for handle in handles {
230        // Always resolve host_port using the configured container port —
231        // metadata's first-port-binding heuristic is unreliable when extra_ports
232        // are present (e.g. gitea SSH on 22222 vs HTTP on 3000).
233        let mut host_port = if let Some(p) = config.port {
234            runtime.resolve_host_port(&handle, p).await.ok().flatten()
235        } else {
236            None
237        };
238        if host_port.is_none() {
239            host_port = handle
240                .metadata
241                .get("host_port")
242                .and_then(|p| p.parse::<u16>().ok());
243        }
244        info!(
245            service = %config.name,
246            runtime_id = %&handle.runtime_id[..12],
247            ?host_port,
248            "Restored container instance"
249        );
250        instances.push(InstanceState {
251            handle,
252            status: WorkloadStatus::Running,
253            host_port,
254            container_address: None,
255            health: initial_health,
256            is_canary: false,
257            started_at: std::time::Instant::now(),
258        });
259    }
260
261    let desired = match &config.replicas {
262        orca_core::types::Replicas::Fixed(n) => *n,
263        orca_core::types::Replicas::Auto => 1,
264    };
265
266    let mut services = state.services.write().await;
267    let svc_state = services
268        .entry(config.name.clone())
269        .or_insert_with(|| state::ServiceState::from_config(config.clone()));
270    svc_state.instances = instances;
271    svc_state.desired_replicas = desired;
272    drop(services);
273
274    // Update routing table for the restored service
275    match config.runtime {
276        orca_core::types::RuntimeKind::Container => {
277            routes::update_container_routes(state, config).await;
278        }
279        orca_core::types::RuntimeKind::Wasm => {
280            routes::update_wasm_triggers(state, config).await;
281        }
282    }
283}
284
/// Derive the master's node ID by hashing the host name.
///
/// The name is read from the `HOSTNAME` environment variable, then from
/// `COMPUTERNAME`, and finally defaults to the literal `"orca-master"` when
/// neither variable can be read.
///
/// NOTE(review): the std documentation states that `DefaultHasher`'s
/// algorithm is unspecified and may change between Rust releases, so the ID
/// is presumably only stable for a given toolchain — confirm nothing
/// persists it across upgrades.
fn master_node_id() -> u64 {
    use std::hash::{Hash, Hasher};

    let host = match std::env::var("HOSTNAME") {
        Ok(name) => name,
        Err(_) => match std::env::var("COMPUTERNAME") {
            Ok(name) => name,
            Err(_) => String::from("orca-master"),
        },
    };

    let mut digest = std::hash::DefaultHasher::new();
    host.hash(&mut digest);
    digest.finish()
}
295
296/// Register the master node in the cluster node map.
297async fn register_master_node(state: &state::AppState, api_port: u16) {
298    let node_id = master_node_id();
299    let mut labels = HashMap::new();
300    labels.insert("role".to_string(), "master".to_string());
301    let node = state::RegisteredNode {
302        node_id,
303        address: format!("localhost:{api_port}"),
304        labels,
305        last_heartbeat: chrono::Utc::now(),
306        drain: false,
307        cpu_percent: 0.0,
308        memory_bytes: 0,
309        memory_total: 0,
310        disk_used: 0,
311        disk_total: 0,
312        net_rx: 0,
313        net_tx: 0,
314    };
315    let mut nodes = state.registered_nodes.write().await;
316    nodes.insert(node_id, node);
317    info!(node_id, "Master node self-registered");
318}
319
320/// Spawn a periodic task that samples host stats and writes them onto the
321/// master node's entry in the cluster node map. Joined nodes push their own
322/// stats via the heartbeat; the master has no heartbeat to piggyback on so
323/// it does this in-process instead. The same loop also prunes zombie
324/// nodes — entries whose last heartbeat is older than 60s — which keeps
325/// the cluster/info endpoint clean after a joined node is restarted with
326/// a fresh id (until node.id persistence lands on every joined box).
327fn spawn_master_heartbeat(state: Arc<state::AppState>) {
328    const STALE_AFTER: chrono::Duration = chrono::Duration::seconds(60);
329    let node_id = master_node_id();
330    let collector = Arc::new(orca_agent::host_stats::HostStatsCollector::new());
331    tokio::spawn(async move {
332        loop {
333            tokio::time::sleep(std::time::Duration::from_secs(2)).await;
334            let sample = collector.sample();
335            let now = chrono::Utc::now();
336            let mut nodes = state.registered_nodes.write().await;
337            if let Some(node) = nodes.get_mut(&node_id) {
338                node.last_heartbeat = now;
339                node.cpu_percent = sample.cpu_percent;
340                node.memory_bytes = sample.memory_bytes;
341                node.memory_total = sample.memory_total;
342                node.disk_used = sample.disk_used;
343                node.disk_total = sample.disk_total;
344                node.net_rx = sample.net_rx;
345                node.net_tx = sample.net_tx;
346            }
347            nodes.retain(|id, node| {
348                if *id == node_id {
349                    return true;
350                }
351                let age = now - node.last_heartbeat;
352                age < STALE_AFTER
353            });
354        }
355    });
356}
357
358async fn shutdown_signal() {
359    tokio::signal::ctrl_c()
360        .await
361        .expect("failed to install ctrl+c handler");
362    info!("Shutdown signal received");
363}