Skip to main content

uni_store/runtime/
wal.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::store_utils::{
5    DEFAULT_TIMEOUT, delete_with_timeout, get_with_timeout, list_with_timeout, put_with_timeout,
6};
7use anyhow::Result;
8use metrics;
9use object_store::ObjectStore;
10use object_store::path::Path;
11use serde::{Deserialize, Serialize};
12use std::sync::{Arc, Mutex};
13use tracing::{debug, info, instrument, warn};
14use uni_common::Properties;
15use uni_common::core::id::{Eid, Vid};
16use uni_common::sync::acquire_mutex;
17use uuid::Uuid;
18
19/// Parse LSN from WAL segment filename format `{:020}_{uuid}.wal`.
20/// Returns None if the filename doesn't match the expected format.
21fn parse_lsn_from_filename(path: &Path) -> Option<u64> {
22    let filename = path.filename()?;
23    if filename.len() < 20 {
24        return None;
25    }
26    // `str::get(..20)` yields `None` (instead of panicking) when byte index 20
27    // falls mid-UTF-8-character, so a foreign multibyte filename is skipped.
28    filename.get(..20).and_then(|s| s.parse::<u64>().ok())
29}
30
/// Magic prefix of checksummed (v2) WAL segments.
///
/// v2 layout: `UNIWAL2\n<64-hex-char blake3 of payload>\n<payload JSON>`.
/// Segments without the magic are legacy (pre-2.0.7) raw JSON and are still
/// readable; they just have no integrity protection.
///
/// The trailing newline is part of the magic, so the checksum starts
/// immediately after these bytes.
const WAL_V2_MAGIC: &[u8] = b"UNIWAL2\n";

/// Length of the hex-encoded blake3 checksum in the v2 header.
///
/// The byte at this offset after the magic must be the `\n` that separates
/// the checksum from the payload.
const WAL_V2_HASH_HEX_LEN: usize = 64;
40
41/// Wrap a serialized segment payload in the checksummed v2 envelope.
42fn encode_segment_envelope(payload_json: &[u8]) -> Vec<u8> {
43    let hash = blake3::hash(payload_json);
44    let mut out =
45        Vec::with_capacity(WAL_V2_MAGIC.len() + WAL_V2_HASH_HEX_LEN + 1 + payload_json.len());
46    out.extend_from_slice(WAL_V2_MAGIC);
47    out.extend_from_slice(hash.to_hex().as_bytes());
48    out.push(b'\n');
49    out.extend_from_slice(payload_json);
50    out
51}
52
53/// Decode a WAL segment from its on-disk bytes, verifying the checksum for
54/// v2 envelopes and falling back to legacy raw-JSON parsing otherwise.
55///
56/// Returns a human-readable corruption description on failure — the caller
57/// decides whether that is fatal (corrupt middle segment) or a tolerated
58/// torn tail (see [`WriteAheadLog::replay_since`]).
59///
60/// `pub` + `doc(hidden)` solely so `fuzz/fuzz_targets/wal_decode.rs` can
61/// drive it with arbitrary bytes; it is not part of the public API.
62#[doc(hidden)]
63pub fn decode_segment(bytes: &[u8]) -> std::result::Result<WalSegment, String> {
64    if let Some(rest) = bytes.strip_prefix(WAL_V2_MAGIC) {
65        if rest.len() < WAL_V2_HASH_HEX_LEN + 1 || rest[WAL_V2_HASH_HEX_LEN] != b'\n' {
66            return Err("truncated v2 segment header".to_string());
67        }
68        let (hex, payload_nl) = rest.split_at(WAL_V2_HASH_HEX_LEN);
69        let payload = &payload_nl[1..];
70        let expected =
71            std::str::from_utf8(hex).map_err(|_| "non-utf8 checksum header".to_string())?;
72        let actual = blake3::hash(payload);
73        if actual.to_hex().as_str() != expected {
74            return Err(format!(
75                "checksum mismatch (expected {expected}, computed {})",
76                actual.to_hex()
77            ));
78        }
79        serde_json::from_slice(payload).map_err(|e| format!("v2 payload parse: {e}"))
80    } else {
81        // Legacy (pre-2.0.7) segment: raw JSON, no checksum.
82        serde_json::from_slice(bytes).map_err(|e| format!("legacy segment parse: {e}"))
83    }
84}
85
/// Test-only fault injection: when set, the next local-store segment fsync is
/// treated as having failed, exercising the clean-abort delete path (review H3)
/// without needing a real disk fault. Process-isolated under nextest.
///
/// The flag is one-shot: `flush` reads it with `swap(false, ..)`, so a single
/// `store(true, ..)` affects exactly one fsync.
#[cfg(test)]
pub(crate) static FAIL_NEXT_FSYNC: std::sync::atomic::AtomicBool =
    std::sync::atomic::AtomicBool::new(false);
92
/// Fsync a freshly written file and its parent directory.
///
/// The directory fsync makes the new directory entry itself durable across
/// a crash (pattern borrowed from uni-sidecar's atomic `store_value`). It is
/// only performed on Unix targets.
///
/// A bare relative filename (e.g. `seg.wal`) has an empty parent; that is
/// interpreted as the current directory rather than being opened as `""`,
/// which would fail.
///
/// `pub(crate)` so the snapshot durability barrier (review C4) can reuse the
/// exact same file+parent fsync the WAL uses for its own segments.
///
/// # Errors
///
/// Returns the underlying I/O error if the file or its parent directory
/// cannot be opened or synced.
pub(crate) fn sync_file_and_parent(path: &std::path::Path) -> std::io::Result<()> {
    std::fs::File::open(path)?.sync_all()?;
    #[cfg(unix)]
    if let Some(parent) = path.parent() {
        // `Path::parent` returns `Some("")` for a single-component relative
        // path; the containing directory is then the working directory.
        let dir = if parent.as_os_str().is_empty() {
            std::path::Path::new(".")
        } else {
            parent
        };
        std::fs::File::open(dir)?.sync_all()?;
    }
    Ok(())
}
108
/// A single logical change recorded in the write-ahead log.
///
/// Mutations are buffered by `WriteAheadLog::append`, written out in batches
/// as the `mutations` of a [`WalSegment`], and handed back by replay.
///
/// Fields marked `#[serde(default)]` may be absent from the serialized form
/// and then deserialize to their `Default` value.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Mutation {
    /// Edge insertion.
    InsertEdge {
        src_vid: Vid,
        dst_vid: Vid,
        edge_type: u32,
        eid: Eid,
        version: u64,
        properties: Properties,
        /// Edge type name for metadata recovery. Optional for backward compatibility.
        #[serde(default)]
        edge_type_name: Option<String>,
    },
    /// Edge deletion.
    DeleteEdge {
        eid: Eid,
        src_vid: Vid,
        dst_vid: Vid,
        edge_type: u32,
        version: u64,
    },
    /// Vertex insertion. `labels` defaults to empty when absent from the
    /// serialized form.
    InsertVertex {
        vid: Vid,
        properties: Properties,
        #[serde(default)]
        labels: Vec<String>,
    },
    /// Vertex deletion. `labels` defaults to empty when absent from the
    /// serialized form.
    DeleteVertex {
        vid: Vid,
        #[serde(default)]
        labels: Vec<String>,
    },
    /// Replaces a vertex's full label set (a `SET n:Label` / `REMOVE n:Label`
    /// that touched no properties). Carries the complete resolved label set so
    /// replay can REPLACE (removals included). Added after the original four
    /// variants — externally-tagged serde_json, so old WAL segments (which never
    /// contain it) deserialize unchanged.
    SetVertexLabels { vid: Vid, labels: Vec<String> },
}
147
/// WAL segment with LSN for idempotent recovery.
///
/// This is the deserialization target of `decode_segment`; `flush` writes the
/// same shape through the borrowed `WalSegmentRef` view.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WalSegment {
    /// Log Sequence Number - monotonically increasing per segment
    pub lsn: u64,
    /// Mutations in this segment
    pub mutations: Vec<Mutation>,
}
156
/// Borrowed serialization view of [`WalSegment`].
///
/// Field names and order match `WalSegment` exactly, so the serde output is
/// byte-identical; `flush` serializes through this to avoid deep-cloning the
/// whole mutation batch per flush.
///
/// Serialize-only: it derives no `Deserialize`, so segments are always read
/// back as an owned [`WalSegment`].
#[derive(Serialize, Debug)]
struct WalSegmentRef<'a> {
    /// Log sequence number of the segment being written.
    lsn: u64,
    /// The batch being flushed, borrowed from the caller.
    mutations: &'a [Mutation],
}
167
/// Write-ahead log that buffers mutations in memory and persists them as
/// segment objects under `prefix` in an object store.
///
/// Mutations are queued with `append` and written out by `flush`, one segment
/// per flush, each tagged with a log sequence number (LSN).
pub struct WriteAheadLog {
    /// Object store that holds the segment objects.
    store: Arc<dyn ObjectStore>,
    /// Path prefix under which every segment is written and listed.
    prefix: Path,
    /// Filesystem root backing `store` when it is a local store. When set,
    /// every flushed segment is fsync'd (file + parent directory) before the
    /// flush is reported durable — `object_store::LocalFileSystem` does not
    /// fsync on `put`, so without this a power loss can drop acknowledged
    /// commits. `None` for remote stores (the PUT ack is the durability
    /// point there).
    local_root: Option<std::path::PathBuf>,
    /// Pending mutation buffer and LSN counters, behind a synchronous mutex.
    state: Mutex<WalState>,
}
180
/// Mutable WAL bookkeeping, guarded by the `state` mutex of [`WriteAheadLog`].
struct WalState {
    /// Mutations appended since the buffer was last drained by a flush.
    buffer: Vec<Mutation>,
    /// Current LSN counter (incremented per flush)
    next_lsn: u64,
    /// Highest LSN successfully flushed
    flushed_lsn: u64,
}
188
189impl WriteAheadLog {
190    pub fn new(store: Arc<dyn ObjectStore>, prefix: Path) -> Self {
191        Self {
192            store,
193            prefix,
194            local_root: None,
195            state: Mutex::new(WalState {
196                buffer: Vec::new(),
197                next_lsn: 1, // Start at 1 so 0 means "no WAL"
198                flushed_lsn: 0,
199            }),
200        }
201    }
202
203    /// Set the local filesystem root backing the object store, enabling
204    /// fsync-on-flush. See the field docs on `local_root`.
205    #[must_use]
206    pub fn with_local_root(mut self, local_root: Option<std::path::PathBuf>) -> Self {
207        self.local_root = local_root;
208        self
209    }
210
211    /// Initialize WAL state from existing segments (called on startup)
212    pub async fn initialize(&self) -> Result<u64> {
213        let max_lsn = self.find_max_lsn().await?;
214        {
215            let mut state = acquire_mutex(&self.state, "wal_state")?;
216            state.next_lsn = max_lsn + 1;
217            state.flushed_lsn = max_lsn;
218        }
219        Ok(max_lsn)
220    }
221
222    /// Find the maximum LSN in existing WAL segments by parsing filenames.
223    /// Only downloads segments if filename parsing fails (fallback).
224    async fn find_max_lsn(&self) -> Result<u64> {
225        let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
226        let mut max_lsn: u64 = 0;
227
228        for meta in metas {
229            // Try to parse LSN from filename first (fast path)
230            if let Some(lsn) = parse_lsn_from_filename(&meta.location) {
231                max_lsn = max_lsn.max(lsn);
232            } else {
233                // Fallback: download and parse segment if filename doesn't match expected format
234                warn!(
235                    path = %meta.location,
236                    "WAL filename doesn't match expected format, downloading segment"
237                );
238                let get_result =
239                    get_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
240                let bytes = get_result.bytes().await?;
241                if bytes.is_empty() {
242                    continue;
243                }
244                // This is only a max-LSN probe; a corrupt segment is skipped
245                // here (with a warning) and adjudicated by `replay_since`'s
246                // tail-vs-middle policy during actual recovery.
247                match decode_segment(&bytes) {
248                    Ok(segment) => max_lsn = max_lsn.max(segment.lsn),
249                    Err(reason) => {
250                        warn!(path = %meta.location, reason = %reason,
251                            "Skipping corrupt WAL segment during max-LSN probe");
252                    }
253                }
254            }
255        }
256
257        Ok(max_lsn)
258    }
259
260    #[instrument(skip(self, mutation), level = "trace")]
261    pub fn append(&self, mutation: Mutation) -> Result<()> {
262        let mut state = acquire_mutex(&self.state, "wal_state")?;
263        state.buffer.push(mutation);
264        metrics::counter!("uni_wal_entries_total").increment(1);
265        Ok(())
266    }
267
268    /// Flush buffered mutations to a WAL segment. Returns the LSN of the flushed segment.
269    #[instrument(skip(self), fields(lsn, mutations_count, size_bytes))]
270    pub async fn flush(&self) -> Result<u64> {
271        let start = std::time::Instant::now();
272        let (batch, lsn) = {
273            let mut state = acquire_mutex(&self.state, "wal_state")?;
274            if state.buffer.is_empty() {
275                return Ok(state.flushed_lsn);
276            }
277            let lsn = state.next_lsn;
278            state.next_lsn += 1;
279            (std::mem::take(&mut state.buffer), lsn)
280        };
281
282        tracing::Span::current().record("lsn", lsn);
283        tracing::Span::current().record("mutations_count", batch.len());
284
285        // Serialize a borrowed view of the segment (serde output is identical
286        // to `WalSegment` — see `wal_segment_ref_serializes_identically`), so
287        // the batch is not deep-cloned per flush. `batch` itself stays owned
288        // for the restore-on-failure paths below.
289        let segment = WalSegmentRef {
290            lsn,
291            mutations: &batch,
292        };
293
294        // Serialize segment; restore buffer on failure
295        let json = match serde_json::to_vec(&segment) {
296            Ok(j) => j,
297            Err(e) => {
298                warn!(lsn, error = %e, "Failed to serialize WAL segment, restoring buffer");
299                // Restore buffer on serialization failure
300                let mut state = acquire_mutex(&self.state, "wal_state")?;
301                let new_mutations = std::mem::take(&mut state.buffer);
302                state.buffer = batch;
303                state.buffer.extend(new_mutations);
304                // Don't roll back LSN - gap is harmless and maintains monotonicity
305                return Err(e.into());
306            }
307        };
308        // Wrap in the checksummed v2 envelope so torn/corrupt segments are
309        // detected at replay instead of surfacing as opaque parse errors.
310        let body = encode_segment_envelope(&json);
311        tracing::Span::current().record("size_bytes", body.len());
312        metrics::counter!("uni_wal_bytes_written_total").increment(body.len() as u64);
313
314        // Include LSN in filename for easy ordering and identification
315        let filename = format!("{:020}_{}.wal", lsn, Uuid::new_v4());
316        let path = self.prefix.clone().join(filename);
317
318        // Attempt to write; restore buffer on failure to prevent data loss
319        if let Err(e) = put_with_timeout(&self.store, &path, body.into(), DEFAULT_TIMEOUT).await {
320            warn!(
321                lsn,
322                error = %e,
323                "Failed to flush WAL segment, restoring buffer (LSN gap preserved for monotonicity)"
324            );
325            // Restore buffer so data isn't lost on transient failures
326            let mut state = acquire_mutex(&self.state, "wal_state")?;
327            // Prepend the failed batch to any new mutations that arrived
328            let new_mutations = std::mem::take(&mut state.buffer);
329            state.buffer = batch;
330            state.buffer.extend(new_mutations);
331            // Don't roll back LSN - gap is harmless and maintains strict monotonicity
332            // All WAL consumers use `>` / `<=` comparisons, not equality checks
333            return Err(e);
334        }
335
336        // Local stores: fsync the segment + its directory before reporting
337        // the flush durable. On fsync failure we report failure (durability
338        // cannot be guaranteed) AND delete the just-written segment: the bytes
339        // are already on disk, so leaving them would let a later crash + replay
340        // resurrect a transaction the caller was told had FAILED (ghost commit).
341        // `flushed_lsn` is intentionally left un-advanced. (review H3)
342        if let Some(root) = &self.local_root {
343            let file_path = root.join(path.as_ref());
344            #[cfg(test)]
345            let synced = if FAIL_NEXT_FSYNC.swap(false, std::sync::atomic::Ordering::SeqCst) {
346                Ok(Err(std::io::Error::other("injected fsync failure")))
347            } else {
348                tokio::task::spawn_blocking(move || sync_file_and_parent(&file_path)).await
349            };
350            #[cfg(not(test))]
351            let synced =
352                tokio::task::spawn_blocking(move || sync_file_and_parent(&file_path)).await;
353            let fsync_err: Option<anyhow::Error> = match synced {
354                Ok(Ok(())) => None,
355                Ok(Err(e)) => Some(e.into()),
356                Err(e) => Some(e.into()),
357            };
358            if let Some(err) = fsync_err {
359                warn!(
360                    lsn,
361                    error = %err,
362                    "WAL segment fsync failed — deleting the non-durable segment to avoid a ghost commit on replay"
363                );
364                // Best-effort clean abort. If the delete ALSO fails the WAL is
365                // in an indeterminate state (a non-durable segment may survive
366                // and replay); surface that as a hard error rather than a
367                // routine flush failure so the caller does not silently retry.
368                if let Err(del_err) = delete_with_timeout(&self.store, &path, DEFAULT_TIMEOUT).await
369                {
370                    return Err(anyhow::anyhow!(
371                        "WAL segment fsync failed ({err}) and the cleanup delete \
372                         of segment at lsn {lsn} also failed ({del_err}); the WAL \
373                         may contain a non-durable segment"
374                    ));
375                }
376                return Err(err);
377            }
378        }
379
380        // Update flushed LSN on success
381        {
382            let mut state = acquire_mutex(&self.state, "wal_state")?;
383            state.flushed_lsn = lsn;
384        }
385
386        let duration = start.elapsed();
387        metrics::histogram!("wal_flush_latency_ms").record(duration.as_millis() as f64);
388        metrics::histogram!("uni_wal_flush_duration_seconds").record(duration.as_secs_f64());
389
390        if duration.as_millis() > 100 {
391            warn!(
392                lsn,
393                duration_ms = duration.as_millis(),
394                "Slow WAL flush detected"
395            );
396        } else {
397            debug!(
398                lsn,
399                duration_ms = duration.as_millis(),
400                "WAL flush completed"
401            );
402        }
403
404        Ok(lsn)
405    }
406
407    /// Get the highest LSN that has been flushed.
408    ///
409    /// # Errors
410    ///
411    /// Returns error if the WAL state lock is poisoned (see issue #18/#150).
412    pub fn flushed_lsn(&self) -> Result<u64, uni_common::sync::LockPoisonedError> {
413        let guard = uni_common::sync::acquire_mutex(&self.state, "wal_state")?;
414        Ok(guard.flushed_lsn)
415    }
416
417    /// Replay WAL segments with LSN > high_water_mark.
418    /// Returns mutations from segments that haven't been applied yet.
419    /// Optimized to skip downloading segments with LSN <= high_water_mark (parsed from filename).
420    ///
421    /// Corruption policy: a corrupt (bad checksum / unparseable / empty)
422    /// segment at the **tail** of the log is the classic torn write from a
423    /// crash — it is logged prominently and treated as end-of-WAL, since the
424    /// commit it belonged to was never acknowledged. A corrupt segment with
425    /// valid segments **after** it is real data loss and fails recovery with
426    /// an error naming the file.
427    #[instrument(skip(self), level = "debug")]
428    pub async fn replay_since(&self, high_water_mark: u64) -> Result<Vec<Mutation>> {
429        let start = std::time::Instant::now();
430        debug!(high_water_mark, "Replaying WAL segments");
431        let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
432        let mut mutations = Vec::new();
433
434        // Collect candidate paths and sort by LSN (filename prefix).
435        // Lexicographical sort works for the zero-padded LSN prefix.
436        let mut paths: Vec<_> = metas
437            .into_iter()
438            .map(|m| m.location)
439            .filter(|p| {
440                // Skip segments identifiable as <= high_water_mark without
441                // downloading. Unparseable filenames stay in (legacy safety).
442                parse_lsn_from_filename(p).is_none_or(|lsn| lsn > high_water_mark)
443            })
444            .collect();
445        paths.sort();
446
447        let mut segments_replayed = 0;
448
449        for (idx, path) in paths.iter().enumerate() {
450            let get_result = get_with_timeout(&self.store, path, DEFAULT_TIMEOUT).await?;
451            let bytes = get_result.bytes().await?;
452
453            // Empty files and decode failures share one corruption policy.
454            let decoded = if bytes.is_empty() {
455                Err("empty segment file".to_string())
456            } else {
457                decode_segment(&bytes)
458            };
459
460            let segment = match decoded {
461                Ok(segment) => segment,
462                Err(reason) => {
463                    let is_tail = idx + 1 == paths.len();
464                    if is_tail {
465                        warn!(
466                            path = %path,
467                            reason = %reason,
468                            "Corrupt tail WAL segment — torn write from a crash; \
469                             treating as end of WAL (the commit was never acknowledged)"
470                        );
471                        break;
472                    }
473                    return Err(anyhow::anyhow!(
474                        "corrupt WAL segment '{path}' ({reason}) with {} later segment(s) \
475                         present; refusing to skip — manual inspection required",
476                        paths.len() - idx - 1
477                    ));
478                }
479            };
480
481            // Double-check LSN from segment content (handles fallback case)
482            if segment.lsn > high_water_mark {
483                mutations.extend(segment.mutations);
484                segments_replayed += 1;
485            }
486        }
487
488        info!(
489            segments_replayed,
490            mutations_count = mutations.len(),
491            "WAL replay completed"
492        );
493        metrics::histogram!("uni_wal_replay_duration_seconds")
494            .record(start.elapsed().as_secs_f64());
495
496        Ok(mutations)
497    }
498
    /// Replay all WAL segments.
    ///
    /// Equivalent to `replay_since(0)`: with a high-water mark of 0 no
    /// segment with a positive LSN is filtered out.
    pub async fn replay(&self) -> Result<Vec<Mutation>> {
        self.replay_since(0).await
    }
503
504    /// Deletes WAL segments with LSN <= high_water_mark by parsing filenames.
505    /// Only downloads segments if filename parsing fails (fallback).
506    #[instrument(skip(self), level = "info")]
507    pub async fn truncate_before(&self, high_water_mark: u64) -> Result<()> {
508        info!(high_water_mark, "Truncating WAL segments");
509        let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
510
511        let mut deleted_count = 0;
512        for meta in metas {
513            // Try to parse LSN from filename first (fast path)
514            let should_delete = if let Some(lsn) = parse_lsn_from_filename(&meta.location) {
515                lsn <= high_water_mark
516            } else {
517                // Fallback: download and parse segment if filename doesn't match expected format
518                warn!(
519                    path = %meta.location,
520                    "WAL filename doesn't match expected format, downloading segment"
521                );
522                let get_result =
523                    get_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
524                let bytes = get_result.bytes().await?;
525                if bytes.is_empty() {
526                    // Empty segments should be deleted
527                    true
528                } else {
529                    match decode_segment(&bytes) {
530                        Ok(segment) => segment.lsn <= high_water_mark,
531                        Err(reason) => {
532                            // Never delete a corrupt segment during
533                            // truncation — keep the evidence; replay's
534                            // tail-vs-middle policy adjudicates it.
535                            warn!(path = %meta.location, reason = %reason,
536                                "Keeping corrupt WAL segment during truncation");
537                            false
538                        }
539                    }
540                }
541            };
542
543            if should_delete {
544                delete_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
545                deleted_count += 1;
546            }
547        }
548        info!(deleted_count, "WAL truncation completed");
549        Ok(())
550    }
551
552    /// Check if any WAL segments exist (for detecting database with lost manifest).
553    pub async fn has_segments(&self) -> Result<bool> {
554        let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
555        Ok(!metas.is_empty())
556    }
557
558    pub async fn truncate(&self) -> Result<()> {
559        info!("Truncating all WAL segments");
560        let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
561
562        let mut deleted_count = 0;
563        for meta in metas {
564            delete_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
565            deleted_count += 1;
566        }
567        info!(deleted_count, "Full WAL truncation completed");
568        Ok(())
569    }
570}
571
572#[cfg(test)]
573mod tests {
574    use super::*;
575    use object_store::ObjectStoreExt;
576    use object_store::local::LocalFileSystem;
577    use std::collections::HashMap;
578    use tempfile::tempdir;
579
580    #[tokio::test]
581    async fn test_wal_append_replay() -> Result<()> {
582        let dir = tempdir()?;
583        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
584        let prefix = Path::from("wal");
585
586        let wal = WriteAheadLog::new(store, prefix);
587
588        let mutation = Mutation::InsertVertex {
589            vid: Vid::new(1),
590            properties: HashMap::new(),
591            labels: vec![],
592        };
593
594        wal.append(mutation)?;
595        wal.flush().await?;
596
597        let mutations = wal.replay().await?;
598        assert_eq!(mutations.len(), 1);
599        if let Mutation::InsertVertex { vid, .. } = &mutations[0] {
600            assert_eq!(vid.as_u64(), Vid::new(1).as_u64());
601        } else {
602            panic!("Wrong mutation type");
603        }
604
605        wal.truncate().await?;
606        let mutations2 = wal.replay().await?;
607        assert_eq!(mutations2.len(), 0);
608
609        Ok(())
610    }
611
612    #[tokio::test]
613    async fn test_lsn_monotonicity() -> Result<()> {
614        // Verify that LSN is strictly monotonic even across multiple flushes
615        let dir = tempdir()?;
616        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
617        let prefix = Path::from("wal");
618
619        let wal = WriteAheadLog::new(store, prefix);
620
621        let mutation1 = Mutation::InsertVertex {
622            vid: Vid::new(1),
623            properties: HashMap::new(),
624            labels: vec![],
625        };
626        let mutation2 = Mutation::InsertVertex {
627            vid: Vid::new(2),
628            properties: HashMap::new(),
629            labels: vec![],
630        };
631        let mutation3 = Mutation::InsertVertex {
632            vid: Vid::new(3),
633            properties: HashMap::new(),
634            labels: vec![],
635        };
636
637        // First flush
638        wal.append(mutation1)?;
639        let lsn1 = wal.flush().await?;
640
641        // Second flush
642        wal.append(mutation2)?;
643        let lsn2 = wal.flush().await?;
644
645        // Third flush
646        wal.append(mutation3)?;
647        let lsn3 = wal.flush().await?;
648
649        // Verify strict monotonicity
650        assert!(lsn2 > lsn1, "LSN2 ({}) should be > LSN1 ({})", lsn2, lsn1);
651        assert!(lsn3 > lsn2, "LSN3 ({}) should be > LSN2 ({})", lsn3, lsn2);
652
653        // Verify LSNs are consecutive
654        assert_eq!(lsn2, lsn1 + 1);
655        assert_eq!(lsn3, lsn2 + 1);
656
657        Ok(())
658    }
659
    /// H3: when a segment's fsync fails, the just-written (non-durable) segment
    /// must be deleted so a later crash + replay cannot resurrect a transaction
    /// the caller was told had failed (ghost commit). `flush()` reports failure.
    ///
    /// The failure is injected through the one-shot `FAIL_NEXT_FSYNC` flag, so
    /// no real disk fault is needed.
    #[tokio::test]
    async fn fsync_failure_deletes_segment_no_ghost_commit() -> Result<()> {
        let dir = tempdir()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
        let prefix = Path::from("wal");
        // local_root must be set for the fsync barrier (and thus the H3 path) to run.
        let wal = WriteAheadLog::new(store, prefix).with_local_root(Some(dir.path().to_path_buf()));

        wal.append(Mutation::InsertVertex {
            vid: Vid::new(1),
            properties: HashMap::new(),
            labels: vec![],
        })?;

        // Force the next segment fsync to fail.
        FAIL_NEXT_FSYNC.store(true, std::sync::atomic::Ordering::SeqCst);
        let result = wal.flush().await;
        assert!(
            result.is_err(),
            "flush must report failure when the segment fsync fails"
        );

        // The non-durable segment must have been deleted: replay surfaces nothing.
        let replayed = wal.replay().await?;
        assert!(
            replayed.is_empty(),
            "a segment whose fsync failed must not be replayable (ghost commit); got {} mutations",
            replayed.len()
        );
        Ok(())
    }
694
695    #[test]
696    fn test_parse_lsn_from_filename() {
697        // Standard format
698        let path = Path::from("00000000000000000042_a1b2c3d4.wal");
699        assert_eq!(parse_lsn_from_filename(&path), Some(42));
700
701        let path = Path::from("00000000000000001234_e5f6a7b8.wal");
702        assert_eq!(parse_lsn_from_filename(&path), Some(1234));
703
704        // Leading zeros
705        let path = Path::from("00000000000000000001_xyz.wal");
706        assert_eq!(parse_lsn_from_filename(&path), Some(1));
707
708        // Large LSN (within u64 range)
709        let path = Path::from("12345678901234567890_uuid.wal");
710        assert_eq!(parse_lsn_from_filename(&path), Some(12345678901234567890));
711
712        // Invalid formats
713        let path = Path::from("invalid.wal");
714        assert_eq!(parse_lsn_from_filename(&path), None);
715
716        let path = Path::from("123.wal"); // Too short
717        assert_eq!(parse_lsn_from_filename(&path), None);
718
719        let path = Path::from("abcdefghijklmnopqrst_uuid.wal"); // Non-numeric
720        assert_eq!(parse_lsn_from_filename(&path), None);
721
722        // Missing underscore separator (but first 20 chars are valid LSN)
723        let path = Path::from("00000000000000000100.wal");
724        assert_eq!(parse_lsn_from_filename(&path), Some(100));
725
726        // Empty path
727        let path = Path::from("");
728        assert_eq!(parse_lsn_from_filename(&path), None);
729    }
730
    /// Regression for Bug #30: `parse_lsn_from_filename` must not panic on a
    /// filename whose byte 20 falls in the middle of a multi-byte UTF-8 char.
    ///
    /// The length guard uses [`str::len`] (byte length), so a name of 19 ASCII
    /// bytes plus one 2-byte char ('é', bytes 19..21) passes it even though
    /// byte 20 is not a char boundary. An indexing slice (`filename[..20]`)
    /// would panic there; the function uses `str::get(..20)`, which returns
    /// `None` instead. The correct behavior is to return `None` for such a
    /// non-numeric/unparsable name.
    ///
    /// We use [`Path::parse`] (not [`Path::from`]) because `from` percent-encodes
    /// non-ASCII so that `filename()` would be pure ASCII and never reach the
    /// mid-char slice; `parse` preserves the raw multi-byte segment verbatim,
    /// which is exactly what a real on-disk listing can surface.
    #[test]
    fn test_parse_lsn_from_filename_multibyte_no_panic() {
        let name = format!("{}{}.wal", "0".repeat(19), "é"); // byte 20 falls mid-'é'
        let path = Path::parse(name).expect("multi-byte segment is a valid object_store path");
        assert_eq!(parse_lsn_from_filename(&path), None); // must be `None`, never a panic
    }
750
751    /// Test for Issue #6: WAL initialization should parse LSN from filenames
752    /// without downloading all segments
753    #[tokio::test]
754    async fn test_find_max_lsn_scalability() -> Result<()> {
755        let dir = tempdir()?;
756        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
757        let prefix = Path::from("wal");
758
759        let wal = WriteAheadLog::new(store, prefix);
760
761        // Create 100 WAL segments with increasing LSNs
762        for i in 1..=100 {
763            let mutation = Mutation::InsertVertex {
764                vid: Vid::new(i),
765                properties: HashMap::new(),
766                labels: vec![],
767            };
768            wal.append(mutation)?;
769            wal.flush().await?;
770        }
771
772        // Measure initialization time - should be fast (parsing filenames, not downloading)
773        let start = std::time::Instant::now();
774        let max_lsn = wal.find_max_lsn().await?;
775        let duration = start.elapsed();
776
777        // Verify correctness
778        assert_eq!(max_lsn, 100, "Max LSN should be 100");
779
780        // Verify performance - should complete quickly even with many segments
781        assert!(
782            duration.as_millis() < 1000,
783            "find_max_lsn took {}ms, expected < 1000ms (filename parsing should be fast)",
784            duration.as_millis()
785        );
786
787        Ok(())
788    }
789
790    /// Test for Issue #11: LSN gaps are preserved on flush failures (watermark pattern)
791    #[tokio::test]
792    async fn test_lsn_gaps_preserved_on_flush_failure() -> Result<()> {
793        let dir = tempdir()?;
794        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
795        let prefix = Path::from("wal");
796
797        let wal = WriteAheadLog::new(store.clone(), prefix.clone());
798
799        // Flush mutation 1 successfully
800        wal.append(Mutation::InsertVertex {
801            vid: Vid::new(1),
802            properties: HashMap::new(),
803            labels: vec![],
804        })?;
805        let lsn1 = wal.flush().await?;
806        assert_eq!(lsn1, 1);
807
808        // Flush mutation 2 successfully
809        wal.append(Mutation::InsertVertex {
810            vid: Vid::new(2),
811            properties: HashMap::new(),
812            labels: vec![],
813        })?;
814        let lsn2 = wal.flush().await?;
815        assert_eq!(lsn2, 2);
816
817        // Simulate a scenario where flush might fail by creating a read-only store
818        // (In real scenario, network failures would cause this)
819        // For now, verify that LSN assignment happens BEFORE write attempt
820        // by checking that next_lsn increments even if we don't flush
821
822        // Append mutation 3 but DON'T flush
823        wal.append(Mutation::InsertVertex {
824            vid: Vid::new(3),
825            properties: HashMap::new(),
826            labels: vec![],
827        })?;
828
829        // Now flush mutation 4
830        wal.append(Mutation::InsertVertex {
831            vid: Vid::new(4),
832            properties: HashMap::new(),
833            labels: vec![],
834        })?;
835        let lsn4 = wal.flush().await?;
836
837        // LSN should be 3 (both mutations 3 and 4 flushed together)
838        assert_eq!(lsn4, 3, "LSN should increment monotonically");
839
840        // Verify all mutations can be replayed
841        let mutations = wal.replay().await?;
842        assert_eq!(mutations.len(), 4, "All 4 mutations should be replayed");
843
844        Ok(())
845    }
846
847    /// Test for Issue #11: Verify LSN watermark pattern - no LSN reuse
848    #[tokio::test]
849    async fn test_lsn_watermark_no_reuse() -> Result<()> {
850        let dir = tempdir()?;
851        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
852        let prefix = Path::from("wal");
853
854        let wal = WriteAheadLog::new(store, prefix);
855
856        // Track all LSNs we've seen
857        let mut seen_lsns = std::collections::HashSet::new();
858
859        // Perform 50 flushes
860        for i in 1..=50 {
861            wal.append(Mutation::InsertVertex {
862                vid: Vid::new(i),
863                properties: HashMap::new(),
864                labels: vec![],
865            })?;
866            let lsn = wal.flush().await?;
867
868            // Verify no LSN reuse
869            assert!(
870                !seen_lsns.contains(&lsn),
871                "LSN {} was reused! This violates monotonicity.",
872                lsn
873            );
874            seen_lsns.insert(lsn);
875
876            // Verify LSN is strictly increasing
877            assert_eq!(lsn, i, "LSN should be {}, got {}", i, lsn);
878        }
879
880        Ok(())
881    }
882
883    /// Test for Issue #33: WAL truncation should parse LSN from filenames
884    /// without downloading all segments
885    #[tokio::test]
886    async fn test_truncate_scalability() -> Result<()> {
887        let dir = tempdir()?;
888        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
889        let prefix = Path::from("wal");
890
891        let wal = WriteAheadLog::new(store, prefix);
892
893        // Create 100 WAL segments
894        for i in 1..=100 {
895            let mutation = Mutation::InsertVertex {
896                vid: Vid::new(i),
897                properties: HashMap::new(),
898                labels: vec![],
899            };
900            wal.append(mutation)?;
901            wal.flush().await?;
902        }
903
904        // Truncate segments with LSN <= 50
905        let start = std::time::Instant::now();
906        wal.truncate_before(50).await?;
907        let duration = start.elapsed();
908
909        // Verify only segments 51-100 remain
910        let mutations = wal.replay().await?;
911        assert_eq!(
912            mutations.len(),
913            50,
914            "Should have 50 mutations remaining (51-100)"
915        );
916
917        // Verify performance - should be fast (filename parsing, not downloading)
918        assert!(
919            duration.as_millis() < 1000,
920            "truncate_before took {}ms, expected < 1000ms (filename parsing should be fast)",
921            duration.as_millis()
922        );
923
924        Ok(())
925    }
926
927    /// Test for Issue #6: replay_since should skip old segments by filename
928    #[tokio::test]
929    async fn test_replay_since_skips_old_segments() -> Result<()> {
930        let dir = tempdir()?;
931        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
932        let prefix = Path::from("wal");
933
934        let wal = WriteAheadLog::new(store, prefix);
935
936        // Create 100 WAL segments
937        for i in 1..=100 {
938            let mutation = Mutation::InsertVertex {
939                vid: Vid::new(i),
940                properties: HashMap::new(),
941                labels: vec![],
942            };
943            wal.append(mutation)?;
944            wal.flush().await?;
945        }
946
947        // Replay only segments with LSN > 90 (should skip 90 segments by filename)
948        let start = std::time::Instant::now();
949        let mutations = wal.replay_since(90).await?;
950        let duration = start.elapsed();
951
952        // Verify only 10 mutations returned (LSN 91-100)
953        assert_eq!(mutations.len(), 10, "Should replay only LSNs 91-100");
954
955        // Verify performance - should be fast (skips 90 segments by filename)
956        assert!(
957            duration.as_millis() < 500,
958            "replay_since took {}ms, expected < 500ms (should skip by filename)",
959            duration.as_millis()
960        );
961
962        Ok(())
963    }
964
965    /// Test for Issue #23: Vertex labels preserved through WAL replay
966    #[tokio::test]
967    async fn test_wal_replay_preserves_vertex_labels() -> Result<()> {
968        let dir = tempdir()?;
969        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
970        let prefix = Path::from("wal");
971
972        let wal = Arc::new(WriteAheadLog::new(store, prefix));
973
974        // Append InsertVertex with labels
975        wal.append(Mutation::InsertVertex {
976            vid: Vid::new(42),
977            properties: {
978                let mut props = HashMap::new();
979                props.insert(
980                    "name".to_string(),
981                    uni_common::Value::String("Alice".to_string()),
982                );
983                props
984            },
985            labels: vec!["Person".to_string(), "User".to_string()],
986        })?;
987
988        // Flush to WAL
989        wal.flush().await?;
990
991        // Replay mutations
992        let mutations = wal.replay().await?;
993        assert_eq!(mutations.len(), 1);
994
995        // Verify labels are preserved
996        if let Mutation::InsertVertex { vid, labels, .. } = &mutations[0] {
997            assert_eq!(vid.as_u64(), 42);
998            assert_eq!(labels.len(), 2);
999            assert!(labels.contains(&"Person".to_string()));
1000            assert!(labels.contains(&"User".to_string()));
1001        } else {
1002            panic!("Expected InsertVertex mutation");
1003        }
1004
1005        Ok(())
1006    }
1007
1008    /// Test for Issue #23: DeleteVertex labels preserved through WAL replay
1009    #[tokio::test]
1010    async fn test_wal_replay_preserves_delete_vertex_labels() -> Result<()> {
1011        let dir = tempdir()?;
1012        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
1013        let prefix = Path::from("wal");
1014
1015        let wal = Arc::new(WriteAheadLog::new(store, prefix));
1016
1017        // Append DeleteVertex with labels (needed for tombstone flushing - Issue #76)
1018        wal.append(Mutation::DeleteVertex {
1019            vid: Vid::new(99),
1020            labels: vec!["Person".to_string(), "Admin".to_string()],
1021        })?;
1022
1023        // Flush to WAL
1024        wal.flush().await?;
1025
1026        // Replay mutations
1027        let mutations = wal.replay().await?;
1028        assert_eq!(mutations.len(), 1);
1029
1030        // Verify labels are preserved in DeleteVertex
1031        if let Mutation::DeleteVertex { vid, labels } = &mutations[0] {
1032            assert_eq!(vid.as_u64(), 99);
1033            assert_eq!(labels.len(), 2);
1034            assert!(labels.contains(&"Person".to_string()));
1035            assert!(labels.contains(&"Admin".to_string()));
1036        } else {
1037            panic!("Expected DeleteVertex mutation");
1038        }
1039
1040        Ok(())
1041    }
1042
1043    /// Test for Issue #28: Edge type name preserved through WAL replay
1044    #[tokio::test]
1045    async fn test_wal_replay_preserves_edge_type_name() -> Result<()> {
1046        let dir = tempdir()?;
1047        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
1048        let prefix = Path::from("wal");
1049
1050        let wal = Arc::new(WriteAheadLog::new(store, prefix));
1051
1052        // Append InsertEdge with edge_type_name
1053        wal.append(Mutation::InsertEdge {
1054            src_vid: Vid::new(1),
1055            dst_vid: Vid::new(2),
1056            edge_type: 100,
1057            eid: Eid::new(500),
1058            version: 1,
1059            properties: {
1060                let mut props = HashMap::new();
1061                props.insert("since".to_string(), uni_common::Value::Int(2020));
1062                props
1063            },
1064            edge_type_name: Some("KNOWS".to_string()),
1065        })?;
1066
1067        // Flush to WAL
1068        wal.flush().await?;
1069
1070        // Replay mutations
1071        let mutations = wal.replay().await?;
1072        assert_eq!(mutations.len(), 1);
1073
1074        // Verify edge_type_name is preserved
1075        if let Mutation::InsertEdge {
1076            eid,
1077            edge_type_name,
1078            ..
1079        } = &mutations[0]
1080        {
1081            assert_eq!(eid.as_u64(), 500);
1082            assert_eq!(edge_type_name.as_deref(), Some("KNOWS"));
1083        } else {
1084            panic!("Expected InsertEdge mutation");
1085        }
1086
1087        Ok(())
1088    }
1089
1090    /// Test for Issue #23: Backward compatibility with old WAL segments (no labels)
1091    #[tokio::test]
1092    async fn test_wal_backward_compatibility_labels() -> Result<()> {
1093        let dir = tempdir()?;
1094        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
1095        let prefix = Path::from("wal");
1096
1097        // Manually create a WAL segment with old format (no labels field)
1098        let old_format_json = r#"{
1099            "lsn": 1,
1100            "mutations": [
1101                {
1102                    "InsertVertex": {
1103                        "vid": 123,
1104                        "properties": {}
1105                    }
1106                }
1107            ]
1108        }"#;
1109
1110        let path = prefix.clone().join("00000000000000000001_test.wal");
1111        store.put(&path, old_format_json.into()).await?;
1112
1113        // Create WAL and replay
1114        let wal = WriteAheadLog::new(store, prefix);
1115        let mutations = wal.replay().await?;
1116
1117        // Verify old format deserializes with empty labels (via #[serde(default)])
1118        assert_eq!(mutations.len(), 1);
1119        if let Mutation::InsertVertex { vid, labels, .. } = &mutations[0] {
1120            assert_eq!(vid.as_u64(), 123);
1121            assert_eq!(
1122                labels.len(),
1123                0,
1124                "Old format should deserialize with empty labels"
1125            );
1126        } else {
1127            panic!("Expected InsertVertex mutation");
1128        }
1129
1130        Ok(())
1131    }
1132
1133    /// `flush` serializes through the borrowed `WalSegmentRef`; its bytes must
1134    /// be identical to the owned `WalSegment` so replay (which deserializes
1135    /// `WalSegment`) is unaffected.
1136    #[test]
1137    fn wal_segment_ref_serializes_identically() {
1138        let mut props = HashMap::new();
1139        props.insert("p".to_string(), uni_common::Value::Int(7));
1140        let mutations = vec![
1141            Mutation::InsertVertex {
1142                vid: Vid::new(1),
1143                properties: props,
1144                labels: vec!["L".to_string()],
1145            },
1146            Mutation::DeleteEdge {
1147                eid: Eid::new(2),
1148                src_vid: Vid::new(1),
1149                dst_vid: Vid::new(3),
1150                edge_type: 4,
1151                version: 5,
1152            },
1153        ];
1154        let owned = WalSegment {
1155            lsn: 42,
1156            mutations: mutations.clone(),
1157        };
1158        let borrowed = WalSegmentRef {
1159            lsn: 42,
1160            mutations: &mutations,
1161        };
1162        assert_eq!(
1163            serde_json::to_vec(&owned).unwrap(),
1164            serde_json::to_vec(&borrowed).unwrap()
1165        );
1166    }
1167}