1use std::sync::Arc;
2
3use crate::storage::index::{IndexRegistry, IndexScope};
4
5use super::label_registry::Namespace;
6use super::*;
7
8impl GraphStore {
9 pub fn new() -> Self {
13 Self::with_registry(Arc::new(LabelRegistry::with_legacy_seed()))
14 }
15
16 pub fn with_registry(registry: Arc<LabelRegistry>) -> Self {
21 const SHARD_COUNT: usize = 16;
23
24 let initial_node_page = Page::new(PageType::GraphNode, 0);
25 let initial_edge_page = Page::new(PageType::GraphEdge, 0);
26
27 Self {
28 node_index: ShardedIndex::new(SHARD_COUNT),
29 edge_index: EdgeIndex::new(SHARD_COUNT),
30 node_secondary: Arc::new(NodeSecondaryIndex::new(8192)),
31 registry,
32 node_pages: RwLock::new(vec![initial_node_page]),
33 edge_pages: RwLock::new(vec![initial_edge_page]),
34 current_node_page: AtomicU32::new(0),
35 current_edge_page: AtomicU32::new(0),
36 stats: GraphStats::default(),
37 node_count: AtomicU64::new(0),
38 edge_count: AtomicU64::new(0),
39 }
40 }
41
42 pub fn intern_node_label(&self, label: &str) -> Result<LabelId, GraphStoreError> {
46 self.registry
47 .intern(Namespace::Node, label)
48 .map_err(|e| GraphStoreError::InvalidData(e.to_string()))
49 }
50
51 pub fn intern_edge_label(&self, label: &str) -> Result<LabelId, GraphStoreError> {
53 self.registry
54 .intern(Namespace::Edge, label)
55 .map_err(|e| GraphStoreError::InvalidData(e.to_string()))
56 }
57
58 pub fn publish_indexes(&self, registry: &IndexRegistry, collection: &str) {
66 registry.register(
67 IndexScope::graph(collection),
68 Arc::clone(&self.node_secondary) as Arc<dyn crate::storage::index::IndexBase>,
69 );
70 }
71
72 pub fn add_node_with_label(
75 &self,
76 id: &str,
77 display_label: &str,
78 category: &str,
79 ) -> Result<RecordLocation, GraphStoreError> {
80 if self.node_index.contains(id) {
81 return Err(GraphStoreError::NodeExists(id.to_string()));
82 }
83 let label_id = self.intern_node_label(category)?;
84 let node = StoredNode {
85 id: id.to_string(),
86 label: display_label.to_string(),
87 node_type: category.to_string(),
88 label_id,
89 flags: 0,
90 out_edge_count: 0,
91 in_edge_count: 0,
92 page_id: 0,
93 slot: 0,
94 table_ref: None,
95 vector_ref: None,
96 };
97 let location = self.write_node_record(id, &node)?;
98 self.node_index.insert(id.to_string(), location);
99 self.node_secondary.insert(id, label_id, display_label);
100 self.node_count.fetch_add(1, Ordering::Relaxed);
101 Ok(location)
102 }
103
104 pub fn add_edge_with_label(
106 &self,
107 source_id: &str,
108 target_id: &str,
109 category: &str,
110 weight: f32,
111 ) -> Result<RecordLocation, GraphStoreError> {
112 if !self.node_index.contains(source_id) {
113 return Err(GraphStoreError::NodeNotFound(source_id.to_string()));
114 }
115 if !self.node_index.contains(target_id) {
116 return Err(GraphStoreError::NodeNotFound(target_id.to_string()));
117 }
118 let label_id = self.intern_edge_label(category)?;
119 let edge = StoredEdge {
120 source_id: source_id.to_string(),
121 target_id: target_id.to_string(),
122 edge_type: category.to_string(),
123 label_id,
124 weight,
125 page_id: 0,
126 slot: 0,
127 };
128 let location = self.write_edge_record(source_id, target_id, label_id, &edge)?;
129 self.edge_index
130 .add_edge(source_id, target_id, category, weight);
131 self.edge_count.fetch_add(1, Ordering::Relaxed);
132 Ok(location)
133 }
134
135 fn write_node_record(
138 &self,
139 id: &str,
140 node: &StoredNode,
141 ) -> Result<RecordLocation, GraphStoreError> {
142 let encoded = node.encode();
143 let mut pages = self
144 .node_pages
145 .write()
146 .map_err(|_| GraphStoreError::LockPoisoned)?;
147 let current_page_id = self.current_node_page.load(Ordering::Acquire);
148 let page = &mut pages[current_page_id as usize];
149 match page.insert_cell(id.as_bytes(), &encoded) {
150 Ok(slot) => Ok(RecordLocation {
151 page_id: current_page_id,
152 slot: slot as u16,
153 }),
154 Err(_) => {
155 let new_page_id = pages.len() as u32;
156 let mut new_page = Page::new(PageType::GraphNode, new_page_id);
157 let slot = new_page
158 .insert_cell(id.as_bytes(), &encoded)
159 .map_err(|_| GraphStoreError::PageFull)?;
160 pages.push(new_page);
161 self.current_node_page.store(new_page_id, Ordering::Release);
162 Ok(RecordLocation {
163 page_id: new_page_id,
164 slot: slot as u16,
165 })
166 }
167 }
168 }
169
170 fn write_edge_record(
172 &self,
173 source_id: &str,
174 target_id: &str,
175 label_id: LabelId,
176 edge: &StoredEdge,
177 ) -> Result<RecordLocation, GraphStoreError> {
178 let encoded = edge.encode();
179 let edge_key = format!("{}|{}|{}", source_id, label_id.as_u32(), target_id);
180 let mut pages = self
181 .edge_pages
182 .write()
183 .map_err(|_| GraphStoreError::LockPoisoned)?;
184 let current_page_id = self.current_edge_page.load(Ordering::Acquire);
185 let page = &mut pages[current_page_id as usize];
186 match page.insert_cell(edge_key.as_bytes(), &encoded) {
187 Ok(slot) => Ok(RecordLocation {
188 page_id: current_page_id,
189 slot: slot as u16,
190 }),
191 Err(_) => {
192 let new_page_id = pages.len() as u32;
193 let mut new_page = Page::new(PageType::GraphEdge, new_page_id);
194 let slot = new_page
195 .insert_cell(edge_key.as_bytes(), &encoded)
196 .map_err(|_| GraphStoreError::PageFull)?;
197 pages.push(new_page);
198 self.current_edge_page.store(new_page_id, Ordering::Release);
199 Ok(RecordLocation {
200 page_id: new_page_id,
201 slot: slot as u16,
202 })
203 }
204 }
205 }
206
207 pub fn add_node_linked(
209 &self,
210 id: &str,
211 label: &str,
212 category: &str,
213 table_id: u16,
214 row_id: u64,
215 ) -> Result<RecordLocation, GraphStoreError> {
216 if self.node_index.contains(id) {
217 return Err(GraphStoreError::NodeExists(id.to_string()));
218 }
219 let label_id = self.intern_node_label(category)?;
220 let node = StoredNode {
221 id: id.to_string(),
222 label: label.to_string(),
223 node_type: category.to_string(),
224 label_id,
225 flags: NODE_FLAG_HAS_TABLE_REF,
226 out_edge_count: 0,
227 in_edge_count: 0,
228 page_id: 0,
229 slot: 0,
230 table_ref: Some(TableRef::new(table_id, row_id)),
231 vector_ref: None,
232 };
233
234 let location = self.write_node_record(id, &node)?;
235 self.node_index.insert(id.to_string(), location);
236 self.node_secondary.insert(id, label_id, label);
237 self.node_count.fetch_add(1, Ordering::Relaxed);
238 Ok(location)
239 }
240
241 pub fn get_node_table_ref(&self, node_id: &str) -> Option<TableRef> {
243 self.get_node(node_id).and_then(|n| n.table_ref)
244 }
245
246 pub fn get_node(&self, id: &str) -> Option<StoredNode> {
248 let location = self.node_index.get(id)?;
249
250 let pages = self.node_pages.read().ok()?;
251 let page = pages.get(location.page_id as usize)?;
252
253 let (_, value) = page.read_cell(location.slot as usize).ok()?;
254 StoredNode::decode(&value, location.page_id, location.slot)
255 }
256
257 #[inline]
259 pub fn outgoing_edges(&self, source_id: &str) -> Vec<(String, String, f32)> {
260 self.edge_index.outgoing(source_id)
261 }
262
263 #[inline]
265 pub fn incoming_edges(&self, target_id: &str) -> Vec<(String, String, f32)> {
266 self.edge_index.incoming(target_id)
267 }
268
269 #[inline]
271 pub fn outgoing_of_type(&self, source_id: &str, edge_label: &str) -> Vec<(String, f32)> {
272 self.edge_index.outgoing_of_type(source_id, edge_label)
273 }
274
275 #[inline]
277 pub fn has_node(&self, id: &str) -> bool {
278 self.node_index.contains(id)
279 }
280
281 #[inline]
283 pub fn node_count(&self) -> u64 {
284 self.node_count.load(Ordering::Relaxed)
285 }
286
287 #[inline]
289 pub fn edge_count(&self) -> u64 {
290 self.edge_count.load(Ordering::Relaxed)
291 }
292
293 pub fn iter_nodes(&self) -> NodeIterator<'_> {
295 NodeIterator {
296 store: self,
297 page_idx: 0,
298 cell_idx: 0,
299 }
300 }
301
302 pub fn iter_all_edges(&self) -> Vec<StoredEdge> {
307 let mut edges = Vec::new();
308
309 for node in self.iter_nodes() {
310 for (edge_label, target_id, weight) in self.outgoing_edges(&node.id) {
311 let label_id = self
312 .registry
313 .lookup(Namespace::Edge, &edge_label)
314 .unwrap_or(UNSET_LABEL_ID);
315 edges.push(StoredEdge {
316 source_id: node.id.clone(),
317 target_id,
318 edge_type: edge_label,
319 label_id,
320 weight,
321 page_id: 0,
322 slot: 0,
323 });
324 }
325 }
326
327 edges
328 }
329
330 pub fn nodes_of_label(&self, label_id: LabelId) -> Vec<StoredNode> {
333 self.node_secondary
334 .nodes_by_type(label_id)
335 .into_iter()
336 .filter_map(|id| self.get_node(&id))
337 .collect()
338 }
339
340 pub fn nodes_by_label(&self, label: &str) -> Vec<StoredNode> {
344 self.node_secondary
345 .nodes_by_label(label)
346 .into_iter()
347 .filter_map(|id| self.get_node(&id))
348 .collect()
349 }
350
351 pub fn nodes_with_category(&self, category: &str) -> Vec<StoredNode> {
358 let Some(label_id) = self.registry.lookup(Namespace::Node, category) else {
359 return Vec::new();
360 };
361 self.nodes_of_label(label_id)
362 }
363
364 pub fn may_contain_label(&self, label: &str) -> bool {
368 self.node_secondary.may_contain_label(label)
369 }
370
371 pub fn node_secondary_index(&self) -> &NodeSecondaryIndex {
373 &self.node_secondary
374 }
375
376 pub fn stats(&self) -> GraphStats {
378 let mut stats = GraphStats {
379 node_count: self.node_count.load(Ordering::Relaxed),
380 edge_count: self.edge_count.load(Ordering::Relaxed),
381 node_pages: self.node_pages.read().map(|p| p.len() as u32).unwrap_or(0),
382 edge_pages: self.edge_pages.read().map(|p| p.len() as u32).unwrap_or(0),
383 ..Default::default()
384 };
385
386 for (label_id, count) in self.node_secondary.label_id_counts() {
391 if let Some((Namespace::Node, label)) = self.registry.resolve(label_id) {
392 stats.nodes_by_label.insert(label, count);
393 }
394 }
395
396 stats
397 }
398
399 pub fn serialize(&self) -> Vec<u8> {
402 let mut buf = Vec::new();
403
404 buf.extend_from_slice(b"RBGR"); buf.extend_from_slice(&2u32.to_le_bytes()); buf.extend_from_slice(&self.node_count.load(Ordering::Relaxed).to_le_bytes());
408 buf.extend_from_slice(&self.edge_count.load(Ordering::Relaxed).to_le_bytes());
409
410 let registry_bytes = self.registry.encode().unwrap_or_default();
413 buf.extend_from_slice(&(registry_bytes.len() as u32).to_le_bytes());
414 buf.extend_from_slice(®istry_bytes);
415
416 if let Ok(pages) = self.node_pages.read() {
418 buf.extend_from_slice(&(pages.len() as u32).to_le_bytes());
419 for page in pages.iter() {
420 buf.extend_from_slice(page.as_bytes());
421 }
422 }
423
424 if let Ok(pages) = self.edge_pages.read() {
426 buf.extend_from_slice(&(pages.len() as u32).to_le_bytes());
427 for page in pages.iter() {
428 buf.extend_from_slice(page.as_bytes());
429 }
430 }
431
432 buf
433 }
434
435 pub fn deserialize(data: &[u8]) -> Result<Self, GraphStoreError> {
441 if data.len() < 24 {
442 return Err(GraphStoreError::InvalidData("Too short".to_string()));
443 }
444 if &data[0..4] != b"RBGR" {
445 return Err(GraphStoreError::InvalidData("Invalid magic".to_string()));
446 }
447
448 let version = u32::from_le_bytes([data[4], data[5], data[6], data[7]]);
449 let node_count = u64::from_le_bytes([
450 data[8], data[9], data[10], data[11], data[12], data[13], data[14], data[15],
451 ]);
452 let edge_count = u64::from_le_bytes([
453 data[16], data[17], data[18], data[19], data[20], data[21], data[22], data[23],
454 ]);
455
456 let mut offset = 24;
457
458 let registry: Arc<LabelRegistry> = match version {
460 1 => Arc::new(LabelRegistry::with_legacy_seed()),
461 2 => {
462 if data.len() < offset + 4 {
463 return Err(GraphStoreError::InvalidData(
464 "Truncated v2 header".to_string(),
465 ));
466 }
467 let reg_len = u32::from_le_bytes([
468 data[offset],
469 data[offset + 1],
470 data[offset + 2],
471 data[offset + 3],
472 ]) as usize;
473 offset += 4;
474 if data.len() < offset + reg_len {
475 return Err(GraphStoreError::InvalidData(
476 "Truncated registry blob".to_string(),
477 ));
478 }
479 let reg = LabelRegistry::decode(&data[offset..offset + reg_len])
480 .map_err(|e| GraphStoreError::InvalidData(e.to_string()))?;
481 offset += reg_len;
482 Arc::new(reg)
483 }
484 v => {
485 return Err(GraphStoreError::InvalidData(format!(
486 "Unsupported graph file version {}",
487 v
488 )));
489 }
490 };
491
492 if data.len() < offset + 4 {
494 return Err(GraphStoreError::InvalidData(
495 "Truncated before node-page count".to_string(),
496 ));
497 }
498 let node_page_count = u32::from_le_bytes([
499 data[offset],
500 data[offset + 1],
501 data[offset + 2],
502 data[offset + 3],
503 ]) as usize;
504 offset += 4;
505
506 let mut node_pages = Vec::with_capacity(node_page_count);
507 for _ in 0..node_page_count {
508 if offset + PAGE_SIZE > data.len() {
509 return Err(GraphStoreError::InvalidData(
510 "Truncated node pages".to_string(),
511 ));
512 }
513 let page = Page::from_slice(&data[offset..offset + PAGE_SIZE])
514 .map_err(|_| GraphStoreError::InvalidData("Invalid page".to_string()))?;
515 node_pages.push(page);
516 offset += PAGE_SIZE;
517 }
518
519 if data.len() < offset + 4 {
521 return Err(GraphStoreError::InvalidData(
522 "Truncated before edge-page count".to_string(),
523 ));
524 }
525 let edge_page_count = u32::from_le_bytes([
526 data[offset],
527 data[offset + 1],
528 data[offset + 2],
529 data[offset + 3],
530 ]) as usize;
531 offset += 4;
532
533 let mut edge_pages = Vec::with_capacity(edge_page_count);
534 for _ in 0..edge_page_count {
535 if offset + PAGE_SIZE > data.len() {
536 return Err(GraphStoreError::InvalidData(
537 "Truncated edge pages".to_string(),
538 ));
539 }
540 let page = Page::from_slice(&data[offset..offset + PAGE_SIZE])
541 .map_err(|_| GraphStoreError::InvalidData("Invalid page".to_string()))?;
542 edge_pages.push(page);
543 offset += PAGE_SIZE;
544 }
545
546 if version == 1 {
550 let store = Self::with_registry(Arc::clone(®istry));
551 for (page_idx, page) in node_pages.iter().enumerate() {
552 let cell_count = page.cell_count() as usize;
553 for cell_idx in 0..cell_count {
554 if let Ok((_, value)) = page.read_cell(cell_idx) {
555 if let Some(n) =
556 StoredNode::decode_v1(&value, page_idx as u32, cell_idx as u16)
557 {
558 store.add_node_with_label(&n.id, &n.label, &n.node_type)?;
561 }
562 }
563 }
564 }
565 for (page_idx, page) in edge_pages.iter().enumerate() {
566 let cell_count = page.cell_count() as usize;
567 for cell_idx in 0..cell_count {
568 if let Ok((_, value)) = page.read_cell(cell_idx) {
569 if let Some(e) =
570 StoredEdge::decode_v1(&value, page_idx as u32, cell_idx as u16)
571 {
572 if !store.has_node(&e.source_id) || !store.has_node(&e.target_id) {
574 continue;
575 }
576 store.add_edge_with_label(
577 &e.source_id,
578 &e.target_id,
579 &e.edge_type,
580 e.weight,
581 )?;
582 }
583 }
584 }
585 }
586 let _ = (node_count, edge_count);
590 return Ok(store);
591 }
592
593 let store = Self {
594 node_index: ShardedIndex::new(16),
595 edge_index: EdgeIndex::new(16),
596 node_secondary: Arc::new(NodeSecondaryIndex::new(8192)),
597 registry,
598 node_pages: RwLock::new(node_pages),
599 edge_pages: RwLock::new(edge_pages),
600 current_node_page: AtomicU32::new(0),
601 current_edge_page: AtomicU32::new(0),
602 stats: GraphStats::default(),
603 node_count: AtomicU64::new(node_count),
604 edge_count: AtomicU64::new(edge_count),
605 };
606
607 store.rebuild_indexes(version)?;
608
609 Ok(store)
610 }
611
612 fn rebuild_indexes(&self, version: u32) -> Result<(), GraphStoreError> {
615 let decode_node = |bytes: &[u8], page_idx: u32, slot: u16| match version {
616 1 => StoredNode::decode_v1(bytes, page_idx, slot),
617 _ => StoredNode::decode(bytes, page_idx, slot),
618 };
619 let decode_edge = |bytes: &[u8], page_idx: u32, slot: u16| match version {
620 1 => StoredEdge::decode_v1(bytes, page_idx, slot),
621 _ => StoredEdge::decode(bytes, page_idx, slot),
622 };
623
624 self.node_secondary.clear();
626 if let Ok(pages) = self.node_pages.read() {
627 for (page_idx, page) in pages.iter().enumerate() {
628 let cell_count = page.cell_count() as usize;
629 for cell_idx in 0..cell_count {
630 if let Ok((key, value)) = page.read_cell(cell_idx) {
631 let id = String::from_utf8_lossy(&key).to_string();
632 self.node_index.insert(
633 id.clone(),
634 RecordLocation {
635 page_id: page_idx as u32,
636 slot: cell_idx as u16,
637 },
638 );
639 if let Some(node) = decode_node(&value, page_idx as u32, cell_idx as u16) {
640 self.node_secondary.insert(&id, node.label_id, &node.label);
641 }
642 }
643 }
644 }
645
646 if !pages.is_empty() {
647 self.current_node_page
648 .store((pages.len() - 1) as u32, Ordering::Release);
649 }
650 }
651
652 if let Ok(pages) = self.edge_pages.read() {
654 for (page_idx, page) in pages.iter().enumerate() {
655 let cell_count = page.cell_count() as usize;
656 for cell_idx in 0..cell_count {
657 if let Ok((_, value)) = page.read_cell(cell_idx) {
658 if let Some(edge) = decode_edge(&value, page_idx as u32, cell_idx as u16) {
659 self.edge_index.add_edge(
660 &edge.source_id,
661 &edge.target_id,
662 &edge.edge_type,
663 edge.weight,
664 );
665 }
666 }
667 }
668 }
669
670 if !pages.is_empty() {
671 self.current_edge_page
672 .store((pages.len() - 1) as u32, Ordering::Release);
673 }
674 }
675
676 Ok(())
677 }
678}