// reddb_server/storage/engine/graph_store/impl.rs

1use std::sync::Arc;
2
3use crate::storage::index::{IndexRegistry, IndexScope};
4
5use super::label_registry::Namespace;
6use super::*;
7
8impl GraphStore {
9    /// Create a new empty graph store with a fresh [`LabelRegistry`] that
10    /// has the legacy reserved label IDs (1..=19) pre-seeded so v1
11    /// on-disk graph records still decode round-trip.
12    pub fn new() -> Self {
13        Self::with_registry(Arc::new(LabelRegistry::with_legacy_seed()))
14    }
15
16    /// Create an empty graph store sharing the given [`LabelRegistry`]. Use
17    /// this when multiple [`GraphStore`] instances should agree on
18    /// [`LabelId`] assignments (e.g. when the same database holds several
19    /// named graphs).
20    pub fn with_registry(registry: Arc<LabelRegistry>) -> Self {
21        // Use 16 shards for good parallelism on modern CPUs
22        const SHARD_COUNT: usize = 16;
23
24        let initial_node_page = Page::new(PageType::GraphNode, 0);
25        let initial_edge_page = Page::new(PageType::GraphEdge, 0);
26
27        Self {
28            node_index: ShardedIndex::new(SHARD_COUNT),
29            edge_index: EdgeIndex::new(SHARD_COUNT),
30            node_secondary: Arc::new(NodeSecondaryIndex::new(8192)),
31            registry,
32            node_pages: RwLock::new(vec![initial_node_page]),
33            edge_pages: RwLock::new(vec![initial_edge_page]),
34            current_node_page: AtomicU32::new(0),
35            current_edge_page: AtomicU32::new(0),
36            stats: GraphStats::default(),
37            node_count: AtomicU64::new(0),
38            edge_count: AtomicU64::new(0),
39        }
40    }
41
42    /// Intern an arbitrary node label string. Convenience wrapper over
43    /// [`LabelRegistry::intern`]; the returned [`LabelId`] can be passed to
44    /// the upcoming label-id-aware insert APIs (PR3).
45    pub fn intern_node_label(&self, label: &str) -> Result<LabelId, GraphStoreError> {
46        self.registry
47            .intern(Namespace::Node, label)
48            .map_err(|e| GraphStoreError::InvalidData(e.to_string()))
49    }
50
51    /// Intern an arbitrary edge label string.
52    pub fn intern_edge_label(&self, label: &str) -> Result<LabelId, GraphStoreError> {
53        self.registry
54            .intern(Namespace::Edge, label)
55            .map_err(|e| GraphStoreError::InvalidData(e.to_string()))
56    }
57
58    /// Publish this graph's secondary index into an external
59    /// [`IndexRegistry`]. The registry holds an `Arc` pointing to the same
60    /// live index, so planners consulting the registry see current stats
61    /// without any copy/refresh logic.
62    ///
63    /// Scope: `IndexScope::graph(collection)`. Idempotent — subsequent
64    /// calls replace the previous entry.
65    pub fn publish_indexes(&self, registry: &IndexRegistry, collection: &str) {
66        registry.register(
67            IndexScope::graph(collection),
68            Arc::clone(&self.node_secondary) as Arc<dyn crate::storage::index::IndexBase>,
69        );
70    }
71
72    /// Add a node using a category label string. Interns `category` into the
73    /// [`LabelRegistry`] and writes the node in v2 format.
74    pub fn add_node_with_label(
75        &self,
76        id: &str,
77        display_label: &str,
78        category: &str,
79    ) -> Result<RecordLocation, GraphStoreError> {
80        if self.node_index.contains(id) {
81            return Err(GraphStoreError::NodeExists(id.to_string()));
82        }
83        let label_id = self.intern_node_label(category)?;
84        let node = StoredNode {
85            id: id.to_string(),
86            label: display_label.to_string(),
87            node_type: category.to_string(),
88            label_id,
89            flags: 0,
90            out_edge_count: 0,
91            in_edge_count: 0,
92            page_id: 0,
93            slot: 0,
94            table_ref: None,
95            vector_ref: None,
96        };
97        let location = self.write_node_record(id, &node)?;
98        self.node_index.insert(id.to_string(), location);
99        self.node_secondary.insert(id, label_id, display_label);
100        self.node_count.fetch_add(1, Ordering::Relaxed);
101        Ok(location)
102    }
103
104    /// Add an edge using a category label string.
105    pub fn add_edge_with_label(
106        &self,
107        source_id: &str,
108        target_id: &str,
109        category: &str,
110        weight: f32,
111    ) -> Result<RecordLocation, GraphStoreError> {
112        if !self.node_index.contains(source_id) {
113            return Err(GraphStoreError::NodeNotFound(source_id.to_string()));
114        }
115        if !self.node_index.contains(target_id) {
116            return Err(GraphStoreError::NodeNotFound(target_id.to_string()));
117        }
118        let label_id = self.intern_edge_label(category)?;
119        let edge = StoredEdge {
120            source_id: source_id.to_string(),
121            target_id: target_id.to_string(),
122            edge_type: category.to_string(),
123            label_id,
124            weight,
125            page_id: 0,
126            slot: 0,
127        };
128        let location = self.write_edge_record(source_id, target_id, label_id, &edge)?;
129        self.edge_index
130            .add_edge(source_id, target_id, category, weight);
131        self.edge_count.fetch_add(1, Ordering::Relaxed);
132        Ok(location)
133    }
134
135    /// Internal: encode a [`StoredNode`] and append to the current node page,
136    /// rolling over to a new page when full.
137    fn write_node_record(
138        &self,
139        id: &str,
140        node: &StoredNode,
141    ) -> Result<RecordLocation, GraphStoreError> {
142        let encoded = node.encode();
143        let mut pages = self
144            .node_pages
145            .write()
146            .map_err(|_| GraphStoreError::LockPoisoned)?;
147        let current_page_id = self.current_node_page.load(Ordering::Acquire);
148        let page = &mut pages[current_page_id as usize];
149        match page.insert_cell(id.as_bytes(), &encoded) {
150            Ok(slot) => Ok(RecordLocation {
151                page_id: current_page_id,
152                slot: slot as u16,
153            }),
154            Err(_) => {
155                let new_page_id = pages.len() as u32;
156                let mut new_page = Page::new(PageType::GraphNode, new_page_id);
157                let slot = new_page
158                    .insert_cell(id.as_bytes(), &encoded)
159                    .map_err(|_| GraphStoreError::PageFull)?;
160                pages.push(new_page);
161                self.current_node_page.store(new_page_id, Ordering::Release);
162                Ok(RecordLocation {
163                    page_id: new_page_id,
164                    slot: slot as u16,
165                })
166            }
167        }
168    }
169
170    /// Internal: encode a [`StoredEdge`] and append it.
171    fn write_edge_record(
172        &self,
173        source_id: &str,
174        target_id: &str,
175        label_id: LabelId,
176        edge: &StoredEdge,
177    ) -> Result<RecordLocation, GraphStoreError> {
178        let encoded = edge.encode();
179        let edge_key = format!("{}|{}|{}", source_id, label_id.as_u32(), target_id);
180        let mut pages = self
181            .edge_pages
182            .write()
183            .map_err(|_| GraphStoreError::LockPoisoned)?;
184        let current_page_id = self.current_edge_page.load(Ordering::Acquire);
185        let page = &mut pages[current_page_id as usize];
186        match page.insert_cell(edge_key.as_bytes(), &encoded) {
187            Ok(slot) => Ok(RecordLocation {
188                page_id: current_page_id,
189                slot: slot as u16,
190            }),
191            Err(_) => {
192                let new_page_id = pages.len() as u32;
193                let mut new_page = Page::new(PageType::GraphEdge, new_page_id);
194                let slot = new_page
195                    .insert_cell(edge_key.as_bytes(), &encoded)
196                    .map_err(|_| GraphStoreError::PageFull)?;
197                pages.push(new_page);
198                self.current_edge_page.store(new_page_id, Ordering::Release);
199                Ok(RecordLocation {
200                    page_id: new_page_id,
201                    slot: slot as u16,
202                })
203            }
204        }
205    }
206
207    /// Add a node linked to a table row (for unified queries).
208    pub fn add_node_linked(
209        &self,
210        id: &str,
211        label: &str,
212        category: &str,
213        table_id: u16,
214        row_id: u64,
215    ) -> Result<RecordLocation, GraphStoreError> {
216        if self.node_index.contains(id) {
217            return Err(GraphStoreError::NodeExists(id.to_string()));
218        }
219        let label_id = self.intern_node_label(category)?;
220        let node = StoredNode {
221            id: id.to_string(),
222            label: label.to_string(),
223            node_type: category.to_string(),
224            label_id,
225            flags: NODE_FLAG_HAS_TABLE_REF,
226            out_edge_count: 0,
227            in_edge_count: 0,
228            page_id: 0,
229            slot: 0,
230            table_ref: Some(TableRef::new(table_id, row_id)),
231            vector_ref: None,
232        };
233
234        let location = self.write_node_record(id, &node)?;
235        self.node_index.insert(id.to_string(), location);
236        self.node_secondary.insert(id, label_id, label);
237        self.node_count.fetch_add(1, Ordering::Relaxed);
238        Ok(location)
239    }
240
241    /// Get table reference for a node (if linked)
242    pub fn get_node_table_ref(&self, node_id: &str) -> Option<TableRef> {
243        self.get_node(node_id).and_then(|n| n.table_ref)
244    }
245
246    /// Get a node by ID (lock-free read)
247    pub fn get_node(&self, id: &str) -> Option<StoredNode> {
248        let location = self.node_index.get(id)?;
249
250        let pages = self.node_pages.read().ok()?;
251        let page = pages.get(location.page_id as usize)?;
252
253        let (_, value) = page.read_cell(location.slot as usize).ok()?;
254        StoredNode::decode(&value, location.page_id, location.slot)
255    }
256
257    /// Get all outgoing edges from a node `(edge_label, target, weight)`.
258    #[inline]
259    pub fn outgoing_edges(&self, source_id: &str) -> Vec<(String, String, f32)> {
260        self.edge_index.outgoing(source_id)
261    }
262
263    /// Get all incoming edges to a node `(edge_label, source, weight)`.
264    #[inline]
265    pub fn incoming_edges(&self, target_id: &str) -> Vec<(String, String, f32)> {
266        self.edge_index.incoming(target_id)
267    }
268
269    /// Get outgoing edges of a specific label.
270    #[inline]
271    pub fn outgoing_of_type(&self, source_id: &str, edge_label: &str) -> Vec<(String, f32)> {
272        self.edge_index.outgoing_of_type(source_id, edge_label)
273    }
274
275    /// Check if a node exists
276    #[inline]
277    pub fn has_node(&self, id: &str) -> bool {
278        self.node_index.contains(id)
279    }
280
281    /// Get node count
282    #[inline]
283    pub fn node_count(&self) -> u64 {
284        self.node_count.load(Ordering::Relaxed)
285    }
286
287    /// Get edge count
288    #[inline]
289    pub fn edge_count(&self) -> u64 {
290        self.edge_count.load(Ordering::Relaxed)
291    }
292
293    /// Iterate over all nodes (streaming)
294    pub fn iter_nodes(&self) -> NodeIterator<'_> {
295        NodeIterator {
296            store: self,
297            page_idx: 0,
298            cell_idx: 0,
299        }
300    }
301
302    /// Iterate all edges in the graph
303    ///
304    /// This collects outgoing edges from all nodes to build a complete edge list.
305    /// Returns StoredEdge structs with source, target, type, and weight.
306    pub fn iter_all_edges(&self) -> Vec<StoredEdge> {
307        let mut edges = Vec::new();
308
309        for node in self.iter_nodes() {
310            for (edge_label, target_id, weight) in self.outgoing_edges(&node.id) {
311                let label_id = self
312                    .registry
313                    .lookup(Namespace::Edge, &edge_label)
314                    .unwrap_or(UNSET_LABEL_ID);
315                edges.push(StoredEdge {
316                    source_id: node.id.clone(),
317                    target_id,
318                    edge_type: edge_label,
319                    label_id,
320                    weight,
321                    page_id: 0,
322                    slot: 0,
323                });
324            }
325        }
326
327        edges
328    }
329
330    /// Get nodes whose category resolves to `label_id`. O(k) via secondary
331    /// index plus one fetch per id.
332    pub fn nodes_of_label(&self, label_id: LabelId) -> Vec<StoredNode> {
333        self.node_secondary
334            .nodes_by_type(label_id)
335            .into_iter()
336            .filter_map(|id| self.get_node(&id))
337            .collect()
338    }
339
340    /// Get nodes with a given label. Backed by the secondary inverted index
341    /// (`label → node_id set`) with a bloom-filter pre-check for absent
342    /// labels.
343    pub fn nodes_by_label(&self, label: &str) -> Vec<StoredNode> {
344        self.node_secondary
345            .nodes_by_label(label)
346            .into_iter()
347            .filter_map(|id| self.get_node(&id))
348            .collect()
349    }
350
351    /// Get nodes whose category label (as registered in the
352    /// [`LabelRegistry`]) matches the given string. Replaces the
353    /// enum-typed [`nodes_of_type`] for callers that work with arbitrary
354    /// user-defined labels.
355    ///
356    /// O(k) lookup via secondary index keyed by [`LabelId`].
357    pub fn nodes_with_category(&self, category: &str) -> Vec<StoredNode> {
358        let Some(label_id) = self.registry.lookup(Namespace::Node, category) else {
359            return Vec::new();
360        };
361        self.nodes_of_label(label_id)
362    }
363
364    /// Returns `true` iff the label is *possibly* present. Bloom-backed
365    /// fast path for planners that want to skip a traversal without paying
366    /// the set lookup cost.
367    pub fn may_contain_label(&self, label: &str) -> bool {
368        self.node_secondary.may_contain_label(label)
369    }
370
371    /// Read-only handle to the secondary index (for planner/diagnostics).
372    pub fn node_secondary_index(&self) -> &NodeSecondaryIndex {
373        &self.node_secondary
374    }
375
376    /// Get statistics
377    pub fn stats(&self) -> GraphStats {
378        let mut stats = GraphStats {
379            node_count: self.node_count.load(Ordering::Relaxed),
380            edge_count: self.edge_count.load(Ordering::Relaxed),
381            node_pages: self.node_pages.read().map(|p| p.len() as u32).unwrap_or(0),
382            edge_pages: self.edge_pages.read().map(|p| p.len() as u32).unwrap_or(0),
383            ..Default::default()
384        };
385
386        // Per-category counts derived from the secondary index — O(number
387        // of distinct labels) instead of O(node_count). The secondary
388        // index already maintains the bucket cardinalities incrementally
389        // on every add/remove, so this is essentially free.
390        for (label_id, count) in self.node_secondary.label_id_counts() {
391            if let Some((Namespace::Node, label)) = self.registry.resolve(label_id) {
392                stats.nodes_by_label.insert(label, count);
393            }
394        }
395
396        stats
397    }
398
399    /// Serialize to bytes for persistence (file format v2: includes the
400    /// embedded [`LabelRegistry`] catalog right after the fixed header).
401    pub fn serialize(&self) -> Vec<u8> {
402        let registry_bytes = self.registry.encode().unwrap_or_default();
403        let node_pages = self
404            .node_pages
405            .read()
406            .map(|pages| pages.iter().map(|page| page.as_bytes().to_vec()).collect())
407            .unwrap_or_default();
408        let edge_pages = self
409            .edge_pages
410            .read()
411            .map(|pages| pages.iter().map(|page| page.as_bytes().to_vec()).collect())
412            .unwrap_or_default();
413
414        reddb_file::encode_graph_store_frame(&reddb_file::GraphStoreFrame {
415            version: reddb_file::GRAPH_STORE_VERSION_V2,
416            node_count: self.node_count.load(Ordering::Relaxed),
417            edge_count: self.edge_count.load(Ordering::Relaxed),
418            registry_bytes: Some(registry_bytes),
419            node_pages,
420            edge_pages,
421        })
422        .expect("in-memory graph store should encode")
423    }
424
425    /// Deserialize from bytes. Dual-path: a v1 file (no embedded registry,
426    /// 1-byte enum discriminants) is read with [`StoredNode::decode_v1`]
427    /// against a freshly-seeded legacy registry. A v2 file restores the
428    /// registry from its embedded blob and decodes records via
429    /// [`StoredNode::decode`].
430    pub fn deserialize(data: &[u8]) -> Result<Self, GraphStoreError> {
431        let frame = reddb_file::decode_graph_store_frame(data, PAGE_SIZE)
432            .map_err(|e| GraphStoreError::InvalidData(e.to_string()))?;
433
434        // V2 carries the registry blob inline. V1 has none (legacy seed).
435        let registry: Arc<LabelRegistry> = match frame.version {
436            1 => Arc::new(LabelRegistry::with_legacy_seed()),
437            2 => {
438                let registry_bytes = frame.registry_bytes.as_deref().ok_or_else(|| {
439                    GraphStoreError::InvalidData("Missing registry blob".to_string())
440                })?;
441                let reg = LabelRegistry::decode(registry_bytes)
442                    .map_err(|e| GraphStoreError::InvalidData(e.to_string()))?;
443                Arc::new(reg)
444            }
445            v => {
446                return Err(GraphStoreError::InvalidData(format!(
447                    "Unsupported graph file version {}",
448                    v
449                )));
450            }
451        };
452
453        let mut node_pages = Vec::with_capacity(frame.node_pages.len());
454        for page_bytes in &frame.node_pages {
455            let page = Page::from_slice(page_bytes)
456                .map_err(|_| GraphStoreError::InvalidData("Invalid page".to_string()))?;
457            node_pages.push(page);
458        }
459
460        let mut edge_pages = Vec::with_capacity(frame.edge_pages.len());
461        for page_bytes in &frame.edge_pages {
462            let page = Page::from_slice(page_bytes)
463                .map_err(|_| GraphStoreError::InvalidData("Invalid page".to_string()))?;
464            edge_pages.push(page);
465        }
466
467        // V1 records on disk use the legacy 1-byte enum header, which the
468        // rest of GraphStore (get_node, iterators) does not understand. Migrate
469        // in place: decode every v1 cell, re-insert via the v2 write path.
470        if frame.version == 1 {
471            let store = Self::with_registry(Arc::clone(&registry));
472            for (page_idx, page) in node_pages.iter().enumerate() {
473                let cell_count = page.cell_count() as usize;
474                for cell_idx in 0..cell_count {
475                    if let Ok((_, value)) = page.read_cell(cell_idx) {
476                        if let Some(n) =
477                            StoredNode::decode_v1(&value, page_idx as u32, cell_idx as u16)
478                        {
479                            // V1 node_type already carries the canonical
480                            // legacy label string thanks to the v1 decoder.
481                            store.add_node_with_label(&n.id, &n.label, &n.node_type)?;
482                        }
483                    }
484                }
485            }
486            for (page_idx, page) in edge_pages.iter().enumerate() {
487                let cell_count = page.cell_count() as usize;
488                for cell_idx in 0..cell_count {
489                    if let Ok((_, value)) = page.read_cell(cell_idx) {
490                        if let Some(e) =
491                            StoredEdge::decode_v1(&value, page_idx as u32, cell_idx as u16)
492                        {
493                            // Skip edges whose endpoints failed to migrate.
494                            if !store.has_node(&e.source_id) || !store.has_node(&e.target_id) {
495                                continue;
496                            }
497                            store.add_edge_with_label(
498                                &e.source_id,
499                                &e.target_id,
500                                &e.edge_type,
501                                e.weight,
502                            )?;
503                        }
504                    }
505                }
506            }
507            // Sanity-check counts (v1 file headers can theoretically lie; a
508            // mismatch here points at a corrupt blob, but is not fatal —
509            // the store reflects what we successfully migrated).
510            let _ = (frame.node_count, frame.edge_count);
511            return Ok(store);
512        }
513
514        let store = Self {
515            node_index: ShardedIndex::new(16),
516            edge_index: EdgeIndex::new(16),
517            node_secondary: Arc::new(NodeSecondaryIndex::new(8192)),
518            registry,
519            node_pages: RwLock::new(node_pages),
520            edge_pages: RwLock::new(edge_pages),
521            current_node_page: AtomicU32::new(0),
522            current_edge_page: AtomicU32::new(0),
523            stats: GraphStats::default(),
524            node_count: AtomicU64::new(frame.node_count),
525            edge_count: AtomicU64::new(frame.edge_count),
526        };
527
528        store.rebuild_indexes(frame.version)?;
529
530        Ok(store)
531    }
532
533    /// Rebuild indexes from pages. `version` selects the on-disk record
534    /// format used when each cell was written.
535    fn rebuild_indexes(&self, version: u32) -> Result<(), GraphStoreError> {
536        let decode_node = |bytes: &[u8], page_idx: u32, slot: u16| match version {
537            1 => StoredNode::decode_v1(bytes, page_idx, slot),
538            _ => StoredNode::decode(bytes, page_idx, slot),
539        };
540        let decode_edge = |bytes: &[u8], page_idx: u32, slot: u16| match version {
541            1 => StoredEdge::decode_v1(bytes, page_idx, slot),
542            _ => StoredEdge::decode(bytes, page_idx, slot),
543        };
544
545        // Rebuild node + secondary index
546        self.node_secondary.clear();
547        if let Ok(pages) = self.node_pages.read() {
548            for (page_idx, page) in pages.iter().enumerate() {
549                let cell_count = page.cell_count() as usize;
550                for cell_idx in 0..cell_count {
551                    if let Ok((key, value)) = page.read_cell(cell_idx) {
552                        let id = String::from_utf8_lossy(&key).to_string();
553                        self.node_index.insert(
554                            id.clone(),
555                            RecordLocation {
556                                page_id: page_idx as u32,
557                                slot: cell_idx as u16,
558                            },
559                        );
560                        if let Some(node) = decode_node(&value, page_idx as u32, cell_idx as u16) {
561                            self.node_secondary.insert(&id, node.label_id, &node.label);
562                        }
563                    }
564                }
565            }
566
567            if !pages.is_empty() {
568                self.current_node_page
569                    .store((pages.len() - 1) as u32, Ordering::Release);
570            }
571        }
572
573        // Rebuild edge index
574        if let Ok(pages) = self.edge_pages.read() {
575            for (page_idx, page) in pages.iter().enumerate() {
576                let cell_count = page.cell_count() as usize;
577                for cell_idx in 0..cell_count {
578                    if let Ok((_, value)) = page.read_cell(cell_idx) {
579                        if let Some(edge) = decode_edge(&value, page_idx as u32, cell_idx as u16) {
580                            self.edge_index.add_edge(
581                                &edge.source_id,
582                                &edge.target_id,
583                                &edge.edge_type,
584                                edge.weight,
585                            );
586                        }
587                    }
588                }
589            }
590
591            if !pages.is_empty() {
592                self.current_edge_page
593                    .store((pages.len() - 1) as u32, Ordering::Release);
594            }
595        }
596
597        Ok(())
598    }
599}