Skip to main content

reddb_server/storage/engine/
graph_store.rs

1//! High-Performance Disk-Backed Graph Storage Engine
2//!
3//! A concurrent, page-based graph storage optimized for:
4//! - Lock-free reads with RwLock for concurrent traversal
5//! - Cache-friendly packed layouts for nodes and edges
6//! - B+ tree index for O(log n) edge lookups
7//! - Streaming iteration for large graphs
8//!
9//! # Architecture
10//!
11//! ```text
12//! ┌─────────────────────────────────────────────────────────────────┐
13//! │                       GraphStore                                 │
14//! ├─────────────────────────────────────────────────────────────────┤
15//! │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐        │
16//! │  │ NodeIndex│  │EdgeIndex │  │ NodePages│  │ EdgePages│        │
17//! │  │ (B+ Tree)│  │ (B+ Tree)│  │ (Packed) │  │ (Packed) │        │
18//! │  └────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬─────┘        │
19//! │       │              │             │             │              │
20//! │  ┌────▼──────────────▼─────────────▼─────────────▼────┐        │
21//! │  │                    Pager (4KB Pages)                │        │
22//! │  └────────────────────────────────────────────────────┘        │
23//! │                              │                                  │
24//! │  ┌───────────────────────────▼────────────────────────┐        │
25//! │  │              SIEVE PageCache (lock-free reads)      │        │
26//! │  └────────────────────────────────────────────────────┘        │
27//! └─────────────────────────────────────────────────────────────────┘
28//! ```
29
30use std::collections::HashMap;
31use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
32use std::sync::Arc;
33use std::sync::RwLock;
34
35use super::page::{Page, PageType, PAGE_SIZE};
36
/// Maximum key size for node/edge IDs
pub const MAX_ID_SIZE: usize = 256;

/// Maximum label size
pub const MAX_LABEL_SIZE: usize = 512;

/// V1 node record header size:
/// id_len(2) + label_len(2) + type(1) + flags(1) + out_edges(2) + in_edges(2).
/// Kept for [`StoredNode::decode_v1`]; new writes use [`NODE_HEADER_SIZE`].
pub const NODE_HEADER_SIZE_V1: usize = 10;

/// Node record header size:
/// id_len(2) + label_len(2) + label_id(4) + flags(1) + out_edges(2) + in_edges(2).
/// The 1-byte legacy `node_type` discriminant has been replaced by a 4-byte
/// dynamic [`LabelId`] resolved through [`LabelRegistry`].
pub const NODE_HEADER_SIZE: usize = 13;

/// TableRef size: table_id(2) + row_id(8)
pub const TABLE_REF_SIZE: usize = 10;

/// Node flag: record carries a trailing [`TableRef`]
pub const NODE_FLAG_HAS_TABLE_REF: u8 = 0x01;
/// Node flag: record carries a trailing vector-embedding reference
pub const NODE_FLAG_HAS_VECTOR_REF: u8 = 0x02;

/// VectorRef size: collection_len(2) + vector_id(8) = 10 bytes header + variable collection name
pub const VECTOR_REF_HEADER_SIZE: usize = 10;

/// Reference to a table row (for linking graph nodes to tabular data)
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TableRef {
    /// Table identifier (index into table registry)
    pub table_id: u16,
    /// Row ID within the table
    pub row_id: u64,
}

impl TableRef {
    /// Create a new table reference
    pub fn new(table_id: u16, row_id: u64) -> Self {
        Self { table_id, row_id }
    }

    /// Serialize as `table_id (2 LE bytes) || row_id (8 LE bytes)`,
    /// [`TABLE_REF_SIZE`] bytes total.
    pub fn encode(&self) -> [u8; TABLE_REF_SIZE] {
        let mut out = [0u8; TABLE_REF_SIZE];
        let (id_part, row_part) = out.split_at_mut(2);
        id_part.copy_from_slice(&self.table_id.to_le_bytes());
        row_part.copy_from_slice(&self.row_id.to_le_bytes());
        out
    }

    /// Parse a reference from the first [`TABLE_REF_SIZE`] bytes of `data`;
    /// returns `None` when `data` is too short.
    pub fn decode(data: &[u8]) -> Option<Self> {
        if data.len() < TABLE_REF_SIZE {
            return None;
        }
        let table_id = u16::from_le_bytes([data[0], data[1]]);
        let mut row = [0u8; 8];
        row.copy_from_slice(&data[2..10]);
        Some(Self {
            table_id,
            row_id: u64::from_le_bytes(row),
        })
    }
}

/// V1 edge record header size: source_len(2) + target_len(2) + type(1) + weight(4).
/// Kept for [`StoredEdge::decode_v1`]; new writes use [`EDGE_HEADER_SIZE`].
pub const EDGE_HEADER_SIZE_V1: usize = 9;

/// Edge record header size: source_len(2) + target_len(2) + label_id(4) + weight(4).
/// The 1-byte legacy `edge_type` discriminant has been replaced by a 4-byte
/// dynamic [`LabelId`] resolved through [`LabelRegistry`].
pub const EDGE_HEADER_SIZE: usize = 12;
108
/// A graph node stored on disk
#[derive(Debug, Clone)]
pub struct StoredNode {
    // Unique node identifier; serialized length-prefixed (u16) by `encode`.
    pub id: String,
    // Human-readable display label; serialized length-prefixed (u16).
    pub label: String,
    /// Canonical category label string (e.g. `"host"`, `"order"`). Resolved
    /// from [`label_id`] at decode time via the legacy seed mapping.
    /// Caller-visible string; the registry-stored [`label_id`] is the
    /// source-of-truth identifier.
    pub node_type: String,
    /// Authoritative label identifier resolved through [`LabelRegistry`].
    pub label_id: LabelId,
    // Flag byte; `encode` recomputes the HAS_TABLE_REF / HAS_VECTOR_REF bits
    // from the actual Option fields below, so only the other bits persist as-is.
    pub flags: u8,
    // Outgoing edge count. u32 in memory but persisted as u16 by `encode`.
    pub out_edge_count: u32,
    // Incoming edge count. Same u16 on-disk width as out_edge_count.
    pub in_edge_count: u32,
    /// Page ID where this node is stored
    pub page_id: u32,
    /// Slot index within the page
    pub slot: u16,
    /// Optional reference to a table row (for unified queries)
    pub table_ref: Option<TableRef>,
    /// Optional reference to a vector embedding (collection name, vector_id)
    pub vector_ref: Option<(String, u64)>,
}
133
impl StoredNode {
    /// Encode node to bytes in v2 format (label_id replaces node_type).
    ///
    /// Layout: 13-byte header, then raw `id` bytes, then raw `label` bytes,
    /// then an optional [`TableRef`], then an optional vector reference
    /// (`coll_len(2) + collection bytes + vector_id(8)`). All integers are
    /// little-endian. The persisted flag byte is recomputed from the actual
    /// presence of `table_ref` / `vector_ref`, so stale in-memory flag bits
    /// cannot leak into the record.
    pub fn encode(&self) -> Vec<u8> {
        let id_bytes = self.id.as_bytes();
        let label_bytes = self.label.as_bytes();
        let has_table_ref = self.table_ref.is_some();
        let has_vector_ref = self.vector_ref.is_some();

        // Compute flags with table_ref and vector_ref indicators
        let mut flags = self.flags & !(NODE_FLAG_HAS_TABLE_REF | NODE_FLAG_HAS_VECTOR_REF);
        if has_table_ref {
            flags |= NODE_FLAG_HAS_TABLE_REF;
        }
        if has_vector_ref {
            flags |= NODE_FLAG_HAS_VECTOR_REF;
        }

        let table_ref_size = if has_table_ref { TABLE_REF_SIZE } else { 0 };
        let vector_ref_size = if let Some((ref coll, _)) = self.vector_ref {
            2 + coll.len() + 8
        } else {
            0
        };
        // Pre-size the buffer so the extends below never reallocate.
        let mut buf = Vec::with_capacity(
            NODE_HEADER_SIZE
                + id_bytes.len()
                + label_bytes.len()
                + table_ref_size
                + vector_ref_size,
        );

        // V2 header: id_len(2) + label_len(2) + label_id(4) + flags(1) + out_edges(2) + in_edges(2)
        buf.extend_from_slice(&(id_bytes.len() as u16).to_le_bytes());
        buf.extend_from_slice(&(label_bytes.len() as u16).to_le_bytes());
        buf.extend_from_slice(&self.label_id.as_u32().to_le_bytes());
        buf.push(flags);
        // NOTE(review): edge counts are u32 in memory but persisted as u16;
        // values above 65535 are silently truncated by these casts — confirm
        // callers cap per-node edge counts.
        buf.extend_from_slice(&(self.out_edge_count as u16).to_le_bytes());
        buf.extend_from_slice(&(self.in_edge_count as u16).to_le_bytes());

        buf.extend_from_slice(id_bytes);
        buf.extend_from_slice(label_bytes);

        if let Some(ref tref) = self.table_ref {
            buf.extend_from_slice(&tref.encode());
        }

        if let Some((ref collection, vector_id)) = self.vector_ref {
            let coll_bytes = collection.as_bytes();
            buf.extend_from_slice(&(coll_bytes.len() as u16).to_le_bytes());
            buf.extend_from_slice(coll_bytes);
            buf.extend_from_slice(&vector_id.to_le_bytes());
        }

        buf
    }

    /// Decode node from bytes (v2 format). For v1 records use [`decode_v1`].
    ///
    /// Returns `None` when `data` is shorter than the fixed header (or, via
    /// [`Self::decode_payload`], shorter than any variable-length section the
    /// header declares). `page_id` and `slot` identify where the bytes were
    /// read from and are stored on the result verbatim.
    pub fn decode(data: &[u8], page_id: u32, slot: u16) -> Option<Self> {
        if data.len() < NODE_HEADER_SIZE {
            return None;
        }

        let id_len = u16::from_le_bytes([data[0], data[1]]) as usize;
        let label_len = u16::from_le_bytes([data[2], data[3]]) as usize;
        let label_id = LabelId::new(u32::from_le_bytes([data[4], data[5], data[6], data[7]]));
        let flags = data[8];
        let out_edge_count = u16::from_le_bytes([data[9], data[10]]) as u32;
        let in_edge_count = u16::from_le_bytes([data[11], data[12]]) as u32;
        // Derive legacy node_type from label_id for back-compat with callers
        // that still read the field. PR3 removes this field entirely.
        let node_type = label_id_to_node_label(label_id);

        Self::decode_payload(
            data,
            page_id,
            slot,
            NODE_HEADER_SIZE,
            id_len,
            label_len,
            flags,
            out_edge_count,
            in_edge_count,
            node_type,
            label_id,
        )
    }

    /// Decode a v1 (legacy) node record. The caller must supply a
    /// [`LabelRegistry`] seeded via [`LabelRegistry::with_legacy_seed`] so
    /// the legacy `node_type` discriminant maps to the correct reserved
    /// [`LabelId`].
    ///
    /// Returns `None` for short buffers or an out-of-range discriminant.
    pub fn decode_v1(data: &[u8], page_id: u32, slot: u16) -> Option<Self> {
        if data.len() < NODE_HEADER_SIZE_V1 {
            return None;
        }
        let id_len = u16::from_le_bytes([data[0], data[1]]) as usize;
        let label_len = u16::from_le_bytes([data[2], data[3]]) as usize;
        // V1 records carry the legacy enum discriminant; reject any byte
        // outside the 9-variant range so we do not silently misinterpret
        // unrelated bytes as a node-type.
        if data[4] > 8 {
            return None;
        }
        let flags = data[5];
        let out_edge_count = u16::from_le_bytes([data[6], data[7]]) as u32;
        let in_edge_count = u16::from_le_bytes([data[8], data[9]]) as u32;
        let label_id = LabelRegistry::legacy_node_label_id(data[4]);
        let node_type = label_id_to_node_label(label_id);
        Self::decode_payload(
            data,
            page_id,
            slot,
            NODE_HEADER_SIZE_V1,
            id_len,
            label_len,
            flags,
            out_edge_count,
            in_edge_count,
            node_type,
            label_id,
        )
    }

    /// Shared payload (id, label, table_ref, vector_ref) decoder for v1/v2.
    ///
    /// `header_size` selects the version-specific header width; everything
    /// after the header is laid out identically in both formats. All slice
    /// reads are length-checked first; any truncated section yields `None`.
    #[allow(clippy::too_many_arguments)]
    fn decode_payload(
        data: &[u8],
        page_id: u32,
        slot: u16,
        header_size: usize,
        id_len: usize,
        label_len: usize,
        flags: u8,
        out_edge_count: u32,
        in_edge_count: u32,
        node_type: String,
        label_id: LabelId,
    ) -> Option<Self> {
        let has_table_ref = (flags & NODE_FLAG_HAS_TABLE_REF) != 0;
        let has_vector_ref = (flags & NODE_FLAG_HAS_VECTOR_REF) != 0;
        let table_ref_size = if has_table_ref { TABLE_REF_SIZE } else { 0 };

        // `offset` = end of the fixed-position sections (header, id, label,
        // optional table ref); the vector ref — if any — starts here.
        let mut offset = header_size + id_len + label_len + table_ref_size;
        if data.len() < offset {
            return None;
        }

        // from_utf8_lossy: corrupt bytes become replacement characters
        // rather than failing the whole decode.
        let id = String::from_utf8_lossy(&data[header_size..header_size + id_len]).to_string();
        let label =
            String::from_utf8_lossy(&data[header_size + id_len..header_size + id_len + label_len])
                .to_string();

        let table_ref = if has_table_ref {
            let ref_start = header_size + id_len + label_len;
            // Length already validated above: `offset` includes TABLE_REF_SIZE.
            TableRef::decode(&data[ref_start..])
        } else {
            None
        };

        let vector_ref = if has_vector_ref {
            if data.len() < offset + 2 {
                return None;
            }
            let coll_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
            offset += 2;
            if data.len() < offset + coll_len + 8 {
                return None;
            }
            let collection = String::from_utf8_lossy(&data[offset..offset + coll_len]).to_string();
            offset += coll_len;
            let vector_id = u64::from_le_bytes(data[offset..offset + 8].try_into().ok()?);
            Some((collection, vector_id))
        } else {
            None
        };

        Some(Self {
            id,
            label,
            node_type,
            label_id,
            flags,
            out_edge_count,
            in_edge_count,
            page_id,
            slot,
            table_ref,
            vector_ref,
        })
    }

    /// Calculate encoded size
    ///
    /// Mirrors the layout produced by [`Self::encode`]: header + id + label
    /// + optional table ref + optional vector ref.
    pub fn encoded_size(&self) -> usize {
        let table_ref_size = if self.table_ref.is_some() {
            TABLE_REF_SIZE
        } else {
            0
        };
        let vector_ref_size = if let Some((ref coll, _)) = self.vector_ref {
            2 + coll.len() + 8
        } else {
            0
        };
        NODE_HEADER_SIZE + self.id.len() + self.label.len() + table_ref_size + vector_ref_size
    }

    /// Link this node to a table row
    pub fn link_to_row(&mut self, table_id: u16, row_id: u64) {
        self.table_ref = Some(TableRef::new(table_id, row_id));
    }

    /// Unlink from table row
    pub fn unlink_from_row(&mut self) {
        self.table_ref = None;
    }

    /// Link this node to a vector embedding
    pub fn link_to_vector(&mut self, collection: String, vector_id: u64) {
        self.vector_ref = Some((collection, vector_id));
    }

    /// Unlink from vector embedding
    pub fn unlink_from_vector(&mut self) {
        self.vector_ref = None;
    }

    /// Check if this node is linked to a table row
    pub fn is_linked(&self) -> bool {
        self.table_ref.is_some()
    }
}
365
/// A graph edge stored on disk
#[derive(Debug, Clone)]
pub struct StoredEdge {
    // ID of the node this edge leaves; serialized length-prefixed (u16).
    pub source_id: String,
    // ID of the node this edge enters; serialized length-prefixed (u16).
    pub target_id: String,
    /// Canonical edge label string. Derived from [`label_id`] at decode time.
    pub edge_type: String,
    /// Authoritative label identifier resolved through [`LabelRegistry`].
    pub label_id: LabelId,
    // Edge weight; persisted as a little-endian f32.
    pub weight: f32,
    /// Page ID where this edge is stored
    pub page_id: u32,
    /// Slot index within the page
    pub slot: u16,
}
381
impl StoredEdge {
    /// Encode edge to bytes (v2 format).
    ///
    /// Layout: source_len(2) + target_len(2) + label_id(4) + weight(4),
    /// followed by the raw source-id bytes then the raw target-id bytes
    /// (all integers little-endian).
    pub fn encode(&self) -> Vec<u8> {
        let source_bytes = self.source_id.as_bytes();
        let target_bytes = self.target_id.as_bytes();

        // Pre-size so the extends below never reallocate.
        let mut buf =
            Vec::with_capacity(EDGE_HEADER_SIZE + source_bytes.len() + target_bytes.len());

        // V2 header: source_len(2) + target_len(2) + label_id(4) + weight(4)
        buf.extend_from_slice(&(source_bytes.len() as u16).to_le_bytes());
        buf.extend_from_slice(&(target_bytes.len() as u16).to_le_bytes());
        buf.extend_from_slice(&self.label_id.as_u32().to_le_bytes());
        buf.extend_from_slice(&self.weight.to_le_bytes());

        buf.extend_from_slice(source_bytes);
        buf.extend_from_slice(target_bytes);

        buf
    }

    /// Decode edge from bytes (v2 format). For v1 records use [`decode_v1`].
    ///
    /// Returns `None` when `data` is shorter than the header or the declared
    /// id payloads. `page_id`/`slot` are the storage location and are stored
    /// on the result verbatim.
    pub fn decode(data: &[u8], page_id: u32, slot: u16) -> Option<Self> {
        if data.len() < EDGE_HEADER_SIZE {
            return None;
        }

        let source_len = u16::from_le_bytes([data[0], data[1]]) as usize;
        let target_len = u16::from_le_bytes([data[2], data[3]]) as usize;
        let label_id = LabelId::new(u32::from_le_bytes([data[4], data[5], data[6], data[7]]));
        let weight = f32::from_le_bytes([data[8], data[9], data[10], data[11]]);
        // Legacy string form kept for callers that still read `edge_type`.
        let edge_type = label_id_to_edge_label(label_id);

        if data.len() < EDGE_HEADER_SIZE + source_len + target_len {
            return None;
        }

        // from_utf8_lossy: corrupt bytes become replacement characters
        // rather than failing the whole decode.
        let source_id =
            String::from_utf8_lossy(&data[EDGE_HEADER_SIZE..EDGE_HEADER_SIZE + source_len])
                .to_string();
        let target_id = String::from_utf8_lossy(
            &data[EDGE_HEADER_SIZE + source_len..EDGE_HEADER_SIZE + source_len + target_len],
        )
        .to_string();

        Some(Self {
            source_id,
            target_id,
            edge_type,
            label_id,
            weight,
            page_id,
            slot,
        })
    }

    /// Decode a v1 (legacy) edge record. The 1-byte enum discriminant maps
    /// to the legacy reserved [`LabelId`] range via
    /// [`LabelRegistry::legacy_edge_label_id`].
    ///
    /// Returns `None` for short buffers or an out-of-range discriminant.
    pub fn decode_v1(data: &[u8], page_id: u32, slot: u16) -> Option<Self> {
        if data.len() < EDGE_HEADER_SIZE_V1 {
            return None;
        }
        let source_len = u16::from_le_bytes([data[0], data[1]]) as usize;
        let target_len = u16::from_le_bytes([data[2], data[3]]) as usize;
        // Reject discriminants outside the 10-variant legacy edge range so
        // arbitrary bytes are not misread as an edge type.
        if data[4] > 9 {
            return None;
        }
        let weight = f32::from_le_bytes([data[5], data[6], data[7], data[8]]);
        let label_id = LabelRegistry::legacy_edge_label_id(data[4]);
        let edge_type = label_id_to_edge_label(label_id);

        if data.len() < EDGE_HEADER_SIZE_V1 + source_len + target_len {
            return None;
        }
        let source_id =
            String::from_utf8_lossy(&data[EDGE_HEADER_SIZE_V1..EDGE_HEADER_SIZE_V1 + source_len])
                .to_string();
        let target_id = String::from_utf8_lossy(
            &data[EDGE_HEADER_SIZE_V1 + source_len..EDGE_HEADER_SIZE_V1 + source_len + target_len],
        )
        .to_string();

        Some(Self {
            source_id,
            target_id,
            edge_type,
            label_id,
            weight,
            page_id,
            slot,
        })
    }

    /// Calculate encoded size (v2)
    ///
    /// Mirrors the layout produced by [`Self::encode`].
    pub fn encoded_size(&self) -> usize {
        EDGE_HEADER_SIZE + self.source_id.len() + self.target_id.len()
    }
}
481
482/// Resolve a [`LabelId`] in the legacy reserved range to its canonical
483/// category string. For non-legacy IDs (≥ [`FIRST_USER_LABEL_ID`]) returns
484/// `format!("label_{}", id)` — a non-crashing placeholder that flags the
485/// caller is reading a record without a registry handle. Real callers
486/// should resolve through [`LabelRegistry`] when accuracy matters.
487fn label_id_to_node_label(id: LabelId) -> String {
488    match id.as_u32() {
489        1 => "host".to_string(),
490        2 => "service".to_string(),
491        3 => "credential".to_string(),
492        4 => "vulnerability".to_string(),
493        5 => "endpoint".to_string(),
494        6 => "technology".to_string(),
495        7 => "user".to_string(),
496        8 => "domain".to_string(),
497        9 => "certificate".to_string(),
498        n => format!("label_{}", n),
499    }
500}
501
502/// Resolve a [`LabelId`] in the legacy reserved edge range to its canonical
503/// edge label string.
504fn label_id_to_edge_label(id: LabelId) -> String {
505    match id.as_u32() {
506        10 => "has_service".to_string(),
507        11 => "has_endpoint".to_string(),
508        12 => "uses_tech".to_string(),
509        13 => "auth_access".to_string(),
510        14 => "affected_by".to_string(),
511        15 => "contains".to_string(),
512        16 => "connects_to".to_string(),
513        17 => "related_to".to_string(),
514        18 => "has_user".to_string(),
515        19 => "has_cert".to_string(),
516        n => format!("label_{}", n),
517    }
518}
519
/// Location of a record in the graph store
#[derive(Debug, Clone, Copy)]
pub struct RecordLocation {
    // Page that holds the record.
    pub page_id: u32,
    // Slot index within that page.
    pub slot: u16,
}
526
/// Graph statistics
#[derive(Debug, Clone, Default)]
pub struct GraphStats {
    // Total nodes in the store.
    pub node_count: u64,
    // Total edges in the store.
    pub edge_count: u64,
    // Number of pages allocated for node records.
    pub node_pages: u32,
    // Number of pages allocated for edge records.
    pub edge_pages: u32,
    /// Cardinality per category label (e.g. `"host" → 42`). Replaces the
    /// closed-enum `nodes_by_type: [u64; 9]` from earlier revisions.
    pub nodes_by_label: HashMap<String, u64>,
    /// Cardinality per edge label.
    pub edges_by_label: HashMap<String, u64>,
}
540
/// Concurrent in-memory index for fast lookups
/// Uses sharded locks for reduced contention
///
/// Keys are routed to a shard by a simple multiplicative (×31) byte hash,
/// so operations on different shards never contend. A poisoned shard lock
/// is treated as "not found" / no-op rather than panicking.
pub struct ShardedIndex<V> {
    /// One independently locked map per shard.
    shards: Vec<RwLock<HashMap<String, V>>>,
    /// Cached shard count (always ≥ 1), used as the hash modulus.
    shard_count: usize,
}

impl<V: Clone> ShardedIndex<V> {
    /// Create an index with `shard_count` shards.
    ///
    /// A requested count of 0 is clamped to 1: previously `shards` was
    /// empty and `shard_for`'s `% 0` panicked on the first key access.
    pub fn new(shard_count: usize) -> Self {
        let shard_count = shard_count.max(1);
        let shards = (0..shard_count)
            .map(|_| RwLock::new(HashMap::new()))
            .collect();
        Self {
            shards,
            shard_count,
        }
    }

    /// Map `key` to a shard index via a ×31 rolling byte hash.
    #[inline]
    fn shard_for(&self, key: &str) -> usize {
        // Simple hash-based sharding
        let hash: u64 = key
            .bytes()
            .fold(0u64, |acc, b| acc.wrapping_mul(31).wrapping_add(b as u64));
        (hash as usize) % self.shard_count
    }

    /// Clone-out lookup. Returns `None` for a missing key or poisoned lock.
    pub fn get(&self, key: &str) -> Option<V> {
        let shard = self.shard_for(key);
        self.shards[shard].read().ok()?.get(key).cloned()
    }

    /// Insert or overwrite `key`. Silently a no-op if the shard lock is poisoned.
    pub fn insert(&self, key: String, value: V) {
        let shard = self.shard_for(&key);
        if let Ok(mut guard) = self.shards[shard].write() {
            guard.insert(key, value);
        }
    }

    /// Remove `key`, returning its previous value if present.
    pub fn remove(&self, key: &str) -> Option<V> {
        let shard = self.shard_for(key);
        self.shards[shard].write().ok()?.remove(key)
    }

    /// Whether `key` exists (false on a poisoned shard lock).
    pub fn contains(&self, key: &str) -> bool {
        let shard = self.shard_for(key);
        self.shards[shard]
            .read()
            .ok()
            .map(|g| g.contains_key(key))
            .unwrap_or(false)
    }

    /// Total entries across all shards. Not a consistent snapshot: shards
    /// are locked one at a time, so concurrent writers may skew the sum.
    pub fn len(&self) -> usize {
        self.shards
            .iter()
            .filter_map(|s| s.read().ok())
            .map(|g| g.len())
            .sum()
    }

    /// True when no shard holds any entry.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
606
607/// Edge index key: `(source_id, edge_label)` → `Vec<target_id>`.
608/// Optimized for adjacency list queries; the edge label is the canonical
609/// string form (e.g. `"connects_to"`) — use the registry to resolve back to
610/// a [`LabelId`] when needed.
611pub struct EdgeIndex {
612    /// Forward edges: source → `[(edge_label, target, weight)]`
613    forward: ShardedIndex<Vec<(String, String, f32)>>,
614    /// Backward edges: target → `[(edge_label, source, weight)]`
615    backward: ShardedIndex<Vec<(String, String, f32)>>,
616}
617
618impl EdgeIndex {
619    pub fn new(shard_count: usize) -> Self {
620        Self {
621            forward: ShardedIndex::new(shard_count),
622            backward: ShardedIndex::new(shard_count),
623        }
624    }
625
626    pub fn add_edge(&self, source: &str, target: &str, edge_label: &str, weight: f32) {
627        let shard = self.forward.shard_for(source);
628        if let Ok(mut guard) = self.forward.shards[shard].write() {
629            guard
630                .entry(source.to_string())
631                .or_insert_with(Vec::new)
632                .push((edge_label.to_string(), target.to_string(), weight));
633        }
634
635        let shard = self.backward.shard_for(target);
636        if let Ok(mut guard) = self.backward.shards[shard].write() {
637            guard
638                .entry(target.to_string())
639                .or_insert_with(Vec::new)
640                .push((edge_label.to_string(), source.to_string(), weight));
641        }
642    }
643
644    pub fn remove_edge(&self, source: &str, target: &str, edge_label: &str) {
645        let shard = self.forward.shard_for(source);
646        if let Ok(mut guard) = self.forward.shards[shard].write() {
647            if let Some(edges) = guard.get_mut(source) {
648                edges.retain(|(et, t, _)| !(et == edge_label && t == target));
649            }
650        }
651
652        let shard = self.backward.shard_for(target);
653        if let Ok(mut guard) = self.backward.shards[shard].write() {
654            if let Some(edges) = guard.get_mut(target) {
655                edges.retain(|(et, s, _)| !(et == edge_label && s == source));
656            }
657        }
658    }
659
660    pub fn outgoing(&self, source: &str) -> Vec<(String, String, f32)> {
661        self.forward.get(source).unwrap_or_default()
662    }
663
664    pub fn incoming(&self, target: &str) -> Vec<(String, String, f32)> {
665        self.backward.get(target).unwrap_or_default()
666    }
667
668    pub fn outgoing_of_type(&self, source: &str, edge_label: &str) -> Vec<(String, f32)> {
669        self.forward
670            .get(source)
671            .unwrap_or_default()
672            .into_iter()
673            .filter(|(et, _, _)| et == edge_label)
674            .map(|(_, t, w)| (t, w))
675            .collect()
676    }
677}
678
/// High-performance graph storage engine
///
/// Provides concurrent read access with minimal locking overhead.
/// Writes are serialized through a write lock but reads can proceed in parallel.
pub struct GraphStore {
    /// Node index: node_id -> location
    node_index: ShardedIndex<RecordLocation>,
    /// Edge index: adjacency lists
    edge_index: EdgeIndex,
    /// Secondary inverted indexes on (type, label) for O(1) non-id lookups.
    /// Avoids full node-page scans in `nodes_of_type` / `nodes_by_label`.
    ///
    /// Stored as `Arc` so [`GraphStore::publish_indexes`] can share the
    /// exact live index with an [`crate::storage::index::IndexRegistry`]
    /// instead of handing out a frozen snapshot.
    node_secondary: std::sync::Arc<secondary_index::NodeSecondaryIndex>,
    /// Dynamic label catalog. Resolves user-supplied label strings to
    /// stable [`LabelId`] u32 values used in the v2 page format.
    pub registry: Arc<LabelRegistry>,
    /// Node pages (packed node records)
    node_pages: RwLock<Vec<Page>>,
    /// Edge pages (packed edge records)
    edge_pages: RwLock<Vec<Page>>,
    /// Current node page for inserts
    current_node_page: AtomicU32,
    /// Current edge page for inserts
    current_edge_page: AtomicU32,
    /// Statistics
    // NOTE(review): non-atomic struct next to atomic counters — presumably
    // only mutated under a write path; confirm in graph_store/impl.rs.
    stats: GraphStats,
    // Live node total; atomic so readers can count without taking page locks.
    node_count: AtomicU64,
    // Live edge total; same rationale as node_count.
    edge_count: AtomicU64,
}
711
712#[path = "graph_store/impl.rs"]
713mod graph_store_impl;
714pub mod label_registry;
715pub mod secondary_index;
716pub use label_registry::{
717    LabelId, LabelRegistry, LabelRegistryError, Namespace, FIRST_USER_LABEL_ID, MAX_LABEL_LEN,
718    UNSET_LABEL_ID,
719};
720pub use secondary_index::NodeSecondaryIndex;
impl Default for GraphStore {
    /// Equivalent to [`GraphStore::new`] (defined in `graph_store/impl.rs`).
    fn default() -> Self {
        Self::new()
    }
}
726
/// Iterator over all nodes in the graph
pub struct NodeIterator<'a> {
    // Backing store whose `node_pages` are scanned.
    store: &'a GraphStore,
    // Index of the page currently being scanned.
    page_idx: usize,
    // Index of the next cell to read within the current page.
    cell_idx: usize,
}
733
734impl<'a> Iterator for NodeIterator<'a> {
735    type Item = StoredNode;
736
737    fn next(&mut self) -> Option<Self::Item> {
738        let pages = self.store.node_pages.read().ok()?;
739
740        loop {
741            if self.page_idx >= pages.len() {
742                return None;
743            }
744
745            let page = &pages[self.page_idx];
746            let cell_count = page.cell_count() as usize;
747
748            if self.cell_idx >= cell_count {
749                self.page_idx += 1;
750                self.cell_idx = 0;
751                continue;
752            }
753
754            if let Ok((_, value)) = page.read_cell(self.cell_idx) {
755                self.cell_idx += 1;
756                if let Some(node) =
757                    StoredNode::decode(&value, self.page_idx as u32, (self.cell_idx - 1) as u16)
758                {
759                    return Some(node);
760                }
761            } else {
762                self.cell_idx += 1;
763            }
764        }
765    }
766}
767
/// Graph store errors
#[derive(Debug, Clone)]
pub enum GraphStoreError {
    /// Insert rejected: a node with this id already exists.
    NodeExists(String),
    /// Lookup failed for the given node id.
    NodeNotFound(String),
    /// No edge between the given (source, target) pair.
    EdgeNotFound(String, String),
    /// The target page has no room for the record.
    PageFull,
    /// A std lock was poisoned by a panicking holder.
    LockPoisoned,
    /// A record failed validation or decoding.
    InvalidData(String),
    /// Underlying I/O failure.
    IoError(String),
}

impl std::fmt::Display for GraphStoreError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Build the message first, then emit it in one write.
        let text = match self {
            GraphStoreError::NodeExists(id) => format!("Node already exists: {}", id),
            GraphStoreError::NodeNotFound(id) => format!("Node not found: {}", id),
            GraphStoreError::EdgeNotFound(src, dst) => {
                format!("Edge not found: {} -> {}", src, dst)
            }
            GraphStoreError::PageFull => "Page is full".to_string(),
            GraphStoreError::LockPoisoned => "Lock poisoned".to_string(),
            GraphStoreError::InvalidData(msg) => format!("Invalid data: {}", msg),
            GraphStoreError::IoError(msg) => format!("I/O error: {}", msg),
        };
        f.write_str(&text)
    }
}

impl std::error::Error for GraphStoreError {}
795
796#[cfg(test)]
797mod tests {
798    use super::*;
799    use std::sync::Arc;
800
801    #[test]
802    fn test_graph_store_basic() {
803        let store = GraphStore::new();
804
805        // Add nodes
806        store
807            .add_node_with_label("host:192.168.1.1", "Web Server", "host")
808            .unwrap();
809        store
810            .add_node_with_label("host:192.168.1.2", "Database", "host")
811            .unwrap();
812        store
813            .add_node_with_label("service:192.168.1.1:80:http", "HTTP", "service")
814            .unwrap();
815
816        assert_eq!(store.node_count(), 3);
817
818        // Add edges
819        store
820            .add_edge_with_label(
821                "host:192.168.1.1",
822                "service:192.168.1.1:80:http",
823                "has_service",
824                1.0,
825            )
826            .unwrap();
827        store
828            .add_edge_with_label("host:192.168.1.1", "host:192.168.1.2", "connects_to", 1.0)
829            .unwrap();
830
831        assert_eq!(store.edge_count(), 2);
832
833        // Query
834        let node = store.get_node("host:192.168.1.1").unwrap();
835        assert_eq!(node.label, "Web Server");
836
837        let out_edges = store.outgoing_edges("host:192.168.1.1");
838        assert_eq!(out_edges.len(), 2);
839    }
840
841    #[test]
842    fn test_graph_store_serialization() {
843        let store = GraphStore::new();
844
845        store
846            .add_node_with_label("host:10.0.0.1", "Server A", "host")
847            .unwrap();
848        store
849            .add_node_with_label("host:10.0.0.2", "Server B", "host")
850            .unwrap();
851        store
852            .add_edge_with_label("host:10.0.0.1", "host:10.0.0.2", "connects_to", 0.5)
853            .unwrap();
854
855        // Serialize
856        let bytes = store.serialize();
857
858        // Deserialize
859        let restored = GraphStore::deserialize(&bytes).unwrap();
860
861        assert_eq!(restored.node_count(), 2);
862        assert_eq!(restored.edge_count(), 1);
863
864        let node = restored.get_node("host:10.0.0.1").unwrap();
865        assert_eq!(node.label, "Server A");
866    }
867
868    #[test]
869    fn test_concurrent_reads() {
870        use std::thread;
871
872        let store = Arc::new(GraphStore::new());
873
874        // Add some data
875        for i in 0..100 {
876            store
877                .add_node_with_label(&format!("host:{}", i), &format!("Host {}", i), "host")
878                .unwrap();
879        }
880
881        // Spawn multiple reader threads
882        let mut handles = vec![];
883        for _ in 0..4 {
884            let store_clone = Arc::clone(&store);
885            handles.push(thread::spawn(move || {
886                for i in 0..100 {
887                    let _ = store_clone.get_node(&format!("host:{}", i));
888                }
889            }));
890        }
891
892        for handle in handles {
893            handle.join().unwrap();
894        }
895
896        assert_eq!(store.node_count(), 100);
897    }
898
899    #[test]
900    fn test_edge_index_performance() {
901        let store = GraphStore::new();
902
903        // Create a graph with many edges
904        store
905            .add_node_with_label("hub", "Hub Node", "host")
906            .unwrap();
907        for i in 0..100 {
908            store
909                .add_node_with_label(&format!("spoke:{}", i), &format!("Spoke {}", i), "host")
910                .unwrap();
911            store
912                .add_edge_with_label("hub", &format!("spoke:{}", i), "connects_to", 1.0)
913                .unwrap();
914        }
915
916        // Query outgoing edges (should be fast with index)
917        let edges = store.outgoing_edges("hub");
918        assert_eq!(edges.len(), 100);
919    }
920
921    #[test]
922    fn test_nodes_of_type_uses_secondary_index() {
923        let store = GraphStore::new();
924        store
925            .add_node_with_label("host:1", "Web Server", "host")
926            .unwrap();
927        store
928            .add_node_with_label("host:2", "DB Server", "host")
929            .unwrap();
930        store
931            .add_node_with_label("svc:1", "HTTP", "service")
932            .unwrap();
933        store
934            .add_node_with_label("vuln:1", "CVE-2024-1", "vulnerability")
935            .unwrap();
936
937        let hosts = store.nodes_with_category("host");
938        assert_eq!(hosts.len(), 2);
939        assert!(hosts.iter().all(|n| n.node_type == "host"));
940
941        let services = store.nodes_with_category("service");
942        assert_eq!(services.len(), 1);
943        assert_eq!(services[0].id, "svc:1");
944
945        assert_eq!(store.nodes_with_category("user").len(), 0);
946    }
947
948    #[test]
949    fn test_nodes_by_label_with_bloom_prune() {
950        let store = GraphStore::new();
951        store
952            .add_node_with_label("host:1", "Edge Router", "host")
953            .unwrap();
954        store
955            .add_node_with_label("host:2", "Edge Router", "host")
956            .unwrap();
957        store
958            .add_node_with_label("host:3", "Core Switch", "host")
959            .unwrap();
960
961        let routers = store.nodes_by_label("Edge Router");
962        assert_eq!(routers.len(), 2);
963
964        let unknown = store.nodes_by_label("Quantum Router 9000");
965        assert!(unknown.is_empty());
966        // Bloom is allowed to false-positive but must never hide real labels.
967        assert!(store.may_contain_label("Edge Router"));
968        assert!(store.may_contain_label("Core Switch"));
969    }
970
971    #[test]
972    fn test_publish_indexes_to_registry() {
973        use crate::storage::index::{IndexKind, IndexRegistry, IndexScope};
974
975        let store = GraphStore::new();
976        store.add_node_with_label("h:1", "Alpha", "host").unwrap();
977        store.add_node_with_label("h:2", "Beta", "host").unwrap();
978        store
979            .add_node_with_label("svc:1", "HTTP", "service")
980            .unwrap();
981
982        let registry = IndexRegistry::new();
983        store.publish_indexes(&registry, "infra");
984
985        let shared = registry.get(&IndexScope::graph("infra")).unwrap();
986        let stats = shared.stats();
987        // Two scopes × each insert = by_type + by_label per node
988        // 3 inserts × 2 scopes = 6 entries
989        assert_eq!(stats.entries, 6);
990        assert_eq!(stats.kind, IndexKind::Inverted);
991        assert!(stats.has_bloom);
992
993        // Live updates are visible through the registry since both sides
994        // share the same Arc<NodeSecondaryIndex>.
995        store.add_node_with_label("h:3", "Gamma", "host").unwrap();
996        let updated = registry.get(&IndexScope::graph("infra")).unwrap().stats();
997        assert_eq!(updated.entries, 8);
998    }
999
1000    #[test]
1001    fn test_secondary_index_rebuilt_after_deserialize() {
1002        let store = GraphStore::new();
1003        store
1004            .add_node_with_label("host:1", "Alpha", "host")
1005            .unwrap();
1006        store
1007            .add_node_with_label("svc:1", "HTTP", "service")
1008            .unwrap();
1009
1010        let bytes = store.serialize();
1011        let restored = GraphStore::deserialize(&bytes).unwrap();
1012
1013        assert_eq!(restored.nodes_with_category("host").len(), 1);
1014        assert_eq!(restored.nodes_by_label("HTTP").len(), 1);
1015        assert!(restored.may_contain_label("Alpha"));
1016    }
1017
1018    #[test]
1019    fn test_node_iteration() {
1020        let store = GraphStore::new();
1021
1022        for i in 0..50 {
1023            store
1024                .add_node_with_label(&format!("node:{}", i), &format!("Node {}", i), "host")
1025                .unwrap();
1026        }
1027
1028        let nodes: Vec<_> = store.iter_nodes().collect();
1029        assert_eq!(nodes.len(), 50);
1030    }
1031
1032    #[test]
1033    fn legacy_node_type_interns_into_registry() {
1034        let store = GraphStore::new();
1035        store.add_node_with_label("h1", "web", "host").unwrap();
1036        // Adding via the legacy enum must intern its as_str() name.
1037        let id = store
1038            .registry
1039            .lookup(label_registry::Namespace::Node, "host")
1040            .expect("legacy enum name should be interned");
1041        let fetched = store.get_node("h1").unwrap();
1042        assert_eq!(fetched.label_id, id);
1043        assert_eq!(fetched.node_type, "host");
1044    }
1045
1046    #[test]
1047    fn v2_round_trip_preserves_user_labels() {
1048        let store = GraphStore::new();
1049        // Intern a non-legacy user label and add a node referencing it via
1050        // the legacy bridge (Host) — exercises the full v2 encode path.
1051        let user_id = store.intern_node_label("order").unwrap();
1052        assert!(user_id.as_u32() >= label_registry::FIRST_USER_LABEL_ID);
1053
1054        store.add_node_with_label("h1", "web-1", "host").unwrap();
1055        store.add_node_with_label("h2", "web-2", "service").unwrap();
1056        store
1057            .add_edge_with_label("h1", "h2", "connects_to", 1.0)
1058            .unwrap();
1059
1060        let bytes = store.serialize();
1061        // V2 magic + version
1062        assert_eq!(&bytes[0..4], b"RBGR");
1063        assert_eq!(
1064            u32::from_le_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]),
1065            2
1066        );
1067
1068        let restored = GraphStore::deserialize(&bytes).unwrap();
1069        // Registry survived.
1070        assert_eq!(
1071            restored
1072                .registry
1073                .lookup(label_registry::Namespace::Node, "order"),
1074            Some(user_id)
1075        );
1076        // Records decoded with v2 label_id intact.
1077        let h1 = restored.get_node("h1").unwrap();
1078        assert_eq!(h1.node_type, "host");
1079        assert_eq!(
1080            h1.label_id,
1081            restored
1082                .registry
1083                .lookup(label_registry::Namespace::Node, "host")
1084                .unwrap()
1085        );
1086        // Edge index rebuilt.
1087        let outgoing = restored.outgoing_edges("h1");
1088        assert_eq!(outgoing.len(), 1);
1089        assert_eq!(outgoing[0].0, "connects_to");
1090    }
1091
    #[test]
    fn v1_blob_deserializes_via_legacy_path() {
        // Hand-craft a minimal v1 file: header (24 bytes: magic "RBGR",
        // version=1, node_count u64, edge_count u64) + 1 node page + 1
        // edge page. The node page contains one v1 record:
        //   header_v1 = id_len(2) label_len(2) type(1 byte, disc 0 = Host)
        //               flags(1) out_edge_count(2) in_edge_count(2)
        //   payload   = "n1" "L"
        // The edge page contains one v1 edge:
        //   header_v1 = source_len(2) target_len(2)
        //               type(1 byte, disc 0 = HasService) weight(f32 LE)
        //   payload   = "n1" "n1"
        //
        // Page::insert_cell handles the cell layout for us, so we build
        // pages programmatically rather than poking at raw page bytes.
        let mut node_page = Page::new(PageType::GraphNode, 0);
        // Build a v1 node record.
        let mut v1_node = Vec::new();
        v1_node.extend_from_slice(&2u16.to_le_bytes()); // id_len = len("n1")
        v1_node.extend_from_slice(&1u16.to_le_bytes()); // label_len = len("L")
        v1_node.push(0); // node type discriminant: 0 = "host"
        v1_node.push(0); // flags
        v1_node.extend_from_slice(&0u16.to_le_bytes()); // out_edge_count
        v1_node.extend_from_slice(&0u16.to_le_bytes()); // in_edge_count
        v1_node.extend_from_slice(b"n1"); // id payload
        v1_node.extend_from_slice(b"L"); // label payload
        node_page.insert_cell(b"n1", &v1_node).unwrap();

        let mut edge_page = Page::new(PageType::GraphEdge, 0);
        let mut v1_edge = Vec::new();
        v1_edge.extend_from_slice(&2u16.to_le_bytes()); // source_len
        v1_edge.extend_from_slice(&2u16.to_le_bytes()); // target_len
        v1_edge.push(0); // edge type discriminant: 0 = "has_service"
        v1_edge.extend_from_slice(&1.0f32.to_le_bytes()); // weight
        v1_edge.extend_from_slice(b"n1"); // source payload
        v1_edge.extend_from_slice(b"n1"); // target payload
        edge_page.insert_cell(b"n1|0|n1", &v1_edge).unwrap();

        // Assemble v1 file: header + node-page-count + node-pages + edge-page-count + edge-pages.
        let mut bytes = Vec::new();
        bytes.extend_from_slice(b"RBGR"); // magic
        bytes.extend_from_slice(&1u32.to_le_bytes()); // version=1
        bytes.extend_from_slice(&1u64.to_le_bytes()); // node_count
        bytes.extend_from_slice(&1u64.to_le_bytes()); // edge_count
        bytes.extend_from_slice(&1u32.to_le_bytes()); // node_page_count
        bytes.extend_from_slice(node_page.as_bytes());
        bytes.extend_from_slice(&1u32.to_le_bytes()); // edge_page_count
        bytes.extend_from_slice(edge_page.as_bytes());

        let store = GraphStore::deserialize(&bytes).expect("v1 blob deserializes");
        // Node decoded via legacy path → label_id mapped to reserved ID 1 ("host").
        let node = store.get_node("n1").unwrap();
        assert_eq!(node.node_type, "host");
        assert_eq!(node.label_id, LabelId::new(1));
        // Edge index rebuilt.
        let out = store.outgoing_edges("n1");
        assert_eq!(out.len(), 1);
        assert_eq!(out[0].0, "has_service");
    }
1148
1149    #[test]
1150    fn deserialize_rejects_unknown_version() {
1151        let mut bytes = Vec::new();
1152        bytes.extend_from_slice(b"RBGR");
1153        bytes.extend_from_slice(&999u32.to_le_bytes()); // bogus version
1154        bytes.extend_from_slice(&0u64.to_le_bytes());
1155        bytes.extend_from_slice(&0u64.to_le_bytes());
1156        match GraphStore::deserialize(&bytes) {
1157            Err(GraphStoreError::InvalidData(msg)) => assert!(msg.contains("Unsupported")),
1158            Err(other) => panic!("unexpected error: {:?}", other),
1159            Ok(_) => panic!("expected error for unknown version"),
1160        }
1161    }
1162}