use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use axum::Json;
use axum::extract::{Query, State};
use axum::http::StatusCode;
use axum::response::IntoResponse;
use serde::Deserialize;
use tracing::warn;
use orca_core::api_types::{
ClusterBackupsResponse, LastBackupResult, NodeBackupStatus, NodeRole, TriggerBackupResponse,
};
use orca_core::backup::enumerate_local_backups;
use orca_core::ws_types::{BackupStatusReportData, MasterMessage};
use crate::state::AppState;
/// Upper bound on how long `collect_agent_statuses` waits for agents to report
/// their backup status before giving up and marking the rest as unreachable.
const BACKUP_REPORT_TIMEOUT: Duration = Duration::from_secs(5);
/// HTTP handler returning the backup status of every node in the cluster.
///
/// The master's own entry always comes first, followed by one entry per
/// agent as produced by `collect_agent_statuses`.
pub(crate) async fn cluster_backups(State(state): State<Arc<AppState>>) -> impl IntoResponse {
    let master_status = collect_master_status(&state).await;
    let agent_statuses = collect_agent_statuses(&state).await;

    // Master first, then the agents, in the order they were collected.
    let nodes = std::iter::once(master_status)
        .chain(agent_statuses)
        .collect();

    Json(ClusterBackupsResponse { nodes })
}
/// Builds the backup status entry describing the master node itself.
///
/// Snapshots are enumerated from the current user's home directory; when no
/// home directory can be resolved the snapshot list is empty. The master is
/// always reported as reachable and carries no node id.
async fn collect_master_status(state: &AppState) -> NodeBackupStatus {
    let snapshots =
        dirs_next::home_dir().map_or_else(Vec::new, |home| enumerate_local_backups(&home));

    // Clone the stored result so the read guard is released immediately.
    let last_result = state.master_last_backup_result.read().await.clone();

    NodeBackupStatus {
        node_id: None,
        hostname: master_hostname(),
        role: NodeRole::Master,
        snapshots,
        last_result,
        reachable: true,
    }
}
async fn collect_agent_statuses(state: &AppState) -> Vec<NodeBackupStatus> {
let agent_ids: Vec<u64> = state.ws_agents.read().await.keys().copied().collect();
if agent_ids.is_empty() {
return Vec::new();
}
let (tx, mut rx) = tokio::sync::mpsc::channel::<BackupStatusReportData>(agent_ids.len() + 1);
let mut request_ids: Vec<String> = Vec::with_capacity(agent_ids.len());
{
let mut listeners = state.backup_listeners.write().await;
for _ in 0..agent_ids.len() {
let req = uuid::Uuid::new_v4().to_string();
listeners.insert(req.clone(), tx.clone());
request_ids.push(req);
}
}
let mut dispatched = HashMap::<u64, String>::new();
{
let agents = state.ws_agents.read().await;
for (node_id, req_id) in agent_ids.iter().zip(request_ids.iter()) {
if let Some(agent_tx) = agents.get(node_id) {
let sent = agent_tx
.send(MasterMessage::BackupStatusRequest {
request_id: req_id.clone(),
})
.await
.is_ok();
if sent {
dispatched.insert(*node_id, req_id.clone());
}
}
}
}
let mut reports = HashMap::<u64, BackupStatusReportData>::new();
let deadline = tokio::time::sleep(BACKUP_REPORT_TIMEOUT);
tokio::pin!(deadline);
while reports.len() < dispatched.len() {
tokio::select! {
biased;
msg = rx.recv() => {
match msg {
Some(data) => {
reports.insert(data.node_id, data);
}
None => break,
}
}
_ = &mut deadline => {
warn!(
"cluster_backups: timed out after {}s ({} of {} agents responded)",
BACKUP_REPORT_TIMEOUT.as_secs(),
reports.len(),
dispatched.len(),
);
break;
}
}
}
{
let mut listeners = state.backup_listeners.write().await;
for req in &request_ids {
listeners.remove(req);
}
}
let last_results = state.last_backup_results.read().await;
agent_ids
.into_iter()
.map(|node_id| {
let last = last_results.get(&node_id).map(|r| LastBackupResult {
success: r.success,
message: r.message.clone(),
recorded_at: r.recorded_at,
});
match reports.remove(&node_id) {
Some(data) => NodeBackupStatus {
node_id: Some(node_id),
hostname: data.hostname,
role: NodeRole::Agent,
snapshots: data.snapshots,
last_result: last,
reachable: true,
},
None => NodeBackupStatus {
node_id: Some(node_id),
hostname: format!("node-{node_id}"),
role: NodeRole::Agent,
snapshots: Vec::new(),
last_result: last,
reachable: false,
},
}
})
.collect()
}
/// Best-effort hostname of the machine running the master.
///
/// Sources are tried in order: the contents of `/etc/hostname`, the
/// `HOSTNAME` environment variable, and finally the literal `"master"`.
/// Values are trimmed, and a source that is missing, unreadable or blank is
/// skipped, so the result is never an empty string.
fn master_hostname() -> String {
    /// Trims `raw` and discards it when nothing but whitespace remains.
    fn non_blank(raw: String) -> Option<String> {
        let trimmed = raw.trim();
        if trimmed.is_empty() {
            None
        } else {
            Some(trimmed.to_string())
        }
    }

    std::fs::read_to_string("/etc/hostname")
        .ok()
        .and_then(non_blank)
        .or_else(|| std::env::var("HOSTNAME").ok().and_then(non_blank))
        .unwrap_or_else(|| "master".to_string())
}
/// Query-string parameters accepted by `trigger_cluster_backup`.
///
/// Target selection as implemented by that handler:
/// - neither field given: the master plus every connected agent;
/// - `master=true` only: the master only;
/// - `node_id` only: that agent only;
/// - both: the master plus that agent.
#[derive(Debug, Deserialize)]
pub struct TriggerQuery {
/// Agent node to target; the handler answers 404 when it is not connected.
pub node_id: Option<u64>,
/// Whether to back up the master; `false` when the parameter is absent.
#[serde(default)]
pub master: bool,
}
/// HTTP handler that starts a backup on the master and/or connected agents.
///
/// Responses:
/// - `400 Bad Request` when the cluster config has no backup section;
/// - `404 Not Found` when `node_id` names an agent that is not connected;
/// - otherwise a JSON `TriggerBackupResponse` listing the agents the request
///   was sent to and whether a master backup was started.
///
/// The master backup runs in a detached task; this handler does not wait for
/// it to finish.
pub(crate) async fn trigger_cluster_backup(
    State(state): State<Arc<AppState>>,
    Query(q): Query<TriggerQuery>,
) -> impl IntoResponse {
    let config = match state.cluster_config.backup.clone() {
        Some(cfg) => cfg,
        None => {
            let body = "no [backup] configuration in cluster.toml — cannot trigger".to_string();
            return (StatusCode::BAD_REQUEST, body).into_response();
        }
    };

    // Resolve which agents to contact before starting anything, so an unknown
    // node id is rejected without side effects.
    let agent_targets: Vec<u64> = match (q.node_id, q.master) {
        (Some(id), _) => {
            let connected = state.ws_agents.read().await.contains_key(&id);
            if !connected {
                let body = format!("node {id} is not connected");
                return (StatusCode::NOT_FOUND, body).into_response();
            }
            vec![id]
        }
        (None, true) => Vec::new(),
        (None, false) => state.ws_agents.read().await.keys().copied().collect(),
    };

    // The master is included when asked for explicitly or when no single node
    // was selected.
    let master_dispatched = q.master || q.node_id.is_none();
    if master_dispatched {
        let task_state = Arc::clone(&state);
        let task_config = config.clone();
        tokio::spawn(async move {
            crate::backup_scheduler::run_master_backup(&task_state, &task_config).await;
        });
    }

    let dispatched_to = dispatch_to_agents(&state, &config, &agent_targets).await;
    let body = TriggerBackupResponse {
        dispatched_to,
        master_dispatched,
    };
    Json(body).into_response()
}
/// Sends a `BackupRequest` to each agent in `targets` and returns the ids of
/// the agents whose channel accepted the message.
///
/// Targets that are no longer connected, or whose channel is closed, are
/// silently left out of the result. Every request carries a copy of `config`
/// and of the service hooks gathered by `collect_service_hooks`.
///
/// The agent registry lock is released before sending: senders are cloned out
/// first, so a slow agent whose outbound queue is full cannot keep the
/// registry's read guard held while this function awaits capacity.
async fn dispatch_to_agents(
    state: &AppState,
    config: &orca_core::backup::BackupConfig,
    targets: &[u64],
) -> Vec<u64> {
    if targets.is_empty() {
        return Vec::new();
    }
    let service_hooks = collect_service_hooks(state).await;

    // Snapshot the senders under the lock, then drop the guard before awaiting.
    let senders: Vec<_> = {
        let agents = state.ws_agents.read().await;
        targets
            .iter()
            .filter_map(|id| agents.get(id).map(|tx| (*id, tx.clone())))
            .collect()
    };

    let mut sent_to = Vec::with_capacity(senders.len());
    for (id, tx) in senders {
        let request = MasterMessage::BackupRequest {
            config: config.clone(),
            service_hooks: service_hooks.clone(),
        };
        if tx.send(request).await.is_ok() {
            sent_to.push(id);
        }
    }
    sent_to
}
/// Gathers the backup pre-hook of every service that defines one.
///
/// Returns a map from service name to pre-hook; services without a backup
/// section, or without a pre-hook in it, are skipped.
async fn collect_service_hooks(state: &AppState) -> HashMap<String, String> {
    let registry = state.services.read().await;

    let mut hooks = HashMap::new();
    for svc in registry.values() {
        let Some(backup) = svc.config.backup.as_ref() else {
            continue;
        };
        if let Some(pre_hook) = backup.pre_hook.as_ref() {
            hooks.insert(svc.config.name.clone(), pre_hook.clone());
        }
    }
    hooks
}