#![cfg(feature = "persist")]
use std::{
fs::{self, File, OpenOptions},
io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write},
path::{Path, PathBuf},
};
use serde::{Deserialize, Serialize};
/// On-disk snapshot payload: a format version followed by the full data vector.
///
/// The snapshot reader in this file deserializes this struct with bincode and
/// rejects any `version` other than 1.
// NOTE(review): the serialized layout presumably follows field order and types,
// so reordering or retyping fields would break existing snapshot files — confirm
// against the bincode configuration in use.
#[derive(Serialize, Deserialize)]
pub(crate) struct Snapshot {
/// Snapshot format version; only version 1 is written and accepted here.
pub version: u8,
/// The complete series of values captured at snapshot time.
pub data: Vec<f64>,
}
/// Marker byte stored as the first byte of every WAL record; replay stops at
/// the first record whose leading byte differs.
const WAL_MAGIC: u8 = 0xAE;
/// Size in bytes of one WAL record: 1 magic byte + 8 bytes of little-endian `f64`.
const WAL_RECORD_LEN: usize = 9;
/// Persists a series of `f64` values using a snapshot file plus an append-only
/// write-ahead log (WAL) of values pushed since the last snapshot.
pub struct PersistenceManager {
/// Final location of the snapshot file.
snap_path: PathBuf,
/// Temporary file the snapshot is written to before being renamed into place.
snap_tmp_path: PathBuf,
/// Location of the WAL file.
wal_path: PathBuf,
/// Open handle used to append records to the WAL.
wal_file: File,
/// Number of pushes recorded since the counter was last reset by compaction.
pushes_since_snap: usize,
/// Number of pushes after which `record_push` triggers a compaction
/// (initialised to 1 000 by `open`).
pub snapshot_interval: usize,
}
impl PersistenceManager {
pub fn open(dir: impl AsRef<Path>) -> io::Result<Self> {
let dir = dir.as_ref();
fs::create_dir_all(dir)?;
let snap_path = dir.join("anomalyzer.snap");
let snap_tmp_path = dir.join("anomalyzer.snap.tmp");
let wal_path = dir.join("anomalyzer.wal");
let wal_file = OpenOptions::new()
.create(true)
.append(true)
.open(&wal_path)?;
Ok(Self {
snap_path,
snap_tmp_path,
wal_path,
wal_file,
pushes_since_snap: 0,
snapshot_interval: 1_000,
})
}
/// Rebuilds the persisted series: loads the snapshot (if any), then appends
/// every value replayed from the WAL.
///
/// # Errors
///
/// Propagates any error from loading the snapshot or replaying the WAL.
pub fn recover(&self) -> io::Result<Vec<f64>> {
    self.load_snapshot().and_then(|mut restored| {
        self.replay_wal(&mut restored)?;
        Ok(restored)
    })
}
fn load_snapshot(&self) -> io::Result<Vec<f64>> {
if !self.snap_path.exists() {
return Ok(Vec::new());
}
let file = File::open(&self.snap_path)?;
let mut reader = BufReader::new(file);
let mut bytes = Vec::new();
reader.read_to_end(&mut bytes)?;
let snap: Snapshot = bincode::deserialize(&bytes)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
if snap.version != 1 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("unsupported snapshot version {}", snap.version),
));
}
Ok(snap.data)
}
/// Appends to `data` every value stored in the WAL, in file order.
///
/// Only complete records are considered; an incomplete trailing record is
/// ignored. Replay stops quietly at the first record whose leading byte is
/// not `WAL_MAGIC`, keeping the values read up to that point. A missing WAL
/// file is treated as an empty log.
///
/// # Errors
///
/// Returns any I/O error raised while opening, inspecting or reading the
/// WAL, other than the file not existing.
fn replay_wal(&self, data: &mut Vec<f64>) -> io::Result<()> {
    let file = match File::open(&self.wal_path) {
        Ok(file) => file,
        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(()),
        Err(e) => return Err(e),
    };
    // Stay in `u64`: narrowing the file length to `usize` would truncate it
    // on 32-bit targets.
    let complete_records = file.metadata()?.len() / WAL_RECORD_LEN as u64;
    let mut reader = BufReader::new(file);
    let mut record = [0u8; WAL_RECORD_LEN];
    for _ in 0..complete_records {
        match reader.read_exact(&mut record) {
            Ok(()) => {}
            // The file shrank after its length was sampled; stop at what is there.
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
            Err(e) => return Err(e),
        }
        // Split the record into its magic byte and the 8-byte payload without
        // a fallible slice conversion.
        let [magic, payload @ ..] = record;
        if magic != WAL_MAGIC {
            break;
        }
        data.push(f64::from_le_bytes(payload));
    }
    Ok(())
}
/// Logs one pushed value to the WAL and compacts once enough pushes have
/// accumulated.
///
/// `current_data` is the series that is written to the snapshot if this push
/// reaches `snapshot_interval`.
///
/// # Errors
///
/// Propagates any error from appending to the WAL or from compaction.
pub fn record_push(&mut self, value: f64, current_data: &[f64]) -> io::Result<()> {
    self.append_wal(value)?;
    self.pushes_since_snap += 1;

    let compaction_due = self.pushes_since_snap >= self.snapshot_interval;
    if !compaction_due {
        return Ok(());
    }
    self.compact(current_data)
}
/// Writes one WAL record (magic byte followed by the little-endian bytes of
/// `value`) and syncs the file data before returning.
///
/// # Errors
///
/// Returns any I/O error from the write or the sync.
fn append_wal(&mut self, value: f64) -> io::Result<()> {
    let mut record = [0u8; WAL_RECORD_LEN];
    let (magic, payload) = record.split_at_mut(1);
    magic[0] = WAL_MAGIC;
    payload.copy_from_slice(&value.to_le_bytes());

    self.wal_file.write_all(&record)?;
    self.wal_file.sync_data()
}
/// Writes `current_data` as a fresh snapshot, empties the WAL, and resets the
/// push counter.
///
/// The counter is only reset when both steps succeed.
///
/// # Errors
///
/// Propagates any error from writing the snapshot or truncating the WAL.
// NOTE(review): if the process dies after the snapshot is in place but before
// the WAL is emptied, a later recovery would presumably replay WAL values that
// the snapshot already contains — confirm whether callers tolerate duplicates.
pub fn compact(&mut self, current_data: &[f64]) -> io::Result<()> {
    self.write_snapshot(current_data)
        .and_then(|()| self.truncate_wal())?;
    self.pushes_since_snap = 0;
    Ok(())
}
/// Serializes `data` as a version-1 snapshot and installs it at the snapshot
/// path.
///
/// The encoded bytes are written to the temporary path and synced, and only
/// then renamed over the final path, so the final path is never written to
/// directly.
///
/// # Errors
///
/// Returns an `Other` error if serialization fails, or any I/O error from
/// creating, writing, syncing or renaming the temporary file.
fn write_snapshot(&self, data: &[f64]) -> io::Result<()> {
    let encoded = bincode::serialize(&Snapshot {
        version: 1,
        data: data.to_vec(),
    })
    .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;

    let mut staged = File::create(&self.snap_tmp_path)?;
    staged.write_all(&encoded)?;
    staged.sync_all()?;
    // Close the temporary file before renaming it into place.
    drop(staged);

    fs::rename(&self.snap_tmp_path, &self.snap_path)
}
/// Empties the WAL file and replaces the append handle with a fresh one.
///
/// The truncation is synced to disk before returning, matching the sync
/// discipline used for WAL appends and snapshot writes; without it a crash
/// could bring back old records that would then be replayed on top of the
/// new snapshot.
///
/// Two opens are used because `OpenOptions` does not accept `append`
/// together with `truncate`.
///
/// # Errors
///
/// Returns any I/O error from truncating, syncing or reopening the WAL. On
/// failure the existing append handle is left in place.
fn truncate_wal(&mut self) -> io::Result<()> {
    let truncated = OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(true)
        .open(&self.wal_path)?;
    truncated.sync_all()?;

    // `create(true)` mirrors how `open` obtains its append handle, so the
    // reopen does not fail if the file has vanished in the meantime.
    self.wal_file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(&self.wal_path)?;
    Ok(())
}
/// Returns the current size of the WAL file in bytes, or 0 if it does not
/// exist.
///
/// # Errors
///
/// Returns any I/O error from querying the file metadata other than the file
/// not existing.
pub fn wal_size_bytes(&self) -> io::Result<u64> {
    // Query metadata once instead of `exists()` followed by `metadata()`:
    // the file cannot disappear between the two calls, and errors such as
    // permission failures are reported rather than read as "size 0".
    match fs::metadata(&self.wal_path) {
        Ok(meta) => Ok(meta.len()),
        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(0),
        Err(e) => Err(e),
    }
}
pub fn pending_wal_entries(&self) -> usize {
self.pushes_since_snap
}
}