Skip to main content

uni_store/runtime/
l0.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::runtime::wal::{Mutation, WriteAheadLog};
5use anyhow::Result;
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use std::time::{SystemTime, UNIX_EPOCH};
9use tracing::{instrument, trace};
10use uni_common::core::id::{Eid, Vid};
11use uni_common::graph::simple_graph::{Direction, SimpleGraph};
12use uni_common::{Properties, Value};
13use uni_crdt::Crdt;
14
15/// Items a read-write transaction observed during execution, used for SSI
16/// read-write antidependency detection at commit.
17///
18/// Shared (via `Arc<Mutex<_>>`) between the read path — which records reads
19/// through the transaction's `QueryContext` — and the commit path, which checks
20/// it against concurrently-committed write-sets. Item-level granularity;
21/// phantoms are out of scope (handled by the `FOR UPDATE` escape hatch).
22#[derive(Debug, Default)]
23pub struct OccReadSet {
24    /// Vertices the transaction read.
25    pub vertices: HashSet<Vid>,
26    /// Edges the transaction read.
27    pub edges: HashSet<Eid>,
28}
29
30impl OccReadSet {
31    /// `true` when nothing has been read yet. Used to decide whether a `FOR
32    /// UPDATE` acquisition may safely re-pin a still-fresh transaction.
33    pub fn is_empty(&self) -> bool {
34        self.vertices.is_empty() && self.edges.is_empty()
35    }
36}
37
/// Returns the current wall-clock time in nanoseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch. When the
/// nanosecond count no longer fits in an `i64` it saturates at `i64::MAX`,
/// rather than letting an `as` cast truncate the `u128` into a garbage
/// (possibly negative) value.
fn now_nanos() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| i64::try_from(d.as_nanos()).unwrap_or(i64::MAX))
        .unwrap_or(0)
}
45
46/// Returns the [`Crdt`] a property value encodes, or `None` if it is not one.
47///
48/// `Crdt` is `#[serde(tag = "t", content = "d")]`, so it deserializes only from a
49/// JSON object, and only `Value::Map(_)` produces one — gating on `Map` avoids
50/// allocating a JSON tree for large non-map values (e.g. embedding columns).
51///
52/// This is the single source of truth for "is this value CRDT-mergeable": both
53/// the commit-time merge ([`L0Buffer::merge_crdt_properties`]) and the OCC
54/// write-set carve-out ([`crate::runtime::occ::WriteSet::from_l0`]) consult it,
55/// so the carve-out can never exclude an item the merge would actually overwrite
56/// (which would silently lose an update).
57pub(crate) fn try_as_crdt(v: &Value) -> Option<Crdt> {
58    if !matches!(v, Value::Map(_)) {
59        return None;
60    }
61    serde_json::from_value::<Crdt>(v.clone().into()).ok()
62}
63
64/// Serialize a constraint key for O(1) uniqueness checks.
65/// Format: label + separator + sorted (prop_name, value) pairs.
66pub fn serialize_constraint_key(label: &str, key_values: &[(String, Value)]) -> Vec<u8> {
67    let mut buf = label.as_bytes().to_vec();
68    buf.push(0); // separator
69    let mut sorted = key_values.to_vec();
70    sorted.sort_by(|a, b| a.0.cmp(&b.0));
71    for (k, v) in &sorted {
72        buf.extend(k.as_bytes());
73        buf.push(0);
74        // Use serde_json serialization for deterministic value encoding
75        buf.extend(serde_json::to_vec(v).unwrap_or_default());
76        buf.push(0);
77    }
78    buf
79}
80
/// Mutation counters, one per kind of change, accumulated by the L0 buffer.
///
/// They back the detailed statistics reported on `ExecuteResult`
/// (`nodes_created`, `relationships_deleted`, ...). A caller takes a snapshot
/// before executing and another afterwards, then calls
/// [`diff()`](MutationStats::diff) on the pair to obtain the delta.
#[derive(Debug, Clone, Default)]
pub struct MutationStats {
    pub nodes_created: usize,
    pub nodes_deleted: usize,
    pub relationships_created: usize,
    pub relationships_deleted: usize,
    pub properties_set: usize,
    pub properties_removed: usize,
    pub labels_added: usize,
    pub labels_removed: usize,
}

impl MutationStats {
    /// Returns `self - before`, computed per field and clamped at zero.
    pub fn diff(&self, before: &Self) -> Self {
        let delta = |after: usize, earlier: usize| after.saturating_sub(earlier);
        Self {
            nodes_created: delta(self.nodes_created, before.nodes_created),
            nodes_deleted: delta(self.nodes_deleted, before.nodes_deleted),
            relationships_created: delta(
                self.relationships_created,
                before.relationships_created,
            ),
            relationships_deleted: delta(
                self.relationships_deleted,
                before.relationships_deleted,
            ),
            properties_set: delta(self.properties_set, before.properties_set),
            properties_removed: delta(self.properties_removed, before.properties_removed),
            labels_added: delta(self.labels_added, before.labels_added),
            labels_removed: delta(self.labels_removed, before.labels_removed),
        }
    }
}
119
120#[derive(Clone, Debug)]
121pub struct TombstoneEntry {
122    pub eid: Eid,
123    pub src_vid: Vid,
124    pub dst_vid: Vid,
125    pub edge_type: u32,
126}
127
128pub struct L0Buffer {
129    /// Graph topology using simple adjacency lists
130    pub graph: SimpleGraph,
131    /// Soft-deleted edges (tombstones for LSM-style merging)
132    pub tombstones: HashMap<Eid, TombstoneEntry>,
133    /// Soft-deleted vertices
134    pub vertex_tombstones: HashSet<Vid>,
135    /// Edge version tracking for MVCC
136    pub edge_versions: HashMap<Eid, u64>,
137    /// Vertex version tracking for MVCC
138    pub vertex_versions: HashMap<Vid, u64>,
139    /// Edge properties (stored separately from topology)
140    pub edge_properties: HashMap<Eid, Properties>,
141    /// Vertex properties (stored separately from topology)
142    pub vertex_properties: HashMap<Vid, Properties>,
143    /// Edge endpoint lookup: eid -> (src, dst, type)
144    pub edge_endpoints: HashMap<Eid, (Vid, Vid, u32)>,
145    /// Vertex labels (VID -> list of label names)
146    /// New in storage design: vertices can have multiple labels
147    pub vertex_labels: HashMap<Vid, Vec<String>>,
148    /// Reverse index: label name → set of VIDs with that label. Maintained
149    /// alongside `vertex_labels` for O(1) label-based vertex lookups.
150    pub label_to_vids: HashMap<String, HashSet<Vid>>,
151    /// Vids whose FULL label set was explicitly replaced by a label mutation
152    /// (`SET n:Label` / `REMOVE n:Label`) in this buffer, via
153    /// [`L0Buffer::set_vertex_labels`]. Distinguishes a deliberate label
154    /// replacement from the empty `vertex_labels` entry a property-only write
155    /// incidentally creates (`entry().or_default()`), so `merge` knows to REPLACE
156    /// (not append) these vids' labels and `WriteSet::from_l0` knows they are
157    /// conflictable writes. A transaction-buffer concept; empty on main L0.
158    pub vertex_label_overwrites: HashSet<Vid>,
159    /// Edge types (EID -> type name)
160    pub edge_types: HashMap<Eid, String>,
161    /// Current version counter
162    pub current_version: u64,
163    /// Mutation count for flush decisions
164    pub mutation_count: usize,
165    /// Per-type mutation counters for detailed statistics.
166    pub mutation_stats: MutationStats,
167    /// Write-ahead log for durability
168    pub wal: Option<Arc<WriteAheadLog>>,
169    /// WAL LSN at the time this L0 was rotated for flush.
170    /// Used to ensure WAL truncation doesn't remove entries needed by pending flushes.
171    pub wal_lsn_at_flush: u64,
172    /// WAL LSN at the time this L0 became active (the previous rotation point).
173    ///
174    /// Everything at or below this LSN is durable in L1 before this buffer's own
175    /// data begins; while the buffer is pending flush its committed WAL entries
176    /// live strictly ABOVE it. It is therefore the floor below which WAL
177    /// truncation and a published `wal_high_water_mark` may safely advance —
178    /// using `wal_lsn_at_flush` (the high watermark) there would discard a
179    /// pending buffer's own not-yet-flushed entries (lost-commit on a graceful
180    /// close after a failed flush).
181    pub wal_lsn_at_start: u64,
182    /// Vertex creation timestamps (nanoseconds since epoch)
183    pub vertex_created_at: HashMap<Vid, i64>,
184    /// Vertex update timestamps (nanoseconds since epoch)
185    pub vertex_updated_at: HashMap<Vid, i64>,
186    /// Edge creation timestamps (nanoseconds since epoch)
187    pub edge_created_at: HashMap<Eid, i64>,
188    /// Edge update timestamps (nanoseconds since epoch)
189    pub edge_updated_at: HashMap<Eid, i64>,
190    /// Estimated size in bytes for memory limit enforcement.
191    /// Incremented O(1) on each mutation to avoid O(V+E) size_bytes() calls.
192    pub estimated_size: usize,
193    /// Per-constraint index for O(1) unique key checks.
194    /// Key: constraint composite key (label + sorted property values serialized).
195    /// Value: Vid that owns this key.
196    pub constraint_index: HashMap<Vec<u8>, Vid>,
197    /// Implicit MERGE-key guard for phantom-free `MERGE` *without* a declared
198    /// `UNIQUE` constraint. Same key format as `constraint_index` (built by
199    /// [`serialize_constraint_key`]), but populated only by a `MERGE` that
200    /// *creates* a node, and re-probed at commit only against other concurrent
201    /// `MERGE`-creates — so two concurrent `MERGE`s of the same key converge to
202    /// one node (the loser aborts retriably) instead of silently duplicating,
203    /// while a plain `CREATE` of the same properties is unaffected (it never
204    /// registers a key). Tombstoned with the owning vid; transient (not rebuilt
205    /// on recovery).
206    pub merge_guard_index: HashMap<Vec<u8>, Vid>,
207    /// Reverse index `ext_id` → owning vid for O(1) global ext_id uniqueness
208    /// checks (`Writer::check_extid_globally_unique` previously scanned every
209    /// `vertex_properties` map per insert — O(n²) ingest). Maintained by the
210    /// vertex insert impls (synced to the post-CRDT-merge value) and by
211    /// `apply_vertex_deletion`, so merge and WAL replay keep it consistent
212    /// for free.
213    pub extid_index: HashMap<String, Vid>,
214    /// Per-VID set of property keys that should land via Lance MergeInsert
215    /// (partial-column update) at flush time. Populated by
216    /// `insert_vertex_partial`; cleared by full-row inserts and deletes.
217    /// A VID present here at flush time is emitted to the partial batch;
218    /// absent VIDs flush via the existing full-row Append.
219    pub vertex_partial_keys: HashMap<Vid, HashSet<String>>,
220    /// Edge analog of `vertex_partial_keys` (Round 12 §A). Populated by
221    /// `insert_edge_partial_full`; cleared by full-row inserts and edge
222    /// deletes. Per-edge-type delta-table flush honors these by emitting
223    /// a `MergeInsertBuilder` source with only the touched schema
224    /// columns plus `eid`, `op`, `_version`, `_updated_at`, and
225    /// `overflow_json` (when an overflow prop was touched).
226    pub edge_partial_keys: HashMap<Eid, HashSet<String>>,
227    /// Phase B (UniConfig::defer_embeddings): VIDs whose auto-embedding
228    /// was skipped at insert time and is owed at flush. Value = primary
229    /// label name (the rest of the embedding config is looked up from the
230    /// schema at flush time). Drained by `flush_stream_l1` before column
231    /// extraction; entries are removed when the embedding lands in the
232    /// vertex's L0 property map.
233    pub pending_embeddings: HashMap<Vid, String>,
234    /// Optimistic-concurrency read sequence (SSI). Stamped on a transaction's
235    /// private L0 at creation with the Writer's commit-sequence at that moment,
236    /// and consulted at commit to detect intervening conflicting commits. `0`
237    /// for the main L0 and when SSI is disabled.
238    pub occ_read_seq: u64,
239    /// Optimistic-concurrency read-set (SSI). `Some` on a read-write
240    /// transaction's private L0 when SSI tracking is active; the read path
241    /// records observed ids here and commit checks them for antidependencies.
242    /// `None` for the main L0 and read-only / SSI-disabled paths.
243    pub occ_read_set: Option<Arc<parking_lot::Mutex<OccReadSet>>>,
244}
245
246impl std::fmt::Debug for L0Buffer {
247    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
248        f.debug_struct("L0Buffer")
249            .field("vertex_count", &self.graph.vertex_count())
250            .field("edge_count", &self.graph.edge_count())
251            .field("tombstones", &self.tombstones.len())
252            .field("vertex_tombstones", &self.vertex_tombstones.len())
253            .field("current_version", &self.current_version)
254            .field("mutation_count", &self.mutation_count)
255            .finish()
256    }
257}
258
259impl Clone for L0Buffer {
260    /// Clone the L0 buffer for fork/restore (ASSUME/ABDUCE).
261    ///
262    /// The cloned buffer does NOT share the WAL reference — forked L0s are
263    /// ephemeral and should not write to the WAL.
264    fn clone(&self) -> Self {
265        Self {
266            graph: self.graph.clone(),
267            tombstones: self.tombstones.clone(),
268            vertex_tombstones: self.vertex_tombstones.clone(),
269            edge_versions: self.edge_versions.clone(),
270            vertex_versions: self.vertex_versions.clone(),
271            edge_properties: self.edge_properties.clone(),
272            vertex_properties: self.vertex_properties.clone(),
273            edge_endpoints: self.edge_endpoints.clone(),
274            vertex_labels: self.vertex_labels.clone(),
275            label_to_vids: self.label_to_vids.clone(),
276            vertex_label_overwrites: self.vertex_label_overwrites.clone(),
277            edge_types: self.edge_types.clone(),
278            current_version: self.current_version,
279            mutation_count: self.mutation_count,
280            mutation_stats: self.mutation_stats.clone(),
281            wal: None, // Forked L0s don't share the WAL
282            wal_lsn_at_flush: self.wal_lsn_at_flush,
283            wal_lsn_at_start: self.wal_lsn_at_start,
284            vertex_created_at: self.vertex_created_at.clone(),
285            vertex_updated_at: self.vertex_updated_at.clone(),
286            edge_created_at: self.edge_created_at.clone(),
287            edge_updated_at: self.edge_updated_at.clone(),
288            estimated_size: self.estimated_size,
289            constraint_index: self.constraint_index.clone(),
290            merge_guard_index: self.merge_guard_index.clone(),
291            extid_index: self.extid_index.clone(),
292            vertex_partial_keys: self.vertex_partial_keys.clone(),
293            edge_partial_keys: self.edge_partial_keys.clone(),
294            pending_embeddings: self.pending_embeddings.clone(),
295            occ_read_seq: self.occ_read_seq,
296            // Forked L0s (ASSUME/ABDUCE) do not participate in OCC tracking.
297            occ_read_set: None,
298        }
299    }
300}
301
302impl L0Buffer {
303    /// Append labels to a vec, skipping duplicates.
304    fn append_unique_labels(existing: &mut Vec<String>, labels: &[String]) {
305        for label in labels {
306            if !existing.contains(label) {
307                existing.push(label.clone());
308            }
309        }
310    }
311
312    /// Add a VID to the reverse label index for each of the given labels.
313    fn index_labels_for_vid(&mut self, vid: Vid, labels: &[String]) {
314        for label in labels {
315            self.label_to_vids
316                .entry(label.clone())
317                .or_default()
318                .insert(vid);
319        }
320    }
321
322    /// Read a vertex's current string `ext_id` from a property map.
323    fn extid_of(props: &Properties) -> Option<String> {
324        props
325            .get("ext_id")
326            .and_then(|v| v.as_str())
327            .map(str::to_owned)
328    }
329
330    /// Sync `extid_index` for `vid` around a property write.
331    ///
332    /// `old` / `new` are the vertex's ext_id before / after the CRDT merge —
333    /// the index always reflects the post-merge value, matching what the old
334    /// full scan in `Writer::check_extid_globally_unique` observed. Only
335    /// called when the incoming properties contain an `ext_id` key (the merge
336    /// cannot change the value otherwise).
337    fn sync_extid_index(&mut self, vid: Vid, old: Option<String>, new: Option<String>) {
338        if old == new {
339            return;
340        }
341        if let Some(old) = old
342            && self.extid_index.get(&old) == Some(&vid)
343        {
344            self.extid_index.remove(&old);
345        }
346        if let Some(new) = new {
347            self.extid_index.insert(new, vid);
348        }
349    }
350
351    /// Remove a VID from all label entries in the reverse index.
352    fn remove_vid_from_label_index(&mut self, vid: Vid) {
353        if let Some(labels) = self.vertex_labels.get(&vid) {
354            for label in labels {
355                if let Some(set) = self.label_to_vids.get_mut(label) {
356                    set.remove(&vid);
357                }
358            }
359        }
360    }
361
362    /// Replaces a vertex's FULL label set — the semantics of `SET n:Label` /
363    /// `REMOVE n:Label`, which resolve the new complete set before writing.
364    ///
365    /// Unlike [`add_vertex_labels`](Self::add_vertex_labels) (append), this clears
366    /// the vid's existing labels from the reverse index, sets the new set, and
367    /// re-indexes — so a removal actually removes. It marks the vid in
368    /// `vertex_label_overwrites` so `merge` REPLACES (not appends) these labels at
369    /// commit and `WriteSet::from_l0` treats the change as a conflictable write.
370    /// Increments `mutation_count` (a label change is a real mutation; its sibling
371    /// `remove_vertex_label` already does so).
372    pub fn set_vertex_labels(&mut self, vid: Vid, labels: &[String]) {
373        self.remove_vid_from_label_index(vid);
374        self.vertex_labels.insert(vid, labels.to_vec());
375        self.index_labels_for_vid(vid, labels);
376        self.vertex_label_overwrites.insert(vid);
377        self.current_version += 1;
378        self.mutation_count += 1;
379    }
380
381    /// Merge CRDT properties into an existing property map.
382    /// Attempts CRDT merge if both values are valid CRDTs, falls back to overwrite.
383    ///
384    /// When the entry is empty (new vertex insert), skips the expensive JSON
385    /// round-trip and directly assigns the properties.
386    ///
387    /// Logs a warning when a CRDT value is overwritten by a non-CRDT scalar
388    /// (limitation R1).
389    fn merge_crdt_properties(entry: &mut Properties, properties: Properties) {
390        // Fast path: new vertex with no existing properties — skip JSON round-trip
391        if entry.is_empty() {
392            *entry = properties;
393            return;
394        }
395
396        for (k, v) in properties {
397            // `try_as_crdt` performs the Map-gated CRDT probe (see its docs for the
398            // wide-row perf rationale). Sharing it with `WriteSet::from_l0` keeps the
399            // OCC carve-out consistent with this merge-versus-overwrite decision.
400            if let Some(mut new_crdt) = try_as_crdt(&v)
401                && let Some(existing_v) = entry.get(&k)
402                && let Ok(existing_crdt) = serde_json::from_value::<Crdt>(existing_v.clone().into())
403            {
404                // Use try_merge to avoid panic on type mismatch.
405                if new_crdt.try_merge(&existing_crdt).is_ok()
406                    && let Ok(merged_json) = serde_json::to_value(new_crdt)
407                {
408                    entry.insert(k, uni_common::Value::from(merged_json));
409                    continue;
410                }
411                // CRDT variant mismatch (or a failed re-serialize): fall through to
412                // a last-writer-wins overwrite, discarding the existing CRDT's
413                // merged state. The OCC commit-time carve-out check
414                // (`occ::crdt_carveout_overwrite`) aborts a *concurrent* writer
415                // before reaching here, and write-time schema enforcement rejects a
416                // wrong declared variant; this warns on any residual (e.g. a
417                // single-writer variant change) so the discarded state is visible.
418                tracing::warn!(
419                    property = %k,
420                    existing_variant = existing_crdt.type_name(),
421                    "overwriting CRDT property with a different CRDT variant \
422                     (last-writer-wins); merged CRDT state is discarded"
423                );
424            } else if try_as_crdt(&v).is_none()
425                && entry.get(&k).is_some_and(|e| try_as_crdt(e).is_some())
426            {
427                // R1: an existing CRDT value is overwritten by a non-CRDT scalar
428                // (last-writer-wins), silently discarding the CRDT's merged state
429                // — a property written as BOTH a CRDT and last-writer-wins. The OCC
430                // write-set carve-out lets CRDT-only writers commit without
431                // conflicting, so a concurrent LWW write on the same property
432                // cannot be flagged as a conflict; surface it here instead.
433                tracing::warn!(
434                    property = %k,
435                    "overwriting CRDT property with non-CRDT value (last-writer-wins); \
436                     merged CRDT state is discarded"
437                );
438            }
439            // Fallback: Overwrite (last-writer-wins).
440            entry.insert(k, v);
441        }
442    }
443
444    /// Helper function to estimate property map size in bytes.
445    fn estimate_properties_size(props: &Properties) -> usize {
446        props.keys().map(|k| k.len() + 32).sum()
447    }
448
449    /// Returns an estimate of the buffer size in bytes.
450    /// Includes all fields for accurate memory accounting.
451    pub fn size_bytes(&self) -> usize {
452        let mut total = 0;
453
454        // Topology
455        total += self.graph.vertex_count() * 8;
456        total += self.graph.edge_count() * 24;
457
458        // Properties (rough estimate: key string + 32 bytes for value)
459        for props in self.vertex_properties.values() {
460            total += Self::estimate_properties_size(props);
461        }
462        for props in self.edge_properties.values() {
463            total += Self::estimate_properties_size(props);
464        }
465
466        // Metadata
467        total += self.tombstones.len() * 64;
468        total += self.vertex_tombstones.len() * 8;
469        total += self.edge_versions.len() * 16;
470        total += self.vertex_versions.len() * 16;
471        total += self.edge_endpoints.len() * 28; // (Vid, Vid, u32) = 8+8+4 + overhead
472
473        // Vertex labels
474        for labels in self.vertex_labels.values() {
475            total += labels.iter().map(|l| l.len() + 24).sum::<usize>();
476        }
477
478        // Reverse label index (label_to_vids)
479        for (label, vids) in &self.label_to_vids {
480            total += label.len() + 24 + vids.len() * 8 + 48; // string + HashSet overhead
481        }
482
483        // Edge types
484        for type_name in self.edge_types.values() {
485            total += type_name.len() + 24;
486        }
487
488        // Timestamps (4 maps, each entry is 16 bytes: 8-byte key + 8-byte i64 value)
489        total += self.vertex_created_at.len() * 16;
490        total += self.vertex_updated_at.len() * 16;
491        total += self.edge_created_at.len() * 16;
492        total += self.edge_updated_at.len() * 16;
493
494        total
495    }
496
497    pub fn new(start_version: u64, wal: Option<Arc<WriteAheadLog>>) -> Self {
498        Self {
499            graph: SimpleGraph::new(),
500            tombstones: HashMap::new(),
501            vertex_tombstones: HashSet::new(),
502            edge_versions: HashMap::new(),
503            vertex_versions: HashMap::new(),
504            edge_properties: HashMap::new(),
505            vertex_properties: HashMap::new(),
506            edge_endpoints: HashMap::new(),
507            vertex_labels: HashMap::new(),
508            label_to_vids: HashMap::new(),
509            vertex_label_overwrites: HashSet::new(),
510            edge_types: HashMap::new(),
511            current_version: start_version,
512            mutation_count: 0,
513            mutation_stats: MutationStats::default(),
514            wal,
515            wal_lsn_at_flush: 0,
516            wal_lsn_at_start: 0,
517            vertex_created_at: HashMap::new(),
518            vertex_updated_at: HashMap::new(),
519            edge_created_at: HashMap::new(),
520            edge_updated_at: HashMap::new(),
521            estimated_size: 0,
522            constraint_index: HashMap::new(),
523            merge_guard_index: HashMap::new(),
524            extid_index: HashMap::new(),
525            vertex_partial_keys: HashMap::new(),
526            edge_partial_keys: HashMap::new(),
527            pending_embeddings: HashMap::new(),
528            occ_read_seq: 0,
529            occ_read_set: None,
530        }
531    }
532
533    pub fn insert_vertex(&mut self, vid: Vid, properties: Properties) {
534        self.insert_vertex_with_labels(vid, properties, &[]);
535    }
536
537    /// Insert a vertex with associated labels.
538    pub fn insert_vertex_with_labels(
539        &mut self,
540        vid: Vid,
541        properties: Properties,
542        labels: &[String],
543    ) {
544        self.insert_vertex_with_labels_impl(vid, properties, labels, false);
545    }
546
547    /// Core vertex insertion. When `skip_wal` is true, skips WAL append
548    /// (used during merge where the caller already wrote to WAL).
549    fn insert_vertex_with_labels_impl(
550        &mut self,
551        vid: Vid,
552        properties: Properties,
553        labels: &[String],
554        skip_wal: bool,
555    ) {
556        self.current_version += 1;
557        let version = self.current_version;
558        let now = now_nanos();
559
560        if !skip_wal && let Some(wal) = &self.wal {
561            let _ = wal.append(Mutation::InsertVertex {
562                vid,
563                properties: properties.clone(),
564                labels: labels.to_vec(),
565            });
566        }
567
568        self.vertex_tombstones.remove(&vid);
569
570        // Full-row insert supersedes any pending partial-update state for
571        // this VID.
572        self.vertex_partial_keys.remove(&vid);
573
574        // Size/count computed up front so `properties` can be moved into the
575        // CRDT merge below instead of deep-cloned.
576        let props_size = Self::estimate_properties_size(&properties);
577        let props_count = properties.len();
578        let tracks_extid = properties.contains_key("ext_id");
579
580        let entry = self.vertex_properties.entry(vid).or_default();
581        let old_extid = if tracks_extid {
582            Self::extid_of(entry)
583        } else {
584            None
585        };
586        Self::merge_crdt_properties(entry, properties);
587        if tracks_extid {
588            let new_extid =
589                Self::extid_of(self.vertex_properties.get(&vid).expect("just inserted"));
590            self.sync_extid_index(vid, old_extid, new_extid);
591        }
592        self.vertex_versions.insert(vid, version);
593
594        // Set timestamps - created_at only set if this is a new vertex
595        self.vertex_created_at.entry(vid).or_insert(now);
596        self.vertex_updated_at.insert(vid, now);
597
598        // Track labels — always create an entry so unlabeled vertices are
599        // distinguishable from "not in L0" when queried via get_vertex_labels.
600        let labels_size: usize = labels.iter().map(|l| l.len() + 24).sum();
601        let existing = self.vertex_labels.entry(vid).or_default();
602        Self::append_unique_labels(existing, labels);
603        self.index_labels_for_vid(vid, labels);
604
605        self.graph.add_vertex(vid);
606        self.mutation_count += 1;
607        self.mutation_stats.nodes_created += 1;
608        self.mutation_stats.properties_set += props_count;
609        self.mutation_stats.labels_added += labels.len();
610
611        self.estimated_size += 8 + props_size + 16 + labels_size + 32;
612    }
613
614    /// Insert a vertex's FULL property row, tagging `touched_keys` so the
615    /// flush emits exactly those columns via Lance `MergeInsertBuilder`
616    /// instead of a full-row Append.
617    ///
618    /// `props` MUST be the fully-merged property map (storage union
619    /// in-flight L0 union the new touched values, per
620    /// `PropertyManager::get_all_vertex_props_with_ctx`). The caller is
621    /// responsible for the union; L0 here just stores it so scans see
622    /// the complete row without per-key reconciliation.
623    ///
624    /// `touched_keys` lists the property keys this SET statement
625    /// actually assigned — the union of those across all coalesced
626    /// SetItems on this VID. Lance MergeInsert sends a source batch
627    /// with `_vid`, `_deleted`, `_version`, `_updated_at`, and those
628    /// touched columns; non-touched columns retain their pre-merge
629    /// values on the Lance side, skipping the wide-row write.
630    ///
631    /// A subsequent full-row `insert_vertex_with_labels` or
632    /// `delete_vertex` on the same VID clears the partial-keys entry
633    /// so partial state never outlives a stronger write.
634    pub fn insert_vertex_partial_full(
635        &mut self,
636        vid: Vid,
637        props: Properties,
638        touched_keys: HashSet<String>,
639        labels: &[String],
640    ) {
641        // Stage the full row through the existing partial-impl (same
642        // CRDT merge / version bump / timestamps), preserving the
643        // partial-keys entry so we can extend it below.
644        self.insert_vertex_with_labels_partial_impl(vid, props, labels, false);
645        self.vertex_partial_keys
646            .entry(vid)
647            .or_default()
648            .extend(touched_keys);
649    }
650
651    /// Legacy partial-only variant used by some uni-store paths. Kept
652    /// for source-compatibility but new uni-query callers should use
653    /// `insert_vertex_partial_full` to preserve scan-side L0 visibility.
654    pub fn insert_vertex_partial(&mut self, vid: Vid, touched: Properties, labels: &[String]) {
655        // Record dirty keys BEFORE the full-row impl runs (which would
656        // clear them). The keys come from the touched set; the values
657        // are merged into L0 by the shared CRDT path below.
658        let touched_keys: Vec<String> = touched.keys().cloned().collect();
659
660        // If the VID already has a full-row pending insert (e.g., CREATE
661        // earlier in the same tx), we must NOT downgrade it to partial.
662        // Detected by: VID is in vertex_properties WITH a version stamp
663        // AND not currently in vertex_partial_keys → it was written as
664        // a full row recently. The conservative rule: only enable the
665        // partial path when there's no full-row pending insert. We
666        // approximate "no full-row pending" by checking that the VID's
667        // current entry in vertex_partial_keys is non-empty OR the VID
668        // is not in vertex_properties (fresh row, but caller asked
669        // partial — let it through and the post-flush union covers it).
670        let already_full = self.vertex_properties.contains_key(&vid)
671            && !self.vertex_partial_keys.contains_key(&vid);
672
673        // Stage the CRDT merge through the existing path. We bypass the
674        // full-row `insert_vertex_with_labels_impl` clearing of
675        // partial_keys by inlining the work, then restoring/extending
676        // the partial-key set.
677        self.insert_vertex_with_labels_partial_impl(vid, touched, labels, false);
678
679        if !already_full {
680            self.vertex_partial_keys
681                .entry(vid)
682                .or_default()
683                .extend(touched_keys);
684        }
685    }
686
687    /// Core partial-insert: same as `insert_vertex_with_labels_impl` but
688    /// preserves any existing `vertex_partial_keys[vid]` entry so the
689    /// caller can extend it after the merge.
690    fn insert_vertex_with_labels_partial_impl(
691        &mut self,
692        vid: Vid,
693        properties: Properties,
694        labels: &[String],
695        skip_wal: bool,
696    ) {
697        self.current_version += 1;
698        let version = self.current_version;
699        let now = now_nanos();
700
701        if !skip_wal && let Some(wal) = &self.wal {
702            // WAL records the partial as a full-row InsertVertex; on replay
703            // the full-row path runs (which clears partial_keys). This is
704            // semantically correct — L0 in memory always holds the union of
705            // partial deltas via merge_crdt_properties; recovery doesn't
706            // need to preserve partial-vs-full distinction.
707            let _ = wal.append(Mutation::InsertVertex {
708                vid,
709                properties: properties.clone(),
710                labels: labels.to_vec(),
711            });
712        }
713
714        self.vertex_tombstones.remove(&vid);
715        // NOTE: deliberately DOES NOT remove from vertex_partial_keys.
716        // The caller (`insert_vertex_partial`) extends that set after.
717
718        // Size/count computed up front so `properties` can be moved into the
719        // CRDT merge below instead of deep-cloned.
720        let props_size = Self::estimate_properties_size(&properties);
721        let props_count = properties.len();
722        let tracks_extid = properties.contains_key("ext_id");
723
724        let entry = self.vertex_properties.entry(vid).or_default();
725        let old_extid = if tracks_extid {
726            Self::extid_of(entry)
727        } else {
728            None
729        };
730        Self::merge_crdt_properties(entry, properties);
731        if tracks_extid {
732            let new_extid =
733                Self::extid_of(self.vertex_properties.get(&vid).expect("just inserted"));
734            self.sync_extid_index(vid, old_extid, new_extid);
735        }
736        self.vertex_versions.insert(vid, version);
737
738        self.vertex_created_at.entry(vid).or_insert(now);
739        self.vertex_updated_at.insert(vid, now);
740
741        let labels_size: usize = labels.iter().map(|l| l.len() + 24).sum();
742        let existing = self.vertex_labels.entry(vid).or_default();
743        Self::append_unique_labels(existing, labels);
744        self.index_labels_for_vid(vid, labels);
745
746        self.graph.add_vertex(vid);
747        self.mutation_count += 1;
748        // Partial writes don't create new nodes — they update existing ones.
749        // But counting under properties_set is correct.
750        self.mutation_stats.properties_set += props_count;
751        self.mutation_stats.labels_added += labels.len();
752
753        self.estimated_size += 8 + props_size + 16 + labels_size + 32;
754    }
755
756    /// Add labels to an existing vertex.
757    pub fn add_vertex_labels(&mut self, vid: Vid, labels: &[String]) {
758        let existing = self.vertex_labels.entry(vid).or_default();
759        Self::append_unique_labels(existing, labels);
760        self.index_labels_for_vid(vid, labels);
761    }
762
763    /// Remove a label from an existing vertex.
764    /// Returns true if the label was found and removed, false otherwise.
765    pub fn remove_vertex_label(&mut self, vid: Vid, label: &str) -> bool {
766        if let Some(labels) = self.vertex_labels.get_mut(&vid)
767            && let Some(pos) = labels.iter().position(|l| l == label)
768        {
769            labels.remove(pos);
770            if let Some(set) = self.label_to_vids.get_mut(label) {
771                set.remove(&vid);
772            }
773            self.current_version += 1;
774            self.mutation_count += 1;
775            self.mutation_stats.labels_removed += 1;
776            // Note: WAL logging for label mutations not yet implemented
777            // Currently consistent with add_vertex_labels behavior
778            return true;
779        }
780        false
781    }
782
783    /// Set the type for an edge.
784    pub fn set_edge_type(&mut self, eid: Eid, edge_type: String) {
785        self.edge_types.insert(eid, edge_type);
786    }
787
788    pub fn delete_vertex(&mut self, vid: Vid) -> Result<()> {
789        self.delete_vertex_impl(vid, false)
790    }
791
792    /// Core vertex deletion. When `skip_wal` is true, skips WAL append
793    /// (used during merge where the caller already wrote to WAL).
794    fn delete_vertex_impl(&mut self, vid: Vid, skip_wal: bool) -> Result<()> {
795        self.current_version += 1;
796
797        if !skip_wal && let Some(wal) = &mut self.wal {
798            let labels = self.vertex_labels.get(&vid).cloned().unwrap_or_default();
799            wal.append(Mutation::DeleteVertex { vid, labels })?;
800        }
801
802        self.apply_vertex_deletion(vid);
803        Ok(())
804    }
805
806    /// Cascade-delete a vertex: tombstone all connected edges and remove the vertex.
807    ///
808    /// Shared between `delete_vertex` (live mutations) and `replay_mutations` (WAL recovery).
809    fn apply_vertex_deletion(&mut self, vid: Vid) {
810        let version = self.current_version;
811
812        // Collect edges to delete using O(degree) neighbors() instead of O(E) scan
813        let mut edges_to_remove = HashSet::new();
814
815        // Collect outgoing edges
816        for entry in self.graph.neighbors(vid, Direction::Outgoing) {
817            edges_to_remove.insert(entry.eid);
818        }
819
820        // Collect incoming edges
821        for entry in self.graph.neighbors(vid, Direction::Incoming) {
822            edges_to_remove.insert(entry.eid); // HashSet handles self-loop deduplication
823        }
824
825        let cascaded_edges_count = edges_to_remove.len();
826
827        // Tombstone and remove all collected edges
828        for eid in edges_to_remove {
829            // Retrieve edge endpoints from the map to create tombstone
830            if let Some((src, dst, etype)) = self.edge_endpoints.get(&eid) {
831                self.tombstones.insert(
832                    eid,
833                    TombstoneEntry {
834                        eid,
835                        src_vid: *src,
836                        dst_vid: *dst,
837                        edge_type: *etype,
838                    },
839                );
840                self.edge_versions.insert(eid, version);
841                self.edge_endpoints.remove(&eid);
842                self.edge_properties.remove(&eid);
843                self.graph.remove_edge(eid);
844                self.mutation_count += 1;
845                self.mutation_stats.relationships_deleted += 1;
846            }
847        }
848
849        self.remove_vid_from_label_index(vid);
850        self.vertex_tombstones.insert(vid);
851        // Drop the vid's ext_id index entry (O(1) via its current property
852        // value, read before the property map entry is removed below).
853        if let Some(props) = self.vertex_properties.get(&vid)
854            && let Some(ext) = Self::extid_of(props)
855            && self.extid_index.get(&ext) == Some(&vid)
856        {
857            self.extid_index.remove(&ext);
858        }
859        self.vertex_properties.remove(&vid);
860        // Deletion supersedes any pending partial-update state.
861        self.vertex_partial_keys.remove(&vid);
862        self.vertex_versions.insert(vid, version);
863        self.graph.remove_vertex(vid);
864        self.mutation_count += 1;
865        self.mutation_stats.nodes_deleted += 1;
866
867        // Remove constraint index entries for this vertex
868        self.constraint_index.retain(|_, v| *v != vid);
869        // Same for the implicit MERGE guard, so a later re-MERGE of a deleted
870        // node's key does not false-conflict with the stale entry.
871        self.merge_guard_index.retain(|_, v| *v != vid);
872
873        // 64 bytes per edge tombstone + 8 for vertex tombstone
874        self.estimated_size += cascaded_edges_count * 72 + 8;
875    }
876
877    pub fn insert_edge(
878        &mut self,
879        src_vid: Vid,
880        dst_vid: Vid,
881        edge_type: u32,
882        eid: Eid,
883        properties: Properties,
884        edge_type_name: Option<String>,
885    ) -> Result<()> {
886        self.insert_edge_impl(
887            src_vid,
888            dst_vid,
889            edge_type,
890            eid,
891            properties,
892            edge_type_name,
893            false,
894        )
895    }
896
897    /// Core edge insertion. When `skip_wal` is true, skips WAL append
898    /// (used during merge where the caller already wrote to WAL).
899    #[allow(clippy::too_many_arguments)]
900    fn insert_edge_impl(
901        &mut self,
902        src_vid: Vid,
903        dst_vid: Vid,
904        edge_type: u32,
905        eid: Eid,
906        properties: Properties,
907        edge_type_name: Option<String>,
908        skip_wal: bool,
909    ) -> Result<()> {
910        self.current_version += 1;
911        let now = now_nanos();
912
913        if !skip_wal && let Some(wal) = &mut self.wal {
914            wal.append(Mutation::InsertEdge {
915                src_vid,
916                dst_vid,
917                edge_type,
918                eid,
919                version: self.current_version,
920                properties: properties.clone(),
921                edge_type_name: edge_type_name.clone(),
922            })?;
923        }
924
925        self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
926
927        // Store edge type name in metadata if provided
928        let type_name_size = if let Some(ref name) = edge_type_name {
929            let size = name.len() + 24;
930            self.edge_types.insert(eid, name.clone());
931            size
932        } else {
933            0
934        };
935
936        // Set timestamps - created_at only set if this is a new edge
937        self.edge_created_at.entry(eid).or_insert(now);
938        self.edge_updated_at.insert(eid, now);
939
940        // A full-row insert supersedes any pending partial-update state
941        // for this EID (Round 12 §A).
942        self.edge_partial_keys.remove(&eid);
943
944        self.estimated_size += type_name_size;
945
946        Ok(())
947    }
948
949    /// Insert an edge's FULL property row plus a touched-keys hint so the
950    /// flush emits only those schema columns via Lance `MergeInsert` on
951    /// the per-edge-type delta tables. Edge analog of
952    /// `insert_vertex_partial_full` (Round 12 §A).
953    #[allow(clippy::too_many_arguments)]
954    pub fn insert_edge_partial_full(
955        &mut self,
956        src_vid: Vid,
957        dst_vid: Vid,
958        edge_type: u32,
959        eid: Eid,
960        properties: Properties,
961        edge_type_name: Option<String>,
962        touched_keys: HashSet<String>,
963    ) -> Result<()> {
964        self.current_version += 1;
965        let now = now_nanos();
966
967        if let Some(wal) = &mut self.wal {
968            wal.append(Mutation::InsertEdge {
969                src_vid,
970                dst_vid,
971                edge_type,
972                eid,
973                version: self.current_version,
974                properties: properties.clone(),
975                edge_type_name: edge_type_name.clone(),
976            })?;
977        }
978
979        self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
980
981        // `apply_edge_insertion` cleared the partial-keys entry as a
982        // safety measure (full-row insert supersedes partial). Re-insert
983        // with the touched-keys hint so the flush emits a partial source.
984        self.edge_partial_keys
985            .entry(eid)
986            .or_default()
987            .extend(touched_keys);
988
989        let type_name_size = if let Some(ref name) = edge_type_name {
990            let size = name.len() + 24;
991            self.edge_types.insert(eid, name.clone());
992            size
993        } else {
994            0
995        };
996
997        self.edge_created_at.entry(eid).or_insert(now);
998        self.edge_updated_at.insert(eid, now);
999
1000        self.estimated_size += type_name_size;
1001
1002        Ok(())
1003    }
1004
1005    /// Core edge insertion logic: add vertices, add edge, merge properties, update metadata.
1006    ///
1007    /// Shared between `insert_edge` (live mutations) and `replay_mutations` (WAL recovery).
1008    ///
1009    /// # Errors
1010    ///
1011    /// Returns error if either endpoint vertex has been deleted (exists in vertex_tombstones).
1012    /// This prevents "ghost vertex" resurrection via edge insertion. See issue #77.
1013    fn apply_edge_insertion(
1014        &mut self,
1015        src_vid: Vid,
1016        dst_vid: Vid,
1017        edge_type: u32,
1018        eid: Eid,
1019        properties: Properties,
1020    ) -> Result<()> {
1021        let version = self.current_version;
1022
1023        // Check if either endpoint has been deleted. Inserting an edge to a deleted
1024        // vertex would resurrect it as a "ghost vertex" with no properties. See issue #77.
1025        if self.vertex_tombstones.contains(&src_vid) {
1026            anyhow::bail!(
1027                "Cannot insert edge: source vertex {} has been deleted (issue #77)",
1028                src_vid
1029            );
1030        }
1031        if self.vertex_tombstones.contains(&dst_vid) {
1032            anyhow::bail!(
1033                "Cannot insert edge: destination vertex {} has been deleted (issue #77)",
1034                dst_vid
1035            );
1036        }
1037
1038        // Add vertices to graph topology if they don't exist.
1039        // IMPORTANT: Only add to graph structure, do NOT call insert_vertex.
1040        // insert_vertex creates a new version with empty properties, which would
1041        // cause MVCC to pick the empty version as "latest", losing original properties.
1042        if !self.graph.contains_vertex(src_vid) {
1043            self.graph.add_vertex(src_vid);
1044        }
1045        if !self.graph.contains_vertex(dst_vid) {
1046            self.graph.add_vertex(dst_vid);
1047        }
1048
1049        self.graph.add_edge(src_vid, dst_vid, eid, edge_type);
1050
1051        // Store metadata with CRDT merge logic
1052        let props_size = Self::estimate_properties_size(&properties);
1053        let props_count = properties.len();
1054        if !properties.is_empty() {
1055            let entry = self.edge_properties.entry(eid).or_default();
1056            Self::merge_crdt_properties(entry, properties);
1057        }
1058
1059        self.edge_versions.insert(eid, version);
1060        self.edge_endpoints
1061            .insert(eid, (src_vid, dst_vid, edge_type));
1062        self.tombstones.remove(&eid);
1063        self.mutation_count += 1;
1064        self.mutation_stats.relationships_created += 1;
1065        self.mutation_stats.properties_set += props_count;
1066
1067        // 24 edge + props + 16 version + 28 endpoints + 32 timestamps
1068        self.estimated_size += 24 + props_size + 16 + 28 + 32;
1069
1070        Ok(())
1071    }
1072
1073    pub fn delete_edge(
1074        &mut self,
1075        eid: Eid,
1076        src_vid: Vid,
1077        dst_vid: Vid,
1078        edge_type: u32,
1079    ) -> Result<()> {
1080        self.delete_edge_impl(eid, src_vid, dst_vid, edge_type, false)
1081    }
1082
1083    /// Core edge deletion. When `skip_wal` is true, skips WAL append
1084    /// (used during merge where the caller already wrote to WAL).
1085    fn delete_edge_impl(
1086        &mut self,
1087        eid: Eid,
1088        src_vid: Vid,
1089        dst_vid: Vid,
1090        edge_type: u32,
1091        skip_wal: bool,
1092    ) -> Result<()> {
1093        self.current_version += 1;
1094        let now = now_nanos();
1095
1096        if !skip_wal && let Some(wal) = &mut self.wal {
1097            wal.append(Mutation::DeleteEdge {
1098                eid,
1099                src_vid,
1100                dst_vid,
1101                edge_type,
1102                version: self.current_version,
1103            })?;
1104        }
1105
1106        self.apply_edge_deletion(eid, src_vid, dst_vid, edge_type);
1107
1108        // Update timestamp - deletion is an update
1109        self.edge_updated_at.insert(eid, now);
1110
1111        Ok(())
1112    }
1113
1114    /// Core edge deletion logic: tombstone the edge, update version, remove from graph.
1115    ///
1116    /// Shared between `delete_edge` (live mutations) and `replay_mutations` (WAL recovery).
1117    fn apply_edge_deletion(&mut self, eid: Eid, src_vid: Vid, dst_vid: Vid, edge_type: u32) {
1118        let version = self.current_version;
1119
1120        self.tombstones.insert(
1121            eid,
1122            TombstoneEntry {
1123                eid,
1124                src_vid,
1125                dst_vid,
1126                edge_type,
1127            },
1128        );
1129        self.edge_versions.insert(eid, version);
1130        // Deletion supersedes any pending partial-update state for this
1131        // EID (Round 12 §A).
1132        self.edge_partial_keys.remove(&eid);
1133        self.graph.remove_edge(eid);
1134        self.mutation_count += 1;
1135        self.mutation_stats.relationships_deleted += 1;
1136
1137        // 64 bytes tombstone + 16 bytes version
1138        self.estimated_size += 80;
1139    }
1140
1141    /// Returns neighbors in the specified direction.
1142    /// O(degree) complexity - iterates only edges connected to the vertex.
1143    pub fn get_neighbors(
1144        &self,
1145        vid: Vid,
1146        edge_type: u32,
1147        direction: Direction,
1148    ) -> Vec<(Vid, Eid, u64)> {
1149        let edges = self.graph.neighbors(vid, direction);
1150
1151        edges
1152            .iter()
1153            .filter(|e| e.edge_type == edge_type && !self.is_tombstoned(e.eid))
1154            .map(|e| {
1155                let neighbor = match direction {
1156                    Direction::Outgoing => e.dst_vid,
1157                    Direction::Incoming => e.src_vid,
1158                };
1159                let version = self.edge_versions.get(&e.eid).copied().unwrap_or(0);
1160                (neighbor, e.eid, version)
1161            })
1162            .collect()
1163    }
1164
1165    pub fn is_tombstoned(&self, eid: Eid) -> bool {
1166        self.tombstones.contains_key(&eid)
1167    }
1168
1169    /// Returns all VIDs in vertex_labels that match the given label name.
1170    /// O(1) lookup via the reverse label index.
1171    pub fn vids_for_label(&self, label_name: &str) -> Vec<Vid> {
1172        self.label_to_vids
1173            .get(label_name)
1174            .map(|set| set.iter().copied().collect())
1175            .unwrap_or_default()
1176    }
1177
1178    /// Returns all vertex VIDs in the L0 buffer.
1179    ///
1180    /// Used for schemaless scanning (MATCH (n) without label).
1181    pub fn all_vertex_vids(&self) -> Vec<Vid> {
1182        self.vertex_properties.keys().copied().collect()
1183    }
1184
1185    /// Returns all VIDs in vertex_labels that match any of the given label names.
1186    /// Uses the reverse label index — O(sum of matching set sizes).
1187    pub fn vids_for_labels(&self, label_names: &[&str]) -> Vec<Vid> {
1188        let mut result = HashSet::new();
1189        for label_name in label_names {
1190            if let Some(set) = self.label_to_vids.get(*label_name) {
1191                result.extend(set.iter().copied());
1192            }
1193        }
1194        result.into_iter().collect()
1195    }
1196
1197    /// Returns all VIDs that have ALL specified labels.
1198    /// Uses the reverse label index — intersects the per-label sets.
1199    pub fn vids_with_all_labels(&self, label_names: &[&str]) -> Vec<Vid> {
1200        if label_names.is_empty() {
1201            return Vec::new();
1202        }
1203        // Collect the per-label sets; if any label is missing from the index,
1204        // the intersection is empty.
1205        let sets: Vec<&HashSet<Vid>> = match label_names
1206            .iter()
1207            .map(|ln| self.label_to_vids.get(*ln))
1208            .collect::<Option<Vec<_>>>()
1209        {
1210            Some(s) => s,
1211            None => return Vec::new(),
1212        };
1213        // Start from the smallest set for efficiency.
1214        let smallest = sets.iter().min_by_key(|s| s.len()).unwrap();
1215        smallest
1216            .iter()
1217            .copied()
1218            .filter(|vid| sets.iter().all(|s| s.contains(vid)))
1219            .collect()
1220    }
1221
1222    /// Gets the labels for a VID.
1223    pub fn get_vertex_labels(&self, vid: Vid) -> Option<&[String]> {
1224        self.vertex_labels.get(&vid).map(|v| v.as_slice())
1225    }
1226
1227    /// Gets the edge type for an EID.
1228    pub fn get_edge_type(&self, eid: Eid) -> Option<&str> {
1229        self.edge_types.get(&eid).map(|s| s.as_str())
1230    }
1231
1232    /// Returns all EIDs in edge_types that match the given type name.
1233    /// Used for L0 overlay during schemaless edge scanning.
1234    pub fn eids_for_type(&self, type_name: &str) -> Vec<Eid> {
1235        self.edge_types
1236            .iter()
1237            .filter(|(eid, etype)| *etype == type_name && !self.tombstones.contains_key(eid))
1238            .map(|(eid, _)| *eid)
1239            .collect()
1240    }
1241
1242    /// Returns all edge EIDs in the L0 buffer (non-tombstoned).
1243    ///
1244    /// Used for schemaless scanning (`MATCH ()-[r]->()`) without type.
1245    pub fn all_edge_eids(&self) -> Vec<Eid> {
1246        self.edge_endpoints
1247            .keys()
1248            .filter(|eid| !self.tombstones.contains_key(eid))
1249            .copied()
1250            .collect()
1251    }
1252
1253    /// Returns edge endpoint data (src_vid, dst_vid) for an EID.
1254    pub fn get_edge_endpoints(&self, eid: Eid) -> Option<(Vid, Vid)> {
1255        self.edge_endpoints
1256            .get(&eid)
1257            .map(|(src, dst, _)| (*src, *dst))
1258    }
1259
1260    /// Returns full edge endpoint data (src_vid, dst_vid, edge_type_id) for an EID.
1261    pub fn get_edge_endpoint_full(&self, eid: Eid) -> Option<(Vid, Vid, u32)> {
1262        self.edge_endpoints.get(&eid).copied()
1263    }
1264
1265    /// Insert a constraint key into the index for O(1) duplicate detection.
1266    pub fn insert_constraint_key(&mut self, key: Vec<u8>, vid: Vid) {
1267        self.constraint_index.insert(key, vid);
1268    }
1269
1270    /// Check if a constraint key exists in the index, excluding a specific VID.
1271    /// Returns true if the key exists and is owned by a different vertex.
1272    pub fn has_constraint_key(&self, key: &[u8], exclude_vid: Vid) -> bool {
1273        self.constraint_index
1274            .get(key)
1275            .is_some_and(|&v| v != exclude_vid)
1276    }
1277
1278    /// Register a MERGE-create's key into the implicit phantom guard.
1279    pub fn insert_merge_guard_key(&mut self, key: Vec<u8>, vid: Vid) {
1280        self.merge_guard_index.insert(key, vid);
1281    }
1282
1283    /// Check if a MERGE-guard key exists, owned by a different vertex than
1284    /// `exclude_vid` — i.e. a concurrent MERGE already created this key.
1285    pub fn has_merge_guard_key(&self, key: &[u8], exclude_vid: Vid) -> bool {
1286        self.merge_guard_index
1287            .get(key)
1288            .is_some_and(|&v| v != exclude_vid)
1289    }
1290
1291    #[instrument(skip(self, other), level = "trace")]
1292    /// Validate that merging `other` into `self` will not bail on a tombstoned
1293    /// edge endpoint (issue #77), **without mutating** either buffer.
1294    ///
1295    /// Mirrors the endpoint-liveness guard in `apply_edge_insertion`
1296    /// against the tombstone state [`Self::merge`] produces: `other`'s vertex
1297    /// deletions are applied and its vertex inserts clear their own tombstone,
1298    /// so an inserted edge bails iff an endpoint is tombstoned in `self` or
1299    /// `other` and is not (re-)inserted by `other`.
1300    ///
1301    /// Run this under `flush_lock` *before* the durable WAL flush so an
1302    /// offending commit is rejected up front. After the flush the transaction
1303    /// is durable, and a `merge` bail would leave a ghost/partial commit whose
1304    /// WAL replay re-bails — rendering the database unopenable.
1305    ///
1306    /// # Errors
1307    ///
1308    /// Returns an error naming the offending edge and endpoint when the merge
1309    /// would bail.
1310    pub fn validate_merge_edge_endpoints(&self, other: &L0Buffer) -> Result<()> {
1311        // An endpoint is effectively deleted after the merge's vertex phase if
1312        // it is tombstoned in either buffer and `other` does not re-insert it
1313        // (an insert clears the tombstone).
1314        let is_deleted = |vid: &Vid| {
1315            (self.vertex_tombstones.contains(vid) || other.vertex_tombstones.contains(vid))
1316                && !other.vertex_properties.contains_key(vid)
1317        };
1318        for (eid, (src_vid, dst_vid, _etype)) in &other.edge_endpoints {
1319            if other.tombstones.contains_key(eid) {
1320                continue; // a deletion, not an insertion — never resurrects a vertex
1321            }
1322            if is_deleted(src_vid) {
1323                anyhow::bail!(
1324                    "Cannot insert edge {}: source vertex {} has been deleted (issue #77)",
1325                    eid,
1326                    src_vid
1327                );
1328            }
1329            if is_deleted(dst_vid) {
1330                anyhow::bail!(
1331                    "Cannot insert edge {}: destination vertex {} has been deleted (issue #77)",
1332                    eid,
1333                    dst_vid
1334                );
1335            }
1336        }
1337        Ok(())
1338    }
1339
1340    pub fn merge(&mut self, other: &L0Buffer) -> Result<()> {
1341        // Validate-then-apply: reject a merge that would bail on a tombstoned
1342        // edge endpoint before mutating anything, so a failed merge can never
1343        // leave a partially-applied (non-atomic) commit.
1344        self.validate_merge_edge_endpoints(other)?;
1345        self.merge_validated(
1346            other,
1347            other.vertex_properties.clone(),
1348            other.edge_properties.clone(),
1349        )
1350    }
1351
1352    /// Commit-path variant of [`merge`](Self::merge) that consumes `other`'s
1353    /// vertex/edge property maps instead of deep-cloning every row.
1354    ///
1355    /// Everything else in `other` (endpoints, tombstones, versions, labels)
1356    /// is left intact — `commit_transaction_l0` still reads those after the
1357    /// merge. The caller must not rely on `other.vertex_properties` /
1358    /// `other.edge_properties` afterwards, which is safe on the commit path
1359    /// because committing consumes the transaction.
1360    pub fn merge_take(&mut self, other: &mut L0Buffer) -> Result<()> {
1361        // Validate BEFORE draining: the endpoint check consults
1362        // `other.vertex_properties` (the "re-inserted by other" exemption).
1363        self.validate_merge_edge_endpoints(other)?;
1364        let vertex_props = std::mem::take(&mut other.vertex_properties);
1365        let edge_props = std::mem::take(&mut other.edge_properties);
1366        self.merge_validated(other, vertex_props, edge_props)
1367    }
1368
1369    /// Shared merge body. `vertex_props` / `edge_props` are `other`'s property
1370    /// maps, passed by value so rows move instead of clone; the caller has
1371    /// already run `validate_merge_edge_endpoints`.
1372    fn merge_validated(
1373        &mut self,
1374        other: &L0Buffer,
1375        vertex_props: HashMap<Vid, Properties>,
1376        mut edge_props: HashMap<Eid, Properties>,
1377    ) -> Result<()> {
1378        trace!(
1379            other_mutation_count = other.mutation_count,
1380            "Merging L0 buffer"
1381        );
1382        // skip_wal=true throughout: the caller (commit_transaction_l0) already
1383        // wrote every one of these mutations to WAL before invoking merge —
1384        // re-appending here would double the WAL volume per commit.
1385        // Merge Vertices
1386        for &vid in &other.vertex_tombstones {
1387            self.delete_vertex_impl(vid, true)?;
1388        }
1389
1390        for (vid, props) in vertex_props {
1391            let labels = other.vertex_labels.get(&vid).cloned().unwrap_or_default();
1392            self.insert_vertex_with_labels_impl(vid, props, &labels, true);
1393        }
1394
1395        // Merge vertex labels that might not have properties
1396        for (vid, labels) in &other.vertex_labels {
1397            if !self.vertex_labels.contains_key(vid) {
1398                self.vertex_labels.insert(*vid, labels.clone());
1399                for label in labels {
1400                    self.label_to_vids
1401                        .entry(label.clone())
1402                        .or_default()
1403                        .insert(*vid);
1404                }
1405            }
1406        }
1407
1408        // Label-overwrite pass: a `SET n:Label` / `REMOVE n:Label` resolved the
1409        // FULL new label set into `other.vertex_labels[vid]` and flagged the vid.
1410        // REPLACE (not append) so removals actually remove and an existing
1411        // vertex's label change lands — overriding any append from the property
1412        // loop above. Skip vids deleted in the same commit. (The append loops
1413        // stay correct for property-path label unions, which are NOT flagged.)
1414        for vid in &other.vertex_label_overwrites {
1415            if other.vertex_tombstones.contains(vid) {
1416                continue;
1417            }
1418            let labels = other.vertex_labels.get(vid).cloned().unwrap_or_default();
1419            self.remove_vid_from_label_index(*vid);
1420            self.vertex_labels.insert(*vid, labels.clone());
1421            self.index_labels_for_vid(*vid, &labels);
1422            // Carry the overwrite flag into the persistent (main) buffer so a
1423            // pure relabel of a prior-window vid — which is absent from
1424            // `vertex_properties` — is still re-derived at flush (M8). The
1425            // flag is cleared when this buffer is rotated out by the flush.
1426            self.vertex_label_overwrites.insert(*vid);
1427        }
1428
1429        // Merge Edges - insert all edges from edge_endpoints, using empty props if none exist
1430        for (eid, (src, dst, etype)) in &other.edge_endpoints {
1431            if other.tombstones.contains_key(eid) {
1432                self.delete_edge_impl(*eid, *src, *dst, *etype, true)?;
1433            } else {
1434                let props = edge_props.remove(eid).unwrap_or_default();
1435                let etype_name = other.edge_types.get(eid).cloned();
1436                self.insert_edge_impl(*src, *dst, *etype, *eid, props, etype_name, true)?;
1437            }
1438        }
1439
1440        // Merge tombstones for edges that only exist in the target buffer (self),
1441        // not in the source buffer's edge_endpoints.  Without this, transaction
1442        // DELETEs of pre-existing edges are silently lost on commit.
1443        for (eid, tombstone) in &other.tombstones {
1444            if !other.edge_endpoints.contains_key(eid) {
1445                self.delete_edge_impl(
1446                    *eid,
1447                    tombstone.src_vid,
1448                    tombstone.dst_vid,
1449                    tombstone.edge_type,
1450                    true,
1451                )?;
1452            }
1453        }
1454
1455        // Edge types are now merged inside insert_edge, so no separate loop needed
1456
1457        // Merge timestamps - preserve semantics of or_insert (keep oldest created_at)
1458        // and insert (use latest updated_at)
1459        for (vid, ts) in &other.vertex_created_at {
1460            self.vertex_created_at.entry(*vid).or_insert(*ts); // keep oldest
1461        }
1462        for (vid, ts) in &other.vertex_updated_at {
1463            self.vertex_updated_at.insert(*vid, *ts); // use latest (tx wins)
1464        }
1465
1466        for (eid, ts) in &other.edge_created_at {
1467            self.edge_created_at.entry(*eid).or_insert(*ts); // keep oldest
1468        }
1469        for (eid, ts) in &other.edge_updated_at {
1470            self.edge_updated_at.insert(*eid, *ts); // use latest (tx wins)
1471        }
1472
1473        // Conservatively add other's estimated size (may overcount due to
1474        // deduplication, but that's safe for a memory limit).
1475        self.estimated_size += other.estimated_size;
1476
1477        // Merge constraint index
1478        for (key, vid) in &other.constraint_index {
1479            self.constraint_index.insert(key.clone(), *vid);
1480        }
1481
1482        // Merge the implicit MERGE-key guard so a committed MERGE-create is
1483        // visible to a concurrent transaction's commit-time re-probe.
1484        for (key, vid) in &other.merge_guard_index {
1485            self.merge_guard_index.insert(key.clone(), *vid);
1486        }
1487
1488        // Carry deferred-embedding markers from the tx L0 into the main L0 so the
1489        // flush-time `drain_pending_embeddings` sees them (the marked vids' properties were
1490        // just merged above). Without this, `defer_embeddings` auto-embed silently no-ops for
1491        // any transactional write — a pre-existing gap that also affects single-vector
1492        // deferral, surfaced while wiring multi-vector auto-embed (issue #104).
1493        for (vid, label) in &other.pending_embeddings {
1494            self.pending_embeddings.insert(*vid, label.clone());
1495        }
1496
1497        Ok(())
1498    }
1499
1500    /// Replay mutations from WAL without re-logging them.
1501    /// Used during startup recovery to restore L0 state from persisted WAL.
1502    /// Uses CRDT merge semantics to ensure recovered state matches pre-crash state.
1503    #[instrument(skip(self, mutations), level = "debug")]
1504    pub fn replay_mutations(&mut self, mutations: Vec<Mutation>) -> Result<()> {
1505        trace!(count = mutations.len(), "Replaying mutations");
1506        for mutation in mutations {
1507            match mutation {
1508                Mutation::InsertVertex {
1509                    vid,
1510                    properties,
1511                    labels,
1512                } => {
1513                    // Apply without WAL logging, with CRDT merge semantics
1514                    self.current_version += 1;
1515                    let version = self.current_version;
1516
1517                    self.vertex_tombstones.remove(&vid);
1518                    let tracks_extid = properties.contains_key("ext_id");
1519                    let entry = self.vertex_properties.entry(vid).or_default();
1520                    let old_extid = if tracks_extid {
1521                        Self::extid_of(entry)
1522                    } else {
1523                        None
1524                    };
1525                    Self::merge_crdt_properties(entry, properties);
1526                    if tracks_extid {
1527                        let new_extid = Self::extid_of(
1528                            self.vertex_properties.get(&vid).expect("just inserted"),
1529                        );
1530                        self.sync_extid_index(vid, old_extid, new_extid);
1531                    }
1532                    self.vertex_versions.insert(vid, version);
1533                    self.graph.add_vertex(vid);
1534                    self.mutation_count += 1;
1535
1536                    // Restore vertex labels from WAL
1537                    let existing = self.vertex_labels.entry(vid).or_default();
1538                    Self::append_unique_labels(existing, &labels);
1539                    for label in &labels {
1540                        self.label_to_vids
1541                            .entry(label.clone())
1542                            .or_default()
1543                            .insert(vid);
1544                    }
1545                }
1546                Mutation::DeleteVertex { vid, labels } => {
1547                    self.current_version += 1;
1548                    // Restore labels BEFORE apply_vertex_deletion
1549                    if !labels.is_empty() {
1550                        let existing = self.vertex_labels.entry(vid).or_default();
1551                        Self::append_unique_labels(existing, &labels);
1552                        for label in &labels {
1553                            self.label_to_vids
1554                                .entry(label.clone())
1555                                .or_default()
1556                                .insert(vid);
1557                        }
1558                    }
1559                    self.apply_vertex_deletion(vid);
1560                }
1561                Mutation::SetVertexLabels { vid, labels } => {
1562                    // REPLACE the vid's full label set (a label-only mutation
1563                    // resolved the complete set). Replace, not append, so a
1564                    // replayed removal removes; clears the old reverse-index
1565                    // entries first.
1566                    self.current_version += 1;
1567                    self.remove_vid_from_label_index(vid);
1568                    self.vertex_labels.insert(vid, labels.clone());
1569                    self.index_labels_for_vid(vid, &labels);
1570                    self.mutation_count += 1;
1571                }
1572                Mutation::InsertEdge {
1573                    src_vid,
1574                    dst_vid,
1575                    edge_type,
1576                    eid,
1577                    version: _,
1578                    properties,
1579                    edge_type_name,
1580                } => {
1581                    self.current_version += 1;
1582                    // Skip-and-warn on the issue-#77 endpoint bail: a pre-fix
1583                    // durable WAL may hold a ghost edge whose endpoint was
1584                    // tombstoned. Recovery must still open the database rather
1585                    // than abort, so drop the offending edge and continue.
1586                    match self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties) {
1587                        Ok(()) => {
1588                            // Restore edge type name metadata if present
1589                            if let Some(name) = edge_type_name {
1590                                self.edge_types.insert(eid, name);
1591                            }
1592                        }
1593                        Err(e) => {
1594                            tracing::warn!(
1595                                ?eid,
1596                                ?src_vid,
1597                                ?dst_vid,
1598                                error = %e,
1599                                "WAL replay: skipping edge insertion to a deleted endpoint (issue #77)"
1600                            );
1601                        }
1602                    }
1603                }
1604                Mutation::DeleteEdge {
1605                    eid,
1606                    src_vid,
1607                    dst_vid,
1608                    edge_type,
1609                    version: _,
1610                } => {
1611                    self.current_version += 1;
1612                    self.apply_edge_deletion(eid, src_vid, dst_vid, edge_type);
1613                }
1614            }
1615        }
1616        Ok(())
1617    }
1618}
1619
1620#[cfg(test)]
1621mod tests {
1622    use super::*;
1623
1624    #[test]
1625    fn test_l0_buffer_ops() -> Result<()> {
1626        let mut l0 = L0Buffer::new(0, None);
1627        let vid_a = Vid::new(1);
1628        let vid_b = Vid::new(2);
1629        let eid_ab = Eid::new(101);
1630
1631        l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1632
1633        let neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1634        assert_eq!(neighbors.len(), 1);
1635        assert_eq!(neighbors[0].0, vid_b);
1636        assert_eq!(neighbors[0].1, eid_ab);
1637
1638        l0.delete_edge(eid_ab, vid_a, vid_b, 1)?;
1639        assert!(l0.is_tombstoned(eid_ab));
1640
1641        // Verify neighbors are empty after deletion
1642        let neighbors_after = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1643        assert_eq!(neighbors_after.len(), 0);
1644
1645        Ok(())
1646    }
1647
1648    /// Regression for review #5: merging an edge whose endpoint is tombstoned
1649    /// in the target buffer must be rejected up front (`validate_merge_edge_endpoints`)
1650    /// and `merge` must be atomic — never partially applied. Before the fix this
1651    /// bailed only *inside* `merge`, after the durable WAL flush, leaving a ghost
1652    /// commit that made the database unopenable on replay.
1653    #[test]
1654    fn validate_merge_rejects_edge_to_tombstoned_endpoint() {
1655        let mut main = L0Buffer::new(0, None);
1656        let vid_a = Vid::new(1);
1657        let vid_b = Vid::new(2);
1658        main.insert_vertex(vid_a, HashMap::new());
1659        main.insert_vertex(vid_b, HashMap::new());
1660        main.delete_vertex(vid_b).unwrap(); // B is now tombstoned in main
1661
1662        // A transaction that inserts an edge A -> B (B tombstoned in main).
1663        let mut tx = L0Buffer::new(0, None);
1664        let eid = Eid::new(101);
1665        tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)
1666            .unwrap();
1667
1668        assert!(
1669            main.validate_merge_edge_endpoints(&tx).is_err(),
1670            "edge to a tombstoned endpoint must be rejected before merge"
1671        );
1672        // merge validates first, so it errors and leaves main untouched (atomic).
1673        assert!(
1674            main.merge(&tx).is_err(),
1675            "merge must reject, not bail mid-apply"
1676        );
1677        assert!(
1678            !main.edge_endpoints.contains_key(&eid),
1679            "a rejected merge must not have partially applied the edge"
1680        );
1681    }
1682
1683    /// When the transaction re-inserts the endpoint vertex, the edge is valid
1684    /// (the insert clears the tombstone) and the merge succeeds.
1685    #[test]
1686    fn validate_merge_allows_edge_when_endpoint_reinserted() {
1687        let mut main = L0Buffer::new(0, None);
1688        let vid_a = Vid::new(1);
1689        let vid_b = Vid::new(2);
1690        main.insert_vertex(vid_a, HashMap::new());
1691        main.insert_vertex(vid_b, HashMap::new());
1692        main.delete_vertex(vid_b).unwrap();
1693
1694        let mut tx = L0Buffer::new(0, None);
1695        tx.insert_vertex(vid_b, HashMap::new()); // re-insert B
1696        let eid = Eid::new(101);
1697        tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)
1698            .unwrap();
1699
1700        assert!(main.validate_merge_edge_endpoints(&tx).is_ok());
1701        assert!(main.merge(&tx).is_ok());
1702        assert!(main.edge_endpoints.contains_key(&eid));
1703    }
1704
1705    /// Edges between live endpoints merge as before — no false positives.
1706    #[test]
1707    fn validate_merge_allows_edge_to_live_endpoints() {
1708        let mut main = L0Buffer::new(0, None);
1709        let vid_a = Vid::new(1);
1710        let vid_b = Vid::new(2);
1711        main.insert_vertex(vid_a, HashMap::new());
1712        main.insert_vertex(vid_b, HashMap::new());
1713
1714        let mut tx = L0Buffer::new(0, None);
1715        let eid = Eid::new(101);
1716        tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)
1717            .unwrap();
1718
1719        assert!(main.validate_merge_edge_endpoints(&tx).is_ok());
1720        assert!(main.merge(&tx).is_ok());
1721        assert!(main.edge_endpoints.contains_key(&eid));
1722    }
1723
1724    #[test]
1725    fn test_l0_buffer_multiple_edges() -> Result<()> {
1726        let mut l0 = L0Buffer::new(0, None);
1727        let vid_a = Vid::new(1);
1728        let vid_b = Vid::new(2);
1729        let vid_c = Vid::new(3);
1730        let eid_ab = Eid::new(101);
1731        let eid_ac = Eid::new(102);
1732
1733        l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1734        l0.insert_edge(vid_a, vid_c, 1, eid_ac, HashMap::new(), None)?;
1735
1736        let neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1737        assert_eq!(neighbors.len(), 2);
1738
1739        // Delete one edge
1740        l0.delete_edge(eid_ab, vid_a, vid_b, 1)?;
1741
1742        // Should still have one neighbor
1743        let neighbors_after = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1744        assert_eq!(neighbors_after.len(), 1);
1745        assert_eq!(neighbors_after[0].0, vid_c);
1746
1747        Ok(())
1748    }
1749
1750    #[test]
1751    fn test_l0_buffer_edge_type_filter() -> Result<()> {
1752        let mut l0 = L0Buffer::new(0, None);
1753        let vid_a = Vid::new(1);
1754        let vid_b = Vid::new(2);
1755        let vid_c = Vid::new(3);
1756        let eid_ab = Eid::new(101);
1757        let eid_ac = Eid::new(201); // Different edge type
1758
1759        l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1760        l0.insert_edge(vid_a, vid_c, 2, eid_ac, HashMap::new(), None)?;
1761
1762        // Filter by edge type 1
1763        let type1_neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1764        assert_eq!(type1_neighbors.len(), 1);
1765        assert_eq!(type1_neighbors[0].0, vid_b);
1766
1767        // Filter by edge type 2
1768        let type2_neighbors = l0.get_neighbors(vid_a, 2, Direction::Outgoing);
1769        assert_eq!(type2_neighbors.len(), 1);
1770        assert_eq!(type2_neighbors[0].0, vid_c);
1771
1772        Ok(())
1773    }
1774
1775    #[test]
1776    fn test_l0_buffer_incoming_edges() -> Result<()> {
1777        let mut l0 = L0Buffer::new(0, None);
1778        let vid_a = Vid::new(1);
1779        let vid_b = Vid::new(2);
1780        let vid_c = Vid::new(3);
1781        let eid_ab = Eid::new(101);
1782        let eid_cb = Eid::new(102);
1783
1784        // a -> b and c -> b
1785        l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1786        l0.insert_edge(vid_c, vid_b, 1, eid_cb, HashMap::new(), None)?;
1787
1788        // Check incoming edges to b
1789        let incoming = l0.get_neighbors(vid_b, 1, Direction::Incoming);
1790        assert_eq!(incoming.len(), 2);
1791
1792        Ok(())
1793    }
1794
1795    /// Regression test: merge should preserve edges without properties
1796    #[test]
1797    fn test_merge_empty_props_edge() -> Result<()> {
1798        let mut main_l0 = L0Buffer::new(0, None);
1799        let mut tx_l0 = L0Buffer::new(0, None);
1800
1801        let vid_a = Vid::new(1);
1802        let vid_b = Vid::new(2);
1803        let eid_ab = Eid::new(101);
1804
1805        // Insert edge with empty properties in transaction L0
1806        tx_l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1807
1808        // Verify edge exists in tx_l0
1809        assert!(tx_l0.edge_endpoints.contains_key(&eid_ab));
1810        assert!(!tx_l0.edge_properties.contains_key(&eid_ab)); // No properties entry
1811
1812        // Merge into main L0
1813        main_l0.merge(&tx_l0)?;
1814
1815        // Edge should exist in main L0 after merge
1816        assert!(main_l0.edge_endpoints.contains_key(&eid_ab));
1817        let neighbors = main_l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1818        assert_eq!(neighbors.len(), 1);
1819        assert_eq!(neighbors[0].0, vid_b);
1820
1821        Ok(())
1822    }
1823
1824    /// Regression test: WAL replay should use CRDT merge semantics
1825    #[test]
1826    fn test_replay_crdt_merge() -> Result<()> {
1827        use crate::runtime::wal::Mutation;
1828        use serde_json::json;
1829        use uni_common::Value;
1830
1831        let mut l0 = L0Buffer::new(0, None);
1832        let vid = Vid::new(1);
1833
1834        // Create GCounter CRDT values using correct serde format:
1835        // {"t": "gc", "d": {"counts": {...}}}
1836        let counter1: Value = json!({
1837            "t": "gc",
1838            "d": {"counts": {"node1": 5}}
1839        })
1840        .into();
1841        let counter2: Value = json!({
1842            "t": "gc",
1843            "d": {"counts": {"node2": 3}}
1844        })
1845        .into();
1846
1847        // First mutation: insert vertex with counter1
1848        let mut props1 = HashMap::new();
1849        props1.insert("counter".to_string(), counter1.clone());
1850        l0.replay_mutations(vec![Mutation::InsertVertex {
1851            vid,
1852            properties: props1,
1853            labels: vec![],
1854        }])?;
1855
1856        // Second mutation: insert same vertex with counter2 (should merge)
1857        let mut props2 = HashMap::new();
1858        props2.insert("counter".to_string(), counter2.clone());
1859        l0.replay_mutations(vec![Mutation::InsertVertex {
1860            vid,
1861            properties: props2,
1862            labels: vec![],
1863        }])?;
1864
1865        // Verify CRDT was merged (both node1 and node2 counts present)
1866        let stored_props = l0.vertex_properties.get(&vid).unwrap();
1867        let stored_counter = stored_props.get("counter").unwrap();
1868
1869        // Convert back to serde_json::Value for nested access
1870        let stored_json: serde_json::Value = stored_counter.clone().into();
1871        // The merged counter should have both node1: 5 and node2: 3
1872        let data = stored_json.get("d").unwrap();
1873        let counts = data.get("counts").unwrap();
1874        assert_eq!(counts.get("node1"), Some(&json!(5)));
1875        assert_eq!(counts.get("node2"), Some(&json!(3)));
1876
1877        Ok(())
1878    }
1879
1880    #[test]
1881    fn test_merge_preserves_vertex_timestamps() -> Result<()> {
1882        let mut l0_main = L0Buffer::new(0, None);
1883        let mut l0_tx = L0Buffer::new(0, None);
1884        let vid = Vid::new(1);
1885
1886        // Main buffer: insert vertex with timestamp T1
1887        let ts_main_created = 1000;
1888        let ts_main_updated = 1100;
1889        l0_main.insert_vertex(vid, HashMap::new());
1890        l0_main.vertex_created_at.insert(vid, ts_main_created);
1891        l0_main.vertex_updated_at.insert(vid, ts_main_updated);
1892
1893        // Transaction buffer: update same vertex with timestamp T2 (later)
1894        let ts_tx_created = 2000; // should be ignored (main has older created_at)
1895        let ts_tx_updated = 2100; // should win (tx has newer updated_at)
1896        l0_tx.insert_vertex(vid, HashMap::new());
1897        l0_tx.vertex_created_at.insert(vid, ts_tx_created);
1898        l0_tx.vertex_updated_at.insert(vid, ts_tx_updated);
1899
1900        // Merge transaction into main
1901        l0_main.merge(&l0_tx)?;
1902
1903        // Verify created_at is oldest (from main)
1904        assert_eq!(
1905            *l0_main.vertex_created_at.get(&vid).unwrap(),
1906            ts_main_created,
1907            "created_at should preserve oldest timestamp"
1908        );
1909
1910        // Verify updated_at is latest (from tx)
1911        assert_eq!(
1912            *l0_main.vertex_updated_at.get(&vid).unwrap(),
1913            ts_tx_updated,
1914            "updated_at should use latest timestamp"
1915        );
1916
1917        Ok(())
1918    }
1919
1920    #[test]
1921    fn test_merge_preserves_edge_timestamps() -> Result<()> {
1922        let mut l0_main = L0Buffer::new(0, None);
1923        let mut l0_tx = L0Buffer::new(0, None);
1924        let vid_a = Vid::new(1);
1925        let vid_b = Vid::new(2);
1926        let eid = Eid::new(100);
1927
1928        // Main buffer: insert edge with timestamp T1
1929        let ts_main_created = 1000;
1930        let ts_main_updated = 1100;
1931        l0_main.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)?;
1932        l0_main.edge_created_at.insert(eid, ts_main_created);
1933        l0_main.edge_updated_at.insert(eid, ts_main_updated);
1934
1935        // Transaction buffer: update same edge with timestamp T2 (later)
1936        let ts_tx_created = 2000; // should be ignored
1937        let ts_tx_updated = 2100; // should win
1938        l0_tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)?;
1939        l0_tx.edge_created_at.insert(eid, ts_tx_created);
1940        l0_tx.edge_updated_at.insert(eid, ts_tx_updated);
1941
1942        // Merge transaction into main
1943        l0_main.merge(&l0_tx)?;
1944
1945        // Verify created_at is oldest (from main)
1946        assert_eq!(
1947            *l0_main.edge_created_at.get(&eid).unwrap(),
1948            ts_main_created,
1949            "edge created_at should preserve oldest timestamp"
1950        );
1951
1952        // Verify updated_at is latest (from tx)
1953        assert_eq!(
1954            *l0_main.edge_updated_at.get(&eid).unwrap(),
1955            ts_tx_updated,
1956            "edge updated_at should use latest timestamp"
1957        );
1958
1959        Ok(())
1960    }
1961
1962    #[test]
1963    fn test_merge_created_at_not_overwritten_for_existing_vertex() -> Result<()> {
1964        use uni_common::Value;
1965
1966        let mut l0_main = L0Buffer::new(0, None);
1967        let mut l0_tx = L0Buffer::new(0, None);
1968        let vid = Vid::new(1);
1969
1970        // Main buffer: vertex created at T1
1971        let ts_original = 1000;
1972        l0_main.insert_vertex(vid, HashMap::new());
1973        l0_main.vertex_created_at.insert(vid, ts_original);
1974        l0_main.vertex_updated_at.insert(vid, ts_original);
1975
1976        // Transaction buffer: update vertex (created_at would be T2 if set)
1977        let ts_tx = 2000;
1978        let mut props = HashMap::new();
1979        props.insert("updated".to_string(), Value::String("yes".to_string()));
1980        l0_tx.insert_vertex(vid, props);
1981        l0_tx.vertex_created_at.insert(vid, ts_tx);
1982        l0_tx.vertex_updated_at.insert(vid, ts_tx);
1983
1984        // Merge transaction into main
1985        l0_main.merge(&l0_tx)?;
1986
1987        // Verify created_at was NOT overwritten (still T1, not T2)
1988        assert_eq!(
1989            *l0_main.vertex_created_at.get(&vid).unwrap(),
1990            ts_original,
1991            "created_at must not be overwritten for existing vertex"
1992        );
1993
1994        // Verify updated_at WAS updated (now T2)
1995        assert_eq!(
1996            *l0_main.vertex_updated_at.get(&vid).unwrap(),
1997            ts_tx,
1998            "updated_at should reflect transaction timestamp"
1999        );
2000
2001        // Verify properties were merged
2002        assert!(
2003            l0_main
2004                .vertex_properties
2005                .get(&vid)
2006                .unwrap()
2007                .contains_key("updated")
2008        );
2009
2010        Ok(())
2011    }
2012
2013    /// Test for Issue #23: Vertex labels preserved through replay_mutations
2014    #[test]
2015    fn test_replay_mutations_preserves_vertex_labels() -> Result<()> {
2016        use crate::runtime::wal::Mutation;
2017
2018        let mut l0 = L0Buffer::new(0, None);
2019        let vid = Vid::new(42);
2020
2021        // Create InsertVertex mutation with labels
2022        let mutations = vec![Mutation::InsertVertex {
2023            vid,
2024            properties: {
2025                let mut props = HashMap::new();
2026                props.insert(
2027                    "name".to_string(),
2028                    uni_common::Value::String("Alice".to_string()),
2029                );
2030                props
2031            },
2032            labels: vec!["Person".to_string(), "User".to_string()],
2033        }];
2034
2035        // Replay mutations
2036        l0.replay_mutations(mutations)?;
2037
2038        // Verify vertex exists in L0
2039        assert!(l0.vertex_properties.contains_key(&vid));
2040
2041        // Verify labels are preserved
2042        let labels = l0.get_vertex_labels(vid).expect("Labels should exist");
2043        assert_eq!(labels.len(), 2);
2044        assert!(labels.contains(&"Person".to_string()));
2045        assert!(labels.contains(&"User".to_string()));
2046
2047        // Verify vertex is findable by label
2048        let person_vids = l0.vids_for_label("Person");
2049        assert_eq!(person_vids.len(), 1);
2050        assert_eq!(person_vids[0], vid);
2051
2052        let user_vids = l0.vids_for_label("User");
2053        assert_eq!(user_vids.len(), 1);
2054        assert_eq!(user_vids[0], vid);
2055
2056        Ok(())
2057    }
2058
2059    /// Test for Issue #23: DeleteVertex labels preserved for tombstone flushing
2060    #[test]
2061    fn test_replay_mutations_preserves_delete_vertex_labels() -> Result<()> {
2062        use crate::runtime::wal::Mutation;
2063
2064        let mut l0 = L0Buffer::new(0, None);
2065        let vid = Vid::new(99);
2066
2067        // First insert vertex with labels
2068        l0.insert_vertex_with_labels(
2069            vid,
2070            HashMap::new(),
2071            &["Person".to_string(), "Admin".to_string()],
2072        );
2073
2074        // Verify vertex and labels exist
2075        assert!(l0.vertex_properties.contains_key(&vid));
2076        let labels = l0.get_vertex_labels(vid).expect("Labels should exist");
2077        assert_eq!(labels.len(), 2);
2078
2079        // Create DeleteVertex mutation with labels
2080        let mutations = vec![Mutation::DeleteVertex {
2081            vid,
2082            labels: vec!["Person".to_string(), "Admin".to_string()],
2083        }];
2084
2085        // Replay deletion
2086        l0.replay_mutations(mutations)?;
2087
2088        // Verify vertex is tombstoned
2089        assert!(l0.vertex_tombstones.contains(&vid));
2090
2091        // Verify labels are preserved in L0 (needed for Issue #76 tombstone flushing)
2092        // The labels should still be accessible for the flush logic to know which tables to update
2093        let labels = l0.get_vertex_labels(vid);
2094        assert!(
2095            labels.is_some(),
2096            "Labels should be preserved even after deletion for tombstone flushing"
2097        );
2098
2099        Ok(())
2100    }
2101
2102    /// Test for Issue #28: Edge type name preserved through replay_mutations
2103    #[test]
2104    fn test_replay_mutations_preserves_edge_type_name() -> Result<()> {
2105        use crate::runtime::wal::Mutation;
2106
2107        let mut l0 = L0Buffer::new(0, None);
2108        let src = Vid::new(1);
2109        let dst = Vid::new(2);
2110        let eid = Eid::new(500);
2111        let edge_type = 100;
2112
2113        // Create InsertEdge mutation with edge_type_name
2114        let mutations = vec![Mutation::InsertEdge {
2115            src_vid: src,
2116            dst_vid: dst,
2117            edge_type,
2118            eid,
2119            version: 1,
2120            properties: {
2121                let mut props = HashMap::new();
2122                props.insert("since".to_string(), uni_common::Value::Int(2020));
2123                props
2124            },
2125            edge_type_name: Some("KNOWS".to_string()),
2126        }];
2127
2128        // Replay mutations
2129        l0.replay_mutations(mutations)?;
2130
2131        // Verify edge exists in L0
2132        assert!(l0.edge_endpoints.contains_key(&eid));
2133
2134        // Verify edge type name is preserved
2135        let type_name = l0.get_edge_type(eid).expect("Edge type name should exist");
2136        assert_eq!(type_name, "KNOWS");
2137
2138        // Verify edge is findable by type name
2139        let knows_eids = l0.eids_for_type("KNOWS");
2140        assert_eq!(knows_eids.len(), 1);
2141        assert_eq!(knows_eids[0], eid);
2142
2143        Ok(())
2144    }
2145
2146    /// Test for Issue #28: Edge type mapping survives multiple replay cycles
2147    #[test]
2148    fn test_edge_type_mapping_survives_multiple_replays() -> Result<()> {
2149        use crate::runtime::wal::Mutation;
2150
2151        let mut l0 = L0Buffer::new(0, None);
2152
2153        // Replay multiple edge insertions with different types
2154        let mutations = vec![
2155            Mutation::InsertEdge {
2156                src_vid: Vid::new(1),
2157                dst_vid: Vid::new(2),
2158                edge_type: 100,
2159                eid: Eid::new(1000),
2160                version: 1,
2161                properties: HashMap::new(),
2162                edge_type_name: Some("KNOWS".to_string()),
2163            },
2164            Mutation::InsertEdge {
2165                src_vid: Vid::new(2),
2166                dst_vid: Vid::new(3),
2167                edge_type: 101,
2168                eid: Eid::new(1001),
2169                version: 2,
2170                properties: HashMap::new(),
2171                edge_type_name: Some("LIKES".to_string()),
2172            },
2173            Mutation::InsertEdge {
2174                src_vid: Vid::new(3),
2175                dst_vid: Vid::new(1),
2176                edge_type: 100,
2177                eid: Eid::new(1002),
2178                version: 3,
2179                properties: HashMap::new(),
2180                edge_type_name: Some("KNOWS".to_string()),
2181            },
2182        ];
2183
2184        l0.replay_mutations(mutations)?;
2185
2186        // Verify all edge type mappings are preserved
2187        assert_eq!(l0.get_edge_type(Eid::new(1000)), Some("KNOWS"));
2188        assert_eq!(l0.get_edge_type(Eid::new(1001)), Some("LIKES"));
2189        assert_eq!(l0.get_edge_type(Eid::new(1002)), Some("KNOWS"));
2190
2191        // Verify edges can be queried by type
2192        let knows_edges = l0.eids_for_type("KNOWS");
2193        assert_eq!(knows_edges.len(), 2);
2194        assert!(knows_edges.contains(&Eid::new(1000)));
2195        assert!(knows_edges.contains(&Eid::new(1002)));
2196
2197        let likes_edges = l0.eids_for_type("LIKES");
2198        assert_eq!(likes_edges.len(), 1);
2199        assert_eq!(likes_edges[0], Eid::new(1001));
2200
2201        Ok(())
2202    }
2203
2204    /// Test for Issue #23 + #28: Combined vertex labels and edge types in replay
2205    #[test]
2206    fn test_replay_mutations_combined_labels_and_edge_types() -> Result<()> {
2207        use crate::runtime::wal::Mutation;
2208
2209        let mut l0 = L0Buffer::new(0, None);
2210        let alice = Vid::new(1);
2211        let bob = Vid::new(2);
2212        let eid = Eid::new(100);
2213
2214        // Simulate crash recovery scenario: replay full transaction log
2215        let mutations = vec![
2216            // Insert Alice with Person label
2217            Mutation::InsertVertex {
2218                vid: alice,
2219                properties: {
2220                    let mut props = HashMap::new();
2221                    props.insert(
2222                        "name".to_string(),
2223                        uni_common::Value::String("Alice".to_string()),
2224                    );
2225                    props
2226                },
2227                labels: vec!["Person".to_string()],
2228            },
2229            // Insert Bob with Person label
2230            Mutation::InsertVertex {
2231                vid: bob,
2232                properties: {
2233                    let mut props = HashMap::new();
2234                    props.insert(
2235                        "name".to_string(),
2236                        uni_common::Value::String("Bob".to_string()),
2237                    );
2238                    props
2239                },
2240                labels: vec!["Person".to_string()],
2241            },
2242            // Create KNOWS edge between them
2243            Mutation::InsertEdge {
2244                src_vid: alice,
2245                dst_vid: bob,
2246                edge_type: 1,
2247                eid,
2248                version: 3,
2249                properties: HashMap::new(),
2250                edge_type_name: Some("KNOWS".to_string()),
2251            },
2252        ];
2253
2254        // Replay all mutations
2255        l0.replay_mutations(mutations)?;
2256
2257        // Verify vertex labels preserved
2258        assert_eq!(l0.get_vertex_labels(alice).unwrap().len(), 1);
2259        assert_eq!(l0.get_vertex_labels(bob).unwrap().len(), 1);
2260        assert_eq!(l0.vids_for_label("Person").len(), 2);
2261
2262        // Verify edge type name preserved
2263        assert_eq!(l0.get_edge_type(eid).unwrap(), "KNOWS");
2264        assert_eq!(l0.eids_for_type("KNOWS").len(), 1);
2265
2266        // Verify graph structure
2267        let alice_neighbors = l0.get_neighbors(alice, 1, Direction::Outgoing);
2268        assert_eq!(alice_neighbors.len(), 1);
2269        assert_eq!(alice_neighbors[0].0, bob);
2270
2271        Ok(())
2272    }
2273
2274    /// Test for Issue #23: Empty labels should deserialize correctly (backward compat)
2275    #[test]
2276    fn test_replay_mutations_backward_compat_empty_labels() -> Result<()> {
2277        use crate::runtime::wal::Mutation;
2278
2279        let mut l0 = L0Buffer::new(0, None);
2280        let vid = Vid::new(1);
2281
2282        // Simulate old WAL format: InsertVertex with empty labels
2283        // (This tests #[serde(default)] behavior)
2284        let mutations = vec![Mutation::InsertVertex {
2285            vid,
2286            properties: HashMap::new(),
2287            labels: vec![], // Empty labels (old format compatibility)
2288        }];
2289
2290        l0.replay_mutations(mutations)?;
2291
2292        // Vertex should exist
2293        assert!(l0.vertex_properties.contains_key(&vid));
2294
2295        // Labels should be empty but entry should exist in vertex_labels
2296        let labels = l0.get_vertex_labels(vid);
2297        assert!(labels.is_some(), "Labels entry should exist even if empty");
2298        assert_eq!(labels.unwrap().len(), 0);
2299
2300        Ok(())
2301    }
2302
2303    #[test]
2304    fn test_now_nanos_returns_nanosecond_range() {
2305        // Test that now_nanos() returns a value in nanosecond range
2306        // As of 2025, Unix timestamp in nanoseconds should be > 1.7e18
2307        // (2025-01-01 is approximately 1,735,689,600 seconds = 1.735e18 nanoseconds)
2308        let now = now_nanos();
2309
2310        // Verify it's in nanosecond range (not microseconds which would be 1000x smaller)
2311        assert!(
2312            now > 1_700_000_000_000_000_000,
2313            "now_nanos() returned {}, expected > 1.7e18 for nanoseconds",
2314            now
2315        );
2316
2317        // Sanity check: should also be less than year 2100 in nanoseconds (4.1e18)
2318        assert!(
2319            now < 4_100_000_000_000_000_000,
2320            "now_nanos() returned {}, expected < 4.1e18",
2321            now
2322        );
2323    }
2324}