Skip to main content

uni_store/runtime/
l0.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::runtime::wal::{Mutation, WriteAheadLog};
5use anyhow::Result;
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use std::time::{SystemTime, UNIX_EPOCH};
9use tracing::{instrument, trace};
10use uni_common::core::id::{Eid, Vid};
11use uni_common::graph::simple_graph::{Direction, SimpleGraph};
12use uni_common::{Properties, Value};
13use uni_crdt::Crdt;
14
15/// Items a read-write transaction observed during execution, used for SSI
16/// read-write antidependency detection at commit.
17///
18/// Shared (via `Arc<Mutex<_>>`) between the read path — which records reads
19/// through the transaction's `QueryContext` — and the commit path, which checks
20/// it against concurrently-committed write-sets. Item-level granularity;
21/// phantoms are out of scope (handled by the `FOR UPDATE` escape hatch).
22#[derive(Debug, Default)]
23pub struct OccReadSet {
24    /// Vertices the transaction read.
25    pub vertices: HashSet<Vid>,
26    /// Edges the transaction read.
27    pub edges: HashSet<Eid>,
28}
29
30impl OccReadSet {
31    /// `true` when nothing has been read yet. Used to decide whether a `FOR
32    /// UPDATE` acquisition may safely re-pin a still-fresh transaction.
33    pub fn is_empty(&self) -> bool {
34        self.vertices.is_empty() && self.edges.is_empty()
35    }
36}
37
/// Returns the current wall-clock time in nanoseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch. The
/// `u128` nanosecond count is narrowed with a checked conversion rather than
/// an `as` cast: a value that no longer fits in `i64` (after the year 2262)
/// saturates to `i64::MAX` instead of silently wrapping to a negative
/// timestamp.
fn now_nanos() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| i64::try_from(d.as_nanos()).unwrap_or(i64::MAX))
        .unwrap_or(0)
}
45
46/// Returns the [`Crdt`] a property value encodes, or `None` if it is not one.
47///
48/// `Crdt` is `#[serde(tag = "t", content = "d")]`, so it deserializes only from a
49/// JSON object, and only `Value::Map(_)` produces one — gating on `Map` avoids
50/// allocating a JSON tree for large non-map values (e.g. embedding columns).
51///
52/// This is the single source of truth for "is this value CRDT-mergeable": both
53/// the commit-time merge ([`L0Buffer::merge_crdt_properties`]) and the OCC
54/// write-set carve-out ([`crate::runtime::occ::WriteSet::from_l0`]) consult it,
55/// so the carve-out can never exclude an item the merge would actually overwrite
56/// (which would silently lose an update).
57pub(crate) fn try_as_crdt(v: &Value) -> Option<Crdt> {
58    if !matches!(v, Value::Map(_)) {
59        return None;
60    }
61    serde_json::from_value::<Crdt>(v.clone().into()).ok()
62}
63
64/// Serialize a constraint key for O(1) uniqueness checks.
65/// Format: label + separator + sorted (prop_name, value) pairs.
66pub fn serialize_constraint_key(label: &str, key_values: &[(String, Value)]) -> Vec<u8> {
67    let mut buf = label.as_bytes().to_vec();
68    buf.push(0); // separator
69    let mut sorted = key_values.to_vec();
70    sorted.sort_by(|a, b| a.0.cmp(&b.0));
71    for (k, v) in &sorted {
72        buf.extend(k.as_bytes());
73        buf.push(0);
74        // Use serde_json serialization for deterministic value encoding
75        buf.extend(serde_json::to_vec(v).unwrap_or_default());
76        buf.push(0);
77    }
78    buf
79}
80
/// Mutation counters, broken down by kind, accumulated by the L0 buffer.
///
/// These back the detailed statistics reported on `ExecuteResult` (for
/// example `nodes_created` or `relationships_deleted`). Callers take a
/// snapshot before and after execution and call
/// [`diff()`](MutationStats::diff) to obtain the delta.
#[derive(Debug, Clone, Default)]
pub struct MutationStats {
    pub nodes_created: usize,
    pub nodes_deleted: usize,
    pub relationships_created: usize,
    pub relationships_deleted: usize,
    pub properties_set: usize,
    pub properties_removed: usize,
    pub labels_added: usize,
    pub labels_removed: usize,
}

impl MutationStats {
    /// Returns `self - before`, computed per field with saturating
    /// subtraction, so any counter smaller in `self` yields `0`.
    pub fn diff(&self, before: &Self) -> Self {
        let delta = |after: usize, earlier: usize| after.saturating_sub(earlier);
        MutationStats {
            nodes_created: delta(self.nodes_created, before.nodes_created),
            nodes_deleted: delta(self.nodes_deleted, before.nodes_deleted),
            relationships_created: delta(
                self.relationships_created,
                before.relationships_created,
            ),
            relationships_deleted: delta(
                self.relationships_deleted,
                before.relationships_deleted,
            ),
            properties_set: delta(self.properties_set, before.properties_set),
            properties_removed: delta(self.properties_removed, before.properties_removed),
            labels_added: delta(self.labels_added, before.labels_added),
            labels_removed: delta(self.labels_removed, before.labels_removed),
        }
    }
}
119
120#[derive(Clone, Debug)]
121pub struct TombstoneEntry {
122    pub eid: Eid,
123    pub src_vid: Vid,
124    pub dst_vid: Vid,
125    pub edge_type: u32,
126}
127
128pub struct L0Buffer {
129    /// Graph topology using simple adjacency lists
130    pub graph: SimpleGraph,
131    /// Soft-deleted edges (tombstones for LSM-style merging)
132    pub tombstones: HashMap<Eid, TombstoneEntry>,
133    /// Soft-deleted vertices
134    pub vertex_tombstones: HashSet<Vid>,
135    /// Edge version tracking for MVCC
136    pub edge_versions: HashMap<Eid, u64>,
137    /// Vertex version tracking for MVCC
138    pub vertex_versions: HashMap<Vid, u64>,
139    /// Edge properties (stored separately from topology)
140    pub edge_properties: HashMap<Eid, Properties>,
141    /// Vertex properties (stored separately from topology)
142    pub vertex_properties: HashMap<Vid, Properties>,
143    /// Edge endpoint lookup: eid -> (src, dst, type)
144    pub edge_endpoints: HashMap<Eid, (Vid, Vid, u32)>,
145    /// Vertex labels (VID -> list of label names)
146    /// New in storage design: vertices can have multiple labels
147    pub vertex_labels: HashMap<Vid, Vec<String>>,
148    /// Reverse index: label name → set of VIDs with that label. Maintained
149    /// alongside `vertex_labels` for O(1) label-based vertex lookups.
150    pub label_to_vids: HashMap<String, HashSet<Vid>>,
151    /// Vids whose FULL label set was explicitly replaced by a label mutation
152    /// (`SET n:Label` / `REMOVE n:Label`) in this buffer, via
153    /// [`L0Buffer::set_vertex_labels`]. Distinguishes a deliberate label
154    /// replacement from the empty `vertex_labels` entry a property-only write
155    /// incidentally creates (`entry().or_default()`), so `merge` knows to REPLACE
156    /// (not append) these vids' labels and `WriteSet::from_l0` knows they are
157    /// conflictable writes. A transaction-buffer concept; empty on main L0.
158    pub vertex_label_overwrites: HashSet<Vid>,
159    /// Edge types (EID -> type name)
160    pub edge_types: HashMap<Eid, String>,
161    /// Current version counter
162    pub current_version: u64,
163    /// Mutation count for flush decisions
164    pub mutation_count: usize,
165    /// Per-type mutation counters for detailed statistics.
166    pub mutation_stats: MutationStats,
167    /// Write-ahead log for durability
168    pub wal: Option<Arc<WriteAheadLog>>,
169    /// WAL LSN at the time this L0 was rotated for flush.
170    /// Used to ensure WAL truncation doesn't remove entries needed by pending flushes.
171    pub wal_lsn_at_flush: u64,
172    /// Vertex creation timestamps (nanoseconds since epoch)
173    pub vertex_created_at: HashMap<Vid, i64>,
174    /// Vertex update timestamps (nanoseconds since epoch)
175    pub vertex_updated_at: HashMap<Vid, i64>,
176    /// Edge creation timestamps (nanoseconds since epoch)
177    pub edge_created_at: HashMap<Eid, i64>,
178    /// Edge update timestamps (nanoseconds since epoch)
179    pub edge_updated_at: HashMap<Eid, i64>,
180    /// Estimated size in bytes for memory limit enforcement.
181    /// Incremented O(1) on each mutation to avoid O(V+E) size_bytes() calls.
182    pub estimated_size: usize,
183    /// Per-constraint index for O(1) unique key checks.
184    /// Key: constraint composite key (label + sorted property values serialized).
185    /// Value: Vid that owns this key.
186    pub constraint_index: HashMap<Vec<u8>, Vid>,
187    /// Implicit MERGE-key guard for phantom-free `MERGE` *without* a declared
188    /// `UNIQUE` constraint. Same key format as `constraint_index` (built by
189    /// [`serialize_constraint_key`]), but populated only by a `MERGE` that
190    /// *creates* a node, and re-probed at commit only against other concurrent
191    /// `MERGE`-creates — so two concurrent `MERGE`s of the same key converge to
192    /// one node (the loser aborts retriably) instead of silently duplicating,
193    /// while a plain `CREATE` of the same properties is unaffected (it never
194    /// registers a key). Tombstoned with the owning vid; transient (not rebuilt
195    /// on recovery).
196    pub merge_guard_index: HashMap<Vec<u8>, Vid>,
197    /// Reverse index `ext_id` → owning vid for O(1) global ext_id uniqueness
198    /// checks (`Writer::check_extid_globally_unique` previously scanned every
199    /// `vertex_properties` map per insert — O(n²) ingest). Maintained by the
200    /// vertex insert impls (synced to the post-CRDT-merge value) and by
201    /// `apply_vertex_deletion`, so merge and WAL replay keep it consistent
202    /// for free.
203    pub extid_index: HashMap<String, Vid>,
204    /// Per-VID set of property keys that should land via Lance MergeInsert
205    /// (partial-column update) at flush time. Populated by
206    /// `insert_vertex_partial`; cleared by full-row inserts and deletes.
207    /// A VID present here at flush time is emitted to the partial batch;
208    /// absent VIDs flush via the existing full-row Append.
209    pub vertex_partial_keys: HashMap<Vid, HashSet<String>>,
210    /// Edge analog of `vertex_partial_keys` (Round 12 §A). Populated by
211    /// `insert_edge_partial_full`; cleared by full-row inserts and edge
212    /// deletes. Per-edge-type delta-table flush honors these by emitting
213    /// a `MergeInsertBuilder` source with only the touched schema
214    /// columns plus `eid`, `op`, `_version`, `_updated_at`, and
215    /// `overflow_json` (when an overflow prop was touched).
216    pub edge_partial_keys: HashMap<Eid, HashSet<String>>,
217    /// Phase B (UniConfig::defer_embeddings): VIDs whose auto-embedding
218    /// was skipped at insert time and is owed at flush. Value = primary
219    /// label name (the rest of the embedding config is looked up from the
220    /// schema at flush time). Drained by `flush_stream_l1` before column
221    /// extraction; entries are removed when the embedding lands in the
222    /// vertex's L0 property map.
223    pub pending_embeddings: HashMap<Vid, String>,
224    /// Optimistic-concurrency read sequence (SSI). Stamped on a transaction's
225    /// private L0 at creation with the Writer's commit-sequence at that moment,
226    /// and consulted at commit to detect intervening conflicting commits. `0`
227    /// for the main L0 and when SSI is disabled.
228    pub occ_read_seq: u64,
229    /// Optimistic-concurrency read-set (SSI). `Some` on a read-write
230    /// transaction's private L0 when SSI tracking is active; the read path
231    /// records observed ids here and commit checks them for antidependencies.
232    /// `None` for the main L0 and read-only / SSI-disabled paths.
233    pub occ_read_set: Option<Arc<parking_lot::Mutex<OccReadSet>>>,
234}
235
236impl std::fmt::Debug for L0Buffer {
237    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
238        f.debug_struct("L0Buffer")
239            .field("vertex_count", &self.graph.vertex_count())
240            .field("edge_count", &self.graph.edge_count())
241            .field("tombstones", &self.tombstones.len())
242            .field("vertex_tombstones", &self.vertex_tombstones.len())
243            .field("current_version", &self.current_version)
244            .field("mutation_count", &self.mutation_count)
245            .finish()
246    }
247}
248
249impl Clone for L0Buffer {
250    /// Clone the L0 buffer for fork/restore (ASSUME/ABDUCE).
251    ///
252    /// The cloned buffer does NOT share the WAL reference — forked L0s are
253    /// ephemeral and should not write to the WAL.
254    fn clone(&self) -> Self {
255        Self {
256            graph: self.graph.clone(),
257            tombstones: self.tombstones.clone(),
258            vertex_tombstones: self.vertex_tombstones.clone(),
259            edge_versions: self.edge_versions.clone(),
260            vertex_versions: self.vertex_versions.clone(),
261            edge_properties: self.edge_properties.clone(),
262            vertex_properties: self.vertex_properties.clone(),
263            edge_endpoints: self.edge_endpoints.clone(),
264            vertex_labels: self.vertex_labels.clone(),
265            label_to_vids: self.label_to_vids.clone(),
266            vertex_label_overwrites: self.vertex_label_overwrites.clone(),
267            edge_types: self.edge_types.clone(),
268            current_version: self.current_version,
269            mutation_count: self.mutation_count,
270            mutation_stats: self.mutation_stats.clone(),
271            wal: None, // Forked L0s don't share the WAL
272            wal_lsn_at_flush: self.wal_lsn_at_flush,
273            vertex_created_at: self.vertex_created_at.clone(),
274            vertex_updated_at: self.vertex_updated_at.clone(),
275            edge_created_at: self.edge_created_at.clone(),
276            edge_updated_at: self.edge_updated_at.clone(),
277            estimated_size: self.estimated_size,
278            constraint_index: self.constraint_index.clone(),
279            merge_guard_index: self.merge_guard_index.clone(),
280            extid_index: self.extid_index.clone(),
281            vertex_partial_keys: self.vertex_partial_keys.clone(),
282            edge_partial_keys: self.edge_partial_keys.clone(),
283            pending_embeddings: self.pending_embeddings.clone(),
284            occ_read_seq: self.occ_read_seq,
285            // Forked L0s (ASSUME/ABDUCE) do not participate in OCC tracking.
286            occ_read_set: None,
287        }
288    }
289}
290
291impl L0Buffer {
292    /// Append labels to a vec, skipping duplicates.
293    fn append_unique_labels(existing: &mut Vec<String>, labels: &[String]) {
294        for label in labels {
295            if !existing.contains(label) {
296                existing.push(label.clone());
297            }
298        }
299    }
300
301    /// Add a VID to the reverse label index for each of the given labels.
302    fn index_labels_for_vid(&mut self, vid: Vid, labels: &[String]) {
303        for label in labels {
304            self.label_to_vids
305                .entry(label.clone())
306                .or_default()
307                .insert(vid);
308        }
309    }
310
311    /// Read a vertex's current string `ext_id` from a property map.
312    fn extid_of(props: &Properties) -> Option<String> {
313        props
314            .get("ext_id")
315            .and_then(|v| v.as_str())
316            .map(str::to_owned)
317    }
318
319    /// Sync `extid_index` for `vid` around a property write.
320    ///
321    /// `old` / `new` are the vertex's ext_id before / after the CRDT merge —
322    /// the index always reflects the post-merge value, matching what the old
323    /// full scan in `Writer::check_extid_globally_unique` observed. Only
324    /// called when the incoming properties contain an `ext_id` key (the merge
325    /// cannot change the value otherwise).
326    fn sync_extid_index(&mut self, vid: Vid, old: Option<String>, new: Option<String>) {
327        if old == new {
328            return;
329        }
330        if let Some(old) = old
331            && self.extid_index.get(&old) == Some(&vid)
332        {
333            self.extid_index.remove(&old);
334        }
335        if let Some(new) = new {
336            self.extid_index.insert(new, vid);
337        }
338    }
339
340    /// Remove a VID from all label entries in the reverse index.
341    fn remove_vid_from_label_index(&mut self, vid: Vid) {
342        if let Some(labels) = self.vertex_labels.get(&vid) {
343            for label in labels {
344                if let Some(set) = self.label_to_vids.get_mut(label) {
345                    set.remove(&vid);
346                }
347            }
348        }
349    }
350
351    /// Replaces a vertex's FULL label set — the semantics of `SET n:Label` /
352    /// `REMOVE n:Label`, which resolve the new complete set before writing.
353    ///
354    /// Unlike [`add_vertex_labels`](Self::add_vertex_labels) (append), this clears
355    /// the vid's existing labels from the reverse index, sets the new set, and
356    /// re-indexes — so a removal actually removes. It marks the vid in
357    /// `vertex_label_overwrites` so `merge` REPLACES (not appends) these labels at
358    /// commit and `WriteSet::from_l0` treats the change as a conflictable write.
359    /// Increments `mutation_count` (a label change is a real mutation; its sibling
360    /// `remove_vertex_label` already does so).
361    pub fn set_vertex_labels(&mut self, vid: Vid, labels: &[String]) {
362        self.remove_vid_from_label_index(vid);
363        self.vertex_labels.insert(vid, labels.to_vec());
364        self.index_labels_for_vid(vid, labels);
365        self.vertex_label_overwrites.insert(vid);
366        self.current_version += 1;
367        self.mutation_count += 1;
368    }
369
370    /// Merge CRDT properties into an existing property map.
371    /// Attempts CRDT merge if both values are valid CRDTs, falls back to overwrite.
372    ///
373    /// When the entry is empty (new vertex insert), skips the expensive JSON
374    /// round-trip and directly assigns the properties.
375    ///
376    /// Logs a warning when a CRDT value is overwritten by a non-CRDT scalar
377    /// (limitation R1).
378    fn merge_crdt_properties(entry: &mut Properties, properties: Properties) {
379        // Fast path: new vertex with no existing properties — skip JSON round-trip
380        if entry.is_empty() {
381            *entry = properties;
382            return;
383        }
384
385        for (k, v) in properties {
386            // `try_as_crdt` performs the Map-gated CRDT probe (see its docs for the
387            // wide-row perf rationale). Sharing it with `WriteSet::from_l0` keeps the
388            // OCC carve-out consistent with this merge-versus-overwrite decision.
389            if let Some(mut new_crdt) = try_as_crdt(&v)
390                && let Some(existing_v) = entry.get(&k)
391                && let Ok(existing_crdt) = serde_json::from_value::<Crdt>(existing_v.clone().into())
392            {
393                // Use try_merge to avoid panic on type mismatch.
394                if new_crdt.try_merge(&existing_crdt).is_ok()
395                    && let Ok(merged_json) = serde_json::to_value(new_crdt)
396                {
397                    entry.insert(k, uni_common::Value::from(merged_json));
398                    continue;
399                }
400                // CRDT variant mismatch (or a failed re-serialize): fall through to
401                // a last-writer-wins overwrite, discarding the existing CRDT's
402                // merged state. The OCC commit-time carve-out check
403                // (`occ::crdt_carveout_overwrite`) aborts a *concurrent* writer
404                // before reaching here, and write-time schema enforcement rejects a
405                // wrong declared variant; this warns on any residual (e.g. a
406                // single-writer variant change) so the discarded state is visible.
407                tracing::warn!(
408                    property = %k,
409                    existing_variant = existing_crdt.type_name(),
410                    "overwriting CRDT property with a different CRDT variant \
411                     (last-writer-wins); merged CRDT state is discarded"
412                );
413            } else if try_as_crdt(&v).is_none()
414                && entry.get(&k).is_some_and(|e| try_as_crdt(e).is_some())
415            {
416                // R1: an existing CRDT value is overwritten by a non-CRDT scalar
417                // (last-writer-wins), silently discarding the CRDT's merged state
418                // — a property written as BOTH a CRDT and last-writer-wins. The OCC
419                // write-set carve-out lets CRDT-only writers commit without
420                // conflicting, so a concurrent LWW write on the same property
421                // cannot be flagged as a conflict; surface it here instead.
422                tracing::warn!(
423                    property = %k,
424                    "overwriting CRDT property with non-CRDT value (last-writer-wins); \
425                     merged CRDT state is discarded"
426                );
427            }
428            // Fallback: Overwrite (last-writer-wins).
429            entry.insert(k, v);
430        }
431    }
432
433    /// Helper function to estimate property map size in bytes.
434    fn estimate_properties_size(props: &Properties) -> usize {
435        props.keys().map(|k| k.len() + 32).sum()
436    }
437
438    /// Returns an estimate of the buffer size in bytes.
439    /// Includes all fields for accurate memory accounting.
440    pub fn size_bytes(&self) -> usize {
441        let mut total = 0;
442
443        // Topology
444        total += self.graph.vertex_count() * 8;
445        total += self.graph.edge_count() * 24;
446
447        // Properties (rough estimate: key string + 32 bytes for value)
448        for props in self.vertex_properties.values() {
449            total += Self::estimate_properties_size(props);
450        }
451        for props in self.edge_properties.values() {
452            total += Self::estimate_properties_size(props);
453        }
454
455        // Metadata
456        total += self.tombstones.len() * 64;
457        total += self.vertex_tombstones.len() * 8;
458        total += self.edge_versions.len() * 16;
459        total += self.vertex_versions.len() * 16;
460        total += self.edge_endpoints.len() * 28; // (Vid, Vid, u32) = 8+8+4 + overhead
461
462        // Vertex labels
463        for labels in self.vertex_labels.values() {
464            total += labels.iter().map(|l| l.len() + 24).sum::<usize>();
465        }
466
467        // Reverse label index (label_to_vids)
468        for (label, vids) in &self.label_to_vids {
469            total += label.len() + 24 + vids.len() * 8 + 48; // string + HashSet overhead
470        }
471
472        // Edge types
473        for type_name in self.edge_types.values() {
474            total += type_name.len() + 24;
475        }
476
477        // Timestamps (4 maps, each entry is 16 bytes: 8-byte key + 8-byte i64 value)
478        total += self.vertex_created_at.len() * 16;
479        total += self.vertex_updated_at.len() * 16;
480        total += self.edge_created_at.len() * 16;
481        total += self.edge_updated_at.len() * 16;
482
483        total
484    }
485
486    pub fn new(start_version: u64, wal: Option<Arc<WriteAheadLog>>) -> Self {
487        Self {
488            graph: SimpleGraph::new(),
489            tombstones: HashMap::new(),
490            vertex_tombstones: HashSet::new(),
491            edge_versions: HashMap::new(),
492            vertex_versions: HashMap::new(),
493            edge_properties: HashMap::new(),
494            vertex_properties: HashMap::new(),
495            edge_endpoints: HashMap::new(),
496            vertex_labels: HashMap::new(),
497            label_to_vids: HashMap::new(),
498            vertex_label_overwrites: HashSet::new(),
499            edge_types: HashMap::new(),
500            current_version: start_version,
501            mutation_count: 0,
502            mutation_stats: MutationStats::default(),
503            wal,
504            wal_lsn_at_flush: 0,
505            vertex_created_at: HashMap::new(),
506            vertex_updated_at: HashMap::new(),
507            edge_created_at: HashMap::new(),
508            edge_updated_at: HashMap::new(),
509            estimated_size: 0,
510            constraint_index: HashMap::new(),
511            merge_guard_index: HashMap::new(),
512            extid_index: HashMap::new(),
513            vertex_partial_keys: HashMap::new(),
514            edge_partial_keys: HashMap::new(),
515            pending_embeddings: HashMap::new(),
516            occ_read_seq: 0,
517            occ_read_set: None,
518        }
519    }
520
521    pub fn insert_vertex(&mut self, vid: Vid, properties: Properties) {
522        self.insert_vertex_with_labels(vid, properties, &[]);
523    }
524
525    /// Insert a vertex with associated labels.
526    pub fn insert_vertex_with_labels(
527        &mut self,
528        vid: Vid,
529        properties: Properties,
530        labels: &[String],
531    ) {
532        self.insert_vertex_with_labels_impl(vid, properties, labels, false);
533    }
534
535    /// Core vertex insertion. When `skip_wal` is true, skips WAL append
536    /// (used during merge where the caller already wrote to WAL).
537    fn insert_vertex_with_labels_impl(
538        &mut self,
539        vid: Vid,
540        properties: Properties,
541        labels: &[String],
542        skip_wal: bool,
543    ) {
544        self.current_version += 1;
545        let version = self.current_version;
546        let now = now_nanos();
547
548        if !skip_wal && let Some(wal) = &self.wal {
549            let _ = wal.append(Mutation::InsertVertex {
550                vid,
551                properties: properties.clone(),
552                labels: labels.to_vec(),
553            });
554        }
555
556        self.vertex_tombstones.remove(&vid);
557
558        // Full-row insert supersedes any pending partial-update state for
559        // this VID.
560        self.vertex_partial_keys.remove(&vid);
561
562        // Size/count computed up front so `properties` can be moved into the
563        // CRDT merge below instead of deep-cloned.
564        let props_size = Self::estimate_properties_size(&properties);
565        let props_count = properties.len();
566        let tracks_extid = properties.contains_key("ext_id");
567
568        let entry = self.vertex_properties.entry(vid).or_default();
569        let old_extid = if tracks_extid {
570            Self::extid_of(entry)
571        } else {
572            None
573        };
574        Self::merge_crdt_properties(entry, properties);
575        if tracks_extid {
576            let new_extid =
577                Self::extid_of(self.vertex_properties.get(&vid).expect("just inserted"));
578            self.sync_extid_index(vid, old_extid, new_extid);
579        }
580        self.vertex_versions.insert(vid, version);
581
582        // Set timestamps - created_at only set if this is a new vertex
583        self.vertex_created_at.entry(vid).or_insert(now);
584        self.vertex_updated_at.insert(vid, now);
585
586        // Track labels — always create an entry so unlabeled vertices are
587        // distinguishable from "not in L0" when queried via get_vertex_labels.
588        let labels_size: usize = labels.iter().map(|l| l.len() + 24).sum();
589        let existing = self.vertex_labels.entry(vid).or_default();
590        Self::append_unique_labels(existing, labels);
591        self.index_labels_for_vid(vid, labels);
592
593        self.graph.add_vertex(vid);
594        self.mutation_count += 1;
595        self.mutation_stats.nodes_created += 1;
596        self.mutation_stats.properties_set += props_count;
597        self.mutation_stats.labels_added += labels.len();
598
599        self.estimated_size += 8 + props_size + 16 + labels_size + 32;
600    }
601
602    /// Insert a vertex's FULL property row, tagging `touched_keys` so the
603    /// flush emits exactly those columns via Lance `MergeInsertBuilder`
604    /// instead of a full-row Append.
605    ///
606    /// `props` MUST be the fully-merged property map (storage union
607    /// in-flight L0 union the new touched values, per
608    /// `PropertyManager::get_all_vertex_props_with_ctx`). The caller is
609    /// responsible for the union; L0 here just stores it so scans see
610    /// the complete row without per-key reconciliation.
611    ///
612    /// `touched_keys` lists the property keys this SET statement
613    /// actually assigned — the union of those across all coalesced
614    /// SetItems on this VID. Lance MergeInsert sends a source batch
615    /// with `_vid`, `_deleted`, `_version`, `_updated_at`, and those
616    /// touched columns; non-touched columns retain their pre-merge
617    /// values on the Lance side, skipping the wide-row write.
618    ///
619    /// A subsequent full-row `insert_vertex_with_labels` or
620    /// `delete_vertex` on the same VID clears the partial-keys entry
621    /// so partial state never outlives a stronger write.
622    pub fn insert_vertex_partial_full(
623        &mut self,
624        vid: Vid,
625        props: Properties,
626        touched_keys: HashSet<String>,
627        labels: &[String],
628    ) {
629        // Stage the full row through the existing partial-impl (same
630        // CRDT merge / version bump / timestamps), preserving the
631        // partial-keys entry so we can extend it below.
632        self.insert_vertex_with_labels_partial_impl(vid, props, labels, false);
633        self.vertex_partial_keys
634            .entry(vid)
635            .or_default()
636            .extend(touched_keys);
637    }
638
639    /// Legacy partial-only variant used by some uni-store paths. Kept
640    /// for source-compatibility but new uni-query callers should use
641    /// `insert_vertex_partial_full` to preserve scan-side L0 visibility.
642    pub fn insert_vertex_partial(&mut self, vid: Vid, touched: Properties, labels: &[String]) {
643        // Record dirty keys BEFORE the full-row impl runs (which would
644        // clear them). The keys come from the touched set; the values
645        // are merged into L0 by the shared CRDT path below.
646        let touched_keys: Vec<String> = touched.keys().cloned().collect();
647
648        // If the VID already has a full-row pending insert (e.g., CREATE
649        // earlier in the same tx), we must NOT downgrade it to partial.
650        // Detected by: VID is in vertex_properties WITH a version stamp
651        // AND not currently in vertex_partial_keys → it was written as
652        // a full row recently. The conservative rule: only enable the
653        // partial path when there's no full-row pending insert. We
654        // approximate "no full-row pending" by checking that the VID's
655        // current entry in vertex_partial_keys is non-empty OR the VID
656        // is not in vertex_properties (fresh row, but caller asked
657        // partial — let it through and the post-flush union covers it).
658        let already_full = self.vertex_properties.contains_key(&vid)
659            && !self.vertex_partial_keys.contains_key(&vid);
660
661        // Stage the CRDT merge through the existing path. We bypass the
662        // full-row `insert_vertex_with_labels_impl` clearing of
663        // partial_keys by inlining the work, then restoring/extending
664        // the partial-key set.
665        self.insert_vertex_with_labels_partial_impl(vid, touched, labels, false);
666
667        if !already_full {
668            self.vertex_partial_keys
669                .entry(vid)
670                .or_default()
671                .extend(touched_keys);
672        }
673    }
674
    /// Core partial-insert: same as `insert_vertex_with_labels_impl` but
    /// preserves any existing `vertex_partial_keys[vid]` entry so the
    /// caller can extend it after the merge.
    ///
    /// In order, this:
    /// - bumps `current_version`;
    /// - appends an `InsertVertex` record to the WAL, unless `skip_wal` is
    ///   true or no WAL is attached;
    /// - clears any vertex tombstone for `vid`;
    /// - CRDT-merges `properties` into the vertex's L0 row, keeping the
    ///   ext_id index in sync when `ext_id` is among the merged keys;
    /// - stamps version and timestamps;
    /// - unions `labels` into the vertex's label list and reverse label
    ///   index;
    /// - updates the mutation counters and size estimate.
    fn insert_vertex_with_labels_partial_impl(
        &mut self,
        vid: Vid,
        properties: Properties,
        labels: &[String],
        skip_wal: bool,
    ) {
        self.current_version += 1;
        let version = self.current_version;
        let now = now_nanos();

        if !skip_wal && let Some(wal) = &self.wal {
            // WAL records the partial as a full-row InsertVertex; on replay
            // the full-row path runs (which clears partial_keys). This is
            // semantically correct — L0 in memory always holds the union of
            // partial deltas via merge_crdt_properties; recovery doesn't
            // need to preserve partial-vs-full distinction.
            //
            // NOTE(review): the append result is discarded and this fn
            // returns `()`, so a WAL write failure is silent here — the
            // mutation is applied in memory without being logged. Confirm
            // this best-effort behavior is intended.
            let _ = wal.append(Mutation::InsertVertex {
                vid,
                properties: properties.clone(),
                labels: labels.to_vec(),
            });
        }

        // Writing to the vertex revives it if it was deleted in this buffer.
        self.vertex_tombstones.remove(&vid);
        // NOTE: deliberately DOES NOT remove from vertex_partial_keys.
        // The caller (`insert_vertex_partial`) extends that set after.

        // Size/count computed up front so `properties` can be moved into the
        // CRDT merge below instead of deep-cloned.
        let props_size = Self::estimate_properties_size(&properties);
        let props_count = properties.len();
        // The ext_id index only needs maintenance when this write touches `ext_id`.
        let tracks_extid = properties.contains_key("ext_id");

        let entry = self.vertex_properties.entry(vid).or_default();
        // Capture the pre-merge ext_id so the index entry can be moved, not duplicated.
        let old_extid = if tracks_extid {
            Self::extid_of(entry)
        } else {
            None
        };
        Self::merge_crdt_properties(entry, properties);
        if tracks_extid {
            // Re-read the row: `entry`'s mutable borrow ended with the merge.
            let new_extid =
                Self::extid_of(self.vertex_properties.get(&vid).expect("just inserted"));
            self.sync_extid_index(vid, old_extid, new_extid);
        }
        self.vertex_versions.insert(vid, version);

        // created_at is only set on first write; updated_at on every write.
        self.vertex_created_at.entry(vid).or_insert(now);
        self.vertex_updated_at.insert(vid, now);

        let labels_size: usize = labels.iter().map(|l| l.len() + 24).sum();
        let existing = self.vertex_labels.entry(vid).or_default();
        Self::append_unique_labels(existing, labels);
        self.index_labels_for_vid(vid, labels);

        self.graph.add_vertex(vid);
        self.mutation_count += 1;
        // Partial writes normally update existing vertices, so nothing is
        // counted under nodes_created; the touched properties are counted
        // under properties_set.
        self.mutation_stats.properties_set += props_count;
        // NOTE(review): counts every label passed in, including labels the
        // vertex already had (append_unique_labels skips those) — confirm.
        self.mutation_stats.labels_added += labels.len();

        // Rough size estimate; presumably 8 for the VID, 16 for the version
        // and 32 for the two timestamps (cf. the edge accounting) — confirm.
        self.estimated_size += 8 + props_size + 16 + labels_size + 32;
    }
743
744    /// Add labels to an existing vertex.
745    pub fn add_vertex_labels(&mut self, vid: Vid, labels: &[String]) {
746        let existing = self.vertex_labels.entry(vid).or_default();
747        Self::append_unique_labels(existing, labels);
748        self.index_labels_for_vid(vid, labels);
749    }
750
751    /// Remove a label from an existing vertex.
752    /// Returns true if the label was found and removed, false otherwise.
753    pub fn remove_vertex_label(&mut self, vid: Vid, label: &str) -> bool {
754        if let Some(labels) = self.vertex_labels.get_mut(&vid)
755            && let Some(pos) = labels.iter().position(|l| l == label)
756        {
757            labels.remove(pos);
758            if let Some(set) = self.label_to_vids.get_mut(label) {
759                set.remove(&vid);
760            }
761            self.current_version += 1;
762            self.mutation_count += 1;
763            self.mutation_stats.labels_removed += 1;
764            // Note: WAL logging for label mutations not yet implemented
765            // Currently consistent with add_vertex_labels behavior
766            return true;
767        }
768        false
769    }
770
771    /// Set the type for an edge.
772    pub fn set_edge_type(&mut self, eid: Eid, edge_type: String) {
773        self.edge_types.insert(eid, edge_type);
774    }
775
776    pub fn delete_vertex(&mut self, vid: Vid) -> Result<()> {
777        self.delete_vertex_impl(vid, false)
778    }
779
780    /// Core vertex deletion. When `skip_wal` is true, skips WAL append
781    /// (used during merge where the caller already wrote to WAL).
782    fn delete_vertex_impl(&mut self, vid: Vid, skip_wal: bool) -> Result<()> {
783        self.current_version += 1;
784
785        if !skip_wal && let Some(wal) = &mut self.wal {
786            let labels = self.vertex_labels.get(&vid).cloned().unwrap_or_default();
787            wal.append(Mutation::DeleteVertex { vid, labels })?;
788        }
789
790        self.apply_vertex_deletion(vid);
791        Ok(())
792    }
793
794    /// Cascade-delete a vertex: tombstone all connected edges and remove the vertex.
795    ///
796    /// Shared between `delete_vertex` (live mutations) and `replay_mutations` (WAL recovery).
797    fn apply_vertex_deletion(&mut self, vid: Vid) {
798        let version = self.current_version;
799
800        // Collect edges to delete using O(degree) neighbors() instead of O(E) scan
801        let mut edges_to_remove = HashSet::new();
802
803        // Collect outgoing edges
804        for entry in self.graph.neighbors(vid, Direction::Outgoing) {
805            edges_to_remove.insert(entry.eid);
806        }
807
808        // Collect incoming edges
809        for entry in self.graph.neighbors(vid, Direction::Incoming) {
810            edges_to_remove.insert(entry.eid); // HashSet handles self-loop deduplication
811        }
812
813        let cascaded_edges_count = edges_to_remove.len();
814
815        // Tombstone and remove all collected edges
816        for eid in edges_to_remove {
817            // Retrieve edge endpoints from the map to create tombstone
818            if let Some((src, dst, etype)) = self.edge_endpoints.get(&eid) {
819                self.tombstones.insert(
820                    eid,
821                    TombstoneEntry {
822                        eid,
823                        src_vid: *src,
824                        dst_vid: *dst,
825                        edge_type: *etype,
826                    },
827                );
828                self.edge_versions.insert(eid, version);
829                self.edge_endpoints.remove(&eid);
830                self.edge_properties.remove(&eid);
831                self.graph.remove_edge(eid);
832                self.mutation_count += 1;
833                self.mutation_stats.relationships_deleted += 1;
834            }
835        }
836
837        self.remove_vid_from_label_index(vid);
838        self.vertex_tombstones.insert(vid);
839        // Drop the vid's ext_id index entry (O(1) via its current property
840        // value, read before the property map entry is removed below).
841        if let Some(props) = self.vertex_properties.get(&vid)
842            && let Some(ext) = Self::extid_of(props)
843            && self.extid_index.get(&ext) == Some(&vid)
844        {
845            self.extid_index.remove(&ext);
846        }
847        self.vertex_properties.remove(&vid);
848        // Deletion supersedes any pending partial-update state.
849        self.vertex_partial_keys.remove(&vid);
850        self.vertex_versions.insert(vid, version);
851        self.graph.remove_vertex(vid);
852        self.mutation_count += 1;
853        self.mutation_stats.nodes_deleted += 1;
854
855        // Remove constraint index entries for this vertex
856        self.constraint_index.retain(|_, v| *v != vid);
857        // Same for the implicit MERGE guard, so a later re-MERGE of a deleted
858        // node's key does not false-conflict with the stale entry.
859        self.merge_guard_index.retain(|_, v| *v != vid);
860
861        // 64 bytes per edge tombstone + 8 for vertex tombstone
862        self.estimated_size += cascaded_edges_count * 72 + 8;
863    }
864
865    pub fn insert_edge(
866        &mut self,
867        src_vid: Vid,
868        dst_vid: Vid,
869        edge_type: u32,
870        eid: Eid,
871        properties: Properties,
872        edge_type_name: Option<String>,
873    ) -> Result<()> {
874        self.insert_edge_impl(
875            src_vid,
876            dst_vid,
877            edge_type,
878            eid,
879            properties,
880            edge_type_name,
881            false,
882        )
883    }
884
885    /// Core edge insertion. When `skip_wal` is true, skips WAL append
886    /// (used during merge where the caller already wrote to WAL).
887    #[allow(clippy::too_many_arguments)]
888    fn insert_edge_impl(
889        &mut self,
890        src_vid: Vid,
891        dst_vid: Vid,
892        edge_type: u32,
893        eid: Eid,
894        properties: Properties,
895        edge_type_name: Option<String>,
896        skip_wal: bool,
897    ) -> Result<()> {
898        self.current_version += 1;
899        let now = now_nanos();
900
901        if !skip_wal && let Some(wal) = &mut self.wal {
902            wal.append(Mutation::InsertEdge {
903                src_vid,
904                dst_vid,
905                edge_type,
906                eid,
907                version: self.current_version,
908                properties: properties.clone(),
909                edge_type_name: edge_type_name.clone(),
910            })?;
911        }
912
913        self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
914
915        // Store edge type name in metadata if provided
916        let type_name_size = if let Some(ref name) = edge_type_name {
917            let size = name.len() + 24;
918            self.edge_types.insert(eid, name.clone());
919            size
920        } else {
921            0
922        };
923
924        // Set timestamps - created_at only set if this is a new edge
925        self.edge_created_at.entry(eid).or_insert(now);
926        self.edge_updated_at.insert(eid, now);
927
928        // A full-row insert supersedes any pending partial-update state
929        // for this EID (Round 12 §A).
930        self.edge_partial_keys.remove(&eid);
931
932        self.estimated_size += type_name_size;
933
934        Ok(())
935    }
936
937    /// Insert an edge's FULL property row plus a touched-keys hint so the
938    /// flush emits only those schema columns via Lance `MergeInsert` on
939    /// the per-edge-type delta tables. Edge analog of
940    /// `insert_vertex_partial_full` (Round 12 §A).
941    #[allow(clippy::too_many_arguments)]
942    pub fn insert_edge_partial_full(
943        &mut self,
944        src_vid: Vid,
945        dst_vid: Vid,
946        edge_type: u32,
947        eid: Eid,
948        properties: Properties,
949        edge_type_name: Option<String>,
950        touched_keys: HashSet<String>,
951    ) -> Result<()> {
952        self.current_version += 1;
953        let now = now_nanos();
954
955        if let Some(wal) = &mut self.wal {
956            wal.append(Mutation::InsertEdge {
957                src_vid,
958                dst_vid,
959                edge_type,
960                eid,
961                version: self.current_version,
962                properties: properties.clone(),
963                edge_type_name: edge_type_name.clone(),
964            })?;
965        }
966
967        self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
968
969        // `apply_edge_insertion` cleared the partial-keys entry as a
970        // safety measure (full-row insert supersedes partial). Re-insert
971        // with the touched-keys hint so the flush emits a partial source.
972        self.edge_partial_keys
973            .entry(eid)
974            .or_default()
975            .extend(touched_keys);
976
977        let type_name_size = if let Some(ref name) = edge_type_name {
978            let size = name.len() + 24;
979            self.edge_types.insert(eid, name.clone());
980            size
981        } else {
982            0
983        };
984
985        self.edge_created_at.entry(eid).or_insert(now);
986        self.edge_updated_at.insert(eid, now);
987
988        self.estimated_size += type_name_size;
989
990        Ok(())
991    }
992
993    /// Core edge insertion logic: add vertices, add edge, merge properties, update metadata.
994    ///
995    /// Shared between `insert_edge` (live mutations) and `replay_mutations` (WAL recovery).
996    ///
997    /// # Errors
998    ///
999    /// Returns error if either endpoint vertex has been deleted (exists in vertex_tombstones).
1000    /// This prevents "ghost vertex" resurrection via edge insertion. See issue #77.
1001    fn apply_edge_insertion(
1002        &mut self,
1003        src_vid: Vid,
1004        dst_vid: Vid,
1005        edge_type: u32,
1006        eid: Eid,
1007        properties: Properties,
1008    ) -> Result<()> {
1009        let version = self.current_version;
1010
1011        // Check if either endpoint has been deleted. Inserting an edge to a deleted
1012        // vertex would resurrect it as a "ghost vertex" with no properties. See issue #77.
1013        if self.vertex_tombstones.contains(&src_vid) {
1014            anyhow::bail!(
1015                "Cannot insert edge: source vertex {} has been deleted (issue #77)",
1016                src_vid
1017            );
1018        }
1019        if self.vertex_tombstones.contains(&dst_vid) {
1020            anyhow::bail!(
1021                "Cannot insert edge: destination vertex {} has been deleted (issue #77)",
1022                dst_vid
1023            );
1024        }
1025
1026        // Add vertices to graph topology if they don't exist.
1027        // IMPORTANT: Only add to graph structure, do NOT call insert_vertex.
1028        // insert_vertex creates a new version with empty properties, which would
1029        // cause MVCC to pick the empty version as "latest", losing original properties.
1030        if !self.graph.contains_vertex(src_vid) {
1031            self.graph.add_vertex(src_vid);
1032        }
1033        if !self.graph.contains_vertex(dst_vid) {
1034            self.graph.add_vertex(dst_vid);
1035        }
1036
1037        self.graph.add_edge(src_vid, dst_vid, eid, edge_type);
1038
1039        // Store metadata with CRDT merge logic
1040        let props_size = Self::estimate_properties_size(&properties);
1041        let props_count = properties.len();
1042        if !properties.is_empty() {
1043            let entry = self.edge_properties.entry(eid).or_default();
1044            Self::merge_crdt_properties(entry, properties);
1045        }
1046
1047        self.edge_versions.insert(eid, version);
1048        self.edge_endpoints
1049            .insert(eid, (src_vid, dst_vid, edge_type));
1050        self.tombstones.remove(&eid);
1051        self.mutation_count += 1;
1052        self.mutation_stats.relationships_created += 1;
1053        self.mutation_stats.properties_set += props_count;
1054
1055        // 24 edge + props + 16 version + 28 endpoints + 32 timestamps
1056        self.estimated_size += 24 + props_size + 16 + 28 + 32;
1057
1058        Ok(())
1059    }
1060
1061    pub fn delete_edge(
1062        &mut self,
1063        eid: Eid,
1064        src_vid: Vid,
1065        dst_vid: Vid,
1066        edge_type: u32,
1067    ) -> Result<()> {
1068        self.delete_edge_impl(eid, src_vid, dst_vid, edge_type, false)
1069    }
1070
1071    /// Core edge deletion. When `skip_wal` is true, skips WAL append
1072    /// (used during merge where the caller already wrote to WAL).
1073    fn delete_edge_impl(
1074        &mut self,
1075        eid: Eid,
1076        src_vid: Vid,
1077        dst_vid: Vid,
1078        edge_type: u32,
1079        skip_wal: bool,
1080    ) -> Result<()> {
1081        self.current_version += 1;
1082        let now = now_nanos();
1083
1084        if !skip_wal && let Some(wal) = &mut self.wal {
1085            wal.append(Mutation::DeleteEdge {
1086                eid,
1087                src_vid,
1088                dst_vid,
1089                edge_type,
1090                version: self.current_version,
1091            })?;
1092        }
1093
1094        self.apply_edge_deletion(eid, src_vid, dst_vid, edge_type);
1095
1096        // Update timestamp - deletion is an update
1097        self.edge_updated_at.insert(eid, now);
1098
1099        Ok(())
1100    }
1101
1102    /// Core edge deletion logic: tombstone the edge, update version, remove from graph.
1103    ///
1104    /// Shared between `delete_edge` (live mutations) and `replay_mutations` (WAL recovery).
1105    fn apply_edge_deletion(&mut self, eid: Eid, src_vid: Vid, dst_vid: Vid, edge_type: u32) {
1106        let version = self.current_version;
1107
1108        self.tombstones.insert(
1109            eid,
1110            TombstoneEntry {
1111                eid,
1112                src_vid,
1113                dst_vid,
1114                edge_type,
1115            },
1116        );
1117        self.edge_versions.insert(eid, version);
1118        // Deletion supersedes any pending partial-update state for this
1119        // EID (Round 12 §A).
1120        self.edge_partial_keys.remove(&eid);
1121        self.graph.remove_edge(eid);
1122        self.mutation_count += 1;
1123        self.mutation_stats.relationships_deleted += 1;
1124
1125        // 64 bytes tombstone + 16 bytes version
1126        self.estimated_size += 80;
1127    }
1128
1129    /// Returns neighbors in the specified direction.
1130    /// O(degree) complexity - iterates only edges connected to the vertex.
1131    pub fn get_neighbors(
1132        &self,
1133        vid: Vid,
1134        edge_type: u32,
1135        direction: Direction,
1136    ) -> Vec<(Vid, Eid, u64)> {
1137        let edges = self.graph.neighbors(vid, direction);
1138
1139        edges
1140            .iter()
1141            .filter(|e| e.edge_type == edge_type && !self.is_tombstoned(e.eid))
1142            .map(|e| {
1143                let neighbor = match direction {
1144                    Direction::Outgoing => e.dst_vid,
1145                    Direction::Incoming => e.src_vid,
1146                };
1147                let version = self.edge_versions.get(&e.eid).copied().unwrap_or(0);
1148                (neighbor, e.eid, version)
1149            })
1150            .collect()
1151    }
1152
1153    pub fn is_tombstoned(&self, eid: Eid) -> bool {
1154        self.tombstones.contains_key(&eid)
1155    }
1156
1157    /// Returns all VIDs in vertex_labels that match the given label name.
1158    /// O(1) lookup via the reverse label index.
1159    pub fn vids_for_label(&self, label_name: &str) -> Vec<Vid> {
1160        self.label_to_vids
1161            .get(label_name)
1162            .map(|set| set.iter().copied().collect())
1163            .unwrap_or_default()
1164    }
1165
1166    /// Returns all vertex VIDs in the L0 buffer.
1167    ///
1168    /// Used for schemaless scanning (MATCH (n) without label).
1169    pub fn all_vertex_vids(&self) -> Vec<Vid> {
1170        self.vertex_properties.keys().copied().collect()
1171    }
1172
1173    /// Returns all VIDs in vertex_labels that match any of the given label names.
1174    /// Uses the reverse label index — O(sum of matching set sizes).
1175    pub fn vids_for_labels(&self, label_names: &[&str]) -> Vec<Vid> {
1176        let mut result = HashSet::new();
1177        for label_name in label_names {
1178            if let Some(set) = self.label_to_vids.get(*label_name) {
1179                result.extend(set.iter().copied());
1180            }
1181        }
1182        result.into_iter().collect()
1183    }
1184
1185    /// Returns all VIDs that have ALL specified labels.
1186    /// Uses the reverse label index — intersects the per-label sets.
1187    pub fn vids_with_all_labels(&self, label_names: &[&str]) -> Vec<Vid> {
1188        if label_names.is_empty() {
1189            return Vec::new();
1190        }
1191        // Collect the per-label sets; if any label is missing from the index,
1192        // the intersection is empty.
1193        let sets: Vec<&HashSet<Vid>> = match label_names
1194            .iter()
1195            .map(|ln| self.label_to_vids.get(*ln))
1196            .collect::<Option<Vec<_>>>()
1197        {
1198            Some(s) => s,
1199            None => return Vec::new(),
1200        };
1201        // Start from the smallest set for efficiency.
1202        let smallest = sets.iter().min_by_key(|s| s.len()).unwrap();
1203        smallest
1204            .iter()
1205            .copied()
1206            .filter(|vid| sets.iter().all(|s| s.contains(vid)))
1207            .collect()
1208    }
1209
1210    /// Gets the labels for a VID.
1211    pub fn get_vertex_labels(&self, vid: Vid) -> Option<&[String]> {
1212        self.vertex_labels.get(&vid).map(|v| v.as_slice())
1213    }
1214
1215    /// Gets the edge type for an EID.
1216    pub fn get_edge_type(&self, eid: Eid) -> Option<&str> {
1217        self.edge_types.get(&eid).map(|s| s.as_str())
1218    }
1219
1220    /// Returns all EIDs in edge_types that match the given type name.
1221    /// Used for L0 overlay during schemaless edge scanning.
1222    pub fn eids_for_type(&self, type_name: &str) -> Vec<Eid> {
1223        self.edge_types
1224            .iter()
1225            .filter(|(eid, etype)| *etype == type_name && !self.tombstones.contains_key(eid))
1226            .map(|(eid, _)| *eid)
1227            .collect()
1228    }
1229
1230    /// Returns all edge EIDs in the L0 buffer (non-tombstoned).
1231    ///
1232    /// Used for schemaless scanning (`MATCH ()-[r]->()`) without type.
1233    pub fn all_edge_eids(&self) -> Vec<Eid> {
1234        self.edge_endpoints
1235            .keys()
1236            .filter(|eid| !self.tombstones.contains_key(eid))
1237            .copied()
1238            .collect()
1239    }
1240
1241    /// Returns edge endpoint data (src_vid, dst_vid) for an EID.
1242    pub fn get_edge_endpoints(&self, eid: Eid) -> Option<(Vid, Vid)> {
1243        self.edge_endpoints
1244            .get(&eid)
1245            .map(|(src, dst, _)| (*src, *dst))
1246    }
1247
1248    /// Returns full edge endpoint data (src_vid, dst_vid, edge_type_id) for an EID.
1249    pub fn get_edge_endpoint_full(&self, eid: Eid) -> Option<(Vid, Vid, u32)> {
1250        self.edge_endpoints.get(&eid).copied()
1251    }
1252
1253    /// Insert a constraint key into the index for O(1) duplicate detection.
1254    pub fn insert_constraint_key(&mut self, key: Vec<u8>, vid: Vid) {
1255        self.constraint_index.insert(key, vid);
1256    }
1257
1258    /// Check if a constraint key exists in the index, excluding a specific VID.
1259    /// Returns true if the key exists and is owned by a different vertex.
1260    pub fn has_constraint_key(&self, key: &[u8], exclude_vid: Vid) -> bool {
1261        self.constraint_index
1262            .get(key)
1263            .is_some_and(|&v| v != exclude_vid)
1264    }
1265
1266    /// Register a MERGE-create's key into the implicit phantom guard.
1267    pub fn insert_merge_guard_key(&mut self, key: Vec<u8>, vid: Vid) {
1268        self.merge_guard_index.insert(key, vid);
1269    }
1270
1271    /// Check if a MERGE-guard key exists, owned by a different vertex than
1272    /// `exclude_vid` — i.e. a concurrent MERGE already created this key.
1273    pub fn has_merge_guard_key(&self, key: &[u8], exclude_vid: Vid) -> bool {
1274        self.merge_guard_index
1275            .get(key)
1276            .is_some_and(|&v| v != exclude_vid)
1277    }
1278
1279    #[instrument(skip(self, other), level = "trace")]
1280    /// Validate that merging `other` into `self` will not bail on a tombstoned
1281    /// edge endpoint (issue #77), **without mutating** either buffer.
1282    ///
1283    /// Mirrors the endpoint-liveness guard in `apply_edge_insertion`
1284    /// against the tombstone state [`Self::merge`] produces: `other`'s vertex
1285    /// deletions are applied and its vertex inserts clear their own tombstone,
1286    /// so an inserted edge bails iff an endpoint is tombstoned in `self` or
1287    /// `other` and is not (re-)inserted by `other`.
1288    ///
1289    /// Run this under `flush_lock` *before* the durable WAL flush so an
1290    /// offending commit is rejected up front. After the flush the transaction
1291    /// is durable, and a `merge` bail would leave a ghost/partial commit whose
1292    /// WAL replay re-bails — rendering the database unopenable.
1293    ///
1294    /// # Errors
1295    ///
1296    /// Returns an error naming the offending edge and endpoint when the merge
1297    /// would bail.
1298    pub fn validate_merge_edge_endpoints(&self, other: &L0Buffer) -> Result<()> {
1299        // An endpoint is effectively deleted after the merge's vertex phase if
1300        // it is tombstoned in either buffer and `other` does not re-insert it
1301        // (an insert clears the tombstone).
1302        let is_deleted = |vid: &Vid| {
1303            (self.vertex_tombstones.contains(vid) || other.vertex_tombstones.contains(vid))
1304                && !other.vertex_properties.contains_key(vid)
1305        };
1306        for (eid, (src_vid, dst_vid, _etype)) in &other.edge_endpoints {
1307            if other.tombstones.contains_key(eid) {
1308                continue; // a deletion, not an insertion — never resurrects a vertex
1309            }
1310            if is_deleted(src_vid) {
1311                anyhow::bail!(
1312                    "Cannot insert edge {}: source vertex {} has been deleted (issue #77)",
1313                    eid,
1314                    src_vid
1315                );
1316            }
1317            if is_deleted(dst_vid) {
1318                anyhow::bail!(
1319                    "Cannot insert edge {}: destination vertex {} has been deleted (issue #77)",
1320                    eid,
1321                    dst_vid
1322                );
1323            }
1324        }
1325        Ok(())
1326    }
1327
1328    pub fn merge(&mut self, other: &L0Buffer) -> Result<()> {
1329        // Validate-then-apply: reject a merge that would bail on a tombstoned
1330        // edge endpoint before mutating anything, so a failed merge can never
1331        // leave a partially-applied (non-atomic) commit.
1332        self.validate_merge_edge_endpoints(other)?;
1333        self.merge_validated(
1334            other,
1335            other.vertex_properties.clone(),
1336            other.edge_properties.clone(),
1337        )
1338    }
1339
1340    /// Commit-path variant of [`merge`](Self::merge) that consumes `other`'s
1341    /// vertex/edge property maps instead of deep-cloning every row.
1342    ///
1343    /// Everything else in `other` (endpoints, tombstones, versions, labels)
1344    /// is left intact — `commit_transaction_l0` still reads those after the
1345    /// merge. The caller must not rely on `other.vertex_properties` /
1346    /// `other.edge_properties` afterwards, which is safe on the commit path
1347    /// because committing consumes the transaction.
1348    pub fn merge_take(&mut self, other: &mut L0Buffer) -> Result<()> {
1349        // Validate BEFORE draining: the endpoint check consults
1350        // `other.vertex_properties` (the "re-inserted by other" exemption).
1351        self.validate_merge_edge_endpoints(other)?;
1352        let vertex_props = std::mem::take(&mut other.vertex_properties);
1353        let edge_props = std::mem::take(&mut other.edge_properties);
1354        self.merge_validated(other, vertex_props, edge_props)
1355    }
1356
1357    /// Shared merge body. `vertex_props` / `edge_props` are `other`'s property
1358    /// maps, passed by value so rows move instead of clone; the caller has
1359    /// already run `validate_merge_edge_endpoints`.
1360    fn merge_validated(
1361        &mut self,
1362        other: &L0Buffer,
1363        vertex_props: HashMap<Vid, Properties>,
1364        mut edge_props: HashMap<Eid, Properties>,
1365    ) -> Result<()> {
1366        trace!(
1367            other_mutation_count = other.mutation_count,
1368            "Merging L0 buffer"
1369        );
1370        // skip_wal=true throughout: the caller (commit_transaction_l0) already
1371        // wrote every one of these mutations to WAL before invoking merge —
1372        // re-appending here would double the WAL volume per commit.
1373        // Merge Vertices
1374        for &vid in &other.vertex_tombstones {
1375            self.delete_vertex_impl(vid, true)?;
1376        }
1377
1378        for (vid, props) in vertex_props {
1379            let labels = other.vertex_labels.get(&vid).cloned().unwrap_or_default();
1380            self.insert_vertex_with_labels_impl(vid, props, &labels, true);
1381        }
1382
1383        // Merge vertex labels that might not have properties
1384        for (vid, labels) in &other.vertex_labels {
1385            if !self.vertex_labels.contains_key(vid) {
1386                self.vertex_labels.insert(*vid, labels.clone());
1387                for label in labels {
1388                    self.label_to_vids
1389                        .entry(label.clone())
1390                        .or_default()
1391                        .insert(*vid);
1392                }
1393            }
1394        }
1395
1396        // Label-overwrite pass: a `SET n:Label` / `REMOVE n:Label` resolved the
1397        // FULL new label set into `other.vertex_labels[vid]` and flagged the vid.
1398        // REPLACE (not append) so removals actually remove and an existing
1399        // vertex's label change lands — overriding any append from the property
1400        // loop above. Skip vids deleted in the same commit. (The append loops
1401        // stay correct for property-path label unions, which are NOT flagged.)
1402        for vid in &other.vertex_label_overwrites {
1403            if other.vertex_tombstones.contains(vid) {
1404                continue;
1405            }
1406            let labels = other.vertex_labels.get(vid).cloned().unwrap_or_default();
1407            self.remove_vid_from_label_index(*vid);
1408            self.vertex_labels.insert(*vid, labels.clone());
1409            self.index_labels_for_vid(*vid, &labels);
1410        }
1411
1412        // Merge Edges - insert all edges from edge_endpoints, using empty props if none exist
1413        for (eid, (src, dst, etype)) in &other.edge_endpoints {
1414            if other.tombstones.contains_key(eid) {
1415                self.delete_edge_impl(*eid, *src, *dst, *etype, true)?;
1416            } else {
1417                let props = edge_props.remove(eid).unwrap_or_default();
1418                let etype_name = other.edge_types.get(eid).cloned();
1419                self.insert_edge_impl(*src, *dst, *etype, *eid, props, etype_name, true)?;
1420            }
1421        }
1422
1423        // Merge tombstones for edges that only exist in the target buffer (self),
1424        // not in the source buffer's edge_endpoints.  Without this, transaction
1425        // DELETEs of pre-existing edges are silently lost on commit.
1426        for (eid, tombstone) in &other.tombstones {
1427            if !other.edge_endpoints.contains_key(eid) {
1428                self.delete_edge_impl(
1429                    *eid,
1430                    tombstone.src_vid,
1431                    tombstone.dst_vid,
1432                    tombstone.edge_type,
1433                    true,
1434                )?;
1435            }
1436        }
1437
1438        // Edge types are now merged inside insert_edge, so no separate loop needed
1439
1440        // Merge timestamps - preserve semantics of or_insert (keep oldest created_at)
1441        // and insert (use latest updated_at)
1442        for (vid, ts) in &other.vertex_created_at {
1443            self.vertex_created_at.entry(*vid).or_insert(*ts); // keep oldest
1444        }
1445        for (vid, ts) in &other.vertex_updated_at {
1446            self.vertex_updated_at.insert(*vid, *ts); // use latest (tx wins)
1447        }
1448
1449        for (eid, ts) in &other.edge_created_at {
1450            self.edge_created_at.entry(*eid).or_insert(*ts); // keep oldest
1451        }
1452        for (eid, ts) in &other.edge_updated_at {
1453            self.edge_updated_at.insert(*eid, *ts); // use latest (tx wins)
1454        }
1455
1456        // Conservatively add other's estimated size (may overcount due to
1457        // deduplication, but that's safe for a memory limit).
1458        self.estimated_size += other.estimated_size;
1459
1460        // Merge constraint index
1461        for (key, vid) in &other.constraint_index {
1462            self.constraint_index.insert(key.clone(), *vid);
1463        }
1464
1465        // Merge the implicit MERGE-key guard so a committed MERGE-create is
1466        // visible to a concurrent transaction's commit-time re-probe.
1467        for (key, vid) in &other.merge_guard_index {
1468            self.merge_guard_index.insert(key.clone(), *vid);
1469        }
1470
1471        Ok(())
1472    }
1473
1474    /// Replay mutations from WAL without re-logging them.
1475    /// Used during startup recovery to restore L0 state from persisted WAL.
1476    /// Uses CRDT merge semantics to ensure recovered state matches pre-crash state.
1477    #[instrument(skip(self, mutations), level = "debug")]
1478    pub fn replay_mutations(&mut self, mutations: Vec<Mutation>) -> Result<()> {
1479        trace!(count = mutations.len(), "Replaying mutations");
1480        for mutation in mutations {
1481            match mutation {
1482                Mutation::InsertVertex {
1483                    vid,
1484                    properties,
1485                    labels,
1486                } => {
1487                    // Apply without WAL logging, with CRDT merge semantics
1488                    self.current_version += 1;
1489                    let version = self.current_version;
1490
1491                    self.vertex_tombstones.remove(&vid);
1492                    let tracks_extid = properties.contains_key("ext_id");
1493                    let entry = self.vertex_properties.entry(vid).or_default();
1494                    let old_extid = if tracks_extid {
1495                        Self::extid_of(entry)
1496                    } else {
1497                        None
1498                    };
1499                    Self::merge_crdt_properties(entry, properties);
1500                    if tracks_extid {
1501                        let new_extid = Self::extid_of(
1502                            self.vertex_properties.get(&vid).expect("just inserted"),
1503                        );
1504                        self.sync_extid_index(vid, old_extid, new_extid);
1505                    }
1506                    self.vertex_versions.insert(vid, version);
1507                    self.graph.add_vertex(vid);
1508                    self.mutation_count += 1;
1509
1510                    // Restore vertex labels from WAL
1511                    let existing = self.vertex_labels.entry(vid).or_default();
1512                    Self::append_unique_labels(existing, &labels);
1513                    for label in &labels {
1514                        self.label_to_vids
1515                            .entry(label.clone())
1516                            .or_default()
1517                            .insert(vid);
1518                    }
1519                }
1520                Mutation::DeleteVertex { vid, labels } => {
1521                    self.current_version += 1;
1522                    // Restore labels BEFORE apply_vertex_deletion
1523                    if !labels.is_empty() {
1524                        let existing = self.vertex_labels.entry(vid).or_default();
1525                        Self::append_unique_labels(existing, &labels);
1526                        for label in &labels {
1527                            self.label_to_vids
1528                                .entry(label.clone())
1529                                .or_default()
1530                                .insert(vid);
1531                        }
1532                    }
1533                    self.apply_vertex_deletion(vid);
1534                }
1535                Mutation::SetVertexLabels { vid, labels } => {
1536                    // REPLACE the vid's full label set (a label-only mutation
1537                    // resolved the complete set). Replace, not append, so a
1538                    // replayed removal removes; clears the old reverse-index
1539                    // entries first.
1540                    self.current_version += 1;
1541                    self.remove_vid_from_label_index(vid);
1542                    self.vertex_labels.insert(vid, labels.clone());
1543                    self.index_labels_for_vid(vid, &labels);
1544                    self.mutation_count += 1;
1545                }
1546                Mutation::InsertEdge {
1547                    src_vid,
1548                    dst_vid,
1549                    edge_type,
1550                    eid,
1551                    version: _,
1552                    properties,
1553                    edge_type_name,
1554                } => {
1555                    self.current_version += 1;
1556                    // Skip-and-warn on the issue-#77 endpoint bail: a pre-fix
1557                    // durable WAL may hold a ghost edge whose endpoint was
1558                    // tombstoned. Recovery must still open the database rather
1559                    // than abort, so drop the offending edge and continue.
1560                    match self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties) {
1561                        Ok(()) => {
1562                            // Restore edge type name metadata if present
1563                            if let Some(name) = edge_type_name {
1564                                self.edge_types.insert(eid, name);
1565                            }
1566                        }
1567                        Err(e) => {
1568                            tracing::warn!(
1569                                ?eid,
1570                                ?src_vid,
1571                                ?dst_vid,
1572                                error = %e,
1573                                "WAL replay: skipping edge insertion to a deleted endpoint (issue #77)"
1574                            );
1575                        }
1576                    }
1577                }
1578                Mutation::DeleteEdge {
1579                    eid,
1580                    src_vid,
1581                    dst_vid,
1582                    edge_type,
1583                    version: _,
1584                } => {
1585                    self.current_version += 1;
1586                    self.apply_edge_deletion(eid, src_vid, dst_vid, edge_type);
1587                }
1588            }
1589        }
1590        Ok(())
1591    }
1592}
1593
1594#[cfg(test)]
1595mod tests {
1596    use super::*;
1597
1598    #[test]
1599    fn test_l0_buffer_ops() -> Result<()> {
1600        let mut l0 = L0Buffer::new(0, None);
1601        let vid_a = Vid::new(1);
1602        let vid_b = Vid::new(2);
1603        let eid_ab = Eid::new(101);
1604
1605        l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1606
1607        let neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1608        assert_eq!(neighbors.len(), 1);
1609        assert_eq!(neighbors[0].0, vid_b);
1610        assert_eq!(neighbors[0].1, eid_ab);
1611
1612        l0.delete_edge(eid_ab, vid_a, vid_b, 1)?;
1613        assert!(l0.is_tombstoned(eid_ab));
1614
1615        // Verify neighbors are empty after deletion
1616        let neighbors_after = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1617        assert_eq!(neighbors_after.len(), 0);
1618
1619        Ok(())
1620    }
1621
1622    /// Regression for review #5: merging an edge whose endpoint is tombstoned
1623    /// in the target buffer must be rejected up front (`validate_merge_edge_endpoints`)
1624    /// and `merge` must be atomic — never partially applied. Before the fix this
1625    /// bailed only *inside* `merge`, after the durable WAL flush, leaving a ghost
1626    /// commit that made the database unopenable on replay.
1627    #[test]
1628    fn validate_merge_rejects_edge_to_tombstoned_endpoint() {
1629        let mut main = L0Buffer::new(0, None);
1630        let vid_a = Vid::new(1);
1631        let vid_b = Vid::new(2);
1632        main.insert_vertex(vid_a, HashMap::new());
1633        main.insert_vertex(vid_b, HashMap::new());
1634        main.delete_vertex(vid_b).unwrap(); // B is now tombstoned in main
1635
1636        // A transaction that inserts an edge A -> B (B tombstoned in main).
1637        let mut tx = L0Buffer::new(0, None);
1638        let eid = Eid::new(101);
1639        tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)
1640            .unwrap();
1641
1642        assert!(
1643            main.validate_merge_edge_endpoints(&tx).is_err(),
1644            "edge to a tombstoned endpoint must be rejected before merge"
1645        );
1646        // merge validates first, so it errors and leaves main untouched (atomic).
1647        assert!(
1648            main.merge(&tx).is_err(),
1649            "merge must reject, not bail mid-apply"
1650        );
1651        assert!(
1652            !main.edge_endpoints.contains_key(&eid),
1653            "a rejected merge must not have partially applied the edge"
1654        );
1655    }
1656
1657    /// When the transaction re-inserts the endpoint vertex, the edge is valid
1658    /// (the insert clears the tombstone) and the merge succeeds.
1659    #[test]
1660    fn validate_merge_allows_edge_when_endpoint_reinserted() {
1661        let mut main = L0Buffer::new(0, None);
1662        let vid_a = Vid::new(1);
1663        let vid_b = Vid::new(2);
1664        main.insert_vertex(vid_a, HashMap::new());
1665        main.insert_vertex(vid_b, HashMap::new());
1666        main.delete_vertex(vid_b).unwrap();
1667
1668        let mut tx = L0Buffer::new(0, None);
1669        tx.insert_vertex(vid_b, HashMap::new()); // re-insert B
1670        let eid = Eid::new(101);
1671        tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)
1672            .unwrap();
1673
1674        assert!(main.validate_merge_edge_endpoints(&tx).is_ok());
1675        assert!(main.merge(&tx).is_ok());
1676        assert!(main.edge_endpoints.contains_key(&eid));
1677    }
1678
1679    /// Edges between live endpoints merge as before — no false positives.
1680    #[test]
1681    fn validate_merge_allows_edge_to_live_endpoints() {
1682        let mut main = L0Buffer::new(0, None);
1683        let vid_a = Vid::new(1);
1684        let vid_b = Vid::new(2);
1685        main.insert_vertex(vid_a, HashMap::new());
1686        main.insert_vertex(vid_b, HashMap::new());
1687
1688        let mut tx = L0Buffer::new(0, None);
1689        let eid = Eid::new(101);
1690        tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)
1691            .unwrap();
1692
1693        assert!(main.validate_merge_edge_endpoints(&tx).is_ok());
1694        assert!(main.merge(&tx).is_ok());
1695        assert!(main.edge_endpoints.contains_key(&eid));
1696    }
1697
1698    #[test]
1699    fn test_l0_buffer_multiple_edges() -> Result<()> {
1700        let mut l0 = L0Buffer::new(0, None);
1701        let vid_a = Vid::new(1);
1702        let vid_b = Vid::new(2);
1703        let vid_c = Vid::new(3);
1704        let eid_ab = Eid::new(101);
1705        let eid_ac = Eid::new(102);
1706
1707        l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1708        l0.insert_edge(vid_a, vid_c, 1, eid_ac, HashMap::new(), None)?;
1709
1710        let neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1711        assert_eq!(neighbors.len(), 2);
1712
1713        // Delete one edge
1714        l0.delete_edge(eid_ab, vid_a, vid_b, 1)?;
1715
1716        // Should still have one neighbor
1717        let neighbors_after = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1718        assert_eq!(neighbors_after.len(), 1);
1719        assert_eq!(neighbors_after[0].0, vid_c);
1720
1721        Ok(())
1722    }
1723
1724    #[test]
1725    fn test_l0_buffer_edge_type_filter() -> Result<()> {
1726        let mut l0 = L0Buffer::new(0, None);
1727        let vid_a = Vid::new(1);
1728        let vid_b = Vid::new(2);
1729        let vid_c = Vid::new(3);
1730        let eid_ab = Eid::new(101);
1731        let eid_ac = Eid::new(201); // Different edge type
1732
1733        l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1734        l0.insert_edge(vid_a, vid_c, 2, eid_ac, HashMap::new(), None)?;
1735
1736        // Filter by edge type 1
1737        let type1_neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1738        assert_eq!(type1_neighbors.len(), 1);
1739        assert_eq!(type1_neighbors[0].0, vid_b);
1740
1741        // Filter by edge type 2
1742        let type2_neighbors = l0.get_neighbors(vid_a, 2, Direction::Outgoing);
1743        assert_eq!(type2_neighbors.len(), 1);
1744        assert_eq!(type2_neighbors[0].0, vid_c);
1745
1746        Ok(())
1747    }
1748
1749    #[test]
1750    fn test_l0_buffer_incoming_edges() -> Result<()> {
1751        let mut l0 = L0Buffer::new(0, None);
1752        let vid_a = Vid::new(1);
1753        let vid_b = Vid::new(2);
1754        let vid_c = Vid::new(3);
1755        let eid_ab = Eid::new(101);
1756        let eid_cb = Eid::new(102);
1757
1758        // a -> b and c -> b
1759        l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1760        l0.insert_edge(vid_c, vid_b, 1, eid_cb, HashMap::new(), None)?;
1761
1762        // Check incoming edges to b
1763        let incoming = l0.get_neighbors(vid_b, 1, Direction::Incoming);
1764        assert_eq!(incoming.len(), 2);
1765
1766        Ok(())
1767    }
1768
1769    /// Regression test: merge should preserve edges without properties
1770    #[test]
1771    fn test_merge_empty_props_edge() -> Result<()> {
1772        let mut main_l0 = L0Buffer::new(0, None);
1773        let mut tx_l0 = L0Buffer::new(0, None);
1774
1775        let vid_a = Vid::new(1);
1776        let vid_b = Vid::new(2);
1777        let eid_ab = Eid::new(101);
1778
1779        // Insert edge with empty properties in transaction L0
1780        tx_l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
1781
1782        // Verify edge exists in tx_l0
1783        assert!(tx_l0.edge_endpoints.contains_key(&eid_ab));
1784        assert!(!tx_l0.edge_properties.contains_key(&eid_ab)); // No properties entry
1785
1786        // Merge into main L0
1787        main_l0.merge(&tx_l0)?;
1788
1789        // Edge should exist in main L0 after merge
1790        assert!(main_l0.edge_endpoints.contains_key(&eid_ab));
1791        let neighbors = main_l0.get_neighbors(vid_a, 1, Direction::Outgoing);
1792        assert_eq!(neighbors.len(), 1);
1793        assert_eq!(neighbors[0].0, vid_b);
1794
1795        Ok(())
1796    }
1797
1798    /// Regression test: WAL replay should use CRDT merge semantics
1799    #[test]
1800    fn test_replay_crdt_merge() -> Result<()> {
1801        use crate::runtime::wal::Mutation;
1802        use serde_json::json;
1803        use uni_common::Value;
1804
1805        let mut l0 = L0Buffer::new(0, None);
1806        let vid = Vid::new(1);
1807
1808        // Create GCounter CRDT values using correct serde format:
1809        // {"t": "gc", "d": {"counts": {...}}}
1810        let counter1: Value = json!({
1811            "t": "gc",
1812            "d": {"counts": {"node1": 5}}
1813        })
1814        .into();
1815        let counter2: Value = json!({
1816            "t": "gc",
1817            "d": {"counts": {"node2": 3}}
1818        })
1819        .into();
1820
1821        // First mutation: insert vertex with counter1
1822        let mut props1 = HashMap::new();
1823        props1.insert("counter".to_string(), counter1.clone());
1824        l0.replay_mutations(vec![Mutation::InsertVertex {
1825            vid,
1826            properties: props1,
1827            labels: vec![],
1828        }])?;
1829
1830        // Second mutation: insert same vertex with counter2 (should merge)
1831        let mut props2 = HashMap::new();
1832        props2.insert("counter".to_string(), counter2.clone());
1833        l0.replay_mutations(vec![Mutation::InsertVertex {
1834            vid,
1835            properties: props2,
1836            labels: vec![],
1837        }])?;
1838
1839        // Verify CRDT was merged (both node1 and node2 counts present)
1840        let stored_props = l0.vertex_properties.get(&vid).unwrap();
1841        let stored_counter = stored_props.get("counter").unwrap();
1842
1843        // Convert back to serde_json::Value for nested access
1844        let stored_json: serde_json::Value = stored_counter.clone().into();
1845        // The merged counter should have both node1: 5 and node2: 3
1846        let data = stored_json.get("d").unwrap();
1847        let counts = data.get("counts").unwrap();
1848        assert_eq!(counts.get("node1"), Some(&json!(5)));
1849        assert_eq!(counts.get("node2"), Some(&json!(3)));
1850
1851        Ok(())
1852    }
1853
1854    #[test]
1855    fn test_merge_preserves_vertex_timestamps() -> Result<()> {
1856        let mut l0_main = L0Buffer::new(0, None);
1857        let mut l0_tx = L0Buffer::new(0, None);
1858        let vid = Vid::new(1);
1859
1860        // Main buffer: insert vertex with timestamp T1
1861        let ts_main_created = 1000;
1862        let ts_main_updated = 1100;
1863        l0_main.insert_vertex(vid, HashMap::new());
1864        l0_main.vertex_created_at.insert(vid, ts_main_created);
1865        l0_main.vertex_updated_at.insert(vid, ts_main_updated);
1866
1867        // Transaction buffer: update same vertex with timestamp T2 (later)
1868        let ts_tx_created = 2000; // should be ignored (main has older created_at)
1869        let ts_tx_updated = 2100; // should win (tx has newer updated_at)
1870        l0_tx.insert_vertex(vid, HashMap::new());
1871        l0_tx.vertex_created_at.insert(vid, ts_tx_created);
1872        l0_tx.vertex_updated_at.insert(vid, ts_tx_updated);
1873
1874        // Merge transaction into main
1875        l0_main.merge(&l0_tx)?;
1876
1877        // Verify created_at is oldest (from main)
1878        assert_eq!(
1879            *l0_main.vertex_created_at.get(&vid).unwrap(),
1880            ts_main_created,
1881            "created_at should preserve oldest timestamp"
1882        );
1883
1884        // Verify updated_at is latest (from tx)
1885        assert_eq!(
1886            *l0_main.vertex_updated_at.get(&vid).unwrap(),
1887            ts_tx_updated,
1888            "updated_at should use latest timestamp"
1889        );
1890
1891        Ok(())
1892    }
1893
1894    #[test]
1895    fn test_merge_preserves_edge_timestamps() -> Result<()> {
1896        let mut l0_main = L0Buffer::new(0, None);
1897        let mut l0_tx = L0Buffer::new(0, None);
1898        let vid_a = Vid::new(1);
1899        let vid_b = Vid::new(2);
1900        let eid = Eid::new(100);
1901
1902        // Main buffer: insert edge with timestamp T1
1903        let ts_main_created = 1000;
1904        let ts_main_updated = 1100;
1905        l0_main.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)?;
1906        l0_main.edge_created_at.insert(eid, ts_main_created);
1907        l0_main.edge_updated_at.insert(eid, ts_main_updated);
1908
1909        // Transaction buffer: update same edge with timestamp T2 (later)
1910        let ts_tx_created = 2000; // should be ignored
1911        let ts_tx_updated = 2100; // should win
1912        l0_tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)?;
1913        l0_tx.edge_created_at.insert(eid, ts_tx_created);
1914        l0_tx.edge_updated_at.insert(eid, ts_tx_updated);
1915
1916        // Merge transaction into main
1917        l0_main.merge(&l0_tx)?;
1918
1919        // Verify created_at is oldest (from main)
1920        assert_eq!(
1921            *l0_main.edge_created_at.get(&eid).unwrap(),
1922            ts_main_created,
1923            "edge created_at should preserve oldest timestamp"
1924        );
1925
1926        // Verify updated_at is latest (from tx)
1927        assert_eq!(
1928            *l0_main.edge_updated_at.get(&eid).unwrap(),
1929            ts_tx_updated,
1930            "edge updated_at should use latest timestamp"
1931        );
1932
1933        Ok(())
1934    }
1935
1936    #[test]
1937    fn test_merge_created_at_not_overwritten_for_existing_vertex() -> Result<()> {
1938        use uni_common::Value;
1939
1940        let mut l0_main = L0Buffer::new(0, None);
1941        let mut l0_tx = L0Buffer::new(0, None);
1942        let vid = Vid::new(1);
1943
1944        // Main buffer: vertex created at T1
1945        let ts_original = 1000;
1946        l0_main.insert_vertex(vid, HashMap::new());
1947        l0_main.vertex_created_at.insert(vid, ts_original);
1948        l0_main.vertex_updated_at.insert(vid, ts_original);
1949
1950        // Transaction buffer: update vertex (created_at would be T2 if set)
1951        let ts_tx = 2000;
1952        let mut props = HashMap::new();
1953        props.insert("updated".to_string(), Value::String("yes".to_string()));
1954        l0_tx.insert_vertex(vid, props);
1955        l0_tx.vertex_created_at.insert(vid, ts_tx);
1956        l0_tx.vertex_updated_at.insert(vid, ts_tx);
1957
1958        // Merge transaction into main
1959        l0_main.merge(&l0_tx)?;
1960
1961        // Verify created_at was NOT overwritten (still T1, not T2)
1962        assert_eq!(
1963            *l0_main.vertex_created_at.get(&vid).unwrap(),
1964            ts_original,
1965            "created_at must not be overwritten for existing vertex"
1966        );
1967
1968        // Verify updated_at WAS updated (now T2)
1969        assert_eq!(
1970            *l0_main.vertex_updated_at.get(&vid).unwrap(),
1971            ts_tx,
1972            "updated_at should reflect transaction timestamp"
1973        );
1974
1975        // Verify properties were merged
1976        assert!(
1977            l0_main
1978                .vertex_properties
1979                .get(&vid)
1980                .unwrap()
1981                .contains_key("updated")
1982        );
1983
1984        Ok(())
1985    }
1986
1987    /// Test for Issue #23: Vertex labels preserved through replay_mutations
1988    #[test]
1989    fn test_replay_mutations_preserves_vertex_labels() -> Result<()> {
1990        use crate::runtime::wal::Mutation;
1991
1992        let mut l0 = L0Buffer::new(0, None);
1993        let vid = Vid::new(42);
1994
1995        // Create InsertVertex mutation with labels
1996        let mutations = vec![Mutation::InsertVertex {
1997            vid,
1998            properties: {
1999                let mut props = HashMap::new();
2000                props.insert(
2001                    "name".to_string(),
2002                    uni_common::Value::String("Alice".to_string()),
2003                );
2004                props
2005            },
2006            labels: vec!["Person".to_string(), "User".to_string()],
2007        }];
2008
2009        // Replay mutations
2010        l0.replay_mutations(mutations)?;
2011
2012        // Verify vertex exists in L0
2013        assert!(l0.vertex_properties.contains_key(&vid));
2014
2015        // Verify labels are preserved
2016        let labels = l0.get_vertex_labels(vid).expect("Labels should exist");
2017        assert_eq!(labels.len(), 2);
2018        assert!(labels.contains(&"Person".to_string()));
2019        assert!(labels.contains(&"User".to_string()));
2020
2021        // Verify vertex is findable by label
2022        let person_vids = l0.vids_for_label("Person");
2023        assert_eq!(person_vids.len(), 1);
2024        assert_eq!(person_vids[0], vid);
2025
2026        let user_vids = l0.vids_for_label("User");
2027        assert_eq!(user_vids.len(), 1);
2028        assert_eq!(user_vids[0], vid);
2029
2030        Ok(())
2031    }
2032
2033    /// Test for Issue #23: DeleteVertex labels preserved for tombstone flushing
2034    #[test]
2035    fn test_replay_mutations_preserves_delete_vertex_labels() -> Result<()> {
2036        use crate::runtime::wal::Mutation;
2037
2038        let mut l0 = L0Buffer::new(0, None);
2039        let vid = Vid::new(99);
2040
2041        // First insert vertex with labels
2042        l0.insert_vertex_with_labels(
2043            vid,
2044            HashMap::new(),
2045            &["Person".to_string(), "Admin".to_string()],
2046        );
2047
2048        // Verify vertex and labels exist
2049        assert!(l0.vertex_properties.contains_key(&vid));
2050        let labels = l0.get_vertex_labels(vid).expect("Labels should exist");
2051        assert_eq!(labels.len(), 2);
2052
2053        // Create DeleteVertex mutation with labels
2054        let mutations = vec![Mutation::DeleteVertex {
2055            vid,
2056            labels: vec!["Person".to_string(), "Admin".to_string()],
2057        }];
2058
2059        // Replay deletion
2060        l0.replay_mutations(mutations)?;
2061
2062        // Verify vertex is tombstoned
2063        assert!(l0.vertex_tombstones.contains(&vid));
2064
2065        // Verify labels are preserved in L0 (needed for Issue #76 tombstone flushing)
2066        // The labels should still be accessible for the flush logic to know which tables to update
2067        let labels = l0.get_vertex_labels(vid);
2068        assert!(
2069            labels.is_some(),
2070            "Labels should be preserved even after deletion for tombstone flushing"
2071        );
2072
2073        Ok(())
2074    }
2075
2076    /// Test for Issue #28: Edge type name preserved through replay_mutations
2077    #[test]
2078    fn test_replay_mutations_preserves_edge_type_name() -> Result<()> {
2079        use crate::runtime::wal::Mutation;
2080
2081        let mut l0 = L0Buffer::new(0, None);
2082        let src = Vid::new(1);
2083        let dst = Vid::new(2);
2084        let eid = Eid::new(500);
2085        let edge_type = 100;
2086
2087        // Create InsertEdge mutation with edge_type_name
2088        let mutations = vec![Mutation::InsertEdge {
2089            src_vid: src,
2090            dst_vid: dst,
2091            edge_type,
2092            eid,
2093            version: 1,
2094            properties: {
2095                let mut props = HashMap::new();
2096                props.insert("since".to_string(), uni_common::Value::Int(2020));
2097                props
2098            },
2099            edge_type_name: Some("KNOWS".to_string()),
2100        }];
2101
2102        // Replay mutations
2103        l0.replay_mutations(mutations)?;
2104
2105        // Verify edge exists in L0
2106        assert!(l0.edge_endpoints.contains_key(&eid));
2107
2108        // Verify edge type name is preserved
2109        let type_name = l0.get_edge_type(eid).expect("Edge type name should exist");
2110        assert_eq!(type_name, "KNOWS");
2111
2112        // Verify edge is findable by type name
2113        let knows_eids = l0.eids_for_type("KNOWS");
2114        assert_eq!(knows_eids.len(), 1);
2115        assert_eq!(knows_eids[0], eid);
2116
2117        Ok(())
2118    }
2119
2120    /// Test for Issue #28: Edge type mapping survives multiple replay cycles
2121    #[test]
2122    fn test_edge_type_mapping_survives_multiple_replays() -> Result<()> {
2123        use crate::runtime::wal::Mutation;
2124
2125        let mut l0 = L0Buffer::new(0, None);
2126
2127        // Replay multiple edge insertions with different types
2128        let mutations = vec![
2129            Mutation::InsertEdge {
2130                src_vid: Vid::new(1),
2131                dst_vid: Vid::new(2),
2132                edge_type: 100,
2133                eid: Eid::new(1000),
2134                version: 1,
2135                properties: HashMap::new(),
2136                edge_type_name: Some("KNOWS".to_string()),
2137            },
2138            Mutation::InsertEdge {
2139                src_vid: Vid::new(2),
2140                dst_vid: Vid::new(3),
2141                edge_type: 101,
2142                eid: Eid::new(1001),
2143                version: 2,
2144                properties: HashMap::new(),
2145                edge_type_name: Some("LIKES".to_string()),
2146            },
2147            Mutation::InsertEdge {
2148                src_vid: Vid::new(3),
2149                dst_vid: Vid::new(1),
2150                edge_type: 100,
2151                eid: Eid::new(1002),
2152                version: 3,
2153                properties: HashMap::new(),
2154                edge_type_name: Some("KNOWS".to_string()),
2155            },
2156        ];
2157
2158        l0.replay_mutations(mutations)?;
2159
2160        // Verify all edge type mappings are preserved
2161        assert_eq!(l0.get_edge_type(Eid::new(1000)), Some("KNOWS"));
2162        assert_eq!(l0.get_edge_type(Eid::new(1001)), Some("LIKES"));
2163        assert_eq!(l0.get_edge_type(Eid::new(1002)), Some("KNOWS"));
2164
2165        // Verify edges can be queried by type
2166        let knows_edges = l0.eids_for_type("KNOWS");
2167        assert_eq!(knows_edges.len(), 2);
2168        assert!(knows_edges.contains(&Eid::new(1000)));
2169        assert!(knows_edges.contains(&Eid::new(1002)));
2170
2171        let likes_edges = l0.eids_for_type("LIKES");
2172        assert_eq!(likes_edges.len(), 1);
2173        assert_eq!(likes_edges[0], Eid::new(1001));
2174
2175        Ok(())
2176    }
2177
2178    /// Test for Issue #23 + #28: Combined vertex labels and edge types in replay
2179    #[test]
2180    fn test_replay_mutations_combined_labels_and_edge_types() -> Result<()> {
2181        use crate::runtime::wal::Mutation;
2182
2183        let mut l0 = L0Buffer::new(0, None);
2184        let alice = Vid::new(1);
2185        let bob = Vid::new(2);
2186        let eid = Eid::new(100);
2187
2188        // Simulate crash recovery scenario: replay full transaction log
2189        let mutations = vec![
2190            // Insert Alice with Person label
2191            Mutation::InsertVertex {
2192                vid: alice,
2193                properties: {
2194                    let mut props = HashMap::new();
2195                    props.insert(
2196                        "name".to_string(),
2197                        uni_common::Value::String("Alice".to_string()),
2198                    );
2199                    props
2200                },
2201                labels: vec!["Person".to_string()],
2202            },
2203            // Insert Bob with Person label
2204            Mutation::InsertVertex {
2205                vid: bob,
2206                properties: {
2207                    let mut props = HashMap::new();
2208                    props.insert(
2209                        "name".to_string(),
2210                        uni_common::Value::String("Bob".to_string()),
2211                    );
2212                    props
2213                },
2214                labels: vec!["Person".to_string()],
2215            },
2216            // Create KNOWS edge between them
2217            Mutation::InsertEdge {
2218                src_vid: alice,
2219                dst_vid: bob,
2220                edge_type: 1,
2221                eid,
2222                version: 3,
2223                properties: HashMap::new(),
2224                edge_type_name: Some("KNOWS".to_string()),
2225            },
2226        ];
2227
2228        // Replay all mutations
2229        l0.replay_mutations(mutations)?;
2230
2231        // Verify vertex labels preserved
2232        assert_eq!(l0.get_vertex_labels(alice).unwrap().len(), 1);
2233        assert_eq!(l0.get_vertex_labels(bob).unwrap().len(), 1);
2234        assert_eq!(l0.vids_for_label("Person").len(), 2);
2235
2236        // Verify edge type name preserved
2237        assert_eq!(l0.get_edge_type(eid).unwrap(), "KNOWS");
2238        assert_eq!(l0.eids_for_type("KNOWS").len(), 1);
2239
2240        // Verify graph structure
2241        let alice_neighbors = l0.get_neighbors(alice, 1, Direction::Outgoing);
2242        assert_eq!(alice_neighbors.len(), 1);
2243        assert_eq!(alice_neighbors[0].0, bob);
2244
2245        Ok(())
2246    }
2247
2248    /// Test for Issue #23: Empty labels should deserialize correctly (backward compat)
2249    #[test]
2250    fn test_replay_mutations_backward_compat_empty_labels() -> Result<()> {
2251        use crate::runtime::wal::Mutation;
2252
2253        let mut l0 = L0Buffer::new(0, None);
2254        let vid = Vid::new(1);
2255
2256        // Simulate old WAL format: InsertVertex with empty labels
2257        // (This tests #[serde(default)] behavior)
2258        let mutations = vec![Mutation::InsertVertex {
2259            vid,
2260            properties: HashMap::new(),
2261            labels: vec![], // Empty labels (old format compatibility)
2262        }];
2263
2264        l0.replay_mutations(mutations)?;
2265
2266        // Vertex should exist
2267        assert!(l0.vertex_properties.contains_key(&vid));
2268
2269        // Labels should be empty but entry should exist in vertex_labels
2270        let labels = l0.get_vertex_labels(vid);
2271        assert!(labels.is_some(), "Labels entry should exist even if empty");
2272        assert_eq!(labels.unwrap().len(), 0);
2273
2274        Ok(())
2275    }
2276
2277    #[test]
2278    fn test_now_nanos_returns_nanosecond_range() {
2279        // Test that now_nanos() returns a value in nanosecond range
2280        // As of 2025, Unix timestamp in nanoseconds should be > 1.7e18
2281        // (2025-01-01 is approximately 1,735,689,600 seconds = 1.735e18 nanoseconds)
2282        let now = now_nanos();
2283
2284        // Verify it's in nanosecond range (not microseconds which would be 1000x smaller)
2285        assert!(
2286            now > 1_700_000_000_000_000_000,
2287            "now_nanos() returned {}, expected > 1.7e18 for nanoseconds",
2288            now
2289        );
2290
2291        // Sanity check: should also be less than year 2100 in nanoseconds (4.1e18)
2292        assert!(
2293            now < 4_100_000_000_000_000_000,
2294            "now_nanos() returned {}, expected < 4.1e18",
2295            now
2296        );
2297    }
2298}