use std::time::Duration;
use tracing::info;
use orca_core::types::Replicas;
use crate::reconciler::{get_runtime, reconcile_service};
use crate::state::AppState;
/// Timeout handed to `runtime.stop` when `redeploy` shuts down the instances
/// it has replaced.
// NOTE(review): presumably the grace period before the runtime force-kills
// the instance — confirm against the runtime implementation.
const GRACEFUL_TIMEOUT: Duration = Duration::from_secs(30);
/// Stops a service and erases every trace of it from the shared state.
///
/// The service is first scaled to zero replicas through [`scale`]; afterwards
/// its entry is removed from `state.services`, its targets are pruned from
/// `state.route_table` (routes left without any target are dropped), and its
/// triggers are pruned from `state.wasm_triggers`.
///
/// # Errors
///
/// Propagates any error from [`scale`], which includes the "not found" error
/// raised when `service_name` is not registered. In that case nothing is
/// removed.
pub async fn stop(state: &AppState, service_name: &str) -> anyhow::Result<()> {
    scale(state, service_name, 0).await?;

    // The guards below are acquired in this order and all stay alive until the
    // function returns.
    let mut service_map = state.services.write().await;
    service_map.remove(service_name);

    let mut route_table = state.route_table.write().await;
    route_table.retain(|_, backends| {
        backends.retain(|backend| backend.service_name != service_name);
        // Keep the route only while it still points at something.
        !backends.is_empty()
    });

    let mut wasm_triggers = state.wasm_triggers.write().await;
    wasm_triggers.retain(|trigger| trigger.service_name != service_name);

    info!("Stopped and removed service: {service_name}");
    Ok(())
}
pub async fn stop_all(state: &AppState) -> anyhow::Result<()> {
let names: Vec<String> = {
let services = state.services.read().await;
services.keys().cloned().collect()
};
for name in &names {
if let Err(e) = stop(state, name).await {
tracing::error!("Failed to stop {name}: {e}");
}
}
Ok(())
}
/// Redeploys a service: starts fresh instances from its current config and
/// then shuts down the instances that were running before.
///
/// Steps:
/// 1. Clone the service's config under a read lock.
/// 2. Resolve the runtime for that config.
/// 3. Under a single write lock, detach all current instances from the service
///    and keep their handles.
/// 4. Run [`reconcile_service`] so new instances are created.
/// 5. Stop and remove the old instances, using [`GRACEFUL_TIMEOUT`] for the
///    stop call. Failures here are ignored (best effort).
///
/// # Errors
///
/// Returns an error if the service is not registered, if no runtime can be
/// resolved for its config, or if reconciliation fails.
pub async fn redeploy(state: &AppState, service_name: &str) -> anyhow::Result<()> {
    let config = {
        let services = state.services.read().await;
        let svc = services
            .get(service_name)
            .ok_or_else(|| anyhow::anyhow!("service '{}' not found", service_name))?;
        svc.config.clone()
    };
    let runtime = get_runtime(state, config.runtime)?;

    // Detach the instances and collect their handles in ONE critical section.
    // Snapshotting under a read lock and clearing under a later write lock
    // would let an instance registered in between be cleared from the state
    // without ever being stopped.
    let old_handles: Vec<_> = {
        let mut services = state.services.write().await;
        services
            .get_mut(service_name)
            .map(|svc| {
                svc.instances
                    .drain(..)
                    .map(|instance| instance.handle.clone())
                    .collect()
            })
            .unwrap_or_default()
    };

    // NOTE(review): if reconciliation fails, the early return leaves the old
    // instances running but no longer tracked in `state.services` — confirm
    // whether they should be restored or stopped on that path.
    reconcile_service(state, &config).await?;

    // Best-effort teardown of the replaced instances; errors are deliberately
    // ignored so one failing instance does not abort the rest.
    for handle in &old_handles {
        let _ = runtime.stop(handle, GRACEFUL_TIMEOUT).await;
        let _ = runtime.remove(handle).await;
    }

    info!("Redeployed service: {service_name}");
    Ok(())
}
/// Rolls a service back to the config of its previous deploy.
///
/// Steps:
/// 1. Look up the previous config in `state.deploy_history`.
/// 2. Resolve the runtime for that config.
/// 3. Detach all current instances from the service under a write lock, then
///    release the lock and stop/remove them (best effort, errors ignored).
/// 4. Run [`reconcile_service`] with the previous config.
/// 5. Record the previous config in the deploy history.
///
/// # Errors
///
/// Returns an error if there is no previous deploy for `service_name`, if no
/// runtime can be resolved for the previous config, or if reconciliation
/// fails.
pub async fn rollback(state: &AppState, service_name: &str) -> anyhow::Result<()> {
    /// Timeout passed to `runtime.stop` for each instance being rolled back.
    const ROLLBACK_STOP_TIMEOUT: Duration = Duration::from_secs(10);

    let previous_config = {
        let history = state.deploy_history.read().await;
        history
            .get_previous(service_name)
            .map(|r| r.config.clone())
            .ok_or_else(|| anyhow::anyhow!("no previous deploy for '{service_name}'"))?
    };
    info!("Rolling back {service_name} to previous config");

    // NOTE(review): the runtime is resolved from the PREVIOUS config but is
    // used to stop the CURRENT instances — confirm this is correct when the
    // two configs use different runtimes.
    let runtime = get_runtime(state, previous_config.runtime)?;

    // Detach the instances while holding the write lock, but do the slow
    // stop/remove calls only after releasing it. Awaiting the runtime with the
    // guard held would block every other user of `state.services` for up to
    // the stop timeout per instance.
    let old_instances: Vec<_> = {
        let mut services = state.services.write().await;
        services
            .get_mut(service_name)
            .map(|svc| svc.instances.drain(..).collect())
            .unwrap_or_default()
    };

    // Best-effort teardown; errors are deliberately ignored.
    for instance in &old_instances {
        let _ = runtime.stop(&instance.handle, ROLLBACK_STOP_TIMEOUT).await;
        let _ = runtime.remove(&instance.handle).await;
    }

    reconcile_service(state, &previous_config).await?;

    let mut history = state.deploy_history.write().await;
    history.record(&previous_config);

    info!("Rolled back service: {service_name}");
    Ok(())
}
/// Sets a service to a fixed number of replicas and reconciles it.
///
/// The service's stored config is cloned under a read lock, the clone's
/// `replicas` field is overridden with `Replicas::Fixed(replicas)`, and the
/// result is handed to [`reconcile_service`]. This function itself does not
/// write the modified config back into `state.services`.
///
/// # Errors
///
/// Returns an error if `service_name` is not registered, or whatever error
/// [`reconcile_service`] produces.
pub async fn scale(state: &AppState, service_name: &str, replicas: u32) -> anyhow::Result<()> {
    let mut desired = {
        let guard = state.services.read().await;
        match guard.get(service_name) {
            Some(svc) => svc.config.clone(),
            None => return Err(anyhow::anyhow!("service '{}' not found", service_name)),
        }
    };

    // Only the local copy is touched; the read guard is already released.
    desired.replicas = Replicas::Fixed(replicas);

    reconcile_service(state, &desired).await
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Guards against accidental changes to the graceful stop timeout.
    #[test]
    fn graceful_timeout_is_30_seconds() {
        let expected = Duration::from_secs(30);
        assert_eq!(GRACEFUL_TIMEOUT, expected);
    }
}