Skip to main content

uni_store/runtime/
l0.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::runtime::wal::{Mutation, WriteAheadLog};
5use anyhow::Result;
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use std::time::{SystemTime, UNIX_EPOCH};
9use tracing::{instrument, trace};
10use uni_common::core::id::{Eid, Vid};
11use uni_common::graph::simple_graph::{Direction, SimpleGraph};
12use uni_common::{Properties, Value};
13use uni_crdt::Crdt;
14
15/// Items a read-write transaction observed during execution, used for SSI
16/// read-write antidependency detection at commit.
17///
18/// Shared (via `Arc<Mutex<_>>`) between the read path — which records reads
19/// through the transaction's `QueryContext` — and the commit path, which checks
20/// it against concurrently-committed write-sets. Item-level granularity;
21/// phantoms are out of scope (handled by the `FOR UPDATE` escape hatch).
22#[derive(Debug, Default)]
23pub struct OccReadSet {
24    /// Vertices the transaction read.
25    pub vertices: HashSet<Vid>,
26    /// Edges the transaction read.
27    pub edges: HashSet<Eid>,
28}
29
30impl OccReadSet {
31    /// `true` when nothing has been read yet. Used to decide whether a `FOR
32    /// UPDATE` acquisition may safely re-pin a still-fresh transaction.
33    pub fn is_empty(&self) -> bool {
34        self.vertices.is_empty() && self.edges.is_empty()
35    }
36}
37
/// Returns the current wall-clock time in nanoseconds since the Unix epoch.
///
/// The `u128` nanosecond count is converted with `i64::try_from` and
/// saturates at `i64::MAX`. A plain `as` cast would silently truncate to the
/// low 64 bits and go negative around the year 2262. If the system clock
/// reads earlier than the epoch, `0` is returned.
fn now_nanos() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| i64::try_from(elapsed.as_nanos()).unwrap_or(i64::MAX))
        .unwrap_or(0)
}
45
46/// Returns the [`Crdt`] a property value encodes, or `None` if it is not one.
47///
48/// `Crdt` is `#[serde(tag = "t", content = "d")]`, so it deserializes only from a
49/// JSON object, and only `Value::Map(_)` produces one — gating on `Map` avoids
50/// allocating a JSON tree for large non-map values (e.g. embedding columns).
51///
52/// This is the single source of truth for "is this value CRDT-mergeable": both
53/// the commit-time merge ([`L0Buffer::merge_crdt_properties`]) and the OCC
54/// write-set carve-out ([`crate::runtime::occ::WriteSet::from_l0`]) consult it,
55/// so the carve-out can never exclude an item the merge would actually overwrite
56/// (which would silently lose an update).
57pub(crate) fn try_as_crdt(v: &Value) -> Option<Crdt> {
58    if !matches!(v, Value::Map(_)) {
59        return None;
60    }
61    serde_json::from_value::<Crdt>(v.clone().into()).ok()
62}
63
64/// Serialize a constraint key for O(1) uniqueness checks.
65/// Format: label + separator + sorted (prop_name, value) pairs.
66pub fn serialize_constraint_key(label: &str, key_values: &[(String, Value)]) -> Vec<u8> {
67    let mut buf = label.as_bytes().to_vec();
68    buf.push(0); // separator
69    let mut sorted = key_values.to_vec();
70    sorted.sort_by(|a, b| a.0.cmp(&b.0));
71    for (k, v) in &sorted {
72        buf.extend(k.as_bytes());
73        buf.push(0);
74        // Use serde_json serialization for deterministic value encoding
75        buf.extend(serde_json::to_vec(v).unwrap_or_default());
76        buf.push(0);
77    }
78    buf
79}
80
/// Per-category mutation counters accumulated by the L0 buffer.
///
/// They back the detailed statistics (`nodes_created`,
/// `relationships_deleted`, …) reported on `ExecuteResult`. Callers take one
/// snapshot before execution and one after, then call
/// [`diff()`](MutationStats::diff) to obtain the per-statement delta.
#[derive(Debug, Clone, Default)]
pub struct MutationStats {
    /// Vertices written through the vertex-insert paths.
    pub nodes_created: usize,
    /// Vertices deleted.
    pub nodes_deleted: usize,
    /// Edges created.
    pub relationships_created: usize,
    /// Edges deleted.
    pub relationships_deleted: usize,
    /// Individual property assignments.
    pub properties_set: usize,
    /// Individual property removals.
    pub properties_removed: usize,
    /// Labels attached to vertices.
    pub labels_added: usize,
    /// Labels detached from vertices.
    pub labels_removed: usize,
}

impl MutationStats {
    /// Returns `self - before`, computed field by field.
    ///
    /// Each field uses saturating subtraction, so a field that is smaller in
    /// `self` than in `before` yields `0` instead of underflowing.
    pub fn diff(&self, before: &Self) -> Self {
        // Exhaustive destructure: adding a counter without updating `diff`
        // becomes a compile error instead of a silently-zero delta.
        let Self {
            nodes_created,
            nodes_deleted,
            relationships_created,
            relationships_deleted,
            properties_set,
            properties_removed,
            labels_added,
            labels_removed,
        } = *self;

        Self {
            nodes_created: nodes_created.saturating_sub(before.nodes_created),
            nodes_deleted: nodes_deleted.saturating_sub(before.nodes_deleted),
            relationships_created: relationships_created
                .saturating_sub(before.relationships_created),
            relationships_deleted: relationships_deleted
                .saturating_sub(before.relationships_deleted),
            properties_set: properties_set.saturating_sub(before.properties_set),
            properties_removed: properties_removed.saturating_sub(before.properties_removed),
            labels_added: labels_added.saturating_sub(before.labels_added),
            labels_removed: labels_removed.saturating_sub(before.labels_removed),
        }
    }
}
119
120#[derive(Clone, Debug)]
121pub struct TombstoneEntry {
122    pub eid: Eid,
123    pub src_vid: Vid,
124    pub dst_vid: Vid,
125    pub edge_type: u32,
126}
127
/// In-memory "L0" write buffer holding recent graph mutations until the buffer
/// is rotated out and flushed.
///
/// Stores topology (`graph`), per-vertex/edge properties, labels and edge
/// types, soft-delete tombstones, per-entity versions and timestamps, and
/// several secondary indexes (reverse label index, constraint keys, MERGE
/// guards, `ext_id`). The same type serves as the shared main L0 and as a
/// transaction's private buffer; the OCC fields (`occ_read_seq`,
/// `occ_read_set`) and `vertex_label_overwrites` are only meaningful on the
/// latter.
pub struct L0Buffer {
    /// Graph topology using simple adjacency lists
    pub graph: SimpleGraph,
    /// Soft-deleted edges (tombstones for LSM-style merging)
    pub tombstones: HashMap<Eid, TombstoneEntry>,
    /// Soft-deleted vertices
    pub vertex_tombstones: HashSet<Vid>,
    /// Edge version tracking for MVCC
    pub edge_versions: HashMap<Eid, u64>,
    /// Vertex version tracking for MVCC
    pub vertex_versions: HashMap<Vid, u64>,
    /// Edge properties (stored separately from topology)
    pub edge_properties: HashMap<Eid, Properties>,
    /// Vertex properties (stored separately from topology)
    pub vertex_properties: HashMap<Vid, Properties>,
    /// Edge endpoint lookup: eid -> (src, dst, type)
    pub edge_endpoints: HashMap<Eid, (Vid, Vid, u32)>,
    /// Vertex labels (VID -> list of label names). A vertex can carry
    /// multiple labels. Vertex inserts always create an entry (possibly
    /// empty), so an unlabeled vertex is distinguishable from "not in L0".
    pub vertex_labels: HashMap<Vid, Vec<String>>,
    /// Reverse index: label name → set of VIDs with that label. Maintained
    /// alongside `vertex_labels` for O(1) label-based vertex lookups.
    pub label_to_vids: HashMap<String, HashSet<Vid>>,
    /// Vids whose FULL label set was explicitly replaced by a label mutation
    /// (`SET n:Label` / `REMOVE n:Label`) in this buffer, via
    /// [`L0Buffer::set_vertex_labels`]. Distinguishes a deliberate label
    /// replacement from the empty `vertex_labels` entry a property-only write
    /// incidentally creates (`entry().or_default()`), so `merge` knows to REPLACE
    /// (not append) these vids' labels and `WriteSet::from_l0` knows they are
    /// conflictable writes. A transaction-buffer concept; empty on main L0.
    pub vertex_label_overwrites: HashSet<Vid>,
    /// Edge types (EID -> type name)
    pub edge_types: HashMap<Eid, String>,
    /// Current version counter; bumped by each mutation before its version is stamped.
    pub current_version: u64,
    /// Mutation count for flush decisions
    pub mutation_count: usize,
    /// Per-type mutation counters for detailed statistics.
    pub mutation_stats: MutationStats,
    /// Write-ahead log for durability (`None` for forked/cloned buffers).
    pub wal: Option<Arc<WriteAheadLog>>,
    /// WAL LSN at the time this L0 was rotated for flush.
    /// Used to ensure WAL truncation doesn't remove entries needed by pending flushes.
    pub wal_lsn_at_flush: u64,
    /// Vertex creation timestamps (nanoseconds since epoch)
    pub vertex_created_at: HashMap<Vid, i64>,
    /// Vertex update timestamps (nanoseconds since epoch)
    pub vertex_updated_at: HashMap<Vid, i64>,
    /// Edge creation timestamps (nanoseconds since epoch)
    pub edge_created_at: HashMap<Eid, i64>,
    /// Edge update timestamps (nanoseconds since epoch)
    pub edge_updated_at: HashMap<Eid, i64>,
    /// Estimated size in bytes for memory limit enforcement.
    /// Incremented O(1) on each mutation to avoid O(V+E) size_bytes() calls.
    pub estimated_size: usize,
    /// Per-constraint index for O(1) unique key checks.
    /// Key: constraint composite key (label + sorted property values serialized).
    /// Value: Vid that owns this key.
    pub constraint_index: HashMap<Vec<u8>, Vid>,
    /// Implicit MERGE-key guard for phantom-free `MERGE` *without* a declared
    /// `UNIQUE` constraint. Same key format as `constraint_index` (built by
    /// [`serialize_constraint_key`]), but populated only by a `MERGE` that
    /// *creates* a node, and re-probed at commit only against other concurrent
    /// `MERGE`-creates — so two concurrent `MERGE`s of the same key converge to
    /// one node (the loser aborts retriably) instead of silently duplicating,
    /// while a plain `CREATE` of the same properties is unaffected (it never
    /// registers a key). Tombstoned with the owning vid; transient (not rebuilt
    /// on recovery).
    pub merge_guard_index: HashMap<Vec<u8>, Vid>,
    /// Reverse index `ext_id` → owning vid for O(1) global ext_id uniqueness
    /// checks (`Writer::check_extid_globally_unique` previously scanned every
    /// `vertex_properties` map per insert — O(n²) ingest). Maintained by the
    /// vertex insert impls (synced to the post-CRDT-merge value) and by
    /// `apply_vertex_deletion`, so merge and WAL replay keep it consistent
    /// for free.
    pub extid_index: HashMap<String, Vid>,
    /// Per-VID set of property keys that should land via Lance MergeInsert
    /// (partial-column update) at flush time. Populated by
    /// `insert_vertex_partial` and `insert_vertex_partial_full`; cleared by
    /// full-row inserts and deletes.
    /// A VID present here at flush time is emitted to the partial batch;
    /// absent VIDs flush via the existing full-row Append.
    pub vertex_partial_keys: HashMap<Vid, HashSet<String>>,
    /// Edge analog of `vertex_partial_keys` (Round 12 §A). Populated by
    /// `insert_edge_partial_full`; cleared by full-row inserts and edge
    /// deletes. Per-edge-type delta-table flush honors these by emitting
    /// a `MergeInsertBuilder` source with only the touched schema
    /// columns plus `eid`, `op`, `_version`, `_updated_at`, and
    /// `overflow_json` (when an overflow prop was touched).
    pub edge_partial_keys: HashMap<Eid, HashSet<String>>,
    /// Phase B (UniConfig::defer_embeddings): VIDs whose auto-embedding
    /// was skipped at insert time and is owed at flush. Value = primary
    /// label name (the rest of the embedding config is looked up from the
    /// schema at flush time). Drained by `flush_stream_l1` before column
    /// extraction; entries are removed when the embedding lands in the
    /// vertex's L0 property map.
    pub pending_embeddings: HashMap<Vid, String>,
    /// Optimistic-concurrency read sequence (SSI). Stamped on a transaction's
    /// private L0 at creation with the Writer's commit-sequence at that moment,
    /// and consulted at commit to detect intervening conflicting commits. `0`
    /// for the main L0 and when SSI is disabled.
    pub occ_read_seq: u64,
    /// Optimistic-concurrency read-set (SSI). `Some` on a read-write
    /// transaction's private L0 when SSI tracking is active; the read path
    /// records observed ids here and commit checks them for antidependencies.
    /// `None` for the main L0 and read-only / SSI-disabled paths.
    pub occ_read_set: Option<Arc<parking_lot::Mutex<OccReadSet>>>,
}
235
236impl std::fmt::Debug for L0Buffer {
237    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
238        f.debug_struct("L0Buffer")
239            .field("vertex_count", &self.graph.vertex_count())
240            .field("edge_count", &self.graph.edge_count())
241            .field("tombstones", &self.tombstones.len())
242            .field("vertex_tombstones", &self.vertex_tombstones.len())
243            .field("current_version", &self.current_version)
244            .field("mutation_count", &self.mutation_count)
245            .finish()
246    }
247}
248
249impl Clone for L0Buffer {
250    /// Clone the L0 buffer for fork/restore (ASSUME/ABDUCE).
251    ///
252    /// The cloned buffer does NOT share the WAL reference — forked L0s are
253    /// ephemeral and should not write to the WAL.
254    fn clone(&self) -> Self {
255        Self {
256            graph: self.graph.clone(),
257            tombstones: self.tombstones.clone(),
258            vertex_tombstones: self.vertex_tombstones.clone(),
259            edge_versions: self.edge_versions.clone(),
260            vertex_versions: self.vertex_versions.clone(),
261            edge_properties: self.edge_properties.clone(),
262            vertex_properties: self.vertex_properties.clone(),
263            edge_endpoints: self.edge_endpoints.clone(),
264            vertex_labels: self.vertex_labels.clone(),
265            label_to_vids: self.label_to_vids.clone(),
266            vertex_label_overwrites: self.vertex_label_overwrites.clone(),
267            edge_types: self.edge_types.clone(),
268            current_version: self.current_version,
269            mutation_count: self.mutation_count,
270            mutation_stats: self.mutation_stats.clone(),
271            wal: None, // Forked L0s don't share the WAL
272            wal_lsn_at_flush: self.wal_lsn_at_flush,
273            vertex_created_at: self.vertex_created_at.clone(),
274            vertex_updated_at: self.vertex_updated_at.clone(),
275            edge_created_at: self.edge_created_at.clone(),
276            edge_updated_at: self.edge_updated_at.clone(),
277            estimated_size: self.estimated_size,
278            constraint_index: self.constraint_index.clone(),
279            merge_guard_index: self.merge_guard_index.clone(),
280            extid_index: self.extid_index.clone(),
281            vertex_partial_keys: self.vertex_partial_keys.clone(),
282            edge_partial_keys: self.edge_partial_keys.clone(),
283            pending_embeddings: self.pending_embeddings.clone(),
284            occ_read_seq: self.occ_read_seq,
285            // Forked L0s (ASSUME/ABDUCE) do not participate in OCC tracking.
286            occ_read_set: None,
287        }
288    }
289}
290
291impl L0Buffer {
292    /// Append labels to a vec, skipping duplicates.
293    fn append_unique_labels(existing: &mut Vec<String>, labels: &[String]) {
294        for label in labels {
295            if !existing.contains(label) {
296                existing.push(label.clone());
297            }
298        }
299    }
300
301    /// Add a VID to the reverse label index for each of the given labels.
302    fn index_labels_for_vid(&mut self, vid: Vid, labels: &[String]) {
303        for label in labels {
304            self.label_to_vids
305                .entry(label.clone())
306                .or_default()
307                .insert(vid);
308        }
309    }
310
311    /// Read a vertex's current string `ext_id` from a property map.
312    fn extid_of(props: &Properties) -> Option<String> {
313        props
314            .get("ext_id")
315            .and_then(|v| v.as_str())
316            .map(str::to_owned)
317    }
318
319    /// Sync `extid_index` for `vid` around a property write.
320    ///
321    /// `old` / `new` are the vertex's ext_id before / after the CRDT merge —
322    /// the index always reflects the post-merge value, matching what the old
323    /// full scan in `Writer::check_extid_globally_unique` observed. Only
324    /// called when the incoming properties contain an `ext_id` key (the merge
325    /// cannot change the value otherwise).
326    fn sync_extid_index(&mut self, vid: Vid, old: Option<String>, new: Option<String>) {
327        if old == new {
328            return;
329        }
330        if let Some(old) = old
331            && self.extid_index.get(&old) == Some(&vid)
332        {
333            self.extid_index.remove(&old);
334        }
335        if let Some(new) = new {
336            self.extid_index.insert(new, vid);
337        }
338    }
339
340    /// Remove a VID from all label entries in the reverse index.
341    fn remove_vid_from_label_index(&mut self, vid: Vid) {
342        if let Some(labels) = self.vertex_labels.get(&vid) {
343            for label in labels {
344                if let Some(set) = self.label_to_vids.get_mut(label) {
345                    set.remove(&vid);
346                }
347            }
348        }
349    }
350
351    /// Replaces a vertex's FULL label set — the semantics of `SET n:Label` /
352    /// `REMOVE n:Label`, which resolve the new complete set before writing.
353    ///
354    /// Unlike [`add_vertex_labels`](Self::add_vertex_labels) (append), this clears
355    /// the vid's existing labels from the reverse index, sets the new set, and
356    /// re-indexes — so a removal actually removes. It marks the vid in
357    /// `vertex_label_overwrites` so `merge` REPLACES (not appends) these labels at
358    /// commit and `WriteSet::from_l0` treats the change as a conflictable write.
359    /// Increments `mutation_count` (a label change is a real mutation; its sibling
360    /// `remove_vertex_label` already does so).
361    pub fn set_vertex_labels(&mut self, vid: Vid, labels: &[String]) {
362        self.remove_vid_from_label_index(vid);
363        self.vertex_labels.insert(vid, labels.to_vec());
364        self.index_labels_for_vid(vid, labels);
365        self.vertex_label_overwrites.insert(vid);
366        self.current_version += 1;
367        self.mutation_count += 1;
368    }
369
370    /// Merge CRDT properties into an existing property map.
371    /// Attempts CRDT merge if both values are valid CRDTs, falls back to overwrite.
372    ///
373    /// When the entry is empty (new vertex insert), skips the expensive JSON
374    /// round-trip and directly assigns the properties.
375    ///
376    /// Logs a warning when a CRDT value is overwritten by a non-CRDT scalar
377    /// (limitation R1).
378    fn merge_crdt_properties(entry: &mut Properties, properties: Properties) {
379        // Fast path: new vertex with no existing properties — skip JSON round-trip
380        if entry.is_empty() {
381            *entry = properties;
382            return;
383        }
384
385        for (k, v) in properties {
386            // `try_as_crdt` performs the Map-gated CRDT probe (see its docs for the
387            // wide-row perf rationale). Sharing it with `WriteSet::from_l0` keeps the
388            // OCC carve-out consistent with this merge-versus-overwrite decision.
389            if let Some(mut new_crdt) = try_as_crdt(&v)
390                && let Some(existing_v) = entry.get(&k)
391                && let Ok(existing_crdt) = serde_json::from_value::<Crdt>(existing_v.clone().into())
392            {
393                // Use try_merge to avoid panic on type mismatch.
394                if new_crdt.try_merge(&existing_crdt).is_ok()
395                    && let Ok(merged_json) = serde_json::to_value(new_crdt)
396                {
397                    entry.insert(k, uni_common::Value::from(merged_json));
398                    continue;
399                }
400                // CRDT variant mismatch (or a failed re-serialize): fall through to
401                // a last-writer-wins overwrite, discarding the existing CRDT's
402                // merged state. The OCC commit-time carve-out check
403                // (`occ::crdt_carveout_overwrite`) aborts a *concurrent* writer
404                // before reaching here, and write-time schema enforcement rejects a
405                // wrong declared variant; this warns on any residual (e.g. a
406                // single-writer variant change) so the discarded state is visible.
407                tracing::warn!(
408                    property = %k,
409                    existing_variant = existing_crdt.type_name(),
410                    "overwriting CRDT property with a different CRDT variant \
411                     (last-writer-wins); merged CRDT state is discarded"
412                );
413            } else if try_as_crdt(&v).is_none()
414                && entry.get(&k).is_some_and(|e| try_as_crdt(e).is_some())
415            {
416                // R1: an existing CRDT value is overwritten by a non-CRDT scalar
417                // (last-writer-wins), silently discarding the CRDT's merged state
418                // — a property written as BOTH a CRDT and last-writer-wins. The OCC
419                // write-set carve-out lets CRDT-only writers commit without
420                // conflicting, so a concurrent LWW write on the same property
421                // cannot be flagged as a conflict; surface it here instead.
422                tracing::warn!(
423                    property = %k,
424                    "overwriting CRDT property with non-CRDT value (last-writer-wins); \
425                     merged CRDT state is discarded"
426                );
427            }
428            // Fallback: Overwrite (last-writer-wins).
429            entry.insert(k, v);
430        }
431    }
432
433    /// Helper function to estimate property map size in bytes.
434    fn estimate_properties_size(props: &Properties) -> usize {
435        props.keys().map(|k| k.len() + 32).sum()
436    }
437
438    /// Returns an estimate of the buffer size in bytes.
439    /// Includes all fields for accurate memory accounting.
440    pub fn size_bytes(&self) -> usize {
441        let mut total = 0;
442
443        // Topology
444        total += self.graph.vertex_count() * 8;
445        total += self.graph.edge_count() * 24;
446
447        // Properties (rough estimate: key string + 32 bytes for value)
448        for props in self.vertex_properties.values() {
449            total += Self::estimate_properties_size(props);
450        }
451        for props in self.edge_properties.values() {
452            total += Self::estimate_properties_size(props);
453        }
454
455        // Metadata
456        total += self.tombstones.len() * 64;
457        total += self.vertex_tombstones.len() * 8;
458        total += self.edge_versions.len() * 16;
459        total += self.vertex_versions.len() * 16;
460        total += self.edge_endpoints.len() * 28; // (Vid, Vid, u32) = 8+8+4 + overhead
461
462        // Vertex labels
463        for labels in self.vertex_labels.values() {
464            total += labels.iter().map(|l| l.len() + 24).sum::<usize>();
465        }
466
467        // Reverse label index (label_to_vids)
468        for (label, vids) in &self.label_to_vids {
469            total += label.len() + 24 + vids.len() * 8 + 48; // string + HashSet overhead
470        }
471
472        // Edge types
473        for type_name in self.edge_types.values() {
474            total += type_name.len() + 24;
475        }
476
477        // Timestamps (4 maps, each entry is 16 bytes: 8-byte key + 8-byte i64 value)
478        total += self.vertex_created_at.len() * 16;
479        total += self.vertex_updated_at.len() * 16;
480        total += self.edge_created_at.len() * 16;
481        total += self.edge_updated_at.len() * 16;
482
483        total
484    }
485
486    pub fn new(start_version: u64, wal: Option<Arc<WriteAheadLog>>) -> Self {
487        Self {
488            graph: SimpleGraph::new(),
489            tombstones: HashMap::new(),
490            vertex_tombstones: HashSet::new(),
491            edge_versions: HashMap::new(),
492            vertex_versions: HashMap::new(),
493            edge_properties: HashMap::new(),
494            vertex_properties: HashMap::new(),
495            edge_endpoints: HashMap::new(),
496            vertex_labels: HashMap::new(),
497            label_to_vids: HashMap::new(),
498            vertex_label_overwrites: HashSet::new(),
499            edge_types: HashMap::new(),
500            current_version: start_version,
501            mutation_count: 0,
502            mutation_stats: MutationStats::default(),
503            wal,
504            wal_lsn_at_flush: 0,
505            vertex_created_at: HashMap::new(),
506            vertex_updated_at: HashMap::new(),
507            edge_created_at: HashMap::new(),
508            edge_updated_at: HashMap::new(),
509            estimated_size: 0,
510            constraint_index: HashMap::new(),
511            merge_guard_index: HashMap::new(),
512            extid_index: HashMap::new(),
513            vertex_partial_keys: HashMap::new(),
514            edge_partial_keys: HashMap::new(),
515            pending_embeddings: HashMap::new(),
516            occ_read_seq: 0,
517            occ_read_set: None,
518        }
519    }
520
521    pub fn insert_vertex(&mut self, vid: Vid, properties: Properties) {
522        self.insert_vertex_with_labels(vid, properties, &[]);
523    }
524
525    /// Insert a vertex with associated labels.
526    pub fn insert_vertex_with_labels(
527        &mut self,
528        vid: Vid,
529        properties: Properties,
530        labels: &[String],
531    ) {
532        self.insert_vertex_with_labels_impl(vid, properties, labels, false);
533    }
534
    /// Core full-row vertex insertion (upsert). When `skip_wal` is true, skips
    /// WAL append (used during merge where the caller already wrote to WAL).
    ///
    /// Effects, in order: bump `current_version`, append `InsertVertex` to the
    /// WAL (if attached and not skipped), clear any tombstone and pending
    /// partial-key state for `vid`, CRDT-merge `properties` into the stored map
    /// (syncing `extid_index` when `ext_id` is written), stamp the version and
    /// timestamps, append labels without duplicates, add the vertex to the
    /// topology, and update counters/size estimate.
    fn insert_vertex_with_labels_impl(
        &mut self,
        vid: Vid,
        properties: Properties,
        labels: &[String],
        skip_wal: bool,
    ) {
        self.current_version += 1;
        let version = self.current_version;
        let now = now_nanos();

        // The WAL record is written before any in-memory state changes.
        // NOTE(review): the append result is discarded with `let _`, so a WAL
        // failure is silent here — confirm this best-effort is intended.
        if !skip_wal && let Some(wal) = &self.wal {
            let _ = wal.append(Mutation::InsertVertex {
                vid,
                properties: properties.clone(),
                labels: labels.to_vec(),
            });
        }

        // Re-inserting a soft-deleted vertex revives it.
        self.vertex_tombstones.remove(&vid);

        // Full-row insert supersedes any pending partial-update state for
        // this VID.
        self.vertex_partial_keys.remove(&vid);

        // Size/count computed up front so `properties` can be moved into the
        // CRDT merge below instead of deep-cloned.
        let props_size = Self::estimate_properties_size(&properties);
        let props_count = properties.len();
        let tracks_extid = properties.contains_key("ext_id");

        // The pre-merge ext_id must be read before `merge_crdt_properties`
        // mutates `entry`; the post-merge value is re-read afterwards.
        let entry = self.vertex_properties.entry(vid).or_default();
        let old_extid = if tracks_extid {
            Self::extid_of(entry)
        } else {
            None
        };
        Self::merge_crdt_properties(entry, properties);
        if tracks_extid {
            let new_extid =
                Self::extid_of(self.vertex_properties.get(&vid).expect("just inserted"));
            self.sync_extid_index(vid, old_extid, new_extid);
        }
        self.vertex_versions.insert(vid, version);

        // Set timestamps - created_at only set if this is a new vertex
        self.vertex_created_at.entry(vid).or_insert(now);
        self.vertex_updated_at.insert(vid, now);

        // Track labels — always create an entry so unlabeled vertices are
        // distinguishable from "not in L0" when queried via get_vertex_labels.
        let labels_size: usize = labels.iter().map(|l| l.len() + 24).sum();
        let existing = self.vertex_labels.entry(vid).or_default();
        Self::append_unique_labels(existing, labels);
        self.index_labels_for_vid(vid, labels);

        self.graph.add_vertex(vid);
        self.mutation_count += 1;
        // NOTE(review): `nodes_created` and `labels_added` are incremented even
        // when `vid` already existed in L0 (upsert) or the labels were already
        // present — confirm callers expect upserts to count as creations.
        self.mutation_stats.nodes_created += 1;
        self.mutation_stats.properties_set += props_count;
        self.mutation_stats.labels_added += labels.len();

        // Incremental size estimate (grows on every call, including upserts).
        self.estimated_size += 8 + props_size + 16 + labels_size + 32;
    }
601
602    /// Insert a vertex's FULL property row, tagging `touched_keys` so the
603    /// flush emits exactly those columns via Lance `MergeInsertBuilder`
604    /// instead of a full-row Append.
605    ///
606    /// `props` MUST be the fully-merged property map (storage union
607    /// in-flight L0 union the new touched values, per
608    /// `PropertyManager::get_all_vertex_props_with_ctx`). The caller is
609    /// responsible for the union; L0 here just stores it so scans see
610    /// the complete row without per-key reconciliation.
611    ///
612    /// `touched_keys` lists the property keys this SET statement
613    /// actually assigned — the union of those across all coalesced
614    /// SetItems on this VID. Lance MergeInsert sends a source batch
615    /// with `_vid`, `_deleted`, `_version`, `_updated_at`, and those
616    /// touched columns; non-touched columns retain their pre-merge
617    /// values on the Lance side, skipping the wide-row write.
618    ///
619    /// A subsequent full-row `insert_vertex_with_labels` or
620    /// `delete_vertex` on the same VID clears the partial-keys entry
621    /// so partial state never outlives a stronger write.
622    pub fn insert_vertex_partial_full(
623        &mut self,
624        vid: Vid,
625        props: Properties,
626        touched_keys: HashSet<String>,
627        labels: &[String],
628    ) {
629        // Stage the full row through the existing partial-impl (same
630        // CRDT merge / version bump / timestamps), preserving the
631        // partial-keys entry so we can extend it below.
632        self.insert_vertex_with_labels_partial_impl(vid, props, labels, false);
633        self.vertex_partial_keys
634            .entry(vid)
635            .or_default()
636            .extend(touched_keys);
637    }
638
639    /// Legacy partial-only variant used by some uni-store paths. Kept
640    /// for source-compatibility but new uni-query callers should use
641    /// `insert_vertex_partial_full` to preserve scan-side L0 visibility.
642    pub fn insert_vertex_partial(&mut self, vid: Vid, touched: Properties, labels: &[String]) {
643        // Record dirty keys BEFORE the full-row impl runs (which would
644        // clear them). The keys come from the touched set; the values
645        // are merged into L0 by the shared CRDT path below.
646        let touched_keys: Vec<String> = touched.keys().cloned().collect();
647
648        // If the VID already has a full-row pending insert (e.g., CREATE
649        // earlier in the same tx), we must NOT downgrade it to partial.
650        // Detected by: VID is in vertex_properties WITH a version stamp
651        // AND not currently in vertex_partial_keys → it was written as
652        // a full row recently. The conservative rule: only enable the
653        // partial path when there's no full-row pending insert. We
654        // approximate "no full-row pending" by checking that the VID's
655        // current entry in vertex_partial_keys is non-empty OR the VID
656        // is not in vertex_properties (fresh row, but caller asked
657        // partial — let it through and the post-flush union covers it).
658        let already_full = self.vertex_properties.contains_key(&vid)
659            && !self.vertex_partial_keys.contains_key(&vid);
660
661        // Stage the CRDT merge through the existing path. We bypass the
662        // full-row `insert_vertex_with_labels_impl` clearing of
663        // partial_keys by inlining the work, then restoring/extending
664        // the partial-key set.
665        self.insert_vertex_with_labels_partial_impl(vid, touched, labels, false);
666
667        if !already_full {
668            self.vertex_partial_keys
669                .entry(vid)
670                .or_default()
671                .extend(touched_keys);
672        }
673    }
674
    /// Core partial-insert: same as `insert_vertex_with_labels_impl` but
    /// preserves any existing `vertex_partial_keys[vid]` entry so the
    /// caller can extend it after the merge.
    ///
    /// Bumps `current_version`, optionally appends an `InsertVertex` WAL
    /// record (unless `skip_wal`), clears any vertex tombstone, CRDT-merges
    /// `properties` into the VID's row (keeping the ext_id index in sync when
    /// `ext_id` is among the written keys), stamps the version and
    /// timestamps, adds `labels` to the label set and reverse index, and
    /// updates the mutation counters and size estimate.
    ///
    /// NOTE(review): the WAL append result is discarded (`let _ =`), so a WAL
    /// failure is silent and the in-memory mutation still proceeds. This fn
    /// returns `()`, so surfacing the error would need a signature change.
    fn insert_vertex_with_labels_partial_impl(
        &mut self,
        vid: Vid,
        properties: Properties,
        labels: &[String],
        skip_wal: bool,
    ) {
        self.current_version += 1;
        let version = self.current_version;
        let now = now_nanos();

        if !skip_wal && let Some(wal) = &self.wal {
            // WAL records the partial as a full-row InsertVertex; on replay
            // the full-row path runs (which clears partial_keys). This is
            // semantically correct — L0 in memory always holds the union of
            // partial deltas via merge_crdt_properties; recovery doesn't
            // need to preserve partial-vs-full distinction.
            // NOTE(review): after replay this VID has a property row holding
            // only the touched keys and no partial-keys entry, so it looks
            // like a full-row write. Confirm the flush cannot drop untouched,
            // already-persisted columns in that case.
            let _ = wal.append(Mutation::InsertVertex {
                vid,
                properties: properties.clone(),
                labels: labels.to_vec(),
            });
        }

        self.vertex_tombstones.remove(&vid);
        // NOTE: deliberately DOES NOT remove from vertex_partial_keys.
        // The caller (`insert_vertex_partial`) extends that set after.

        // Size/count computed up front so `properties` can be moved into the
        // CRDT merge below instead of deep-cloned.
        let props_size = Self::estimate_properties_size(&properties);
        let props_count = properties.len();
        let tracks_extid = properties.contains_key("ext_id");

        // The old ext_id must be read BEFORE the merge overwrites it, so the
        // ext_id index can be moved from the old value to the new one.
        let entry = self.vertex_properties.entry(vid).or_default();
        let old_extid = if tracks_extid {
            Self::extid_of(entry)
        } else {
            None
        };
        Self::merge_crdt_properties(entry, properties);
        if tracks_extid {
            let new_extid =
                Self::extid_of(self.vertex_properties.get(&vid).expect("just inserted"));
            self.sync_extid_index(vid, old_extid, new_extid);
        }
        self.vertex_versions.insert(vid, version);

        // created_at is only set the first time this VID is seen here;
        // updated_at always moves to `now`.
        self.vertex_created_at.entry(vid).or_insert(now);
        self.vertex_updated_at.insert(vid, now);

        let labels_size: usize = labels.iter().map(|l| l.len() + 24).sum();
        let existing = self.vertex_labels.entry(vid).or_default();
        Self::append_unique_labels(existing, labels);
        self.index_labels_for_vid(vid, labels);

        self.graph.add_vertex(vid);
        self.mutation_count += 1;
        // Partial writes don't create new nodes — they update existing ones.
        // But counting under properties_set is correct.
        self.mutation_stats.properties_set += props_count;
        self.mutation_stats.labels_added += labels.len();

        self.estimated_size += 8 + props_size + 16 + labels_size + 32;
    }
743
744    /// Add labels to an existing vertex.
745    pub fn add_vertex_labels(&mut self, vid: Vid, labels: &[String]) {
746        let existing = self.vertex_labels.entry(vid).or_default();
747        Self::append_unique_labels(existing, labels);
748        self.index_labels_for_vid(vid, labels);
749    }
750
751    /// Remove a label from an existing vertex.
752    /// Returns true if the label was found and removed, false otherwise.
753    pub fn remove_vertex_label(&mut self, vid: Vid, label: &str) -> bool {
754        if let Some(labels) = self.vertex_labels.get_mut(&vid)
755            && let Some(pos) = labels.iter().position(|l| l == label)
756        {
757            labels.remove(pos);
758            if let Some(set) = self.label_to_vids.get_mut(label) {
759                set.remove(&vid);
760            }
761            self.current_version += 1;
762            self.mutation_count += 1;
763            self.mutation_stats.labels_removed += 1;
764            // Note: WAL logging for label mutations not yet implemented
765            // Currently consistent with add_vertex_labels behavior
766            return true;
767        }
768        false
769    }
770
771    /// Set the type for an edge.
772    pub fn set_edge_type(&mut self, eid: Eid, edge_type: String) {
773        self.edge_types.insert(eid, edge_type);
774    }
775
776    pub fn delete_vertex(&mut self, vid: Vid) -> Result<()> {
777        self.delete_vertex_impl(vid, false)
778    }
779
780    /// Core vertex deletion. When `skip_wal` is true, skips WAL append
781    /// (used during merge where the caller already wrote to WAL).
782    fn delete_vertex_impl(&mut self, vid: Vid, skip_wal: bool) -> Result<()> {
783        self.current_version += 1;
784
785        if !skip_wal && let Some(wal) = &mut self.wal {
786            let labels = self.vertex_labels.get(&vid).cloned().unwrap_or_default();
787            wal.append(Mutation::DeleteVertex { vid, labels })?;
788        }
789
790        self.apply_vertex_deletion(vid);
791        Ok(())
792    }
793
794    /// Cascade-delete a vertex: tombstone all connected edges and remove the vertex.
795    ///
796    /// Shared between `delete_vertex` (live mutations) and `replay_mutations` (WAL recovery).
797    fn apply_vertex_deletion(&mut self, vid: Vid) {
798        let version = self.current_version;
799
800        // Collect edges to delete using O(degree) neighbors() instead of O(E) scan
801        let mut edges_to_remove = HashSet::new();
802
803        // Collect outgoing edges
804        for entry in self.graph.neighbors(vid, Direction::Outgoing) {
805            edges_to_remove.insert(entry.eid);
806        }
807
808        // Collect incoming edges
809        for entry in self.graph.neighbors(vid, Direction::Incoming) {
810            edges_to_remove.insert(entry.eid); // HashSet handles self-loop deduplication
811        }
812
813        let cascaded_edges_count = edges_to_remove.len();
814
815        // Tombstone and remove all collected edges
816        for eid in edges_to_remove {
817            // Retrieve edge endpoints from the map to create tombstone
818            if let Some((src, dst, etype)) = self.edge_endpoints.get(&eid) {
819                self.tombstones.insert(
820                    eid,
821                    TombstoneEntry {
822                        eid,
823                        src_vid: *src,
824                        dst_vid: *dst,
825                        edge_type: *etype,
826                    },
827                );
828                self.edge_versions.insert(eid, version);
829                self.edge_endpoints.remove(&eid);
830                self.edge_properties.remove(&eid);
831                self.graph.remove_edge(eid);
832                self.mutation_count += 1;
833                self.mutation_stats.relationships_deleted += 1;
834            }
835        }
836
837        self.remove_vid_from_label_index(vid);
838        self.vertex_tombstones.insert(vid);
839        // Drop the vid's ext_id index entry (O(1) via its current property
840        // value, read before the property map entry is removed below).
841        if let Some(props) = self.vertex_properties.get(&vid)
842            && let Some(ext) = Self::extid_of(props)
843            && self.extid_index.get(&ext) == Some(&vid)
844        {
845            self.extid_index.remove(&ext);
846        }
847        self.vertex_properties.remove(&vid);
848        // Deletion supersedes any pending partial-update state.
849        self.vertex_partial_keys.remove(&vid);
850        self.vertex_versions.insert(vid, version);
851        self.graph.remove_vertex(vid);
852        self.mutation_count += 1;
853        self.mutation_stats.nodes_deleted += 1;
854
855        // Remove constraint index entries for this vertex
856        self.constraint_index.retain(|_, v| *v != vid);
857        // Same for the implicit MERGE guard, so a later re-MERGE of a deleted
858        // node's key does not false-conflict with the stale entry.
859        self.merge_guard_index.retain(|_, v| *v != vid);
860
861        // 64 bytes per edge tombstone + 8 for vertex tombstone
862        self.estimated_size += cascaded_edges_count * 72 + 8;
863    }
864
865    pub fn insert_edge(
866        &mut self,
867        src_vid: Vid,
868        dst_vid: Vid,
869        edge_type: u32,
870        eid: Eid,
871        properties: Properties,
872        edge_type_name: Option<String>,
873    ) -> Result<()> {
874        self.insert_edge_impl(
875            src_vid,
876            dst_vid,
877            edge_type,
878            eid,
879            properties,
880            edge_type_name,
881            false,
882        )
883    }
884
885    /// Core edge insertion. When `skip_wal` is true, skips WAL append
886    /// (used during merge where the caller already wrote to WAL).
887    #[allow(clippy::too_many_arguments)]
888    fn insert_edge_impl(
889        &mut self,
890        src_vid: Vid,
891        dst_vid: Vid,
892        edge_type: u32,
893        eid: Eid,
894        properties: Properties,
895        edge_type_name: Option<String>,
896        skip_wal: bool,
897    ) -> Result<()> {
898        self.current_version += 1;
899        let now = now_nanos();
900
901        if !skip_wal && let Some(wal) = &mut self.wal {
902            wal.append(Mutation::InsertEdge {
903                src_vid,
904                dst_vid,
905                edge_type,
906                eid,
907                version: self.current_version,
908                properties: properties.clone(),
909                edge_type_name: edge_type_name.clone(),
910            })?;
911        }
912
913        self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
914
915        // Store edge type name in metadata if provided
916        let type_name_size = if let Some(ref name) = edge_type_name {
917            let size = name.len() + 24;
918            self.edge_types.insert(eid, name.clone());
919            size
920        } else {
921            0
922        };
923
924        // Set timestamps - created_at only set if this is a new edge
925        self.edge_created_at.entry(eid).or_insert(now);
926        self.edge_updated_at.insert(eid, now);
927
928        // A full-row insert supersedes any pending partial-update state
929        // for this EID (Round 12 §A).
930        self.edge_partial_keys.remove(&eid);
931
932        self.estimated_size += type_name_size;
933
934        Ok(())
935    }
936
937    /// Insert an edge's FULL property row plus a touched-keys hint so the
938    /// flush emits only those schema columns via Lance `MergeInsert` on
939    /// the per-edge-type delta tables. Edge analog of
940    /// `insert_vertex_partial_full` (Round 12 §A).
941    #[allow(clippy::too_many_arguments)]
942    pub fn insert_edge_partial_full(
943        &mut self,
944        src_vid: Vid,
945        dst_vid: Vid,
946        edge_type: u32,
947        eid: Eid,
948        properties: Properties,
949        edge_type_name: Option<String>,
950        touched_keys: HashSet<String>,
951    ) -> Result<()> {
952        self.current_version += 1;
953        let now = now_nanos();
954
955        if let Some(wal) = &mut self.wal {
956            wal.append(Mutation::InsertEdge {
957                src_vid,
958                dst_vid,
959                edge_type,
960                eid,
961                version: self.current_version,
962                properties: properties.clone(),
963                edge_type_name: edge_type_name.clone(),
964            })?;
965        }
966
967        self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
968
969        // `apply_edge_insertion` cleared the partial-keys entry as a
970        // safety measure (full-row insert supersedes partial). Re-insert
971        // with the touched-keys hint so the flush emits a partial source.
972        self.edge_partial_keys
973            .entry(eid)
974            .or_default()
975            .extend(touched_keys);
976
977        let type_name_size = if let Some(ref name) = edge_type_name {
978            let size = name.len() + 24;
979            self.edge_types.insert(eid, name.clone());
980            size
981        } else {
982            0
983        };
984
985        self.edge_created_at.entry(eid).or_insert(now);
986        self.edge_updated_at.insert(eid, now);
987
988        self.estimated_size += type_name_size;
989
990        Ok(())
991    }
992
993    /// Core edge insertion logic: add vertices, add edge, merge properties, update metadata.
994    ///
995    /// Shared between `insert_edge` (live mutations) and `replay_mutations` (WAL recovery).
996    ///
997    /// # Errors
998    ///
999    /// Returns error if either endpoint vertex has been deleted (exists in vertex_tombstones).
1000    /// This prevents "ghost vertex" resurrection via edge insertion. See issue #77.
1001    fn apply_edge_insertion(
1002        &mut self,
1003        src_vid: Vid,
1004        dst_vid: Vid,
1005        edge_type: u32,
1006        eid: Eid,
1007        properties: Properties,
1008    ) -> Result<()> {
1009        let version = self.current_version;
1010
1011        // Check if either endpoint has been deleted. Inserting an edge to a deleted
1012        // vertex would resurrect it as a "ghost vertex" with no properties. See issue #77.
1013        if self.vertex_tombstones.contains(&src_vid) {
1014            anyhow::bail!(
1015                "Cannot insert edge: source vertex {} has been deleted (issue #77)",
1016                src_vid
1017            );
1018        }
1019        if self.vertex_tombstones.contains(&dst_vid) {
1020            anyhow::bail!(
1021                "Cannot insert edge: destination vertex {} has been deleted (issue #77)",
1022                dst_vid
1023            );
1024        }
1025
1026        // Add vertices to graph topology if they don't exist.
1027        // IMPORTANT: Only add to graph structure, do NOT call insert_vertex.
1028        // insert_vertex creates a new version with empty properties, which would
1029        // cause MVCC to pick the empty version as "latest", losing original properties.
1030        if !self.graph.contains_vertex(src_vid) {
1031            self.graph.add_vertex(src_vid);
1032        }
1033        if !self.graph.contains_vertex(dst_vid) {
1034            self.graph.add_vertex(dst_vid);
1035        }
1036
1037        self.graph.add_edge(src_vid, dst_vid, eid, edge_type);
1038
1039        // Store metadata with CRDT merge logic
1040        let props_size = Self::estimate_properties_size(&properties);
1041        let props_count = properties.len();
1042        if !properties.is_empty() {
1043            let entry = self.edge_properties.entry(eid).or_default();
1044            Self::merge_crdt_properties(entry, properties);
1045        }
1046
1047        self.edge_versions.insert(eid, version);
1048        self.edge_endpoints
1049            .insert(eid, (src_vid, dst_vid, edge_type));
1050        self.tombstones.remove(&eid);
1051        self.mutation_count += 1;
1052        self.mutation_stats.relationships_created += 1;
1053        self.mutation_stats.properties_set += props_count;
1054
1055        // 24 edge + props + 16 version + 28 endpoints + 32 timestamps
1056        self.estimated_size += 24 + props_size + 16 + 28 + 32;
1057
1058        Ok(())
1059    }
1060
1061    pub fn delete_edge(
1062        &mut self,
1063        eid: Eid,
1064        src_vid: Vid,
1065        dst_vid: Vid,
1066        edge_type: u32,
1067    ) -> Result<()> {
1068        self.delete_edge_impl(eid, src_vid, dst_vid, edge_type, false)
1069    }
1070
1071    /// Core edge deletion. When `skip_wal` is true, skips WAL append
1072    /// (used during merge where the caller already wrote to WAL).
1073    fn delete_edge_impl(
1074        &mut self,
1075        eid: Eid,
1076        src_vid: Vid,
1077        dst_vid: Vid,
1078        edge_type: u32,
1079        skip_wal: bool,
1080    ) -> Result<()> {
1081        self.current_version += 1;
1082        let now = now_nanos();
1083
1084        if !skip_wal && let Some(wal) = &mut self.wal {
1085            wal.append(Mutation::DeleteEdge {
1086                eid,
1087                src_vid,
1088                dst_vid,
1089                edge_type,
1090                version: self.current_version,
1091            })?;
1092        }
1093
1094        self.apply_edge_deletion(eid, src_vid, dst_vid, edge_type);
1095
1096        // Update timestamp - deletion is an update
1097        self.edge_updated_at.insert(eid, now);
1098
1099        Ok(())
1100    }
1101
1102    /// Core edge deletion logic: tombstone the edge, update version, remove from graph.
1103    ///
1104    /// Shared between `delete_edge` (live mutations) and `replay_mutations` (WAL recovery).
1105    fn apply_edge_deletion(&mut self, eid: Eid, src_vid: Vid, dst_vid: Vid, edge_type: u32) {
1106        let version = self.current_version;
1107
1108        self.tombstones.insert(
1109            eid,
1110            TombstoneEntry {
1111                eid,
1112                src_vid,
1113                dst_vid,
1114                edge_type,
1115            },
1116        );
1117        self.edge_versions.insert(eid, version);
1118        // Deletion supersedes any pending partial-update state for this
1119        // EID (Round 12 §A).
1120        self.edge_partial_keys.remove(&eid);
1121        self.graph.remove_edge(eid);
1122        self.mutation_count += 1;
1123        self.mutation_stats.relationships_deleted += 1;
1124
1125        // 64 bytes tombstone + 16 bytes version
1126        self.estimated_size += 80;
1127    }
1128
1129    /// Returns neighbors in the specified direction.
1130    /// O(degree) complexity - iterates only edges connected to the vertex.
1131    pub fn get_neighbors(
1132        &self,
1133        vid: Vid,
1134        edge_type: u32,
1135        direction: Direction,
1136    ) -> Vec<(Vid, Eid, u64)> {
1137        let edges = self.graph.neighbors(vid, direction);
1138
1139        edges
1140            .iter()
1141            .filter(|e| e.edge_type == edge_type && !self.is_tombstoned(e.eid))
1142            .map(|e| {
1143                let neighbor = match direction {
1144                    Direction::Outgoing => e.dst_vid,
1145                    Direction::Incoming => e.src_vid,
1146                };
1147                let version = self.edge_versions.get(&e.eid).copied().unwrap_or(0);
1148                (neighbor, e.eid, version)
1149            })
1150            .collect()
1151    }
1152
1153    pub fn is_tombstoned(&self, eid: Eid) -> bool {
1154        self.tombstones.contains_key(&eid)
1155    }
1156
1157    /// Returns all VIDs in vertex_labels that match the given label name.
1158    /// O(1) lookup via the reverse label index.
1159    pub fn vids_for_label(&self, label_name: &str) -> Vec<Vid> {
1160        self.label_to_vids
1161            .get(label_name)
1162            .map(|set| set.iter().copied().collect())
1163            .unwrap_or_default()
1164    }
1165
1166    /// Returns all vertex VIDs in the L0 buffer.
1167    ///
1168    /// Used for schemaless scanning (MATCH (n) without label).
1169    pub fn all_vertex_vids(&self) -> Vec<Vid> {
1170        self.vertex_properties.keys().copied().collect()
1171    }
1172
1173    /// Returns all VIDs in vertex_labels that match any of the given label names.
1174    /// Uses the reverse label index — O(sum of matching set sizes).
1175    pub fn vids_for_labels(&self, label_names: &[&str]) -> Vec<Vid> {
1176        let mut result = HashSet::new();
1177        for label_name in label_names {
1178            if let Some(set) = self.label_to_vids.get(*label_name) {
1179                result.extend(set.iter().copied());
1180            }
1181        }
1182        result.into_iter().collect()
1183    }
1184
1185    /// Returns all VIDs that have ALL specified labels.
1186    /// Uses the reverse label index — intersects the per-label sets.
1187    pub fn vids_with_all_labels(&self, label_names: &[&str]) -> Vec<Vid> {
1188        if label_names.is_empty() {
1189            return Vec::new();
1190        }
1191        // Collect the per-label sets; if any label is missing from the index,
1192        // the intersection is empty.
1193        let sets: Vec<&HashSet<Vid>> = match label_names
1194            .iter()
1195            .map(|ln| self.label_to_vids.get(*ln))
1196            .collect::<Option<Vec<_>>>()
1197        {
1198            Some(s) => s,
1199            None => return Vec::new(),
1200        };
1201        // Start from the smallest set for efficiency.
1202        let smallest = sets.iter().min_by_key(|s| s.len()).unwrap();
1203        smallest
1204            .iter()
1205            .copied()
1206            .filter(|vid| sets.iter().all(|s| s.contains(vid)))
1207            .collect()
1208    }
1209
1210    /// Gets the labels for a VID.
1211    pub fn get_vertex_labels(&self, vid: Vid) -> Option<&[String]> {
1212        self.vertex_labels.get(&vid).map(|v| v.as_slice())
1213    }
1214
1215    /// Gets the edge type for an EID.
1216    pub fn get_edge_type(&self, eid: Eid) -> Option<&str> {
1217        self.edge_types.get(&eid).map(|s| s.as_str())
1218    }
1219
1220    /// Returns all EIDs in edge_types that match the given type name.
1221    /// Used for L0 overlay during schemaless edge scanning.
1222    pub fn eids_for_type(&self, type_name: &str) -> Vec<Eid> {
1223        self.edge_types
1224            .iter()
1225            .filter(|(eid, etype)| *etype == type_name && !self.tombstones.contains_key(eid))
1226            .map(|(eid, _)| *eid)
1227            .collect()
1228    }
1229
1230    /// Returns all edge EIDs in the L0 buffer (non-tombstoned).
1231    ///
1232    /// Used for schemaless scanning (`MATCH ()-[r]->()`) without type.
1233    pub fn all_edge_eids(&self) -> Vec<Eid> {
1234        self.edge_endpoints
1235            .keys()
1236            .filter(|eid| !self.tombstones.contains_key(eid))
1237            .copied()
1238            .collect()
1239    }
1240
1241    /// Returns edge endpoint data (src_vid, dst_vid) for an EID.
1242    pub fn get_edge_endpoints(&self, eid: Eid) -> Option<(Vid, Vid)> {
1243        self.edge_endpoints
1244            .get(&eid)
1245            .map(|(src, dst, _)| (*src, *dst))
1246    }
1247
1248    /// Returns full edge endpoint data (src_vid, dst_vid, edge_type_id) for an EID.
1249    pub fn get_edge_endpoint_full(&self, eid: Eid) -> Option<(Vid, Vid, u32)> {
1250        self.edge_endpoints.get(&eid).copied()
1251    }
1252
1253    /// Insert a constraint key into the index for O(1) duplicate detection.
1254    pub fn insert_constraint_key(&mut self, key: Vec<u8>, vid: Vid) {
1255        self.constraint_index.insert(key, vid);
1256    }
1257
1258    /// Check if a constraint key exists in the index, excluding a specific VID.
1259    /// Returns true if the key exists and is owned by a different vertex.
1260    pub fn has_constraint_key(&self, key: &[u8], exclude_vid: Vid) -> bool {
1261        self.constraint_index
1262            .get(key)
1263            .is_some_and(|&v| v != exclude_vid)
1264    }
1265
1266    /// Register a MERGE-create's key into the implicit phantom guard.
1267    pub fn insert_merge_guard_key(&mut self, key: Vec<u8>, vid: Vid) {
1268        self.merge_guard_index.insert(key, vid);
1269    }
1270
1271    /// Check if a MERGE-guard key exists, owned by a different vertex than
1272    /// `exclude_vid` — i.e. a concurrent MERGE already created this key.
1273    pub fn has_merge_guard_key(&self, key: &[u8], exclude_vid: Vid) -> bool {
1274        self.merge_guard_index
1275            .get(key)
1276            .is_some_and(|&v| v != exclude_vid)
1277    }
1278
1279    #[instrument(skip(self, other), level = "trace")]
1280    /// Validate that merging `other` into `self` will not bail on a tombstoned
1281    /// edge endpoint (issue #77), **without mutating** either buffer.
1282    ///
1283    /// Mirrors the endpoint-liveness guard in `apply_edge_insertion`
1284    /// against the tombstone state [`Self::merge`] produces: `other`'s vertex
1285    /// deletions are applied and its vertex inserts clear their own tombstone,
1286    /// so an inserted edge bails iff an endpoint is tombstoned in `self` or
1287    /// `other` and is not (re-)inserted by `other`.
1288    ///
1289    /// Run this under `flush_lock` *before* the durable WAL flush so an
1290    /// offending commit is rejected up front. After the flush the transaction
1291    /// is durable, and a `merge` bail would leave a ghost/partial commit whose
1292    /// WAL replay re-bails — rendering the database unopenable.
1293    ///
1294    /// # Errors
1295    ///
1296    /// Returns an error naming the offending edge and endpoint when the merge
1297    /// would bail.
1298    pub fn validate_merge_edge_endpoints(&self, other: &L0Buffer) -> Result<()> {
1299        // An endpoint is effectively deleted after the merge's vertex phase if
1300        // it is tombstoned in either buffer and `other` does not re-insert it
1301        // (an insert clears the tombstone).
1302        let is_deleted = |vid: &Vid| {
1303            (self.vertex_tombstones.contains(vid) || other.vertex_tombstones.contains(vid))
1304                && !other.vertex_properties.contains_key(vid)
1305        };
1306        for (eid, (src_vid, dst_vid, _etype)) in &other.edge_endpoints {
1307            if other.tombstones.contains_key(eid) {
1308                continue; // a deletion, not an insertion — never resurrects a vertex
1309            }
1310            if is_deleted(src_vid) {
1311                anyhow::bail!(
1312                    "Cannot insert edge {}: source vertex {} has been deleted (issue #77)",
1313                    eid,
1314                    src_vid
1315                );
1316            }
1317            if is_deleted(dst_vid) {
1318                anyhow::bail!(
1319                    "Cannot insert edge {}: destination vertex {} has been deleted (issue #77)",
1320                    eid,
1321                    dst_vid
1322                );
1323            }
1324        }
1325        Ok(())
1326    }
1327
1328    pub fn merge(&mut self, other: &L0Buffer) -> Result<()> {
1329        // Validate-then-apply: reject a merge that would bail on a tombstoned
1330        // edge endpoint before mutating anything, so a failed merge can never
1331        // leave a partially-applied (non-atomic) commit.
1332        self.validate_merge_edge_endpoints(other)?;
1333        self.merge_validated(
1334            other,
1335            other.vertex_properties.clone(),
1336            other.edge_properties.clone(),
1337        )
1338    }
1339
1340    /// Commit-path variant of [`merge`](Self::merge) that consumes `other`'s
1341    /// vertex/edge property maps instead of deep-cloning every row.
1342    ///
1343    /// Everything else in `other` (endpoints, tombstones, versions, labels)
1344    /// is left intact — `commit_transaction_l0` still reads those after the
1345    /// merge. The caller must not rely on `other.vertex_properties` /
1346    /// `other.edge_properties` afterwards, which is safe on the commit path
1347    /// because committing consumes the transaction.
1348    pub fn merge_take(&mut self, other: &mut L0Buffer) -> Result<()> {
1349        // Validate BEFORE draining: the endpoint check consults
1350        // `other.vertex_properties` (the "re-inserted by other" exemption).
1351        self.validate_merge_edge_endpoints(other)?;
1352        let vertex_props = std::mem::take(&mut other.vertex_properties);
1353        let edge_props = std::mem::take(&mut other.edge_properties);
1354        self.merge_validated(other, vertex_props, edge_props)
1355    }
1356
1357    /// Shared merge body. `vertex_props` / `edge_props` are `other`'s property
1358    /// maps, passed by value so rows move instead of clone; the caller has
1359    /// already run `validate_merge_edge_endpoints`.
1360    fn merge_validated(
1361        &mut self,
1362        other: &L0Buffer,
1363        vertex_props: HashMap<Vid, Properties>,
1364        mut edge_props: HashMap<Eid, Properties>,
1365    ) -> Result<()> {
1366        trace!(
1367            other_mutation_count = other.mutation_count,
1368            "Merging L0 buffer"
1369        );
1370        // skip_wal=true throughout: the caller (commit_transaction_l0) already
1371        // wrote every one of these mutations to WAL before invoking merge —
1372        // re-appending here would double the WAL volume per commit.
1373        // Merge Vertices
1374        for &vid in &other.vertex_tombstones {
1375            self.delete_vertex_impl(vid, true)?;
1376        }
1377
1378        for (vid, props) in vertex_props {
1379            let labels = other.vertex_labels.get(&vid).cloned().unwrap_or_default();
1380            self.insert_vertex_with_labels_impl(vid, props, &labels, true);
1381        }
1382
1383        // Merge vertex labels that might not have properties
1384        for (vid, labels) in &other.vertex_labels {
1385            if !self.vertex_labels.contains_key(vid) {
1386                self.vertex_labels.insert(*vid, labels.clone());
1387                for label in labels {
1388                    self.label_to_vids
1389                        .entry(label.clone())
1390                        .or_default()
1391                        .insert(*vid);
1392                }
1393            }
1394        }
1395
1396        // Label-overwrite pass: a `SET n:Label` / `REMOVE n:Label` resolved the
1397        // FULL new label set into `other.vertex_labels[vid]` and flagged the vid.
1398        // REPLACE (not append) so removals actually remove and an existing
1399        // vertex's label change lands — overriding any append from the property
1400        // loop above. Skip vids deleted in the same commit. (The append loops
1401        // stay correct for property-path label unions, which are NOT flagged.)
1402        for vid in &other.vertex_label_overwrites {
1403            if other.vertex_tombstones.contains(vid) {
1404                continue;
1405            }
1406            let labels = other.vertex_labels.get(vid).cloned().unwrap_or_default();
1407            self.remove_vid_from_label_index(*vid);
1408            self.vertex_labels.insert(*vid, labels.clone());
1409            self.index_labels_for_vid(*vid, &labels);
1410            // Carry the overwrite flag into the persistent (main) buffer so a
1411            // pure relabel of a prior-window vid — which is absent from
1412            // `vertex_properties` — is still re-derived at flush (M8). The
1413            // flag is cleared when this buffer is rotated out by the flush.
1414            self.vertex_label_overwrites.insert(*vid);
1415        }
1416
1417        // Merge Edges - insert all edges from edge_endpoints, using empty props if none exist
1418        for (eid, (src, dst, etype)) in &other.edge_endpoints {
1419            if other.tombstones.contains_key(eid) {
1420                self.delete_edge_impl(*eid, *src, *dst, *etype, true)?;
1421            } else {
1422                let props = edge_props.remove(eid).unwrap_or_default();
1423                let etype_name = other.edge_types.get(eid).cloned();
1424                self.insert_edge_impl(*src, *dst, *etype, *eid, props, etype_name, true)?;
1425            }
1426        }
1427
1428        // Merge tombstones for edges that only exist in the target buffer (self),
1429        // not in the source buffer's edge_endpoints.  Without this, transaction
1430        // DELETEs of pre-existing edges are silently lost on commit.
1431        for (eid, tombstone) in &other.tombstones {
1432            if !other.edge_endpoints.contains_key(eid) {
1433                self.delete_edge_impl(
1434                    *eid,
1435                    tombstone.src_vid,
1436                    tombstone.dst_vid,
1437                    tombstone.edge_type,
1438                    true,
1439                )?;
1440            }
1441        }
1442
1443        // Edge types are now merged inside insert_edge, so no separate loop needed
1444
1445        // Merge timestamps - preserve semantics of or_insert (keep oldest created_at)
1446        // and insert (use latest updated_at)
1447        for (vid, ts) in &other.vertex_created_at {
1448            self.vertex_created_at.entry(*vid).or_insert(*ts); // keep oldest
1449        }
1450        for (vid, ts) in &other.vertex_updated_at {
1451            self.vertex_updated_at.insert(*vid, *ts); // use latest (tx wins)
1452        }
1453
1454        for (eid, ts) in &other.edge_created_at {
1455            self.edge_created_at.entry(*eid).or_insert(*ts); // keep oldest
1456        }
1457        for (eid, ts) in &other.edge_updated_at {
1458            self.edge_updated_at.insert(*eid, *ts); // use latest (tx wins)
1459        }
1460
1461        // Conservatively add other's estimated size (may overcount due to
1462        // deduplication, but that's safe for a memory limit).
1463        self.estimated_size += other.estimated_size;
1464
1465        // Merge constraint index
1466        for (key, vid) in &other.constraint_index {
1467            self.constraint_index.insert(key.clone(), *vid);
1468        }
1469
1470        // Merge the implicit MERGE-key guard so a committed MERGE-create is
1471        // visible to a concurrent transaction's commit-time re-probe.
1472        for (key, vid) in &other.merge_guard_index {
1473            self.merge_guard_index.insert(key.clone(), *vid);
1474        }
1475
1476        Ok(())
1477    }
1478
1479    /// Replay mutations from WAL without re-logging them.
1480    /// Used during startup recovery to restore L0 state from persisted WAL.
1481    /// Uses CRDT merge semantics to ensure recovered state matches pre-crash state.
1482    #[instrument(skip(self, mutations), level = "debug")]
1483    pub fn replay_mutations(&mut self, mutations: Vec<Mutation>) -> Result<()> {
1484        trace!(count = mutations.len(), "Replaying mutations");
1485        for mutation in mutations {
1486            match mutation {
1487                Mutation::InsertVertex {
1488                    vid,
1489                    properties,
1490                    labels,
1491                } => {
1492                    // Apply without WAL logging, with CRDT merge semantics
1493                    self.current_version += 1;
1494                    let version = self.current_version;
1495
1496                    self.vertex_tombstones.remove(&vid);
1497                    let tracks_extid = properties.contains_key("ext_id");
1498                    let entry = self.vertex_properties.entry(vid).or_default();
1499                    let old_extid = if tracks_extid {
1500                        Self::extid_of(entry)
1501                    } else {
1502                        None
1503                    };
1504                    Self::merge_crdt_properties(entry, properties);
1505                    if tracks_extid {
1506                        let new_extid = Self::extid_of(
1507                            self.vertex_properties.get(&vid).expect("just inserted"),
1508                        );
1509                        self.sync_extid_index(vid, old_extid, new_extid);
1510                    }
1511                    self.vertex_versions.insert(vid, version);
1512                    self.graph.add_vertex(vid);
1513                    self.mutation_count += 1;
1514
1515                    // Restore vertex labels from WAL
1516                    let existing = self.vertex_labels.entry(vid).or_default();
1517                    Self::append_unique_labels(existing, &labels);
1518                    for label in &labels {
1519                        self.label_to_vids
1520                            .entry(label.clone())
1521                            .or_default()
1522                            .insert(vid);
1523                    }
1524                }
1525                Mutation::DeleteVertex { vid, labels } => {
1526                    self.current_version += 1;
1527                    // Restore labels BEFORE apply_vertex_deletion
1528                    if !labels.is_empty() {
1529                        let existing = self.vertex_labels.entry(vid).or_default();
1530                        Self::append_unique_labels(existing, &labels);
1531                        for label in &labels {
1532                            self.label_to_vids
1533                                .entry(label.clone())
1534                                .or_default()
1535                                .insert(vid);
1536                        }
1537                    }
1538                    self.apply_vertex_deletion(vid);
1539                }
1540                Mutation::SetVertexLabels { vid, labels } => {
1541                    // REPLACE the vid's full label set (a label-only mutation
1542                    // resolved the complete set). Replace, not append, so a
1543                    // replayed removal removes; clears the old reverse-index
1544                    // entries first.
1545                    self.current_version += 1;
1546                    self.remove_vid_from_label_index(vid);
1547                    self.vertex_labels.insert(vid, labels.clone());
1548                    self.index_labels_for_vid(vid, &labels);
1549                    self.mutation_count += 1;
1550                }
1551                Mutation::InsertEdge {
1552                    src_vid,
1553                    dst_vid,
1554                    edge_type,
1555                    eid,
1556                    version: _,
1557                    properties,
1558                    edge_type_name,
1559                } => {
1560                    self.current_version += 1;
1561                    // Skip-and-warn on the issue-#77 endpoint bail: a pre-fix
1562                    // durable WAL may hold a ghost edge whose endpoint was
1563                    // tombstoned. Recovery must still open the database rather
1564                    // than abort, so drop the offending edge and continue.
1565                    match self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties) {
1566                        Ok(()) => {
1567                            // Restore edge type name metadata if present
1568                            if let Some(name) = edge_type_name {
1569                                self.edge_types.insert(eid, name);
1570                            }
1571                        }
1572                        Err(e) => {
1573                            tracing::warn!(
1574                                ?eid,
1575                                ?src_vid,
1576                                ?dst_vid,
1577                                error = %e,
1578                                "WAL replay: skipping edge insertion to a deleted endpoint (issue #77)"
1579                            );
1580                        }
1581                    }
1582                }
1583                Mutation::DeleteEdge {
1584                    eid,
1585                    src_vid,
1586                    dst_vid,
1587                    edge_type,
1588                    version: _,
1589                } => {
1590                    self.current_version += 1;
1591                    self.apply_edge_deletion(eid, src_vid, dst_vid, edge_type);
1592                }
1593            }
1594        }
1595        Ok(())
1596    }
1597}
1598
1599#[cfg(test)]
1600mod tests {
1601    use super::*;
1602
1603    #[test]
1604    fn test_l0_buffer_ops() -> Result<()> {
1605        let mut l0 = L0Buffer::new(0, None);
1606        let vid_a = Vid::new(1);
1607        let vid_b = Vid::new(2);
1608        let eid_ab = Eid::new(101);
1609
1610        l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1611
1612        let neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1613        assert_eq!(neighbors.len(), 1);
1614        assert_eq!(neighbors[0].0, vid_b);
1615        assert_eq!(neighbors[0].1, eid_ab);
1616
1617        l0.delete_edge(eid_ab, vid_a, vid_b, 1)?;
1618        assert!(l0.is_tombstoned(eid_ab));
1619
1620        // Verify neighbors are empty after deletion
1621        let neighbors_after = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1622        assert_eq!(neighbors_after.len(), 0);
1623
1624        Ok(())
1625    }
1626
1627    /// Regression for review #5: merging an edge whose endpoint is tombstoned
1628    /// in the target buffer must be rejected up front (`validate_merge_edge_endpoints`)
1629    /// and `merge` must be atomic — never partially applied. Before the fix this
1630    /// bailed only *inside* `merge`, after the durable WAL flush, leaving a ghost
1631    /// commit that made the database unopenable on replay.
1632    #[test]
1633    fn validate_merge_rejects_edge_to_tombstoned_endpoint() {
1634        let mut main = L0Buffer::new(0, None);
1635        let vid_a = Vid::new(1);
1636        let vid_b = Vid::new(2);
1637        main.insert_vertex(vid_a, HashMap::new());
1638        main.insert_vertex(vid_b, HashMap::new());
1639        main.delete_vertex(vid_b).unwrap(); // B is now tombstoned in main
1640
1641        // A transaction that inserts an edge A -> B (B tombstoned in main).
1642        let mut tx = L0Buffer::new(0, None);
1643        let eid = Eid::new(101);
1644        tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)
1645            .unwrap();
1646
1647        assert!(
1648            main.validate_merge_edge_endpoints(&tx).is_err(),
1649            "edge to a tombstoned endpoint must be rejected before merge"
1650        );
1651        // merge validates first, so it errors and leaves main untouched (atomic).
1652        assert!(
1653            main.merge(&tx).is_err(),
1654            "merge must reject, not bail mid-apply"
1655        );
1656        assert!(
1657            !main.edge_endpoints.contains_key(&eid),
1658            "a rejected merge must not have partially applied the edge"
1659        );
1660    }
1661
1662    /// When the transaction re-inserts the endpoint vertex, the edge is valid
1663    /// (the insert clears the tombstone) and the merge succeeds.
1664    #[test]
1665    fn validate_merge_allows_edge_when_endpoint_reinserted() {
1666        let mut main = L0Buffer::new(0, None);
1667        let vid_a = Vid::new(1);
1668        let vid_b = Vid::new(2);
1669        main.insert_vertex(vid_a, HashMap::new());
1670        main.insert_vertex(vid_b, HashMap::new());
1671        main.delete_vertex(vid_b).unwrap();
1672
1673        let mut tx = L0Buffer::new(0, None);
1674        tx.insert_vertex(vid_b, HashMap::new()); // re-insert B
1675        let eid = Eid::new(101);
1676        tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)
1677            .unwrap();
1678
1679        assert!(main.validate_merge_edge_endpoints(&tx).is_ok());
1680        assert!(main.merge(&tx).is_ok());
1681        assert!(main.edge_endpoints.contains_key(&eid));
1682    }
1683
1684    /// Edges between live endpoints merge as before — no false positives.
1685    #[test]
1686    fn validate_merge_allows_edge_to_live_endpoints() {
1687        let mut main = L0Buffer::new(0, None);
1688        let vid_a = Vid::new(1);
1689        let vid_b = Vid::new(2);
1690        main.insert_vertex(vid_a, HashMap::new());
1691        main.insert_vertex(vid_b, HashMap::new());
1692
1693        let mut tx = L0Buffer::new(0, None);
1694        let eid = Eid::new(101);
1695        tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)
1696            .unwrap();
1697
1698        assert!(main.validate_merge_edge_endpoints(&tx).is_ok());
1699        assert!(main.merge(&tx).is_ok());
1700        assert!(main.edge_endpoints.contains_key(&eid));
1701    }
1702
1703    #[test]
1704    fn test_l0_buffer_multiple_edges() -> Result<()> {
1705        let mut l0 = L0Buffer::new(0, None);
1706        let vid_a = Vid::new(1);
1707        let vid_b = Vid::new(2);
1708        let vid_c = Vid::new(3);
1709        let eid_ab = Eid::new(101);
1710        let eid_ac = Eid::new(102);
1711
1712        l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1713        l0.insert_edge(vid_a, vid_c, 1, eid_ac, HashMap::new(), None)?;
1714
1715        let neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1716        assert_eq!(neighbors.len(), 2);
1717
1718        // Delete one edge
1719        l0.delete_edge(eid_ab, vid_a, vid_b, 1)?;
1720
1721        // Should still have one neighbor
1722        let neighbors_after = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1723        assert_eq!(neighbors_after.len(), 1);
1724        assert_eq!(neighbors_after[0].0, vid_c);
1725
1726        Ok(())
1727    }
1728
1729    #[test]
1730    fn test_l0_buffer_edge_type_filter() -> Result<()> {
1731        let mut l0 = L0Buffer::new(0, None);
1732        let vid_a = Vid::new(1);
1733        let vid_b = Vid::new(2);
1734        let vid_c = Vid::new(3);
1735        let eid_ab = Eid::new(101);
1736        let eid_ac = Eid::new(201); // Different edge type
1737
1738        l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1739        l0.insert_edge(vid_a, vid_c, 2, eid_ac, HashMap::new(), None)?;
1740
1741        // Filter by edge type 1
1742        let type1_neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1743        assert_eq!(type1_neighbors.len(), 1);
1744        assert_eq!(type1_neighbors[0].0, vid_b);
1745
1746        // Filter by edge type 2
1747        let type2_neighbors = l0.get_neighbors(vid_a, 2, Direction::Outgoing);
1748        assert_eq!(type2_neighbors.len(), 1);
1749        assert_eq!(type2_neighbors[0].0, vid_c);
1750
1751        Ok(())
1752    }
1753
1754    #[test]
1755    fn test_l0_buffer_incoming_edges() -> Result<()> {
1756        let mut l0 = L0Buffer::new(0, None);
1757        let vid_a = Vid::new(1);
1758        let vid_b = Vid::new(2);
1759        let vid_c = Vid::new(3);
1760        let eid_ab = Eid::new(101);
1761        let eid_cb = Eid::new(102);
1762
1763        // a -> b and c -> b
1764        l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1765        l0.insert_edge(vid_c, vid_b, 1, eid_cb, HashMap::new(), None)?;
1766
1767        // Check incoming edges to b
1768        let incoming = l0.get_neighbors(vid_b, 1, Direction::Incoming);
1769        assert_eq!(incoming.len(), 2);
1770
1771        Ok(())
1772    }
1773
1774    /// Regression test: merge should preserve edges without properties
1775    #[test]
1776    fn test_merge_empty_props_edge() -> Result<()> {
1777        let mut main_l0 = L0Buffer::new(0, None);
1778        let mut tx_l0 = L0Buffer::new(0, None);
1779
1780        let vid_a = Vid::new(1);
1781        let vid_b = Vid::new(2);
1782        let eid_ab = Eid::new(101);
1783
1784        // Insert edge with empty properties in transaction L0
1785        tx_l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1786
1787        // Verify edge exists in tx_l0
1788        assert!(tx_l0.edge_endpoints.contains_key(&eid_ab));
1789        assert!(!tx_l0.edge_properties.contains_key(&eid_ab)); // No properties entry
1790
1791        // Merge into main L0
1792        main_l0.merge(&tx_l0)?;
1793
1794        // Edge should exist in main L0 after merge
1795        assert!(main_l0.edge_endpoints.contains_key(&eid_ab));
1796        let neighbors = main_l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1797        assert_eq!(neighbors.len(), 1);
1798        assert_eq!(neighbors[0].0, vid_b);
1799
1800        Ok(())
1801    }
1802
1803    /// Regression test: WAL replay should use CRDT merge semantics
1804    #[test]
1805    fn test_replay_crdt_merge() -> Result<()> {
1806        use crate::runtime::wal::Mutation;
1807        use serde_json::json;
1808        use uni_common::Value;
1809
1810        let mut l0 = L0Buffer::new(0, None);
1811        let vid = Vid::new(1);
1812
1813        // Create GCounter CRDT values using correct serde format:
1814        // {"t": "gc", "d": {"counts": {...}}}
1815        let counter1: Value = json!({
1816            "t": "gc",
1817            "d": {"counts": {"node1": 5}}
1818        })
1819        .into();
1820        let counter2: Value = json!({
1821            "t": "gc",
1822            "d": {"counts": {"node2": 3}}
1823        })
1824        .into();
1825
1826        // First mutation: insert vertex with counter1
1827        let mut props1 = HashMap::new();
1828        props1.insert("counter".to_string(), counter1.clone());
1829        l0.replay_mutations(vec![Mutation::InsertVertex {
1830            vid,
1831            properties: props1,
1832            labels: vec![],
1833        }])?;
1834
1835        // Second mutation: insert same vertex with counter2 (should merge)
1836        let mut props2 = HashMap::new();
1837        props2.insert("counter".to_string(), counter2.clone());
1838        l0.replay_mutations(vec![Mutation::InsertVertex {
1839            vid,
1840            properties: props2,
1841            labels: vec![],
1842        }])?;
1843
1844        // Verify CRDT was merged (both node1 and node2 counts present)
1845        let stored_props = l0.vertex_properties.get(&vid).unwrap();
1846        let stored_counter = stored_props.get("counter").unwrap();
1847
1848        // Convert back to serde_json::Value for nested access
1849        let stored_json: serde_json::Value = stored_counter.clone().into();
1850        // The merged counter should have both node1: 5 and node2: 3
1851        let data = stored_json.get("d").unwrap();
1852        let counts = data.get("counts").unwrap();
1853        assert_eq!(counts.get("node1"), Some(&json!(5)));
1854        assert_eq!(counts.get("node2"), Some(&json!(3)));
1855
1856        Ok(())
1857    }
1858
1859    #[test]
1860    fn test_merge_preserves_vertex_timestamps() -> Result<()> {
1861        let mut l0_main = L0Buffer::new(0, None);
1862        let mut l0_tx = L0Buffer::new(0, None);
1863        let vid = Vid::new(1);
1864
1865        // Main buffer: insert vertex with timestamp T1
1866        let ts_main_created = 1000;
1867        let ts_main_updated = 1100;
1868        l0_main.insert_vertex(vid, HashMap::new());
1869        l0_main.vertex_created_at.insert(vid, ts_main_created);
1870        l0_main.vertex_updated_at.insert(vid, ts_main_updated);
1871
1872        // Transaction buffer: update same vertex with timestamp T2 (later)
1873        let ts_tx_created = 2000; // should be ignored (main has older created_at)
1874        let ts_tx_updated = 2100; // should win (tx has newer updated_at)
1875        l0_tx.insert_vertex(vid, HashMap::new());
1876        l0_tx.vertex_created_at.insert(vid, ts_tx_created);
1877        l0_tx.vertex_updated_at.insert(vid, ts_tx_updated);
1878
1879        // Merge transaction into main
1880        l0_main.merge(&l0_tx)?;
1881
1882        // Verify created_at is oldest (from main)
1883        assert_eq!(
1884            *l0_main.vertex_created_at.get(&vid).unwrap(),
1885            ts_main_created,
1886            "created_at should preserve oldest timestamp"
1887        );
1888
1889        // Verify updated_at is latest (from tx)
1890        assert_eq!(
1891            *l0_main.vertex_updated_at.get(&vid).unwrap(),
1892            ts_tx_updated,
1893            "updated_at should use latest timestamp"
1894        );
1895
1896        Ok(())
1897    }
1898
1899    #[test]
1900    fn test_merge_preserves_edge_timestamps() -> Result<()> {
1901        let mut l0_main = L0Buffer::new(0, None);
1902        let mut l0_tx = L0Buffer::new(0, None);
1903        let vid_a = Vid::new(1);
1904        let vid_b = Vid::new(2);
1905        let eid = Eid::new(100);
1906
1907        // Main buffer: insert edge with timestamp T1
1908        let ts_main_created = 1000;
1909        let ts_main_updated = 1100;
1910        l0_main.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)?;
1911        l0_main.edge_created_at.insert(eid, ts_main_created);
1912        l0_main.edge_updated_at.insert(eid, ts_main_updated);
1913
1914        // Transaction buffer: update same edge with timestamp T2 (later)
1915        let ts_tx_created = 2000; // should be ignored
1916        let ts_tx_updated = 2100; // should win
1917        l0_tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)?;
1918        l0_tx.edge_created_at.insert(eid, ts_tx_created);
1919        l0_tx.edge_updated_at.insert(eid, ts_tx_updated);
1920
1921        // Merge transaction into main
1922        l0_main.merge(&l0_tx)?;
1923
1924        // Verify created_at is oldest (from main)
1925        assert_eq!(
1926            *l0_main.edge_created_at.get(&eid).unwrap(),
1927            ts_main_created,
1928            "edge created_at should preserve oldest timestamp"
1929        );
1930
1931        // Verify updated_at is latest (from tx)
1932        assert_eq!(
1933            *l0_main.edge_updated_at.get(&eid).unwrap(),
1934            ts_tx_updated,
1935            "edge updated_at should use latest timestamp"
1936        );
1937
1938        Ok(())
1939    }
1940
1941    #[test]
1942    fn test_merge_created_at_not_overwritten_for_existing_vertex() -> Result<()> {
1943        use uni_common::Value;
1944
1945        let mut l0_main = L0Buffer::new(0, None);
1946        let mut l0_tx = L0Buffer::new(0, None);
1947        let vid = Vid::new(1);
1948
1949        // Main buffer: vertex created at T1
1950        let ts_original = 1000;
1951        l0_main.insert_vertex(vid, HashMap::new());
1952        l0_main.vertex_created_at.insert(vid, ts_original);
1953        l0_main.vertex_updated_at.insert(vid, ts_original);
1954
1955        // Transaction buffer: update vertex (created_at would be T2 if set)
1956        let ts_tx = 2000;
1957        let mut props = HashMap::new();
1958        props.insert("updated".to_string(), Value::String("yes".to_string()));
1959        l0_tx.insert_vertex(vid, props);
1960        l0_tx.vertex_created_at.insert(vid, ts_tx);
1961        l0_tx.vertex_updated_at.insert(vid, ts_tx);
1962
1963        // Merge transaction into main
1964        l0_main.merge(&l0_tx)?;
1965
1966        // Verify created_at was NOT overwritten (still T1, not T2)
1967        assert_eq!(
1968            *l0_main.vertex_created_at.get(&vid).unwrap(),
1969            ts_original,
1970            "created_at must not be overwritten for existing vertex"
1971        );
1972
1973        // Verify updated_at WAS updated (now T2)
1974        assert_eq!(
1975            *l0_main.vertex_updated_at.get(&vid).unwrap(),
1976            ts_tx,
1977            "updated_at should reflect transaction timestamp"
1978        );
1979
1980        // Verify properties were merged
1981        assert!(
1982            l0_main
1983                .vertex_properties
1984                .get(&vid)
1985                .unwrap()
1986                .contains_key("updated")
1987        );
1988
1989        Ok(())
1990    }
1991
1992    /// Test for Issue #23: Vertex labels preserved through replay_mutations
1993    #[test]
1994    fn test_replay_mutations_preserves_vertex_labels() -> Result<()> {
1995        use crate::runtime::wal::Mutation;
1996
1997        let mut l0 = L0Buffer::new(0, None);
1998        let vid = Vid::new(42);
1999
2000        // Create InsertVertex mutation with labels
2001        let mutations = vec![Mutation::InsertVertex {
2002            vid,
2003            properties: {
2004                let mut props = HashMap::new();
2005                props.insert(
2006                    "name".to_string(),
2007                    uni_common::Value::String("Alice".to_string()),
2008                );
2009                props
2010            },
2011            labels: vec!["Person".to_string(), "User".to_string()],
2012        }];
2013
2014        // Replay mutations
2015        l0.replay_mutations(mutations)?;
2016
2017        // Verify vertex exists in L0
2018        assert!(l0.vertex_properties.contains_key(&vid));
2019
2020        // Verify labels are preserved
2021        let labels = l0.get_vertex_labels(vid).expect("Labels should exist");
2022        assert_eq!(labels.len(), 2);
2023        assert!(labels.contains(&"Person".to_string()));
2024        assert!(labels.contains(&"User".to_string()));
2025
2026        // Verify vertex is findable by label
2027        let person_vids = l0.vids_for_label("Person");
2028        assert_eq!(person_vids.len(), 1);
2029        assert_eq!(person_vids[0], vid);
2030
2031        let user_vids = l0.vids_for_label("User");
2032        assert_eq!(user_vids.len(), 1);
2033        assert_eq!(user_vids[0], vid);
2034
2035        Ok(())
2036    }
2037
2038    /// Test for Issue #23: DeleteVertex labels preserved for tombstone flushing
2039    #[test]
2040    fn test_replay_mutations_preserves_delete_vertex_labels() -> Result<()> {
2041        use crate::runtime::wal::Mutation;
2042
2043        let mut l0 = L0Buffer::new(0, None);
2044        let vid = Vid::new(99);
2045
2046        // First insert vertex with labels
2047        l0.insert_vertex_with_labels(
2048            vid,
2049            HashMap::new(),
2050            &["Person".to_string(), "Admin".to_string()],
2051        );
2052
2053        // Verify vertex and labels exist
2054        assert!(l0.vertex_properties.contains_key(&vid));
2055        let labels = l0.get_vertex_labels(vid).expect("Labels should exist");
2056        assert_eq!(labels.len(), 2);
2057
2058        // Create DeleteVertex mutation with labels
2059        let mutations = vec![Mutation::DeleteVertex {
2060            vid,
2061            labels: vec!["Person".to_string(), "Admin".to_string()],
2062        }];
2063
2064        // Replay deletion
2065        l0.replay_mutations(mutations)?;
2066
2067        // Verify vertex is tombstoned
2068        assert!(l0.vertex_tombstones.contains(&vid));
2069
2070        // Verify labels are preserved in L0 (needed for Issue #76 tombstone flushing)
2071        // The labels should still be accessible for the flush logic to know which tables to update
2072        let labels = l0.get_vertex_labels(vid);
2073        assert!(
2074            labels.is_some(),
2075            "Labels should be preserved even after deletion for tombstone flushing"
2076        );
2077
2078        Ok(())
2079    }
2080
2081    /// Test for Issue #28: Edge type name preserved through replay_mutations
2082    #[test]
2083    fn test_replay_mutations_preserves_edge_type_name() -> Result<()> {
2084        use crate::runtime::wal::Mutation;
2085
2086        let mut l0 = L0Buffer::new(0, None);
2087        let src = Vid::new(1);
2088        let dst = Vid::new(2);
2089        let eid = Eid::new(500);
2090        let edge_type = 100;
2091
2092        // Create InsertEdge mutation with edge_type_name
2093        let mutations = vec![Mutation::InsertEdge {
2094            src_vid: src,
2095            dst_vid: dst,
2096            edge_type,
2097            eid,
2098            version: 1,
2099            properties: {
2100                let mut props = HashMap::new();
2101                props.insert("since".to_string(), uni_common::Value::Int(2020));
2102                props
2103            },
2104            edge_type_name: Some("KNOWS".to_string()),
2105        }];
2106
2107        // Replay mutations
2108        l0.replay_mutations(mutations)?;
2109
2110        // Verify edge exists in L0
2111        assert!(l0.edge_endpoints.contains_key(&eid));
2112
2113        // Verify edge type name is preserved
2114        let type_name = l0.get_edge_type(eid).expect("Edge type name should exist");
2115        assert_eq!(type_name, "KNOWS");
2116
2117        // Verify edge is findable by type name
2118        let knows_eids = l0.eids_for_type("KNOWS");
2119        assert_eq!(knows_eids.len(), 1);
2120        assert_eq!(knows_eids[0], eid);
2121
2122        Ok(())
2123    }
2124
2125    /// Test for Issue #28: Edge type mapping survives multiple replay cycles
2126    #[test]
2127    fn test_edge_type_mapping_survives_multiple_replays() -> Result<()> {
2128        use crate::runtime::wal::Mutation;
2129
2130        let mut l0 = L0Buffer::new(0, None);
2131
2132        // Replay multiple edge insertions with different types
2133        let mutations = vec![
2134            Mutation::InsertEdge {
2135                src_vid: Vid::new(1),
2136                dst_vid: Vid::new(2),
2137                edge_type: 100,
2138                eid: Eid::new(1000),
2139                version: 1,
2140                properties: HashMap::new(),
2141                edge_type_name: Some("KNOWS".to_string()),
2142            },
2143            Mutation::InsertEdge {
2144                src_vid: Vid::new(2),
2145                dst_vid: Vid::new(3),
2146                edge_type: 101,
2147                eid: Eid::new(1001),
2148                version: 2,
2149                properties: HashMap::new(),
2150                edge_type_name: Some("LIKES".to_string()),
2151            },
2152            Mutation::InsertEdge {
2153                src_vid: Vid::new(3),
2154                dst_vid: Vid::new(1),
2155                edge_type: 100,
2156                eid: Eid::new(1002),
2157                version: 3,
2158                properties: HashMap::new(),
2159                edge_type_name: Some("KNOWS".to_string()),
2160            },
2161        ];
2162
2163        l0.replay_mutations(mutations)?;
2164
2165        // Verify all edge type mappings are preserved
2166        assert_eq!(l0.get_edge_type(Eid::new(1000)), Some("KNOWS"));
2167        assert_eq!(l0.get_edge_type(Eid::new(1001)), Some("LIKES"));
2168        assert_eq!(l0.get_edge_type(Eid::new(1002)), Some("KNOWS"));
2169
2170        // Verify edges can be queried by type
2171        let knows_edges = l0.eids_for_type("KNOWS");
2172        assert_eq!(knows_edges.len(), 2);
2173        assert!(knows_edges.contains(&Eid::new(1000)));
2174        assert!(knows_edges.contains(&Eid::new(1002)));
2175
2176        let likes_edges = l0.eids_for_type("LIKES");
2177        assert_eq!(likes_edges.len(), 1);
2178        assert_eq!(likes_edges[0], Eid::new(1001));
2179
2180        Ok(())
2181    }
2182
2183    /// Test for Issue #23 + #28: Combined vertex labels and edge types in replay
2184    #[test]
2185    fn test_replay_mutations_combined_labels_and_edge_types() -> Result<()> {
2186        use crate::runtime::wal::Mutation;
2187
2188        let mut l0 = L0Buffer::new(0, None);
2189        let alice = Vid::new(1);
2190        let bob = Vid::new(2);
2191        let eid = Eid::new(100);
2192
2193        // Simulate crash recovery scenario: replay full transaction log
2194        let mutations = vec![
2195            // Insert Alice with Person label
2196            Mutation::InsertVertex {
2197                vid: alice,
2198                properties: {
2199                    let mut props = HashMap::new();
2200                    props.insert(
2201                        "name".to_string(),
2202                        uni_common::Value::String("Alice".to_string()),
2203                    );
2204                    props
2205                },
2206                labels: vec!["Person".to_string()],
2207            },
2208            // Insert Bob with Person label
2209            Mutation::InsertVertex {
2210                vid: bob,
2211                properties: {
2212                    let mut props = HashMap::new();
2213                    props.insert(
2214                        "name".to_string(),
2215                        uni_common::Value::String("Bob".to_string()),
2216                    );
2217                    props
2218                },
2219                labels: vec!["Person".to_string()],
2220            },
2221            // Create KNOWS edge between them
2222            Mutation::InsertEdge {
2223                src_vid: alice,
2224                dst_vid: bob,
2225                edge_type: 1,
2226                eid,
2227                version: 3,
2228                properties: HashMap::new(),
2229                edge_type_name: Some("KNOWS".to_string()),
2230            },
2231        ];
2232
2233        // Replay all mutations
2234        l0.replay_mutations(mutations)?;
2235
2236        // Verify vertex labels preserved
2237        assert_eq!(l0.get_vertex_labels(alice).unwrap().len(), 1);
2238        assert_eq!(l0.get_vertex_labels(bob).unwrap().len(), 1);
2239        assert_eq!(l0.vids_for_label("Person").len(), 2);
2240
2241        // Verify edge type name preserved
2242        assert_eq!(l0.get_edge_type(eid).unwrap(), "KNOWS");
2243        assert_eq!(l0.eids_for_type("KNOWS").len(), 1);
2244
2245        // Verify graph structure
2246        let alice_neighbors = l0.get_neighbors(alice, 1, Direction::Outgoing);
2247        assert_eq!(alice_neighbors.len(), 1);
2248        assert_eq!(alice_neighbors[0].0, bob);
2249
2250        Ok(())
2251    }
2252
2253    /// Test for Issue #23: Empty labels should deserialize correctly (backward compat)
2254    #[test]
2255    fn test_replay_mutations_backward_compat_empty_labels() -> Result<()> {
2256        use crate::runtime::wal::Mutation;
2257
2258        let mut l0 = L0Buffer::new(0, None);
2259        let vid = Vid::new(1);
2260
2261        // Simulate old WAL format: InsertVertex with empty labels
2262        // (This tests #[serde(default)] behavior)
2263        let mutations = vec![Mutation::InsertVertex {
2264            vid,
2265            properties: HashMap::new(),
2266            labels: vec![], // Empty labels (old format compatibility)
2267        }];
2268
2269        l0.replay_mutations(mutations)?;
2270
2271        // Vertex should exist
2272        assert!(l0.vertex_properties.contains_key(&vid));
2273
2274        // Labels should be empty but entry should exist in vertex_labels
2275        let labels = l0.get_vertex_labels(vid);
2276        assert!(labels.is_some(), "Labels entry should exist even if empty");
2277        assert_eq!(labels.unwrap().len(), 0);
2278
2279        Ok(())
2280    }
2281
2282    #[test]
2283    fn test_now_nanos_returns_nanosecond_range() {
2284        // Test that now_nanos() returns a value in nanosecond range
2285        // As of 2025, Unix timestamp in nanoseconds should be > 1.7e18
2286        // (2025-01-01 is approximately 1,735,689,600 seconds = 1.735e18 nanoseconds)
2287        let now = now_nanos();
2288
2289        // Verify it's in nanosecond range (not microseconds which would be 1000x smaller)
2290        assert!(
2291            now > 1_700_000_000_000_000_000,
2292            "now_nanos() returned {}, expected > 1.7e18 for nanoseconds",
2293            now
2294        );
2295
2296        // Sanity check: should also be less than year 2100 in nanoseconds (4.1e18)
2297        assert!(
2298            now < 4_100_000_000_000_000_000,
2299            "now_nanos() returned {}, expected < 4.1e18",
2300            now
2301        );
2302    }
2303}