1pub mod api;
2pub mod auth;
3pub mod backup_scheduler;
4pub(crate) mod canary;
5pub mod cleanup_scheduler;
6pub mod cluster_api;
7pub(crate) mod cluster_handlers;
8pub mod cluster_state;
9pub mod deploy_history;
10pub mod health;
11pub(crate) mod instance;
12pub mod metrics;
13pub(crate) mod operations;
14pub mod proto;
15pub mod raft;
16pub mod reconciler;
17pub mod routes;
18pub mod scheduler;
19pub mod state;
20pub mod stats;
21pub mod store;
22pub mod topo_sort;
23pub mod watchdog;
24pub mod webhook;
25pub mod ws_handler;
26
27use std::collections::HashMap;
28use std::sync::Arc;
29
30use orca_core::config::ClusterConfig;
31use orca_core::runtime::Runtime;
32use orca_core::types::WorkloadStatus;
33use tracing::info;
34
35use crate::state::{AppState, InstanceState, SharedRouteTable, SharedWasmTriggers};
36
37pub async fn run_server(
43 cluster_config: ClusterConfig,
44 container_runtime: Arc<dyn Runtime>,
45 wasm_runtime: Option<Arc<dyn Runtime>>,
46 route_table: SharedRouteTable,
47 wasm_triggers: SharedWasmTriggers,
48) -> anyhow::Result<()> {
49 run_server_with_acme(
50 cluster_config,
51 container_runtime,
52 wasm_runtime,
53 route_table,
54 wasm_triggers,
55 None,
56 None,
57 )
58 .await
59}
60
61pub async fn run_server_with_acme(
63 cluster_config: ClusterConfig,
64 container_runtime: Arc<dyn Runtime>,
65 wasm_runtime: Option<Arc<dyn Runtime>>,
66 route_table: SharedRouteTable,
67 wasm_triggers: SharedWasmTriggers,
68 acme_manager: Option<orca_proxy::acme::AcmeManager>,
69 cert_resolver: Option<orca_proxy::SharedCertResolver>,
70) -> anyhow::Result<()> {
71 let mut app_state = AppState::new(
72 cluster_config.clone(),
73 container_runtime,
74 wasm_runtime,
75 route_table,
76 wasm_triggers,
77 );
78 if let (Some(acme), Some(resolver)) = (acme_manager, cert_resolver) {
79 app_state = app_state.with_acme(acme, resolver);
80 }
81
82 let store_path = dirs_next::home_dir()
84 .unwrap_or_else(|| ".".into())
85 .join(".orca/cluster.db");
86 match store::ClusterStore::open(&store_path) {
87 Ok(s) => {
88 info!("Persistent store opened at {}", store_path.display());
89 app_state = app_state.with_store(Arc::new(s));
90 }
91 Err(e) => {
92 tracing::warn!("Failed to open store at {}: {e}", store_path.display());
93 }
94 }
95
96 let state = Arc::new(app_state);
97
98 if let Some(store) = &state.store {
100 match store.get_all_services() {
101 Ok(services) if !services.is_empty() => {
102 info!("Restoring {} persisted services", services.len());
103 for config in services.values() {
104 if let Err(e) = restore_or_reconcile(&state, config).await {
105 tracing::warn!(service = %config.name, "Failed to restore: {e}");
106 }
107 }
108 }
109 Ok(_) => {}
110 Err(e) => tracing::warn!("Failed to load persisted services: {e}"),
111 }
112 }
113
114 register_master_node(&state, cluster_config.cluster.api_port).await;
116 spawn_master_heartbeat(state.clone());
117
118 watchdog::spawn_watchdog(state.clone());
120 health::spawn_health_checker(state.clone());
121 stats::spawn_stats_collector(state.clone());
122
123 if let Some(backup_cfg) = cluster_config.backup.clone()
125 && backup_scheduler::spawn_backup_scheduler(backup_cfg, state.clone()).is_some()
126 {
127 info!("Backup scheduler started");
128 }
129
130 if let Some(cleanup_cfg) = cluster_config.cleanup.clone()
131 && cleanup_scheduler::spawn_cleanup_scheduler(cleanup_cfg, state.clone()).is_some()
132 {
133 info!("Cleanup scheduler started");
134 }
135
136 let app = api::router(state.clone());
137
138 let addr = format!("0.0.0.0:{}", cluster_config.cluster.api_port);
139 let listener = tokio::net::TcpListener::bind(&addr).await?;
140 info!("API server listening on {addr}");
141
142 axum::serve(listener, app)
143 .with_graceful_shutdown(shutdown_signal())
144 .await?;
145
146 Ok(())
147}
148
149async fn restore_or_reconcile(
153 state: &AppState,
154 config: &orca_core::config::ServiceConfig,
155) -> anyhow::Result<()> {
156 if config
161 .placement
162 .as_ref()
163 .and_then(|p| p.node.as_ref())
164 .is_some()
165 {
166 let desired = match &config.replicas {
167 orca_core::types::Replicas::Fixed(n) => *n,
168 orca_core::types::Replicas::Auto => 1,
169 };
170 let mut services = state.services.write().await;
171 let svc_state = services
172 .entry(config.name.clone())
173 .or_insert_with(|| state::ServiceState::from_config(config.clone()));
174 svc_state.config = config.clone();
175 svc_state.desired_replicas = desired;
176 info!(service = %config.name, "Registered remote service placeholder");
177 return Ok(());
178 }
179
180 let cr = state
182 .container_runtime
183 .as_any()
184 .downcast_ref::<orca_agent::docker::ContainerRuntime>();
185
186 if let Some(container_rt) = cr {
187 let existing = container_rt.find_existing(&config.name).await?;
188 if !existing.is_empty() {
189 info!(
190 service = %config.name,
191 count = existing.len(),
192 "Re-attached to existing containers, skipping reconciliation"
193 );
194 populate_state_from_existing(state, config, existing).await;
195 return Ok(());
196 }
197 }
198
199 reconciler::reconcile_service(state, config).await
200}
201
202async fn populate_state_from_existing(
204 state: &AppState,
205 config: &orca_core::config::ServiceConfig,
206 handles: Vec<orca_core::runtime::WorkloadHandle>,
207) {
208 let initial_health = if config.health.is_some() || config.liveness.is_some() {
211 orca_core::types::HealthState::Healthy
212 } else {
213 orca_core::types::HealthState::NoCheck
214 };
215
216 let runtime = state.container_runtime.as_ref();
218 let mut instances: Vec<InstanceState> = Vec::new();
219 for handle in handles {
220 let mut host_port = if let Some(p) = config.port {
224 runtime.resolve_host_port(&handle, p).await.ok().flatten()
225 } else {
226 None
227 };
228 if host_port.is_none() {
229 host_port = handle
230 .metadata
231 .get("host_port")
232 .and_then(|p| p.parse::<u16>().ok());
233 }
234 info!(
235 service = %config.name,
236 runtime_id = %&handle.runtime_id[..12],
237 ?host_port,
238 "Restored container instance"
239 );
240 instances.push(InstanceState {
241 handle,
242 status: WorkloadStatus::Running,
243 host_port,
244 container_address: None,
245 health: initial_health,
246 is_canary: false,
247 started_at: std::time::Instant::now(),
248 });
249 }
250
251 let desired = match &config.replicas {
252 orca_core::types::Replicas::Fixed(n) => *n,
253 orca_core::types::Replicas::Auto => 1,
254 };
255
256 let mut services = state.services.write().await;
257 let svc_state = services
258 .entry(config.name.clone())
259 .or_insert_with(|| state::ServiceState::from_config(config.clone()));
260 svc_state.instances = instances;
261 svc_state.desired_replicas = desired;
262 drop(services);
263
264 match config.runtime {
266 orca_core::types::RuntimeKind::Container => {
267 routes::update_container_routes(state, config).await;
268 }
269 orca_core::types::RuntimeKind::Wasm => {
270 routes::update_wasm_triggers(state, config).await;
271 }
272 }
273}
274
/// Derive the master's node id by hashing the host name.
///
/// The name is taken from the `HOSTNAME` environment variable, then
/// `COMPUTERNAME`, and finally the literal `"orca-master"` when neither is set.
/// It is hashed with `std::hash::DefaultHasher::new()`, so repeated calls in
/// the same process with the same environment return the same id.
fn master_node_id() -> u64 {
    use std::hash::{DefaultHasher, Hash, Hasher};

    let host = match std::env::var("HOSTNAME") {
        Ok(name) => name,
        Err(_) => match std::env::var("COMPUTERNAME") {
            Ok(name) => name,
            Err(_) => String::from("orca-master"),
        },
    };

    let mut digest = DefaultHasher::new();
    host.hash(&mut digest);
    digest.finish()
}
285
286async fn register_master_node(state: &state::AppState, api_port: u16) {
288 let node_id = master_node_id();
289 let mut labels = HashMap::new();
290 labels.insert("role".to_string(), "master".to_string());
291 let node = state::RegisteredNode {
292 node_id,
293 address: format!("localhost:{api_port}"),
294 labels,
295 last_heartbeat: chrono::Utc::now(),
296 drain: false,
297 cpu_percent: 0.0,
298 memory_bytes: 0,
299 memory_total: 0,
300 disk_used: 0,
301 disk_total: 0,
302 net_rx: 0,
303 net_tx: 0,
304 };
305 let mut nodes = state.registered_nodes.write().await;
306 nodes.insert(node_id, node);
307 info!(node_id, "Master node self-registered");
308}
309
310fn spawn_master_heartbeat(state: Arc<state::AppState>) {
318 const STALE_AFTER: chrono::Duration = chrono::Duration::seconds(60);
319 let node_id = master_node_id();
320 let collector = Arc::new(orca_agent::host_stats::HostStatsCollector::new());
321 tokio::spawn(async move {
322 loop {
323 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
324 let sample = collector.sample();
325 let now = chrono::Utc::now();
326 let mut nodes = state.registered_nodes.write().await;
327 if let Some(node) = nodes.get_mut(&node_id) {
328 node.last_heartbeat = now;
329 node.cpu_percent = sample.cpu_percent;
330 node.memory_bytes = sample.memory_bytes;
331 node.memory_total = sample.memory_total;
332 node.disk_used = sample.disk_used;
333 node.disk_total = sample.disk_total;
334 node.net_rx = sample.net_rx;
335 node.net_tx = sample.net_tx;
336 }
337 nodes.retain(|id, node| {
338 if *id == node_id {
339 return true;
340 }
341 let age = now - node.last_heartbeat;
342 age < STALE_AFTER
343 });
344 }
345 });
346}
347
348async fn shutdown_signal() {
349 tokio::signal::ctrl_c()
350 .await
351 .expect("failed to install ctrl+c handler");
352 info!("Shutdown signal received");
353}