1use crate::store_utils::{
5 DEFAULT_TIMEOUT, delete_with_timeout, get_with_timeout, list_with_timeout, put_with_timeout,
6};
7use anyhow::Result;
8use metrics;
9use object_store::ObjectStore;
10use object_store::path::Path;
11use serde::{Deserialize, Serialize};
12use std::sync::{Arc, Mutex};
13use tracing::{debug, info, instrument, warn};
14use uni_common::Properties;
15use uni_common::core::id::{Eid, Vid};
16use uni_common::sync::acquire_mutex;
17use uuid::Uuid;
18
19fn parse_lsn_from_filename(path: &Path) -> Option<u64> {
22 let filename = path.filename()?;
23 if filename.len() < 20 {
24 return None;
25 }
26 filename.get(..20).and_then(|s| s.parse::<u64>().ok())
29}
30
/// Magic prefix that opens every checksummed (v2) WAL segment. `decode_segment` treats
/// input that does not start with these bytes as a legacy, plain-JSON segment.
const WAL_V2_MAGIC: &[u8] = b"UNIWAL2\n";

/// Number of bytes taken by the hex-encoded BLAKE3 checksum that directly follows the
/// magic in a v2 segment header.
const WAL_V2_HASH_HEX_LEN: usize = 64;
40
41fn encode_segment_envelope(payload_json: &[u8]) -> Vec<u8> {
43 let hash = blake3::hash(payload_json);
44 let mut out =
45 Vec::with_capacity(WAL_V2_MAGIC.len() + WAL_V2_HASH_HEX_LEN + 1 + payload_json.len());
46 out.extend_from_slice(WAL_V2_MAGIC);
47 out.extend_from_slice(hash.to_hex().as_bytes());
48 out.push(b'\n');
49 out.extend_from_slice(payload_json);
50 out
51}
52
53#[doc(hidden)]
63pub fn decode_segment(bytes: &[u8]) -> std::result::Result<WalSegment, String> {
64 if let Some(rest) = bytes.strip_prefix(WAL_V2_MAGIC) {
65 if rest.len() < WAL_V2_HASH_HEX_LEN + 1 || rest[WAL_V2_HASH_HEX_LEN] != b'\n' {
66 return Err("truncated v2 segment header".to_string());
67 }
68 let (hex, payload_nl) = rest.split_at(WAL_V2_HASH_HEX_LEN);
69 let payload = &payload_nl[1..];
70 let expected =
71 std::str::from_utf8(hex).map_err(|_| "non-utf8 checksum header".to_string())?;
72 let actual = blake3::hash(payload);
73 if actual.to_hex().as_str() != expected {
74 return Err(format!(
75 "checksum mismatch (expected {expected}, computed {})",
76 actual.to_hex()
77 ));
78 }
79 serde_json::from_slice(payload).map_err(|e| format!("v2 payload parse: {e}"))
80 } else {
81 serde_json::from_slice(bytes).map_err(|e| format!("legacy segment parse: {e}"))
83 }
84}
85
/// Flushes `path` to stable storage and, on Unix, also its containing directory.
///
/// The file is opened read-only and `sync_all` is called on it. On Unix the parent
/// directory is synced the same way so that the directory entry for the file is
/// persisted as well.
///
/// # Errors
///
/// Returns the underlying I/O error if the file or directory cannot be opened or synced.
fn sync_file_and_parent(path: &std::path::Path) -> std::io::Result<()> {
    std::fs::File::open(path)?.sync_all()?;
    #[cfg(unix)]
    if let Some(parent) = path.parent() {
        // `Path::parent` returns an empty path for a bare relative filename such as
        // `segment.wal`; opening "" fails, so sync the current directory instead.
        let dir = if parent.as_os_str().is_empty() {
            std::path::Path::new(".")
        } else {
            parent
        };
        std::fs::File::open(dir)?.sync_all()?;
    }
    Ok(())
}
98
/// A single logged change, buffered by `WriteAheadLog::append` and persisted inside a
/// `WalSegment`.
///
/// Fields marked `#[serde(default)]` may be missing from the serialized form; they then
/// deserialize to their `Default` value (`None` or an empty `Vec`), which keeps segments
/// written without those fields readable.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Mutation {
    /// Records an edge insertion.
    InsertEdge {
        src_vid: Vid,
        dst_vid: Vid,
        edge_type: u32,
        eid: Eid,
        version: u64,
        properties: Properties,
        /// Name of the edge type, when recorded; `None` if absent from the segment.
        #[serde(default)]
        edge_type_name: Option<String>,
    },
    /// Records an edge deletion.
    DeleteEdge {
        eid: Eid,
        src_vid: Vid,
        dst_vid: Vid,
        edge_type: u32,
        version: u64,
    },
    /// Records a vertex insertion.
    InsertVertex {
        vid: Vid,
        properties: Properties,
        /// Labels of the vertex; empty if absent from the segment.
        #[serde(default)]
        labels: Vec<String>,
    },
    /// Records a vertex deletion.
    DeleteVertex {
        vid: Vid,
        /// Labels of the vertex; empty if absent from the segment.
        #[serde(default)]
        labels: Vec<String>,
    },
    /// Records a label assignment for a vertex; `labels` is required in the serialized form.
    SetVertexLabels { vid: Vid, labels: Vec<String> },
}
137
/// Owned form of one WAL segment, as produced by `decode_segment`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WalSegment {
    /// Log sequence number assigned to this segment when it was flushed.
    pub lsn: u64,
    /// Mutations contained in the segment, in the order they were appended.
    pub mutations: Vec<Mutation>,
}
146
/// Borrowing, serialize-only counterpart of `WalSegment`.
///
/// `WriteAheadLog::flush` serializes through this type so the batch does not have to be
/// moved into a `WalSegment`. Field names and order mirror `WalSegment`, so both produce
/// the same JSON (checked by the `wal_segment_ref_serializes_identically` test).
#[derive(Serialize, Debug)]
struct WalSegmentRef<'a> {
    lsn: u64,
    mutations: &'a [Mutation],
}
157
/// Write-ahead log that buffers mutations in memory and persists them as numbered
/// segment objects under a prefix in an object store.
pub struct WriteAheadLog {
    /// Object store that holds the segment files.
    store: Arc<dyn ObjectStore>,
    /// Path prefix under which all segments of this log are listed and written.
    prefix: Path,
    /// Optional local filesystem root. When set, `flush` fsyncs the file at
    /// `local_root.join(<segment path>)` (and, on Unix, its parent directory) after
    /// writing the segment.
    local_root: Option<std::path::PathBuf>,
    /// Pending buffer and LSN counters, guarded by a mutex.
    state: Mutex<WalState>,
}
170
/// Mutable state of a `WriteAheadLog`, kept behind its mutex.
struct WalState {
    /// Mutations appended since the last flush took the buffer.
    buffer: Vec<Mutation>,
    /// LSN that the next flush will assign to its segment.
    next_lsn: u64,
    /// LSN recorded by the most recent flush that completed successfully (or the value
    /// recovered by `initialize`); `0` for a fresh log.
    flushed_lsn: u64,
}
178
179impl WriteAheadLog {
180 pub fn new(store: Arc<dyn ObjectStore>, prefix: Path) -> Self {
181 Self {
182 store,
183 prefix,
184 local_root: None,
185 state: Mutex::new(WalState {
186 buffer: Vec::new(),
187 next_lsn: 1, flushed_lsn: 0,
189 }),
190 }
191 }
192
193 #[must_use]
196 pub fn with_local_root(mut self, local_root: Option<std::path::PathBuf>) -> Self {
197 self.local_root = local_root;
198 self
199 }
200
201 pub async fn initialize(&self) -> Result<u64> {
203 let max_lsn = self.find_max_lsn().await?;
204 {
205 let mut state = acquire_mutex(&self.state, "wal_state")?;
206 state.next_lsn = max_lsn + 1;
207 state.flushed_lsn = max_lsn;
208 }
209 Ok(max_lsn)
210 }
211
212 async fn find_max_lsn(&self) -> Result<u64> {
215 let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
216 let mut max_lsn: u64 = 0;
217
218 for meta in metas {
219 if let Some(lsn) = parse_lsn_from_filename(&meta.location) {
221 max_lsn = max_lsn.max(lsn);
222 } else {
223 warn!(
225 path = %meta.location,
226 "WAL filename doesn't match expected format, downloading segment"
227 );
228 let get_result =
229 get_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
230 let bytes = get_result.bytes().await?;
231 if bytes.is_empty() {
232 continue;
233 }
234 match decode_segment(&bytes) {
238 Ok(segment) => max_lsn = max_lsn.max(segment.lsn),
239 Err(reason) => {
240 warn!(path = %meta.location, reason = %reason,
241 "Skipping corrupt WAL segment during max-LSN probe");
242 }
243 }
244 }
245 }
246
247 Ok(max_lsn)
248 }
249
250 #[instrument(skip(self, mutation), level = "trace")]
251 pub fn append(&self, mutation: Mutation) -> Result<()> {
252 let mut state = acquire_mutex(&self.state, "wal_state")?;
253 state.buffer.push(mutation);
254 metrics::counter!("uni_wal_entries_total").increment(1);
255 Ok(())
256 }
257
258 #[instrument(skip(self), fields(lsn, mutations_count, size_bytes))]
260 pub async fn flush(&self) -> Result<u64> {
261 let start = std::time::Instant::now();
262 let (batch, lsn) = {
263 let mut state = acquire_mutex(&self.state, "wal_state")?;
264 if state.buffer.is_empty() {
265 return Ok(state.flushed_lsn);
266 }
267 let lsn = state.next_lsn;
268 state.next_lsn += 1;
269 (std::mem::take(&mut state.buffer), lsn)
270 };
271
272 tracing::Span::current().record("lsn", lsn);
273 tracing::Span::current().record("mutations_count", batch.len());
274
275 let segment = WalSegmentRef {
280 lsn,
281 mutations: &batch,
282 };
283
284 let json = match serde_json::to_vec(&segment) {
286 Ok(j) => j,
287 Err(e) => {
288 warn!(lsn, error = %e, "Failed to serialize WAL segment, restoring buffer");
289 let mut state = acquire_mutex(&self.state, "wal_state")?;
291 let new_mutations = std::mem::take(&mut state.buffer);
292 state.buffer = batch;
293 state.buffer.extend(new_mutations);
294 return Err(e.into());
296 }
297 };
298 let body = encode_segment_envelope(&json);
301 tracing::Span::current().record("size_bytes", body.len());
302 metrics::counter!("uni_wal_bytes_written_total").increment(body.len() as u64);
303
304 let filename = format!("{:020}_{}.wal", lsn, Uuid::new_v4());
306 let path = self.prefix.clone().join(filename);
307
308 if let Err(e) = put_with_timeout(&self.store, &path, body.into(), DEFAULT_TIMEOUT).await {
310 warn!(
311 lsn,
312 error = %e,
313 "Failed to flush WAL segment, restoring buffer (LSN gap preserved for monotonicity)"
314 );
315 let mut state = acquire_mutex(&self.state, "wal_state")?;
317 let new_mutations = std::mem::take(&mut state.buffer);
319 state.buffer = batch;
320 state.buffer.extend(new_mutations);
321 return Err(e);
324 }
325
326 if let Some(root) = &self.local_root {
332 let file_path = root.join(path.as_ref());
333 let synced =
334 tokio::task::spawn_blocking(move || sync_file_and_parent(&file_path)).await;
335 match synced {
336 Ok(Ok(())) => {}
337 Ok(Err(e)) => {
338 warn!(lsn, error = %e, "WAL segment fsync failed — durability not guaranteed");
339 return Err(e.into());
340 }
341 Err(e) => {
342 warn!(lsn, error = %e, "WAL fsync task failed");
343 return Err(e.into());
344 }
345 }
346 }
347
348 {
350 let mut state = acquire_mutex(&self.state, "wal_state")?;
351 state.flushed_lsn = lsn;
352 }
353
354 let duration = start.elapsed();
355 metrics::histogram!("wal_flush_latency_ms").record(duration.as_millis() as f64);
356 metrics::histogram!("uni_wal_flush_duration_seconds").record(duration.as_secs_f64());
357
358 if duration.as_millis() > 100 {
359 warn!(
360 lsn,
361 duration_ms = duration.as_millis(),
362 "Slow WAL flush detected"
363 );
364 } else {
365 debug!(
366 lsn,
367 duration_ms = duration.as_millis(),
368 "WAL flush completed"
369 );
370 }
371
372 Ok(lsn)
373 }
374
375 pub fn flushed_lsn(&self) -> Result<u64, uni_common::sync::LockPoisonedError> {
381 let guard = uni_common::sync::acquire_mutex(&self.state, "wal_state")?;
382 Ok(guard.flushed_lsn)
383 }
384
385 #[instrument(skip(self), level = "debug")]
396 pub async fn replay_since(&self, high_water_mark: u64) -> Result<Vec<Mutation>> {
397 let start = std::time::Instant::now();
398 debug!(high_water_mark, "Replaying WAL segments");
399 let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
400 let mut mutations = Vec::new();
401
402 let mut paths: Vec<_> = metas
405 .into_iter()
406 .map(|m| m.location)
407 .filter(|p| {
408 parse_lsn_from_filename(p).is_none_or(|lsn| lsn > high_water_mark)
411 })
412 .collect();
413 paths.sort();
414
415 let mut segments_replayed = 0;
416
417 for (idx, path) in paths.iter().enumerate() {
418 let get_result = get_with_timeout(&self.store, path, DEFAULT_TIMEOUT).await?;
419 let bytes = get_result.bytes().await?;
420
421 let decoded = if bytes.is_empty() {
423 Err("empty segment file".to_string())
424 } else {
425 decode_segment(&bytes)
426 };
427
428 let segment = match decoded {
429 Ok(segment) => segment,
430 Err(reason) => {
431 let is_tail = idx + 1 == paths.len();
432 if is_tail {
433 warn!(
434 path = %path,
435 reason = %reason,
436 "Corrupt tail WAL segment — torn write from a crash; \
437 treating as end of WAL (the commit was never acknowledged)"
438 );
439 break;
440 }
441 return Err(anyhow::anyhow!(
442 "corrupt WAL segment '{path}' ({reason}) with {} later segment(s) \
443 present; refusing to skip — manual inspection required",
444 paths.len() - idx - 1
445 ));
446 }
447 };
448
449 if segment.lsn > high_water_mark {
451 mutations.extend(segment.mutations);
452 segments_replayed += 1;
453 }
454 }
455
456 info!(
457 segments_replayed,
458 mutations_count = mutations.len(),
459 "WAL replay completed"
460 );
461 metrics::histogram!("uni_wal_replay_duration_seconds")
462 .record(start.elapsed().as_secs_f64());
463
464 Ok(mutations)
465 }
466
/// Replays the log from the beginning; equivalent to `replay_since(0)`, which returns
/// the mutations of every segment with an LSN greater than zero.
pub async fn replay(&self) -> Result<Vec<Mutation>> {
    self.replay_since(0).await
}
471
472 #[instrument(skip(self), level = "info")]
475 pub async fn truncate_before(&self, high_water_mark: u64) -> Result<()> {
476 info!(high_water_mark, "Truncating WAL segments");
477 let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
478
479 let mut deleted_count = 0;
480 for meta in metas {
481 let should_delete = if let Some(lsn) = parse_lsn_from_filename(&meta.location) {
483 lsn <= high_water_mark
484 } else {
485 warn!(
487 path = %meta.location,
488 "WAL filename doesn't match expected format, downloading segment"
489 );
490 let get_result =
491 get_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
492 let bytes = get_result.bytes().await?;
493 if bytes.is_empty() {
494 true
496 } else {
497 match decode_segment(&bytes) {
498 Ok(segment) => segment.lsn <= high_water_mark,
499 Err(reason) => {
500 warn!(path = %meta.location, reason = %reason,
504 "Keeping corrupt WAL segment during truncation");
505 false
506 }
507 }
508 }
509 };
510
511 if should_delete {
512 delete_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
513 deleted_count += 1;
514 }
515 }
516 info!(deleted_count, "WAL truncation completed");
517 Ok(())
518 }
519
520 pub async fn has_segments(&self) -> Result<bool> {
522 let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
523 Ok(!metas.is_empty())
524 }
525
526 pub async fn truncate(&self) -> Result<()> {
527 info!("Truncating all WAL segments");
528 let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
529
530 let mut deleted_count = 0;
531 for meta in metas {
532 delete_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
533 deleted_count += 1;
534 }
535 info!(deleted_count, "Full WAL truncation completed");
536 Ok(())
537 }
538}
539
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::ObjectStoreExt;
    use object_store::local::LocalFileSystem;
    use std::collections::HashMap;
    use tempfile::{TempDir, tempdir};

    /// Local-filesystem store rooted in a fresh temp directory, plus the `wal` prefix
    /// every test uses.
    struct Fixture {
        store: Arc<LocalFileSystem>,
        prefix: Path,
        // Declared last so the directory is removed only after the store is dropped.
        _dir: TempDir,
    }

    impl Fixture {
        fn new() -> Result<Self> {
            let dir = tempdir()?;
            let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
            Ok(Self {
                store,
                prefix: Path::from("wal"),
                _dir: dir,
            })
        }

        /// Builds a fresh, uninitialized log over this fixture's store and prefix.
        fn wal(&self) -> WriteAheadLog {
            WriteAheadLog::new(self.store.clone(), self.prefix.clone())
        }
    }

    /// `InsertVertex` with no properties and no labels.
    fn insert_vertex(id: u64) -> Mutation {
        Mutation::InsertVertex {
            vid: Vid::new(id),
            properties: HashMap::new(),
            labels: vec![],
        }
    }

    /// Appends and flushes one bare vertex per id, producing one segment per id.
    async fn flush_one_vertex_per_segment(
        wal: &WriteAheadLog,
        ids: std::ops::RangeInclusive<u64>,
    ) -> Result<()> {
        for id in ids {
            wal.append(insert_vertex(id))?;
            wal.flush().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_wal_append_replay() -> Result<()> {
        let fx = Fixture::new()?;
        let wal = fx.wal();

        wal.append(insert_vertex(1))?;
        wal.flush().await?;

        let replayed = wal.replay().await?;
        assert_eq!(replayed.len(), 1);
        match &replayed[0] {
            Mutation::InsertVertex { vid, .. } => {
                assert_eq!(vid.as_u64(), Vid::new(1).as_u64());
            }
            _ => panic!("Wrong mutation type"),
        }

        wal.truncate().await?;
        let after_truncate = wal.replay().await?;
        assert_eq!(after_truncate.len(), 0);

        Ok(())
    }

    #[tokio::test]
    async fn test_lsn_monotonicity() -> Result<()> {
        let fx = Fixture::new()?;
        let wal = fx.wal();

        let mut lsns = Vec::with_capacity(3);
        for id in 1..=3 {
            wal.append(insert_vertex(id))?;
            lsns.push(wal.flush().await?);
        }
        let (lsn1, lsn2, lsn3) = (lsns[0], lsns[1], lsns[2]);

        assert!(lsn2 > lsn1, "LSN2 ({}) should be > LSN1 ({})", lsn2, lsn1);
        assert!(lsn3 > lsn2, "LSN3 ({}) should be > LSN2 ({})", lsn3, lsn2);

        // Without failures the sequence is dense.
        assert_eq!(lsn2, lsn1 + 1);
        assert_eq!(lsn3, lsn2 + 1);

        Ok(())
    }

    #[test]
    fn test_parse_lsn_from_filename() {
        let cases: [(&str, Option<u64>); 9] = [
            ("00000000000000000042_a1b2c3d4.wal", Some(42)),
            ("00000000000000001234_e5f6a7b8.wal", Some(1234)),
            ("00000000000000000001_xyz.wal", Some(1)),
            ("12345678901234567890_uuid.wal", Some(12345678901234567890)),
            // Too short to hold a 20-character prefix.
            ("invalid.wal", None),
            ("123.wal", None),
            // Long enough, but the prefix is not numeric.
            ("abcdefghijklmnopqrst_uuid.wal", None),
            // No UUID suffix: the 20-digit prefix alone is still accepted.
            ("00000000000000000100.wal", Some(100)),
            ("", None),
        ];

        for (name, expected) in cases {
            assert_eq!(parse_lsn_from_filename(&Path::from(name)), expected);
        }
    }

    #[test]
    fn test_parse_lsn_from_filename_multibyte_no_panic() {
        // 19 ASCII digits followed by a two-byte character: byte offset 20 falls inside
        // that character, so slicing must yield `None` rather than panic.
        let name = format!("{}{}.wal", "0".repeat(19), "é");
        let path = Path::parse(name).expect("multi-byte segment is a valid object_store path");
        assert_eq!(parse_lsn_from_filename(&path), None);
    }

    #[tokio::test]
    async fn test_find_max_lsn_scalability() -> Result<()> {
        let fx = Fixture::new()?;
        let wal = fx.wal();
        flush_one_vertex_per_segment(&wal, 1..=100).await?;

        let start = std::time::Instant::now();
        let max_lsn = wal.find_max_lsn().await?;
        let duration = start.elapsed();

        assert_eq!(max_lsn, 100, "Max LSN should be 100");

        assert!(
            duration.as_millis() < 1000,
            "find_max_lsn took {}ms, expected < 1000ms (filename parsing should be fast)",
            duration.as_millis()
        );

        Ok(())
    }

    // NOTE(review): despite its name, this test never injects a flush failure. It checks
    // that two mutations buffered together share one segment (LSN 3) and that all four
    // mutations are replayed.
    #[tokio::test]
    async fn test_lsn_gaps_preserved_on_flush_failure() -> Result<()> {
        let fx = Fixture::new()?;
        let wal = fx.wal();

        wal.append(insert_vertex(1))?;
        let lsn1 = wal.flush().await?;
        assert_eq!(lsn1, 1);

        wal.append(insert_vertex(2))?;
        let lsn2 = wal.flush().await?;
        assert_eq!(lsn2, 2);

        // Two appends, one flush: both mutations land in the same segment.
        wal.append(insert_vertex(3))?;
        wal.append(insert_vertex(4))?;
        let lsn4 = wal.flush().await?;

        assert_eq!(lsn4, 3, "LSN should increment monotonically");

        let replayed = wal.replay().await?;
        assert_eq!(replayed.len(), 4, "All 4 mutations should be replayed");

        Ok(())
    }

    #[tokio::test]
    async fn test_lsn_watermark_no_reuse() -> Result<()> {
        let fx = Fixture::new()?;
        let wal = fx.wal();

        let mut seen_lsns = std::collections::HashSet::new();

        for i in 1..=50 {
            wal.append(insert_vertex(i))?;
            let lsn = wal.flush().await?;

            assert!(
                !seen_lsns.contains(&lsn),
                "LSN {} was reused! This violates monotonicity.",
                lsn
            );
            seen_lsns.insert(lsn);

            assert_eq!(lsn, i, "LSN should be {}, got {}", i, lsn);
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_truncate_scalability() -> Result<()> {
        let fx = Fixture::new()?;
        let wal = fx.wal();
        flush_one_vertex_per_segment(&wal, 1..=100).await?;

        let start = std::time::Instant::now();
        wal.truncate_before(50).await?;
        let duration = start.elapsed();

        let remaining = wal.replay().await?;
        assert_eq!(
            remaining.len(),
            50,
            "Should have 50 mutations remaining (51-100)"
        );

        assert!(
            duration.as_millis() < 1000,
            "truncate_before took {}ms, expected < 1000ms (filename parsing should be fast)",
            duration.as_millis()
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_replay_since_skips_old_segments() -> Result<()> {
        let fx = Fixture::new()?;
        let wal = fx.wal();
        flush_one_vertex_per_segment(&wal, 1..=100).await?;

        let start = std::time::Instant::now();
        let replayed = wal.replay_since(90).await?;
        let duration = start.elapsed();

        assert_eq!(replayed.len(), 10, "Should replay only LSNs 91-100");

        assert!(
            duration.as_millis() < 500,
            "replay_since took {}ms, expected < 500ms (should skip by filename)",
            duration.as_millis()
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_wal_replay_preserves_vertex_labels() -> Result<()> {
        let fx = Fixture::new()?;
        let wal = fx.wal();

        let properties = HashMap::from([(
            "name".to_string(),
            uni_common::Value::String("Alice".to_string()),
        )]);
        wal.append(Mutation::InsertVertex {
            vid: Vid::new(42),
            properties,
            labels: vec!["Person".to_string(), "User".to_string()],
        })?;
        wal.flush().await?;

        let replayed = wal.replay().await?;
        assert_eq!(replayed.len(), 1);

        match &replayed[0] {
            Mutation::InsertVertex { vid, labels, .. } => {
                assert_eq!(vid.as_u64(), 42);
                assert_eq!(labels.len(), 2);
                assert!(labels.contains(&"Person".to_string()));
                assert!(labels.contains(&"User".to_string()));
            }
            _ => panic!("Expected InsertVertex mutation"),
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_wal_replay_preserves_delete_vertex_labels() -> Result<()> {
        let fx = Fixture::new()?;
        let wal = fx.wal();

        wal.append(Mutation::DeleteVertex {
            vid: Vid::new(99),
            labels: vec!["Person".to_string(), "Admin".to_string()],
        })?;
        wal.flush().await?;

        let replayed = wal.replay().await?;
        assert_eq!(replayed.len(), 1);

        match &replayed[0] {
            Mutation::DeleteVertex { vid, labels } => {
                assert_eq!(vid.as_u64(), 99);
                assert_eq!(labels.len(), 2);
                assert!(labels.contains(&"Person".to_string()));
                assert!(labels.contains(&"Admin".to_string()));
            }
            _ => panic!("Expected DeleteVertex mutation"),
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_wal_replay_preserves_edge_type_name() -> Result<()> {
        let fx = Fixture::new()?;
        let wal = fx.wal();

        let properties = HashMap::from([("since".to_string(), uni_common::Value::Int(2020))]);
        wal.append(Mutation::InsertEdge {
            src_vid: Vid::new(1),
            dst_vid: Vid::new(2),
            edge_type: 100,
            eid: Eid::new(500),
            version: 1,
            properties,
            edge_type_name: Some("KNOWS".to_string()),
        })?;
        wal.flush().await?;

        let replayed = wal.replay().await?;
        assert_eq!(replayed.len(), 1);

        match &replayed[0] {
            Mutation::InsertEdge {
                eid,
                edge_type_name,
                ..
            } => {
                assert_eq!(eid.as_u64(), 500);
                assert_eq!(edge_type_name.as_deref(), Some("KNOWS"));
            }
            _ => panic!("Expected InsertEdge mutation"),
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_wal_backward_compatibility_labels() -> Result<()> {
        let fx = Fixture::new()?;

        // Legacy segment: bare JSON without the v2 envelope and without `labels`.
        let old_format_json = r#"{
            "lsn": 1,
            "mutations": [
                {
                    "InsertVertex": {
                        "vid": 123,
                        "properties": {}
                    }
                }
            ]
        }"#;

        let path = fx.prefix.clone().join("00000000000000000001_test.wal");
        fx.store.put(&path, old_format_json.into()).await?;

        let wal = fx.wal();
        let replayed = wal.replay().await?;

        assert_eq!(replayed.len(), 1);
        match &replayed[0] {
            Mutation::InsertVertex { vid, labels, .. } => {
                assert_eq!(vid.as_u64(), 123);
                assert_eq!(
                    labels.len(),
                    0,
                    "Old format should deserialize with empty labels"
                );
            }
            _ => panic!("Expected InsertVertex mutation"),
        }

        Ok(())
    }

    #[test]
    fn wal_segment_ref_serializes_identically() {
        let properties = HashMap::from([("p".to_string(), uni_common::Value::Int(7))]);
        let mutations = vec![
            Mutation::InsertVertex {
                vid: Vid::new(1),
                properties,
                labels: vec!["L".to_string()],
            },
            Mutation::DeleteEdge {
                eid: Eid::new(2),
                src_vid: Vid::new(1),
                dst_vid: Vid::new(3),
                edge_type: 4,
                version: 5,
            },
        ];

        let owned_json = serde_json::to_vec(&WalSegment {
            lsn: 42,
            mutations: mutations.clone(),
        })
        .unwrap();
        let borrowed_json = serde_json::to_vec(&WalSegmentRef {
            lsn: 42,
            mutations: &mutations,
        })
        .unwrap();

        assert_eq!(owned_json, borrowed_json);
    }
}