use crate::error::{Result, TriviumError};
use crate::node::NodeId;
use serde::{Deserialize, Serialize};
use std::fs::{File, OpenOptions};
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::{Path, PathBuf};
/// One record in the write-ahead log.
///
/// `Wal::append` bincode-serializes each entry and frames it on disk as a
/// little-endian `u32` payload length, the payload bytes, and a little-endian
/// CRC32 of the payload; `Wal::read_entries` decodes the same framing.
///
/// The variant order and field order are part of that on-disk encoding (bincode
/// encodes the variant by index and fields positionally): do not reorder,
/// insert before, or remove variants/fields, or existing WAL files will no
/// longer replay correctly.
///
/// NOTE(review): `T` looks like the element type of node vectors (e.g. `f32`) —
/// confirm against callers.
#[derive(Debug, Serialize, Deserialize)]
pub enum WalEntry<T> {
/// Opens a transaction. `Wal::append_batch` writes this before the batch's
/// entries; on replay, entries after it are held back until a matching
/// `TxCommit` is read.
TxBegin {
tx_id: u64,
},
/// Closes the open transaction whose `tx_id` matches. On replay, a commit with a
/// different id (or with no open transaction) is ignored.
TxCommit {
tx_id: u64,
},
/// Presumably inserts node `id` with its vector and payload — the replay logic
/// that applies it lives outside this file.
Insert {
id: NodeId,
vector: Vec<T>,
payload: String, },
/// Presumably adds a labelled, weighted edge from `src` to `dst` (applied
/// outside this file).
Link {
src: NodeId,
dst: NodeId,
label: String,
weight: f32,
},
/// Presumably removes node `id` (applied outside this file).
Delete {
id: NodeId,
},
/// Presumably removes the edge from `src` to `dst` (applied outside this file).
Unlink {
src: NodeId,
dst: NodeId,
},
/// Presumably replaces the payload of node `id` (applied outside this file).
UpdatePayload {
id: NodeId,
payload: String, },
/// Presumably replaces the vector of node `id` (applied outside this file).
UpdateVector {
id: NodeId,
vector: Vec<T>,
},
}
/// Durability policy a `Wal` applies after each append.
///
/// Stronger modes make each append slower but narrow the window in which a
/// crash can lose already-acknowledged entries.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SyncMode {
    /// Flush the in-process buffer and `sync_data` the file after every append,
    /// so the entry is on stable storage before the call returns.
    Full,
    /// Flush the in-process buffer to the operating system after every append,
    /// without forcing it to disk. This is the default.
    #[default]
    Normal,
    /// Leave data in the in-process `BufWriter`; it reaches the file only when
    /// the buffer fills, on an explicit flush, or when the writer is dropped.
    Off,
}
/// Append-only write-ahead log stored in a single `<db_path>.wal` file.
///
/// Entries are written through a buffered handle; the configured sync mode
/// decides how far each append is pushed toward disk before returning.
pub struct Wal {
    /// Durability policy applied after every append.
    sync_mode: SyncMode,
    /// Buffered append handle. It is `None` only while `clear` reopens the file,
    /// or afterwards if that reopen failed; appends then report the log as closed.
    writer: Option<BufWriter<File>>,
    /// Location of the log file, derived from the database path when opened.
    wal_path: PathBuf,
}
impl Wal {
pub fn open(db_path: &str) -> Result<Self> {
Self::open_with_sync(db_path, SyncMode::default())
}
pub fn open_with_sync(db_path: &str, sync_mode: SyncMode) -> Result<Self> {
let wal_path = PathBuf::from(format!("{}.wal", db_path));
let file = OpenOptions::new()
.create(true)
.append(true)
.open(&wal_path)?;
Ok(Self {
wal_path,
writer: Some(BufWriter::new(file)),
sync_mode,
})
}
pub fn set_sync_mode(&mut self, mode: SyncMode) {
self.sync_mode = mode;
}
pub fn append<T: serde::Serialize>(&mut self, entry: &WalEntry<T>) -> Result<()> {
if let Some(ref mut writer) = self.writer {
let data = bincode::serialize(entry).map_err(TriviumError::Serialization)?;
let checksum = crc32fast::hash(&data);
let len = data.len() as u32;
writer.write_all(&len.to_le_bytes())?;
writer.write_all(&data)?;
writer.write_all(&checksum.to_le_bytes())?;
match self.sync_mode {
SyncMode::Full => {
writer.flush()?;
writer.get_ref().sync_data()?; }
SyncMode::Normal => {
writer.flush()?; }
SyncMode::Off => {
}
}
Ok(())
} else {
Err(TriviumError::Generic("WAL writer closed".into()))
}
}
pub fn append_batch<T: serde::Serialize>(
&mut self,
tx_id: u64,
entries: &[WalEntry<T>],
) -> Result<()> {
if let Some(ref mut writer) = self.writer {
let mut write_single = |entry: &WalEntry<T>| -> Result<()> {
let data = bincode::serialize(entry).map_err(TriviumError::Serialization)?;
let checksum = crc32fast::hash(&data);
let len = data.len() as u32;
writer.write_all(&len.to_le_bytes())?;
writer.write_all(&data)?;
writer.write_all(&checksum.to_le_bytes())?;
Ok(())
};
write_single(&WalEntry::TxBegin { tx_id })?;
for e in entries {
write_single(e)?;
}
write_single(&WalEntry::TxCommit { tx_id })?;
match self.sync_mode {
SyncMode::Full => {
writer.flush()?;
writer.get_ref().sync_data()?;
}
SyncMode::Normal => {
writer.flush()?;
}
SyncMode::Off => {}
}
Ok(())
} else {
Err(TriviumError::Generic("WAL writer closed".into()))
}
}
pub fn read_entries<T: serde::de::DeserializeOwned>(db_path: &str) -> Result<(Vec<WalEntry<T>>, u64)> {
let wal_path = format!("{}.wal", db_path);
if !Path::new(&wal_path).exists() {
return Ok((Vec::new(), 0));
}
let file = File::open(&wal_path)?;
let mut reader = BufReader::new(file);
let mut entries_with_offset = Vec::new();
let mut physical_offset: u64 = 0;
loop {
let mut len_buf = [0u8; 4];
match reader.read_exact(&mut len_buf) {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
Err(e) => return Err(TriviumError::Io(e)),
}
let len = u32::from_le_bytes(len_buf) as usize;
if len > 256 * 1024 * 1024 {
break; }
let mut data = vec![0u8; len];
match reader.read_exact(&mut data) {
Ok(_) => {}
Err(_) => break, }
let mut crc_buf = [0u8; 4];
match reader.read_exact(&mut crc_buf) {
Ok(_) => {}
Err(_) => break, }
let stored_crc = u32::from_le_bytes(crc_buf);
let computed_crc = crc32fast::hash(&data);
if stored_crc != computed_crc {
tracing::error!(
"WAL CRC mismatch at entry {}: stored={:#010x}, computed={:#010x}. Stopping recovery.",
entries_with_offset.len(),
stored_crc,
computed_crc
);
break;
}
physical_offset += 4 + (len as u64) + 4;
match bincode::deserialize::<WalEntry<T>>(&data) {
Ok(entry) => entries_with_offset.push((entry, physical_offset)),
Err(e) => {
tracing::error!(
"WAL Deserialize error at entry {}: {}. Stopping recovery.",
entries_with_offset.len(),
e
);
break;
}
}
}
let mut committed = Vec::new();
let mut pending_tx = Vec::new();
let mut in_tx = false;
let mut current_tx_id = 0;
let mut safe_commit_offset = 0;
for (entry, offset) in entries_with_offset {
match entry {
WalEntry::TxBegin { tx_id } => {
in_tx = true;
current_tx_id = tx_id;
pending_tx.clear(); }
WalEntry::TxCommit { tx_id } => {
if in_tx && tx_id == current_tx_id {
committed.append(&mut pending_tx);
in_tx = false;
safe_commit_offset = offset; }
}
other => {
if in_tx {
pending_tx.push(other);
} else {
committed.push(other);
safe_commit_offset = offset;
}
}
}
}
if in_tx && !pending_tx.is_empty() {
tracing::warn!(
"Discarded a partial transaction ({} operations) due to missing TxCommit (Power loss simulation). Truncating WAL to offset {}.",
pending_tx.len(),
safe_commit_offset
);
}
Ok((committed, safe_commit_offset))
}
pub fn clear(&mut self) -> Result<()> {
self.writer.take();
let mode = self.sync_mode;
{
let file = OpenOptions::new()
.write(true)
.truncate(true)
.open(&self.wal_path)?;
file.sync_all()?;
}
let file = OpenOptions::new()
.create(true)
.append(true)
.open(&self.wal_path)?;
self.writer = Some(BufWriter::new(file));
self.sync_mode = mode;
Ok(())
}
pub fn flush_writer(&mut self) {
if let Some(ref mut writer) = self.writer {
let _ = writer.flush();
let _ = writer.get_ref().sync_all();
}
}
pub fn needs_recovery(db_path: &str) -> bool {
let wal_path = format!("{}.wal", db_path);
match std::fs::metadata(&wal_path) {
Ok(meta) => meta.len() > 0,
Err(_) => false,
}
}
}