1pub mod api;
2pub mod auth;
3pub mod backup_scheduler;
4pub(crate) mod canary;
5pub mod cleanup_scheduler;
6pub mod cluster_api;
7pub(crate) mod cluster_handlers;
8pub mod cluster_state;
9pub mod deploy_history;
10pub mod health;
11pub(crate) mod instance;
12pub mod metrics;
13pub(crate) mod operations;
14pub mod proto;
15pub mod raft;
16pub mod reconciler;
17pub mod routes;
18pub mod scheduler;
19pub mod state;
20pub mod stats;
21pub mod store;
22pub mod topo_sort;
23pub mod watchdog;
24pub mod webhook;
25pub mod webhook_invocations;
26pub mod ws_handler;
27
28use std::collections::HashMap;
29use std::sync::Arc;
30
31use orca_core::config::ClusterConfig;
32use orca_core::runtime::Runtime;
33use orca_core::types::WorkloadStatus;
34use tracing::info;
35
36use crate::state::{AppState, InstanceState, SharedRouteTable, SharedWasmTriggers};
37
38pub async fn run_server(
44 cluster_config: ClusterConfig,
45 container_runtime: Arc<dyn Runtime>,
46 wasm_runtime: Option<Arc<dyn Runtime>>,
47 route_table: SharedRouteTable,
48 wasm_triggers: SharedWasmTriggers,
49) -> anyhow::Result<()> {
50 run_server_with_acme(
51 cluster_config,
52 container_runtime,
53 wasm_runtime,
54 route_table,
55 wasm_triggers,
56 None,
57 None,
58 )
59 .await
60}
61
62pub async fn run_server_with_acme(
64 cluster_config: ClusterConfig,
65 container_runtime: Arc<dyn Runtime>,
66 wasm_runtime: Option<Arc<dyn Runtime>>,
67 route_table: SharedRouteTable,
68 wasm_triggers: SharedWasmTriggers,
69 acme_manager: Option<orca_proxy::acme::AcmeManager>,
70 cert_resolver: Option<orca_proxy::SharedCertResolver>,
71) -> anyhow::Result<()> {
72 let mut app_state = AppState::new(
73 cluster_config.clone(),
74 container_runtime,
75 wasm_runtime,
76 route_table,
77 wasm_triggers,
78 );
79 if let (Some(acme), Some(resolver)) = (acme_manager, cert_resolver) {
80 app_state = app_state.with_acme(acme, resolver);
81 }
82
83 let store_path = dirs_next::home_dir()
85 .unwrap_or_else(|| ".".into())
86 .join(".orca/cluster.db");
87 match store::ClusterStore::open(&store_path) {
88 Ok(s) => {
89 info!("Persistent store opened at {}", store_path.display());
90 app_state = app_state.with_store(Arc::new(s));
91 }
92 Err(e) => {
93 tracing::warn!("Failed to open store at {}: {e}", store_path.display());
94 }
95 }
96
97 let state = Arc::new(app_state);
98
99 if let Some(store) = &state.store {
101 match store.get_all_services() {
102 Ok(services) if !services.is_empty() => {
103 info!("Restoring {} persisted services", services.len());
104 for config in services.values() {
105 if let Err(e) = restore_or_reconcile(&state, config).await {
106 tracing::warn!(service = %config.name, "Failed to restore: {e}");
107 }
108 }
109 }
110 Ok(_) => {}
111 Err(e) => tracing::warn!("Failed to load persisted services: {e}"),
112 }
113 }
114
115 register_master_node(&state, cluster_config.cluster.api_port).await;
117 spawn_master_heartbeat(state.clone());
118
119 watchdog::spawn_watchdog(state.clone());
121 health::spawn_health_checker(state.clone());
122 stats::spawn_stats_collector(state.clone());
123
124 if let Some(backup_cfg) = cluster_config.backup.clone()
126 && backup_scheduler::spawn_backup_scheduler(backup_cfg, state.clone()).is_some()
127 {
128 info!("Backup scheduler started");
129 }
130
131 if let Some(cleanup_cfg) = cluster_config.cleanup.clone()
132 && cleanup_scheduler::spawn_cleanup_scheduler(cleanup_cfg, state.clone()).is_some()
133 {
134 info!("Cleanup scheduler started");
135 }
136
137 let app = api::router(state.clone());
138
139 let addr = format!("0.0.0.0:{}", cluster_config.cluster.api_port);
140 let listener = tokio::net::TcpListener::bind(&addr).await?;
141 info!("API server listening on {addr}");
142
143 axum::serve(listener, app)
144 .with_graceful_shutdown(shutdown_signal())
145 .await?;
146
147 Ok(())
148}
149
150async fn restore_or_reconcile(
154 state: &AppState,
155 config: &orca_core::config::ServiceConfig,
156) -> anyhow::Result<()> {
157 if config
162 .placement
163 .as_ref()
164 .and_then(|p| p.node.as_ref())
165 .is_some()
166 {
167 let desired = match &config.replicas {
168 orca_core::types::Replicas::Fixed(n) => *n,
169 orca_core::types::Replicas::Auto => 1,
170 };
171 let mut services = state.services.write().await;
172 let svc_state = services
173 .entry(config.name.clone())
174 .or_insert_with(|| state::ServiceState::from_config(config.clone()));
175 svc_state.config = config.clone();
176 svc_state.desired_replicas = desired;
177 info!(service = %config.name, "Registered remote service placeholder");
178 return Ok(());
179 }
180
181 let cr = state
183 .container_runtime
184 .as_any()
185 .downcast_ref::<orca_agent::docker::ContainerRuntime>();
186
187 if let Some(container_rt) = cr {
188 let existing = container_rt.find_existing(&config.name).await?;
189 if !existing.is_empty() {
190 info!(
191 service = %config.name,
192 count = existing.len(),
193 "Re-attached to existing containers, skipping reconciliation"
194 );
195 populate_state_from_existing(state, config, existing).await;
196 return Ok(());
197 }
198 }
199
200 reconciler::reconcile_service(state, config).await
201}
202
203async fn populate_state_from_existing(
205 state: &AppState,
206 config: &orca_core::config::ServiceConfig,
207 handles: Vec<orca_core::runtime::WorkloadHandle>,
208) {
209 let initial_health = if config.health.is_some() || config.liveness.is_some() {
212 orca_core::types::HealthState::Healthy
213 } else {
214 orca_core::types::HealthState::NoCheck
215 };
216
217 let runtime = state.container_runtime.as_ref();
219 let mut instances: Vec<InstanceState> = Vec::new();
220 for handle in handles {
221 let mut host_port = if let Some(p) = config.port {
225 runtime.resolve_host_port(&handle, p).await.ok().flatten()
226 } else {
227 None
228 };
229 if host_port.is_none() {
230 host_port = handle
231 .metadata
232 .get("host_port")
233 .and_then(|p| p.parse::<u16>().ok());
234 }
235 info!(
236 service = %config.name,
237 runtime_id = %&handle.runtime_id[..12],
238 ?host_port,
239 "Restored container instance"
240 );
241 instances.push(InstanceState {
242 handle,
243 status: WorkloadStatus::Running,
244 host_port,
245 container_address: None,
246 health: initial_health,
247 is_canary: false,
248 started_at: std::time::Instant::now(),
249 });
250 }
251
252 let desired = match &config.replicas {
253 orca_core::types::Replicas::Fixed(n) => *n,
254 orca_core::types::Replicas::Auto => 1,
255 };
256
257 let mut services = state.services.write().await;
258 let svc_state = services
259 .entry(config.name.clone())
260 .or_insert_with(|| state::ServiceState::from_config(config.clone()));
261 svc_state.instances = instances;
262 svc_state.desired_replicas = desired;
263 drop(services);
264
265 match config.runtime {
267 orca_core::types::RuntimeKind::Container => {
268 routes::update_container_routes(state, config).await;
269 }
270 orca_core::types::RuntimeKind::Wasm => {
271 routes::update_wasm_triggers(state, config).await;
272 }
273 }
274}
275
/// Derives the master's node id by hashing the machine's host name.
///
/// The name is read from the `HOSTNAME` environment variable, then from
/// `COMPUTERNAME`, and defaults to `"orca-master"` when neither is readable.
/// The string is fed through `std::hash::DefaultHasher`, so repeated calls in
/// the same process with an unchanged environment yield the same id.
fn master_node_id() -> u64 {
    use std::hash::{Hash, Hasher};

    let host = match std::env::var("HOSTNAME") {
        Ok(name) => name,
        Err(_) => match std::env::var("COMPUTERNAME") {
            Ok(name) => name,
            Err(_) => String::from("orca-master"),
        },
    };

    let mut digest = std::hash::DefaultHasher::new();
    host.hash(&mut digest);
    digest.finish()
}
286
287async fn register_master_node(state: &state::AppState, api_port: u16) {
289 let node_id = master_node_id();
290 let mut labels = HashMap::new();
291 labels.insert("role".to_string(), "master".to_string());
292 let node = state::RegisteredNode {
293 node_id,
294 address: format!("localhost:{api_port}"),
295 labels,
296 last_heartbeat: chrono::Utc::now(),
297 drain: false,
298 cpu_percent: 0.0,
299 memory_bytes: 0,
300 memory_total: 0,
301 disk_used: 0,
302 disk_total: 0,
303 net_rx: 0,
304 net_tx: 0,
305 };
306 let mut nodes = state.registered_nodes.write().await;
307 nodes.insert(node_id, node);
308 info!(node_id, "Master node self-registered");
309}
310
311fn spawn_master_heartbeat(state: Arc<state::AppState>) {
319 const STALE_AFTER: chrono::Duration = chrono::Duration::seconds(60);
320 let node_id = master_node_id();
321 let collector = Arc::new(orca_agent::host_stats::HostStatsCollector::new());
322 tokio::spawn(async move {
323 loop {
324 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
325 let sample = collector.sample();
326 let now = chrono::Utc::now();
327 let mut nodes = state.registered_nodes.write().await;
328 if let Some(node) = nodes.get_mut(&node_id) {
329 node.last_heartbeat = now;
330 node.cpu_percent = sample.cpu_percent;
331 node.memory_bytes = sample.memory_bytes;
332 node.memory_total = sample.memory_total;
333 node.disk_used = sample.disk_used;
334 node.disk_total = sample.disk_total;
335 node.net_rx = sample.net_rx;
336 node.net_tx = sample.net_tx;
337 }
338 nodes.retain(|id, node| {
339 if *id == node_id {
340 return true;
341 }
342 let age = now - node.last_heartbeat;
343 age < STALE_AFTER
344 });
345 }
346 });
347}
348
349async fn shutdown_signal() {
350 tokio::signal::ctrl_c()
351 .await
352 .expect("failed to install ctrl+c handler");
353 info!("Shutdown signal received");
354}