Skip to main content

orca_control/
lib.rs

1pub mod api;
2pub mod auth;
3pub mod backup_scheduler;
4pub(crate) mod canary;
5pub mod cleanup_scheduler;
6pub mod cluster_api;
7pub(crate) mod cluster_handlers;
8pub mod cluster_state;
9pub mod deploy_history;
10pub mod health;
11pub(crate) mod instance;
12pub mod metrics;
13pub(crate) mod operations;
14pub mod proto;
15pub mod raft;
16pub mod reconciler;
17pub mod routes;
18pub mod scheduler;
19pub mod state;
20pub mod stats;
21pub mod store;
22pub mod topo_sort;
23pub mod watchdog;
24pub mod webhook;
25pub mod webhook_invocations;
26pub mod ws_handler;
27
28use std::collections::HashMap;
29use std::sync::Arc;
30
31use orca_core::config::ClusterConfig;
32use orca_core::runtime::Runtime;
33use orca_core::types::WorkloadStatus;
34use tracing::info;
35
36use crate::state::{AppState, InstanceState, SharedRouteTable, SharedWasmTriggers};
37
38/// Start the orca control plane (API server).
39///
40/// # Errors
41///
42/// Returns an error if the server fails to bind or encounters a fatal error.
43pub async fn run_server(
44    cluster_config: ClusterConfig,
45    container_runtime: Arc<dyn Runtime>,
46    wasm_runtime: Option<Arc<dyn Runtime>>,
47    route_table: SharedRouteTable,
48    wasm_triggers: SharedWasmTriggers,
49) -> anyhow::Result<()> {
50    run_server_with_acme(
51        cluster_config,
52        container_runtime,
53        wasm_runtime,
54        route_table,
55        wasm_triggers,
56        None,
57        None,
58    )
59    .await
60}
61
62/// Start the orca control plane with optional ACME hot-provisioning.
63pub async fn run_server_with_acme(
64    cluster_config: ClusterConfig,
65    container_runtime: Arc<dyn Runtime>,
66    wasm_runtime: Option<Arc<dyn Runtime>>,
67    route_table: SharedRouteTable,
68    wasm_triggers: SharedWasmTriggers,
69    acme_manager: Option<orca_proxy::acme::AcmeManager>,
70    cert_resolver: Option<orca_proxy::SharedCertResolver>,
71) -> anyhow::Result<()> {
72    let mut app_state = AppState::new(
73        cluster_config.clone(),
74        container_runtime,
75        wasm_runtime,
76        route_table,
77        wasm_triggers,
78    );
79    if let (Some(acme), Some(resolver)) = (acme_manager, cert_resolver) {
80        app_state = app_state.with_acme(acme, resolver);
81    }
82
83    // Open persistent store
84    let store_path = dirs_next::home_dir()
85        .unwrap_or_else(|| ".".into())
86        .join(".orca/cluster.db");
87    match store::ClusterStore::open(&store_path) {
88        Ok(s) => {
89            info!("Persistent store opened at {}", store_path.display());
90            app_state = app_state.with_store(Arc::new(s));
91        }
92        Err(e) => {
93            tracing::warn!("Failed to open store at {}: {e}", store_path.display());
94        }
95    }
96
97    let state = Arc::new(app_state);
98
99    // Restore persisted services, re-attaching to existing containers
100    if let Some(store) = &state.store {
101        match store.get_all_services() {
102            Ok(services) if !services.is_empty() => {
103                info!("Restoring {} persisted services", services.len());
104                for config in services.values() {
105                    if let Err(e) = restore_or_reconcile(&state, config).await {
106                        tracing::warn!(service = %config.name, "Failed to restore: {e}");
107                    }
108                }
109            }
110            Ok(_) => {}
111            Err(e) => tracing::warn!("Failed to load persisted services: {e}"),
112        }
113    }
114
115    // Register the master node so it appears in TUI/status.
116    register_master_node(&state, cluster_config.cluster.api_port).await;
117    spawn_master_heartbeat(state.clone());
118
119    // Spawn background resilience tasks.
120    watchdog::spawn_watchdog(state.clone());
121    health::spawn_health_checker(state.clone());
122    stats::spawn_stats_collector(state.clone());
123
124    // Spawn scheduled backup task if configured (needs state for agent dispatch).
125    if let Some(backup_cfg) = cluster_config.backup.clone()
126        && backup_scheduler::spawn_backup_scheduler(backup_cfg, state.clone()).is_some()
127    {
128        info!("Backup scheduler started");
129    }
130
131    if let Some(cleanup_cfg) = cluster_config.cleanup.clone()
132        && cleanup_scheduler::spawn_cleanup_scheduler(cleanup_cfg, state.clone()).is_some()
133    {
134        info!("Cleanup scheduler started");
135    }
136
137    let app = api::router(state.clone());
138
139    let addr = format!("0.0.0.0:{}", cluster_config.cluster.api_port);
140    let listener = tokio::net::TcpListener::bind(&addr).await?;
141    info!("API server listening on {addr}");
142
143    axum::serve(listener, app)
144        .with_graceful_shutdown(shutdown_signal())
145        .await?;
146
147    Ok(())
148}
149
150/// Check if Docker containers already exist for a persisted service.
151/// If they do, populate in-memory state from existing containers.
152/// Otherwise, fall back to full reconciliation.
153async fn restore_or_reconcile(
154    state: &AppState,
155    config: &orca_core::config::ServiceConfig,
156) -> anyhow::Result<()> {
157    // Remote services run on an agent node whose WS connection isn't open yet
158    // at startup. Register a placeholder so send_reconcile includes this service
159    // when the agent connects — the agent will skip deployment if the container
160    // is already running.
161    if config
162        .placement
163        .as_ref()
164        .and_then(|p| p.node.as_ref())
165        .is_some()
166    {
167        let desired = match &config.replicas {
168            orca_core::types::Replicas::Fixed(n) => *n,
169            orca_core::types::Replicas::Auto => 1,
170        };
171        let mut services = state.services.write().await;
172        let svc_state = services
173            .entry(config.name.clone())
174            .or_insert_with(|| state::ServiceState::from_config(config.clone()));
175        svc_state.config = config.clone();
176        svc_state.desired_replicas = desired;
177        info!(service = %config.name, "Registered remote service placeholder");
178        return Ok(());
179    }
180
181    // Local service: try to re-attach existing containers first.
182    let cr = state
183        .container_runtime
184        .as_any()
185        .downcast_ref::<orca_agent::docker::ContainerRuntime>();
186
187    if let Some(container_rt) = cr {
188        let existing = container_rt.find_existing(&config.name).await?;
189        if !existing.is_empty() {
190            info!(
191                service = %config.name,
192                count = existing.len(),
193                "Re-attached to existing containers, skipping reconciliation"
194            );
195            populate_state_from_existing(state, config, existing).await;
196            return Ok(());
197        }
198    }
199
200    reconciler::reconcile_service(state, config).await
201}
202
203/// Populate in-memory `ServiceState` from already-running Docker containers.
204async fn populate_state_from_existing(
205    state: &AppState,
206    config: &orca_core::config::ServiceConfig,
207    handles: Vec<orca_core::runtime::WorkloadHandle>,
208) {
209    // Re-attached containers are already running — mark Healthy so the
210    // route filter accepts them. Health checker will correct on next probe.
211    let initial_health = if config.health.is_some() || config.liveness.is_some() {
212        orca_core::types::HealthState::Healthy
213    } else {
214        orca_core::types::HealthState::NoCheck
215    };
216
217    // Resolve host_port via the runtime (more reliable than metadata extraction)
218    let runtime = state.container_runtime.as_ref();
219    let mut instances: Vec<InstanceState> = Vec::new();
220    for handle in handles {
221        // Always resolve host_port using the configured container port —
222        // metadata's first-port-binding heuristic is unreliable when extra_ports
223        // are present (e.g. gitea SSH on 22222 vs HTTP on 3000).
224        let mut host_port = if let Some(p) = config.port {
225            runtime.resolve_host_port(&handle, p).await.ok().flatten()
226        } else {
227            None
228        };
229        if host_port.is_none() {
230            host_port = handle
231                .metadata
232                .get("host_port")
233                .and_then(|p| p.parse::<u16>().ok());
234        }
235        info!(
236            service = %config.name,
237            runtime_id = %&handle.runtime_id[..12],
238            ?host_port,
239            "Restored container instance"
240        );
241        instances.push(InstanceState {
242            handle,
243            status: WorkloadStatus::Running,
244            host_port,
245            container_address: None,
246            health: initial_health,
247            is_canary: false,
248            started_at: std::time::Instant::now(),
249        });
250    }
251
252    let desired = match &config.replicas {
253        orca_core::types::Replicas::Fixed(n) => *n,
254        orca_core::types::Replicas::Auto => 1,
255    };
256
257    let mut services = state.services.write().await;
258    let svc_state = services
259        .entry(config.name.clone())
260        .or_insert_with(|| state::ServiceState::from_config(config.clone()));
261    svc_state.instances = instances;
262    svc_state.desired_replicas = desired;
263    drop(services);
264
265    // Update routing table for the restored service
266    match config.runtime {
267        orca_core::types::RuntimeKind::Container => {
268            routes::update_container_routes(state, config).await;
269        }
270        orca_core::types::RuntimeKind::Wasm => {
271            routes::update_wasm_triggers(state, config).await;
272        }
273    }
274}
275
/// Derive the master's node ID by hashing the machine hostname.
///
/// The hostname comes from the `HOSTNAME` environment variable, then
/// `COMPUTERNAME`. If neither can be read, the literal `"orca-master"` is
/// hashed instead. Repeated calls within one process return the same value.
///
/// NOTE(review): `DefaultHasher::new()` is deterministic within a build, but
/// std does not guarantee its algorithm across Rust releases. If this ID is
/// ever persisted, a toolchain upgrade could change it — confirm before
/// relying on it across upgrades.
fn master_node_id() -> u64 {
    use std::hash::{Hash, Hasher};

    let host = match std::env::var("HOSTNAME") {
        Ok(name) => name,
        Err(_) => std::env::var("COMPUTERNAME").unwrap_or_else(|_| String::from("orca-master")),
    };
    let mut digest = std::hash::DefaultHasher::new();
    host.hash(&mut digest);
    digest.finish()
}
286
287/// Register the master node in the cluster node map.
288async fn register_master_node(state: &state::AppState, api_port: u16) {
289    let node_id = master_node_id();
290    let mut labels = HashMap::new();
291    labels.insert("role".to_string(), "master".to_string());
292    let node = state::RegisteredNode {
293        node_id,
294        address: format!("localhost:{api_port}"),
295        labels,
296        last_heartbeat: chrono::Utc::now(),
297        drain: false,
298        cpu_percent: 0.0,
299        memory_bytes: 0,
300        memory_total: 0,
301        disk_used: 0,
302        disk_total: 0,
303        net_rx: 0,
304        net_tx: 0,
305    };
306    let mut nodes = state.registered_nodes.write().await;
307    nodes.insert(node_id, node);
308    info!(node_id, "Master node self-registered");
309}
310
311/// Spawn a periodic task that samples host stats and writes them onto the
312/// master node's entry in the cluster node map. Joined nodes push their own
313/// stats via the heartbeat; the master has no heartbeat to piggyback on so
314/// it does this in-process instead. The same loop also prunes zombie
315/// nodes — entries whose last heartbeat is older than 60s — which keeps
316/// the cluster/info endpoint clean after a joined node is restarted with
317/// a fresh id (until node.id persistence lands on every joined box).
318fn spawn_master_heartbeat(state: Arc<state::AppState>) {
319    const STALE_AFTER: chrono::Duration = chrono::Duration::seconds(60);
320    let node_id = master_node_id();
321    let collector = Arc::new(orca_agent::host_stats::HostStatsCollector::new());
322    tokio::spawn(async move {
323        loop {
324            tokio::time::sleep(std::time::Duration::from_secs(2)).await;
325            let sample = collector.sample();
326            let now = chrono::Utc::now();
327            let mut nodes = state.registered_nodes.write().await;
328            if let Some(node) = nodes.get_mut(&node_id) {
329                node.last_heartbeat = now;
330                node.cpu_percent = sample.cpu_percent;
331                node.memory_bytes = sample.memory_bytes;
332                node.memory_total = sample.memory_total;
333                node.disk_used = sample.disk_used;
334                node.disk_total = sample.disk_total;
335                node.net_rx = sample.net_rx;
336                node.net_tx = sample.net_tx;
337            }
338            nodes.retain(|id, node| {
339                if *id == node_id {
340                    return true;
341                }
342                let age = now - node.last_heartbeat;
343                age < STALE_AFTER
344            });
345        }
346    });
347}
348
349async fn shutdown_signal() {
350    tokio::signal::ctrl_c()
351        .await
352        .expect("failed to install ctrl+c handler");
353    info!("Shutdown signal received");
354}