use std::sync::Arc;
use object_store::ObjectStore;
use object_store::path::Path as ObjectStorePath;
use uni_common::core::fork::ForkId;
use crate::runtime::wal::WriteAheadLog;
#[must_use]
pub fn wal_prefix(fork_id: &ForkId) -> ObjectStorePath {
ObjectStorePath::from(format!("wal_forks/{fork_id}"))
}
#[must_use]
pub fn new_for_fork(store: Arc<dyn ObjectStore>, fork_id: &ForkId) -> WriteAheadLog {
WriteAheadLog::new(store, wal_prefix(fork_id))
}
#[must_use]
pub fn new_for_fork_arc(store: Arc<dyn ObjectStore>, fork_id: &ForkId) -> Arc<WriteAheadLog> {
Arc::new(new_for_fork(store, fork_id))
}
#[cfg(test)]
mod tests {
use super::*;
use object_store::local::LocalFileSystem;
use tempfile::TempDir;
use crate::runtime::wal::Mutation;
use uni_common::Vid;
async fn fresh_store() -> (TempDir, Arc<dyn ObjectStore>) {
let dir = TempDir::new().unwrap();
let store: Arc<dyn ObjectStore> =
Arc::new(LocalFileSystem::new_with_prefix(dir.path()).unwrap());
(dir, store)
}
#[tokio::test]
async fn prefix_includes_fork_id_under_wal_forks() {
let id = ForkId::new();
let p = wal_prefix(&id);
let s = p.to_string();
assert!(s.starts_with("wal_forks/"), "got {s}");
assert!(s.contains(&id.to_string()));
assert!(
!s.starts_with("wal/"),
"fork WAL must not nest under primary WAL"
);
}
#[tokio::test]
async fn fork_wal_initialize_yields_zero_lsn_on_empty_dir() {
let (_dir, store) = fresh_store().await;
let id = ForkId::new();
let wal = new_for_fork(store, &id);
let max = wal.initialize().await.unwrap();
assert_eq!(max, 0, "empty fork WAL has max LSN 0");
}
#[tokio::test]
async fn append_flush_replay_roundtrip_on_fork_wal() {
let (dir, store) = fresh_store().await;
let id = ForkId::new();
let wal = new_for_fork(store.clone(), &id);
wal.initialize().await.unwrap();
wal.append(&Mutation::DeleteVertex {
vid: Vid::new(7),
labels: vec![],
})
.unwrap();
wal.append(&Mutation::DeleteVertex {
vid: Vid::new(8),
labels: vec![],
})
.unwrap();
let lsn = wal.flush().await.unwrap();
assert!(lsn >= 1, "flush returned LSN {lsn}");
let wal2 = new_for_fork(store.clone(), &id);
let max = wal2.initialize().await.unwrap();
assert_eq!(max, lsn);
let replayed = wal2.replay().await.unwrap();
assert_eq!(replayed.len(), 2);
assert!(matches!(&replayed[0], Mutation::DeleteVertex { vid, .. } if *vid == Vid::new(7)));
assert!(matches!(&replayed[1], Mutation::DeleteVertex { vid, .. } if *vid == Vid::new(8)));
let _ = dir;
}
#[tokio::test]
async fn primary_wal_unaffected_by_fork_wal_writes() {
let (_dir, store) = fresh_store().await;
let id = ForkId::new();
let fork_wal = new_for_fork(store.clone(), &id);
fork_wal.initialize().await.unwrap();
fork_wal
.append(&Mutation::DeleteVertex {
vid: Vid::new(99),
labels: vec![],
})
.unwrap();
fork_wal.flush().await.unwrap();
let primary_wal = WriteAheadLog::new(store, ObjectStorePath::from("wal"));
let max = primary_wal.initialize().await.unwrap();
assert_eq!(max, 0, "primary WAL must not see fork-side segments");
}
#[tokio::test]
async fn two_forks_have_independent_wal_prefixes() {
let (_dir, store) = fresh_store().await;
let id_a = ForkId::new();
let id_b = ForkId::new();
let wal_a = new_for_fork(store.clone(), &id_a);
let wal_b = new_for_fork(store.clone(), &id_b);
wal_a.initialize().await.unwrap();
wal_b.initialize().await.unwrap();
wal_a
.append(&Mutation::DeleteVertex {
vid: Vid::new(1),
labels: vec![],
})
.unwrap();
wal_a.flush().await.unwrap();
let max_b = WriteAheadLog::new(store, wal_prefix(&id_b))
.initialize()
.await
.unwrap();
assert_eq!(max_b, 0);
}
}