Skip to main content

nodedb_graph/csr/index/
mutation.rs

1//! Edge insert / remove paths and node-edge cleanup.
2
3use super::types::CsrIndex;
4
5impl CsrIndex {
6    /// Incrementally add an unweighted edge (goes into mutable buffer).
7    /// Uses weight 1.0 if the graph already has weighted edges.
8    ///
9    /// Returns `Err(GraphError::LabelOverflow)` if the label id space is
10    /// exhausted. Production callers should always surface this to the
11    /// client; silently ignoring it reproduces the silent-wrap bug the
12    /// `u32` widening was meant to fix.
13    pub fn add_edge(&mut self, src: &str, label: &str, dst: &str) -> Result<(), crate::GraphError> {
14        self.add_edge_internal(src, label, dst, 1.0, false)
15    }
16
17    /// Incrementally add a weighted edge (goes into mutable buffer).
18    ///
19    /// If this is the first weighted edge (weight != 1.0), initializes
20    /// the weight tracking infrastructure (backfills existing buffer
21    /// entries with 1.0).
22    pub fn add_edge_weighted(
23        &mut self,
24        src: &str,
25        label: &str,
26        dst: &str,
27        weight: f64,
28    ) -> Result<(), crate::GraphError> {
29        self.add_edge_internal(src, label, dst, weight, weight != 1.0)
30    }
31
32    fn add_edge_internal(
33        &mut self,
34        src: &str,
35        label: &str,
36        dst: &str,
37        weight: f64,
38        force_weights: bool,
39    ) -> Result<(), crate::GraphError> {
40        let src_id = self.ensure_node(src);
41        let dst_id = self.ensure_node(dst);
42        let label_id = self.ensure_label(label)?;
43
44        // Check for duplicates in buffer.
45        let out = &self.buffer_out[src_id as usize];
46        if out.iter().any(|&(l, d)| l == label_id && d == dst_id) {
47            return Ok(());
48        }
49        // Check for duplicates in dense CSR.
50        if self.dense_has_edge(src_id, label_id, dst_id) {
51            return Ok(());
52        }
53
54        // Initialize weight tracking on first non-default weight.
55        if force_weights && !self.has_weights {
56            self.enable_weights();
57        }
58
59        self.buffer_out[src_id as usize].push((label_id, dst_id));
60        self.buffer_in[dst_id as usize].push((label_id, src_id));
61
62        if self.has_weights {
63            self.buffer_out_weights[src_id as usize].push(weight);
64            self.buffer_in_weights[dst_id as usize].push(weight);
65        }
66
67        // If it was previously deleted, un-delete.
68        self.deleted_edges.remove(&(src_id, label_id, dst_id));
69        Ok(())
70    }
71
72    /// Incrementally remove an edge.
73    pub fn remove_edge(&mut self, src: &str, label: &str, dst: &str) {
74        let (Some(&src_id), Some(&dst_id)) = (self.node_to_id.get(src), self.node_to_id.get(dst))
75        else {
76            return;
77        };
78        let Some(&label_id) = self.label_to_id.get(label) else {
79            return;
80        };
81
82        // Remove from buffer if present (keep weight buffers in sync).
83        let out_buf = &self.buffer_out[src_id as usize];
84        if let Some(pos) = out_buf
85            .iter()
86            .position(|&(l, d)| l == label_id && d == dst_id)
87        {
88            self.buffer_out[src_id as usize].swap_remove(pos);
89            if self.has_weights {
90                self.buffer_out_weights[src_id as usize].swap_remove(pos);
91            }
92        }
93        let in_buf = &self.buffer_in[dst_id as usize];
94        if let Some(pos) = in_buf
95            .iter()
96            .position(|&(l, s)| l == label_id && s == src_id)
97        {
98            self.buffer_in[dst_id as usize].swap_remove(pos);
99            if self.has_weights {
100                self.buffer_in_weights[dst_id as usize].swap_remove(pos);
101            }
102        }
103
104        // Mark as deleted in dense CSR.
105        if self.dense_has_edge(src_id, label_id, dst_id) {
106            self.deleted_edges.insert((src_id, label_id, dst_id));
107        }
108    }
109
110    /// Remove ALL edges touching a node. Returns the number of edges removed.
111    pub fn remove_node_edges(&mut self, node: &str) -> usize {
112        let Some(&node_id) = self.node_to_id.get(node) else {
113            return 0;
114        };
115        let mut removed = 0;
116
117        // Collect outgoing edges then remove reverse references.
118        let out_edges: Vec<(u32, u32)> = self.iter_out_edges(node_id).collect();
119        for (label_id, dst_id) in &out_edges {
120            let in_buf = &self.buffer_in[*dst_id as usize];
121            if let Some(pos) = in_buf
122                .iter()
123                .position(|&(l, s)| l == *label_id && s == node_id)
124            {
125                self.buffer_in[*dst_id as usize].swap_remove(pos);
126                if self.has_weights {
127                    self.buffer_in_weights[*dst_id as usize].swap_remove(pos);
128                }
129            }
130            self.deleted_edges.insert((node_id, *label_id, *dst_id));
131            removed += 1;
132        }
133        self.buffer_out[node_id as usize].clear();
134        if self.has_weights {
135            self.buffer_out_weights[node_id as usize].clear();
136        }
137
138        // Collect incoming edges then remove reverse references.
139        let in_edges: Vec<(u32, u32)> = self.iter_in_edges(node_id).collect();
140        for (label_id, src_id) in &in_edges {
141            let out_buf = &self.buffer_out[*src_id as usize];
142            if let Some(pos) = out_buf
143                .iter()
144                .position(|&(l, d)| l == *label_id && d == node_id)
145            {
146                self.buffer_out[*src_id as usize].swap_remove(pos);
147                if self.has_weights {
148                    self.buffer_out_weights[*src_id as usize].swap_remove(pos);
149                }
150            }
151            self.deleted_edges.insert((*src_id, *label_id, node_id));
152            removed += 1;
153        }
154        self.buffer_in[node_id as usize].clear();
155        if self.has_weights {
156            self.buffer_in_weights[node_id as usize].clear();
157        }
158
159        removed
160    }
161
162    /// Remove all edges touching any node whose ID starts with `prefix`.
163    ///
164    /// Used for tenant purge: `prefix = "{tenant_id}:"` removes all
165    /// edges belonging to that tenant.
166    pub fn remove_nodes_with_prefix(&mut self, prefix: &str) {
167        let matching_nodes: Vec<String> = self
168            .node_to_id
169            .keys()
170            .filter(|k| k.starts_with(prefix))
171            .cloned()
172            .collect();
173        for node in &matching_nodes {
174            self.remove_node_edges(node);
175        }
176    }
177}