1use crate::store_utils::{
5 DEFAULT_TIMEOUT, delete_with_timeout, get_with_timeout, list_with_timeout, put_with_timeout,
6};
7use anyhow::Result;
8use metrics;
9use object_store::ObjectStore;
10use object_store::path::Path;
11use serde::{Deserialize, Serialize};
12use std::sync::{Arc, Mutex};
13use tracing::{debug, info, instrument, warn};
14use uni_common::Properties;
15use uni_common::core::id::{Eid, Vid};
16use uni_common::sync::acquire_mutex;
17use uuid::Uuid;
18
19fn parse_lsn_from_filename(path: &Path) -> Option<u64> {
22 let filename = path.filename()?;
23 if filename.len() < 20 {
24 return None;
25 }
26 filename[..20].parse::<u64>().ok()
27}
28
/// A single logged graph mutation, serialized as JSON inside a [`WalSegment`].
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Mutation {
    /// Adds an edge between two vertices.
    InsertEdge {
        src_vid: Vid,
        dst_vid: Vid,
        edge_type: u32,
        eid: Eid,
        version: u64,
        properties: Properties,
        // `serde(default)` keeps segments written before this field
        // existed deserializable (defaults to `None`).
        #[serde(default)]
        edge_type_name: Option<String>,
    },
    /// Removes an existing edge.
    DeleteEdge {
        eid: Eid,
        src_vid: Vid,
        dst_vid: Vid,
        edge_type: u32,
        version: u64,
    },
    /// Adds a vertex.
    InsertVertex {
        vid: Vid,
        properties: Properties,
        // Backward compatible: old segments without labels deserialize
        // to an empty vec (exercised by test_wal_backward_compatibility_labels).
        #[serde(default)]
        labels: Vec<String>,
    },
    /// Removes an existing vertex.
    DeleteVertex {
        vid: Vid,
        // Backward compatible: defaults to empty for old segments.
        #[serde(default)]
        labels: Vec<String>,
    },
}
61
/// On-disk unit of the WAL: one flushed batch of mutations plus its LSN.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WalSegment {
    /// Log sequence number; strictly increasing across segments.
    pub lsn: u64,
    /// Mutations captured by this flush, in append order.
    pub mutations: Vec<Mutation>,
}
70
/// Object-store-backed write-ahead log. Mutations are buffered in memory and
/// persisted as JSON segments named `{lsn:020}_{uuid}.wal` on flush.
pub struct WriteAheadLog {
    /// Backing object store holding the segment files.
    store: Arc<dyn ObjectStore>,
    /// Key prefix under which all segments are written.
    prefix: Path,
    /// Mutable log state, guarded so multiple appenders can share `&self`.
    state: Mutex<WalState>,
}
76
/// Mutable WAL state kept behind the mutex in [`WriteAheadLog`].
struct WalState {
    /// Mutations appended since the last successful flush.
    buffer: Vec<Mutation>,
    /// LSN that the next flushed segment will be assigned.
    next_lsn: u64,
    /// Highest LSN known to be durably written to the store.
    flushed_lsn: u64,
}
84
85impl WriteAheadLog {
86 pub fn new(store: Arc<dyn ObjectStore>, prefix: Path) -> Self {
87 Self {
88 store,
89 prefix,
90 state: Mutex::new(WalState {
91 buffer: Vec::new(),
92 next_lsn: 1, flushed_lsn: 0,
94 }),
95 }
96 }
97
98 pub async fn initialize(&self) -> Result<u64> {
100 let max_lsn = self.find_max_lsn().await?;
101 {
102 let mut state = acquire_mutex(&self.state, "wal_state")?;
103 state.next_lsn = max_lsn + 1;
104 state.flushed_lsn = max_lsn;
105 }
106 Ok(max_lsn)
107 }
108
    /// Returns the highest LSN among existing segments, or 0 if none exist.
    ///
    /// The LSN is normally read from the filename prefix (cheap); segments
    /// whose names don't match the expected format are downloaded and parsed
    /// so an oddly-named file can't silently lower the watermark.
    async fn find_max_lsn(&self) -> Result<u64> {
        let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
        let mut max_lsn: u64 = 0;

        for meta in metas {
            if let Some(lsn) = parse_lsn_from_filename(&meta.location) {
                max_lsn = max_lsn.max(lsn);
            } else {
                warn!(
                    path = %meta.location,
                    "WAL filename doesn't match expected format, downloading segment"
                );
                let get_result =
                    get_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
                let bytes = get_result.bytes().await?;
                // Empty objects carry no segment and thus no LSN; skip them.
                if bytes.is_empty() {
                    continue;
                }
                let segment: WalSegment = serde_json::from_slice(&bytes)?;
                max_lsn = max_lsn.max(segment.lsn);
            }
        }

        Ok(max_lsn)
    }
138
139 #[instrument(skip(self, mutation), level = "trace")]
140 pub fn append(&self, mutation: &Mutation) -> Result<()> {
141 let mut state = acquire_mutex(&self.state, "wal_state")?;
142 state.buffer.push(mutation.clone());
143 metrics::counter!("uni_wal_entries_total").increment(1);
144 Ok(())
145 }
146
147 #[instrument(skip(self), fields(lsn, mutations_count, size_bytes))]
149 pub async fn flush(&self) -> Result<u64> {
150 let start = std::time::Instant::now();
151 let (batch, lsn) = {
152 let mut state = acquire_mutex(&self.state, "wal_state")?;
153 if state.buffer.is_empty() {
154 return Ok(state.flushed_lsn);
155 }
156 let lsn = state.next_lsn;
157 state.next_lsn += 1;
158 (std::mem::take(&mut state.buffer), lsn)
159 };
160
161 tracing::Span::current().record("lsn", lsn);
162 tracing::Span::current().record("mutations_count", batch.len());
163
164 let segment = WalSegment {
166 lsn,
167 mutations: batch.clone(),
168 };
169
170 let json = match serde_json::to_vec(&segment) {
172 Ok(j) => j,
173 Err(e) => {
174 warn!(lsn, error = %e, "Failed to serialize WAL segment, restoring buffer");
175 let mut state = acquire_mutex(&self.state, "wal_state")?;
177 let new_mutations = std::mem::take(&mut state.buffer);
178 state.buffer = batch;
179 state.buffer.extend(new_mutations);
180 return Err(e.into());
182 }
183 };
184 tracing::Span::current().record("size_bytes", json.len());
185 metrics::counter!("uni_wal_bytes_written_total").increment(json.len() as u64);
186
187 let filename = format!("{:020}_{}.wal", lsn, Uuid::new_v4());
189 let path = self.prefix.child(filename);
190
191 if let Err(e) = put_with_timeout(&self.store, &path, json.into(), DEFAULT_TIMEOUT).await {
193 warn!(
194 lsn,
195 error = %e,
196 "Failed to flush WAL segment, restoring buffer (LSN gap preserved for monotonicity)"
197 );
198 let mut state = acquire_mutex(&self.state, "wal_state")?;
200 let new_mutations = std::mem::take(&mut state.buffer);
202 state.buffer = batch;
203 state.buffer.extend(new_mutations);
204 return Err(e);
207 }
208
209 {
211 let mut state = acquire_mutex(&self.state, "wal_state")?;
212 state.flushed_lsn = lsn;
213 }
214
215 let duration = start.elapsed();
216 metrics::histogram!("wal_flush_latency_ms").record(duration.as_millis() as f64);
217 metrics::histogram!("uni_wal_flush_duration_seconds").record(duration.as_secs_f64());
218
219 if duration.as_millis() > 100 {
220 warn!(
221 lsn,
222 duration_ms = duration.as_millis(),
223 "Slow WAL flush detected"
224 );
225 } else {
226 debug!(
227 lsn,
228 duration_ms = duration.as_millis(),
229 "WAL flush completed"
230 );
231 }
232
233 Ok(lsn)
234 }
235
236 pub fn flushed_lsn(&self) -> Result<u64, uni_common::sync::LockPoisonedError> {
242 let guard = uni_common::sync::acquire_mutex(&self.state, "wal_state")?;
243 Ok(guard.flushed_lsn)
244 }
245
    /// Reads back all mutations from segments with LSN > `high_water_mark`,
    /// in LSN order.
    ///
    /// Segments at or below the mark are skipped by filename where possible
    /// (no download); a segment whose filename can't be parsed is downloaded
    /// and filtered by its recorded LSN instead.
    #[instrument(skip(self), level = "debug")]
    pub async fn replay_since(&self, high_water_mark: u64) -> Result<Vec<Mutation>> {
        let start = std::time::Instant::now();
        debug!(high_water_mark, "Replaying WAL segments");
        let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
        let mut mutations = Vec::new();

        // Lexicographic sort equals numeric LSN order because filenames are
        // zero-padded to 20 digits.
        let mut paths: Vec<_> = metas.into_iter().map(|m| m.location).collect();
        paths.sort();
        let mut segments_replayed = 0;

        for path in paths {
            // Cheap filename-based skip for segments already applied.
            if let Some(lsn) = parse_lsn_from_filename(&path)
                && lsn <= high_water_mark
            {
                continue;
            }

            let get_result = get_with_timeout(&self.store, &path, DEFAULT_TIMEOUT).await?;
            let bytes = get_result.bytes().await?;
            if bytes.is_empty() {
                continue;
            }

            let segment: WalSegment = serde_json::from_slice(&bytes)?;
            // Re-check the recorded LSN: this filters downloaded segments
            // whose filename could not be parsed above.
            if segment.lsn > high_water_mark {
                mutations.extend(segment.mutations);
                segments_replayed += 1;
            }
        }

        info!(
            segments_replayed,
            mutations_count = mutations.len(),
            "WAL replay completed"
        );
        metrics::histogram!("uni_wal_replay_duration_seconds")
            .record(start.elapsed().as_secs_f64());

        Ok(mutations)
    }
295
    /// Replays the entire log; equivalent to `replay_since(0)` because LSN
    /// assignment starts at 1.
    pub async fn replay(&self) -> Result<Vec<Mutation>> {
        self.replay_since(0).await
    }
300
301 #[instrument(skip(self), level = "info")]
304 pub async fn truncate_before(&self, high_water_mark: u64) -> Result<()> {
305 info!(high_water_mark, "Truncating WAL segments");
306 let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
307
308 let mut deleted_count = 0;
309 for meta in metas {
310 let should_delete = if let Some(lsn) = parse_lsn_from_filename(&meta.location) {
312 lsn <= high_water_mark
313 } else {
314 warn!(
316 path = %meta.location,
317 "WAL filename doesn't match expected format, downloading segment"
318 );
319 let get_result =
320 get_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
321 let bytes = get_result.bytes().await?;
322 if bytes.is_empty() {
323 true
325 } else {
326 let segment: WalSegment = serde_json::from_slice(&bytes)?;
327 segment.lsn <= high_water_mark
328 }
329 };
330
331 if should_delete {
332 delete_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
333 deleted_count += 1;
334 }
335 }
336 info!(deleted_count, "WAL truncation completed");
337 Ok(())
338 }
339
340 pub async fn has_segments(&self) -> Result<bool> {
342 let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
343 Ok(!metas.is_empty())
344 }
345
346 pub async fn truncate(&self) -> Result<()> {
347 info!("Truncating all WAL segments");
348 let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
349
350 let mut deleted_count = 0;
351 for meta in metas {
352 delete_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
353 deleted_count += 1;
354 }
355 info!(deleted_count, "Full WAL truncation completed");
356 Ok(())
357 }
358}
359
360#[cfg(test)]
361mod tests {
362 use super::*;
363 use object_store::local::LocalFileSystem;
364 use std::collections::HashMap;
365 use tempfile::tempdir;
366
367 #[tokio::test]
368 async fn test_wal_append_replay() -> Result<()> {
369 let dir = tempdir()?;
370 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
371 let prefix = Path::from("wal");
372
373 let wal = WriteAheadLog::new(store, prefix);
374
375 let mutation = Mutation::InsertVertex {
376 vid: Vid::new(1),
377 properties: HashMap::new(),
378 labels: vec![],
379 };
380
381 wal.append(&mutation.clone())?;
382 wal.flush().await?;
383
384 let mutations = wal.replay().await?;
385 assert_eq!(mutations.len(), 1);
386 if let Mutation::InsertVertex { vid, .. } = &mutations[0] {
387 assert_eq!(vid.as_u64(), Vid::new(1).as_u64());
388 } else {
389 panic!("Wrong mutation type");
390 }
391
392 wal.truncate().await?;
393 let mutations2 = wal.replay().await?;
394 assert_eq!(mutations2.len(), 0);
395
396 Ok(())
397 }
398
399 #[tokio::test]
400 async fn test_lsn_monotonicity() -> Result<()> {
401 let dir = tempdir()?;
403 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
404 let prefix = Path::from("wal");
405
406 let wal = WriteAheadLog::new(store, prefix);
407
408 let mutation1 = Mutation::InsertVertex {
409 vid: Vid::new(1),
410 properties: HashMap::new(),
411 labels: vec![],
412 };
413 let mutation2 = Mutation::InsertVertex {
414 vid: Vid::new(2),
415 properties: HashMap::new(),
416 labels: vec![],
417 };
418 let mutation3 = Mutation::InsertVertex {
419 vid: Vid::new(3),
420 properties: HashMap::new(),
421 labels: vec![],
422 };
423
424 wal.append(&mutation1)?;
426 let lsn1 = wal.flush().await?;
427
428 wal.append(&mutation2)?;
430 let lsn2 = wal.flush().await?;
431
432 wal.append(&mutation3)?;
434 let lsn3 = wal.flush().await?;
435
436 assert!(lsn2 > lsn1, "LSN2 ({}) should be > LSN1 ({})", lsn2, lsn1);
438 assert!(lsn3 > lsn2, "LSN3 ({}) should be > LSN2 ({})", lsn3, lsn2);
439
440 assert_eq!(lsn2, lsn1 + 1);
442 assert_eq!(lsn3, lsn2 + 1);
443
444 Ok(())
445 }
446
447 #[test]
448 fn test_parse_lsn_from_filename() {
449 let path = Path::from("00000000000000000042_a1b2c3d4.wal");
451 assert_eq!(parse_lsn_from_filename(&path), Some(42));
452
453 let path = Path::from("00000000000000001234_e5f6a7b8.wal");
454 assert_eq!(parse_lsn_from_filename(&path), Some(1234));
455
456 let path = Path::from("00000000000000000001_xyz.wal");
458 assert_eq!(parse_lsn_from_filename(&path), Some(1));
459
460 let path = Path::from("12345678901234567890_uuid.wal");
462 assert_eq!(parse_lsn_from_filename(&path), Some(12345678901234567890));
463
464 let path = Path::from("invalid.wal");
466 assert_eq!(parse_lsn_from_filename(&path), None);
467
468 let path = Path::from("123.wal"); assert_eq!(parse_lsn_from_filename(&path), None);
470
471 let path = Path::from("abcdefghijklmnopqrst_uuid.wal"); assert_eq!(parse_lsn_from_filename(&path), None);
473
474 let path = Path::from("00000000000000000100.wal");
476 assert_eq!(parse_lsn_from_filename(&path), Some(100));
477
478 let path = Path::from("");
480 assert_eq!(parse_lsn_from_filename(&path), None);
481 }
482
483 #[tokio::test]
486 async fn test_find_max_lsn_scalability() -> Result<()> {
487 let dir = tempdir()?;
488 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
489 let prefix = Path::from("wal");
490
491 let wal = WriteAheadLog::new(store, prefix);
492
493 for i in 1..=100 {
495 let mutation = Mutation::InsertVertex {
496 vid: Vid::new(i),
497 properties: HashMap::new(),
498 labels: vec![],
499 };
500 wal.append(&mutation)?;
501 wal.flush().await?;
502 }
503
504 let start = std::time::Instant::now();
506 let max_lsn = wal.find_max_lsn().await?;
507 let duration = start.elapsed();
508
509 assert_eq!(max_lsn, 100, "Max LSN should be 100");
511
512 assert!(
514 duration.as_millis() < 1000,
515 "find_max_lsn took {}ms, expected < 1000ms (filename parsing should be fast)",
516 duration.as_millis()
517 );
518
519 Ok(())
520 }
521
522 #[tokio::test]
524 async fn test_lsn_gaps_preserved_on_flush_failure() -> Result<()> {
525 let dir = tempdir()?;
526 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
527 let prefix = Path::from("wal");
528
529 let wal = WriteAheadLog::new(store.clone(), prefix.clone());
530
531 wal.append(&Mutation::InsertVertex {
533 vid: Vid::new(1),
534 properties: HashMap::new(),
535 labels: vec![],
536 })?;
537 let lsn1 = wal.flush().await?;
538 assert_eq!(lsn1, 1);
539
540 wal.append(&Mutation::InsertVertex {
542 vid: Vid::new(2),
543 properties: HashMap::new(),
544 labels: vec![],
545 })?;
546 let lsn2 = wal.flush().await?;
547 assert_eq!(lsn2, 2);
548
549 wal.append(&Mutation::InsertVertex {
556 vid: Vid::new(3),
557 properties: HashMap::new(),
558 labels: vec![],
559 })?;
560
561 wal.append(&Mutation::InsertVertex {
563 vid: Vid::new(4),
564 properties: HashMap::new(),
565 labels: vec![],
566 })?;
567 let lsn4 = wal.flush().await?;
568
569 assert_eq!(lsn4, 3, "LSN should increment monotonically");
571
572 let mutations = wal.replay().await?;
574 assert_eq!(mutations.len(), 4, "All 4 mutations should be replayed");
575
576 Ok(())
577 }
578
579 #[tokio::test]
581 async fn test_lsn_watermark_no_reuse() -> Result<()> {
582 let dir = tempdir()?;
583 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
584 let prefix = Path::from("wal");
585
586 let wal = WriteAheadLog::new(store, prefix);
587
588 let mut seen_lsns = std::collections::HashSet::new();
590
591 for i in 1..=50 {
593 wal.append(&Mutation::InsertVertex {
594 vid: Vid::new(i),
595 properties: HashMap::new(),
596 labels: vec![],
597 })?;
598 let lsn = wal.flush().await?;
599
600 assert!(
602 !seen_lsns.contains(&lsn),
603 "LSN {} was reused! This violates monotonicity.",
604 lsn
605 );
606 seen_lsns.insert(lsn);
607
608 assert_eq!(lsn, i, "LSN should be {}, got {}", i, lsn);
610 }
611
612 Ok(())
613 }
614
615 #[tokio::test]
618 async fn test_truncate_scalability() -> Result<()> {
619 let dir = tempdir()?;
620 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
621 let prefix = Path::from("wal");
622
623 let wal = WriteAheadLog::new(store, prefix);
624
625 for i in 1..=100 {
627 let mutation = Mutation::InsertVertex {
628 vid: Vid::new(i),
629 properties: HashMap::new(),
630 labels: vec![],
631 };
632 wal.append(&mutation)?;
633 wal.flush().await?;
634 }
635
636 let start = std::time::Instant::now();
638 wal.truncate_before(50).await?;
639 let duration = start.elapsed();
640
641 let mutations = wal.replay().await?;
643 assert_eq!(
644 mutations.len(),
645 50,
646 "Should have 50 mutations remaining (51-100)"
647 );
648
649 assert!(
651 duration.as_millis() < 1000,
652 "truncate_before took {}ms, expected < 1000ms (filename parsing should be fast)",
653 duration.as_millis()
654 );
655
656 Ok(())
657 }
658
659 #[tokio::test]
661 async fn test_replay_since_skips_old_segments() -> Result<()> {
662 let dir = tempdir()?;
663 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
664 let prefix = Path::from("wal");
665
666 let wal = WriteAheadLog::new(store, prefix);
667
668 for i in 1..=100 {
670 let mutation = Mutation::InsertVertex {
671 vid: Vid::new(i),
672 properties: HashMap::new(),
673 labels: vec![],
674 };
675 wal.append(&mutation)?;
676 wal.flush().await?;
677 }
678
679 let start = std::time::Instant::now();
681 let mutations = wal.replay_since(90).await?;
682 let duration = start.elapsed();
683
684 assert_eq!(mutations.len(), 10, "Should replay only LSNs 91-100");
686
687 assert!(
689 duration.as_millis() < 500,
690 "replay_since took {}ms, expected < 500ms (should skip by filename)",
691 duration.as_millis()
692 );
693
694 Ok(())
695 }
696
697 #[tokio::test]
699 async fn test_wal_replay_preserves_vertex_labels() -> Result<()> {
700 let dir = tempdir()?;
701 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
702 let prefix = Path::from("wal");
703
704 let wal = Arc::new(WriteAheadLog::new(store, prefix));
705
706 wal.append(&Mutation::InsertVertex {
708 vid: Vid::new(42),
709 properties: {
710 let mut props = HashMap::new();
711 props.insert(
712 "name".to_string(),
713 uni_common::Value::String("Alice".to_string()),
714 );
715 props
716 },
717 labels: vec!["Person".to_string(), "User".to_string()],
718 })?;
719
720 wal.flush().await?;
722
723 let mutations = wal.replay().await?;
725 assert_eq!(mutations.len(), 1);
726
727 if let Mutation::InsertVertex { vid, labels, .. } = &mutations[0] {
729 assert_eq!(vid.as_u64(), 42);
730 assert_eq!(labels.len(), 2);
731 assert!(labels.contains(&"Person".to_string()));
732 assert!(labels.contains(&"User".to_string()));
733 } else {
734 panic!("Expected InsertVertex mutation");
735 }
736
737 Ok(())
738 }
739
740 #[tokio::test]
742 async fn test_wal_replay_preserves_delete_vertex_labels() -> Result<()> {
743 let dir = tempdir()?;
744 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
745 let prefix = Path::from("wal");
746
747 let wal = Arc::new(WriteAheadLog::new(store, prefix));
748
749 wal.append(&Mutation::DeleteVertex {
751 vid: Vid::new(99),
752 labels: vec!["Person".to_string(), "Admin".to_string()],
753 })?;
754
755 wal.flush().await?;
757
758 let mutations = wal.replay().await?;
760 assert_eq!(mutations.len(), 1);
761
762 if let Mutation::DeleteVertex { vid, labels } = &mutations[0] {
764 assert_eq!(vid.as_u64(), 99);
765 assert_eq!(labels.len(), 2);
766 assert!(labels.contains(&"Person".to_string()));
767 assert!(labels.contains(&"Admin".to_string()));
768 } else {
769 panic!("Expected DeleteVertex mutation");
770 }
771
772 Ok(())
773 }
774
775 #[tokio::test]
777 async fn test_wal_replay_preserves_edge_type_name() -> Result<()> {
778 let dir = tempdir()?;
779 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
780 let prefix = Path::from("wal");
781
782 let wal = Arc::new(WriteAheadLog::new(store, prefix));
783
784 wal.append(&Mutation::InsertEdge {
786 src_vid: Vid::new(1),
787 dst_vid: Vid::new(2),
788 edge_type: 100,
789 eid: Eid::new(500),
790 version: 1,
791 properties: {
792 let mut props = HashMap::new();
793 props.insert("since".to_string(), uni_common::Value::Int(2020));
794 props
795 },
796 edge_type_name: Some("KNOWS".to_string()),
797 })?;
798
799 wal.flush().await?;
801
802 let mutations = wal.replay().await?;
804 assert_eq!(mutations.len(), 1);
805
806 if let Mutation::InsertEdge {
808 eid,
809 edge_type_name,
810 ..
811 } = &mutations[0]
812 {
813 assert_eq!(eid.as_u64(), 500);
814 assert_eq!(edge_type_name.as_deref(), Some("KNOWS"));
815 } else {
816 panic!("Expected InsertEdge mutation");
817 }
818
819 Ok(())
820 }
821
822 #[tokio::test]
824 async fn test_wal_backward_compatibility_labels() -> Result<()> {
825 let dir = tempdir()?;
826 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
827 let prefix = Path::from("wal");
828
829 let old_format_json = r#"{
831 "lsn": 1,
832 "mutations": [
833 {
834 "InsertVertex": {
835 "vid": 123,
836 "properties": {}
837 }
838 }
839 ]
840 }"#;
841
842 let path = prefix.child("00000000000000000001_test.wal");
843 store.put(&path, old_format_json.into()).await?;
844
845 let wal = WriteAheadLog::new(store, prefix);
847 let mutations = wal.replay().await?;
848
849 assert_eq!(mutations.len(), 1);
851 if let Mutation::InsertVertex { vid, labels, .. } = &mutations[0] {
852 assert_eq!(vid.as_u64(), 123);
853 assert_eq!(
854 labels.len(),
855 0,
856 "Old format should deserialize with empty labels"
857 );
858 } else {
859 panic!("Expected InsertVertex mutation");
860 }
861
862 Ok(())
863 }
864}