1use std::collections::HashMap;
31use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
32use std::sync::Arc;
33use std::sync::RwLock;
34
35use super::page::{Page, PageType, PAGE_SIZE};
36
/// Maximum length in bytes of a node identifier.
/// NOTE(review): not referenced in this file — presumably enforced by the
/// insert paths in `graph_store/impl.rs`; confirm before relying on it.
pub const MAX_ID_SIZE: usize = 256;

/// Maximum length in bytes of a node display label.
/// NOTE(review): not referenced in this file — presumably enforced elsewhere.
pub const MAX_LABEL_SIZE: usize = 512;

/// Legacy (v1) node header size:
/// id_len(2) + label_len(2) + legacy type byte(1) + flags(1) + out(2) + in(2).
pub const NODE_HEADER_SIZE_V1: usize = 10;

/// Current (v2) node header size:
/// id_len(2) + label_len(2) + label_id(4) + flags(1) + out(2) + in(2).
pub const NODE_HEADER_SIZE: usize = 13;

/// Encoded size of a [`TableRef`]: table_id(2) + row_id(8).
pub const TABLE_REF_SIZE: usize = 10;

/// Node flag bit: the record carries a trailing [`TableRef`].
pub const NODE_FLAG_HAS_TABLE_REF: u8 = 0x01;
/// Node flag bit: the record carries a trailing vector reference.
pub const NODE_FLAG_HAS_VECTOR_REF: u8 = 0x02;

/// Fixed-size portion of an encoded vector reference:
/// collection length(2) + vector id(8); the collection bytes are variable.
pub const VECTOR_REF_HEADER_SIZE: usize = 10;

/// Link from a graph node to a row in a relational table.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TableRef {
    /// Identifier of the referenced table.
    pub table_id: u16,
    /// Identifier of the referenced row within that table.
    pub row_id: u64,
}

impl TableRef {
    /// Builds a reference to `row_id` inside table `table_id`.
    pub fn new(table_id: u16, row_id: u64) -> Self {
        Self { table_id, row_id }
    }

    /// Serializes as little-endian `table_id` followed by little-endian `row_id`.
    pub fn encode(&self) -> [u8; TABLE_REF_SIZE] {
        let mut out = [0u8; TABLE_REF_SIZE];
        let (table_part, row_part) = out.split_at_mut(2);
        table_part.copy_from_slice(&self.table_id.to_le_bytes());
        row_part.copy_from_slice(&self.row_id.to_le_bytes());
        out
    }

    /// Parses the first [`TABLE_REF_SIZE`] bytes of `data`; returns `None`
    /// when fewer bytes are available. Trailing bytes are ignored.
    pub fn decode(data: &[u8]) -> Option<Self> {
        let bytes = data.get(..TABLE_REF_SIZE)?;
        let table_id = u16::from_le_bytes(bytes[0..2].try_into().ok()?);
        let row_id = u64::from_le_bytes(bytes[2..10].try_into().ok()?);
        Some(Self { table_id, row_id })
    }
}
99
/// Legacy (v1) edge header size:
/// source_len(2) + target_len(2) + legacy type byte(1) + weight(4).
pub const EDGE_HEADER_SIZE_V1: usize = 9;

/// Current (v2) edge header size:
/// source_len(2) + target_len(2) + label_id(4) + weight(4).
pub const EDGE_HEADER_SIZE: usize = 12;
/// A node record as stored in (and decoded from) graph pages.
#[derive(Debug, Clone)]
pub struct StoredNode {
    // Unique node identifier.
    pub id: String,
    // Human-readable display label.
    pub label: String,
    // Category name resolved from `label_id` (e.g. "host", "service").
    pub node_type: String,
    // Interned label identifier from the `LabelRegistry`.
    pub label_id: LabelId,
    // Flag bits; see `NODE_FLAG_HAS_TABLE_REF` / `NODE_FLAG_HAS_VECTOR_REF`.
    pub flags: u8,
    // Number of outgoing edges (persisted on disk as a u16).
    pub out_edge_count: u32,
    // Number of incoming edges (persisted on disk as a u16).
    pub in_edge_count: u32,
    // Page that holds this record.
    pub page_id: u32,
    // Cell slot within that page.
    pub slot: u16,
    // Optional link to a relational table row.
    pub table_ref: Option<TableRef>,
    // Optional link to a vector store entry: (collection name, vector id).
    pub vector_ref: Option<(String, u64)>,
}
133
impl StoredNode {
    /// Serializes the node in the current (v2) record format:
    /// 13-byte header, id bytes, label bytes, optional `TableRef`,
    /// optional vector reference.
    pub fn encode(&self) -> Vec<u8> {
        let id_bytes = self.id.as_bytes();
        let label_bytes = self.label.as_bytes();
        let has_table_ref = self.table_ref.is_some();
        let has_vector_ref = self.vector_ref.is_some();

        // Recompute the presence bits from the actual optional fields so the
        // stored flags always agree with the payload that follows.
        let mut flags = self.flags & !(NODE_FLAG_HAS_TABLE_REF | NODE_FLAG_HAS_VECTOR_REF);
        if has_table_ref {
            flags |= NODE_FLAG_HAS_TABLE_REF;
        }
        if has_vector_ref {
            flags |= NODE_FLAG_HAS_VECTOR_REF;
        }

        let table_ref_size = if has_table_ref { TABLE_REF_SIZE } else { 0 };
        // Vector ref layout: coll_len(u16) + collection bytes + vector_id(u64).
        let vector_ref_size = if let Some((ref coll, _)) = self.vector_ref {
            2 + coll.len() + 8
        } else {
            0
        };
        let mut buf = Vec::with_capacity(
            NODE_HEADER_SIZE
                + id_bytes.len()
                + label_bytes.len()
                + table_ref_size
                + vector_ref_size,
        );

        // Header: id_len(2) | label_len(2) | label_id(4) | flags(1) | out(2) | in(2).
        // NOTE(review): edge counts are truncated to u16 on disk even though
        // the in-memory fields are u32 — counts above 65535 wrap.
        buf.extend_from_slice(&(id_bytes.len() as u16).to_le_bytes());
        buf.extend_from_slice(&(label_bytes.len() as u16).to_le_bytes());
        buf.extend_from_slice(&self.label_id.as_u32().to_le_bytes());
        buf.push(flags);
        buf.extend_from_slice(&(self.out_edge_count as u16).to_le_bytes());
        buf.extend_from_slice(&(self.in_edge_count as u16).to_le_bytes());

        buf.extend_from_slice(id_bytes);
        buf.extend_from_slice(label_bytes);

        if let Some(ref tref) = self.table_ref {
            buf.extend_from_slice(&tref.encode());
        }

        if let Some((ref collection, vector_id)) = self.vector_ref {
            let coll_bytes = collection.as_bytes();
            buf.extend_from_slice(&(coll_bytes.len() as u16).to_le_bytes());
            buf.extend_from_slice(coll_bytes);
            buf.extend_from_slice(&vector_id.to_le_bytes());
        }

        buf
    }

    /// Decodes a record written by [`encode`](Self::encode) (v2 layout,
    /// 13-byte header). `page_id`/`slot` record where the bytes came from.
    /// Returns `None` on truncated or inconsistent input.
    pub fn decode(data: &[u8], page_id: u32, slot: u16) -> Option<Self> {
        if data.len() < NODE_HEADER_SIZE {
            return None;
        }

        let id_len = u16::from_le_bytes([data[0], data[1]]) as usize;
        let label_len = u16::from_le_bytes([data[2], data[3]]) as usize;
        let label_id = LabelId::new(u32::from_le_bytes([data[4], data[5], data[6], data[7]]));
        let flags = data[8];
        let out_edge_count = u16::from_le_bytes([data[9], data[10]]) as u32;
        let in_edge_count = u16::from_le_bytes([data[11], data[12]]) as u32;
        let node_type = label_id_to_node_label(label_id);

        Self::decode_payload(
            data,
            page_id,
            slot,
            NODE_HEADER_SIZE,
            id_len,
            label_len,
            flags,
            out_edge_count,
            in_edge_count,
            node_type,
            label_id,
        )
    }

    /// Decodes a legacy v1 record (10-byte header carrying a one-byte
    /// node-type enum at offset 4 instead of a 4-byte label id).
    pub fn decode_v1(data: &[u8], page_id: u32, slot: u16) -> Option<Self> {
        if data.len() < NODE_HEADER_SIZE_V1 {
            return None;
        }
        let id_len = u16::from_le_bytes([data[0], data[1]]) as usize;
        let label_len = u16::from_le_bytes([data[2], data[3]]) as usize;
        // v1 stored the node type as a small enum byte; values above 8 were
        // never valid, so reject the record outright.
        if data[4] > 8 {
            return None;
        }
        let flags = data[5];
        let out_edge_count = u16::from_le_bytes([data[6], data[7]]) as u32;
        let in_edge_count = u16::from_le_bytes([data[8], data[9]]) as u32;
        // Map the legacy enum byte onto a registry label id.
        let label_id = LabelRegistry::legacy_node_label_id(data[4]);
        let node_type = label_id_to_node_label(label_id);
        Self::decode_payload(
            data,
            page_id,
            slot,
            NODE_HEADER_SIZE_V1,
            id_len,
            label_len,
            flags,
            out_edge_count,
            in_edge_count,
            node_type,
            label_id,
        )
    }

    /// Shared tail parser for v1/v2 records: reads id and label strings, the
    /// optional `TableRef`, and the optional vector reference starting at
    /// `header_size`. All other fields arrive pre-parsed from the header.
    #[allow(clippy::too_many_arguments)]
    fn decode_payload(
        data: &[u8],
        page_id: u32,
        slot: u16,
        header_size: usize,
        id_len: usize,
        label_len: usize,
        flags: u8,
        out_edge_count: u32,
        in_edge_count: u32,
        node_type: String,
        label_id: LabelId,
    ) -> Option<Self> {
        let has_table_ref = (flags & NODE_FLAG_HAS_TABLE_REF) != 0;
        let has_vector_ref = (flags & NODE_FLAG_HAS_VECTOR_REF) != 0;
        let table_ref_size = if has_table_ref { TABLE_REF_SIZE } else { 0 };

        // Bounds-check the fixed-size portion (strings + optional table ref);
        // the variable-length vector tail is validated separately below.
        let mut offset = header_size + id_len + label_len + table_ref_size;
        if data.len() < offset {
            return None;
        }

        // Lossy UTF-8 so a corrupt byte degrades to U+FFFD instead of failing.
        let id = String::from_utf8_lossy(&data[header_size..header_size + id_len]).to_string();
        let label =
            String::from_utf8_lossy(&data[header_size + id_len..header_size + id_len + label_len])
                .to_string();

        let table_ref = if has_table_ref {
            let ref_start = header_size + id_len + label_len;
            TableRef::decode(&data[ref_start..])
        } else {
            None
        };

        // Vector ref tail: coll_len(u16) | collection bytes | vector_id(u64).
        let vector_ref = if has_vector_ref {
            if data.len() < offset + 2 {
                return None;
            }
            let coll_len = u16::from_le_bytes([data[offset], data[offset + 1]]) as usize;
            offset += 2;
            if data.len() < offset + coll_len + 8 {
                return None;
            }
            let collection = String::from_utf8_lossy(&data[offset..offset + coll_len]).to_string();
            offset += coll_len;
            let vector_id = u64::from_le_bytes(data[offset..offset + 8].try_into().ok()?);
            Some((collection, vector_id))
        } else {
            None
        };

        Some(Self {
            id,
            label,
            node_type,
            label_id,
            flags,
            out_edge_count,
            in_edge_count,
            page_id,
            slot,
            table_ref,
            vector_ref,
        })
    }

    /// Number of bytes [`encode`](Self::encode) will produce for this node.
    pub fn encoded_size(&self) -> usize {
        let table_ref_size = if self.table_ref.is_some() {
            TABLE_REF_SIZE
        } else {
            0
        };
        let vector_ref_size = if let Some((ref coll, _)) = self.vector_ref {
            2 + coll.len() + 8
        } else {
            0
        };
        NODE_HEADER_SIZE + self.id.len() + self.label.len() + table_ref_size + vector_ref_size
    }

    /// Attaches a relational table row reference (replaces any existing one).
    pub fn link_to_row(&mut self, table_id: u16, row_id: u64) {
        self.table_ref = Some(TableRef::new(table_id, row_id));
    }

    /// Clears the relational table row reference, if any.
    pub fn unlink_from_row(&mut self) {
        self.table_ref = None;
    }

    /// Attaches a vector store reference (replaces any existing one).
    pub fn link_to_vector(&mut self, collection: String, vector_id: u64) {
        self.vector_ref = Some((collection, vector_id));
    }

    /// Clears the vector store reference, if any.
    pub fn unlink_from_vector(&mut self) {
        self.vector_ref = None;
    }

    /// True when the node is linked to a relational table row.
    pub fn is_linked(&self) -> bool {
        self.table_ref.is_some()
    }
}
365
/// An edge record as stored in (and decoded from) graph pages.
#[derive(Debug, Clone)]
pub struct StoredEdge {
    // Identifier of the source node.
    pub source_id: String,
    // Identifier of the target node.
    pub target_id: String,
    // Edge label name resolved from `label_id` (e.g. "connects_to").
    pub edge_type: String,
    // Interned label identifier from the `LabelRegistry`.
    pub label_id: LabelId,
    // Edge weight.
    pub weight: f32,
    // Page that holds this record.
    pub page_id: u32,
    // Cell slot within that page.
    pub slot: u16,
}
381
impl StoredEdge {
    /// Serializes the edge in the current (v2) record format:
    /// 12-byte header followed by source id bytes and target id bytes.
    pub fn encode(&self) -> Vec<u8> {
        let source_bytes = self.source_id.as_bytes();
        let target_bytes = self.target_id.as_bytes();

        let mut buf =
            Vec::with_capacity(EDGE_HEADER_SIZE + source_bytes.len() + target_bytes.len());

        // Header: source_len(2) | target_len(2) | label_id(4) | weight(4).
        buf.extend_from_slice(&(source_bytes.len() as u16).to_le_bytes());
        buf.extend_from_slice(&(target_bytes.len() as u16).to_le_bytes());
        buf.extend_from_slice(&self.label_id.as_u32().to_le_bytes());
        buf.extend_from_slice(&self.weight.to_le_bytes());

        buf.extend_from_slice(source_bytes);
        buf.extend_from_slice(target_bytes);

        buf
    }

    /// Decodes a record written by [`encode`](Self::encode) (v2 layout).
    /// Returns `None` on truncated input.
    pub fn decode(data: &[u8], page_id: u32, slot: u16) -> Option<Self> {
        if data.len() < EDGE_HEADER_SIZE {
            return None;
        }

        let source_len = u16::from_le_bytes([data[0], data[1]]) as usize;
        let target_len = u16::from_le_bytes([data[2], data[3]]) as usize;
        let label_id = LabelId::new(u32::from_le_bytes([data[4], data[5], data[6], data[7]]));
        let weight = f32::from_le_bytes([data[8], data[9], data[10], data[11]]);
        let edge_type = label_id_to_edge_label(label_id);

        // Ensure both id strings are fully present before slicing.
        if data.len() < EDGE_HEADER_SIZE + source_len + target_len {
            return None;
        }

        // Lossy UTF-8 so a corrupt byte degrades to U+FFFD instead of failing.
        let source_id =
            String::from_utf8_lossy(&data[EDGE_HEADER_SIZE..EDGE_HEADER_SIZE + source_len])
                .to_string();
        let target_id = String::from_utf8_lossy(
            &data[EDGE_HEADER_SIZE + source_len..EDGE_HEADER_SIZE + source_len + target_len],
        )
        .to_string();

        Some(Self {
            source_id,
            target_id,
            edge_type,
            label_id,
            weight,
            page_id,
            slot,
        })
    }

    /// Decodes a legacy v1 record (9-byte header carrying a one-byte
    /// edge-type enum at offset 4 instead of a 4-byte label id).
    pub fn decode_v1(data: &[u8], page_id: u32, slot: u16) -> Option<Self> {
        if data.len() < EDGE_HEADER_SIZE_V1 {
            return None;
        }
        let source_len = u16::from_le_bytes([data[0], data[1]]) as usize;
        let target_len = u16::from_le_bytes([data[2], data[3]]) as usize;
        // v1 stored the edge type as a small enum byte; values above 9 were
        // never valid, so reject the record outright.
        if data[4] > 9 {
            return None;
        }
        let weight = f32::from_le_bytes([data[5], data[6], data[7], data[8]]);
        // Map the legacy enum byte onto a registry label id.
        let label_id = LabelRegistry::legacy_edge_label_id(data[4]);
        let edge_type = label_id_to_edge_label(label_id);

        if data.len() < EDGE_HEADER_SIZE_V1 + source_len + target_len {
            return None;
        }
        let source_id =
            String::from_utf8_lossy(&data[EDGE_HEADER_SIZE_V1..EDGE_HEADER_SIZE_V1 + source_len])
                .to_string();
        let target_id = String::from_utf8_lossy(
            &data[EDGE_HEADER_SIZE_V1 + source_len..EDGE_HEADER_SIZE_V1 + source_len + target_len],
        )
        .to_string();

        Some(Self {
            source_id,
            target_id,
            edge_type,
            label_id,
            weight,
            page_id,
            slot,
        })
    }

    /// Number of bytes [`encode`](Self::encode) will produce for this edge.
    pub fn encoded_size(&self) -> usize {
        EDGE_HEADER_SIZE + self.source_id.len() + self.target_id.len()
    }
}
481
482fn label_id_to_node_label(id: LabelId) -> String {
488 match id.as_u32() {
489 1 => "host".to_string(),
490 2 => "service".to_string(),
491 3 => "credential".to_string(),
492 4 => "vulnerability".to_string(),
493 5 => "endpoint".to_string(),
494 6 => "technology".to_string(),
495 7 => "user".to_string(),
496 8 => "domain".to_string(),
497 9 => "certificate".to_string(),
498 n => format!("label_{}", n),
499 }
500}
501
502fn label_id_to_edge_label(id: LabelId) -> String {
505 match id.as_u32() {
506 10 => "has_service".to_string(),
507 11 => "has_endpoint".to_string(),
508 12 => "uses_tech".to_string(),
509 13 => "auth_access".to_string(),
510 14 => "affected_by".to_string(),
511 15 => "contains".to_string(),
512 16 => "connects_to".to_string(),
513 17 => "related_to".to_string(),
514 18 => "has_user".to_string(),
515 19 => "has_cert".to_string(),
516 n => format!("label_{}", n),
517 }
518}
519
/// Physical location of an encoded record: page index plus cell slot.
#[derive(Debug, Clone, Copy)]
pub struct RecordLocation {
    // Index of the page holding the record.
    pub page_id: u32,
    // Cell slot within that page.
    pub slot: u16,
}
526
/// Aggregate counters describing a graph store's contents.
#[derive(Debug, Clone, Default)]
pub struct GraphStats {
    // Total number of nodes.
    pub node_count: u64,
    // Total number of edges.
    pub edge_count: u64,
    // Number of pages holding node records.
    pub node_pages: u32,
    // Number of pages holding edge records.
    pub edge_pages: u32,
    // Node counts keyed by label name.
    pub nodes_by_label: HashMap<String, u64>,
    // Edge counts keyed by label name.
    pub edges_by_label: HashMap<String, u64>,
}
540
/// A string-keyed map split across several `RwLock`-protected shards so that
/// concurrent readers/writers of different keys rarely contend on one lock.
pub struct ShardedIndex<V> {
    // One independently locked map per shard.
    shards: Vec<RwLock<HashMap<String, V>>>,
    // Number of shards; invariant: always >= 1 (enforced by `new`), so
    // `shard_for` can never take a modulo by zero.
    shard_count: usize,
}

impl<V: Clone> ShardedIndex<V> {
    /// Creates an index with `shard_count` shards.
    ///
    /// A request for zero shards is clamped to one: previously `new(0)`
    /// produced an index whose every operation panicked with a
    /// divide-by-zero in `shard_for`.
    pub fn new(shard_count: usize) -> Self {
        let shard_count = shard_count.max(1);
        let shards = (0..shard_count)
            .map(|_| RwLock::new(HashMap::new()))
            .collect();
        Self {
            shards,
            shard_count,
        }
    }

    /// Picks the shard for `key` with a simple 31-base polynomial hash.
    #[inline]
    fn shard_for(&self, key: &str) -> usize {
        let hash: u64 = key
            .bytes()
            .fold(0u64, |acc, b| acc.wrapping_mul(31).wrapping_add(b as u64));
        (hash as usize) % self.shard_count
    }

    /// Returns a clone of the value for `key`, or `None` if the key is
    /// absent or the shard lock is poisoned.
    pub fn get(&self, key: &str) -> Option<V> {
        let shard = self.shard_for(key);
        self.shards[shard].read().ok()?.get(key).cloned()
    }

    /// Inserts or replaces the value for `key`.
    /// Silently a no-op when the shard lock is poisoned (best-effort index).
    pub fn insert(&self, key: String, value: V) {
        let shard = self.shard_for(&key);
        if let Ok(mut guard) = self.shards[shard].write() {
            guard.insert(key, value);
        }
    }

    /// Removes and returns the value for `key`, if present.
    pub fn remove(&self, key: &str) -> Option<V> {
        let shard = self.shard_for(key);
        self.shards[shard].write().ok()?.remove(key)
    }

    /// True if `key` is present (false when the shard lock is poisoned).
    pub fn contains(&self, key: &str) -> bool {
        let shard = self.shard_for(key);
        self.shards[shard]
            .read()
            .ok()
            .map(|g| g.contains_key(key))
            .unwrap_or(false)
    }

    /// Total entries across all shards. Poisoned shards are skipped, so the
    /// count is best-effort under lock poisoning.
    pub fn len(&self) -> usize {
        self.shards
            .iter()
            .filter_map(|s| s.read().ok())
            .map(|g| g.len())
            .sum()
    }

    /// True when no shard holds any entry.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
606
/// Adjacency index kept in both directions so a node's outgoing and incoming
/// neighbors can each be fetched with a single sharded-map lookup.
pub struct EdgeIndex {
    // source id -> [(edge label, target id, weight)]
    forward: ShardedIndex<Vec<(String, String, f32)>>,
    // target id -> [(edge label, source id, weight)]
    backward: ShardedIndex<Vec<(String, String, f32)>>,
}
617
618impl EdgeIndex {
619 pub fn new(shard_count: usize) -> Self {
620 Self {
621 forward: ShardedIndex::new(shard_count),
622 backward: ShardedIndex::new(shard_count),
623 }
624 }
625
626 pub fn add_edge(&self, source: &str, target: &str, edge_label: &str, weight: f32) {
627 let shard = self.forward.shard_for(source);
628 if let Ok(mut guard) = self.forward.shards[shard].write() {
629 guard
630 .entry(source.to_string())
631 .or_insert_with(Vec::new)
632 .push((edge_label.to_string(), target.to_string(), weight));
633 }
634
635 let shard = self.backward.shard_for(target);
636 if let Ok(mut guard) = self.backward.shards[shard].write() {
637 guard
638 .entry(target.to_string())
639 .or_insert_with(Vec::new)
640 .push((edge_label.to_string(), source.to_string(), weight));
641 }
642 }
643
644 pub fn remove_edge(&self, source: &str, target: &str, edge_label: &str) {
645 let shard = self.forward.shard_for(source);
646 if let Ok(mut guard) = self.forward.shards[shard].write() {
647 if let Some(edges) = guard.get_mut(source) {
648 edges.retain(|(et, t, _)| !(et == edge_label && t == target));
649 }
650 }
651
652 let shard = self.backward.shard_for(target);
653 if let Ok(mut guard) = self.backward.shards[shard].write() {
654 if let Some(edges) = guard.get_mut(target) {
655 edges.retain(|(et, s, _)| !(et == edge_label && s == source));
656 }
657 }
658 }
659
660 pub fn outgoing(&self, source: &str) -> Vec<(String, String, f32)> {
661 self.forward.get(source).unwrap_or_default()
662 }
663
664 pub fn incoming(&self, target: &str) -> Vec<(String, String, f32)> {
665 self.backward.get(target).unwrap_or_default()
666 }
667
668 pub fn outgoing_of_type(&self, source: &str, edge_label: &str) -> Vec<(String, f32)> {
669 self.forward
670 .get(source)
671 .unwrap_or_default()
672 .into_iter()
673 .filter(|(et, _, _)| et == edge_label)
674 .map(|(_, t, w)| (t, w))
675 .collect()
676 }
677}
678
/// Page-backed graph storage engine with sharded in-memory indexes.
/// Methods live in `graph_store/impl.rs` (see the `#[path]` module below).
pub struct GraphStore {
    // node id -> location of its encoded record in `node_pages`.
    node_index: ShardedIndex<RecordLocation>,
    // Bidirectional adjacency index.
    edge_index: EdgeIndex,
    // Secondary indexes over node categories/labels.
    node_secondary: std::sync::Arc<secondary_index::NodeSecondaryIndex>,
    // Shared registry interning node and edge label names.
    pub registry: Arc<LabelRegistry>,
    // Pages holding encoded node records.
    node_pages: RwLock<Vec<Page>>,
    // Pages holding encoded edge records.
    edge_pages: RwLock<Vec<Page>>,
    // NOTE(review): presumably the index of the node page currently accepting
    // inserts — maintained in graph_store/impl.rs; confirm there.
    current_node_page: AtomicU32,
    // NOTE(review): presumably the index of the edge page currently accepting
    // inserts — maintained in graph_store/impl.rs; confirm there.
    current_edge_page: AtomicU32,
    // Aggregate statistics snapshot.
    stats: GraphStats,
    // Total number of nodes.
    node_count: AtomicU64,
    // Total number of edges.
    edge_count: AtomicU64,
}
711
712#[path = "graph_store/impl.rs"]
713mod graph_store_impl;
714pub mod label_registry;
715pub mod secondary_index;
716pub use label_registry::{
717 LabelId, LabelRegistry, LabelRegistryError, Namespace, FIRST_USER_LABEL_ID, MAX_LABEL_LEN,
718 UNSET_LABEL_ID,
719};
720pub use secondary_index::NodeSecondaryIndex;
/// `Default` simply delegates to `GraphStore::new` (defined in
/// `graph_store/impl.rs`).
impl Default for GraphStore {
    fn default() -> Self {
        Self::new()
    }
}
726
/// Iterator over every decodable node record in a `GraphStore`'s node pages.
pub struct NodeIterator<'a> {
    // Store whose node pages are walked.
    store: &'a GraphStore,
    // Index of the page currently being scanned.
    page_idx: usize,
    // Index of the next cell to read within the current page.
    cell_idx: usize,
}
733
734impl<'a> Iterator for NodeIterator<'a> {
735 type Item = StoredNode;
736
737 fn next(&mut self) -> Option<Self::Item> {
738 let pages = self.store.node_pages.read().ok()?;
739
740 loop {
741 if self.page_idx >= pages.len() {
742 return None;
743 }
744
745 let page = &pages[self.page_idx];
746 let cell_count = page.cell_count() as usize;
747
748 if self.cell_idx >= cell_count {
749 self.page_idx += 1;
750 self.cell_idx = 0;
751 continue;
752 }
753
754 if let Ok((_, value)) = page.read_cell(self.cell_idx) {
755 self.cell_idx += 1;
756 if let Some(node) =
757 StoredNode::decode(&value, self.page_idx as u32, (self.cell_idx - 1) as u16)
758 {
759 return Some(node);
760 }
761 } else {
762 self.cell_idx += 1;
763 }
764 }
765 }
766}
767
/// Errors produced by `GraphStore` operations.
#[derive(Debug, Clone)]
pub enum GraphStoreError {
    /// A node with the given id already exists.
    NodeExists(String),
    /// No node with the given id was found.
    NodeNotFound(String),
    /// No edge between the two given node ids was found.
    EdgeNotFound(String, String),
    /// The target page had no room for the record.
    PageFull,
    /// A lock was poisoned by a panicking thread.
    LockPoisoned,
    /// A record failed validation or decoding.
    InvalidData(String),
    /// An underlying I/O failure.
    IoError(String),
}

impl std::fmt::Display for GraphStoreError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use GraphStoreError::*;
        match self {
            NodeExists(id) => write!(f, "Node already exists: {}", id),
            NodeNotFound(id) => write!(f, "Node not found: {}", id),
            EdgeNotFound(s, t) => write!(f, "Edge not found: {} -> {}", s, t),
            PageFull => write!(f, "Page is full"),
            LockPoisoned => write!(f, "Lock poisoned"),
            InvalidData(msg) => write!(f, "Invalid data: {}", msg),
            IoError(msg) => write!(f, "I/O error: {}", msg),
        }
    }
}

impl std::error::Error for GraphStoreError {}
795
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    // Smoke test: add nodes and edges, then read them back through the
    // primary index and the forward adjacency index.
    #[test]
    fn test_graph_store_basic() {
        let store = GraphStore::new();

        store
            .add_node_with_label("host:192.168.1.1", "Web Server", "host")
            .unwrap();
        store
            .add_node_with_label("host:192.168.1.2", "Database", "host")
            .unwrap();
        store
            .add_node_with_label("service:192.168.1.1:80:http", "HTTP", "service")
            .unwrap();

        assert_eq!(store.node_count(), 3);

        store
            .add_edge_with_label(
                "host:192.168.1.1",
                "service:192.168.1.1:80:http",
                "has_service",
                1.0,
            )
            .unwrap();
        store
            .add_edge_with_label("host:192.168.1.1", "host:192.168.1.2", "connects_to", 1.0)
            .unwrap();

        assert_eq!(store.edge_count(), 2);

        let node = store.get_node("host:192.168.1.1").unwrap();
        assert_eq!(node.label, "Web Server");

        let out_edges = store.outgoing_edges("host:192.168.1.1");
        assert_eq!(out_edges.len(), 2);
    }

    // Round-trips a populated store through serialize/deserialize and checks
    // that counts and node data survive.
    #[test]
    fn test_graph_store_serialization() {
        let store = GraphStore::new();

        store
            .add_node_with_label("host:10.0.0.1", "Server A", "host")
            .unwrap();
        store
            .add_node_with_label("host:10.0.0.2", "Server B", "host")
            .unwrap();
        store
            .add_edge_with_label("host:10.0.0.1", "host:10.0.0.2", "connects_to", 0.5)
            .unwrap();

        let bytes = store.serialize();

        let restored = GraphStore::deserialize(&bytes).unwrap();

        assert_eq!(restored.node_count(), 2);
        assert_eq!(restored.edge_count(), 1);

        let node = restored.get_node("host:10.0.0.1").unwrap();
        assert_eq!(node.label, "Server A");
    }

    // Four threads read 100 nodes concurrently; no panics/poisoning expected.
    #[test]
    fn test_concurrent_reads() {
        use std::thread;

        let store = Arc::new(GraphStore::new());

        for i in 0..100 {
            store
                .add_node_with_label(&format!("host:{}", i), &format!("Host {}", i), "host")
                .unwrap();
        }

        let mut handles = vec![];
        for _ in 0..4 {
            let store_clone = Arc::clone(&store);
            handles.push(thread::spawn(move || {
                for i in 0..100 {
                    let _ = store_clone.get_node(&format!("host:{}", i));
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(store.node_count(), 100);
    }

    // Hub-and-spoke graph: the forward edge index must return all 100 spokes.
    #[test]
    fn test_edge_index_performance() {
        let store = GraphStore::new();

        store
            .add_node_with_label("hub", "Hub Node", "host")
            .unwrap();
        for i in 0..100 {
            store
                .add_node_with_label(&format!("spoke:{}", i), &format!("Spoke {}", i), "host")
                .unwrap();
            store
                .add_edge_with_label("hub", &format!("spoke:{}", i), "connects_to", 1.0)
                .unwrap();
        }

        let edges = store.outgoing_edges("hub");
        assert_eq!(edges.len(), 100);
    }

    // Category lookups should come back filtered by node_type, including the
    // empty result for a category with no members.
    #[test]
    fn test_nodes_of_type_uses_secondary_index() {
        let store = GraphStore::new();
        store
            .add_node_with_label("host:1", "Web Server", "host")
            .unwrap();
        store
            .add_node_with_label("host:2", "DB Server", "host")
            .unwrap();
        store
            .add_node_with_label("svc:1", "HTTP", "service")
            .unwrap();
        store
            .add_node_with_label("vuln:1", "CVE-2024-1", "vulnerability")
            .unwrap();

        let hosts = store.nodes_with_category("host");
        assert_eq!(hosts.len(), 2);
        assert!(hosts.iter().all(|n| n.node_type == "host"));

        let services = store.nodes_with_category("service");
        assert_eq!(services.len(), 1);
        assert_eq!(services[0].id, "svc:1");

        assert_eq!(store.nodes_with_category("user").len(), 0);
    }

    // Label lookups: exact matches found, unseen labels empty, and the bloom
    // filter still admits labels that were actually inserted.
    #[test]
    fn test_nodes_by_label_with_bloom_prune() {
        let store = GraphStore::new();
        store
            .add_node_with_label("host:1", "Edge Router", "host")
            .unwrap();
        store
            .add_node_with_label("host:2", "Edge Router", "host")
            .unwrap();
        store
            .add_node_with_label("host:3", "Core Switch", "host")
            .unwrap();

        let routers = store.nodes_by_label("Edge Router");
        assert_eq!(routers.len(), 2);

        let unknown = store.nodes_by_label("Quantum Router 9000");
        assert!(unknown.is_empty());
        assert!(store.may_contain_label("Edge Router"));
        assert!(store.may_contain_label("Core Switch"));
    }

    // Publishing to an IndexRegistry exposes a live view: stats reflect later
    // inserts without re-publishing.
    #[test]
    fn test_publish_indexes_to_registry() {
        use crate::storage::index::{IndexKind, IndexRegistry, IndexScope};

        let store = GraphStore::new();
        store.add_node_with_label("h:1", "Alpha", "host").unwrap();
        store.add_node_with_label("h:2", "Beta", "host").unwrap();
        store
            .add_node_with_label("svc:1", "HTTP", "service")
            .unwrap();

        let registry = IndexRegistry::new();
        store.publish_indexes(&registry, "infra");

        let shared = registry.get(&IndexScope::graph("infra")).unwrap();
        let stats = shared.stats();
        // 3 nodes indexed under both category and label => 6 entries.
        assert_eq!(stats.entries, 6);
        assert_eq!(stats.kind, IndexKind::Inverted);
        assert!(stats.has_bloom);

        store.add_node_with_label("h:3", "Gamma", "host").unwrap();
        let updated = registry.get(&IndexScope::graph("infra")).unwrap().stats();
        assert_eq!(updated.entries, 8);
    }

    // Secondary (category/label/bloom) indexes must be rebuilt when a store
    // is loaded from bytes.
    #[test]
    fn test_secondary_index_rebuilt_after_deserialize() {
        let store = GraphStore::new();
        store
            .add_node_with_label("host:1", "Alpha", "host")
            .unwrap();
        store
            .add_node_with_label("svc:1", "HTTP", "service")
            .unwrap();

        let bytes = store.serialize();
        let restored = GraphStore::deserialize(&bytes).unwrap();

        assert_eq!(restored.nodes_with_category("host").len(), 1);
        assert_eq!(restored.nodes_by_label("HTTP").len(), 1);
        assert!(restored.may_contain_label("Alpha"));
    }

    // iter_nodes must visit every stored node exactly once.
    #[test]
    fn test_node_iteration() {
        let store = GraphStore::new();

        for i in 0..50 {
            store
                .add_node_with_label(&format!("node:{}", i), &format!("Node {}", i), "host")
                .unwrap();
        }

        let nodes: Vec<_> = store.iter_nodes().collect();
        assert_eq!(nodes.len(), 50);
    }

    // Using a legacy enum name ("host") as a node type must intern it in the
    // registry and stamp the node with the interned id.
    #[test]
    fn legacy_node_type_interns_into_registry() {
        let store = GraphStore::new();
        store.add_node_with_label("h1", "web", "host").unwrap();
        let id = store
            .registry
            .lookup(label_registry::Namespace::Node, "host")
            .expect("legacy enum name should be interned");
        let fetched = store.get_node("h1").unwrap();
        assert_eq!(fetched.label_id, id);
        assert_eq!(fetched.node_type, "host");
    }

    // v2 serialization: magic "RBGR" + version 2 header, and user-interned
    // labels plus edges survive a round trip.
    #[test]
    fn v2_round_trip_preserves_user_labels() {
        let store = GraphStore::new();
        let user_id = store.intern_node_label("order").unwrap();
        assert!(user_id.as_u32() >= label_registry::FIRST_USER_LABEL_ID);

        store.add_node_with_label("h1", "web-1", "host").unwrap();
        store.add_node_with_label("h2", "web-2", "service").unwrap();
        store
            .add_edge_with_label("h1", "h2", "connects_to", 1.0)
            .unwrap();

        let bytes = store.serialize();
        assert_eq!(&bytes[0..4], b"RBGR");
        // Format version must be 2 for the current layout.
        assert_eq!(
            u32::from_le_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]),
            2
        );

        let restored = GraphStore::deserialize(&bytes).unwrap();
        assert_eq!(
            restored
                .registry
                .lookup(label_registry::Namespace::Node, "order"),
            Some(user_id)
        );
        let h1 = restored.get_node("h1").unwrap();
        assert_eq!(h1.node_type, "host");
        assert_eq!(
            h1.label_id,
            restored
                .registry
                .lookup(label_registry::Namespace::Node, "host")
                .unwrap()
        );
        let outgoing = restored.outgoing_edges("h1");
        assert_eq!(outgoing.len(), 1);
        assert_eq!(outgoing[0].0, "connects_to");
    }

    // Hand-builds a v1-format blob (version 1 header, legacy node/edge record
    // layouts) and checks it loads through the legacy decode path.
    #[test]
    fn v1_blob_deserializes_via_legacy_path() {
        let mut node_page = Page::new(PageType::GraphNode, 0);
        let mut v1_node = Vec::new();
        v1_node.extend_from_slice(&2u16.to_le_bytes()); // id_len = 2
        v1_node.extend_from_slice(&1u16.to_le_bytes()); // label_len = 1
        v1_node.push(0); // legacy node-type byte (0 => "host")
        v1_node.push(0); // flags
        v1_node.extend_from_slice(&0u16.to_le_bytes()); // out edge count
        v1_node.extend_from_slice(&0u16.to_le_bytes()); // in edge count
        v1_node.extend_from_slice(b"n1"); // node id
        v1_node.extend_from_slice(b"L"); // label
        node_page.insert_cell(b"n1", &v1_node).unwrap();

        let mut edge_page = Page::new(PageType::GraphEdge, 0);
        let mut v1_edge = Vec::new();
        v1_edge.extend_from_slice(&2u16.to_le_bytes()); // source_len = 2
        v1_edge.extend_from_slice(&2u16.to_le_bytes()); // target_len = 2
        v1_edge.push(0); // legacy edge-type byte (0 => "has_service")
        v1_edge.extend_from_slice(&1.0f32.to_le_bytes()); // weight
        v1_edge.extend_from_slice(b"n1"); // source id
        v1_edge.extend_from_slice(b"n1"); // target id
        edge_page.insert_cell(b"n1|0|n1", &v1_edge).unwrap();

        let mut bytes = Vec::new();
        bytes.extend_from_slice(b"RBGR"); // magic
        bytes.extend_from_slice(&1u32.to_le_bytes()); // format version 1
        bytes.extend_from_slice(&1u64.to_le_bytes()); // node count
        bytes.extend_from_slice(&1u64.to_le_bytes()); // edge count
        bytes.extend_from_slice(&1u32.to_le_bytes()); // node page count
        bytes.extend_from_slice(node_page.as_bytes());
        bytes.extend_from_slice(&1u32.to_le_bytes()); // edge page count
        bytes.extend_from_slice(edge_page.as_bytes());

        let store = GraphStore::deserialize(&bytes).expect("v1 blob deserializes");
        let node = store.get_node("n1").unwrap();
        assert_eq!(node.node_type, "host");
        assert_eq!(node.label_id, LabelId::new(1));
        let out = store.outgoing_edges("n1");
        assert_eq!(out.len(), 1);
        assert_eq!(out[0].0, "has_service");
    }

    // Unknown format versions must be rejected with InvalidData("Unsupported...").
    #[test]
    fn deserialize_rejects_unknown_version() {
        let mut bytes = Vec::new();
        bytes.extend_from_slice(b"RBGR");
        bytes.extend_from_slice(&999u32.to_le_bytes()); // unknown format version
        bytes.extend_from_slice(&0u64.to_le_bytes());
        bytes.extend_from_slice(&0u64.to_le_bytes());
        match GraphStore::deserialize(&bytes) {
            Err(GraphStoreError::InvalidData(msg)) => assert!(msg.contains("Unsupported")),
            Err(other) => panic!("unexpected error: {:?}", other),
            Ok(_) => panic!("expected error for unknown version"),
        }
    }
}