1use std::collections::HashMap;
31use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
32use std::sync::Arc;
33use std::sync::RwLock;
34
35use super::page::{Page, PageType, PAGE_SIZE};
36
37pub const MAX_ID_SIZE: usize = reddb_file::GRAPH_MAX_ID_SIZE;
39
40pub const MAX_LABEL_SIZE: usize = reddb_file::GRAPH_MAX_LABEL_SIZE;
42
43pub const NODE_HEADER_SIZE_V1: usize = reddb_file::GRAPH_NODE_HEADER_SIZE_V1;
46
47pub const NODE_HEADER_SIZE: usize = reddb_file::GRAPH_NODE_HEADER_SIZE;
51
52pub const TABLE_REF_SIZE: usize = reddb_file::GRAPH_TABLE_REF_SIZE;
54
55pub const NODE_FLAG_HAS_TABLE_REF: u8 = reddb_file::GRAPH_NODE_FLAG_HAS_TABLE_REF;
57pub const NODE_FLAG_HAS_VECTOR_REF: u8 = reddb_file::GRAPH_NODE_FLAG_HAS_VECTOR_REF;
59
60pub const VECTOR_REF_HEADER_SIZE: usize = reddb_file::GRAPH_VECTOR_REF_HEADER_SIZE;
62
63#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
65pub struct TableRef {
66 pub table_id: u16,
68 pub row_id: u64,
70}
71
72impl TableRef {
73 pub fn new(table_id: u16, row_id: u64) -> Self {
75 Self { table_id, row_id }
76 }
77
78 pub fn encode(&self) -> [u8; TABLE_REF_SIZE] {
80 reddb_file::encode_graph_table_ref(reddb_file::GraphTableRef {
81 table_id: self.table_id,
82 row_id: self.row_id,
83 })
84 }
85
86 pub fn decode(data: &[u8]) -> Option<Self> {
88 let decoded = reddb_file::decode_graph_table_ref(data)?;
89 Some(Self {
90 table_id: decoded.table_id,
91 row_id: decoded.row_id,
92 })
93 }
94}
95
96pub const EDGE_HEADER_SIZE_V1: usize = reddb_file::GRAPH_EDGE_HEADER_SIZE_V1;
99
100pub const EDGE_HEADER_SIZE: usize = reddb_file::GRAPH_EDGE_HEADER_SIZE;
104
105#[derive(Debug, Clone)]
107pub struct StoredNode {
108 pub id: String,
109 pub label: String,
110 pub node_type: String,
115 pub label_id: LabelId,
117 pub flags: u8,
118 pub out_edge_count: u32,
119 pub in_edge_count: u32,
120 pub page_id: u32,
122 pub slot: u16,
124 pub table_ref: Option<TableRef>,
126 pub vector_ref: Option<(String, u64)>,
128}
129
130impl StoredNode {
131 pub fn encode(&self) -> Vec<u8> {
133 reddb_file::encode_graph_node_record_v2(&self.as_file_record())
134 }
135
136 pub fn decode(data: &[u8], page_id: u32, slot: u16) -> Option<Self> {
138 let record = reddb_file::decode_graph_node_record_v2(data)?;
139 let label_id = LabelId::new(record.label_id);
140 let node_type = label_id_to_node_label(label_id);
143 Some(Self::from_file_record(
144 record, page_id, slot, node_type, label_id,
145 ))
146 }
147
148 pub fn decode_v1(data: &[u8], page_id: u32, slot: u16) -> Option<Self> {
153 let record = reddb_file::decode_graph_node_record_v1(data)?;
154 if record.legacy_type > 8 {
158 return None;
159 }
160 let label_id = LabelRegistry::legacy_node_label_id(record.legacy_type);
161 let node_type = label_id_to_node_label(label_id);
162 Some(Self::from_legacy_file_record(
163 record, page_id, slot, node_type, label_id,
164 ))
165 }
166
167 fn as_file_record(&self) -> reddb_file::GraphNodeRecord {
168 reddb_file::GraphNodeRecord {
169 id: self.id.clone(),
170 label: self.label.clone(),
171 label_id: self.label_id.as_u32(),
172 flags: self.flags,
173 out_edge_count: self.out_edge_count,
174 in_edge_count: self.in_edge_count,
175 table_ref: self.table_ref.map(|t| reddb_file::GraphTableRef {
176 table_id: t.table_id,
177 row_id: t.row_id,
178 }),
179 vector_ref: self.vector_ref.as_ref().map(|(collection, vector_id)| {
180 reddb_file::GraphVectorRef {
181 collection: collection.clone(),
182 vector_id: *vector_id,
183 }
184 }),
185 }
186 }
187
188 fn from_file_record(
189 record: reddb_file::GraphNodeRecord,
190 page_id: u32,
191 slot: u16,
192 node_type: String,
193 label_id: LabelId,
194 ) -> Self {
195 Self {
196 id: record.id,
197 label: record.label,
198 node_type,
199 label_id,
200 flags: record.flags,
201 out_edge_count: record.out_edge_count,
202 in_edge_count: record.in_edge_count,
203 page_id,
204 slot,
205 table_ref: record
206 .table_ref
207 .map(|t| TableRef::new(t.table_id, t.row_id)),
208 vector_ref: record.vector_ref.map(|v| (v.collection, v.vector_id)),
209 }
210 }
211
212 fn from_legacy_file_record(
213 record: reddb_file::LegacyGraphNodeRecord,
214 page_id: u32,
215 slot: u16,
216 node_type: String,
217 label_id: LabelId,
218 ) -> Self {
219 Self {
220 id: record.id,
221 label: record.label,
222 node_type,
223 label_id,
224 flags: record.flags,
225 out_edge_count: record.out_edge_count,
226 in_edge_count: record.in_edge_count,
227 page_id,
228 slot,
229 table_ref: record
230 .table_ref
231 .map(|t| TableRef::new(t.table_id, t.row_id)),
232 vector_ref: record.vector_ref.map(|v| (v.collection, v.vector_id)),
233 }
234 }
235
236 pub fn encoded_size(&self) -> usize {
238 reddb_file::graph_node_record_v2_encoded_size(&self.as_file_record())
239 }
240
241 pub fn link_to_row(&mut self, table_id: u16, row_id: u64) {
243 self.table_ref = Some(TableRef::new(table_id, row_id));
244 }
245
246 pub fn unlink_from_row(&mut self) {
248 self.table_ref = None;
249 }
250
251 pub fn link_to_vector(&mut self, collection: String, vector_id: u64) {
253 self.vector_ref = Some((collection, vector_id));
254 }
255
256 pub fn unlink_from_vector(&mut self) {
258 self.vector_ref = None;
259 }
260
261 pub fn is_linked(&self) -> bool {
263 self.table_ref.is_some()
264 }
265}
266
267#[derive(Debug, Clone)]
269pub struct StoredEdge {
270 pub source_id: String,
271 pub target_id: String,
272 pub edge_type: String,
274 pub label_id: LabelId,
276 pub weight: f32,
277 pub page_id: u32,
279 pub slot: u16,
281}
282
283impl StoredEdge {
284 pub fn encode(&self) -> Vec<u8> {
286 reddb_file::encode_graph_edge_record_v2(&self.as_file_record())
287 }
288
289 pub fn decode(data: &[u8], page_id: u32, slot: u16) -> Option<Self> {
291 let record = reddb_file::decode_graph_edge_record_v2(data)?;
292 let label_id = LabelId::new(record.label_id);
293 let edge_type = label_id_to_edge_label(label_id);
294 Some(Self::from_file_record(
295 record, page_id, slot, edge_type, label_id,
296 ))
297 }
298
299 pub fn decode_v1(data: &[u8], page_id: u32, slot: u16) -> Option<Self> {
303 let record = reddb_file::decode_graph_edge_record_v1(data)?;
304 if record.legacy_type > 9 {
305 return None;
306 }
307 let label_id = LabelRegistry::legacy_edge_label_id(record.legacy_type);
308 let edge_type = label_id_to_edge_label(label_id);
309 Some(Self::from_legacy_file_record(
310 record, page_id, slot, edge_type, label_id,
311 ))
312 }
313
314 pub fn encoded_size(&self) -> usize {
316 reddb_file::graph_edge_record_v2_encoded_size(&self.as_file_record())
317 }
318
319 fn as_file_record(&self) -> reddb_file::GraphEdgeRecord {
320 reddb_file::GraphEdgeRecord {
321 source_id: self.source_id.clone(),
322 target_id: self.target_id.clone(),
323 label_id: self.label_id.as_u32(),
324 weight: self.weight,
325 }
326 }
327
328 fn from_file_record(
329 record: reddb_file::GraphEdgeRecord,
330 page_id: u32,
331 slot: u16,
332 edge_type: String,
333 label_id: LabelId,
334 ) -> Self {
335 Self {
336 source_id: record.source_id,
337 target_id: record.target_id,
338 edge_type,
339 label_id,
340 weight: record.weight,
341 page_id,
342 slot,
343 }
344 }
345
346 fn from_legacy_file_record(
347 record: reddb_file::LegacyGraphEdgeRecord,
348 page_id: u32,
349 slot: u16,
350 edge_type: String,
351 label_id: LabelId,
352 ) -> Self {
353 Self {
354 source_id: record.source_id,
355 target_id: record.target_id,
356 edge_type,
357 label_id,
358 weight: record.weight,
359 page_id,
360 slot,
361 }
362 }
363}
364
365fn label_id_to_node_label(id: LabelId) -> String {
371 match id.as_u32() {
372 1 => "host".to_string(),
373 2 => "service".to_string(),
374 3 => "credential".to_string(),
375 4 => "vulnerability".to_string(),
376 5 => "endpoint".to_string(),
377 6 => "technology".to_string(),
378 7 => "user".to_string(),
379 8 => "domain".to_string(),
380 9 => "certificate".to_string(),
381 n => format!("label_{}", n),
382 }
383}
384
385fn label_id_to_edge_label(id: LabelId) -> String {
388 match id.as_u32() {
389 10 => "has_service".to_string(),
390 11 => "has_endpoint".to_string(),
391 12 => "uses_tech".to_string(),
392 13 => "auth_access".to_string(),
393 14 => "affected_by".to_string(),
394 15 => "contains".to_string(),
395 16 => "connects_to".to_string(),
396 17 => "related_to".to_string(),
397 18 => "has_user".to_string(),
398 19 => "has_cert".to_string(),
399 n => format!("label_{}", n),
400 }
401}
402
403#[derive(Debug, Clone, Copy)]
405pub struct RecordLocation {
406 pub page_id: u32,
407 pub slot: u16,
408}
409
410#[derive(Debug, Clone, Default)]
412pub struct GraphStats {
413 pub node_count: u64,
414 pub edge_count: u64,
415 pub node_pages: u32,
416 pub edge_pages: u32,
417 pub nodes_by_label: HashMap<String, u64>,
420 pub edges_by_label: HashMap<String, u64>,
422}
423
424pub struct ShardedIndex<V> {
427 shards: Vec<RwLock<HashMap<String, V>>>,
428 shard_count: usize,
429}
430
431impl<V: Clone> ShardedIndex<V> {
432 pub fn new(shard_count: usize) -> Self {
433 let shards = (0..shard_count)
434 .map(|_| RwLock::new(HashMap::new()))
435 .collect();
436 Self {
437 shards,
438 shard_count,
439 }
440 }
441
442 #[inline]
443 fn shard_for(&self, key: &str) -> usize {
444 let hash: u64 = key
446 .bytes()
447 .fold(0u64, |acc, b| acc.wrapping_mul(31).wrapping_add(b as u64));
448 (hash as usize) % self.shard_count
449 }
450
451 pub fn get(&self, key: &str) -> Option<V> {
452 let shard = self.shard_for(key);
453 self.shards[shard].read().ok()?.get(key).cloned()
454 }
455
456 pub fn insert(&self, key: String, value: V) {
457 let shard = self.shard_for(&key);
458 if let Ok(mut guard) = self.shards[shard].write() {
459 guard.insert(key, value);
460 }
461 }
462
463 pub fn remove(&self, key: &str) -> Option<V> {
464 let shard = self.shard_for(key);
465 self.shards[shard].write().ok()?.remove(key)
466 }
467
468 pub fn contains(&self, key: &str) -> bool {
469 let shard = self.shard_for(key);
470 self.shards[shard]
471 .read()
472 .ok()
473 .map(|g| g.contains_key(key))
474 .unwrap_or(false)
475 }
476
477 pub fn len(&self) -> usize {
478 self.shards
479 .iter()
480 .filter_map(|s| s.read().ok())
481 .map(|g| g.len())
482 .sum()
483 }
484
485 pub fn is_empty(&self) -> bool {
486 self.len() == 0
487 }
488}
489
490pub struct EdgeIndex {
495 forward: ShardedIndex<Vec<(String, String, f32)>>,
497 backward: ShardedIndex<Vec<(String, String, f32)>>,
499}
500
501impl EdgeIndex {
502 pub fn new(shard_count: usize) -> Self {
503 Self {
504 forward: ShardedIndex::new(shard_count),
505 backward: ShardedIndex::new(shard_count),
506 }
507 }
508
509 pub fn add_edge(&self, source: &str, target: &str, edge_label: &str, weight: f32) {
510 let shard = self.forward.shard_for(source);
511 if let Ok(mut guard) = self.forward.shards[shard].write() {
512 guard
513 .entry(source.to_string())
514 .or_insert_with(Vec::new)
515 .push((edge_label.to_string(), target.to_string(), weight));
516 }
517
518 let shard = self.backward.shard_for(target);
519 if let Ok(mut guard) = self.backward.shards[shard].write() {
520 guard
521 .entry(target.to_string())
522 .or_insert_with(Vec::new)
523 .push((edge_label.to_string(), source.to_string(), weight));
524 }
525 }
526
527 pub fn remove_edge(&self, source: &str, target: &str, edge_label: &str) {
528 let shard = self.forward.shard_for(source);
529 if let Ok(mut guard) = self.forward.shards[shard].write() {
530 if let Some(edges) = guard.get_mut(source) {
531 edges.retain(|(et, t, _)| !(et == edge_label && t == target));
532 }
533 }
534
535 let shard = self.backward.shard_for(target);
536 if let Ok(mut guard) = self.backward.shards[shard].write() {
537 if let Some(edges) = guard.get_mut(target) {
538 edges.retain(|(et, s, _)| !(et == edge_label && s == source));
539 }
540 }
541 }
542
543 pub fn outgoing(&self, source: &str) -> Vec<(String, String, f32)> {
544 self.forward.get(source).unwrap_or_default()
545 }
546
547 pub fn incoming(&self, target: &str) -> Vec<(String, String, f32)> {
548 self.backward.get(target).unwrap_or_default()
549 }
550
551 pub fn outgoing_of_type(&self, source: &str, edge_label: &str) -> Vec<(String, f32)> {
552 self.forward
553 .get(source)
554 .unwrap_or_default()
555 .into_iter()
556 .filter(|(et, _, _)| et == edge_label)
557 .map(|(_, t, w)| (t, w))
558 .collect()
559 }
560}
561
562pub struct GraphStore {
567 node_index: ShardedIndex<RecordLocation>,
569 edge_index: EdgeIndex,
571 node_secondary: std::sync::Arc<secondary_index::NodeSecondaryIndex>,
578 pub registry: Arc<LabelRegistry>,
581 node_pages: RwLock<Vec<Page>>,
583 edge_pages: RwLock<Vec<Page>>,
585 current_node_page: AtomicU32,
587 current_edge_page: AtomicU32,
589 stats: GraphStats,
591 node_count: AtomicU64,
592 edge_count: AtomicU64,
593}
594
595#[path = "graph_store/impl.rs"]
596mod graph_store_impl;
597pub mod label_registry;
598pub mod secondary_index;
599pub use label_registry::{
600 LabelId, LabelRegistry, LabelRegistryError, Namespace, FIRST_USER_LABEL_ID, MAX_LABEL_LEN,
601 UNSET_LABEL_ID,
602};
603pub use secondary_index::NodeSecondaryIndex;
604impl Default for GraphStore {
605 fn default() -> Self {
606 Self::new()
607 }
608}
609
610pub struct NodeIterator<'a> {
612 store: &'a GraphStore,
613 page_idx: usize,
614 cell_idx: usize,
615}
616
617impl<'a> Iterator for NodeIterator<'a> {
618 type Item = StoredNode;
619
620 fn next(&mut self) -> Option<Self::Item> {
621 let pages = self.store.node_pages.read().ok()?;
622
623 loop {
624 if self.page_idx >= pages.len() {
625 return None;
626 }
627
628 let page = &pages[self.page_idx];
629 let cell_count = page.cell_count() as usize;
630
631 if self.cell_idx >= cell_count {
632 self.page_idx += 1;
633 self.cell_idx = 0;
634 continue;
635 }
636
637 if let Ok((_, value)) = page.read_cell(self.cell_idx) {
638 self.cell_idx += 1;
639 if let Some(node) =
640 StoredNode::decode(&value, self.page_idx as u32, (self.cell_idx - 1) as u16)
641 {
642 return Some(node);
643 }
644 } else {
645 self.cell_idx += 1;
646 }
647 }
648 }
649}
650
651#[derive(Debug, Clone)]
653pub enum GraphStoreError {
654 NodeExists(String),
655 NodeNotFound(String),
656 EdgeNotFound(String, String),
657 PageFull,
658 LockPoisoned,
659 InvalidData(String),
660 IoError(String),
661}
662
663impl std::fmt::Display for GraphStoreError {
664 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
665 match self {
666 Self::NodeExists(id) => write!(f, "Node already exists: {}", id),
667 Self::NodeNotFound(id) => write!(f, "Node not found: {}", id),
668 Self::EdgeNotFound(s, t) => write!(f, "Edge not found: {} -> {}", s, t),
669 Self::PageFull => write!(f, "Page is full"),
670 Self::LockPoisoned => write!(f, "Lock poisoned"),
671 Self::InvalidData(msg) => write!(f, "Invalid data: {}", msg),
672 Self::IoError(msg) => write!(f, "I/O error: {}", msg),
673 }
674 }
675}
676
677impl std::error::Error for GraphStoreError {}
678
679#[cfg(test)]
680mod tests {
681 use super::*;
682 use std::sync::Arc;
683
684 #[test]
685 fn test_graph_store_basic() {
686 let store = GraphStore::new();
687
688 store
690 .add_node_with_label("host:192.168.1.1", "Web Server", "host")
691 .unwrap();
692 store
693 .add_node_with_label("host:192.168.1.2", "Database", "host")
694 .unwrap();
695 store
696 .add_node_with_label("service:192.168.1.1:80:http", "HTTP", "service")
697 .unwrap();
698
699 assert_eq!(store.node_count(), 3);
700
701 store
703 .add_edge_with_label(
704 "host:192.168.1.1",
705 "service:192.168.1.1:80:http",
706 "has_service",
707 1.0,
708 )
709 .unwrap();
710 store
711 .add_edge_with_label("host:192.168.1.1", "host:192.168.1.2", "connects_to", 1.0)
712 .unwrap();
713
714 assert_eq!(store.edge_count(), 2);
715
716 let node = store.get_node("host:192.168.1.1").unwrap();
718 assert_eq!(node.label, "Web Server");
719
720 let out_edges = store.outgoing_edges("host:192.168.1.1");
721 assert_eq!(out_edges.len(), 2);
722 }
723
724 #[test]
725 fn test_graph_store_serialization() {
726 let store = GraphStore::new();
727
728 store
729 .add_node_with_label("host:10.0.0.1", "Server A", "host")
730 .unwrap();
731 store
732 .add_node_with_label("host:10.0.0.2", "Server B", "host")
733 .unwrap();
734 store
735 .add_edge_with_label("host:10.0.0.1", "host:10.0.0.2", "connects_to", 0.5)
736 .unwrap();
737
738 let bytes = store.serialize();
740
741 let restored = GraphStore::deserialize(&bytes).unwrap();
743
744 assert_eq!(restored.node_count(), 2);
745 assert_eq!(restored.edge_count(), 1);
746
747 let node = restored.get_node("host:10.0.0.1").unwrap();
748 assert_eq!(node.label, "Server A");
749 }
750
751 #[test]
752 fn test_concurrent_reads() {
753 use std::thread;
754
755 let store = Arc::new(GraphStore::new());
756
757 for i in 0..100 {
759 store
760 .add_node_with_label(&format!("host:{}", i), &format!("Host {}", i), "host")
761 .unwrap();
762 }
763
764 let mut handles = vec![];
766 for _ in 0..4 {
767 let store_clone = Arc::clone(&store);
768 handles.push(thread::spawn(move || {
769 for i in 0..100 {
770 let _ = store_clone.get_node(&format!("host:{}", i));
771 }
772 }));
773 }
774
775 for handle in handles {
776 handle.join().unwrap();
777 }
778
779 assert_eq!(store.node_count(), 100);
780 }
781
782 #[test]
783 fn test_edge_index_performance() {
784 let store = GraphStore::new();
785
786 store
788 .add_node_with_label("hub", "Hub Node", "host")
789 .unwrap();
790 for i in 0..100 {
791 store
792 .add_node_with_label(&format!("spoke:{}", i), &format!("Spoke {}", i), "host")
793 .unwrap();
794 store
795 .add_edge_with_label("hub", &format!("spoke:{}", i), "connects_to", 1.0)
796 .unwrap();
797 }
798
799 let edges = store.outgoing_edges("hub");
801 assert_eq!(edges.len(), 100);
802 }
803
804 #[test]
805 fn test_nodes_of_type_uses_secondary_index() {
806 let store = GraphStore::new();
807 store
808 .add_node_with_label("host:1", "Web Server", "host")
809 .unwrap();
810 store
811 .add_node_with_label("host:2", "DB Server", "host")
812 .unwrap();
813 store
814 .add_node_with_label("svc:1", "HTTP", "service")
815 .unwrap();
816 store
817 .add_node_with_label("vuln:1", "CVE-2024-1", "vulnerability")
818 .unwrap();
819
820 let hosts = store.nodes_with_category("host");
821 assert_eq!(hosts.len(), 2);
822 assert!(hosts.iter().all(|n| n.node_type == "host"));
823
824 let services = store.nodes_with_category("service");
825 assert_eq!(services.len(), 1);
826 assert_eq!(services[0].id, "svc:1");
827
828 assert_eq!(store.nodes_with_category("user").len(), 0);
829 }
830
831 #[test]
832 fn test_nodes_by_label_with_bloom_prune() {
833 let store = GraphStore::new();
834 store
835 .add_node_with_label("host:1", "Edge Router", "host")
836 .unwrap();
837 store
838 .add_node_with_label("host:2", "Edge Router", "host")
839 .unwrap();
840 store
841 .add_node_with_label("host:3", "Core Switch", "host")
842 .unwrap();
843
844 let routers = store.nodes_by_label("Edge Router");
845 assert_eq!(routers.len(), 2);
846
847 let unknown = store.nodes_by_label("Quantum Router 9000");
848 assert!(unknown.is_empty());
849 assert!(store.may_contain_label("Edge Router"));
851 assert!(store.may_contain_label("Core Switch"));
852 }
853
854 #[test]
855 fn test_publish_indexes_to_registry() {
856 use crate::storage::index::{IndexKind, IndexRegistry, IndexScope};
857
858 let store = GraphStore::new();
859 store.add_node_with_label("h:1", "Alpha", "host").unwrap();
860 store.add_node_with_label("h:2", "Beta", "host").unwrap();
861 store
862 .add_node_with_label("svc:1", "HTTP", "service")
863 .unwrap();
864
865 let registry = IndexRegistry::new();
866 store.publish_indexes(®istry, "infra");
867
868 let shared = registry.get(&IndexScope::graph("infra")).unwrap();
869 let stats = shared.stats();
870 assert_eq!(stats.entries, 6);
873 assert_eq!(stats.kind, IndexKind::Inverted);
874 assert!(stats.has_bloom);
875
876 store.add_node_with_label("h:3", "Gamma", "host").unwrap();
879 let updated = registry.get(&IndexScope::graph("infra")).unwrap().stats();
880 assert_eq!(updated.entries, 8);
881 }
882
883 #[test]
884 fn test_secondary_index_rebuilt_after_deserialize() {
885 let store = GraphStore::new();
886 store
887 .add_node_with_label("host:1", "Alpha", "host")
888 .unwrap();
889 store
890 .add_node_with_label("svc:1", "HTTP", "service")
891 .unwrap();
892
893 let bytes = store.serialize();
894 let restored = GraphStore::deserialize(&bytes).unwrap();
895
896 assert_eq!(restored.nodes_with_category("host").len(), 1);
897 assert_eq!(restored.nodes_by_label("HTTP").len(), 1);
898 assert!(restored.may_contain_label("Alpha"));
899 }
900
901 #[test]
902 fn test_node_iteration() {
903 let store = GraphStore::new();
904
905 for i in 0..50 {
906 store
907 .add_node_with_label(&format!("node:{}", i), &format!("Node {}", i), "host")
908 .unwrap();
909 }
910
911 let nodes: Vec<_> = store.iter_nodes().collect();
912 assert_eq!(nodes.len(), 50);
913 }
914
915 #[test]
916 fn legacy_node_type_interns_into_registry() {
917 let store = GraphStore::new();
918 store.add_node_with_label("h1", "web", "host").unwrap();
919 let id = store
921 .registry
922 .lookup(label_registry::Namespace::Node, "host")
923 .expect("legacy enum name should be interned");
924 let fetched = store.get_node("h1").unwrap();
925 assert_eq!(fetched.label_id, id);
926 assert_eq!(fetched.node_type, "host");
927 }
928
929 #[test]
930 fn v2_round_trip_preserves_user_labels() {
931 let store = GraphStore::new();
932 let user_id = store.intern_node_label("order").unwrap();
935 assert!(user_id.as_u32() >= label_registry::FIRST_USER_LABEL_ID);
936
937 store.add_node_with_label("h1", "web-1", "host").unwrap();
938 store.add_node_with_label("h2", "web-2", "service").unwrap();
939 store
940 .add_edge_with_label("h1", "h2", "connects_to", 1.0)
941 .unwrap();
942
943 let bytes = store.serialize();
944 let frame = reddb_file::decode_graph_store_frame(&bytes, PAGE_SIZE).unwrap();
945 assert_eq!(frame.version, reddb_file::GRAPH_STORE_VERSION_V2);
946
947 let restored = GraphStore::deserialize(&bytes).unwrap();
948 assert_eq!(
950 restored
951 .registry
952 .lookup(label_registry::Namespace::Node, "order"),
953 Some(user_id)
954 );
955 let h1 = restored.get_node("h1").unwrap();
957 assert_eq!(h1.node_type, "host");
958 assert_eq!(
959 h1.label_id,
960 restored
961 .registry
962 .lookup(label_registry::Namespace::Node, "host")
963 .unwrap()
964 );
965 let outgoing = restored.outgoing_edges("h1");
967 assert_eq!(outgoing.len(), 1);
968 assert_eq!(outgoing[0].0, "connects_to");
969 }
970
971 #[test]
972 fn v1_blob_deserializes_via_legacy_path() {
973 let mut node_page = Page::new(PageType::GraphNode, 0);
984 let mut v1_node = Vec::new();
986 v1_node.extend_from_slice(&2u16.to_le_bytes()); v1_node.extend_from_slice(&1u16.to_le_bytes()); v1_node.push(0); v1_node.push(0); v1_node.extend_from_slice(&0u16.to_le_bytes()); v1_node.extend_from_slice(&0u16.to_le_bytes()); v1_node.extend_from_slice(b"n1");
993 v1_node.extend_from_slice(b"L");
994 node_page.insert_cell(b"n1", &v1_node).unwrap();
995
996 let mut edge_page = Page::new(PageType::GraphEdge, 0);
997 let mut v1_edge = Vec::new();
998 v1_edge.extend_from_slice(&2u16.to_le_bytes()); v1_edge.extend_from_slice(&2u16.to_le_bytes()); v1_edge.push(0); v1_edge.extend_from_slice(&1.0f32.to_le_bytes()); v1_edge.extend_from_slice(b"n1");
1003 v1_edge.extend_from_slice(b"n1");
1004 edge_page.insert_cell(b"n1|0|n1", &v1_edge).unwrap();
1005
1006 let bytes = reddb_file::encode_graph_store_frame(&reddb_file::GraphStoreFrame {
1007 version: reddb_file::GRAPH_STORE_VERSION_V1,
1008 node_count: 1,
1009 edge_count: 1,
1010 registry_bytes: None,
1011 node_pages: vec![node_page.as_bytes().to_vec()],
1012 edge_pages: vec![edge_page.as_bytes().to_vec()],
1013 })
1014 .unwrap();
1015
1016 let store = GraphStore::deserialize(&bytes).expect("v1 blob deserializes");
1017 let node = store.get_node("n1").unwrap();
1019 assert_eq!(node.node_type, "host");
1020 assert_eq!(node.label_id, LabelId::new(1));
1021 let out = store.outgoing_edges("n1");
1023 assert_eq!(out.len(), 1);
1024 assert_eq!(out[0].0, "has_service");
1025 }
1026
1027 #[test]
1028 fn deserialize_rejects_unknown_version() {
1029 let mut bytes = [0u8; reddb_file::GRAPH_STORE_HEADER_LEN];
1030 bytes[0..4].copy_from_slice(&reddb_file::GRAPH_STORE_MAGIC);
1031 bytes[4..8].copy_from_slice(&999u32.to_le_bytes());
1032 match GraphStore::deserialize(&bytes) {
1033 Err(GraphStoreError::InvalidData(msg)) => assert!(msg.contains("unsupported")),
1034 Err(other) => panic!("unexpected error: {:?}", other),
1035 Ok(_) => panic!("expected error for unknown version"),
1036 }
1037 }
1038}