Skip to main content

uni_store/runtime/
l0.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::runtime::wal::{Mutation, WriteAheadLog};
5use anyhow::Result;
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use std::time::{SystemTime, UNIX_EPOCH};
9use tracing::{instrument, trace};
10use uni_common::core::id::{Eid, Vid};
11use uni_common::graph::simple_graph::{Direction, SimpleGraph};
12use uni_common::{Properties, Value};
13use uni_crdt::Crdt;
14
15/// Items a read-write transaction observed during execution, used for SSI
16/// read-write antidependency detection at commit.
17///
18/// Shared (via `Arc<Mutex<_>>`) between the read path — which records reads
19/// through the transaction's `QueryContext` — and the commit path, which checks
20/// it against concurrently-committed write-sets. Item-level granularity;
21/// phantoms are out of scope (handled by the `FOR UPDATE` escape hatch).
22#[derive(Debug, Default)]
23pub struct OccReadSet {
24    /// Vertices the transaction read.
25    pub vertices: HashSet<Vid>,
26    /// Edges the transaction read.
27    pub edges: HashSet<Eid>,
28}
29
30impl OccReadSet {
31    /// `true` when nothing has been read yet. Used to decide whether a `FOR
32    /// UPDATE` acquisition may safely re-pin a still-fresh transaction.
33    pub fn is_empty(&self) -> bool {
34        self.vertices.is_empty() && self.edges.is_empty()
35    }
36}
37
38/// Returns the current timestamp in nanoseconds since Unix epoch.
39fn now_nanos() -> i64 {
40    SystemTime::now()
41        .duration_since(UNIX_EPOCH)
42        .map(|d| d.as_nanos() as i64)
43        .unwrap_or(0)
44}
45
46/// Returns the [`Crdt`] a property value encodes, or `None` if it is not one.
47///
48/// `Crdt` is `#[serde(tag = "t", content = "d")]`, so it deserializes only from a
49/// JSON object, and only `Value::Map(_)` produces one — gating on `Map` avoids
50/// allocating a JSON tree for large non-map values (e.g. embedding columns).
51///
52/// This is the single source of truth for "is this value CRDT-mergeable": both
53/// the commit-time merge ([`L0Buffer::merge_crdt_properties`]) and the OCC
54/// write-set carve-out ([`crate::runtime::occ::WriteSet::from_l0`]) consult it,
55/// so the carve-out can never exclude an item the merge would actually overwrite
56/// (which would silently lose an update).
57pub(crate) fn try_as_crdt(v: &Value) -> Option<Crdt> {
58    if !matches!(v, Value::Map(_)) {
59        return None;
60    }
61    serde_json::from_value::<Crdt>(v.clone().into()).ok()
62}
63
64/// Serialize a constraint key for O(1) uniqueness checks.
65/// Format: label + separator + sorted (prop_name, value) pairs.
66pub fn serialize_constraint_key(label: &str, key_values: &[(String, Value)]) -> Vec<u8> {
67    let mut buf = label.as_bytes().to_vec();
68    buf.push(0); // separator
69    let mut sorted = key_values.to_vec();
70    sorted.sort_by(|a, b| a.0.cmp(&b.0));
71    for (k, v) in &sorted {
72        buf.extend(k.as_bytes());
73        buf.push(0);
74        // Use serde_json serialization for deterministic value encoding
75        buf.extend(serde_json::to_vec(v).unwrap_or_default());
76        buf.push(0);
77    }
78    buf
79}
80
81/// Per-type mutation counters accumulated by the L0 buffer.
82///
83/// Used to provide detailed mutation statistics (e.g., `nodes_created`,
84/// `relationships_deleted`) on `ExecuteResult`. Callers snapshot before/after
85/// execution and call [`diff()`](MutationStats::diff) to get the delta.
86#[derive(Debug, Clone, Default)]
87pub struct MutationStats {
88    pub nodes_created: usize,
89    pub nodes_deleted: usize,
90    pub relationships_created: usize,
91    pub relationships_deleted: usize,
92    pub properties_set: usize,
93    pub properties_removed: usize,
94    pub labels_added: usize,
95    pub labels_removed: usize,
96}
97
98impl MutationStats {
99    /// Compute the field-wise difference `self - before`.
100    pub fn diff(&self, before: &Self) -> Self {
101        Self {
102            nodes_created: self.nodes_created.saturating_sub(before.nodes_created),
103            nodes_deleted: self.nodes_deleted.saturating_sub(before.nodes_deleted),
104            relationships_created: self
105                .relationships_created
106                .saturating_sub(before.relationships_created),
107            relationships_deleted: self
108                .relationships_deleted
109                .saturating_sub(before.relationships_deleted),
110            properties_set: self.properties_set.saturating_sub(before.properties_set),
111            properties_removed: self
112                .properties_removed
113                .saturating_sub(before.properties_removed),
114            labels_added: self.labels_added.saturating_sub(before.labels_added),
115            labels_removed: self.labels_removed.saturating_sub(before.labels_removed),
116        }
117    }
118}
119
120#[derive(Clone, Debug)]
121pub struct TombstoneEntry {
122    pub eid: Eid,
123    pub src_vid: Vid,
124    pub dst_vid: Vid,
125    pub edge_type: u32,
126}
127
128pub struct L0Buffer {
129    /// Graph topology using simple adjacency lists
130    pub graph: SimpleGraph,
131    /// Soft-deleted edges (tombstones for LSM-style merging)
132    pub tombstones: HashMap<Eid, TombstoneEntry>,
133    /// Soft-deleted vertices
134    pub vertex_tombstones: HashSet<Vid>,
135    /// Edge version tracking for MVCC
136    pub edge_versions: HashMap<Eid, u64>,
137    /// Vertex version tracking for MVCC
138    pub vertex_versions: HashMap<Vid, u64>,
139    /// Edge properties (stored separately from topology)
140    pub edge_properties: HashMap<Eid, Properties>,
141    /// Vertex properties (stored separately from topology)
142    pub vertex_properties: HashMap<Vid, Properties>,
143    /// Edge endpoint lookup: eid -> (src, dst, type)
144    pub edge_endpoints: HashMap<Eid, (Vid, Vid, u32)>,
145    /// Vertex labels (VID -> list of label names)
146    /// New in storage design: vertices can have multiple labels
147    pub vertex_labels: HashMap<Vid, Vec<String>>,
148    /// Reverse index: label name → set of VIDs with that label. Maintained
149    /// alongside `vertex_labels` for O(1) label-based vertex lookups.
150    pub label_to_vids: HashMap<String, HashSet<Vid>>,
151    /// Vids whose FULL label set was explicitly replaced by a label mutation
152    /// (`SET n:Label` / `REMOVE n:Label`) in this buffer, via
153    /// [`L0Buffer::set_vertex_labels`]. Distinguishes a deliberate label
154    /// replacement from the empty `vertex_labels` entry a property-only write
155    /// incidentally creates (`entry().or_default()`), so `merge` knows to REPLACE
156    /// (not append) these vids' labels and `WriteSet::from_l0` knows they are
157    /// conflictable writes. A transaction-buffer concept; empty on main L0.
158    pub vertex_label_overwrites: HashSet<Vid>,
159    /// Edge types (EID -> type name)
160    pub edge_types: HashMap<Eid, String>,
161    /// Current version counter
162    pub current_version: u64,
163    /// Mutation count for flush decisions
164    pub mutation_count: usize,
165    /// Per-type mutation counters for detailed statistics.
166    pub mutation_stats: MutationStats,
167    /// Write-ahead log for durability
168    pub wal: Option<Arc<WriteAheadLog>>,
169    /// WAL LSN at the time this L0 was rotated for flush.
170    /// Used to ensure WAL truncation doesn't remove entries needed by pending flushes.
171    pub wal_lsn_at_flush: u64,
172    /// Vertex creation timestamps (nanoseconds since epoch)
173    pub vertex_created_at: HashMap<Vid, i64>,
174    /// Vertex update timestamps (nanoseconds since epoch)
175    pub vertex_updated_at: HashMap<Vid, i64>,
176    /// Edge creation timestamps (nanoseconds since epoch)
177    pub edge_created_at: HashMap<Eid, i64>,
178    /// Edge update timestamps (nanoseconds since epoch)
179    pub edge_updated_at: HashMap<Eid, i64>,
180    /// Estimated size in bytes for memory limit enforcement.
181    /// Incremented O(1) on each mutation to avoid O(V+E) size_bytes() calls.
182    pub estimated_size: usize,
183    /// Per-constraint index for O(1) unique key checks.
184    /// Key: constraint composite key (label + sorted property values serialized).
185    /// Value: Vid that owns this key.
186    pub constraint_index: HashMap<Vec<u8>, Vid>,
187    /// Per-VID set of property keys that should land via Lance MergeInsert
188    /// (partial-column update) at flush time. Populated by
189    /// `insert_vertex_partial`; cleared by full-row inserts and deletes.
190    /// A VID present here at flush time is emitted to the partial batch;
191    /// absent VIDs flush via the existing full-row Append.
192    pub vertex_partial_keys: HashMap<Vid, HashSet<String>>,
193    /// Edge analog of `vertex_partial_keys` (Round 12 §A). Populated by
194    /// `insert_edge_partial_full`; cleared by full-row inserts and edge
195    /// deletes. Per-edge-type delta-table flush honors these by emitting
196    /// a `MergeInsertBuilder` source with only the touched schema
197    /// columns plus `eid`, `op`, `_version`, `_updated_at`, and
198    /// `overflow_json` (when an overflow prop was touched).
199    pub edge_partial_keys: HashMap<Eid, HashSet<String>>,
200    /// Phase B (UniConfig::defer_embeddings): VIDs whose auto-embedding
201    /// was skipped at insert time and is owed at flush. Value = primary
202    /// label name (the rest of the embedding config is looked up from the
203    /// schema at flush time). Drained by `flush_stream_l1` before column
204    /// extraction; entries are removed when the embedding lands in the
205    /// vertex's L0 property map.
206    pub pending_embeddings: HashMap<Vid, String>,
207    /// Optimistic-concurrency read sequence (SSI). Stamped on a transaction's
208    /// private L0 at creation with the Writer's commit-sequence at that moment,
209    /// and consulted at commit to detect intervening conflicting commits. `0`
210    /// for the main L0 and when SSI is disabled.
211    pub occ_read_seq: u64,
212    /// Optimistic-concurrency read-set (SSI). `Some` on a read-write
213    /// transaction's private L0 when SSI tracking is active; the read path
214    /// records observed ids here and commit checks them for antidependencies.
215    /// `None` for the main L0 and read-only / SSI-disabled paths.
216    pub occ_read_set: Option<Arc<parking_lot::Mutex<OccReadSet>>>,
217}
218
219impl std::fmt::Debug for L0Buffer {
220    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221        f.debug_struct("L0Buffer")
222            .field("vertex_count", &self.graph.vertex_count())
223            .field("edge_count", &self.graph.edge_count())
224            .field("tombstones", &self.tombstones.len())
225            .field("vertex_tombstones", &self.vertex_tombstones.len())
226            .field("current_version", &self.current_version)
227            .field("mutation_count", &self.mutation_count)
228            .finish()
229    }
230}
231
232impl Clone for L0Buffer {
233    /// Clone the L0 buffer for fork/restore (ASSUME/ABDUCE).
234    ///
235    /// The cloned buffer does NOT share the WAL reference — forked L0s are
236    /// ephemeral and should not write to the WAL.
237    fn clone(&self) -> Self {
238        Self {
239            graph: self.graph.clone(),
240            tombstones: self.tombstones.clone(),
241            vertex_tombstones: self.vertex_tombstones.clone(),
242            edge_versions: self.edge_versions.clone(),
243            vertex_versions: self.vertex_versions.clone(),
244            edge_properties: self.edge_properties.clone(),
245            vertex_properties: self.vertex_properties.clone(),
246            edge_endpoints: self.edge_endpoints.clone(),
247            vertex_labels: self.vertex_labels.clone(),
248            label_to_vids: self.label_to_vids.clone(),
249            vertex_label_overwrites: self.vertex_label_overwrites.clone(),
250            edge_types: self.edge_types.clone(),
251            current_version: self.current_version,
252            mutation_count: self.mutation_count,
253            mutation_stats: self.mutation_stats.clone(),
254            wal: None, // Forked L0s don't share the WAL
255            wal_lsn_at_flush: self.wal_lsn_at_flush,
256            vertex_created_at: self.vertex_created_at.clone(),
257            vertex_updated_at: self.vertex_updated_at.clone(),
258            edge_created_at: self.edge_created_at.clone(),
259            edge_updated_at: self.edge_updated_at.clone(),
260            estimated_size: self.estimated_size,
261            constraint_index: self.constraint_index.clone(),
262            vertex_partial_keys: self.vertex_partial_keys.clone(),
263            edge_partial_keys: self.edge_partial_keys.clone(),
264            pending_embeddings: self.pending_embeddings.clone(),
265            occ_read_seq: self.occ_read_seq,
266            // Forked L0s (ASSUME/ABDUCE) do not participate in OCC tracking.
267            occ_read_set: None,
268        }
269    }
270}
271
272impl L0Buffer {
273    /// Append labels to a vec, skipping duplicates.
274    fn append_unique_labels(existing: &mut Vec<String>, labels: &[String]) {
275        for label in labels {
276            if !existing.contains(label) {
277                existing.push(label.clone());
278            }
279        }
280    }
281
282    /// Add a VID to the reverse label index for each of the given labels.
283    fn index_labels_for_vid(&mut self, vid: Vid, labels: &[String]) {
284        for label in labels {
285            self.label_to_vids
286                .entry(label.clone())
287                .or_default()
288                .insert(vid);
289        }
290    }
291
292    /// Remove a VID from all label entries in the reverse index.
293    fn remove_vid_from_label_index(&mut self, vid: Vid) {
294        if let Some(labels) = self.vertex_labels.get(&vid) {
295            for label in labels {
296                if let Some(set) = self.label_to_vids.get_mut(label) {
297                    set.remove(&vid);
298                }
299            }
300        }
301    }
302
303    /// Replaces a vertex's FULL label set — the semantics of `SET n:Label` /
304    /// `REMOVE n:Label`, which resolve the new complete set before writing.
305    ///
306    /// Unlike [`add_vertex_labels`](Self::add_vertex_labels) (append), this clears
307    /// the vid's existing labels from the reverse index, sets the new set, and
308    /// re-indexes — so a removal actually removes. It marks the vid in
309    /// `vertex_label_overwrites` so `merge` REPLACES (not appends) these labels at
310    /// commit and `WriteSet::from_l0` treats the change as a conflictable write.
311    /// Increments `mutation_count` (a label change is a real mutation; its sibling
312    /// `remove_vertex_label` already does so).
313    pub fn set_vertex_labels(&mut self, vid: Vid, labels: &[String]) {
314        self.remove_vid_from_label_index(vid);
315        self.vertex_labels.insert(vid, labels.to_vec());
316        self.index_labels_for_vid(vid, labels);
317        self.vertex_label_overwrites.insert(vid);
318        self.current_version += 1;
319        self.mutation_count += 1;
320    }
321
322    /// Merge CRDT properties into an existing property map.
323    /// Attempts CRDT merge if both values are valid CRDTs, falls back to overwrite.
324    ///
325    /// When the entry is empty (new vertex insert), skips the expensive JSON
326    /// round-trip and directly assigns the properties.
327    ///
328    /// Logs a warning when a CRDT value is overwritten by a non-CRDT scalar
329    /// (limitation R1).
330    fn merge_crdt_properties(entry: &mut Properties, properties: Properties) {
331        // Fast path: new vertex with no existing properties — skip JSON round-trip
332        if entry.is_empty() {
333            *entry = properties;
334            return;
335        }
336
337        for (k, v) in properties {
338            // `try_as_crdt` performs the Map-gated CRDT probe (see its docs for the
339            // wide-row perf rationale). Sharing it with `WriteSet::from_l0` keeps the
340            // OCC carve-out consistent with this merge-versus-overwrite decision.
341            if let Some(mut new_crdt) = try_as_crdt(&v)
342                && let Some(existing_v) = entry.get(&k)
343                && let Ok(existing_crdt) = serde_json::from_value::<Crdt>(existing_v.clone().into())
344            {
345                // Use try_merge to avoid panic on type mismatch.
346                if new_crdt.try_merge(&existing_crdt).is_ok()
347                    && let Ok(merged_json) = serde_json::to_value(new_crdt)
348                {
349                    entry.insert(k, uni_common::Value::from(merged_json));
350                    continue;
351                }
352                // CRDT variant mismatch (or a failed re-serialize): fall through to
353                // a last-writer-wins overwrite, discarding the existing CRDT's
354                // merged state. The OCC commit-time carve-out check
355                // (`occ::crdt_carveout_overwrite`) aborts a *concurrent* writer
356                // before reaching here, and write-time schema enforcement rejects a
357                // wrong declared variant; this warns on any residual (e.g. a
358                // single-writer variant change) so the discarded state is visible.
359                tracing::warn!(
360                    property = %k,
361                    existing_variant = existing_crdt.type_name(),
362                    "overwriting CRDT property with a different CRDT variant \
363                     (last-writer-wins); merged CRDT state is discarded"
364                );
365            } else if try_as_crdt(&v).is_none()
366                && entry.get(&k).is_some_and(|e| try_as_crdt(e).is_some())
367            {
368                // R1: an existing CRDT value is overwritten by a non-CRDT scalar
369                // (last-writer-wins), silently discarding the CRDT's merged state
370                // — a property written as BOTH a CRDT and last-writer-wins. The OCC
371                // write-set carve-out lets CRDT-only writers commit without
372                // conflicting, so a concurrent LWW write on the same property
373                // cannot be flagged as a conflict; surface it here instead.
374                tracing::warn!(
375                    property = %k,
376                    "overwriting CRDT property with non-CRDT value (last-writer-wins); \
377                     merged CRDT state is discarded"
378                );
379            }
380            // Fallback: Overwrite (last-writer-wins).
381            entry.insert(k, v);
382        }
383    }
384
385    /// Helper function to estimate property map size in bytes.
386    fn estimate_properties_size(props: &Properties) -> usize {
387        props.keys().map(|k| k.len() + 32).sum()
388    }
389
390    /// Returns an estimate of the buffer size in bytes.
391    /// Includes all fields for accurate memory accounting.
392    pub fn size_bytes(&self) -> usize {
393        let mut total = 0;
394
395        // Topology
396        total += self.graph.vertex_count() * 8;
397        total += self.graph.edge_count() * 24;
398
399        // Properties (rough estimate: key string + 32 bytes for value)
400        for props in self.vertex_properties.values() {
401            total += Self::estimate_properties_size(props);
402        }
403        for props in self.edge_properties.values() {
404            total += Self::estimate_properties_size(props);
405        }
406
407        // Metadata
408        total += self.tombstones.len() * 64;
409        total += self.vertex_tombstones.len() * 8;
410        total += self.edge_versions.len() * 16;
411        total += self.vertex_versions.len() * 16;
412        total += self.edge_endpoints.len() * 28; // (Vid, Vid, u32) = 8+8+4 + overhead
413
414        // Vertex labels
415        for labels in self.vertex_labels.values() {
416            total += labels.iter().map(|l| l.len() + 24).sum::<usize>();
417        }
418
419        // Reverse label index (label_to_vids)
420        for (label, vids) in &self.label_to_vids {
421            total += label.len() + 24 + vids.len() * 8 + 48; // string + HashSet overhead
422        }
423
424        // Edge types
425        for type_name in self.edge_types.values() {
426            total += type_name.len() + 24;
427        }
428
429        // Timestamps (4 maps, each entry is 16 bytes: 8-byte key + 8-byte i64 value)
430        total += self.vertex_created_at.len() * 16;
431        total += self.vertex_updated_at.len() * 16;
432        total += self.edge_created_at.len() * 16;
433        total += self.edge_updated_at.len() * 16;
434
435        total
436    }
437
438    pub fn new(start_version: u64, wal: Option<Arc<WriteAheadLog>>) -> Self {
439        Self {
440            graph: SimpleGraph::new(),
441            tombstones: HashMap::new(),
442            vertex_tombstones: HashSet::new(),
443            edge_versions: HashMap::new(),
444            vertex_versions: HashMap::new(),
445            edge_properties: HashMap::new(),
446            vertex_properties: HashMap::new(),
447            edge_endpoints: HashMap::new(),
448            vertex_labels: HashMap::new(),
449            label_to_vids: HashMap::new(),
450            vertex_label_overwrites: HashSet::new(),
451            edge_types: HashMap::new(),
452            current_version: start_version,
453            mutation_count: 0,
454            mutation_stats: MutationStats::default(),
455            wal,
456            wal_lsn_at_flush: 0,
457            vertex_created_at: HashMap::new(),
458            vertex_updated_at: HashMap::new(),
459            edge_created_at: HashMap::new(),
460            edge_updated_at: HashMap::new(),
461            estimated_size: 0,
462            constraint_index: HashMap::new(),
463            vertex_partial_keys: HashMap::new(),
464            edge_partial_keys: HashMap::new(),
465            pending_embeddings: HashMap::new(),
466            occ_read_seq: 0,
467            occ_read_set: None,
468        }
469    }
470
471    pub fn insert_vertex(&mut self, vid: Vid, properties: Properties) {
472        self.insert_vertex_with_labels(vid, properties, &[]);
473    }
474
475    /// Insert a vertex with associated labels.
476    pub fn insert_vertex_with_labels(
477        &mut self,
478        vid: Vid,
479        properties: Properties,
480        labels: &[String],
481    ) {
482        self.insert_vertex_with_labels_impl(vid, properties, labels, false);
483    }
484
485    /// Core vertex insertion. When `skip_wal` is true, skips WAL append
486    /// (used during merge where the caller already wrote to WAL).
487    fn insert_vertex_with_labels_impl(
488        &mut self,
489        vid: Vid,
490        properties: Properties,
491        labels: &[String],
492        skip_wal: bool,
493    ) {
494        self.current_version += 1;
495        let version = self.current_version;
496        let now = now_nanos();
497
498        if !skip_wal && let Some(wal) = &self.wal {
499            let _ = wal.append(&Mutation::InsertVertex {
500                vid,
501                properties: properties.clone(),
502                labels: labels.to_vec(),
503            });
504        }
505
506        self.vertex_tombstones.remove(&vid);
507
508        // Full-row insert supersedes any pending partial-update state for
509        // this VID.
510        self.vertex_partial_keys.remove(&vid);
511
512        let entry = self.vertex_properties.entry(vid).or_default();
513        Self::merge_crdt_properties(entry, properties.clone());
514        self.vertex_versions.insert(vid, version);
515
516        // Set timestamps - created_at only set if this is a new vertex
517        self.vertex_created_at.entry(vid).or_insert(now);
518        self.vertex_updated_at.insert(vid, now);
519
520        // Track labels — always create an entry so unlabeled vertices are
521        // distinguishable from "not in L0" when queried via get_vertex_labels.
522        let labels_size: usize = labels.iter().map(|l| l.len() + 24).sum();
523        let existing = self.vertex_labels.entry(vid).or_default();
524        Self::append_unique_labels(existing, labels);
525        self.index_labels_for_vid(vid, labels);
526
527        self.graph.add_vertex(vid);
528        self.mutation_count += 1;
529        self.mutation_stats.nodes_created += 1;
530        self.mutation_stats.properties_set += properties.len();
531        self.mutation_stats.labels_added += labels.len();
532
533        let props_size = Self::estimate_properties_size(&properties);
534        self.estimated_size += 8 + props_size + 16 + labels_size + 32;
535    }
536
537    /// Insert a vertex's FULL property row, tagging `touched_keys` so the
538    /// flush emits exactly those columns via Lance `MergeInsertBuilder`
539    /// instead of a full-row Append.
540    ///
541    /// `props` MUST be the fully-merged property map (storage union
542    /// in-flight L0 union the new touched values, per
543    /// `PropertyManager::get_all_vertex_props_with_ctx`). The caller is
544    /// responsible for the union; L0 here just stores it so scans see
545    /// the complete row without per-key reconciliation.
546    ///
547    /// `touched_keys` lists the property keys this SET statement
548    /// actually assigned — the union of those across all coalesced
549    /// SetItems on this VID. Lance MergeInsert sends a source batch
550    /// with `_vid`, `_deleted`, `_version`, `_updated_at`, and those
551    /// touched columns; non-touched columns retain their pre-merge
552    /// values on the Lance side, skipping the wide-row write.
553    ///
554    /// A subsequent full-row `insert_vertex_with_labels` or
555    /// `delete_vertex` on the same VID clears the partial-keys entry
556    /// so partial state never outlives a stronger write.
557    pub fn insert_vertex_partial_full(
558        &mut self,
559        vid: Vid,
560        props: Properties,
561        touched_keys: HashSet<String>,
562        labels: &[String],
563    ) {
564        // Stage the full row through the existing partial-impl (same
565        // CRDT merge / version bump / timestamps), preserving the
566        // partial-keys entry so we can extend it below.
567        self.insert_vertex_with_labels_partial_impl(vid, props, labels, false);
568        self.vertex_partial_keys
569            .entry(vid)
570            .or_default()
571            .extend(touched_keys);
572    }
573
574    /// Legacy partial-only variant used by some uni-store paths. Kept
575    /// for source-compatibility but new uni-query callers should use
576    /// `insert_vertex_partial_full` to preserve scan-side L0 visibility.
577    pub fn insert_vertex_partial(&mut self, vid: Vid, touched: Properties, labels: &[String]) {
578        // Record dirty keys BEFORE the full-row impl runs (which would
579        // clear them). The keys come from the touched set; the values
580        // are merged into L0 by the shared CRDT path below.
581        let touched_keys: Vec<String> = touched.keys().cloned().collect();
582
583        // If the VID already has a full-row pending insert (e.g., CREATE
584        // earlier in the same tx), we must NOT downgrade it to partial.
585        // Detected by: VID is in vertex_properties WITH a version stamp
586        // AND not currently in vertex_partial_keys → it was written as
587        // a full row recently. The conservative rule: only enable the
588        // partial path when there's no full-row pending insert. We
589        // approximate "no full-row pending" by checking that the VID's
590        // current entry in vertex_partial_keys is non-empty OR the VID
591        // is not in vertex_properties (fresh row, but caller asked
592        // partial — let it through and the post-flush union covers it).
593        let already_full = self.vertex_properties.contains_key(&vid)
594            && !self.vertex_partial_keys.contains_key(&vid);
595
596        // Stage the CRDT merge through the existing path. We bypass the
597        // full-row `insert_vertex_with_labels_impl` clearing of
598        // partial_keys by inlining the work, then restoring/extending
599        // the partial-key set.
600        self.insert_vertex_with_labels_partial_impl(vid, touched, labels, false);
601
602        if !already_full {
603            self.vertex_partial_keys
604                .entry(vid)
605                .or_default()
606                .extend(touched_keys);
607        }
608    }
609
610    /// Core partial-insert: same as `insert_vertex_with_labels_impl` but
611    /// preserves any existing `vertex_partial_keys[vid]` entry so the
612    /// caller can extend it after the merge.
613    fn insert_vertex_with_labels_partial_impl(
614        &mut self,
615        vid: Vid,
616        properties: Properties,
617        labels: &[String],
618        skip_wal: bool,
619    ) {
620        self.current_version += 1;
621        let version = self.current_version;
622        let now = now_nanos();
623
624        if !skip_wal && let Some(wal) = &self.wal {
625            // WAL records the partial as a full-row InsertVertex; on replay
626            // the full-row path runs (which clears partial_keys). This is
627            // semantically correct — L0 in memory always holds the union of
628            // partial deltas via merge_crdt_properties; recovery doesn't
629            // need to preserve partial-vs-full distinction.
630            let _ = wal.append(&Mutation::InsertVertex {
631                vid,
632                properties: properties.clone(),
633                labels: labels.to_vec(),
634            });
635        }
636
637        self.vertex_tombstones.remove(&vid);
638        // NOTE: deliberately DOES NOT remove from vertex_partial_keys.
639        // The caller (`insert_vertex_partial`) extends that set after.
640
641        let entry = self.vertex_properties.entry(vid).or_default();
642        Self::merge_crdt_properties(entry, properties.clone());
643        self.vertex_versions.insert(vid, version);
644
645        self.vertex_created_at.entry(vid).or_insert(now);
646        self.vertex_updated_at.insert(vid, now);
647
648        let labels_size: usize = labels.iter().map(|l| l.len() + 24).sum();
649        let existing = self.vertex_labels.entry(vid).or_default();
650        Self::append_unique_labels(existing, labels);
651        self.index_labels_for_vid(vid, labels);
652
653        self.graph.add_vertex(vid);
654        self.mutation_count += 1;
655        // Partial writes don't create new nodes — they update existing ones.
656        // But counting under properties_set is correct.
657        self.mutation_stats.properties_set += properties.len();
658        self.mutation_stats.labels_added += labels.len();
659
660        let props_size = Self::estimate_properties_size(&properties);
661        self.estimated_size += 8 + props_size + 16 + labels_size + 32;
662    }
663
664    /// Add labels to an existing vertex.
665    pub fn add_vertex_labels(&mut self, vid: Vid, labels: &[String]) {
666        let existing = self.vertex_labels.entry(vid).or_default();
667        Self::append_unique_labels(existing, labels);
668        self.index_labels_for_vid(vid, labels);
669    }
670
671    /// Remove a label from an existing vertex.
672    /// Returns true if the label was found and removed, false otherwise.
673    pub fn remove_vertex_label(&mut self, vid: Vid, label: &str) -> bool {
674        if let Some(labels) = self.vertex_labels.get_mut(&vid)
675            && let Some(pos) = labels.iter().position(|l| l == label)
676        {
677            labels.remove(pos);
678            if let Some(set) = self.label_to_vids.get_mut(label) {
679                set.remove(&vid);
680            }
681            self.current_version += 1;
682            self.mutation_count += 1;
683            self.mutation_stats.labels_removed += 1;
684            // Note: WAL logging for label mutations not yet implemented
685            // Currently consistent with add_vertex_labels behavior
686            return true;
687        }
688        false
689    }
690
691    /// Set the type for an edge.
692    pub fn set_edge_type(&mut self, eid: Eid, edge_type: String) {
693        self.edge_types.insert(eid, edge_type);
694    }
695
696    pub fn delete_vertex(&mut self, vid: Vid) -> Result<()> {
697        self.current_version += 1;
698
699        if let Some(wal) = &mut self.wal {
700            let labels = self.vertex_labels.get(&vid).cloned().unwrap_or_default();
701            wal.append(&Mutation::DeleteVertex { vid, labels })?;
702        }
703
704        self.apply_vertex_deletion(vid);
705        Ok(())
706    }
707
708    /// Cascade-delete a vertex: tombstone all connected edges and remove the vertex.
709    ///
710    /// Shared between `delete_vertex` (live mutations) and `replay_mutations` (WAL recovery).
711    fn apply_vertex_deletion(&mut self, vid: Vid) {
712        let version = self.current_version;
713
714        // Collect edges to delete using O(degree) neighbors() instead of O(E) scan
715        let mut edges_to_remove = HashSet::new();
716
717        // Collect outgoing edges
718        for entry in self.graph.neighbors(vid, Direction::Outgoing) {
719            edges_to_remove.insert(entry.eid);
720        }
721
722        // Collect incoming edges
723        for entry in self.graph.neighbors(vid, Direction::Incoming) {
724            edges_to_remove.insert(entry.eid); // HashSet handles self-loop deduplication
725        }
726
727        let cascaded_edges_count = edges_to_remove.len();
728
729        // Tombstone and remove all collected edges
730        for eid in edges_to_remove {
731            // Retrieve edge endpoints from the map to create tombstone
732            if let Some((src, dst, etype)) = self.edge_endpoints.get(&eid) {
733                self.tombstones.insert(
734                    eid,
735                    TombstoneEntry {
736                        eid,
737                        src_vid: *src,
738                        dst_vid: *dst,
739                        edge_type: *etype,
740                    },
741                );
742                self.edge_versions.insert(eid, version);
743                self.edge_endpoints.remove(&eid);
744                self.edge_properties.remove(&eid);
745                self.graph.remove_edge(eid);
746                self.mutation_count += 1;
747                self.mutation_stats.relationships_deleted += 1;
748            }
749        }
750
751        self.remove_vid_from_label_index(vid);
752        self.vertex_tombstones.insert(vid);
753        self.vertex_properties.remove(&vid);
754        // Deletion supersedes any pending partial-update state.
755        self.vertex_partial_keys.remove(&vid);
756        self.vertex_versions.insert(vid, version);
757        self.graph.remove_vertex(vid);
758        self.mutation_count += 1;
759        self.mutation_stats.nodes_deleted += 1;
760
761        // Remove constraint index entries for this vertex
762        self.constraint_index.retain(|_, v| *v != vid);
763
764        // 64 bytes per edge tombstone + 8 for vertex tombstone
765        self.estimated_size += cascaded_edges_count * 72 + 8;
766    }
767
768    pub fn insert_edge(
769        &mut self,
770        src_vid: Vid,
771        dst_vid: Vid,
772        edge_type: u32,
773        eid: Eid,
774        properties: Properties,
775        edge_type_name: Option<String>,
776    ) -> Result<()> {
777        self.current_version += 1;
778        let now = now_nanos();
779
780        if let Some(wal) = &mut self.wal {
781            wal.append(&Mutation::InsertEdge {
782                src_vid,
783                dst_vid,
784                edge_type,
785                eid,
786                version: self.current_version,
787                properties: properties.clone(),
788                edge_type_name: edge_type_name.clone(),
789            })?;
790        }
791
792        self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
793
794        // Store edge type name in metadata if provided
795        let type_name_size = if let Some(ref name) = edge_type_name {
796            let size = name.len() + 24;
797            self.edge_types.insert(eid, name.clone());
798            size
799        } else {
800            0
801        };
802
803        // Set timestamps - created_at only set if this is a new edge
804        self.edge_created_at.entry(eid).or_insert(now);
805        self.edge_updated_at.insert(eid, now);
806
807        // A full-row insert supersedes any pending partial-update state
808        // for this EID (Round 12 §A).
809        self.edge_partial_keys.remove(&eid);
810
811        self.estimated_size += type_name_size;
812
813        Ok(())
814    }
815
816    /// Insert an edge's FULL property row plus a touched-keys hint so the
817    /// flush emits only those schema columns via Lance `MergeInsert` on
818    /// the per-edge-type delta tables. Edge analog of
819    /// `insert_vertex_partial_full` (Round 12 §A).
820    #[allow(clippy::too_many_arguments)]
821    pub fn insert_edge_partial_full(
822        &mut self,
823        src_vid: Vid,
824        dst_vid: Vid,
825        edge_type: u32,
826        eid: Eid,
827        properties: Properties,
828        edge_type_name: Option<String>,
829        touched_keys: HashSet<String>,
830    ) -> Result<()> {
831        self.current_version += 1;
832        let now = now_nanos();
833
834        if let Some(wal) = &mut self.wal {
835            wal.append(&Mutation::InsertEdge {
836                src_vid,
837                dst_vid,
838                edge_type,
839                eid,
840                version: self.current_version,
841                properties: properties.clone(),
842                edge_type_name: edge_type_name.clone(),
843            })?;
844        }
845
846        self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
847
848        // `apply_edge_insertion` cleared the partial-keys entry as a
849        // safety measure (full-row insert supersedes partial). Re-insert
850        // with the touched-keys hint so the flush emits a partial source.
851        self.edge_partial_keys
852            .entry(eid)
853            .or_default()
854            .extend(touched_keys);
855
856        let type_name_size = if let Some(ref name) = edge_type_name {
857            let size = name.len() + 24;
858            self.edge_types.insert(eid, name.clone());
859            size
860        } else {
861            0
862        };
863
864        self.edge_created_at.entry(eid).or_insert(now);
865        self.edge_updated_at.insert(eid, now);
866
867        self.estimated_size += type_name_size;
868
869        Ok(())
870    }
871
872    /// Core edge insertion logic: add vertices, add edge, merge properties, update metadata.
873    ///
874    /// Shared between `insert_edge` (live mutations) and `replay_mutations` (WAL recovery).
875    ///
876    /// # Errors
877    ///
878    /// Returns error if either endpoint vertex has been deleted (exists in vertex_tombstones).
879    /// This prevents "ghost vertex" resurrection via edge insertion. See issue #77.
880    fn apply_edge_insertion(
881        &mut self,
882        src_vid: Vid,
883        dst_vid: Vid,
884        edge_type: u32,
885        eid: Eid,
886        properties: Properties,
887    ) -> Result<()> {
888        let version = self.current_version;
889
890        // Check if either endpoint has been deleted. Inserting an edge to a deleted
891        // vertex would resurrect it as a "ghost vertex" with no properties. See issue #77.
892        if self.vertex_tombstones.contains(&src_vid) {
893            anyhow::bail!(
894                "Cannot insert edge: source vertex {} has been deleted (issue #77)",
895                src_vid
896            );
897        }
898        if self.vertex_tombstones.contains(&dst_vid) {
899            anyhow::bail!(
900                "Cannot insert edge: destination vertex {} has been deleted (issue #77)",
901                dst_vid
902            );
903        }
904
905        // Add vertices to graph topology if they don't exist.
906        // IMPORTANT: Only add to graph structure, do NOT call insert_vertex.
907        // insert_vertex creates a new version with empty properties, which would
908        // cause MVCC to pick the empty version as "latest", losing original properties.
909        if !self.graph.contains_vertex(src_vid) {
910            self.graph.add_vertex(src_vid);
911        }
912        if !self.graph.contains_vertex(dst_vid) {
913            self.graph.add_vertex(dst_vid);
914        }
915
916        self.graph.add_edge(src_vid, dst_vid, eid, edge_type);
917
918        // Store metadata with CRDT merge logic
919        let props_size = Self::estimate_properties_size(&properties);
920        let props_count = properties.len();
921        if !properties.is_empty() {
922            let entry = self.edge_properties.entry(eid).or_default();
923            Self::merge_crdt_properties(entry, properties);
924        }
925
926        self.edge_versions.insert(eid, version);
927        self.edge_endpoints
928            .insert(eid, (src_vid, dst_vid, edge_type));
929        self.tombstones.remove(&eid);
930        self.mutation_count += 1;
931        self.mutation_stats.relationships_created += 1;
932        self.mutation_stats.properties_set += props_count;
933
934        // 24 edge + props + 16 version + 28 endpoints + 32 timestamps
935        self.estimated_size += 24 + props_size + 16 + 28 + 32;
936
937        Ok(())
938    }
939
940    pub fn delete_edge(
941        &mut self,
942        eid: Eid,
943        src_vid: Vid,
944        dst_vid: Vid,
945        edge_type: u32,
946    ) -> Result<()> {
947        self.current_version += 1;
948        let now = now_nanos();
949
950        if let Some(wal) = &mut self.wal {
951            wal.append(&Mutation::DeleteEdge {
952                eid,
953                src_vid,
954                dst_vid,
955                edge_type,
956                version: self.current_version,
957            })?;
958        }
959
960        self.apply_edge_deletion(eid, src_vid, dst_vid, edge_type);
961
962        // Update timestamp - deletion is an update
963        self.edge_updated_at.insert(eid, now);
964
965        Ok(())
966    }
967
968    /// Core edge deletion logic: tombstone the edge, update version, remove from graph.
969    ///
970    /// Shared between `delete_edge` (live mutations) and `replay_mutations` (WAL recovery).
971    fn apply_edge_deletion(&mut self, eid: Eid, src_vid: Vid, dst_vid: Vid, edge_type: u32) {
972        let version = self.current_version;
973
974        self.tombstones.insert(
975            eid,
976            TombstoneEntry {
977                eid,
978                src_vid,
979                dst_vid,
980                edge_type,
981            },
982        );
983        self.edge_versions.insert(eid, version);
984        // Deletion supersedes any pending partial-update state for this
985        // EID (Round 12 §A).
986        self.edge_partial_keys.remove(&eid);
987        self.graph.remove_edge(eid);
988        self.mutation_count += 1;
989        self.mutation_stats.relationships_deleted += 1;
990
991        // 64 bytes tombstone + 16 bytes version
992        self.estimated_size += 80;
993    }
994
995    /// Returns neighbors in the specified direction.
996    /// O(degree) complexity - iterates only edges connected to the vertex.
997    pub fn get_neighbors(
998        &self,
999        vid: Vid,
1000        edge_type: u32,
1001        direction: Direction,
1002    ) -> Vec<(Vid, Eid, u64)> {
1003        let edges = self.graph.neighbors(vid, direction);
1004
1005        edges
1006            .iter()
1007            .filter(|e| e.edge_type == edge_type && !self.is_tombstoned(e.eid))
1008            .map(|e| {
1009                let neighbor = match direction {
1010                    Direction::Outgoing => e.dst_vid,
1011                    Direction::Incoming => e.src_vid,
1012                };
1013                let version = self.edge_versions.get(&e.eid).copied().unwrap_or(0);
1014                (neighbor, e.eid, version)
1015            })
1016            .collect()
1017    }
1018
1019    pub fn is_tombstoned(&self, eid: Eid) -> bool {
1020        self.tombstones.contains_key(&eid)
1021    }
1022
1023    /// Returns all VIDs in vertex_labels that match the given label name.
1024    /// O(1) lookup via the reverse label index.
1025    pub fn vids_for_label(&self, label_name: &str) -> Vec<Vid> {
1026        self.label_to_vids
1027            .get(label_name)
1028            .map(|set| set.iter().copied().collect())
1029            .unwrap_or_default()
1030    }
1031
1032    /// Returns all vertex VIDs in the L0 buffer.
1033    ///
1034    /// Used for schemaless scanning (MATCH (n) without label).
1035    pub fn all_vertex_vids(&self) -> Vec<Vid> {
1036        self.vertex_properties.keys().copied().collect()
1037    }
1038
1039    /// Returns all VIDs in vertex_labels that match any of the given label names.
1040    /// Uses the reverse label index — O(sum of matching set sizes).
1041    pub fn vids_for_labels(&self, label_names: &[&str]) -> Vec<Vid> {
1042        let mut result = HashSet::new();
1043        for label_name in label_names {
1044            if let Some(set) = self.label_to_vids.get(*label_name) {
1045                result.extend(set.iter().copied());
1046            }
1047        }
1048        result.into_iter().collect()
1049    }
1050
1051    /// Returns all VIDs that have ALL specified labels.
1052    /// Uses the reverse label index — intersects the per-label sets.
1053    pub fn vids_with_all_labels(&self, label_names: &[&str]) -> Vec<Vid> {
1054        if label_names.is_empty() {
1055            return Vec::new();
1056        }
1057        // Collect the per-label sets; if any label is missing from the index,
1058        // the intersection is empty.
1059        let sets: Vec<&HashSet<Vid>> = match label_names
1060            .iter()
1061            .map(|ln| self.label_to_vids.get(*ln))
1062            .collect::<Option<Vec<_>>>()
1063        {
1064            Some(s) => s,
1065            None => return Vec::new(),
1066        };
1067        // Start from the smallest set for efficiency.
1068        let smallest = sets.iter().min_by_key(|s| s.len()).unwrap();
1069        smallest
1070            .iter()
1071            .copied()
1072            .filter(|vid| sets.iter().all(|s| s.contains(vid)))
1073            .collect()
1074    }
1075
1076    /// Gets the labels for a VID.
1077    pub fn get_vertex_labels(&self, vid: Vid) -> Option<&[String]> {
1078        self.vertex_labels.get(&vid).map(|v| v.as_slice())
1079    }
1080
1081    /// Gets the edge type for an EID.
1082    pub fn get_edge_type(&self, eid: Eid) -> Option<&str> {
1083        self.edge_types.get(&eid).map(|s| s.as_str())
1084    }
1085
1086    /// Returns all EIDs in edge_types that match the given type name.
1087    /// Used for L0 overlay during schemaless edge scanning.
1088    pub fn eids_for_type(&self, type_name: &str) -> Vec<Eid> {
1089        self.edge_types
1090            .iter()
1091            .filter(|(eid, etype)| *etype == type_name && !self.tombstones.contains_key(eid))
1092            .map(|(eid, _)| *eid)
1093            .collect()
1094    }
1095
1096    /// Returns all edge EIDs in the L0 buffer (non-tombstoned).
1097    ///
1098    /// Used for schemaless scanning (`MATCH ()-[r]->()`) without type.
1099    pub fn all_edge_eids(&self) -> Vec<Eid> {
1100        self.edge_endpoints
1101            .keys()
1102            .filter(|eid| !self.tombstones.contains_key(eid))
1103            .copied()
1104            .collect()
1105    }
1106
1107    /// Returns edge endpoint data (src_vid, dst_vid) for an EID.
1108    pub fn get_edge_endpoints(&self, eid: Eid) -> Option<(Vid, Vid)> {
1109        self.edge_endpoints
1110            .get(&eid)
1111            .map(|(src, dst, _)| (*src, *dst))
1112    }
1113
1114    /// Returns full edge endpoint data (src_vid, dst_vid, edge_type_id) for an EID.
1115    pub fn get_edge_endpoint_full(&self, eid: Eid) -> Option<(Vid, Vid, u32)> {
1116        self.edge_endpoints.get(&eid).copied()
1117    }
1118
1119    /// Insert a constraint key into the index for O(1) duplicate detection.
1120    pub fn insert_constraint_key(&mut self, key: Vec<u8>, vid: Vid) {
1121        self.constraint_index.insert(key, vid);
1122    }
1123
1124    /// Check if a constraint key exists in the index, excluding a specific VID.
1125    /// Returns true if the key exists and is owned by a different vertex.
1126    pub fn has_constraint_key(&self, key: &[u8], exclude_vid: Vid) -> bool {
1127        self.constraint_index
1128            .get(key)
1129            .is_some_and(|&v| v != exclude_vid)
1130    }
1131
1132    #[instrument(skip(self, other), level = "trace")]
1133    pub fn merge(&mut self, other: &L0Buffer) -> Result<()> {
1134        trace!(
1135            other_mutation_count = other.mutation_count,
1136            "Merging L0 buffer"
1137        );
1138        // Merge Vertices
1139        for &vid in &other.vertex_tombstones {
1140            self.delete_vertex(vid)?;
1141        }
1142
1143        for (vid, props) in &other.vertex_properties {
1144            let labels = other.vertex_labels.get(vid).cloned().unwrap_or_default();
1145            // skip_wal=true: the caller (commit_transaction_l0) already wrote
1146            // these mutations to WAL before invoking merge.
1147            self.insert_vertex_with_labels_impl(*vid, props.clone(), &labels, true);
1148        }
1149
1150        // Merge vertex labels that might not have properties
1151        for (vid, labels) in &other.vertex_labels {
1152            if !self.vertex_labels.contains_key(vid) {
1153                self.vertex_labels.insert(*vid, labels.clone());
1154                for label in labels {
1155                    self.label_to_vids
1156                        .entry(label.clone())
1157                        .or_default()
1158                        .insert(*vid);
1159                }
1160            }
1161        }
1162
1163        // Label-overwrite pass: a `SET n:Label` / `REMOVE n:Label` resolved the
1164        // FULL new label set into `other.vertex_labels[vid]` and flagged the vid.
1165        // REPLACE (not append) so removals actually remove and an existing
1166        // vertex's label change lands — overriding any append from the property
1167        // loop above. Skip vids deleted in the same commit. (The append loops
1168        // stay correct for property-path label unions, which are NOT flagged.)
1169        for vid in &other.vertex_label_overwrites {
1170            if other.vertex_tombstones.contains(vid) {
1171                continue;
1172            }
1173            let labels = other.vertex_labels.get(vid).cloned().unwrap_or_default();
1174            self.remove_vid_from_label_index(*vid);
1175            self.vertex_labels.insert(*vid, labels.clone());
1176            self.index_labels_for_vid(*vid, &labels);
1177        }
1178
1179        // Merge Edges - insert all edges from edge_endpoints, using empty props if none exist
1180        for (eid, (src, dst, etype)) in &other.edge_endpoints {
1181            if other.tombstones.contains_key(eid) {
1182                self.delete_edge(*eid, *src, *dst, *etype)?;
1183            } else {
1184                let props = other.edge_properties.get(eid).cloned().unwrap_or_default();
1185                let etype_name = other.edge_types.get(eid).cloned();
1186                self.insert_edge(*src, *dst, *etype, *eid, props, etype_name)?;
1187            }
1188        }
1189
1190        // Merge tombstones for edges that only exist in the target buffer (self),
1191        // not in the source buffer's edge_endpoints.  Without this, transaction
1192        // DELETEs of pre-existing edges are silently lost on commit.
1193        for (eid, tombstone) in &other.tombstones {
1194            if !other.edge_endpoints.contains_key(eid) {
1195                self.delete_edge(
1196                    *eid,
1197                    tombstone.src_vid,
1198                    tombstone.dst_vid,
1199                    tombstone.edge_type,
1200                )?;
1201            }
1202        }
1203
1204        // Edge types are now merged inside insert_edge, so no separate loop needed
1205
1206        // Merge timestamps - preserve semantics of or_insert (keep oldest created_at)
1207        // and insert (use latest updated_at)
1208        for (vid, ts) in &other.vertex_created_at {
1209            self.vertex_created_at.entry(*vid).or_insert(*ts); // keep oldest
1210        }
1211        for (vid, ts) in &other.vertex_updated_at {
1212            self.vertex_updated_at.insert(*vid, *ts); // use latest (tx wins)
1213        }
1214
1215        for (eid, ts) in &other.edge_created_at {
1216            self.edge_created_at.entry(*eid).or_insert(*ts); // keep oldest
1217        }
1218        for (eid, ts) in &other.edge_updated_at {
1219            self.edge_updated_at.insert(*eid, *ts); // use latest (tx wins)
1220        }
1221
1222        // Conservatively add other's estimated size (may overcount due to
1223        // deduplication, but that's safe for a memory limit).
1224        self.estimated_size += other.estimated_size;
1225
1226        // Merge constraint index
1227        for (key, vid) in &other.constraint_index {
1228            self.constraint_index.insert(key.clone(), *vid);
1229        }
1230
1231        Ok(())
1232    }
1233
1234    /// Replay mutations from WAL without re-logging them.
1235    /// Used during startup recovery to restore L0 state from persisted WAL.
1236    /// Uses CRDT merge semantics to ensure recovered state matches pre-crash state.
1237    #[instrument(skip(self, mutations), level = "debug")]
1238    pub fn replay_mutations(&mut self, mutations: Vec<Mutation>) -> Result<()> {
1239        trace!(count = mutations.len(), "Replaying mutations");
1240        for mutation in mutations {
1241            match mutation {
1242                Mutation::InsertVertex {
1243                    vid,
1244                    properties,
1245                    labels,
1246                } => {
1247                    // Apply without WAL logging, with CRDT merge semantics
1248                    self.current_version += 1;
1249                    let version = self.current_version;
1250
1251                    self.vertex_tombstones.remove(&vid);
1252                    let entry = self.vertex_properties.entry(vid).or_default();
1253                    Self::merge_crdt_properties(entry, properties);
1254                    self.vertex_versions.insert(vid, version);
1255                    self.graph.add_vertex(vid);
1256                    self.mutation_count += 1;
1257
1258                    // Restore vertex labels from WAL
1259                    let existing = self.vertex_labels.entry(vid).or_default();
1260                    Self::append_unique_labels(existing, &labels);
1261                    for label in &labels {
1262                        self.label_to_vids
1263                            .entry(label.clone())
1264                            .or_default()
1265                            .insert(vid);
1266                    }
1267                }
1268                Mutation::DeleteVertex { vid, labels } => {
1269                    self.current_version += 1;
1270                    // Restore labels BEFORE apply_vertex_deletion
1271                    if !labels.is_empty() {
1272                        let existing = self.vertex_labels.entry(vid).or_default();
1273                        Self::append_unique_labels(existing, &labels);
1274                        for label in &labels {
1275                            self.label_to_vids
1276                                .entry(label.clone())
1277                                .or_default()
1278                                .insert(vid);
1279                        }
1280                    }
1281                    self.apply_vertex_deletion(vid);
1282                }
1283                Mutation::SetVertexLabels { vid, labels } => {
1284                    // REPLACE the vid's full label set (a label-only mutation
1285                    // resolved the complete set). Replace, not append, so a
1286                    // replayed removal removes; clears the old reverse-index
1287                    // entries first.
1288                    self.current_version += 1;
1289                    self.remove_vid_from_label_index(vid);
1290                    self.vertex_labels.insert(vid, labels.clone());
1291                    self.index_labels_for_vid(vid, &labels);
1292                    self.mutation_count += 1;
1293                }
1294                Mutation::InsertEdge {
1295                    src_vid,
1296                    dst_vid,
1297                    edge_type,
1298                    eid,
1299                    version: _,
1300                    properties,
1301                    edge_type_name,
1302                } => {
1303                    self.current_version += 1;
1304                    self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
1305                    // Restore edge type name metadata if present
1306                    if let Some(name) = edge_type_name {
1307                        self.edge_types.insert(eid, name);
1308                    }
1309                }
1310                Mutation::DeleteEdge {
1311                    eid,
1312                    src_vid,
1313                    dst_vid,
1314                    edge_type,
1315                    version: _,
1316                } => {
1317                    self.current_version += 1;
1318                    self.apply_edge_deletion(eid, src_vid, dst_vid, edge_type);
1319                }
1320            }
1321        }
1322        Ok(())
1323    }
1324}
1325
1326#[cfg(test)]
1327mod tests {
1328    use super::*;
1329
1330    #[test]
1331    fn test_l0_buffer_ops() -> Result<()> {
1332        let mut l0 = L0Buffer::new(0, None);
1333        let vid_a = Vid::new(1);
1334        let vid_b = Vid::new(2);
1335        let eid_ab = Eid::new(101);
1336
1337        l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1338
1339        let neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1340        assert_eq!(neighbors.len(), 1);
1341        assert_eq!(neighbors[0].0, vid_b);
1342        assert_eq!(neighbors[0].1, eid_ab);
1343
1344        l0.delete_edge(eid_ab, vid_a, vid_b, 1)?;
1345        assert!(l0.is_tombstoned(eid_ab));
1346
1347        // Verify neighbors are empty after deletion
1348        let neighbors_after = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1349        assert_eq!(neighbors_after.len(), 0);
1350
1351        Ok(())
1352    }
1353
1354    #[test]
1355    fn test_l0_buffer_multiple_edges() -> Result<()> {
1356        let mut l0 = L0Buffer::new(0, None);
1357        let vid_a = Vid::new(1);
1358        let vid_b = Vid::new(2);
1359        let vid_c = Vid::new(3);
1360        let eid_ab = Eid::new(101);
1361        let eid_ac = Eid::new(102);
1362
1363        l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1364        l0.insert_edge(vid_a, vid_c, 1, eid_ac, HashMap::new(), None)?;
1365
1366        let neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1367        assert_eq!(neighbors.len(), 2);
1368
1369        // Delete one edge
1370        l0.delete_edge(eid_ab, vid_a, vid_b, 1)?;
1371
1372        // Should still have one neighbor
1373        let neighbors_after = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1374        assert_eq!(neighbors_after.len(), 1);
1375        assert_eq!(neighbors_after[0].0, vid_c);
1376
1377        Ok(())
1378    }
1379
1380    #[test]
1381    fn test_l0_buffer_edge_type_filter() -> Result<()> {
1382        let mut l0 = L0Buffer::new(0, None);
1383        let vid_a = Vid::new(1);
1384        let vid_b = Vid::new(2);
1385        let vid_c = Vid::new(3);
1386        let eid_ab = Eid::new(101);
1387        let eid_ac = Eid::new(201); // Different edge type
1388
1389        l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1390        l0.insert_edge(vid_a, vid_c, 2, eid_ac, HashMap::new(), None)?;
1391
1392        // Filter by edge type 1
1393        let type1_neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1394        assert_eq!(type1_neighbors.len(), 1);
1395        assert_eq!(type1_neighbors[0].0, vid_b);
1396
1397        // Filter by edge type 2
1398        let type2_neighbors = l0.get_neighbors(vid_a, 2, Direction::Outgoing);
1399        assert_eq!(type2_neighbors.len(), 1);
1400        assert_eq!(type2_neighbors[0].0, vid_c);
1401
1402        Ok(())
1403    }
1404
1405    #[test]
1406    fn test_l0_buffer_incoming_edges() -> Result<()> {
1407        let mut l0 = L0Buffer::new(0, None);
1408        let vid_a = Vid::new(1);
1409        let vid_b = Vid::new(2);
1410        let vid_c = Vid::new(3);
1411        let eid_ab = Eid::new(101);
1412        let eid_cb = Eid::new(102);
1413
1414        // a -> b and c -> b
1415        l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1416        l0.insert_edge(vid_c, vid_b, 1, eid_cb, HashMap::new(), None)?;
1417
1418        // Check incoming edges to b
1419        let incoming = l0.get_neighbors(vid_b, 1, Direction::Incoming);
1420        assert_eq!(incoming.len(), 2);
1421
1422        Ok(())
1423    }
1424
1425    /// Regression test: merge should preserve edges without properties
1426    #[test]
1427    fn test_merge_empty_props_edge() -> Result<()> {
1428        let mut main_l0 = L0Buffer::new(0, None);
1429        let mut tx_l0 = L0Buffer::new(0, None);
1430
1431        let vid_a = Vid::new(1);
1432        let vid_b = Vid::new(2);
1433        let eid_ab = Eid::new(101);
1434
1435        // Insert edge with empty properties in transaction L0
1436        tx_l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1437
1438        // Verify edge exists in tx_l0
1439        assert!(tx_l0.edge_endpoints.contains_key(&eid_ab));
1440        assert!(!tx_l0.edge_properties.contains_key(&eid_ab)); // No properties entry
1441
1442        // Merge into main L0
1443        main_l0.merge(&tx_l0)?;
1444
1445        // Edge should exist in main L0 after merge
1446        assert!(main_l0.edge_endpoints.contains_key(&eid_ab));
1447        let neighbors = main_l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1448        assert_eq!(neighbors.len(), 1);
1449        assert_eq!(neighbors[0].0, vid_b);
1450
1451        Ok(())
1452    }
1453
1454    /// Regression test: WAL replay should use CRDT merge semantics
1455    #[test]
1456    fn test_replay_crdt_merge() -> Result<()> {
1457        use crate::runtime::wal::Mutation;
1458        use serde_json::json;
1459        use uni_common::Value;
1460
1461        let mut l0 = L0Buffer::new(0, None);
1462        let vid = Vid::new(1);
1463
1464        // Create GCounter CRDT values using correct serde format:
1465        // {"t": "gc", "d": {"counts": {...}}}
1466        let counter1: Value = json!({
1467            "t": "gc",
1468            "d": {"counts": {"node1": 5}}
1469        })
1470        .into();
1471        let counter2: Value = json!({
1472            "t": "gc",
1473            "d": {"counts": {"node2": 3}}
1474        })
1475        .into();
1476
1477        // First mutation: insert vertex with counter1
1478        let mut props1 = HashMap::new();
1479        props1.insert("counter".to_string(), counter1.clone());
1480        l0.replay_mutations(vec![Mutation::InsertVertex {
1481            vid,
1482            properties: props1,
1483            labels: vec![],
1484        }])?;
1485
1486        // Second mutation: insert same vertex with counter2 (should merge)
1487        let mut props2 = HashMap::new();
1488        props2.insert("counter".to_string(), counter2.clone());
1489        l0.replay_mutations(vec![Mutation::InsertVertex {
1490            vid,
1491            properties: props2,
1492            labels: vec![],
1493        }])?;
1494
1495        // Verify CRDT was merged (both node1 and node2 counts present)
1496        let stored_props = l0.vertex_properties.get(&vid).unwrap();
1497        let stored_counter = stored_props.get("counter").unwrap();
1498
1499        // Convert back to serde_json::Value for nested access
1500        let stored_json: serde_json::Value = stored_counter.clone().into();
1501        // The merged counter should have both node1: 5 and node2: 3
1502        let data = stored_json.get("d").unwrap();
1503        let counts = data.get("counts").unwrap();
1504        assert_eq!(counts.get("node1"), Some(&json!(5)));
1505        assert_eq!(counts.get("node2"), Some(&json!(3)));
1506
1507        Ok(())
1508    }
1509
1510    #[test]
1511    fn test_merge_preserves_vertex_timestamps() -> Result<()> {
1512        let mut l0_main = L0Buffer::new(0, None);
1513        let mut l0_tx = L0Buffer::new(0, None);
1514        let vid = Vid::new(1);
1515
1516        // Main buffer: insert vertex with timestamp T1
1517        let ts_main_created = 1000;
1518        let ts_main_updated = 1100;
1519        l0_main.insert_vertex(vid, HashMap::new());
1520        l0_main.vertex_created_at.insert(vid, ts_main_created);
1521        l0_main.vertex_updated_at.insert(vid, ts_main_updated);
1522
1523        // Transaction buffer: update same vertex with timestamp T2 (later)
1524        let ts_tx_created = 2000; // should be ignored (main has older created_at)
1525        let ts_tx_updated = 2100; // should win (tx has newer updated_at)
1526        l0_tx.insert_vertex(vid, HashMap::new());
1527        l0_tx.vertex_created_at.insert(vid, ts_tx_created);
1528        l0_tx.vertex_updated_at.insert(vid, ts_tx_updated);
1529
1530        // Merge transaction into main
1531        l0_main.merge(&l0_tx)?;
1532
1533        // Verify created_at is oldest (from main)
1534        assert_eq!(
1535            *l0_main.vertex_created_at.get(&vid).unwrap(),
1536            ts_main_created,
1537            "created_at should preserve oldest timestamp"
1538        );
1539
1540        // Verify updated_at is latest (from tx)
1541        assert_eq!(
1542            *l0_main.vertex_updated_at.get(&vid).unwrap(),
1543            ts_tx_updated,
1544            "updated_at should use latest timestamp"
1545        );
1546
1547        Ok(())
1548    }
1549
1550    #[test]
1551    fn test_merge_preserves_edge_timestamps() -> Result<()> {
1552        let mut l0_main = L0Buffer::new(0, None);
1553        let mut l0_tx = L0Buffer::new(0, None);
1554        let vid_a = Vid::new(1);
1555        let vid_b = Vid::new(2);
1556        let eid = Eid::new(100);
1557
1558        // Main buffer: insert edge with timestamp T1
1559        let ts_main_created = 1000;
1560        let ts_main_updated = 1100;
1561        l0_main.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)?;
1562        l0_main.edge_created_at.insert(eid, ts_main_created);
1563        l0_main.edge_updated_at.insert(eid, ts_main_updated);
1564
1565        // Transaction buffer: update same edge with timestamp T2 (later)
1566        let ts_tx_created = 2000; // should be ignored
1567        let ts_tx_updated = 2100; // should win
1568        l0_tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)?;
1569        l0_tx.edge_created_at.insert(eid, ts_tx_created);
1570        l0_tx.edge_updated_at.insert(eid, ts_tx_updated);
1571
1572        // Merge transaction into main
1573        l0_main.merge(&l0_tx)?;
1574
1575        // Verify created_at is oldest (from main)
1576        assert_eq!(
1577            *l0_main.edge_created_at.get(&eid).unwrap(),
1578            ts_main_created,
1579            "edge created_at should preserve oldest timestamp"
1580        );
1581
1582        // Verify updated_at is latest (from tx)
1583        assert_eq!(
1584            *l0_main.edge_updated_at.get(&eid).unwrap(),
1585            ts_tx_updated,
1586            "edge updated_at should use latest timestamp"
1587        );
1588
1589        Ok(())
1590    }
1591
1592    #[test]
1593    fn test_merge_created_at_not_overwritten_for_existing_vertex() -> Result<()> {
1594        use uni_common::Value;
1595
1596        let mut l0_main = L0Buffer::new(0, None);
1597        let mut l0_tx = L0Buffer::new(0, None);
1598        let vid = Vid::new(1);
1599
1600        // Main buffer: vertex created at T1
1601        let ts_original = 1000;
1602        l0_main.insert_vertex(vid, HashMap::new());
1603        l0_main.vertex_created_at.insert(vid, ts_original);
1604        l0_main.vertex_updated_at.insert(vid, ts_original);
1605
1606        // Transaction buffer: update vertex (created_at would be T2 if set)
1607        let ts_tx = 2000;
1608        let mut props = HashMap::new();
1609        props.insert("updated".to_string(), Value::String("yes".to_string()));
1610        l0_tx.insert_vertex(vid, props);
1611        l0_tx.vertex_created_at.insert(vid, ts_tx);
1612        l0_tx.vertex_updated_at.insert(vid, ts_tx);
1613
1614        // Merge transaction into main
1615        l0_main.merge(&l0_tx)?;
1616
1617        // Verify created_at was NOT overwritten (still T1, not T2)
1618        assert_eq!(
1619            *l0_main.vertex_created_at.get(&vid).unwrap(),
1620            ts_original,
1621            "created_at must not be overwritten for existing vertex"
1622        );
1623
1624        // Verify updated_at WAS updated (now T2)
1625        assert_eq!(
1626            *l0_main.vertex_updated_at.get(&vid).unwrap(),
1627            ts_tx,
1628            "updated_at should reflect transaction timestamp"
1629        );
1630
1631        // Verify properties were merged
1632        assert!(
1633            l0_main
1634                .vertex_properties
1635                .get(&vid)
1636                .unwrap()
1637                .contains_key("updated")
1638        );
1639
1640        Ok(())
1641    }
1642
1643    /// Test for Issue #23: Vertex labels preserved through replay_mutations
1644    #[test]
1645    fn test_replay_mutations_preserves_vertex_labels() -> Result<()> {
1646        use crate::runtime::wal::Mutation;
1647
1648        let mut l0 = L0Buffer::new(0, None);
1649        let vid = Vid::new(42);
1650
1651        // Create InsertVertex mutation with labels
1652        let mutations = vec![Mutation::InsertVertex {
1653            vid,
1654            properties: {
1655                let mut props = HashMap::new();
1656                props.insert(
1657                    "name".to_string(),
1658                    uni_common::Value::String("Alice".to_string()),
1659                );
1660                props
1661            },
1662            labels: vec!["Person".to_string(), "User".to_string()],
1663        }];
1664
1665        // Replay mutations
1666        l0.replay_mutations(mutations)?;
1667
1668        // Verify vertex exists in L0
1669        assert!(l0.vertex_properties.contains_key(&vid));
1670
1671        // Verify labels are preserved
1672        let labels = l0.get_vertex_labels(vid).expect("Labels should exist");
1673        assert_eq!(labels.len(), 2);
1674        assert!(labels.contains(&"Person".to_string()));
1675        assert!(labels.contains(&"User".to_string()));
1676
1677        // Verify vertex is findable by label
1678        let person_vids = l0.vids_for_label("Person");
1679        assert_eq!(person_vids.len(), 1);
1680        assert_eq!(person_vids[0], vid);
1681
1682        let user_vids = l0.vids_for_label("User");
1683        assert_eq!(user_vids.len(), 1);
1684        assert_eq!(user_vids[0], vid);
1685
1686        Ok(())
1687    }
1688
1689    /// Test for Issue #23: DeleteVertex labels preserved for tombstone flushing
1690    #[test]
1691    fn test_replay_mutations_preserves_delete_vertex_labels() -> Result<()> {
1692        use crate::runtime::wal::Mutation;
1693
1694        let mut l0 = L0Buffer::new(0, None);
1695        let vid = Vid::new(99);
1696
1697        // First insert vertex with labels
1698        l0.insert_vertex_with_labels(
1699            vid,
1700            HashMap::new(),
1701            &["Person".to_string(), "Admin".to_string()],
1702        );
1703
1704        // Verify vertex and labels exist
1705        assert!(l0.vertex_properties.contains_key(&vid));
1706        let labels = l0.get_vertex_labels(vid).expect("Labels should exist");
1707        assert_eq!(labels.len(), 2);
1708
1709        // Create DeleteVertex mutation with labels
1710        let mutations = vec![Mutation::DeleteVertex {
1711            vid,
1712            labels: vec!["Person".to_string(), "Admin".to_string()],
1713        }];
1714
1715        // Replay deletion
1716        l0.replay_mutations(mutations)?;
1717
1718        // Verify vertex is tombstoned
1719        assert!(l0.vertex_tombstones.contains(&vid));
1720
1721        // Verify labels are preserved in L0 (needed for Issue #76 tombstone flushing)
1722        // The labels should still be accessible for the flush logic to know which tables to update
1723        let labels = l0.get_vertex_labels(vid);
1724        assert!(
1725            labels.is_some(),
1726            "Labels should be preserved even after deletion for tombstone flushing"
1727        );
1728
1729        Ok(())
1730    }
1731
1732    /// Test for Issue #28: Edge type name preserved through replay_mutations
1733    #[test]
1734    fn test_replay_mutations_preserves_edge_type_name() -> Result<()> {
1735        use crate::runtime::wal::Mutation;
1736
1737        let mut l0 = L0Buffer::new(0, None);
1738        let src = Vid::new(1);
1739        let dst = Vid::new(2);
1740        let eid = Eid::new(500);
1741        let edge_type = 100;
1742
1743        // Create InsertEdge mutation with edge_type_name
1744        let mutations = vec![Mutation::InsertEdge {
1745            src_vid: src,
1746            dst_vid: dst,
1747            edge_type,
1748            eid,
1749            version: 1,
1750            properties: {
1751                let mut props = HashMap::new();
1752                props.insert("since".to_string(), uni_common::Value::Int(2020));
1753                props
1754            },
1755            edge_type_name: Some("KNOWS".to_string()),
1756        }];
1757
1758        // Replay mutations
1759        l0.replay_mutations(mutations)?;
1760
1761        // Verify edge exists in L0
1762        assert!(l0.edge_endpoints.contains_key(&eid));
1763
1764        // Verify edge type name is preserved
1765        let type_name = l0.get_edge_type(eid).expect("Edge type name should exist");
1766        assert_eq!(type_name, "KNOWS");
1767
1768        // Verify edge is findable by type name
1769        let knows_eids = l0.eids_for_type("KNOWS");
1770        assert_eq!(knows_eids.len(), 1);
1771        assert_eq!(knows_eids[0], eid);
1772
1773        Ok(())
1774    }
1775
1776    /// Test for Issue #28: Edge type mapping survives multiple replay cycles
1777    #[test]
1778    fn test_edge_type_mapping_survives_multiple_replays() -> Result<()> {
1779        use crate::runtime::wal::Mutation;
1780
1781        let mut l0 = L0Buffer::new(0, None);
1782
1783        // Replay multiple edge insertions with different types
1784        let mutations = vec![
1785            Mutation::InsertEdge {
1786                src_vid: Vid::new(1),
1787                dst_vid: Vid::new(2),
1788                edge_type: 100,
1789                eid: Eid::new(1000),
1790                version: 1,
1791                properties: HashMap::new(),
1792                edge_type_name: Some("KNOWS".to_string()),
1793            },
1794            Mutation::InsertEdge {
1795                src_vid: Vid::new(2),
1796                dst_vid: Vid::new(3),
1797                edge_type: 101,
1798                eid: Eid::new(1001),
1799                version: 2,
1800                properties: HashMap::new(),
1801                edge_type_name: Some("LIKES".to_string()),
1802            },
1803            Mutation::InsertEdge {
1804                src_vid: Vid::new(3),
1805                dst_vid: Vid::new(1),
1806                edge_type: 100,
1807                eid: Eid::new(1002),
1808                version: 3,
1809                properties: HashMap::new(),
1810                edge_type_name: Some("KNOWS".to_string()),
1811            },
1812        ];
1813
1814        l0.replay_mutations(mutations)?;
1815
1816        // Verify all edge type mappings are preserved
1817        assert_eq!(l0.get_edge_type(Eid::new(1000)), Some("KNOWS"));
1818        assert_eq!(l0.get_edge_type(Eid::new(1001)), Some("LIKES"));
1819        assert_eq!(l0.get_edge_type(Eid::new(1002)), Some("KNOWS"));
1820
1821        // Verify edges can be queried by type
1822        let knows_edges = l0.eids_for_type("KNOWS");
1823        assert_eq!(knows_edges.len(), 2);
1824        assert!(knows_edges.contains(&Eid::new(1000)));
1825        assert!(knows_edges.contains(&Eid::new(1002)));
1826
1827        let likes_edges = l0.eids_for_type("LIKES");
1828        assert_eq!(likes_edges.len(), 1);
1829        assert_eq!(likes_edges[0], Eid::new(1001));
1830
1831        Ok(())
1832    }
1833
1834    /// Test for Issue #23 + #28: Combined vertex labels and edge types in replay
1835    #[test]
1836    fn test_replay_mutations_combined_labels_and_edge_types() -> Result<()> {
1837        use crate::runtime::wal::Mutation;
1838
1839        let mut l0 = L0Buffer::new(0, None);
1840        let alice = Vid::new(1);
1841        let bob = Vid::new(2);
1842        let eid = Eid::new(100);
1843
1844        // Simulate crash recovery scenario: replay full transaction log
1845        let mutations = vec![
1846            // Insert Alice with Person label
1847            Mutation::InsertVertex {
1848                vid: alice,
1849                properties: {
1850                    let mut props = HashMap::new();
1851                    props.insert(
1852                        "name".to_string(),
1853                        uni_common::Value::String("Alice".to_string()),
1854                    );
1855                    props
1856                },
1857                labels: vec!["Person".to_string()],
1858            },
1859            // Insert Bob with Person label
1860            Mutation::InsertVertex {
1861                vid: bob,
1862                properties: {
1863                    let mut props = HashMap::new();
1864                    props.insert(
1865                        "name".to_string(),
1866                        uni_common::Value::String("Bob".to_string()),
1867                    );
1868                    props
1869                },
1870                labels: vec!["Person".to_string()],
1871            },
1872            // Create KNOWS edge between them
1873            Mutation::InsertEdge {
1874                src_vid: alice,
1875                dst_vid: bob,
1876                edge_type: 1,
1877                eid,
1878                version: 3,
1879                properties: HashMap::new(),
1880                edge_type_name: Some("KNOWS".to_string()),
1881            },
1882        ];
1883
1884        // Replay all mutations
1885        l0.replay_mutations(mutations)?;
1886
1887        // Verify vertex labels preserved
1888        assert_eq!(l0.get_vertex_labels(alice).unwrap().len(), 1);
1889        assert_eq!(l0.get_vertex_labels(bob).unwrap().len(), 1);
1890        assert_eq!(l0.vids_for_label("Person").len(), 2);
1891
1892        // Verify edge type name preserved
1893        assert_eq!(l0.get_edge_type(eid).unwrap(), "KNOWS");
1894        assert_eq!(l0.eids_for_type("KNOWS").len(), 1);
1895
1896        // Verify graph structure
1897        let alice_neighbors = l0.get_neighbors(alice, 1, Direction::Outgoing);
1898        assert_eq!(alice_neighbors.len(), 1);
1899        assert_eq!(alice_neighbors[0].0, bob);
1900
1901        Ok(())
1902    }
1903
1904    /// Test for Issue #23: Empty labels should deserialize correctly (backward compat)
1905    #[test]
1906    fn test_replay_mutations_backward_compat_empty_labels() -> Result<()> {
1907        use crate::runtime::wal::Mutation;
1908
1909        let mut l0 = L0Buffer::new(0, None);
1910        let vid = Vid::new(1);
1911
1912        // Simulate old WAL format: InsertVertex with empty labels
1913        // (This tests #[serde(default)] behavior)
1914        let mutations = vec![Mutation::InsertVertex {
1915            vid,
1916            properties: HashMap::new(),
1917            labels: vec![], // Empty labels (old format compatibility)
1918        }];
1919
1920        l0.replay_mutations(mutations)?;
1921
1922        // Vertex should exist
1923        assert!(l0.vertex_properties.contains_key(&vid));
1924
1925        // Labels should be empty but entry should exist in vertex_labels
1926        let labels = l0.get_vertex_labels(vid);
1927        assert!(labels.is_some(), "Labels entry should exist even if empty");
1928        assert_eq!(labels.unwrap().len(), 0);
1929
1930        Ok(())
1931    }
1932
1933    #[test]
1934    fn test_now_nanos_returns_nanosecond_range() {
1935        // Test that now_nanos() returns a value in nanosecond range
1936        // As of 2025, Unix timestamp in nanoseconds should be > 1.7e18
1937        // (2025-01-01 is approximately 1,735,689,600 seconds = 1.735e18 nanoseconds)
1938        let now = now_nanos();
1939
1940        // Verify it's in nanosecond range (not microseconds which would be 1000x smaller)
1941        assert!(
1942            now > 1_700_000_000_000_000_000,
1943            "now_nanos() returned {}, expected > 1.7e18 for nanoseconds",
1944            now
1945        );
1946
1947        // Sanity check: should also be less than year 2100 in nanoseconds (4.1e18)
1948        assert!(
1949            now < 4_100_000_000_000_000_000,
1950            "now_nanos() returned {}, expected < 4.1e18",
1951            now
1952        );
1953    }
1954}