// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team
//
// uni_store/runtime/wal.rs

use crate::store_utils::{
    DEFAULT_TIMEOUT, delete_with_timeout, get_with_timeout, list_with_timeout, put_with_timeout,
};
use anyhow::{Context, Result};
use metrics;
use object_store::ObjectStore;
use object_store::path::Path;
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};
use tracing::{debug, info, instrument, warn};
use uni_common::Properties;
use uni_common::core::id::{Eid, Vid};
use uni_common::sync::acquire_mutex;
use uuid::Uuid;

19/// Parse LSN from WAL segment filename format `{:020}_{uuid}.wal`.
20/// Returns None if the filename doesn't match the expected format.
21fn parse_lsn_from_filename(path: &Path) -> Option<u64> {
22    let filename = path.filename()?;
23    if filename.len() < 20 {
24        return None;
25    }
26    filename[..20].parse::<u64>().ok()
27}
28
29#[derive(Serialize, Deserialize, Debug, Clone)]
30pub enum Mutation {
31    InsertEdge {
32        src_vid: Vid,
33        dst_vid: Vid,
34        edge_type: u32,
35        eid: Eid,
36        version: u64,
37        properties: Properties,
38        /// Edge type name for metadata recovery. Optional for backward compatibility.
39        #[serde(default)]
40        edge_type_name: Option<String>,
41    },
42    DeleteEdge {
43        eid: Eid,
44        src_vid: Vid,
45        dst_vid: Vid,
46        edge_type: u32,
47        version: u64,
48    },
49    InsertVertex {
50        vid: Vid,
51        properties: Properties,
52        #[serde(default)]
53        labels: Vec<String>,
54    },
55    DeleteVertex {
56        vid: Vid,
57        #[serde(default)]
58        labels: Vec<String>,
59    },
60}
61
62/// WAL segment with LSN for idempotent recovery
63#[derive(Serialize, Deserialize, Debug, Clone)]
64pub struct WalSegment {
65    /// Log Sequence Number - monotonically increasing per segment
66    pub lsn: u64,
67    /// Mutations in this segment
68    pub mutations: Vec<Mutation>,
69}
70
71pub struct WriteAheadLog {
72    store: Arc<dyn ObjectStore>,
73    prefix: Path,
74    state: Mutex<WalState>,
75}
76
77struct WalState {
78    buffer: Vec<Mutation>,
79    /// Current LSN counter (incremented per flush)
80    next_lsn: u64,
81    /// Highest LSN successfully flushed
82    flushed_lsn: u64,
83}
84
85impl WriteAheadLog {
86    pub fn new(store: Arc<dyn ObjectStore>, prefix: Path) -> Self {
87        Self {
88            store,
89            prefix,
90            state: Mutex::new(WalState {
91                buffer: Vec::new(),
92                next_lsn: 1, // Start at 1 so 0 means "no WAL"
93                flushed_lsn: 0,
94            }),
95        }
96    }
97
98    /// Initialize WAL state from existing segments (called on startup)
99    pub async fn initialize(&self) -> Result<u64> {
100        let max_lsn = self.find_max_lsn().await?;
101        {
102            let mut state = acquire_mutex(&self.state, "wal_state")?;
103            state.next_lsn = max_lsn + 1;
104            state.flushed_lsn = max_lsn;
105        }
106        Ok(max_lsn)
107    }
108
109    /// Find the maximum LSN in existing WAL segments by parsing filenames.
110    /// Only downloads segments if filename parsing fails (fallback).
111    async fn find_max_lsn(&self) -> Result<u64> {
112        let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
113        let mut max_lsn: u64 = 0;
114
115        for meta in metas {
116            // Try to parse LSN from filename first (fast path)
117            if let Some(lsn) = parse_lsn_from_filename(&meta.location) {
118                max_lsn = max_lsn.max(lsn);
119            } else {
120                // Fallback: download and parse segment if filename doesn't match expected format
121                warn!(
122                    path = %meta.location,
123                    "WAL filename doesn't match expected format, downloading segment"
124                );
125                let get_result =
126                    get_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
127                let bytes = get_result.bytes().await?;
128                if bytes.is_empty() {
129                    continue;
130                }
131                let segment: WalSegment = serde_json::from_slice(&bytes)?;
132                max_lsn = max_lsn.max(segment.lsn);
133            }
134        }
135
136        Ok(max_lsn)
137    }
138
139    #[instrument(skip(self, mutation), level = "trace")]
140    pub fn append(&self, mutation: &Mutation) -> Result<()> {
141        let mut state = acquire_mutex(&self.state, "wal_state")?;
142        state.buffer.push(mutation.clone());
143        metrics::counter!("uni_wal_entries_total").increment(1);
144        Ok(())
145    }
146
147    /// Flush buffered mutations to a WAL segment. Returns the LSN of the flushed segment.
148    #[instrument(skip(self), fields(lsn, mutations_count, size_bytes))]
149    pub async fn flush(&self) -> Result<u64> {
150        let start = std::time::Instant::now();
151        let (batch, lsn) = {
152            let mut state = acquire_mutex(&self.state, "wal_state")?;
153            if state.buffer.is_empty() {
154                return Ok(state.flushed_lsn);
155            }
156            let lsn = state.next_lsn;
157            state.next_lsn += 1;
158            (std::mem::take(&mut state.buffer), lsn)
159        };
160
161        tracing::Span::current().record("lsn", lsn);
162        tracing::Span::current().record("mutations_count", batch.len());
163
164        // Create segment with LSN
165        let segment = WalSegment {
166            lsn,
167            mutations: batch.clone(),
168        };
169
170        // Serialize segment; restore buffer on failure
171        let json = match serde_json::to_vec(&segment) {
172            Ok(j) => j,
173            Err(e) => {
174                warn!(lsn, error = %e, "Failed to serialize WAL segment, restoring buffer");
175                // Restore buffer on serialization failure
176                let mut state = acquire_mutex(&self.state, "wal_state")?;
177                let new_mutations = std::mem::take(&mut state.buffer);
178                state.buffer = batch;
179                state.buffer.extend(new_mutations);
180                // Don't roll back LSN - gap is harmless and maintains monotonicity
181                return Err(e.into());
182            }
183        };
184        tracing::Span::current().record("size_bytes", json.len());
185        metrics::counter!("uni_wal_bytes_written_total").increment(json.len() as u64);
186
187        // Include LSN in filename for easy ordering and identification
188        let filename = format!("{:020}_{}.wal", lsn, Uuid::new_v4());
189        let path = self.prefix.child(filename);
190
191        // Attempt to write; restore buffer on failure to prevent data loss
192        if let Err(e) = put_with_timeout(&self.store, &path, json.into(), DEFAULT_TIMEOUT).await {
193            warn!(
194                lsn,
195                error = %e,
196                "Failed to flush WAL segment, restoring buffer (LSN gap preserved for monotonicity)"
197            );
198            // Restore buffer so data isn't lost on transient failures
199            let mut state = acquire_mutex(&self.state, "wal_state")?;
200            // Prepend the failed batch to any new mutations that arrived
201            let new_mutations = std::mem::take(&mut state.buffer);
202            state.buffer = batch;
203            state.buffer.extend(new_mutations);
204            // Don't roll back LSN - gap is harmless and maintains strict monotonicity
205            // All WAL consumers use `>` / `<=` comparisons, not equality checks
206            return Err(e);
207        }
208
209        // Update flushed LSN on success
210        {
211            let mut state = acquire_mutex(&self.state, "wal_state")?;
212            state.flushed_lsn = lsn;
213        }
214
215        let duration = start.elapsed();
216        metrics::histogram!("wal_flush_latency_ms").record(duration.as_millis() as f64);
217        metrics::histogram!("uni_wal_flush_duration_seconds").record(duration.as_secs_f64());
218
219        if duration.as_millis() > 100 {
220            warn!(
221                lsn,
222                duration_ms = duration.as_millis(),
223                "Slow WAL flush detected"
224            );
225        } else {
226            debug!(
227                lsn,
228                duration_ms = duration.as_millis(),
229                "WAL flush completed"
230            );
231        }
232
233        Ok(lsn)
234    }
235
236    /// Get the highest LSN that has been flushed.
237    ///
238    /// # Errors
239    ///
240    /// Returns error if the WAL state lock is poisoned (see issue #18/#150).
241    pub fn flushed_lsn(&self) -> Result<u64, uni_common::sync::LockPoisonedError> {
242        let guard = uni_common::sync::acquire_mutex(&self.state, "wal_state")?;
243        Ok(guard.flushed_lsn)
244    }
245
246    /// Replay WAL segments with LSN > high_water_mark.
247    /// Returns mutations from segments that haven't been applied yet.
248    /// Optimized to skip downloading segments with LSN <= high_water_mark (parsed from filename).
249    #[instrument(skip(self), level = "debug")]
250    pub async fn replay_since(&self, high_water_mark: u64) -> Result<Vec<Mutation>> {
251        let start = std::time::Instant::now();
252        debug!(high_water_mark, "Replaying WAL segments");
253        let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
254        let mut mutations = Vec::new();
255
256        // Collect paths and sort by LSN (filename prefix)
257        let mut paths: Vec<_> = metas.into_iter().map(|m| m.location).collect();
258        paths.sort(); // Lexicographical sort works for LSN prefix (zero-padded)
259
260        let mut segments_replayed = 0;
261
262        for path in paths {
263            // Skip downloading segments we can identify as below high_water_mark from filename
264            if let Some(lsn) = parse_lsn_from_filename(&path)
265                && lsn <= high_water_mark
266            {
267                continue; // Skip this segment without downloading
268            }
269
270            // Download segment (either LSN > high_water_mark or filename unparseable)
271            let get_result = get_with_timeout(&self.store, &path, DEFAULT_TIMEOUT).await?;
272            let bytes = get_result.bytes().await?;
273            if bytes.is_empty() {
274                continue;
275            }
276
277            let segment: WalSegment = serde_json::from_slice(&bytes)?;
278            // Double-check LSN from segment content (handles fallback case)
279            if segment.lsn > high_water_mark {
280                mutations.extend(segment.mutations);
281                segments_replayed += 1;
282            }
283        }
284
285        info!(
286            segments_replayed,
287            mutations_count = mutations.len(),
288            "WAL replay completed"
289        );
290        metrics::histogram!("uni_wal_replay_duration_seconds")
291            .record(start.elapsed().as_secs_f64());
292
293        Ok(mutations)
294    }
295
296    /// Replay all WAL segments.
297    pub async fn replay(&self) -> Result<Vec<Mutation>> {
298        self.replay_since(0).await
299    }
300
301    /// Deletes WAL segments with LSN <= high_water_mark by parsing filenames.
302    /// Only downloads segments if filename parsing fails (fallback).
303    #[instrument(skip(self), level = "info")]
304    pub async fn truncate_before(&self, high_water_mark: u64) -> Result<()> {
305        info!(high_water_mark, "Truncating WAL segments");
306        let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
307
308        let mut deleted_count = 0;
309        for meta in metas {
310            // Try to parse LSN from filename first (fast path)
311            let should_delete = if let Some(lsn) = parse_lsn_from_filename(&meta.location) {
312                lsn <= high_water_mark
313            } else {
314                // Fallback: download and parse segment if filename doesn't match expected format
315                warn!(
316                    path = %meta.location,
317                    "WAL filename doesn't match expected format, downloading segment"
318                );
319                let get_result =
320                    get_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
321                let bytes = get_result.bytes().await?;
322                if bytes.is_empty() {
323                    // Empty segments should be deleted
324                    true
325                } else {
326                    let segment: WalSegment = serde_json::from_slice(&bytes)?;
327                    segment.lsn <= high_water_mark
328                }
329            };
330
331            if should_delete {
332                delete_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
333                deleted_count += 1;
334            }
335        }
336        info!(deleted_count, "WAL truncation completed");
337        Ok(())
338    }
339
340    /// Check if any WAL segments exist (for detecting database with lost manifest).
341    pub async fn has_segments(&self) -> Result<bool> {
342        let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
343        Ok(!metas.is_empty())
344    }
345
346    pub async fn truncate(&self) -> Result<()> {
347        info!("Truncating all WAL segments");
348        let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
349
350        let mut deleted_count = 0;
351        for meta in metas {
352            delete_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
353            deleted_count += 1;
354        }
355        info!(deleted_count, "Full WAL truncation completed");
356        Ok(())
357    }
358}
359
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::local::LocalFileSystem;
    use std::collections::{HashMap, HashSet};
    use std::time::Instant;
    use tempfile::{TempDir, tempdir};

    /// Local-filesystem store rooted in a fresh temp directory.
    ///
    /// Keep the returned `TempDir` alive while the store is in use; dropping it
    /// deletes the directory.
    fn temp_store() -> Result<(TempDir, Arc<LocalFileSystem>)> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        Ok((dir, store))
    }

    /// A fresh WAL under the `wal` prefix of a temp store.
    fn temp_wal() -> Result<(TempDir, WriteAheadLog)> {
        let (dir, store) = temp_store()?;
        Ok((dir, WriteAheadLog::new(store, Path::from("wal"))))
    }

    /// `InsertVertex` with the given id and no properties or labels.
    fn bare_vertex(id: u64) -> Mutation {
        Mutation::InsertVertex {
            vid: Vid::new(id),
            properties: HashMap::new(),
            labels: vec![],
        }
    }

    /// Writes `count` single-mutation segments, for vertex ids `1..=count`.
    async fn flush_each(wal: &WriteAheadLog, count: u64) -> Result<()> {
        for id in 1..=count {
            wal.append(&bare_vertex(id))?;
            wal.flush().await?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_wal_append_replay() -> Result<()> {
        let (_dir, wal) = temp_wal()?;

        wal.append(&bare_vertex(1))?;
        wal.flush().await?;

        let replayed = wal.replay().await?;
        assert_eq!(replayed.len(), 1);
        let Mutation::InsertVertex { vid, .. } = &replayed[0] else {
            panic!("Wrong mutation type");
        };
        assert_eq!(vid.as_u64(), Vid::new(1).as_u64());

        wal.truncate().await?;
        assert_eq!(wal.replay().await?.len(), 0);

        Ok(())
    }

    #[tokio::test]
    async fn test_lsn_monotonicity() -> Result<()> {
        // LSNs must be strictly increasing and consecutive across flushes.
        let (_dir, wal) = temp_wal()?;

        wal.append(&bare_vertex(1))?;
        let lsn1 = wal.flush().await?;
        wal.append(&bare_vertex(2))?;
        let lsn2 = wal.flush().await?;
        wal.append(&bare_vertex(3))?;
        let lsn3 = wal.flush().await?;

        assert!(lsn2 > lsn1, "LSN2 ({}) should be > LSN1 ({})", lsn2, lsn1);
        assert!(lsn3 > lsn2, "LSN3 ({}) should be > LSN2 ({})", lsn3, lsn2);
        assert_eq!(lsn2, lsn1 + 1);
        assert_eq!(lsn3, lsn2 + 1);

        Ok(())
    }

    #[test]
    fn test_parse_lsn_from_filename() {
        let cases: &[(&str, Option<u64>)] = &[
            // Standard format
            ("00000000000000000042_a1b2c3d4.wal", Some(42)),
            ("00000000000000001234_e5f6a7b8.wal", Some(1234)),
            // Leading zeros
            ("00000000000000000001_xyz.wal", Some(1)),
            // Large LSN (within u64 range)
            ("12345678901234567890_uuid.wal", Some(12345678901234567890)),
            // Invalid formats
            ("invalid.wal", None),
            ("123.wal", None),                       // Too short
            ("abcdefghijklmnopqrst_uuid.wal", None), // Non-numeric
            // Missing underscore separator (but first 20 chars are valid LSN)
            ("00000000000000000100.wal", Some(100)),
            // Empty path
            ("", None),
        ];

        for &(name, expected) in cases {
            assert_eq!(parse_lsn_from_filename(&Path::from(name)), expected);
        }
    }

    /// Test for Issue #6: WAL initialization should parse LSN from filenames
    /// without downloading all segments
    #[tokio::test]
    async fn test_find_max_lsn_scalability() -> Result<()> {
        let (_dir, wal) = temp_wal()?;
        flush_each(&wal, 100).await?;

        let started = Instant::now();
        let max_lsn = wal.find_max_lsn().await?;
        let elapsed = started.elapsed();

        assert_eq!(max_lsn, 100, "Max LSN should be 100");
        assert!(
            elapsed.as_millis() < 1000,
            "find_max_lsn took {}ms, expected < 1000ms (filename parsing should be fast)",
            elapsed.as_millis()
        );

        Ok(())
    }

    /// Test for Issue #11: LSN gaps are preserved on flush failures (watermark pattern)
    #[tokio::test]
    async fn test_lsn_gaps_preserved_on_flush_failure() -> Result<()> {
        let (_dir, wal) = temp_wal()?;

        wal.append(&bare_vertex(1))?;
        let lsn1 = wal.flush().await?;
        assert_eq!(lsn1, 1);

        wal.append(&bare_vertex(2))?;
        let lsn2 = wal.flush().await?;
        assert_eq!(lsn2, 2);

        // A real failing store is not simulated here. Instead, check that two
        // mutations appended without an intermediate flush share one LSN.
        wal.append(&bare_vertex(3))?;
        wal.append(&bare_vertex(4))?;
        let lsn4 = wal.flush().await?;
        assert_eq!(lsn4, 3, "LSN should increment monotonically");

        let mutations = wal.replay().await?;
        assert_eq!(mutations.len(), 4, "All 4 mutations should be replayed");

        Ok(())
    }

    /// Test for Issue #11: Verify LSN watermark pattern - no LSN reuse
    #[tokio::test]
    async fn test_lsn_watermark_no_reuse() -> Result<()> {
        let (_dir, wal) = temp_wal()?;
        let mut seen_lsns = HashSet::new();

        for i in 1..=50 {
            wal.append(&bare_vertex(i))?;
            let lsn = wal.flush().await?;

            // `insert` returns false when the LSN was already recorded.
            assert!(
                seen_lsns.insert(lsn),
                "LSN {} was reused! This violates monotonicity.",
                lsn
            );
            assert_eq!(lsn, i, "LSN should be {}, got {}", i, lsn);
        }

        Ok(())
    }

    /// Test for Issue #33: WAL truncation should parse LSN from filenames
    /// without downloading all segments
    #[tokio::test]
    async fn test_truncate_scalability() -> Result<()> {
        let (_dir, wal) = temp_wal()?;
        flush_each(&wal, 100).await?;

        let started = Instant::now();
        wal.truncate_before(50).await?;
        let elapsed = started.elapsed();

        let remaining = wal.replay().await?;
        assert_eq!(
            remaining.len(),
            50,
            "Should have 50 mutations remaining (51-100)"
        );
        assert!(
            elapsed.as_millis() < 1000,
            "truncate_before took {}ms, expected < 1000ms (filename parsing should be fast)",
            elapsed.as_millis()
        );

        Ok(())
    }

    /// Test for Issue #6: replay_since should skip old segments by filename
    #[tokio::test]
    async fn test_replay_since_skips_old_segments() -> Result<()> {
        let (_dir, wal) = temp_wal()?;
        flush_each(&wal, 100).await?;

        let started = Instant::now();
        let recent = wal.replay_since(90).await?;
        let elapsed = started.elapsed();

        assert_eq!(recent.len(), 10, "Should replay only LSNs 91-100");
        assert!(
            elapsed.as_millis() < 500,
            "replay_since took {}ms, expected < 500ms (should skip by filename)",
            elapsed.as_millis()
        );

        Ok(())
    }

    /// Test for Issue #23: Vertex labels preserved through WAL replay
    #[tokio::test]
    async fn test_wal_replay_preserves_vertex_labels() -> Result<()> {
        let (_dir, wal) = temp_wal()?;

        let mut properties = HashMap::new();
        properties.insert(
            "name".to_string(),
            uni_common::Value::String("Alice".to_string()),
        );
        wal.append(&Mutation::InsertVertex {
            vid: Vid::new(42),
            properties,
            labels: vec!["Person".to_string(), "User".to_string()],
        })?;
        wal.flush().await?;

        let mutations = wal.replay().await?;
        assert_eq!(mutations.len(), 1);
        let Mutation::InsertVertex { vid, labels, .. } = &mutations[0] else {
            panic!("Expected InsertVertex mutation");
        };
        assert_eq!(vid.as_u64(), 42);
        assert_eq!(labels.len(), 2);
        assert!(labels.iter().any(|label| label == "Person"));
        assert!(labels.iter().any(|label| label == "User"));

        Ok(())
    }

    /// Test for Issue #23: DeleteVertex labels preserved through WAL replay
    #[tokio::test]
    async fn test_wal_replay_preserves_delete_vertex_labels() -> Result<()> {
        let (_dir, wal) = temp_wal()?;

        // DeleteVertex carries labels (needed for tombstone flushing - Issue #76)
        wal.append(&Mutation::DeleteVertex {
            vid: Vid::new(99),
            labels: vec!["Person".to_string(), "Admin".to_string()],
        })?;
        wal.flush().await?;

        let mutations = wal.replay().await?;
        assert_eq!(mutations.len(), 1);
        let Mutation::DeleteVertex { vid, labels } = &mutations[0] else {
            panic!("Expected DeleteVertex mutation");
        };
        assert_eq!(vid.as_u64(), 99);
        assert_eq!(labels.len(), 2);
        assert!(labels.iter().any(|label| label == "Person"));
        assert!(labels.iter().any(|label| label == "Admin"));

        Ok(())
    }

    /// Test for Issue #28: Edge type name preserved through WAL replay
    #[tokio::test]
    async fn test_wal_replay_preserves_edge_type_name() -> Result<()> {
        let (_dir, wal) = temp_wal()?;

        let mut properties = HashMap::new();
        properties.insert("since".to_string(), uni_common::Value::Int(2020));
        wal.append(&Mutation::InsertEdge {
            src_vid: Vid::new(1),
            dst_vid: Vid::new(2),
            edge_type: 100,
            eid: Eid::new(500),
            version: 1,
            properties,
            edge_type_name: Some("KNOWS".to_string()),
        })?;
        wal.flush().await?;

        let mutations = wal.replay().await?;
        assert_eq!(mutations.len(), 1);
        let Mutation::InsertEdge {
            eid,
            edge_type_name,
            ..
        } = &mutations[0]
        else {
            panic!("Expected InsertEdge mutation");
        };
        assert_eq!(eid.as_u64(), 500);
        assert_eq!(edge_type_name.as_deref(), Some("KNOWS"));

        Ok(())
    }

    /// Test for Issue #23: Backward compatibility with old WAL segments (no labels)
    #[tokio::test]
    async fn test_wal_backward_compatibility_labels() -> Result<()> {
        let (_dir, store) = temp_store()?;
        let prefix = Path::from("wal");

        // Hand-written segment in the old format (no labels field)
        let old_format_json = r#"{
            "lsn": 1,
            "mutations": [
                {
                    "InsertVertex": {
                        "vid": 123,
                        "properties": {}
                    }
                }
            ]
        }"#;
        let segment_path = prefix.child("00000000000000000001_test.wal");
        store.put(&segment_path, old_format_json.into()).await?;

        let wal = WriteAheadLog::new(store, prefix);
        let mutations = wal.replay().await?;

        // Missing labels must deserialize as empty via #[serde(default)]
        assert_eq!(mutations.len(), 1);
        let Mutation::InsertVertex { vid, labels, .. } = &mutations[0] else {
            panic!("Expected InsertVertex mutation");
        };
        assert_eq!(vid.as_u64(), 123);
        assert_eq!(
            labels.len(),
            0,
            "Old format should deserialize with empty labels"
        );

        Ok(())
    }
}