Skip to main content

nodedb_graph/csr/
compaction.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! CSR compaction: merges the mutable write buffer into dense arrays.
4
5use super::index::CsrIndex;
6use crate::GraphError;
7
8impl CsrIndex {
9    /// Merge the mutable buffer into the dense CSR arrays.
10    ///
11    /// Called during idle periods. Rebuilds the contiguous offset/target/label
12    /// (and weight) arrays from scratch (buffer + surviving dense edges).
13    /// The old arrays are dropped, freeing memory. O(E) where E = total edges.
14    ///
15    /// # Errors
16    ///
17    /// Returns [`GraphError::MemoryBudget`] if a memory governor is installed
18    /// and the dense-array allocation would exceed the `Graph` engine budget.
19    pub fn compact(&mut self) -> Result<(), GraphError> {
20        let n = self.id_to_node.len();
21        let mut new_out_edges: Vec<Vec<(u32, u32)>> = vec![Vec::new(); n];
22        let mut new_in_edges: Vec<Vec<(u32, u32)>> = vec![Vec::new(); n];
23        let mut new_out_weights: Vec<Vec<f64>> = if self.has_weights {
24            vec![Vec::new(); n]
25        } else {
26            Vec::new()
27        };
28        let mut new_in_weights: Vec<Vec<f64>> = if self.has_weights {
29            vec![Vec::new(); n]
30        } else {
31            Vec::new()
32        };
33
34        // Collect surviving dense edges.
35        for node in 0..n {
36            let node_id = node as u32;
37            let idx = node_id as usize;
38
39            // Outbound dense edges.
40            if idx + 1 < self.out_offsets.len() {
41                let start = self.out_offsets[idx] as usize;
42                let end = self.out_offsets[idx + 1] as usize;
43                for i in start..end {
44                    let lid = self.out_labels[i];
45                    let dst = self.out_targets[i];
46                    if !self.deleted_edges.contains(&(node_id, lid, dst)) {
47                        new_out_edges[node].push((lid, dst));
48                        if self.has_weights {
49                            let w = self
50                                .out_weights
51                                .as_ref()
52                                .map_or(1.0, |ws| ws.get(i).copied().unwrap_or(1.0));
53                            new_out_weights[node].push(w);
54                        }
55                    }
56                }
57            }
58
59            // Inbound dense edges.
60            if idx + 1 < self.in_offsets.len() {
61                let start = self.in_offsets[idx] as usize;
62                let end = self.in_offsets[idx + 1] as usize;
63                for i in start..end {
64                    let lid = self.in_labels[i];
65                    let src = self.in_targets[i];
66                    if !self.deleted_edges.contains(&(src, lid, node_id)) {
67                        new_in_edges[node].push((lid, src));
68                        if self.has_weights {
69                            let w = self
70                                .in_weights
71                                .as_ref()
72                                .map_or(1.0, |ws| ws.get(i).copied().unwrap_or(1.0));
73                            new_in_weights[node].push(w);
74                        }
75                    }
76                }
77            }
78        }
79
80        // Merge buffer edges.
81        for node in 0..n {
82            for (buf_idx, &(lid, dst)) in self.buffer_out[node].iter().enumerate() {
83                if !new_out_edges[node]
84                    .iter()
85                    .any(|&(l, d)| l == lid && d == dst)
86                {
87                    new_out_edges[node].push((lid, dst));
88                    if self.has_weights {
89                        let w = self.buffer_out_weights[node]
90                            .get(buf_idx)
91                            .copied()
92                            .unwrap_or(1.0);
93                        new_out_weights[node].push(w);
94                    }
95                }
96            }
97            for (buf_idx, &(lid, src)) in self.buffer_in[node].iter().enumerate() {
98                if !new_in_edges[node]
99                    .iter()
100                    .any(|&(l, s)| l == lid && s == src)
101                {
102                    new_in_edges[node].push((lid, src));
103                    if self.has_weights {
104                        let w = self.buffer_in_weights[node]
105                            .get(buf_idx)
106                            .copied()
107                            .unwrap_or(1.0);
108                        new_in_weights[node].push(w);
109                    }
110                }
111            }
112        }
113
114        // Build new dense arrays.
115        let governor = self.governor.as_ref();
116        let out = Self::build_dense(&new_out_edges, governor)?;
117        let in_ = Self::build_dense(&new_in_edges, governor)?;
118
119        self.out_offsets = out.offsets;
120        self.out_targets = out.targets.into();
121        self.out_labels = out.labels.into();
122        self.in_offsets = in_.offsets;
123        self.in_targets = in_.targets.into();
124        self.in_labels = in_.labels.into();
125
126        // Build weight arrays (flatten per-node vecs into contiguous array).
127        if self.has_weights {
128            self.out_weights = Some(
129                new_out_weights
130                    .into_iter()
131                    .flatten()
132                    .collect::<Vec<_>>()
133                    .into(),
134            );
135            self.in_weights = Some(
136                new_in_weights
137                    .into_iter()
138                    .flatten()
139                    .collect::<Vec<_>>()
140                    .into(),
141            );
142        }
143
144        // Clear buffer and deleted set.
145        for buf in &mut self.buffer_out {
146            buf.clear();
147        }
148        for buf in &mut self.buffer_in {
149            buf.clear();
150        }
151        if self.has_weights {
152            for buf in &mut self.buffer_out_weights {
153                buf.clear();
154            }
155            for buf in &mut self.buffer_in_weights {
156                buf.clear();
157            }
158        }
159        self.deleted_edges.clear();
160        Ok(())
161    }
162}