Skip to main content

nodedb_graph/
csr.rs

1//! Dense integer CSR adjacency index with interned node IDs and labels.
2//!
3//! Memory layout:
4//! - Old-style `Vec<Vec<(String, u32)>>` ≈ 60 bytes/edge (heap String per edge)
5//! - Dense CSR: contiguous `Vec<u32>` offsets + targets + `Vec<u16>` labels ≈ 10 bytes/edge
6//!
7//! Writes accumulate in a mutable buffer. Reads check both dense CSR and buffer.
8//! `compact()` merges the buffer into dense arrays (double-buffered swap).
9
10use std::collections::{HashMap, HashSet, hash_map::Entry};
11
12// Re-export shared Direction from nodedb-types.
13pub use nodedb_types::graph::Direction;
14
/// Adjacency index in compressed-sparse-row form, keyed by dense integer IDs.
///
/// Node names are interned to `u32` IDs and edge labels to `u16` IDs. Edges
/// live either in the dense arrays or in the per-node write buffers; edges
/// removed from the dense arrays are tracked as tombstones in `deleted_edges`.
pub struct CsrIndex {
    // ── Node interning ──
    /// Node name → dense node ID.
    pub(crate) node_to_id: HashMap<String, u32>,
    /// Dense node ID → node name (the ID is the position in this vector).
    pub(crate) id_to_node: Vec<String>,

    // ── Label interning ──
    /// Label name → dense label ID.
    pub(crate) label_to_id: HashMap<String, u16>,
    /// Dense label ID → label name (the ID is the position in this vector).
    pub(crate) id_to_label: Vec<String>,

    // ── Dense CSR (read-only between compactions) ──
    /// Outgoing dense edges of node `i` occupy the index range
    /// `out_offsets[i]..out_offsets[i + 1]` of `out_targets` / `out_labels`.
    pub(crate) out_offsets: Vec<u32>,
    /// Destination node ID of each outgoing dense edge.
    pub(crate) out_targets: Vec<u32>,
    /// Label ID of each outgoing dense edge (parallel to `out_targets`).
    pub(crate) out_labels: Vec<u16>,

    /// Incoming dense edges of node `i` occupy the index range
    /// `in_offsets[i]..in_offsets[i + 1]` of `in_targets` / `in_labels`.
    pub(crate) in_offsets: Vec<u32>,
    /// Source node ID of each incoming dense edge.
    pub(crate) in_targets: Vec<u32>,
    /// Label ID of each incoming dense edge (parallel to `in_targets`).
    pub(crate) in_labels: Vec<u16>,

    // ── Mutable write buffer ──
    /// Per source node: buffered outgoing edges as `(label, dst)`.
    pub(crate) buffer_out: Vec<Vec<(u16, u32)>>,
    /// Per destination node: buffered incoming edges as `(label, src)`.
    pub(crate) buffer_in: Vec<Vec<(u16, u32)>>,

    /// Edges deleted since last compaction: `(src, label, dst)`.
    pub(crate) deleted_edges: HashSet<(u32, u16, u32)>,
}
41
42impl Default for CsrIndex {
43    fn default() -> Self {
44        Self::new()
45    }
46}
47
48impl CsrIndex {
49    pub fn new() -> Self {
50        Self {
51            node_to_id: HashMap::new(),
52            id_to_node: Vec::new(),
53            label_to_id: HashMap::new(),
54            id_to_label: Vec::new(),
55            out_offsets: vec![0],
56            out_targets: Vec::new(),
57            out_labels: Vec::new(),
58            in_offsets: vec![0],
59            in_targets: Vec::new(),
60            in_labels: Vec::new(),
61            buffer_out: Vec::new(),
62            buffer_in: Vec::new(),
63            deleted_edges: HashSet::new(),
64        }
65    }
66
67    /// Get or create a dense ID for a node.
68    pub(crate) fn ensure_node(&mut self, node: &str) -> u32 {
69        match self.node_to_id.entry(node.to_string()) {
70            Entry::Occupied(e) => *e.get(),
71            Entry::Vacant(e) => {
72                let id = self.id_to_node.len() as u32;
73                e.insert(id);
74                self.id_to_node.push(node.to_string());
75                self.out_offsets
76                    .push(*self.out_offsets.last().unwrap_or(&0));
77                self.in_offsets.push(*self.in_offsets.last().unwrap_or(&0));
78                self.buffer_out.push(Vec::new());
79                self.buffer_in.push(Vec::new());
80                id
81            }
82        }
83    }
84
85    /// Get or create a dense ID for a label.
86    fn ensure_label(&mut self, label: &str) -> u16 {
87        match self.label_to_id.entry(label.to_string()) {
88            Entry::Occupied(e) => *e.get(),
89            Entry::Vacant(e) => {
90                let id = self.id_to_label.len() as u16;
91                e.insert(id);
92                self.id_to_label.push(label.to_string());
93                id
94            }
95        }
96    }
97
98    /// Add an edge (goes into mutable buffer). Deduplicates.
99    pub fn add_edge(&mut self, src: &str, label: &str, dst: &str) {
100        let src_id = self.ensure_node(src);
101        let dst_id = self.ensure_node(dst);
102        let label_id = self.ensure_label(label);
103
104        // Deduplicate in buffer.
105        if self.buffer_out[src_id as usize]
106            .iter()
107            .any(|&(l, d)| l == label_id && d == dst_id)
108        {
109            return;
110        }
111        // Deduplicate in dense CSR.
112        if self.dense_has_edge(src_id, label_id, dst_id) {
113            return;
114        }
115
116        self.buffer_out[src_id as usize].push((label_id, dst_id));
117        self.buffer_in[dst_id as usize].push((label_id, src_id));
118        self.deleted_edges.remove(&(src_id, label_id, dst_id));
119    }
120
121    /// Remove an edge.
122    pub fn remove_edge(&mut self, src: &str, label: &str, dst: &str) {
123        let (Some(&src_id), Some(&dst_id)) = (self.node_to_id.get(src), self.node_to_id.get(dst))
124        else {
125            return;
126        };
127        let Some(&label_id) = self.label_to_id.get(label) else {
128            return;
129        };
130
131        self.buffer_out[src_id as usize].retain(|&(l, d)| !(l == label_id && d == dst_id));
132        self.buffer_in[dst_id as usize].retain(|&(l, s)| !(l == label_id && s == src_id));
133
134        if self.dense_has_edge(src_id, label_id, dst_id) {
135            self.deleted_edges.insert((src_id, label_id, dst_id));
136        }
137    }
138
139    /// Get immediate neighbors.
140    pub fn neighbors(
141        &self,
142        node: &str,
143        label_filter: Option<&str>,
144        direction: Direction,
145    ) -> Vec<(String, String)> {
146        match label_filter {
147            Some(l) => self.neighbors_multi(node, &[l], direction),
148            None => self.neighbors_multi(node, &[], direction),
149        }
150    }
151
152    /// Get neighbors with multi-label filter. Empty labels = all edges.
153    pub fn neighbors_multi(
154        &self,
155        node: &str,
156        label_filters: &[&str],
157        direction: Direction,
158    ) -> Vec<(String, String)> {
159        let Some(&node_id) = self.node_to_id.get(node) else {
160            return Vec::new();
161        };
162        let label_ids: Vec<u16> = label_filters
163            .iter()
164            .filter_map(|l| self.label_to_id.get(*l).copied())
165            .collect();
166        let match_label = |lid: u16| label_ids.is_empty() || label_ids.contains(&lid);
167
168        let mut result = Vec::new();
169
170        if matches!(direction, Direction::Out | Direction::Both) {
171            for (lid, dst) in self.iter_out_edges(node_id) {
172                if match_label(lid) {
173                    result.push((
174                        self.id_to_label[lid as usize].clone(),
175                        self.id_to_node[dst as usize].clone(),
176                    ));
177                }
178            }
179        }
180        if matches!(direction, Direction::In | Direction::Both) {
181            for (lid, src) in self.iter_in_edges(node_id) {
182                if match_label(lid) {
183                    result.push((
184                        self.id_to_label[lid as usize].clone(),
185                        self.id_to_node[src as usize].clone(),
186                    ));
187                }
188            }
189        }
190
191        result
192    }
193
194    pub fn node_count(&self) -> usize {
195        self.id_to_node.len()
196    }
197
198    pub fn contains_node(&self, node: &str) -> bool {
199        self.node_to_id.contains_key(node)
200    }
201
202    /// Total edge count (dense + buffer - deleted).
203    pub fn edge_count(&self) -> usize {
204        let dense_out = self.out_targets.len();
205        let buffer_out: usize = self.buffer_out.iter().map(|b| b.len()).sum();
206        let deleted = self.deleted_edges.len();
207        dense_out + buffer_out - deleted
208    }
209
210    /// Estimated memory usage in bytes.
211    pub fn estimated_memory_bytes(&self) -> usize {
212        let offsets = (self.out_offsets.len() + self.in_offsets.len()) * 4;
213        let targets = (self.out_targets.len() + self.in_targets.len()) * 4;
214        let labels = (self.out_labels.len() + self.in_labels.len()) * 2;
215        let buffer: usize = self
216            .buffer_out
217            .iter()
218            .chain(self.buffer_in.iter())
219            .map(|b| b.len() * 6)
220            .sum();
221        let interning = self.id_to_node.iter().map(|s| s.len() + 24).sum::<usize>()
222            + self.id_to_label.iter().map(|s| s.len() + 24).sum::<usize>();
223        offsets + targets + labels + buffer + interning
224    }
225
226    fn dense_has_edge(&self, src: u32, label: u16, dst: u32) -> bool {
227        for (lid, target) in self.dense_out_edges(src) {
228            if lid == label && target == dst {
229                return true;
230            }
231        }
232        false
233    }
234
235    pub(crate) fn dense_out_edges(&self, node: u32) -> impl Iterator<Item = (u16, u32)> + '_ {
236        let idx = node as usize;
237        if idx + 1 >= self.out_offsets.len() {
238            return Vec::new().into_iter();
239        }
240        let start = self.out_offsets[idx] as usize;
241        let end = self.out_offsets[idx + 1] as usize;
242        (start..end)
243            .map(move |i| (self.out_labels[i], self.out_targets[i]))
244            .collect::<Vec<_>>()
245            .into_iter()
246    }
247
248    pub(crate) fn dense_in_edges(&self, node: u32) -> impl Iterator<Item = (u16, u32)> + '_ {
249        let idx = node as usize;
250        if idx + 1 >= self.in_offsets.len() {
251            return Vec::new().into_iter();
252        }
253        let start = self.in_offsets[idx] as usize;
254        let end = self.in_offsets[idx + 1] as usize;
255        (start..end)
256            .map(move |i| (self.in_labels[i], self.in_targets[i]))
257            .collect::<Vec<_>>()
258            .into_iter()
259    }
260
261    pub(crate) fn iter_out_edges(&self, node: u32) -> impl Iterator<Item = (u16, u32)> + '_ {
262        let idx = node as usize;
263        let dense = self
264            .dense_out_edges(node)
265            .filter(move |&(lid, dst)| !self.deleted_edges.contains(&(node, lid, dst)));
266        let buffer = if idx < self.buffer_out.len() {
267            self.buffer_out[idx].to_vec()
268        } else {
269            Vec::new()
270        };
271        dense.chain(buffer)
272    }
273
274    pub(crate) fn iter_in_edges(&self, node: u32) -> impl Iterator<Item = (u16, u32)> + '_ {
275        let idx = node as usize;
276        let dense = self
277            .dense_in_edges(node)
278            .filter(move |&(lid, src)| !self.deleted_edges.contains(&(src, lid, node)));
279        let buffer = if idx < self.buffer_in.len() {
280            self.buffer_in[idx].to_vec()
281        } else {
282            Vec::new()
283        };
284        dense.chain(buffer)
285    }
286
287    /// Resolve a node string to its internal ID.
288    pub(crate) fn node_id(&self, node: &str) -> Option<u32> {
289        self.node_to_id.get(node).copied()
290    }
291
292    /// Resolve an internal ID to its node string.
293    pub(crate) fn node_name(&self, id: u32) -> &str {
294        self.id_to_node
295            .get(id as usize)
296            .map(|s| s.as_str())
297            .unwrap_or("")
298    }
299
300    /// Resolve a label ID to its label string.
301    pub(crate) fn label_name(&self, id: u16) -> &str {
302        self.id_to_label
303            .get(id as usize)
304            .map(|s| s.as_str())
305            .unwrap_or("")
306    }
307
308    /// Resolve a label string to its internal ID.
309    pub(crate) fn label_id(&self, label: &str) -> Option<u16> {
310        self.label_to_id.get(label).copied()
311    }
312}
313
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture graph: the chain `a → b → c → d` over `KNOWS`, plus `a → e`
    /// over `WORKS`.
    fn make_csr() -> CsrIndex {
        let mut index = CsrIndex::new();
        for (src, label, dst) in [
            ("a", "KNOWS", "b"),
            ("b", "KNOWS", "c"),
            ("c", "KNOWS", "d"),
            ("a", "WORKS", "e"),
        ] {
            index.add_edge(src, label, dst);
        }
        index
    }

    /// Unfiltered outgoing lookup returns every outgoing edge of the node.
    #[test]
    fn neighbors_out() {
        let index = make_csr();
        let found = index.neighbors("a", None, Direction::Out);
        assert_eq!(found.len(), 2);
        let targets: Vec<&str> = found.iter().map(|(_, dst)| dst.as_str()).collect();
        assert!(targets.contains(&"b"));
        assert!(targets.contains(&"e"));
    }

    /// A label filter narrows the result to edges with that label.
    #[test]
    fn neighbors_filtered() {
        let index = make_csr();
        let found = index.neighbors("a", Some("KNOWS"), Direction::Out);
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].1, "b");
    }

    /// Incoming lookup reports the source node of each edge.
    #[test]
    fn neighbors_in() {
        let index = make_csr();
        let found = index.neighbors("b", None, Direction::In);
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].1, "a");
    }

    /// Removing a buffered edge takes effect immediately.
    #[test]
    fn incremental_remove() {
        let mut index = make_csr();
        let knows_from_a =
            |idx: &CsrIndex| idx.neighbors("a", Some("KNOWS"), Direction::Out).len();
        assert_eq!(knows_from_a(&index), 1);
        index.remove_edge("a", "KNOWS", "b");
        assert_eq!(knows_from_a(&index), 0);
    }

    /// Adding the same edge twice stores it once.
    #[test]
    fn duplicate_add_is_idempotent() {
        let mut index = CsrIndex::new();
        for _ in 0..2 {
            index.add_edge("a", "L", "b");
        }
        assert_eq!(index.neighbors("a", None, Direction::Out).len(), 1);
    }

    /// After compaction the write buffers are empty and lookups still see
    /// the same edges.
    #[test]
    fn compact_merges_buffer_into_dense() {
        let mut index = CsrIndex::new();
        index.add_edge("a", "L", "b");
        index.add_edge("b", "L", "c");
        assert_eq!(index.neighbors("a", None, Direction::Out).len(), 1);

        index.compact();
        assert!(index.buffer_out.iter().all(Vec::is_empty));
        assert_eq!(index.neighbors("a", None, Direction::Out).len(), 1);
        assert_eq!(index.neighbors("b", None, Direction::Out).len(), 1);
    }

    /// An edge removed after compaction stays gone across the next
    /// compaction, and its sibling edge survives.
    #[test]
    fn compact_handles_deletes() {
        let mut index = CsrIndex::new();
        index.add_edge("a", "L", "b");
        index.add_edge("a", "L", "c");
        index.compact();

        index.remove_edge("a", "L", "b");
        assert_eq!(index.neighbors("a", None, Direction::Out).len(), 1);

        index.compact();
        assert_eq!(index.neighbors("a", None, Direction::Out).len(), 1);
        assert_eq!(index.neighbors("a", None, Direction::Out)[0].1, "c");
    }

    /// A label reused across many edges is interned exactly once.
    #[test]
    fn label_interning() {
        let mut index = CsrIndex::new();
        for i in 0..100 {
            let src = format!("n{i}");
            let dst = format!("n{}", i + 1);
            index.add_edge(&src, "FOLLOWS", &dst);
        }
        assert_eq!(index.id_to_label.len(), 1);
    }

    /// The fixture graph has four edges.
    #[test]
    fn edge_count() {
        let index = make_csr();
        assert_eq!(index.edge_count(), 4);
    }

    /// A compacted index survives a checkpoint round trip with the same
    /// node count, edge count and neighbors.
    #[test]
    fn checkpoint_roundtrip() {
        let mut index = make_csr();
        index.compact();

        let bytes = index.checkpoint_to_bytes();
        assert!(!bytes.is_empty());

        let restored = CsrIndex::from_checkpoint(&bytes).unwrap();
        assert_eq!(restored.node_count(), index.node_count());
        assert_eq!(restored.edge_count(), index.edge_count());

        let found = restored.neighbors("a", Some("KNOWS"), Direction::Out);
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].1, "b");
    }

    /// A non-empty graph reports a non-zero memory estimate.
    #[test]
    fn memory_estimation() {
        let index = make_csr();
        let bytes = index.estimated_memory_bytes();
        assert!(bytes > 0);
    }
}