use crate::store_utils::{
DEFAULT_TIMEOUT, delete_with_timeout, get_with_timeout, list_with_timeout, put_with_timeout,
};
use anyhow::Result;
use metrics;
use object_store::ObjectStore;
use object_store::path::Path;
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};
use tracing::{debug, info, instrument, warn};
use uni_common::Properties;
use uni_common::core::id::{Eid, Vid};
use uni_common::sync::acquire_mutex;
use uuid::Uuid;
/// Extracts the LSN encoded in the first 20 characters of a WAL segment filename.
///
/// `flush` names segments `{lsn:020}_{uuid}.wal`, so a well-formed name starts with exactly
/// 20 ASCII digits.
///
/// Returns `None` when the path has no filename, when the name is shorter than 20 bytes,
/// when byte 20 falls inside a multi-byte character, when the prefix contains anything other
/// than ASCII digits, or when the digits do not fit in a `u64`. Callers treat `None` as
/// "decode the segment body to learn its LSN", so being strict here is always safe.
fn parse_lsn_from_filename(path: &Path) -> Option<u64> {
    // Width of the zero-padded LSN prefix produced by `{:020}`.
    const LSN_DIGITS: usize = 20;

    let filename = path.filename()?;
    // `str::get` covers both the too-short case and the non-char-boundary case without
    // panicking.
    let prefix = filename.get(..LSN_DIGITS)?;
    // `u64::from_str` also accepts a leading `+`, which the writer never emits; only trust
    // names that are digits all the way through.
    if !prefix.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    prefix.parse::<u64>().ok()
}
/// Byte prefix that marks a checksummed ("v2") WAL segment. `decode_segment` parses input
/// that lacks this prefix as a legacy bare-JSON segment.
const WAL_V2_MAGIC: &[u8] = b"UNIWAL2\n";
/// Length in bytes of the hex-encoded payload hash that follows the magic in a v2 segment.
const WAL_V2_HASH_HEX_LEN: usize = 64;
/// Wraps a serialized segment payload in the v2 envelope.
///
/// Layout: `WAL_V2_MAGIC`, the hex BLAKE3 hash of the payload, a single `\n`, then the
/// payload bytes unchanged.
fn encode_segment_envelope(payload_json: &[u8]) -> Vec<u8> {
    let digest_hex = blake3::hash(payload_json).to_hex();
    // Size the buffer once: magic + hash + newline separator + payload.
    let total_len = WAL_V2_MAGIC.len() + WAL_V2_HASH_HEX_LEN + 1 + payload_json.len();
    let mut envelope = Vec::with_capacity(total_len);
    envelope.extend_from_slice(WAL_V2_MAGIC);
    envelope.extend_from_slice(digest_hex.as_bytes());
    envelope.push(b'\n');
    envelope.extend_from_slice(payload_json);
    envelope
}
/// Decodes the raw bytes of a WAL segment file.
///
/// Input starting with `WAL_V2_MAGIC` is treated as a v2 envelope: the header must be a
/// hex hash followed by `\n`, and the hash must match the BLAKE3 hash of the remaining
/// payload before the payload is parsed as JSON. Any other input is parsed directly as a
/// legacy JSON segment.
///
/// # Errors
///
/// Returns a human-readable reason when the header is malformed, the checksum does not
/// match, or the JSON does not deserialize into a [`WalSegment`].
#[doc(hidden)]
pub fn decode_segment(bytes: &[u8]) -> std::result::Result<WalSegment, String> {
    // No magic: legacy segment, the whole input is the JSON payload.
    let Some(after_magic) = bytes.strip_prefix(WAL_V2_MAGIC) else {
        return serde_json::from_slice(bytes).map_err(|e| format!("legacy segment parse: {e}"));
    };

    // The byte right after the hash must exist and be the newline separator.
    if after_magic.get(WAL_V2_HASH_HEX_LEN) != Some(&b'\n') {
        return Err("truncated v2 segment header".to_string());
    }
    let checksum_hex = &after_magic[..WAL_V2_HASH_HEX_LEN];
    let payload = &after_magic[WAL_V2_HASH_HEX_LEN + 1..];

    let expected =
        std::str::from_utf8(checksum_hex).map_err(|_| "non-utf8 checksum header".to_string())?;
    let actual = blake3::hash(payload).to_hex();
    if actual.as_str() != expected {
        return Err(format!(
            "checksum mismatch (expected {expected}, computed {})",
            actual
        ));
    }

    serde_json::from_slice(payload).map_err(|e| format!("v2 payload parse: {e}"))
}
/// Test-only fault-injection flag. When it is `true`, the next `flush` on a WAL that has a
/// local root reports an injected fsync failure instead of syncing, and resets the flag to
/// `false`.
#[cfg(test)]
pub(crate) static FAIL_NEXT_FSYNC: std::sync::atomic::AtomicBool =
std::sync::atomic::AtomicBool::new(false);
/// Flushes the file at `path` to stable storage and, on Unix, its parent directory too.
///
/// Syncing the directory is what makes the directory entry itself durable.
///
/// # Errors
///
/// Returns the underlying I/O error if the file or directory cannot be opened or synced.
pub(crate) fn sync_file_and_parent(path: &std::path::Path) -> std::io::Result<()> {
    // Windows can only flush a handle opened with write access; a read-only handle is
    // rejected. Elsewhere a read-only descriptor is sufficient for fsync and also works on
    // files we are not allowed to write.
    #[cfg(windows)]
    let file = std::fs::OpenOptions::new().write(true).open(path)?;
    #[cfg(not(windows))]
    let file = std::fs::File::open(path)?;
    file.sync_all()?;

    #[cfg(unix)]
    if let Some(parent) = path.parent() {
        // A bare relative filename has an empty parent; that means the current directory,
        // and opening "" would fail.
        let dir = if parent.as_os_str().is_empty() {
            std::path::Path::new(".")
        } else {
            parent
        };
        std::fs::File::open(dir)?.sync_all()?;
    }
    Ok(())
}
/// Serde adapter (`#[serde(with = "cv_props")]`) for property maps.
///
/// Each value is written as a JSON string: `CV_PREFIX` followed by the base64 of the
/// value's `cypher_value_codec` encoding. On read, strings carrying the prefix are decoded
/// that way; every other JSON value is deserialized as a plain `Value`.
mod cv_props {
    use base64::Engine;
    use serde::{Deserialize, Deserializer, Serializer};
    use std::collections::HashMap;
    use uni_common::{Properties, Value};

    /// Marker that distinguishes codec-encoded property strings from ordinary JSON strings.
    const CV_PREFIX: &str = "\u{1}uni_cv:";

    /// Serializes `props` as a map from property name to prefixed, base64-encoded value.
    pub fn serialize<S: Serializer>(props: &Properties, s: S) -> Result<S::Ok, S::Error> {
        let engine = base64::engine::general_purpose::STANDARD;
        // Stream entries straight into the serializer instead of materialising a second map.
        s.collect_map(props.iter().map(|(key, value)| {
            let bytes = uni_common::cypher_value_codec::encode(value);
            (key, format!("{CV_PREFIX}{}", engine.encode(bytes)))
        }))
    }

    /// Deserializes a property map, accepting both prefixed codec strings and plain JSON
    /// values.
    ///
    /// Fails with a custom serde error when the base64, the codec payload, or a plain JSON
    /// value cannot be decoded.
    pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Properties, D::Error> {
        let raw: HashMap<String, serde_json::Value> = HashMap::deserialize(d)?;
        let engine = base64::engine::general_purpose::STANDARD;
        let mut out = Properties::with_capacity(raw.len());
        for (k, jv) in raw {
            let value = match &jv {
                // Codec-encoded value: strip the marker, then base64-decode and decode.
                serde_json::Value::String(s) if s.starts_with(CV_PREFIX) => {
                    let bytes = engine
                        .decode(&s[CV_PREFIX.len()..])
                        .map_err(serde::de::Error::custom)?;
                    uni_common::cypher_value_codec::decode(&bytes)
                        .map_err(serde::de::Error::custom)?
                }
                // Anything else is read as a plain JSON-encoded `Value`.
                _ => serde_json::from_value::<Value>(jv).map_err(serde::de::Error::custom)?,
            };
            out.insert(k, value);
        }
        Ok(out)
    }
}
/// A single logged graph change. Mutations are buffered by `WriteAheadLog::append`,
/// persisted in batches by `flush`, and returned in order by `replay_since`.
///
/// Fields marked `#[serde(default)]` may be absent in stored segments and then take their
/// type's default value.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Mutation {
/// Insertion of an edge between two vertices.
InsertEdge {
src_vid: Vid,
dst_vid: Vid,
edge_type: u32,
eid: Eid,
version: u64,
// Stored through the `cv_props` adapter (prefixed base64 strings per value).
#[serde(with = "cv_props")]
properties: Properties,
// Optional on disk; `None` when the segment does not carry it.
#[serde(default)]
edge_type_name: Option<String>,
},
/// Deletion of an edge, identified by its id together with its endpoints and type.
DeleteEdge {
eid: Eid,
src_vid: Vid,
dst_vid: Vid,
edge_type: u32,
version: u64,
},
/// Insertion of a vertex with its properties and labels.
InsertVertex {
vid: Vid,
// Stored through the `cv_props` adapter (prefixed base64 strings per value).
#[serde(with = "cv_props")]
properties: Properties,
// Optional on disk; empty when the segment does not carry it.
#[serde(default)]
labels: Vec<String>,
},
/// Deletion of a vertex.
DeleteVertex {
vid: Vid,
// Optional on disk; empty when the segment does not carry it.
#[serde(default)]
labels: Vec<String>,
},
/// Carries a vertex id and a label list; unlike the other variants, `labels` is required
/// on disk. NOTE(review): presumably replaces the vertex's label set — confirm with the
/// code that applies replayed mutations.
SetVertexLabels { vid: Vid, labels: Vec<String> },
}
/// Owned form of one WAL segment: the batch of mutations written by a single flush,
/// tagged with that flush's log sequence number. This is what `decode_segment` returns.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WalSegment {
/// Log sequence number assigned to the flush that wrote this segment.
pub lsn: u64,
/// Mutations contained in the segment, in append order.
pub mutations: Vec<Mutation>,
}
/// Borrowing, serialize-only counterpart of `WalSegment`, used by `flush` so the batch can
/// be serialized without cloning it. Field names and order mirror `WalSegment` so both
/// serialize to the same JSON.
#[derive(Serialize, Debug)]
struct WalSegmentRef<'a> {
lsn: u64,
mutations: &'a [Mutation],
}
/// Write-ahead log that buffers mutations in memory and persists them as numbered segment
/// objects under a prefix in an object store.
pub struct WriteAheadLog {
// Object store that holds the segment files.
store: Arc<dyn ObjectStore>,
// Prefix under which all segments are listed, written and deleted.
prefix: Path,
// When set, `flush` joins the segment path onto this directory and fsyncs the resulting
// local file and its parent after the store write.
local_root: Option<std::path::PathBuf>,
// Buffer and LSN counters, guarded by a mutex.
state: Mutex<WalState>,
}
/// Mutable WAL state kept behind `WriteAheadLog::state`.
struct WalState {
// Mutations appended since the last flush took the buffer.
buffer: Vec<Mutation>,
// LSN that the next non-empty flush will claim.
next_lsn: u64,
// LSN recorded by the last successful flush, or the highest LSN found by `initialize`.
flushed_lsn: u64,
}
impl WriteAheadLog {
/// Creates a WAL over `store` whose segments live under `prefix`.
///
/// The log starts with an empty buffer, `next_lsn` 1 and `flushed_lsn` 0, and without a
/// local root. No I/O is performed here.
pub fn new(store: Arc<dyn ObjectStore>, prefix: Path) -> Self {
    let initial_state = WalState {
        buffer: Vec::new(),
        next_lsn: 1,
        flushed_lsn: 0,
    };
    Self {
        store,
        prefix,
        local_root: None,
        state: Mutex::new(initial_state),
    }
}
/// Builder-style setter for the local root directory.
///
/// With `Some(dir)`, `flush` fsyncs the segment file found under `dir` after writing it;
/// with `None`, no fsync step is performed.
#[must_use]
pub fn with_local_root(self, local_root: Option<std::path::PathBuf>) -> Self {
    let mut configured = self;
    configured.local_root = local_root;
    configured
}
pub async fn initialize(&self) -> Result<u64> {
let max_lsn = self.find_max_lsn().await?;
{
let mut state = acquire_mutex(&self.state, "wal_state")?;
state.next_lsn = max_lsn + 1;
state.flushed_lsn = max_lsn;
}
Ok(max_lsn)
}
/// Returns the highest LSN among the segments under the prefix, or 0 when there are none.
///
/// The LSN is taken from the filename when it parses. Otherwise the segment is downloaded
/// and decoded; empty files are ignored and undecodable ones are skipped with a warning.
async fn find_max_lsn(&self) -> Result<u64> {
    let listing = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
    let mut highest: u64 = 0;
    for entry in listing {
        // Fast path: the LSN is encoded in the filename.
        if let Some(from_name) = parse_lsn_from_filename(&entry.location) {
            highest = highest.max(from_name);
            continue;
        }

        // Slow path: read the LSN from the segment body.
        warn!(
            path = %entry.location,
            "WAL filename doesn't match expected format, downloading segment"
        );
        let payload = get_with_timeout(&self.store, &entry.location, DEFAULT_TIMEOUT)
            .await?
            .bytes()
            .await?;
        if payload.is_empty() {
            continue;
        }
        match decode_segment(&payload) {
            Ok(segment) => highest = highest.max(segment.lsn),
            Err(reason) => {
                warn!(path = %entry.location, reason = %reason,
                    "Skipping corrupt WAL segment during max-LSN probe");
            }
        }
    }
    Ok(highest)
}
/// Adds `mutation` to the in-memory buffer and bumps the `uni_wal_entries_total` counter.
///
/// Nothing is written to the store until `flush` is called.
///
/// # Errors
///
/// Fails only if the state mutex cannot be acquired.
#[instrument(skip(self, mutation), level = "trace")]
pub fn append(&self, mutation: Mutation) -> Result<()> {
    // The guard is a temporary, so the lock is released at the end of this statement.
    acquire_mutex(&self.state, "wal_state")?.buffer.push(mutation);
    metrics::counter!("uni_wal_entries_total").increment(1);
    Ok(())
}
#[instrument(skip(self), fields(lsn, mutations_count, size_bytes))]
pub async fn flush(&self) -> Result<u64> {
let start = std::time::Instant::now();
let (batch, lsn) = {
let mut state = acquire_mutex(&self.state, "wal_state")?;
if state.buffer.is_empty() {
return Ok(state.flushed_lsn);
}
let lsn = state.next_lsn;
state.next_lsn += 1;
(std::mem::take(&mut state.buffer), lsn)
};
tracing::Span::current().record("lsn", lsn);
tracing::Span::current().record("mutations_count", batch.len());
let segment = WalSegmentRef {
lsn,
mutations: &batch,
};
let json = match serde_json::to_vec(&segment) {
Ok(j) => j,
Err(e) => {
warn!(lsn, error = %e, "Failed to serialize WAL segment, restoring buffer");
let mut state = acquire_mutex(&self.state, "wal_state")?;
let new_mutations = std::mem::take(&mut state.buffer);
state.buffer = batch;
state.buffer.extend(new_mutations);
return Err(e.into());
}
};
let body = encode_segment_envelope(&json);
tracing::Span::current().record("size_bytes", body.len());
metrics::counter!("uni_wal_bytes_written_total").increment(body.len() as u64);
let filename = format!("{:020}_{}.wal", lsn, Uuid::new_v4());
let path = self.prefix.clone().join(filename);
if let Err(e) = put_with_timeout(&self.store, &path, body.into(), DEFAULT_TIMEOUT).await {
warn!(
lsn,
error = %e,
"Failed to flush WAL segment, restoring buffer (LSN gap preserved for monotonicity)"
);
let mut state = acquire_mutex(&self.state, "wal_state")?;
let new_mutations = std::mem::take(&mut state.buffer);
state.buffer = batch;
state.buffer.extend(new_mutations);
return Err(e);
}
if let Some(root) = &self.local_root {
let file_path = root.join(path.as_ref());
#[cfg(test)]
let synced = if FAIL_NEXT_FSYNC.swap(false, std::sync::atomic::Ordering::SeqCst) {
Ok(Err(std::io::Error::other("injected fsync failure")))
} else {
tokio::task::spawn_blocking(move || sync_file_and_parent(&file_path)).await
};
#[cfg(not(test))]
let synced =
tokio::task::spawn_blocking(move || sync_file_and_parent(&file_path)).await;
let fsync_err: Option<anyhow::Error> = match synced {
Ok(Ok(())) => None,
Ok(Err(e)) => Some(e.into()),
Err(e) => Some(e.into()),
};
if let Some(err) = fsync_err {
warn!(
lsn,
error = %err,
"WAL segment fsync failed — deleting the non-durable segment to avoid a ghost commit on replay"
);
if let Err(del_err) = delete_with_timeout(&self.store, &path, DEFAULT_TIMEOUT).await
{
return Err(anyhow::anyhow!(
"WAL segment fsync failed ({err}) and the cleanup delete \
of segment at lsn {lsn} also failed ({del_err}); the WAL \
may contain a non-durable segment"
));
}
return Err(err);
}
}
{
let mut state = acquire_mutex(&self.state, "wal_state")?;
state.flushed_lsn = lsn;
}
let duration = start.elapsed();
metrics::histogram!("wal_flush_latency_ms").record(duration.as_millis() as f64);
metrics::histogram!("uni_wal_flush_duration_seconds").record(duration.as_secs_f64());
if duration.as_millis() > 100 {
warn!(
lsn,
duration_ms = duration.as_millis(),
"Slow WAL flush detected"
);
} else {
debug!(
lsn,
duration_ms = duration.as_millis(),
"WAL flush completed"
);
}
Ok(lsn)
}
/// Returns the LSN currently recorded as flushed.
///
/// # Errors
///
/// Returns `LockPoisonedError` if the state mutex is poisoned.
pub fn flushed_lsn(&self) -> Result<u64, uni_common::sync::LockPoisonedError> {
    let flushed = uni_common::sync::acquire_mutex(&self.state, "wal_state")?.flushed_lsn;
    Ok(flushed)
}
/// Returns the mutations of every segment whose LSN is greater than `high_water_mark`.
///
/// Segments whose filename LSN is at or below the mark are skipped without being read;
/// segments with unparseable names are read and filtered by the LSN in their body. Segments
/// are processed in sorted path order.
///
/// # Errors
///
/// An undecodable (or empty) segment that is last in that order is treated as the end of
/// the log. One that has later segments after it aborts the replay with an error. Listing
/// and download failures are propagated.
#[instrument(skip(self), level = "debug")]
pub async fn replay_since(&self, high_water_mark: u64) -> Result<Vec<Mutation>> {
    let start = std::time::Instant::now();
    debug!(high_water_mark, "Replaying WAL segments");

    let listing = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
    // Keep a path unless its filename proves the segment is at or below the mark.
    let mut paths = Vec::new();
    for meta in listing {
        let candidate = match parse_lsn_from_filename(&meta.location) {
            Some(lsn) => lsn > high_water_mark,
            None => true,
        };
        if candidate {
            paths.push(meta.location);
        }
    }
    paths.sort();

    let mut mutations = Vec::new();
    let mut segments_replayed = 0;
    for (idx, path) in paths.iter().enumerate() {
        let bytes = get_with_timeout(&self.store, path, DEFAULT_TIMEOUT)
            .await?
            .bytes()
            .await?;
        let decoded = if bytes.is_empty() {
            Err("empty segment file".to_string())
        } else {
            decode_segment(&bytes)
        };
        let segment = match decoded {
            Ok(segment) => segment,
            Err(reason) => {
                let later_segments = paths.len() - idx - 1;
                if later_segments == 0 {
                    warn!(
                        path = %path,
                        reason = %reason,
                        "Corrupt tail WAL segment — torn write from a crash; \
                         treating as end of WAL (the commit was never acknowledged)"
                    );
                    break;
                }
                return Err(anyhow::anyhow!(
                    "corrupt WAL segment '{path}' ({reason}) with {} later segment(s) \
                     present; refusing to skip — manual inspection required",
                    later_segments
                ));
            }
        };
        // The filename filter cannot vouch for segments with odd names; check the body LSN.
        if segment.lsn <= high_water_mark {
            continue;
        }
        mutations.extend(segment.mutations);
        segments_replayed += 1;
    }

    info!(
        segments_replayed,
        mutations_count = mutations.len(),
        "WAL replay completed"
    );
    metrics::histogram!("uni_wal_replay_duration_seconds")
        .record(start.elapsed().as_secs_f64());
    Ok(mutations)
}
/// Replays the whole log; shorthand for `replay_since(0)`, so only segments with an LSN
/// greater than 0 are returned.
pub async fn replay(&self) -> Result<Vec<Mutation>> {
self.replay_since(0).await
}
/// Deletes every segment whose LSN is at or below `high_water_mark`.
///
/// The LSN comes from the filename when it parses. Otherwise the segment is downloaded:
/// empty files are deleted, decodable ones are judged by their body LSN, and undecodable
/// ones are kept with a warning.
///
/// # Errors
///
/// Listing, download and delete failures are propagated; segments deleted before the
/// failure stay deleted.
#[instrument(skip(self), level = "info")]
pub async fn truncate_before(&self, high_water_mark: u64) -> Result<()> {
    info!(high_water_mark, "Truncating WAL segments");
    let listing = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
    let mut deleted_count = 0;
    for meta in listing {
        let should_delete = match parse_lsn_from_filename(&meta.location) {
            Some(lsn) => lsn <= high_water_mark,
            None => {
                warn!(
                    path = %meta.location,
                    "WAL filename doesn't match expected format, downloading segment"
                );
                let bytes = get_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT)
                    .await?
                    .bytes()
                    .await?;
                // An empty file holds nothing worth keeping.
                bytes.is_empty()
                    || match decode_segment(&bytes) {
                        Ok(segment) => segment.lsn <= high_water_mark,
                        Err(reason) => {
                            warn!(path = %meta.location, reason = %reason,
                                "Keeping corrupt WAL segment during truncation");
                            false
                        }
                    }
            }
        };
        if !should_delete {
            continue;
        }
        delete_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
        deleted_count += 1;
    }
    info!(deleted_count, "WAL truncation completed");
    Ok(())
}
/// Reports whether at least one object exists under the WAL prefix.
///
/// Only the listing is consulted; objects are not opened or validated.
pub async fn has_segments(&self) -> Result<bool> {
    let listing = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
    let any_present = !listing.is_empty();
    Ok(any_present)
}
/// Deletes every object under the WAL prefix, regardless of LSN.
///
/// # Errors
///
/// Stops at the first listing or delete failure; objects deleted before it stay deleted.
pub async fn truncate(&self) -> Result<()> {
    info!("Truncating all WAL segments");
    let listing = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
    let mut deleted_count = 0;
    for location in listing.into_iter().map(|meta| meta.location) {
        delete_with_timeout(&self.store, &location, DEFAULT_TIMEOUT).await?;
        deleted_count += 1;
    }
    info!(deleted_count, "Full WAL truncation completed");
    Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::ObjectStoreExt;
    use object_store::local::LocalFileSystem;
    use std::collections::HashMap;
    use tempfile::tempdir;

    /// Object-store prefix used by every test WAL.
    const WAL_PREFIX: &str = "wal";

    /// Creates a local-filesystem store rooted in a fresh temp directory.
    ///
    /// The returned `TempDir` must be kept alive for as long as the store is used.
    fn temp_store() -> Result<(tempfile::TempDir, Arc<LocalFileSystem>)> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        Ok((dir, store))
    }

    /// Creates a WAL over a fresh temp-directory store, without a local root.
    fn temp_wal() -> Result<(tempfile::TempDir, WriteAheadLog)> {
        let (dir, store) = temp_store()?;
        Ok((dir, WriteAheadLog::new(store, Path::from(WAL_PREFIX))))
    }

    /// Builds an `InsertVertex` mutation with no properties and no labels.
    fn insert_vertex(id: u64) -> Mutation {
        Mutation::InsertVertex {
            vid: Vid::new(id),
            properties: HashMap::new(),
            labels: vec![],
        }
    }

    /// Appends and flushes one vertex per id in `1..=count`, one segment each.
    async fn write_vertex_segments(wal: &WriteAheadLog, count: u64) -> Result<()> {
        for id in 1..=count {
            wal.append(insert_vertex(id))?;
            wal.flush().await?;
        }
        Ok(())
    }

    /// Writes `mutation` as the only entry of a fresh WAL and returns what replay yields.
    async fn roundtrip_single(mutation: Mutation) -> Result<Mutation> {
        let (_dir, wal) = temp_wal()?;
        wal.append(mutation)?;
        wal.flush().await?;
        let mut replayed = wal.replay().await?;
        assert_eq!(replayed.len(), 1);
        Ok(replayed.remove(0))
    }

    #[tokio::test]
    async fn test_wal_append_replay() -> Result<()> {
        let (_dir, wal) = temp_wal()?;
        wal.append(insert_vertex(1))?;
        wal.flush().await?;

        let mutations = wal.replay().await?;
        assert_eq!(mutations.len(), 1);
        if let Mutation::InsertVertex { vid, .. } = &mutations[0] {
            assert_eq!(vid.as_u64(), Vid::new(1).as_u64());
        } else {
            panic!("Wrong mutation type");
        }

        wal.truncate().await?;
        let mutations2 = wal.replay().await?;
        assert_eq!(mutations2.len(), 0);
        Ok(())
    }

    #[tokio::test]
    async fn test_lsn_monotonicity() -> Result<()> {
        let (_dir, wal) = temp_wal()?;

        wal.append(insert_vertex(1))?;
        let lsn1 = wal.flush().await?;
        wal.append(insert_vertex(2))?;
        let lsn2 = wal.flush().await?;
        wal.append(insert_vertex(3))?;
        let lsn3 = wal.flush().await?;

        assert!(lsn2 > lsn1, "LSN2 ({}) should be > LSN1 ({})", lsn2, lsn1);
        assert!(lsn3 > lsn2, "LSN3 ({}) should be > LSN2 ({})", lsn3, lsn2);
        assert_eq!(lsn2, lsn1 + 1);
        assert_eq!(lsn3, lsn2 + 1);
        Ok(())
    }

    #[tokio::test]
    async fn fsync_failure_deletes_segment_no_ghost_commit() -> Result<()> {
        let (dir, store) = temp_store()?;
        // The fsync step only runs when a local root is configured.
        let wal = WriteAheadLog::new(store, Path::from(WAL_PREFIX))
            .with_local_root(Some(dir.path().to_path_buf()));
        wal.append(insert_vertex(1))?;

        FAIL_NEXT_FSYNC.store(true, std::sync::atomic::Ordering::SeqCst);
        let result = wal.flush().await;
        assert!(
            result.is_err(),
            "flush must report failure when the segment fsync fails"
        );

        let replayed = wal.replay().await?;
        assert!(
            replayed.is_empty(),
            "a segment whose fsync failed must not be replayable (ghost commit); got {} mutations",
            replayed.len()
        );
        Ok(())
    }

    #[test]
    fn test_parse_lsn_from_filename() {
        let cases: [(&str, Option<u64>); 9] = [
            ("00000000000000000042_a1b2c3d4.wal", Some(42)),
            ("00000000000000001234_e5f6a7b8.wal", Some(1234)),
            ("00000000000000000001_xyz.wal", Some(1)),
            ("12345678901234567890_uuid.wal", Some(12345678901234567890)),
            // No numeric prefix at all.
            ("invalid.wal", None),
            // Numeric but shorter than 20 characters.
            ("123.wal", None),
            // Long enough, but the prefix is not numeric.
            ("abcdefghijklmnopqrst_uuid.wal", None),
            // The `_suffix` part is not required.
            ("00000000000000000100.wal", Some(100)),
            ("", None),
        ];
        for (name, expected) in cases {
            let path = Path::from(name);
            assert_eq!(parse_lsn_from_filename(&path), expected, "filename {name:?}");
        }
    }

    #[test]
    fn test_parse_lsn_from_filename_multibyte_no_panic() {
        // 19 digits followed by a 2-byte character: byte offset 20 is not a char boundary.
        let name = format!("{}{}.wal", "0".repeat(19), "é");
        let path = Path::parse(name).expect("multi-byte segment is a valid object_store path");
        assert_eq!(parse_lsn_from_filename(&path), None);
    }

    #[tokio::test]
    async fn test_find_max_lsn_scalability() -> Result<()> {
        let (_dir, wal) = temp_wal()?;
        write_vertex_segments(&wal, 100).await?;

        let start = std::time::Instant::now();
        let max_lsn = wal.find_max_lsn().await?;
        let duration = start.elapsed();

        assert_eq!(max_lsn, 100, "Max LSN should be 100");
        assert!(
            duration.as_millis() < 1000,
            "find_max_lsn took {}ms, expected < 1000ms (filename parsing should be fast)",
            duration.as_millis()
        );
        Ok(())
    }

    // NOTE(review): despite its name, this test never injects a flush failure, so it only
    // checks that LSNs stay consecutive and that a multi-mutation batch replays fully.
    #[tokio::test]
    async fn test_lsn_gaps_preserved_on_flush_failure() -> Result<()> {
        let (_dir, wal) = temp_wal()?;

        wal.append(insert_vertex(1))?;
        let lsn1 = wal.flush().await?;
        assert_eq!(lsn1, 1);

        wal.append(insert_vertex(2))?;
        let lsn2 = wal.flush().await?;
        assert_eq!(lsn2, 2);

        // Two mutations in one batch still consume a single LSN.
        wal.append(insert_vertex(3))?;
        wal.append(insert_vertex(4))?;
        let lsn3 = wal.flush().await?;
        assert_eq!(lsn3, 3, "LSN should increment monotonically");

        let mutations = wal.replay().await?;
        assert_eq!(mutations.len(), 4, "All 4 mutations should be replayed");
        Ok(())
    }

    #[tokio::test]
    async fn test_lsn_watermark_no_reuse() -> Result<()> {
        let (_dir, wal) = temp_wal()?;
        let mut seen_lsns = std::collections::HashSet::new();
        for i in 1..=50 {
            wal.append(insert_vertex(i))?;
            let lsn = wal.flush().await?;
            assert!(
                !seen_lsns.contains(&lsn),
                "LSN {} was reused! This violates monotonicity.",
                lsn
            );
            seen_lsns.insert(lsn);
            assert_eq!(lsn, i, "LSN should be {}, got {}", i, lsn);
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_truncate_scalability() -> Result<()> {
        let (_dir, wal) = temp_wal()?;
        write_vertex_segments(&wal, 100).await?;

        let start = std::time::Instant::now();
        wal.truncate_before(50).await?;
        let duration = start.elapsed();

        let mutations = wal.replay().await?;
        assert_eq!(
            mutations.len(),
            50,
            "Should have 50 mutations remaining (51-100)"
        );
        assert!(
            duration.as_millis() < 1000,
            "truncate_before took {}ms, expected < 1000ms (filename parsing should be fast)",
            duration.as_millis()
        );
        Ok(())
    }

    #[tokio::test]
    async fn test_replay_since_skips_old_segments() -> Result<()> {
        let (_dir, wal) = temp_wal()?;
        write_vertex_segments(&wal, 100).await?;

        let start = std::time::Instant::now();
        let mutations = wal.replay_since(90).await?;
        let duration = start.elapsed();

        assert_eq!(mutations.len(), 10, "Should replay only LSNs 91-100");
        assert!(
            duration.as_millis() < 500,
            "replay_since took {}ms, expected < 500ms (should skip by filename)",
            duration.as_millis()
        );
        Ok(())
    }

    #[tokio::test]
    async fn test_wal_replay_preserves_vertex_labels() -> Result<()> {
        let properties = HashMap::from([(
            "name".to_string(),
            uni_common::Value::String("Alice".to_string()),
        )]);
        let replayed = roundtrip_single(Mutation::InsertVertex {
            vid: Vid::new(42),
            properties,
            labels: vec!["Person".to_string(), "User".to_string()],
        })
        .await?;

        let Mutation::InsertVertex { vid, labels, .. } = replayed else {
            panic!("Expected InsertVertex mutation");
        };
        assert_eq!(vid.as_u64(), 42);
        assert_eq!(labels.len(), 2);
        assert!(labels.contains(&"Person".to_string()));
        assert!(labels.contains(&"User".to_string()));
        Ok(())
    }

    #[tokio::test]
    async fn test_wal_replay_preserves_delete_vertex_labels() -> Result<()> {
        let replayed = roundtrip_single(Mutation::DeleteVertex {
            vid: Vid::new(99),
            labels: vec!["Person".to_string(), "Admin".to_string()],
        })
        .await?;

        let Mutation::DeleteVertex { vid, labels } = replayed else {
            panic!("Expected DeleteVertex mutation");
        };
        assert_eq!(vid.as_u64(), 99);
        assert_eq!(labels.len(), 2);
        assert!(labels.contains(&"Person".to_string()));
        assert!(labels.contains(&"Admin".to_string()));
        Ok(())
    }

    #[tokio::test]
    async fn test_wal_replay_preserves_edge_type_name() -> Result<()> {
        let properties = HashMap::from([("since".to_string(), uni_common::Value::Int(2020))]);
        let replayed = roundtrip_single(Mutation::InsertEdge {
            src_vid: Vid::new(1),
            dst_vid: Vid::new(2),
            edge_type: 100,
            eid: Eid::new(500),
            version: 1,
            properties,
            edge_type_name: Some("KNOWS".to_string()),
        })
        .await?;

        let Mutation::InsertEdge {
            eid,
            edge_type_name,
            ..
        } = replayed
        else {
            panic!("Expected InsertEdge mutation");
        };
        assert_eq!(eid.as_u64(), 500);
        assert_eq!(edge_type_name.as_deref(), Some("KNOWS"));
        Ok(())
    }

    #[tokio::test]
    async fn test_wal_backward_compatibility_labels() -> Result<()> {
        let (_dir, store) = temp_store()?;
        let prefix = Path::from(WAL_PREFIX);

        // Legacy segment: bare JSON without the v2 envelope and without a `labels` field.
        let old_format_json = r#"{
            "lsn": 1,
            "mutations": [
                {
                    "InsertVertex": {
                        "vid": 123,
                        "properties": {}
                    }
                }
            ]
        }"#;
        let path = prefix.clone().join("00000000000000000001_test.wal");
        store.put(&path, old_format_json.into()).await?;

        let wal = WriteAheadLog::new(store, prefix);
        let mutations = wal.replay().await?;
        assert_eq!(mutations.len(), 1);
        if let Mutation::InsertVertex { vid, labels, .. } = &mutations[0] {
            assert_eq!(vid.as_u64(), 123);
            assert_eq!(
                labels.len(),
                0,
                "Old format should deserialize with empty labels"
            );
        } else {
            panic!("Expected InsertVertex mutation");
        }
        Ok(())
    }

    #[test]
    fn wal_segment_ref_serializes_identically() {
        let mut props = HashMap::new();
        props.insert("p".to_string(), uni_common::Value::Int(7));
        let mutations = vec![
            Mutation::InsertVertex {
                vid: Vid::new(1),
                properties: props,
                labels: vec!["L".to_string()],
            },
            Mutation::DeleteEdge {
                eid: Eid::new(2),
                src_vid: Vid::new(1),
                dst_vid: Vid::new(3),
                edge_type: 4,
                version: 5,
            },
        ];
        let owned = WalSegment {
            lsn: 42,
            mutations: mutations.clone(),
        };
        let borrowed = WalSegmentRef {
            lsn: 42,
            mutations: &mutations,
        };
        // The borrowed form written by `flush` must match the owned form read by replay.
        assert_eq!(
            serde_json::to_vec(&owned).unwrap(),
            serde_json::to_vec(&borrowed).unwrap()
        );
    }
}