armdb 0.1.14

sharded bitcask key-value storage optimized for NVMe
Documentation
//! Per-shard diagnostic cursor for the FixedStore replication follower.
//!
//! Unlike Bitcask's cursor (which resumes from a GSN/file offset), the
//! FixedStore cursor does NOT influence correctness — catch-up is always
//! a full slot scan on reconnect. The cursor exists purely for diagnostics:
//! applied event count, max version observed, last-Ack timestamp.

use std::fs;
use std::path::Path;
use std::time::{Duration, SystemTime};

use crate::error::DbResult;

pub const CURSOR_FILE: &str = "fixed_repl.cursor";
pub const CURSOR_SIZE: usize = 40;
// Layout: shard_id(1) + pad(3) + applied_total(8) + max_version(4) +
//         pad(4) + unix_ms(8) + reserved(12).
pub const CURSOR_SAVE_INTERVAL: usize = 1000;

pub struct FixedReplicationCursor {
    pub shard_id: u8,
    pub applied_total: u64,
    pub max_version_seen: u32,
    pub last_ack_at: SystemTime,
}

impl FixedReplicationCursor {
    pub fn new(shard_id: u8) -> Self {
        Self {
            shard_id,
            applied_total: 0,
            max_version_seen: 0,
            last_ack_at: SystemTime::UNIX_EPOCH,
        }
    }

    pub fn load(shard_id: u8, shard_dir: &Path) -> Option<Self> {
        let path = shard_dir.join(CURSOR_FILE);
        let data = fs::read(&path).ok()?;
        if data.len() < CURSOR_SIZE {
            return None;
        }
        let applied_total = u64::from_le_bytes(data[4..12].try_into().ok()?);
        let max_version_seen = u32::from_le_bytes(data[12..16].try_into().ok()?);
        let unix_ms = u64::from_le_bytes(data[20..28].try_into().ok()?);
        let last_ack_at = SystemTime::UNIX_EPOCH + Duration::from_millis(unix_ms);
        Some(Self {
            shard_id,
            applied_total,
            max_version_seen,
            last_ack_at,
        })
    }

    pub fn save(&self, shard_dir: &Path) -> DbResult<()> {
        let path = shard_dir.join(CURSOR_FILE);
        let mut buf = [0u8; CURSOR_SIZE];
        buf[0] = self.shard_id;
        buf[4..12].copy_from_slice(&self.applied_total.to_le_bytes());
        buf[12..16].copy_from_slice(&self.max_version_seen.to_le_bytes());
        let unix_ms = self
            .last_ack_at
            .duration_since(SystemTime::UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);
        buf[20..28].copy_from_slice(&unix_ms.to_le_bytes());
        fs::write(&path, buf)?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn test_save_load_roundtrip() {
        let dir = tempdir().unwrap();
        let mut c = FixedReplicationCursor::new(3);
        c.applied_total = 42;
        c.max_version_seen = 0xABCD;
        c.last_ack_at = SystemTime::UNIX_EPOCH + Duration::from_millis(1_700_000_000_000);
        c.save(dir.path()).unwrap();
        let loaded = FixedReplicationCursor::load(3, dir.path()).unwrap();
        assert_eq!(loaded.shard_id, 3);
        assert_eq!(loaded.applied_total, 42);
        assert_eq!(loaded.max_version_seen, 0xABCD);
        assert_eq!(loaded.last_ack_at, c.last_ack_at);
    }

    #[test]
    fn test_load_absent() {
        let dir = tempdir().unwrap();
        assert!(FixedReplicationCursor::load(0, dir.path()).is_none());
    }

    #[test]
    fn test_load_truncated() {
        let dir = tempdir().unwrap();
        std::fs::write(dir.path().join(CURSOR_FILE), [0u8; 10]).unwrap();
        assert!(FixedReplicationCursor::load(0, dir.path()).is_none());
    }
}