use std::time::Duration;
use tracing::info;
use orca_core::runtime::Runtime;
use orca_core::types::Replicas;
use crate::reconciler::{get_runtime, reconcile_service};
use crate::state::AppState;
/// Grace period handed to `Runtime::stop` when shutting down an old instance
/// in `redeploy` and `rolling_update`.
const GRACEFUL_TIMEOUT: Duration = Duration::from_millis(5_000);
/// Stops a service and removes every trace of it from the shared state.
///
/// The service is first scaled to zero replicas via [`scale`]. Then its entry is
/// removed from `state.services`, its targets are pruned from the route table
/// (routes left with no targets are dropped), and its wasm triggers are removed.
///
/// # Errors
///
/// Returns an error if scaling to zero fails, including when `service_name` is
/// not a known service. In that case the service entry, routes and triggers are
/// left in place.
pub async fn stop(state: &AppState, service_name: &str) -> anyhow::Result<()> {
    scale(state, service_name, 0).await?;

    // Each guard is confined to its own scope so at most one state lock is held
    // at a time. Acquiring one write lock while holding another invites
    // lock-ordering deadlocks with tasks that take them in a different order.
    {
        let mut services = state.services.write().await;
        services.remove(service_name);
    }
    {
        let mut routes = state.route_table.write().await;
        routes.retain(|_, targets| {
            targets.retain(|t| t.service_name != service_name);
            !targets.is_empty()
        });
    }
    {
        let mut triggers = state.wasm_triggers.write().await;
        triggers.retain(|t| t.service_name != service_name);
    }

    info!("Stopped service: {service_name}");
    Ok(())
}
pub async fn stop_all(state: &AppState) -> anyhow::Result<()> {
let names: Vec<String> = {
let services = state.services.read().await;
services.keys().cloned().collect()
};
for name in &names {
if let Err(e) = stop(state, name).await {
tracing::error!("Failed to stop {name}: {e}");
}
}
Ok(())
}
pub async fn redeploy(state: &AppState, service_name: &str) -> anyhow::Result<()> {
let config = {
let services = state.services.read().await;
let svc = services
.get(service_name)
.ok_or_else(|| anyhow::anyhow!("service '{}' not found", service_name))?;
svc.config.clone()
};
let runtime = get_runtime(state, config.runtime)?;
let old_handles: Vec<_> = {
let services = state.services.read().await;
services
.get(service_name)
.map(|svc| svc.instances.iter().map(|i| i.handle.clone()).collect())
.unwrap_or_default()
};
{
let mut services = state.services.write().await;
if let Some(svc) = services.get_mut(service_name) {
svc.instances.clear();
}
}
reconcile_service(state, &config).await?;
for handle in &old_handles {
let _ = runtime.stop(handle, GRACEFUL_TIMEOUT).await;
let _ = runtime.remove(handle).await;
}
info!("Redeployed service: {service_name}");
Ok(())
}
pub async fn rollback(state: &AppState, service_name: &str) -> anyhow::Result<()> {
let previous_config = {
let history = state.deploy_history.read().await;
history
.get_previous(service_name)
.map(|r| r.config.clone())
.ok_or_else(|| anyhow::anyhow!("no previous deploy for '{service_name}'"))?
};
info!("Rolling back {service_name} to previous config");
let runtime = get_runtime(state, previous_config.runtime)?;
{
let mut services = state.services.write().await;
if let Some(svc) = services.get_mut(service_name) {
for instance in svc.instances.drain(..) {
let _ = runtime
.stop(&instance.handle, Duration::from_secs(10))
.await;
let _ = runtime.remove(&instance.handle).await;
}
}
}
reconcile_service(state, &previous_config).await?;
let mut history = state.deploy_history.write().await;
history.record(&previous_config);
info!("Rolled back service: {service_name}");
Ok(())
}
/// Pins `service_name` to exactly `replicas` instances and reconciles it.
///
/// This function clones the service's stored config and sets its replica
/// setting to `Replicas::Fixed(replicas)`. It then runs [`reconcile_service`]
/// against that clone. Whether the new config is persisted is up to
/// `reconcile_service`.
///
/// # Errors
///
/// Fails if `service_name` is unknown, or with whatever error
/// [`reconcile_service`] returns.
pub async fn scale(state: &AppState, service_name: &str, replicas: u32) -> anyhow::Result<()> {
    let target = {
        let registry = state.services.read().await;
        let current = match registry.get(service_name) {
            Some(svc) => svc,
            None => anyhow::bail!("service '{}' not found", service_name),
        };
        let mut cfg = current.config.clone();
        cfg.replicas = Replicas::Fixed(replicas);
        cfg
    };
    reconcile_service(state, &target).await
}
/// Time `rolling_update` waits after refreshing routes before it stops the
/// instances being replaced.
const DRAIN_PERIOD: Duration = Duration::from_millis(5_000);
/// Replaces the instances of `config.name` with `desired` freshly created
/// replicas of `spec`, then retires the old ones.
///
/// Replica `i` is created from `spec` and named `"{spec.name}-{i}"` when more
/// than one replica is desired. It is swapped into slot `i` of the service's
/// instance list, or appended if that slot does not exist. If a replica fails to
/// start, the failure is logged and whatever instance occupies that slot is kept
/// running and tracked. Instances beyond `desired` are dropped from tracking.
///
/// Routes are then refreshed. After [`DRAIN_PERIOD`], every retired instance is
/// stopped (with [`GRACEFUL_TIMEOUT`]) and removed; failures there are logged.
///
/// # Errors
///
/// Currently always returns `Ok(())`. Per-replica failures are logged rather
/// than propagated.
pub(crate) async fn rolling_update(
    state: &AppState,
    runtime: &dyn Runtime,
    config: &orca_core::config::ServiceConfig,
    spec: &orca_core::types::WorkloadSpec,
    desired: u32,
) -> anyhow::Result<()> {
    // Instances taken out of service (replaced, surplus, or untracked); they are
    // torn down only after routes have been refreshed and the drain period ends.
    let mut retired = Vec::new();

    for i in 0..desired {
        let mut replica_spec = spec.clone();
        if desired > 1 {
            replica_spec.name = format!("{}-{i}", spec.name);
        }
        match crate::instance::create_and_start_instance(runtime, &replica_spec).await {
            Ok(new_instance) => {
                let mut services = state.services.write().await;
                match services.get_mut(&config.name) {
                    Some(svc) => {
                        let slot = i as usize;
                        if slot < svc.instances.len() {
                            // Retire exactly the instance being replaced.
                            retired.push(std::mem::replace(&mut svc.instances[slot], new_instance));
                        } else {
                            svc.instances.push(new_instance);
                        }
                    }
                    // The service entry vanished mid-update: nothing would track
                    // the new instance, so tear it down with the rest.
                    None => retired.push(new_instance),
                }
            }
            Err(e) => {
                tracing::error!("Rolling update failed for {}-{i}: {e}", config.name);
            }
        }
    }

    // Instances beyond `desired` would otherwise stay tracked (and routed to)
    // after their containers are gone.
    {
        let mut services = state.services.write().await;
        if let Some(svc) = services.get_mut(&config.name) {
            let keep = desired as usize;
            if svc.instances.len() > keep {
                retired.extend(svc.instances.drain(keep..));
            }
        }
    }

    update_routes_for_runtime(state, config).await;

    if !retired.is_empty() {
        // Let in-flight traffic to the retired instances finish before stopping them.
        tokio::time::sleep(DRAIN_PERIOD).await;
        for old in &retired {
            if let Err(e) = runtime.stop(&old.handle, GRACEFUL_TIMEOUT).await {
                tracing::warn!("Rolling update of {}: failed to stop old instance: {e}", config.name);
            }
            if let Err(e) = runtime.remove(&old.handle).await {
                tracing::warn!("Rolling update of {}: failed to remove old instance: {e}", config.name);
            }
        }
    }

    info!("Rolling update complete for {}", config.name);
    Ok(())
}
/// Refreshes the routing state that matches the service's runtime kind:
/// wasm triggers for wasm services, container routes for container services.
async fn update_routes_for_runtime(state: &AppState, config: &orca_core::config::ServiceConfig) {
    use orca_core::types::RuntimeKind;

    match config.runtime {
        RuntimeKind::Wasm => {
            crate::routes::update_wasm_triggers(state, config).await;
        }
        RuntimeKind::Container => {
            crate::routes::update_container_routes(state, config).await;
        }
    }
}
// Canary operations are implemented in `crate::canary` and re-exported here:
// `canary_deploy` is visible crate-wide only, while `promote` is public.
pub(crate) use crate::canary::canary_deploy;
pub use crate::canary::promote;
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn graceful_timeout_is_5_seconds() {
        assert_eq!(GRACEFUL_TIMEOUT, Duration::from_secs(5));
    }

    #[test]
    fn drain_period_is_5_seconds() {
        assert_eq!(DRAIN_PERIOD, Duration::from_secs(5));
    }

    // Only checks that both timeouts are non-zero. The drain-then-stop ordering
    // inside `rolling_update` needs a runtime and is not exercised here.
    // `is_zero` (rather than `as_secs() > 0`) also accepts sub-second durations.
    #[test]
    fn drain_and_graceful_timeouts_are_nonzero() {
        assert!(!DRAIN_PERIOD.is_zero(), "drain period must be positive");
        assert!(
            !GRACEFUL_TIMEOUT.is_zero(),
            "graceful timeout must be positive"
        );
    }
}