Skip to main content

nodedb_graph/csr/index/
mutation.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Edge insert / remove paths and node-edge cleanup.
4
5use super::types::CsrIndex;
6
7impl CsrIndex {
8    /// Incrementally add an unweighted edge (goes into mutable buffer).
9    /// Uses weight 1.0 if the graph already has weighted edges.
10    ///
11    /// Returns `Err(GraphError::LabelOverflow)` if the label id space is
12    /// exhausted. Production callers should always surface this to the
13    /// client; silently ignoring it reproduces the silent-wrap bug the
14    /// `u32` widening was meant to fix.
15    pub fn add_edge(&mut self, src: &str, label: &str, dst: &str) -> Result<(), crate::GraphError> {
16        self.add_edge_internal(src, label, dst, 1.0, false)
17    }
18
19    /// Incrementally add a weighted edge (goes into mutable buffer).
20    ///
21    /// If this is the first weighted edge (weight != 1.0), initializes
22    /// the weight tracking infrastructure (backfills existing buffer
23    /// entries with 1.0).
24    pub fn add_edge_weighted(
25        &mut self,
26        src: &str,
27        label: &str,
28        dst: &str,
29        weight: f64,
30    ) -> Result<(), crate::GraphError> {
31        self.add_edge_internal(src, label, dst, weight, weight != 1.0)
32    }
33
34    fn add_edge_internal(
35        &mut self,
36        src: &str,
37        label: &str,
38        dst: &str,
39        weight: f64,
40        force_weights: bool,
41    ) -> Result<(), crate::GraphError> {
42        let src_id = self.ensure_node(src)?;
43        let dst_id = self.ensure_node(dst)?;
44        let label_id = self.ensure_label(label)?;
45
46        // Check for duplicates in buffer.
47        let out = &self.buffer_out[src_id as usize];
48        if out.iter().any(|&(l, d)| l == label_id && d == dst_id) {
49            return Ok(());
50        }
51        // Check for duplicates in dense CSR.
52        if self.dense_has_edge(src_id, label_id, dst_id) {
53            return Ok(());
54        }
55
56        // Initialize weight tracking on first non-default weight.
57        if force_weights && !self.has_weights {
58            self.enable_weights();
59        }
60
61        self.buffer_out[src_id as usize].push((label_id, dst_id));
62        self.buffer_in[dst_id as usize].push((label_id, src_id));
63
64        if self.has_weights {
65            self.buffer_out_weights[src_id as usize].push(weight);
66            self.buffer_in_weights[dst_id as usize].push(weight);
67        }
68
69        // If it was previously deleted, un-delete.
70        self.deleted_edges.remove(&(src_id, label_id, dst_id));
71        Ok(())
72    }
73
74    /// Incrementally remove an edge.
75    pub fn remove_edge(&mut self, src: &str, label: &str, dst: &str) {
76        let (Some(&src_id), Some(&dst_id)) = (self.node_to_id.get(src), self.node_to_id.get(dst))
77        else {
78            return;
79        };
80        let Some(&label_id) = self.label_to_id.get(label) else {
81            return;
82        };
83
84        // Remove from buffer if present (keep weight buffers in sync).
85        let out_buf = &self.buffer_out[src_id as usize];
86        if let Some(pos) = out_buf
87            .iter()
88            .position(|&(l, d)| l == label_id && d == dst_id)
89        {
90            self.buffer_out[src_id as usize].swap_remove(pos);
91            if self.has_weights {
92                self.buffer_out_weights[src_id as usize].swap_remove(pos);
93            }
94        }
95        let in_buf = &self.buffer_in[dst_id as usize];
96        if let Some(pos) = in_buf
97            .iter()
98            .position(|&(l, s)| l == label_id && s == src_id)
99        {
100            self.buffer_in[dst_id as usize].swap_remove(pos);
101            if self.has_weights {
102                self.buffer_in_weights[dst_id as usize].swap_remove(pos);
103            }
104        }
105
106        // Mark as deleted in dense CSR.
107        if self.dense_has_edge(src_id, label_id, dst_id) {
108            self.deleted_edges.insert((src_id, label_id, dst_id));
109        }
110    }
111
112    /// Remove ALL edges touching a node. Returns the number of edges removed.
113    pub fn remove_node_edges(&mut self, node: &str) -> usize {
114        let Some(&node_id) = self.node_to_id.get(node) else {
115            return 0;
116        };
117        let mut removed = 0;
118
119        // Collect outgoing edges then remove reverse references.
120        let out_edges: Vec<(u32, u32)> = self.dense_iter_out(node_id).collect();
121        for (label_id, dst_id) in &out_edges {
122            let in_buf = &self.buffer_in[*dst_id as usize];
123            if let Some(pos) = in_buf
124                .iter()
125                .position(|&(l, s)| l == *label_id && s == node_id)
126            {
127                self.buffer_in[*dst_id as usize].swap_remove(pos);
128                if self.has_weights {
129                    self.buffer_in_weights[*dst_id as usize].swap_remove(pos);
130                }
131            }
132            self.deleted_edges.insert((node_id, *label_id, *dst_id));
133            removed += 1;
134        }
135        self.buffer_out[node_id as usize].clear();
136        if self.has_weights {
137            self.buffer_out_weights[node_id as usize].clear();
138        }
139
140        // Collect incoming edges then remove reverse references.
141        let in_edges: Vec<(u32, u32)> = self.dense_iter_in(node_id).collect();
142        for (label_id, src_id) in &in_edges {
143            let out_buf = &self.buffer_out[*src_id as usize];
144            if let Some(pos) = out_buf
145                .iter()
146                .position(|&(l, d)| l == *label_id && d == node_id)
147            {
148                self.buffer_out[*src_id as usize].swap_remove(pos);
149                if self.has_weights {
150                    self.buffer_out_weights[*src_id as usize].swap_remove(pos);
151                }
152            }
153            self.deleted_edges.insert((*src_id, *label_id, node_id));
154            removed += 1;
155        }
156        self.buffer_in[node_id as usize].clear();
157        if self.has_weights {
158            self.buffer_in_weights[node_id as usize].clear();
159        }
160
161        removed
162    }
163
164    /// Remove all edges touching any node whose ID starts with `prefix`.
165    ///
166    /// Used for tenant purge: `prefix = "{tenant_id}:"` removes all
167    /// edges belonging to that tenant.
168    pub fn remove_nodes_with_prefix(&mut self, prefix: &str) {
169        let matching_nodes: Vec<String> = self
170            .node_to_id
171            .keys()
172            .filter(|k| k.starts_with(prefix))
173            .cloned()
174            .collect();
175        for node in &matching_nodes {
176            self.remove_node_edges(node);
177        }
178    }
179}