Skip to main content

graphmind/graph/
store.rs

1//! # In-Memory Graph Storage -- Adjacency Lists, Indices, and Statistics
2//!
3//! [`GraphStore`] is the central data structure of the Graphmind graph engine. It
4//! holds all nodes, edges, indices, and metadata in memory, optimized for the
5//! access patterns of a graph query engine.
6//!
7//! ## Why adjacency lists, not adjacency matrices?
8//!
9//! An adjacency matrix for V vertices requires O(V^2) space and O(V) time to
10//! find a node's neighbors. Real-world graphs are overwhelmingly **sparse** --
11//! a social network with 1 million users might have 100 million edges, but a
12//! matrix would need 10^12 cells. Adjacency lists use O(V + E) space and give
13//! O(degree) neighbor iteration -- a far better fit.
14//!
15//! ## Arena allocation with MVCC versioning
16//!
17//! Nodes and edges are stored in `Vec<Vec<T>>` structures (arena allocation).
18//! The outer `Vec` is indexed by entity ID (acting as a dense array), and the
19//! inner `Vec` holds successive **MVCC versions** of that entity. This layout
20//! gives O(1) lookup by ID (direct indexing, no hash computation), excellent
21//! cache locality for sequential scans, and natural support for snapshot reads
22//! at any historical version. This is similar to how PostgreSQL stores tuple
23//! versions in heap pages, but without the overhead of disk I/O.
24//!
25//! ## Sorted adjacency lists and `edge_between()`
26//!
27//! The `outgoing` and `incoming` adjacency lists store `Vec<(NodeId, EdgeId)>`
28//! tuples **sorted by NodeId**. This enables `edge_between(a, b)` to use
29//! **binary search** with O(log d) complexity (where d = degree of the node),
30//! which is critical for the `ExpandIntoOperator` that checks edge existence
31//! between two already-bound nodes in triangle-pattern queries.
32//!
33//! ## Secondary indices: label and edge-type
34//!
35//! `label_index: HashMap<Label, HashSet<NodeId>>` and
36//! `edge_type_index: HashMap<EdgeType, HashSet<EdgeId>>` act as secondary
37//! indices, analogous to B-tree indices in an RDBMS. When a Cypher query
38//! specifies `MATCH (n:Person)`, the engine looks up the `Person` entry in
39//! `label_index` and scans only matching nodes, avoiding a full table scan.
40//! We use `HashMap` (not `BTreeMap`) because we only need equality lookups
41//! on labels, not range scans.
42//!
43//! ## ColumnStore integration (late materialization)
44//!
45//! `node_columns` and `edge_columns` provide **columnar storage** for
46//! frequently accessed properties. In the traditional row store (`PropertyMap`
47//! on each node), reading one property from 1000 nodes touches 1000 scattered
48//! `HashMap`s. The columnar store groups all values of the same property
49//! contiguously, enabling vectorized reads and better CPU cache utilization.
50//! The query engine uses **late materialization**: scan operators produce
51//! lightweight `Value::NodeRef(id)` references instead of cloning full nodes,
52//! and properties are resolved on demand from the column store.
53//!
54//! ## GraphStatistics for cost-based optimization
55//!
56//! The query planner uses [`GraphStatistics`] to estimate the cardinality
57//! (number of rows) at each stage of a query plan, choosing the plan with
58//! the lowest estimated cost. See the struct documentation for details.
59//!
60//! ## Key Rust patterns
61//!
62//! - **`Vec<Vec<T>>` arena**: dense ID-indexed storage avoids HashMap overhead;
63//!   inner Vec holds MVCC versions.
64//! - **`HashMap` vs `BTreeMap`**: `HashMap` is used for indices because we need
65//!   O(1) point lookups (label equality), not ordered iteration. `BTreeMap`
66//!   would add an unnecessary O(log n) factor.
67//! - **`Arc`**: `vector_index` and `property_index` are wrapped in `Arc`
68//!   (atomic reference counting) for shared ownership across the query engine
69//!   and background index-maintenance tasks.
70//! - **`thiserror`**: the [`GraphError`] enum uses the `thiserror` crate's
71//!   derive macro to auto-generate `Display` and `Error` implementations,
72//!   reducing boilerplate for error types.
73//!
74//! ## Requirements coverage
75//!
76//! - REQ-GRAPH-001: Property graph data model
77//! - REQ-MEM-001: In-memory storage
78//! - REQ-MEM-003: Memory-optimized data structures
79
80use super::catalog::GraphCatalog;
81use super::edge::Edge;
82use super::node::Node;
83use super::property::{PropertyMap, PropertyValue};
84use super::types::{EdgeId, EdgeType, Label, NodeId};
85use crate::agent::{tools::WebSearchTool, AgentRuntime};
86use crate::graph::storage::ColumnStore;
87use crate::index::IndexManager;
88use crate::vector::{DistanceMetric, VectorIndexManager, VectorResult};
89use std::collections::{HashMap, HashSet};
90use std::sync::Arc;
91use thiserror::Error;
92use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
93
94// Add chrono dependency (local hack like in node.rs)
95mod chrono {
96    pub struct Utc;
97    impl Utc {
98        pub fn now() -> DateTime {
99            DateTime
100        }
101    }
102    pub struct DateTime;
103    impl DateTime {
104        pub fn timestamp_millis(&self) -> i64 {
105            // Use system time for now
106            std::time::SystemTime::now()
107                .duration_since(std::time::UNIX_EPOCH)
108                .unwrap()
109                .as_millis() as i64
110        }
111    }
112}
113
114/// Errors that can occur during graph operations
115#[derive(Error, Debug, PartialEq)]
116pub enum GraphError {
117    #[error("Node {0} not found")]
118    NodeNotFound(NodeId),
119
120    #[error("Edge {0} not found")]
121    EdgeNotFound(EdgeId),
122
123    #[error("Node {0} already exists")]
124    NodeAlreadyExists(NodeId),
125
126    #[error("Edge {0} already exists")]
127    EdgeAlreadyExists(EdgeId),
128
129    #[error("Invalid edge: source node {0} does not exist")]
130    InvalidEdgeSource(NodeId),
131
132    #[error("Invalid edge: target node {0} does not exist")]
133    InvalidEdgeTarget(NodeId),
134
135    #[error("Constraint violation: {0}")]
136    ConstraintViolation(String),
137}
138
139pub type GraphResult<T> = Result<T, GraphError>;
140
141/// Statistics about graph contents for **cost-based query optimization**.
142///
143/// # What is cardinality estimation?
144///
145/// Every query plan is a tree of operators (scan, filter, join, sort, ...).
146/// The query planner must choose among many possible orderings of these
147/// operators. To compare plans, it estimates the **cardinality** (number of
148/// rows) flowing through each operator. For example, if `:Person` has 10,000
149/// nodes and `:Company` has 500, the planner should scan `:Company` first in
150/// a join -- fewer rows means less work at every subsequent stage.
151///
152/// # How selectivity works
153///
154/// **Selectivity** is the probability that a predicate evaluates to `true`
155/// for a randomly chosen row. A selectivity of 0.01 on a 10,000-row scan
156/// means the filter will pass approximately 100 rows. Selectivity is
157/// estimated as `1 / distinct_count` for equality predicates (assuming a
158/// uniform distribution). The `null_fraction` is subtracted before
159/// estimation because NULL values never match equality predicates.
160///
161/// # Sampling approach for property stats
162///
163/// Computing exact statistics for every property on every label would be
164/// expensive in a large graph. Instead, `compute_statistics()` samples the
165/// first 1,000 nodes per label and extrapolates `distinct_count`,
166/// `null_fraction`, and `selectivity`. This is similar to PostgreSQL's
167/// `ANALYZE` command, which samples a configurable fraction of each table.
168/// The trade-off is speed vs. accuracy -- sampling may miss rare values,
169/// but is sufficient for plan selection in practice.
170#[derive(Debug, Clone)]
171pub struct GraphStatistics {
172    /// Total number of nodes
173    pub total_nodes: usize,
174    /// Total number of edges
175    pub total_edges: usize,
176    /// Node count per label
177    pub label_counts: HashMap<Label, usize>,
178    /// Edge count per type
179    pub edge_type_counts: HashMap<EdgeType, usize>,
180    /// Average outgoing degree
181    pub avg_out_degree: f64,
182    /// Property statistics per (label, property_name)
183    pub property_stats: HashMap<(Label, String), PropertyStats>,
184}
185
186/// Statistics about a specific property
187#[derive(Debug, Clone)]
188pub struct PropertyStats {
189    /// Fraction of nodes with this label that have NULL for this property
190    pub null_fraction: f64,
191    /// Estimated number of distinct values
192    pub distinct_count: usize,
193    /// Selectivity: probability of matching a random value (1/distinct_count)
194    pub selectivity: f64,
195}
196
197impl GraphStatistics {
198    /// Estimate the number of rows from a label scan
199    pub fn estimate_label_scan(&self, label: &Label) -> usize {
200        self.label_counts
201            .get(label)
202            .copied()
203            .unwrap_or(self.total_nodes)
204    }
205
206    /// Estimate the number of rows after an expand (edge traversal)
207    pub fn estimate_expand(&self, edge_type: Option<&EdgeType>) -> f64 {
208        match edge_type {
209            Some(et) => self.edge_type_counts.get(et).copied().unwrap_or(0) as f64,
210            None => self.total_edges as f64,
211        }
212    }
213
214    /// Estimate selectivity of an equality filter on a property
215    pub fn estimate_equality_selectivity(&self, label: &Label, property: &str) -> f64 {
216        self.property_stats
217            .get(&(label.clone(), property.to_string()))
218            .map(|ps| ps.selectivity)
219            .unwrap_or(0.1) // Default 10% selectivity
220    }
221
222    /// Format statistics as human-readable text
223    pub fn format(&self) -> String {
224        let mut result = String::new();
225        result.push_str("Graph Statistics:\n");
226        result.push_str(&format!("  Total nodes: {}\n", self.total_nodes));
227        result.push_str(&format!("  Total edges: {}\n", self.total_edges));
228        result.push_str(&format!("  Avg out-degree: {:.2}\n", self.avg_out_degree));
229        result.push_str("  Labels:\n");
230        let mut labels: Vec<_> = self.label_counts.iter().collect();
231        labels.sort_by(|a, b| b.1.cmp(a.1));
232        for (label, count) in labels {
233            result.push_str(&format!("    :{} = {} nodes\n", label.as_str(), count));
234        }
235        result.push_str("  Edge types:\n");
236        let mut types: Vec<_> = self.edge_type_counts.iter().collect();
237        types.sort_by(|a, b| b.1.cmp(a.1));
238        for (etype, count) in types {
239            result.push_str(&format!("    :{} = {} edges\n", etype.as_str(), count));
240        }
241        result
242    }
243}
244
245/// In-memory graph storage engine.
246///
247/// `GraphStore` is the authoritative source of truth for all graph data.
248/// Its data structure layout is designed for the access patterns of a
249/// Cypher query engine:
250///
251/// | Field | Type | Purpose | Lookup cost |
252/// |---|---|---|---|
253/// | `nodes` | `Vec<Vec<Node>>` | Arena-allocated node versions, indexed by `NodeId` | O(1) |
254/// | `edges` | `Vec<Vec<Edge>>` | Arena-allocated edge versions, indexed by `EdgeId` | O(1) |
255/// | `outgoing` | `Vec<Vec<(NodeId, EdgeId)>>` | Sorted adjacency list per node (outgoing) | O(log d) binary search |
256/// | `incoming` | `Vec<Vec<(NodeId, EdgeId)>>` | Sorted adjacency list per node (incoming) | O(log d) binary search |
257/// | `label_index` | `HashMap<Label, HashSet<NodeId>>` | Secondary index: label -> node set | O(1) lookup, O(n) scan |
258/// | `edge_type_index` | `HashMap<EdgeType, HashSet<EdgeId>>` | Secondary index: type -> edge set | O(1) lookup, O(n) scan |
259/// | `node_columns` | `ColumnStore` | Columnar property storage for late materialization | O(1) per cell |
260/// | `catalog` | `GraphCatalog` | Triple-level statistics for graph-native planning (ADR-015) | O(1) |
261///
262/// The `free_node_ids` / `free_edge_ids` vectors enable **ID reuse** after
263/// deletions, avoiding unbounded growth of the arena vectors.
264///
265/// Thread safety: `GraphStore` is not `Sync` by itself. Concurrent access
266/// is managed by the server layer, which wraps it in `Arc<RwLock<GraphStore>>`
267/// for shared-nothing read parallelism with exclusive write access.
268#[derive(Debug)]
269pub struct GraphStore {
270    /// Node storage (Arena with versioning: NodeId -> [Versions])
271    nodes: Vec<Vec<Node>>,
272
273    /// Edge storage (Arena with versioning: EdgeId -> [Versions])
274    edges: Vec<Vec<Edge>>,
275
276    /// Outgoing edges for each node, sorted by target NodeId: (target_NodeId, EdgeId)
277    outgoing: Vec<Vec<(NodeId, EdgeId)>>,
278
279    /// Incoming edges for each node, sorted by source NodeId: (source_NodeId, EdgeId)
280    incoming: Vec<Vec<(NodeId, EdgeId)>>,
281
282    /// Current global version for MVCC
283    pub current_version: u64,
284
285    /// Free node IDs for reuse
286    free_node_ids: Vec<u64>,
287
288    /// Free edge IDs for reuse
289    free_edge_ids: Vec<u64>,
290
291    /// Label index for fast lookups
292    label_index: HashMap<Label, HashSet<NodeId>>,
293
294    /// Edge type index for fast lookups
295    edge_type_index: HashMap<EdgeType, HashSet<EdgeId>>,
296
297    /// Vector indices manager
298    pub vector_index: Arc<VectorIndexManager>,
299
300    /// Property indices manager
301    pub property_index: Arc<IndexManager>,
302
303    /// Columnar storage for node properties
304    pub node_columns: ColumnStore,
305
306    /// Columnar storage for edge properties
307    pub edge_columns: ColumnStore,
308
309    /// Async index event sender
310    pub index_sender: Option<UnboundedSender<crate::graph::event::IndexEvent>>,
311
312    /// Next node ID
313    next_node_id: u64,
314
315    /// Next edge ID
316    next_edge_id: u64,
317
318    /// Triple-level statistics catalog for graph-native query planning (ADR-015)
319    catalog: GraphCatalog,
320}
321
322impl GraphStore {
323    /// Create a new empty graph store
324    pub fn new() -> Self {
325        GraphStore {
326            nodes: Vec::with_capacity(1024),
327            edges: Vec::with_capacity(4096),
328            outgoing: Vec::with_capacity(1024),
329            incoming: Vec::with_capacity(1024),
330            current_version: 1,
331            free_node_ids: Vec::new(),
332            free_edge_ids: Vec::new(),
333            label_index: HashMap::new(),
334            edge_type_index: HashMap::new(),
335            vector_index: Arc::new(VectorIndexManager::new()),
336            property_index: Arc::new(IndexManager::new()),
337            node_columns: ColumnStore::new(),
338            edge_columns: ColumnStore::new(),
339            index_sender: None,
340            next_node_id: 1,
341            next_edge_id: 1,
342            catalog: GraphCatalog::new(),
343        }
344    }
345
346    /// Create a new GraphStore with async indexing enabled
347    pub fn with_async_indexing() -> (
348        Self,
349        tokio::sync::mpsc::UnboundedReceiver<crate::graph::event::IndexEvent>,
350    ) {
351        let (tx, rx) = unbounded_channel();
352        let mut store = Self::new();
353        store.index_sender = Some(tx);
354        (store, rx)
355    }
356
357    /// Start the background indexer loop
358    pub async fn start_background_indexer(
359        mut receiver: tokio::sync::mpsc::UnboundedReceiver<crate::graph::event::IndexEvent>,
360        vector_index: Arc<VectorIndexManager>,
361        property_index: Arc<IndexManager>,
362        tenant_manager: Arc<crate::persistence::TenantManager>,
363    ) {
364        use crate::graph::event::IndexEvent::*;
365
366        while let Some(event) = receiver.recv().await {
367            match event {
368                NodeCreated {
369                    tenant_id,
370                    id,
371                    labels,
372                    properties,
373                } => {
374                    for (key, value) in &properties {
375                        if let PropertyValue::Vector(vec) = value {
376                            for label in &labels {
377                                let _ = vector_index.add_vector(label.as_str(), key, id, vec);
378                            }
379                        }
380                        for label in &labels {
381                            property_index.index_insert(label, key, value.clone(), id);
382                        }
383
384                        // Auto-Embed check
385                        if let PropertyValue::String(text) = value {
386                            if let Ok(tenant) = tenant_manager.get_tenant(&tenant_id) {
387                                if let Some(config) = tenant.embed_config {
388                                    for label in &labels {
389                                        if let Some(keys) =
390                                            config.embedding_policies.get(label.as_str())
391                                        {
392                                            if keys.contains(key) {
393                                                // Trigger Auto-Embed
394                                                let vector_index_clone = Arc::clone(&vector_index);
395                                                let label_str = label.as_str().to_string();
396                                                let key_clone = key.clone();
397                                                let text_clone = text.clone();
398                                                let config_clone = config.clone();
399
400                                                tokio::spawn(async move {
401                                                    if let Ok(pipeline) =
402                                                        crate::embed::EmbedPipeline::new(
403                                                            config_clone,
404                                                        )
405                                                    {
406                                                        if let Ok(chunks) =
407                                                            pipeline.process_text(&text_clone).await
408                                                        {
409                                                            for chunk in chunks {
410                                                                let _ = vector_index_clone
411                                                                    .add_vector(
412                                                                        &label_str,
413                                                                        &key_clone,
414                                                                        id,
415                                                                        &chunk.embedding,
416                                                                    );
417                                                            }
418                                                        }
419                                                    }
420                                                });
421                                            }
422                                        }
423                                    }
424                                }
425                            }
426                        }
427                    }
428
429                    // Agentic Enrichment Trigger
430                    if let Ok(tenant) = tenant_manager.get_tenant(&tenant_id) {
431                        if let Some(agent_config) = tenant.agent_config {
432                            if agent_config.enabled {
433                                for label in &labels {
434                                    if let Some(trigger_prompt) =
435                                        agent_config.policies.get(label.as_str())
436                                    {
437                                        // Construct context from node properties
438                                        let context =
439                                            format!("Node: {} {:?}", label.as_str(), properties);
440                                        let task = trigger_prompt.clone();
441                                        let mut runtime = AgentRuntime::new(agent_config.clone());
442                                        let label_str = label.as_str().to_string();
443
444                                        // Register tools based on config/availability
445                                        if let Some(api_key) = &agent_config.api_key {
446                                            runtime.register_tool(Arc::new(WebSearchTool::new(
447                                                api_key.clone(),
448                                            )));
449                                        } else {
450                                            // Mock/Prototype mode
451                                            runtime.register_tool(Arc::new(WebSearchTool::new(
452                                                "mock".to_string(),
453                                            )));
454                                        }
455
456                                        tokio::spawn(async move {
457                                            if let Ok(result) =
458                                                runtime.process_trigger(&task, &context).await
459                                            {
460                                                println!(
461                                                    "Agent Action [{}] -> {}",
462                                                    label_str, result
463                                                );
464                                                // Future: Write result back to graph properties using a GraphStore client
465                                            }
466                                        });
467                                    }
468                                }
469                            }
470                        }
471                    }
472                }
473                NodeDeleted {
474                    tenant_id: _,
475                    id,
476                    labels,
477                    properties,
478                } => {
479                    for (key, value) in properties {
480                        for label in &labels {
481                            property_index.index_remove(label, &key, &value, id);
482                        }
483                    }
484                }
485                PropertySet {
486                    tenant_id,
487                    id,
488                    labels,
489                    key,
490                    old_value,
491                    new_value,
492                } => {
493                    if let Some(old) = old_value {
494                        for label in &labels {
495                            property_index.index_remove(label, &key, &old, id);
496                        }
497                    }
498                    for label in &labels {
499                        property_index.index_insert(label, &key, new_value.clone(), id);
500                    }
501                    if let PropertyValue::Vector(vec) = &new_value {
502                        for label in &labels {
503                            let _ = vector_index.add_vector(label.as_str(), &key, id, vec);
504                        }
505                    }
506
507                    // Auto-Embed check
508                    if let PropertyValue::String(text) = &new_value {
509                        if let Ok(tenant) = tenant_manager.get_tenant(&tenant_id) {
510                            if let Some(config) = tenant.embed_config {
511                                for label in &labels {
512                                    if let Some(keys) =
513                                        config.embedding_policies.get(label.as_str())
514                                    {
515                                        if keys.contains(&key) {
516                                            let vector_index_clone = Arc::clone(&vector_index);
517                                            let label_str = label.as_str().to_string();
518                                            let key_clone = key.clone();
519                                            let text_clone = text.clone();
520                                            let config_clone = config.clone();
521
522                                            tokio::spawn(async move {
523                                                if let Ok(pipeline) =
524                                                    crate::embed::EmbedPipeline::new(config_clone)
525                                                {
526                                                    if let Ok(chunks) =
527                                                        pipeline.process_text(&text_clone).await
528                                                    {
529                                                        if let Some(first) = chunks.first() {
530                                                            let _ = vector_index_clone.add_vector(
531                                                                &label_str,
532                                                                &key_clone,
533                                                                id,
534                                                                &first.embedding,
535                                                            );
536                                                        }
537                                                    }
538                                                }
539                                            });
540                                        }
541                                    }
542                                }
543                            }
544                        }
545                    }
546
547                    // Agentic Enrichment Trigger (PropertySet)
548                    if let Ok(tenant) = tenant_manager.get_tenant(&tenant_id) {
549                        if let Some(agent_config) = tenant.agent_config {
550                            if agent_config.enabled {
551                                for label in &labels {
552                                    if let Some(trigger_prompt) =
553                                        agent_config.policies.get(label.as_str())
554                                    {
555                                        let context = format!(
556                                            "Node: {} (Property Updated: {})",
557                                            label.as_str(),
558                                            key
559                                        );
560                                        let task = trigger_prompt.clone();
561                                        let mut runtime = AgentRuntime::new(agent_config.clone());
562                                        let label_str = label.as_str().to_string();
563
564                                        if let Some(api_key) = &agent_config.api_key {
565                                            runtime.register_tool(Arc::new(WebSearchTool::new(
566                                                api_key.clone(),
567                                            )));
568                                        } else {
569                                            runtime.register_tool(Arc::new(WebSearchTool::new(
570                                                "mock".to_string(),
571                                            )));
572                                        }
573
574                                        tokio::spawn(async move {
575                                            if let Ok(result) =
576                                                runtime.process_trigger(&task, &context).await
577                                            {
578                                                println!(
579                                                    "Agent Action [{}] -> {}",
580                                                    label_str, result
581                                                );
582                                            }
583                                        });
584                                    }
585                                }
586                            }
587                        }
588                    }
589                }
590                LabelAdded {
591                    tenant_id,
592                    id,
593                    label,
594                    properties,
595                } => {
596                    for (key, value) in properties {
597                        if let PropertyValue::Vector(vec) = &value {
598                            let _ = vector_index.add_vector(label.as_str(), &key, id, vec);
599                        }
600                        property_index.index_insert(&label, &key, value.clone(), id);
601
602                        // Auto-Embed check
603                        if let PropertyValue::String(text) = &value {
604                            if let Ok(tenant) = tenant_manager.get_tenant(&tenant_id) {
605                                if let Some(config) = tenant.embed_config {
606                                    if let Some(keys) =
607                                        config.embedding_policies.get(label.as_str())
608                                    {
609                                        if keys.contains(&key) {
610                                            let vector_index_clone = Arc::clone(&vector_index);
611                                            let label_str = label.as_str().to_string();
612                                            let key_clone = key.clone();
613                                            let text_clone = text.clone();
614                                            let config_clone = config.clone();
615
616                                            tokio::spawn(async move {
617                                                if let Ok(pipeline) =
618                                                    crate::embed::EmbedPipeline::new(config_clone)
619                                                {
620                                                    if let Ok(chunks) =
621                                                        pipeline.process_text(&text_clone).await
622                                                    {
623                                                        if let Some(first) = chunks.first() {
624                                                            let _ = vector_index_clone.add_vector(
625                                                                &label_str,
626                                                                &key_clone,
627                                                                id,
628                                                                &first.embedding,
629                                                            );
630                                                        }
631                                                    }
632                                                }
633                                            });
634                                        }
635                                    }
636                                }
637                            }
638                        }
639                    }
640                }
641            }
642        }
643    }
644
645    /// Create a node with auto-generated ID and single label
646    pub fn create_node(&mut self, label: impl Into<Label>) -> NodeId {
647        let node_id_u64 = if let Some(id) = self.free_node_ids.pop() {
648            id
649        } else {
650            let id = self.next_node_id;
651            self.next_node_id += 1;
652            id
653        };
654        let node_id = NodeId::new(node_id_u64);
655        let idx = node_id_u64 as usize;
656
657        let label = label.into();
658        let mut node = Node::new(node_id, label.clone());
659        node.version = self.current_version;
660
661        // Add to label index
662        self.label_index
663            .entry(label.clone())
664            .or_default()
665            .insert(node_id);
666
667        // Update catalog label count
668        self.catalog.on_label_added(&label);
669
670        // Ensure storage capacity
671        if idx >= self.nodes.len() {
672            self.nodes.resize(idx + 1, Vec::new());
673            self.outgoing.resize(idx + 1, Vec::new());
674            self.incoming.resize(idx + 1, Vec::new());
675        }
676
677        let event = crate::graph::event::IndexEvent::NodeCreated {
678            tenant_id: "default".to_string(),
679            id: node_id,
680            labels: node.labels.iter().cloned().collect(),
681            properties: node.properties.clone(),
682        };
683
684        if let Some(sender) = &self.index_sender {
685            let _ = sender.send(event);
686        } else {
687            self.handle_index_event(event, None);
688        }
689
690        self.nodes[idx].push(node);
691        node_id
692    }
693
694    /// Create a node with multiple labels and properties
695    pub fn create_node_with_properties(
696        &mut self,
697        tenant_id: &str,
698        labels: Vec<Label>,
699        properties: PropertyMap,
700    ) -> NodeId {
701        let node_id_u64 = if let Some(id) = self.free_node_ids.pop() {
702            id
703        } else {
704            let id = self.next_node_id;
705            self.next_node_id += 1;
706            id
707        };
708        let node_id = NodeId::new(node_id_u64);
709        let idx = node_id_u64 as usize;
710
711        // Populate columnar storage
712        for (key, value) in &properties {
713            self.node_columns.set_property(idx, key, value.clone());
714        }
715
716        let mut node = Node::new_with_properties(node_id, labels.clone(), properties);
717        node.version = self.current_version;
718
719        // Add to label indices
720        for label in &labels {
721            self.label_index
722                .entry(label.clone())
723                .or_default()
724                .insert(node_id);
725            // Update catalog label count
726            self.catalog.on_label_added(label);
727        }
728
729        // Ensure storage capacity
730        if idx >= self.nodes.len() {
731            self.nodes.resize(idx + 1, Vec::new());
732            self.outgoing.resize(idx + 1, Vec::new());
733            self.incoming.resize(idx + 1, Vec::new());
734        }
735
736        let event = crate::graph::event::IndexEvent::NodeCreated {
737            tenant_id: tenant_id.to_string(),
738            id: node_id,
739            labels: node.labels.iter().cloned().collect(),
740            properties: node.properties.clone(),
741        };
742
743        if let Some(sender) = &self.index_sender {
744            let _ = sender.send(event);
745        } else {
746            self.handle_index_event(event, None);
747        }
748
749        self.nodes[idx].push(node);
750        node_id
751    }
752
753    /// Get a node by ID at a specific version (MVCC)
754    pub fn get_node_at_version(&self, id: NodeId, version: u64) -> Option<&Node> {
755        let idx = id.as_u64() as usize;
756        let versions = self.nodes.get(idx)?;
757
758        // Find the latest version <= requested version
759        // Versions are sorted by creation time
760        versions.iter().rev().find(|n| n.version <= version)
761    }
762
763    /// Get a node by ID (uses current version)
764    pub fn get_node(&self, id: NodeId) -> Option<&Node> {
765        self.get_node_at_version(id, self.current_version)
766    }
767
768    /// Get a mutable node by ID (always latest version)
769    pub fn get_node_mut(&mut self, id: NodeId) -> Option<&mut Node> {
770        self.nodes
771            .get_mut(id.as_u64() as usize)
772            .and_then(|v| v.last_mut())
773    }
774
775    /// Check if a node exists
776    pub fn has_node(&self, id: NodeId) -> bool {
777        self.get_node(id).is_some()
778    }
779
780    /// Set a property on a node and update vector indices if necessary
781    pub fn set_node_property(
782        &mut self,
783        tenant_id: &str,
784        node_id: NodeId,
785        key: impl Into<String>,
786        value: impl Into<PropertyValue>,
787    ) -> GraphResult<()> {
788        let key_str = key.into();
789        let val = value.into();
790        let idx = node_id.as_u64() as usize;
791
792        // Enforce UNIQUE constraints: check if this label+property+value already exists
793        if let Some(node_versions) = self.nodes.get(idx) {
794            if let Some(node) = node_versions.last() {
795                for label in &node.labels {
796                    if self.property_index.has_unique_constraint(label, &key_str) {
797                        // Check if another node already has this value
798                        if let Some(existing_ids) =
799                            self.property_index.lookup(label, &key_str, &val)
800                        {
801                            for existing_id in existing_ids {
802                                if existing_id != node_id {
803                                    return Err(GraphError::ConstraintViolation(format!(
804                                        "Node already exists with label '{}' and {} = {:?}",
805                                        label.as_str(),
806                                        key_str,
807                                        val
808                                    )));
809                                }
810                            }
811                        }
812                    }
813                }
814            }
815        }
816
817        // Update columnar storage (always latest)
818        self.node_columns.set_property(idx, &key_str, val.clone());
819
820        // Get access to versions
821        let versions = self
822            .nodes
823            .get_mut(idx)
824            .ok_or(GraphError::NodeNotFound(node_id))?;
825        let latest_node = versions.last().ok_or(GraphError::NodeNotFound(node_id))?;
826
827        let old_val;
828
829        if latest_node.version < self.current_version {
830            // COW: Create new version
831            let mut new_node = latest_node.clone();
832            new_node.version = self.current_version;
833            new_node.updated_at = chrono::Utc::now().timestamp_millis();
834            old_val = new_node.set_property(key_str.clone(), val.clone());
835            versions.push(new_node);
836        } else {
837            // Update in place (same transaction/version)
838            let node = versions.last_mut().unwrap();
839            old_val = node.set_property(key_str.clone(), val.clone());
840        }
841
842        let event = crate::graph::event::IndexEvent::PropertySet {
843            tenant_id: tenant_id.to_string(),
844            id: node_id,
845            labels: versions.last().unwrap().labels.iter().cloned().collect(),
846            key: key_str,
847            old_value: old_val,
848            new_value: val,
849        };
850
851        if let Some(sender) = &self.index_sender {
852            let _ = sender.send(event);
853        } else {
854            self.handle_index_event(event, None);
855        }
856
857        Ok(())
858    }
859
860    /// Delete a node and all its connected edges
861    pub fn delete_node(&mut self, tenant_id: &str, id: NodeId) -> GraphResult<Node> {
862        let idx = id.as_u64() as usize;
863        let latest_node = self
864            .get_node(id)
865            .ok_or(GraphError::NodeNotFound(id))?
866            .clone();
867
868        // Create a tombstone version (we use a special property or metadata in a real system)
869        // For now, we'll just not return it in get_node_at_version if we had a flag.
870        // Let's add a `deleted` flag to Node/Edge for true MVCC.
871
872        // Add to free list for reuse (In true MVCC, we only reuse after compaction/vacuum)
873        self.free_node_ids.push(id.as_u64());
874
875        // For this prototype, we'll keep the removal logic but wrap it in MVCC metadata if needed.
876        // Actually, let's keep it simple: removal from the latest version effectively deletes it.
877        // But to keep history, we should NOT remove from the Vec.
878
879        // Remove from label indices and update catalog
880        for label in &latest_node.labels {
881            if let Some(node_set) = self.label_index.get_mut(label) {
882                node_set.remove(&id);
883            }
884            self.catalog.on_label_removed(label);
885        }
886
887        let event = crate::graph::event::IndexEvent::NodeDeleted {
888            tenant_id: tenant_id.to_string(),
889            id,
890            labels: latest_node.labels.iter().cloned().collect(),
891            properties: latest_node.properties.clone(),
892        };
893
894        if let Some(sender) = &self.index_sender {
895            let _ = sender.send(event);
896        } else {
897            self.handle_index_event(event, None);
898        }
899
900        // Remove from the versions (breaking historical reads for now, full MVCC is complex)
901        // TODO: Implement proper tombstone versions
902        let node = self.nodes[idx].pop().unwrap();
903
904        // Remove all connected edges
905        let outgoing_edges: Vec<EdgeId> = std::mem::take(&mut self.outgoing[idx])
906            .into_iter()
907            .map(|(_, eid)| eid)
908            .collect();
909        let incoming_edges: Vec<EdgeId> = std::mem::take(&mut self.incoming[idx])
910            .into_iter()
911            .map(|(_, eid)| eid)
912            .collect();
913
914        for edge_id in outgoing_edges.iter().chain(incoming_edges.iter()) {
915            let _ = self.delete_edge(*edge_id);
916        }
917
918        Ok(node)
919    }
920
921    /// Add a label to an existing node AND update the label index
922    ///
923    /// This is the correct way to add labels to nodes after creation.
924    /// Using `node.add_label()` directly will NOT update the label_index,
925    /// making the node invisible to `get_nodes_by_label()` queries.
926    pub fn add_label_to_node(
927        &mut self,
928        tenant_id: &str,
929        node_id: NodeId,
930        label: impl Into<Label>,
931    ) -> GraphResult<()> {
932        let label = label.into();
933        let idx = node_id.as_u64() as usize;
934
935        // Get the node and add the label
936        let node = self
937            .nodes
938            .get_mut(idx)
939            .and_then(|v| v.last_mut())
940            .ok_or(GraphError::NodeNotFound(node_id))?;
941        node.add_label(label.clone());
942
943        // Update the label index so queries can find this node by the new label
944        self.label_index
945            .entry(label.clone())
946            .or_default()
947            .insert(node_id);
948
949        // Update catalog label count
950        self.catalog.on_label_added(&label);
951
952        let event = crate::graph::event::IndexEvent::LabelAdded {
953            tenant_id: tenant_id.to_string(),
954            id: node_id,
955            label: label.clone(),
956            properties: node.properties.clone(),
957        };
958
959        if let Some(sender) = &self.index_sender {
960            let _ = sender.send(event);
961        } else {
962            self.handle_index_event(event, None);
963        }
964
965        Ok(())
966    }
967
968    /// Create an edge between two nodes
969    pub fn create_edge(
970        &mut self,
971        source: NodeId,
972        target: NodeId,
973        edge_type: impl Into<EdgeType>,
974    ) -> GraphResult<EdgeId> {
975        // Validate nodes exist
976        if !self.has_node(source) {
977            return Err(GraphError::InvalidEdgeSource(source));
978        }
979        if !self.has_node(target) {
980            return Err(GraphError::InvalidEdgeTarget(target));
981        }
982
983        let edge_id_u64 = if let Some(id) = self.free_edge_ids.pop() {
984            id
985        } else {
986            let id = self.next_edge_id;
987            self.next_edge_id += 1;
988            id
989        };
990        let edge_id = EdgeId::new(edge_id_u64);
991        let idx = edge_id_u64 as usize;
992
993        let edge_type = edge_type.into();
994        let mut edge = Edge::new(edge_id, source, target, edge_type.clone());
995        edge.version = self.current_version;
996
997        // Update adjacency lists (sorted insert by target/source NodeId)
998        {
999            let out_list = &mut self.outgoing[source.as_u64() as usize];
1000            let pos = out_list
1001                .binary_search_by_key(&target, |(nid, _)| *nid)
1002                .unwrap_or_else(|p| p);
1003            out_list.insert(pos, (target, edge_id));
1004        }
1005        {
1006            let in_list = &mut self.incoming[target.as_u64() as usize];
1007            let pos = in_list
1008                .binary_search_by_key(&source, |(nid, _)| *nid)
1009                .unwrap_or_else(|p| p);
1010            in_list.insert(pos, (source, edge_id));
1011        }
1012
1013        // Ensure storage capacity
1014        if idx >= self.edges.len() {
1015            self.edges.resize(idx + 1, Vec::new());
1016        }
1017
1018        // Update edge type index
1019        self.edge_type_index
1020            .entry(edge_type.clone())
1021            .or_default()
1022            .insert(edge_id);
1023
1024        // Update catalog triple stats
1025        let src_labels: Vec<Label> = self
1026            .get_node(source)
1027            .map(|n| n.labels.iter().cloned().collect())
1028            .unwrap_or_default();
1029        let tgt_labels: Vec<Label> = self
1030            .get_node(target)
1031            .map(|n| n.labels.iter().cloned().collect())
1032            .unwrap_or_default();
1033        self.catalog
1034            .on_edge_created(source, &src_labels, &edge_type, target, &tgt_labels);
1035
1036        self.edges[idx].push(edge);
1037        Ok(edge_id)
1038    }
1039
1040    /// Create an edge with properties
1041    pub fn create_edge_with_properties(
1042        &mut self,
1043        source: NodeId,
1044        target: NodeId,
1045        edge_type: impl Into<EdgeType>,
1046        properties: PropertyMap,
1047    ) -> GraphResult<EdgeId> {
1048        // Validate nodes exist
1049        if !self.has_node(source) {
1050            return Err(GraphError::InvalidEdgeSource(source));
1051        }
1052        if !self.has_node(target) {
1053            return Err(GraphError::InvalidEdgeTarget(target));
1054        }
1055
1056        let edge_id_u64 = if let Some(id) = self.free_edge_ids.pop() {
1057            id
1058        } else {
1059            let id = self.next_edge_id;
1060            self.next_edge_id += 1;
1061            id
1062        };
1063        let edge_id = EdgeId::new(edge_id_u64);
1064        let idx = edge_id_u64 as usize;
1065
1066        // Populate columnar storage
1067        for (key, value) in &properties {
1068            self.edge_columns.set_property(idx, key, value.clone());
1069        }
1070
1071        let edge_type = edge_type.into();
1072        let mut edge =
1073            Edge::new_with_properties(edge_id, source, target, edge_type.clone(), properties);
1074        edge.version = self.current_version;
1075
1076        // Update adjacency lists (sorted insert by target/source NodeId)
1077        {
1078            let out_list = &mut self.outgoing[source.as_u64() as usize];
1079            let pos = out_list
1080                .binary_search_by_key(&target, |(nid, _)| *nid)
1081                .unwrap_or_else(|p| p);
1082            out_list.insert(pos, (target, edge_id));
1083        }
1084        {
1085            let in_list = &mut self.incoming[target.as_u64() as usize];
1086            let pos = in_list
1087                .binary_search_by_key(&source, |(nid, _)| *nid)
1088                .unwrap_or_else(|p| p);
1089            in_list.insert(pos, (source, edge_id));
1090        }
1091
1092        // Ensure storage capacity
1093        if idx >= self.edges.len() {
1094            self.edges.resize(idx + 1, Vec::new());
1095        }
1096
1097        // Update edge type index
1098        self.edge_type_index
1099            .entry(edge_type.clone())
1100            .or_default()
1101            .insert(edge_id);
1102
1103        // Update catalog triple stats
1104        let src_labels: Vec<Label> = self
1105            .get_node(source)
1106            .map(|n| n.labels.iter().cloned().collect())
1107            .unwrap_or_default();
1108        let tgt_labels: Vec<Label> = self
1109            .get_node(target)
1110            .map(|n| n.labels.iter().cloned().collect())
1111            .unwrap_or_default();
1112        self.catalog
1113            .on_edge_created(source, &src_labels, &edge_type, target, &tgt_labels);
1114
1115        self.edges[idx].push(edge);
1116        Ok(edge_id)
1117    }
1118
1119    /// Get an edge by ID at a specific version (MVCC)
1120    pub fn get_edge_at_version(&self, id: EdgeId, version: u64) -> Option<&Edge> {
1121        let idx = id.as_u64() as usize;
1122        let versions = self.edges.get(idx)?;
1123
1124        // Find the latest version <= requested version
1125        versions.iter().rev().find(|e| e.version <= version)
1126    }
1127
1128    /// Get an edge by ID (uses current version)
1129    pub fn get_edge(&self, id: EdgeId) -> Option<&Edge> {
1130        self.get_edge_at_version(id, self.current_version)
1131    }
1132
1133    /// Get a mutable edge by ID (always latest version)
1134    pub fn get_edge_mut(&mut self, id: EdgeId) -> Option<&mut Edge> {
1135        self.edges
1136            .get_mut(id.as_u64() as usize)
1137            .and_then(|v| v.last_mut())
1138    }
1139
1140    /// Check if an edge exists
1141    pub fn has_edge(&self, id: EdgeId) -> bool {
1142        self.get_edge(id).is_some()
1143    }
1144
1145    /// Delete an edge
1146    pub fn delete_edge(&mut self, id: EdgeId) -> GraphResult<Edge> {
1147        let idx = id.as_u64() as usize;
1148
1149        // Collect catalog info before removal
1150        let (src_labels, tgt_labels, source_id, target_id, edge_type_clone) = {
1151            let edge = self
1152                .edges
1153                .get(idx)
1154                .and_then(|v| v.last())
1155                .ok_or(GraphError::EdgeNotFound(id))?;
1156            let src_labels: Vec<Label> = self
1157                .get_node(edge.source)
1158                .map(|n| n.labels.iter().cloned().collect())
1159                .unwrap_or_default();
1160            let tgt_labels: Vec<Label> = self
1161                .get_node(edge.target)
1162                .map(|n| n.labels.iter().cloned().collect())
1163                .unwrap_or_default();
1164            (
1165                src_labels,
1166                tgt_labels,
1167                edge.source,
1168                edge.target,
1169                edge.edge_type.clone(),
1170            )
1171        };
1172
1173        let edge = self
1174            .edges
1175            .get_mut(idx)
1176            .and_then(|v| v.pop())
1177            .ok_or(GraphError::EdgeNotFound(id))?;
1178
1179        // Add to free list
1180        self.free_edge_ids.push(id.as_u64());
1181
1182        // Remove from edge type index
1183        if let Some(edge_set) = self.edge_type_index.get_mut(&edge.edge_type) {
1184            edge_set.remove(&id);
1185        }
1186
1187        // Remove from adjacency lists
1188        if let Some(adj) = self.outgoing.get_mut(edge.source.as_u64() as usize) {
1189            adj.retain(|&(_, eid)| eid != id);
1190        }
1191        if let Some(adj) = self.incoming.get_mut(edge.target.as_u64() as usize) {
1192            adj.retain(|&(_, eid)| eid != id);
1193        }
1194
1195        // Update catalog triple stats
1196        self.catalog.on_edge_deleted(
1197            source_id,
1198            &src_labels,
1199            &edge_type_clone,
1200            target_id,
1201            &tgt_labels,
1202        );
1203
1204        Ok(edge)
1205    }
1206
1207    /// Get all outgoing edges from a node
1208    pub fn get_outgoing_edges(&self, node_id: NodeId) -> Vec<&Edge> {
1209        self.outgoing
1210            .get(node_id.as_u64() as usize)
1211            .map(|entries| {
1212                entries
1213                    .iter()
1214                    .filter_map(|&(_, eid)| self.get_edge(eid))
1215                    .collect()
1216            })
1217            .unwrap_or_default()
1218    }
1219
1220    /// Get all incoming edges to a node
1221    pub fn get_incoming_edges(&self, node_id: NodeId) -> Vec<&Edge> {
1222        self.incoming
1223            .get(node_id.as_u64() as usize)
1224            .map(|entries| {
1225                entries
1226                    .iter()
1227                    .filter_map(|&(_, eid)| self.get_edge(eid))
1228                    .collect()
1229            })
1230            .unwrap_or_default()
1231    }
1232
1233    /// Get outgoing edge targets as lightweight tuples (no Edge clone)
1234    /// Returns (EdgeId, source NodeId, target NodeId, &EdgeType) for each outgoing edge
1235    pub fn get_outgoing_edge_targets(
1236        &self,
1237        node_id: NodeId,
1238    ) -> Vec<(EdgeId, NodeId, NodeId, &EdgeType)> {
1239        self.outgoing
1240            .get(node_id.as_u64() as usize)
1241            .map(|entries| {
1242                entries
1243                    .iter()
1244                    .filter_map(|&(_, eid)| {
1245                        self.get_edge(eid)
1246                            .map(|e| (eid, e.source, e.target, &e.edge_type))
1247                    })
1248                    .collect()
1249            })
1250            .unwrap_or_default()
1251    }
1252
1253    /// Get incoming edge sources as lightweight tuples (no Edge clone)
1254    /// Returns (EdgeId, source NodeId, target NodeId, &EdgeType) for each incoming edge
1255    pub fn get_incoming_edge_sources(
1256        &self,
1257        node_id: NodeId,
1258    ) -> Vec<(EdgeId, NodeId, NodeId, &EdgeType)> {
1259        self.incoming
1260            .get(node_id.as_u64() as usize)
1261            .map(|entries| {
1262                entries
1263                    .iter()
1264                    .filter_map(|&(_, eid)| {
1265                        self.get_edge(eid)
1266                            .map(|e| (eid, e.source, e.target, &e.edge_type))
1267                    })
1268                    .collect()
1269            })
1270            .unwrap_or_default()
1271    }
1272
1273    /// Check if an edge exists between source and target, optionally filtered by edge type.
1274    /// Uses binary search on sorted adjacency lists for O(log d) lookup.
1275    /// Returns the first matching EdgeId, or None.
1276    pub fn edge_between(
1277        &self,
1278        source: NodeId,
1279        target: NodeId,
1280        edge_type: Option<&EdgeType>,
1281    ) -> Option<EdgeId> {
1282        let out_len = self
1283            .outgoing
1284            .get(source.as_u64() as usize)
1285            .map(|v| v.len())
1286            .unwrap_or(0);
1287        let in_len = self
1288            .incoming
1289            .get(target.as_u64() as usize)
1290            .map(|v| v.len())
1291            .unwrap_or(0);
1292
1293        // Use outgoing from source (search for target) or incoming to target (search for source)
1294        let (entries, search_key) = if out_len <= in_len {
1295            (self.outgoing.get(source.as_u64() as usize)?, target)
1296        } else {
1297            (self.incoming.get(target.as_u64() as usize)?, source)
1298        };
1299
1300        // Binary search to find the neighborhood of matching entries
1301        let start = match entries.binary_search_by_key(&search_key, |(nid, _)| *nid) {
1302            Ok(pos) => {
1303                // Walk back to find the first entry with this key
1304                let mut p = pos;
1305                while p > 0 && entries[p - 1].0 == search_key {
1306                    p -= 1;
1307                }
1308                p
1309            }
1310            Err(_) => return None,
1311        };
1312
1313        // Scan forward through entries with the matching key
1314        for &(nid, eid) in entries.iter().skip(start) {
1315            if nid != search_key {
1316                break;
1317            }
1318            if let Some(e) = self.get_edge(eid) {
1319                if e.source == source && e.target == target {
1320                    match edge_type {
1321                        Some(et) if &e.edge_type != et => continue,
1322                        _ => return Some(eid),
1323                    }
1324                }
1325            }
1326        }
1327        None
1328    }
1329
1330    /// Get all edges between source and target, optionally filtered by edge type.
1331    /// Uses binary search on sorted adjacency lists.
1332    pub fn edges_between(
1333        &self,
1334        source: NodeId,
1335        target: NodeId,
1336        edge_type: Option<&EdgeType>,
1337    ) -> Vec<EdgeId> {
1338        let out_len = self
1339            .outgoing
1340            .get(source.as_u64() as usize)
1341            .map(|v| v.len())
1342            .unwrap_or(0);
1343        let in_len = self
1344            .incoming
1345            .get(target.as_u64() as usize)
1346            .map(|v| v.len())
1347            .unwrap_or(0);
1348
1349        let (entries, search_key) = if out_len <= in_len {
1350            match self.outgoing.get(source.as_u64() as usize) {
1351                Some(e) => (e, target),
1352                None => return Vec::new(),
1353            }
1354        } else {
1355            match self.incoming.get(target.as_u64() as usize) {
1356                Some(e) => (e, source),
1357                None => return Vec::new(),
1358            }
1359        };
1360
1361        let start = match entries.binary_search_by_key(&search_key, |(nid, _)| *nid) {
1362            Ok(pos) => {
1363                let mut p = pos;
1364                while p > 0 && entries[p - 1].0 == search_key {
1365                    p -= 1;
1366                }
1367                p
1368            }
1369            Err(_) => return Vec::new(),
1370        };
1371
1372        let mut result = Vec::new();
1373        for &(nid, eid) in entries.iter().skip(start) {
1374            if nid != search_key {
1375                break;
1376            }
1377            if let Some(e) = self.get_edge(eid) {
1378                if e.source == source && e.target == target {
1379                    match edge_type {
1380                        Some(et) if &e.edge_type != et => {}
1381                        _ => result.push(eid),
1382                    }
1383                }
1384            }
1385        }
1386        result
1387    }
1388
1389    /// Get all nodes with a specific label
1390    pub fn get_nodes_by_label(&self, label: &Label) -> Vec<&Node> {
1391        self.label_index
1392            .get(label)
1393            .map(|node_ids| {
1394                node_ids
1395                    .iter()
1396                    .filter_map(|&id| self.get_node(id))
1397                    .collect()
1398            })
1399            .unwrap_or_default()
1400    }
1401
1402    /// Get all edges of a specific type
1403    pub fn get_edges_by_type(&self, edge_type: &EdgeType) -> Vec<&Edge> {
1404        self.edge_type_index
1405            .get(edge_type)
1406            .map(|edge_ids| {
1407                edge_ids
1408                    .iter()
1409                    .filter_map(|&id| self.get_edge(id))
1410                    .collect()
1411            })
1412            .unwrap_or_default()
1413    }
1414
1415    /// Get total number of nodes
1416    pub fn node_count(&self) -> usize {
1417        self.nodes.iter().flatten().count()
1418    }
1419
1420    /// Get total number of edges
1421    pub fn edge_count(&self) -> usize {
1422        self.edges.iter().flatten().count()
1423    }
1424
1425    /// Get all nodes in the graph
1426    pub fn all_nodes(&self) -> Vec<&Node> {
1427        self.nodes.iter().flatten().collect()
1428    }
1429
1430    /// Get all edges in the graph
1431    pub fn all_edges(&self) -> Vec<&Edge> {
1432        self.edges.iter().flatten().collect()
1433    }
1434
1435    // ============================================================
1436    // Graph Statistics (for cost-based query optimization)
1437    // ============================================================
1438
1439    /// Compute statistics for the current graph state
1440    pub fn compute_statistics(&self) -> GraphStatistics {
1441        let total_nodes = self.node_count();
1442        let total_edges = self.edge_count();
1443
1444        let mut label_counts = HashMap::new();
1445        for (label, node_ids) in &self.label_index {
1446            label_counts.insert(label.clone(), node_ids.len());
1447        }
1448
1449        let mut edge_type_counts = HashMap::new();
1450        for (edge_type, edge_ids) in &self.edge_type_index {
1451            edge_type_counts.insert(edge_type.clone(), edge_ids.len());
1452        }
1453
1454        // Compute average degree
1455        let avg_out_degree = if total_nodes > 0 {
1456            total_edges as f64 / total_nodes as f64
1457        } else {
1458            0.0
1459        };
1460
1461        // Sample property selectivity for common properties
1462        let mut property_stats: HashMap<(Label, String), PropertyStats> = HashMap::new();
1463        for (label, node_ids) in &self.label_index {
1464            let sample_size = node_ids.len().min(1000);
1465            let mut property_presence: HashMap<String, usize> = HashMap::new();
1466            let mut property_distinct: HashMap<String, HashSet<u64>> = HashMap::new();
1467
1468            for (i, &node_id) in node_ids.iter().enumerate() {
1469                if i >= sample_size {
1470                    break;
1471                }
1472                if let Some(node) = self.get_node(node_id) {
1473                    for (key, val) in &node.properties {
1474                        *property_presence.entry(key.clone()).or_insert(0) += 1;
1475
1476                        let hash = {
1477                            use std::hash::{Hash, Hasher};
1478                            let mut hasher = std::collections::hash_map::DefaultHasher::new();
1479                            val.hash(&mut hasher);
1480                            hasher.finish()
1481                        };
1482                        property_distinct
1483                            .entry(key.clone())
1484                            .or_default()
1485                            .insert(hash);
1486                    }
1487                }
1488            }
1489
1490            for (prop, count) in &property_presence {
1491                let distinct = property_distinct.get(prop).map(|s| s.len()).unwrap_or(0);
1492                let selectivity = if distinct > 0 {
1493                    1.0 / distinct as f64
1494                } else {
1495                    1.0
1496                };
1497                property_stats.insert(
1498                    (label.clone(), prop.clone()),
1499                    PropertyStats {
1500                        null_fraction: 1.0 - (*count as f64 / sample_size as f64),
1501                        distinct_count: distinct,
1502                        selectivity,
1503                    },
1504                );
1505            }
1506        }
1507
1508        GraphStatistics {
1509            total_nodes,
1510            total_edges,
1511            label_counts,
1512            edge_type_counts,
1513            avg_out_degree,
1514            property_stats,
1515        }
1516    }
1517
1518    /// Get the triple-level statistics catalog (for graph-native query planning)
1519    pub fn catalog(&self) -> &GraphCatalog {
1520        &self.catalog
1521    }
1522
1523    /// Get node count for a specific label (fast, O(1))
1524    pub fn label_node_count(&self, label: &Label) -> usize {
1525        self.label_index.get(label).map(|s| s.len()).unwrap_or(0)
1526    }
1527
1528    /// Get edge count for a specific type (fast, O(1))
1529    pub fn edge_type_count(&self, edge_type: &EdgeType) -> usize {
1530        self.edge_type_index
1531            .get(edge_type)
1532            .map(|s| s.len())
1533            .unwrap_or(0)
1534    }
1535
1536    /// Get all label names in the graph
1537    pub fn all_labels(&self) -> Vec<&Label> {
1538        self.label_index.keys().collect()
1539    }
1540
1541    /// Get all edge type names in the graph
1542    pub fn all_edge_types(&self) -> Vec<&EdgeType> {
1543        self.edge_type_index.keys().collect()
1544    }
1545
1546    /// Clear all data from the graph
1547    pub fn clear(&mut self) {
1548        self.nodes.clear();
1549        self.edges.clear();
1550        self.outgoing.clear();
1551        self.incoming.clear();
1552        self.free_node_ids.clear();
1553        self.free_edge_ids.clear();
1554        self.label_index.clear();
1555        self.edge_type_index.clear();
1556        self.vector_index = Arc::new(VectorIndexManager::new());
1557        self.property_index = Arc::new(IndexManager::new());
1558        self.node_columns = ColumnStore::new();
1559        self.edge_columns = ColumnStore::new();
1560        self.next_node_id = 1;
1561        self.next_edge_id = 1;
1562        self.catalog.clear();
1563    }
1564
1565    // ============================================================
1566    // Event Handling
1567    // ============================================================
1568
1569    pub fn handle_index_event(
1570        &self,
1571        event: crate::graph::event::IndexEvent,
1572        _tenant_manager: Option<Arc<crate::persistence::TenantManager>>,
1573    ) {
1574        use crate::graph::event::IndexEvent::*;
1575        match event {
1576            NodeCreated {
1577                tenant_id: _,
1578                id,
1579                labels,
1580                properties,
1581            } => {
1582                for (key, value) in properties {
1583                    if let PropertyValue::Vector(vec) = &value {
1584                        for label in &labels {
1585                            let _ = self.vector_index.add_vector(label.as_str(), &key, id, vec);
1586                        }
1587                    }
1588                    for label in &labels {
1589                        self.property_index
1590                            .index_insert(label, &key, value.clone(), id);
1591                    }
1592                }
1593            }
1594            NodeDeleted {
1595                tenant_id: _,
1596                id,
1597                labels,
1598                properties,
1599            } => {
1600                for (key, value) in properties {
1601                    for label in &labels {
1602                        self.property_index.index_remove(label, &key, &value, id);
1603                    }
1604                }
1605            }
1606            PropertySet {
1607                tenant_id: _,
1608                id,
1609                labels,
1610                key,
1611                old_value,
1612                new_value,
1613            } => {
1614                if let Some(old) = old_value {
1615                    for label in &labels {
1616                        self.property_index.index_remove(label, &key, &old, id);
1617                    }
1618                }
1619                for label in &labels {
1620                    self.property_index
1621                        .index_insert(label, &key, new_value.clone(), id);
1622                }
1623                if let PropertyValue::Vector(vec) = &new_value {
1624                    for label in &labels {
1625                        let _ = self.vector_index.add_vector(label.as_str(), &key, id, vec);
1626                    }
1627                }
1628            }
1629            LabelAdded {
1630                tenant_id: _,
1631                id,
1632                label,
1633                properties,
1634            } => {
1635                for (key, value) in properties {
1636                    if let PropertyValue::Vector(vec) = &value {
1637                        let _ = self.vector_index.add_vector(label.as_str(), &key, id, vec);
1638                    }
1639                    self.property_index
1640                        .index_insert(&label, &key, value.clone(), id);
1641                }
1642            }
1643        }
1644    }
1645
1646    // ============================================================
1647    // Vector Index methods
1648    // ============================================================
1649
1650    /// Create a vector index for a specific label and property
1651    pub fn create_vector_index(
1652        &self,
1653        label: &str,
1654        property_key: &str,
1655        dimensions: usize,
1656        metric: DistanceMetric,
1657    ) -> VectorResult<()> {
1658        self.vector_index
1659            .create_index(label, property_key, dimensions, metric)
1660    }
1661
1662    /// Search for nearest neighbors using a vector index
1663    pub fn vector_search(
1664        &self,
1665        label: &str,
1666        property_key: &str,
1667        query: &[f32],
1668        k: usize,
1669    ) -> VectorResult<Vec<(NodeId, f32)>> {
1670        self.vector_index.search(label, property_key, query, k)
1671    }
1672
1673    // ============================================================
1674    // Recovery methods - used to rebuild graph from persisted data
1675    // ============================================================
1676
1677    /// Insert a recovered node (used during recovery from persistence)
1678    /// Unlike create_node(), this preserves the node's existing ID
1679    pub fn insert_recovered_node(&mut self, node: Node) {
1680        let node_id = node.id;
1681        let idx = node_id.as_u64() as usize;
1682
1683        // Ensure storage capacity
1684        if idx >= self.nodes.len() {
1685            self.nodes.resize(idx + 1, Vec::new());
1686            self.outgoing.resize(idx + 1, Vec::new());
1687            self.incoming.resize(idx + 1, Vec::new());
1688        }
1689
1690        // Update label indices for all labels
1691        for label in &node.labels {
1692            self.label_index
1693                .entry(label.clone())
1694                .or_default()
1695                .insert(node_id);
1696        }
1697
1698        // Insert the node
1699        self.nodes[idx].push(node);
1700
1701        // Update next_node_id to be higher than any recovered node
1702        if node_id.as_u64() >= self.next_node_id {
1703            self.next_node_id = node_id.as_u64() + 1;
1704        }
1705    }
1706
1707    /// Insert a recovered edge (used during recovery from persistence)
1708    /// Unlike create_edge(), this preserves the edge's existing ID
1709    /// Note: Source and target nodes must already exist
1710    pub fn insert_recovered_edge(&mut self, edge: Edge) -> GraphResult<()> {
1711        let edge_id = edge.id;
1712        let idx = edge_id.as_u64() as usize;
1713        let source = edge.source;
1714        let target = edge.target;
1715
1716        // Ensure capacity
1717        if idx >= self.edges.len() {
1718            self.edges.resize(idx + 1, Vec::new());
1719        }
1720
1721        // Validate nodes exist
1722        if !self.has_node(source) {
1723            return Err(GraphError::InvalidEdgeSource(source));
1724        }
1725        if !self.has_node(target) {
1726            return Err(GraphError::InvalidEdgeTarget(target));
1727        }
1728
1729        // Update adjacency lists (sorted insert)
1730        {
1731            let out_list = &mut self.outgoing[source.as_u64() as usize];
1732            let pos = out_list
1733                .binary_search_by_key(&target, |(nid, _)| *nid)
1734                .unwrap_or_else(|p| p);
1735            out_list.insert(pos, (target, edge_id));
1736        }
1737        {
1738            let in_list = &mut self.incoming[target.as_u64() as usize];
1739            let pos = in_list
1740                .binary_search_by_key(&source, |(nid, _)| *nid)
1741                .unwrap_or_else(|p| p);
1742            in_list.insert(pos, (source, edge_id));
1743        }
1744
1745        // Update edge type index
1746        self.edge_type_index
1747            .entry(edge.edge_type.clone())
1748            .or_default()
1749            .insert(edge_id);
1750
1751        // Insert the edge
1752        self.edges[idx].push(edge);
1753
1754        // Update next_edge_id to be higher than any recovered edge
1755        if edge_id.as_u64() >= self.next_edge_id {
1756            self.next_edge_id = edge_id.as_u64() + 1;
1757        }
1758
1759        Ok(())
1760    }
1761}
1762
1763impl Default for GraphStore {
1764    fn default() -> Self {
1765        Self::new()
1766    }
1767}
1768
1769#[cfg(test)]
1770mod tests {
1771    use super::*;
1772
1773    #[test]
1774    fn test_create_and_get_node() {
1775        let mut store = GraphStore::new();
1776        let node_id = store.create_node("Person");
1777
1778        assert_eq!(store.node_count(), 1);
1779        let node = store.get_node(node_id).unwrap();
1780        assert_eq!(node.id, node_id);
1781        assert!(node.has_label(&Label::new("Person")));
1782    }
1783
1784    #[test]
1785    fn test_create_node_with_properties() {
1786        let mut store = GraphStore::new();
1787        let mut props = PropertyMap::new();
1788        props.insert("name".to_string(), "Alice".into());
1789        props.insert("age".to_string(), 30i64.into());
1790
1791        let node_id = store.create_node_with_properties(
1792            "default",
1793            vec![Label::new("Person"), Label::new("Employee")],
1794            props,
1795        );
1796
1797        let node = store.get_node(node_id).unwrap();
1798        assert_eq!(node.label_count(), 2);
1799        assert_eq!(
1800            node.get_property("name").unwrap().as_string(),
1801            Some("Alice")
1802        );
1803        assert_eq!(node.get_property("age").unwrap().as_integer(), Some(30));
1804    }
1805
1806    #[test]
1807    fn test_create_and_get_edge() {
1808        let mut store = GraphStore::new();
1809        let node1 = store.create_node("Person");
1810        let node2 = store.create_node("Person");
1811
1812        let edge_id = store.create_edge(node1, node2, "KNOWS").unwrap();
1813
1814        assert_eq!(store.edge_count(), 1);
1815        let edge = store.get_edge(edge_id).unwrap();
1816        assert_eq!(edge.source, node1);
1817        assert_eq!(edge.target, node2);
1818        assert_eq!(edge.edge_type, EdgeType::new("KNOWS"));
1819    }
1820
1821    #[test]
1822    fn test_edge_validation() {
1823        let mut store = GraphStore::new();
1824        let node1 = store.create_node("Person");
1825        let invalid_node = NodeId::new(999);
1826
1827        // Invalid source node
1828        let result = store.create_edge(invalid_node, node1, "KNOWS");
1829        assert_eq!(result, Err(GraphError::InvalidEdgeSource(invalid_node)));
1830
1831        // Invalid target node
1832        let result = store.create_edge(node1, invalid_node, "KNOWS");
1833        assert_eq!(result, Err(GraphError::InvalidEdgeTarget(invalid_node)));
1834    }
1835
1836    #[test]
1837    fn test_adjacency_lists() {
1838        let mut store = GraphStore::new();
1839        let node1 = store.create_node("Person");
1840        let node2 = store.create_node("Person");
1841        let node3 = store.create_node("Person");
1842
1843        store.create_edge(node1, node2, "KNOWS").unwrap();
1844        store.create_edge(node1, node3, "KNOWS").unwrap();
1845        store.create_edge(node2, node3, "FOLLOWS").unwrap();
1846
1847        // Node1 has 2 outgoing edges
1848        let outgoing = store.get_outgoing_edges(node1);
1849        assert_eq!(outgoing.len(), 2);
1850
1851        // Node2 has 1 outgoing, 1 incoming
1852        let outgoing = store.get_outgoing_edges(node2);
1853        assert_eq!(outgoing.len(), 1);
1854        let incoming = store.get_incoming_edges(node2);
1855        assert_eq!(incoming.len(), 1);
1856
1857        // Node3 has 0 outgoing, 2 incoming
1858        let outgoing = store.get_outgoing_edges(node3);
1859        assert_eq!(outgoing.len(), 0);
1860        let incoming = store.get_incoming_edges(node3);
1861        assert_eq!(incoming.len(), 2);
1862    }
1863
1864    #[test]
1865    fn test_label_index() {
1866        let mut store = GraphStore::new();
1867        store.create_node("Person");
1868        store.create_node("Person");
1869        store.create_node("Company");
1870
1871        let persons = store.get_nodes_by_label(&Label::new("Person"));
1872        assert_eq!(persons.len(), 2);
1873
1874        let companies = store.get_nodes_by_label(&Label::new("Company"));
1875        assert_eq!(companies.len(), 1);
1876    }
1877
1878    #[test]
1879    fn test_edge_type_index() {
1880        let mut store = GraphStore::new();
1881        let n1 = store.create_node("Person");
1882        let n2 = store.create_node("Person");
1883        let n3 = store.create_node("Person");
1884
1885        store.create_edge(n1, n2, "KNOWS").unwrap();
1886        store.create_edge(n2, n3, "KNOWS").unwrap();
1887        store.create_edge(n1, n3, "FOLLOWS").unwrap();
1888
1889        let knows_edges = store.get_edges_by_type(&EdgeType::new("KNOWS"));
1890        assert_eq!(knows_edges.len(), 2);
1891
1892        let follows_edges = store.get_edges_by_type(&EdgeType::new("FOLLOWS"));
1893        assert_eq!(follows_edges.len(), 1);
1894    }
1895
1896    #[test]
1897    fn test_delete_node() {
1898        let mut store = GraphStore::new();
1899        let node1 = store.create_node("Person");
1900        let node2 = store.create_node("Person");
1901        store.create_edge(node1, node2, "KNOWS").unwrap();
1902
1903        assert_eq!(store.node_count(), 2);
1904        assert_eq!(store.edge_count(), 1);
1905
1906        // Delete node1 (should also delete connected edge)
1907        let deleted = store.delete_node("default", node1);
1908        assert!(deleted.is_ok());
1909        assert_eq!(store.node_count(), 1);
1910        assert_eq!(store.edge_count(), 0);
1911    }
1912
1913    #[test]
1914    fn test_delete_edge() {
1915        let mut store = GraphStore::new();
1916        let node1 = store.create_node("Person");
1917        let node2 = store.create_node("Person");
1918        let edge_id = store.create_edge(node1, node2, "KNOWS").unwrap();
1919
1920        assert_eq!(store.edge_count(), 1);
1921
1922        let deleted = store.delete_edge(edge_id);
1923        assert!(deleted.is_ok());
1924        assert_eq!(store.edge_count(), 0);
1925
1926        // Edge removed from adjacency lists
1927        assert_eq!(store.get_outgoing_edges(node1).len(), 0);
1928        assert_eq!(store.get_incoming_edges(node2).len(), 0);
1929    }
1930
1931    #[test]
1932    fn test_multiple_edges_between_nodes() {
1933        // REQ-GRAPH-008: Multiple edges between same nodes
1934        let mut store = GraphStore::new();
1935        let node1 = store.create_node("Person");
1936        let node2 = store.create_node("Person");
1937
1938        let edge1 = store.create_edge(node1, node2, "KNOWS").unwrap();
1939        let edge2 = store.create_edge(node1, node2, "WORKS_WITH").unwrap();
1940        let edge3 = store.create_edge(node1, node2, "KNOWS").unwrap();
1941
1942        assert_eq!(store.edge_count(), 3);
1943        assert_ne!(edge1, edge2);
1944        assert_ne!(edge1, edge3);
1945
1946        let outgoing = store.get_outgoing_edges(node1);
1947        assert_eq!(outgoing.len(), 3);
1948    }
1949
1950    #[test]
1951    fn test_clear() {
1952        let mut store = GraphStore::new();
1953        store.create_node("Person");
1954        store.create_node("Person");
1955
1956        assert_eq!(store.node_count(), 2);
1957
1958        store.clear();
1959        assert_eq!(store.node_count(), 0);
1960        assert_eq!(store.edge_count(), 0);
1961    }
1962
1963    #[test]
1964    fn test_add_label_to_node() {
1965        let mut store = GraphStore::new();
1966        let node_id = store.create_node("Person");
1967
1968        // Initially only "Person" label is indexed
1969        assert_eq!(store.get_nodes_by_label(&Label::new("Person")).len(), 1);
1970        assert_eq!(store.get_nodes_by_label(&Label::new("Employee")).len(), 0);
1971
1972        // Add "Employee" label using the proper method
1973        store
1974            .add_label_to_node("default", node_id, "Employee")
1975            .unwrap();
1976
1977        // Now both labels should be indexed and queryable
1978        assert_eq!(store.get_nodes_by_label(&Label::new("Person")).len(), 1);
1979        assert_eq!(store.get_nodes_by_label(&Label::new("Employee")).len(), 1);
1980
1981        // Verify the node actually has both labels
1982        let node = store.get_node(node_id).unwrap();
1983        assert!(node.has_label(&Label::new("Person")));
1984        assert!(node.has_label(&Label::new("Employee")));
1985    }
1986
1987    #[test]
1988    fn test_add_label_to_nonexistent_node() {
1989        let mut store = GraphStore::new();
1990        let invalid_id = NodeId::new(999);
1991
1992        let result = store.add_label_to_node("default", invalid_id, "Employee");
1993        assert_eq!(result, Err(GraphError::NodeNotFound(invalid_id)));
1994    }
1995
1996    #[test]
1997    fn test_mvcc_node_versioning() {
1998        let mut store = GraphStore::new();
1999
2000        // Version 1: Create node
2001        let node_id = store.create_node("Person");
2002        store
2003            .set_node_property("default", node_id, "name", "Alice")
2004            .unwrap();
2005
2006        // Check version 1
2007        let v1_node = store.get_node_at_version(node_id, 1).unwrap();
2008        assert_eq!(v1_node.version, 1);
2009        assert_eq!(
2010            v1_node.get_property("name").unwrap().as_string(),
2011            Some("Alice")
2012        );
2013
2014        // Version 2: Update property (creates new version)
2015        store.current_version = 2;
2016        store
2017            .set_node_property("default", node_id, "name", "Alice Cooper")
2018            .unwrap();
2019
2020        // Check version 2
2021        let v2_node = store.get_node_at_version(node_id, 2).unwrap();
2022        assert_eq!(v2_node.version, 2);
2023        assert_eq!(
2024            v2_node.get_property("name").unwrap().as_string(),
2025            Some("Alice Cooper")
2026        );
2027
2028        // Historical read (Version 1 should still be Alice)
2029        let v1_lookup = store.get_node_at_version(node_id, 1).unwrap();
2030        assert_eq!(v1_lookup.version, 1);
2031        assert_eq!(
2032            v1_lookup.get_property("name").unwrap().as_string(),
2033            Some("Alice")
2034        );
2035
2036        let node = store.get_node(node_id).unwrap();
2037        assert_eq!(node.version, 2); // Should be latest version
2038    }
2039
2040    #[test]
2041    fn test_arena_resize() {
2042        let mut store = GraphStore::new();
2043        // Default capacity is 1024. Let's force a resize.
2044        // We can't easily peek capacity, but we can add > 1024 nodes.
2045
2046        for _ in 0..1100 {
2047            store.create_node("Item");
2048        }
2049
2050        assert_eq!(store.node_count(), 1100);
2051        let last_id = NodeId::new(1100);
2052        assert!(store.has_node(last_id));
2053    }
2054
2055    #[test]
2056    fn test_id_reuse() {
2057        let mut store = GraphStore::new();
2058        let n1 = store.create_node("A");
2059        let _n2 = store.create_node("B");
2060
2061        store.delete_node("default", n1).unwrap();
2062
2063        // Next creation should reuse n1's ID (which is 1)
2064        // n2 is 2.
2065        let n3 = store.create_node("C");
2066
2067        assert_eq!(n3, n1); // ID reuse
2068        assert_eq!(store.node_count(), 2); // B and C
2069    }
2070
2071    // ========== Batch 5: Additional Store Tests ==========
2072
2073    #[test]
2074    fn test_get_node() {
2075        let mut store = GraphStore::new();
2076        let id = store.create_node("Person");
2077        let node = store.get_node(id);
2078        assert!(node.is_some());
2079        assert!(node.unwrap().labels.contains(&Label::new("Person")));
2080
2081        // Non-existent node
2082        assert!(store.get_node(NodeId::new(9999)).is_none());
2083    }
2084
2085    #[test]
2086    fn test_get_node_mut() {
2087        let mut store = GraphStore::new();
2088        let id = store.create_node("Person");
2089        {
2090            let node = store.get_node_mut(id).unwrap();
2091            node.set_property(
2092                "name".to_string(),
2093                PropertyValue::String("Alice".to_string()),
2094            );
2095        }
2096        let node = store.get_node(id).unwrap();
2097        assert_eq!(
2098            node.get_property("name"),
2099            Some(&PropertyValue::String("Alice".to_string()))
2100        );
2101
2102        // Non-existent node
2103        assert!(store.get_node_mut(NodeId::new(9999)).is_none());
2104    }
2105
2106    #[test]
2107    fn test_has_node() {
2108        let mut store = GraphStore::new();
2109        let id = store.create_node("A");
2110        assert!(store.has_node(id));
2111        store.delete_node("default", id).unwrap();
2112        assert!(!store.has_node(id));
2113    }
2114
2115    #[test]
2116    fn test_set_node_property() {
2117        let mut store = GraphStore::new();
2118        let id = store.create_node("Person");
2119        store
2120            .set_node_property("default", id, "age", PropertyValue::Integer(30))
2121            .unwrap();
2122        let node = store.get_node(id).unwrap();
2123        assert_eq!(node.get_property("age"), Some(&PropertyValue::Integer(30)));
2124
2125        // Update existing property
2126        store
2127            .set_node_property("default", id, "age", PropertyValue::Integer(31))
2128            .unwrap();
2129        let node = store.get_node(id).unwrap();
2130        assert_eq!(node.get_property("age"), Some(&PropertyValue::Integer(31)));
2131
2132        // Non-existent node
2133        let result =
2134            store.set_node_property("default", NodeId::new(9999), "x", PropertyValue::Null);
2135        assert!(result.is_err());
2136    }
2137
2138    #[test]
2139    fn test_create_edge_with_properties() {
2140        let mut store = GraphStore::new();
2141        let a = store.create_node("Person");
2142        let b = store.create_node("Person");
2143
2144        let mut props = std::collections::HashMap::new();
2145        props.insert("since".to_string(), PropertyValue::Integer(2020));
2146        props.insert("weight".to_string(), PropertyValue::Float(0.8));
2147
2148        let eid = store
2149            .create_edge_with_properties(a, b, "KNOWS", props)
2150            .unwrap();
2151        let edge = store.get_edge(eid).unwrap();
2152        assert_eq!(edge.source, a);
2153        assert_eq!(edge.target, b);
2154        assert_eq!(
2155            edge.get_property("since"),
2156            Some(&PropertyValue::Integer(2020))
2157        );
2158        assert_eq!(
2159            edge.get_property("weight"),
2160            Some(&PropertyValue::Float(0.8))
2161        );
2162    }
2163
2164    #[test]
2165    fn test_create_edge_with_properties_invalid_nodes() {
2166        let mut store = GraphStore::new();
2167        let a = store.create_node("A");
2168        let props = std::collections::HashMap::new();
2169
2170        // Invalid target
2171        let result = store.create_edge_with_properties(a, NodeId::new(9999), "E", props.clone());
2172        assert!(result.is_err());
2173
2174        // Invalid source
2175        let result = store.create_edge_with_properties(NodeId::new(9999), a, "E", props);
2176        assert!(result.is_err());
2177    }
2178
2179    #[test]
2180    fn test_get_edge_and_has_edge() {
2181        let mut store = GraphStore::new();
2182        let a = store.create_node("A");
2183        let b = store.create_node("B");
2184        let eid = store.create_edge(a, b, "LINKS").unwrap();
2185
2186        assert!(store.has_edge(eid));
2187        let edge = store.get_edge(eid).unwrap();
2188        assert_eq!(edge.source, a);
2189        assert_eq!(edge.target, b);
2190
2191        // Non-existent
2192        assert!(!store.has_edge(EdgeId::new(9999)));
2193        assert!(store.get_edge(EdgeId::new(9999)).is_none());
2194    }
2195
2196    #[test]
2197    fn test_get_edge_mut() {
2198        let mut store = GraphStore::new();
2199        let a = store.create_node("A");
2200        let b = store.create_node("B");
2201        let eid = store.create_edge(a, b, "LINKS").unwrap();
2202
2203        {
2204            let edge = store.get_edge_mut(eid).unwrap();
2205            edge.set_property("weight".to_string(), PropertyValue::Float(1.5));
2206        }
2207        let edge = store.get_edge(eid).unwrap();
2208        assert_eq!(
2209            edge.get_property("weight"),
2210            Some(&PropertyValue::Float(1.5))
2211        );
2212
2213        assert!(store.get_edge_mut(EdgeId::new(9999)).is_none());
2214    }
2215
2216    #[test]
2217    fn test_get_outgoing_edge_targets() {
2218        let mut store = GraphStore::new();
2219        let a = store.create_node("A");
2220        let b = store.create_node("B");
2221        let c = store.create_node("C");
2222        let e1 = store.create_edge(a, b, "KNOWS").unwrap();
2223        let e2 = store.create_edge(a, c, "LIKES").unwrap();
2224
2225        let targets = store.get_outgoing_edge_targets(a);
2226        assert_eq!(targets.len(), 2);
2227        // Each tuple is (EdgeId, source, target, &EdgeType)
2228        let edge_ids: Vec<EdgeId> = targets.iter().map(|t| t.0).collect();
2229        assert!(edge_ids.contains(&e1));
2230        assert!(edge_ids.contains(&e2));
2231
2232        // Node with no outgoing edges
2233        let targets = store.get_outgoing_edge_targets(b);
2234        assert!(targets.is_empty());
2235    }
2236
2237    #[test]
2238    fn test_get_incoming_edge_sources() {
2239        let mut store = GraphStore::new();
2240        let a = store.create_node("A");
2241        let b = store.create_node("B");
2242        let c = store.create_node("C");
2243        store.create_edge(a, c, "KNOWS").unwrap();
2244        store.create_edge(b, c, "LIKES").unwrap();
2245
2246        let sources = store.get_incoming_edge_sources(c);
2247        assert_eq!(sources.len(), 2);
2248        let src_nodes: Vec<NodeId> = sources.iter().map(|t| t.1).collect();
2249        assert!(src_nodes.contains(&a));
2250        assert!(src_nodes.contains(&b));
2251
2252        // Node with no incoming edges
2253        let sources = store.get_incoming_edge_sources(a);
2254        assert!(sources.is_empty());
2255    }
2256
2257    #[test]
2258    fn test_all_nodes() {
2259        let mut store = GraphStore::new();
2260        assert!(store.all_nodes().is_empty());
2261
2262        store.create_node("A");
2263        store.create_node("B");
2264        store.create_node("C");
2265        assert_eq!(store.all_nodes().len(), 3);
2266    }
2267
2268    #[test]
2269    fn test_all_edges() {
2270        let mut store = GraphStore::new();
2271        assert!(store.all_edges().is_empty());
2272
2273        let a = store.create_node("A");
2274        let b = store.create_node("B");
2275        let c = store.create_node("C");
2276        store.create_edge(a, b, "R1").unwrap();
2277        store.create_edge(b, c, "R2").unwrap();
2278        assert_eq!(store.all_edges().len(), 2);
2279    }
2280
2281    #[test]
2282    fn test_compute_statistics() {
2283        let mut store = GraphStore::new();
2284        let a = store.create_node("Person");
2285        let b = store.create_node("Person");
2286        let c = store.create_node("Company");
2287        store.get_node_mut(a).unwrap().set_property(
2288            "name".to_string(),
2289            PropertyValue::String("Alice".to_string()),
2290        );
2291        store
2292            .get_node_mut(b)
2293            .unwrap()
2294            .set_property("name".to_string(), PropertyValue::String("Bob".to_string()));
2295        store.get_node_mut(c).unwrap().set_property(
2296            "name".to_string(),
2297            PropertyValue::String("Acme".to_string()),
2298        );
2299        store.create_edge(a, b, "KNOWS").unwrap();
2300        store.create_edge(a, c, "WORKS_AT").unwrap();
2301
2302        let stats = store.compute_statistics();
2303        assert_eq!(stats.total_nodes, 3);
2304        assert_eq!(stats.total_edges, 2);
2305        assert_eq!(*stats.label_counts.get(&Label::new("Person")).unwrap(), 2);
2306        assert_eq!(*stats.label_counts.get(&Label::new("Company")).unwrap(), 1);
2307        assert_eq!(
2308            *stats.edge_type_counts.get(&EdgeType::new("KNOWS")).unwrap(),
2309            1
2310        );
2311        assert_eq!(
2312            *stats
2313                .edge_type_counts
2314                .get(&EdgeType::new("WORKS_AT"))
2315                .unwrap(),
2316            1
2317        );
2318        assert!(stats.avg_out_degree > 0.0);
2319        // Property stats should exist for Person.name
2320        let person_name_stats = stats
2321            .property_stats
2322            .get(&(Label::new("Person"), "name".to_string()));
2323        assert!(person_name_stats.is_some());
2324        let ps = person_name_stats.unwrap();
2325        assert_eq!(ps.null_fraction, 0.0); // All Person nodes have "name"
2326        assert_eq!(ps.distinct_count, 2); // Alice, Bob
2327    }
2328
2329    #[test]
2330    fn test_compute_statistics_empty_graph() {
2331        let store = GraphStore::new();
2332        let stats = store.compute_statistics();
2333        assert_eq!(stats.total_nodes, 0);
2334        assert_eq!(stats.total_edges, 0);
2335        assert_eq!(stats.avg_out_degree, 0.0);
2336        assert!(stats.label_counts.is_empty());
2337        assert!(stats.edge_type_counts.is_empty());
2338    }
2339
2340    #[test]
2341    fn test_label_node_count() {
2342        let mut store = GraphStore::new();
2343        store.create_node("Person");
2344        store.create_node("Person");
2345        store.create_node("Company");
2346
2347        assert_eq!(store.label_node_count(&Label::new("Person")), 2);
2348        assert_eq!(store.label_node_count(&Label::new("Company")), 1);
2349        assert_eq!(store.label_node_count(&Label::new("NotExist")), 0);
2350    }
2351
2352    #[test]
2353    fn test_edge_type_count() {
2354        let mut store = GraphStore::new();
2355        let a = store.create_node("A");
2356        let b = store.create_node("B");
2357        let c = store.create_node("C");
2358        store.create_edge(a, b, "KNOWS").unwrap();
2359        store.create_edge(a, c, "KNOWS").unwrap();
2360        store.create_edge(b, c, "LIKES").unwrap();
2361
2362        assert_eq!(store.edge_type_count(&EdgeType::new("KNOWS")), 2);
2363        assert_eq!(store.edge_type_count(&EdgeType::new("LIKES")), 1);
2364        assert_eq!(store.edge_type_count(&EdgeType::new("NOPE")), 0);
2365    }
2366
2367    #[test]
2368    fn test_all_labels() {
2369        let mut store = GraphStore::new();
2370        store.create_node("Person");
2371        store.create_node("Company");
2372        store.create_node("Person"); // duplicate label
2373
2374        let labels = store.all_labels();
2375        assert_eq!(labels.len(), 2);
2376        let label_strs: Vec<&str> = labels.iter().map(|l| l.as_str()).collect();
2377        assert!(label_strs.contains(&"Person"));
2378        assert!(label_strs.contains(&"Company"));
2379    }
2380
2381    #[test]
2382    fn test_all_edge_types() {
2383        let mut store = GraphStore::new();
2384        let a = store.create_node("A");
2385        let b = store.create_node("B");
2386        let c = store.create_node("C");
2387        store.create_edge(a, b, "KNOWS").unwrap();
2388        store.create_edge(b, c, "LIKES").unwrap();
2389
2390        let types = store.all_edge_types();
2391        assert_eq!(types.len(), 2);
2392        let type_strs: Vec<&str> = types.iter().map(|t| t.as_str()).collect();
2393        assert!(type_strs.contains(&"KNOWS"));
2394        assert!(type_strs.contains(&"LIKES"));
2395    }
2396
2397    #[test]
2398    fn test_get_node_at_version() {
2399        let mut store = GraphStore::new();
2400        let id = store.create_node("Person");
2401        let v0 = store.get_node(id).unwrap().version;
2402
2403        // Node at its creation version should exist
2404        let node = store.get_node_at_version(id, v0);
2405        assert!(node.is_some());
2406        assert!(node.unwrap().labels.contains(&Label::new("Person")));
2407
2408        // Node at a version before creation should not exist
2409        // (only if v0 > 0, otherwise any version >= 0 finds it)
2410        if v0 > 0 {
2411            assert!(store.get_node_at_version(id, v0 - 1).is_none());
2412        }
2413
2414        // Non-existent node
2415        assert!(store.get_node_at_version(NodeId::new(9999), 0).is_none());
2416    }
2417
2418    #[test]
2419    fn test_get_edge_at_version() {
2420        let mut store = GraphStore::new();
2421        let a = store.create_node("A");
2422        let b = store.create_node("B");
2423        let eid = store.create_edge(a, b, "KNOWS").unwrap();
2424        let v0 = store.get_edge(eid).unwrap().version;
2425
2426        // Edge at version 0 should exist
2427        assert!(store.get_edge_at_version(eid, v0).is_some());
2428
2429        // Non-existent edge
2430        assert!(store.get_edge_at_version(EdgeId::new(9999), 0).is_none());
2431    }
2432
2433    #[test]
2434    fn test_create_vector_index() {
2435        let store = GraphStore::new();
2436        let result = store.create_vector_index(
2437            "Person",
2438            "embedding",
2439            128,
2440            crate::vector::DistanceMetric::Cosine,
2441        );
2442        assert!(result.is_ok());
2443
2444        // Creating a second index with different label should also succeed
2445        let result2 =
2446            store.create_vector_index("Document", "vec", 256, crate::vector::DistanceMetric::L2);
2447        assert!(result2.is_ok());
2448    }
2449
2450    #[test]
2451    fn test_set_node_property_updates_in_place() {
2452        let mut store = GraphStore::new();
2453        let id = store.create_node("Person");
2454        store
2455            .set_node_property(
2456                "default",
2457                id,
2458                "name",
2459                PropertyValue::String("Alice".to_string()),
2460            )
2461            .unwrap();
2462
2463        // Same version update — in-place
2464        store
2465            .set_node_property(
2466                "default",
2467                id,
2468                "name",
2469                PropertyValue::String("Bob".to_string()),
2470            )
2471            .unwrap();
2472        let node = store.get_node(id).unwrap();
2473        assert_eq!(
2474            node.get_property("name"),
2475            Some(&PropertyValue::String("Bob".to_string()))
2476        );
2477
2478        // Add another property
2479        store
2480            .set_node_property("default", id, "age", PropertyValue::Integer(30))
2481            .unwrap();
2482        let node = store.get_node(id).unwrap();
2483        assert_eq!(node.get_property("age"), Some(&PropertyValue::Integer(30)));
2484        assert_eq!(
2485            node.get_property("name"),
2486            Some(&PropertyValue::String("Bob".to_string()))
2487        );
2488    }
2489
2490    // ========== Coverage Enhancement Tests ==========
2491
2492    #[test]
2493    fn test_graph_error_display_node_not_found() {
2494        let err = GraphError::NodeNotFound(NodeId::new(42));
2495        assert_eq!(format!("{}", err), "Node NodeId(42) not found");
2496    }
2497
2498    #[test]
2499    fn test_graph_error_display_edge_not_found() {
2500        let err = GraphError::EdgeNotFound(EdgeId::new(7));
2501        assert_eq!(format!("{}", err), "Edge EdgeId(7) not found");
2502    }
2503
2504    #[test]
2505    fn test_graph_error_display_node_already_exists() {
2506        let err = GraphError::NodeAlreadyExists(NodeId::new(1));
2507        assert_eq!(format!("{}", err), "Node NodeId(1) already exists");
2508    }
2509
2510    #[test]
2511    fn test_graph_error_display_edge_already_exists() {
2512        let err = GraphError::EdgeAlreadyExists(EdgeId::new(3));
2513        assert_eq!(format!("{}", err), "Edge EdgeId(3) already exists");
2514    }
2515
2516    #[test]
2517    fn test_graph_error_display_invalid_edge_source() {
2518        let err = GraphError::InvalidEdgeSource(NodeId::new(99));
2519        assert_eq!(
2520            format!("{}", err),
2521            "Invalid edge: source node NodeId(99) does not exist"
2522        );
2523    }
2524
2525    #[test]
2526    fn test_graph_error_display_invalid_edge_target() {
2527        let err = GraphError::InvalidEdgeTarget(NodeId::new(88));
2528        assert_eq!(
2529            format!("{}", err),
2530            "Invalid edge: target node NodeId(88) does not exist"
2531        );
2532    }
2533
2534    #[test]
2535    fn test_graph_error_equality() {
2536        assert_eq!(
2537            GraphError::NodeNotFound(NodeId::new(1)),
2538            GraphError::NodeNotFound(NodeId::new(1))
2539        );
2540        assert_ne!(
2541            GraphError::NodeNotFound(NodeId::new(1)),
2542            GraphError::NodeNotFound(NodeId::new(2))
2543        );
2544        assert_ne!(
2545            GraphError::NodeNotFound(NodeId::new(1)),
2546            GraphError::EdgeNotFound(EdgeId::new(1))
2547        );
2548    }
2549
2550    #[test]
2551    fn test_graph_statistics_estimate_label_scan() {
2552        let mut store = GraphStore::new();
2553        for _ in 0..50 {
2554            store.create_node("Person");
2555        }
2556        for _ in 0..20 {
2557            store.create_node("Company");
2558        }
2559        let stats = store.compute_statistics();
2560        assert_eq!(stats.estimate_label_scan(&Label::new("Person")), 50);
2561        assert_eq!(stats.estimate_label_scan(&Label::new("Company")), 20);
2562        // Unknown label falls back to total_nodes (all-node scan estimate)
2563        assert_eq!(stats.estimate_label_scan(&Label::new("Unknown")), 70);
2564    }
2565
2566    #[test]
2567    fn test_graph_statistics_estimate_expand() {
2568        let mut store = GraphStore::new();
2569        let a = store.create_node("A");
2570        let b = store.create_node("B");
2571        let c = store.create_node("C");
2572        store.create_edge(a, b, "KNOWS").unwrap();
2573        store.create_edge(a, c, "KNOWS").unwrap();
2574        store.create_edge(b, c, "LIKES").unwrap();
2575
2576        let stats = store.compute_statistics();
2577        assert_eq!(
2578            stats.estimate_expand(Some(&EdgeType::new("KNOWS"))) as usize,
2579            2
2580        );
2581        assert_eq!(
2582            stats.estimate_expand(Some(&EdgeType::new("LIKES"))) as usize,
2583            1
2584        );
2585        assert_eq!(
2586            stats.estimate_expand(Some(&EdgeType::new("NOPE"))) as usize,
2587            0
2588        );
2589        // None means all edges
2590        assert_eq!(stats.estimate_expand(None) as usize, 3);
2591    }
2592
2593    #[test]
2594    fn test_graph_statistics_estimate_equality_selectivity() {
2595        let mut store = GraphStore::new();
2596        for i in 0..10 {
2597            let id = store.create_node("Person");
2598            store.get_node_mut(id).unwrap().set_property(
2599                "city".to_string(),
2600                PropertyValue::String(format!("City{}", i % 5)),
2601            );
2602        }
2603        let stats = store.compute_statistics();
2604        // 5 distinct cities among 10 Person nodes => selectivity = 1/5 = 0.2
2605        let sel = stats.estimate_equality_selectivity(&Label::new("Person"), "city");
2606        assert!((sel - 0.2).abs() < 0.01);
2607        // Unknown property should return default 0.1
2608        let default_sel =
2609            stats.estimate_equality_selectivity(&Label::new("Person"), "unknown_prop");
2610        assert!((default_sel - 0.1).abs() < 0.01);
2611        // Unknown label should return default 0.1
2612        let default_sel2 = stats.estimate_equality_selectivity(&Label::new("NotExist"), "city");
2613        assert!((default_sel2 - 0.1).abs() < 0.01);
2614    }
2615
2616    #[test]
2617    fn test_graph_statistics_format() {
2618        let mut store = GraphStore::new();
2619        let a = store.create_node("Person");
2620        let b = store.create_node("Company");
2621        store.create_edge(a, b, "WORKS_AT").unwrap();
2622
2623        let stats = store.compute_statistics();
2624        let formatted = stats.format();
2625        assert!(formatted.contains("Graph Statistics:"));
2626        assert!(formatted.contains("Total nodes: 2"));
2627        assert!(formatted.contains("Total edges: 1"));
2628        assert!(formatted.contains("Avg out-degree:"));
2629        assert!(formatted.contains(":Person"));
2630        assert!(formatted.contains(":Company"));
2631        assert!(formatted.contains(":WORKS_AT"));
2632    }
2633
2634    #[test]
2635    fn test_graph_statistics_property_null_fraction() {
2636        let mut store = GraphStore::new();
2637        // Create 4 Person nodes, only 2 have the "email" property
2638        for i in 0..4 {
2639            let id = store.create_node("Person");
2640            store.get_node_mut(id).unwrap().set_property(
2641                "name".to_string(),
2642                PropertyValue::String(format!("Person{}", i)),
2643            );
2644            if i < 2 {
2645                store.get_node_mut(id).unwrap().set_property(
2646                    "email".to_string(),
2647                    PropertyValue::String(format!("person{}@example.com", i)),
2648                );
2649            }
2650        }
2651        let stats = store.compute_statistics();
2652        // name should have null_fraction = 0.0 (all 4 have it)
2653        let name_stats = stats
2654            .property_stats
2655            .get(&(Label::new("Person"), "name".to_string()))
2656            .unwrap();
2657        assert_eq!(name_stats.null_fraction, 0.0);
2658        // email should have null_fraction = 0.5 (2 out of 4 have it)
2659        let email_stats = stats
2660            .property_stats
2661            .get(&(Label::new("Person"), "email".to_string()))
2662            .unwrap();
2663        assert!((email_stats.null_fraction - 0.5).abs() < 0.01);
2664    }
2665
2666    #[test]
2667    fn test_vector_search_with_data() {
2668        let store = GraphStore::new();
2669        // Create a 4-dimensional index
2670        store
2671            .create_vector_index(
2672                "Document",
2673                "embedding",
2674                4,
2675                crate::vector::DistanceMetric::Cosine,
2676            )
2677            .unwrap();
2678
2679        // Add some vectors
2680        let n1 = NodeId::new(1);
2681        let n2 = NodeId::new(2);
2682        let n3 = NodeId::new(3);
2683        let v1 = vec![1.0, 0.0, 0.0, 0.0];
2684        let v2 = vec![0.0, 1.0, 0.0, 0.0];
2685        let v3 = vec![0.9, 0.1, 0.0, 0.0]; // close to v1
2686
2687        store
2688            .vector_index
2689            .add_vector("Document", "embedding", n1, &v1)
2690            .unwrap();
2691        store
2692            .vector_index
2693            .add_vector("Document", "embedding", n2, &v2)
2694            .unwrap();
2695        store
2696            .vector_index
2697            .add_vector("Document", "embedding", n3, &v3)
2698            .unwrap();
2699
2700        // Search for vectors similar to v1
2701        let results = store
2702            .vector_search("Document", "embedding", &[1.0, 0.0, 0.0, 0.0], 2)
2703            .unwrap();
2704        assert_eq!(results.len(), 2);
2705        // n1 should be the closest (exact match)
2706        assert_eq!(results[0].0, n1);
2707    }
2708
2709    #[test]
2710    fn test_vector_search_nonexistent_index() {
2711        let store = GraphStore::new();
2712        // Search on a non-existent index should return empty results
2713        let results = store
2714            .vector_search("NoLabel", "noprop", &[1.0, 2.0], 5)
2715            .unwrap();
2716        assert!(results.is_empty());
2717    }
2718
2719    #[test]
2720    fn test_clear_thorough() {
2721        let mut store = GraphStore::new();
2722        let a = store.create_node("Person");
2723        let b = store.create_node("Company");
2724        store
2725            .set_node_property(
2726                "default",
2727                a,
2728                "name",
2729                PropertyValue::String("Alice".to_string()),
2730            )
2731            .unwrap();
2732        let eid = store.create_edge(a, b, "WORKS_AT").unwrap();
2733
2734        // Verify state before clear
2735        assert_eq!(store.node_count(), 2);
2736        assert_eq!(store.edge_count(), 1);
2737        assert_eq!(store.all_labels().len(), 2);
2738        assert_eq!(store.all_edge_types().len(), 1);
2739        assert!(store.has_node(a));
2740        assert!(store.has_edge(eid));
2741
2742        store.clear();
2743
2744        // Verify everything is cleaned up
2745        assert_eq!(store.node_count(), 0);
2746        assert_eq!(store.edge_count(), 0);
2747        assert!(store.all_labels().is_empty());
2748        assert!(store.all_edge_types().is_empty());
2749        assert!(!store.has_node(a));
2750        assert!(!store.has_edge(eid));
2751        assert!(store.get_nodes_by_label(&Label::new("Person")).is_empty());
2752        assert!(store
2753            .get_edges_by_type(&EdgeType::new("WORKS_AT"))
2754            .is_empty());
2755
2756        // After clear, creating new nodes should start from ID 1 again
2757        let new_node = store.create_node("NewLabel");
2758        assert_eq!(new_node, NodeId::new(1));
2759    }
2760
2761    #[test]
2762    fn test_delete_edge_verifies_edge_type_index_cleanup() {
2763        let mut store = GraphStore::new();
2764        let a = store.create_node("A");
2765        let b = store.create_node("B");
2766        let c = store.create_node("C");
2767
2768        let e1 = store.create_edge(a, b, "KNOWS").unwrap();
2769        let e2 = store.create_edge(a, c, "KNOWS").unwrap();
2770        assert_eq!(store.get_edges_by_type(&EdgeType::new("KNOWS")).len(), 2);
2771
2772        // Delete one edge
2773        store.delete_edge(e1).unwrap();
2774        assert_eq!(store.get_edges_by_type(&EdgeType::new("KNOWS")).len(), 1);
2775
2776        // Delete the other
2777        store.delete_edge(e2).unwrap();
2778        assert_eq!(store.get_edges_by_type(&EdgeType::new("KNOWS")).len(), 0);
2779    }
2780
2781    #[test]
2782    fn test_delete_edge_nonexistent() {
2783        let mut store = GraphStore::new();
2784        let result = store.delete_edge(EdgeId::new(999));
2785        assert_eq!(result, Err(GraphError::EdgeNotFound(EdgeId::new(999))));
2786    }
2787
2788    #[test]
2789    fn test_delete_node_nonexistent() {
2790        let mut store = GraphStore::new();
2791        let result = store.delete_node("default", NodeId::new(999));
2792        assert_eq!(result, Err(GraphError::NodeNotFound(NodeId::new(999))));
2793    }
2794
2795    #[test]
2796    fn test_delete_node_removes_from_label_index() {
2797        let mut store = GraphStore::new();
2798        let a = store.create_node("Person");
2799        let _b = store.create_node("Person");
2800        assert_eq!(store.get_nodes_by_label(&Label::new("Person")).len(), 2);
2801
2802        store.delete_node("default", a).unwrap();
2803        assert_eq!(store.get_nodes_by_label(&Label::new("Person")).len(), 1);
2804    }
2805
2806    #[test]
2807    fn test_delete_node_cascades_edges() {
2808        let mut store = GraphStore::new();
2809        let a = store.create_node("A");
2810        let b = store.create_node("B");
2811        let c = store.create_node("C");
2812        store.create_edge(a, b, "E1").unwrap();
2813        store.create_edge(c, a, "E2").unwrap();
2814
2815        assert_eq!(store.edge_count(), 2);
2816        store.delete_node("default", a).unwrap();
2817        assert_eq!(store.edge_count(), 0);
2818        // b and c should still exist
2819        assert!(store.has_node(b));
2820        assert!(store.has_node(c));
2821    }
2822
2823    #[test]
2824    fn test_edge_id_reuse() {
2825        let mut store = GraphStore::new();
2826        let a = store.create_node("A");
2827        let b = store.create_node("B");
2828        let c = store.create_node("C");
2829
2830        let e1 = store.create_edge(a, b, "X").unwrap();
2831        store.delete_edge(e1).unwrap();
2832
2833        // Next edge should reuse e1's ID
2834        let e2 = store.create_edge(a, c, "Y").unwrap();
2835        assert_eq!(e2, e1);
2836    }
2837
2838    #[test]
2839    fn test_insert_recovered_node() {
2840        let mut store = GraphStore::new();
2841        let node = Node::new(NodeId::new(10), Label::new("Recovered"));
2842
2843        store.insert_recovered_node(node);
2844        assert!(store.has_node(NodeId::new(10)));
2845        assert_eq!(store.get_nodes_by_label(&Label::new("Recovered")).len(), 1);
2846
2847        // Next node created should have ID > 10
2848        let new_id = store.create_node("New");
2849        assert!(new_id.as_u64() > 10);
2850    }
2851
2852    #[test]
2853    fn test_insert_recovered_edge() {
2854        let mut store = GraphStore::new();
2855        let a = store.create_node("A");
2856        let b = store.create_node("B");
2857
2858        let edge = Edge::new(EdgeId::new(50), a, b, EdgeType::new("RECOVERED"));
2859        store.insert_recovered_edge(edge).unwrap();
2860
2861        assert!(store.has_edge(EdgeId::new(50)));
2862        assert_eq!(store.get_outgoing_edges(a).len(), 1);
2863        assert_eq!(store.get_incoming_edges(b).len(), 1);
2864        assert_eq!(
2865            store.get_edges_by_type(&EdgeType::new("RECOVERED")).len(),
2866            1
2867        );
2868
2869        // Next edge should have ID > 50
2870        let new_eid = store.create_edge(a, b, "NEW").unwrap();
2871        assert!(new_eid.as_u64() > 50);
2872    }
2873
2874    #[test]
2875    fn test_insert_recovered_edge_invalid_source() {
2876        let mut store = GraphStore::new();
2877        let b = store.create_node("B");
2878        let edge = Edge::new(EdgeId::new(1), NodeId::new(999), b, EdgeType::new("E"));
2879        let result = store.insert_recovered_edge(edge);
2880        assert_eq!(result, Err(GraphError::InvalidEdgeSource(NodeId::new(999))));
2881    }
2882
2883    #[test]
2884    fn test_insert_recovered_edge_invalid_target() {
2885        let mut store = GraphStore::new();
2886        let a = store.create_node("A");
2887        let edge = Edge::new(EdgeId::new(1), a, NodeId::new(999), EdgeType::new("E"));
2888        let result = store.insert_recovered_edge(edge);
2889        assert_eq!(result, Err(GraphError::InvalidEdgeTarget(NodeId::new(999))));
2890    }
2891
2892    #[test]
2893    fn test_default_impl() {
2894        let store = GraphStore::default();
2895        assert_eq!(store.node_count(), 0);
2896        assert_eq!(store.edge_count(), 0);
2897    }
2898
2899    #[test]
2900    fn test_mvcc_cow_versioning() {
2901        let mut store = GraphStore::new();
2902        let id = store.create_node("Person");
2903        store
2904            .set_node_property(
2905                "default",
2906                id,
2907                "name",
2908                PropertyValue::String("Alice".to_string()),
2909            )
2910            .unwrap();
2911
2912        // Bump version to trigger COW
2913        store.current_version = 2;
2914        store
2915            .set_node_property(
2916                "default",
2917                id,
2918                "name",
2919                PropertyValue::String("Bob".to_string()),
2920            )
2921            .unwrap();
2922
2923        // Latest version should be Bob
2924        let latest = store.get_node(id).unwrap();
2925        assert_eq!(
2926            latest.get_property("name"),
2927            Some(&PropertyValue::String("Bob".to_string()))
2928        );
2929        assert_eq!(latest.version, 2);
2930
2931        // Version 1 should still be Alice
2932        let v1 = store.get_node_at_version(id, 1).unwrap();
2933        assert_eq!(
2934            v1.get_property("name"),
2935            Some(&PropertyValue::String("Alice".to_string()))
2936        );
2937        assert_eq!(v1.version, 1);
2938    }
2939
2940    #[test]
2941    fn test_get_outgoing_edge_targets_detail() {
2942        let mut store = GraphStore::new();
2943        let a = store.create_node("A");
2944        let b = store.create_node("B");
2945        let c = store.create_node("C");
2946        let e1 = store.create_edge(a, b, "KNOWS").unwrap();
2947        let e2 = store.create_edge(a, c, "LIKES").unwrap();
2948
2949        let targets = store.get_outgoing_edge_targets(a);
2950        assert_eq!(targets.len(), 2);
2951
2952        // Check that tuples contain correct (edge_id, source, target, edge_type)
2953        for (eid, src, tgt, etype) in &targets {
2954            assert_eq!(*src, a);
2955            if *eid == e1 {
2956                assert_eq!(*tgt, b);
2957                assert_eq!(etype.as_str(), "KNOWS");
2958            } else if *eid == e2 {
2959                assert_eq!(*tgt, c);
2960                assert_eq!(etype.as_str(), "LIKES");
2961            } else {
2962                panic!("Unexpected edge ID");
2963            }
2964        }
2965
2966        // Non-existent node returns empty
2967        let empty = store.get_outgoing_edge_targets(NodeId::new(9999));
2968        assert!(empty.is_empty());
2969    }
2970
2971    #[test]
2972    fn test_get_incoming_edge_sources_detail() {
2973        let mut store = GraphStore::new();
2974        let a = store.create_node("A");
2975        let b = store.create_node("B");
2976        let c = store.create_node("C");
2977        let e1 = store.create_edge(a, c, "KNOWS").unwrap();
2978        let e2 = store.create_edge(b, c, "LIKES").unwrap();
2979
2980        let sources = store.get_incoming_edge_sources(c);
2981        assert_eq!(sources.len(), 2);
2982
2983        for (eid, src, tgt, etype) in &sources {
2984            assert_eq!(*tgt, c);
2985            if *eid == e1 {
2986                assert_eq!(*src, a);
2987                assert_eq!(etype.as_str(), "KNOWS");
2988            } else if *eid == e2 {
2989                assert_eq!(*src, b);
2990                assert_eq!(etype.as_str(), "LIKES");
2991            } else {
2992                panic!("Unexpected edge ID");
2993            }
2994        }
2995
2996        // Non-existent node returns empty
2997        let empty = store.get_incoming_edge_sources(NodeId::new(9999));
2998        assert!(empty.is_empty());
2999    }
3000
3001    #[test]
3002    fn test_all_labels_empty() {
3003        let store = GraphStore::new();
3004        assert!(store.all_labels().is_empty());
3005    }
3006
3007    #[test]
3008    fn test_all_edge_types_empty() {
3009        let store = GraphStore::new();
3010        assert!(store.all_edge_types().is_empty());
3011    }
3012
3013    #[test]
3014    fn test_columnar_storage_integration() {
3015        let mut store = GraphStore::new();
3016        let id = store.create_node_with_properties("default", vec![Label::new("Person")], {
3017            let mut props = PropertyMap::new();
3018            props.insert(
3019                "name".to_string(),
3020                PropertyValue::String("Alice".to_string()),
3021            );
3022            props.insert("age".to_string(), PropertyValue::Integer(30));
3023            props
3024        });
3025
3026        // Verify columnar storage has the values
3027        let idx = id.as_u64() as usize;
3028        let name_col = store.node_columns.get_property(idx, "name");
3029        assert_eq!(name_col, PropertyValue::String("Alice".to_string()));
3030        let age_col = store.node_columns.get_property(idx, "age");
3031        assert_eq!(age_col, PropertyValue::Integer(30));
3032    }
3033
3034    #[test]
3035    fn test_edge_columnar_storage_integration() {
3036        let mut store = GraphStore::new();
3037        let a = store.create_node("A");
3038        let b = store.create_node("B");
3039        let mut props = std::collections::HashMap::new();
3040        props.insert("weight".to_string(), PropertyValue::Float(0.75));
3041        let eid = store
3042            .create_edge_with_properties(a, b, "WEIGHTED", props)
3043            .unwrap();
3044
3045        let idx = eid.as_u64() as usize;
3046        let weight_col = store.edge_columns.get_property(idx, "weight");
3047        assert_eq!(weight_col, PropertyValue::Float(0.75));
3048    }
3049
3050    #[test]
3051    fn test_set_node_property_updates_columnar_storage() {
3052        let mut store = GraphStore::new();
3053        let id = store.create_node("Person");
3054        store
3055            .set_node_property("default", id, "score", PropertyValue::Float(1.0))
3056            .unwrap();
3057
3058        let idx = id.as_u64() as usize;
3059        assert_eq!(
3060            store.node_columns.get_property(idx, "score"),
3061            PropertyValue::Float(1.0)
3062        );
3063
3064        // Update
3065        store
3066            .set_node_property("default", id, "score", PropertyValue::Float(2.5))
3067            .unwrap();
3068        assert_eq!(
3069            store.node_columns.get_property(idx, "score"),
3070            PropertyValue::Float(2.5)
3071        );
3072    }
3073
3074    #[test]
3075    fn test_get_nodes_by_label_after_deletions() {
3076        let mut store = GraphStore::new();
3077        let a = store.create_node("Person");
3078        let b_id = store.create_node("Person");
3079        let c = store.create_node("Person");
3080
3081        store.delete_node("default", b_id).unwrap();
3082        let persons = store.get_nodes_by_label(&Label::new("Person"));
3083        assert_eq!(persons.len(), 2);
3084        let ids: Vec<NodeId> = persons.iter().map(|n| n.id).collect();
3085        assert!(ids.contains(&a));
3086        assert!(ids.contains(&c));
3087        assert!(!ids.contains(&b_id));
3088    }
3089
3090    #[test]
3091    fn test_get_edges_by_type_after_deletion() {
3092        let mut store = GraphStore::new();
3093        let a = store.create_node("A");
3094        let b = store.create_node("B");
3095        let c = store.create_node("C");
3096        let e1 = store.create_edge(a, b, "KNOWS").unwrap();
3097        let e2 = store.create_edge(b, c, "KNOWS").unwrap();
3098
3099        store.delete_edge(e1).unwrap();
3100        let knows_edges = store.get_edges_by_type(&EdgeType::new("KNOWS"));
3101        assert_eq!(knows_edges.len(), 1);
3102        assert_eq!(knows_edges[0].id, e2);
3103    }
3104
3105    #[test]
3106    fn test_all_nodes_after_operations() {
3107        let mut store = GraphStore::new();
3108        let a = store.create_node("A");
3109        store.create_node("B");
3110        store.create_node("C");
3111        store.delete_node("default", a).unwrap();
3112
3113        let all = store.all_nodes();
3114        assert_eq!(all.len(), 2);
3115    }
3116
3117    #[test]
3118    fn test_compute_statistics_with_large_sample() {
3119        let mut store = GraphStore::new();
3120        // Create more than 1000 nodes to test sample limiting
3121        for i in 0..1100 {
3122            let id = store.create_node("BigLabel");
3123            store
3124                .get_node_mut(id)
3125                .unwrap()
3126                .set_property("idx".to_string(), PropertyValue::Integer(i));
3127        }
3128        let stats = store.compute_statistics();
3129        assert_eq!(stats.total_nodes, 1100);
3130        // Property stats should exist (sampled from first 1000)
3131        let idx_stats = stats
3132            .property_stats
3133            .get(&(Label::new("BigLabel"), "idx".to_string()));
3134        assert!(idx_stats.is_some());
3135        let ps = idx_stats.unwrap();
3136        // All sampled nodes have the property => null_fraction = 0
3137        assert_eq!(ps.null_fraction, 0.0);
3138        // distinct_count is from the sample (first 1000 nodes), each has unique value
3139        assert_eq!(ps.distinct_count, 1000);
3140    }
3141
3142    #[test]
3143    fn test_add_label_then_label_count() {
3144        let mut store = GraphStore::new();
3145        let id = store.create_node("Person");
3146        store.add_label_to_node("default", id, "Employee").unwrap();
3147
3148        assert_eq!(store.label_node_count(&Label::new("Person")), 1);
3149        assert_eq!(store.label_node_count(&Label::new("Employee")), 1);
3150    }
3151
3152    #[test]
3153    fn test_insert_recovered_node_with_multiple_labels() {
3154        let mut store = GraphStore::new();
3155        let mut node = Node::new(NodeId::new(5), Label::new("Person"));
3156        node.add_label(Label::new("Employee"));
3157
3158        store.insert_recovered_node(node);
3159        assert_eq!(store.get_nodes_by_label(&Label::new("Person")).len(), 1);
3160        assert_eq!(store.get_nodes_by_label(&Label::new("Employee")).len(), 1);
3161    }
3162
3163    #[test]
3164    fn test_graph_statistics_avg_out_degree() {
3165        let mut store = GraphStore::new();
3166        let a = store.create_node("A");
3167        let b = store.create_node("B");
3168        let c = store.create_node("C");
3169        let d = store.create_node("D");
3170        // a -> b, a -> c, a -> d (3 edges, 4 nodes)
3171        store.create_edge(a, b, "E").unwrap();
3172        store.create_edge(a, c, "E").unwrap();
3173        store.create_edge(a, d, "E").unwrap();
3174
3175        let stats = store.compute_statistics();
3176        assert!((stats.avg_out_degree - 0.75).abs() < 0.01); // 3/4 = 0.75
3177    }
3178
3179    #[test]
3180    fn test_self_loop_edge() {
3181        let mut store = GraphStore::new();
3182        let a = store.create_node("A");
3183        let eid = store.create_edge(a, a, "SELF").unwrap();
3184
3185        assert_eq!(store.get_outgoing_edges(a).len(), 1);
3186        assert_eq!(store.get_incoming_edges(a).len(), 1);
3187
3188        store.delete_edge(eid).unwrap();
3189        assert_eq!(store.get_outgoing_edges(a).len(), 0);
3190        assert_eq!(store.get_incoming_edges(a).len(), 0);
3191    }
3192
3193    #[test]
3194    fn test_get_outgoing_edges_nonexistent_node() {
3195        let store = GraphStore::new();
3196        let edges = store.get_outgoing_edges(NodeId::new(999));
3197        assert!(edges.is_empty());
3198    }
3199
3200    #[test]
3201    fn test_get_incoming_edges_nonexistent_node() {
3202        let store = GraphStore::new();
3203        let edges = store.get_incoming_edges(NodeId::new(999));
3204        assert!(edges.is_empty());
3205    }
3206
3207    #[test]
3208    fn test_get_nodes_by_label_nonexistent() {
3209        let store = GraphStore::new();
3210        let nodes = store.get_nodes_by_label(&Label::new("NoSuch"));
3211        assert!(nodes.is_empty());
3212    }
3213
3214    #[test]
3215    fn test_get_edges_by_type_nonexistent() {
3216        let store = GraphStore::new();
3217        let edges = store.get_edges_by_type(&EdgeType::new("NoSuch"));
3218        assert!(edges.is_empty());
3219    }
3220
3221    // ---- edge_between / edges_between tests (TDD) ----
3222
3223    #[test]
3224    fn test_edge_between_exists() {
3225        let mut store = GraphStore::new();
3226        let n1 = store.create_node("Person");
3227        let n2 = store.create_node("Person");
3228        let eid = store.create_edge(n1, n2, "KNOWS").unwrap();
3229
3230        assert_eq!(
3231            store.edge_between(n1, n2, Some(&EdgeType::new("KNOWS"))),
3232            Some(eid)
3233        );
3234    }
3235
3236    #[test]
3237    fn test_edge_between_not_exists() {
3238        let mut store = GraphStore::new();
3239        let n1 = store.create_node("Person");
3240        let n2 = store.create_node("Person");
3241
3242        assert_eq!(
3243            store.edge_between(n1, n2, Some(&EdgeType::new("KNOWS"))),
3244            None
3245        );
3246    }
3247
3248    #[test]
3249    fn test_edge_between_wrong_type() {
3250        let mut store = GraphStore::new();
3251        let n1 = store.create_node("Person");
3252        let n2 = store.create_node("Person");
3253        store.create_edge(n1, n2, "KNOWS").unwrap();
3254
3255        assert_eq!(
3256            store.edge_between(n1, n2, Some(&EdgeType::new("FOLLOWS"))),
3257            None
3258        );
3259    }
3260
3261    #[test]
3262    fn test_edge_between_any_type() {
3263        let mut store = GraphStore::new();
3264        let n1 = store.create_node("Person");
3265        let n2 = store.create_node("Person");
3266        let eid = store.create_edge(n1, n2, "KNOWS").unwrap();
3267
3268        // None edge_type means any type
3269        assert_eq!(store.edge_between(n1, n2, None), Some(eid));
3270    }
3271
3272    #[test]
3273    fn test_edge_between_reverse_direction() {
3274        let mut store = GraphStore::new();
3275        let n1 = store.create_node("Person");
3276        let n2 = store.create_node("Person");
3277        store.create_edge(n1, n2, "KNOWS").unwrap();
3278
3279        // Reverse direction should NOT find the edge
3280        assert_eq!(
3281            store.edge_between(n2, n1, Some(&EdgeType::new("KNOWS"))),
3282            None
3283        );
3284    }
3285
3286    #[test]
3287    fn test_edges_between_multi() {
3288        let mut store = GraphStore::new();
3289        let n1 = store.create_node("Person");
3290        let n2 = store.create_node("Person");
3291        let eid1 = store.create_edge(n1, n2, "KNOWS").unwrap();
3292        let eid2 = store.create_edge(n1, n2, "FOLLOWS").unwrap();
3293        let eid3 = store.create_edge(n1, n2, "KNOWS").unwrap();
3294
3295        // All edges between n1 and n2 (any type)
3296        let all = store.edges_between(n1, n2, None);
3297        assert_eq!(all.len(), 3);
3298
3299        // Only KNOWS edges
3300        let knows = store.edges_between(n1, n2, Some(&EdgeType::new("KNOWS")));
3301        assert_eq!(knows.len(), 2);
3302        assert!(knows.contains(&eid1));
3303        assert!(knows.contains(&eid3));
3304
3305        // Only FOLLOWS edges
3306        let follows = store.edges_between(n1, n2, Some(&EdgeType::new("FOLLOWS")));
3307        assert_eq!(follows.len(), 1);
3308        assert!(follows.contains(&eid2));
3309    }
3310
3311    // ---- Sorted adjacency list tests (Phase 1B TDD) ----
3312
3313    #[test]
3314    fn test_sorted_adjacency_insert_order() {
3315        let mut store = GraphStore::new();
3316        let n1 = store.create_node("Person");
3317        let n3 = store.create_node("Person");
3318        let n2 = store.create_node("Person");
3319
3320        // Insert edges to n3 first, then n2 — adjacency should still be sorted by target NodeId
3321        store.create_edge(n1, n3, "KNOWS").unwrap();
3322        store.create_edge(n1, n2, "KNOWS").unwrap();
3323
3324        // Outgoing from n1 should be sorted by target: n2 before n3
3325        let out = &store.outgoing[n1.as_u64() as usize];
3326        assert_eq!(out.len(), 2);
3327        assert!(
3328            out[0].0 <= out[1].0,
3329            "outgoing adjacency should be sorted by target NodeId"
3330        );
3331    }
3332
3333    #[test]
3334    fn test_sorted_adjacency_delete() {
3335        let mut store = GraphStore::new();
3336        let n1 = store.create_node("Person");
3337        let n2 = store.create_node("Person");
3338        let n3 = store.create_node("Person");
3339
3340        let e1 = store.create_edge(n1, n2, "KNOWS").unwrap();
3341        let _e2 = store.create_edge(n1, n3, "KNOWS").unwrap();
3342
3343        store.delete_edge(e1).unwrap();
3344
3345        let out = &store.outgoing[n1.as_u64() as usize];
3346        assert_eq!(out.len(), 1);
3347        assert_eq!(out[0].0, n3); // only n3 remains
3348    }
3349
3350    #[test]
3351    fn test_sorted_adjacency_multiple_edges_same_target() {
3352        let mut store = GraphStore::new();
3353        let n1 = store.create_node("Person");
3354        let n2 = store.create_node("Person");
3355
3356        let _e1 = store.create_edge(n1, n2, "KNOWS").unwrap();
3357        let _e2 = store.create_edge(n1, n2, "FOLLOWS").unwrap();
3358
3359        // Both edges should be to the same target
3360        let out = &store.outgoing[n1.as_u64() as usize];
3361        assert_eq!(out.len(), 2);
3362        assert_eq!(out[0].0, n2);
3363        assert_eq!(out[1].0, n2);
3364
3365        // edge_between with binary search should find the right one
3366        assert!(store
3367            .edge_between(n1, n2, Some(&EdgeType::new("KNOWS")))
3368            .is_some());
3369        assert!(store
3370            .edge_between(n1, n2, Some(&EdgeType::new("FOLLOWS")))
3371            .is_some());
3372    }
3373
3374    #[test]
3375    fn test_edge_between_binary_search_high_degree() {
3376        let mut store = GraphStore::new();
3377        let hub = store.create_node("Hub");
3378
3379        // Create 100 target nodes and edges from hub
3380        let mut targets = Vec::new();
3381        for _ in 0..100 {
3382            let t = store.create_node("Target");
3383            store.create_edge(hub, t, "LINKS").unwrap();
3384            targets.push(t);
3385        }
3386
3387        // Binary search should find any of them
3388        for &t in &targets {
3389            assert!(
3390                store
3391                    .edge_between(hub, t, Some(&EdgeType::new("LINKS")))
3392                    .is_some(),
3393                "edge_between should find edge to target {:?}",
3394                t
3395            );
3396        }
3397
3398        // Non-existent target
3399        let fake = NodeId::new(9999);
3400        assert!(store
3401            .edge_between(hub, fake, Some(&EdgeType::new("LINKS")))
3402            .is_none());
3403    }
3404}