use std::io::BufWriter;
use std::path::{Path, PathBuf};
use crate::collection::graph::{ConcurrentEdgeStore, GraphEdge};
use crate::error::{Error, Result};
use crate::index::wal_framing;
/// Context label passed to every `wal_framing` helper call made by this module.
const CTX: &str = "Edge WAL";

/// File name of the edge WAL inside the directory given to `wal_path_for_edges`.
const EDGE_WAL_FILENAME: &str = "edges.wal";

// Op codes: the first byte of every entry body.
/// Entry body after the op byte is a JSON-serialized `GraphEdge`.
const WAL_OP_ADD: u8 = 0x01;
/// Entry body after the op byte is an edge id as a little-endian `u64`.
const WAL_OP_REMOVE: u8 = 0x02;
/// Entry body after the op byte is a node id as a little-endian `u64`.
const WAL_OP_REMOVE_NODE: u8 = 0x03;

/// Body length of a REMOVE / REMOVE_NODE entry: one op byte plus a `u64` id.
const ID_ENTRY_BODY_LEN: usize = 1 + std::mem::size_of::<u64>();
/// Returns the path of the edge WAL file (`EDGE_WAL_FILENAME`) inside `dir`.
///
/// This is pure path construction; the filesystem is not touched.
#[must_use]
pub(crate) fn wal_path_for_edges(dir: &Path) -> PathBuf {
    let mut path = dir.to_path_buf();
    path.push(EDGE_WAL_FILENAME);
    path
}
/// Appends a single `ADD` entry for `edge` to the edge WAL at `wal_path` and
/// flushes it.
///
/// # Errors
/// Propagates failures from opening the WAL, serializing or writing the entry,
/// and flushing. The flush is skipped if writing the entry fails.
pub(crate) fn wal_append_add(wal_path: &Path, edge: &GraphEdge) -> Result<()> {
    let mut writer = wal_framing::open_wal_writer(wal_path, CTX)?;
    write_add_entry(&mut writer, edge).and_then(|()| wal_framing::flush_wal(&mut writer, CTX))
}
/// Appends one `ADD` entry per edge in `edges` to the edge WAL at `wal_path`,
/// flushing once after the last entry.
///
/// Every edge is serialized and size-checked *before* the WAL is opened. If any
/// edge cannot be encoded, nothing is written. Writing as we go would be wrong
/// here: an early `?` return drops the `BufWriter`, and its `Drop` flushes the
/// entries already buffered. The WAL would then hold a prefix of a batch the
/// caller was told failed, and replay would apply it. The cost is that all
/// encoded entries are held in memory at once.
///
/// An empty slice is a no-op; the WAL is not opened.
///
/// # Errors
/// Returns an error if an edge cannot be serialized or its entry exceeds the
/// `u32` length header. Errors from opening, writing, or flushing the WAL are
/// propagated. An I/O failure part-way through writing can still leave a
/// partial batch on disk.
pub(crate) fn wal_append_add_batch(wal_path: &Path, edges: &[GraphEdge]) -> Result<()> {
    if edges.is_empty() {
        return Ok(());
    }
    let encoded = edges
        .iter()
        .map(encode_add_entry)
        .collect::<Result<Vec<_>>>()?;
    let mut w = wal_framing::open_wal_writer(wal_path, CTX)?;
    for (body_len, edge_bytes) in &encoded {
        write_encoded_add(&mut w, *body_len, edge_bytes)?;
    }
    wal_framing::flush_wal(&mut w, CTX)
}

/// Serializes `edge` and writes it to `w` as one framed `ADD` entry:
/// `u32` little-endian body length, the `WAL_OP_ADD` byte, then the JSON payload.
/// Does not flush.
fn write_add_entry(w: &mut BufWriter<std::fs::File>, edge: &GraphEdge) -> Result<()> {
    let (body_len, edge_bytes) = encode_add_entry(edge)?;
    write_encoded_add(w, body_len, &edge_bytes)
}

/// Serializes `edge` to JSON and computes the entry's body-length header
/// (op byte + payload), without performing any I/O.
fn encode_add_entry(edge: &GraphEdge) -> Result<(u32, Vec<u8>)> {
    let edge_bytes = serde_json::to_vec(edge)
        .map_err(|e| Error::Index(format!("Edge WAL: serialize edge: {e}")))?;
    let body_len = add_entry_body_len(edge_bytes.len())?;
    Ok((body_len, edge_bytes))
}

/// Writes an already-encoded `ADD` entry: length header, op byte, payload.
fn write_encoded_add(
    w: &mut BufWriter<std::fs::File>,
    body_len: u32,
    edge_bytes: &[u8],
) -> Result<()> {
    wal_framing::wal_write(w, &body_len.to_le_bytes(), CTX)?;
    wal_framing::wal_write(w, &[WAL_OP_ADD], CTX)?;
    wal_framing::wal_write(w, edge_bytes, CTX)
}

/// Body length of an `ADD` entry carrying `edge_len` payload bytes
/// (payload plus the one op byte), checked to fit the `u32` length header.
fn add_entry_body_len(edge_len: usize) -> Result<u32> {
    let total = edge_len
        .checked_add(1)
        .ok_or_else(|| Error::Index("Edge WAL: add entry length overflow".to_string()))?;
    u32::try_from(total)
        .map_err(|_| Error::Index(format!("Edge WAL: add entry too large ({total} bytes)")))
}
pub(crate) fn wal_append_remove(wal_path: &Path, edge_id: u64) -> Result<()> {
append_id_entry(wal_path, WAL_OP_REMOVE, edge_id)
}
pub(crate) fn wal_append_remove_node(wal_path: &Path, node_id: u64) -> Result<()> {
append_id_entry(wal_path, WAL_OP_REMOVE_NODE, node_id)
}
fn append_id_entry(wal_path: &Path, op: u8, id: u64) -> Result<()> {
let body_len = u32::try_from(ID_ENTRY_BODY_LEN)
.map_err(|_| Error::Index("Edge WAL: id entry header too large".to_string()))?;
let mut w = wal_framing::open_wal_writer(wal_path, CTX)?;
wal_framing::wal_write(&mut w, &body_len.to_le_bytes(), CTX)?;
wal_framing::wal_write(&mut w, &[op], CTX)?;
wal_framing::wal_write(&mut w, &id.to_le_bytes(), CTX)?;
wal_framing::flush_wal(&mut w, CTX)
}
/// Delegates to `wal_framing::wal_truncate` for the edge WAL at `wal_path`,
/// passing this module's context label.
/// NOTE(review): presumably empties the WAL once its contents are persisted
/// elsewhere — confirm against `wal_framing::wal_truncate`.
///
/// # Errors
/// Propagates any error returned by `wal_framing::wal_truncate`.
pub(crate) fn wal_truncate(wal_path: &Path) -> Result<()> {
    wal_framing::wal_truncate(wal_path, CTX)
}
/// One decoded edge-WAL operation. During replay it is applied to the store
/// and then passed to the replay callback.
pub(crate) enum ReplayOp {
    /// Insert this edge (decoded from a `WAL_OP_ADD` entry's JSON body).
    Add(GraphEdge),
    /// Remove the edge with this id (from a `WAL_OP_REMOVE` entry).
    Remove(u64),
    /// Remove all edges of the node with this id (from a `WAL_OP_REMOVE_NODE` entry).
    RemoveNode(u64),
}
/// Replays the edge WAL at `wal_path` into `store`. `on_op` is invoked after
/// each decoded operation has been applied.
///
/// A missing WAL file is treated as empty and yields `Ok(0)`. Scanning stops
/// without error when the header reader returns `None` (including zero-length
/// entries) or when an entry's declared length runs past the end of the data.
/// Entries with an unknown op code or an undecodable body are logged and
/// skipped.
///
/// Returns the number of operations that were decoded and dispatched.
///
/// # Errors
/// Returns `Error::Index` if the WAL exists but cannot be read.
pub(crate) fn wal_replay<F>(
    wal_path: &Path,
    store: &ConcurrentEdgeStore,
    mut on_op: F,
) -> Result<u64>
where
    F: FnMut(&ReplayOp),
{
    let data = match std::fs::read(wal_path) {
        Ok(d) => d,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(0),
        Err(e) => return Err(Error::Index(format!("Edge WAL read: {e}"))),
    };

    let mut cursor = 0usize;
    let mut applied = 0u64;
    while cursor < data.len() {
        let Some((body_start, body_len)) = read_entry_header(&data, cursor) else {
            break;
        };
        let body_end = body_start + body_len;
        let Some(entry) = data.get(body_start..body_end) else {
            tracing::warn!(
                "Edge WAL truncated at offset {body_start}: declared {body_len} bytes but only {} remain",
                data.len() - body_start
            );
            break;
        };
        // `read_entry_header` rejects zero-length bodies, so `entry` always
        // starts with the op byte and this `else` is never taken.
        let Some((&op, body)) = entry.split_first() else {
            break;
        };
        if let Some(replay_op) = decode_entry(op, body, body_start) {
            apply_replay_op(store, &replay_op);
            on_op(&replay_op);
            applied += 1;
        }
        cursor = body_end;
    }
    Ok(applied)
}
/// Reads the entry header at `pos` via the shared framing helper and returns
/// `(body_start, body_len)`. Returns `None` if the helper does, and also (with a
/// warning) when the declared body length is zero.
fn read_entry_header(data: &[u8], pos: usize) -> Option<(usize, usize)> {
    match wal_framing::read_entry_header(data, pos, CTX)? {
        (_, 0) => {
            tracing::warn!("Edge WAL zero-length entry at offset {pos}");
            None
        }
        header => Some(header),
    }
}
/// Decodes the entry body that follows op byte `op` into a `ReplayOp`.
///
/// `body_start` is the entry's offset in the WAL and is used only in warnings.
/// Unknown op codes and malformed bodies are logged and yield `None`.
fn decode_entry(op: u8, body: &[u8], body_start: usize) -> Option<ReplayOp> {
    // The two id-carrying ops share one decoder and differ only in which
    // variant wraps the id.
    let wrap_id: fn(u64) -> ReplayOp = match op {
        WAL_OP_ADD => return decode_add(body, body_start),
        WAL_OP_REMOVE => ReplayOp::Remove,
        WAL_OP_REMOVE_NODE => ReplayOp::RemoveNode,
        unknown => {
            tracing::warn!("Edge WAL unknown op 0x{unknown:02x} at offset {body_start}");
            return None;
        }
    };
    decode_id(body, body_start).map(wrap_id)
}
/// Parses an `ADD` entry's JSON body into `ReplayOp::Add`. If the body does not
/// parse, logs a warning naming the offset and returns `None`.
fn decode_add(body: &[u8], body_start: usize) -> Option<ReplayOp> {
    serde_json::from_slice::<GraphEdge>(body).map_or_else(
        |e| {
            tracing::warn!("Edge WAL undecodable add entry at offset {body_start}: {e}");
            None
        },
        |edge| Some(ReplayOp::Add(edge)),
    )
}
/// Interprets an id entry body as a little-endian `u64`. The body must be
/// exactly 8 bytes; any other length is logged and yields `None`.
fn decode_id(body: &[u8], body_start: usize) -> Option<u64> {
    match <[u8; 8]>::try_from(body) {
        Ok(raw) => Some(u64::from_le_bytes(raw)),
        Err(_) => {
            tracing::warn!("Edge WAL malformed id entry at offset {body_start}");
            None
        }
    }
}
fn apply_replay_op(store: &ConcurrentEdgeStore, op: &ReplayOp) {
match op {
ReplayOp::Add(edge) => match store.add_edge(edge.clone()) {
Ok(()) | Err(Error::EdgeExists(_)) => {}
Err(e) => tracing::warn!("Edge WAL replay add failed for edge {}: {e}", edge.id()),
},
ReplayOp::Remove(id) => {
store.remove_edge(*id);
}
ReplayOp::RemoveNode(id) => store.remove_node_edges(*id),
}
}