1use crate::store_utils::{
5 DEFAULT_TIMEOUT, delete_with_timeout, get_with_timeout, list_with_timeout, put_with_timeout,
6};
7use anyhow::Result;
8use metrics;
9use object_store::ObjectStore;
10use object_store::path::Path;
11use serde::{Deserialize, Serialize};
12use std::sync::{Arc, Mutex};
13use tracing::{debug, info, instrument, warn};
14use uni_common::Properties;
15use uni_common::core::id::{Eid, Vid};
16use uni_common::sync::acquire_mutex;
17use uuid::Uuid;
18
19fn parse_lsn_from_filename(path: &Path) -> Option<u64> {
22 let filename = path.filename()?;
23 if filename.len() < 20 {
24 return None;
25 }
26 filename.get(..20).and_then(|s| s.parse::<u64>().ok())
29}
30
/// Magic bytes that open every v2 (checksummed) WAL segment.
/// Segments without this prefix are decoded as legacy bare-JSON segments.
const WAL_V2_MAGIC: &[u8] = b"UNIWAL2\n";

/// Length of the hex-encoded BLAKE3 digest in a v2 segment header:
/// a 32-byte digest rendered as two hex characters per byte.
const WAL_V2_HASH_HEX_LEN: usize = 2 * 32;
40
41fn encode_segment_envelope(payload_json: &[u8]) -> Vec<u8> {
43 let hash = blake3::hash(payload_json);
44 let mut out =
45 Vec::with_capacity(WAL_V2_MAGIC.len() + WAL_V2_HASH_HEX_LEN + 1 + payload_json.len());
46 out.extend_from_slice(WAL_V2_MAGIC);
47 out.extend_from_slice(hash.to_hex().as_bytes());
48 out.push(b'\n');
49 out.extend_from_slice(payload_json);
50 out
51}
52
53#[doc(hidden)]
63pub fn decode_segment(bytes: &[u8]) -> std::result::Result<WalSegment, String> {
64 if let Some(rest) = bytes.strip_prefix(WAL_V2_MAGIC) {
65 if rest.len() < WAL_V2_HASH_HEX_LEN + 1 || rest[WAL_V2_HASH_HEX_LEN] != b'\n' {
66 return Err("truncated v2 segment header".to_string());
67 }
68 let (hex, payload_nl) = rest.split_at(WAL_V2_HASH_HEX_LEN);
69 let payload = &payload_nl[1..];
70 let expected =
71 std::str::from_utf8(hex).map_err(|_| "non-utf8 checksum header".to_string())?;
72 let actual = blake3::hash(payload);
73 if actual.to_hex().as_str() != expected {
74 return Err(format!(
75 "checksum mismatch (expected {expected}, computed {})",
76 actual.to_hex()
77 ));
78 }
79 serde_json::from_slice(payload).map_err(|e| format!("v2 payload parse: {e}"))
80 } else {
81 serde_json::from_slice(bytes).map_err(|e| format!("legacy segment parse: {e}"))
83 }
84}
85
/// Test-only fault injection for the fsync step of `WriteAheadLog::flush`.
///
/// When set, the next flush that has a local root configured atomically swaps
/// the flag back to `false` and proceeds as if the fsync had failed with
/// "injected fsync failure". The fsync itself is skipped on that flush.
#[cfg(test)]
pub(crate) static FAIL_NEXT_FSYNC: std::sync::atomic::AtomicBool =
    std::sync::atomic::AtomicBool::new(false);
92
/// Flushes the file at `path` to stable storage. On Unix it also flushes the
/// containing directory.
///
/// Syncing the directory is the usual POSIX way to make a newly created file's
/// directory entry durable, not just its contents.
///
/// A bare relative filename such as `"seg.wal"` has an empty parent
/// (`Path::parent` returns `Some("")`). That parent is resolved to the current
/// directory `"."` instead of failing on `File::open("")`.
///
/// # Errors
/// Returns the first I/O error from opening or syncing the file or its directory.
pub(crate) fn sync_file_and_parent(path: &std::path::Path) -> std::io::Result<()> {
    std::fs::File::open(path)?.sync_all()?;
    // Directories cannot be opened with `File::open` on every platform, so the
    // directory sync is limited to Unix.
    #[cfg(unix)]
    if let Some(parent) = path.parent() {
        let dir = if parent.as_os_str().is_empty() {
            std::path::Path::new(".")
        } else {
            parent
        };
        std::fs::File::open(dir)?.sync_all()?;
    }
    Ok(())
}
108
109mod cv_props {
125 use base64::Engine;
126 use serde::{Deserialize, Deserializer, Serialize, Serializer};
127 use std::collections::HashMap;
128 use uni_common::{Properties, Value};
129
130 const CV_PREFIX: &str = "\u{1}uni_cv:";
133
134 pub fn serialize<S: Serializer>(props: &Properties, s: S) -> Result<S::Ok, S::Error> {
135 let engine = base64::engine::general_purpose::STANDARD;
136 let encoded: HashMap<&String, String> = props
137 .iter()
138 .map(|(k, v)| {
139 let bytes = uni_common::cypher_value_codec::encode(v);
140 (k, format!("{CV_PREFIX}{}", engine.encode(bytes)))
141 })
142 .collect();
143 encoded.serialize(s)
144 }
145
146 pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Properties, D::Error> {
147 let raw: HashMap<String, serde_json::Value> = HashMap::deserialize(d)?;
148 let engine = base64::engine::general_purpose::STANDARD;
149 let mut out = Properties::with_capacity(raw.len());
150 for (k, jv) in raw {
151 let value = match &jv {
152 serde_json::Value::String(s) if s.starts_with(CV_PREFIX) => {
153 let bytes = engine
154 .decode(&s[CV_PREFIX.len()..])
155 .map_err(serde::de::Error::custom)?;
156 uni_common::cypher_value_codec::decode(&bytes)
157 .map_err(serde::de::Error::custom)?
158 }
159 _ => serde_json::from_value::<Value>(jv).map_err(serde::de::Error::custom)?,
163 };
164 out.insert(k, value);
165 }
166 Ok(out)
167 }
168}
169
/// A single graph mutation recorded in the write-ahead log.
///
/// Serialized with serde's default externally tagged enum layout, e.g.
/// `{"InsertVertex": {...}}`. The legacy-format test JSON in this file uses that
/// shape. Fields marked `#[serde(default)]` may be absent in older segments.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Mutation {
    /// Inserts edge `eid` between `src_vid` and `dst_vid`.
    InsertEdge {
        src_vid: Vid,
        dst_vid: Vid,
        edge_type: u32,
        eid: Eid,
        version: u64,
        /// Edge properties, encoded through the `cv_props` adapter.
        #[serde(with = "cv_props")]
        properties: Properties,
        /// Optional edge type name. Deserializes to `None` when the field is missing.
        #[serde(default)]
        edge_type_name: Option<String>,
    },
    /// Deletes edge `eid` between `src_vid` and `dst_vid`.
    DeleteEdge {
        eid: Eid,
        src_vid: Vid,
        dst_vid: Vid,
        edge_type: u32,
        version: u64,
    },
    /// Inserts vertex `vid` with its properties and labels.
    InsertVertex {
        vid: Vid,
        /// Vertex properties, encoded through the `cv_props` adapter.
        #[serde(with = "cv_props")]
        properties: Properties,
        /// Vertex labels. Deserializes to an empty list when the field is missing.
        #[serde(default)]
        labels: Vec<String>,
    },
    /// Deletes vertex `vid`.
    DeleteVertex {
        vid: Vid,
        /// Labels recorded with the deletion. Empty when the field is missing.
        #[serde(default)]
        labels: Vec<String>,
    },
    /// Sets the labels of vertex `vid` to `labels`.
    SetVertexLabels { vid: Vid, labels: Vec<String> },
}
210
/// One flushed WAL segment: the LSN assigned at flush time and the batch of
/// mutations that was buffered when the flush started.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WalSegment {
    /// Log sequence number of this segment. It is also encoded in the segment's filename.
    pub lsn: u64,
    /// Mutations in append order.
    pub mutations: Vec<Mutation>,
}
219
/// Borrowing counterpart of [`WalSegment`], used by `flush` to serialize a batch
/// without cloning it.
///
/// Field names and order must stay in sync with `WalSegment` so both produce
/// identical JSON. The `wal_segment_ref_serializes_identically` test checks this.
#[derive(Serialize, Debug)]
struct WalSegmentRef<'a> {
    lsn: u64,
    mutations: &'a [Mutation],
}
230
/// Segment-based write-ahead log stored in an [`ObjectStore`].
///
/// [`WriteAheadLog::append`] buffers mutations in memory, and each
/// [`WriteAheadLog::flush`] writes the whole buffer as one segment object named
/// `{lsn:020}_{uuid}.wal` under `prefix`.
pub struct WriteAheadLog {
    /// Object store that holds the segment objects.
    store: Arc<dyn ObjectStore>,
    /// Prefix under which all segments are written, listed, and deleted.
    prefix: Path,
    /// Local directory backing `store`, if any. When set, `flush` fsyncs
    /// `local_root.join(<segment path>)` and its parent directory after the write.
    /// When `None`, no explicit fsync is performed.
    local_root: Option<std::path::PathBuf>,
    /// Pending buffer and LSN counters, guarded by a mutex.
    state: Mutex<WalState>,
}
243
/// Mutable state of a [`WriteAheadLog`], always accessed under its mutex.
struct WalState {
    /// Mutations appended since the last flush took the buffer.
    buffer: Vec<Mutation>,
    /// LSN the next flushed segment will receive. It starts at 1.
    next_lsn: u64,
    /// LSN recorded by the last successful flush, or recovered by `initialize`.
    /// It starts at 0, meaning nothing has been flushed.
    flushed_lsn: u64,
}
251
252impl WriteAheadLog {
253 pub fn new(store: Arc<dyn ObjectStore>, prefix: Path) -> Self {
254 Self {
255 store,
256 prefix,
257 local_root: None,
258 state: Mutex::new(WalState {
259 buffer: Vec::new(),
260 next_lsn: 1, flushed_lsn: 0,
262 }),
263 }
264 }
265
266 #[must_use]
269 pub fn with_local_root(mut self, local_root: Option<std::path::PathBuf>) -> Self {
270 self.local_root = local_root;
271 self
272 }
273
274 pub async fn initialize(&self) -> Result<u64> {
276 let max_lsn = self.find_max_lsn().await?;
277 {
278 let mut state = acquire_mutex(&self.state, "wal_state")?;
279 state.next_lsn = max_lsn + 1;
280 state.flushed_lsn = max_lsn;
281 }
282 Ok(max_lsn)
283 }
284
285 async fn find_max_lsn(&self) -> Result<u64> {
288 let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
289 let mut max_lsn: u64 = 0;
290
291 for meta in metas {
292 if let Some(lsn) = parse_lsn_from_filename(&meta.location) {
294 max_lsn = max_lsn.max(lsn);
295 } else {
296 warn!(
298 path = %meta.location,
299 "WAL filename doesn't match expected format, downloading segment"
300 );
301 let get_result =
302 get_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
303 let bytes = get_result.bytes().await?;
304 if bytes.is_empty() {
305 continue;
306 }
307 match decode_segment(&bytes) {
311 Ok(segment) => max_lsn = max_lsn.max(segment.lsn),
312 Err(reason) => {
313 warn!(path = %meta.location, reason = %reason,
314 "Skipping corrupt WAL segment during max-LSN probe");
315 }
316 }
317 }
318 }
319
320 Ok(max_lsn)
321 }
322
323 #[instrument(skip(self, mutation), level = "trace")]
324 pub fn append(&self, mutation: Mutation) -> Result<()> {
325 let mut state = acquire_mutex(&self.state, "wal_state")?;
326 state.buffer.push(mutation);
327 metrics::counter!("uni_wal_entries_total").increment(1);
328 Ok(())
329 }
330
331 #[instrument(skip(self), fields(lsn, mutations_count, size_bytes))]
333 pub async fn flush(&self) -> Result<u64> {
334 let start = std::time::Instant::now();
335 let (batch, lsn) = {
336 let mut state = acquire_mutex(&self.state, "wal_state")?;
337 if state.buffer.is_empty() {
338 return Ok(state.flushed_lsn);
339 }
340 let lsn = state.next_lsn;
341 state.next_lsn += 1;
342 (std::mem::take(&mut state.buffer), lsn)
343 };
344
345 tracing::Span::current().record("lsn", lsn);
346 tracing::Span::current().record("mutations_count", batch.len());
347
348 let segment = WalSegmentRef {
353 lsn,
354 mutations: &batch,
355 };
356
357 let json = match serde_json::to_vec(&segment) {
359 Ok(j) => j,
360 Err(e) => {
361 warn!(lsn, error = %e, "Failed to serialize WAL segment, restoring buffer");
362 let mut state = acquire_mutex(&self.state, "wal_state")?;
364 let new_mutations = std::mem::take(&mut state.buffer);
365 state.buffer = batch;
366 state.buffer.extend(new_mutations);
367 return Err(e.into());
369 }
370 };
371 let body = encode_segment_envelope(&json);
374 tracing::Span::current().record("size_bytes", body.len());
375 metrics::counter!("uni_wal_bytes_written_total").increment(body.len() as u64);
376
377 let filename = format!("{:020}_{}.wal", lsn, Uuid::new_v4());
379 let path = self.prefix.clone().join(filename);
380
381 if let Err(e) = put_with_timeout(&self.store, &path, body.into(), DEFAULT_TIMEOUT).await {
383 warn!(
384 lsn,
385 error = %e,
386 "Failed to flush WAL segment, restoring buffer (LSN gap preserved for monotonicity)"
387 );
388 let mut state = acquire_mutex(&self.state, "wal_state")?;
390 let new_mutations = std::mem::take(&mut state.buffer);
392 state.buffer = batch;
393 state.buffer.extend(new_mutations);
394 return Err(e);
397 }
398
399 if let Some(root) = &self.local_root {
406 let file_path = root.join(path.as_ref());
407 #[cfg(test)]
408 let synced = if FAIL_NEXT_FSYNC.swap(false, std::sync::atomic::Ordering::SeqCst) {
409 Ok(Err(std::io::Error::other("injected fsync failure")))
410 } else {
411 tokio::task::spawn_blocking(move || sync_file_and_parent(&file_path)).await
412 };
413 #[cfg(not(test))]
414 let synced =
415 tokio::task::spawn_blocking(move || sync_file_and_parent(&file_path)).await;
416 let fsync_err: Option<anyhow::Error> = match synced {
417 Ok(Ok(())) => None,
418 Ok(Err(e)) => Some(e.into()),
419 Err(e) => Some(e.into()),
420 };
421 if let Some(err) = fsync_err {
422 warn!(
423 lsn,
424 error = %err,
425 "WAL segment fsync failed — deleting the non-durable segment to avoid a ghost commit on replay"
426 );
427 if let Err(del_err) = delete_with_timeout(&self.store, &path, DEFAULT_TIMEOUT).await
432 {
433 return Err(anyhow::anyhow!(
434 "WAL segment fsync failed ({err}) and the cleanup delete \
435 of segment at lsn {lsn} also failed ({del_err}); the WAL \
436 may contain a non-durable segment"
437 ));
438 }
439 return Err(err);
440 }
441 }
442
443 {
445 let mut state = acquire_mutex(&self.state, "wal_state")?;
446 state.flushed_lsn = lsn;
447 }
448
449 let duration = start.elapsed();
450 metrics::histogram!("wal_flush_latency_ms").record(duration.as_millis() as f64);
451 metrics::histogram!("uni_wal_flush_duration_seconds").record(duration.as_secs_f64());
452
453 if duration.as_millis() > 100 {
454 warn!(
455 lsn,
456 duration_ms = duration.as_millis(),
457 "Slow WAL flush detected"
458 );
459 } else {
460 debug!(
461 lsn,
462 duration_ms = duration.as_millis(),
463 "WAL flush completed"
464 );
465 }
466
467 Ok(lsn)
468 }
469
470 pub fn flushed_lsn(&self) -> Result<u64, uni_common::sync::LockPoisonedError> {
476 let guard = uni_common::sync::acquire_mutex(&self.state, "wal_state")?;
477 Ok(guard.flushed_lsn)
478 }
479
480 #[instrument(skip(self), level = "debug")]
491 pub async fn replay_since(&self, high_water_mark: u64) -> Result<Vec<Mutation>> {
492 let start = std::time::Instant::now();
493 debug!(high_water_mark, "Replaying WAL segments");
494 let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
495 let mut mutations = Vec::new();
496
497 let mut paths: Vec<_> = metas
500 .into_iter()
501 .map(|m| m.location)
502 .filter(|p| {
503 parse_lsn_from_filename(p).is_none_or(|lsn| lsn > high_water_mark)
506 })
507 .collect();
508 paths.sort();
509
510 let mut segments_replayed = 0;
511
512 for (idx, path) in paths.iter().enumerate() {
513 let get_result = get_with_timeout(&self.store, path, DEFAULT_TIMEOUT).await?;
514 let bytes = get_result.bytes().await?;
515
516 let decoded = if bytes.is_empty() {
518 Err("empty segment file".to_string())
519 } else {
520 decode_segment(&bytes)
521 };
522
523 let segment = match decoded {
524 Ok(segment) => segment,
525 Err(reason) => {
526 let is_tail = idx + 1 == paths.len();
527 if is_tail {
528 warn!(
529 path = %path,
530 reason = %reason,
531 "Corrupt tail WAL segment — torn write from a crash; \
532 treating as end of WAL (the commit was never acknowledged)"
533 );
534 break;
535 }
536 return Err(anyhow::anyhow!(
537 "corrupt WAL segment '{path}' ({reason}) with {} later segment(s) \
538 present; refusing to skip — manual inspection required",
539 paths.len() - idx - 1
540 ));
541 }
542 };
543
544 if segment.lsn > high_water_mark {
546 mutations.extend(segment.mutations);
547 segments_replayed += 1;
548 }
549 }
550
551 info!(
552 segments_replayed,
553 mutations_count = mutations.len(),
554 "WAL replay completed"
555 );
556 metrics::histogram!("uni_wal_replay_duration_seconds")
557 .record(start.elapsed().as_secs_f64());
558
559 Ok(mutations)
560 }
561
562 pub async fn replay(&self) -> Result<Vec<Mutation>> {
564 self.replay_since(0).await
565 }
566
567 #[instrument(skip(self), level = "info")]
570 pub async fn truncate_before(&self, high_water_mark: u64) -> Result<()> {
571 info!(high_water_mark, "Truncating WAL segments");
572 let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
573
574 let mut deleted_count = 0;
575 for meta in metas {
576 let should_delete = if let Some(lsn) = parse_lsn_from_filename(&meta.location) {
578 lsn <= high_water_mark
579 } else {
580 warn!(
582 path = %meta.location,
583 "WAL filename doesn't match expected format, downloading segment"
584 );
585 let get_result =
586 get_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
587 let bytes = get_result.bytes().await?;
588 if bytes.is_empty() {
589 true
591 } else {
592 match decode_segment(&bytes) {
593 Ok(segment) => segment.lsn <= high_water_mark,
594 Err(reason) => {
595 warn!(path = %meta.location, reason = %reason,
599 "Keeping corrupt WAL segment during truncation");
600 false
601 }
602 }
603 }
604 };
605
606 if should_delete {
607 delete_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
608 deleted_count += 1;
609 }
610 }
611 info!(deleted_count, "WAL truncation completed");
612 Ok(())
613 }
614
615 pub async fn has_segments(&self) -> Result<bool> {
617 let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
618 Ok(!metas.is_empty())
619 }
620
621 pub async fn truncate(&self) -> Result<()> {
622 info!("Truncating all WAL segments");
623 let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
624
625 let mut deleted_count = 0;
626 for meta in metas {
627 delete_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
628 deleted_count += 1;
629 }
630 info!(deleted_count, "Full WAL truncation completed");
631 Ok(())
632 }
633}
634
#[cfg(test)]
mod tests {
    //! Unit tests for the WAL. Each test uses a fresh `LocalFileSystem` store
    //! rooted in a temporary directory, with segments under the `wal` prefix.

    use super::*;
    use object_store::ObjectStoreExt;
    use object_store::local::LocalFileSystem;
    use std::collections::HashMap;
    use tempfile::tempdir;

    /// Append, flush, and replay round-trips one vertex; `truncate` empties the WAL.
    #[tokio::test]
    async fn test_wal_append_replay() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let prefix = Path::from("wal");

        let wal = WriteAheadLog::new(store, prefix);

        let mutation = Mutation::InsertVertex {
            vid: Vid::new(1),
            properties: HashMap::new(),
            labels: vec![],
        };

        wal.append(mutation)?;
        wal.flush().await?;

        let mutations = wal.replay().await?;
        assert_eq!(mutations.len(), 1);
        if let Mutation::InsertVertex { vid, .. } = &mutations[0] {
            assert_eq!(vid.as_u64(), Vid::new(1).as_u64());
        } else {
            panic!("Wrong mutation type");
        }

        wal.truncate().await?;
        let mutations2 = wal.replay().await?;
        assert_eq!(mutations2.len(), 0);

        Ok(())
    }

    /// Successive flushes return strictly increasing, consecutive LSNs.
    #[tokio::test]
    async fn test_lsn_monotonicity() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let prefix = Path::from("wal");

        let wal = WriteAheadLog::new(store, prefix);

        let mutation1 = Mutation::InsertVertex {
            vid: Vid::new(1),
            properties: HashMap::new(),
            labels: vec![],
        };
        let mutation2 = Mutation::InsertVertex {
            vid: Vid::new(2),
            properties: HashMap::new(),
            labels: vec![],
        };
        let mutation3 = Mutation::InsertVertex {
            vid: Vid::new(3),
            properties: HashMap::new(),
            labels: vec![],
        };

        wal.append(mutation1)?;
        let lsn1 = wal.flush().await?;

        wal.append(mutation2)?;
        let lsn2 = wal.flush().await?;

        wal.append(mutation3)?;
        let lsn3 = wal.flush().await?;

        assert!(lsn2 > lsn1, "LSN2 ({}) should be > LSN1 ({})", lsn2, lsn1);
        assert!(lsn3 > lsn2, "LSN3 ({}) should be > LSN2 ({})", lsn3, lsn2);

        assert_eq!(lsn2, lsn1 + 1);
        assert_eq!(lsn3, lsn2 + 1);

        Ok(())
    }

    /// An injected fsync failure makes `flush` fail, and the written segment is
    /// removed so replay cannot see it.
    #[tokio::test]
    async fn fsync_failure_deletes_segment_no_ghost_commit() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let prefix = Path::from("wal");
        // The local root matches the store's root so `flush` attempts the fsync.
        let wal = WriteAheadLog::new(store, prefix).with_local_root(Some(dir.path().to_path_buf()));

        wal.append(Mutation::InsertVertex {
            vid: Vid::new(1),
            properties: HashMap::new(),
            labels: vec![],
        })?;

        FAIL_NEXT_FSYNC.store(true, std::sync::atomic::Ordering::SeqCst);
        let result = wal.flush().await;
        assert!(
            result.is_err(),
            "flush must report failure when the segment fsync fails"
        );

        let replayed = wal.replay().await?;
        assert!(
            replayed.is_empty(),
            "a segment whose fsync failed must not be replayable (ghost commit); got {} mutations",
            replayed.len()
        );
        Ok(())
    }

    /// Filename LSN parsing: canonical names, short names, non-numeric prefixes,
    /// a bare 20-digit stem, and the empty path.
    #[test]
    fn test_parse_lsn_from_filename() {
        let path = Path::from("00000000000000000042_a1b2c3d4.wal");
        assert_eq!(parse_lsn_from_filename(&path), Some(42));

        let path = Path::from("00000000000000001234_e5f6a7b8.wal");
        assert_eq!(parse_lsn_from_filename(&path), Some(1234));

        let path = Path::from("00000000000000000001_xyz.wal");
        assert_eq!(parse_lsn_from_filename(&path), Some(1));

        let path = Path::from("12345678901234567890_uuid.wal");
        assert_eq!(parse_lsn_from_filename(&path), Some(12345678901234567890));

        let path = Path::from("invalid.wal");
        assert_eq!(parse_lsn_from_filename(&path), None);

        let path = Path::from("123.wal");
        assert_eq!(parse_lsn_from_filename(&path), None);

        let path = Path::from("abcdefghijklmnopqrst_uuid.wal");
        assert_eq!(parse_lsn_from_filename(&path), None);

        let path = Path::from("00000000000000000100.wal");
        assert_eq!(parse_lsn_from_filename(&path), Some(100));

        let path = Path::from("");
        assert_eq!(parse_lsn_from_filename(&path), None);
    }

    /// A multi-byte character straddling the 20-byte cut returns `None`
    /// instead of panicking on a non-char-boundary slice.
    #[test]
    fn test_parse_lsn_from_filename_multibyte_no_panic() {
        let name = format!("{}{}.wal", "0".repeat(19), "é");
        let path = Path::parse(name).expect("multi-byte segment is a valid object_store path");
        assert_eq!(parse_lsn_from_filename(&path), None);
    }

    /// `find_max_lsn` finds LSN 100 across 100 canonically named segments.
    /// NOTE(review): the wall-clock bound may be flaky on a loaded CI machine.
    #[tokio::test]
    async fn test_find_max_lsn_scalability() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let prefix = Path::from("wal");

        let wal = WriteAheadLog::new(store, prefix);

        for i in 1..=100 {
            let mutation = Mutation::InsertVertex {
                vid: Vid::new(i),
                properties: HashMap::new(),
                labels: vec![],
            };
            wal.append(mutation)?;
            wal.flush().await?;
        }

        let start = std::time::Instant::now();
        let max_lsn = wal.find_max_lsn().await?;
        let duration = start.elapsed();

        assert_eq!(max_lsn, 100, "Max LSN should be 100");

        assert!(
            duration.as_millis() < 1000,
            "find_max_lsn took {}ms, expected < 1000ms (filename parsing should be fast)",
            duration.as_millis()
        );

        Ok(())
    }

    /// Mutations buffered across several appends are flushed together under the
    /// next LSN, and all are replayed.
    /// NOTE(review): despite the name, no flush failure is injected. Vids 3 and 4
    /// are simply flushed in one segment (LSN 3).
    #[tokio::test]
    async fn test_lsn_gaps_preserved_on_flush_failure() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let prefix = Path::from("wal");

        let wal = WriteAheadLog::new(store.clone(), prefix.clone());

        wal.append(Mutation::InsertVertex {
            vid: Vid::new(1),
            properties: HashMap::new(),
            labels: vec![],
        })?;
        let lsn1 = wal.flush().await?;
        assert_eq!(lsn1, 1);

        wal.append(Mutation::InsertVertex {
            vid: Vid::new(2),
            properties: HashMap::new(),
            labels: vec![],
        })?;
        let lsn2 = wal.flush().await?;
        assert_eq!(lsn2, 2);

        wal.append(Mutation::InsertVertex {
            vid: Vid::new(3),
            properties: HashMap::new(),
            labels: vec![],
        })?;

        wal.append(Mutation::InsertVertex {
            vid: Vid::new(4),
            properties: HashMap::new(),
            labels: vec![],
        })?;
        let lsn4 = wal.flush().await?;

        assert_eq!(lsn4, 3, "LSN should increment monotonically");

        let mutations = wal.replay().await?;
        assert_eq!(mutations.len(), 4, "All 4 mutations should be replayed");

        Ok(())
    }

    /// Over 50 flushes no LSN repeats, and flush `i` returns LSN `i`.
    #[tokio::test]
    async fn test_lsn_watermark_no_reuse() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let prefix = Path::from("wal");

        let wal = WriteAheadLog::new(store, prefix);

        let mut seen_lsns = std::collections::HashSet::new();

        for i in 1..=50 {
            wal.append(Mutation::InsertVertex {
                vid: Vid::new(i),
                properties: HashMap::new(),
                labels: vec![],
            })?;
            let lsn = wal.flush().await?;

            assert!(
                !seen_lsns.contains(&lsn),
                "LSN {} was reused! This violates monotonicity.",
                lsn
            );
            seen_lsns.insert(lsn);

            assert_eq!(lsn, i, "LSN should be {}, got {}", i, lsn);
        }

        Ok(())
    }

    /// `truncate_before(50)` removes LSNs 1..=50 and leaves 51..=100 replayable.
    /// NOTE(review): the wall-clock bound may be flaky on a loaded CI machine.
    #[tokio::test]
    async fn test_truncate_scalability() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let prefix = Path::from("wal");

        let wal = WriteAheadLog::new(store, prefix);

        for i in 1..=100 {
            let mutation = Mutation::InsertVertex {
                vid: Vid::new(i),
                properties: HashMap::new(),
                labels: vec![],
            };
            wal.append(mutation)?;
            wal.flush().await?;
        }

        let start = std::time::Instant::now();
        wal.truncate_before(50).await?;
        let duration = start.elapsed();

        let mutations = wal.replay().await?;
        assert_eq!(
            mutations.len(),
            50,
            "Should have 50 mutations remaining (51-100)"
        );

        assert!(
            duration.as_millis() < 1000,
            "truncate_before took {}ms, expected < 1000ms (filename parsing should be fast)",
            duration.as_millis()
        );

        Ok(())
    }

    /// `replay_since(90)` returns only the mutations of LSNs 91..=100.
    /// NOTE(review): the wall-clock bound may be flaky on a loaded CI machine.
    #[tokio::test]
    async fn test_replay_since_skips_old_segments() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let prefix = Path::from("wal");

        let wal = WriteAheadLog::new(store, prefix);

        for i in 1..=100 {
            let mutation = Mutation::InsertVertex {
                vid: Vid::new(i),
                properties: HashMap::new(),
                labels: vec![],
            };
            wal.append(mutation)?;
            wal.flush().await?;
        }

        let start = std::time::Instant::now();
        let mutations = wal.replay_since(90).await?;
        let duration = start.elapsed();

        assert_eq!(mutations.len(), 10, "Should replay only LSNs 91-100");

        assert!(
            duration.as_millis() < 500,
            "replay_since took {}ms, expected < 500ms (should skip by filename)",
            duration.as_millis()
        );

        Ok(())
    }

    /// `InsertVertex` labels survive a flush/replay round-trip.
    #[tokio::test]
    async fn test_wal_replay_preserves_vertex_labels() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let prefix = Path::from("wal");

        let wal = Arc::new(WriteAheadLog::new(store, prefix));

        wal.append(Mutation::InsertVertex {
            vid: Vid::new(42),
            properties: {
                let mut props = HashMap::new();
                props.insert(
                    "name".to_string(),
                    uni_common::Value::String("Alice".to_string()),
                );
                props
            },
            labels: vec!["Person".to_string(), "User".to_string()],
        })?;

        wal.flush().await?;

        let mutations = wal.replay().await?;
        assert_eq!(mutations.len(), 1);

        if let Mutation::InsertVertex { vid, labels, .. } = &mutations[0] {
            assert_eq!(vid.as_u64(), 42);
            assert_eq!(labels.len(), 2);
            assert!(labels.contains(&"Person".to_string()));
            assert!(labels.contains(&"User".to_string()));
        } else {
            panic!("Expected InsertVertex mutation");
        }

        Ok(())
    }

    /// `DeleteVertex` labels survive a flush/replay round-trip.
    #[tokio::test]
    async fn test_wal_replay_preserves_delete_vertex_labels() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let prefix = Path::from("wal");

        let wal = Arc::new(WriteAheadLog::new(store, prefix));

        wal.append(Mutation::DeleteVertex {
            vid: Vid::new(99),
            labels: vec!["Person".to_string(), "Admin".to_string()],
        })?;

        wal.flush().await?;

        let mutations = wal.replay().await?;
        assert_eq!(mutations.len(), 1);

        if let Mutation::DeleteVertex { vid, labels } = &mutations[0] {
            assert_eq!(vid.as_u64(), 99);
            assert_eq!(labels.len(), 2);
            assert!(labels.contains(&"Person".to_string()));
            assert!(labels.contains(&"Admin".to_string()));
        } else {
            panic!("Expected DeleteVertex mutation");
        }

        Ok(())
    }

    /// `InsertEdge.edge_type_name` survives a flush/replay round-trip.
    #[tokio::test]
    async fn test_wal_replay_preserves_edge_type_name() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let prefix = Path::from("wal");

        let wal = Arc::new(WriteAheadLog::new(store, prefix));

        wal.append(Mutation::InsertEdge {
            src_vid: Vid::new(1),
            dst_vid: Vid::new(2),
            edge_type: 100,
            eid: Eid::new(500),
            version: 1,
            properties: {
                let mut props = HashMap::new();
                props.insert("since".to_string(), uni_common::Value::Int(2020));
                props
            },
            edge_type_name: Some("KNOWS".to_string()),
        })?;

        wal.flush().await?;

        let mutations = wal.replay().await?;
        assert_eq!(mutations.len(), 1);

        if let Mutation::InsertEdge {
            eid,
            edge_type_name,
            ..
        } = &mutations[0]
        {
            assert_eq!(eid.as_u64(), 500);
            assert_eq!(edge_type_name.as_deref(), Some("KNOWS"));
        } else {
            panic!("Expected InsertEdge mutation");
        }

        Ok(())
    }

    /// A legacy segment (bare JSON, no v2 envelope, no `labels` field) replays,
    /// and the missing labels default to empty.
    #[tokio::test]
    async fn test_wal_backward_compatibility_labels() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let prefix = Path::from("wal");

        let old_format_json = r#"{
            "lsn": 1,
            "mutations": [
                {
                    "InsertVertex": {
                        "vid": 123,
                        "properties": {}
                    }
                }
            ]
        }"#;

        let path = prefix.clone().join("00000000000000000001_test.wal");
        store.put(&path, old_format_json.into()).await?;

        let wal = WriteAheadLog::new(store, prefix);
        let mutations = wal.replay().await?;

        assert_eq!(mutations.len(), 1);
        if let Mutation::InsertVertex { vid, labels, .. } = &mutations[0] {
            assert_eq!(vid.as_u64(), 123);
            assert_eq!(
                labels.len(),
                0,
                "Old format should deserialize with empty labels"
            );
        } else {
            panic!("Expected InsertVertex mutation");
        }

        Ok(())
    }

    /// The borrowed `WalSegmentRef` used by `flush` emits the same JSON bytes as
    /// the owned `WalSegment` that `decode_segment` parses.
    #[test]
    fn wal_segment_ref_serializes_identically() {
        let mut props = HashMap::new();
        props.insert("p".to_string(), uni_common::Value::Int(7));
        let mutations = vec![
            Mutation::InsertVertex {
                vid: Vid::new(1),
                properties: props,
                labels: vec!["L".to_string()],
            },
            Mutation::DeleteEdge {
                eid: Eid::new(2),
                src_vid: Vid::new(1),
                dst_vid: Vid::new(3),
                edge_type: 4,
                version: 5,
            },
        ];
        let owned = WalSegment {
            lsn: 42,
            mutations: mutations.clone(),
        };
        let borrowed = WalSegmentRef {
            lsn: 42,
            mutations: &mutations,
        };
        assert_eq!(
            serde_json::to_vec(&owned).unwrap(),
            serde_json::to_vec(&borrowed).unwrap()
        );
    }
}