//! alopex-server 0.5.0
//!
//! Server component for Alopex DB.
use std::fs;
use std::path::{Path, PathBuf};
use std::time::{Instant, SystemTime, UNIX_EPOCH};

use alopex_core::kv::any::AnyKV;
use alopex_core::lsm::{LsmKV, LsmKVConfig, RecoveryResult};
use serde::{Deserialize, Serialize};

use crate::error::Result;
use crate::ops::state::{LifecycleStateManager, Mode};

/// How opening the store at startup turned out.
///
/// Serialized in `snake_case`: `"success"`, `"read_only"`, `"failed"`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum RecoveryOutcome {
    /// Recovery finished without the LSM layer reporting a stop reason.
    Success,
    /// The store was opened, but recovery either stopped early (the LSM layer reported a
    /// stop reason) or only succeeded after the WAL was quarantined.
    /// `RecoveryCoordinator::apply_initial_mode` switches the server to read-only mode.
    ReadOnly,
    /// Recovery failed outright. `apply_initial_mode` also treats this as read-only.
    /// NOTE(review): nothing in this module constructs this variant; confirm where it is set.
    Failed,
}

/// Summary of the recovery performed when the store was opened.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RecoveryInfo {
    /// Overall classification of the recovery.
    pub outcome: RecoveryOutcome,
    /// Time spent opening the store, including any retry, in whole milliseconds.
    pub duration_ms: u64,
    /// Number of entries the LSM recovery reported as recovered.
    pub entries_replayed: u64,
    /// Number of warnings the LSM recovery reported.
    pub warnings: u64,
    /// When this summary was built, in milliseconds since the Unix epoch.
    pub finished_at_ms: u64,
    /// Why recovery did not fully succeed; `None` after a clean recovery.
    pub reason: Option<String>,
}

impl RecoveryInfo {
    /// Converts the LSM layer's [`RecoveryResult`] into a serializable summary.
    ///
    /// When the result carries a `stop_reason`, the outcome is [`RecoveryOutcome::ReadOnly`]
    /// and that reason is copied into `reason`. Otherwise the outcome is
    /// [`RecoveryOutcome::Success`]. `elapsed` is reported in whole milliseconds, and
    /// `finished_at_ms` is stamped with the current time.
    fn from_result(result: &RecoveryResult, elapsed: std::time::Duration) -> Self {
        let reason = result.stop_reason.clone();
        let outcome = match &reason {
            Some(_) => RecoveryOutcome::ReadOnly,
            None => RecoveryOutcome::Success,
        };
        let warning_count = result.warnings.len() as u64;
        let replayed = result.entries_recovered as u64;
        Self {
            outcome,
            duration_ms: elapsed.as_millis() as u64,
            entries_replayed: replayed,
            warnings: warning_count,
            finished_at_ms: now_ms(),
            reason,
        }
    }
}

/// Opens the on-disk store at startup and decides the initial server mode from the
/// outcome. Stateless; everything is exposed as associated functions.
pub struct RecoveryCoordinator;

impl RecoveryCoordinator {
    /// Opens the LSM store in `data_dir`, falling back to WAL quarantine if that fails.
    ///
    /// The store is first opened with [`LsmKVConfig::default`]. On success, the returned
    /// [`RecoveryInfo`] mirrors the LSM recovery report.
    ///
    /// On failure, the WAL (`lsm.wal`) is moved aside and the store is opened a second
    /// time. The result of that retry is always reported as [`RecoveryOutcome::ReadOnly`].
    /// Its `reason` describes the initial error, what happened to the WAL, and any stop
    /// reason the retry itself reported.
    ///
    /// # Errors
    ///
    /// Returns the error from the second open attempt if the retry also fails.
    pub fn open_store(data_dir: &Path) -> Result<(AnyKV, RecoveryInfo)> {
        let start = Instant::now();
        let open = || LsmKV::open_with_config(data_dir, LsmKVConfig::default());

        // Format the initial error right away so it is dropped before we touch the WAL.
        let initial_failure = match open() {
            Ok((store, recovery)) => {
                let info = RecoveryInfo::from_result(&recovery, start.elapsed());
                return Ok((AnyKV::Lsm(Box::new(store)), info));
            }
            Err(err) => format!("initial recovery failed: {err}"),
        };

        let quarantine_note = match quarantine_wal(data_dir) {
            Ok(Some(path)) => format!("quarantined WAL to {}", path.display()),
            Ok(None) => "WAL not found for quarantine".to_string(),
            Err(rename_err) => format!("failed to quarantine WAL: {rename_err}"),
        };
        let mut reason = format!("{initial_failure}; {quarantine_note}");

        let (store, recovery) = open()?;
        let mut info = RecoveryInfo::from_result(&recovery, start.elapsed());
        // `from_result` puts the retry's own stop reason (if any) into `info.reason`.
        // Keep it rather than silently overwriting it with the quarantine narrative.
        if let Some(retry_stop) = info.reason.take() {
            reason = format!("{reason}; retry stopped: {retry_stop}");
        }
        info.outcome = RecoveryOutcome::ReadOnly;
        info.reason = Some(reason);
        Ok((AnyKV::Lsm(Box::new(store)), info))
    }

    /// Switches `state` to [`Mode::ReadOnly`] unless recovery fully succeeded.
    ///
    /// A [`RecoveryOutcome::Success`] leaves the current mode untouched.
    pub fn apply_initial_mode(state: &LifecycleStateManager, info: &RecoveryInfo) {
        // Exhaustive on purpose: a new outcome variant must make an explicit mode decision.
        match info.outcome {
            RecoveryOutcome::Success => {}
            RecoveryOutcome::ReadOnly | RecoveryOutcome::Failed => {
                state.set_mode(Mode::ReadOnly);
            }
        }
    }
}

/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch, this returns `0` instead of failing.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}

/// Moves the LSM write-ahead log in `data_dir` aside so the store can be reopened without it.
///
/// The WAL is expected at `<data_dir>/lsm.wal`. It is renamed to
/// `<data_dir>/lsm.wal.bad.<unix-millis>`.
///
/// Returns `Ok(Some(new_path))` when the file was moved, and `Ok(None)` when there was no
/// WAL to move.
///
/// # Errors
///
/// Propagates any I/O error from the rename other than "not found", such as a
/// permission error.
fn quarantine_wal(data_dir: &Path) -> std::io::Result<Option<PathBuf>> {
    let wal_path = data_dir.join("lsm.wal");
    let bad_path = wal_path.with_extension(format!("wal.bad.{}", now_ms()));
    // Try the rename directly instead of probing with `Path::exists` first.
    // `exists` returns `false` on permission errors and broken links, which would be
    // misreported as "WAL not found". A separate existence check is also racy.
    match fs::rename(&wal_path, &bad_path) {
        Ok(()) => Ok(Some(bad_path)),
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
        Err(err) => Err(err),
    }
}