use std::fs::{self, OpenOptions};
use std::path::Path;
use crate::error::{DbError, DbResult};
/// Four-byte tag stored in bytes `0..4` of a cursor file; `load` discards files that
/// do not start with it.
const MAGIC: [u8; 4] = *b"RCUR";
/// `VERSION` is the format version stored little-endian in bytes `4..8`.
/// `CURSOR_SIZE` is the exact length of a cursor file in bytes:
/// 4 (magic) + 4 (version) + 8 (`last_gsn`).
const VERSION: u32 = 2;
const CURSOR_SIZE: usize = 16;
/// In-memory form of the replication cursor that is persisted as a fixed-size
/// record (magic, version, `last_gsn`) by `save` and read back by `load`.
///
/// The `Default` value has `last_gsn == 0`.
#[derive(Debug, Clone, Copy, Default)]
pub struct ReplicationCursor {
    /// Value stored little-endian in bytes `8..16` of the cursor file.
    // NOTE(review): "gsn" presumably stands for "global sequence number", i.e. the
    // last sequence number that was replicated — confirm against the callers.
    pub last_gsn: u64,
}
impl ReplicationCursor {
    /// Reads the cursor persisted at `path`.
    ///
    /// Returns `Ok(None)` when the file does not exist, or when its contents are not
    /// a valid cursor record (wrong length, wrong magic, or unexpected version).
    /// Invalid contents are logged at `warn` level and discarded instead of being
    /// reported as an error.
    ///
    /// # Errors
    ///
    /// Returns [`DbError::Io`] for any read failure other than "not found".
    pub fn load(path: &Path) -> DbResult<Option<Self>> {
        let bytes = match fs::read(path) {
            Ok(b) => b,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
            Err(e) => return Err(DbError::Io(e)),
        };

        if bytes.len() != CURSOR_SIZE {
            tracing::warn!(
                path = %path.display(),
                found_len = bytes.len(),
                expected_len = CURSOR_SIZE,
                "replication cursor has unexpected length, discarding"
            );
            return Ok(None);
        }
        if bytes[..4] != MAGIC {
            tracing::warn!(
                path = %path.display(),
                "replication cursor has unexpected magic, discarding"
            );
            return Ok(None);
        }

        // The length check above guarantees both sub-slices have the exact size the
        // conversions expect, so these `expect`s cannot fire.
        let version = u32::from_le_bytes(bytes[4..8].try_into().expect("len 4"));
        if version != VERSION {
            tracing::warn!(
                path = %path.display(),
                found_version = version,
                expected_version = VERSION,
                "replication cursor has unexpected version, discarding"
            );
            return Ok(None);
        }

        let last_gsn = u64::from_le_bytes(bytes[8..16].try_into().expect("len 8"));
        Ok(Some(Self { last_gsn }))
    }

    /// Persists the cursor to `path`, replacing any previous file.
    ///
    /// The record is first written and synced to a sibling temporary file
    /// (`<path>.tmp`) and then renamed over `path`. Rewriting `path` in place would
    /// truncate the old record before the new one is durable, so a crash in between
    /// would leave an empty or partial file that `load` discards. With the rename,
    /// `path` is never truncated, so a crash cannot leave it empty or partial.
    ///
    /// After the rename the parent directory is synced on a best-effort basis;
    /// failures there are ignored (opening a directory is not possible on every
    /// platform).
    ///
    /// Concurrent calls for the same `path` share the same temporary file and are
    /// not coordinated here.
    ///
    /// # Errors
    ///
    /// Returns [`DbError::Io`] if the temporary file cannot be created, written,
    /// synced, or renamed. The temporary file is removed (best effort) on failure.
    pub fn save(&self, path: &Path) -> DbResult<()> {
        let mut buf = [0u8; CURSOR_SIZE];
        buf[..4].copy_from_slice(&MAGIC);
        buf[4..8].copy_from_slice(&VERSION.to_le_bytes());
        buf[8..16].copy_from_slice(&self.last_gsn.to_le_bytes());

        let tmp_path = Self::tmp_path_for(path);
        // `write_record` closes the temp file before the rename runs; some platforms
        // refuse to rename a file that is still open.
        let outcome =
            Self::write_record(&tmp_path, &buf).and_then(|()| fs::rename(&tmp_path, path));
        if let Err(e) = outcome {
            // Do not leave a half-written temp file behind; the original error is
            // what the caller needs, so a cleanup failure is ignored.
            let _ = fs::remove_file(&tmp_path);
            return Err(DbError::Io(e));
        }

        Self::sync_parent_dir(path);
        Ok(())
    }

    /// Replaces the in-memory position with `last_gsn`.
    ///
    /// This is a plain assignment: it does not check that the new value is greater
    /// than the current one, and nothing is written to disk until `save` is called.
    pub fn advance(&mut self, last_gsn: u64) {
        self.last_gsn = last_gsn;
    }

    /// Returns `path` with `.tmp` appended to its final component.
    fn tmp_path_for(path: &Path) -> std::path::PathBuf {
        let mut name = path.as_os_str().to_owned();
        name.push(".tmp");
        std::path::PathBuf::from(name)
    }

    /// Creates (or truncates) `path`, writes `buf` and syncs the file data to disk.
    fn write_record(path: &Path, buf: &[u8]) -> std::io::Result<()> {
        use std::io::Write;

        let mut f = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(path)?;
        f.write_all(buf)?;
        f.sync_data()
    }

    /// Best-effort sync of the directory containing `path`, so that the directory
    /// entry created by the rename reaches disk. All errors are ignored.
    fn sync_parent_dir(path: &Path) {
        let Some(parent) = path.parent() else {
            return;
        };
        // A bare relative file name has an empty parent; that means the current
        // directory, and opening "" would fail.
        let dir = if parent.as_os_str().is_empty() {
            Path::new(".")
        } else {
            parent
        };
        if let Ok(handle) = std::fs::File::open(dir) {
            let _ = handle.sync_all();
        }
    }
}
#[cfg(test)]
mod cursor_tests {
    use super::*;

    /// A cursor holding `u64::MAX` must not wrap when the next position is computed
    /// with saturating arithmetic.
    #[test]
    fn from_gsn_saturates_at_u64_max() {
        let at_max = ReplicationCursor { last_gsn: u64::MAX };
        let next = at_max.last_gsn.saturating_add(1);
        assert_eq!(next, u64::MAX);
    }
}