Skip to main content

uni_store/runtime/
l0.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::runtime::wal::{Mutation, WriteAheadLog};
5use anyhow::Result;
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use std::time::{SystemTime, UNIX_EPOCH};
9use tracing::{instrument, trace};
10use uni_common::core::id::{Eid, Vid};
11use uni_common::graph::simple_graph::{Direction, SimpleGraph};
12use uni_common::{Properties, Value};
13use uni_crdt::Crdt;
14
/// Returns the current timestamp in nanoseconds since Unix epoch.
///
/// Falls back to `0` when the system clock reports a time before the epoch.
/// The `u128` nanosecond count is converted with a checked conversion and
/// saturates at `i64::MAX` rather than silently wrapping like an `as` cast.
fn now_nanos() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            i64::try_from(elapsed.as_nanos()).unwrap_or(i64::MAX)
        })
}
22
23/// Serialize a constraint key for O(1) uniqueness checks.
24/// Format: label + separator + sorted (prop_name, value) pairs.
25pub fn serialize_constraint_key(label: &str, key_values: &[(String, Value)]) -> Vec<u8> {
26    let mut buf = label.as_bytes().to_vec();
27    buf.push(0); // separator
28    let mut sorted = key_values.to_vec();
29    sorted.sort_by(|a, b| a.0.cmp(&b.0));
30    for (k, v) in &sorted {
31        buf.extend(k.as_bytes());
32        buf.push(0);
33        // Use serde_json serialization for deterministic value encoding
34        buf.extend(serde_json::to_vec(v).unwrap_or_default());
35        buf.push(0);
36    }
37    buf
38}
39
/// Running per-category mutation counters kept by the L0 buffer.
///
/// These feed the detailed statistics (e.g., `nodes_created`,
/// `relationships_deleted`) reported on `ExecuteResult`. A caller takes a
/// snapshot before and after execution and uses
/// [`diff()`](MutationStats::diff) to obtain the delta.
#[derive(Debug, Clone, Default)]
pub struct MutationStats {
    pub nodes_created: usize,
    pub nodes_deleted: usize,
    pub relationships_created: usize,
    pub relationships_deleted: usize,
    pub properties_set: usize,
    pub properties_removed: usize,
    pub labels_added: usize,
    pub labels_removed: usize,
}

impl MutationStats {
    /// Returns `self - before`, computed independently for every counter.
    ///
    /// Each subtraction saturates at zero, so a counter that is smaller in
    /// `self` than in `before` yields `0` instead of underflowing.
    pub fn diff(&self, before: &Self) -> Self {
        let delta = |after: usize, prior: usize| after.saturating_sub(prior);

        Self {
            nodes_created: delta(self.nodes_created, before.nodes_created),
            nodes_deleted: delta(self.nodes_deleted, before.nodes_deleted),
            relationships_created: delta(
                self.relationships_created,
                before.relationships_created,
            ),
            relationships_deleted: delta(
                self.relationships_deleted,
                before.relationships_deleted,
            ),
            properties_set: delta(self.properties_set, before.properties_set),
            properties_removed: delta(self.properties_removed, before.properties_removed),
            labels_added: delta(self.labels_added, before.labels_added),
            labels_removed: delta(self.labels_removed, before.labels_removed),
        }
    }
}
78
/// Record stored for a soft-deleted edge.
///
/// Carries the edge id together with the endpoints and numeric edge type that
/// were supplied (or looked up) when the edge was tombstoned.
#[derive(Clone, Debug)]
pub struct TombstoneEntry {
    /// Id of the deleted edge.
    pub eid: Eid,
    /// Source vertex of the deleted edge.
    pub src_vid: Vid,
    /// Destination vertex of the deleted edge.
    pub dst_vid: Vid,
    /// Numeric edge type id of the deleted edge.
    pub edge_type: u32,
}
86
/// In-memory buffer of graph mutations.
///
/// Holds topology, vertex/edge properties, tombstones, per-entity versions
/// and timestamps, plus bookkeeping counters and an optional write-ahead log
/// handle. All maps are keyed by `Vid` or `Eid`.
pub struct L0Buffer {
    /// Graph topology using simple adjacency lists
    pub graph: SimpleGraph,
    /// Soft-deleted edges (tombstones for LSM-style merging)
    pub tombstones: HashMap<Eid, TombstoneEntry>,
    /// Soft-deleted vertices
    pub vertex_tombstones: HashSet<Vid>,
    /// Edge version tracking for MVCC
    pub edge_versions: HashMap<Eid, u64>,
    /// Vertex version tracking for MVCC
    pub vertex_versions: HashMap<Vid, u64>,
    /// Edge properties (stored separately from topology)
    pub edge_properties: HashMap<Eid, Properties>,
    /// Vertex properties (stored separately from topology)
    pub vertex_properties: HashMap<Vid, Properties>,
    /// Edge endpoint lookup: eid -> (src, dst, type)
    pub edge_endpoints: HashMap<Eid, (Vid, Vid, u32)>,
    /// Vertex labels (VID -> list of label names)
    /// New in storage design: vertices can have multiple labels
    pub vertex_labels: HashMap<Vid, Vec<String>>,
    /// Edge types (EID -> type name)
    pub edge_types: HashMap<Eid, String>,
    /// Current version counter
    pub current_version: u64,
    /// Mutation count for flush decisions
    pub mutation_count: usize,
    /// Per-type mutation counters for detailed statistics.
    pub mutation_stats: MutationStats,
    /// Write-ahead log for durability
    pub wal: Option<Arc<WriteAheadLog>>,
    /// WAL LSN at the time this L0 was rotated for flush.
    /// Used to ensure WAL truncation doesn't remove entries needed by pending flushes.
    pub wal_lsn_at_flush: u64,
    /// Vertex creation timestamps (nanoseconds since epoch)
    pub vertex_created_at: HashMap<Vid, i64>,
    /// Vertex update timestamps (nanoseconds since epoch)
    pub vertex_updated_at: HashMap<Vid, i64>,
    /// Edge creation timestamps (nanoseconds since epoch)
    pub edge_created_at: HashMap<Eid, i64>,
    /// Edge update timestamps (nanoseconds since epoch)
    pub edge_updated_at: HashMap<Eid, i64>,
    /// Estimated size in bytes for memory limit enforcement.
    /// Incremented O(1) on each mutation to avoid O(V+E) size_bytes() calls.
    pub estimated_size: usize,
    /// Per-constraint index for O(1) unique key checks.
    /// Key: constraint composite key (label + sorted property values serialized).
    /// Value: Vid that owns this key.
    pub constraint_index: HashMap<Vec<u8>, Vid>,
}
136
137impl std::fmt::Debug for L0Buffer {
138    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139        f.debug_struct("L0Buffer")
140            .field("vertex_count", &self.graph.vertex_count())
141            .field("edge_count", &self.graph.edge_count())
142            .field("tombstones", &self.tombstones.len())
143            .field("vertex_tombstones", &self.vertex_tombstones.len())
144            .field("current_version", &self.current_version)
145            .field("mutation_count", &self.mutation_count)
146            .finish()
147    }
148}
149
150impl Clone for L0Buffer {
151    /// Clone the L0 buffer for fork/restore (ASSUME/ABDUCE).
152    ///
153    /// The cloned buffer does NOT share the WAL reference — forked L0s are
154    /// ephemeral and should not write to the WAL.
155    fn clone(&self) -> Self {
156        Self {
157            graph: self.graph.clone(),
158            tombstones: self.tombstones.clone(),
159            vertex_tombstones: self.vertex_tombstones.clone(),
160            edge_versions: self.edge_versions.clone(),
161            vertex_versions: self.vertex_versions.clone(),
162            edge_properties: self.edge_properties.clone(),
163            vertex_properties: self.vertex_properties.clone(),
164            edge_endpoints: self.edge_endpoints.clone(),
165            vertex_labels: self.vertex_labels.clone(),
166            edge_types: self.edge_types.clone(),
167            current_version: self.current_version,
168            mutation_count: self.mutation_count,
169            mutation_stats: self.mutation_stats.clone(),
170            wal: None, // Forked L0s don't share the WAL
171            wal_lsn_at_flush: self.wal_lsn_at_flush,
172            vertex_created_at: self.vertex_created_at.clone(),
173            vertex_updated_at: self.vertex_updated_at.clone(),
174            edge_created_at: self.edge_created_at.clone(),
175            edge_updated_at: self.edge_updated_at.clone(),
176            estimated_size: self.estimated_size,
177            constraint_index: self.constraint_index.clone(),
178        }
179    }
180}
181
182impl L0Buffer {
183    /// Append labels to a vec, skipping duplicates.
184    fn append_unique_labels(existing: &mut Vec<String>, labels: &[String]) {
185        for label in labels {
186            if !existing.contains(label) {
187                existing.push(label.clone());
188            }
189        }
190    }
191
192    /// Merge CRDT properties into an existing property map.
193    /// Attempts CRDT merge if both values are valid CRDTs, falls back to overwrite.
194    fn merge_crdt_properties(entry: &mut Properties, properties: Properties) {
195        for (k, v) in properties {
196            // Attempt merge if CRDT — convert to serde_json::Value for CRDT deserialization
197            let json_v: serde_json::Value = v.clone().into();
198            if let Ok(mut new_crdt) = serde_json::from_value::<Crdt>(json_v)
199                && let Some(existing_v) = entry.get(&k)
200                && let Ok(existing_crdt) = serde_json::from_value::<Crdt>(existing_v.clone().into())
201            {
202                // Use try_merge to avoid panic on type mismatch
203                if new_crdt.try_merge(&existing_crdt).is_ok()
204                    && let Ok(merged_json) = serde_json::to_value(new_crdt)
205                {
206                    entry.insert(k, uni_common::Value::from(merged_json));
207                    continue;
208                }
209                // try_merge failed (type mismatch) - fall through to overwrite
210            }
211            // Fallback: Overwrite
212            entry.insert(k, v);
213        }
214    }
215
216    /// Helper function to estimate property map size in bytes.
217    fn estimate_properties_size(props: &Properties) -> usize {
218        props.keys().map(|k| k.len() + 32).sum()
219    }
220
221    /// Returns an estimate of the buffer size in bytes.
222    /// Includes all fields for accurate memory accounting.
223    pub fn size_bytes(&self) -> usize {
224        let mut total = 0;
225
226        // Topology
227        total += self.graph.vertex_count() * 8;
228        total += self.graph.edge_count() * 24;
229
230        // Properties (rough estimate: key string + 32 bytes for value)
231        for props in self.vertex_properties.values() {
232            total += Self::estimate_properties_size(props);
233        }
234        for props in self.edge_properties.values() {
235            total += Self::estimate_properties_size(props);
236        }
237
238        // Metadata
239        total += self.tombstones.len() * 64;
240        total += self.vertex_tombstones.len() * 8;
241        total += self.edge_versions.len() * 16;
242        total += self.vertex_versions.len() * 16;
243        total += self.edge_endpoints.len() * 28; // (Vid, Vid, u32) = 8+8+4 + overhead
244
245        // Vertex labels
246        for labels in self.vertex_labels.values() {
247            total += labels.iter().map(|l| l.len() + 24).sum::<usize>();
248        }
249
250        // Edge types
251        for type_name in self.edge_types.values() {
252            total += type_name.len() + 24;
253        }
254
255        // Timestamps (4 maps, each entry is 16 bytes: 8-byte key + 8-byte i64 value)
256        total += self.vertex_created_at.len() * 16;
257        total += self.vertex_updated_at.len() * 16;
258        total += self.edge_created_at.len() * 16;
259        total += self.edge_updated_at.len() * 16;
260
261        total
262    }
263
264    pub fn new(start_version: u64, wal: Option<Arc<WriteAheadLog>>) -> Self {
265        Self {
266            graph: SimpleGraph::new(),
267            tombstones: HashMap::new(),
268            vertex_tombstones: HashSet::new(),
269            edge_versions: HashMap::new(),
270            vertex_versions: HashMap::new(),
271            edge_properties: HashMap::new(),
272            vertex_properties: HashMap::new(),
273            edge_endpoints: HashMap::new(),
274            vertex_labels: HashMap::new(),
275            edge_types: HashMap::new(),
276            current_version: start_version,
277            mutation_count: 0,
278            mutation_stats: MutationStats::default(),
279            wal,
280            wal_lsn_at_flush: 0,
281            vertex_created_at: HashMap::new(),
282            vertex_updated_at: HashMap::new(),
283            edge_created_at: HashMap::new(),
284            edge_updated_at: HashMap::new(),
285            estimated_size: 0,
286            constraint_index: HashMap::new(),
287        }
288    }
289
290    pub fn insert_vertex(&mut self, vid: Vid, properties: Properties) {
291        self.insert_vertex_with_labels(vid, properties, &[]);
292    }
293
294    /// Insert a vertex with associated labels.
295    pub fn insert_vertex_with_labels(
296        &mut self,
297        vid: Vid,
298        properties: Properties,
299        labels: &[String],
300    ) {
301        self.current_version += 1;
302        let version = self.current_version;
303        let now = now_nanos();
304
305        if let Some(wal) = &self.wal {
306            let _ = wal.append(&Mutation::InsertVertex {
307                vid,
308                properties: properties.clone(),
309                labels: labels.to_vec(),
310            });
311        }
312
313        self.vertex_tombstones.remove(&vid);
314
315        let entry = self.vertex_properties.entry(vid).or_default();
316        Self::merge_crdt_properties(entry, properties.clone());
317        self.vertex_versions.insert(vid, version);
318
319        // Set timestamps - created_at only set if this is a new vertex
320        self.vertex_created_at.entry(vid).or_insert(now);
321        self.vertex_updated_at.insert(vid, now);
322
323        // Track labels — always create an entry so unlabeled vertices are
324        // distinguishable from "not in L0" when queried via get_vertex_labels.
325        let labels_size: usize = labels.iter().map(|l| l.len() + 24).sum();
326        let existing = self.vertex_labels.entry(vid).or_default();
327        Self::append_unique_labels(existing, labels);
328
329        self.graph.add_vertex(vid);
330        self.mutation_count += 1;
331        self.mutation_stats.nodes_created += 1;
332        self.mutation_stats.properties_set += properties.len();
333        self.mutation_stats.labels_added += labels.len();
334
335        let props_size = Self::estimate_properties_size(&properties);
336        self.estimated_size += 8 + props_size + 16 + labels_size + 32;
337    }
338
339    /// Add labels to an existing vertex.
340    pub fn add_vertex_labels(&mut self, vid: Vid, labels: &[String]) {
341        let existing = self.vertex_labels.entry(vid).or_default();
342        Self::append_unique_labels(existing, labels);
343    }
344
345    /// Remove a label from an existing vertex.
346    /// Returns true if the label was found and removed, false otherwise.
347    pub fn remove_vertex_label(&mut self, vid: Vid, label: &str) -> bool {
348        if let Some(labels) = self.vertex_labels.get_mut(&vid)
349            && let Some(pos) = labels.iter().position(|l| l == label)
350        {
351            labels.remove(pos);
352            self.current_version += 1;
353            self.mutation_count += 1;
354            self.mutation_stats.labels_removed += 1;
355            // Note: WAL logging for label mutations not yet implemented
356            // Currently consistent with add_vertex_labels behavior
357            return true;
358        }
359        false
360    }
361
362    /// Set the type for an edge.
363    pub fn set_edge_type(&mut self, eid: Eid, edge_type: String) {
364        self.edge_types.insert(eid, edge_type);
365    }
366
367    pub fn delete_vertex(&mut self, vid: Vid) -> Result<()> {
368        self.current_version += 1;
369
370        if let Some(wal) = &mut self.wal {
371            let labels = self.vertex_labels.get(&vid).cloned().unwrap_or_default();
372            wal.append(&Mutation::DeleteVertex { vid, labels })?;
373        }
374
375        self.apply_vertex_deletion(vid);
376        Ok(())
377    }
378
379    /// Cascade-delete a vertex: tombstone all connected edges and remove the vertex.
380    ///
381    /// Shared between `delete_vertex` (live mutations) and `replay_mutations` (WAL recovery).
382    fn apply_vertex_deletion(&mut self, vid: Vid) {
383        let version = self.current_version;
384
385        // Collect edges to delete using O(degree) neighbors() instead of O(E) scan
386        let mut edges_to_remove = HashSet::new();
387
388        // Collect outgoing edges
389        for entry in self.graph.neighbors(vid, Direction::Outgoing) {
390            edges_to_remove.insert(entry.eid);
391        }
392
393        // Collect incoming edges
394        for entry in self.graph.neighbors(vid, Direction::Incoming) {
395            edges_to_remove.insert(entry.eid); // HashSet handles self-loop deduplication
396        }
397
398        let cascaded_edges_count = edges_to_remove.len();
399
400        // Tombstone and remove all collected edges
401        for eid in edges_to_remove {
402            // Retrieve edge endpoints from the map to create tombstone
403            if let Some((src, dst, etype)) = self.edge_endpoints.get(&eid) {
404                self.tombstones.insert(
405                    eid,
406                    TombstoneEntry {
407                        eid,
408                        src_vid: *src,
409                        dst_vid: *dst,
410                        edge_type: *etype,
411                    },
412                );
413                self.edge_versions.insert(eid, version);
414                self.edge_endpoints.remove(&eid);
415                self.edge_properties.remove(&eid);
416                self.graph.remove_edge(eid);
417                self.mutation_count += 1;
418                self.mutation_stats.relationships_deleted += 1;
419            }
420        }
421
422        self.vertex_tombstones.insert(vid);
423        self.vertex_properties.remove(&vid);
424        self.vertex_versions.insert(vid, version);
425        self.graph.remove_vertex(vid);
426        self.mutation_count += 1;
427        self.mutation_stats.nodes_deleted += 1;
428
429        // Remove constraint index entries for this vertex
430        self.constraint_index.retain(|_, v| *v != vid);
431
432        // 64 bytes per edge tombstone + 8 for vertex tombstone
433        self.estimated_size += cascaded_edges_count * 72 + 8;
434    }
435
436    pub fn insert_edge(
437        &mut self,
438        src_vid: Vid,
439        dst_vid: Vid,
440        edge_type: u32,
441        eid: Eid,
442        properties: Properties,
443        edge_type_name: Option<String>,
444    ) -> Result<()> {
445        self.current_version += 1;
446        let now = now_nanos();
447
448        if let Some(wal) = &mut self.wal {
449            wal.append(&Mutation::InsertEdge {
450                src_vid,
451                dst_vid,
452                edge_type,
453                eid,
454                version: self.current_version,
455                properties: properties.clone(),
456                edge_type_name: edge_type_name.clone(),
457            })?;
458        }
459
460        self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
461
462        // Store edge type name in metadata if provided
463        let type_name_size = if let Some(ref name) = edge_type_name {
464            let size = name.len() + 24;
465            self.edge_types.insert(eid, name.clone());
466            size
467        } else {
468            0
469        };
470
471        // Set timestamps - created_at only set if this is a new edge
472        self.edge_created_at.entry(eid).or_insert(now);
473        self.edge_updated_at.insert(eid, now);
474
475        self.estimated_size += type_name_size;
476
477        Ok(())
478    }
479
480    /// Core edge insertion logic: add vertices, add edge, merge properties, update metadata.
481    ///
482    /// Shared between `insert_edge` (live mutations) and `replay_mutations` (WAL recovery).
483    ///
484    /// # Errors
485    ///
486    /// Returns error if either endpoint vertex has been deleted (exists in vertex_tombstones).
487    /// This prevents "ghost vertex" resurrection via edge insertion. See issue #77.
488    fn apply_edge_insertion(
489        &mut self,
490        src_vid: Vid,
491        dst_vid: Vid,
492        edge_type: u32,
493        eid: Eid,
494        properties: Properties,
495    ) -> Result<()> {
496        let version = self.current_version;
497
498        // Check if either endpoint has been deleted. Inserting an edge to a deleted
499        // vertex would resurrect it as a "ghost vertex" with no properties. See issue #77.
500        if self.vertex_tombstones.contains(&src_vid) {
501            anyhow::bail!(
502                "Cannot insert edge: source vertex {} has been deleted (issue #77)",
503                src_vid
504            );
505        }
506        if self.vertex_tombstones.contains(&dst_vid) {
507            anyhow::bail!(
508                "Cannot insert edge: destination vertex {} has been deleted (issue #77)",
509                dst_vid
510            );
511        }
512
513        // Add vertices to graph topology if they don't exist.
514        // IMPORTANT: Only add to graph structure, do NOT call insert_vertex.
515        // insert_vertex creates a new version with empty properties, which would
516        // cause MVCC to pick the empty version as "latest", losing original properties.
517        if !self.graph.contains_vertex(src_vid) {
518            self.graph.add_vertex(src_vid);
519        }
520        if !self.graph.contains_vertex(dst_vid) {
521            self.graph.add_vertex(dst_vid);
522        }
523
524        self.graph.add_edge(src_vid, dst_vid, eid, edge_type);
525
526        // Store metadata with CRDT merge logic
527        let props_size = Self::estimate_properties_size(&properties);
528        let props_count = properties.len();
529        if !properties.is_empty() {
530            let entry = self.edge_properties.entry(eid).or_default();
531            Self::merge_crdt_properties(entry, properties);
532        }
533
534        self.edge_versions.insert(eid, version);
535        self.edge_endpoints
536            .insert(eid, (src_vid, dst_vid, edge_type));
537        self.tombstones.remove(&eid);
538        self.mutation_count += 1;
539        self.mutation_stats.relationships_created += 1;
540        self.mutation_stats.properties_set += props_count;
541
542        // 24 edge + props + 16 version + 28 endpoints + 32 timestamps
543        self.estimated_size += 24 + props_size + 16 + 28 + 32;
544
545        Ok(())
546    }
547
548    pub fn delete_edge(
549        &mut self,
550        eid: Eid,
551        src_vid: Vid,
552        dst_vid: Vid,
553        edge_type: u32,
554    ) -> Result<()> {
555        self.current_version += 1;
556        let now = now_nanos();
557
558        if let Some(wal) = &mut self.wal {
559            wal.append(&Mutation::DeleteEdge {
560                eid,
561                src_vid,
562                dst_vid,
563                edge_type,
564                version: self.current_version,
565            })?;
566        }
567
568        self.apply_edge_deletion(eid, src_vid, dst_vid, edge_type);
569
570        // Update timestamp - deletion is an update
571        self.edge_updated_at.insert(eid, now);
572
573        Ok(())
574    }
575
576    /// Core edge deletion logic: tombstone the edge, update version, remove from graph.
577    ///
578    /// Shared between `delete_edge` (live mutations) and `replay_mutations` (WAL recovery).
579    fn apply_edge_deletion(&mut self, eid: Eid, src_vid: Vid, dst_vid: Vid, edge_type: u32) {
580        let version = self.current_version;
581
582        self.tombstones.insert(
583            eid,
584            TombstoneEntry {
585                eid,
586                src_vid,
587                dst_vid,
588                edge_type,
589            },
590        );
591        self.edge_versions.insert(eid, version);
592        self.graph.remove_edge(eid);
593        self.mutation_count += 1;
594        self.mutation_stats.relationships_deleted += 1;
595
596        // 64 bytes tombstone + 16 bytes version
597        self.estimated_size += 80;
598    }
599
600    /// Returns neighbors in the specified direction.
601    /// O(degree) complexity - iterates only edges connected to the vertex.
602    pub fn get_neighbors(
603        &self,
604        vid: Vid,
605        edge_type: u32,
606        direction: Direction,
607    ) -> Vec<(Vid, Eid, u64)> {
608        let edges = self.graph.neighbors(vid, direction);
609
610        edges
611            .iter()
612            .filter(|e| e.edge_type == edge_type && !self.is_tombstoned(e.eid))
613            .map(|e| {
614                let neighbor = match direction {
615                    Direction::Outgoing => e.dst_vid,
616                    Direction::Incoming => e.src_vid,
617                };
618                let version = self.edge_versions.get(&e.eid).copied().unwrap_or(0);
619                (neighbor, e.eid, version)
620            })
621            .collect()
622    }
623
624    pub fn is_tombstoned(&self, eid: Eid) -> bool {
625        self.tombstones.contains_key(&eid)
626    }
627
628    /// Returns all VIDs in vertex_labels that match the given label name.
629    /// Used for L0 overlay during vertex scanning.
630    pub fn vids_for_label(&self, label_name: &str) -> Vec<Vid> {
631        self.vertex_labels
632            .iter()
633            .filter(|(_, labels)| labels.iter().any(|l| l == label_name))
634            .map(|(vid, _)| *vid)
635            .collect()
636    }
637
638    /// Returns all vertex VIDs in the L0 buffer.
639    ///
640    /// Used for schemaless scanning (MATCH (n) without label).
641    pub fn all_vertex_vids(&self) -> Vec<Vid> {
642        self.vertex_properties.keys().copied().collect()
643    }
644
645    /// Returns all VIDs in vertex_labels that match any of the given label names.
646    /// Used for L0 overlay during multi-label vertex scanning.
647    pub fn vids_for_labels(&self, label_names: &[&str]) -> Vec<Vid> {
648        self.vertex_labels
649            .iter()
650            .filter(|(_, labels)| label_names.iter().any(|ln| labels.iter().any(|l| l == *ln)))
651            .map(|(vid, _)| *vid)
652            .collect()
653    }
654
655    /// Returns all VIDs that have ALL specified labels.
656    pub fn vids_with_all_labels(&self, label_names: &[&str]) -> Vec<Vid> {
657        self.vertex_labels
658            .iter()
659            .filter(|(_, labels)| label_names.iter().all(|ln| labels.iter().any(|l| l == *ln)))
660            .map(|(vid, _)| *vid)
661            .collect()
662    }
663
664    /// Gets the labels for a VID.
665    pub fn get_vertex_labels(&self, vid: Vid) -> Option<&[String]> {
666        self.vertex_labels.get(&vid).map(|v| v.as_slice())
667    }
668
669    /// Gets the edge type for an EID.
670    pub fn get_edge_type(&self, eid: Eid) -> Option<&str> {
671        self.edge_types.get(&eid).map(|s| s.as_str())
672    }
673
674    /// Returns all EIDs in edge_types that match the given type name.
675    /// Used for L0 overlay during schemaless edge scanning.
676    pub fn eids_for_type(&self, type_name: &str) -> Vec<Eid> {
677        self.edge_types
678            .iter()
679            .filter(|(eid, etype)| *etype == type_name && !self.tombstones.contains_key(eid))
680            .map(|(eid, _)| *eid)
681            .collect()
682    }
683
684    /// Returns all edge EIDs in the L0 buffer (non-tombstoned).
685    ///
686    /// Used for schemaless scanning (`MATCH ()-[r]->()`) without type.
687    pub fn all_edge_eids(&self) -> Vec<Eid> {
688        self.edge_endpoints
689            .keys()
690            .filter(|eid| !self.tombstones.contains_key(eid))
691            .copied()
692            .collect()
693    }
694
695    /// Returns edge endpoint data (src_vid, dst_vid) for an EID.
696    pub fn get_edge_endpoints(&self, eid: Eid) -> Option<(Vid, Vid)> {
697        self.edge_endpoints
698            .get(&eid)
699            .map(|(src, dst, _)| (*src, *dst))
700    }
701
702    /// Returns full edge endpoint data (src_vid, dst_vid, edge_type_id) for an EID.
703    pub fn get_edge_endpoint_full(&self, eid: Eid) -> Option<(Vid, Vid, u32)> {
704        self.edge_endpoints.get(&eid).copied()
705    }
706
707    /// Insert a constraint key into the index for O(1) duplicate detection.
708    pub fn insert_constraint_key(&mut self, key: Vec<u8>, vid: Vid) {
709        self.constraint_index.insert(key, vid);
710    }
711
712    /// Check if a constraint key exists in the index, excluding a specific VID.
713    /// Returns true if the key exists and is owned by a different vertex.
714    pub fn has_constraint_key(&self, key: &[u8], exclude_vid: Vid) -> bool {
715        self.constraint_index
716            .get(key)
717            .is_some_and(|&v| v != exclude_vid)
718    }
719
720    #[instrument(skip(self, other), level = "trace")]
721    pub fn merge(&mut self, other: &L0Buffer) -> Result<()> {
722        trace!(
723            other_mutation_count = other.mutation_count,
724            "Merging L0 buffer"
725        );
726        // Merge Vertices
727        for &vid in &other.vertex_tombstones {
728            self.delete_vertex(vid)?;
729        }
730
731        for (vid, props) in &other.vertex_properties {
732            let labels = other.vertex_labels.get(vid).cloned().unwrap_or_default();
733            self.insert_vertex_with_labels(*vid, props.clone(), &labels);
734        }
735
736        // Merge vertex labels that might not have properties
737        for (vid, labels) in &other.vertex_labels {
738            if !self.vertex_labels.contains_key(vid) {
739                self.vertex_labels.insert(*vid, labels.clone());
740            }
741        }
742
743        // Merge Edges - insert all edges from edge_endpoints, using empty props if none exist
744        for (eid, (src, dst, etype)) in &other.edge_endpoints {
745            if other.tombstones.contains_key(eid) {
746                self.delete_edge(*eid, *src, *dst, *etype)?;
747            } else {
748                let props = other.edge_properties.get(eid).cloned().unwrap_or_default();
749                let etype_name = other.edge_types.get(eid).cloned();
750                self.insert_edge(*src, *dst, *etype, *eid, props, etype_name)?;
751            }
752        }
753
754        // Merge tombstones for edges that only exist in the target buffer (self),
755        // not in the source buffer's edge_endpoints.  Without this, transaction
756        // DELETEs of pre-existing edges are silently lost on commit.
757        for (eid, tombstone) in &other.tombstones {
758            if !other.edge_endpoints.contains_key(eid) {
759                self.delete_edge(
760                    *eid,
761                    tombstone.src_vid,
762                    tombstone.dst_vid,
763                    tombstone.edge_type,
764                )?;
765            }
766        }
767
768        // Edge types are now merged inside insert_edge, so no separate loop needed
769
770        // Merge timestamps - preserve semantics of or_insert (keep oldest created_at)
771        // and insert (use latest updated_at)
772        for (vid, ts) in &other.vertex_created_at {
773            self.vertex_created_at.entry(*vid).or_insert(*ts); // keep oldest
774        }
775        for (vid, ts) in &other.vertex_updated_at {
776            self.vertex_updated_at.insert(*vid, *ts); // use latest (tx wins)
777        }
778
779        for (eid, ts) in &other.edge_created_at {
780            self.edge_created_at.entry(*eid).or_insert(*ts); // keep oldest
781        }
782        for (eid, ts) in &other.edge_updated_at {
783            self.edge_updated_at.insert(*eid, *ts); // use latest (tx wins)
784        }
785
786        // Conservatively add other's estimated size (may overcount due to
787        // deduplication, but that's safe for a memory limit).
788        self.estimated_size += other.estimated_size;
789
790        // Merge constraint index
791        for (key, vid) in &other.constraint_index {
792            self.constraint_index.insert(key.clone(), *vid);
793        }
794
795        Ok(())
796    }
797
798    /// Replay mutations from WAL without re-logging them.
799    /// Used during startup recovery to restore L0 state from persisted WAL.
800    /// Uses CRDT merge semantics to ensure recovered state matches pre-crash state.
801    #[instrument(skip(self, mutations), level = "debug")]
802    pub fn replay_mutations(&mut self, mutations: Vec<Mutation>) -> Result<()> {
803        trace!(count = mutations.len(), "Replaying mutations");
804        for mutation in mutations {
805            match mutation {
806                Mutation::InsertVertex {
807                    vid,
808                    properties,
809                    labels,
810                } => {
811                    // Apply without WAL logging, with CRDT merge semantics
812                    self.current_version += 1;
813                    let version = self.current_version;
814
815                    self.vertex_tombstones.remove(&vid);
816                    let entry = self.vertex_properties.entry(vid).or_default();
817                    Self::merge_crdt_properties(entry, properties);
818                    self.vertex_versions.insert(vid, version);
819                    self.graph.add_vertex(vid);
820                    self.mutation_count += 1;
821
822                    // Restore vertex labels from WAL
823                    let existing = self.vertex_labels.entry(vid).or_default();
824                    Self::append_unique_labels(existing, &labels);
825                }
826                Mutation::DeleteVertex { vid, labels } => {
827                    self.current_version += 1;
828                    // Restore labels BEFORE apply_vertex_deletion
829                    if !labels.is_empty() {
830                        let existing = self.vertex_labels.entry(vid).or_default();
831                        Self::append_unique_labels(existing, &labels);
832                    }
833                    self.apply_vertex_deletion(vid);
834                }
835                Mutation::InsertEdge {
836                    src_vid,
837                    dst_vid,
838                    edge_type,
839                    eid,
840                    version: _,
841                    properties,
842                    edge_type_name,
843                } => {
844                    self.current_version += 1;
845                    self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
846                    // Restore edge type name metadata if present
847                    if let Some(name) = edge_type_name {
848                        self.edge_types.insert(eid, name);
849                    }
850                }
851                Mutation::DeleteEdge {
852                    eid,
853                    src_vid,
854                    dst_vid,
855                    edge_type,
856                    version: _,
857                } => {
858                    self.current_version += 1;
859                    self.apply_edge_deletion(eid, src_vid, dst_vid, edge_type);
860                }
861            }
862        }
863        Ok(())
864    }
865}
866
867#[cfg(test)]
868mod tests {
869    use super::*;
870
/// Inserts one edge, checks it is listed as an outgoing neighbor, then deletes
/// it and checks that it is tombstoned and no longer listed.
#[test]
fn test_l0_buffer_ops() -> Result<()> {
    let (src, dst) = (Vid::new(1), Vid::new(2));
    let edge = Eid::new(101);
    let mut buffer = L0Buffer::new(0, None);

    buffer.insert_edge(src, dst, 1, edge, HashMap::new(), None)?;

    let outgoing = buffer.get_neighbors(src, 1, Direction::Outgoing);
    assert_eq!(outgoing.len(), 1);
    assert_eq!(outgoing[0].0, dst);
    assert_eq!(outgoing[0].1, edge);

    buffer.delete_edge(edge, src, dst, 1)?;
    assert!(buffer.is_tombstoned(edge));

    // A deleted edge must disappear from neighbor lookups.
    let outgoing_after_delete = buffer.get_neighbors(src, 1, Direction::Outgoing);
    assert_eq!(outgoing_after_delete.len(), 0);

    Ok(())
}
894
/// With two outgoing edges of the same type, deleting one leaves the other
/// visible as the only neighbor.
#[test]
fn test_l0_buffer_multiple_edges() -> Result<()> {
    let hub = Vid::new(1);
    let (first_dst, second_dst) = (Vid::new(2), Vid::new(3));
    let (first_edge, second_edge) = (Eid::new(101), Eid::new(102));
    let mut buffer = L0Buffer::new(0, None);

    buffer.insert_edge(hub, first_dst, 1, first_edge, HashMap::new(), None)?;
    buffer.insert_edge(hub, second_dst, 1, second_edge, HashMap::new(), None)?;

    let before = buffer.get_neighbors(hub, 1, Direction::Outgoing);
    assert_eq!(before.len(), 2);

    // Remove only the first edge.
    buffer.delete_edge(first_edge, hub, first_dst, 1)?;

    // The second edge must be the sole survivor.
    let after = buffer.get_neighbors(hub, 1, Direction::Outgoing);
    assert_eq!(after.len(), 1);
    assert_eq!(after[0].0, second_dst);

    Ok(())
}
920
/// Neighbor lookups only return edges of the requested edge type.
#[test]
fn test_l0_buffer_edge_type_filter() -> Result<()> {
    let origin = Vid::new(1);
    let via_type1 = Vid::new(2);
    let via_type2 = Vid::new(3);
    let mut buffer = L0Buffer::new(0, None);

    // One edge of type 1 and one edge of type 2, both leaving `origin`.
    buffer.insert_edge(origin, via_type1, 1, Eid::new(101), HashMap::new(), None)?;
    buffer.insert_edge(origin, via_type2, 2, Eid::new(201), HashMap::new(), None)?;

    // Asking for type 1 yields only the type-1 destination.
    let only_type1 = buffer.get_neighbors(origin, 1, Direction::Outgoing);
    assert_eq!(only_type1.len(), 1);
    assert_eq!(only_type1[0].0, via_type1);

    // Asking for type 2 yields only the type-2 destination.
    let only_type2 = buffer.get_neighbors(origin, 2, Direction::Outgoing);
    assert_eq!(only_type2.len(), 1);
    assert_eq!(only_type2[0].0, via_type2);

    Ok(())
}
945
/// Two edges pointing at the same vertex are both reported as incoming.
#[test]
fn test_l0_buffer_incoming_edges() -> Result<()> {
    let target = Vid::new(2);
    let mut buffer = L0Buffer::new(0, None);

    // Vertex 1 -> target and vertex 3 -> target, same edge type.
    buffer.insert_edge(Vid::new(1), target, 1, Eid::new(101), HashMap::new(), None)?;
    buffer.insert_edge(Vid::new(3), target, 1, Eid::new(102), HashMap::new(), None)?;

    let inbound = buffer.get_neighbors(target, 1, Direction::Incoming);
    assert_eq!(inbound.len(), 2);

    Ok(())
}
965
/// Regression test: an edge inserted without properties must survive `merge`.
#[test]
fn test_merge_empty_props_edge() -> Result<()> {
    let (src, dst) = (Vid::new(1), Vid::new(2));
    let edge = Eid::new(101);

    // The transaction buffer holds a single property-less edge.
    let mut tx_buffer = L0Buffer::new(0, None);
    tx_buffer.insert_edge(src, dst, 1, edge, HashMap::new(), None)?;

    // Precondition: endpoints are recorded, but no properties entry exists.
    assert!(tx_buffer.edge_endpoints.contains_key(&edge));
    assert!(!tx_buffer.edge_properties.contains_key(&edge));

    let mut main_buffer = L0Buffer::new(0, None);
    main_buffer.merge(&tx_buffer)?;

    // After the merge the edge is known to the main buffer and traversable.
    assert!(main_buffer.edge_endpoints.contains_key(&edge));
    let outgoing = main_buffer.get_neighbors(src, 1, Direction::Outgoing);
    assert_eq!(outgoing.len(), 1);
    assert_eq!(outgoing[0].0, dst);

    Ok(())
}
994
/// Regression test: replaying two inserts of the same vertex must CRDT-merge
/// the `counter` property instead of letting the second insert overwrite it.
#[test]
fn test_replay_crdt_merge() -> Result<()> {
    use crate::runtime::wal::Mutation;
    use serde_json::json;
    use uni_common::Value;

    let vid = Vid::new(1);
    let mut l0 = L0Buffer::new(0, None);

    // Two GCounter values in the serde layout {"t": "gc", "d": {"counts": {...}}},
    // each carrying a count for a different node.
    let from_node1: Value = json!({
        "t": "gc",
        "d": {"counts": {"node1": 5}}
    })
    .into();
    let from_node2: Value = json!({
        "t": "gc",
        "d": {"counts": {"node2": 3}}
    })
    .into();

    // Replay one InsertVertex per counter, each in its own replay call.
    for counter in [from_node1, from_node2] {
        l0.replay_mutations(vec![Mutation::InsertVertex {
            vid,
            properties: HashMap::from([("counter".to_string(), counter)]),
            labels: Vec::new(),
        }])?;
    }

    // Read the stored value back as JSON so the nested counts can be inspected.
    let merged: serde_json::Value = l0
        .vertex_properties
        .get(&vid)
        .unwrap()
        .get("counter")
        .unwrap()
        .clone()
        .into();
    let counts = merged.get("d").unwrap().get("counts").unwrap();

    // Both per-node counts must be present after the merge.
    assert_eq!(counts.get("node1"), Some(&json!(5)));
    assert_eq!(counts.get("node2"), Some(&json!(3)));

    Ok(())
}
1050
/// Merging a transaction buffer into the main buffer keeps the main buffer's
/// vertex `created_at` and adopts the transaction's `updated_at`.
#[test]
fn test_merge_preserves_vertex_timestamps() -> Result<()> {
    let vid = Vid::new(1);
    // (created_at, updated_at); the transaction's values are the later pair.
    let (main_created, main_updated) = (1000, 1100);
    let (tx_created, tx_updated) = (2000, 2100);

    // Main buffer: vertex stamped with the earlier pair.
    let mut main_buf = L0Buffer::new(0, None);
    main_buf.insert_vertex(vid, HashMap::new());
    main_buf.vertex_created_at.insert(vid, main_created);
    main_buf.vertex_updated_at.insert(vid, main_updated);

    // Transaction buffer: same vertex stamped with the later pair.
    let mut tx_buf = L0Buffer::new(0, None);
    tx_buf.insert_vertex(vid, HashMap::new());
    tx_buf.vertex_created_at.insert(vid, tx_created);
    tx_buf.vertex_updated_at.insert(vid, tx_updated);

    main_buf.merge(&tx_buf)?;

    // created_at: the main buffer's (older) value stays.
    assert_eq!(
        main_buf.vertex_created_at[&vid],
        main_created,
        "created_at should preserve oldest timestamp"
    );

    // updated_at: the transaction's (newer) value wins.
    assert_eq!(
        main_buf.vertex_updated_at[&vid],
        tx_updated,
        "updated_at should use latest timestamp"
    );

    Ok(())
}
1090
/// Merging a transaction buffer into the main buffer keeps the main buffer's
/// edge `created_at` and adopts the transaction's `updated_at`.
#[test]
fn test_merge_preserves_edge_timestamps() -> Result<()> {
    let (src, dst) = (Vid::new(1), Vid::new(2));
    let eid = Eid::new(100);
    // (created_at, updated_at); the transaction's values are the later pair.
    let (main_created, main_updated) = (1000, 1100);
    let (tx_created, tx_updated) = (2000, 2100);

    // Main buffer: edge stamped with the earlier pair.
    let mut main_buf = L0Buffer::new(0, None);
    main_buf.insert_edge(src, dst, 1, eid, HashMap::new(), None)?;
    main_buf.edge_created_at.insert(eid, main_created);
    main_buf.edge_updated_at.insert(eid, main_updated);

    // Transaction buffer: same edge stamped with the later pair.
    let mut tx_buf = L0Buffer::new(0, None);
    tx_buf.insert_edge(src, dst, 1, eid, HashMap::new(), None)?;
    tx_buf.edge_created_at.insert(eid, tx_created);
    tx_buf.edge_updated_at.insert(eid, tx_updated);

    main_buf.merge(&tx_buf)?;

    // created_at: the main buffer's (older) value stays.
    assert_eq!(
        main_buf.edge_created_at[&eid],
        main_created,
        "edge created_at should preserve oldest timestamp"
    );

    // updated_at: the transaction's (newer) value wins.
    assert_eq!(
        main_buf.edge_updated_at[&eid],
        tx_updated,
        "edge updated_at should use latest timestamp"
    );

    Ok(())
}
1132
/// When a transaction updates a vertex that already exists in the main buffer,
/// `merge` must leave `created_at` alone, advance `updated_at`, and carry the
/// transaction's properties over.
#[test]
fn test_merge_created_at_not_overwritten_for_existing_vertex() -> Result<()> {
    use uni_common::Value;

    let vid = Vid::new(1);
    let original_ts = 1000;
    let tx_ts = 2000;

    // Main buffer: vertex created and last updated at `original_ts`.
    let mut main_buf = L0Buffer::new(0, None);
    main_buf.insert_vertex(vid, HashMap::new());
    main_buf.vertex_created_at.insert(vid, original_ts);
    main_buf.vertex_updated_at.insert(vid, original_ts);

    // Transaction buffer: same vertex, one new property, both stamps at `tx_ts`.
    let mut tx_buf = L0Buffer::new(0, None);
    tx_buf.insert_vertex(
        vid,
        HashMap::from([("updated".to_string(), Value::String("yes".to_string()))]),
    );
    tx_buf.vertex_created_at.insert(vid, tx_ts);
    tx_buf.vertex_updated_at.insert(vid, tx_ts);

    main_buf.merge(&tx_buf)?;

    // created_at still carries the original stamp.
    assert_eq!(
        main_buf.vertex_created_at[&vid],
        original_ts,
        "created_at must not be overwritten for existing vertex"
    );

    // updated_at moved forward to the transaction's stamp.
    assert_eq!(
        main_buf.vertex_updated_at[&vid],
        tx_ts,
        "updated_at should reflect transaction timestamp"
    );

    // The transaction's property arrived in the main buffer.
    let merged_props = main_buf.vertex_properties.get(&vid).unwrap();
    assert!(merged_props.contains_key("updated"));

    Ok(())
}
1183
/// Test for Issue #23: labels attached to a replayed `InsertVertex` end up on
/// the vertex and make it discoverable through label lookups.
#[test]
fn test_replay_mutations_preserves_vertex_labels() -> Result<()> {
    use crate::runtime::wal::Mutation;

    let vid = Vid::new(42);
    let mut l0 = L0Buffer::new(0, None);

    // A single labelled insert with one property.
    let insert = Mutation::InsertVertex {
        vid,
        properties: HashMap::from([(
            "name".to_string(),
            uni_common::Value::String("Alice".to_string()),
        )]),
        labels: vec!["Person".to_string(), "User".to_string()],
    };
    l0.replay_mutations(vec![insert])?;

    // The vertex itself was restored.
    assert!(l0.vertex_properties.contains_key(&vid));

    // Both labels are attached to it.
    let restored = l0.get_vertex_labels(vid).expect("Labels should exist");
    assert_eq!(restored.len(), 2);
    assert!(restored.contains(&"Person".to_string()));
    assert!(restored.contains(&"User".to_string()));

    // Each label resolves back to exactly this vertex.
    let by_person = l0.vids_for_label("Person");
    assert_eq!(by_person.len(), 1);
    assert_eq!(by_person[0], vid);

    let by_user = l0.vids_for_label("User");
    assert_eq!(by_user.len(), 1);
    assert_eq!(by_user[0], vid);

    Ok(())
}
1229
/// Test for Issue #23: after replaying a `DeleteVertex`, the vertex is
/// tombstoned but its labels are still retrievable (tombstone flushing,
/// Issue #76, relies on them to know which tables to update).
#[test]
fn test_replay_mutations_preserves_delete_vertex_labels() -> Result<()> {
    use crate::runtime::wal::Mutation;

    let vid = Vid::new(99);
    let labels = ["Person".to_string(), "Admin".to_string()];
    let mut l0 = L0Buffer::new(0, None);

    // Seed the buffer with a labelled vertex through the regular insert path.
    l0.insert_vertex_with_labels(vid, HashMap::new(), &labels);

    // Precondition: the vertex and both of its labels are present.
    assert!(l0.vertex_properties.contains_key(&vid));
    let before = l0.get_vertex_labels(vid).expect("Labels should exist");
    assert_eq!(before.len(), 2);

    // Replay a deletion that carries the same labels.
    l0.replay_mutations(vec![Mutation::DeleteVertex {
        vid,
        labels: labels.to_vec(),
    }])?;

    // The vertex is now tombstoned...
    assert!(l0.vertex_tombstones.contains(&vid));

    // ...yet its labels remain available.
    let after = l0.get_vertex_labels(vid);
    assert!(
        after.is_some(),
        "Labels should be preserved even after deletion for tombstone flushing"
    );

    Ok(())
}
1272
/// Test for Issue #28: the edge type name carried by a replayed `InsertEdge`
/// is stored and usable for lookups by type name.
#[test]
fn test_replay_mutations_preserves_edge_type_name() -> Result<()> {
    use crate::runtime::wal::Mutation;

    let eid = Eid::new(500);
    let mut l0 = L0Buffer::new(0, None);

    // One edge insert that names its type and carries a single property.
    let insert = Mutation::InsertEdge {
        src_vid: Vid::new(1),
        dst_vid: Vid::new(2),
        edge_type: 100,
        eid,
        version: 1,
        properties: HashMap::from([("since".to_string(), uni_common::Value::Int(2020))]),
        edge_type_name: Some("KNOWS".to_string()),
    };
    l0.replay_mutations(vec![insert])?;

    // The edge was restored.
    assert!(l0.edge_endpoints.contains_key(&eid));

    // Its type name came back with it.
    let restored_name = l0.get_edge_type(eid).expect("Edge type name should exist");
    assert_eq!(restored_name, "KNOWS");

    // Looking up by type name returns exactly this edge.
    let by_name = l0.eids_for_type("KNOWS");
    assert_eq!(by_name.len(), 1);
    assert_eq!(by_name[0], eid);

    Ok(())
}
1316
/// Test for Issue #28: when several edge inserts with different type names are
/// replayed together, every edge keeps its own type name and lookups by name
/// return the right edges.
#[test]
fn test_edge_type_mapping_survives_multiple_replays() -> Result<()> {
    use crate::runtime::wal::Mutation;

    // (src, dst, numeric edge type, eid, version, type name) per replayed edge.
    let edge_specs = [
        (Vid::new(1), Vid::new(2), 100, Eid::new(1000), 1, "KNOWS"),
        (Vid::new(2), Vid::new(3), 101, Eid::new(1001), 2, "LIKES"),
        (Vid::new(3), Vid::new(1), 100, Eid::new(1002), 3, "KNOWS"),
    ];
    let mutations: Vec<Mutation> = edge_specs
        .iter()
        .map(
            |&(src_vid, dst_vid, edge_type, eid, version, type_name)| Mutation::InsertEdge {
                src_vid,
                dst_vid,
                edge_type,
                eid,
                version,
                properties: HashMap::new(),
                edge_type_name: Some(type_name.to_string()),
            },
        )
        .collect();

    let mut l0 = L0Buffer::new(0, None);
    l0.replay_mutations(mutations)?;

    // Each edge reports the type name it was replayed with.
    for &(_, _, _, eid, _, type_name) in &edge_specs {
        assert_eq!(l0.get_edge_type(eid), Some(type_name));
    }

    // Lookup by name: two KNOWS edges...
    let knows = l0.eids_for_type("KNOWS");
    assert_eq!(knows.len(), 2);
    assert!(knows.contains(&Eid::new(1000)));
    assert!(knows.contains(&Eid::new(1002)));

    // ...and a single LIKES edge.
    let likes = l0.eids_for_type("LIKES");
    assert_eq!(likes.len(), 1);
    assert_eq!(likes[0], Eid::new(1001));

    Ok(())
}
1374
/// Test for Issue #23 + #28: replaying a log that mixes labelled vertex inserts
/// with a named edge insert restores labels, the edge type name, and the graph
/// structure together.
#[test]
fn test_replay_mutations_combined_labels_and_edge_types() -> Result<()> {
    use crate::runtime::wal::Mutation;

    let (alice, bob) = (Vid::new(1), Vid::new(2));
    let eid = Eid::new(100);

    // Builds an InsertVertex for a "Person" with the given name property.
    let person = |vid: Vid, name: &str| Mutation::InsertVertex {
        vid,
        properties: HashMap::from([(
            "name".to_string(),
            uni_common::Value::String(name.to_string()),
        )]),
        labels: vec!["Person".to_string()],
    };

    // The log as it would be replayed after a crash: both people, then the
    // KNOWS edge from Alice to Bob.
    let log = vec![
        person(alice, "Alice"),
        person(bob, "Bob"),
        Mutation::InsertEdge {
            src_vid: alice,
            dst_vid: bob,
            edge_type: 1,
            eid,
            version: 3,
            properties: HashMap::new(),
            edge_type_name: Some("KNOWS".to_string()),
        },
    ];

    let mut l0 = L0Buffer::new(0, None);
    l0.replay_mutations(log)?;

    // Labels: one per vertex, and both vertices found under "Person".
    assert_eq!(l0.get_vertex_labels(alice).unwrap().len(), 1);
    assert_eq!(l0.get_vertex_labels(bob).unwrap().len(), 1);
    assert_eq!(l0.vids_for_label("Person").len(), 2);

    // Edge type name: stored on the edge and searchable.
    assert_eq!(l0.get_edge_type(eid).unwrap(), "KNOWS");
    assert_eq!(l0.eids_for_type("KNOWS").len(), 1);

    // Structure: Alice has Bob as her only outgoing neighbor of type 1.
    let from_alice = l0.get_neighbors(alice, 1, Direction::Outgoing);
    assert_eq!(from_alice.len(), 1);
    assert_eq!(from_alice[0].0, bob);

    Ok(())
}
1444
/// Test for Issue #23: an `InsertVertex` with an empty label list (the shape an
/// old-format WAL record without labels is expected to take) still restores
/// the vertex and leaves an empty labels entry behind.
#[test]
fn test_replay_mutations_backward_compat_empty_labels() -> Result<()> {
    use crate::runtime::wal::Mutation;

    let vid = Vid::new(1);
    let mut l0 = L0Buffer::new(0, None);

    // The mutation is built in memory here; no labels and no properties.
    let unlabelled_insert = Mutation::InsertVertex {
        vid,
        properties: HashMap::new(),
        labels: Vec::new(),
    };
    l0.replay_mutations(vec![unlabelled_insert])?;

    // The vertex was restored.
    assert!(l0.vertex_properties.contains_key(&vid));

    // A labels entry exists for it, and that entry is empty.
    let restored = l0.get_vertex_labels(vid);
    assert!(restored.is_some(), "Labels entry should exist even if empty");
    assert_eq!(restored.unwrap().len(), 0);

    Ok(())
}
1473
/// Checks that `now_nanos()` reports nanoseconds since the Unix epoch rather
/// than a coarser unit such as microseconds.
#[test]
fn test_now_nanos_returns_nanosecond_range() {
    // 2025-01-01 is roughly 1,735,689,600 s after the epoch, i.e. about
    // 1.735e18 ns, so any current reading in nanoseconds exceeds 1.7e18.
    // A microsecond reading would be 1000x smaller and fail this bound.
    let lower_bound: i64 = 1_700_000_000_000_000_000;
    // Sanity ceiling near the year 2100 (about 4.1e18 ns).
    let upper_bound: i64 = 4_100_000_000_000_000_000;

    let now = now_nanos();

    assert!(
        now > lower_bound,
        "now_nanos() returned {}, expected > 1.7e18 for nanoseconds",
        now
    );
    assert!(
        now < upper_bound,
        "now_nanos() returned {}, expected < 4.1e18",
        now
    );
}
1495}