1use crate::store_utils::{
5 DEFAULT_TIMEOUT, delete_with_timeout, get_with_timeout, list_with_timeout, put_with_timeout,
6};
7use anyhow::Result;
8use metrics;
9use object_store::ObjectStore;
10use object_store::path::Path;
11use serde::{Deserialize, Serialize};
12use std::sync::{Arc, Mutex};
13use tracing::{debug, info, instrument, warn};
14use uni_common::Properties;
15use uni_common::core::id::{Eid, Vid};
16use uni_common::sync::acquire_mutex;
17use uuid::Uuid;
18
19fn parse_lsn_from_filename(path: &Path) -> Option<u64> {
22 let filename = path.filename()?;
23 if filename.len() < 20 {
24 return None;
25 }
26 filename[..20].parse::<u64>().ok()
27}
28
/// A single graph mutation recorded in the write-ahead log.
///
/// Mutations are buffered by `WriteAheadLog::append` and serialized with serde
/// as part of a [`WalSegment`]. Fields marked `#[serde(default)]` may be absent
/// from the serialized form; they then deserialize to their `Default` value.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Mutation {
    /// Insertion of an edge.
    InsertEdge {
        src_vid: Vid,
        dst_vid: Vid,
        /// Numeric edge type identifier.
        edge_type: u32,
        eid: Eid,
        version: u64,
        properties: Properties,
        /// Optional edge type name; `None` when missing from the serialized data.
        #[serde(default)]
        edge_type_name: Option<String>,
    },
    /// Deletion of an edge.
    DeleteEdge {
        eid: Eid,
        src_vid: Vid,
        dst_vid: Vid,
        /// Numeric edge type identifier.
        edge_type: u32,
        version: u64,
    },
    /// Insertion of a vertex.
    InsertVertex {
        vid: Vid,
        properties: Properties,
        /// Vertex labels; empty when missing from the serialized data.
        #[serde(default)]
        labels: Vec<String>,
    },
    /// Deletion of a vertex.
    DeleteVertex {
        vid: Vid,
        /// Vertex labels; empty when missing from the serialized data.
        #[serde(default)]
        labels: Vec<String>,
    },
    /// Label assignment for a vertex.
    // NOTE(review): whether `labels` replaces or extends the existing set is
    // decided by the consumer of this mutation and is not visible here.
    SetVertexLabels { vid: Vid, labels: Vec<String> },
}
67
/// One persisted unit of the WAL: the batch of mutations written by a single
/// flush, serialized as JSON.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WalSegment {
    /// Log sequence number assigned to this segment when it was flushed.
    pub lsn: u64,
    /// Mutations contained in this segment, in the order they were buffered.
    pub mutations: Vec<Mutation>,
}
76
/// Write-ahead log that buffers mutations in memory and persists them as
/// segment objects under `prefix` in an object store.
pub struct WriteAheadLog {
    /// Object store that segment objects are written to and read from.
    store: Arc<dyn ObjectStore>,
    /// Path prefix under which all segment objects are listed and written.
    prefix: Path,
    /// Pending buffer and LSN counters, guarded by a mutex.
    state: Mutex<WalState>,
}
82
/// Mutex-protected bookkeeping for [`WriteAheadLog`].
struct WalState {
    /// Mutations appended but not yet taken by a flush.
    buffer: Vec<Mutation>,
    /// LSN the next flush will assign; advanced every time a flush takes a batch.
    next_lsn: u64,
    /// LSN of the last segment known to be persisted: set by `initialize` and
    /// after each successful flush; starts at 0.
    flushed_lsn: u64,
}
90
91impl WriteAheadLog {
92 pub fn new(store: Arc<dyn ObjectStore>, prefix: Path) -> Self {
93 Self {
94 store,
95 prefix,
96 state: Mutex::new(WalState {
97 buffer: Vec::new(),
98 next_lsn: 1, flushed_lsn: 0,
100 }),
101 }
102 }
103
104 pub async fn initialize(&self) -> Result<u64> {
106 let max_lsn = self.find_max_lsn().await?;
107 {
108 let mut state = acquire_mutex(&self.state, "wal_state")?;
109 state.next_lsn = max_lsn + 1;
110 state.flushed_lsn = max_lsn;
111 }
112 Ok(max_lsn)
113 }
114
115 async fn find_max_lsn(&self) -> Result<u64> {
118 let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
119 let mut max_lsn: u64 = 0;
120
121 for meta in metas {
122 if let Some(lsn) = parse_lsn_from_filename(&meta.location) {
124 max_lsn = max_lsn.max(lsn);
125 } else {
126 warn!(
128 path = %meta.location,
129 "WAL filename doesn't match expected format, downloading segment"
130 );
131 let get_result =
132 get_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
133 let bytes = get_result.bytes().await?;
134 if bytes.is_empty() {
135 continue;
136 }
137 let segment: WalSegment = serde_json::from_slice(&bytes)?;
138 max_lsn = max_lsn.max(segment.lsn);
139 }
140 }
141
142 Ok(max_lsn)
143 }
144
145 #[instrument(skip(self, mutation), level = "trace")]
146 pub fn append(&self, mutation: &Mutation) -> Result<()> {
147 let mut state = acquire_mutex(&self.state, "wal_state")?;
148 state.buffer.push(mutation.clone());
149 metrics::counter!("uni_wal_entries_total").increment(1);
150 Ok(())
151 }
152
153 #[instrument(skip(self), fields(lsn, mutations_count, size_bytes))]
155 pub async fn flush(&self) -> Result<u64> {
156 let start = std::time::Instant::now();
157 let (batch, lsn) = {
158 let mut state = acquire_mutex(&self.state, "wal_state")?;
159 if state.buffer.is_empty() {
160 return Ok(state.flushed_lsn);
161 }
162 let lsn = state.next_lsn;
163 state.next_lsn += 1;
164 (std::mem::take(&mut state.buffer), lsn)
165 };
166
167 tracing::Span::current().record("lsn", lsn);
168 tracing::Span::current().record("mutations_count", batch.len());
169
170 let segment = WalSegment {
172 lsn,
173 mutations: batch.clone(),
174 };
175
176 let json = match serde_json::to_vec(&segment) {
178 Ok(j) => j,
179 Err(e) => {
180 warn!(lsn, error = %e, "Failed to serialize WAL segment, restoring buffer");
181 let mut state = acquire_mutex(&self.state, "wal_state")?;
183 let new_mutations = std::mem::take(&mut state.buffer);
184 state.buffer = batch;
185 state.buffer.extend(new_mutations);
186 return Err(e.into());
188 }
189 };
190 tracing::Span::current().record("size_bytes", json.len());
191 metrics::counter!("uni_wal_bytes_written_total").increment(json.len() as u64);
192
193 let filename = format!("{:020}_{}.wal", lsn, Uuid::new_v4());
195 let path = self.prefix.clone().join(filename);
196
197 if let Err(e) = put_with_timeout(&self.store, &path, json.into(), DEFAULT_TIMEOUT).await {
199 warn!(
200 lsn,
201 error = %e,
202 "Failed to flush WAL segment, restoring buffer (LSN gap preserved for monotonicity)"
203 );
204 let mut state = acquire_mutex(&self.state, "wal_state")?;
206 let new_mutations = std::mem::take(&mut state.buffer);
208 state.buffer = batch;
209 state.buffer.extend(new_mutations);
210 return Err(e);
213 }
214
215 {
217 let mut state = acquire_mutex(&self.state, "wal_state")?;
218 state.flushed_lsn = lsn;
219 }
220
221 let duration = start.elapsed();
222 metrics::histogram!("wal_flush_latency_ms").record(duration.as_millis() as f64);
223 metrics::histogram!("uni_wal_flush_duration_seconds").record(duration.as_secs_f64());
224
225 if duration.as_millis() > 100 {
226 warn!(
227 lsn,
228 duration_ms = duration.as_millis(),
229 "Slow WAL flush detected"
230 );
231 } else {
232 debug!(
233 lsn,
234 duration_ms = duration.as_millis(),
235 "WAL flush completed"
236 );
237 }
238
239 Ok(lsn)
240 }
241
242 pub fn flushed_lsn(&self) -> Result<u64, uni_common::sync::LockPoisonedError> {
248 let guard = uni_common::sync::acquire_mutex(&self.state, "wal_state")?;
249 Ok(guard.flushed_lsn)
250 }
251
252 #[instrument(skip(self), level = "debug")]
256 pub async fn replay_since(&self, high_water_mark: u64) -> Result<Vec<Mutation>> {
257 let start = std::time::Instant::now();
258 debug!(high_water_mark, "Replaying WAL segments");
259 let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
260 let mut mutations = Vec::new();
261
262 let mut paths: Vec<_> = metas.into_iter().map(|m| m.location).collect();
264 paths.sort(); let mut segments_replayed = 0;
267
268 for path in paths {
269 if let Some(lsn) = parse_lsn_from_filename(&path)
271 && lsn <= high_water_mark
272 {
273 continue; }
275
276 let get_result = get_with_timeout(&self.store, &path, DEFAULT_TIMEOUT).await?;
278 let bytes = get_result.bytes().await?;
279 if bytes.is_empty() {
280 continue;
281 }
282
283 let segment: WalSegment = serde_json::from_slice(&bytes)?;
284 if segment.lsn > high_water_mark {
286 mutations.extend(segment.mutations);
287 segments_replayed += 1;
288 }
289 }
290
291 info!(
292 segments_replayed,
293 mutations_count = mutations.len(),
294 "WAL replay completed"
295 );
296 metrics::histogram!("uni_wal_replay_duration_seconds")
297 .record(start.elapsed().as_secs_f64());
298
299 Ok(mutations)
300 }
301
302 pub async fn replay(&self) -> Result<Vec<Mutation>> {
304 self.replay_since(0).await
305 }
306
307 #[instrument(skip(self), level = "info")]
310 pub async fn truncate_before(&self, high_water_mark: u64) -> Result<()> {
311 info!(high_water_mark, "Truncating WAL segments");
312 let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
313
314 let mut deleted_count = 0;
315 for meta in metas {
316 let should_delete = if let Some(lsn) = parse_lsn_from_filename(&meta.location) {
318 lsn <= high_water_mark
319 } else {
320 warn!(
322 path = %meta.location,
323 "WAL filename doesn't match expected format, downloading segment"
324 );
325 let get_result =
326 get_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
327 let bytes = get_result.bytes().await?;
328 if bytes.is_empty() {
329 true
331 } else {
332 let segment: WalSegment = serde_json::from_slice(&bytes)?;
333 segment.lsn <= high_water_mark
334 }
335 };
336
337 if should_delete {
338 delete_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
339 deleted_count += 1;
340 }
341 }
342 info!(deleted_count, "WAL truncation completed");
343 Ok(())
344 }
345
346 pub async fn has_segments(&self) -> Result<bool> {
348 let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
349 Ok(!metas.is_empty())
350 }
351
352 pub async fn truncate(&self) -> Result<()> {
353 info!("Truncating all WAL segments");
354 let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
355
356 let mut deleted_count = 0;
357 for meta in metas {
358 delete_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
359 deleted_count += 1;
360 }
361 info!(deleted_count, "Full WAL truncation completed");
362 Ok(())
363 }
364}
365
366#[cfg(test)]
367mod tests {
368 use super::*;
369 use object_store::ObjectStoreExt;
370 use object_store::local::LocalFileSystem;
371 use std::collections::HashMap;
372 use tempfile::tempdir;
373
374 #[tokio::test]
375 async fn test_wal_append_replay() -> Result<()> {
376 let dir = tempdir()?;
377 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
378 let prefix = Path::from("wal");
379
380 let wal = WriteAheadLog::new(store, prefix);
381
382 let mutation = Mutation::InsertVertex {
383 vid: Vid::new(1),
384 properties: HashMap::new(),
385 labels: vec![],
386 };
387
388 wal.append(&mutation.clone())?;
389 wal.flush().await?;
390
391 let mutations = wal.replay().await?;
392 assert_eq!(mutations.len(), 1);
393 if let Mutation::InsertVertex { vid, .. } = &mutations[0] {
394 assert_eq!(vid.as_u64(), Vid::new(1).as_u64());
395 } else {
396 panic!("Wrong mutation type");
397 }
398
399 wal.truncate().await?;
400 let mutations2 = wal.replay().await?;
401 assert_eq!(mutations2.len(), 0);
402
403 Ok(())
404 }
405
406 #[tokio::test]
407 async fn test_lsn_monotonicity() -> Result<()> {
408 let dir = tempdir()?;
410 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
411 let prefix = Path::from("wal");
412
413 let wal = WriteAheadLog::new(store, prefix);
414
415 let mutation1 = Mutation::InsertVertex {
416 vid: Vid::new(1),
417 properties: HashMap::new(),
418 labels: vec![],
419 };
420 let mutation2 = Mutation::InsertVertex {
421 vid: Vid::new(2),
422 properties: HashMap::new(),
423 labels: vec![],
424 };
425 let mutation3 = Mutation::InsertVertex {
426 vid: Vid::new(3),
427 properties: HashMap::new(),
428 labels: vec![],
429 };
430
431 wal.append(&mutation1)?;
433 let lsn1 = wal.flush().await?;
434
435 wal.append(&mutation2)?;
437 let lsn2 = wal.flush().await?;
438
439 wal.append(&mutation3)?;
441 let lsn3 = wal.flush().await?;
442
443 assert!(lsn2 > lsn1, "LSN2 ({}) should be > LSN1 ({})", lsn2, lsn1);
445 assert!(lsn3 > lsn2, "LSN3 ({}) should be > LSN2 ({})", lsn3, lsn2);
446
447 assert_eq!(lsn2, lsn1 + 1);
449 assert_eq!(lsn3, lsn2 + 1);
450
451 Ok(())
452 }
453
/// Table-driven check of `parse_lsn_from_filename` over well-formed names,
/// names that are too short, non-numeric prefixes, and the empty path.
#[test]
fn test_parse_lsn_from_filename() {
    let cases: [(&str, Option<u64>); 9] = [
        // Well-formed: 20-digit zero-padded LSN followed by a suffix.
        ("00000000000000000042_a1b2c3d4.wal", Some(42)),
        ("00000000000000001234_e5f6a7b8.wal", Some(1234)),
        ("00000000000000000001_xyz.wal", Some(1)),
        // All 20 digits significant.
        ("12345678901234567890_uuid.wal", Some(12345678901234567890)),
        // Too short to contain a 20-character prefix.
        ("invalid.wal", None),
        ("123.wal", None),
        // Long enough, but the prefix is not numeric.
        ("abcdefghijklmnopqrst_uuid.wal", None),
        // No UUID part: only the first 20 characters matter.
        ("00000000000000000100.wal", Some(100)),
        // Empty path has no filename.
        ("", None),
    ];

    for (name, expected) in cases {
        let path = Path::from(name);
        assert_eq!(parse_lsn_from_filename(&path), expected);
    }
}
489
490 #[tokio::test]
493 async fn test_find_max_lsn_scalability() -> Result<()> {
494 let dir = tempdir()?;
495 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
496 let prefix = Path::from("wal");
497
498 let wal = WriteAheadLog::new(store, prefix);
499
500 for i in 1..=100 {
502 let mutation = Mutation::InsertVertex {
503 vid: Vid::new(i),
504 properties: HashMap::new(),
505 labels: vec![],
506 };
507 wal.append(&mutation)?;
508 wal.flush().await?;
509 }
510
511 let start = std::time::Instant::now();
513 let max_lsn = wal.find_max_lsn().await?;
514 let duration = start.elapsed();
515
516 assert_eq!(max_lsn, 100, "Max LSN should be 100");
518
519 assert!(
521 duration.as_millis() < 1000,
522 "find_max_lsn took {}ms, expected < 1000ms (filename parsing should be fast)",
523 duration.as_millis()
524 );
525
526 Ok(())
527 }
528
529 #[tokio::test]
531 async fn test_lsn_gaps_preserved_on_flush_failure() -> Result<()> {
532 let dir = tempdir()?;
533 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
534 let prefix = Path::from("wal");
535
536 let wal = WriteAheadLog::new(store.clone(), prefix.clone());
537
538 wal.append(&Mutation::InsertVertex {
540 vid: Vid::new(1),
541 properties: HashMap::new(),
542 labels: vec![],
543 })?;
544 let lsn1 = wal.flush().await?;
545 assert_eq!(lsn1, 1);
546
547 wal.append(&Mutation::InsertVertex {
549 vid: Vid::new(2),
550 properties: HashMap::new(),
551 labels: vec![],
552 })?;
553 let lsn2 = wal.flush().await?;
554 assert_eq!(lsn2, 2);
555
556 wal.append(&Mutation::InsertVertex {
563 vid: Vid::new(3),
564 properties: HashMap::new(),
565 labels: vec![],
566 })?;
567
568 wal.append(&Mutation::InsertVertex {
570 vid: Vid::new(4),
571 properties: HashMap::new(),
572 labels: vec![],
573 })?;
574 let lsn4 = wal.flush().await?;
575
576 assert_eq!(lsn4, 3, "LSN should increment monotonically");
578
579 let mutations = wal.replay().await?;
581 assert_eq!(mutations.len(), 4, "All 4 mutations should be replayed");
582
583 Ok(())
584 }
585
586 #[tokio::test]
588 async fn test_lsn_watermark_no_reuse() -> Result<()> {
589 let dir = tempdir()?;
590 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
591 let prefix = Path::from("wal");
592
593 let wal = WriteAheadLog::new(store, prefix);
594
595 let mut seen_lsns = std::collections::HashSet::new();
597
598 for i in 1..=50 {
600 wal.append(&Mutation::InsertVertex {
601 vid: Vid::new(i),
602 properties: HashMap::new(),
603 labels: vec![],
604 })?;
605 let lsn = wal.flush().await?;
606
607 assert!(
609 !seen_lsns.contains(&lsn),
610 "LSN {} was reused! This violates monotonicity.",
611 lsn
612 );
613 seen_lsns.insert(lsn);
614
615 assert_eq!(lsn, i, "LSN should be {}, got {}", i, lsn);
617 }
618
619 Ok(())
620 }
621
622 #[tokio::test]
625 async fn test_truncate_scalability() -> Result<()> {
626 let dir = tempdir()?;
627 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
628 let prefix = Path::from("wal");
629
630 let wal = WriteAheadLog::new(store, prefix);
631
632 for i in 1..=100 {
634 let mutation = Mutation::InsertVertex {
635 vid: Vid::new(i),
636 properties: HashMap::new(),
637 labels: vec![],
638 };
639 wal.append(&mutation)?;
640 wal.flush().await?;
641 }
642
643 let start = std::time::Instant::now();
645 wal.truncate_before(50).await?;
646 let duration = start.elapsed();
647
648 let mutations = wal.replay().await?;
650 assert_eq!(
651 mutations.len(),
652 50,
653 "Should have 50 mutations remaining (51-100)"
654 );
655
656 assert!(
658 duration.as_millis() < 1000,
659 "truncate_before took {}ms, expected < 1000ms (filename parsing should be fast)",
660 duration.as_millis()
661 );
662
663 Ok(())
664 }
665
666 #[tokio::test]
668 async fn test_replay_since_skips_old_segments() -> Result<()> {
669 let dir = tempdir()?;
670 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
671 let prefix = Path::from("wal");
672
673 let wal = WriteAheadLog::new(store, prefix);
674
675 for i in 1..=100 {
677 let mutation = Mutation::InsertVertex {
678 vid: Vid::new(i),
679 properties: HashMap::new(),
680 labels: vec![],
681 };
682 wal.append(&mutation)?;
683 wal.flush().await?;
684 }
685
686 let start = std::time::Instant::now();
688 let mutations = wal.replay_since(90).await?;
689 let duration = start.elapsed();
690
691 assert_eq!(mutations.len(), 10, "Should replay only LSNs 91-100");
693
694 assert!(
696 duration.as_millis() < 500,
697 "replay_since took {}ms, expected < 500ms (should skip by filename)",
698 duration.as_millis()
699 );
700
701 Ok(())
702 }
703
704 #[tokio::test]
706 async fn test_wal_replay_preserves_vertex_labels() -> Result<()> {
707 let dir = tempdir()?;
708 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
709 let prefix = Path::from("wal");
710
711 let wal = Arc::new(WriteAheadLog::new(store, prefix));
712
713 wal.append(&Mutation::InsertVertex {
715 vid: Vid::new(42),
716 properties: {
717 let mut props = HashMap::new();
718 props.insert(
719 "name".to_string(),
720 uni_common::Value::String("Alice".to_string()),
721 );
722 props
723 },
724 labels: vec!["Person".to_string(), "User".to_string()],
725 })?;
726
727 wal.flush().await?;
729
730 let mutations = wal.replay().await?;
732 assert_eq!(mutations.len(), 1);
733
734 if let Mutation::InsertVertex { vid, labels, .. } = &mutations[0] {
736 assert_eq!(vid.as_u64(), 42);
737 assert_eq!(labels.len(), 2);
738 assert!(labels.contains(&"Person".to_string()));
739 assert!(labels.contains(&"User".to_string()));
740 } else {
741 panic!("Expected InsertVertex mutation");
742 }
743
744 Ok(())
745 }
746
747 #[tokio::test]
749 async fn test_wal_replay_preserves_delete_vertex_labels() -> Result<()> {
750 let dir = tempdir()?;
751 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
752 let prefix = Path::from("wal");
753
754 let wal = Arc::new(WriteAheadLog::new(store, prefix));
755
756 wal.append(&Mutation::DeleteVertex {
758 vid: Vid::new(99),
759 labels: vec!["Person".to_string(), "Admin".to_string()],
760 })?;
761
762 wal.flush().await?;
764
765 let mutations = wal.replay().await?;
767 assert_eq!(mutations.len(), 1);
768
769 if let Mutation::DeleteVertex { vid, labels } = &mutations[0] {
771 assert_eq!(vid.as_u64(), 99);
772 assert_eq!(labels.len(), 2);
773 assert!(labels.contains(&"Person".to_string()));
774 assert!(labels.contains(&"Admin".to_string()));
775 } else {
776 panic!("Expected DeleteVertex mutation");
777 }
778
779 Ok(())
780 }
781
782 #[tokio::test]
784 async fn test_wal_replay_preserves_edge_type_name() -> Result<()> {
785 let dir = tempdir()?;
786 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
787 let prefix = Path::from("wal");
788
789 let wal = Arc::new(WriteAheadLog::new(store, prefix));
790
791 wal.append(&Mutation::InsertEdge {
793 src_vid: Vid::new(1),
794 dst_vid: Vid::new(2),
795 edge_type: 100,
796 eid: Eid::new(500),
797 version: 1,
798 properties: {
799 let mut props = HashMap::new();
800 props.insert("since".to_string(), uni_common::Value::Int(2020));
801 props
802 },
803 edge_type_name: Some("KNOWS".to_string()),
804 })?;
805
806 wal.flush().await?;
808
809 let mutations = wal.replay().await?;
811 assert_eq!(mutations.len(), 1);
812
813 if let Mutation::InsertEdge {
815 eid,
816 edge_type_name,
817 ..
818 } = &mutations[0]
819 {
820 assert_eq!(eid.as_u64(), 500);
821 assert_eq!(edge_type_name.as_deref(), Some("KNOWS"));
822 } else {
823 panic!("Expected InsertEdge mutation");
824 }
825
826 Ok(())
827 }
828
829 #[tokio::test]
831 async fn test_wal_backward_compatibility_labels() -> Result<()> {
832 let dir = tempdir()?;
833 let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
834 let prefix = Path::from("wal");
835
836 let old_format_json = r#"{
838 "lsn": 1,
839 "mutations": [
840 {
841 "InsertVertex": {
842 "vid": 123,
843 "properties": {}
844 }
845 }
846 ]
847 }"#;
848
849 let path = prefix.clone().join("00000000000000000001_test.wal");
850 store.put(&path, old_format_json.into()).await?;
851
852 let wal = WriteAheadLog::new(store, prefix);
854 let mutations = wal.replay().await?;
855
856 assert_eq!(mutations.len(), 1);
858 if let Mutation::InsertVertex { vid, labels, .. } = &mutations[0] {
859 assert_eq!(vid.as_u64(), 123);
860 assert_eq!(
861 labels.len(),
862 0,
863 "Old format should deserialize with empty labels"
864 );
865 } else {
866 panic!("Expected InsertVertex mutation");
867 }
868
869 Ok(())
870 }
871}