kevy-persist 1.2.0

kevy persistence — RDB-style snapshots + AOF, pure Rust, zero deps.
Documentation
//! AOF replay path — turns a byte stream back into the command series
//! that wrote it. Carved out of lib.rs to keep the production cap honest;
//! the public re-export in lib.rs keeps the API surface unchanged.

use std::fs::File;
use std::io::{self, Read};
use std::path::Path;

use kevy_resp::Argv;

/// Replay the command log at `path`, calling `apply` for each complete command.
///
/// Always emits a one-line summary to stderr when the file has any bytes,
/// so operators can immediately see how many commands were replayed and
/// how many bytes were dropped (truncated tail or parse error). This
/// caught the mailrs incident only *after* a 70-day silent failure window
/// — making the summary always-on is cheap (one line per restart) and
/// turns silent-empty-store from a multi-hour outage into a one-line log
/// hit.
///
/// Three outcomes:
///
/// * **Clean** — every byte consumed by valid RESP frames. Logs
///   `replayed N commands from M bytes`.
/// * **Truncated tail** — a crash mid-append left a partial frame. The
///   prefix is intact and replays normally; the trailing partial bytes
///   are silently OK. Logs `replayed N commands; trailing K bytes were
///   a partial frame (crash mid-append, recoverable)`.
/// * **Corrupt frame** — parser hit invalid bytes mid-file. The prefix
///   replayed; the tail (including the bad frame) is dropped. Logs a
///   loud WARN with the byte offset, parser error, and a hex+ascii
///   preview of the bad region. Common cause: deploy pipeline wrote
///   non-kevy bytes (e.g. SSH stderr) into the AOF path.
///
/// A missing file is treated as an empty log (returns Ok(()) silently,
/// no log line).
///
/// Note: RESP has an *inline* form (space-separated tokens) for backward
/// compatibility, so a stderr line like `Warning: Permanently added ...`
/// will parse as a valid (if nonsense) command. The summary line is the
/// signal — an unexpected count of replayed commands at boot is the
/// operator's cue to inspect the AOF byte-by-byte.
pub fn replay_aof<F: FnMut(Argv)>(path: &Path, mut apply: F) -> io::Result<()> {
    let mut data = Vec::new();
    match File::open(path) {
        Ok(mut f) => {
            f.read_to_end(&mut data)?;
        }
        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(()),
        Err(e) => return Err(e),
    }
    let total = data.len();
    if total == 0 {
        return Ok(());
    }
    // Skip the 9-byte AOF_MAGIC header if present. Legacy bare-RESP
    // AOFs (pre-1.2.0) parse identically from position 0. Future
    // format bumps should add a version check here.
    let mut pos = if data.len() >= crate::aof::AOF_MAGIC.len()
        && &data[..crate::aof::AOF_MAGIC.len()] == crate::aof::AOF_MAGIC
    {
        crate::aof::AOF_MAGIC.len()
    } else {
        0
    };
    let mut replayed: u64 = 0;
    let stop = loop {
        if pos >= total {
            break ReplayStop::Clean;
        }
        match kevy_resp::parse_command(&data[pos..]) {
            Ok(Some((args, consumed))) => {
                apply(args);
                pos += consumed;
                replayed += 1;
            }
            Ok(None) => break ReplayStop::TruncatedTail,
            Err(e) => break ReplayStop::CorruptFrame(format!("{e:?}")),
        }
    };
    log_replay_summary(path, total, pos, replayed, &data[pos.min(total)..], stop);
    Ok(())
}

/// Outcome of an AOF replay run — drives the summary log shape.
enum ReplayStop {
    Clean,
    TruncatedTail,
    CorruptFrame(String),
}

/// Emit the one-line replay summary. Goes to stderr because kevy-persist
/// has no log-crate dependency (pure-Rust + 0 deps charter); production
/// deployments route stderr to their existing log sink.
fn log_replay_summary(
    path: &Path,
    total: usize,
    pos: usize,
    replayed: u64,
    remainder: &[u8],
    stop: ReplayStop,
) {
    let display = path.display();
    let dropped = total - pos;
    match stop {
        ReplayStop::Clean => {
            eprintln!(
                "kevy: AOF {display} replayed {replayed} commands from {total} bytes (clean)"
            );
        }
        ReplayStop::TruncatedTail => {
            eprintln!(
                "kevy: AOF {display} replayed {replayed} commands; trailing {dropped} bytes \
                 were a partial frame (crash mid-append, recoverable)"
            );
        }
        ReplayStop::CorruptFrame(err) => {
            let preview = preview_bytes(remainder);
            eprintln!(
                "kevy WARN: AOF {display} replayed {replayed} commands then hit a corrupt \
                 frame at byte {pos}; dropping the trailing {dropped} bytes. \
                 Preview: {preview}. Parser error: {err}. \
                 Common cause: non-kevy bytes got written into this file path \
                 (e.g. deploy pipeline redirecting stderr to the AOF)."
            );
        }
    }
}

/// Hex + ASCII preview of up to 16 bytes, for diagnostic eprintlns.
fn preview_bytes(b: &[u8]) -> String {
    let n = b.len().min(16);
    let mut hex = String::with_capacity(n * 3);
    let mut ascii = String::with_capacity(n);
    for &x in &b[..n] {
        if !hex.is_empty() {
            hex.push(' ');
        }
        use std::fmt::Write;
        let _ = write!(hex, "{:02x}", x);
        ascii.push(if (0x20..0x7f).contains(&x) { x as char } else { '.' });
    }
    format!("hex=[{hex}] ascii=[{ascii}]")
}