Skip to main content

graphmind/graph/
store.rs

1//! # In-Memory Graph Storage -- Adjacency Lists, Indices, and Statistics
2//!
3//! [`GraphStore`] is the central data structure of the Graphmind graph engine. It
4//! holds all nodes, edges, indices, and metadata in memory, optimized for the
5//! access patterns of a graph query engine.
6//!
7//! ## Why adjacency lists, not adjacency matrices?
8//!
9//! An adjacency matrix for V vertices requires O(V^2) space and O(V) time to
10//! find a node's neighbors. Real-world graphs are overwhelmingly **sparse** --
11//! a social network with 1 million users might have 100 million edges, but a
12//! matrix would need 10^12 cells. Adjacency lists use O(V + E) space and give
13//! O(degree) neighbor iteration -- a far better fit.
14//!
15//! ## Arena allocation with MVCC versioning
16//!
17//! Nodes and edges are stored in `Vec<Vec<T>>` structures (arena allocation).
18//! The outer `Vec` is indexed by entity ID (acting as a dense array), and the
19//! inner `Vec` holds successive **MVCC versions** of that entity. This layout
20//! gives O(1) lookup by ID (direct indexing, no hash computation), excellent
21//! cache locality for sequential scans, and natural support for snapshot reads
22//! at any historical version. This is similar to how PostgreSQL stores tuple
23//! versions in heap pages, but without the overhead of disk I/O.
24//!
25//! ## Sorted adjacency lists and `edge_between()`
26//!
27//! The `outgoing` and `incoming` adjacency lists store `Vec<(NodeId, EdgeId)>`
28//! tuples **sorted by NodeId**. This enables `edge_between(a, b)` to use
29//! **binary search** with O(log d) complexity (where d = degree of the node),
30//! which is critical for the `ExpandIntoOperator` that checks edge existence
31//! between two already-bound nodes in triangle-pattern queries.
32//!
33//! ## Secondary indices: label and edge-type
34//!
35//! `label_index: HashMap<Label, HashSet<NodeId>>` and
36//! `edge_type_index: HashMap<EdgeType, HashSet<EdgeId>>` act as secondary
37//! indices, analogous to B-tree indices in an RDBMS. When a Cypher query
38//! specifies `MATCH (n:Person)`, the engine looks up the `Person` entry in
39//! `label_index` and scans only matching nodes, avoiding a full table scan.
40//! We use `HashMap` (not `BTreeMap`) because we only need equality lookups
41//! on labels, not range scans.
42//!
43//! ## ColumnStore integration (late materialization)
44//!
45//! `node_columns` and `edge_columns` provide **columnar storage** for
46//! frequently accessed properties. In the traditional row store (`PropertyMap`
47//! on each node), reading one property from 1000 nodes touches 1000 scattered
48//! `HashMap`s. The columnar store groups all values of the same property
49//! contiguously, enabling vectorized reads and better CPU cache utilization.
50//! The query engine uses **late materialization**: scan operators produce
51//! lightweight `Value::NodeRef(id)` references instead of cloning full nodes,
52//! and properties are resolved on demand from the column store.
53//!
54//! ## GraphStatistics for cost-based optimization
55//!
56//! The query planner uses [`GraphStatistics`] to estimate the cardinality
57//! (number of rows) at each stage of a query plan, choosing the plan with
58//! the lowest estimated cost. See the struct documentation for details.
59//!
60//! ## Key Rust patterns
61//!
62//! - **`Vec<Vec<T>>` arena**: dense ID-indexed storage avoids HashMap overhead;
63//!   inner Vec holds MVCC versions.
64//! - **`HashMap` vs `BTreeMap`**: `HashMap` is used for indices because we need
65//!   O(1) point lookups (label equality), not ordered iteration. `BTreeMap`
66//!   would add an unnecessary O(log n) factor.
67//! - **`Arc`**: `vector_index` and `property_index` are wrapped in `Arc`
68//!   (atomic reference counting) for shared ownership across the query engine
69//!   and background index-maintenance tasks.
70//! - **`thiserror`**: the [`GraphError`] enum uses the `thiserror` crate's
71//!   derive macro to auto-generate `Display` and `Error` implementations,
72//!   reducing boilerplate for error types.
73//!
74//! ## Requirements coverage
75//!
76//! - REQ-GRAPH-001: Property graph data model
77//! - REQ-MEM-001: In-memory storage
78//! - REQ-MEM-003: Memory-optimized data structures
79
80use super::catalog::GraphCatalog;
81use super::edge::Edge;
82use super::node::Node;
83use super::property::{PropertyMap, PropertyValue};
84use super::types::{EdgeId, EdgeType, Label, NodeId};
85use crate::agent::{tools::WebSearchTool, AgentRuntime};
86use crate::graph::storage::ColumnStore;
87use crate::index::IndexManager;
88use crate::vector::{DistanceMetric, VectorIndexManager, VectorResult};
89use std::collections::{HashMap, HashSet};
90use std::sync::Arc;
91use thiserror::Error;
92use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
93
94// Add chrono dependency (local hack like in node.rs)
95mod chrono {
96    pub struct Utc;
97    impl Utc {
98        pub fn now() -> DateTime {
99            DateTime
100        }
101    }
102    pub struct DateTime;
103    impl DateTime {
104        pub fn timestamp_millis(&self) -> i64 {
105            // Use system time for now
106            std::time::SystemTime::now()
107                .duration_since(std::time::UNIX_EPOCH)
108                .unwrap()
109                .as_millis() as i64
110        }
111    }
112}
113
114/// Errors that can occur during graph operations
115#[derive(Error, Debug, PartialEq)]
116pub enum GraphError {
117    #[error("Node {0} not found")]
118    NodeNotFound(NodeId),
119
120    #[error("Edge {0} not found")]
121    EdgeNotFound(EdgeId),
122
123    #[error("Node {0} already exists")]
124    NodeAlreadyExists(NodeId),
125
126    #[error("Edge {0} already exists")]
127    EdgeAlreadyExists(EdgeId),
128
129    #[error("Invalid edge: source node {0} does not exist")]
130    InvalidEdgeSource(NodeId),
131
132    #[error("Invalid edge: target node {0} does not exist")]
133    InvalidEdgeTarget(NodeId),
134}
135
136pub type GraphResult<T> = Result<T, GraphError>;
137
138/// Statistics about graph contents for **cost-based query optimization**.
139///
140/// # What is cardinality estimation?
141///
142/// Every query plan is a tree of operators (scan, filter, join, sort, ...).
143/// The query planner must choose among many possible orderings of these
144/// operators. To compare plans, it estimates the **cardinality** (number of
145/// rows) flowing through each operator. For example, if `:Person` has 10,000
146/// nodes and `:Company` has 500, the planner should scan `:Company` first in
147/// a join -- fewer rows means less work at every subsequent stage.
148///
149/// # How selectivity works
150///
151/// **Selectivity** is the probability that a predicate evaluates to `true`
152/// for a randomly chosen row. A selectivity of 0.01 on a 10,000-row scan
153/// means the filter will pass approximately 100 rows. Selectivity is
154/// estimated as `1 / distinct_count` for equality predicates (assuming a
155/// uniform distribution). The `null_fraction` is subtracted before
156/// estimation because NULL values never match equality predicates.
157///
158/// # Sampling approach for property stats
159///
160/// Computing exact statistics for every property on every label would be
161/// expensive in a large graph. Instead, `compute_statistics()` samples the
162/// first 1,000 nodes per label and extrapolates `distinct_count`,
163/// `null_fraction`, and `selectivity`. This is similar to PostgreSQL's
164/// `ANALYZE` command, which samples a configurable fraction of each table.
165/// The trade-off is speed vs. accuracy -- sampling may miss rare values,
166/// but is sufficient for plan selection in practice.
167#[derive(Debug, Clone)]
168pub struct GraphStatistics {
169    /// Total number of nodes
170    pub total_nodes: usize,
171    /// Total number of edges
172    pub total_edges: usize,
173    /// Node count per label
174    pub label_counts: HashMap<Label, usize>,
175    /// Edge count per type
176    pub edge_type_counts: HashMap<EdgeType, usize>,
177    /// Average outgoing degree
178    pub avg_out_degree: f64,
179    /// Property statistics per (label, property_name)
180    pub property_stats: HashMap<(Label, String), PropertyStats>,
181}
182
183/// Statistics about a specific property
184#[derive(Debug, Clone)]
185pub struct PropertyStats {
186    /// Fraction of nodes with this label that have NULL for this property
187    pub null_fraction: f64,
188    /// Estimated number of distinct values
189    pub distinct_count: usize,
190    /// Selectivity: probability of matching a random value (1/distinct_count)
191    pub selectivity: f64,
192}
193
194impl GraphStatistics {
195    /// Estimate the number of rows from a label scan
196    pub fn estimate_label_scan(&self, label: &Label) -> usize {
197        self.label_counts
198            .get(label)
199            .copied()
200            .unwrap_or(self.total_nodes)
201    }
202
203    /// Estimate the number of rows after an expand (edge traversal)
204    pub fn estimate_expand(&self, edge_type: Option<&EdgeType>) -> f64 {
205        match edge_type {
206            Some(et) => self.edge_type_counts.get(et).copied().unwrap_or(0) as f64,
207            None => self.total_edges as f64,
208        }
209    }
210
211    /// Estimate selectivity of an equality filter on a property
212    pub fn estimate_equality_selectivity(&self, label: &Label, property: &str) -> f64 {
213        self.property_stats
214            .get(&(label.clone(), property.to_string()))
215            .map(|ps| ps.selectivity)
216            .unwrap_or(0.1) // Default 10% selectivity
217    }
218
219    /// Format statistics as human-readable text
220    pub fn format(&self) -> String {
221        let mut result = String::new();
222        result.push_str("Graph Statistics:\n");
223        result.push_str(&format!("  Total nodes: {}\n", self.total_nodes));
224        result.push_str(&format!("  Total edges: {}\n", self.total_edges));
225        result.push_str(&format!("  Avg out-degree: {:.2}\n", self.avg_out_degree));
226        result.push_str("  Labels:\n");
227        let mut labels: Vec<_> = self.label_counts.iter().collect();
228        labels.sort_by(|a, b| b.1.cmp(a.1));
229        for (label, count) in labels {
230            result.push_str(&format!("    :{} = {} nodes\n", label.as_str(), count));
231        }
232        result.push_str("  Edge types:\n");
233        let mut types: Vec<_> = self.edge_type_counts.iter().collect();
234        types.sort_by(|a, b| b.1.cmp(a.1));
235        for (etype, count) in types {
236            result.push_str(&format!("    :{} = {} edges\n", etype.as_str(), count));
237        }
238        result
239    }
240}
241
242/// In-memory graph storage engine.
243///
244/// `GraphStore` is the authoritative source of truth for all graph data.
245/// Its data structure layout is designed for the access patterns of a
246/// Cypher query engine:
247///
248/// | Field | Type | Purpose | Lookup cost |
249/// |---|---|---|---|
250/// | `nodes` | `Vec<Vec<Node>>` | Arena-allocated node versions, indexed by `NodeId` | O(1) |
251/// | `edges` | `Vec<Vec<Edge>>` | Arena-allocated edge versions, indexed by `EdgeId` | O(1) |
252/// | `outgoing` | `Vec<Vec<(NodeId, EdgeId)>>` | Sorted adjacency list per node (outgoing) | O(log d) binary search |
253/// | `incoming` | `Vec<Vec<(NodeId, EdgeId)>>` | Sorted adjacency list per node (incoming) | O(log d) binary search |
254/// | `label_index` | `HashMap<Label, HashSet<NodeId>>` | Secondary index: label -> node set | O(1) lookup, O(n) scan |
255/// | `edge_type_index` | `HashMap<EdgeType, HashSet<EdgeId>>` | Secondary index: type -> edge set | O(1) lookup, O(n) scan |
256/// | `node_columns` | `ColumnStore` | Columnar property storage for late materialization | O(1) per cell |
257/// | `catalog` | `GraphCatalog` | Triple-level statistics for graph-native planning (ADR-015) | O(1) |
258///
259/// The `free_node_ids` / `free_edge_ids` vectors enable **ID reuse** after
260/// deletions, avoiding unbounded growth of the arena vectors.
261///
262/// Thread safety: `GraphStore` is not `Sync` by itself. Concurrent access
263/// is managed by the server layer, which wraps it in `Arc<RwLock<GraphStore>>`
264/// for shared-nothing read parallelism with exclusive write access.
265#[derive(Debug)]
266pub struct GraphStore {
267    /// Node storage (Arena with versioning: NodeId -> [Versions])
268    nodes: Vec<Vec<Node>>,
269
270    /// Edge storage (Arena with versioning: EdgeId -> [Versions])
271    edges: Vec<Vec<Edge>>,
272
273    /// Outgoing edges for each node, sorted by target NodeId: (target_NodeId, EdgeId)
274    outgoing: Vec<Vec<(NodeId, EdgeId)>>,
275
276    /// Incoming edges for each node, sorted by source NodeId: (source_NodeId, EdgeId)
277    incoming: Vec<Vec<(NodeId, EdgeId)>>,
278
279    /// Current global version for MVCC
280    pub current_version: u64,
281
282    /// Free node IDs for reuse
283    free_node_ids: Vec<u64>,
284
285    /// Free edge IDs for reuse
286    free_edge_ids: Vec<u64>,
287
288    /// Label index for fast lookups
289    label_index: HashMap<Label, HashSet<NodeId>>,
290
291    /// Edge type index for fast lookups
292    edge_type_index: HashMap<EdgeType, HashSet<EdgeId>>,
293
294    /// Vector indices manager
295    pub vector_index: Arc<VectorIndexManager>,
296
297    /// Property indices manager
298    pub property_index: Arc<IndexManager>,
299
300    /// Columnar storage for node properties
301    pub node_columns: ColumnStore,
302
303    /// Columnar storage for edge properties
304    pub edge_columns: ColumnStore,
305
306    /// Async index event sender
307    pub index_sender: Option<UnboundedSender<crate::graph::event::IndexEvent>>,
308
309    /// Next node ID
310    next_node_id: u64,
311
312    /// Next edge ID
313    next_edge_id: u64,
314
315    /// Triple-level statistics catalog for graph-native query planning (ADR-015)
316    catalog: GraphCatalog,
317}
318
319impl GraphStore {
320    /// Create a new empty graph store
321    pub fn new() -> Self {
322        GraphStore {
323            nodes: Vec::with_capacity(1024),
324            edges: Vec::with_capacity(4096),
325            outgoing: Vec::with_capacity(1024),
326            incoming: Vec::with_capacity(1024),
327            current_version: 1,
328            free_node_ids: Vec::new(),
329            free_edge_ids: Vec::new(),
330            label_index: HashMap::new(),
331            edge_type_index: HashMap::new(),
332            vector_index: Arc::new(VectorIndexManager::new()),
333            property_index: Arc::new(IndexManager::new()),
334            node_columns: ColumnStore::new(),
335            edge_columns: ColumnStore::new(),
336            index_sender: None,
337            next_node_id: 1,
338            next_edge_id: 1,
339            catalog: GraphCatalog::new(),
340        }
341    }
342
343    /// Create a new GraphStore with async indexing enabled
344    pub fn with_async_indexing() -> (
345        Self,
346        tokio::sync::mpsc::UnboundedReceiver<crate::graph::event::IndexEvent>,
347    ) {
348        let (tx, rx) = unbounded_channel();
349        let mut store = Self::new();
350        store.index_sender = Some(tx);
351        (store, rx)
352    }
353
354    /// Start the background indexer loop
355    pub async fn start_background_indexer(
356        mut receiver: tokio::sync::mpsc::UnboundedReceiver<crate::graph::event::IndexEvent>,
357        vector_index: Arc<VectorIndexManager>,
358        property_index: Arc<IndexManager>,
359        tenant_manager: Arc<crate::persistence::TenantManager>,
360    ) {
361        use crate::graph::event::IndexEvent::*;
362
363        while let Some(event) = receiver.recv().await {
364            match event {
365                NodeCreated {
366                    tenant_id,
367                    id,
368                    labels,
369                    properties,
370                } => {
371                    for (key, value) in &properties {
372                        if let PropertyValue::Vector(vec) = value {
373                            for label in &labels {
374                                let _ = vector_index.add_vector(label.as_str(), key, id, vec);
375                            }
376                        }
377                        for label in &labels {
378                            property_index.index_insert(label, key, value.clone(), id);
379                        }
380
381                        // Auto-Embed check
382                        if let PropertyValue::String(text) = value {
383                            if let Ok(tenant) = tenant_manager.get_tenant(&tenant_id) {
384                                if let Some(config) = tenant.embed_config {
385                                    for label in &labels {
386                                        if let Some(keys) =
387                                            config.embedding_policies.get(label.as_str())
388                                        {
389                                            if keys.contains(key) {
390                                                // Trigger Auto-Embed
391                                                let vector_index_clone = Arc::clone(&vector_index);
392                                                let label_str = label.as_str().to_string();
393                                                let key_clone = key.clone();
394                                                let text_clone = text.clone();
395                                                let config_clone = config.clone();
396
397                                                tokio::spawn(async move {
398                                                    if let Ok(pipeline) =
399                                                        crate::embed::EmbedPipeline::new(
400                                                            config_clone,
401                                                        )
402                                                    {
403                                                        if let Ok(chunks) =
404                                                            pipeline.process_text(&text_clone).await
405                                                        {
406                                                            for chunk in chunks {
407                                                                let _ = vector_index_clone
408                                                                    .add_vector(
409                                                                        &label_str,
410                                                                        &key_clone,
411                                                                        id,
412                                                                        &chunk.embedding,
413                                                                    );
414                                                            }
415                                                        }
416                                                    }
417                                                });
418                                            }
419                                        }
420                                    }
421                                }
422                            }
423                        }
424                    }
425
426                    // Agentic Enrichment Trigger
427                    if let Ok(tenant) = tenant_manager.get_tenant(&tenant_id) {
428                        if let Some(agent_config) = tenant.agent_config {
429                            if agent_config.enabled {
430                                for label in &labels {
431                                    if let Some(trigger_prompt) =
432                                        agent_config.policies.get(label.as_str())
433                                    {
434                                        // Construct context from node properties
435                                        let context =
436                                            format!("Node: {} {:?}", label.as_str(), properties);
437                                        let task = trigger_prompt.clone();
438                                        let mut runtime = AgentRuntime::new(agent_config.clone());
439                                        let label_str = label.as_str().to_string();
440
441                                        // Register tools based on config/availability
442                                        if let Some(api_key) = &agent_config.api_key {
443                                            runtime.register_tool(Arc::new(WebSearchTool::new(
444                                                api_key.clone(),
445                                            )));
446                                        } else {
447                                            // Mock/Prototype mode
448                                            runtime.register_tool(Arc::new(WebSearchTool::new(
449                                                "mock".to_string(),
450                                            )));
451                                        }
452
453                                        tokio::spawn(async move {
454                                            if let Ok(result) =
455                                                runtime.process_trigger(&task, &context).await
456                                            {
457                                                println!(
458                                                    "Agent Action [{}] -> {}",
459                                                    label_str, result
460                                                );
461                                                // Future: Write result back to graph properties using a GraphStore client
462                                            }
463                                        });
464                                    }
465                                }
466                            }
467                        }
468                    }
469                }
470                NodeDeleted {
471                    tenant_id: _,
472                    id,
473                    labels,
474                    properties,
475                } => {
476                    for (key, value) in properties {
477                        for label in &labels {
478                            property_index.index_remove(label, &key, &value, id);
479                        }
480                    }
481                }
482                PropertySet {
483                    tenant_id,
484                    id,
485                    labels,
486                    key,
487                    old_value,
488                    new_value,
489                } => {
490                    if let Some(old) = old_value {
491                        for label in &labels {
492                            property_index.index_remove(label, &key, &old, id);
493                        }
494                    }
495                    for label in &labels {
496                        property_index.index_insert(label, &key, new_value.clone(), id);
497                    }
498                    if let PropertyValue::Vector(vec) = &new_value {
499                        for label in &labels {
500                            let _ = vector_index.add_vector(label.as_str(), &key, id, vec);
501                        }
502                    }
503
504                    // Auto-Embed check
505                    if let PropertyValue::String(text) = &new_value {
506                        if let Ok(tenant) = tenant_manager.get_tenant(&tenant_id) {
507                            if let Some(config) = tenant.embed_config {
508                                for label in &labels {
509                                    if let Some(keys) =
510                                        config.embedding_policies.get(label.as_str())
511                                    {
512                                        if keys.contains(&key) {
513                                            let vector_index_clone = Arc::clone(&vector_index);
514                                            let label_str = label.as_str().to_string();
515                                            let key_clone = key.clone();
516                                            let text_clone = text.clone();
517                                            let config_clone = config.clone();
518
519                                            tokio::spawn(async move {
520                                                if let Ok(pipeline) =
521                                                    crate::embed::EmbedPipeline::new(config_clone)
522                                                {
523                                                    if let Ok(chunks) =
524                                                        pipeline.process_text(&text_clone).await
525                                                    {
526                                                        if let Some(first) = chunks.first() {
527                                                            let _ = vector_index_clone.add_vector(
528                                                                &label_str,
529                                                                &key_clone,
530                                                                id,
531                                                                &first.embedding,
532                                                            );
533                                                        }
534                                                    }
535                                                }
536                                            });
537                                        }
538                                    }
539                                }
540                            }
541                        }
542                    }
543
544                    // Agentic Enrichment Trigger (PropertySet)
545                    if let Ok(tenant) = tenant_manager.get_tenant(&tenant_id) {
546                        if let Some(agent_config) = tenant.agent_config {
547                            if agent_config.enabled {
548                                for label in &labels {
549                                    if let Some(trigger_prompt) =
550                                        agent_config.policies.get(label.as_str())
551                                    {
552                                        let context = format!(
553                                            "Node: {} (Property Updated: {})",
554                                            label.as_str(),
555                                            key
556                                        );
557                                        let task = trigger_prompt.clone();
558                                        let mut runtime = AgentRuntime::new(agent_config.clone());
559                                        let label_str = label.as_str().to_string();
560
561                                        if let Some(api_key) = &agent_config.api_key {
562                                            runtime.register_tool(Arc::new(WebSearchTool::new(
563                                                api_key.clone(),
564                                            )));
565                                        } else {
566                                            runtime.register_tool(Arc::new(WebSearchTool::new(
567                                                "mock".to_string(),
568                                            )));
569                                        }
570
571                                        tokio::spawn(async move {
572                                            if let Ok(result) =
573                                                runtime.process_trigger(&task, &context).await
574                                            {
575                                                println!(
576                                                    "Agent Action [{}] -> {}",
577                                                    label_str, result
578                                                );
579                                            }
580                                        });
581                                    }
582                                }
583                            }
584                        }
585                    }
586                }
587                LabelAdded {
588                    tenant_id,
589                    id,
590                    label,
591                    properties,
592                } => {
593                    for (key, value) in properties {
594                        if let PropertyValue::Vector(vec) = &value {
595                            let _ = vector_index.add_vector(label.as_str(), &key, id, vec);
596                        }
597                        property_index.index_insert(&label, &key, value.clone(), id);
598
599                        // Auto-Embed check
600                        if let PropertyValue::String(text) = &value {
601                            if let Ok(tenant) = tenant_manager.get_tenant(&tenant_id) {
602                                if let Some(config) = tenant.embed_config {
603                                    if let Some(keys) =
604                                        config.embedding_policies.get(label.as_str())
605                                    {
606                                        if keys.contains(&key) {
607                                            let vector_index_clone = Arc::clone(&vector_index);
608                                            let label_str = label.as_str().to_string();
609                                            let key_clone = key.clone();
610                                            let text_clone = text.clone();
611                                            let config_clone = config.clone();
612
613                                            tokio::spawn(async move {
614                                                if let Ok(pipeline) =
615                                                    crate::embed::EmbedPipeline::new(config_clone)
616                                                {
617                                                    if let Ok(chunks) =
618                                                        pipeline.process_text(&text_clone).await
619                                                    {
620                                                        if let Some(first) = chunks.first() {
621                                                            let _ = vector_index_clone.add_vector(
622                                                                &label_str,
623                                                                &key_clone,
624                                                                id,
625                                                                &first.embedding,
626                                                            );
627                                                        }
628                                                    }
629                                                }
630                                            });
631                                        }
632                                    }
633                                }
634                            }
635                        }
636                    }
637                }
638            }
639        }
640    }
641
642    /// Create a node with auto-generated ID and single label
643    pub fn create_node(&mut self, label: impl Into<Label>) -> NodeId {
644        let node_id_u64 = if let Some(id) = self.free_node_ids.pop() {
645            id
646        } else {
647            let id = self.next_node_id;
648            self.next_node_id += 1;
649            id
650        };
651        let node_id = NodeId::new(node_id_u64);
652        let idx = node_id_u64 as usize;
653
654        let label = label.into();
655        let mut node = Node::new(node_id, label.clone());
656        node.version = self.current_version;
657
658        // Add to label index
659        self.label_index
660            .entry(label.clone())
661            .or_default()
662            .insert(node_id);
663
664        // Update catalog label count
665        self.catalog.on_label_added(&label);
666
667        // Ensure storage capacity
668        if idx >= self.nodes.len() {
669            self.nodes.resize(idx + 1, Vec::new());
670            self.outgoing.resize(idx + 1, Vec::new());
671            self.incoming.resize(idx + 1, Vec::new());
672        }
673
674        let event = crate::graph::event::IndexEvent::NodeCreated {
675            tenant_id: "default".to_string(),
676            id: node_id,
677            labels: node.labels.iter().cloned().collect(),
678            properties: node.properties.clone(),
679        };
680
681        if let Some(sender) = &self.index_sender {
682            let _ = sender.send(event);
683        } else {
684            self.handle_index_event(event, None);
685        }
686
687        self.nodes[idx].push(node);
688        node_id
689    }
690
691    /// Create a node with multiple labels and properties
692    pub fn create_node_with_properties(
693        &mut self,
694        tenant_id: &str,
695        labels: Vec<Label>,
696        properties: PropertyMap,
697    ) -> NodeId {
698        let node_id_u64 = if let Some(id) = self.free_node_ids.pop() {
699            id
700        } else {
701            let id = self.next_node_id;
702            self.next_node_id += 1;
703            id
704        };
705        let node_id = NodeId::new(node_id_u64);
706        let idx = node_id_u64 as usize;
707
708        // Populate columnar storage
709        for (key, value) in &properties {
710            self.node_columns.set_property(idx, key, value.clone());
711        }
712
713        let mut node = Node::new_with_properties(node_id, labels.clone(), properties);
714        node.version = self.current_version;
715
716        // Add to label indices
717        for label in &labels {
718            self.label_index
719                .entry(label.clone())
720                .or_default()
721                .insert(node_id);
722            // Update catalog label count
723            self.catalog.on_label_added(label);
724        }
725
726        // Ensure storage capacity
727        if idx >= self.nodes.len() {
728            self.nodes.resize(idx + 1, Vec::new());
729            self.outgoing.resize(idx + 1, Vec::new());
730            self.incoming.resize(idx + 1, Vec::new());
731        }
732
733        let event = crate::graph::event::IndexEvent::NodeCreated {
734            tenant_id: tenant_id.to_string(),
735            id: node_id,
736            labels: node.labels.iter().cloned().collect(),
737            properties: node.properties.clone(),
738        };
739
740        if let Some(sender) = &self.index_sender {
741            let _ = sender.send(event);
742        } else {
743            self.handle_index_event(event, None);
744        }
745
746        self.nodes[idx].push(node);
747        node_id
748    }
749
750    /// Get a node by ID at a specific version (MVCC)
751    pub fn get_node_at_version(&self, id: NodeId, version: u64) -> Option<&Node> {
752        let idx = id.as_u64() as usize;
753        let versions = self.nodes.get(idx)?;
754
755        // Find the latest version <= requested version
756        // Versions are sorted by creation time
757        versions.iter().rev().find(|n| n.version <= version)
758    }
759
760    /// Get a node by ID (uses current version)
761    pub fn get_node(&self, id: NodeId) -> Option<&Node> {
762        self.get_node_at_version(id, self.current_version)
763    }
764
765    /// Get a mutable node by ID (always latest version)
766    pub fn get_node_mut(&mut self, id: NodeId) -> Option<&mut Node> {
767        self.nodes
768            .get_mut(id.as_u64() as usize)
769            .and_then(|v| v.last_mut())
770    }
771
772    /// Check if a node exists
773    pub fn has_node(&self, id: NodeId) -> bool {
774        self.get_node(id).is_some()
775    }
776
777    /// Set a property on a node and update vector indices if necessary
778    pub fn set_node_property(
779        &mut self,
780        tenant_id: &str,
781        node_id: NodeId,
782        key: impl Into<String>,
783        value: impl Into<PropertyValue>,
784    ) -> GraphResult<()> {
785        let key_str = key.into();
786        let val = value.into();
787        let idx = node_id.as_u64() as usize;
788
789        // Update columnar storage (always latest)
790        self.node_columns.set_property(idx, &key_str, val.clone());
791
792        // Get access to versions
793        let versions = self
794            .nodes
795            .get_mut(idx)
796            .ok_or(GraphError::NodeNotFound(node_id))?;
797        let latest_node = versions.last().ok_or(GraphError::NodeNotFound(node_id))?;
798
799        let old_val;
800
801        if latest_node.version < self.current_version {
802            // COW: Create new version
803            let mut new_node = latest_node.clone();
804            new_node.version = self.current_version;
805            new_node.updated_at = chrono::Utc::now().timestamp_millis();
806            old_val = new_node.set_property(key_str.clone(), val.clone());
807            versions.push(new_node);
808        } else {
809            // Update in place (same transaction/version)
810            let node = versions.last_mut().unwrap();
811            old_val = node.set_property(key_str.clone(), val.clone());
812        }
813
814        let event = crate::graph::event::IndexEvent::PropertySet {
815            tenant_id: tenant_id.to_string(),
816            id: node_id,
817            labels: versions.last().unwrap().labels.iter().cloned().collect(),
818            key: key_str,
819            old_value: old_val,
820            new_value: val,
821        };
822
823        if let Some(sender) = &self.index_sender {
824            let _ = sender.send(event);
825        } else {
826            self.handle_index_event(event, None);
827        }
828
829        Ok(())
830    }
831
832    /// Delete a node and all its connected edges
833    pub fn delete_node(&mut self, tenant_id: &str, id: NodeId) -> GraphResult<Node> {
834        let idx = id.as_u64() as usize;
835        let latest_node = self
836            .get_node(id)
837            .ok_or(GraphError::NodeNotFound(id))?
838            .clone();
839
840        // Create a tombstone version (we use a special property or metadata in a real system)
841        // For now, we'll just not return it in get_node_at_version if we had a flag.
842        // Let's add a `deleted` flag to Node/Edge for true MVCC.
843
844        // Add to free list for reuse (In true MVCC, we only reuse after compaction/vacuum)
845        self.free_node_ids.push(id.as_u64());
846
847        // For this prototype, we'll keep the removal logic but wrap it in MVCC metadata if needed.
848        // Actually, let's keep it simple: removal from the latest version effectively deletes it.
849        // But to keep history, we should NOT remove from the Vec.
850
851        // Remove from label indices and update catalog
852        for label in &latest_node.labels {
853            if let Some(node_set) = self.label_index.get_mut(label) {
854                node_set.remove(&id);
855            }
856            self.catalog.on_label_removed(label);
857        }
858
859        let event = crate::graph::event::IndexEvent::NodeDeleted {
860            tenant_id: tenant_id.to_string(),
861            id,
862            labels: latest_node.labels.iter().cloned().collect(),
863            properties: latest_node.properties.clone(),
864        };
865
866        if let Some(sender) = &self.index_sender {
867            let _ = sender.send(event);
868        } else {
869            self.handle_index_event(event, None);
870        }
871
872        // Remove from the versions (breaking historical reads for now, full MVCC is complex)
873        // TODO: Implement proper tombstone versions
874        let node = self.nodes[idx].pop().unwrap();
875
876        // Remove all connected edges
877        let outgoing_edges: Vec<EdgeId> = std::mem::take(&mut self.outgoing[idx])
878            .into_iter()
879            .map(|(_, eid)| eid)
880            .collect();
881        let incoming_edges: Vec<EdgeId> = std::mem::take(&mut self.incoming[idx])
882            .into_iter()
883            .map(|(_, eid)| eid)
884            .collect();
885
886        for edge_id in outgoing_edges.iter().chain(incoming_edges.iter()) {
887            let _ = self.delete_edge(*edge_id);
888        }
889
890        Ok(node)
891    }
892
893    /// Add a label to an existing node AND update the label index
894    ///
895    /// This is the correct way to add labels to nodes after creation.
896    /// Using `node.add_label()` directly will NOT update the label_index,
897    /// making the node invisible to `get_nodes_by_label()` queries.
898    pub fn add_label_to_node(
899        &mut self,
900        tenant_id: &str,
901        node_id: NodeId,
902        label: impl Into<Label>,
903    ) -> GraphResult<()> {
904        let label = label.into();
905        let idx = node_id.as_u64() as usize;
906
907        // Get the node and add the label
908        let node = self
909            .nodes
910            .get_mut(idx)
911            .and_then(|v| v.last_mut())
912            .ok_or(GraphError::NodeNotFound(node_id))?;
913        node.add_label(label.clone());
914
915        // Update the label index so queries can find this node by the new label
916        self.label_index
917            .entry(label.clone())
918            .or_default()
919            .insert(node_id);
920
921        // Update catalog label count
922        self.catalog.on_label_added(&label);
923
924        let event = crate::graph::event::IndexEvent::LabelAdded {
925            tenant_id: tenant_id.to_string(),
926            id: node_id,
927            label: label.clone(),
928            properties: node.properties.clone(),
929        };
930
931        if let Some(sender) = &self.index_sender {
932            let _ = sender.send(event);
933        } else {
934            self.handle_index_event(event, None);
935        }
936
937        Ok(())
938    }
939
940    /// Create an edge between two nodes
941    pub fn create_edge(
942        &mut self,
943        source: NodeId,
944        target: NodeId,
945        edge_type: impl Into<EdgeType>,
946    ) -> GraphResult<EdgeId> {
947        // Validate nodes exist
948        if !self.has_node(source) {
949            return Err(GraphError::InvalidEdgeSource(source));
950        }
951        if !self.has_node(target) {
952            return Err(GraphError::InvalidEdgeTarget(target));
953        }
954
955        let edge_id_u64 = if let Some(id) = self.free_edge_ids.pop() {
956            id
957        } else {
958            let id = self.next_edge_id;
959            self.next_edge_id += 1;
960            id
961        };
962        let edge_id = EdgeId::new(edge_id_u64);
963        let idx = edge_id_u64 as usize;
964
965        let edge_type = edge_type.into();
966        let mut edge = Edge::new(edge_id, source, target, edge_type.clone());
967        edge.version = self.current_version;
968
969        // Update adjacency lists (sorted insert by target/source NodeId)
970        {
971            let out_list = &mut self.outgoing[source.as_u64() as usize];
972            let pos = out_list
973                .binary_search_by_key(&target, |(nid, _)| *nid)
974                .unwrap_or_else(|p| p);
975            out_list.insert(pos, (target, edge_id));
976        }
977        {
978            let in_list = &mut self.incoming[target.as_u64() as usize];
979            let pos = in_list
980                .binary_search_by_key(&source, |(nid, _)| *nid)
981                .unwrap_or_else(|p| p);
982            in_list.insert(pos, (source, edge_id));
983        }
984
985        // Ensure storage capacity
986        if idx >= self.edges.len() {
987            self.edges.resize(idx + 1, Vec::new());
988        }
989
990        // Update edge type index
991        self.edge_type_index
992            .entry(edge_type.clone())
993            .or_default()
994            .insert(edge_id);
995
996        // Update catalog triple stats
997        let src_labels: Vec<Label> = self
998            .get_node(source)
999            .map(|n| n.labels.iter().cloned().collect())
1000            .unwrap_or_default();
1001        let tgt_labels: Vec<Label> = self
1002            .get_node(target)
1003            .map(|n| n.labels.iter().cloned().collect())
1004            .unwrap_or_default();
1005        self.catalog
1006            .on_edge_created(source, &src_labels, &edge_type, target, &tgt_labels);
1007
1008        self.edges[idx].push(edge);
1009        Ok(edge_id)
1010    }
1011
1012    /// Create an edge with properties
1013    pub fn create_edge_with_properties(
1014        &mut self,
1015        source: NodeId,
1016        target: NodeId,
1017        edge_type: impl Into<EdgeType>,
1018        properties: PropertyMap,
1019    ) -> GraphResult<EdgeId> {
1020        // Validate nodes exist
1021        if !self.has_node(source) {
1022            return Err(GraphError::InvalidEdgeSource(source));
1023        }
1024        if !self.has_node(target) {
1025            return Err(GraphError::InvalidEdgeTarget(target));
1026        }
1027
1028        let edge_id_u64 = if let Some(id) = self.free_edge_ids.pop() {
1029            id
1030        } else {
1031            let id = self.next_edge_id;
1032            self.next_edge_id += 1;
1033            id
1034        };
1035        let edge_id = EdgeId::new(edge_id_u64);
1036        let idx = edge_id_u64 as usize;
1037
1038        // Populate columnar storage
1039        for (key, value) in &properties {
1040            self.edge_columns.set_property(idx, key, value.clone());
1041        }
1042
1043        let edge_type = edge_type.into();
1044        let mut edge =
1045            Edge::new_with_properties(edge_id, source, target, edge_type.clone(), properties);
1046        edge.version = self.current_version;
1047
1048        // Update adjacency lists (sorted insert by target/source NodeId)
1049        {
1050            let out_list = &mut self.outgoing[source.as_u64() as usize];
1051            let pos = out_list
1052                .binary_search_by_key(&target, |(nid, _)| *nid)
1053                .unwrap_or_else(|p| p);
1054            out_list.insert(pos, (target, edge_id));
1055        }
1056        {
1057            let in_list = &mut self.incoming[target.as_u64() as usize];
1058            let pos = in_list
1059                .binary_search_by_key(&source, |(nid, _)| *nid)
1060                .unwrap_or_else(|p| p);
1061            in_list.insert(pos, (source, edge_id));
1062        }
1063
1064        // Ensure storage capacity
1065        if idx >= self.edges.len() {
1066            self.edges.resize(idx + 1, Vec::new());
1067        }
1068
1069        // Update edge type index
1070        self.edge_type_index
1071            .entry(edge_type.clone())
1072            .or_default()
1073            .insert(edge_id);
1074
1075        // Update catalog triple stats
1076        let src_labels: Vec<Label> = self
1077            .get_node(source)
1078            .map(|n| n.labels.iter().cloned().collect())
1079            .unwrap_or_default();
1080        let tgt_labels: Vec<Label> = self
1081            .get_node(target)
1082            .map(|n| n.labels.iter().cloned().collect())
1083            .unwrap_or_default();
1084        self.catalog
1085            .on_edge_created(source, &src_labels, &edge_type, target, &tgt_labels);
1086
1087        self.edges[idx].push(edge);
1088        Ok(edge_id)
1089    }
1090
1091    /// Get an edge by ID at a specific version (MVCC)
1092    pub fn get_edge_at_version(&self, id: EdgeId, version: u64) -> Option<&Edge> {
1093        let idx = id.as_u64() as usize;
1094        let versions = self.edges.get(idx)?;
1095
1096        // Find the latest version <= requested version
1097        versions.iter().rev().find(|e| e.version <= version)
1098    }
1099
1100    /// Get an edge by ID (uses current version)
1101    pub fn get_edge(&self, id: EdgeId) -> Option<&Edge> {
1102        self.get_edge_at_version(id, self.current_version)
1103    }
1104
1105    /// Get a mutable edge by ID (always latest version)
1106    pub fn get_edge_mut(&mut self, id: EdgeId) -> Option<&mut Edge> {
1107        self.edges
1108            .get_mut(id.as_u64() as usize)
1109            .and_then(|v| v.last_mut())
1110    }
1111
1112    /// Check if an edge exists
1113    pub fn has_edge(&self, id: EdgeId) -> bool {
1114        self.get_edge(id).is_some()
1115    }
1116
1117    /// Delete an edge
1118    pub fn delete_edge(&mut self, id: EdgeId) -> GraphResult<Edge> {
1119        let idx = id.as_u64() as usize;
1120
1121        // Collect catalog info before removal
1122        let (src_labels, tgt_labels, source_id, target_id, edge_type_clone) = {
1123            let edge = self
1124                .edges
1125                .get(idx)
1126                .and_then(|v| v.last())
1127                .ok_or(GraphError::EdgeNotFound(id))?;
1128            let src_labels: Vec<Label> = self
1129                .get_node(edge.source)
1130                .map(|n| n.labels.iter().cloned().collect())
1131                .unwrap_or_default();
1132            let tgt_labels: Vec<Label> = self
1133                .get_node(edge.target)
1134                .map(|n| n.labels.iter().cloned().collect())
1135                .unwrap_or_default();
1136            (
1137                src_labels,
1138                tgt_labels,
1139                edge.source,
1140                edge.target,
1141                edge.edge_type.clone(),
1142            )
1143        };
1144
1145        let edge = self
1146            .edges
1147            .get_mut(idx)
1148            .and_then(|v| v.pop())
1149            .ok_or(GraphError::EdgeNotFound(id))?;
1150
1151        // Add to free list
1152        self.free_edge_ids.push(id.as_u64());
1153
1154        // Remove from edge type index
1155        if let Some(edge_set) = self.edge_type_index.get_mut(&edge.edge_type) {
1156            edge_set.remove(&id);
1157        }
1158
1159        // Remove from adjacency lists
1160        if let Some(adj) = self.outgoing.get_mut(edge.source.as_u64() as usize) {
1161            adj.retain(|&(_, eid)| eid != id);
1162        }
1163        if let Some(adj) = self.incoming.get_mut(edge.target.as_u64() as usize) {
1164            adj.retain(|&(_, eid)| eid != id);
1165        }
1166
1167        // Update catalog triple stats
1168        self.catalog.on_edge_deleted(
1169            source_id,
1170            &src_labels,
1171            &edge_type_clone,
1172            target_id,
1173            &tgt_labels,
1174        );
1175
1176        Ok(edge)
1177    }
1178
1179    /// Get all outgoing edges from a node
1180    pub fn get_outgoing_edges(&self, node_id: NodeId) -> Vec<&Edge> {
1181        self.outgoing
1182            .get(node_id.as_u64() as usize)
1183            .map(|entries| {
1184                entries
1185                    .iter()
1186                    .filter_map(|&(_, eid)| self.get_edge(eid))
1187                    .collect()
1188            })
1189            .unwrap_or_default()
1190    }
1191
1192    /// Get all incoming edges to a node
1193    pub fn get_incoming_edges(&self, node_id: NodeId) -> Vec<&Edge> {
1194        self.incoming
1195            .get(node_id.as_u64() as usize)
1196            .map(|entries| {
1197                entries
1198                    .iter()
1199                    .filter_map(|&(_, eid)| self.get_edge(eid))
1200                    .collect()
1201            })
1202            .unwrap_or_default()
1203    }
1204
1205    /// Get outgoing edge targets as lightweight tuples (no Edge clone)
1206    /// Returns (EdgeId, source NodeId, target NodeId, &EdgeType) for each outgoing edge
1207    pub fn get_outgoing_edge_targets(
1208        &self,
1209        node_id: NodeId,
1210    ) -> Vec<(EdgeId, NodeId, NodeId, &EdgeType)> {
1211        self.outgoing
1212            .get(node_id.as_u64() as usize)
1213            .map(|entries| {
1214                entries
1215                    .iter()
1216                    .filter_map(|&(_, eid)| {
1217                        self.get_edge(eid)
1218                            .map(|e| (eid, e.source, e.target, &e.edge_type))
1219                    })
1220                    .collect()
1221            })
1222            .unwrap_or_default()
1223    }
1224
1225    /// Get incoming edge sources as lightweight tuples (no Edge clone)
1226    /// Returns (EdgeId, source NodeId, target NodeId, &EdgeType) for each incoming edge
1227    pub fn get_incoming_edge_sources(
1228        &self,
1229        node_id: NodeId,
1230    ) -> Vec<(EdgeId, NodeId, NodeId, &EdgeType)> {
1231        self.incoming
1232            .get(node_id.as_u64() as usize)
1233            .map(|entries| {
1234                entries
1235                    .iter()
1236                    .filter_map(|&(_, eid)| {
1237                        self.get_edge(eid)
1238                            .map(|e| (eid, e.source, e.target, &e.edge_type))
1239                    })
1240                    .collect()
1241            })
1242            .unwrap_or_default()
1243    }
1244
1245    /// Check if an edge exists between source and target, optionally filtered by edge type.
1246    /// Uses binary search on sorted adjacency lists for O(log d) lookup.
1247    /// Returns the first matching EdgeId, or None.
1248    pub fn edge_between(
1249        &self,
1250        source: NodeId,
1251        target: NodeId,
1252        edge_type: Option<&EdgeType>,
1253    ) -> Option<EdgeId> {
1254        let out_len = self
1255            .outgoing
1256            .get(source.as_u64() as usize)
1257            .map(|v| v.len())
1258            .unwrap_or(0);
1259        let in_len = self
1260            .incoming
1261            .get(target.as_u64() as usize)
1262            .map(|v| v.len())
1263            .unwrap_or(0);
1264
1265        // Use outgoing from source (search for target) or incoming to target (search for source)
1266        let (entries, search_key) = if out_len <= in_len {
1267            (self.outgoing.get(source.as_u64() as usize)?, target)
1268        } else {
1269            (self.incoming.get(target.as_u64() as usize)?, source)
1270        };
1271
1272        // Binary search to find the neighborhood of matching entries
1273        let start = match entries.binary_search_by_key(&search_key, |(nid, _)| *nid) {
1274            Ok(pos) => {
1275                // Walk back to find the first entry with this key
1276                let mut p = pos;
1277                while p > 0 && entries[p - 1].0 == search_key {
1278                    p -= 1;
1279                }
1280                p
1281            }
1282            Err(_) => return None,
1283        };
1284
1285        // Scan forward through entries with the matching key
1286        for &(nid, eid) in entries.iter().skip(start) {
1287            if nid != search_key {
1288                break;
1289            }
1290            if let Some(e) = self.get_edge(eid) {
1291                if e.source == source && e.target == target {
1292                    match edge_type {
1293                        Some(et) if &e.edge_type != et => continue,
1294                        _ => return Some(eid),
1295                    }
1296                }
1297            }
1298        }
1299        None
1300    }
1301
1302    /// Get all edges between source and target, optionally filtered by edge type.
1303    /// Uses binary search on sorted adjacency lists.
1304    pub fn edges_between(
1305        &self,
1306        source: NodeId,
1307        target: NodeId,
1308        edge_type: Option<&EdgeType>,
1309    ) -> Vec<EdgeId> {
1310        let out_len = self
1311            .outgoing
1312            .get(source.as_u64() as usize)
1313            .map(|v| v.len())
1314            .unwrap_or(0);
1315        let in_len = self
1316            .incoming
1317            .get(target.as_u64() as usize)
1318            .map(|v| v.len())
1319            .unwrap_or(0);
1320
1321        let (entries, search_key) = if out_len <= in_len {
1322            match self.outgoing.get(source.as_u64() as usize) {
1323                Some(e) => (e, target),
1324                None => return Vec::new(),
1325            }
1326        } else {
1327            match self.incoming.get(target.as_u64() as usize) {
1328                Some(e) => (e, source),
1329                None => return Vec::new(),
1330            }
1331        };
1332
1333        let start = match entries.binary_search_by_key(&search_key, |(nid, _)| *nid) {
1334            Ok(pos) => {
1335                let mut p = pos;
1336                while p > 0 && entries[p - 1].0 == search_key {
1337                    p -= 1;
1338                }
1339                p
1340            }
1341            Err(_) => return Vec::new(),
1342        };
1343
1344        let mut result = Vec::new();
1345        for &(nid, eid) in entries.iter().skip(start) {
1346            if nid != search_key {
1347                break;
1348            }
1349            if let Some(e) = self.get_edge(eid) {
1350                if e.source == source && e.target == target {
1351                    match edge_type {
1352                        Some(et) if &e.edge_type != et => {}
1353                        _ => result.push(eid),
1354                    }
1355                }
1356            }
1357        }
1358        result
1359    }
1360
1361    /// Get all nodes with a specific label
1362    pub fn get_nodes_by_label(&self, label: &Label) -> Vec<&Node> {
1363        self.label_index
1364            .get(label)
1365            .map(|node_ids| {
1366                node_ids
1367                    .iter()
1368                    .filter_map(|&id| self.get_node(id))
1369                    .collect()
1370            })
1371            .unwrap_or_default()
1372    }
1373
1374    /// Get all edges of a specific type
1375    pub fn get_edges_by_type(&self, edge_type: &EdgeType) -> Vec<&Edge> {
1376        self.edge_type_index
1377            .get(edge_type)
1378            .map(|edge_ids| {
1379                edge_ids
1380                    .iter()
1381                    .filter_map(|&id| self.get_edge(id))
1382                    .collect()
1383            })
1384            .unwrap_or_default()
1385    }
1386
1387    /// Get total number of nodes
1388    pub fn node_count(&self) -> usize {
1389        self.nodes.iter().flatten().count()
1390    }
1391
1392    /// Get total number of edges
1393    pub fn edge_count(&self) -> usize {
1394        self.edges.iter().flatten().count()
1395    }
1396
1397    /// Get all nodes in the graph
1398    pub fn all_nodes(&self) -> Vec<&Node> {
1399        self.nodes.iter().flatten().collect()
1400    }
1401
1402    /// Get all edges in the graph
1403    pub fn all_edges(&self) -> Vec<&Edge> {
1404        self.edges.iter().flatten().collect()
1405    }
1406
1407    // ============================================================
1408    // Graph Statistics (for cost-based query optimization)
1409    // ============================================================
1410
1411    /// Compute statistics for the current graph state
1412    pub fn compute_statistics(&self) -> GraphStatistics {
1413        let total_nodes = self.node_count();
1414        let total_edges = self.edge_count();
1415
1416        let mut label_counts = HashMap::new();
1417        for (label, node_ids) in &self.label_index {
1418            label_counts.insert(label.clone(), node_ids.len());
1419        }
1420
1421        let mut edge_type_counts = HashMap::new();
1422        for (edge_type, edge_ids) in &self.edge_type_index {
1423            edge_type_counts.insert(edge_type.clone(), edge_ids.len());
1424        }
1425
1426        // Compute average degree
1427        let avg_out_degree = if total_nodes > 0 {
1428            total_edges as f64 / total_nodes as f64
1429        } else {
1430            0.0
1431        };
1432
1433        // Sample property selectivity for common properties
1434        let mut property_stats: HashMap<(Label, String), PropertyStats> = HashMap::new();
1435        for (label, node_ids) in &self.label_index {
1436            let sample_size = node_ids.len().min(1000);
1437            let mut property_presence: HashMap<String, usize> = HashMap::new();
1438            let mut property_distinct: HashMap<String, HashSet<u64>> = HashMap::new();
1439
1440            for (i, &node_id) in node_ids.iter().enumerate() {
1441                if i >= sample_size {
1442                    break;
1443                }
1444                if let Some(node) = self.get_node(node_id) {
1445                    for (key, val) in &node.properties {
1446                        *property_presence.entry(key.clone()).or_insert(0) += 1;
1447
1448                        let hash = {
1449                            use std::hash::{Hash, Hasher};
1450                            let mut hasher = std::collections::hash_map::DefaultHasher::new();
1451                            val.hash(&mut hasher);
1452                            hasher.finish()
1453                        };
1454                        property_distinct
1455                            .entry(key.clone())
1456                            .or_default()
1457                            .insert(hash);
1458                    }
1459                }
1460            }
1461
1462            for (prop, count) in &property_presence {
1463                let distinct = property_distinct.get(prop).map(|s| s.len()).unwrap_or(0);
1464                let selectivity = if distinct > 0 {
1465                    1.0 / distinct as f64
1466                } else {
1467                    1.0
1468                };
1469                property_stats.insert(
1470                    (label.clone(), prop.clone()),
1471                    PropertyStats {
1472                        null_fraction: 1.0 - (*count as f64 / sample_size as f64),
1473                        distinct_count: distinct,
1474                        selectivity,
1475                    },
1476                );
1477            }
1478        }
1479
1480        GraphStatistics {
1481            total_nodes,
1482            total_edges,
1483            label_counts,
1484            edge_type_counts,
1485            avg_out_degree,
1486            property_stats,
1487        }
1488    }
1489
1490    /// Get the triple-level statistics catalog (for graph-native query planning)
1491    pub fn catalog(&self) -> &GraphCatalog {
1492        &self.catalog
1493    }
1494
1495    /// Get node count for a specific label (fast, O(1))
1496    pub fn label_node_count(&self, label: &Label) -> usize {
1497        self.label_index.get(label).map(|s| s.len()).unwrap_or(0)
1498    }
1499
1500    /// Get edge count for a specific type (fast, O(1))
1501    pub fn edge_type_count(&self, edge_type: &EdgeType) -> usize {
1502        self.edge_type_index
1503            .get(edge_type)
1504            .map(|s| s.len())
1505            .unwrap_or(0)
1506    }
1507
1508    /// Get all label names in the graph
1509    pub fn all_labels(&self) -> Vec<&Label> {
1510        self.label_index.keys().collect()
1511    }
1512
1513    /// Get all edge type names in the graph
1514    pub fn all_edge_types(&self) -> Vec<&EdgeType> {
1515        self.edge_type_index.keys().collect()
1516    }
1517
1518    /// Clear all data from the graph
1519    pub fn clear(&mut self) {
1520        self.nodes.clear();
1521        self.edges.clear();
1522        self.outgoing.clear();
1523        self.incoming.clear();
1524        self.free_node_ids.clear();
1525        self.free_edge_ids.clear();
1526        self.label_index.clear();
1527        self.edge_type_index.clear();
1528        self.vector_index = Arc::new(VectorIndexManager::new());
1529        self.property_index = Arc::new(IndexManager::new());
1530        self.node_columns = ColumnStore::new();
1531        self.edge_columns = ColumnStore::new();
1532        self.next_node_id = 1;
1533        self.next_edge_id = 1;
1534        self.catalog.clear();
1535    }
1536
1537    // ============================================================
1538    // Event Handling
1539    // ============================================================
1540
1541    pub fn handle_index_event(
1542        &self,
1543        event: crate::graph::event::IndexEvent,
1544        _tenant_manager: Option<Arc<crate::persistence::TenantManager>>,
1545    ) {
1546        use crate::graph::event::IndexEvent::*;
1547        match event {
1548            NodeCreated {
1549                tenant_id: _,
1550                id,
1551                labels,
1552                properties,
1553            } => {
1554                for (key, value) in properties {
1555                    if let PropertyValue::Vector(vec) = &value {
1556                        for label in &labels {
1557                            let _ = self.vector_index.add_vector(label.as_str(), &key, id, vec);
1558                        }
1559                    }
1560                    for label in &labels {
1561                        self.property_index
1562                            .index_insert(label, &key, value.clone(), id);
1563                    }
1564                }
1565            }
1566            NodeDeleted {
1567                tenant_id: _,
1568                id,
1569                labels,
1570                properties,
1571            } => {
1572                for (key, value) in properties {
1573                    for label in &labels {
1574                        self.property_index.index_remove(label, &key, &value, id);
1575                    }
1576                }
1577            }
1578            PropertySet {
1579                tenant_id: _,
1580                id,
1581                labels,
1582                key,
1583                old_value,
1584                new_value,
1585            } => {
1586                if let Some(old) = old_value {
1587                    for label in &labels {
1588                        self.property_index.index_remove(label, &key, &old, id);
1589                    }
1590                }
1591                for label in &labels {
1592                    self.property_index
1593                        .index_insert(label, &key, new_value.clone(), id);
1594                }
1595                if let PropertyValue::Vector(vec) = &new_value {
1596                    for label in &labels {
1597                        let _ = self.vector_index.add_vector(label.as_str(), &key, id, vec);
1598                    }
1599                }
1600            }
1601            LabelAdded {
1602                tenant_id: _,
1603                id,
1604                label,
1605                properties,
1606            } => {
1607                for (key, value) in properties {
1608                    if let PropertyValue::Vector(vec) = &value {
1609                        let _ = self.vector_index.add_vector(label.as_str(), &key, id, vec);
1610                    }
1611                    self.property_index
1612                        .index_insert(&label, &key, value.clone(), id);
1613                }
1614            }
1615        }
1616    }
1617
1618    // ============================================================
1619    // Vector Index methods
1620    // ============================================================
1621
1622    /// Create a vector index for a specific label and property
1623    pub fn create_vector_index(
1624        &self,
1625        label: &str,
1626        property_key: &str,
1627        dimensions: usize,
1628        metric: DistanceMetric,
1629    ) -> VectorResult<()> {
1630        self.vector_index
1631            .create_index(label, property_key, dimensions, metric)
1632    }
1633
1634    /// Search for nearest neighbors using a vector index
1635    pub fn vector_search(
1636        &self,
1637        label: &str,
1638        property_key: &str,
1639        query: &[f32],
1640        k: usize,
1641    ) -> VectorResult<Vec<(NodeId, f32)>> {
1642        self.vector_index.search(label, property_key, query, k)
1643    }
1644
1645    // ============================================================
1646    // Recovery methods - used to rebuild graph from persisted data
1647    // ============================================================
1648
1649    /// Insert a recovered node (used during recovery from persistence)
1650    /// Unlike create_node(), this preserves the node's existing ID
1651    pub fn insert_recovered_node(&mut self, node: Node) {
1652        let node_id = node.id;
1653        let idx = node_id.as_u64() as usize;
1654
1655        // Ensure storage capacity
1656        if idx >= self.nodes.len() {
1657            self.nodes.resize(idx + 1, Vec::new());
1658            self.outgoing.resize(idx + 1, Vec::new());
1659            self.incoming.resize(idx + 1, Vec::new());
1660        }
1661
1662        // Update label indices for all labels
1663        for label in &node.labels {
1664            self.label_index
1665                .entry(label.clone())
1666                .or_default()
1667                .insert(node_id);
1668        }
1669
1670        // Insert the node
1671        self.nodes[idx].push(node);
1672
1673        // Update next_node_id to be higher than any recovered node
1674        if node_id.as_u64() >= self.next_node_id {
1675            self.next_node_id = node_id.as_u64() + 1;
1676        }
1677    }
1678
1679    /// Insert a recovered edge (used during recovery from persistence)
1680    /// Unlike create_edge(), this preserves the edge's existing ID
1681    /// Note: Source and target nodes must already exist
1682    pub fn insert_recovered_edge(&mut self, edge: Edge) -> GraphResult<()> {
1683        let edge_id = edge.id;
1684        let idx = edge_id.as_u64() as usize;
1685        let source = edge.source;
1686        let target = edge.target;
1687
1688        // Ensure capacity
1689        if idx >= self.edges.len() {
1690            self.edges.resize(idx + 1, Vec::new());
1691        }
1692
1693        // Validate nodes exist
1694        if !self.has_node(source) {
1695            return Err(GraphError::InvalidEdgeSource(source));
1696        }
1697        if !self.has_node(target) {
1698            return Err(GraphError::InvalidEdgeTarget(target));
1699        }
1700
1701        // Update adjacency lists (sorted insert)
1702        {
1703            let out_list = &mut self.outgoing[source.as_u64() as usize];
1704            let pos = out_list
1705                .binary_search_by_key(&target, |(nid, _)| *nid)
1706                .unwrap_or_else(|p| p);
1707            out_list.insert(pos, (target, edge_id));
1708        }
1709        {
1710            let in_list = &mut self.incoming[target.as_u64() as usize];
1711            let pos = in_list
1712                .binary_search_by_key(&source, |(nid, _)| *nid)
1713                .unwrap_or_else(|p| p);
1714            in_list.insert(pos, (source, edge_id));
1715        }
1716
1717        // Update edge type index
1718        self.edge_type_index
1719            .entry(edge.edge_type.clone())
1720            .or_default()
1721            .insert(edge_id);
1722
1723        // Insert the edge
1724        self.edges[idx].push(edge);
1725
1726        // Update next_edge_id to be higher than any recovered edge
1727        if edge_id.as_u64() >= self.next_edge_id {
1728            self.next_edge_id = edge_id.as_u64() + 1;
1729        }
1730
1731        Ok(())
1732    }
1733}
1734
1735impl Default for GraphStore {
1736    fn default() -> Self {
1737        Self::new()
1738    }
1739}
1740
1741#[cfg(test)]
1742mod tests {
1743    use super::*;
1744
1745    #[test]
1746    fn test_create_and_get_node() {
1747        let mut store = GraphStore::new();
1748        let node_id = store.create_node("Person");
1749
1750        assert_eq!(store.node_count(), 1);
1751        let node = store.get_node(node_id).unwrap();
1752        assert_eq!(node.id, node_id);
1753        assert!(node.has_label(&Label::new("Person")));
1754    }
1755
1756    #[test]
1757    fn test_create_node_with_properties() {
1758        let mut store = GraphStore::new();
1759        let mut props = PropertyMap::new();
1760        props.insert("name".to_string(), "Alice".into());
1761        props.insert("age".to_string(), 30i64.into());
1762
1763        let node_id = store.create_node_with_properties(
1764            "default",
1765            vec![Label::new("Person"), Label::new("Employee")],
1766            props,
1767        );
1768
1769        let node = store.get_node(node_id).unwrap();
1770        assert_eq!(node.label_count(), 2);
1771        assert_eq!(
1772            node.get_property("name").unwrap().as_string(),
1773            Some("Alice")
1774        );
1775        assert_eq!(node.get_property("age").unwrap().as_integer(), Some(30));
1776    }
1777
1778    #[test]
1779    fn test_create_and_get_edge() {
1780        let mut store = GraphStore::new();
1781        let node1 = store.create_node("Person");
1782        let node2 = store.create_node("Person");
1783
1784        let edge_id = store.create_edge(node1, node2, "KNOWS").unwrap();
1785
1786        assert_eq!(store.edge_count(), 1);
1787        let edge = store.get_edge(edge_id).unwrap();
1788        assert_eq!(edge.source, node1);
1789        assert_eq!(edge.target, node2);
1790        assert_eq!(edge.edge_type, EdgeType::new("KNOWS"));
1791    }
1792
1793    #[test]
1794    fn test_edge_validation() {
1795        let mut store = GraphStore::new();
1796        let node1 = store.create_node("Person");
1797        let invalid_node = NodeId::new(999);
1798
1799        // Invalid source node
1800        let result = store.create_edge(invalid_node, node1, "KNOWS");
1801        assert_eq!(result, Err(GraphError::InvalidEdgeSource(invalid_node)));
1802
1803        // Invalid target node
1804        let result = store.create_edge(node1, invalid_node, "KNOWS");
1805        assert_eq!(result, Err(GraphError::InvalidEdgeTarget(invalid_node)));
1806    }
1807
1808    #[test]
1809    fn test_adjacency_lists() {
1810        let mut store = GraphStore::new();
1811        let node1 = store.create_node("Person");
1812        let node2 = store.create_node("Person");
1813        let node3 = store.create_node("Person");
1814
1815        store.create_edge(node1, node2, "KNOWS").unwrap();
1816        store.create_edge(node1, node3, "KNOWS").unwrap();
1817        store.create_edge(node2, node3, "FOLLOWS").unwrap();
1818
1819        // Node1 has 2 outgoing edges
1820        let outgoing = store.get_outgoing_edges(node1);
1821        assert_eq!(outgoing.len(), 2);
1822
1823        // Node2 has 1 outgoing, 1 incoming
1824        let outgoing = store.get_outgoing_edges(node2);
1825        assert_eq!(outgoing.len(), 1);
1826        let incoming = store.get_incoming_edges(node2);
1827        assert_eq!(incoming.len(), 1);
1828
1829        // Node3 has 0 outgoing, 2 incoming
1830        let outgoing = store.get_outgoing_edges(node3);
1831        assert_eq!(outgoing.len(), 0);
1832        let incoming = store.get_incoming_edges(node3);
1833        assert_eq!(incoming.len(), 2);
1834    }
1835
1836    #[test]
1837    fn test_label_index() {
1838        let mut store = GraphStore::new();
1839        store.create_node("Person");
1840        store.create_node("Person");
1841        store.create_node("Company");
1842
1843        let persons = store.get_nodes_by_label(&Label::new("Person"));
1844        assert_eq!(persons.len(), 2);
1845
1846        let companies = store.get_nodes_by_label(&Label::new("Company"));
1847        assert_eq!(companies.len(), 1);
1848    }
1849
1850    #[test]
1851    fn test_edge_type_index() {
1852        let mut store = GraphStore::new();
1853        let n1 = store.create_node("Person");
1854        let n2 = store.create_node("Person");
1855        let n3 = store.create_node("Person");
1856
1857        store.create_edge(n1, n2, "KNOWS").unwrap();
1858        store.create_edge(n2, n3, "KNOWS").unwrap();
1859        store.create_edge(n1, n3, "FOLLOWS").unwrap();
1860
1861        let knows_edges = store.get_edges_by_type(&EdgeType::new("KNOWS"));
1862        assert_eq!(knows_edges.len(), 2);
1863
1864        let follows_edges = store.get_edges_by_type(&EdgeType::new("FOLLOWS"));
1865        assert_eq!(follows_edges.len(), 1);
1866    }
1867
1868    #[test]
1869    fn test_delete_node() {
1870        let mut store = GraphStore::new();
1871        let node1 = store.create_node("Person");
1872        let node2 = store.create_node("Person");
1873        store.create_edge(node1, node2, "KNOWS").unwrap();
1874
1875        assert_eq!(store.node_count(), 2);
1876        assert_eq!(store.edge_count(), 1);
1877
1878        // Delete node1 (should also delete connected edge)
1879        let deleted = store.delete_node("default", node1);
1880        assert!(deleted.is_ok());
1881        assert_eq!(store.node_count(), 1);
1882        assert_eq!(store.edge_count(), 0);
1883    }
1884
1885    #[test]
1886    fn test_delete_edge() {
1887        let mut store = GraphStore::new();
1888        let node1 = store.create_node("Person");
1889        let node2 = store.create_node("Person");
1890        let edge_id = store.create_edge(node1, node2, "KNOWS").unwrap();
1891
1892        assert_eq!(store.edge_count(), 1);
1893
1894        let deleted = store.delete_edge(edge_id);
1895        assert!(deleted.is_ok());
1896        assert_eq!(store.edge_count(), 0);
1897
1898        // Edge removed from adjacency lists
1899        assert_eq!(store.get_outgoing_edges(node1).len(), 0);
1900        assert_eq!(store.get_incoming_edges(node2).len(), 0);
1901    }
1902
1903    #[test]
1904    fn test_multiple_edges_between_nodes() {
1905        // REQ-GRAPH-008: Multiple edges between same nodes
1906        let mut store = GraphStore::new();
1907        let node1 = store.create_node("Person");
1908        let node2 = store.create_node("Person");
1909
1910        let edge1 = store.create_edge(node1, node2, "KNOWS").unwrap();
1911        let edge2 = store.create_edge(node1, node2, "WORKS_WITH").unwrap();
1912        let edge3 = store.create_edge(node1, node2, "KNOWS").unwrap();
1913
1914        assert_eq!(store.edge_count(), 3);
1915        assert_ne!(edge1, edge2);
1916        assert_ne!(edge1, edge3);
1917
1918        let outgoing = store.get_outgoing_edges(node1);
1919        assert_eq!(outgoing.len(), 3);
1920    }
1921
1922    #[test]
1923    fn test_clear() {
1924        let mut store = GraphStore::new();
1925        store.create_node("Person");
1926        store.create_node("Person");
1927
1928        assert_eq!(store.node_count(), 2);
1929
1930        store.clear();
1931        assert_eq!(store.node_count(), 0);
1932        assert_eq!(store.edge_count(), 0);
1933    }
1934
1935    #[test]
1936    fn test_add_label_to_node() {
1937        let mut store = GraphStore::new();
1938        let node_id = store.create_node("Person");
1939
1940        // Initially only "Person" label is indexed
1941        assert_eq!(store.get_nodes_by_label(&Label::new("Person")).len(), 1);
1942        assert_eq!(store.get_nodes_by_label(&Label::new("Employee")).len(), 0);
1943
1944        // Add "Employee" label using the proper method
1945        store
1946            .add_label_to_node("default", node_id, "Employee")
1947            .unwrap();
1948
1949        // Now both labels should be indexed and queryable
1950        assert_eq!(store.get_nodes_by_label(&Label::new("Person")).len(), 1);
1951        assert_eq!(store.get_nodes_by_label(&Label::new("Employee")).len(), 1);
1952
1953        // Verify the node actually has both labels
1954        let node = store.get_node(node_id).unwrap();
1955        assert!(node.has_label(&Label::new("Person")));
1956        assert!(node.has_label(&Label::new("Employee")));
1957    }
1958
1959    #[test]
1960    fn test_add_label_to_nonexistent_node() {
1961        let mut store = GraphStore::new();
1962        let invalid_id = NodeId::new(999);
1963
1964        let result = store.add_label_to_node("default", invalid_id, "Employee");
1965        assert_eq!(result, Err(GraphError::NodeNotFound(invalid_id)));
1966    }
1967
1968    #[test]
1969    fn test_mvcc_node_versioning() {
1970        let mut store = GraphStore::new();
1971
1972        // Version 1: Create node
1973        let node_id = store.create_node("Person");
1974        store
1975            .set_node_property("default", node_id, "name", "Alice")
1976            .unwrap();
1977
1978        // Check version 1
1979        let v1_node = store.get_node_at_version(node_id, 1).unwrap();
1980        assert_eq!(v1_node.version, 1);
1981        assert_eq!(
1982            v1_node.get_property("name").unwrap().as_string(),
1983            Some("Alice")
1984        );
1985
1986        // Version 2: Update property (creates new version)
1987        store.current_version = 2;
1988        store
1989            .set_node_property("default", node_id, "name", "Alice Cooper")
1990            .unwrap();
1991
1992        // Check version 2
1993        let v2_node = store.get_node_at_version(node_id, 2).unwrap();
1994        assert_eq!(v2_node.version, 2);
1995        assert_eq!(
1996            v2_node.get_property("name").unwrap().as_string(),
1997            Some("Alice Cooper")
1998        );
1999
2000        // Historical read (Version 1 should still be Alice)
2001        let v1_lookup = store.get_node_at_version(node_id, 1).unwrap();
2002        assert_eq!(v1_lookup.version, 1);
2003        assert_eq!(
2004            v1_lookup.get_property("name").unwrap().as_string(),
2005            Some("Alice")
2006        );
2007
2008        let node = store.get_node(node_id).unwrap();
2009        assert_eq!(node.version, 2); // Should be latest version
2010    }
2011
2012    #[test]
2013    fn test_arena_resize() {
2014        let mut store = GraphStore::new();
2015        // Default capacity is 1024. Let's force a resize.
2016        // We can't easily peek capacity, but we can add > 1024 nodes.
2017
2018        for _ in 0..1100 {
2019            store.create_node("Item");
2020        }
2021
2022        assert_eq!(store.node_count(), 1100);
2023        let last_id = NodeId::new(1100);
2024        assert!(store.has_node(last_id));
2025    }
2026
2027    #[test]
2028    fn test_id_reuse() {
2029        let mut store = GraphStore::new();
2030        let n1 = store.create_node("A");
2031        let _n2 = store.create_node("B");
2032
2033        store.delete_node("default", n1).unwrap();
2034
2035        // Next creation should reuse n1's ID (which is 1)
2036        // n2 is 2.
2037        let n3 = store.create_node("C");
2038
2039        assert_eq!(n3, n1); // ID reuse
2040        assert_eq!(store.node_count(), 2); // B and C
2041    }
2042
2043    // ========== Batch 5: Additional Store Tests ==========
2044
2045    #[test]
2046    fn test_get_node() {
2047        let mut store = GraphStore::new();
2048        let id = store.create_node("Person");
2049        let node = store.get_node(id);
2050        assert!(node.is_some());
2051        assert!(node.unwrap().labels.contains(&Label::new("Person")));
2052
2053        // Non-existent node
2054        assert!(store.get_node(NodeId::new(9999)).is_none());
2055    }
2056
2057    #[test]
2058    fn test_get_node_mut() {
2059        let mut store = GraphStore::new();
2060        let id = store.create_node("Person");
2061        {
2062            let node = store.get_node_mut(id).unwrap();
2063            node.set_property(
2064                "name".to_string(),
2065                PropertyValue::String("Alice".to_string()),
2066            );
2067        }
2068        let node = store.get_node(id).unwrap();
2069        assert_eq!(
2070            node.get_property("name"),
2071            Some(&PropertyValue::String("Alice".to_string()))
2072        );
2073
2074        // Non-existent node
2075        assert!(store.get_node_mut(NodeId::new(9999)).is_none());
2076    }
2077
2078    #[test]
2079    fn test_has_node() {
2080        let mut store = GraphStore::new();
2081        let id = store.create_node("A");
2082        assert!(store.has_node(id));
2083        store.delete_node("default", id).unwrap();
2084        assert!(!store.has_node(id));
2085    }
2086
2087    #[test]
2088    fn test_set_node_property() {
2089        let mut store = GraphStore::new();
2090        let id = store.create_node("Person");
2091        store
2092            .set_node_property("default", id, "age", PropertyValue::Integer(30))
2093            .unwrap();
2094        let node = store.get_node(id).unwrap();
2095        assert_eq!(node.get_property("age"), Some(&PropertyValue::Integer(30)));
2096
2097        // Update existing property
2098        store
2099            .set_node_property("default", id, "age", PropertyValue::Integer(31))
2100            .unwrap();
2101        let node = store.get_node(id).unwrap();
2102        assert_eq!(node.get_property("age"), Some(&PropertyValue::Integer(31)));
2103
2104        // Non-existent node
2105        let result =
2106            store.set_node_property("default", NodeId::new(9999), "x", PropertyValue::Null);
2107        assert!(result.is_err());
2108    }
2109
2110    #[test]
2111    fn test_create_edge_with_properties() {
2112        let mut store = GraphStore::new();
2113        let a = store.create_node("Person");
2114        let b = store.create_node("Person");
2115
2116        let mut props = std::collections::HashMap::new();
2117        props.insert("since".to_string(), PropertyValue::Integer(2020));
2118        props.insert("weight".to_string(), PropertyValue::Float(0.8));
2119
2120        let eid = store
2121            .create_edge_with_properties(a, b, "KNOWS", props)
2122            .unwrap();
2123        let edge = store.get_edge(eid).unwrap();
2124        assert_eq!(edge.source, a);
2125        assert_eq!(edge.target, b);
2126        assert_eq!(
2127            edge.get_property("since"),
2128            Some(&PropertyValue::Integer(2020))
2129        );
2130        assert_eq!(
2131            edge.get_property("weight"),
2132            Some(&PropertyValue::Float(0.8))
2133        );
2134    }
2135
2136    #[test]
2137    fn test_create_edge_with_properties_invalid_nodes() {
2138        let mut store = GraphStore::new();
2139        let a = store.create_node("A");
2140        let props = std::collections::HashMap::new();
2141
2142        // Invalid target
2143        let result = store.create_edge_with_properties(a, NodeId::new(9999), "E", props.clone());
2144        assert!(result.is_err());
2145
2146        // Invalid source
2147        let result = store.create_edge_with_properties(NodeId::new(9999), a, "E", props);
2148        assert!(result.is_err());
2149    }
2150
2151    #[test]
2152    fn test_get_edge_and_has_edge() {
2153        let mut store = GraphStore::new();
2154        let a = store.create_node("A");
2155        let b = store.create_node("B");
2156        let eid = store.create_edge(a, b, "LINKS").unwrap();
2157
2158        assert!(store.has_edge(eid));
2159        let edge = store.get_edge(eid).unwrap();
2160        assert_eq!(edge.source, a);
2161        assert_eq!(edge.target, b);
2162
2163        // Non-existent
2164        assert!(!store.has_edge(EdgeId::new(9999)));
2165        assert!(store.get_edge(EdgeId::new(9999)).is_none());
2166    }
2167
2168    #[test]
2169    fn test_get_edge_mut() {
2170        let mut store = GraphStore::new();
2171        let a = store.create_node("A");
2172        let b = store.create_node("B");
2173        let eid = store.create_edge(a, b, "LINKS").unwrap();
2174
2175        {
2176            let edge = store.get_edge_mut(eid).unwrap();
2177            edge.set_property("weight".to_string(), PropertyValue::Float(1.5));
2178        }
2179        let edge = store.get_edge(eid).unwrap();
2180        assert_eq!(
2181            edge.get_property("weight"),
2182            Some(&PropertyValue::Float(1.5))
2183        );
2184
2185        assert!(store.get_edge_mut(EdgeId::new(9999)).is_none());
2186    }
2187
2188    #[test]
2189    fn test_get_outgoing_edge_targets() {
2190        let mut store = GraphStore::new();
2191        let a = store.create_node("A");
2192        let b = store.create_node("B");
2193        let c = store.create_node("C");
2194        let e1 = store.create_edge(a, b, "KNOWS").unwrap();
2195        let e2 = store.create_edge(a, c, "LIKES").unwrap();
2196
2197        let targets = store.get_outgoing_edge_targets(a);
2198        assert_eq!(targets.len(), 2);
2199        // Each tuple is (EdgeId, source, target, &EdgeType)
2200        let edge_ids: Vec<EdgeId> = targets.iter().map(|t| t.0).collect();
2201        assert!(edge_ids.contains(&e1));
2202        assert!(edge_ids.contains(&e2));
2203
2204        // Node with no outgoing edges
2205        let targets = store.get_outgoing_edge_targets(b);
2206        assert!(targets.is_empty());
2207    }
2208
2209    #[test]
2210    fn test_get_incoming_edge_sources() {
2211        let mut store = GraphStore::new();
2212        let a = store.create_node("A");
2213        let b = store.create_node("B");
2214        let c = store.create_node("C");
2215        store.create_edge(a, c, "KNOWS").unwrap();
2216        store.create_edge(b, c, "LIKES").unwrap();
2217
2218        let sources = store.get_incoming_edge_sources(c);
2219        assert_eq!(sources.len(), 2);
2220        let src_nodes: Vec<NodeId> = sources.iter().map(|t| t.1).collect();
2221        assert!(src_nodes.contains(&a));
2222        assert!(src_nodes.contains(&b));
2223
2224        // Node with no incoming edges
2225        let sources = store.get_incoming_edge_sources(a);
2226        assert!(sources.is_empty());
2227    }
2228
2229    #[test]
2230    fn test_all_nodes() {
2231        let mut store = GraphStore::new();
2232        assert!(store.all_nodes().is_empty());
2233
2234        store.create_node("A");
2235        store.create_node("B");
2236        store.create_node("C");
2237        assert_eq!(store.all_nodes().len(), 3);
2238    }
2239
2240    #[test]
2241    fn test_all_edges() {
2242        let mut store = GraphStore::new();
2243        assert!(store.all_edges().is_empty());
2244
2245        let a = store.create_node("A");
2246        let b = store.create_node("B");
2247        let c = store.create_node("C");
2248        store.create_edge(a, b, "R1").unwrap();
2249        store.create_edge(b, c, "R2").unwrap();
2250        assert_eq!(store.all_edges().len(), 2);
2251    }
2252
2253    #[test]
2254    fn test_compute_statistics() {
2255        let mut store = GraphStore::new();
2256        let a = store.create_node("Person");
2257        let b = store.create_node("Person");
2258        let c = store.create_node("Company");
2259        store.get_node_mut(a).unwrap().set_property(
2260            "name".to_string(),
2261            PropertyValue::String("Alice".to_string()),
2262        );
2263        store
2264            .get_node_mut(b)
2265            .unwrap()
2266            .set_property("name".to_string(), PropertyValue::String("Bob".to_string()));
2267        store.get_node_mut(c).unwrap().set_property(
2268            "name".to_string(),
2269            PropertyValue::String("Acme".to_string()),
2270        );
2271        store.create_edge(a, b, "KNOWS").unwrap();
2272        store.create_edge(a, c, "WORKS_AT").unwrap();
2273
2274        let stats = store.compute_statistics();
2275        assert_eq!(stats.total_nodes, 3);
2276        assert_eq!(stats.total_edges, 2);
2277        assert_eq!(*stats.label_counts.get(&Label::new("Person")).unwrap(), 2);
2278        assert_eq!(*stats.label_counts.get(&Label::new("Company")).unwrap(), 1);
2279        assert_eq!(
2280            *stats.edge_type_counts.get(&EdgeType::new("KNOWS")).unwrap(),
2281            1
2282        );
2283        assert_eq!(
2284            *stats
2285                .edge_type_counts
2286                .get(&EdgeType::new("WORKS_AT"))
2287                .unwrap(),
2288            1
2289        );
2290        assert!(stats.avg_out_degree > 0.0);
2291        // Property stats should exist for Person.name
2292        let person_name_stats = stats
2293            .property_stats
2294            .get(&(Label::new("Person"), "name".to_string()));
2295        assert!(person_name_stats.is_some());
2296        let ps = person_name_stats.unwrap();
2297        assert_eq!(ps.null_fraction, 0.0); // All Person nodes have "name"
2298        assert_eq!(ps.distinct_count, 2); // Alice, Bob
2299    }
2300
2301    #[test]
2302    fn test_compute_statistics_empty_graph() {
2303        let store = GraphStore::new();
2304        let stats = store.compute_statistics();
2305        assert_eq!(stats.total_nodes, 0);
2306        assert_eq!(stats.total_edges, 0);
2307        assert_eq!(stats.avg_out_degree, 0.0);
2308        assert!(stats.label_counts.is_empty());
2309        assert!(stats.edge_type_counts.is_empty());
2310    }
2311
2312    #[test]
2313    fn test_label_node_count() {
2314        let mut store = GraphStore::new();
2315        store.create_node("Person");
2316        store.create_node("Person");
2317        store.create_node("Company");
2318
2319        assert_eq!(store.label_node_count(&Label::new("Person")), 2);
2320        assert_eq!(store.label_node_count(&Label::new("Company")), 1);
2321        assert_eq!(store.label_node_count(&Label::new("NotExist")), 0);
2322    }
2323
2324    #[test]
2325    fn test_edge_type_count() {
2326        let mut store = GraphStore::new();
2327        let a = store.create_node("A");
2328        let b = store.create_node("B");
2329        let c = store.create_node("C");
2330        store.create_edge(a, b, "KNOWS").unwrap();
2331        store.create_edge(a, c, "KNOWS").unwrap();
2332        store.create_edge(b, c, "LIKES").unwrap();
2333
2334        assert_eq!(store.edge_type_count(&EdgeType::new("KNOWS")), 2);
2335        assert_eq!(store.edge_type_count(&EdgeType::new("LIKES")), 1);
2336        assert_eq!(store.edge_type_count(&EdgeType::new("NOPE")), 0);
2337    }
2338
2339    #[test]
2340    fn test_all_labels() {
2341        let mut store = GraphStore::new();
2342        store.create_node("Person");
2343        store.create_node("Company");
2344        store.create_node("Person"); // duplicate label
2345
2346        let labels = store.all_labels();
2347        assert_eq!(labels.len(), 2);
2348        let label_strs: Vec<&str> = labels.iter().map(|l| l.as_str()).collect();
2349        assert!(label_strs.contains(&"Person"));
2350        assert!(label_strs.contains(&"Company"));
2351    }
2352
2353    #[test]
2354    fn test_all_edge_types() {
2355        let mut store = GraphStore::new();
2356        let a = store.create_node("A");
2357        let b = store.create_node("B");
2358        let c = store.create_node("C");
2359        store.create_edge(a, b, "KNOWS").unwrap();
2360        store.create_edge(b, c, "LIKES").unwrap();
2361
2362        let types = store.all_edge_types();
2363        assert_eq!(types.len(), 2);
2364        let type_strs: Vec<&str> = types.iter().map(|t| t.as_str()).collect();
2365        assert!(type_strs.contains(&"KNOWS"));
2366        assert!(type_strs.contains(&"LIKES"));
2367    }
2368
2369    #[test]
2370    fn test_get_node_at_version() {
2371        let mut store = GraphStore::new();
2372        let id = store.create_node("Person");
2373        let v0 = store.get_node(id).unwrap().version;
2374
2375        // Node at its creation version should exist
2376        let node = store.get_node_at_version(id, v0);
2377        assert!(node.is_some());
2378        assert!(node.unwrap().labels.contains(&Label::new("Person")));
2379
2380        // Node at a version before creation should not exist
2381        // (only if v0 > 0, otherwise any version >= 0 finds it)
2382        if v0 > 0 {
2383            assert!(store.get_node_at_version(id, v0 - 1).is_none());
2384        }
2385
2386        // Non-existent node
2387        assert!(store.get_node_at_version(NodeId::new(9999), 0).is_none());
2388    }
2389
2390    #[test]
2391    fn test_get_edge_at_version() {
2392        let mut store = GraphStore::new();
2393        let a = store.create_node("A");
2394        let b = store.create_node("B");
2395        let eid = store.create_edge(a, b, "KNOWS").unwrap();
2396        let v0 = store.get_edge(eid).unwrap().version;
2397
2398        // Edge at version 0 should exist
2399        assert!(store.get_edge_at_version(eid, v0).is_some());
2400
2401        // Non-existent edge
2402        assert!(store.get_edge_at_version(EdgeId::new(9999), 0).is_none());
2403    }
2404
2405    #[test]
2406    fn test_create_vector_index() {
2407        let store = GraphStore::new();
2408        let result = store.create_vector_index(
2409            "Person",
2410            "embedding",
2411            128,
2412            crate::vector::DistanceMetric::Cosine,
2413        );
2414        assert!(result.is_ok());
2415
2416        // Creating a second index with different label should also succeed
2417        let result2 =
2418            store.create_vector_index("Document", "vec", 256, crate::vector::DistanceMetric::L2);
2419        assert!(result2.is_ok());
2420    }
2421
2422    #[test]
2423    fn test_set_node_property_updates_in_place() {
2424        let mut store = GraphStore::new();
2425        let id = store.create_node("Person");
2426        store
2427            .set_node_property(
2428                "default",
2429                id,
2430                "name",
2431                PropertyValue::String("Alice".to_string()),
2432            )
2433            .unwrap();
2434
2435        // Same version update — in-place
2436        store
2437            .set_node_property(
2438                "default",
2439                id,
2440                "name",
2441                PropertyValue::String("Bob".to_string()),
2442            )
2443            .unwrap();
2444        let node = store.get_node(id).unwrap();
2445        assert_eq!(
2446            node.get_property("name"),
2447            Some(&PropertyValue::String("Bob".to_string()))
2448        );
2449
2450        // Add another property
2451        store
2452            .set_node_property("default", id, "age", PropertyValue::Integer(30))
2453            .unwrap();
2454        let node = store.get_node(id).unwrap();
2455        assert_eq!(node.get_property("age"), Some(&PropertyValue::Integer(30)));
2456        assert_eq!(
2457            node.get_property("name"),
2458            Some(&PropertyValue::String("Bob".to_string()))
2459        );
2460    }
2461
2462    // ========== Coverage Enhancement Tests ==========
2463
2464    #[test]
2465    fn test_graph_error_display_node_not_found() {
2466        let err = GraphError::NodeNotFound(NodeId::new(42));
2467        assert_eq!(format!("{}", err), "Node NodeId(42) not found");
2468    }
2469
2470    #[test]
2471    fn test_graph_error_display_edge_not_found() {
2472        let err = GraphError::EdgeNotFound(EdgeId::new(7));
2473        assert_eq!(format!("{}", err), "Edge EdgeId(7) not found");
2474    }
2475
2476    #[test]
2477    fn test_graph_error_display_node_already_exists() {
2478        let err = GraphError::NodeAlreadyExists(NodeId::new(1));
2479        assert_eq!(format!("{}", err), "Node NodeId(1) already exists");
2480    }
2481
2482    #[test]
2483    fn test_graph_error_display_edge_already_exists() {
2484        let err = GraphError::EdgeAlreadyExists(EdgeId::new(3));
2485        assert_eq!(format!("{}", err), "Edge EdgeId(3) already exists");
2486    }
2487
2488    #[test]
2489    fn test_graph_error_display_invalid_edge_source() {
2490        let err = GraphError::InvalidEdgeSource(NodeId::new(99));
2491        assert_eq!(
2492            format!("{}", err),
2493            "Invalid edge: source node NodeId(99) does not exist"
2494        );
2495    }
2496
2497    #[test]
2498    fn test_graph_error_display_invalid_edge_target() {
2499        let err = GraphError::InvalidEdgeTarget(NodeId::new(88));
2500        assert_eq!(
2501            format!("{}", err),
2502            "Invalid edge: target node NodeId(88) does not exist"
2503        );
2504    }
2505
2506    #[test]
2507    fn test_graph_error_equality() {
2508        assert_eq!(
2509            GraphError::NodeNotFound(NodeId::new(1)),
2510            GraphError::NodeNotFound(NodeId::new(1))
2511        );
2512        assert_ne!(
2513            GraphError::NodeNotFound(NodeId::new(1)),
2514            GraphError::NodeNotFound(NodeId::new(2))
2515        );
2516        assert_ne!(
2517            GraphError::NodeNotFound(NodeId::new(1)),
2518            GraphError::EdgeNotFound(EdgeId::new(1))
2519        );
2520    }
2521
2522    #[test]
2523    fn test_graph_statistics_estimate_label_scan() {
2524        let mut store = GraphStore::new();
2525        for _ in 0..50 {
2526            store.create_node("Person");
2527        }
2528        for _ in 0..20 {
2529            store.create_node("Company");
2530        }
2531        let stats = store.compute_statistics();
2532        assert_eq!(stats.estimate_label_scan(&Label::new("Person")), 50);
2533        assert_eq!(stats.estimate_label_scan(&Label::new("Company")), 20);
2534        // Unknown label falls back to total_nodes (all-node scan estimate)
2535        assert_eq!(stats.estimate_label_scan(&Label::new("Unknown")), 70);
2536    }
2537
2538    #[test]
2539    fn test_graph_statistics_estimate_expand() {
2540        let mut store = GraphStore::new();
2541        let a = store.create_node("A");
2542        let b = store.create_node("B");
2543        let c = store.create_node("C");
2544        store.create_edge(a, b, "KNOWS").unwrap();
2545        store.create_edge(a, c, "KNOWS").unwrap();
2546        store.create_edge(b, c, "LIKES").unwrap();
2547
2548        let stats = store.compute_statistics();
2549        assert_eq!(
2550            stats.estimate_expand(Some(&EdgeType::new("KNOWS"))) as usize,
2551            2
2552        );
2553        assert_eq!(
2554            stats.estimate_expand(Some(&EdgeType::new("LIKES"))) as usize,
2555            1
2556        );
2557        assert_eq!(
2558            stats.estimate_expand(Some(&EdgeType::new("NOPE"))) as usize,
2559            0
2560        );
2561        // None means all edges
2562        assert_eq!(stats.estimate_expand(None) as usize, 3);
2563    }
2564
2565    #[test]
2566    fn test_graph_statistics_estimate_equality_selectivity() {
2567        let mut store = GraphStore::new();
2568        for i in 0..10 {
2569            let id = store.create_node("Person");
2570            store.get_node_mut(id).unwrap().set_property(
2571                "city".to_string(),
2572                PropertyValue::String(format!("City{}", i % 5)),
2573            );
2574        }
2575        let stats = store.compute_statistics();
2576        // 5 distinct cities among 10 Person nodes => selectivity = 1/5 = 0.2
2577        let sel = stats.estimate_equality_selectivity(&Label::new("Person"), "city");
2578        assert!((sel - 0.2).abs() < 0.01);
2579        // Unknown property should return default 0.1
2580        let default_sel =
2581            stats.estimate_equality_selectivity(&Label::new("Person"), "unknown_prop");
2582        assert!((default_sel - 0.1).abs() < 0.01);
2583        // Unknown label should return default 0.1
2584        let default_sel2 = stats.estimate_equality_selectivity(&Label::new("NotExist"), "city");
2585        assert!((default_sel2 - 0.1).abs() < 0.01);
2586    }
2587
2588    #[test]
2589    fn test_graph_statistics_format() {
2590        let mut store = GraphStore::new();
2591        let a = store.create_node("Person");
2592        let b = store.create_node("Company");
2593        store.create_edge(a, b, "WORKS_AT").unwrap();
2594
2595        let stats = store.compute_statistics();
2596        let formatted = stats.format();
2597        assert!(formatted.contains("Graph Statistics:"));
2598        assert!(formatted.contains("Total nodes: 2"));
2599        assert!(formatted.contains("Total edges: 1"));
2600        assert!(formatted.contains("Avg out-degree:"));
2601        assert!(formatted.contains(":Person"));
2602        assert!(formatted.contains(":Company"));
2603        assert!(formatted.contains(":WORKS_AT"));
2604    }
2605
2606    #[test]
2607    fn test_graph_statistics_property_null_fraction() {
2608        let mut store = GraphStore::new();
2609        // Create 4 Person nodes, only 2 have the "email" property
2610        for i in 0..4 {
2611            let id = store.create_node("Person");
2612            store.get_node_mut(id).unwrap().set_property(
2613                "name".to_string(),
2614                PropertyValue::String(format!("Person{}", i)),
2615            );
2616            if i < 2 {
2617                store.get_node_mut(id).unwrap().set_property(
2618                    "email".to_string(),
2619                    PropertyValue::String(format!("person{}@example.com", i)),
2620                );
2621            }
2622        }
2623        let stats = store.compute_statistics();
2624        // name should have null_fraction = 0.0 (all 4 have it)
2625        let name_stats = stats
2626            .property_stats
2627            .get(&(Label::new("Person"), "name".to_string()))
2628            .unwrap();
2629        assert_eq!(name_stats.null_fraction, 0.0);
2630        // email should have null_fraction = 0.5 (2 out of 4 have it)
2631        let email_stats = stats
2632            .property_stats
2633            .get(&(Label::new("Person"), "email".to_string()))
2634            .unwrap();
2635        assert!((email_stats.null_fraction - 0.5).abs() < 0.01);
2636    }
2637
2638    #[test]
2639    fn test_vector_search_with_data() {
2640        let store = GraphStore::new();
2641        // Create a 4-dimensional index
2642        store
2643            .create_vector_index(
2644                "Document",
2645                "embedding",
2646                4,
2647                crate::vector::DistanceMetric::Cosine,
2648            )
2649            .unwrap();
2650
2651        // Add some vectors
2652        let n1 = NodeId::new(1);
2653        let n2 = NodeId::new(2);
2654        let n3 = NodeId::new(3);
2655        let v1 = vec![1.0, 0.0, 0.0, 0.0];
2656        let v2 = vec![0.0, 1.0, 0.0, 0.0];
2657        let v3 = vec![0.9, 0.1, 0.0, 0.0]; // close to v1
2658
2659        store
2660            .vector_index
2661            .add_vector("Document", "embedding", n1, &v1)
2662            .unwrap();
2663        store
2664            .vector_index
2665            .add_vector("Document", "embedding", n2, &v2)
2666            .unwrap();
2667        store
2668            .vector_index
2669            .add_vector("Document", "embedding", n3, &v3)
2670            .unwrap();
2671
2672        // Search for vectors similar to v1
2673        let results = store
2674            .vector_search("Document", "embedding", &[1.0, 0.0, 0.0, 0.0], 2)
2675            .unwrap();
2676        assert_eq!(results.len(), 2);
2677        // n1 should be the closest (exact match)
2678        assert_eq!(results[0].0, n1);
2679    }
2680
2681    #[test]
2682    fn test_vector_search_nonexistent_index() {
2683        let store = GraphStore::new();
2684        // Search on a non-existent index should return empty results
2685        let results = store
2686            .vector_search("NoLabel", "noprop", &[1.0, 2.0], 5)
2687            .unwrap();
2688        assert!(results.is_empty());
2689    }
2690
2691    #[test]
2692    fn test_clear_thorough() {
2693        let mut store = GraphStore::new();
2694        let a = store.create_node("Person");
2695        let b = store.create_node("Company");
2696        store
2697            .set_node_property(
2698                "default",
2699                a,
2700                "name",
2701                PropertyValue::String("Alice".to_string()),
2702            )
2703            .unwrap();
2704        let eid = store.create_edge(a, b, "WORKS_AT").unwrap();
2705
2706        // Verify state before clear
2707        assert_eq!(store.node_count(), 2);
2708        assert_eq!(store.edge_count(), 1);
2709        assert_eq!(store.all_labels().len(), 2);
2710        assert_eq!(store.all_edge_types().len(), 1);
2711        assert!(store.has_node(a));
2712        assert!(store.has_edge(eid));
2713
2714        store.clear();
2715
2716        // Verify everything is cleaned up
2717        assert_eq!(store.node_count(), 0);
2718        assert_eq!(store.edge_count(), 0);
2719        assert!(store.all_labels().is_empty());
2720        assert!(store.all_edge_types().is_empty());
2721        assert!(!store.has_node(a));
2722        assert!(!store.has_edge(eid));
2723        assert!(store.get_nodes_by_label(&Label::new("Person")).is_empty());
2724        assert!(store
2725            .get_edges_by_type(&EdgeType::new("WORKS_AT"))
2726            .is_empty());
2727
2728        // After clear, creating new nodes should start from ID 1 again
2729        let new_node = store.create_node("NewLabel");
2730        assert_eq!(new_node, NodeId::new(1));
2731    }
2732
2733    #[test]
2734    fn test_delete_edge_verifies_edge_type_index_cleanup() {
2735        let mut store = GraphStore::new();
2736        let a = store.create_node("A");
2737        let b = store.create_node("B");
2738        let c = store.create_node("C");
2739
2740        let e1 = store.create_edge(a, b, "KNOWS").unwrap();
2741        let e2 = store.create_edge(a, c, "KNOWS").unwrap();
2742        assert_eq!(store.get_edges_by_type(&EdgeType::new("KNOWS")).len(), 2);
2743
2744        // Delete one edge
2745        store.delete_edge(e1).unwrap();
2746        assert_eq!(store.get_edges_by_type(&EdgeType::new("KNOWS")).len(), 1);
2747
2748        // Delete the other
2749        store.delete_edge(e2).unwrap();
2750        assert_eq!(store.get_edges_by_type(&EdgeType::new("KNOWS")).len(), 0);
2751    }
2752
2753    #[test]
2754    fn test_delete_edge_nonexistent() {
2755        let mut store = GraphStore::new();
2756        let result = store.delete_edge(EdgeId::new(999));
2757        assert_eq!(result, Err(GraphError::EdgeNotFound(EdgeId::new(999))));
2758    }
2759
2760    #[test]
2761    fn test_delete_node_nonexistent() {
2762        let mut store = GraphStore::new();
2763        let result = store.delete_node("default", NodeId::new(999));
2764        assert_eq!(result, Err(GraphError::NodeNotFound(NodeId::new(999))));
2765    }
2766
2767    #[test]
2768    fn test_delete_node_removes_from_label_index() {
2769        let mut store = GraphStore::new();
2770        let a = store.create_node("Person");
2771        let _b = store.create_node("Person");
2772        assert_eq!(store.get_nodes_by_label(&Label::new("Person")).len(), 2);
2773
2774        store.delete_node("default", a).unwrap();
2775        assert_eq!(store.get_nodes_by_label(&Label::new("Person")).len(), 1);
2776    }
2777
2778    #[test]
2779    fn test_delete_node_cascades_edges() {
2780        let mut store = GraphStore::new();
2781        let a = store.create_node("A");
2782        let b = store.create_node("B");
2783        let c = store.create_node("C");
2784        store.create_edge(a, b, "E1").unwrap();
2785        store.create_edge(c, a, "E2").unwrap();
2786
2787        assert_eq!(store.edge_count(), 2);
2788        store.delete_node("default", a).unwrap();
2789        assert_eq!(store.edge_count(), 0);
2790        // b and c should still exist
2791        assert!(store.has_node(b));
2792        assert!(store.has_node(c));
2793    }
2794
2795    #[test]
2796    fn test_edge_id_reuse() {
2797        let mut store = GraphStore::new();
2798        let a = store.create_node("A");
2799        let b = store.create_node("B");
2800        let c = store.create_node("C");
2801
2802        let e1 = store.create_edge(a, b, "X").unwrap();
2803        store.delete_edge(e1).unwrap();
2804
2805        // Next edge should reuse e1's ID
2806        let e2 = store.create_edge(a, c, "Y").unwrap();
2807        assert_eq!(e2, e1);
2808    }
2809
2810    #[test]
2811    fn test_insert_recovered_node() {
2812        let mut store = GraphStore::new();
2813        let node = Node::new(NodeId::new(10), Label::new("Recovered"));
2814
2815        store.insert_recovered_node(node);
2816        assert!(store.has_node(NodeId::new(10)));
2817        assert_eq!(store.get_nodes_by_label(&Label::new("Recovered")).len(), 1);
2818
2819        // Next node created should have ID > 10
2820        let new_id = store.create_node("New");
2821        assert!(new_id.as_u64() > 10);
2822    }
2823
2824    #[test]
2825    fn test_insert_recovered_edge() {
2826        let mut store = GraphStore::new();
2827        let a = store.create_node("A");
2828        let b = store.create_node("B");
2829
2830        let edge = Edge::new(EdgeId::new(50), a, b, EdgeType::new("RECOVERED"));
2831        store.insert_recovered_edge(edge).unwrap();
2832
2833        assert!(store.has_edge(EdgeId::new(50)));
2834        assert_eq!(store.get_outgoing_edges(a).len(), 1);
2835        assert_eq!(store.get_incoming_edges(b).len(), 1);
2836        assert_eq!(
2837            store.get_edges_by_type(&EdgeType::new("RECOVERED")).len(),
2838            1
2839        );
2840
2841        // Next edge should have ID > 50
2842        let new_eid = store.create_edge(a, b, "NEW").unwrap();
2843        assert!(new_eid.as_u64() > 50);
2844    }
2845
2846    #[test]
2847    fn test_insert_recovered_edge_invalid_source() {
2848        let mut store = GraphStore::new();
2849        let b = store.create_node("B");
2850        let edge = Edge::new(EdgeId::new(1), NodeId::new(999), b, EdgeType::new("E"));
2851        let result = store.insert_recovered_edge(edge);
2852        assert_eq!(result, Err(GraphError::InvalidEdgeSource(NodeId::new(999))));
2853    }
2854
2855    #[test]
2856    fn test_insert_recovered_edge_invalid_target() {
2857        let mut store = GraphStore::new();
2858        let a = store.create_node("A");
2859        let edge = Edge::new(EdgeId::new(1), a, NodeId::new(999), EdgeType::new("E"));
2860        let result = store.insert_recovered_edge(edge);
2861        assert_eq!(result, Err(GraphError::InvalidEdgeTarget(NodeId::new(999))));
2862    }
2863
2864    #[test]
2865    fn test_default_impl() {
2866        let store = GraphStore::default();
2867        assert_eq!(store.node_count(), 0);
2868        assert_eq!(store.edge_count(), 0);
2869    }
2870
2871    #[test]
2872    fn test_mvcc_cow_versioning() {
2873        let mut store = GraphStore::new();
2874        let id = store.create_node("Person");
2875        store
2876            .set_node_property(
2877                "default",
2878                id,
2879                "name",
2880                PropertyValue::String("Alice".to_string()),
2881            )
2882            .unwrap();
2883
2884        // Bump version to trigger COW
2885        store.current_version = 2;
2886        store
2887            .set_node_property(
2888                "default",
2889                id,
2890                "name",
2891                PropertyValue::String("Bob".to_string()),
2892            )
2893            .unwrap();
2894
2895        // Latest version should be Bob
2896        let latest = store.get_node(id).unwrap();
2897        assert_eq!(
2898            latest.get_property("name"),
2899            Some(&PropertyValue::String("Bob".to_string()))
2900        );
2901        assert_eq!(latest.version, 2);
2902
2903        // Version 1 should still be Alice
2904        let v1 = store.get_node_at_version(id, 1).unwrap();
2905        assert_eq!(
2906            v1.get_property("name"),
2907            Some(&PropertyValue::String("Alice".to_string()))
2908        );
2909        assert_eq!(v1.version, 1);
2910    }
2911
2912    #[test]
2913    fn test_get_outgoing_edge_targets_detail() {
2914        let mut store = GraphStore::new();
2915        let a = store.create_node("A");
2916        let b = store.create_node("B");
2917        let c = store.create_node("C");
2918        let e1 = store.create_edge(a, b, "KNOWS").unwrap();
2919        let e2 = store.create_edge(a, c, "LIKES").unwrap();
2920
2921        let targets = store.get_outgoing_edge_targets(a);
2922        assert_eq!(targets.len(), 2);
2923
2924        // Check that tuples contain correct (edge_id, source, target, edge_type)
2925        for (eid, src, tgt, etype) in &targets {
2926            assert_eq!(*src, a);
2927            if *eid == e1 {
2928                assert_eq!(*tgt, b);
2929                assert_eq!(etype.as_str(), "KNOWS");
2930            } else if *eid == e2 {
2931                assert_eq!(*tgt, c);
2932                assert_eq!(etype.as_str(), "LIKES");
2933            } else {
2934                panic!("Unexpected edge ID");
2935            }
2936        }
2937
2938        // Non-existent node returns empty
2939        let empty = store.get_outgoing_edge_targets(NodeId::new(9999));
2940        assert!(empty.is_empty());
2941    }
2942
2943    #[test]
2944    fn test_get_incoming_edge_sources_detail() {
2945        let mut store = GraphStore::new();
2946        let a = store.create_node("A");
2947        let b = store.create_node("B");
2948        let c = store.create_node("C");
2949        let e1 = store.create_edge(a, c, "KNOWS").unwrap();
2950        let e2 = store.create_edge(b, c, "LIKES").unwrap();
2951
2952        let sources = store.get_incoming_edge_sources(c);
2953        assert_eq!(sources.len(), 2);
2954
2955        for (eid, src, tgt, etype) in &sources {
2956            assert_eq!(*tgt, c);
2957            if *eid == e1 {
2958                assert_eq!(*src, a);
2959                assert_eq!(etype.as_str(), "KNOWS");
2960            } else if *eid == e2 {
2961                assert_eq!(*src, b);
2962                assert_eq!(etype.as_str(), "LIKES");
2963            } else {
2964                panic!("Unexpected edge ID");
2965            }
2966        }
2967
2968        // Non-existent node returns empty
2969        let empty = store.get_incoming_edge_sources(NodeId::new(9999));
2970        assert!(empty.is_empty());
2971    }
2972
2973    #[test]
2974    fn test_all_labels_empty() {
2975        let store = GraphStore::new();
2976        assert!(store.all_labels().is_empty());
2977    }
2978
2979    #[test]
2980    fn test_all_edge_types_empty() {
2981        let store = GraphStore::new();
2982        assert!(store.all_edge_types().is_empty());
2983    }
2984
2985    #[test]
2986    fn test_columnar_storage_integration() {
2987        let mut store = GraphStore::new();
2988        let id = store.create_node_with_properties("default", vec![Label::new("Person")], {
2989            let mut props = PropertyMap::new();
2990            props.insert(
2991                "name".to_string(),
2992                PropertyValue::String("Alice".to_string()),
2993            );
2994            props.insert("age".to_string(), PropertyValue::Integer(30));
2995            props
2996        });
2997
2998        // Verify columnar storage has the values
2999        let idx = id.as_u64() as usize;
3000        let name_col = store.node_columns.get_property(idx, "name");
3001        assert_eq!(name_col, PropertyValue::String("Alice".to_string()));
3002        let age_col = store.node_columns.get_property(idx, "age");
3003        assert_eq!(age_col, PropertyValue::Integer(30));
3004    }
3005
3006    #[test]
3007    fn test_edge_columnar_storage_integration() {
3008        let mut store = GraphStore::new();
3009        let a = store.create_node("A");
3010        let b = store.create_node("B");
3011        let mut props = std::collections::HashMap::new();
3012        props.insert("weight".to_string(), PropertyValue::Float(0.75));
3013        let eid = store
3014            .create_edge_with_properties(a, b, "WEIGHTED", props)
3015            .unwrap();
3016
3017        let idx = eid.as_u64() as usize;
3018        let weight_col = store.edge_columns.get_property(idx, "weight");
3019        assert_eq!(weight_col, PropertyValue::Float(0.75));
3020    }
3021
3022    #[test]
3023    fn test_set_node_property_updates_columnar_storage() {
3024        let mut store = GraphStore::new();
3025        let id = store.create_node("Person");
3026        store
3027            .set_node_property("default", id, "score", PropertyValue::Float(1.0))
3028            .unwrap();
3029
3030        let idx = id.as_u64() as usize;
3031        assert_eq!(
3032            store.node_columns.get_property(idx, "score"),
3033            PropertyValue::Float(1.0)
3034        );
3035
3036        // Update
3037        store
3038            .set_node_property("default", id, "score", PropertyValue::Float(2.5))
3039            .unwrap();
3040        assert_eq!(
3041            store.node_columns.get_property(idx, "score"),
3042            PropertyValue::Float(2.5)
3043        );
3044    }
3045
3046    #[test]
3047    fn test_get_nodes_by_label_after_deletions() {
3048        let mut store = GraphStore::new();
3049        let a = store.create_node("Person");
3050        let b_id = store.create_node("Person");
3051        let c = store.create_node("Person");
3052
3053        store.delete_node("default", b_id).unwrap();
3054        let persons = store.get_nodes_by_label(&Label::new("Person"));
3055        assert_eq!(persons.len(), 2);
3056        let ids: Vec<NodeId> = persons.iter().map(|n| n.id).collect();
3057        assert!(ids.contains(&a));
3058        assert!(ids.contains(&c));
3059        assert!(!ids.contains(&b_id));
3060    }
3061
3062    #[test]
3063    fn test_get_edges_by_type_after_deletion() {
3064        let mut store = GraphStore::new();
3065        let a = store.create_node("A");
3066        let b = store.create_node("B");
3067        let c = store.create_node("C");
3068        let e1 = store.create_edge(a, b, "KNOWS").unwrap();
3069        let e2 = store.create_edge(b, c, "KNOWS").unwrap();
3070
3071        store.delete_edge(e1).unwrap();
3072        let knows_edges = store.get_edges_by_type(&EdgeType::new("KNOWS"));
3073        assert_eq!(knows_edges.len(), 1);
3074        assert_eq!(knows_edges[0].id, e2);
3075    }
3076
3077    #[test]
3078    fn test_all_nodes_after_operations() {
3079        let mut store = GraphStore::new();
3080        let a = store.create_node("A");
3081        store.create_node("B");
3082        store.create_node("C");
3083        store.delete_node("default", a).unwrap();
3084
3085        let all = store.all_nodes();
3086        assert_eq!(all.len(), 2);
3087    }
3088
3089    #[test]
3090    fn test_compute_statistics_with_large_sample() {
3091        let mut store = GraphStore::new();
3092        // Create more than 1000 nodes to test sample limiting
3093        for i in 0..1100 {
3094            let id = store.create_node("BigLabel");
3095            store
3096                .get_node_mut(id)
3097                .unwrap()
3098                .set_property("idx".to_string(), PropertyValue::Integer(i));
3099        }
3100        let stats = store.compute_statistics();
3101        assert_eq!(stats.total_nodes, 1100);
3102        // Property stats should exist (sampled from first 1000)
3103        let idx_stats = stats
3104            .property_stats
3105            .get(&(Label::new("BigLabel"), "idx".to_string()));
3106        assert!(idx_stats.is_some());
3107        let ps = idx_stats.unwrap();
3108        // All sampled nodes have the property => null_fraction = 0
3109        assert_eq!(ps.null_fraction, 0.0);
3110        // distinct_count is from the sample (first 1000 nodes), each has unique value
3111        assert_eq!(ps.distinct_count, 1000);
3112    }
3113
3114    #[test]
3115    fn test_add_label_then_label_count() {
3116        let mut store = GraphStore::new();
3117        let id = store.create_node("Person");
3118        store.add_label_to_node("default", id, "Employee").unwrap();
3119
3120        assert_eq!(store.label_node_count(&Label::new("Person")), 1);
3121        assert_eq!(store.label_node_count(&Label::new("Employee")), 1);
3122    }
3123
3124    #[test]
3125    fn test_insert_recovered_node_with_multiple_labels() {
3126        let mut store = GraphStore::new();
3127        let mut node = Node::new(NodeId::new(5), Label::new("Person"));
3128        node.add_label(Label::new("Employee"));
3129
3130        store.insert_recovered_node(node);
3131        assert_eq!(store.get_nodes_by_label(&Label::new("Person")).len(), 1);
3132        assert_eq!(store.get_nodes_by_label(&Label::new("Employee")).len(), 1);
3133    }
3134
3135    #[test]
3136    fn test_graph_statistics_avg_out_degree() {
3137        let mut store = GraphStore::new();
3138        let a = store.create_node("A");
3139        let b = store.create_node("B");
3140        let c = store.create_node("C");
3141        let d = store.create_node("D");
3142        // a -> b, a -> c, a -> d (3 edges, 4 nodes)
3143        store.create_edge(a, b, "E").unwrap();
3144        store.create_edge(a, c, "E").unwrap();
3145        store.create_edge(a, d, "E").unwrap();
3146
3147        let stats = store.compute_statistics();
3148        assert!((stats.avg_out_degree - 0.75).abs() < 0.01); // 3/4 = 0.75
3149    }
3150
3151    #[test]
3152    fn test_self_loop_edge() {
3153        let mut store = GraphStore::new();
3154        let a = store.create_node("A");
3155        let eid = store.create_edge(a, a, "SELF").unwrap();
3156
3157        assert_eq!(store.get_outgoing_edges(a).len(), 1);
3158        assert_eq!(store.get_incoming_edges(a).len(), 1);
3159
3160        store.delete_edge(eid).unwrap();
3161        assert_eq!(store.get_outgoing_edges(a).len(), 0);
3162        assert_eq!(store.get_incoming_edges(a).len(), 0);
3163    }
3164
3165    #[test]
3166    fn test_get_outgoing_edges_nonexistent_node() {
3167        let store = GraphStore::new();
3168        let edges = store.get_outgoing_edges(NodeId::new(999));
3169        assert!(edges.is_empty());
3170    }
3171
3172    #[test]
3173    fn test_get_incoming_edges_nonexistent_node() {
3174        let store = GraphStore::new();
3175        let edges = store.get_incoming_edges(NodeId::new(999));
3176        assert!(edges.is_empty());
3177    }
3178
3179    #[test]
3180    fn test_get_nodes_by_label_nonexistent() {
3181        let store = GraphStore::new();
3182        let nodes = store.get_nodes_by_label(&Label::new("NoSuch"));
3183        assert!(nodes.is_empty());
3184    }
3185
3186    #[test]
3187    fn test_get_edges_by_type_nonexistent() {
3188        let store = GraphStore::new();
3189        let edges = store.get_edges_by_type(&EdgeType::new("NoSuch"));
3190        assert!(edges.is_empty());
3191    }
3192
3193    // ---- edge_between / edges_between tests (TDD) ----
3194
3195    #[test]
3196    fn test_edge_between_exists() {
3197        let mut store = GraphStore::new();
3198        let n1 = store.create_node("Person");
3199        let n2 = store.create_node("Person");
3200        let eid = store.create_edge(n1, n2, "KNOWS").unwrap();
3201
3202        assert_eq!(
3203            store.edge_between(n1, n2, Some(&EdgeType::new("KNOWS"))),
3204            Some(eid)
3205        );
3206    }
3207
3208    #[test]
3209    fn test_edge_between_not_exists() {
3210        let mut store = GraphStore::new();
3211        let n1 = store.create_node("Person");
3212        let n2 = store.create_node("Person");
3213
3214        assert_eq!(
3215            store.edge_between(n1, n2, Some(&EdgeType::new("KNOWS"))),
3216            None
3217        );
3218    }
3219
3220    #[test]
3221    fn test_edge_between_wrong_type() {
3222        let mut store = GraphStore::new();
3223        let n1 = store.create_node("Person");
3224        let n2 = store.create_node("Person");
3225        store.create_edge(n1, n2, "KNOWS").unwrap();
3226
3227        assert_eq!(
3228            store.edge_between(n1, n2, Some(&EdgeType::new("FOLLOWS"))),
3229            None
3230        );
3231    }
3232
3233    #[test]
3234    fn test_edge_between_any_type() {
3235        let mut store = GraphStore::new();
3236        let n1 = store.create_node("Person");
3237        let n2 = store.create_node("Person");
3238        let eid = store.create_edge(n1, n2, "KNOWS").unwrap();
3239
3240        // None edge_type means any type
3241        assert_eq!(store.edge_between(n1, n2, None), Some(eid));
3242    }
3243
3244    #[test]
3245    fn test_edge_between_reverse_direction() {
3246        let mut store = GraphStore::new();
3247        let n1 = store.create_node("Person");
3248        let n2 = store.create_node("Person");
3249        store.create_edge(n1, n2, "KNOWS").unwrap();
3250
3251        // Reverse direction should NOT find the edge
3252        assert_eq!(
3253            store.edge_between(n2, n1, Some(&EdgeType::new("KNOWS"))),
3254            None
3255        );
3256    }
3257
3258    #[test]
3259    fn test_edges_between_multi() {
3260        let mut store = GraphStore::new();
3261        let n1 = store.create_node("Person");
3262        let n2 = store.create_node("Person");
3263        let eid1 = store.create_edge(n1, n2, "KNOWS").unwrap();
3264        let eid2 = store.create_edge(n1, n2, "FOLLOWS").unwrap();
3265        let eid3 = store.create_edge(n1, n2, "KNOWS").unwrap();
3266
3267        // All edges between n1 and n2 (any type)
3268        let all = store.edges_between(n1, n2, None);
3269        assert_eq!(all.len(), 3);
3270
3271        // Only KNOWS edges
3272        let knows = store.edges_between(n1, n2, Some(&EdgeType::new("KNOWS")));
3273        assert_eq!(knows.len(), 2);
3274        assert!(knows.contains(&eid1));
3275        assert!(knows.contains(&eid3));
3276
3277        // Only FOLLOWS edges
3278        let follows = store.edges_between(n1, n2, Some(&EdgeType::new("FOLLOWS")));
3279        assert_eq!(follows.len(), 1);
3280        assert!(follows.contains(&eid2));
3281    }
3282
3283    // ---- Sorted adjacency list tests (Phase 1B TDD) ----
3284
3285    #[test]
3286    fn test_sorted_adjacency_insert_order() {
3287        let mut store = GraphStore::new();
3288        let n1 = store.create_node("Person");
3289        let n3 = store.create_node("Person");
3290        let n2 = store.create_node("Person");
3291
3292        // Insert edges to n3 first, then n2 — adjacency should still be sorted by target NodeId
3293        store.create_edge(n1, n3, "KNOWS").unwrap();
3294        store.create_edge(n1, n2, "KNOWS").unwrap();
3295
3296        // Outgoing from n1 should be sorted by target: n2 before n3
3297        let out = &store.outgoing[n1.as_u64() as usize];
3298        assert_eq!(out.len(), 2);
3299        assert!(
3300            out[0].0 <= out[1].0,
3301            "outgoing adjacency should be sorted by target NodeId"
3302        );
3303    }
3304
3305    #[test]
3306    fn test_sorted_adjacency_delete() {
3307        let mut store = GraphStore::new();
3308        let n1 = store.create_node("Person");
3309        let n2 = store.create_node("Person");
3310        let n3 = store.create_node("Person");
3311
3312        let e1 = store.create_edge(n1, n2, "KNOWS").unwrap();
3313        let _e2 = store.create_edge(n1, n3, "KNOWS").unwrap();
3314
3315        store.delete_edge(e1).unwrap();
3316
3317        let out = &store.outgoing[n1.as_u64() as usize];
3318        assert_eq!(out.len(), 1);
3319        assert_eq!(out[0].0, n3); // only n3 remains
3320    }
3321
3322    #[test]
3323    fn test_sorted_adjacency_multiple_edges_same_target() {
3324        let mut store = GraphStore::new();
3325        let n1 = store.create_node("Person");
3326        let n2 = store.create_node("Person");
3327
3328        let _e1 = store.create_edge(n1, n2, "KNOWS").unwrap();
3329        let _e2 = store.create_edge(n1, n2, "FOLLOWS").unwrap();
3330
3331        // Both edges should be to the same target
3332        let out = &store.outgoing[n1.as_u64() as usize];
3333        assert_eq!(out.len(), 2);
3334        assert_eq!(out[0].0, n2);
3335        assert_eq!(out[1].0, n2);
3336
3337        // edge_between with binary search should find the right one
3338        assert!(store
3339            .edge_between(n1, n2, Some(&EdgeType::new("KNOWS")))
3340            .is_some());
3341        assert!(store
3342            .edge_between(n1, n2, Some(&EdgeType::new("FOLLOWS")))
3343            .is_some());
3344    }
3345
3346    #[test]
3347    fn test_edge_between_binary_search_high_degree() {
3348        let mut store = GraphStore::new();
3349        let hub = store.create_node("Hub");
3350
3351        // Create 100 target nodes and edges from hub
3352        let mut targets = Vec::new();
3353        for _ in 0..100 {
3354            let t = store.create_node("Target");
3355            store.create_edge(hub, t, "LINKS").unwrap();
3356            targets.push(t);
3357        }
3358
3359        // Binary search should find any of them
3360        for &t in &targets {
3361            assert!(
3362                store
3363                    .edge_between(hub, t, Some(&EdgeType::new("LINKS")))
3364                    .is_some(),
3365                "edge_between should find edge to target {:?}",
3366                t
3367            );
3368        }
3369
3370        // Non-existent target
3371        let fake = NodeId::new(9999);
3372        assert!(store
3373            .edge_between(hub, fake, Some(&EdgeType::new("LINKS")))
3374            .is_none());
3375    }
3376}