Skip to main content

uni_store/runtime/
wal.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::store_utils::{
5    DEFAULT_TIMEOUT, delete_with_timeout, get_with_timeout, list_with_timeout, put_with_timeout,
6};
7use anyhow::Result;
8use metrics;
9use object_store::ObjectStore;
10use object_store::path::Path;
11use serde::{Deserialize, Serialize};
12use std::sync::{Arc, Mutex};
13use tracing::{debug, info, instrument, warn};
14use uni_common::Properties;
15use uni_common::core::id::{Eid, Vid};
16use uni_common::sync::acquire_mutex;
17use uuid::Uuid;
18
19/// Parse LSN from WAL segment filename format `{:020}_{uuid}.wal`.
20/// Returns None if the filename doesn't match the expected format.
21fn parse_lsn_from_filename(path: &Path) -> Option<u64> {
22    let filename = path.filename()?;
23    if filename.len() < 20 {
24        return None;
25    }
26    // `str::get(..20)` yields `None` (instead of panicking) when byte index 20
27    // falls mid-UTF-8-character, so a foreign multibyte filename is skipped.
28    filename.get(..20).and_then(|s| s.parse::<u64>().ok())
29}
30
/// Magic prefix of checksummed (v2) WAL segments.
///
/// v2 layout: `UNIWAL2\n<64-hex-char blake3 of payload>\n<payload JSON>`.
/// Segments without the magic are legacy (pre-2.0.7) raw JSON and are still
/// readable; they just have no integrity protection.
///
/// `decode_segment` picks the v2 or legacy path by testing for this prefix.
const WAL_V2_MAGIC: &[u8] = b"UNIWAL2\n";

/// Length of the hex-encoded blake3 checksum in the v2 header.
///
/// The byte immediately following these characters must be `\n`; the payload
/// starts right after that newline.
const WAL_V2_HASH_HEX_LEN: usize = 64;
40
41/// Wrap a serialized segment payload in the checksummed v2 envelope.
42fn encode_segment_envelope(payload_json: &[u8]) -> Vec<u8> {
43    let hash = blake3::hash(payload_json);
44    let mut out =
45        Vec::with_capacity(WAL_V2_MAGIC.len() + WAL_V2_HASH_HEX_LEN + 1 + payload_json.len());
46    out.extend_from_slice(WAL_V2_MAGIC);
47    out.extend_from_slice(hash.to_hex().as_bytes());
48    out.push(b'\n');
49    out.extend_from_slice(payload_json);
50    out
51}
52
53/// Decode a WAL segment from its on-disk bytes, verifying the checksum for
54/// v2 envelopes and falling back to legacy raw-JSON parsing otherwise.
55///
56/// Returns a human-readable corruption description on failure — the caller
57/// decides whether that is fatal (corrupt middle segment) or a tolerated
58/// torn tail (see [`WriteAheadLog::replay_since`]).
59///
60/// `pub` + `doc(hidden)` solely so `fuzz/fuzz_targets/wal_decode.rs` can
61/// drive it with arbitrary bytes; it is not part of the public API.
62#[doc(hidden)]
63pub fn decode_segment(bytes: &[u8]) -> std::result::Result<WalSegment, String> {
64    if let Some(rest) = bytes.strip_prefix(WAL_V2_MAGIC) {
65        if rest.len() < WAL_V2_HASH_HEX_LEN + 1 || rest[WAL_V2_HASH_HEX_LEN] != b'\n' {
66            return Err("truncated v2 segment header".to_string());
67        }
68        let (hex, payload_nl) = rest.split_at(WAL_V2_HASH_HEX_LEN);
69        let payload = &payload_nl[1..];
70        let expected =
71            std::str::from_utf8(hex).map_err(|_| "non-utf8 checksum header".to_string())?;
72        let actual = blake3::hash(payload);
73        if actual.to_hex().as_str() != expected {
74            return Err(format!(
75                "checksum mismatch (expected {expected}, computed {})",
76                actual.to_hex()
77            ));
78        }
79        serde_json::from_slice(payload).map_err(|e| format!("v2 payload parse: {e}"))
80    } else {
81        // Legacy (pre-2.0.7) segment: raw JSON, no checksum.
82        serde_json::from_slice(bytes).map_err(|e| format!("legacy segment parse: {e}"))
83    }
84}
85
/// Test-only fault injection: when set, the next local-store segment fsync is
/// treated as having failed, exercising the clean-abort delete path (review H3)
/// without needing a real disk fault. Process-isolated under nextest.
///
/// `flush` reads the flag with `swap(false, ..)`, so it is one-shot: a single
/// `store(true, ..)` affects exactly one fsync attempt.
#[cfg(test)]
pub(crate) static FAIL_NEXT_FSYNC: std::sync::atomic::AtomicBool =
    std::sync::atomic::AtomicBool::new(false);
92
/// Fsync a freshly written file and its parent directory.
///
/// The directory fsync makes the new directory entry itself durable across
/// a crash (pattern borrowed from uni-sidecar's atomic `store_value`). It is
/// only compiled on Unix; elsewhere just the file is synced.
///
/// A bare relative filename (whose `parent()` is the empty path) has its
/// directory synced as `.` rather than failing on an attempt to open `""`.
///
/// `pub(crate)` so the snapshot durability barrier (review C4) can reuse the
/// exact same file+parent fsync the WAL uses for its own segments.
///
/// # Errors
///
/// Returns the underlying I/O error if the file or its directory cannot be
/// opened or synced.
pub(crate) fn sync_file_and_parent(path: &std::path::Path) -> std::io::Result<()> {
    std::fs::File::open(path)?.sync_all()?;
    #[cfg(unix)]
    if let Some(parent) = path.parent() {
        // `Path::new("seg.wal").parent()` is `Some("")`, which cannot be
        // opened; the directory that actually holds the entry is the cwd.
        let dir = if parent.as_os_str().is_empty() {
            std::path::Path::new(".")
        } else {
            parent
        };
        std::fs::File::open(dir)?.sync_all()?;
    }
    Ok(())
}
108
109/// Lossless WAL serialization for a property map.
110///
111/// `uni_common::Value` is `#[serde(untagged)]`, so serializing a `Properties`
112/// map straight to serde_json is **lossy**: a `Value::SparseVector` collapses to
113/// a `Map`, a `Value::Vector` to a `List`, nested temporals to strings, etc. —
114/// the variant cannot be recovered on replay. The WAL is a persistence path, so
115/// it must not rely on untagged serde (the exact hazard called out on the
116/// `Value` type).
117///
118/// This module encodes each property value through the explicit, tagged
119/// CypherValue codec (`cypher_value_codec`), stored as a base64 string with a
120/// sentinel prefix. On read, a value carrying the prefix is CV-decoded (lossless,
121/// new format); any other value is a pre-existing legacy segment and is decoded
122/// through the old untagged path unchanged (backward compatible — no WAL version
123/// bump, and old segments behave exactly as before).
124mod cv_props {
125    use base64::Engine;
126    use serde::{Deserialize, Deserializer, Serialize, Serializer};
127    use std::collections::HashMap;
128    use uni_common::{Properties, Value};
129
130    /// Sentinel marking a CV-encoded value. The leading control byte cannot
131    /// begin a legacy untagged-JSON string a user could realistically store.
132    const CV_PREFIX: &str = "\u{1}uni_cv:";
133
134    pub fn serialize<S: Serializer>(props: &Properties, s: S) -> Result<S::Ok, S::Error> {
135        let engine = base64::engine::general_purpose::STANDARD;
136        let encoded: HashMap<&String, String> = props
137            .iter()
138            .map(|(k, v)| {
139                let bytes = uni_common::cypher_value_codec::encode(v);
140                (k, format!("{CV_PREFIX}{}", engine.encode(bytes)))
141            })
142            .collect();
143        encoded.serialize(s)
144    }
145
146    pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Properties, D::Error> {
147        let raw: HashMap<String, serde_json::Value> = HashMap::deserialize(d)?;
148        let engine = base64::engine::general_purpose::STANDARD;
149        let mut out = Properties::with_capacity(raw.len());
150        for (k, jv) in raw {
151            let value = match &jv {
152                serde_json::Value::String(s) if s.starts_with(CV_PREFIX) => {
153                    let bytes = engine
154                        .decode(&s[CV_PREFIX.len()..])
155                        .map_err(serde::de::Error::custom)?;
156                    uni_common::cypher_value_codec::decode(&bytes)
157                        .map_err(serde::de::Error::custom)?
158                }
159                // Legacy (pre-CV) segment: an untagged `Value` written directly.
160                // Decode through the old path (lossy for SparseVector/Vector, but
161                // that is exactly the pre-existing behavior for old segments).
162                _ => serde_json::from_value::<Value>(jv).map_err(serde::de::Error::custom)?,
163            };
164            out.insert(k, value);
165        }
166        Ok(out)
167    }
168}
169
/// A single graph change recorded in the WAL.
///
/// Mutations are buffered by `WriteAheadLog::append`, persisted in batches as
/// a `WalSegment`, and handed back in order by replay. Property maps are
/// serialized through `cv_props` rather than plain serde.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Mutation {
    /// Insertion of an edge together with its properties.
    InsertEdge {
        src_vid: Vid,
        dst_vid: Vid,
        edge_type: u32,
        eid: Eid,
        version: u64,
        #[serde(with = "cv_props")]
        properties: Properties,
        /// Edge type name for metadata recovery. Optional for backward compatibility.
        #[serde(default)]
        edge_type_name: Option<String>,
    },
    /// Deletion of an edge, identified by its id, endpoints and type.
    DeleteEdge {
        eid: Eid,
        src_vid: Vid,
        dst_vid: Vid,
        edge_type: u32,
        version: u64,
    },
    /// Insertion of a vertex with its properties and labels.
    InsertVertex {
        vid: Vid,
        #[serde(with = "cv_props")]
        properties: Properties,
        // Defaults to empty when the field is absent from the serialized form.
        #[serde(default)]
        labels: Vec<String>,
    },
    /// Deletion of a vertex.
    DeleteVertex {
        vid: Vid,
        // Defaults to empty when the field is absent from the serialized form.
        #[serde(default)]
        labels: Vec<String>,
    },
    /// Replaces a vertex's full label set (a `SET n:Label` / `REMOVE n:Label`
    /// that touched no properties). Carries the complete resolved label set so
    /// replay can REPLACE (removals included). Added after the original four
    /// variants — externally-tagged serde_json, so old WAL segments (which never
    /// contain it) deserialize unchanged.
    SetVertexLabels { vid: Vid, labels: Vec<String> },
}
210
/// WAL segment with LSN for idempotent recovery.
///
/// This is the owned, deserializable form of one stored segment: the JSON
/// payload that `decode_segment` parses (with or without the v2 envelope).
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WalSegment {
    /// Log Sequence Number - monotonically increasing per segment
    pub lsn: u64,
    /// Mutations in this segment
    pub mutations: Vec<Mutation>,
}
219
/// Borrowed serialization view of [`WalSegment`].
///
/// Field names and order match `WalSegment` exactly, so the serde output is
/// byte-identical; `flush` serializes through this to avoid deep-cloning the
/// whole mutation batch per flush.
///
/// Serialize-only: reading always goes through the owned `WalSegment`.
#[derive(Serialize, Debug)]
struct WalSegmentRef<'a> {
    /// LSN assigned to the segment being written.
    lsn: u64,
    /// The batch being flushed, borrowed from the caller.
    mutations: &'a [Mutation],
}
230
/// Write-ahead log that buffers mutations in memory and persists them as
/// numbered segment objects under a prefix in an object store.
pub struct WriteAheadLog {
    /// Object store that segments are written to, listed from, and deleted in.
    store: Arc<dyn ObjectStore>,
    /// Path prefix under which every segment object is created and listed.
    prefix: Path,
    /// Filesystem root backing `store` when it is a local store. When set,
    /// every flushed segment is fsync'd (file + parent directory) before the
    /// flush is reported durable — `object_store::LocalFileSystem` does not
    /// fsync on `put`, so without this a power loss can drop acknowledged
    /// commits. `None` for remote stores (the PUT ack is the durability
    /// point there).
    local_root: Option<std::path::PathBuf>,
    /// Pending-mutation buffer and LSN counters, guarded by a mutex.
    state: Mutex<WalState>,
}
243
/// Mutable WAL state kept behind `WriteAheadLog::state`.
struct WalState {
    /// Mutations appended since the last flush took the buffer.
    buffer: Vec<Mutation>,
    /// Current LSN counter (incremented per flush)
    next_lsn: u64,
    /// Highest LSN successfully flushed
    flushed_lsn: u64,
}
251
252impl WriteAheadLog {
253    pub fn new(store: Arc<dyn ObjectStore>, prefix: Path) -> Self {
254        Self {
255            store,
256            prefix,
257            local_root: None,
258            state: Mutex::new(WalState {
259                buffer: Vec::new(),
260                next_lsn: 1, // Start at 1 so 0 means "no WAL"
261                flushed_lsn: 0,
262            }),
263        }
264    }
265
266    /// Set the local filesystem root backing the object store, enabling
267    /// fsync-on-flush. See the field docs on `local_root`.
268    #[must_use]
269    pub fn with_local_root(mut self, local_root: Option<std::path::PathBuf>) -> Self {
270        self.local_root = local_root;
271        self
272    }
273
274    /// Initialize WAL state from existing segments (called on startup)
275    pub async fn initialize(&self) -> Result<u64> {
276        let max_lsn = self.find_max_lsn().await?;
277        {
278            let mut state = acquire_mutex(&self.state, "wal_state")?;
279            state.next_lsn = max_lsn + 1;
280            state.flushed_lsn = max_lsn;
281        }
282        Ok(max_lsn)
283    }
284
285    /// Find the maximum LSN in existing WAL segments by parsing filenames.
286    /// Only downloads segments if filename parsing fails (fallback).
287    async fn find_max_lsn(&self) -> Result<u64> {
288        let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
289        let mut max_lsn: u64 = 0;
290
291        for meta in metas {
292            // Try to parse LSN from filename first (fast path)
293            if let Some(lsn) = parse_lsn_from_filename(&meta.location) {
294                max_lsn = max_lsn.max(lsn);
295            } else {
296                // Fallback: download and parse segment if filename doesn't match expected format
297                warn!(
298                    path = %meta.location,
299                    "WAL filename doesn't match expected format, downloading segment"
300                );
301                let get_result =
302                    get_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
303                let bytes = get_result.bytes().await?;
304                if bytes.is_empty() {
305                    continue;
306                }
307                // This is only a max-LSN probe; a corrupt segment is skipped
308                // here (with a warning) and adjudicated by `replay_since`'s
309                // tail-vs-middle policy during actual recovery.
310                match decode_segment(&bytes) {
311                    Ok(segment) => max_lsn = max_lsn.max(segment.lsn),
312                    Err(reason) => {
313                        warn!(path = %meta.location, reason = %reason,
314                            "Skipping corrupt WAL segment during max-LSN probe");
315                    }
316                }
317            }
318        }
319
320        Ok(max_lsn)
321    }
322
323    #[instrument(skip(self, mutation), level = "trace")]
324    pub fn append(&self, mutation: Mutation) -> Result<()> {
325        let mut state = acquire_mutex(&self.state, "wal_state")?;
326        state.buffer.push(mutation);
327        metrics::counter!("uni_wal_entries_total").increment(1);
328        Ok(())
329    }
330
    /// Flush buffered mutations to a WAL segment. Returns the LSN of the flushed segment.
    ///
    /// When the buffer is empty nothing is written and the current
    /// `flushed_lsn` is returned. Otherwise the buffer is taken, a fresh LSN is
    /// allocated, and the batch is serialized, wrapped in the checksummed v2
    /// envelope and written as `{lsn:020}_{uuid}.wal` under the prefix. With a
    /// local root configured, the segment and its directory are fsync'd before
    /// `flushed_lsn` advances.
    ///
    /// # Errors
    ///
    /// - Serialization or PUT failure: the batch is put back at the front of
    ///   the buffer and the error is returned. The allocated LSN is not reused.
    /// - Fsync failure (local stores): the just-written segment is deleted and
    ///   the error is returned. Unlike the two cases above, the batch is not
    ///   put back into the buffer on this path. If the delete also fails, a
    ///   combined error describing both failures is returned.
    /// - Failure to acquire the WAL state lock at any point.
    #[instrument(skip(self), fields(lsn, mutations_count, size_bytes))]
    pub async fn flush(&self) -> Result<u64> {
        let start = std::time::Instant::now();
        // Take the whole buffer and allocate the LSN under one lock acquisition;
        // the lock is released before any I/O happens.
        let (batch, lsn) = {
            let mut state = acquire_mutex(&self.state, "wal_state")?;
            if state.buffer.is_empty() {
                return Ok(state.flushed_lsn);
            }
            let lsn = state.next_lsn;
            state.next_lsn += 1;
            (std::mem::take(&mut state.buffer), lsn)
        };

        tracing::Span::current().record("lsn", lsn);
        tracing::Span::current().record("mutations_count", batch.len());

        // Serialize a borrowed view of the segment (serde output is identical
        // to `WalSegment` — see `wal_segment_ref_serializes_identically`), so
        // the batch is not deep-cloned per flush. `batch` itself stays owned
        // for the restore-on-failure paths below.
        let segment = WalSegmentRef {
            lsn,
            mutations: &batch,
        };

        // Serialize segment; restore buffer on failure
        let json = match serde_json::to_vec(&segment) {
            Ok(j) => j,
            Err(e) => {
                warn!(lsn, error = %e, "Failed to serialize WAL segment, restoring buffer");
                // Restore buffer on serialization failure
                let mut state = acquire_mutex(&self.state, "wal_state")?;
                // Mutations appended while we were serializing go after the
                // failed batch so the original order is kept.
                let new_mutations = std::mem::take(&mut state.buffer);
                state.buffer = batch;
                state.buffer.extend(new_mutations);
                // Don't roll back LSN - gap is harmless and maintains monotonicity
                return Err(e.into());
            }
        };
        // Wrap in the checksummed v2 envelope so torn/corrupt segments are
        // detected at replay instead of surfacing as opaque parse errors.
        let body = encode_segment_envelope(&json);
        tracing::Span::current().record("size_bytes", body.len());
        // Counted before the PUT is attempted, so a failed write is included.
        metrics::counter!("uni_wal_bytes_written_total").increment(body.len() as u64);

        // Include LSN in filename for easy ordering and identification
        let filename = format!("{:020}_{}.wal", lsn, Uuid::new_v4());
        let path = self.prefix.clone().join(filename);

        // Attempt to write; restore buffer on failure to prevent data loss
        if let Err(e) = put_with_timeout(&self.store, &path, body.into(), DEFAULT_TIMEOUT).await {
            warn!(
                lsn,
                error = %e,
                "Failed to flush WAL segment, restoring buffer (LSN gap preserved for monotonicity)"
            );
            // Restore buffer so data isn't lost on transient failures
            let mut state = acquire_mutex(&self.state, "wal_state")?;
            // Prepend the failed batch to any new mutations that arrived
            let new_mutations = std::mem::take(&mut state.buffer);
            state.buffer = batch;
            state.buffer.extend(new_mutations);
            // Don't roll back LSN - gap is harmless and maintains strict monotonicity
            // All WAL consumers use `>` / `<=` comparisons, not equality checks
            return Err(e);
        }

        // Local stores: fsync the segment + its directory before reporting
        // the flush durable. On fsync failure we report failure (durability
        // cannot be guaranteed) AND delete the just-written segment: the bytes
        // are already on disk, so leaving them would let a later crash + replay
        // resurrect a transaction the caller was told had FAILED (ghost commit).
        // `flushed_lsn` is intentionally left un-advanced. (review H3)
        if let Some(root) = &self.local_root {
            let file_path = root.join(path.as_ref());
            // Test builds consult the one-shot injection flag first; the real
            // fsync runs on the blocking pool in both configurations.
            #[cfg(test)]
            let synced = if FAIL_NEXT_FSYNC.swap(false, std::sync::atomic::Ordering::SeqCst) {
                Ok(Err(std::io::Error::other("injected fsync failure")))
            } else {
                tokio::task::spawn_blocking(move || sync_file_and_parent(&file_path)).await
            };
            #[cfg(not(test))]
            let synced =
                tokio::task::spawn_blocking(move || sync_file_and_parent(&file_path)).await;
            // Outer `Err` is a failed blocking task, inner `Err` is the fsync
            // I/O error; both count as "not durable".
            let fsync_err: Option<anyhow::Error> = match synced {
                Ok(Ok(())) => None,
                Ok(Err(e)) => Some(e.into()),
                Err(e) => Some(e.into()),
            };
            if let Some(err) = fsync_err {
                warn!(
                    lsn,
                    error = %err,
                    "WAL segment fsync failed — deleting the non-durable segment to avoid a ghost commit on replay"
                );
                // Best-effort clean abort. If the delete ALSO fails the WAL is
                // in an indeterminate state (a non-durable segment may survive
                // and replay); surface that as a hard error rather than a
                // routine flush failure so the caller does not silently retry.
                if let Err(del_err) = delete_with_timeout(&self.store, &path, DEFAULT_TIMEOUT).await
                {
                    return Err(anyhow::anyhow!(
                        "WAL segment fsync failed ({err}) and the cleanup delete \
                         of segment at lsn {lsn} also failed ({del_err}); the WAL \
                         may contain a non-durable segment"
                    ));
                }
                return Err(err);
            }
        }

        // Update flushed LSN on success
        {
            let mut state = acquire_mutex(&self.state, "wal_state")?;
            state.flushed_lsn = lsn;
        }

        // The same duration is recorded under two metric names, once in
        // milliseconds and once in seconds.
        let duration = start.elapsed();
        metrics::histogram!("wal_flush_latency_ms").record(duration.as_millis() as f64);
        metrics::histogram!("uni_wal_flush_duration_seconds").record(duration.as_secs_f64());

        // Flushes slower than 100 ms are logged at warn level, others at debug.
        if duration.as_millis() > 100 {
            warn!(
                lsn,
                duration_ms = duration.as_millis(),
                "Slow WAL flush detected"
            );
        } else {
            debug!(
                lsn,
                duration_ms = duration.as_millis(),
                "WAL flush completed"
            );
        }

        Ok(lsn)
    }
469
470    /// Get the highest LSN that has been flushed.
471    ///
472    /// # Errors
473    ///
474    /// Returns error if the WAL state lock is poisoned (see issue #18/#150).
475    pub fn flushed_lsn(&self) -> Result<u64, uni_common::sync::LockPoisonedError> {
476        let guard = uni_common::sync::acquire_mutex(&self.state, "wal_state")?;
477        Ok(guard.flushed_lsn)
478    }
479
480    /// Replay WAL segments with LSN > high_water_mark.
481    /// Returns mutations from segments that haven't been applied yet.
482    /// Optimized to skip downloading segments with LSN <= high_water_mark (parsed from filename).
483    ///
484    /// Corruption policy: a corrupt (bad checksum / unparseable / empty)
485    /// segment at the **tail** of the log is the classic torn write from a
486    /// crash — it is logged prominently and treated as end-of-WAL, since the
487    /// commit it belonged to was never acknowledged. A corrupt segment with
488    /// valid segments **after** it is real data loss and fails recovery with
489    /// an error naming the file.
490    #[instrument(skip(self), level = "debug")]
491    pub async fn replay_since(&self, high_water_mark: u64) -> Result<Vec<Mutation>> {
492        let start = std::time::Instant::now();
493        debug!(high_water_mark, "Replaying WAL segments");
494        let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
495        let mut mutations = Vec::new();
496
497        // Collect candidate paths and sort by LSN (filename prefix).
498        // Lexicographical sort works for the zero-padded LSN prefix.
499        let mut paths: Vec<_> = metas
500            .into_iter()
501            .map(|m| m.location)
502            .filter(|p| {
503                // Skip segments identifiable as <= high_water_mark without
504                // downloading. Unparseable filenames stay in (legacy safety).
505                parse_lsn_from_filename(p).is_none_or(|lsn| lsn > high_water_mark)
506            })
507            .collect();
508        paths.sort();
509
510        let mut segments_replayed = 0;
511
512        for (idx, path) in paths.iter().enumerate() {
513            let get_result = get_with_timeout(&self.store, path, DEFAULT_TIMEOUT).await?;
514            let bytes = get_result.bytes().await?;
515
516            // Empty files and decode failures share one corruption policy.
517            let decoded = if bytes.is_empty() {
518                Err("empty segment file".to_string())
519            } else {
520                decode_segment(&bytes)
521            };
522
523            let segment = match decoded {
524                Ok(segment) => segment,
525                Err(reason) => {
526                    let is_tail = idx + 1 == paths.len();
527                    if is_tail {
528                        warn!(
529                            path = %path,
530                            reason = %reason,
531                            "Corrupt tail WAL segment — torn write from a crash; \
532                             treating as end of WAL (the commit was never acknowledged)"
533                        );
534                        break;
535                    }
536                    return Err(anyhow::anyhow!(
537                        "corrupt WAL segment '{path}' ({reason}) with {} later segment(s) \
538                         present; refusing to skip — manual inspection required",
539                        paths.len() - idx - 1
540                    ));
541                }
542            };
543
544            // Double-check LSN from segment content (handles fallback case)
545            if segment.lsn > high_water_mark {
546                mutations.extend(segment.mutations);
547                segments_replayed += 1;
548            }
549        }
550
551        info!(
552            segments_replayed,
553            mutations_count = mutations.len(),
554            "WAL replay completed"
555        );
556        metrics::histogram!("uni_wal_replay_duration_seconds")
557            .record(start.elapsed().as_secs_f64());
558
559        Ok(mutations)
560    }
561
562    /// Replay all WAL segments.
563    pub async fn replay(&self) -> Result<Vec<Mutation>> {
564        self.replay_since(0).await
565    }
566
567    /// Deletes WAL segments with LSN <= high_water_mark by parsing filenames.
568    /// Only downloads segments if filename parsing fails (fallback).
569    #[instrument(skip(self), level = "info")]
570    pub async fn truncate_before(&self, high_water_mark: u64) -> Result<()> {
571        info!(high_water_mark, "Truncating WAL segments");
572        let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
573
574        let mut deleted_count = 0;
575        for meta in metas {
576            // Try to parse LSN from filename first (fast path)
577            let should_delete = if let Some(lsn) = parse_lsn_from_filename(&meta.location) {
578                lsn <= high_water_mark
579            } else {
580                // Fallback: download and parse segment if filename doesn't match expected format
581                warn!(
582                    path = %meta.location,
583                    "WAL filename doesn't match expected format, downloading segment"
584                );
585                let get_result =
586                    get_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
587                let bytes = get_result.bytes().await?;
588                if bytes.is_empty() {
589                    // Empty segments should be deleted
590                    true
591                } else {
592                    match decode_segment(&bytes) {
593                        Ok(segment) => segment.lsn <= high_water_mark,
594                        Err(reason) => {
595                            // Never delete a corrupt segment during
596                            // truncation — keep the evidence; replay's
597                            // tail-vs-middle policy adjudicates it.
598                            warn!(path = %meta.location, reason = %reason,
599                                "Keeping corrupt WAL segment during truncation");
600                            false
601                        }
602                    }
603                }
604            };
605
606            if should_delete {
607                delete_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
608                deleted_count += 1;
609            }
610        }
611        info!(deleted_count, "WAL truncation completed");
612        Ok(())
613    }
614
615    /// Check if any WAL segments exist (for detecting database with lost manifest).
616    pub async fn has_segments(&self) -> Result<bool> {
617        let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
618        Ok(!metas.is_empty())
619    }
620
621    pub async fn truncate(&self) -> Result<()> {
622        info!("Truncating all WAL segments");
623        let metas = list_with_timeout(&self.store, Some(&self.prefix), DEFAULT_TIMEOUT).await?;
624
625        let mut deleted_count = 0;
626        for meta in metas {
627            delete_with_timeout(&self.store, &meta.location, DEFAULT_TIMEOUT).await?;
628            deleted_count += 1;
629        }
630        info!(deleted_count, "Full WAL truncation completed");
631        Ok(())
632    }
633}
634
635#[cfg(test)]
636mod tests {
637    use super::*;
638    use object_store::ObjectStoreExt;
639    use object_store::local::LocalFileSystem;
640    use std::collections::HashMap;
641    use tempfile::tempdir;
642
643    #[tokio::test]
644    async fn test_wal_append_replay() -> Result<()> {
645        let dir = tempdir()?;
646        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
647        let prefix = Path::from("wal");
648
649        let wal = WriteAheadLog::new(store, prefix);
650
651        let mutation = Mutation::InsertVertex {
652            vid: Vid::new(1),
653            properties: HashMap::new(),
654            labels: vec![],
655        };
656
657        wal.append(mutation)?;
658        wal.flush().await?;
659
660        let mutations = wal.replay().await?;
661        assert_eq!(mutations.len(), 1);
662        if let Mutation::InsertVertex { vid, .. } = &mutations[0] {
663            assert_eq!(vid.as_u64(), Vid::new(1).as_u64());
664        } else {
665            panic!("Wrong mutation type");
666        }
667
668        wal.truncate().await?;
669        let mutations2 = wal.replay().await?;
670        assert_eq!(mutations2.len(), 0);
671
672        Ok(())
673    }
674
675    #[tokio::test]
676    async fn test_lsn_monotonicity() -> Result<()> {
677        // Verify that LSN is strictly monotonic even across multiple flushes
678        let dir = tempdir()?;
679        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
680        let prefix = Path::from("wal");
681
682        let wal = WriteAheadLog::new(store, prefix);
683
684        let mutation1 = Mutation::InsertVertex {
685            vid: Vid::new(1),
686            properties: HashMap::new(),
687            labels: vec![],
688        };
689        let mutation2 = Mutation::InsertVertex {
690            vid: Vid::new(2),
691            properties: HashMap::new(),
692            labels: vec![],
693        };
694        let mutation3 = Mutation::InsertVertex {
695            vid: Vid::new(3),
696            properties: HashMap::new(),
697            labels: vec![],
698        };
699
700        // First flush
701        wal.append(mutation1)?;
702        let lsn1 = wal.flush().await?;
703
704        // Second flush
705        wal.append(mutation2)?;
706        let lsn2 = wal.flush().await?;
707
708        // Third flush
709        wal.append(mutation3)?;
710        let lsn3 = wal.flush().await?;
711
712        // Verify strict monotonicity
713        assert!(lsn2 > lsn1, "LSN2 ({}) should be > LSN1 ({})", lsn2, lsn1);
714        assert!(lsn3 > lsn2, "LSN3 ({}) should be > LSN2 ({})", lsn3, lsn2);
715
716        // Verify LSNs are consecutive
717        assert_eq!(lsn2, lsn1 + 1);
718        assert_eq!(lsn3, lsn2 + 1);
719
720        Ok(())
721    }
722
723    /// H3: when a segment's fsync fails, the just-written (non-durable) segment
724    /// must be deleted so a later crash + replay cannot resurrect a transaction
725    /// the caller was told had failed (ghost commit). `flush()` reports failure.
726    #[tokio::test]
727    async fn fsync_failure_deletes_segment_no_ghost_commit() -> Result<()> {
728        let dir = tempdir()?;
729        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
730        let prefix = Path::from("wal");
731        // local_root must be set for the fsync barrier (and thus the H3 path) to run.
732        let wal = WriteAheadLog::new(store, prefix).with_local_root(Some(dir.path().to_path_buf()));
733
734        wal.append(Mutation::InsertVertex {
735            vid: Vid::new(1),
736            properties: HashMap::new(),
737            labels: vec![],
738        })?;
739
740        // Force the next segment fsync to fail.
741        FAIL_NEXT_FSYNC.store(true, std::sync::atomic::Ordering::SeqCst);
742        let result = wal.flush().await;
743        assert!(
744            result.is_err(),
745            "flush must report failure when the segment fsync fails"
746        );
747
748        // The non-durable segment must have been deleted: replay surfaces nothing.
749        let replayed = wal.replay().await?;
750        assert!(
751            replayed.is_empty(),
752            "a segment whose fsync failed must not be replayable (ghost commit); got {} mutations",
753            replayed.len()
754        );
755        Ok(())
756    }
757
758    #[test]
759    fn test_parse_lsn_from_filename() {
760        // Standard format
761        let path = Path::from("00000000000000000042_a1b2c3d4.wal");
762        assert_eq!(parse_lsn_from_filename(&path), Some(42));
763
764        let path = Path::from("00000000000000001234_e5f6a7b8.wal");
765        assert_eq!(parse_lsn_from_filename(&path), Some(1234));
766
767        // Leading zeros
768        let path = Path::from("00000000000000000001_xyz.wal");
769        assert_eq!(parse_lsn_from_filename(&path), Some(1));
770
771        // Large LSN (within u64 range)
772        let path = Path::from("12345678901234567890_uuid.wal");
773        assert_eq!(parse_lsn_from_filename(&path), Some(12345678901234567890));
774
775        // Invalid formats
776        let path = Path::from("invalid.wal");
777        assert_eq!(parse_lsn_from_filename(&path), None);
778
779        let path = Path::from("123.wal"); // Too short
780        assert_eq!(parse_lsn_from_filename(&path), None);
781
782        let path = Path::from("abcdefghijklmnopqrst_uuid.wal"); // Non-numeric
783        assert_eq!(parse_lsn_from_filename(&path), None);
784
785        // Missing underscore separator (but first 20 chars are valid LSN)
786        let path = Path::from("00000000000000000100.wal");
787        assert_eq!(parse_lsn_from_filename(&path), Some(100));
788
789        // Empty path
790        let path = Path::from("");
791        assert_eq!(parse_lsn_from_filename(&path), None);
792    }
793
794    /// Regression for Bug #30: `parse_lsn_from_filename` must not panic on a
795    /// filename whose byte 20 falls in the middle of a multi-byte UTF-8 char.
796    ///
797    /// The length guard uses [`str::len`] (byte length), but the subsequent
798    /// `filename[..20]` slice requires byte 20 to be a char boundary. A name of
799    /// 19 ASCII bytes plus one 2-byte char ('é', bytes 19..21) has a byte length
800    /// of at least 20, passes the guard, then slices mid-'é' and panics today.
801    /// The correct behavior is to return `None` for a non-numeric/unparsable name.
802    ///
803    /// We use [`Path::parse`] (not [`Path::from`]) because `from` percent-encodes
804    /// non-ASCII so that `filename()` would be pure ASCII and never reach the
805    /// mid-char slice; `parse` preserves the raw multi-byte segment verbatim,
806    /// which is exactly what a real on-disk listing can surface.
807    #[test]
808    fn test_parse_lsn_from_filename_multibyte_no_panic() {
809        let name = format!("{}{}.wal", "0".repeat(19), "é"); // byte 20 falls mid-'é'
810        let path = Path::parse(name).expect("multi-byte segment is a valid object_store path");
811        assert_eq!(parse_lsn_from_filename(&path), None); // RED: panics today; correct = None
812    }
813
814    /// Test for Issue #6: WAL initialization should parse LSN from filenames
815    /// without downloading all segments
816    #[tokio::test]
817    async fn test_find_max_lsn_scalability() -> Result<()> {
818        let dir = tempdir()?;
819        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
820        let prefix = Path::from("wal");
821
822        let wal = WriteAheadLog::new(store, prefix);
823
824        // Create 100 WAL segments with increasing LSNs
825        for i in 1..=100 {
826            let mutation = Mutation::InsertVertex {
827                vid: Vid::new(i),
828                properties: HashMap::new(),
829                labels: vec![],
830            };
831            wal.append(mutation)?;
832            wal.flush().await?;
833        }
834
835        // Measure initialization time - should be fast (parsing filenames, not downloading)
836        let start = std::time::Instant::now();
837        let max_lsn = wal.find_max_lsn().await?;
838        let duration = start.elapsed();
839
840        // Verify correctness
841        assert_eq!(max_lsn, 100, "Max LSN should be 100");
842
843        // Verify performance - should complete quickly even with many segments
844        assert!(
845            duration.as_millis() < 1000,
846            "find_max_lsn took {}ms, expected < 1000ms (filename parsing should be fast)",
847            duration.as_millis()
848        );
849
850        Ok(())
851    }
852
853    /// Test for Issue #11: LSN gaps are preserved on flush failures (watermark pattern)
854    #[tokio::test]
855    async fn test_lsn_gaps_preserved_on_flush_failure() -> Result<()> {
856        let dir = tempdir()?;
857        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
858        let prefix = Path::from("wal");
859
860        let wal = WriteAheadLog::new(store.clone(), prefix.clone());
861
862        // Flush mutation 1 successfully
863        wal.append(Mutation::InsertVertex {
864            vid: Vid::new(1),
865            properties: HashMap::new(),
866            labels: vec![],
867        })?;
868        let lsn1 = wal.flush().await?;
869        assert_eq!(lsn1, 1);
870
871        // Flush mutation 2 successfully
872        wal.append(Mutation::InsertVertex {
873            vid: Vid::new(2),
874            properties: HashMap::new(),
875            labels: vec![],
876        })?;
877        let lsn2 = wal.flush().await?;
878        assert_eq!(lsn2, 2);
879
880        // Simulate a scenario where flush might fail by creating a read-only store
881        // (In real scenario, network failures would cause this)
882        // For now, verify that LSN assignment happens BEFORE write attempt
883        // by checking that next_lsn increments even if we don't flush
884
885        // Append mutation 3 but DON'T flush
886        wal.append(Mutation::InsertVertex {
887            vid: Vid::new(3),
888            properties: HashMap::new(),
889            labels: vec![],
890        })?;
891
892        // Now flush mutation 4
893        wal.append(Mutation::InsertVertex {
894            vid: Vid::new(4),
895            properties: HashMap::new(),
896            labels: vec![],
897        })?;
898        let lsn4 = wal.flush().await?;
899
900        // LSN should be 3 (both mutations 3 and 4 flushed together)
901        assert_eq!(lsn4, 3, "LSN should increment monotonically");
902
903        // Verify all mutations can be replayed
904        let mutations = wal.replay().await?;
905        assert_eq!(mutations.len(), 4, "All 4 mutations should be replayed");
906
907        Ok(())
908    }
909
910    /// Test for Issue #11: Verify LSN watermark pattern - no LSN reuse
911    #[tokio::test]
912    async fn test_lsn_watermark_no_reuse() -> Result<()> {
913        let dir = tempdir()?;
914        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
915        let prefix = Path::from("wal");
916
917        let wal = WriteAheadLog::new(store, prefix);
918
919        // Track all LSNs we've seen
920        let mut seen_lsns = std::collections::HashSet::new();
921
922        // Perform 50 flushes
923        for i in 1..=50 {
924            wal.append(Mutation::InsertVertex {
925                vid: Vid::new(i),
926                properties: HashMap::new(),
927                labels: vec![],
928            })?;
929            let lsn = wal.flush().await?;
930
931            // Verify no LSN reuse
932            assert!(
933                !seen_lsns.contains(&lsn),
934                "LSN {} was reused! This violates monotonicity.",
935                lsn
936            );
937            seen_lsns.insert(lsn);
938
939            // Verify LSN is strictly increasing
940            assert_eq!(lsn, i, "LSN should be {}, got {}", i, lsn);
941        }
942
943        Ok(())
944    }
945
946    /// Test for Issue #33: WAL truncation should parse LSN from filenames
947    /// without downloading all segments
948    #[tokio::test]
949    async fn test_truncate_scalability() -> Result<()> {
950        let dir = tempdir()?;
951        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
952        let prefix = Path::from("wal");
953
954        let wal = WriteAheadLog::new(store, prefix);
955
956        // Create 100 WAL segments
957        for i in 1..=100 {
958            let mutation = Mutation::InsertVertex {
959                vid: Vid::new(i),
960                properties: HashMap::new(),
961                labels: vec![],
962            };
963            wal.append(mutation)?;
964            wal.flush().await?;
965        }
966
967        // Truncate segments with LSN <= 50
968        let start = std::time::Instant::now();
969        wal.truncate_before(50).await?;
970        let duration = start.elapsed();
971
972        // Verify only segments 51-100 remain
973        let mutations = wal.replay().await?;
974        assert_eq!(
975            mutations.len(),
976            50,
977            "Should have 50 mutations remaining (51-100)"
978        );
979
980        // Verify performance - should be fast (filename parsing, not downloading)
981        assert!(
982            duration.as_millis() < 1000,
983            "truncate_before took {}ms, expected < 1000ms (filename parsing should be fast)",
984            duration.as_millis()
985        );
986
987        Ok(())
988    }
989
990    /// Test for Issue #6: replay_since should skip old segments by filename
991    #[tokio::test]
992    async fn test_replay_since_skips_old_segments() -> Result<()> {
993        let dir = tempdir()?;
994        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
995        let prefix = Path::from("wal");
996
997        let wal = WriteAheadLog::new(store, prefix);
998
999        // Create 100 WAL segments
1000        for i in 1..=100 {
1001            let mutation = Mutation::InsertVertex {
1002                vid: Vid::new(i),
1003                properties: HashMap::new(),
1004                labels: vec![],
1005            };
1006            wal.append(mutation)?;
1007            wal.flush().await?;
1008        }
1009
1010        // Replay only segments with LSN > 90 (should skip 90 segments by filename)
1011        let start = std::time::Instant::now();
1012        let mutations = wal.replay_since(90).await?;
1013        let duration = start.elapsed();
1014
1015        // Verify only 10 mutations returned (LSN 91-100)
1016        assert_eq!(mutations.len(), 10, "Should replay only LSNs 91-100");
1017
1018        // Verify performance - should be fast (skips 90 segments by filename)
1019        assert!(
1020            duration.as_millis() < 500,
1021            "replay_since took {}ms, expected < 500ms (should skip by filename)",
1022            duration.as_millis()
1023        );
1024
1025        Ok(())
1026    }
1027
1028    /// Test for Issue #23: Vertex labels preserved through WAL replay
1029    #[tokio::test]
1030    async fn test_wal_replay_preserves_vertex_labels() -> Result<()> {
1031        let dir = tempdir()?;
1032        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
1033        let prefix = Path::from("wal");
1034
1035        let wal = Arc::new(WriteAheadLog::new(store, prefix));
1036
1037        // Append InsertVertex with labels
1038        wal.append(Mutation::InsertVertex {
1039            vid: Vid::new(42),
1040            properties: {
1041                let mut props = HashMap::new();
1042                props.insert(
1043                    "name".to_string(),
1044                    uni_common::Value::String("Alice".to_string()),
1045                );
1046                props
1047            },
1048            labels: vec!["Person".to_string(), "User".to_string()],
1049        })?;
1050
1051        // Flush to WAL
1052        wal.flush().await?;
1053
1054        // Replay mutations
1055        let mutations = wal.replay().await?;
1056        assert_eq!(mutations.len(), 1);
1057
1058        // Verify labels are preserved
1059        if let Mutation::InsertVertex { vid, labels, .. } = &mutations[0] {
1060            assert_eq!(vid.as_u64(), 42);
1061            assert_eq!(labels.len(), 2);
1062            assert!(labels.contains(&"Person".to_string()));
1063            assert!(labels.contains(&"User".to_string()));
1064        } else {
1065            panic!("Expected InsertVertex mutation");
1066        }
1067
1068        Ok(())
1069    }
1070
1071    /// Test for Issue #23: DeleteVertex labels preserved through WAL replay
1072    #[tokio::test]
1073    async fn test_wal_replay_preserves_delete_vertex_labels() -> Result<()> {
1074        let dir = tempdir()?;
1075        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
1076        let prefix = Path::from("wal");
1077
1078        let wal = Arc::new(WriteAheadLog::new(store, prefix));
1079
1080        // Append DeleteVertex with labels (needed for tombstone flushing - Issue #76)
1081        wal.append(Mutation::DeleteVertex {
1082            vid: Vid::new(99),
1083            labels: vec!["Person".to_string(), "Admin".to_string()],
1084        })?;
1085
1086        // Flush to WAL
1087        wal.flush().await?;
1088
1089        // Replay mutations
1090        let mutations = wal.replay().await?;
1091        assert_eq!(mutations.len(), 1);
1092
1093        // Verify labels are preserved in DeleteVertex
1094        if let Mutation::DeleteVertex { vid, labels } = &mutations[0] {
1095            assert_eq!(vid.as_u64(), 99);
1096            assert_eq!(labels.len(), 2);
1097            assert!(labels.contains(&"Person".to_string()));
1098            assert!(labels.contains(&"Admin".to_string()));
1099        } else {
1100            panic!("Expected DeleteVertex mutation");
1101        }
1102
1103        Ok(())
1104    }
1105
1106    /// Test for Issue #28: Edge type name preserved through WAL replay
1107    #[tokio::test]
1108    async fn test_wal_replay_preserves_edge_type_name() -> Result<()> {
1109        let dir = tempdir()?;
1110        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
1111        let prefix = Path::from("wal");
1112
1113        let wal = Arc::new(WriteAheadLog::new(store, prefix));
1114
1115        // Append InsertEdge with edge_type_name
1116        wal.append(Mutation::InsertEdge {
1117            src_vid: Vid::new(1),
1118            dst_vid: Vid::new(2),
1119            edge_type: 100,
1120            eid: Eid::new(500),
1121            version: 1,
1122            properties: {
1123                let mut props = HashMap::new();
1124                props.insert("since".to_string(), uni_common::Value::Int(2020));
1125                props
1126            },
1127            edge_type_name: Some("KNOWS".to_string()),
1128        })?;
1129
1130        // Flush to WAL
1131        wal.flush().await?;
1132
1133        // Replay mutations
1134        let mutations = wal.replay().await?;
1135        assert_eq!(mutations.len(), 1);
1136
1137        // Verify edge_type_name is preserved
1138        if let Mutation::InsertEdge {
1139            eid,
1140            edge_type_name,
1141            ..
1142        } = &mutations[0]
1143        {
1144            assert_eq!(eid.as_u64(), 500);
1145            assert_eq!(edge_type_name.as_deref(), Some("KNOWS"));
1146        } else {
1147            panic!("Expected InsertEdge mutation");
1148        }
1149
1150        Ok(())
1151    }
1152
1153    /// Test for Issue #23: Backward compatibility with old WAL segments (no labels)
1154    #[tokio::test]
1155    async fn test_wal_backward_compatibility_labels() -> Result<()> {
1156        let dir = tempdir()?;
1157        let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
1158        let prefix = Path::from("wal");
1159
1160        // Manually create a WAL segment with old format (no labels field)
1161        let old_format_json = r#"{
1162            "lsn": 1,
1163            "mutations": [
1164                {
1165                    "InsertVertex": {
1166                        "vid": 123,
1167                        "properties": {}
1168                    }
1169                }
1170            ]
1171        }"#;
1172
1173        let path = prefix.clone().join("00000000000000000001_test.wal");
1174        store.put(&path, old_format_json.into()).await?;
1175
1176        // Create WAL and replay
1177        let wal = WriteAheadLog::new(store, prefix);
1178        let mutations = wal.replay().await?;
1179
1180        // Verify old format deserializes with empty labels (via #[serde(default)])
1181        assert_eq!(mutations.len(), 1);
1182        if let Mutation::InsertVertex { vid, labels, .. } = &mutations[0] {
1183            assert_eq!(vid.as_u64(), 123);
1184            assert_eq!(
1185                labels.len(),
1186                0,
1187                "Old format should deserialize with empty labels"
1188            );
1189        } else {
1190            panic!("Expected InsertVertex mutation");
1191        }
1192
1193        Ok(())
1194    }
1195
1196    /// `flush` serializes through the borrowed `WalSegmentRef`; its bytes must
1197    /// be identical to the owned `WalSegment` so replay (which deserializes
1198    /// `WalSegment`) is unaffected.
1199    #[test]
1200    fn wal_segment_ref_serializes_identically() {
1201        let mut props = HashMap::new();
1202        props.insert("p".to_string(), uni_common::Value::Int(7));
1203        let mutations = vec![
1204            Mutation::InsertVertex {
1205                vid: Vid::new(1),
1206                properties: props,
1207                labels: vec!["L".to_string()],
1208            },
1209            Mutation::DeleteEdge {
1210                eid: Eid::new(2),
1211                src_vid: Vid::new(1),
1212                dst_vid: Vid::new(3),
1213                edge_type: 4,
1214                version: 5,
1215            },
1216        ];
1217        let owned = WalSegment {
1218            lsn: 42,
1219            mutations: mutations.clone(),
1220        };
1221        let borrowed = WalSegmentRef {
1222            lsn: 42,
1223            mutations: &mutations,
1224        };
1225        assert_eq!(
1226            serde_json::to_vec(&owned).unwrap(),
1227            serde_json::to_vec(&borrowed).unwrap()
1228        );
1229    }
1230}