use std::path::{Path, PathBuf};
use std::sync::Mutex;
use nodedb_wal::record::RecordType;
use nodedb_wal::segmented::{SegmentedWal, SegmentedWalConfig};
pub struct AuditWalSegment {
wal: Mutex<SegmentedWal>,
}
impl AuditWalSegment {
pub fn open(audit_dir: &Path, use_direct_io: bool) -> crate::Result<Self> {
std::fs::create_dir_all(audit_dir).map_err(|e| crate::Error::Storage {
engine: "audit_wal".into(),
detail: format!("failed to create audit WAL directory: {e}"),
})?;
let mut config = SegmentedWalConfig::new(PathBuf::from(audit_dir));
config.segment_target_size = 256 * 1024 * 1024;
config.writer_config.use_direct_io = use_direct_io;
let wal = SegmentedWal::open(config).map_err(crate::Error::Wal)?;
Ok(Self {
wal: Mutex::new(wal),
})
}
pub fn append(&self, audit_bytes: &[u8], data_lsn: u64) -> crate::Result<u64> {
let mut wal = self.wal.lock().map_err(|_| crate::Error::Internal {
detail: "audit WAL lock poisoned".into(),
})?;
let mut payload = Vec::with_capacity(8 + audit_bytes.len());
payload.extend_from_slice(&data_lsn.to_le_bytes());
payload.extend_from_slice(audit_bytes);
let lsn = wal
.append(RecordType::Put as u32, 0, 0, 0, &payload)
.map_err(crate::Error::Wal)?;
Ok(lsn)
}
pub fn sync(&self) -> crate::Result<()> {
let mut wal = self.wal.lock().map_err(|_| crate::Error::Internal {
detail: "audit WAL lock poisoned".into(),
})?;
wal.sync().map_err(crate::Error::Wal)
}
pub fn append_durable(&self, audit_bytes: &[u8], data_lsn: u64) -> crate::Result<u64> {
let mut wal = self.wal.lock().map_err(|_| crate::Error::Internal {
detail: "audit WAL lock poisoned".into(),
})?;
let mut payload = Vec::with_capacity(8 + audit_bytes.len());
payload.extend_from_slice(&data_lsn.to_le_bytes());
payload.extend_from_slice(audit_bytes);
let lsn = wal
.append(RecordType::Put as u32, 0, 0, 0, &payload)
.map_err(crate::Error::Wal)?;
wal.sync().map_err(crate::Error::Wal)?;
Ok(lsn)
}
pub fn recover(&self) -> crate::Result<Vec<(u64, Vec<u8>)>> {
let wal = self.wal.lock().map_err(|_| crate::Error::Internal {
detail: "audit WAL lock poisoned".into(),
})?;
let records = wal.replay().map_err(crate::Error::Wal)?;
let mut entries = Vec::with_capacity(records.len());
for record in records {
if record.payload.len() < 8 {
tracing::warn!(
payload_len = record.payload.len(),
"skipping malformed audit WAL record (payload < 8 bytes)"
);
continue;
}
let data_lsn = u64::from_le_bytes(
record.payload[..8]
.try_into()
.expect("length checked above"),
);
let audit_bytes = record.payload[8..].to_vec();
entries.push((data_lsn, audit_bytes));
}
Ok(entries)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn append_and_recover() {
let dir = tempfile::tempdir().expect("create temp dir");
let audit_dir = dir.path().join("audit.wal");
let segment = AuditWalSegment::open(&audit_dir, false).expect("open audit WAL");
let lsn1 = segment
.append_durable(b"audit-entry-1", 100)
.expect("append entry 1");
let lsn2 = segment
.append_durable(b"audit-entry-2", 200)
.expect("append entry 2");
assert!(lsn2 > lsn1);
let recovered = segment.recover().expect("recover audit WAL");
assert_eq!(recovered.len(), 2);
assert_eq!(recovered[0].0, 100); assert_eq!(recovered[0].1, b"audit-entry-1");
assert_eq!(recovered[1].0, 200);
assert_eq!(recovered[1].1, b"audit-entry-2");
}
#[test]
fn empty_recovery() {
let dir = tempfile::tempdir().expect("create temp dir");
let audit_dir = dir.path().join("audit.wal");
let segment = AuditWalSegment::open(&audit_dir, false).expect("open audit WAL");
let recovered = segment.recover().expect("recover empty audit WAL");
assert!(recovered.is_empty());
}
}