1pub mod api;
2pub mod auth;
3pub mod backup_scheduler;
4pub(crate) mod canary;
5pub mod cluster_api;
6pub(crate) mod cluster_handlers;
7pub mod cluster_state;
8pub mod deploy_history;
9pub mod health;
10pub(crate) mod instance;
11pub mod metrics;
12pub(crate) mod operations;
13pub mod proto;
14pub mod raft;
15pub mod reconciler;
16pub mod routes;
17pub mod scheduler;
18pub mod state;
19pub mod stats;
20pub mod store;
21pub mod topo_sort;
22pub mod watchdog;
23pub mod webhook;
24pub mod ws_handler;
25
26use std::collections::HashMap;
27use std::sync::Arc;
28
29use orca_core::config::ClusterConfig;
30use orca_core::runtime::Runtime;
31use orca_core::types::WorkloadStatus;
32use tracing::info;
33
34use crate::state::{AppState, InstanceState, SharedRouteTable, SharedWasmTriggers};
35
36pub async fn run_server(
42 cluster_config: ClusterConfig,
43 container_runtime: Arc<dyn Runtime>,
44 wasm_runtime: Option<Arc<dyn Runtime>>,
45 route_table: SharedRouteTable,
46 wasm_triggers: SharedWasmTriggers,
47) -> anyhow::Result<()> {
48 run_server_with_acme(
49 cluster_config,
50 container_runtime,
51 wasm_runtime,
52 route_table,
53 wasm_triggers,
54 None,
55 None,
56 )
57 .await
58}
59
60pub async fn run_server_with_acme(
62 cluster_config: ClusterConfig,
63 container_runtime: Arc<dyn Runtime>,
64 wasm_runtime: Option<Arc<dyn Runtime>>,
65 route_table: SharedRouteTable,
66 wasm_triggers: SharedWasmTriggers,
67 acme_manager: Option<orca_proxy::acme::AcmeManager>,
68 cert_resolver: Option<orca_proxy::SharedCertResolver>,
69) -> anyhow::Result<()> {
70 let mut app_state = AppState::new(
71 cluster_config.clone(),
72 container_runtime,
73 wasm_runtime,
74 route_table,
75 wasm_triggers,
76 );
77 if let (Some(acme), Some(resolver)) = (acme_manager, cert_resolver) {
78 app_state = app_state.with_acme(acme, resolver);
79 }
80
81 let store_path = dirs_next::home_dir()
83 .unwrap_or_else(|| ".".into())
84 .join(".orca/cluster.db");
85 match store::ClusterStore::open(&store_path) {
86 Ok(s) => {
87 info!("Persistent store opened at {}", store_path.display());
88 app_state = app_state.with_store(Arc::new(s));
89 }
90 Err(e) => {
91 tracing::warn!("Failed to open store at {}: {e}", store_path.display());
92 }
93 }
94
95 let state = Arc::new(app_state);
96
97 if let Some(store) = &state.store {
99 match store.get_all_services() {
100 Ok(services) if !services.is_empty() => {
101 info!("Restoring {} persisted services", services.len());
102 for config in services.values() {
103 if let Err(e) = restore_or_reconcile(&state, config).await {
104 tracing::warn!(service = %config.name, "Failed to restore: {e}");
105 }
106 }
107 }
108 Ok(_) => {}
109 Err(e) => tracing::warn!("Failed to load persisted services: {e}"),
110 }
111 }
112
113 register_master_node(&state, cluster_config.cluster.api_port).await;
115 spawn_master_heartbeat(state.clone());
116
117 watchdog::spawn_watchdog(state.clone());
119 health::spawn_health_checker(state.clone());
120 stats::spawn_stats_collector(state.clone());
121
122 let app = api::router(state.clone());
123
124 let addr = format!("0.0.0.0:{}", cluster_config.cluster.api_port);
125 let listener = tokio::net::TcpListener::bind(&addr).await?;
126 info!("API server listening on {addr}");
127
128 axum::serve(listener, app)
129 .with_graceful_shutdown(shutdown_signal())
130 .await?;
131
132 Ok(())
133}
134
135async fn restore_or_reconcile(
139 state: &AppState,
140 config: &orca_core::config::ServiceConfig,
141) -> anyhow::Result<()> {
142 let cr = state
144 .container_runtime
145 .as_any()
146 .downcast_ref::<orca_agent::docker::ContainerRuntime>();
147
148 if let Some(container_rt) = cr {
149 let existing = container_rt.find_existing(&config.name).await?;
150 if !existing.is_empty() {
151 info!(
152 service = %config.name,
153 count = existing.len(),
154 "Re-attached to existing containers, skipping reconciliation"
155 );
156 populate_state_from_existing(state, config, existing).await;
157 return Ok(());
158 }
159 }
160
161 reconciler::reconcile_service(state, config).await
162}
163
164async fn populate_state_from_existing(
166 state: &AppState,
167 config: &orca_core::config::ServiceConfig,
168 handles: Vec<orca_core::runtime::WorkloadHandle>,
169) {
170 let initial_health = if config.health.is_some() || config.liveness.is_some() {
173 orca_core::types::HealthState::Healthy
174 } else {
175 orca_core::types::HealthState::NoCheck
176 };
177
178 let runtime = state.container_runtime.as_ref();
180 let mut instances: Vec<InstanceState> = Vec::new();
181 for handle in handles {
182 let mut host_port = if let Some(p) = config.port {
186 runtime.resolve_host_port(&handle, p).await.ok().flatten()
187 } else {
188 None
189 };
190 if host_port.is_none() {
191 host_port = handle
192 .metadata
193 .get("host_port")
194 .and_then(|p| p.parse::<u16>().ok());
195 }
196 info!(
197 service = %config.name,
198 runtime_id = %&handle.runtime_id[..12],
199 ?host_port,
200 "Restored container instance"
201 );
202 instances.push(InstanceState {
203 handle,
204 status: WorkloadStatus::Running,
205 host_port,
206 container_address: None,
207 health: initial_health,
208 is_canary: false,
209 started_at: std::time::Instant::now(),
210 });
211 }
212
213 let desired = match &config.replicas {
214 orca_core::types::Replicas::Fixed(n) => *n,
215 orca_core::types::Replicas::Auto => 1,
216 };
217
218 let mut services = state.services.write().await;
219 let svc_state = services
220 .entry(config.name.clone())
221 .or_insert_with(|| state::ServiceState::from_config(config.clone()));
222 svc_state.instances = instances;
223 svc_state.desired_replicas = desired;
224 drop(services);
225
226 match config.runtime {
228 orca_core::types::RuntimeKind::Container => {
229 routes::update_container_routes(state, config).await;
230 }
231 orca_core::types::RuntimeKind::Wasm => {
232 routes::update_wasm_triggers(state, config).await;
233 }
234 }
235}
236
/// Derives this master's node id by hashing the host name.
///
/// The host name is read from `HOSTNAME`, then `COMPUTERNAME`. If neither is
/// available as valid Unicode, the literal `"orca-master"` is hashed instead.
/// The hash is `std::hash::DefaultHasher` with its default keys, so repeated
/// calls in the same process return the same id.
fn master_node_id() -> u64 {
    use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};

    let host = ["HOSTNAME", "COMPUTERNAME"]
        .iter()
        .find_map(|key| std::env::var(key).ok())
        .unwrap_or_else(|| "orca-master".to_string());

    BuildHasherDefault::<DefaultHasher>::default().hash_one(&host)
}
247
248async fn register_master_node(state: &state::AppState, api_port: u16) {
250 let node_id = master_node_id();
251 let mut labels = HashMap::new();
252 labels.insert("role".to_string(), "master".to_string());
253 let node = state::RegisteredNode {
254 node_id,
255 address: format!("localhost:{api_port}"),
256 labels,
257 last_heartbeat: chrono::Utc::now(),
258 drain: false,
259 cpu_percent: 0.0,
260 memory_bytes: 0,
261 memory_total: 0,
262 disk_used: 0,
263 disk_total: 0,
264 net_rx: 0,
265 net_tx: 0,
266 };
267 let mut nodes = state.registered_nodes.write().await;
268 nodes.insert(node_id, node);
269 info!(node_id, "Master node self-registered");
270}
271
272fn spawn_master_heartbeat(state: Arc<state::AppState>) {
280 const STALE_AFTER: chrono::Duration = chrono::Duration::seconds(60);
281 let node_id = master_node_id();
282 let collector = Arc::new(orca_agent::host_stats::HostStatsCollector::new());
283 tokio::spawn(async move {
284 loop {
285 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
286 let sample = collector.sample();
287 let now = chrono::Utc::now();
288 let mut nodes = state.registered_nodes.write().await;
289 if let Some(node) = nodes.get_mut(&node_id) {
290 node.last_heartbeat = now;
291 node.cpu_percent = sample.cpu_percent;
292 node.memory_bytes = sample.memory_bytes;
293 node.memory_total = sample.memory_total;
294 node.disk_used = sample.disk_used;
295 node.disk_total = sample.disk_total;
296 node.net_rx = sample.net_rx;
297 node.net_tx = sample.net_tx;
298 }
299 nodes.retain(|id, node| {
300 if *id == node_id {
301 return true;
302 }
303 let age = now - node.last_heartbeat;
304 age < STALE_AFTER
305 });
306 }
307 });
308}
309
310async fn shutdown_signal() {
311 tokio::signal::ctrl_c()
312 .await
313 .expect("failed to install ctrl+c handler");
314 info!("Shutdown signal received");
315}