Skip to main content

nodedb_graph/csr/
compaction.rs

1//! CSR compaction: merges the mutable write buffer into dense arrays.
2
3use super::index::CsrIndex;
4
5impl CsrIndex {
6    /// Merge the mutable buffer into the dense CSR arrays.
7    ///
8    /// Called during idle periods. Rebuilds the contiguous offset/target/label
9    /// (and weight) arrays from scratch (buffer + surviving dense edges).
10    /// The old arrays are dropped, freeing memory. O(E) where E = total edges.
11    pub fn compact(&mut self) {
12        let n = self.id_to_node.len();
13        let mut new_out_edges: Vec<Vec<(u32, u32)>> = vec![Vec::new(); n];
14        let mut new_in_edges: Vec<Vec<(u32, u32)>> = vec![Vec::new(); n];
15        let mut new_out_weights: Vec<Vec<f64>> = if self.has_weights {
16            vec![Vec::new(); n]
17        } else {
18            Vec::new()
19        };
20        let mut new_in_weights: Vec<Vec<f64>> = if self.has_weights {
21            vec![Vec::new(); n]
22        } else {
23            Vec::new()
24        };
25
26        // Collect surviving dense edges.
27        for node in 0..n {
28            let node_id = node as u32;
29            let idx = node_id as usize;
30
31            // Outbound dense edges.
32            if idx + 1 < self.out_offsets.len() {
33                let start = self.out_offsets[idx] as usize;
34                let end = self.out_offsets[idx + 1] as usize;
35                for i in start..end {
36                    let lid = self.out_labels[i];
37                    let dst = self.out_targets[i];
38                    if !self.deleted_edges.contains(&(node_id, lid, dst)) {
39                        new_out_edges[node].push((lid, dst));
40                        if self.has_weights {
41                            let w = self
42                                .out_weights
43                                .as_ref()
44                                .map_or(1.0, |ws| ws.get(i).copied().unwrap_or(1.0));
45                            new_out_weights[node].push(w);
46                        }
47                    }
48                }
49            }
50
51            // Inbound dense edges.
52            if idx + 1 < self.in_offsets.len() {
53                let start = self.in_offsets[idx] as usize;
54                let end = self.in_offsets[idx + 1] as usize;
55                for i in start..end {
56                    let lid = self.in_labels[i];
57                    let src = self.in_targets[i];
58                    if !self.deleted_edges.contains(&(src, lid, node_id)) {
59                        new_in_edges[node].push((lid, src));
60                        if self.has_weights {
61                            let w = self
62                                .in_weights
63                                .as_ref()
64                                .map_or(1.0, |ws| ws.get(i).copied().unwrap_or(1.0));
65                            new_in_weights[node].push(w);
66                        }
67                    }
68                }
69            }
70        }
71
72        // Merge buffer edges.
73        for node in 0..n {
74            for (buf_idx, &(lid, dst)) in self.buffer_out[node].iter().enumerate() {
75                if !new_out_edges[node]
76                    .iter()
77                    .any(|&(l, d)| l == lid && d == dst)
78                {
79                    new_out_edges[node].push((lid, dst));
80                    if self.has_weights {
81                        let w = self.buffer_out_weights[node]
82                            .get(buf_idx)
83                            .copied()
84                            .unwrap_or(1.0);
85                        new_out_weights[node].push(w);
86                    }
87                }
88            }
89            for (buf_idx, &(lid, src)) in self.buffer_in[node].iter().enumerate() {
90                if !new_in_edges[node]
91                    .iter()
92                    .any(|&(l, s)| l == lid && s == src)
93                {
94                    new_in_edges[node].push((lid, src));
95                    if self.has_weights {
96                        let w = self.buffer_in_weights[node]
97                            .get(buf_idx)
98                            .copied()
99                            .unwrap_or(1.0);
100                        new_in_weights[node].push(w);
101                    }
102                }
103            }
104        }
105
106        // Build new dense arrays.
107        let (out_offsets, out_targets, out_labels) = Self::build_dense(&new_out_edges);
108        let (in_offsets, in_targets, in_labels) = Self::build_dense(&new_in_edges);
109
110        self.out_offsets = out_offsets;
111        self.out_targets = out_targets.into();
112        self.out_labels = out_labels.into();
113        self.in_offsets = in_offsets;
114        self.in_targets = in_targets.into();
115        self.in_labels = in_labels.into();
116
117        // Build weight arrays (flatten per-node vecs into contiguous array).
118        if self.has_weights {
119            self.out_weights = Some(
120                new_out_weights
121                    .into_iter()
122                    .flatten()
123                    .collect::<Vec<_>>()
124                    .into(),
125            );
126            self.in_weights = Some(
127                new_in_weights
128                    .into_iter()
129                    .flatten()
130                    .collect::<Vec<_>>()
131                    .into(),
132            );
133        }
134
135        // Clear buffer and deleted set.
136        for buf in &mut self.buffer_out {
137            buf.clear();
138        }
139        for buf in &mut self.buffer_in {
140            buf.clear();
141        }
142        if self.has_weights {
143            for buf in &mut self.buffer_out_weights {
144                buf.clear();
145            }
146            for buf in &mut self.buffer_in_weights {
147                buf.clear();
148            }
149        }
150        self.deleted_edges.clear();
151    }
152}