1use crate::store_utils::{
5 DEFAULT_TIMEOUT, delete_with_timeout, get_with_timeout, list_with_timeout, put_with_timeout,
6};
7use anyhow::Result;
8use metrics;
9use object_store::ObjectStore;
10use object_store::path::Path;
11use serde::{Deserialize, Serialize};
12use std::sync::{Arc, Mutex};
13use tracing::{debug, info, instrument, warn};
14use uni_common::Properties;
15use uni_common::core::id::{Eid, Vid};
16use uni_common::sync::acquire_mutex;
17use uuid::Uuid;
18
19fn parse_lsn_from_filename(path: &Path) -> Option<u64> {
22 let filename = path.filename()?;
23 if filename.len() < 20 {
24 return None;
25 }
26 filename.get(..20).and_then(|s| s.parse::<u64>().ok())
29}
30
/// Magic prefix written at the start of every v2 segment by `encode_segment_envelope`.
/// `decode_segment` treats any input that does not start with it as a legacy
/// (bare JSON) segment.
const WAL_V2_MAGIC: &[u8] = b"UNIWAL2\n";

/// Length in bytes of the hex-encoded BLAKE3 checksum that follows the magic
/// prefix in a v2 segment header (the checksum is followed by a single `\n`).
const WAL_V2_HASH_HEX_LEN: usize = 64;
40
41fn encode_segment_envelope(payload_json: &[u8]) -> Vec<u8> {
43 let hash = blake3::hash(payload_json);
44 let mut out =
45 Vec::with_capacity(WAL_V2_MAGIC.len() + WAL_V2_HASH_HEX_LEN + 1 + payload_json.len());
46 out.extend_from_slice(WAL_V2_MAGIC);
47 out.extend_from_slice(hash.to_hex().as_bytes());
48 out.push(b'\n');
49 out.extend_from_slice(payload_json);
50 out
51}
52
53#[doc(hidden)]
63pub fn decode_segment(bytes: &[u8]) -> std::result::Result<WalSegment, String> {
64 if let Some(rest) = bytes.strip_prefix(WAL_V2_MAGIC) {
65 if rest.len() < WAL_V2_HASH_HEX_LEN + 1 || rest[WAL_V2_HASH_HEX_LEN] != b'\n' {
66 return Err("truncated v2 segment header".to_string());
67 }
68 let (hex, payload_nl) = rest.split_at(WAL_V2_HASH_HEX_LEN);
69 let payload = &payload_nl[1..];
70 let expected =
71 std::str::from_utf8(hex).map_err(|_| "non-utf8 checksum header".to_string())?;
72 let actual = blake3::hash(payload);
73 if actual.to_hex().as_str() != expected {
74 return Err(format!(
75 "checksum mismatch (expected {expected}, computed {})",
76 actual.to_hex()
77 ));
78 }
79 serde_json::from_slice(payload).map_err(|e| format!("v2 payload parse: {e}"))
80 } else {
81 serde_json::from_slice(bytes).map_err(|e| format!("legacy segment parse: {e}"))
83 }
84}
85
/// Test-only fault-injection flag. When it is `true`, the next
/// `WriteAheadLog::flush` that reaches its fsync step (i.e. with a `local_root`
/// configured) resets it to `false` and behaves as if the fsync had failed.
#[cfg(test)]
pub(crate) static FAIL_NEXT_FSYNC: std::sync::atomic::AtomicBool =
    std::sync::atomic::AtomicBool::new(false);
92
/// Flushes the file at `path` to stable storage and, on Unix, also flushes its
/// parent directory.
///
/// The file is opened read-only and `sync_all` is called on it. On Unix the
/// parent directory is then opened and synced the same way (presumably so the
/// directory entry of a freshly created file is durable too).
///
/// # Errors
/// Returns the underlying I/O error if the file or its parent directory cannot
/// be opened or synced.
pub(crate) fn sync_file_and_parent(path: &std::path::Path) -> std::io::Result<()> {
    std::fs::File::open(path)?.sync_all()?;
    #[cfg(unix)]
    if let Some(dir) = path.parent() {
        // `Path::parent` yields `Some("")` for a bare relative file name, and
        // opening "" fails; the parent in that case is the current directory.
        let dir = if dir.as_os_str().is_empty() {
            std::path::Path::new(".")
        } else {
            dir
        };
        std::fs::File::open(dir)?.sync_all()?;
    }
    Ok(())
}
108
/// A single graph change recorded in the write-ahead log.
///
/// Mutations are buffered by `WriteAheadLog::append`, serialized with serde as
/// part of a [`WalSegment`], and returned in order by replay. Fields marked
/// `#[serde(default)]` may be missing from a serialized segment, in which case
/// they deserialize to their default value.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Mutation {
    /// Insertion of an edge between two vertices.
    InsertEdge {
        /// Source vertex of the edge.
        src_vid: Vid,
        /// Destination vertex of the edge.
        dst_vid: Vid,
        /// Numeric edge type.
        edge_type: u32,
        /// Identifier of the inserted edge.
        eid: Eid,
        /// Version number stored with the mutation.
        version: u64,
        /// Properties attached to the edge.
        properties: Properties,
        /// Optional edge type name; `None` when absent from the serialized segment.
        #[serde(default)]
        edge_type_name: Option<String>,
    },
    /// Deletion of an edge.
    DeleteEdge {
        /// Identifier of the deleted edge.
        eid: Eid,
        /// Source vertex of the edge.
        src_vid: Vid,
        /// Destination vertex of the edge.
        dst_vid: Vid,
        /// Numeric edge type.
        edge_type: u32,
        /// Version number stored with the mutation.
        version: u64,
    },
    /// Insertion of a vertex.
    InsertVertex {
        /// Identifier of the inserted vertex.
        vid: Vid,
        /// Properties attached to the vertex.
        properties: Properties,
        /// Labels of the vertex; empty when absent from the serialized segment.
        #[serde(default)]
        labels: Vec<String>,
    },
    /// Deletion of a vertex.
    DeleteVertex {
        /// Identifier of the deleted vertex.
        vid: Vid,
        /// Labels recorded with the deletion; empty when absent from the serialized segment.
        #[serde(default)]
        labels: Vec<String>,
    },
    /// Label assignment for a vertex.
    // NOTE(review): presumably replaces the vertex's label set rather than adding
    // to it; confirm against the code that applies replayed mutations.
    SetVertexLabels { vid: Vid, labels: Vec<String> },
}
147
/// Owned, deserializable form of one WAL segment: the batch of mutations that a
/// single `WriteAheadLog::flush` call wrote under one LSN.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WalSegment {
    /// Log sequence number assigned to this segment.
    pub lsn: u64,
    /// Mutations contained in the segment, in append order.
    pub mutations: Vec<Mutation>,
}
156
/// Borrowing, serialize-only counterpart of [`WalSegment`].
///
/// `WriteAheadLog::flush` serializes through this type so it can keep ownership
/// of the batch and put it back into the buffer if the write fails. Field names
/// and order match [`WalSegment`], so both serialize to the same JSON.
#[derive(Serialize, Debug)]
struct WalSegmentRef<'a> {
    /// Log sequence number assigned to this segment.
    lsn: u64,
    /// Borrowed mutation batch.
    mutations: &'a [Mutation],
}
167
/// Write-ahead log that buffers mutations in memory and persists them as
/// numbered segment objects under a prefix in an object store.
pub struct WriteAheadLog {
    /// Object store that holds the segment objects.
    store: Arc<dyn ObjectStore>,
    /// Object-store prefix under which all segments are written and listed.
    prefix: Path,
    /// When set, `flush` additionally fsyncs the segment file found at
    /// `local_root` joined with the segment's object path.
    local_root: Option<std::path::PathBuf>,
    /// Mutable state (pending buffer and LSN counters) guarded by a mutex.
    state: Mutex<WalState>,
}
180
/// Mutable state of a [`WriteAheadLog`], always accessed under its mutex.
struct WalState {
    /// Mutations appended but not yet written to a segment.
    buffer: Vec<Mutation>,
    /// LSN that the next flushed segment will receive.
    next_lsn: u64,
    /// LSN most recently recorded as flushed (0 for a fresh log).
    flushed_lsn: u64,
}
188
189impl WriteAheadLog {
190 pub fn new(store: Arc<dyn ObjectStore>, prefix: Path) -> Self {
191 Self {
192 store,
193 prefix,
194 local_root: None,
195 state: Mutex::new(WalState {
196 buffer: Vec::new(),
197 next_lsn: 1, flushed_lsn: 0,
199 }),
200 }
201 }
202
203 #[must_use]
206 pub fn with_local_root(mut self, local_root: Option<std::path::PathBuf>) -> Self {
207 self.local_root = local_root;
208 self
209 }
210
211 pub async fn initialize(&self) -> Result<u64> {
213 let max_lsn = self.find_max_lsn().await?;
214 {
215 let mut state = acquire_mutex(&self.state, "wal_state")?;
216 state.next_lsn = max_lsn + 1;
217 state.flushed_lsn = max_lsn;
218 }
219 Ok(max_lsn)
220 }
221
222 async fn find_max_lsn(&self) -> Result<u64> {
225 let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
226 let mut max_lsn: u64 = 0;
227
228 for meta in metas {
229 if let Some(lsn) = parse_lsn_from_filename(&meta.location) {
231 max_lsn = max_lsn.max(lsn);
232 } else {
233 warn!(
235 path = %meta.location,
236 "WAL filename doesn't match expected format, downloading segment"
237 );
238 let get_result =
239 get_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
240 let bytes = get_result.bytes().await?;
241 if bytes.is_empty() {
242 continue;
243 }
244 match decode_segment(&bytes) {
248 Ok(segment) => max_lsn = max_lsn.max(segment.lsn),
249 Err(reason) => {
250 warn!(path = %meta.location, reason = %reason,
251 "Skipping corrupt WAL segment during max-LSN probe");
252 }
253 }
254 }
255 }
256
257 Ok(max_lsn)
258 }
259
260 #[instrument(skip(self, mutation), level = "trace")]
261 pub fn append(&self, mutation: Mutation) -> Result<()> {
262 let mut state = acquire_mutex(&self.state, "wal_state")?;
263 state.buffer.push(mutation);
264 metrics::counter!("uni_wal_entries_total").increment(1);
265 Ok(())
266 }
267
268 #[instrument(skip(self), fields(lsn, mutations_count, size_bytes))]
270 pub async fn flush(&self) -> Result<u64> {
271 let start = std::time::Instant::now();
272 let (batch, lsn) = {
273 let mut state = acquire_mutex(&self.state, "wal_state")?;
274 if state.buffer.is_empty() {
275 return Ok(state.flushed_lsn);
276 }
277 let lsn = state.next_lsn;
278 state.next_lsn += 1;
279 (std::mem::take(&mut state.buffer), lsn)
280 };
281
282 tracing::Span::current().record("lsn", lsn);
283 tracing::Span::current().record("mutations_count", batch.len());
284
285 let segment = WalSegmentRef {
290 lsn,
291 mutations: &batch,
292 };
293
294 let json = match serde_json::to_vec(&segment) {
296 Ok(j) => j,
297 Err(e) => {
298 warn!(lsn, error = %e, "Failed to serialize WAL segment, restoring buffer");
299 let mut state = acquire_mutex(&self.state, "wal_state")?;
301 let new_mutations = std::mem::take(&mut state.buffer);
302 state.buffer = batch;
303 state.buffer.extend(new_mutations);
304 return Err(e.into());
306 }
307 };
308 let body = encode_segment_envelope(&json);
311 tracing::Span::current().record("size_bytes", body.len());
312 metrics::counter!("uni_wal_bytes_written_total").increment(body.len() as u64);
313
314 let filename = format!("{:020}_{}.wal", lsn, Uuid::new_v4());
316 let path = self.prefix.clone().join(filename);
317
318 if let Err(e) = put_with_timeout(&self.store, &path, body.into(), DEFAULT_TIMEOUT).await {
320 warn!(
321 lsn,
322 error = %e,
323 "Failed to flush WAL segment, restoring buffer (LSN gap preserved for monotonicity)"
324 );
325 let mut state = acquire_mutex(&self.state, "wal_state")?;
327 let new_mutations = std::mem::take(&mut state.buffer);
329 state.buffer = batch;
330 state.buffer.extend(new_mutations);
331 return Err(e);
334 }
335
336 if let Some(root) = &self.local_root {
343 let file_path = root.join(path.as_ref());
344 #[cfg(test)]
345 let synced = if FAIL_NEXT_FSYNC.swap(false, std::sync::atomic::Ordering::SeqCst) {
346 Ok(Err(std::io::Error::other("injected fsync failure")))
347 } else {
348 tokio::task::spawn_blocking(move || sync_file_and_parent(&file_path)).await
349 };
350 #[cfg(not(test))]
351 let synced =
352 tokio::task::spawn_blocking(move || sync_file_and_parent(&file_path)).await;
353 let fsync_err: Option<anyhow::Error> = match synced {
354 Ok(Ok(())) => None,
355 Ok(Err(e)) => Some(e.into()),
356 Err(e) => Some(e.into()),
357 };
358 if let Some(err) = fsync_err {
359 warn!(
360 lsn,
361 error = %err,
362 "WAL segment fsync failed — deleting the non-durable segment to avoid a ghost commit on replay"
363 );
364 if let Err(del_err) = delete_with_timeout(&self.store, &path, DEFAULT_TIMEOUT).await
369 {
370 return Err(anyhow::anyhow!(
371 "WAL segment fsync failed ({err}) and the cleanup delete \
372 of segment at lsn {lsn} also failed ({del_err}); the WAL \
373 may contain a non-durable segment"
374 ));
375 }
376 return Err(err);
377 }
378 }
379
380 {
382 let mut state = acquire_mutex(&self.state, "wal_state")?;
383 state.flushed_lsn = lsn;
384 }
385
386 let duration = start.elapsed();
387 metrics::histogram!("wal_flush_latency_ms").record(duration.as_millis() as f64);
388 metrics::histogram!("uni_wal_flush_duration_seconds").record(duration.as_secs_f64());
389
390 if duration.as_millis() > 100 {
391 warn!(
392 lsn,
393 duration_ms = duration.as_millis(),
394 "Slow WAL flush detected"
395 );
396 } else {
397 debug!(
398 lsn,
399 duration_ms = duration.as_millis(),
400 "WAL flush completed"
401 );
402 }
403
404 Ok(lsn)
405 }
406
407 pub fn flushed_lsn(&self) -> Result<u64, uni_common::sync::LockPoisonedError> {
413 let guard = uni_common::sync::acquire_mutex(&self.state, "wal_state")?;
414 Ok(guard.flushed_lsn)
415 }
416
417 #[instrument(skip(self), level = "debug")]
428 pub async fn replay_since(&self, high_water_mark: u64) -> Result<Vec<Mutation>> {
429 let start = std::time::Instant::now();
430 debug!(high_water_mark, "Replaying WAL segments");
431 let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
432 let mut mutations = Vec::new();
433
434 let mut paths: Vec<_> = metas
437 .into_iter()
438 .map(|m| m.location)
439 .filter(|p| {
440 parse_lsn_from_filename(p).is_none_or(|lsn| lsn > high_water_mark)
443 })
444 .collect();
445 paths.sort();
446
447 let mut segments_replayed = 0;
448
449 for (idx, path) in paths.iter().enumerate() {
450 let get_result = get_with_timeout(&self.store, path, DEFAULT_TIMEOUT).await?;
451 let bytes = get_result.bytes().await?;
452
453 let decoded = if bytes.is_empty() {
455 Err("empty segment file".to_string())
456 } else {
457 decode_segment(&bytes)
458 };
459
460 let segment = match decoded {
461 Ok(segment) => segment,
462 Err(reason) => {
463 let is_tail = idx + 1 == paths.len();
464 if is_tail {
465 warn!(
466 path = %path,
467 reason = %reason,
468 "Corrupt tail WAL segment — torn write from a crash; \
469 treating as end of WAL (the commit was never acknowledged)"
470 );
471 break;
472 }
473 return Err(anyhow::anyhow!(
474 "corrupt WAL segment '{path}' ({reason}) with {} later segment(s) \
475 present; refusing to skip — manual inspection required",
476 paths.len() - idx - 1
477 ));
478 }
479 };
480
481 if segment.lsn > high_water_mark {
483 mutations.extend(segment.mutations);
484 segments_replayed += 1;
485 }
486 }
487
488 info!(
489 segments_replayed,
490 mutations_count = mutations.len(),
491 "WAL replay completed"
492 );
493 metrics::histogram!("uni_wal_replay_duration_seconds")
494 .record(start.elapsed().as_secs_f64());
495
496 Ok(mutations)
497 }
498
499 pub async fn replay(&self) -> Result<Vec<Mutation>> {
501 self.replay_since(0).await
502 }
503
504 #[instrument(skip(self), level = "info")]
507 pub async fn truncate_before(&self, high_water_mark: u64) -> Result<()> {
508 info!(high_water_mark, "Truncating WAL segments");
509 let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
510
511 let mut deleted_count = 0;
512 for meta in metas {
513 let should_delete = if let Some(lsn) = parse_lsn_from_filename(&meta.location) {
515 lsn <= high_water_mark
516 } else {
517 warn!(
519 path = %meta.location,
520 "WAL filename doesn't match expected format, downloading segment"
521 );
522 let get_result =
523 get_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
524 let bytes = get_result.bytes().await?;
525 if bytes.is_empty() {
526 true
528 } else {
529 match decode_segment(&bytes) {
530 Ok(segment) => segment.lsn <= high_water_mark,
531 Err(reason) => {
532 warn!(path = %meta.location, reason = %reason,
536 "Keeping corrupt WAL segment during truncation");
537 false
538 }
539 }
540 }
541 };
542
543 if should_delete {
544 delete_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
545 deleted_count += 1;
546 }
547 }
548 info!(deleted_count, "WAL truncation completed");
549 Ok(())
550 }
551
552 pub async fn has_segments(&self) -> Result<bool> {
554 let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
555 Ok(!metas.is_empty())
556 }
557
558 pub async fn truncate(&self) -> Result<()> {
559 info!("Truncating all WAL segments");
560 let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
561
562 let mut deleted_count = 0;
563 for meta in metas {
564 delete_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
565 deleted_count += 1;
566 }
567 info!(deleted_count, "Full WAL truncation completed");
568 Ok(())
569 }
570}
571
572#[cfg(test)]
573mod tests {
574 use super::*;
575 use object_store::ObjectStoreExt;
576 use object_store::local::LocalFileSystem;
577 use std::collections::HashMap;
578 use tempfile::tempdir;
579
580 #[tokio::test]
581 async fn test_wal_append_replay() -> Result<()> {
582 let dir = tempdir()?;
583 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
584 let prefix = Path::from("wal");
585
586 let wal = WriteAheadLog::new(store, prefix);
587
588 let mutation = Mutation::InsertVertex {
589 vid: Vid::new(1),
590 properties: HashMap::new(),
591 labels: vec![],
592 };
593
594 wal.append(mutation)?;
595 wal.flush().await?;
596
597 let mutations = wal.replay().await?;
598 assert_eq!(mutations.len(), 1);
599 if let Mutation::InsertVertex { vid, .. } = &mutations[0] {
600 assert_eq!(vid.as_u64(), Vid::new(1).as_u64());
601 } else {
602 panic!("Wrong mutation type");
603 }
604
605 wal.truncate().await?;
606 let mutations2 = wal.replay().await?;
607 assert_eq!(mutations2.len(), 0);
608
609 Ok(())
610 }
611
612 #[tokio::test]
613 async fn test_lsn_monotonicity() -> Result<()> {
614 let dir = tempdir()?;
616 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
617 let prefix = Path::from("wal");
618
619 let wal = WriteAheadLog::new(store, prefix);
620
621 let mutation1 = Mutation::InsertVertex {
622 vid: Vid::new(1),
623 properties: HashMap::new(),
624 labels: vec![],
625 };
626 let mutation2 = Mutation::InsertVertex {
627 vid: Vid::new(2),
628 properties: HashMap::new(),
629 labels: vec![],
630 };
631 let mutation3 = Mutation::InsertVertex {
632 vid: Vid::new(3),
633 properties: HashMap::new(),
634 labels: vec![],
635 };
636
637 wal.append(mutation1)?;
639 let lsn1 = wal.flush().await?;
640
641 wal.append(mutation2)?;
643 let lsn2 = wal.flush().await?;
644
645 wal.append(mutation3)?;
647 let lsn3 = wal.flush().await?;
648
649 assert!(lsn2 > lsn1, "LSN2 ({}) should be > LSN1 ({})", lsn2, lsn1);
651 assert!(lsn3 > lsn2, "LSN3 ({}) should be > LSN2 ({})", lsn3, lsn2);
652
653 assert_eq!(lsn2, lsn1 + 1);
655 assert_eq!(lsn3, lsn2 + 1);
656
657 Ok(())
658 }
659
660 #[tokio::test]
664 async fn fsync_failure_deletes_segment_no_ghost_commit() -> Result<()> {
665 let dir = tempdir()?;
666 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
667 let prefix = Path::from("wal");
668 let wal = WriteAheadLog::new(store, prefix).with_local_root(Some(dir.path().to_path_buf()));
670
671 wal.append(Mutation::InsertVertex {
672 vid: Vid::new(1),
673 properties: HashMap::new(),
674 labels: vec![],
675 })?;
676
677 FAIL_NEXT_FSYNC.store(true, std::sync::atomic::Ordering::SeqCst);
679 let result = wal.flush().await;
680 assert!(
681 result.is_err(),
682 "flush must report failure when the segment fsync fails"
683 );
684
685 let replayed = wal.replay().await?;
687 assert!(
688 replayed.is_empty(),
689 "a segment whose fsync failed must not be replayable (ghost commit); got {} mutations",
690 replayed.len()
691 );
692 Ok(())
693 }
694
/// Table-driven check of LSN extraction from well-formed and malformed names.
#[test]
fn test_parse_lsn_from_filename() {
    let cases: [(&str, Option<u64>); 9] = [
        ("00000000000000000042_a1b2c3d4.wal", Some(42)),
        ("00000000000000001234_e5f6a7b8.wal", Some(1234)),
        ("00000000000000000001_xyz.wal", Some(1)),
        // Largest 20-digit value used here still fits in a u64.
        ("12345678901234567890_uuid.wal", Some(12345678901234567890)),
        ("invalid.wal", None),
        // Too short to hold a 20-character prefix.
        ("123.wal", None),
        // Long enough, but the prefix is not numeric.
        ("abcdefghijklmnopqrst_uuid.wal", None),
        // No `_uuid` part: only the 20-character prefix matters.
        ("00000000000000000100.wal", Some(100)),
        ("", None),
    ];

    for (name, expected) in cases {
        let path = Path::from(name);
        assert_eq!(parse_lsn_from_filename(&path), expected);
    }
}
730
/// A name whose 20th byte falls inside a multi-byte character must yield `None`
/// rather than panic on a bad slice boundary.
#[test]
fn test_parse_lsn_from_filename_multibyte_no_panic() {
    // 19 ASCII zeros followed by a two-byte character: byte offset 20 is mid-char.
    let mut name = "0".repeat(19);
    name.push('é');
    name.push_str(".wal");

    let path = Path::parse(name).expect("multi-byte segment is a valid object_store path");
    assert_eq!(parse_lsn_from_filename(&path), None);
}
750
751 #[tokio::test]
754 async fn test_find_max_lsn_scalability() -> Result<()> {
755 let dir = tempdir()?;
756 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
757 let prefix = Path::from("wal");
758
759 let wal = WriteAheadLog::new(store, prefix);
760
761 for i in 1..=100 {
763 let mutation = Mutation::InsertVertex {
764 vid: Vid::new(i),
765 properties: HashMap::new(),
766 labels: vec![],
767 };
768 wal.append(mutation)?;
769 wal.flush().await?;
770 }
771
772 let start = std::time::Instant::now();
774 let max_lsn = wal.find_max_lsn().await?;
775 let duration = start.elapsed();
776
777 assert_eq!(max_lsn, 100, "Max LSN should be 100");
779
780 assert!(
782 duration.as_millis() < 1000,
783 "find_max_lsn took {}ms, expected < 1000ms (filename parsing should be fast)",
784 duration.as_millis()
785 );
786
787 Ok(())
788 }
789
790 #[tokio::test]
792 async fn test_lsn_gaps_preserved_on_flush_failure() -> Result<()> {
793 let dir = tempdir()?;
794 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
795 let prefix = Path::from("wal");
796
797 let wal = WriteAheadLog::new(store.clone(), prefix.clone());
798
799 wal.append(Mutation::InsertVertex {
801 vid: Vid::new(1),
802 properties: HashMap::new(),
803 labels: vec![],
804 })?;
805 let lsn1 = wal.flush().await?;
806 assert_eq!(lsn1, 1);
807
808 wal.append(Mutation::InsertVertex {
810 vid: Vid::new(2),
811 properties: HashMap::new(),
812 labels: vec![],
813 })?;
814 let lsn2 = wal.flush().await?;
815 assert_eq!(lsn2, 2);
816
817 wal.append(Mutation::InsertVertex {
824 vid: Vid::new(3),
825 properties: HashMap::new(),
826 labels: vec![],
827 })?;
828
829 wal.append(Mutation::InsertVertex {
831 vid: Vid::new(4),
832 properties: HashMap::new(),
833 labels: vec![],
834 })?;
835 let lsn4 = wal.flush().await?;
836
837 assert_eq!(lsn4, 3, "LSN should increment monotonically");
839
840 let mutations = wal.replay().await?;
842 assert_eq!(mutations.len(), 4, "All 4 mutations should be replayed");
843
844 Ok(())
845 }
846
847 #[tokio::test]
849 async fn test_lsn_watermark_no_reuse() -> Result<()> {
850 let dir = tempdir()?;
851 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
852 let prefix = Path::from("wal");
853
854 let wal = WriteAheadLog::new(store, prefix);
855
856 let mut seen_lsns = std::collections::HashSet::new();
858
859 for i in 1..=50 {
861 wal.append(Mutation::InsertVertex {
862 vid: Vid::new(i),
863 properties: HashMap::new(),
864 labels: vec![],
865 })?;
866 let lsn = wal.flush().await?;
867
868 assert!(
870 !seen_lsns.contains(&lsn),
871 "LSN {} was reused! This violates monotonicity.",
872 lsn
873 );
874 seen_lsns.insert(lsn);
875
876 assert_eq!(lsn, i, "LSN should be {}, got {}", i, lsn);
878 }
879
880 Ok(())
881 }
882
883 #[tokio::test]
886 async fn test_truncate_scalability() -> Result<()> {
887 let dir = tempdir()?;
888 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
889 let prefix = Path::from("wal");
890
891 let wal = WriteAheadLog::new(store, prefix);
892
893 for i in 1..=100 {
895 let mutation = Mutation::InsertVertex {
896 vid: Vid::new(i),
897 properties: HashMap::new(),
898 labels: vec![],
899 };
900 wal.append(mutation)?;
901 wal.flush().await?;
902 }
903
904 let start = std::time::Instant::now();
906 wal.truncate_before(50).await?;
907 let duration = start.elapsed();
908
909 let mutations = wal.replay().await?;
911 assert_eq!(
912 mutations.len(),
913 50,
914 "Should have 50 mutations remaining (51-100)"
915 );
916
917 assert!(
919 duration.as_millis() < 1000,
920 "truncate_before took {}ms, expected < 1000ms (filename parsing should be fast)",
921 duration.as_millis()
922 );
923
924 Ok(())
925 }
926
927 #[tokio::test]
929 async fn test_replay_since_skips_old_segments() -> Result<()> {
930 let dir = tempdir()?;
931 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
932 let prefix = Path::from("wal");
933
934 let wal = WriteAheadLog::new(store, prefix);
935
936 for i in 1..=100 {
938 let mutation = Mutation::InsertVertex {
939 vid: Vid::new(i),
940 properties: HashMap::new(),
941 labels: vec![],
942 };
943 wal.append(mutation)?;
944 wal.flush().await?;
945 }
946
947 let start = std::time::Instant::now();
949 let mutations = wal.replay_since(90).await?;
950 let duration = start.elapsed();
951
952 assert_eq!(mutations.len(), 10, "Should replay only LSNs 91-100");
954
955 assert!(
957 duration.as_millis() < 500,
958 "replay_since took {}ms, expected < 500ms (should skip by filename)",
959 duration.as_millis()
960 );
961
962 Ok(())
963 }
964
965 #[tokio::test]
967 async fn test_wal_replay_preserves_vertex_labels() -> Result<()> {
968 let dir = tempdir()?;
969 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
970 let prefix = Path::from("wal");
971
972 let wal = Arc::new(WriteAheadLog::new(store, prefix));
973
974 wal.append(Mutation::InsertVertex {
976 vid: Vid::new(42),
977 properties: {
978 let mut props = HashMap::new();
979 props.insert(
980 "name".to_string(),
981 uni_common::Value::String("Alice".to_string()),
982 );
983 props
984 },
985 labels: vec!["Person".to_string(), "User".to_string()],
986 })?;
987
988 wal.flush().await?;
990
991 let mutations = wal.replay().await?;
993 assert_eq!(mutations.len(), 1);
994
995 if let Mutation::InsertVertex { vid, labels, .. } = &mutations[0] {
997 assert_eq!(vid.as_u64(), 42);
998 assert_eq!(labels.len(), 2);
999 assert!(labels.contains(&"Person".to_string()));
1000 assert!(labels.contains(&"User".to_string()));
1001 } else {
1002 panic!("Expected InsertVertex mutation");
1003 }
1004
1005 Ok(())
1006 }
1007
1008 #[tokio::test]
1010 async fn test_wal_replay_preserves_delete_vertex_labels() -> Result<()> {
1011 let dir = tempdir()?;
1012 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
1013 let prefix = Path::from("wal");
1014
1015 let wal = Arc::new(WriteAheadLog::new(store, prefix));
1016
1017 wal.append(Mutation::DeleteVertex {
1019 vid: Vid::new(99),
1020 labels: vec!["Person".to_string(), "Admin".to_string()],
1021 })?;
1022
1023 wal.flush().await?;
1025
1026 let mutations = wal.replay().await?;
1028 assert_eq!(mutations.len(), 1);
1029
1030 if let Mutation::DeleteVertex { vid, labels } = &mutations[0] {
1032 assert_eq!(vid.as_u64(), 99);
1033 assert_eq!(labels.len(), 2);
1034 assert!(labels.contains(&"Person".to_string()));
1035 assert!(labels.contains(&"Admin".to_string()));
1036 } else {
1037 panic!("Expected DeleteVertex mutation");
1038 }
1039
1040 Ok(())
1041 }
1042
1043 #[tokio::test]
1045 async fn test_wal_replay_preserves_edge_type_name() -> Result<()> {
1046 let dir = tempdir()?;
1047 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
1048 let prefix = Path::from("wal");
1049
1050 let wal = Arc::new(WriteAheadLog::new(store, prefix));
1051
1052 wal.append(Mutation::InsertEdge {
1054 src_vid: Vid::new(1),
1055 dst_vid: Vid::new(2),
1056 edge_type: 100,
1057 eid: Eid::new(500),
1058 version: 1,
1059 properties: {
1060 let mut props = HashMap::new();
1061 props.insert("since".to_string(), uni_common::Value::Int(2020));
1062 props
1063 },
1064 edge_type_name: Some("KNOWS".to_string()),
1065 })?;
1066
1067 wal.flush().await?;
1069
1070 let mutations = wal.replay().await?;
1072 assert_eq!(mutations.len(), 1);
1073
1074 if let Mutation::InsertEdge {
1076 eid,
1077 edge_type_name,
1078 ..
1079 } = &mutations[0]
1080 {
1081 assert_eq!(eid.as_u64(), 500);
1082 assert_eq!(edge_type_name.as_deref(), Some("KNOWS"));
1083 } else {
1084 panic!("Expected InsertEdge mutation");
1085 }
1086
1087 Ok(())
1088 }
1089
1090 #[tokio::test]
1092 async fn test_wal_backward_compatibility_labels() -> Result<()> {
1093 let dir = tempdir()?;
1094 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
1095 let prefix = Path::from("wal");
1096
1097 let old_format_json = r#"{
1099 "lsn": 1,
1100 "mutations": [
1101 {
1102 "InsertVertex": {
1103 "vid": 123,
1104 "properties": {}
1105 }
1106 }
1107 ]
1108 }"#;
1109
1110 let path = prefix.clone().join("00000000000000000001_test.wal");
1111 store.put(&path, old_format_json.into()).await?;
1112
1113 let wal = WriteAheadLog::new(store, prefix);
1115 let mutations = wal.replay().await?;
1116
1117 assert_eq!(mutations.len(), 1);
1119 if let Mutation::InsertVertex { vid, labels, .. } = &mutations[0] {
1120 assert_eq!(vid.as_u64(), 123);
1121 assert_eq!(
1122 labels.len(),
1123 0,
1124 "Old format should deserialize with empty labels"
1125 );
1126 } else {
1127 panic!("Expected InsertVertex mutation");
1128 }
1129
1130 Ok(())
1131 }
1132
/// The borrowing `WalSegmentRef` must serialize to exactly the same bytes as the
/// owned `WalSegment` for the same LSN and mutations.
#[test]
fn wal_segment_ref_serializes_identically() {
    let vertex = Mutation::InsertVertex {
        vid: Vid::new(1),
        properties: HashMap::from([("p".to_string(), uni_common::Value::Int(7))]),
        labels: vec!["L".to_string()],
    };
    let edge_removal = Mutation::DeleteEdge {
        eid: Eid::new(2),
        src_vid: Vid::new(1),
        dst_vid: Vid::new(3),
        edge_type: 4,
        version: 5,
    };
    let mutations = vec![vertex, edge_removal];

    let owned_bytes = serde_json::to_vec(&WalSegment {
        lsn: 42,
        mutations: mutations.clone(),
    })
    .unwrap();
    let borrowed_bytes = serde_json::to_vec(&WalSegmentRef {
        lsn: 42,
        mutations: &mutations,
    })
    .unwrap();

    assert_eq!(owned_bytes, borrowed_bytes);
}
1167}