1use std::sync::Arc;
2
3use crate::storage::index::{IndexRegistry, IndexScope};
4
5use super::label_registry::Namespace;
6use super::*;
7
8impl GraphStore {
9 pub fn new() -> Self {
13 Self::with_registry(Arc::new(LabelRegistry::with_legacy_seed()))
14 }
15
16 pub fn with_registry(registry: Arc<LabelRegistry>) -> Self {
21 const SHARD_COUNT: usize = 16;
23
24 let initial_node_page = Page::new(PageType::GraphNode, 0);
25 let initial_edge_page = Page::new(PageType::GraphEdge, 0);
26
27 Self {
28 node_index: ShardedIndex::new(SHARD_COUNT),
29 edge_index: EdgeIndex::new(SHARD_COUNT),
30 node_secondary: Arc::new(NodeSecondaryIndex::new(8192)),
31 registry,
32 node_pages: RwLock::new(vec![initial_node_page]),
33 edge_pages: RwLock::new(vec![initial_edge_page]),
34 current_node_page: AtomicU32::new(0),
35 current_edge_page: AtomicU32::new(0),
36 stats: GraphStats::default(),
37 node_count: AtomicU64::new(0),
38 edge_count: AtomicU64::new(0),
39 }
40 }
41
42 pub fn intern_node_label(&self, label: &str) -> Result<LabelId, GraphStoreError> {
46 self.registry
47 .intern(Namespace::Node, label)
48 .map_err(|e| GraphStoreError::InvalidData(e.to_string()))
49 }
50
51 pub fn intern_edge_label(&self, label: &str) -> Result<LabelId, GraphStoreError> {
53 self.registry
54 .intern(Namespace::Edge, label)
55 .map_err(|e| GraphStoreError::InvalidData(e.to_string()))
56 }
57
58 pub fn publish_indexes(&self, registry: &IndexRegistry, collection: &str) {
66 registry.register(
67 IndexScope::graph(collection),
68 Arc::clone(&self.node_secondary) as Arc<dyn crate::storage::index::IndexBase>,
69 );
70 }
71
72 pub fn add_node_with_label(
75 &self,
76 id: &str,
77 display_label: &str,
78 category: &str,
79 ) -> Result<RecordLocation, GraphStoreError> {
80 if self.node_index.contains(id) {
81 return Err(GraphStoreError::NodeExists(id.to_string()));
82 }
83 let label_id = self.intern_node_label(category)?;
84 let node = StoredNode {
85 id: id.to_string(),
86 label: display_label.to_string(),
87 node_type: category.to_string(),
88 label_id,
89 flags: 0,
90 out_edge_count: 0,
91 in_edge_count: 0,
92 page_id: 0,
93 slot: 0,
94 table_ref: None,
95 vector_ref: None,
96 };
97 let location = self.write_node_record(id, &node)?;
98 self.node_index.insert(id.to_string(), location);
99 self.node_secondary.insert(id, label_id, display_label);
100 self.node_count.fetch_add(1, Ordering::Relaxed);
101 Ok(location)
102 }
103
104 pub fn add_edge_with_label(
106 &self,
107 source_id: &str,
108 target_id: &str,
109 category: &str,
110 weight: f32,
111 ) -> Result<RecordLocation, GraphStoreError> {
112 if !self.node_index.contains(source_id) {
113 return Err(GraphStoreError::NodeNotFound(source_id.to_string()));
114 }
115 if !self.node_index.contains(target_id) {
116 return Err(GraphStoreError::NodeNotFound(target_id.to_string()));
117 }
118 let label_id = self.intern_edge_label(category)?;
119 let edge = StoredEdge {
120 source_id: source_id.to_string(),
121 target_id: target_id.to_string(),
122 edge_type: category.to_string(),
123 label_id,
124 weight,
125 page_id: 0,
126 slot: 0,
127 };
128 let location = self.write_edge_record(source_id, target_id, label_id, &edge)?;
129 self.edge_index
130 .add_edge(source_id, target_id, category, weight);
131 self.edge_count.fetch_add(1, Ordering::Relaxed);
132 Ok(location)
133 }
134
135 fn write_node_record(
138 &self,
139 id: &str,
140 node: &StoredNode,
141 ) -> Result<RecordLocation, GraphStoreError> {
142 let encoded = node.encode();
143 let mut pages = self
144 .node_pages
145 .write()
146 .map_err(|_| GraphStoreError::LockPoisoned)?;
147 let current_page_id = self.current_node_page.load(Ordering::Acquire);
148 let page = &mut pages[current_page_id as usize];
149 match page.insert_cell(id.as_bytes(), &encoded) {
150 Ok(slot) => Ok(RecordLocation {
151 page_id: current_page_id,
152 slot: slot as u16,
153 }),
154 Err(_) => {
155 let new_page_id = pages.len() as u32;
156 let mut new_page = Page::new(PageType::GraphNode, new_page_id);
157 let slot = new_page
158 .insert_cell(id.as_bytes(), &encoded)
159 .map_err(|_| GraphStoreError::PageFull)?;
160 pages.push(new_page);
161 self.current_node_page.store(new_page_id, Ordering::Release);
162 Ok(RecordLocation {
163 page_id: new_page_id,
164 slot: slot as u16,
165 })
166 }
167 }
168 }
169
170 fn write_edge_record(
172 &self,
173 source_id: &str,
174 target_id: &str,
175 label_id: LabelId,
176 edge: &StoredEdge,
177 ) -> Result<RecordLocation, GraphStoreError> {
178 let encoded = edge.encode();
179 let edge_key = format!("{}|{}|{}", source_id, label_id.as_u32(), target_id);
180 let mut pages = self
181 .edge_pages
182 .write()
183 .map_err(|_| GraphStoreError::LockPoisoned)?;
184 let current_page_id = self.current_edge_page.load(Ordering::Acquire);
185 let page = &mut pages[current_page_id as usize];
186 match page.insert_cell(edge_key.as_bytes(), &encoded) {
187 Ok(slot) => Ok(RecordLocation {
188 page_id: current_page_id,
189 slot: slot as u16,
190 }),
191 Err(_) => {
192 let new_page_id = pages.len() as u32;
193 let mut new_page = Page::new(PageType::GraphEdge, new_page_id);
194 let slot = new_page
195 .insert_cell(edge_key.as_bytes(), &encoded)
196 .map_err(|_| GraphStoreError::PageFull)?;
197 pages.push(new_page);
198 self.current_edge_page.store(new_page_id, Ordering::Release);
199 Ok(RecordLocation {
200 page_id: new_page_id,
201 slot: slot as u16,
202 })
203 }
204 }
205 }
206
207 pub fn add_node_linked(
209 &self,
210 id: &str,
211 label: &str,
212 category: &str,
213 table_id: u16,
214 row_id: u64,
215 ) -> Result<RecordLocation, GraphStoreError> {
216 if self.node_index.contains(id) {
217 return Err(GraphStoreError::NodeExists(id.to_string()));
218 }
219 let label_id = self.intern_node_label(category)?;
220 let node = StoredNode {
221 id: id.to_string(),
222 label: label.to_string(),
223 node_type: category.to_string(),
224 label_id,
225 flags: NODE_FLAG_HAS_TABLE_REF,
226 out_edge_count: 0,
227 in_edge_count: 0,
228 page_id: 0,
229 slot: 0,
230 table_ref: Some(TableRef::new(table_id, row_id)),
231 vector_ref: None,
232 };
233
234 let location = self.write_node_record(id, &node)?;
235 self.node_index.insert(id.to_string(), location);
236 self.node_secondary.insert(id, label_id, label);
237 self.node_count.fetch_add(1, Ordering::Relaxed);
238 Ok(location)
239 }
240
241 pub fn get_node_table_ref(&self, node_id: &str) -> Option<TableRef> {
243 self.get_node(node_id).and_then(|n| n.table_ref)
244 }
245
246 pub fn get_node(&self, id: &str) -> Option<StoredNode> {
248 let location = self.node_index.get(id)?;
249
250 let pages = self.node_pages.read().ok()?;
251 let page = pages.get(location.page_id as usize)?;
252
253 let (_, value) = page.read_cell(location.slot as usize).ok()?;
254 StoredNode::decode(&value, location.page_id, location.slot)
255 }
256
257 #[inline]
259 pub fn outgoing_edges(&self, source_id: &str) -> Vec<(String, String, f32)> {
260 self.edge_index.outgoing(source_id)
261 }
262
263 #[inline]
265 pub fn incoming_edges(&self, target_id: &str) -> Vec<(String, String, f32)> {
266 self.edge_index.incoming(target_id)
267 }
268
269 #[inline]
271 pub fn outgoing_of_type(&self, source_id: &str, edge_label: &str) -> Vec<(String, f32)> {
272 self.edge_index.outgoing_of_type(source_id, edge_label)
273 }
274
275 #[inline]
277 pub fn has_node(&self, id: &str) -> bool {
278 self.node_index.contains(id)
279 }
280
281 #[inline]
283 pub fn node_count(&self) -> u64 {
284 self.node_count.load(Ordering::Relaxed)
285 }
286
287 #[inline]
289 pub fn edge_count(&self) -> u64 {
290 self.edge_count.load(Ordering::Relaxed)
291 }
292
293 pub fn iter_nodes(&self) -> NodeIterator<'_> {
295 NodeIterator {
296 store: self,
297 page_idx: 0,
298 cell_idx: 0,
299 }
300 }
301
302 pub fn iter_all_edges(&self) -> Vec<StoredEdge> {
307 let mut edges = Vec::new();
308
309 for node in self.iter_nodes() {
310 for (edge_label, target_id, weight) in self.outgoing_edges(&node.id) {
311 let label_id = self
312 .registry
313 .lookup(Namespace::Edge, &edge_label)
314 .unwrap_or(UNSET_LABEL_ID);
315 edges.push(StoredEdge {
316 source_id: node.id.clone(),
317 target_id,
318 edge_type: edge_label,
319 label_id,
320 weight,
321 page_id: 0,
322 slot: 0,
323 });
324 }
325 }
326
327 edges
328 }
329
330 pub fn nodes_of_label(&self, label_id: LabelId) -> Vec<StoredNode> {
333 self.node_secondary
334 .nodes_by_type(label_id)
335 .into_iter()
336 .filter_map(|id| self.get_node(&id))
337 .collect()
338 }
339
340 pub fn nodes_by_label(&self, label: &str) -> Vec<StoredNode> {
344 self.node_secondary
345 .nodes_by_label(label)
346 .into_iter()
347 .filter_map(|id| self.get_node(&id))
348 .collect()
349 }
350
351 pub fn nodes_with_category(&self, category: &str) -> Vec<StoredNode> {
358 let Some(label_id) = self.registry.lookup(Namespace::Node, category) else {
359 return Vec::new();
360 };
361 self.nodes_of_label(label_id)
362 }
363
364 pub fn may_contain_label(&self, label: &str) -> bool {
368 self.node_secondary.may_contain_label(label)
369 }
370
371 pub fn node_secondary_index(&self) -> &NodeSecondaryIndex {
373 &self.node_secondary
374 }
375
376 pub fn stats(&self) -> GraphStats {
378 let mut stats = GraphStats {
379 node_count: self.node_count.load(Ordering::Relaxed),
380 edge_count: self.edge_count.load(Ordering::Relaxed),
381 node_pages: self.node_pages.read().map(|p| p.len() as u32).unwrap_or(0),
382 edge_pages: self.edge_pages.read().map(|p| p.len() as u32).unwrap_or(0),
383 ..Default::default()
384 };
385
386 for (label_id, count) in self.node_secondary.label_id_counts() {
391 if let Some((Namespace::Node, label)) = self.registry.resolve(label_id) {
392 stats.nodes_by_label.insert(label, count);
393 }
394 }
395
396 stats
397 }
398
399 pub fn serialize(&self) -> Vec<u8> {
402 let registry_bytes = self.registry.encode().unwrap_or_default();
403 let node_pages = self
404 .node_pages
405 .read()
406 .map(|pages| pages.iter().map(|page| page.as_bytes().to_vec()).collect())
407 .unwrap_or_default();
408 let edge_pages = self
409 .edge_pages
410 .read()
411 .map(|pages| pages.iter().map(|page| page.as_bytes().to_vec()).collect())
412 .unwrap_or_default();
413
414 reddb_file::encode_graph_store_frame(&reddb_file::GraphStoreFrame {
415 version: reddb_file::GRAPH_STORE_VERSION_V2,
416 node_count: self.node_count.load(Ordering::Relaxed),
417 edge_count: self.edge_count.load(Ordering::Relaxed),
418 registry_bytes: Some(registry_bytes),
419 node_pages,
420 edge_pages,
421 })
422 .expect("in-memory graph store should encode")
423 }
424
425 pub fn deserialize(data: &[u8]) -> Result<Self, GraphStoreError> {
431 let frame = reddb_file::decode_graph_store_frame(data, PAGE_SIZE)
432 .map_err(|e| GraphStoreError::InvalidData(e.to_string()))?;
433
434 let registry: Arc<LabelRegistry> = match frame.version {
436 1 => Arc::new(LabelRegistry::with_legacy_seed()),
437 2 => {
438 let registry_bytes = frame.registry_bytes.as_deref().ok_or_else(|| {
439 GraphStoreError::InvalidData("Missing registry blob".to_string())
440 })?;
441 let reg = LabelRegistry::decode(registry_bytes)
442 .map_err(|e| GraphStoreError::InvalidData(e.to_string()))?;
443 Arc::new(reg)
444 }
445 v => {
446 return Err(GraphStoreError::InvalidData(format!(
447 "Unsupported graph file version {}",
448 v
449 )));
450 }
451 };
452
453 let mut node_pages = Vec::with_capacity(frame.node_pages.len());
454 for page_bytes in &frame.node_pages {
455 let page = Page::from_slice(page_bytes)
456 .map_err(|_| GraphStoreError::InvalidData("Invalid page".to_string()))?;
457 node_pages.push(page);
458 }
459
460 let mut edge_pages = Vec::with_capacity(frame.edge_pages.len());
461 for page_bytes in &frame.edge_pages {
462 let page = Page::from_slice(page_bytes)
463 .map_err(|_| GraphStoreError::InvalidData("Invalid page".to_string()))?;
464 edge_pages.push(page);
465 }
466
467 if frame.version == 1 {
471 let store = Self::with_registry(Arc::clone(®istry));
472 for (page_idx, page) in node_pages.iter().enumerate() {
473 let cell_count = page.cell_count() as usize;
474 for cell_idx in 0..cell_count {
475 if let Ok((_, value)) = page.read_cell(cell_idx) {
476 if let Some(n) =
477 StoredNode::decode_v1(&value, page_idx as u32, cell_idx as u16)
478 {
479 store.add_node_with_label(&n.id, &n.label, &n.node_type)?;
482 }
483 }
484 }
485 }
486 for (page_idx, page) in edge_pages.iter().enumerate() {
487 let cell_count = page.cell_count() as usize;
488 for cell_idx in 0..cell_count {
489 if let Ok((_, value)) = page.read_cell(cell_idx) {
490 if let Some(e) =
491 StoredEdge::decode_v1(&value, page_idx as u32, cell_idx as u16)
492 {
493 if !store.has_node(&e.source_id) || !store.has_node(&e.target_id) {
495 continue;
496 }
497 store.add_edge_with_label(
498 &e.source_id,
499 &e.target_id,
500 &e.edge_type,
501 e.weight,
502 )?;
503 }
504 }
505 }
506 }
507 let _ = (frame.node_count, frame.edge_count);
511 return Ok(store);
512 }
513
514 let store = Self {
515 node_index: ShardedIndex::new(16),
516 edge_index: EdgeIndex::new(16),
517 node_secondary: Arc::new(NodeSecondaryIndex::new(8192)),
518 registry,
519 node_pages: RwLock::new(node_pages),
520 edge_pages: RwLock::new(edge_pages),
521 current_node_page: AtomicU32::new(0),
522 current_edge_page: AtomicU32::new(0),
523 stats: GraphStats::default(),
524 node_count: AtomicU64::new(frame.node_count),
525 edge_count: AtomicU64::new(frame.edge_count),
526 };
527
528 store.rebuild_indexes(frame.version)?;
529
530 Ok(store)
531 }
532
533 fn rebuild_indexes(&self, version: u32) -> Result<(), GraphStoreError> {
536 let decode_node = |bytes: &[u8], page_idx: u32, slot: u16| match version {
537 1 => StoredNode::decode_v1(bytes, page_idx, slot),
538 _ => StoredNode::decode(bytes, page_idx, slot),
539 };
540 let decode_edge = |bytes: &[u8], page_idx: u32, slot: u16| match version {
541 1 => StoredEdge::decode_v1(bytes, page_idx, slot),
542 _ => StoredEdge::decode(bytes, page_idx, slot),
543 };
544
545 self.node_secondary.clear();
547 if let Ok(pages) = self.node_pages.read() {
548 for (page_idx, page) in pages.iter().enumerate() {
549 let cell_count = page.cell_count() as usize;
550 for cell_idx in 0..cell_count {
551 if let Ok((key, value)) = page.read_cell(cell_idx) {
552 let id = String::from_utf8_lossy(&key).to_string();
553 self.node_index.insert(
554 id.clone(),
555 RecordLocation {
556 page_id: page_idx as u32,
557 slot: cell_idx as u16,
558 },
559 );
560 if let Some(node) = decode_node(&value, page_idx as u32, cell_idx as u16) {
561 self.node_secondary.insert(&id, node.label_id, &node.label);
562 }
563 }
564 }
565 }
566
567 if !pages.is_empty() {
568 self.current_node_page
569 .store((pages.len() - 1) as u32, Ordering::Release);
570 }
571 }
572
573 if let Ok(pages) = self.edge_pages.read() {
575 for (page_idx, page) in pages.iter().enumerate() {
576 let cell_count = page.cell_count() as usize;
577 for cell_idx in 0..cell_count {
578 if let Ok((_, value)) = page.read_cell(cell_idx) {
579 if let Some(edge) = decode_edge(&value, page_idx as u32, cell_idx as u16) {
580 self.edge_index.add_edge(
581 &edge.source_id,
582 &edge.target_id,
583 &edge.edge_type,
584 edge.weight,
585 );
586 }
587 }
588 }
589 }
590
591 if !pages.is_empty() {
592 self.current_edge_page
593 .store((pages.len() - 1) as u32, Ordering::Release);
594 }
595 }
596
597 Ok(())
598 }
599}