Skip to main content

reddb_server/storage/engine/
graph_store.rs

1//! High-Performance Disk-Backed Graph Storage Engine
2//!
3//! A concurrent, page-based graph storage optimized for:
//! - Concurrent reads: sharded `RwLock`-protected indexes let traversals read in parallel
5//! - Cache-friendly packed layouts for nodes and edges
//! - Sharded hash-map indexes for average O(1) node and adjacency lookups
7//! - Streaming iteration for large graphs
8//!
9//! # Architecture
10//!
11//! ```text
12//! ┌─────────────────────────────────────────────────────────────────┐
13//! │                       GraphStore                                 │
14//! ├─────────────────────────────────────────────────────────────────┤
15//! │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐        │
16//! │  │ NodeIndex│  │EdgeIndex │  │ NodePages│  │ EdgePages│        │
//! │  │(Sharded) │  │(Sharded) │  │ (Packed) │  │ (Packed) │        │
18//! │  └────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬─────┘        │
19//! │       │              │             │             │              │
20//! │  ┌────▼──────────────▼─────────────▼─────────────▼────┐        │
21//! │  │                    Pager (4KB Pages)                │        │
22//! │  └────────────────────────────────────────────────────┘        │
23//! │                              │                                  │
24//! │  ┌───────────────────────────▼────────────────────────┐        │
25//! │  │              SIEVE PageCache (lock-free reads)      │        │
26//! │  └────────────────────────────────────────────────────┘        │
27//! └─────────────────────────────────────────────────────────────────┘
28//! ```
29
30use std::collections::HashMap;
31use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
32use std::sync::Arc;
33use std::sync::RwLock;
34
35use super::page::{Page, PageType, PAGE_SIZE};
36
37/// Maximum key size for node/edge IDs
38pub const MAX_ID_SIZE: usize = reddb_file::GRAPH_MAX_ID_SIZE;
39
40/// Maximum label size
41pub const MAX_LABEL_SIZE: usize = reddb_file::GRAPH_MAX_LABEL_SIZE;
42
43/// V1 node record header size: id_len(2) + label_len(2) + type(1) + flags(1) + edge_count(4).
44/// Kept for [`StoredNode::decode_v1`]; new writes use [`NODE_HEADER_SIZE`].
45pub const NODE_HEADER_SIZE_V1: usize = reddb_file::GRAPH_NODE_HEADER_SIZE_V1;
46
47/// Node record header size: id_len(2) + label_len(2) + label_id(4) + flags(1) + edge_count(4).
48/// The 1-byte legacy `node_type` discriminant has been replaced by a 4-byte
49/// dynamic [`LabelId`] resolved through [`LabelRegistry`].
50pub const NODE_HEADER_SIZE: usize = reddb_file::GRAPH_NODE_HEADER_SIZE;
51
52/// TableRef size: table_id(2) + row_id(8)
53pub const TABLE_REF_SIZE: usize = reddb_file::GRAPH_TABLE_REF_SIZE;
54
55/// Node flag: has table reference
56pub const NODE_FLAG_HAS_TABLE_REF: u8 = reddb_file::GRAPH_NODE_FLAG_HAS_TABLE_REF;
57/// Node flag: has vector embedding reference
58pub const NODE_FLAG_HAS_VECTOR_REF: u8 = reddb_file::GRAPH_NODE_FLAG_HAS_VECTOR_REF;
59
60/// VectorRef size: collection_len(2) + vector_id(8) = 10 bytes header + variable collection name
61pub const VECTOR_REF_HEADER_SIZE: usize = reddb_file::GRAPH_VECTOR_REF_HEADER_SIZE;
62
63/// Reference to a table row (for linking graph nodes to tabular data)
64#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
65pub struct TableRef {
66    /// Table identifier (index into table registry)
67    pub table_id: u16,
68    /// Row ID within the table
69    pub row_id: u64,
70}
71
72impl TableRef {
73    /// Create a new table reference
74    pub fn new(table_id: u16, row_id: u64) -> Self {
75        Self { table_id, row_id }
76    }
77
78    /// Encode to bytes (10 bytes total)
79    pub fn encode(&self) -> [u8; TABLE_REF_SIZE] {
80        reddb_file::encode_graph_table_ref(reddb_file::GraphTableRef {
81            table_id: self.table_id,
82            row_id: self.row_id,
83        })
84    }
85
86    /// Decode from bytes
87    pub fn decode(data: &[u8]) -> Option<Self> {
88        let decoded = reddb_file::decode_graph_table_ref(data)?;
89        Some(Self {
90            table_id: decoded.table_id,
91            row_id: decoded.row_id,
92        })
93    }
94}
95
96/// V1 edge record header size: source_len(2) + target_len(2) + type(1) + weight(4).
97/// Kept for [`StoredEdge::decode_v1`]; new writes use [`EDGE_HEADER_SIZE`].
98pub const EDGE_HEADER_SIZE_V1: usize = reddb_file::GRAPH_EDGE_HEADER_SIZE_V1;
99
100/// Edge record header size: source_len(2) + target_len(2) + label_id(4) + weight(4).
101/// The 1-byte legacy `edge_type` discriminant has been replaced by a 4-byte
102/// dynamic [`LabelId`] resolved through [`LabelRegistry`].
103pub const EDGE_HEADER_SIZE: usize = reddb_file::GRAPH_EDGE_HEADER_SIZE;
104
105/// A graph node stored on disk
106#[derive(Debug, Clone)]
107pub struct StoredNode {
108    pub id: String,
109    pub label: String,
110    /// Canonical category label string (e.g. `"host"`, `"order"`). Resolved
111    /// from [`label_id`] at decode time via the legacy seed mapping.
112    /// Caller-visible string; the registry-stored [`label_id`] is the
113    /// source-of-truth identifier.
114    pub node_type: String,
115    /// Authoritative label identifier resolved through [`LabelRegistry`].
116    pub label_id: LabelId,
117    pub flags: u8,
118    pub out_edge_count: u32,
119    pub in_edge_count: u32,
120    /// Page ID where this node is stored
121    pub page_id: u32,
122    /// Slot index within the page
123    pub slot: u16,
124    /// Optional reference to a table row (for unified queries)
125    pub table_ref: Option<TableRef>,
126    /// Optional reference to a vector embedding (collection name, vector_id)
127    pub vector_ref: Option<(String, u64)>,
128}
129
130impl StoredNode {
131    /// Encode node to bytes in v2 format (label_id replaces node_type).
132    pub fn encode(&self) -> Vec<u8> {
133        reddb_file::encode_graph_node_record_v2(&self.as_file_record())
134    }
135
136    /// Decode node from bytes (v2 format). For v1 records use [`decode_v1`].
137    pub fn decode(data: &[u8], page_id: u32, slot: u16) -> Option<Self> {
138        let record = reddb_file::decode_graph_node_record_v2(data)?;
139        let label_id = LabelId::new(record.label_id);
140        // Derive legacy node_type from label_id for back-compat with callers
141        // that still read the field. PR3 removes this field entirely.
142        let node_type = label_id_to_node_label(label_id);
143        Some(Self::from_file_record(
144            record, page_id, slot, node_type, label_id,
145        ))
146    }
147
148    /// Decode a v1 (legacy) node record. The caller must supply a
149    /// [`LabelRegistry`] seeded via [`LabelRegistry::with_legacy_seed`] so
150    /// the legacy `node_type` discriminant maps to the correct reserved
151    /// [`LabelId`].
152    pub fn decode_v1(data: &[u8], page_id: u32, slot: u16) -> Option<Self> {
153        let record = reddb_file::decode_graph_node_record_v1(data)?;
154        // V1 records carry the legacy enum discriminant; reject any byte
155        // outside the 9-variant range so we do not silently misinterpret
156        // unrelated bytes as a node-type.
157        if record.legacy_type > 8 {
158            return None;
159        }
160        let label_id = LabelRegistry::legacy_node_label_id(record.legacy_type);
161        let node_type = label_id_to_node_label(label_id);
162        Some(Self::from_legacy_file_record(
163            record, page_id, slot, node_type, label_id,
164        ))
165    }
166
167    fn as_file_record(&self) -> reddb_file::GraphNodeRecord {
168        reddb_file::GraphNodeRecord {
169            id: self.id.clone(),
170            label: self.label.clone(),
171            label_id: self.label_id.as_u32(),
172            flags: self.flags,
173            out_edge_count: self.out_edge_count,
174            in_edge_count: self.in_edge_count,
175            table_ref: self.table_ref.map(|t| reddb_file::GraphTableRef {
176                table_id: t.table_id,
177                row_id: t.row_id,
178            }),
179            vector_ref: self.vector_ref.as_ref().map(|(collection, vector_id)| {
180                reddb_file::GraphVectorRef {
181                    collection: collection.clone(),
182                    vector_id: *vector_id,
183                }
184            }),
185        }
186    }
187
188    fn from_file_record(
189        record: reddb_file::GraphNodeRecord,
190        page_id: u32,
191        slot: u16,
192        node_type: String,
193        label_id: LabelId,
194    ) -> Self {
195        Self {
196            id: record.id,
197            label: record.label,
198            node_type,
199            label_id,
200            flags: record.flags,
201            out_edge_count: record.out_edge_count,
202            in_edge_count: record.in_edge_count,
203            page_id,
204            slot,
205            table_ref: record
206                .table_ref
207                .map(|t| TableRef::new(t.table_id, t.row_id)),
208            vector_ref: record.vector_ref.map(|v| (v.collection, v.vector_id)),
209        }
210    }
211
212    fn from_legacy_file_record(
213        record: reddb_file::LegacyGraphNodeRecord,
214        page_id: u32,
215        slot: u16,
216        node_type: String,
217        label_id: LabelId,
218    ) -> Self {
219        Self {
220            id: record.id,
221            label: record.label,
222            node_type,
223            label_id,
224            flags: record.flags,
225            out_edge_count: record.out_edge_count,
226            in_edge_count: record.in_edge_count,
227            page_id,
228            slot,
229            table_ref: record
230                .table_ref
231                .map(|t| TableRef::new(t.table_id, t.row_id)),
232            vector_ref: record.vector_ref.map(|v| (v.collection, v.vector_id)),
233        }
234    }
235
236    /// Calculate encoded size
237    pub fn encoded_size(&self) -> usize {
238        reddb_file::graph_node_record_v2_encoded_size(&self.as_file_record())
239    }
240
241    /// Link this node to a table row
242    pub fn link_to_row(&mut self, table_id: u16, row_id: u64) {
243        self.table_ref = Some(TableRef::new(table_id, row_id));
244    }
245
246    /// Unlink from table row
247    pub fn unlink_from_row(&mut self) {
248        self.table_ref = None;
249    }
250
251    /// Link this node to a vector embedding
252    pub fn link_to_vector(&mut self, collection: String, vector_id: u64) {
253        self.vector_ref = Some((collection, vector_id));
254    }
255
256    /// Unlink from vector embedding
257    pub fn unlink_from_vector(&mut self) {
258        self.vector_ref = None;
259    }
260
261    /// Check if this node is linked to a table row
262    pub fn is_linked(&self) -> bool {
263        self.table_ref.is_some()
264    }
265}
266
267/// A graph edge stored on disk
268#[derive(Debug, Clone)]
269pub struct StoredEdge {
270    pub source_id: String,
271    pub target_id: String,
272    /// Canonical edge label string. Derived from [`label_id`] at decode time.
273    pub edge_type: String,
274    /// Authoritative label identifier resolved through [`LabelRegistry`].
275    pub label_id: LabelId,
276    pub weight: f32,
277    /// Page ID where this edge is stored
278    pub page_id: u32,
279    /// Slot index within the page
280    pub slot: u16,
281}
282
283impl StoredEdge {
284    /// Encode edge to bytes (v2 format).
285    pub fn encode(&self) -> Vec<u8> {
286        reddb_file::encode_graph_edge_record_v2(&self.as_file_record())
287    }
288
289    /// Decode edge from bytes (v2 format). For v1 records use [`decode_v1`].
290    pub fn decode(data: &[u8], page_id: u32, slot: u16) -> Option<Self> {
291        let record = reddb_file::decode_graph_edge_record_v2(data)?;
292        let label_id = LabelId::new(record.label_id);
293        let edge_type = label_id_to_edge_label(label_id);
294        Some(Self::from_file_record(
295            record, page_id, slot, edge_type, label_id,
296        ))
297    }
298
299    /// Decode a v1 (legacy) edge record. The 1-byte enum discriminant maps
300    /// to the legacy reserved [`LabelId`] range via
301    /// [`LabelRegistry::legacy_edge_label_id`].
302    pub fn decode_v1(data: &[u8], page_id: u32, slot: u16) -> Option<Self> {
303        let record = reddb_file::decode_graph_edge_record_v1(data)?;
304        if record.legacy_type > 9 {
305            return None;
306        }
307        let label_id = LabelRegistry::legacy_edge_label_id(record.legacy_type);
308        let edge_type = label_id_to_edge_label(label_id);
309        Some(Self::from_legacy_file_record(
310            record, page_id, slot, edge_type, label_id,
311        ))
312    }
313
314    /// Calculate encoded size (v2)
315    pub fn encoded_size(&self) -> usize {
316        reddb_file::graph_edge_record_v2_encoded_size(&self.as_file_record())
317    }
318
319    fn as_file_record(&self) -> reddb_file::GraphEdgeRecord {
320        reddb_file::GraphEdgeRecord {
321            source_id: self.source_id.clone(),
322            target_id: self.target_id.clone(),
323            label_id: self.label_id.as_u32(),
324            weight: self.weight,
325        }
326    }
327
328    fn from_file_record(
329        record: reddb_file::GraphEdgeRecord,
330        page_id: u32,
331        slot: u16,
332        edge_type: String,
333        label_id: LabelId,
334    ) -> Self {
335        Self {
336            source_id: record.source_id,
337            target_id: record.target_id,
338            edge_type,
339            label_id,
340            weight: record.weight,
341            page_id,
342            slot,
343        }
344    }
345
346    fn from_legacy_file_record(
347        record: reddb_file::LegacyGraphEdgeRecord,
348        page_id: u32,
349        slot: u16,
350        edge_type: String,
351        label_id: LabelId,
352    ) -> Self {
353        Self {
354            source_id: record.source_id,
355            target_id: record.target_id,
356            edge_type,
357            label_id,
358            weight: record.weight,
359            page_id,
360            slot,
361        }
362    }
363}
364
365/// Resolve a [`LabelId`] in the legacy reserved range to its canonical
366/// category string. For non-legacy IDs (≥ [`FIRST_USER_LABEL_ID`]) returns
367/// `format!("label_{}", id)` — a non-crashing placeholder that flags the
368/// caller is reading a record without a registry handle. Real callers
369/// should resolve through [`LabelRegistry`] when accuracy matters.
370fn label_id_to_node_label(id: LabelId) -> String {
371    match id.as_u32() {
372        1 => "host".to_string(),
373        2 => "service".to_string(),
374        3 => "credential".to_string(),
375        4 => "vulnerability".to_string(),
376        5 => "endpoint".to_string(),
377        6 => "technology".to_string(),
378        7 => "user".to_string(),
379        8 => "domain".to_string(),
380        9 => "certificate".to_string(),
381        n => format!("label_{}", n),
382    }
383}
384
385/// Resolve a [`LabelId`] in the legacy reserved edge range to its canonical
386/// edge label string.
387fn label_id_to_edge_label(id: LabelId) -> String {
388    match id.as_u32() {
389        10 => "has_service".to_string(),
390        11 => "has_endpoint".to_string(),
391        12 => "uses_tech".to_string(),
392        13 => "auth_access".to_string(),
393        14 => "affected_by".to_string(),
394        15 => "contains".to_string(),
395        16 => "connects_to".to_string(),
396        17 => "related_to".to_string(),
397        18 => "has_user".to_string(),
398        19 => "has_cert".to_string(),
399        n => format!("label_{}", n),
400    }
401}
402
/// Physical location of a record: the page holding it and the slot (cell
/// index) within that page.
///
/// Derives `PartialEq`/`Eq`/`Hash` so locations can be compared and used as
/// map or set keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RecordLocation {
    /// Page ID where the record is stored.
    pub page_id: u32,
    /// Slot index within the page.
    pub slot: u16,
}

/// Aggregate graph statistics.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct GraphStats {
    /// Number of nodes.
    pub node_count: u64,
    /// Number of edges.
    pub edge_count: u64,
    /// Number of node pages.
    pub node_pages: u32,
    /// Number of edge pages.
    pub edge_pages: u32,
    /// Cardinality per category label (e.g. `"host" → 42`). Replaces the
    /// closed-enum `nodes_by_type: [u64; 9]` from earlier revisions.
    pub nodes_by_label: HashMap<String, u64>,
    /// Cardinality per edge label.
    pub edges_by_label: HashMap<String, u64>,
}
423
424/// Concurrent in-memory index for fast lookups
425/// Uses sharded locks for reduced contention
426pub struct ShardedIndex<V> {
427    shards: Vec<RwLock<HashMap<String, V>>>,
428    shard_count: usize,
429}
430
431impl<V: Clone> ShardedIndex<V> {
432    pub fn new(shard_count: usize) -> Self {
433        let shards = (0..shard_count)
434            .map(|_| RwLock::new(HashMap::new()))
435            .collect();
436        Self {
437            shards,
438            shard_count,
439        }
440    }
441
442    #[inline]
443    fn shard_for(&self, key: &str) -> usize {
444        // Simple hash-based sharding
445        let hash: u64 = key
446            .bytes()
447            .fold(0u64, |acc, b| acc.wrapping_mul(31).wrapping_add(b as u64));
448        (hash as usize) % self.shard_count
449    }
450
451    pub fn get(&self, key: &str) -> Option<V> {
452        let shard = self.shard_for(key);
453        self.shards[shard].read().ok()?.get(key).cloned()
454    }
455
456    pub fn insert(&self, key: String, value: V) {
457        let shard = self.shard_for(&key);
458        if let Ok(mut guard) = self.shards[shard].write() {
459            guard.insert(key, value);
460        }
461    }
462
463    pub fn remove(&self, key: &str) -> Option<V> {
464        let shard = self.shard_for(key);
465        self.shards[shard].write().ok()?.remove(key)
466    }
467
468    pub fn contains(&self, key: &str) -> bool {
469        let shard = self.shard_for(key);
470        self.shards[shard]
471            .read()
472            .ok()
473            .map(|g| g.contains_key(key))
474            .unwrap_or(false)
475    }
476
477    pub fn len(&self) -> usize {
478        self.shards
479            .iter()
480            .filter_map(|s| s.read().ok())
481            .map(|g| g.len())
482            .sum()
483    }
484
485    pub fn is_empty(&self) -> bool {
486        self.len() == 0
487    }
488}
489
490/// Edge index key: `(source_id, edge_label)` → `Vec<target_id>`.
491/// Optimized for adjacency list queries; the edge label is the canonical
492/// string form (e.g. `"connects_to"`) — use the registry to resolve back to
493/// a [`LabelId`] when needed.
494pub struct EdgeIndex {
495    /// Forward edges: source → `[(edge_label, target, weight)]`
496    forward: ShardedIndex<Vec<(String, String, f32)>>,
497    /// Backward edges: target → `[(edge_label, source, weight)]`
498    backward: ShardedIndex<Vec<(String, String, f32)>>,
499}
500
501impl EdgeIndex {
502    pub fn new(shard_count: usize) -> Self {
503        Self {
504            forward: ShardedIndex::new(shard_count),
505            backward: ShardedIndex::new(shard_count),
506        }
507    }
508
509    pub fn add_edge(&self, source: &str, target: &str, edge_label: &str, weight: f32) {
510        let shard = self.forward.shard_for(source);
511        if let Ok(mut guard) = self.forward.shards[shard].write() {
512            guard
513                .entry(source.to_string())
514                .or_insert_with(Vec::new)
515                .push((edge_label.to_string(), target.to_string(), weight));
516        }
517
518        let shard = self.backward.shard_for(target);
519        if let Ok(mut guard) = self.backward.shards[shard].write() {
520            guard
521                .entry(target.to_string())
522                .or_insert_with(Vec::new)
523                .push((edge_label.to_string(), source.to_string(), weight));
524        }
525    }
526
527    pub fn remove_edge(&self, source: &str, target: &str, edge_label: &str) {
528        let shard = self.forward.shard_for(source);
529        if let Ok(mut guard) = self.forward.shards[shard].write() {
530            if let Some(edges) = guard.get_mut(source) {
531                edges.retain(|(et, t, _)| !(et == edge_label && t == target));
532            }
533        }
534
535        let shard = self.backward.shard_for(target);
536        if let Ok(mut guard) = self.backward.shards[shard].write() {
537            if let Some(edges) = guard.get_mut(target) {
538                edges.retain(|(et, s, _)| !(et == edge_label && s == source));
539            }
540        }
541    }
542
543    pub fn outgoing(&self, source: &str) -> Vec<(String, String, f32)> {
544        self.forward.get(source).unwrap_or_default()
545    }
546
547    pub fn incoming(&self, target: &str) -> Vec<(String, String, f32)> {
548        self.backward.get(target).unwrap_or_default()
549    }
550
551    pub fn outgoing_of_type(&self, source: &str, edge_label: &str) -> Vec<(String, f32)> {
552        self.forward
553            .get(source)
554            .unwrap_or_default()
555            .into_iter()
556            .filter(|(et, _, _)| et == edge_label)
557            .map(|(_, t, w)| (t, w))
558            .collect()
559    }
560}
561
562/// High-performance graph storage engine
563///
564/// Provides concurrent read access with minimal locking overhead.
565/// Writes are serialized through a write lock but reads can proceed in parallel.
566pub struct GraphStore {
567    /// Node index: node_id -> location
568    node_index: ShardedIndex<RecordLocation>,
569    /// Edge index: adjacency lists
570    edge_index: EdgeIndex,
571    /// Secondary inverted indexes on (type, label) for O(1) non-id lookups.
572    /// Avoids full node-page scans in `nodes_of_type` / `nodes_by_label`.
573    ///
574    /// Stored as `Arc` so [`GraphStore::publish_indexes`] can share the
575    /// exact live index with an [`crate::storage::index::IndexRegistry`]
576    /// instead of handing out a frozen snapshot.
577    node_secondary: std::sync::Arc<secondary_index::NodeSecondaryIndex>,
578    /// Dynamic label catalog. Resolves user-supplied label strings to
579    /// stable [`LabelId`] u32 values used in the v2 page format.
580    pub registry: Arc<LabelRegistry>,
581    /// Node pages (packed node records)
582    node_pages: RwLock<Vec<Page>>,
583    /// Edge pages (packed edge records)
584    edge_pages: RwLock<Vec<Page>>,
585    /// Current node page for inserts
586    current_node_page: AtomicU32,
587    /// Current edge page for inserts
588    current_edge_page: AtomicU32,
589    /// Statistics
590    stats: GraphStats,
591    node_count: AtomicU64,
592    edge_count: AtomicU64,
593}
594
595#[path = "graph_store/impl.rs"]
596mod graph_store_impl;
597pub mod label_registry;
598pub mod secondary_index;
599pub use label_registry::{
600    LabelId, LabelRegistry, LabelRegistryError, Namespace, FIRST_USER_LABEL_ID, MAX_LABEL_LEN,
601    UNSET_LABEL_ID,
602};
603pub use secondary_index::NodeSecondaryIndex;
604impl Default for GraphStore {
605    fn default() -> Self {
606        Self::new()
607    }
608}
609
610/// Iterator over all nodes in the graph
611pub struct NodeIterator<'a> {
612    store: &'a GraphStore,
613    page_idx: usize,
614    cell_idx: usize,
615}
616
617impl<'a> Iterator for NodeIterator<'a> {
618    type Item = StoredNode;
619
620    fn next(&mut self) -> Option<Self::Item> {
621        let pages = self.store.node_pages.read().ok()?;
622
623        loop {
624            if self.page_idx >= pages.len() {
625                return None;
626            }
627
628            let page = &pages[self.page_idx];
629            let cell_count = page.cell_count() as usize;
630
631            if self.cell_idx >= cell_count {
632                self.page_idx += 1;
633                self.cell_idx = 0;
634                continue;
635            }
636
637            if let Ok((_, value)) = page.read_cell(self.cell_idx) {
638                self.cell_idx += 1;
639                if let Some(node) =
640                    StoredNode::decode(&value, self.page_idx as u32, (self.cell_idx - 1) as u16)
641                {
642                    return Some(node);
643                }
644            } else {
645                self.cell_idx += 1;
646            }
647        }
648    }
649}
650
/// Errors returned by graph store operations.
#[derive(Debug, Clone)]
pub enum GraphStoreError {
    /// A node with this ID already exists.
    NodeExists(String),
    /// No node with this ID exists.
    NodeNotFound(String),
    /// No edge exists between the given `(source, target)` pair.
    EdgeNotFound(String, String),
    /// The target page has no room for the record.
    PageFull,
    /// An internal lock was poisoned by a panicking thread.
    LockPoisoned,
    /// Stored or supplied data could not be interpreted.
    InvalidData(String),
    /// An underlying I/O operation failed.
    IoError(String),
}

impl std::fmt::Display for GraphStoreError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            GraphStoreError::NodeExists(node) => write!(f, "Node already exists: {}", node),
            GraphStoreError::NodeNotFound(node) => write!(f, "Node not found: {}", node),
            GraphStoreError::EdgeNotFound(from, to) => {
                write!(f, "Edge not found: {} -> {}", from, to)
            }
            GraphStoreError::PageFull => f.write_str("Page is full"),
            GraphStoreError::LockPoisoned => f.write_str("Lock poisoned"),
            GraphStoreError::InvalidData(detail) => write!(f, "Invalid data: {}", detail),
            GraphStoreError::IoError(detail) => write!(f, "I/O error: {}", detail),
        }
    }
}

impl std::error::Error for GraphStoreError {}
678
679#[cfg(test)]
680mod tests {
681    use super::*;
682    use std::sync::Arc;
683
684    #[test]
685    fn test_graph_store_basic() {
686        let store = GraphStore::new();
687
688        // Add nodes
689        store
690            .add_node_with_label("host:192.168.1.1", "Web Server", "host")
691            .unwrap();
692        store
693            .add_node_with_label("host:192.168.1.2", "Database", "host")
694            .unwrap();
695        store
696            .add_node_with_label("service:192.168.1.1:80:http", "HTTP", "service")
697            .unwrap();
698
699        assert_eq!(store.node_count(), 3);
700
701        // Add edges
702        store
703            .add_edge_with_label(
704                "host:192.168.1.1",
705                "service:192.168.1.1:80:http",
706                "has_service",
707                1.0,
708            )
709            .unwrap();
710        store
711            .add_edge_with_label("host:192.168.1.1", "host:192.168.1.2", "connects_to", 1.0)
712            .unwrap();
713
714        assert_eq!(store.edge_count(), 2);
715
716        // Query
717        let node = store.get_node("host:192.168.1.1").unwrap();
718        assert_eq!(node.label, "Web Server");
719
720        let out_edges = store.outgoing_edges("host:192.168.1.1");
721        assert_eq!(out_edges.len(), 2);
722    }
723
724    #[test]
725    fn test_graph_store_serialization() {
726        let store = GraphStore::new();
727
728        store
729            .add_node_with_label("host:10.0.0.1", "Server A", "host")
730            .unwrap();
731        store
732            .add_node_with_label("host:10.0.0.2", "Server B", "host")
733            .unwrap();
734        store
735            .add_edge_with_label("host:10.0.0.1", "host:10.0.0.2", "connects_to", 0.5)
736            .unwrap();
737
738        // Serialize
739        let bytes = store.serialize();
740
741        // Deserialize
742        let restored = GraphStore::deserialize(&bytes).unwrap();
743
744        assert_eq!(restored.node_count(), 2);
745        assert_eq!(restored.edge_count(), 1);
746
747        let node = restored.get_node("host:10.0.0.1").unwrap();
748        assert_eq!(node.label, "Server A");
749    }
750
751    #[test]
752    fn test_concurrent_reads() {
753        use std::thread;
754
755        let store = Arc::new(GraphStore::new());
756
757        // Add some data
758        for i in 0..100 {
759            store
760                .add_node_with_label(&format!("host:{}", i), &format!("Host {}", i), "host")
761                .unwrap();
762        }
763
764        // Spawn multiple reader threads
765        let mut handles = vec![];
766        for _ in 0..4 {
767            let store_clone = Arc::clone(&store);
768            handles.push(thread::spawn(move || {
769                for i in 0..100 {
770                    let _ = store_clone.get_node(&format!("host:{}", i));
771                }
772            }));
773        }
774
775        for handle in handles {
776            handle.join().unwrap();
777        }
778
779        assert_eq!(store.node_count(), 100);
780    }
781
782    #[test]
783    fn test_edge_index_performance() {
784        let store = GraphStore::new();
785
786        // Create a graph with many edges
787        store
788            .add_node_with_label("hub", "Hub Node", "host")
789            .unwrap();
790        for i in 0..100 {
791            store
792                .add_node_with_label(&format!("spoke:{}", i), &format!("Spoke {}", i), "host")
793                .unwrap();
794            store
795                .add_edge_with_label("hub", &format!("spoke:{}", i), "connects_to", 1.0)
796                .unwrap();
797        }
798
799        // Query outgoing edges (should be fast with index)
800        let edges = store.outgoing_edges("hub");
801        assert_eq!(edges.len(), 100);
802    }
803
804    #[test]
805    fn test_nodes_of_type_uses_secondary_index() {
806        let store = GraphStore::new();
807        store
808            .add_node_with_label("host:1", "Web Server", "host")
809            .unwrap();
810        store
811            .add_node_with_label("host:2", "DB Server", "host")
812            .unwrap();
813        store
814            .add_node_with_label("svc:1", "HTTP", "service")
815            .unwrap();
816        store
817            .add_node_with_label("vuln:1", "CVE-2024-1", "vulnerability")
818            .unwrap();
819
820        let hosts = store.nodes_with_category("host");
821        assert_eq!(hosts.len(), 2);
822        assert!(hosts.iter().all(|n| n.node_type == "host"));
823
824        let services = store.nodes_with_category("service");
825        assert_eq!(services.len(), 1);
826        assert_eq!(services[0].id, "svc:1");
827
828        assert_eq!(store.nodes_with_category("user").len(), 0);
829    }
830
831    #[test]
832    fn test_nodes_by_label_with_bloom_prune() {
833        let store = GraphStore::new();
834        store
835            .add_node_with_label("host:1", "Edge Router", "host")
836            .unwrap();
837        store
838            .add_node_with_label("host:2", "Edge Router", "host")
839            .unwrap();
840        store
841            .add_node_with_label("host:3", "Core Switch", "host")
842            .unwrap();
843
844        let routers = store.nodes_by_label("Edge Router");
845        assert_eq!(routers.len(), 2);
846
847        let unknown = store.nodes_by_label("Quantum Router 9000");
848        assert!(unknown.is_empty());
849        // Bloom is allowed to false-positive but must never hide real labels.
850        assert!(store.may_contain_label("Edge Router"));
851        assert!(store.may_contain_label("Core Switch"));
852    }
853
854    #[test]
855    fn test_publish_indexes_to_registry() {
856        use crate::storage::index::{IndexKind, IndexRegistry, IndexScope};
857
858        let store = GraphStore::new();
859        store.add_node_with_label("h:1", "Alpha", "host").unwrap();
860        store.add_node_with_label("h:2", "Beta", "host").unwrap();
861        store
862            .add_node_with_label("svc:1", "HTTP", "service")
863            .unwrap();
864
865        let registry = IndexRegistry::new();
866        store.publish_indexes(&registry, "infra");
867
868        let shared = registry.get(&IndexScope::graph("infra")).unwrap();
869        let stats = shared.stats();
870        // Two scopes × each insert = by_type + by_label per node
871        // 3 inserts × 2 scopes = 6 entries
872        assert_eq!(stats.entries, 6);
873        assert_eq!(stats.kind, IndexKind::Inverted);
874        assert!(stats.has_bloom);
875
876        // Live updates are visible through the registry since both sides
877        // share the same Arc<NodeSecondaryIndex>.
878        store.add_node_with_label("h:3", "Gamma", "host").unwrap();
879        let updated = registry.get(&IndexScope::graph("infra")).unwrap().stats();
880        assert_eq!(updated.entries, 8);
881    }
882
883    #[test]
884    fn test_secondary_index_rebuilt_after_deserialize() {
885        let store = GraphStore::new();
886        store
887            .add_node_with_label("host:1", "Alpha", "host")
888            .unwrap();
889        store
890            .add_node_with_label("svc:1", "HTTP", "service")
891            .unwrap();
892
893        let bytes = store.serialize();
894        let restored = GraphStore::deserialize(&bytes).unwrap();
895
896        assert_eq!(restored.nodes_with_category("host").len(), 1);
897        assert_eq!(restored.nodes_by_label("HTTP").len(), 1);
898        assert!(restored.may_contain_label("Alpha"));
899    }
900
901    #[test]
902    fn test_node_iteration() {
903        let store = GraphStore::new();
904
905        for i in 0..50 {
906            store
907                .add_node_with_label(&format!("node:{}", i), &format!("Node {}", i), "host")
908                .unwrap();
909        }
910
911        let nodes: Vec<_> = store.iter_nodes().collect();
912        assert_eq!(nodes.len(), 50);
913    }
914
915    #[test]
916    fn legacy_node_type_interns_into_registry() {
917        let store = GraphStore::new();
918        store.add_node_with_label("h1", "web", "host").unwrap();
919        // Adding via the legacy enum must intern its as_str() name.
920        let id = store
921            .registry
922            .lookup(label_registry::Namespace::Node, "host")
923            .expect("legacy enum name should be interned");
924        let fetched = store.get_node("h1").unwrap();
925        assert_eq!(fetched.label_id, id);
926        assert_eq!(fetched.node_type, "host");
927    }
928
929    #[test]
930    fn v2_round_trip_preserves_user_labels() {
931        let store = GraphStore::new();
932        // Intern a non-legacy user label and add a node referencing it via
933        // the legacy bridge (Host) — exercises the full v2 encode path.
934        let user_id = store.intern_node_label("order").unwrap();
935        assert!(user_id.as_u32() >= label_registry::FIRST_USER_LABEL_ID);
936
937        store.add_node_with_label("h1", "web-1", "host").unwrap();
938        store.add_node_with_label("h2", "web-2", "service").unwrap();
939        store
940            .add_edge_with_label("h1", "h2", "connects_to", 1.0)
941            .unwrap();
942
943        let bytes = store.serialize();
944        let frame = reddb_file::decode_graph_store_frame(&bytes, PAGE_SIZE).unwrap();
945        assert_eq!(frame.version, reddb_file::GRAPH_STORE_VERSION_V2);
946
947        let restored = GraphStore::deserialize(&bytes).unwrap();
948        // Registry survived.
949        assert_eq!(
950            restored
951                .registry
952                .lookup(label_registry::Namespace::Node, "order"),
953            Some(user_id)
954        );
955        // Records decoded with v2 label_id intact.
956        let h1 = restored.get_node("h1").unwrap();
957        assert_eq!(h1.node_type, "host");
958        assert_eq!(
959            h1.label_id,
960            restored
961                .registry
962                .lookup(label_registry::Namespace::Node, "host")
963                .unwrap()
964        );
965        // Edge index rebuilt.
966        let outgoing = restored.outgoing_edges("h1");
967        assert_eq!(outgoing.len(), 1);
968        assert_eq!(outgoing[0].0, "connects_to");
969    }
970
971    #[test]
972    fn v1_blob_deserializes_via_legacy_path() {
973        // Hand-craft a minimal v1 file: header (24 bytes, version=1) + 1
974        // node page + 1 edge page. The node page contains one v1 record:
975        //   header_v1 = id_len(2) "n1" label_len(2) "L" type(1=Host) flags(0) out(0) in(0)
976        //   payload   = "n1" "L"
977        // The edge page contains one v1 edge:
978        //   header_v1 = src_len(2) "n1" tgt_len(2) "n1" type(0=HasService) weight(1.0)
979        //   payload   = "n1" "n1"
980        //
981        // Page::insert_cell handles the cell layout for us, so we build
982        // pages programmatically rather than poking at raw page bytes.
983        let mut node_page = Page::new(PageType::GraphNode, 0);
984        // Build a v1 node record.
985        let mut v1_node = Vec::new();
986        v1_node.extend_from_slice(&2u16.to_le_bytes()); // id_len
987        v1_node.extend_from_slice(&1u16.to_le_bytes()); // label_len
988        v1_node.push(0); // "host" (disc=0)
989        v1_node.push(0); // flags
990        v1_node.extend_from_slice(&0u16.to_le_bytes()); // out_edge_count
991        v1_node.extend_from_slice(&0u16.to_le_bytes()); // in_edge_count
992        v1_node.extend_from_slice(b"n1");
993        v1_node.extend_from_slice(b"L");
994        node_page.insert_cell(b"n1", &v1_node).unwrap();
995
996        let mut edge_page = Page::new(PageType::GraphEdge, 0);
997        let mut v1_edge = Vec::new();
998        v1_edge.extend_from_slice(&2u16.to_le_bytes()); // source_len
999        v1_edge.extend_from_slice(&2u16.to_le_bytes()); // target_len
1000        v1_edge.push(0); // "has_service" (disc=0)
1001        v1_edge.extend_from_slice(&1.0f32.to_le_bytes()); // weight
1002        v1_edge.extend_from_slice(b"n1");
1003        v1_edge.extend_from_slice(b"n1");
1004        edge_page.insert_cell(b"n1|0|n1", &v1_edge).unwrap();
1005
1006        let bytes = reddb_file::encode_graph_store_frame(&reddb_file::GraphStoreFrame {
1007            version: reddb_file::GRAPH_STORE_VERSION_V1,
1008            node_count: 1,
1009            edge_count: 1,
1010            registry_bytes: None,
1011            node_pages: vec![node_page.as_bytes().to_vec()],
1012            edge_pages: vec![edge_page.as_bytes().to_vec()],
1013        })
1014        .unwrap();
1015
1016        let store = GraphStore::deserialize(&bytes).expect("v1 blob deserializes");
1017        // Node decoded via legacy path → label_id mapped to reserved ID 1 ("host").
1018        let node = store.get_node("n1").unwrap();
1019        assert_eq!(node.node_type, "host");
1020        assert_eq!(node.label_id, LabelId::new(1));
1021        // Edge index rebuilt.
1022        let out = store.outgoing_edges("n1");
1023        assert_eq!(out.len(), 1);
1024        assert_eq!(out[0].0, "has_service");
1025    }
1026
1027    #[test]
1028    fn deserialize_rejects_unknown_version() {
1029        let mut bytes = [0u8; reddb_file::GRAPH_STORE_HEADER_LEN];
1030        bytes[0..4].copy_from_slice(&reddb_file::GRAPH_STORE_MAGIC);
1031        bytes[4..8].copy_from_slice(&999u32.to_le_bytes());
1032        match GraphStore::deserialize(&bytes) {
1033            Err(GraphStoreError::InvalidData(msg)) => assert!(msg.contains("unsupported")),
1034            Err(other) => panic!("unexpected error: {:?}", other),
1035            Ok(_) => panic!("expected error for unknown version"),
1036        }
1037    }
1038}