Skip to main content

uni_store/runtime/
wal.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::store_utils::{
5    DEFAULT_TIMEOUT, delete_with_timeout, get_with_timeout, list_with_timeout, put_with_timeout,
6};
7use anyhow::Result;
8use metrics;
9use object_store::ObjectStore;
10use object_store::path::Path;
11use serde::{Deserialize, Serialize};
12use std::sync::{Arc, Mutex};
13use tracing::{debug, info, instrument, warn};
14use uni_common::Properties;
15use uni_common::core::id::{Eid, Vid};
16use uni_common::sync::acquire_mutex;
17use uuid::Uuid;
18
19/// Parse LSN from WAL segment filename format `{:020}_{uuid}.wal`.
20/// Returns None if the filename doesn't match the expected format.
21fn parse_lsn_from_filename(path: &Path) -> Option<u64> {
22    let filename = path.filename()?;
23    if filename.len() < 20 {
24        return None;
25    }
26    // `str::get(..20)` yields `None` (instead of panicking) when byte index 20
27    // falls mid-UTF-8-character, so a foreign multibyte filename is skipped.
28    filename.get(..20).and_then(|s| s.parse::<u64>().ok())
29}
30
/// Magic prefix of checksummed (v2) WAL segments.
///
/// v2 layout: `UNIWAL2\n<64-hex-char blake3 of payload>\n<payload JSON>`.
/// Segments without the magic are legacy (pre-2.0.7) raw JSON and are still
/// readable; they just have no integrity protection. The trailing `\n` is
/// part of the magic, so stripping the magic leaves the checksum at offset 0.
const WAL_V2_MAGIC: &[u8] = b"UNIWAL2\n";

/// Length of the hex-encoded blake3 checksum in the v2 header.
///
/// `encode_segment_envelope` writes `blake3::Hash::to_hex()` into this slot,
/// and `decode_segment` expects exactly this many bytes before the `\n` that
/// separates the checksum from the payload.
const WAL_V2_HASH_HEX_LEN: usize = 64;
40
41/// Wrap a serialized segment payload in the checksummed v2 envelope.
42fn encode_segment_envelope(payload_json: &[u8]) -> Vec<u8> {
43    let hash = blake3::hash(payload_json);
44    let mut out =
45        Vec::with_capacity(WAL_V2_MAGIC.len() + WAL_V2_HASH_HEX_LEN + 1 + payload_json.len());
46    out.extend_from_slice(WAL_V2_MAGIC);
47    out.extend_from_slice(hash.to_hex().as_bytes());
48    out.push(b'\n');
49    out.extend_from_slice(payload_json);
50    out
51}
52
53/// Decode a WAL segment from its on-disk bytes, verifying the checksum for
54/// v2 envelopes and falling back to legacy raw-JSON parsing otherwise.
55///
56/// Returns a human-readable corruption description on failure — the caller
57/// decides whether that is fatal (corrupt middle segment) or a tolerated
58/// torn tail (see [`WriteAheadLog::replay_since`]).
59///
60/// `pub` + `doc(hidden)` solely so `fuzz/fuzz_targets/wal_decode.rs` can
61/// drive it with arbitrary bytes; it is not part of the public API.
62#[doc(hidden)]
63pub fn decode_segment(bytes: &[u8]) -> std::result::Result<WalSegment, String> {
64    if let Some(rest) = bytes.strip_prefix(WAL_V2_MAGIC) {
65        if rest.len() < WAL_V2_HASH_HEX_LEN + 1 || rest[WAL_V2_HASH_HEX_LEN] != b'\n' {
66            return Err("truncated v2 segment header".to_string());
67        }
68        let (hex, payload_nl) = rest.split_at(WAL_V2_HASH_HEX_LEN);
69        let payload = &payload_nl[1..];
70        let expected =
71            std::str::from_utf8(hex).map_err(|_| "non-utf8 checksum header".to_string())?;
72        let actual = blake3::hash(payload);
73        if actual.to_hex().as_str() != expected {
74            return Err(format!(
75                "checksum mismatch (expected {expected}, computed {})",
76                actual.to_hex()
77            ));
78        }
79        serde_json::from_slice(payload).map_err(|e| format!("v2 payload parse: {e}"))
80    } else {
81        // Legacy (pre-2.0.7) segment: raw JSON, no checksum.
82        serde_json::from_slice(bytes).map_err(|e| format!("legacy segment parse: {e}"))
83    }
84}
85
/// Fsync a freshly written file and, on Unix, its parent directory.
///
/// The directory fsync makes the new directory entry itself durable across
/// a crash (pattern borrowed from uni-sidecar's atomic `store_value`).
///
/// On Windows the file is opened with write access. There, `sync_all` maps
/// to `FlushFileBuffers`, which requires a handle with write access; a plain
/// `File::open` (read-only) handle makes every fsync fail with "Access is
/// denied". On other platforms a read-only handle is sufficient and is kept.
///
/// A bare relative filename has `Some("")` as its parent. That is resolved
/// to the current directory (`.`) instead of opening `""`, which always
/// fails with `NotFound`.
///
/// # Errors
///
/// Returns any I/O error from opening or syncing the file or the directory.
fn sync_file_and_parent(path: &std::path::Path) -> std::io::Result<()> {
    std::fs::OpenOptions::new()
        .read(true)
        .write(cfg!(windows))
        .open(path)?
        .sync_all()?;
    #[cfg(unix)]
    if let Some(parent) = path.parent() {
        let dir = if parent.as_os_str().is_empty() {
            std::path::Path::new(".")
        } else {
            parent
        };
        std::fs::File::open(dir)?.sync_all()?;
    }
    Ok(())
}
98
/// A single logical graph change recorded in the write-ahead log.
///
/// Serialized with serde's externally-tagged enum representation, so variant
/// and field names are part of the on-disk segment format. Fields added after
/// the original format carry `#[serde(default)]` so that older segments,
/// which lack them, still deserialize.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Mutation {
    /// Insertion of edge `eid` from `src_vid` to `dst_vid`.
    InsertEdge {
        src_vid: Vid,
        dst_vid: Vid,
        /// Numeric edge type. NOTE(review): presumably an id resolved through
        /// the schema; confirm against the writer.
        edge_type: u32,
        eid: Eid,
        /// NOTE(review): version stamp supplied by the writer; its exact
        /// meaning is not visible in this file.
        version: u64,
        properties: Properties,
        /// Edge type name for metadata recovery. Optional for backward compatibility.
        #[serde(default)]
        edge_type_name: Option<String>,
    },
    /// Deletion of edge `eid`. The endpoints, type and version are recorded
    /// alongside the id.
    DeleteEdge {
        eid: Eid,
        src_vid: Vid,
        dst_vid: Vid,
        edge_type: u32,
        version: u64,
    },
    /// Insertion of vertex `vid` with its properties.
    InsertVertex {
        vid: Vid,
        properties: Properties,
        /// Vertex labels; empty when the field is absent from the segment.
        #[serde(default)]
        labels: Vec<String>,
    },
    /// Deletion of vertex `vid`.
    DeleteVertex {
        vid: Vid,
        /// Vertex labels; empty when the field is absent from the segment.
        #[serde(default)]
        labels: Vec<String>,
    },
    /// Replaces a vertex's full label set (a `SET n:Label` / `REMOVE n:Label`
    /// that touched no properties). Carries the complete resolved label set so
    /// replay can REPLACE (removals included). Added after the original four
    /// variants — externally-tagged serde_json, so old WAL segments (which never
    /// contain it) deserialize unchanged.
    SetVertexLabels { vid: Vid, labels: Vec<String> },
}
137
/// One persisted WAL segment: the mutations written by a single `flush`,
/// tagged with the LSN that flush assigned. Used for idempotent recovery.
///
/// This is the JSON payload inside the v2 envelope, or the whole file for
/// legacy segments. `WalSegmentRef` mirrors it field-for-field so both
/// serialize to the same bytes; keep the two in lockstep.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WalSegment {
    /// Log Sequence Number assigned at flush time. It increases strictly
    /// across flushes of one log; gaps are possible after failed flushes.
    pub lsn: u64,
    /// Mutations in this segment, in the order they were appended.
    pub mutations: Vec<Mutation>,
}
146
/// Borrowed serialization view of [`WalSegment`].
///
/// Field names and order match `WalSegment` exactly, so the serde output is
/// byte-identical; `flush` serializes through this to avoid deep-cloning the
/// whole mutation batch per flush. Any field added to, removed from, or
/// reordered in `WalSegment` must be mirrored here.
#[derive(Serialize, Debug)]
struct WalSegmentRef<'a> {
    /// LSN assigned to the batch being flushed.
    lsn: u64,
    /// The batch being flushed, borrowed from the copy owned by `flush`.
    mutations: &'a [Mutation],
}
157
158pub struct WriteAheadLog {
159    store: Arc<dyn ObjectStore>,
160    prefix: Path,
161    /// Filesystem root backing `store` when it is a local store. When set,
162    /// every flushed segment is fsync'd (file + parent directory) before the
163    /// flush is reported durable — `object_store::LocalFileSystem` does not
164    /// fsync on `put`, so without this a power loss can drop acknowledged
165    /// commits. `None` for remote stores (the PUT ack is the durability
166    /// point there).
167    local_root: Option<std::path::PathBuf>,
168    state: Mutex<WalState>,
169}
170
171struct WalState {
172    buffer: Vec<Mutation>,
173    /// Current LSN counter (incremented per flush)
174    next_lsn: u64,
175    /// Highest LSN successfully flushed
176    flushed_lsn: u64,
177}
178
179impl WriteAheadLog {
180    pub fn new(store: Arc<dyn ObjectStore>, prefix: Path) -> Self {
181        Self {
182            store,
183            prefix,
184            local_root: None,
185            state: Mutex::new(WalState {
186                buffer: Vec::new(),
187                next_lsn: 1, // Start at 1 so 0 means "no WAL"
188                flushed_lsn: 0,
189            }),
190        }
191    }
192
193    /// Set the local filesystem root backing the object store, enabling
194    /// fsync-on-flush. See the field docs on `local_root`.
195    #[must_use]
196    pub fn with_local_root(mut self, local_root: Option<std::path::PathBuf>) -> Self {
197        self.local_root = local_root;
198        self
199    }
200
201    /// Initialize WAL state from existing segments (called on startup)
202    pub async fn initialize(&self) -> Result<u64> {
203        let max_lsn = self.find_max_lsn().await?;
204        {
205            let mut state = acquire_mutex(&self.state, "wal_state")?;
206            state.next_lsn = max_lsn + 1;
207            state.flushed_lsn = max_lsn;
208        }
209        Ok(max_lsn)
210    }
211
212    /// Find the maximum LSN in existing WAL segments by parsing filenames.
213    /// Only downloads segments if filename parsing fails (fallback).
214    async fn find_max_lsn(&self) -> Result<u64> {
215        let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
216        let mut max_lsn: u64 = 0;
217
218        for meta in metas {
219            // Try to parse LSN from filename first (fast path)
220            if let Some(lsn) = parse_lsn_from_filename(&meta.location) {
221                max_lsn = max_lsn.max(lsn);
222            } else {
223                // Fallback: download and parse segment if filename doesn't match expected format
224                warn!(
225                    path = %meta.location,
226                    "WAL filename doesn't match expected format, downloading segment"
227                );
228                let get_result =
229                    get_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
230                let bytes = get_result.bytes().await?;
231                if bytes.is_empty() {
232                    continue;
233                }
234                // This is only a max-LSN probe; a corrupt segment is skipped
235                // here (with a warning) and adjudicated by `replay_since`'s
236                // tail-vs-middle policy during actual recovery.
237                match decode_segment(&bytes) {
238                    Ok(segment) => max_lsn = max_lsn.max(segment.lsn),
239                    Err(reason) => {
240                        warn!(path = %meta.location, reason = %reason,
241                            "Skipping corrupt WAL segment during max-LSN probe");
242                    }
243                }
244            }
245        }
246
247        Ok(max_lsn)
248    }
249
250    #[instrument(skip(self, mutation), level = "trace")]
251    pub fn append(&self, mutation: Mutation) -> Result<()> {
252        let mut state = acquire_mutex(&self.state, "wal_state")?;
253        state.buffer.push(mutation);
254        metrics::counter!("uni_wal_entries_total").increment(1);
255        Ok(())
256    }
257
258    /// Flush buffered mutations to a WAL segment. Returns the LSN of the flushed segment.
259    #[instrument(skip(self), fields(lsn, mutations_count, size_bytes))]
260    pub async fn flush(&self) -> Result<u64> {
261        let start = std::time::Instant::now();
262        let (batch, lsn) = {
263            let mut state = acquire_mutex(&self.state, "wal_state")?;
264            if state.buffer.is_empty() {
265                return Ok(state.flushed_lsn);
266            }
267            let lsn = state.next_lsn;
268            state.next_lsn += 1;
269            (std::mem::take(&mut state.buffer), lsn)
270        };
271
272        tracing::Span::current().record("lsn", lsn);
273        tracing::Span::current().record("mutations_count", batch.len());
274
275        // Serialize a borrowed view of the segment (serde output is identical
276        // to `WalSegment` — see `wal_segment_ref_serializes_identically`), so
277        // the batch is not deep-cloned per flush. `batch` itself stays owned
278        // for the restore-on-failure paths below.
279        let segment = WalSegmentRef {
280            lsn,
281            mutations: &batch,
282        };
283
284        // Serialize segment; restore buffer on failure
285        let json = match serde_json::to_vec(&segment) {
286            Ok(j) => j,
287            Err(e) => {
288                warn!(lsn, error = %e, "Failed to serialize WAL segment, restoring buffer");
289                // Restore buffer on serialization failure
290                let mut state = acquire_mutex(&self.state, "wal_state")?;
291                let new_mutations = std::mem::take(&mut state.buffer);
292                state.buffer = batch;
293                state.buffer.extend(new_mutations);
294                // Don't roll back LSN - gap is harmless and maintains monotonicity
295                return Err(e.into());
296            }
297        };
298        // Wrap in the checksummed v2 envelope so torn/corrupt segments are
299        // detected at replay instead of surfacing as opaque parse errors.
300        let body = encode_segment_envelope(&json);
301        tracing::Span::current().record("size_bytes", body.len());
302        metrics::counter!("uni_wal_bytes_written_total").increment(body.len() as u64);
303
304        // Include LSN in filename for easy ordering and identification
305        let filename = format!("{:020}_{}.wal", lsn, Uuid::new_v4());
306        let path = self.prefix.clone().join(filename);
307
308        // Attempt to write; restore buffer on failure to prevent data loss
309        if let Err(e) = put_with_timeout(&self.store, &path, body.into(), DEFAULT_TIMEOUT).await {
310            warn!(
311                lsn,
312                error = %e,
313                "Failed to flush WAL segment, restoring buffer (LSN gap preserved for monotonicity)"
314            );
315            // Restore buffer so data isn't lost on transient failures
316            let mut state = acquire_mutex(&self.state, "wal_state")?;
317            // Prepend the failed batch to any new mutations that arrived
318            let new_mutations = std::mem::take(&mut state.buffer);
319            state.buffer = batch;
320            state.buffer.extend(new_mutations);
321            // Don't roll back LSN - gap is harmless and maintains strict monotonicity
322            // All WAL consumers use `>` / `<=` comparisons, not equality checks
323            return Err(e);
324        }
325
326        // Local stores: fsync the segment + its directory before reporting
327        // the flush durable. On fsync failure the buffer is NOT restored —
328        // the bytes are already written (re-flushing would duplicate them
329        // under a new LSN) — but the flush reports failure because
330        // durability cannot be guaranteed.
331        if let Some(root) = &self.local_root {
332            let file_path = root.join(path.as_ref());
333            let synced =
334                tokio::task::spawn_blocking(move || sync_file_and_parent(&file_path)).await;
335            match synced {
336                Ok(Ok(())) => {}
337                Ok(Err(e)) => {
338                    warn!(lsn, error = %e, "WAL segment fsync failed — durability not guaranteed");
339                    return Err(e.into());
340                }
341                Err(e) => {
342                    warn!(lsn, error = %e, "WAL fsync task failed");
343                    return Err(e.into());
344                }
345            }
346        }
347
348        // Update flushed LSN on success
349        {
350            let mut state = acquire_mutex(&self.state, "wal_state")?;
351            state.flushed_lsn = lsn;
352        }
353
354        let duration = start.elapsed();
355        metrics::histogram!("wal_flush_latency_ms").record(duration.as_millis() as f64);
356        metrics::histogram!("uni_wal_flush_duration_seconds").record(duration.as_secs_f64());
357
358        if duration.as_millis() > 100 {
359            warn!(
360                lsn,
361                duration_ms = duration.as_millis(),
362                "Slow WAL flush detected"
363            );
364        } else {
365            debug!(
366                lsn,
367                duration_ms = duration.as_millis(),
368                "WAL flush completed"
369            );
370        }
371
372        Ok(lsn)
373    }
374
375    /// Get the highest LSN that has been flushed.
376    ///
377    /// # Errors
378    ///
379    /// Returns error if the WAL state lock is poisoned (see issue #18/#150).
380    pub fn flushed_lsn(&self) -> Result<u64, uni_common::sync::LockPoisonedError> {
381        let guard = uni_common::sync::acquire_mutex(&self.state, "wal_state")?;
382        Ok(guard.flushed_lsn)
383    }
384
385    /// Replay WAL segments with LSN > high_water_mark.
386    /// Returns mutations from segments that haven't been applied yet.
387    /// Optimized to skip downloading segments with LSN <= high_water_mark (parsed from filename).
388    ///
389    /// Corruption policy: a corrupt (bad checksum / unparseable / empty)
390    /// segment at the **tail** of the log is the classic torn write from a
391    /// crash — it is logged prominently and treated as end-of-WAL, since the
392    /// commit it belonged to was never acknowledged. A corrupt segment with
393    /// valid segments **after** it is real data loss and fails recovery with
394    /// an error naming the file.
395    #[instrument(skip(self), level = "debug")]
396    pub async fn replay_since(&self, high_water_mark: u64) -> Result<Vec<Mutation>> {
397        let start = std::time::Instant::now();
398        debug!(high_water_mark, "Replaying WAL segments");
399        let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
400        let mut mutations = Vec::new();
401
402        // Collect candidate paths and sort by LSN (filename prefix).
403        // Lexicographical sort works for the zero-padded LSN prefix.
404        let mut paths: Vec<_> = metas
405            .into_iter()
406            .map(|m| m.location)
407            .filter(|p| {
408                // Skip segments identifiable as <= high_water_mark without
409                // downloading. Unparseable filenames stay in (legacy safety).
410                parse_lsn_from_filename(p).is_none_or(|lsn| lsn > high_water_mark)
411            })
412            .collect();
413        paths.sort();
414
415        let mut segments_replayed = 0;
416
417        for (idx, path) in paths.iter().enumerate() {
418            let get_result = get_with_timeout(&self.store, path, DEFAULT_TIMEOUT).await?;
419            let bytes = get_result.bytes().await?;
420
421            // Empty files and decode failures share one corruption policy.
422            let decoded = if bytes.is_empty() {
423                Err("empty segment file".to_string())
424            } else {
425                decode_segment(&bytes)
426            };
427
428            let segment = match decoded {
429                Ok(segment) => segment,
430                Err(reason) => {
431                    let is_tail = idx + 1 == paths.len();
432                    if is_tail {
433                        warn!(
434                            path = %path,
435                            reason = %reason,
436                            "Corrupt tail WAL segment — torn write from a crash; \
437                             treating as end of WAL (the commit was never acknowledged)"
438                        );
439                        break;
440                    }
441                    return Err(anyhow::anyhow!(
442                        "corrupt WAL segment '{path}' ({reason}) with {} later segment(s) \
443                         present; refusing to skip — manual inspection required",
444                        paths.len() - idx - 1
445                    ));
446                }
447            };
448
449            // Double-check LSN from segment content (handles fallback case)
450            if segment.lsn > high_water_mark {
451                mutations.extend(segment.mutations);
452                segments_replayed += 1;
453            }
454        }
455
456        info!(
457            segments_replayed,
458            mutations_count = mutations.len(),
459            "WAL replay completed"
460        );
461        metrics::histogram!("uni_wal_replay_duration_seconds")
462            .record(start.elapsed().as_secs_f64());
463
464        Ok(mutations)
465    }
466
467    /// Replay all WAL segments.
468    pub async fn replay(&self) -> Result<Vec<Mutation>> {
469        self.replay_since(0).await
470    }
471
472    /// Deletes WAL segments with LSN <= high_water_mark by parsing filenames.
473    /// Only downloads segments if filename parsing fails (fallback).
474    #[instrument(skip(self), level = "info")]
475    pub async fn truncate_before(&self, high_water_mark: u64) -> Result<()> {
476        info!(high_water_mark, "Truncating WAL segments");
477        let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
478
479        let mut deleted_count = 0;
480        for meta in metas {
481            // Try to parse LSN from filename first (fast path)
482            let should_delete = if let Some(lsn) = parse_lsn_from_filename(&meta.location) {
483                lsn <= high_water_mark
484            } else {
485                // Fallback: download and parse segment if filename doesn't match expected format
486                warn!(
487                    path = %meta.location,
488                    "WAL filename doesn't match expected format, downloading segment"
489                );
490                let get_result =
491                    get_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
492                let bytes = get_result.bytes().await?;
493                if bytes.is_empty() {
494                    // Empty segments should be deleted
495                    true
496                } else {
497                    match decode_segment(&bytes) {
498                        Ok(segment) => segment.lsn <= high_water_mark,
499                        Err(reason) => {
500                            // Never delete a corrupt segment during
501                            // truncation — keep the evidence; replay's
502                            // tail-vs-middle policy adjudicates it.
503                            warn!(path = %meta.location, reason = %reason,
504                                "Keeping corrupt WAL segment during truncation");
505                            false
506                        }
507                    }
508                }
509            };
510
511            if should_delete {
512                delete_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
513                deleted_count += 1;
514            }
515        }
516        info!(deleted_count, "WAL truncation completed");
517        Ok(())
518    }
519
520    /// Check if any WAL segments exist (for detecting database with lost manifest).
521    pub async fn has_segments(&self) -> Result<bool> {
522        let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
523        Ok(!metas.is_empty())
524    }
525
526    pub async fn truncate(&self) -> Result<()> {
527        info!("Truncating all WAL segments");
528        let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
529
530        let mut deleted_count = 0;
531        for meta in metas {
532            delete_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
533            deleted_count += 1;
534        }
535        info!(deleted_count, "Full WAL truncation completed");
536        Ok(())
537    }
538}
539
540#[cfg(test)]
541mod tests {
542    use super::*;
543    use object_store::ObjectStoreExt;
544    use object_store::local::LocalFileSystem;
545    use std::collections::HashMap;
546    use tempfile::tempdir;
547
548    #[tokio::test]
549    async fn test_wal_append_replay() -> Result<()> {
550        let dir = tempdir()?;
551        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
552        let prefix = Path::from("wal");
553
554        let wal = WriteAheadLog::new(store, prefix);
555
556        let mutation = Mutation::InsertVertex {
557            vid: Vid::new(1),
558            properties: HashMap::new(),
559            labels: vec![],
560        };
561
562        wal.append(mutation)?;
563        wal.flush().await?;
564
565        let mutations = wal.replay().await?;
566        assert_eq!(mutations.len(), 1);
567        if let Mutation::InsertVertex { vid, .. } = &mutations[0] {
568            assert_eq!(vid.as_u64(), Vid::new(1).as_u64());
569        } else {
570            panic!("Wrong mutation type");
571        }
572
573        wal.truncate().await?;
574        let mutations2 = wal.replay().await?;
575        assert_eq!(mutations2.len(), 0);
576
577        Ok(())
578    }
579
580    #[tokio::test]
581    async fn test_lsn_monotonicity() -> Result<()> {
582        // Verify that LSN is strictly monotonic even across multiple flushes
583        let dir = tempdir()?;
584        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
585        let prefix = Path::from("wal");
586
587        let wal = WriteAheadLog::new(store, prefix);
588
589        let mutation1 = Mutation::InsertVertex {
590            vid: Vid::new(1),
591            properties: HashMap::new(),
592            labels: vec![],
593        };
594        let mutation2 = Mutation::InsertVertex {
595            vid: Vid::new(2),
596            properties: HashMap::new(),
597            labels: vec![],
598        };
599        let mutation3 = Mutation::InsertVertex {
600            vid: Vid::new(3),
601            properties: HashMap::new(),
602            labels: vec![],
603        };
604
605        // First flush
606        wal.append(mutation1)?;
607        let lsn1 = wal.flush().await?;
608
609        // Second flush
610        wal.append(mutation2)?;
611        let lsn2 = wal.flush().await?;
612
613        // Third flush
614        wal.append(mutation3)?;
615        let lsn3 = wal.flush().await?;
616
617        // Verify strict monotonicity
618        assert!(lsn2 > lsn1, "LSN2 ({}) should be > LSN1 ({})", lsn2, lsn1);
619        assert!(lsn3 > lsn2, "LSN3 ({}) should be > LSN2 ({})", lsn3, lsn2);
620
621        // Verify LSNs are consecutive
622        assert_eq!(lsn2, lsn1 + 1);
623        assert_eq!(lsn3, lsn2 + 1);
624
625        Ok(())
626    }
627
628    #[test]
629    fn test_parse_lsn_from_filename() {
630        // Standard format
631        let path = Path::from("00000000000000000042_a1b2c3d4.wal");
632        assert_eq!(parse_lsn_from_filename(&path), Some(42));
633
634        let path = Path::from("00000000000000001234_e5f6a7b8.wal");
635        assert_eq!(parse_lsn_from_filename(&path), Some(1234));
636
637        // Leading zeros
638        let path = Path::from("00000000000000000001_xyz.wal");
639        assert_eq!(parse_lsn_from_filename(&path), Some(1));
640
641        // Large LSN (within u64 range)
642        let path = Path::from("12345678901234567890_uuid.wal");
643        assert_eq!(parse_lsn_from_filename(&path), Some(12345678901234567890));
644
645        // Invalid formats
646        let path = Path::from("invalid.wal");
647        assert_eq!(parse_lsn_from_filename(&path), None);
648
649        let path = Path::from("123.wal"); // Too short
650        assert_eq!(parse_lsn_from_filename(&path), None);
651
652        let path = Path::from("abcdefghijklmnopqrst_uuid.wal"); // Non-numeric
653        assert_eq!(parse_lsn_from_filename(&path), None);
654
655        // Missing underscore separator (but first 20 chars are valid LSN)
656        let path = Path::from("00000000000000000100.wal");
657        assert_eq!(parse_lsn_from_filename(&path), Some(100));
658
659        // Empty path
660        let path = Path::from("");
661        assert_eq!(parse_lsn_from_filename(&path), None);
662    }
663
664    /// Regression for Bug #30: `parse_lsn_from_filename` must not panic on a
665    /// filename whose byte 20 falls in the middle of a multi-byte UTF-8 char.
666    ///
667    /// The length guard uses [`str::len`] (byte length), but the subsequent
668    /// `filename[..20]` slice requires byte 20 to be a char boundary. A name of
669    /// 19 ASCII bytes plus one 2-byte char ('é', bytes 19..21) has a byte length
670    /// of at least 20, passes the guard, then slices mid-'é' and panics today.
671    /// The correct behavior is to return `None` for a non-numeric/unparsable name.
672    ///
673    /// We use [`Path::parse`] (not [`Path::from`]) because `from` percent-encodes
674    /// non-ASCII so that `filename()` would be pure ASCII and never reach the
675    /// mid-char slice; `parse` preserves the raw multi-byte segment verbatim,
676    /// which is exactly what a real on-disk listing can surface.
677    #[test]
678    fn test_parse_lsn_from_filename_multibyte_no_panic() {
679        let name = format!("{}{}.wal", "0".repeat(19), "é"); // byte 20 falls mid-'é'
680        let path = Path::parse(name).expect("multi-byte segment is a valid object_store path");
681        assert_eq!(parse_lsn_from_filename(&path), None); // RED: panics today; correct = None
682    }
683
684    /// Test for Issue #6: WAL initialization should parse LSN from filenames
685    /// without downloading all segments
686    #[tokio::test]
687    async fn test_find_max_lsn_scalability() -> Result<()> {
688        let dir = tempdir()?;
689        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
690        let prefix = Path::from("wal");
691
692        let wal = WriteAheadLog::new(store, prefix);
693
694        // Create 100 WAL segments with increasing LSNs
695        for i in 1..=100 {
696            let mutation = Mutation::InsertVertex {
697                vid: Vid::new(i),
698                properties: HashMap::new(),
699                labels: vec![],
700            };
701            wal.append(mutation)?;
702            wal.flush().await?;
703        }
704
705        // Measure initialization time - should be fast (parsing filenames, not downloading)
706        let start = std::time::Instant::now();
707        let max_lsn = wal.find_max_lsn().await?;
708        let duration = start.elapsed();
709
710        // Verify correctness
711        assert_eq!(max_lsn, 100, "Max LSN should be 100");
712
713        // Verify performance - should complete quickly even with many segments
714        assert!(
715            duration.as_millis() < 1000,
716            "find_max_lsn took {}ms, expected < 1000ms (filename parsing should be fast)",
717            duration.as_millis()
718        );
719
720        Ok(())
721    }
722
723    /// Test for Issue #11: LSN gaps are preserved on flush failures (watermark pattern)
724    #[tokio::test]
725    async fn test_lsn_gaps_preserved_on_flush_failure() -> Result<()> {
726        let dir = tempdir()?;
727        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
728        let prefix = Path::from("wal");
729
730        let wal = WriteAheadLog::new(store.clone(), prefix.clone());
731
732        // Flush mutation 1 successfully
733        wal.append(Mutation::InsertVertex {
734            vid: Vid::new(1),
735            properties: HashMap::new(),
736            labels: vec![],
737        })?;
738        let lsn1 = wal.flush().await?;
739        assert_eq!(lsn1, 1);
740
741        // Flush mutation 2 successfully
742        wal.append(Mutation::InsertVertex {
743            vid: Vid::new(2),
744            properties: HashMap::new(),
745            labels: vec![],
746        })?;
747        let lsn2 = wal.flush().await?;
748        assert_eq!(lsn2, 2);
749
750        // Simulate a scenario where flush might fail by creating a read-only store
751        // (In real scenario, network failures would cause this)
752        // For now, verify that LSN assignment happens BEFORE write attempt
753        // by checking that next_lsn increments even if we don't flush
754
755        // Append mutation 3 but DON'T flush
756        wal.append(Mutation::InsertVertex {
757            vid: Vid::new(3),
758            properties: HashMap::new(),
759            labels: vec![],
760        })?;
761
762        // Now flush mutation 4
763        wal.append(Mutation::InsertVertex {
764            vid: Vid::new(4),
765            properties: HashMap::new(),
766            labels: vec![],
767        })?;
768        let lsn4 = wal.flush().await?;
769
770        // LSN should be 3 (both mutations 3 and 4 flushed together)
771        assert_eq!(lsn4, 3, "LSN should increment monotonically");
772
773        // Verify all mutations can be replayed
774        let mutations = wal.replay().await?;
775        assert_eq!(mutations.len(), 4, "All 4 mutations should be replayed");
776
777        Ok(())
778    }
779
780    /// Test for Issue #11: Verify LSN watermark pattern - no LSN reuse
781    #[tokio::test]
782    async fn test_lsn_watermark_no_reuse() -> Result<()> {
783        let dir = tempdir()?;
784        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
785        let prefix = Path::from("wal");
786
787        let wal = WriteAheadLog::new(store, prefix);
788
789        // Track all LSNs we've seen
790        let mut seen_lsns = std::collections::HashSet::new();
791
792        // Perform 50 flushes
793        for i in 1..=50 {
794            wal.append(Mutation::InsertVertex {
795                vid: Vid::new(i),
796                properties: HashMap::new(),
797                labels: vec![],
798            })?;
799            let lsn = wal.flush().await?;
800
801            // Verify no LSN reuse
802            assert!(
803                !seen_lsns.contains(&lsn),
804                "LSN {} was reused! This violates monotonicity.",
805                lsn
806            );
807            seen_lsns.insert(lsn);
808
809            // Verify LSN is strictly increasing
810            assert_eq!(lsn, i, "LSN should be {}, got {}", i, lsn);
811        }
812
813        Ok(())
814    }
815
816    /// Test for Issue #33: WAL truncation should parse LSN from filenames
817    /// without downloading all segments
818    #[tokio::test]
819    async fn test_truncate_scalability() -> Result<()> {
820        let dir = tempdir()?;
821        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
822        let prefix = Path::from("wal");
823
824        let wal = WriteAheadLog::new(store, prefix);
825
826        // Create 100 WAL segments
827        for i in 1..=100 {
828            let mutation = Mutation::InsertVertex {
829                vid: Vid::new(i),
830                properties: HashMap::new(),
831                labels: vec![],
832            };
833            wal.append(mutation)?;
834            wal.flush().await?;
835        }
836
837        // Truncate segments with LSN <= 50
838        let start = std::time::Instant::now();
839        wal.truncate_before(50).await?;
840        let duration = start.elapsed();
841
842        // Verify only segments 51-100 remain
843        let mutations = wal.replay().await?;
844        assert_eq!(
845            mutations.len(),
846            50,
847            "Should have 50 mutations remaining (51-100)"
848        );
849
850        // Verify performance - should be fast (filename parsing, not downloading)
851        assert!(
852            duration.as_millis() < 1000,
853            "truncate_before took {}ms, expected < 1000ms (filename parsing should be fast)",
854            duration.as_millis()
855        );
856
857        Ok(())
858    }
859
860    /// Test for Issue #6: replay_since should skip old segments by filename
861    #[tokio::test]
862    async fn test_replay_since_skips_old_segments() -> Result<()> {
863        let dir = tempdir()?;
864        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
865        let prefix = Path::from("wal");
866
867        let wal = WriteAheadLog::new(store, prefix);
868
869        // Create 100 WAL segments
870        for i in 1..=100 {
871            let mutation = Mutation::InsertVertex {
872                vid: Vid::new(i),
873                properties: HashMap::new(),
874                labels: vec![],
875            };
876            wal.append(mutation)?;
877            wal.flush().await?;
878        }
879
880        // Replay only segments with LSN > 90 (should skip 90 segments by filename)
881        let start = std::time::Instant::now();
882        let mutations = wal.replay_since(90).await?;
883        let duration = start.elapsed();
884
885        // Verify only 10 mutations returned (LSN 91-100)
886        assert_eq!(mutations.len(), 10, "Should replay only LSNs 91-100");
887
888        // Verify performance - should be fast (skips 90 segments by filename)
889        assert!(
890            duration.as_millis() < 500,
891            "replay_since took {}ms, expected < 500ms (should skip by filename)",
892            duration.as_millis()
893        );
894
895        Ok(())
896    }
897
898    /// Test for Issue #23: Vertex labels preserved through WAL replay
899    #[tokio::test]
900    async fn test_wal_replay_preserves_vertex_labels() -> Result<()> {
901        let dir = tempdir()?;
902        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
903        let prefix = Path::from("wal");
904
905        let wal = Arc::new(WriteAheadLog::new(store, prefix));
906
907        // Append InsertVertex with labels
908        wal.append(Mutation::InsertVertex {
909            vid: Vid::new(42),
910            properties: {
911                let mut props = HashMap::new();
912                props.insert(
913                    "name".to_string(),
914                    uni_common::Value::String("Alice".to_string()),
915                );
916                props
917            },
918            labels: vec!["Person".to_string(), "User".to_string()],
919        })?;
920
921        // Flush to WAL
922        wal.flush().await?;
923
924        // Replay mutations
925        let mutations = wal.replay().await?;
926        assert_eq!(mutations.len(), 1);
927
928        // Verify labels are preserved
929        if let Mutation::InsertVertex { vid, labels, .. } = &mutations[0] {
930            assert_eq!(vid.as_u64(), 42);
931            assert_eq!(labels.len(), 2);
932            assert!(labels.contains(&"Person".to_string()));
933            assert!(labels.contains(&"User".to_string()));
934        } else {
935            panic!("Expected InsertVertex mutation");
936        }
937
938        Ok(())
939    }
940
941    /// Test for Issue #23: DeleteVertex labels preserved through WAL replay
942    #[tokio::test]
943    async fn test_wal_replay_preserves_delete_vertex_labels() -> Result<()> {
944        let dir = tempdir()?;
945        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
946        let prefix = Path::from("wal");
947
948        let wal = Arc::new(WriteAheadLog::new(store, prefix));
949
950        // Append DeleteVertex with labels (needed for tombstone flushing - Issue #76)
951        wal.append(Mutation::DeleteVertex {
952            vid: Vid::new(99),
953            labels: vec!["Person".to_string(), "Admin".to_string()],
954        })?;
955
956        // Flush to WAL
957        wal.flush().await?;
958
959        // Replay mutations
960        let mutations = wal.replay().await?;
961        assert_eq!(mutations.len(), 1);
962
963        // Verify labels are preserved in DeleteVertex
964        if let Mutation::DeleteVertex { vid, labels } = &mutations[0] {
965            assert_eq!(vid.as_u64(), 99);
966            assert_eq!(labels.len(), 2);
967            assert!(labels.contains(&"Person".to_string()));
968            assert!(labels.contains(&"Admin".to_string()));
969        } else {
970            panic!("Expected DeleteVertex mutation");
971        }
972
973        Ok(())
974    }
975
976    /// Test for Issue #28: Edge type name preserved through WAL replay
977    #[tokio::test]
978    async fn test_wal_replay_preserves_edge_type_name() -> Result<()> {
979        let dir = tempdir()?;
980        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
981        let prefix = Path::from("wal");
982
983        let wal = Arc::new(WriteAheadLog::new(store, prefix));
984
985        // Append InsertEdge with edge_type_name
986        wal.append(Mutation::InsertEdge {
987            src_vid: Vid::new(1),
988            dst_vid: Vid::new(2),
989            edge_type: 100,
990            eid: Eid::new(500),
991            version: 1,
992            properties: {
993                let mut props = HashMap::new();
994                props.insert("since".to_string(), uni_common::Value::Int(2020));
995                props
996            },
997            edge_type_name: Some("KNOWS".to_string()),
998        })?;
999
1000        // Flush to WAL
1001        wal.flush().await?;
1002
1003        // Replay mutations
1004        let mutations = wal.replay().await?;
1005        assert_eq!(mutations.len(), 1);
1006
1007        // Verify edge_type_name is preserved
1008        if let Mutation::InsertEdge {
1009            eid,
1010            edge_type_name,
1011            ..
1012        } = &mutations[0]
1013        {
1014            assert_eq!(eid.as_u64(), 500);
1015            assert_eq!(edge_type_name.as_deref(), Some("KNOWS"));
1016        } else {
1017            panic!("Expected InsertEdge mutation");
1018        }
1019
1020        Ok(())
1021    }
1022
1023    /// Test for Issue #23: Backward compatibility with old WAL segments (no labels)
1024    #[tokio::test]
1025    async fn test_wal_backward_compatibility_labels() -> Result<()> {
1026        let dir = tempdir()?;
1027        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
1028        let prefix = Path::from("wal");
1029
1030        // Manually create a WAL segment with old format (no labels field)
1031        let old_format_json = r#"{
1032            "lsn": 1,
1033            "mutations": [
1034                {
1035                    "InsertVertex": {
1036                        "vid": 123,
1037                        "properties": {}
1038                    }
1039                }
1040            ]
1041        }"#;
1042
1043        let path = prefix.clone().join("00000000000000000001_test.wal");
1044        store.put(&path, old_format_json.into()).await?;
1045
1046        // Create WAL and replay
1047        let wal = WriteAheadLog::new(store, prefix);
1048        let mutations = wal.replay().await?;
1049
1050        // Verify old format deserializes with empty labels (via #[serde(default)])
1051        assert_eq!(mutations.len(), 1);
1052        if let Mutation::InsertVertex { vid, labels, .. } = &mutations[0] {
1053            assert_eq!(vid.as_u64(), 123);
1054            assert_eq!(
1055                labels.len(),
1056                0,
1057                "Old format should deserialize with empty labels"
1058            );
1059        } else {
1060            panic!("Expected InsertVertex mutation");
1061        }
1062
1063        Ok(())
1064    }
1065
1066    /// `flush` serializes through the borrowed `WalSegmentRef`; its bytes must
1067    /// be identical to the owned `WalSegment` so replay (which deserializes
1068    /// `WalSegment`) is unaffected.
1069    #[test]
1070    fn wal_segment_ref_serializes_identically() {
1071        let mut props = HashMap::new();
1072        props.insert("p".to_string(), uni_common::Value::Int(7));
1073        let mutations = vec![
1074            Mutation::InsertVertex {
1075                vid: Vid::new(1),
1076                properties: props,
1077                labels: vec!["L".to_string()],
1078            },
1079            Mutation::DeleteEdge {
1080                eid: Eid::new(2),
1081                src_vid: Vid::new(1),
1082                dst_vid: Vid::new(3),
1083                edge_type: 4,
1084                version: 5,
1085            },
1086        ];
1087        let owned = WalSegment {
1088            lsn: 42,
1089            mutations: mutations.clone(),
1090        };
1091        let borrowed = WalSegmentRef {
1092            lsn: 42,
1093            mutations: &mutations,
1094        };
1095        assert_eq!(
1096            serde_json::to_vec(&owned).unwrap(),
1097            serde_json::to_vec(&borrowed).unwrap()
1098        );
1099    }
1100}