Skip to main content

uni_store/runtime/
wal.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::store_utils::{
5    DEFAULT_TIMEOUT, delete_with_timeout, get_with_timeout, list_with_timeout, put_with_timeout,
6};
7use anyhow::Result;
8use metrics;
9use object_store::ObjectStore;
10use object_store::path::Path;
11use serde::{Deserialize, Serialize};
12use std::sync::{Arc, Mutex};
13use tracing::{debug, info, instrument, warn};
14use uni_common::Properties;
15use uni_common::core::id::{Eid, Vid};
16use uni_common::sync::acquire_mutex;
17use uuid::Uuid;
18
19/// Parse LSN from WAL segment filename format `{:020}_{uuid}.wal`.
20/// Returns None if the filename doesn't match the expected format.
21fn parse_lsn_from_filename(path: &Path) -> Option<u64> {
22    let filename = path.filename()?;
23    if filename.len() < 20 {
24        return None;
25    }
26    filename[..20].parse::<u64>().ok()
27}
28
/// One logical graph change recorded in the write-ahead log.
///
/// Serialized with serde's default (externally tagged) enum representation. For
/// example, `{"InsertVertex": {"vid": ..., "properties": {...}}}` appears inside the
/// JSON of a [`WalSegment`].
///
/// This is the persisted WAL format: renaming a variant or field breaks replay of
/// segments that are already on storage. Fields marked `#[serde(default)]` were added
/// later and may be absent in older segments.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Mutation {
    /// Edge insertion record.
    InsertEdge {
        src_vid: Vid,
        dst_vid: Vid,
        // NOTE(review): presumably the numeric edge-type id that `edge_type_name`
        // names — confirm against the writer.
        edge_type: u32,
        eid: Eid,
        // NOTE(review): version semantics are defined by the caller that builds
        // this mutation, not by the WAL — confirm.
        version: u64,
        properties: Properties,
        /// Edge type name for metadata recovery. Optional for backward compatibility.
        #[serde(default)]
        edge_type_name: Option<String>,
    },
    /// Edge deletion record; carries the endpoints and type alongside `eid`.
    DeleteEdge {
        eid: Eid,
        src_vid: Vid,
        dst_vid: Vid,
        edge_type: u32,
        version: u64,
    },
    /// Vertex insertion record.
    InsertVertex {
        vid: Vid,
        properties: Properties,
        /// Older segments without this field deserialize with an empty label list.
        #[serde(default)]
        labels: Vec<String>,
    },
    /// Vertex deletion record.
    DeleteVertex {
        vid: Vid,
        /// Older segments without this field deserialize with an empty label list.
        #[serde(default)]
        labels: Vec<String>,
    },
    /// Replaces a vertex's full label set (a `SET n:Label` / `REMOVE n:Label`
    /// that touched no properties). Carries the complete resolved label set so
    /// replay can REPLACE (removals included). Added after the original four
    /// variants — externally-tagged serde_json, so old WAL segments (which never
    /// contain it) deserialize unchanged.
    SetVertexLabels { vid: Vid, labels: Vec<String> },
}
67
/// WAL segment with LSN for idempotent recovery.
///
/// Each segment is stored as one JSON object (`{"lsn": ..., "mutations": [...]}`) by
/// `WriteAheadLog::flush`. Replay only applies segments whose `lsn` is strictly
/// greater than the caller's high-water mark. As a result, re-running recovery does
/// not re-apply segments at or below that mark.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WalSegment {
    /// Log Sequence Number - monotonically increasing per segment
    /// (gaps are possible: an LSN consumed by a failed flush is never reused).
    pub lsn: u64,
    /// Mutations in this segment, in append order.
    pub mutations: Vec<Mutation>,
}
76
/// Write-ahead log that buffers [`Mutation`]s in memory and persists them as JSON
/// [`WalSegment`] objects under `prefix` in an [`ObjectStore`].
///
/// `append` only buffers; mutations are written to the store by `flush`.
pub struct WriteAheadLog {
    /// Backing object store holding the segment objects.
    store: Arc<dyn ObjectStore>,
    /// Key prefix under which segments are written, listed, and deleted.
    prefix: Path,
    /// Buffer and LSN counters, behind a std (blocking) mutex. In this module the
    /// guard is only held in short scopes that contain no `.await`.
    state: Mutex<WalState>,
}
82
/// Mutable WAL state, guarded by `WriteAheadLog::state`.
struct WalState {
    /// Mutations not yet written to a segment: those appended since the last flush
    /// took the buffer, plus any batch put back after a failed flush.
    buffer: Vec<Mutation>,
    /// Current LSN counter (incremented per flush)
    /// — the LSN the next non-empty flush will assign. Starts at 1 so 0 means "no WAL".
    next_lsn: u64,
    /// Highest LSN successfully flushed (0 when nothing has been flushed).
    flushed_lsn: u64,
}
90
91impl WriteAheadLog {
92    pub fn new(store: Arc<dyn ObjectStore>, prefix: Path) -> Self {
93        Self {
94            store,
95            prefix,
96            state: Mutex::new(WalState {
97                buffer: Vec::new(),
98                next_lsn: 1, // Start at 1 so 0 means "no WAL"
99                flushed_lsn: 0,
100            }),
101        }
102    }
103
104    /// Initialize WAL state from existing segments (called on startup)
105    pub async fn initialize(&self) -> Result<u64> {
106        let max_lsn = self.find_max_lsn().await?;
107        {
108            let mut state = acquire_mutex(&self.state, "wal_state")?;
109            state.next_lsn = max_lsn + 1;
110            state.flushed_lsn = max_lsn;
111        }
112        Ok(max_lsn)
113    }
114
115    /// Find the maximum LSN in existing WAL segments by parsing filenames.
116    /// Only downloads segments if filename parsing fails (fallback).
117    async fn find_max_lsn(&self) -> Result<u64> {
118        let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
119        let mut max_lsn: u64 = 0;
120
121        for meta in metas {
122            // Try to parse LSN from filename first (fast path)
123            if let Some(lsn) = parse_lsn_from_filename(&meta.location) {
124                max_lsn = max_lsn.max(lsn);
125            } else {
126                // Fallback: download and parse segment if filename doesn't match expected format
127                warn!(
128                    path = %meta.location,
129                    "WAL filename doesn't match expected format, downloading segment"
130                );
131                let get_result =
132                    get_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
133                let bytes = get_result.bytes().await?;
134                if bytes.is_empty() {
135                    continue;
136                }
137                let segment: WalSegment = serde_json::from_slice(&bytes)?;
138                max_lsn = max_lsn.max(segment.lsn);
139            }
140        }
141
142        Ok(max_lsn)
143    }
144
145    #[instrument(skip(self, mutation), level = "trace")]
146    pub fn append(&self, mutation: &Mutation) -> Result<()> {
147        let mut state = acquire_mutex(&self.state, "wal_state")?;
148        state.buffer.push(mutation.clone());
149        metrics::counter!("uni_wal_entries_total").increment(1);
150        Ok(())
151    }
152
153    /// Flush buffered mutations to a WAL segment. Returns the LSN of the flushed segment.
154    #[instrument(skip(self), fields(lsn, mutations_count, size_bytes))]
155    pub async fn flush(&self) -> Result<u64> {
156        let start = std::time::Instant::now();
157        let (batch, lsn) = {
158            let mut state = acquire_mutex(&self.state, "wal_state")?;
159            if state.buffer.is_empty() {
160                return Ok(state.flushed_lsn);
161            }
162            let lsn = state.next_lsn;
163            state.next_lsn += 1;
164            (std::mem::take(&mut state.buffer), lsn)
165        };
166
167        tracing::Span::current().record("lsn", lsn);
168        tracing::Span::current().record("mutations_count", batch.len());
169
170        // Create segment with LSN
171        let segment = WalSegment {
172            lsn,
173            mutations: batch.clone(),
174        };
175
176        // Serialize segment; restore buffer on failure
177        let json = match serde_json::to_vec(&segment) {
178            Ok(j) => j,
179            Err(e) => {
180                warn!(lsn, error = %e, "Failed to serialize WAL segment, restoring buffer");
181                // Restore buffer on serialization failure
182                let mut state = acquire_mutex(&self.state, "wal_state")?;
183                let new_mutations = std::mem::take(&mut state.buffer);
184                state.buffer = batch;
185                state.buffer.extend(new_mutations);
186                // Don't roll back LSN - gap is harmless and maintains monotonicity
187                return Err(e.into());
188            }
189        };
190        tracing::Span::current().record("size_bytes", json.len());
191        metrics::counter!("uni_wal_bytes_written_total").increment(json.len() as u64);
192
193        // Include LSN in filename for easy ordering and identification
194        let filename = format!("{:020}_{}.wal", lsn, Uuid::new_v4());
195        let path = self.prefix.clone().join(filename);
196
197        // Attempt to write; restore buffer on failure to prevent data loss
198        if let Err(e) = put_with_timeout(&self.store, &path, json.into(), DEFAULT_TIMEOUT).await {
199            warn!(
200                lsn,
201                error = %e,
202                "Failed to flush WAL segment, restoring buffer (LSN gap preserved for monotonicity)"
203            );
204            // Restore buffer so data isn't lost on transient failures
205            let mut state = acquire_mutex(&self.state, "wal_state")?;
206            // Prepend the failed batch to any new mutations that arrived
207            let new_mutations = std::mem::take(&mut state.buffer);
208            state.buffer = batch;
209            state.buffer.extend(new_mutations);
210            // Don't roll back LSN - gap is harmless and maintains strict monotonicity
211            // All WAL consumers use `>` / `<=` comparisons, not equality checks
212            return Err(e);
213        }
214
215        // Update flushed LSN on success
216        {
217            let mut state = acquire_mutex(&self.state, "wal_state")?;
218            state.flushed_lsn = lsn;
219        }
220
221        let duration = start.elapsed();
222        metrics::histogram!("wal_flush_latency_ms").record(duration.as_millis() as f64);
223        metrics::histogram!("uni_wal_flush_duration_seconds").record(duration.as_secs_f64());
224
225        if duration.as_millis() > 100 {
226            warn!(
227                lsn,
228                duration_ms = duration.as_millis(),
229                "Slow WAL flush detected"
230            );
231        } else {
232            debug!(
233                lsn,
234                duration_ms = duration.as_millis(),
235                "WAL flush completed"
236            );
237        }
238
239        Ok(lsn)
240    }
241
242    /// Get the highest LSN that has been flushed.
243    ///
244    /// # Errors
245    ///
246    /// Returns error if the WAL state lock is poisoned (see issue #18/#150).
247    pub fn flushed_lsn(&self) -> Result<u64, uni_common::sync::LockPoisonedError> {
248        let guard = uni_common::sync::acquire_mutex(&self.state, "wal_state")?;
249        Ok(guard.flushed_lsn)
250    }
251
252    /// Replay WAL segments with LSN > high_water_mark.
253    /// Returns mutations from segments that haven't been applied yet.
254    /// Optimized to skip downloading segments with LSN <= high_water_mark (parsed from filename).
255    #[instrument(skip(self), level = "debug")]
256    pub async fn replay_since(&self, high_water_mark: u64) -> Result<Vec<Mutation>> {
257        let start = std::time::Instant::now();
258        debug!(high_water_mark, "Replaying WAL segments");
259        let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
260        let mut mutations = Vec::new();
261
262        // Collect paths and sort by LSN (filename prefix)
263        let mut paths: Vec<_> = metas.into_iter().map(|m| m.location).collect();
264        paths.sort(); // Lexicographical sort works for LSN prefix (zero-padded)
265
266        let mut segments_replayed = 0;
267
268        for path in paths {
269            // Skip downloading segments we can identify as below high_water_mark from filename
270            if let Some(lsn) = parse_lsn_from_filename(&path)
271                && lsn <= high_water_mark
272            {
273                continue; // Skip this segment without downloading
274            }
275
276            // Download segment (either LSN > high_water_mark or filename unparseable)
277            let get_result = get_with_timeout(&self.store, &path, DEFAULT_TIMEOUT).await?;
278            let bytes = get_result.bytes().await?;
279            if bytes.is_empty() {
280                continue;
281            }
282
283            let segment: WalSegment = serde_json::from_slice(&bytes)?;
284            // Double-check LSN from segment content (handles fallback case)
285            if segment.lsn > high_water_mark {
286                mutations.extend(segment.mutations);
287                segments_replayed += 1;
288            }
289        }
290
291        info!(
292            segments_replayed,
293            mutations_count = mutations.len(),
294            "WAL replay completed"
295        );
296        metrics::histogram!("uni_wal_replay_duration_seconds")
297            .record(start.elapsed().as_secs_f64());
298
299        Ok(mutations)
300    }
301
302    /// Replay all WAL segments.
303    pub async fn replay(&self) -> Result<Vec<Mutation>> {
304        self.replay_since(0).await
305    }
306
307    /// Deletes WAL segments with LSN <= high_water_mark by parsing filenames.
308    /// Only downloads segments if filename parsing fails (fallback).
309    #[instrument(skip(self), level = "info")]
310    pub async fn truncate_before(&self, high_water_mark: u64) -> Result<()> {
311        info!(high_water_mark, "Truncating WAL segments");
312        let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
313
314        let mut deleted_count = 0;
315        for meta in metas {
316            // Try to parse LSN from filename first (fast path)
317            let should_delete = if let Some(lsn) = parse_lsn_from_filename(&meta.location) {
318                lsn <= high_water_mark
319            } else {
320                // Fallback: download and parse segment if filename doesn't match expected format
321                warn!(
322                    path = %meta.location,
323                    "WAL filename doesn't match expected format, downloading segment"
324                );
325                let get_result =
326                    get_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
327                let bytes = get_result.bytes().await?;
328                if bytes.is_empty() {
329                    // Empty segments should be deleted
330                    true
331                } else {
332                    let segment: WalSegment = serde_json::from_slice(&bytes)?;
333                    segment.lsn <= high_water_mark
334                }
335            };
336
337            if should_delete {
338                delete_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
339                deleted_count += 1;
340            }
341        }
342        info!(deleted_count, "WAL truncation completed");
343        Ok(())
344    }
345
346    /// Check if any WAL segments exist (for detecting database with lost manifest).
347    pub async fn has_segments(&self) -> Result<bool> {
348        let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
349        Ok(!metas.is_empty())
350    }
351
352    pub async fn truncate(&self) -> Result<()> {
353        info!("Truncating all WAL segments");
354        let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
355
356        let mut deleted_count = 0;
357        for meta in metas {
358            delete_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
359            deleted_count += 1;
360        }
361        info!(deleted_count, "Full WAL truncation completed");
362        Ok(())
363    }
364}
365
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::ObjectStoreExt;
    use object_store::local::LocalFileSystem;
    use std::collections::HashMap;
    use tempfile::tempdir;

    /// Builds a WAL rooted at `wal/` inside `dir`, backed by the local filesystem.
    fn local_wal(dir: &std::path::Path) -> Result<WriteAheadLog> {
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir)?);
        Ok(WriteAheadLog::new(store, Path::from("wal")))
    }

    /// An `InsertVertex` with no properties and no labels.
    fn bare_vertex(id: u64) -> Mutation {
        Mutation::InsertVertex {
            vid: Vid::new(id),
            properties: HashMap::new(),
            labels: vec![],
        }
    }

    #[tokio::test]
    async fn test_wal_append_replay() -> Result<()> {
        let dir = tempdir()?;
        let wal = local_wal(dir.path())?;

        wal.append(&bare_vertex(1))?;
        wal.flush().await?;

        let mutations = wal.replay().await?;
        assert_eq!(mutations.len(), 1);
        if let Mutation::InsertVertex { vid, .. } = &mutations[0] {
            assert_eq!(vid.as_u64(), Vid::new(1).as_u64());
        } else {
            panic!("Wrong mutation type");
        }

        wal.truncate().await?;
        let mutations2 = wal.replay().await?;
        assert_eq!(mutations2.len(), 0);

        Ok(())
    }

    /// Flushing an empty buffer writes nothing and reports the current watermark (0).
    #[tokio::test]
    async fn test_flush_empty_buffer_is_noop() -> Result<()> {
        let dir = tempdir()?;
        let wal = local_wal(dir.path())?;

        assert_eq!(wal.flush().await?, 0);
        assert_eq!(wal.flushed_lsn()?, 0);

        Ok(())
    }

    #[tokio::test]
    async fn test_lsn_monotonicity() -> Result<()> {
        // Verify that LSN is strictly monotonic even across multiple flushes
        let dir = tempdir()?;
        let wal = local_wal(dir.path())?;

        wal.append(&bare_vertex(1))?;
        let lsn1 = wal.flush().await?;

        wal.append(&bare_vertex(2))?;
        let lsn2 = wal.flush().await?;

        wal.append(&bare_vertex(3))?;
        let lsn3 = wal.flush().await?;

        // Verify strict monotonicity
        assert!(lsn2 > lsn1, "LSN2 ({}) should be > LSN1 ({})", lsn2, lsn1);
        assert!(lsn3 > lsn2, "LSN3 ({}) should be > LSN2 ({})", lsn3, lsn2);

        // Verify LSNs are consecutive
        assert_eq!(lsn2, lsn1 + 1);
        assert_eq!(lsn3, lsn2 + 1);

        // The watermark tracks the latest successful flush
        assert_eq!(wal.flushed_lsn()?, lsn3);

        Ok(())
    }

    #[test]
    fn test_parse_lsn_from_filename() {
        // Standard format
        let path = Path::from("00000000000000000042_a1b2c3d4.wal");
        assert_eq!(parse_lsn_from_filename(&path), Some(42));

        let path = Path::from("00000000000000001234_e5f6a7b8.wal");
        assert_eq!(parse_lsn_from_filename(&path), Some(1234));

        // Leading zeros
        let path = Path::from("00000000000000000001_xyz.wal");
        assert_eq!(parse_lsn_from_filename(&path), Some(1));

        // Large LSN (within u64 range)
        let path = Path::from("12345678901234567890_uuid.wal");
        assert_eq!(parse_lsn_from_filename(&path), Some(12345678901234567890));

        // Invalid formats
        let path = Path::from("invalid.wal");
        assert_eq!(parse_lsn_from_filename(&path), None);

        let path = Path::from("123.wal"); // Too short
        assert_eq!(parse_lsn_from_filename(&path), None);

        let path = Path::from("abcdefghijklmnopqrst_uuid.wal"); // Non-numeric
        assert_eq!(parse_lsn_from_filename(&path), None);

        // Missing underscore separator (but first 20 chars are valid LSN)
        let path = Path::from("00000000000000000100.wal");
        assert_eq!(parse_lsn_from_filename(&path), Some(100));

        // Empty path
        let path = Path::from("");
        assert_eq!(parse_lsn_from_filename(&path), None);
    }

    /// Test for Issue #6: WAL initialization should parse LSN from filenames
    /// without downloading all segments
    #[tokio::test]
    async fn test_find_max_lsn_scalability() -> Result<()> {
        let dir = tempdir()?;
        let wal = local_wal(dir.path())?;

        // Create 100 WAL segments with increasing LSNs
        for i in 1..=100 {
            wal.append(&bare_vertex(i))?;
            wal.flush().await?;
        }

        // Measure initialization time - should be fast (parsing filenames, not downloading)
        let start = std::time::Instant::now();
        let max_lsn = wal.find_max_lsn().await?;
        let duration = start.elapsed();

        assert_eq!(max_lsn, 100, "Max LSN should be 100");

        // Wall-clock bound: a coarse regression guard, not a precise benchmark.
        assert!(
            duration.as_millis() < 1000,
            "find_max_lsn took {}ms, expected < 1000ms (filename parsing should be fast)",
            duration.as_millis()
        );

        Ok(())
    }

    /// Test for Issue #11: LSNs are assigned per flush, not per append.
    ///
    /// No write failure is injected here (the local store cannot be made to fail in
    /// this setup). The test checks that two appends followed by one flush share a
    /// single LSN, and that every mutation is replayable.
    #[tokio::test]
    async fn test_lsn_gaps_preserved_on_flush_failure() -> Result<()> {
        let dir = tempdir()?;
        let wal = local_wal(dir.path())?;

        wal.append(&bare_vertex(1))?;
        let lsn1 = wal.flush().await?;
        assert_eq!(lsn1, 1);

        wal.append(&bare_vertex(2))?;
        let lsn2 = wal.flush().await?;
        assert_eq!(lsn2, 2);

        // Append mutations 3 and 4, then flush once: appends alone consume no LSN.
        wal.append(&bare_vertex(3))?;
        wal.append(&bare_vertex(4))?;
        let lsn3 = wal.flush().await?;

        // Both mutations land in the single segment with LSN 3
        assert_eq!(lsn3, 3, "LSN should increment monotonically");

        let mutations = wal.replay().await?;
        assert_eq!(mutations.len(), 4, "All 4 mutations should be replayed");

        Ok(())
    }

    /// Test for Issue #11: Verify LSN watermark pattern - no LSN reuse
    #[tokio::test]
    async fn test_lsn_watermark_no_reuse() -> Result<()> {
        let dir = tempdir()?;
        let wal = local_wal(dir.path())?;

        let mut seen_lsns = std::collections::HashSet::new();

        for i in 1..=50 {
            wal.append(&bare_vertex(i))?;
            let lsn = wal.flush().await?;

            assert!(
                seen_lsns.insert(lsn),
                "LSN {} was reused! This violates monotonicity.",
                lsn
            );
            assert_eq!(lsn, i, "LSN should be {}, got {}", i, lsn);
        }

        Ok(())
    }

    /// Test for Issue #33: WAL truncation should parse LSN from filenames
    /// without downloading all segments
    #[tokio::test]
    async fn test_truncate_scalability() -> Result<()> {
        let dir = tempdir()?;
        let wal = local_wal(dir.path())?;

        for i in 1..=100 {
            wal.append(&bare_vertex(i))?;
            wal.flush().await?;
        }

        // Truncate segments with LSN <= 50
        let start = std::time::Instant::now();
        wal.truncate_before(50).await?;
        let duration = start.elapsed();

        // Verify only segments 51-100 remain
        let mutations = wal.replay().await?;
        assert_eq!(
            mutations.len(),
            50,
            "Should have 50 mutations remaining (51-100)"
        );

        // Wall-clock bound: a coarse regression guard, not a precise benchmark.
        assert!(
            duration.as_millis() < 1000,
            "truncate_before took {}ms, expected < 1000ms (filename parsing should be fast)",
            duration.as_millis()
        );

        Ok(())
    }

    /// Test for Issue #6: replay_since should skip old segments by filename
    #[tokio::test]
    async fn test_replay_since_skips_old_segments() -> Result<()> {
        let dir = tempdir()?;
        let wal = local_wal(dir.path())?;

        for i in 1..=100 {
            wal.append(&bare_vertex(i))?;
            wal.flush().await?;
        }

        // Replay only segments with LSN > 90 (should skip 90 segments by filename)
        let start = std::time::Instant::now();
        let mutations = wal.replay_since(90).await?;
        let duration = start.elapsed();

        assert_eq!(mutations.len(), 10, "Should replay only LSNs 91-100");

        // Wall-clock bound: a coarse regression guard, not a precise benchmark.
        assert!(
            duration.as_millis() < 500,
            "replay_since took {}ms, expected < 500ms (should skip by filename)",
            duration.as_millis()
        );

        Ok(())
    }

    /// Test for Issue #23: Vertex labels preserved through WAL replay
    #[tokio::test]
    async fn test_wal_replay_preserves_vertex_labels() -> Result<()> {
        let dir = tempdir()?;
        let wal = local_wal(dir.path())?;

        let mut props = HashMap::new();
        props.insert(
            "name".to_string(),
            uni_common::Value::String("Alice".to_string()),
        );
        wal.append(&Mutation::InsertVertex {
            vid: Vid::new(42),
            properties: props,
            labels: vec!["Person".to_string(), "User".to_string()],
        })?;
        wal.flush().await?;

        let mutations = wal.replay().await?;
        assert_eq!(mutations.len(), 1);

        if let Mutation::InsertVertex { vid, labels, .. } = &mutations[0] {
            assert_eq!(vid.as_u64(), 42);
            assert_eq!(labels.len(), 2);
            assert!(labels.contains(&"Person".to_string()));
            assert!(labels.contains(&"User".to_string()));
        } else {
            panic!("Expected InsertVertex mutation");
        }

        Ok(())
    }

    /// Test for Issue #23: DeleteVertex labels preserved through WAL replay
    #[tokio::test]
    async fn test_wal_replay_preserves_delete_vertex_labels() -> Result<()> {
        let dir = tempdir()?;
        let wal = local_wal(dir.path())?;

        // DeleteVertex with labels (needed for tombstone flushing - Issue #76)
        wal.append(&Mutation::DeleteVertex {
            vid: Vid::new(99),
            labels: vec!["Person".to_string(), "Admin".to_string()],
        })?;
        wal.flush().await?;

        let mutations = wal.replay().await?;
        assert_eq!(mutations.len(), 1);

        if let Mutation::DeleteVertex { vid, labels } = &mutations[0] {
            assert_eq!(vid.as_u64(), 99);
            assert_eq!(labels.len(), 2);
            assert!(labels.contains(&"Person".to_string()));
            assert!(labels.contains(&"Admin".to_string()));
        } else {
            panic!("Expected DeleteVertex mutation");
        }

        Ok(())
    }

    /// SetVertexLabels round-trips through flush and replay with its full label set.
    #[tokio::test]
    async fn test_wal_replay_preserves_set_vertex_labels() -> Result<()> {
        let dir = tempdir()?;
        let wal = local_wal(dir.path())?;

        wal.append(&Mutation::SetVertexLabels {
            vid: Vid::new(7),
            labels: vec!["Person".to_string(), "Admin".to_string()],
        })?;
        wal.flush().await?;

        let mutations = wal.replay().await?;
        assert_eq!(mutations.len(), 1);

        match &mutations[0] {
            Mutation::SetVertexLabels { vid, labels } => {
                assert_eq!(vid.as_u64(), 7);
                assert_eq!(labels, &vec!["Person".to_string(), "Admin".to_string()]);
            }
            other => panic!("Expected SetVertexLabels mutation, got {other:?}"),
        }

        Ok(())
    }

    /// Test for Issue #28: Edge type name preserved through WAL replay
    #[tokio::test]
    async fn test_wal_replay_preserves_edge_type_name() -> Result<()> {
        let dir = tempdir()?;
        let wal = local_wal(dir.path())?;

        let mut props = HashMap::new();
        props.insert("since".to_string(), uni_common::Value::Int(2020));
        wal.append(&Mutation::InsertEdge {
            src_vid: Vid::new(1),
            dst_vid: Vid::new(2),
            edge_type: 100,
            eid: Eid::new(500),
            version: 1,
            properties: props,
            edge_type_name: Some("KNOWS".to_string()),
        })?;
        wal.flush().await?;

        let mutations = wal.replay().await?;
        assert_eq!(mutations.len(), 1);

        if let Mutation::InsertEdge {
            eid,
            edge_type_name,
            ..
        } = &mutations[0]
        {
            assert_eq!(eid.as_u64(), 500);
            assert_eq!(edge_type_name.as_deref(), Some("KNOWS"));
        } else {
            panic!("Expected InsertEdge mutation");
        }

        Ok(())
    }

    /// Test for Issue #23: Backward compatibility with old WAL segments (no labels)
    #[tokio::test]
    async fn test_wal_backward_compatibility_labels() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let prefix = Path::from("wal");

        // Manually create a WAL segment with old format (no labels field)
        let old_format_json = r#"{
            "lsn": 1,
            "mutations": [
                {
                    "InsertVertex": {
                        "vid": 123,
                        "properties": {}
                    }
                }
            ]
        }"#;

        let path = prefix.clone().join("00000000000000000001_test.wal");
        store.put(&path, old_format_json.into()).await?;

        let wal = WriteAheadLog::new(store, prefix);
        let mutations = wal.replay().await?;

        // Old format deserializes with empty labels (via #[serde(default)])
        assert_eq!(mutations.len(), 1);
        if let Mutation::InsertVertex { vid, labels, .. } = &mutations[0] {
            assert_eq!(vid.as_u64(), 123);
            assert_eq!(
                labels.len(),
                0,
                "Old format should deserialize with empty labels"
            );
        } else {
            panic!("Expected InsertVertex mutation");
        }

        Ok(())
    }
}