Skip to main content

uni_store/runtime/
l0.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::runtime::wal::{Mutation, WriteAheadLog};
5use anyhow::Result;
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use std::time::{SystemTime, UNIX_EPOCH};
9use tracing::{instrument, trace};
10use uni_common::core::id::{Eid, Vid};
11use uni_common::graph::simple_graph::{Direction, SimpleGraph};
12use uni_common::{Properties, Value};
13use uni_crdt::Crdt;
14
15/// Returns the current timestamp in nanoseconds since Unix epoch.
16fn now_nanos() -> i64 {
17    SystemTime::now()
18        .duration_since(UNIX_EPOCH)
19        .map(|d| d.as_nanos() as i64)
20        .unwrap_or(0)
21}
22
23/// Serialize a constraint key for O(1) uniqueness checks.
24/// Format: label + separator + sorted (prop_name, value) pairs.
25pub fn serialize_constraint_key(label: &str, key_values: &[(String, Value)]) -> Vec<u8> {
26    let mut buf = label.as_bytes().to_vec();
27    buf.push(0); // separator
28    let mut sorted = key_values.to_vec();
29    sorted.sort_by(|a, b| a.0.cmp(&b.0));
30    for (k, v) in &sorted {
31        buf.extend(k.as_bytes());
32        buf.push(0);
33        // Use serde_json serialization for deterministic value encoding
34        buf.extend(serde_json::to_vec(v).unwrap_or_default());
35        buf.push(0);
36    }
37    buf
38}
39
40#[derive(Clone, Debug)]
41pub struct TombstoneEntry {
42    pub eid: Eid,
43    pub src_vid: Vid,
44    pub dst_vid: Vid,
45    pub edge_type: u32,
46}
47
48pub struct L0Buffer {
49    /// Graph topology using simple adjacency lists
50    pub graph: SimpleGraph,
51    /// Soft-deleted edges (tombstones for LSM-style merging)
52    pub tombstones: HashMap<Eid, TombstoneEntry>,
53    /// Soft-deleted vertices
54    pub vertex_tombstones: HashSet<Vid>,
55    /// Edge version tracking for MVCC
56    pub edge_versions: HashMap<Eid, u64>,
57    /// Vertex version tracking for MVCC
58    pub vertex_versions: HashMap<Vid, u64>,
59    /// Edge properties (stored separately from topology)
60    pub edge_properties: HashMap<Eid, Properties>,
61    /// Vertex properties (stored separately from topology)
62    pub vertex_properties: HashMap<Vid, Properties>,
63    /// Edge endpoint lookup: eid -> (src, dst, type)
64    pub edge_endpoints: HashMap<Eid, (Vid, Vid, u32)>,
65    /// Vertex labels (VID -> list of label names)
66    /// New in storage design: vertices can have multiple labels
67    pub vertex_labels: HashMap<Vid, Vec<String>>,
68    /// Edge types (EID -> type name)
69    pub edge_types: HashMap<Eid, String>,
70    /// Current version counter
71    pub current_version: u64,
72    /// Mutation count for flush decisions
73    pub mutation_count: usize,
74    /// Write-ahead log for durability
75    pub wal: Option<Arc<WriteAheadLog>>,
76    /// WAL LSN at the time this L0 was rotated for flush.
77    /// Used to ensure WAL truncation doesn't remove entries needed by pending flushes.
78    pub wal_lsn_at_flush: u64,
79    /// Vertex creation timestamps (nanoseconds since epoch)
80    pub vertex_created_at: HashMap<Vid, i64>,
81    /// Vertex update timestamps (nanoseconds since epoch)
82    pub vertex_updated_at: HashMap<Vid, i64>,
83    /// Edge creation timestamps (nanoseconds since epoch)
84    pub edge_created_at: HashMap<Eid, i64>,
85    /// Edge update timestamps (nanoseconds since epoch)
86    pub edge_updated_at: HashMap<Eid, i64>,
87    /// Estimated size in bytes for memory limit enforcement.
88    /// Incremented O(1) on each mutation to avoid O(V+E) size_bytes() calls.
89    pub estimated_size: usize,
90    /// Per-constraint index for O(1) unique key checks.
91    /// Key: constraint composite key (label + sorted property values serialized).
92    /// Value: Vid that owns this key.
93    pub constraint_index: HashMap<Vec<u8>, Vid>,
94}
95
96impl std::fmt::Debug for L0Buffer {
97    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98        f.debug_struct("L0Buffer")
99            .field("vertex_count", &self.graph.vertex_count())
100            .field("edge_count", &self.graph.edge_count())
101            .field("tombstones", &self.tombstones.len())
102            .field("vertex_tombstones", &self.vertex_tombstones.len())
103            .field("current_version", &self.current_version)
104            .field("mutation_count", &self.mutation_count)
105            .finish()
106    }
107}
108
109impl L0Buffer {
110    /// Append labels to a vec, skipping duplicates.
111    fn append_unique_labels(existing: &mut Vec<String>, labels: &[String]) {
112        for label in labels {
113            if !existing.contains(label) {
114                existing.push(label.clone());
115            }
116        }
117    }
118
119    /// Merge CRDT properties into an existing property map.
120    /// Attempts CRDT merge if both values are valid CRDTs, falls back to overwrite.
121    fn merge_crdt_properties(entry: &mut Properties, properties: Properties) {
122        for (k, v) in properties {
123            // Attempt merge if CRDT — convert to serde_json::Value for CRDT deserialization
124            let json_v: serde_json::Value = v.clone().into();
125            if let Ok(mut new_crdt) = serde_json::from_value::<Crdt>(json_v)
126                && let Some(existing_v) = entry.get(&k)
127                && let Ok(existing_crdt) = serde_json::from_value::<Crdt>(existing_v.clone().into())
128            {
129                // Use try_merge to avoid panic on type mismatch
130                if new_crdt.try_merge(&existing_crdt).is_ok()
131                    && let Ok(merged_json) = serde_json::to_value(new_crdt)
132                {
133                    entry.insert(k, uni_common::Value::from(merged_json));
134                    continue;
135                }
136                // try_merge failed (type mismatch) - fall through to overwrite
137            }
138            // Fallback: Overwrite
139            entry.insert(k, v);
140        }
141    }
142
143    /// Helper function to estimate property map size in bytes.
144    fn estimate_properties_size(props: &Properties) -> usize {
145        props.keys().map(|k| k.len() + 32).sum()
146    }
147
148    /// Returns an estimate of the buffer size in bytes.
149    /// Includes all fields for accurate memory accounting.
150    pub fn size_bytes(&self) -> usize {
151        let mut total = 0;
152
153        // Topology
154        total += self.graph.vertex_count() * 8;
155        total += self.graph.edge_count() * 24;
156
157        // Properties (rough estimate: key string + 32 bytes for value)
158        for props in self.vertex_properties.values() {
159            total += Self::estimate_properties_size(props);
160        }
161        for props in self.edge_properties.values() {
162            total += Self::estimate_properties_size(props);
163        }
164
165        // Metadata
166        total += self.tombstones.len() * 64;
167        total += self.vertex_tombstones.len() * 8;
168        total += self.edge_versions.len() * 16;
169        total += self.vertex_versions.len() * 16;
170        total += self.edge_endpoints.len() * 28; // (Vid, Vid, u32) = 8+8+4 + overhead
171
172        // Vertex labels
173        for labels in self.vertex_labels.values() {
174            total += labels.iter().map(|l| l.len() + 24).sum::<usize>();
175        }
176
177        // Edge types
178        for type_name in self.edge_types.values() {
179            total += type_name.len() + 24;
180        }
181
182        // Timestamps (4 maps, each entry is 16 bytes: 8-byte key + 8-byte i64 value)
183        total += self.vertex_created_at.len() * 16;
184        total += self.vertex_updated_at.len() * 16;
185        total += self.edge_created_at.len() * 16;
186        total += self.edge_updated_at.len() * 16;
187
188        total
189    }
190
191    pub fn new(start_version: u64, wal: Option<Arc<WriteAheadLog>>) -> Self {
192        Self {
193            graph: SimpleGraph::new(),
194            tombstones: HashMap::new(),
195            vertex_tombstones: HashSet::new(),
196            edge_versions: HashMap::new(),
197            vertex_versions: HashMap::new(),
198            edge_properties: HashMap::new(),
199            vertex_properties: HashMap::new(),
200            edge_endpoints: HashMap::new(),
201            vertex_labels: HashMap::new(),
202            edge_types: HashMap::new(),
203            current_version: start_version,
204            mutation_count: 0,
205            wal,
206            wal_lsn_at_flush: 0,
207            vertex_created_at: HashMap::new(),
208            vertex_updated_at: HashMap::new(),
209            edge_created_at: HashMap::new(),
210            edge_updated_at: HashMap::new(),
211            estimated_size: 0,
212            constraint_index: HashMap::new(),
213        }
214    }
215
216    pub fn insert_vertex(&mut self, vid: Vid, properties: Properties) {
217        self.insert_vertex_with_labels(vid, properties, &[]);
218    }
219
220    /// Insert a vertex with associated labels.
221    pub fn insert_vertex_with_labels(
222        &mut self,
223        vid: Vid,
224        properties: Properties,
225        labels: &[String],
226    ) {
227        self.current_version += 1;
228        let version = self.current_version;
229        let now = now_nanos();
230
231        if let Some(wal) = &self.wal {
232            let _ = wal.append(&Mutation::InsertVertex {
233                vid,
234                properties: properties.clone(),
235                labels: labels.to_vec(),
236            });
237        }
238
239        self.vertex_tombstones.remove(&vid);
240
241        let entry = self.vertex_properties.entry(vid).or_default();
242        Self::merge_crdt_properties(entry, properties.clone());
243        self.vertex_versions.insert(vid, version);
244
245        // Set timestamps - created_at only set if this is a new vertex
246        self.vertex_created_at.entry(vid).or_insert(now);
247        self.vertex_updated_at.insert(vid, now);
248
249        // Track labels — always create an entry so unlabeled vertices are
250        // distinguishable from "not in L0" when queried via get_vertex_labels.
251        let labels_size: usize = labels.iter().map(|l| l.len() + 24).sum();
252        let existing = self.vertex_labels.entry(vid).or_default();
253        Self::append_unique_labels(existing, labels);
254
255        self.graph.add_vertex(vid);
256        self.mutation_count += 1;
257
258        let props_size = Self::estimate_properties_size(&properties);
259        self.estimated_size += 8 + props_size + 16 + labels_size + 32;
260    }
261
262    /// Add labels to an existing vertex.
263    pub fn add_vertex_labels(&mut self, vid: Vid, labels: &[String]) {
264        let existing = self.vertex_labels.entry(vid).or_default();
265        Self::append_unique_labels(existing, labels);
266    }
267
268    /// Remove a label from an existing vertex.
269    /// Returns true if the label was found and removed, false otherwise.
270    pub fn remove_vertex_label(&mut self, vid: Vid, label: &str) -> bool {
271        if let Some(labels) = self.vertex_labels.get_mut(&vid)
272            && let Some(pos) = labels.iter().position(|l| l == label)
273        {
274            labels.remove(pos);
275            self.current_version += 1;
276            self.mutation_count += 1;
277            // Note: WAL logging for label mutations not yet implemented
278            // Currently consistent with add_vertex_labels behavior
279            return true;
280        }
281        false
282    }
283
284    /// Set the type for an edge.
285    pub fn set_edge_type(&mut self, eid: Eid, edge_type: String) {
286        self.edge_types.insert(eid, edge_type);
287    }
288
289    pub fn delete_vertex(&mut self, vid: Vid) -> Result<()> {
290        self.current_version += 1;
291
292        if let Some(wal) = &mut self.wal {
293            let labels = self.vertex_labels.get(&vid).cloned().unwrap_or_default();
294            wal.append(&Mutation::DeleteVertex { vid, labels })?;
295        }
296
297        self.apply_vertex_deletion(vid);
298        Ok(())
299    }
300
301    /// Cascade-delete a vertex: tombstone all connected edges and remove the vertex.
302    ///
303    /// Shared between `delete_vertex` (live mutations) and `replay_mutations` (WAL recovery).
304    fn apply_vertex_deletion(&mut self, vid: Vid) {
305        let version = self.current_version;
306
307        // Collect edges to delete using O(degree) neighbors() instead of O(E) scan
308        let mut edges_to_remove = HashSet::new();
309
310        // Collect outgoing edges
311        for entry in self.graph.neighbors(vid, Direction::Outgoing) {
312            edges_to_remove.insert(entry.eid);
313        }
314
315        // Collect incoming edges
316        for entry in self.graph.neighbors(vid, Direction::Incoming) {
317            edges_to_remove.insert(entry.eid); // HashSet handles self-loop deduplication
318        }
319
320        let cascaded_edges_count = edges_to_remove.len();
321
322        // Tombstone and remove all collected edges
323        for eid in edges_to_remove {
324            // Retrieve edge endpoints from the map to create tombstone
325            if let Some((src, dst, etype)) = self.edge_endpoints.get(&eid) {
326                self.tombstones.insert(
327                    eid,
328                    TombstoneEntry {
329                        eid,
330                        src_vid: *src,
331                        dst_vid: *dst,
332                        edge_type: *etype,
333                    },
334                );
335                self.edge_versions.insert(eid, version);
336                self.edge_endpoints.remove(&eid);
337                self.edge_properties.remove(&eid);
338                self.graph.remove_edge(eid);
339                self.mutation_count += 1;
340            }
341        }
342
343        self.vertex_tombstones.insert(vid);
344        self.vertex_properties.remove(&vid);
345        self.vertex_versions.insert(vid, version);
346        self.graph.remove_vertex(vid);
347        self.mutation_count += 1;
348
349        // Remove constraint index entries for this vertex
350        self.constraint_index.retain(|_, v| *v != vid);
351
352        // 64 bytes per edge tombstone + 8 for vertex tombstone
353        self.estimated_size += cascaded_edges_count * 72 + 8;
354    }
355
356    pub fn insert_edge(
357        &mut self,
358        src_vid: Vid,
359        dst_vid: Vid,
360        edge_type: u32,
361        eid: Eid,
362        properties: Properties,
363        edge_type_name: Option<String>,
364    ) -> Result<()> {
365        self.current_version += 1;
366        let now = now_nanos();
367
368        if let Some(wal) = &mut self.wal {
369            wal.append(&Mutation::InsertEdge {
370                src_vid,
371                dst_vid,
372                edge_type,
373                eid,
374                version: self.current_version,
375                properties: properties.clone(),
376                edge_type_name: edge_type_name.clone(),
377            })?;
378        }
379
380        self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
381
382        // Store edge type name in metadata if provided
383        let type_name_size = if let Some(ref name) = edge_type_name {
384            let size = name.len() + 24;
385            self.edge_types.insert(eid, name.clone());
386            size
387        } else {
388            0
389        };
390
391        // Set timestamps - created_at only set if this is a new edge
392        self.edge_created_at.entry(eid).or_insert(now);
393        self.edge_updated_at.insert(eid, now);
394
395        self.estimated_size += type_name_size;
396
397        Ok(())
398    }
399
400    /// Core edge insertion logic: add vertices, add edge, merge properties, update metadata.
401    ///
402    /// Shared between `insert_edge` (live mutations) and `replay_mutations` (WAL recovery).
403    ///
404    /// # Errors
405    ///
406    /// Returns error if either endpoint vertex has been deleted (exists in vertex_tombstones).
407    /// This prevents "ghost vertex" resurrection via edge insertion. See issue #77.
408    fn apply_edge_insertion(
409        &mut self,
410        src_vid: Vid,
411        dst_vid: Vid,
412        edge_type: u32,
413        eid: Eid,
414        properties: Properties,
415    ) -> Result<()> {
416        let version = self.current_version;
417
418        // Check if either endpoint has been deleted. Inserting an edge to a deleted
419        // vertex would resurrect it as a "ghost vertex" with no properties. See issue #77.
420        if self.vertex_tombstones.contains(&src_vid) {
421            anyhow::bail!(
422                "Cannot insert edge: source vertex {} has been deleted (issue #77)",
423                src_vid
424            );
425        }
426        if self.vertex_tombstones.contains(&dst_vid) {
427            anyhow::bail!(
428                "Cannot insert edge: destination vertex {} has been deleted (issue #77)",
429                dst_vid
430            );
431        }
432
433        // Add vertices to graph topology if they don't exist.
434        // IMPORTANT: Only add to graph structure, do NOT call insert_vertex.
435        // insert_vertex creates a new version with empty properties, which would
436        // cause MVCC to pick the empty version as "latest", losing original properties.
437        if !self.graph.contains_vertex(src_vid) {
438            self.graph.add_vertex(src_vid);
439        }
440        if !self.graph.contains_vertex(dst_vid) {
441            self.graph.add_vertex(dst_vid);
442        }
443
444        self.graph.add_edge(src_vid, dst_vid, eid, edge_type);
445
446        // Store metadata with CRDT merge logic
447        let props_size = Self::estimate_properties_size(&properties);
448        if !properties.is_empty() {
449            let entry = self.edge_properties.entry(eid).or_default();
450            Self::merge_crdt_properties(entry, properties);
451        }
452
453        self.edge_versions.insert(eid, version);
454        self.edge_endpoints
455            .insert(eid, (src_vid, dst_vid, edge_type));
456        self.tombstones.remove(&eid);
457        self.mutation_count += 1;
458
459        // 24 edge + props + 16 version + 28 endpoints + 32 timestamps
460        self.estimated_size += 24 + props_size + 16 + 28 + 32;
461
462        Ok(())
463    }
464
465    pub fn delete_edge(
466        &mut self,
467        eid: Eid,
468        src_vid: Vid,
469        dst_vid: Vid,
470        edge_type: u32,
471    ) -> Result<()> {
472        self.current_version += 1;
473        let now = now_nanos();
474
475        if let Some(wal) = &mut self.wal {
476            wal.append(&Mutation::DeleteEdge {
477                eid,
478                src_vid,
479                dst_vid,
480                edge_type,
481                version: self.current_version,
482            })?;
483        }
484
485        self.apply_edge_deletion(eid, src_vid, dst_vid, edge_type);
486
487        // Update timestamp - deletion is an update
488        self.edge_updated_at.insert(eid, now);
489
490        Ok(())
491    }
492
493    /// Core edge deletion logic: tombstone the edge, update version, remove from graph.
494    ///
495    /// Shared between `delete_edge` (live mutations) and `replay_mutations` (WAL recovery).
496    fn apply_edge_deletion(&mut self, eid: Eid, src_vid: Vid, dst_vid: Vid, edge_type: u32) {
497        let version = self.current_version;
498
499        self.tombstones.insert(
500            eid,
501            TombstoneEntry {
502                eid,
503                src_vid,
504                dst_vid,
505                edge_type,
506            },
507        );
508        self.edge_versions.insert(eid, version);
509        self.graph.remove_edge(eid);
510        self.mutation_count += 1;
511
512        // 64 bytes tombstone + 16 bytes version
513        self.estimated_size += 80;
514    }
515
516    /// Returns neighbors in the specified direction.
517    /// O(degree) complexity - iterates only edges connected to the vertex.
518    pub fn get_neighbors(
519        &self,
520        vid: Vid,
521        edge_type: u32,
522        direction: Direction,
523    ) -> Vec<(Vid, Eid, u64)> {
524        let edges = self.graph.neighbors(vid, direction);
525
526        edges
527            .iter()
528            .filter(|e| e.edge_type == edge_type && !self.is_tombstoned(e.eid))
529            .map(|e| {
530                let neighbor = match direction {
531                    Direction::Outgoing => e.dst_vid,
532                    Direction::Incoming => e.src_vid,
533                };
534                let version = self.edge_versions.get(&e.eid).copied().unwrap_or(0);
535                (neighbor, e.eid, version)
536            })
537            .collect()
538    }
539
540    pub fn is_tombstoned(&self, eid: Eid) -> bool {
541        self.tombstones.contains_key(&eid)
542    }
543
544    /// Returns all VIDs in vertex_labels that match the given label name.
545    /// Used for L0 overlay during vertex scanning.
546    pub fn vids_for_label(&self, label_name: &str) -> Vec<Vid> {
547        self.vertex_labels
548            .iter()
549            .filter(|(_, labels)| labels.iter().any(|l| l == label_name))
550            .map(|(vid, _)| *vid)
551            .collect()
552    }
553
554    /// Returns all vertex VIDs in the L0 buffer.
555    ///
556    /// Used for schemaless scanning (MATCH (n) without label).
557    pub fn all_vertex_vids(&self) -> Vec<Vid> {
558        self.vertex_properties.keys().copied().collect()
559    }
560
561    /// Returns all VIDs in vertex_labels that match any of the given label names.
562    /// Used for L0 overlay during multi-label vertex scanning.
563    pub fn vids_for_labels(&self, label_names: &[&str]) -> Vec<Vid> {
564        self.vertex_labels
565            .iter()
566            .filter(|(_, labels)| label_names.iter().any(|ln| labels.iter().any(|l| l == *ln)))
567            .map(|(vid, _)| *vid)
568            .collect()
569    }
570
571    /// Returns all VIDs that have ALL specified labels.
572    pub fn vids_with_all_labels(&self, label_names: &[&str]) -> Vec<Vid> {
573        self.vertex_labels
574            .iter()
575            .filter(|(_, labels)| label_names.iter().all(|ln| labels.iter().any(|l| l == *ln)))
576            .map(|(vid, _)| *vid)
577            .collect()
578    }
579
580    /// Gets the labels for a VID.
581    pub fn get_vertex_labels(&self, vid: Vid) -> Option<&[String]> {
582        self.vertex_labels.get(&vid).map(|v| v.as_slice())
583    }
584
585    /// Gets the edge type for an EID.
586    pub fn get_edge_type(&self, eid: Eid) -> Option<&str> {
587        self.edge_types.get(&eid).map(|s| s.as_str())
588    }
589
590    /// Returns all EIDs in edge_types that match the given type name.
591    /// Used for L0 overlay during schemaless edge scanning.
592    pub fn eids_for_type(&self, type_name: &str) -> Vec<Eid> {
593        self.edge_types
594            .iter()
595            .filter(|(eid, etype)| *etype == type_name && !self.tombstones.contains_key(eid))
596            .map(|(eid, _)| *eid)
597            .collect()
598    }
599
600    /// Returns all edge EIDs in the L0 buffer (non-tombstoned).
601    ///
602    /// Used for schemaless scanning (`MATCH ()-[r]->()`) without type.
603    pub fn all_edge_eids(&self) -> Vec<Eid> {
604        self.edge_endpoints
605            .keys()
606            .filter(|eid| !self.tombstones.contains_key(eid))
607            .copied()
608            .collect()
609    }
610
611    /// Returns edge endpoint data (src_vid, dst_vid) for an EID.
612    pub fn get_edge_endpoints(&self, eid: Eid) -> Option<(Vid, Vid)> {
613        self.edge_endpoints
614            .get(&eid)
615            .map(|(src, dst, _)| (*src, *dst))
616    }
617
618    /// Returns full edge endpoint data (src_vid, dst_vid, edge_type_id) for an EID.
619    pub fn get_edge_endpoint_full(&self, eid: Eid) -> Option<(Vid, Vid, u32)> {
620        self.edge_endpoints.get(&eid).copied()
621    }
622
623    /// Insert a constraint key into the index for O(1) duplicate detection.
624    pub fn insert_constraint_key(&mut self, key: Vec<u8>, vid: Vid) {
625        self.constraint_index.insert(key, vid);
626    }
627
628    /// Check if a constraint key exists in the index, excluding a specific VID.
629    /// Returns true if the key exists and is owned by a different vertex.
630    pub fn has_constraint_key(&self, key: &[u8], exclude_vid: Vid) -> bool {
631        self.constraint_index
632            .get(key)
633            .is_some_and(|&v| v != exclude_vid)
634    }
635
636    #[instrument(skip(self, other), level = "trace")]
637    pub fn merge(&mut self, other: &L0Buffer) -> Result<()> {
638        trace!(
639            other_mutation_count = other.mutation_count,
640            "Merging L0 buffer"
641        );
642        // Merge Vertices
643        for &vid in &other.vertex_tombstones {
644            self.delete_vertex(vid)?;
645        }
646
647        for (vid, props) in &other.vertex_properties {
648            let labels = other.vertex_labels.get(vid).cloned().unwrap_or_default();
649            self.insert_vertex_with_labels(*vid, props.clone(), &labels);
650        }
651
652        // Merge vertex labels that might not have properties
653        for (vid, labels) in &other.vertex_labels {
654            if !self.vertex_labels.contains_key(vid) {
655                self.vertex_labels.insert(*vid, labels.clone());
656            }
657        }
658
659        // Merge Edges - insert all edges from edge_endpoints, using empty props if none exist
660        for (eid, (src, dst, etype)) in &other.edge_endpoints {
661            if other.tombstones.contains_key(eid) {
662                self.delete_edge(*eid, *src, *dst, *etype)?;
663            } else {
664                let props = other.edge_properties.get(eid).cloned().unwrap_or_default();
665                let etype_name = other.edge_types.get(eid).cloned();
666                self.insert_edge(*src, *dst, *etype, *eid, props, etype_name)?;
667            }
668        }
669
670        // Edge types are now merged inside insert_edge, so no separate loop needed
671
672        // Merge timestamps - preserve semantics of or_insert (keep oldest created_at)
673        // and insert (use latest updated_at)
674        for (vid, ts) in &other.vertex_created_at {
675            self.vertex_created_at.entry(*vid).or_insert(*ts); // keep oldest
676        }
677        for (vid, ts) in &other.vertex_updated_at {
678            self.vertex_updated_at.insert(*vid, *ts); // use latest (tx wins)
679        }
680
681        for (eid, ts) in &other.edge_created_at {
682            self.edge_created_at.entry(*eid).or_insert(*ts); // keep oldest
683        }
684        for (eid, ts) in &other.edge_updated_at {
685            self.edge_updated_at.insert(*eid, *ts); // use latest (tx wins)
686        }
687
688        // Conservatively add other's estimated size (may overcount due to
689        // deduplication, but that's safe for a memory limit).
690        self.estimated_size += other.estimated_size;
691
692        // Merge constraint index
693        for (key, vid) in &other.constraint_index {
694            self.constraint_index.insert(key.clone(), *vid);
695        }
696
697        Ok(())
698    }
699
700    /// Replay mutations from WAL without re-logging them.
701    /// Used during startup recovery to restore L0 state from persisted WAL.
702    /// Uses CRDT merge semantics to ensure recovered state matches pre-crash state.
703    #[instrument(skip(self, mutations), level = "debug")]
704    pub fn replay_mutations(&mut self, mutations: Vec<Mutation>) -> Result<()> {
705        trace!(count = mutations.len(), "Replaying mutations");
706        for mutation in mutations {
707            match mutation {
708                Mutation::InsertVertex {
709                    vid,
710                    properties,
711                    labels,
712                } => {
713                    // Apply without WAL logging, with CRDT merge semantics
714                    self.current_version += 1;
715                    let version = self.current_version;
716
717                    self.vertex_tombstones.remove(&vid);
718                    let entry = self.vertex_properties.entry(vid).or_default();
719                    Self::merge_crdt_properties(entry, properties);
720                    self.vertex_versions.insert(vid, version);
721                    self.graph.add_vertex(vid);
722                    self.mutation_count += 1;
723
724                    // Restore vertex labels from WAL
725                    let existing = self.vertex_labels.entry(vid).or_default();
726                    Self::append_unique_labels(existing, &labels);
727                }
728                Mutation::DeleteVertex { vid, labels } => {
729                    self.current_version += 1;
730                    // Restore labels BEFORE apply_vertex_deletion
731                    if !labels.is_empty() {
732                        let existing = self.vertex_labels.entry(vid).or_default();
733                        Self::append_unique_labels(existing, &labels);
734                    }
735                    self.apply_vertex_deletion(vid);
736                }
737                Mutation::InsertEdge {
738                    src_vid,
739                    dst_vid,
740                    edge_type,
741                    eid,
742                    version: _,
743                    properties,
744                    edge_type_name,
745                } => {
746                    self.current_version += 1;
747                    self.apply_edge_insertion(src_vid, dst_vid, edge_type, eid, properties)?;
748                    // Restore edge type name metadata if present
749                    if let Some(name) = edge_type_name {
750                        self.edge_types.insert(eid, name);
751                    }
752                }
753                Mutation::DeleteEdge {
754                    eid,
755                    src_vid,
756                    dst_vid,
757                    edge_type,
758                    version: _,
759                } => {
760                    self.current_version += 1;
761                    self.apply_edge_deletion(eid, src_vid, dst_vid, edge_type);
762                }
763            }
764        }
765        Ok(())
766    }
767}
768
769#[cfg(test)]
770mod tests {
771    use super::*;
772
773    #[test]
774    fn test_l0_buffer_ops() -> Result<()> {
775        let mut l0 = L0Buffer::new(0, None);
776        let vid_a = Vid::new(1);
777        let vid_b = Vid::new(2);
778        let eid_ab = Eid::new(101);
779
780        l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
781
782        let neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
783        assert_eq!(neighbors.len(), 1);
784        assert_eq!(neighbors[0].0, vid_b);
785        assert_eq!(neighbors[0].1, eid_ab);
786
787        l0.delete_edge(eid_ab, vid_a, vid_b, 1)?;
788        assert!(l0.is_tombstoned(eid_ab));
789
790        // Verify neighbors are empty after deletion
791        let neighbors_after = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
792        assert_eq!(neighbors_after.len(), 0);
793
794        Ok(())
795    }
796
797    #[test]
798    fn test_l0_buffer_multiple_edges() -> Result<()> {
799        let mut l0 = L0Buffer::new(0, None);
800        let vid_a = Vid::new(1);
801        let vid_b = Vid::new(2);
802        let vid_c = Vid::new(3);
803        let eid_ab = Eid::new(101);
804        let eid_ac = Eid::new(102);
805
806        l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
807        l0.insert_edge(vid_a, vid_c, 1, eid_ac, HashMap::new(), None)?;
808
809        let neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
810        assert_eq!(neighbors.len(), 2);
811
812        // Delete one edge
813        l0.delete_edge(eid_ab, vid_a, vid_b, 1)?;
814
815        // Should still have one neighbor
816        let neighbors_after = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
817        assert_eq!(neighbors_after.len(), 1);
818        assert_eq!(neighbors_after[0].0, vid_c);
819
820        Ok(())
821    }
822
823    #[test]
824    fn test_l0_buffer_edge_type_filter() -> Result<()> {
825        let mut l0 = L0Buffer::new(0, None);
826        let vid_a = Vid::new(1);
827        let vid_b = Vid::new(2);
828        let vid_c = Vid::new(3);
829        let eid_ab = Eid::new(101);
830        let eid_ac = Eid::new(201); // Different edge type
831
832        l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
833        l0.insert_edge(vid_a, vid_c, 2, eid_ac, HashMap::new(), None)?;
834
835        // Filter by edge type 1
836        let type1_neighbors = l0.get_neighbors(vid_a, 1, Direction::Outgoing);
837        assert_eq!(type1_neighbors.len(), 1);
838        assert_eq!(type1_neighbors[0].0, vid_b);
839
840        // Filter by edge type 2
841        let type2_neighbors = l0.get_neighbors(vid_a, 2, Direction::Outgoing);
842        assert_eq!(type2_neighbors.len(), 1);
843        assert_eq!(type2_neighbors[0].0, vid_c);
844
845        Ok(())
846    }
847
848    #[test]
849    fn test_l0_buffer_incoming_edges() -> Result<()> {
850        let mut l0 = L0Buffer::new(0, None);
851        let vid_a = Vid::new(1);
852        let vid_b = Vid::new(2);
853        let vid_c = Vid::new(3);
854        let eid_ab = Eid::new(101);
855        let eid_cb = Eid::new(102);
856
857        // a -> b and c -> b
858        l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
859        l0.insert_edge(vid_c, vid_b, 1, eid_cb, HashMap::new(), None)?;
860
861        // Check incoming edges to b
862        let incoming = l0.get_neighbors(vid_b, 1, Direction::Incoming);
863        assert_eq!(incoming.len(), 2);
864
865        Ok(())
866    }
867
868    /// Regression test: merge should preserve edges without properties
869    #[test]
870    fn test_merge_empty_props_edge() -> Result<()> {
871        let mut main_l0 = L0Buffer::new(0, None);
872        let mut tx_l0 = L0Buffer::new(0, None);
873
874        let vid_a = Vid::new(1);
875        let vid_b = Vid::new(2);
876        let eid_ab = Eid::new(101);
877
878        // Insert edge with empty properties in transaction L0
879        tx_l0.insert_edge(vid_a, vid_b, 1, eid_ab, HashMap::new(), None)?;
880
881        // Verify edge exists in tx_l0
882        assert!(tx_l0.edge_endpoints.contains_key(&eid_ab));
883        assert!(!tx_l0.edge_properties.contains_key(&eid_ab)); // No properties entry
884
885        // Merge into main L0
886        main_l0.merge(&tx_l0)?;
887
888        // Edge should exist in main L0 after merge
889        assert!(main_l0.edge_endpoints.contains_key(&eid_ab));
890        let neighbors = main_l0.get_neighbors(vid_a, 1, Direction::Outgoing);
891        assert_eq!(neighbors.len(), 1);
892        assert_eq!(neighbors[0].0, vid_b);
893
894        Ok(())
895    }
896
897    /// Regression test: WAL replay should use CRDT merge semantics
898    #[test]
899    fn test_replay_crdt_merge() -> Result<()> {
900        use crate::runtime::wal::Mutation;
901        use serde_json::json;
902        use uni_common::Value;
903
904        let mut l0 = L0Buffer::new(0, None);
905        let vid = Vid::new(1);
906
907        // Create GCounter CRDT values using correct serde format:
908        // {"t": "gc", "d": {"counts": {...}}}
909        let counter1: Value = json!({
910            "t": "gc",
911            "d": {"counts": {"node1": 5}}
912        })
913        .into();
914        let counter2: Value = json!({
915            "t": "gc",
916            "d": {"counts": {"node2": 3}}
917        })
918        .into();
919
920        // First mutation: insert vertex with counter1
921        let mut props1 = HashMap::new();
922        props1.insert("counter".to_string(), counter1.clone());
923        l0.replay_mutations(vec![Mutation::InsertVertex {
924            vid,
925            properties: props1,
926            labels: vec![],
927        }])?;
928
929        // Second mutation: insert same vertex with counter2 (should merge)
930        let mut props2 = HashMap::new();
931        props2.insert("counter".to_string(), counter2.clone());
932        l0.replay_mutations(vec![Mutation::InsertVertex {
933            vid,
934            properties: props2,
935            labels: vec![],
936        }])?;
937
938        // Verify CRDT was merged (both node1 and node2 counts present)
939        let stored_props = l0.vertex_properties.get(&vid).unwrap();
940        let stored_counter = stored_props.get("counter").unwrap();
941
942        // Convert back to serde_json::Value for nested access
943        let stored_json: serde_json::Value = stored_counter.clone().into();
944        // The merged counter should have both node1: 5 and node2: 3
945        let data = stored_json.get("d").unwrap();
946        let counts = data.get("counts").unwrap();
947        assert_eq!(counts.get("node1"), Some(&json!(5)));
948        assert_eq!(counts.get("node2"), Some(&json!(3)));
949
950        Ok(())
951    }
952
953    #[test]
954    fn test_merge_preserves_vertex_timestamps() -> Result<()> {
955        let mut l0_main = L0Buffer::new(0, None);
956        let mut l0_tx = L0Buffer::new(0, None);
957        let vid = Vid::new(1);
958
959        // Main buffer: insert vertex with timestamp T1
960        let ts_main_created = 1000;
961        let ts_main_updated = 1100;
962        l0_main.insert_vertex(vid, HashMap::new());
963        l0_main.vertex_created_at.insert(vid, ts_main_created);
964        l0_main.vertex_updated_at.insert(vid, ts_main_updated);
965
966        // Transaction buffer: update same vertex with timestamp T2 (later)
967        let ts_tx_created = 2000; // should be ignored (main has older created_at)
968        let ts_tx_updated = 2100; // should win (tx has newer updated_at)
969        l0_tx.insert_vertex(vid, HashMap::new());
970        l0_tx.vertex_created_at.insert(vid, ts_tx_created);
971        l0_tx.vertex_updated_at.insert(vid, ts_tx_updated);
972
973        // Merge transaction into main
974        l0_main.merge(&l0_tx)?;
975
976        // Verify created_at is oldest (from main)
977        assert_eq!(
978            *l0_main.vertex_created_at.get(&vid).unwrap(),
979            ts_main_created,
980            "created_at should preserve oldest timestamp"
981        );
982
983        // Verify updated_at is latest (from tx)
984        assert_eq!(
985            *l0_main.vertex_updated_at.get(&vid).unwrap(),
986            ts_tx_updated,
987            "updated_at should use latest timestamp"
988        );
989
990        Ok(())
991    }
992
993    #[test]
994    fn test_merge_preserves_edge_timestamps() -> Result<()> {
995        let mut l0_main = L0Buffer::new(0, None);
996        let mut l0_tx = L0Buffer::new(0, None);
997        let vid_a = Vid::new(1);
998        let vid_b = Vid::new(2);
999        let eid = Eid::new(100);
1000
1001        // Main buffer: insert edge with timestamp T1
1002        let ts_main_created = 1000;
1003        let ts_main_updated = 1100;
1004        l0_main.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)?;
1005        l0_main.edge_created_at.insert(eid, ts_main_created);
1006        l0_main.edge_updated_at.insert(eid, ts_main_updated);
1007
1008        // Transaction buffer: update same edge with timestamp T2 (later)
1009        let ts_tx_created = 2000; // should be ignored
1010        let ts_tx_updated = 2100; // should win
1011        l0_tx.insert_edge(vid_a, vid_b, 1, eid, HashMap::new(), None)?;
1012        l0_tx.edge_created_at.insert(eid, ts_tx_created);
1013        l0_tx.edge_updated_at.insert(eid, ts_tx_updated);
1014
1015        // Merge transaction into main
1016        l0_main.merge(&l0_tx)?;
1017
1018        // Verify created_at is oldest (from main)
1019        assert_eq!(
1020            *l0_main.edge_created_at.get(&eid).unwrap(),
1021            ts_main_created,
1022            "edge created_at should preserve oldest timestamp"
1023        );
1024
1025        // Verify updated_at is latest (from tx)
1026        assert_eq!(
1027            *l0_main.edge_updated_at.get(&eid).unwrap(),
1028            ts_tx_updated,
1029            "edge updated_at should use latest timestamp"
1030        );
1031
1032        Ok(())
1033    }
1034
1035    #[test]
1036    fn test_merge_created_at_not_overwritten_for_existing_vertex() -> Result<()> {
1037        use uni_common::Value;
1038
1039        let mut l0_main = L0Buffer::new(0, None);
1040        let mut l0_tx = L0Buffer::new(0, None);
1041        let vid = Vid::new(1);
1042
1043        // Main buffer: vertex created at T1
1044        let ts_original = 1000;
1045        l0_main.insert_vertex(vid, HashMap::new());
1046        l0_main.vertex_created_at.insert(vid, ts_original);
1047        l0_main.vertex_updated_at.insert(vid, ts_original);
1048
1049        // Transaction buffer: update vertex (created_at would be T2 if set)
1050        let ts_tx = 2000;
1051        let mut props = HashMap::new();
1052        props.insert("updated".to_string(), Value::String("yes".to_string()));
1053        l0_tx.insert_vertex(vid, props);
1054        l0_tx.vertex_created_at.insert(vid, ts_tx);
1055        l0_tx.vertex_updated_at.insert(vid, ts_tx);
1056
1057        // Merge transaction into main
1058        l0_main.merge(&l0_tx)?;
1059
1060        // Verify created_at was NOT overwritten (still T1, not T2)
1061        assert_eq!(
1062            *l0_main.vertex_created_at.get(&vid).unwrap(),
1063            ts_original,
1064            "created_at must not be overwritten for existing vertex"
1065        );
1066
1067        // Verify updated_at WAS updated (now T2)
1068        assert_eq!(
1069            *l0_main.vertex_updated_at.get(&vid).unwrap(),
1070            ts_tx,
1071            "updated_at should reflect transaction timestamp"
1072        );
1073
1074        // Verify properties were merged
1075        assert!(
1076            l0_main
1077                .vertex_properties
1078                .get(&vid)
1079                .unwrap()
1080                .contains_key("updated")
1081        );
1082
1083        Ok(())
1084    }
1085
1086    /// Test for Issue #23: Vertex labels preserved through replay_mutations
1087    #[test]
1088    fn test_replay_mutations_preserves_vertex_labels() -> Result<()> {
1089        use crate::runtime::wal::Mutation;
1090
1091        let mut l0 = L0Buffer::new(0, None);
1092        let vid = Vid::new(42);
1093
1094        // Create InsertVertex mutation with labels
1095        let mutations = vec![Mutation::InsertVertex {
1096            vid,
1097            properties: {
1098                let mut props = HashMap::new();
1099                props.insert(
1100                    "name".to_string(),
1101                    uni_common::Value::String("Alice".to_string()),
1102                );
1103                props
1104            },
1105            labels: vec!["Person".to_string(), "User".to_string()],
1106        }];
1107
1108        // Replay mutations
1109        l0.replay_mutations(mutations)?;
1110
1111        // Verify vertex exists in L0
1112        assert!(l0.vertex_properties.contains_key(&vid));
1113
1114        // Verify labels are preserved
1115        let labels = l0.get_vertex_labels(vid).expect("Labels should exist");
1116        assert_eq!(labels.len(), 2);
1117        assert!(labels.contains(&"Person".to_string()));
1118        assert!(labels.contains(&"User".to_string()));
1119
1120        // Verify vertex is findable by label
1121        let person_vids = l0.vids_for_label("Person");
1122        assert_eq!(person_vids.len(), 1);
1123        assert_eq!(person_vids[0], vid);
1124
1125        let user_vids = l0.vids_for_label("User");
1126        assert_eq!(user_vids.len(), 1);
1127        assert_eq!(user_vids[0], vid);
1128
1129        Ok(())
1130    }
1131
1132    /// Test for Issue #23: DeleteVertex labels preserved for tombstone flushing
1133    #[test]
1134    fn test_replay_mutations_preserves_delete_vertex_labels() -> Result<()> {
1135        use crate::runtime::wal::Mutation;
1136
1137        let mut l0 = L0Buffer::new(0, None);
1138        let vid = Vid::new(99);
1139
1140        // First insert vertex with labels
1141        l0.insert_vertex_with_labels(
1142            vid,
1143            HashMap::new(),
1144            &["Person".to_string(), "Admin".to_string()],
1145        );
1146
1147        // Verify vertex and labels exist
1148        assert!(l0.vertex_properties.contains_key(&vid));
1149        let labels = l0.get_vertex_labels(vid).expect("Labels should exist");
1150        assert_eq!(labels.len(), 2);
1151
1152        // Create DeleteVertex mutation with labels
1153        let mutations = vec![Mutation::DeleteVertex {
1154            vid,
1155            labels: vec!["Person".to_string(), "Admin".to_string()],
1156        }];
1157
1158        // Replay deletion
1159        l0.replay_mutations(mutations)?;
1160
1161        // Verify vertex is tombstoned
1162        assert!(l0.vertex_tombstones.contains(&vid));
1163
1164        // Verify labels are preserved in L0 (needed for Issue #76 tombstone flushing)
1165        // The labels should still be accessible for the flush logic to know which tables to update
1166        let labels = l0.get_vertex_labels(vid);
1167        assert!(
1168            labels.is_some(),
1169            "Labels should be preserved even after deletion for tombstone flushing"
1170        );
1171
1172        Ok(())
1173    }
1174
1175    /// Test for Issue #28: Edge type name preserved through replay_mutations
1176    #[test]
1177    fn test_replay_mutations_preserves_edge_type_name() -> Result<()> {
1178        use crate::runtime::wal::Mutation;
1179
1180        let mut l0 = L0Buffer::new(0, None);
1181        let src = Vid::new(1);
1182        let dst = Vid::new(2);
1183        let eid = Eid::new(500);
1184        let edge_type = 100;
1185
1186        // Create InsertEdge mutation with edge_type_name
1187        let mutations = vec![Mutation::InsertEdge {
1188            src_vid: src,
1189            dst_vid: dst,
1190            edge_type,
1191            eid,
1192            version: 1,
1193            properties: {
1194                let mut props = HashMap::new();
1195                props.insert("since".to_string(), uni_common::Value::Int(2020));
1196                props
1197            },
1198            edge_type_name: Some("KNOWS".to_string()),
1199        }];
1200
1201        // Replay mutations
1202        l0.replay_mutations(mutations)?;
1203
1204        // Verify edge exists in L0
1205        assert!(l0.edge_endpoints.contains_key(&eid));
1206
1207        // Verify edge type name is preserved
1208        let type_name = l0.get_edge_type(eid).expect("Edge type name should exist");
1209        assert_eq!(type_name, "KNOWS");
1210
1211        // Verify edge is findable by type name
1212        let knows_eids = l0.eids_for_type("KNOWS");
1213        assert_eq!(knows_eids.len(), 1);
1214        assert_eq!(knows_eids[0], eid);
1215
1216        Ok(())
1217    }
1218
1219    /// Test for Issue #28: Edge type mapping survives multiple replay cycles
1220    #[test]
1221    fn test_edge_type_mapping_survives_multiple_replays() -> Result<()> {
1222        use crate::runtime::wal::Mutation;
1223
1224        let mut l0 = L0Buffer::new(0, None);
1225
1226        // Replay multiple edge insertions with different types
1227        let mutations = vec![
1228            Mutation::InsertEdge {
1229                src_vid: Vid::new(1),
1230                dst_vid: Vid::new(2),
1231                edge_type: 100,
1232                eid: Eid::new(1000),
1233                version: 1,
1234                properties: HashMap::new(),
1235                edge_type_name: Some("KNOWS".to_string()),
1236            },
1237            Mutation::InsertEdge {
1238                src_vid: Vid::new(2),
1239                dst_vid: Vid::new(3),
1240                edge_type: 101,
1241                eid: Eid::new(1001),
1242                version: 2,
1243                properties: HashMap::new(),
1244                edge_type_name: Some("LIKES".to_string()),
1245            },
1246            Mutation::InsertEdge {
1247                src_vid: Vid::new(3),
1248                dst_vid: Vid::new(1),
1249                edge_type: 100,
1250                eid: Eid::new(1002),
1251                version: 3,
1252                properties: HashMap::new(),
1253                edge_type_name: Some("KNOWS".to_string()),
1254            },
1255        ];
1256
1257        l0.replay_mutations(mutations)?;
1258
1259        // Verify all edge type mappings are preserved
1260        assert_eq!(l0.get_edge_type(Eid::new(1000)), Some("KNOWS"));
1261        assert_eq!(l0.get_edge_type(Eid::new(1001)), Some("LIKES"));
1262        assert_eq!(l0.get_edge_type(Eid::new(1002)), Some("KNOWS"));
1263
1264        // Verify edges can be queried by type
1265        let knows_edges = l0.eids_for_type("KNOWS");
1266        assert_eq!(knows_edges.len(), 2);
1267        assert!(knows_edges.contains(&Eid::new(1000)));
1268        assert!(knows_edges.contains(&Eid::new(1002)));
1269
1270        let likes_edges = l0.eids_for_type("LIKES");
1271        assert_eq!(likes_edges.len(), 1);
1272        assert_eq!(likes_edges[0], Eid::new(1001));
1273
1274        Ok(())
1275    }
1276
1277    /// Test for Issue #23 + #28: Combined vertex labels and edge types in replay
1278    #[test]
1279    fn test_replay_mutations_combined_labels_and_edge_types() -> Result<()> {
1280        use crate::runtime::wal::Mutation;
1281
1282        let mut l0 = L0Buffer::new(0, None);
1283        let alice = Vid::new(1);
1284        let bob = Vid::new(2);
1285        let eid = Eid::new(100);
1286
1287        // Simulate crash recovery scenario: replay full transaction log
1288        let mutations = vec![
1289            // Insert Alice with Person label
1290            Mutation::InsertVertex {
1291                vid: alice,
1292                properties: {
1293                    let mut props = HashMap::new();
1294                    props.insert(
1295                        "name".to_string(),
1296                        uni_common::Value::String("Alice".to_string()),
1297                    );
1298                    props
1299                },
1300                labels: vec!["Person".to_string()],
1301            },
1302            // Insert Bob with Person label
1303            Mutation::InsertVertex {
1304                vid: bob,
1305                properties: {
1306                    let mut props = HashMap::new();
1307                    props.insert(
1308                        "name".to_string(),
1309                        uni_common::Value::String("Bob".to_string()),
1310                    );
1311                    props
1312                },
1313                labels: vec!["Person".to_string()],
1314            },
1315            // Create KNOWS edge between them
1316            Mutation::InsertEdge {
1317                src_vid: alice,
1318                dst_vid: bob,
1319                edge_type: 1,
1320                eid,
1321                version: 3,
1322                properties: HashMap::new(),
1323                edge_type_name: Some("KNOWS".to_string()),
1324            },
1325        ];
1326
1327        // Replay all mutations
1328        l0.replay_mutations(mutations)?;
1329
1330        // Verify vertex labels preserved
1331        assert_eq!(l0.get_vertex_labels(alice).unwrap().len(), 1);
1332        assert_eq!(l0.get_vertex_labels(bob).unwrap().len(), 1);
1333        assert_eq!(l0.vids_for_label("Person").len(), 2);
1334
1335        // Verify edge type name preserved
1336        assert_eq!(l0.get_edge_type(eid).unwrap(), "KNOWS");
1337        assert_eq!(l0.eids_for_type("KNOWS").len(), 1);
1338
1339        // Verify graph structure
1340        let alice_neighbors = l0.get_neighbors(alice, 1, Direction::Outgoing);
1341        assert_eq!(alice_neighbors.len(), 1);
1342        assert_eq!(alice_neighbors[0].0, bob);
1343
1344        Ok(())
1345    }
1346
1347    /// Test for Issue #23: Empty labels should deserialize correctly (backward compat)
1348    #[test]
1349    fn test_replay_mutations_backward_compat_empty_labels() -> Result<()> {
1350        use crate::runtime::wal::Mutation;
1351
1352        let mut l0 = L0Buffer::new(0, None);
1353        let vid = Vid::new(1);
1354
1355        // Simulate old WAL format: InsertVertex with empty labels
1356        // (This tests #[serde(default)] behavior)
1357        let mutations = vec![Mutation::InsertVertex {
1358            vid,
1359            properties: HashMap::new(),
1360            labels: vec![], // Empty labels (old format compatibility)
1361        }];
1362
1363        l0.replay_mutations(mutations)?;
1364
1365        // Vertex should exist
1366        assert!(l0.vertex_properties.contains_key(&vid));
1367
1368        // Labels should be empty but entry should exist in vertex_labels
1369        let labels = l0.get_vertex_labels(vid);
1370        assert!(labels.is_some(), "Labels entry should exist even if empty");
1371        assert_eq!(labels.unwrap().len(), 0);
1372
1373        Ok(())
1374    }
1375
1376    #[test]
1377    fn test_now_nanos_returns_nanosecond_range() {
1378        // Test that now_nanos() returns a value in nanosecond range
1379        // As of 2025, Unix timestamp in nanoseconds should be > 1.7e18
1380        // (2025-01-01 is approximately 1,735,689,600 seconds = 1.735e18 nanoseconds)
1381        let now = now_nanos();
1382
1383        // Verify it's in nanosecond range (not microseconds which would be 1000x smaller)
1384        assert!(
1385            now > 1_700_000_000_000_000_000,
1386            "now_nanos() returned {}, expected > 1.7e18 for nanoseconds",
1387            now
1388        );
1389
1390        // Sanity check: should also be less than year 2100 in nanoseconds (4.1e18)
1391        assert!(
1392            now < 4_100_000_000_000_000_000,
1393            "now_nanos() returned {}, expected < 4.1e18",
1394            now
1395        );
1396    }
1397}