use std::fs;
use std::path::Path;
use std::time::{Duration, SystemTime};
use crate::error::DbResult;
/// File name of the persisted cursor; `load` and `save` join it onto the
/// shard directory they are given.
pub const CURSOR_FILE: &str = "fixed_repl.cursor";
/// Size in bytes of the on-disk cursor record. `save` always writes exactly
/// this many bytes and `load` rejects any file shorter than this.
pub const CURSOR_SIZE: usize = 40;
/// Not referenced in this file.
// NOTE(review): presumably the number of applied operations between two
// `save` calls -- confirm against the caller that uses it.
pub const CURSOR_SAVE_INTERVAL: usize = 1000;
/// In-memory form of the fixed-size replication cursor that is persisted
/// per shard directory.
///
/// `Debug`, `Clone`, `PartialEq` and `Eq` are derived so the cursor can be
/// logged, snapshotted and compared directly in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FixedReplicationCursor {
    /// Identifier of the shard this cursor belongs to.
    pub shard_id: u8,
    /// Running total carried by the cursor.
    // NOTE(review): presumably the count of replicated operations applied so
    // far -- confirm against the code that increments it.
    pub applied_total: u64,
    /// Highest version value recorded in the cursor.
    // NOTE(review): presumably the largest record-format version observed on
    // the replication stream -- confirm against the writer.
    pub max_version_seen: u32,
    /// Timestamp of the last acknowledgement; a freshly created cursor uses
    /// the Unix epoch.
    pub last_ack_at: SystemTime,
}
impl FixedReplicationCursor {
    /// Creates a fresh cursor for `shard_id` with both counters at zero and
    /// `last_ack_at` set to the Unix epoch.
    pub fn new(shard_id: u8) -> Self {
        Self {
            shard_id,
            applied_total: 0,
            max_version_seen: 0,
            last_ack_at: SystemTime::UNIX_EPOCH,
        }
    }

    /// Reads the cursor file stored in `shard_dir`.
    ///
    /// Record layout (little-endian integers, all other bytes unused):
    ///
    /// | bytes    | field                               |
    /// |----------|-------------------------------------|
    /// | `0`      | shard id (written, not read back)   |
    /// | `4..12`  | `applied_total` (`u64`)             |
    /// | `12..16` | `max_version_seen` (`u32`)          |
    /// | `20..28` | `last_ack_at` as Unix milliseconds  |
    ///
    /// Returns `None` when the file cannot be read, when it is shorter than
    /// [`CURSOR_SIZE`], or when the stored timestamp cannot be represented as
    /// a [`SystemTime`] on this platform (previously such a value could panic
    /// in `SystemTime + Duration`).
    ///
    /// The returned cursor carries the `shard_id` argument.
    // NOTE(review): the shard id stored in byte 0 is never compared with the
    // `shard_id` argument, so a cursor file copied from another shard loads
    // without complaint -- confirm whether that is intended.
    pub fn load(shard_id: u8, shard_dir: &Path) -> Option<Self> {
        let data = fs::read(shard_dir.join(CURSOR_FILE)).ok()?;
        if data.len() < CURSOR_SIZE {
            return None;
        }

        // The length check above makes these fixed slices in-bounds; the
        // `try_into` conversions only fix the array length for `from_le_bytes`.
        let applied_total = u64::from_le_bytes(data[4..12].try_into().ok()?);
        let max_version_seen = u32::from_le_bytes(data[12..16].try_into().ok()?);
        let unix_ms = u64::from_le_bytes(data[20..28].try_into().ok()?);

        // The bytes come from disk and may be corrupt: use the checked
        // addition so an out-of-range timestamp yields `None`, not a panic.
        let last_ack_at = SystemTime::UNIX_EPOCH.checked_add(Duration::from_millis(unix_ms))?;

        Some(Self {
            shard_id,
            applied_total,
            max_version_seen,
            last_ack_at,
        })
    }

    /// Writes the cursor to the cursor file inside `shard_dir`, replacing any
    /// previous contents with a [`CURSOR_SIZE`]-byte record (see [`Self::load`]
    /// for the layout; unused bytes are written as zero).
    ///
    /// A `last_ack_at` earlier than the Unix epoch is stored as `0`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `fs::write`, converted into the
    /// `DbResult` error type by `?`.
    // NOTE(review): `fs::write` is not an atomic replace; an interrupted write
    // can leave a short file, which `load` then reports as `None`.
    pub fn save(&self, shard_dir: &Path) -> DbResult<()> {
        let mut buf = [0u8; CURSOR_SIZE];
        buf[0] = self.shard_id;
        buf[4..12].copy_from_slice(&self.applied_total.to_le_bytes());
        buf[12..16].copy_from_slice(&self.max_version_seen.to_le_bytes());

        // `as_millis` yields a `u128`; clamp explicitly so the narrowing cast
        // below can never silently wrap.
        let unix_ms = self
            .last_ack_at
            .duration_since(SystemTime::UNIX_EPOCH)
            .map_or(0, |d| d.as_millis().min(u128::from(u64::MAX)) as u64);
        buf[20..28].copy_from_slice(&unix_ms.to_le_bytes());

        fs::write(shard_dir.join(CURSOR_FILE), buf)?;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A cursor written by `save` is read back by `load` with every field
    /// intact.
    #[test]
    fn test_save_load_roundtrip() {
        let scratch = tempdir().unwrap();
        let ack_time = SystemTime::UNIX_EPOCH + Duration::from_millis(1_700_000_000_000);
        let written = FixedReplicationCursor {
            applied_total: 42,
            max_version_seen: 0xABCD,
            last_ack_at: ack_time,
            ..FixedReplicationCursor::new(3)
        };
        written.save(scratch.path()).unwrap();

        let restored = FixedReplicationCursor::load(3, scratch.path()).unwrap();
        assert_eq!(restored.shard_id, 3);
        assert_eq!(restored.applied_total, 42);
        assert_eq!(restored.max_version_seen, 0xABCD);
        assert_eq!(restored.last_ack_at, written.last_ack_at);
    }

    /// Loading from a directory that holds no cursor file yields `None`.
    #[test]
    fn test_load_absent() {
        let scratch = tempdir().unwrap();
        let outcome = FixedReplicationCursor::load(0, scratch.path());
        assert!(outcome.is_none());
    }

    /// A cursor file shorter than `CURSOR_SIZE` is rejected.
    #[test]
    fn test_load_truncated() {
        let scratch = tempdir().unwrap();
        let cursor_path = scratch.path().join(CURSOR_FILE);
        std::fs::write(cursor_path, [0u8; 10]).unwrap();
        let outcome = FixedReplicationCursor::load(0, scratch.path());
        assert!(outcome.is_none());
    }
}