Skip to main content

reddb_server/storage/engine/graph_store/
impl.rs

1use std::sync::Arc;
2
3use crate::storage::index::{IndexRegistry, IndexScope};
4
5use super::label_registry::Namespace;
6use super::*;
7
8impl GraphStore {
9    /// Create a new empty graph store with a fresh [`LabelRegistry`] that
10    /// has the legacy reserved label IDs (1..=19) pre-seeded so v1
11    /// on-disk graph records still decode round-trip.
12    pub fn new() -> Self {
13        Self::with_registry(Arc::new(LabelRegistry::with_legacy_seed()))
14    }
15
16    /// Create an empty graph store sharing the given [`LabelRegistry`]. Use
17    /// this when multiple [`GraphStore`] instances should agree on
18    /// [`LabelId`] assignments (e.g. when the same database holds several
19    /// named graphs).
20    pub fn with_registry(registry: Arc<LabelRegistry>) -> Self {
21        // Use 16 shards for good parallelism on modern CPUs
22        const SHARD_COUNT: usize = 16;
23
24        let initial_node_page = Page::new(PageType::GraphNode, 0);
25        let initial_edge_page = Page::new(PageType::GraphEdge, 0);
26
27        Self {
28            node_index: ShardedIndex::new(SHARD_COUNT),
29            edge_index: EdgeIndex::new(SHARD_COUNT),
30            node_secondary: Arc::new(NodeSecondaryIndex::new(8192)),
31            registry,
32            node_pages: RwLock::new(vec![initial_node_page]),
33            edge_pages: RwLock::new(vec![initial_edge_page]),
34            current_node_page: AtomicU32::new(0),
35            current_edge_page: AtomicU32::new(0),
36            stats: GraphStats::default(),
37            node_count: AtomicU64::new(0),
38            edge_count: AtomicU64::new(0),
39        }
40    }
41
42    /// Intern an arbitrary node label string. Convenience wrapper over
43    /// [`LabelRegistry::intern`]; the returned [`LabelId`] can be passed to
44    /// the upcoming label-id-aware insert APIs (PR3).
45    pub fn intern_node_label(&self, label: &str) -> Result<LabelId, GraphStoreError> {
46        self.registry
47            .intern(Namespace::Node, label)
48            .map_err(|e| GraphStoreError::InvalidData(e.to_string()))
49    }
50
51    /// Intern an arbitrary edge label string.
52    pub fn intern_edge_label(&self, label: &str) -> Result<LabelId, GraphStoreError> {
53        self.registry
54            .intern(Namespace::Edge, label)
55            .map_err(|e| GraphStoreError::InvalidData(e.to_string()))
56    }
57
58    /// Publish this graph's secondary index into an external
59    /// [`IndexRegistry`]. The registry holds an `Arc` pointing to the same
60    /// live index, so planners consulting the registry see current stats
61    /// without any copy/refresh logic.
62    ///
63    /// Scope: `IndexScope::graph(collection)`. Idempotent — subsequent
64    /// calls replace the previous entry.
65    pub fn publish_indexes(&self, registry: &IndexRegistry, collection: &str) {
66        registry.register(
67            IndexScope::graph(collection),
68            Arc::clone(&self.node_secondary) as Arc<dyn crate::storage::index::IndexBase>,
69        );
70    }
71
72    /// Add a node using a category label string. Interns `category` into the
73    /// [`LabelRegistry`] and writes the node in v2 format.
74    pub fn add_node_with_label(
75        &self,
76        id: &str,
77        display_label: &str,
78        category: &str,
79    ) -> Result<RecordLocation, GraphStoreError> {
80        if self.node_index.contains(id) {
81            return Err(GraphStoreError::NodeExists(id.to_string()));
82        }
83        let label_id = self.intern_node_label(category)?;
84        let node = StoredNode {
85            id: id.to_string(),
86            label: display_label.to_string(),
87            node_type: category.to_string(),
88            label_id,
89            flags: 0,
90            out_edge_count: 0,
91            in_edge_count: 0,
92            page_id: 0,
93            slot: 0,
94            table_ref: None,
95            vector_ref: None,
96        };
97        let location = self.write_node_record(id, &node)?;
98        self.node_index.insert(id.to_string(), location);
99        self.node_secondary.insert(id, label_id, display_label);
100        self.node_count.fetch_add(1, Ordering::Relaxed);
101        Ok(location)
102    }
103
104    /// Add an edge using a category label string.
105    pub fn add_edge_with_label(
106        &self,
107        source_id: &str,
108        target_id: &str,
109        category: &str,
110        weight: f32,
111    ) -> Result<RecordLocation, GraphStoreError> {
112        if !self.node_index.contains(source_id) {
113            return Err(GraphStoreError::NodeNotFound(source_id.to_string()));
114        }
115        if !self.node_index.contains(target_id) {
116            return Err(GraphStoreError::NodeNotFound(target_id.to_string()));
117        }
118        let label_id = self.intern_edge_label(category)?;
119        let edge = StoredEdge {
120            source_id: source_id.to_string(),
121            target_id: target_id.to_string(),
122            edge_type: category.to_string(),
123            label_id,
124            weight,
125            page_id: 0,
126            slot: 0,
127        };
128        let location = self.write_edge_record(source_id, target_id, label_id, &edge)?;
129        self.edge_index
130            .add_edge(source_id, target_id, category, weight);
131        self.edge_count.fetch_add(1, Ordering::Relaxed);
132        Ok(location)
133    }
134
135    /// Internal: encode a [`StoredNode`] and append to the current node page,
136    /// rolling over to a new page when full.
137    fn write_node_record(
138        &self,
139        id: &str,
140        node: &StoredNode,
141    ) -> Result<RecordLocation, GraphStoreError> {
142        let encoded = node.encode();
143        let mut pages = self
144            .node_pages
145            .write()
146            .map_err(|_| GraphStoreError::LockPoisoned)?;
147        let current_page_id = self.current_node_page.load(Ordering::Acquire);
148        let page = &mut pages[current_page_id as usize];
149        match page.insert_cell(id.as_bytes(), &encoded) {
150            Ok(slot) => Ok(RecordLocation {
151                page_id: current_page_id,
152                slot: slot as u16,
153            }),
154            Err(_) => {
155                let new_page_id = pages.len() as u32;
156                let mut new_page = Page::new(PageType::GraphNode, new_page_id);
157                let slot = new_page
158                    .insert_cell(id.as_bytes(), &encoded)
159                    .map_err(|_| GraphStoreError::PageFull)?;
160                pages.push(new_page);
161                self.current_node_page.store(new_page_id, Ordering::Release);
162                Ok(RecordLocation {
163                    page_id: new_page_id,
164                    slot: slot as u16,
165                })
166            }
167        }
168    }
169
170    /// Internal: encode a [`StoredEdge`] and append it.
171    fn write_edge_record(
172        &self,
173        source_id: &str,
174        target_id: &str,
175        label_id: LabelId,
176        edge: &StoredEdge,
177    ) -> Result<RecordLocation, GraphStoreError> {
178        let encoded = edge.encode();
179        let edge_key = format!("{}|{}|{}", source_id, label_id.as_u32(), target_id);
180        let mut pages = self
181            .edge_pages
182            .write()
183            .map_err(|_| GraphStoreError::LockPoisoned)?;
184        let current_page_id = self.current_edge_page.load(Ordering::Acquire);
185        let page = &mut pages[current_page_id as usize];
186        match page.insert_cell(edge_key.as_bytes(), &encoded) {
187            Ok(slot) => Ok(RecordLocation {
188                page_id: current_page_id,
189                slot: slot as u16,
190            }),
191            Err(_) => {
192                let new_page_id = pages.len() as u32;
193                let mut new_page = Page::new(PageType::GraphEdge, new_page_id);
194                let slot = new_page
195                    .insert_cell(edge_key.as_bytes(), &encoded)
196                    .map_err(|_| GraphStoreError::PageFull)?;
197                pages.push(new_page);
198                self.current_edge_page.store(new_page_id, Ordering::Release);
199                Ok(RecordLocation {
200                    page_id: new_page_id,
201                    slot: slot as u16,
202                })
203            }
204        }
205    }
206
207    /// Add a node linked to a table row (for unified queries).
208    pub fn add_node_linked(
209        &self,
210        id: &str,
211        label: &str,
212        category: &str,
213        table_id: u16,
214        row_id: u64,
215    ) -> Result<RecordLocation, GraphStoreError> {
216        if self.node_index.contains(id) {
217            return Err(GraphStoreError::NodeExists(id.to_string()));
218        }
219        let label_id = self.intern_node_label(category)?;
220        let node = StoredNode {
221            id: id.to_string(),
222            label: label.to_string(),
223            node_type: category.to_string(),
224            label_id,
225            flags: NODE_FLAG_HAS_TABLE_REF,
226            out_edge_count: 0,
227            in_edge_count: 0,
228            page_id: 0,
229            slot: 0,
230            table_ref: Some(TableRef::new(table_id, row_id)),
231            vector_ref: None,
232        };
233
234        let location = self.write_node_record(id, &node)?;
235        self.node_index.insert(id.to_string(), location);
236        self.node_secondary.insert(id, label_id, label);
237        self.node_count.fetch_add(1, Ordering::Relaxed);
238        Ok(location)
239    }
240
241    /// Get table reference for a node (if linked)
242    pub fn get_node_table_ref(&self, node_id: &str) -> Option<TableRef> {
243        self.get_node(node_id).and_then(|n| n.table_ref)
244    }
245
    /// Get a node by ID.
    ///
    /// Looks up the record location in the sharded primary index, then
    /// takes a shared read lock on `node_pages` to read and decode the
    /// cell. (Note: this is a read-locked, not lock-free, path.)
    /// Returns `None` if the id is unknown, the lock is poisoned, the
    /// location is stale, or the record fails to decode.
    pub fn get_node(&self, id: &str) -> Option<StoredNode> {
        let location = self.node_index.get(id)?;

        // Shared read lock; a poisoned lock degrades to None.
        let pages = self.node_pages.read().ok()?;
        let page = pages.get(location.page_id as usize)?;

        let (_, value) = page.read_cell(location.slot as usize).ok()?;
        StoredNode::decode(&value, location.page_id, location.slot)
    }
256
257    /// Get all outgoing edges from a node `(edge_label, target, weight)`.
258    #[inline]
259    pub fn outgoing_edges(&self, source_id: &str) -> Vec<(String, String, f32)> {
260        self.edge_index.outgoing(source_id)
261    }
262
263    /// Get all incoming edges to a node `(edge_label, source, weight)`.
264    #[inline]
265    pub fn incoming_edges(&self, target_id: &str) -> Vec<(String, String, f32)> {
266        self.edge_index.incoming(target_id)
267    }
268
269    /// Get outgoing edges of a specific label.
270    #[inline]
271    pub fn outgoing_of_type(&self, source_id: &str, edge_label: &str) -> Vec<(String, f32)> {
272        self.edge_index.outgoing_of_type(source_id, edge_label)
273    }
274
275    /// Check if a node exists
276    #[inline]
277    pub fn has_node(&self, id: &str) -> bool {
278        self.node_index.contains(id)
279    }
280
281    /// Get node count
282    #[inline]
283    pub fn node_count(&self) -> u64 {
284        self.node_count.load(Ordering::Relaxed)
285    }
286
287    /// Get edge count
288    #[inline]
289    pub fn edge_count(&self) -> u64 {
290        self.edge_count.load(Ordering::Relaxed)
291    }
292
293    /// Iterate over all nodes (streaming)
294    pub fn iter_nodes(&self) -> NodeIterator<'_> {
295        NodeIterator {
296            store: self,
297            page_idx: 0,
298            cell_idx: 0,
299        }
300    }
301
302    /// Iterate all edges in the graph
303    ///
304    /// This collects outgoing edges from all nodes to build a complete edge list.
305    /// Returns StoredEdge structs with source, target, type, and weight.
306    pub fn iter_all_edges(&self) -> Vec<StoredEdge> {
307        let mut edges = Vec::new();
308
309        for node in self.iter_nodes() {
310            for (edge_label, target_id, weight) in self.outgoing_edges(&node.id) {
311                let label_id = self
312                    .registry
313                    .lookup(Namespace::Edge, &edge_label)
314                    .unwrap_or(UNSET_LABEL_ID);
315                edges.push(StoredEdge {
316                    source_id: node.id.clone(),
317                    target_id,
318                    edge_type: edge_label,
319                    label_id,
320                    weight,
321                    page_id: 0,
322                    slot: 0,
323                });
324            }
325        }
326
327        edges
328    }
329
330    /// Get nodes whose category resolves to `label_id`. O(k) via secondary
331    /// index plus one fetch per id.
332    pub fn nodes_of_label(&self, label_id: LabelId) -> Vec<StoredNode> {
333        self.node_secondary
334            .nodes_by_type(label_id)
335            .into_iter()
336            .filter_map(|id| self.get_node(&id))
337            .collect()
338    }
339
340    /// Get nodes with a given label. Backed by the secondary inverted index
341    /// (`label → node_id set`) with a bloom-filter pre-check for absent
342    /// labels.
343    pub fn nodes_by_label(&self, label: &str) -> Vec<StoredNode> {
344        self.node_secondary
345            .nodes_by_label(label)
346            .into_iter()
347            .filter_map(|id| self.get_node(&id))
348            .collect()
349    }
350
351    /// Get nodes whose category label (as registered in the
352    /// [`LabelRegistry`]) matches the given string. Replaces the
353    /// enum-typed [`nodes_of_type`] for callers that work with arbitrary
354    /// user-defined labels.
355    ///
356    /// O(k) lookup via secondary index keyed by [`LabelId`].
357    pub fn nodes_with_category(&self, category: &str) -> Vec<StoredNode> {
358        let Some(label_id) = self.registry.lookup(Namespace::Node, category) else {
359            return Vec::new();
360        };
361        self.nodes_of_label(label_id)
362    }
363
364    /// Returns `true` iff the label is *possibly* present. Bloom-backed
365    /// fast path for planners that want to skip a traversal without paying
366    /// the set lookup cost.
367    pub fn may_contain_label(&self, label: &str) -> bool {
368        self.node_secondary.may_contain_label(label)
369    }
370
371    /// Read-only handle to the secondary index (for planner/diagnostics).
372    pub fn node_secondary_index(&self) -> &NodeSecondaryIndex {
373        &self.node_secondary
374    }
375
376    /// Get statistics
377    pub fn stats(&self) -> GraphStats {
378        let mut stats = GraphStats {
379            node_count: self.node_count.load(Ordering::Relaxed),
380            edge_count: self.edge_count.load(Ordering::Relaxed),
381            node_pages: self.node_pages.read().map(|p| p.len() as u32).unwrap_or(0),
382            edge_pages: self.edge_pages.read().map(|p| p.len() as u32).unwrap_or(0),
383            ..Default::default()
384        };
385
386        // Per-category counts derived from the secondary index — O(number
387        // of distinct labels) instead of O(node_count). The secondary
388        // index already maintains the bucket cardinalities incrementally
389        // on every add/remove, so this is essentially free.
390        for (label_id, count) in self.node_secondary.label_id_counts() {
391            if let Some((Namespace::Node, label)) = self.registry.resolve(label_id) {
392                stats.nodes_by_label.insert(label, count);
393            }
394        }
395
396        stats
397    }
398
399    /// Serialize to bytes for persistence (file format v2: includes the
400    /// embedded [`LabelRegistry`] catalog right after the fixed header).
401    pub fn serialize(&self) -> Vec<u8> {
402        let mut buf = Vec::new();
403
404        // Fixed header: magic(4) + version(4) + node_count(8) + edge_count(8) = 24 bytes.
405        buf.extend_from_slice(b"RBGR"); // RedDB GRaph
406        buf.extend_from_slice(&2u32.to_le_bytes()); // file format version
407        buf.extend_from_slice(&self.node_count.load(Ordering::Relaxed).to_le_bytes());
408        buf.extend_from_slice(&self.edge_count.load(Ordering::Relaxed).to_le_bytes());
409
410        // Embedded label registry: registry_len(4) + registry_bytes.
411        // Always present in v2; empty registry encodes as a 4-byte zero count.
412        let registry_bytes = self.registry.encode().unwrap_or_default();
413        buf.extend_from_slice(&(registry_bytes.len() as u32).to_le_bytes());
414        buf.extend_from_slice(&registry_bytes);
415
416        // Node pages: count(4) + pages.
417        if let Ok(pages) = self.node_pages.read() {
418            buf.extend_from_slice(&(pages.len() as u32).to_le_bytes());
419            for page in pages.iter() {
420                buf.extend_from_slice(page.as_bytes());
421            }
422        }
423
424        // Edge pages: count(4) + pages.
425        if let Ok(pages) = self.edge_pages.read() {
426            buf.extend_from_slice(&(pages.len() as u32).to_le_bytes());
427            for page in pages.iter() {
428                buf.extend_from_slice(page.as_bytes());
429            }
430        }
431
432        buf
433    }
434
    /// Deserialize from bytes. Dual-path: a v1 file (no embedded registry,
    /// 1-byte enum discriminants) is read with [`StoredNode::decode_v1`]
    /// against a freshly-seeded legacy registry. A v2 file restores the
    /// registry from its embedded blob and decodes records via
    /// [`StoredNode::decode`].
    ///
    /// # Errors
    ///
    /// Returns [`GraphStoreError::InvalidData`] when the blob is shorter
    /// than the 24-byte fixed header, the magic is wrong, the version is
    /// unsupported, or any section is truncated mid-parse.
    pub fn deserialize(data: &[u8]) -> Result<Self, GraphStoreError> {
        // Fixed header: magic(4) + version(4) + node_count(8) + edge_count(8).
        if data.len() < 24 {
            return Err(GraphStoreError::InvalidData("Too short".to_string()));
        }
        if &data[0..4] != b"RBGR" {
            return Err(GraphStoreError::InvalidData("Invalid magic".to_string()));
        }

        let version = u32::from_le_bytes([data[4], data[5], data[6], data[7]]);
        let node_count = u64::from_le_bytes([
            data[8], data[9], data[10], data[11], data[12], data[13], data[14], data[15],
        ]);
        let edge_count = u64::from_le_bytes([
            data[16], data[17], data[18], data[19], data[20], data[21], data[22], data[23],
        ]);

        // Running cursor into `data`; every section advances it.
        let mut offset = 24;

        // V2 carries the registry blob inline. V1 has none (legacy seed).
        let registry: Arc<LabelRegistry> = match version {
            1 => Arc::new(LabelRegistry::with_legacy_seed()),
            2 => {
                if data.len() < offset + 4 {
                    return Err(GraphStoreError::InvalidData(
                        "Truncated v2 header".to_string(),
                    ));
                }
                // registry_len(4 LE) + registry bytes.
                let reg_len = u32::from_le_bytes([
                    data[offset],
                    data[offset + 1],
                    data[offset + 2],
                    data[offset + 3],
                ]) as usize;
                offset += 4;
                if data.len() < offset + reg_len {
                    return Err(GraphStoreError::InvalidData(
                        "Truncated registry blob".to_string(),
                    ));
                }
                let reg = LabelRegistry::decode(&data[offset..offset + reg_len])
                    .map_err(|e| GraphStoreError::InvalidData(e.to_string()))?;
                offset += reg_len;
                Arc::new(reg)
            }
            v => {
                return Err(GraphStoreError::InvalidData(format!(
                    "Unsupported graph file version {}",
                    v
                )));
            }
        };

        // Node pages: count(4 LE) followed by fixed-size pages.
        if data.len() < offset + 4 {
            return Err(GraphStoreError::InvalidData(
                "Truncated before node-page count".to_string(),
            ));
        }
        let node_page_count = u32::from_le_bytes([
            data[offset],
            data[offset + 1],
            data[offset + 2],
            data[offset + 3],
        ]) as usize;
        offset += 4;

        let mut node_pages = Vec::with_capacity(node_page_count);
        for _ in 0..node_page_count {
            // Each page occupies exactly PAGE_SIZE bytes.
            if offset + PAGE_SIZE > data.len() {
                return Err(GraphStoreError::InvalidData(
                    "Truncated node pages".to_string(),
                ));
            }
            let page = Page::from_slice(&data[offset..offset + PAGE_SIZE])
                .map_err(|_| GraphStoreError::InvalidData("Invalid page".to_string()))?;
            node_pages.push(page);
            offset += PAGE_SIZE;
        }

        // Edge pages: same layout as node pages.
        if data.len() < offset + 4 {
            return Err(GraphStoreError::InvalidData(
                "Truncated before edge-page count".to_string(),
            ));
        }
        let edge_page_count = u32::from_le_bytes([
            data[offset],
            data[offset + 1],
            data[offset + 2],
            data[offset + 3],
        ]) as usize;
        offset += 4;

        let mut edge_pages = Vec::with_capacity(edge_page_count);
        for _ in 0..edge_page_count {
            if offset + PAGE_SIZE > data.len() {
                return Err(GraphStoreError::InvalidData(
                    "Truncated edge pages".to_string(),
                ));
            }
            let page = Page::from_slice(&data[offset..offset + PAGE_SIZE])
                .map_err(|_| GraphStoreError::InvalidData("Invalid page".to_string()))?;
            edge_pages.push(page);
            offset += PAGE_SIZE;
        }

        // V1 records on disk use the legacy 1-byte enum header, which the
        // rest of GraphStore (get_node, iterators) does not understand. Migrate
        // in place: decode every v1 cell, re-insert via the v2 write path.
        if version == 1 {
            // Fresh store sharing the legacy-seeded registry; the migrated
            // records are re-encoded in v2 format as they are re-added.
            let store = Self::with_registry(Arc::clone(&registry));
            for (page_idx, page) in node_pages.iter().enumerate() {
                let cell_count = page.cell_count() as usize;
                for cell_idx in 0..cell_count {
                    // Cells that fail to read or decode are skipped, not fatal.
                    if let Ok((_, value)) = page.read_cell(cell_idx) {
                        if let Some(n) =
                            StoredNode::decode_v1(&value, page_idx as u32, cell_idx as u16)
                        {
                            // V1 node_type already carries the canonical
                            // legacy label string thanks to the v1 decoder.
                            store.add_node_with_label(&n.id, &n.label, &n.node_type)?;
                        }
                    }
                }
            }
            for (page_idx, page) in edge_pages.iter().enumerate() {
                let cell_count = page.cell_count() as usize;
                for cell_idx in 0..cell_count {
                    if let Ok((_, value)) = page.read_cell(cell_idx) {
                        if let Some(e) =
                            StoredEdge::decode_v1(&value, page_idx as u32, cell_idx as u16)
                        {
                            // Skip edges whose endpoints failed to migrate.
                            if !store.has_node(&e.source_id) || !store.has_node(&e.target_id) {
                                continue;
                            }
                            store.add_edge_with_label(
                                &e.source_id,
                                &e.target_id,
                                &e.edge_type,
                                e.weight,
                            )?;
                        }
                    }
                }
            }
            // Sanity-check counts (v1 file headers can theoretically lie; a
            // mismatch here points at a corrupt blob, but is not fatal —
            // the store reflects what we successfully migrated).
            let _ = (node_count, edge_count);
            return Ok(store);
        }

        // V2: adopt pages as-is; counts come from the header and the
        // in-memory indexes are rebuilt from page contents below.
        let store = Self {
            node_index: ShardedIndex::new(16),
            edge_index: EdgeIndex::new(16),
            node_secondary: Arc::new(NodeSecondaryIndex::new(8192)),
            registry,
            node_pages: RwLock::new(node_pages),
            edge_pages: RwLock::new(edge_pages),
            current_node_page: AtomicU32::new(0),
            current_edge_page: AtomicU32::new(0),
            stats: GraphStats::default(),
            node_count: AtomicU64::new(node_count),
            edge_count: AtomicU64::new(edge_count),
        };

        store.rebuild_indexes(version)?;

        Ok(store)
    }
611
    /// Rebuild indexes from pages. `version` selects the on-disk record
    /// format used when each cell was written (1 → legacy decoder,
    /// anything else → current decoder).
    ///
    /// Repopulates the primary node index, the secondary (label) index,
    /// and the edge adjacency index, and resets the current-page pointers
    /// to the last page of each store so future appends continue there.
    /// NOTE(review): a poisoned page lock silently skips that whole
    /// rebuild section (`if let Ok`) — confirm this best-effort behavior
    /// is intended.
    fn rebuild_indexes(&self, version: u32) -> Result<(), GraphStoreError> {
        // Version-dispatching decode closures so the loops below stay
        // format-agnostic.
        let decode_node = |bytes: &[u8], page_idx: u32, slot: u16| match version {
            1 => StoredNode::decode_v1(bytes, page_idx, slot),
            _ => StoredNode::decode(bytes, page_idx, slot),
        };
        let decode_edge = |bytes: &[u8], page_idx: u32, slot: u16| match version {
            1 => StoredEdge::decode_v1(bytes, page_idx, slot),
            _ => StoredEdge::decode(bytes, page_idx, slot),
        };

        // Rebuild node + secondary index
        self.node_secondary.clear();
        if let Ok(pages) = self.node_pages.read() {
            for (page_idx, page) in pages.iter().enumerate() {
                let cell_count = page.cell_count() as usize;
                for cell_idx in 0..cell_count {
                    if let Ok((key, value)) = page.read_cell(cell_idx) {
                        // The cell key is the node id; the primary index maps
                        // it to this cell's (page, slot) location.
                        let id = String::from_utf8_lossy(&key).to_string();
                        self.node_index.insert(
                            id.clone(),
                            RecordLocation {
                                page_id: page_idx as u32,
                                slot: cell_idx as u16,
                            },
                        );
                        // Secondary index needs the decoded label; undecodable
                        // cells still get a primary-index entry above.
                        if let Some(node) = decode_node(&value, page_idx as u32, cell_idx as u16) {
                            self.node_secondary.insert(&id, node.label_id, &node.label);
                        }
                    }
                }
            }

            // Resume appends on the last existing page.
            if !pages.is_empty() {
                self.current_node_page
                    .store((pages.len() - 1) as u32, Ordering::Release);
            }
        }

        // Rebuild edge index
        if let Ok(pages) = self.edge_pages.read() {
            for (page_idx, page) in pages.iter().enumerate() {
                let cell_count = page.cell_count() as usize;
                for cell_idx in 0..cell_count {
                    if let Ok((_, value)) = page.read_cell(cell_idx) {
                        if let Some(edge) = decode_edge(&value, page_idx as u32, cell_idx as u16) {
                            self.edge_index.add_edge(
                                &edge.source_id,
                                &edge.target_id,
                                &edge.edge_type,
                                edge.weight,
                            );
                        }
                    }
                }
            }

            // Resume appends on the last existing page.
            if !pages.is_empty() {
                self.current_edge_page
                    .store((pages.len() - 1) as u32, Ordering::Release);
            }
        }

        Ok(())
    }
678}