reddb_server/storage/engine/
graph_table_index.rs1use std::collections::HashMap;
31use std::sync::RwLock;
32
33use super::graph_store::TableRef;
34
/// Number of independently locked shards that each direction of the index is split into.
///
/// A key is routed to shard `hash % NUM_SHARDS`; every shard has its own `RwLock`, so
/// operations that land on different shards do not wait on each other.
const NUM_SHARDS: usize = 16;
/// Hashes `data` with 64-bit FNV-1a.
///
/// In this file it is only used to pick a shard, so what matters is that it is cheap and
/// deterministic (unlike std's per-process randomized `RandomState`), not that it resists
/// collisions or hash flooding.
fn fnv_hash(data: &[u8]) -> u64 {
    const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    // FNV-1a: xor the byte in first, then multiply (wrapping, i.e. mod 2^64).
    data.iter()
        .fold(FNV_OFFSET, |state, &byte| (state ^ u64::from(byte)).wrapping_mul(FNV_PRIME))
}
50
/// Composite key naming one table row: the table id together with the row id.
///
/// Used as the key of the row → node direction of the graph/table link index.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RowKey {
    /// Identifier of the table the row belongs to.
    pub table_id: u16,
    /// Identifier of the row, paired with `table_id`.
    pub row_id: u64,
}
57
58impl RowKey {
59 pub fn new(table_id: u16, row_id: u64) -> Self {
60 Self { table_id, row_id }
61 }
62
63 pub fn from_table_ref(tref: &TableRef) -> Self {
64 Self {
65 table_id: tref.table_id,
66 row_id: tref.row_id,
67 }
68 }
69
70 fn to_bytes(&self) -> [u8; 10] {
72 reddb_file::encode_graph_table_ref(reddb_file::GraphTableRef {
73 table_id: self.table_id,
74 row_id: self.row_id,
75 })
76 }
77}
78
79struct NodeToRowIndex {
81 shards: Vec<RwLock<HashMap<String, TableRef>>>,
82}
83
84impl NodeToRowIndex {
85 fn new() -> Self {
86 let mut shards = Vec::with_capacity(NUM_SHARDS);
87 for _ in 0..NUM_SHARDS {
88 shards.push(RwLock::new(HashMap::new()));
89 }
90 Self { shards }
91 }
92
93 fn shard_for(&self, node_id: &str) -> usize {
94 (fnv_hash(node_id.as_bytes()) as usize) % NUM_SHARDS
95 }
96
97 fn insert(&self, node_id: String, table_ref: TableRef) {
98 let shard = self.shard_for(&node_id);
99 if let Ok(mut map) = self.shards[shard].write() {
100 map.insert(node_id, table_ref);
101 }
102 }
103
104 fn get(&self, node_id: &str) -> Option<TableRef> {
105 let shard = self.shard_for(node_id);
106 if let Ok(map) = self.shards[shard].read() {
107 map.get(node_id).copied()
108 } else {
109 None
110 }
111 }
112
113 fn remove(&self, node_id: &str) -> Option<TableRef> {
114 let shard = self.shard_for(node_id);
115 if let Ok(mut map) = self.shards[shard].write() {
116 map.remove(node_id)
117 } else {
118 None
119 }
120 }
121
122 fn contains(&self, node_id: &str) -> bool {
123 let shard = self.shard_for(node_id);
124 if let Ok(map) = self.shards[shard].read() {
125 map.contains_key(node_id)
126 } else {
127 false
128 }
129 }
130
131 fn len(&self) -> usize {
132 self.shards
133 .iter()
134 .filter_map(|s| s.read().ok())
135 .map(|m| m.len())
136 .sum()
137 }
138}
139
140struct RowToNodeIndex {
142 shards: Vec<RwLock<HashMap<RowKey, String>>>,
143}
144
145impl RowToNodeIndex {
146 fn new() -> Self {
147 let mut shards = Vec::with_capacity(NUM_SHARDS);
148 for _ in 0..NUM_SHARDS {
149 shards.push(RwLock::new(HashMap::new()));
150 }
151 Self { shards }
152 }
153
154 fn shard_for(&self, key: &RowKey) -> usize {
155 (fnv_hash(&key.to_bytes()) as usize) % NUM_SHARDS
156 }
157
158 fn insert(&self, key: RowKey, node_id: String) {
159 let shard = self.shard_for(&key);
160 if let Ok(mut map) = self.shards[shard].write() {
161 map.insert(key, node_id);
162 }
163 }
164
165 fn get(&self, key: &RowKey) -> Option<String> {
166 let shard = self.shard_for(key);
167 if let Ok(map) = self.shards[shard].read() {
168 map.get(key).cloned()
169 } else {
170 None
171 }
172 }
173
174 fn remove(&self, key: &RowKey) -> Option<String> {
175 let shard = self.shard_for(key);
176 if let Ok(mut map) = self.shards[shard].write() {
177 map.remove(key)
178 } else {
179 None
180 }
181 }
182
183 fn contains(&self, key: &RowKey) -> bool {
184 let shard = self.shard_for(key);
185 if let Ok(map) = self.shards[shard].read() {
186 map.contains_key(key)
187 } else {
188 false
189 }
190 }
191
192 fn nodes_for_table(&self, table_id: u16) -> Vec<(u64, String)> {
194 let mut results = Vec::new();
195 for shard in &self.shards {
196 if let Ok(map) = shard.read() {
197 for (key, node_id) in map.iter() {
198 if key.table_id == table_id {
199 results.push((key.row_id, node_id.clone()));
200 }
201 }
202 }
203 }
204 results
205 }
206
207 fn len(&self) -> usize {
208 self.shards
209 .iter()
210 .filter_map(|s| s.read().ok())
211 .map(|m| m.len())
212 .sum()
213 }
214}
215
216pub struct GraphTableIndex {
224 node_to_row: NodeToRowIndex,
225 row_to_node: RowToNodeIndex,
226}
227
228impl GraphTableIndex {
229 pub fn new() -> Self {
231 Self {
232 node_to_row: NodeToRowIndex::new(),
233 row_to_node: RowToNodeIndex::new(),
234 }
235 }
236
237 pub fn link(&self, node_id: &str, table_id: u16, row_id: u64) {
242 let table_ref = TableRef::new(table_id, row_id);
243 let row_key = RowKey::new(table_id, row_id);
244
245 self.node_to_row.insert(node_id.to_string(), table_ref);
246 self.row_to_node.insert(row_key, node_id.to_string());
247 }
248
249 pub fn unlink_node(&self, node_id: &str) -> Option<TableRef> {
254 if let Some(table_ref) = self.node_to_row.remove(node_id) {
255 let row_key = RowKey::from_table_ref(&table_ref);
256 self.row_to_node.remove(&row_key);
257 Some(table_ref)
258 } else {
259 None
260 }
261 }
262
263 pub fn unlink_row(&self, table_id: u16, row_id: u64) -> Option<String> {
268 let row_key = RowKey::new(table_id, row_id);
269 if let Some(node_id) = self.row_to_node.remove(&row_key) {
270 self.node_to_row.remove(&node_id);
271 Some(node_id)
272 } else {
273 None
274 }
275 }
276
277 pub fn get_row_for_node(&self, node_id: &str) -> Option<TableRef> {
279 self.node_to_row.get(node_id)
280 }
281
282 pub fn get_node_for_row(&self, table_id: u16, row_id: u64) -> Option<String> {
284 let row_key = RowKey::new(table_id, row_id);
285 self.row_to_node.get(&row_key)
286 }
287
288 pub fn is_node_linked(&self, node_id: &str) -> bool {
290 self.node_to_row.contains(node_id)
291 }
292
293 pub fn is_row_linked(&self, table_id: u16, row_id: u64) -> bool {
295 let row_key = RowKey::new(table_id, row_id);
296 self.row_to_node.contains(&row_key)
297 }
298
299 pub fn nodes_for_table(&self, table_id: u16) -> Vec<(u64, String)> {
303 self.row_to_node.nodes_for_table(table_id)
304 }
305
306 pub fn stats(&self) -> GraphTableIndexStats {
308 GraphTableIndexStats {
309 node_to_row_count: self.node_to_row.len(),
310 row_to_node_count: self.row_to_node.len(),
311 num_shards: NUM_SHARDS,
312 }
313 }
314
315 pub fn clear(&self) {
317 for shard in &self.node_to_row.shards {
318 if let Ok(mut map) = shard.write() {
319 map.clear();
320 }
321 }
322 for shard in &self.row_to_node.shards {
323 if let Ok(mut map) = shard.write() {
324 map.clear();
325 }
326 }
327 }
328
329 pub fn serialize(&self) -> Vec<u8> {
331 let mut mappings = Vec::new();
332 for shard in &self.node_to_row.shards {
333 if let Ok(map) = shard.read() {
334 for (node_id, table_ref) in map.iter() {
335 mappings.push(reddb_file::GraphTableIndexEntry {
336 node_id: node_id.clone(),
337 table_ref: reddb_file::GraphTableRef {
338 table_id: table_ref.table_id,
339 row_id: table_ref.row_id,
340 },
341 });
342 }
343 }
344 }
345
346 reddb_file::encode_graph_table_index_frame(&mappings)
347 .expect("in-memory graph-table index should encode")
348 }
349
350 pub fn deserialize(data: &[u8]) -> Result<Self, GraphTableIndexError> {
352 let index = Self::new();
353 let mappings = reddb_file::decode_graph_table_index_frame(data)
354 .map_err(|err| GraphTableIndexError::InvalidData(err.to_string()))?;
355 for mapping in mappings {
356 index.link(
357 &mapping.node_id,
358 mapping.table_ref.table_id,
359 mapping.table_ref.row_id,
360 );
361 }
362
363 Ok(index)
364 }
365}
366
367impl Default for GraphTableIndex {
368 fn default() -> Self {
369 Self::new()
370 }
371}
372
/// Size counters reported by `GraphTableIndex::stats`.
///
/// Each count is summed shard by shard, so under concurrent writes the two entry counts
/// need not describe the same instant.
#[derive(Clone, Copy, Debug)]
pub struct GraphTableIndexStats {
    /// Number of entries in the node id → row direction.
    pub node_to_row_count: usize,
    /// Number of entries in the row → node id direction.
    pub row_to_node_count: usize,
    /// Number of shards each direction is split into.
    pub num_shards: usize,
}
380
/// Error produced when serialized graph-table index bytes cannot be decoded.
#[derive(Debug, Clone)]
pub enum GraphTableIndexError {
    /// The frame decoder rejected the input; carries the decoder's error text.
    InvalidData(String),
}

impl std::fmt::Display for GraphTableIndexError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Single-variant enum: this destructuring is irrefutable, and adding a variant
        // will force this impl to be revisited.
        let Self::InvalidData(detail) = self;
        f.write_str("Invalid data: ")?;
        f.write_str(detail)
    }
}

impl std::error::Error for GraphTableIndexError {}
396
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::Arc;
    use std::thread;

    #[test]
    fn test_link_and_lookup() {
        let index = GraphTableIndex::new();
        index.link("host:192.168.1.1", 1, 100);
        index.link("service:ssh", 2, 200);

        // node -> row
        let host_row = index
            .get_row_for_node("host:192.168.1.1")
            .expect("host node should be linked");
        assert_eq!(host_row.table_id, 1);
        assert_eq!(host_row.row_id, 100);

        // row -> node
        let service_node = index
            .get_node_for_row(2, 200)
            .expect("service row should be linked");
        assert_eq!(service_node, "service:ssh");

        // Unknown keys resolve to nothing in either direction.
        assert!(index.get_row_for_node("unknown").is_none());
        assert!(index.get_node_for_row(99, 999).is_none());
    }

    #[test]
    fn test_unlink() {
        let index = GraphTableIndex::new();
        index.link("node1", 1, 10);
        assert!(index.is_node_linked("node1"));
        assert!(index.is_row_linked(1, 10));

        let removed = index.unlink_node("node1").expect("node1 was linked");
        assert_eq!(removed.table_id, 1);
        assert_eq!(removed.row_id, 10);

        // Unlinking by node must clear the row side as well.
        assert!(!index.is_node_linked("node1"));
        assert!(!index.is_row_linked(1, 10));
    }

    #[test]
    fn test_unlink_by_row() {
        let index = GraphTableIndex::new();
        index.link("node2", 2, 20);

        assert_eq!(index.unlink_row(2, 20).as_deref(), Some("node2"));

        // Unlinking by row must clear the node side as well.
        assert!(!index.is_node_linked("node2"));
        assert!(!index.is_row_linked(2, 20));
    }

    #[test]
    fn test_nodes_for_table() {
        let index = GraphTableIndex::new();
        let links = [
            ("host:1", 1, 100),
            ("host:2", 1, 101),
            ("host:3", 1, 102),
            ("service:1", 2, 200),
        ];
        for &(node_id, table_id, row_id) in links.iter() {
            index.link(node_id, table_id, row_id);
        }

        assert_eq!(index.nodes_for_table(1).len(), 3);
        assert_eq!(index.nodes_for_table(2).len(), 1);
    }

    #[test]
    fn test_serialization() {
        let original = GraphTableIndex::new();
        original.link("node:a", 1, 100);
        original.link("node:b", 2, 200);
        original.link("node:c", 1, 300);

        let encoded = original.serialize();
        let restored =
            GraphTableIndex::deserialize(&encoded).expect("round-trip should decode");

        assert_eq!(restored.stats().node_to_row_count, 3);
        assert_eq!(
            restored.get_row_for_node("node:a").map(|r| r.row_id),
            Some(100)
        );
        assert_eq!(restored.get_node_for_row(2, 200).as_deref(), Some("node:b"));
    }

    #[test]
    fn test_concurrent_access() {
        const WRITERS: u64 = 10;
        const ROWS_PER_WRITER: u64 = 100;
        const READERS: usize = 5;

        let index = Arc::new(GraphTableIndex::new());

        // Each writer owns one table id and links distinct nodes to distinct rows.
        let mut handles: Vec<thread::JoinHandle<()>> = (0..WRITERS)
            .map(|table| {
                let idx = Arc::clone(&index);
                thread::spawn(move || {
                    for row in 0..ROWS_PER_WRITER {
                        idx.link(&format!("node:{}:{}", table, row), table as u16, row);
                    }
                })
            })
            .collect();

        // Readers race the writers; a lookup may legitimately miss, so only the absence of
        // panics/deadlocks is exercised here.
        handles.extend((0..READERS).map(|_| {
            let idx = Arc::clone(&index);
            thread::spawn(move || {
                for table in 0..WRITERS {
                    for row in 0..ROWS_PER_WRITER {
                        let _ = idx.get_row_for_node(&format!("node:{}:{}", table, row));
                    }
                }
            })
        }));

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(
            index.stats().node_to_row_count,
            (WRITERS * ROWS_PER_WRITER) as usize
        );
    }
}