nodedb_graph/csr/memory.rs

//! Access tracking, prefetch hints, and memory-pressure heuristics
//! for the CSR index.

use super::index::CsrIndex;

impl CsrIndex {
    /// Record one access to `node_id`, saturating at `u32::MAX`.
    ///
    /// Counts live in `Cell`s so tracking works through a shared
    /// reference on the read path. Out-of-range ids are ignored.
    pub fn record_access(&self, node_id: u32) {
        let idx = node_id as usize;
        if idx < self.access_counts.len() {
            let c = &self.access_counts[idx];
            c.set(c.get().saturating_add(1));
        }
    }

    /// Node ids whose access count is at or below `threshold`.
    ///
    /// The comparison is inclusive, so `cold_nodes(0)` returns exactly
    /// the never-accessed nodes.
    pub fn cold_nodes(&self, threshold: u32) -> Vec<u32> {
        self.access_counts
            .iter()
            .enumerate()
            .filter(|(_, c)| c.get() <= threshold)
            .map(|(i, _)| i as u32)
            .collect()
    }

    /// Number of nodes accessed at least once since the last reset.
    pub fn hot_node_count(&self) -> usize {
        self.access_counts.iter().filter(|c| c.get() > 0).count()
    }

    /// Current query epoch.
    pub fn query_epoch(&self) -> u64 {
        self.query_epoch
    }

    /// Zero every access counter and reset the query epoch.
    pub fn reset_access_counts(&mut self) {
        self.access_counts.iter().for_each(|c| c.set(0));
        self.query_epoch = 0;
    }

    /// Best-effort cache prefetch of `node_id`'s offset entry.
    ///
    /// Purely a hint: on x86_64 this lowers to `_mm_prefetch` with the
    /// T0 (all cache levels) strategy; on other targets it is a no-op.
    /// Out-of-range ids are ignored.
    #[inline]
    pub fn prefetch_node(&self, node_id: u32) {
        let idx = node_id as usize;
        if idx + 1 < self.out_offsets.len() {
            #[cfg(target_arch = "x86_64")]
            unsafe {
                use std::arch::x86_64::{_mm_prefetch, _MM_HINT_T0};
                let ptr = self.out_offsets.as_ptr().add(idx) as *const i8;
                // The prefetch strategy is a const generic in std::arch.
                _mm_prefetch::<{ _MM_HINT_T0 }>(ptr);
            }
        }
    }

    /// Prefetch the offset entries for a batch of nodes, e.g. the next
    /// traversal frontier, before their adjacency lists are walked.
    pub fn prefetch_batch(&self, node_ids: &[u32]) {
        for &id in node_ids {
            self.prefetch_node(id);
        }
    }

    /// Map a memory utilization reading (in percent) to a
    /// `(demote, promote)` pair of node counts.
    ///
    /// The two thresholds form a hysteresis band: at or above
    /// `spill_threshold` the never-accessed nodes become demotion
    /// candidates, at or below `restore_threshold` every node is
    /// eligible to come back, and in between nothing moves, which
    /// keeps readings near a single cutoff from causing churn.
    pub fn evaluate_memory_pressure(
        &self,
        utilization: u8,
        spill_threshold: u8,
        restore_threshold: u8,
    ) -> (usize, usize) {
        if utilization >= spill_threshold {
            let cold = self.cold_nodes(0);
            (cold.len(), 0)
        } else if utilization <= restore_threshold {
            (0, self.node_count())
        } else {
            (0, 0)
        }
    }

    /// Rough estimate of the index's heap footprint in bytes.
    ///
    /// Compacted arrays are costed at their element sizes (4-byte
    /// offsets and targets, 2-byte labels, 8-byte weights); buffered
    /// edges are counted at 6 bytes apiece, buffered weights at 8, and
    /// interned strings at their length plus a 24-byte `String` header
    /// (pointer, length, capacity on 64-bit).
    pub fn estimated_memory_bytes(&self) -> usize {
        let offsets = (self.out_offsets.len() + self.in_offsets.len()) * 4;
        let targets = (self.out_targets.len() + self.in_targets.len()) * 4;
        let labels = (self.out_labels.len() + self.in_labels.len()) * 2;
        let weights = self.out_weights.as_ref().map_or(0, |w| w.len() * 8)
            + self.in_weights.as_ref().map_or(0, |w| w.len() * 8);
        let buffer: usize = self
            .buffer_out
            .iter()
            .chain(self.buffer_in.iter())
            .map(|b| b.len() * 6)
            .sum();
        let buffer_weights: usize = self
            .buffer_out_weights
            .iter()
            .chain(self.buffer_in_weights.iter())
            .map(|b| b.len() * 8)
            .sum();
        let interning = self.id_to_node.iter().map(|s| s.len() + 24).sum::<usize>()
            + self.id_to_label.iter().map(|s| s.len() + 24).sum::<usize>();
        offsets + targets + labels + weights + buffer + buffer_weights + interning
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn access_tracking() {
        let mut csr = CsrIndex::new();
        csr.add_edge("a", "L", "b");
        csr.add_edge("b", "L", "c");

        let a_id = csr.node_id("a").unwrap();
        assert_eq!(csr.hot_node_count(), 0);

        csr.record_access(a_id);
        csr.record_access(a_id);
        assert_eq!(csr.hot_node_count(), 1);

        let cold = csr.cold_nodes(0);
        assert!(!cold.contains(&a_id));
        assert_eq!(cold.len(), 2);

        csr.reset_access_counts();
        assert_eq!(csr.hot_node_count(), 0);
    }
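
    // A minimal sketch of the threshold contract: the comparison in
    // `cold_nodes` is inclusive, so a node accessed exactly `threshold`
    // times still reads as cold.
    #[test]
    fn cold_threshold_is_inclusive() {
        let mut csr = CsrIndex::new();
        csr.add_edge("a", "L", "b");

        let a_id = csr.node_id("a").unwrap();
        csr.record_access(a_id);

        // One access: cold at threshold 1, not cold at threshold 0.
        assert!(csr.cold_nodes(1).contains(&a_id));
        assert!(!csr.cold_nodes(0).contains(&a_id));
    }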

    #[test]
    fn memory_estimation_includes_weights() {
        let mut unweighted = CsrIndex::new();
        unweighted.add_edge("a", "L", "b");

        let mut weighted = CsrIndex::new();
        weighted.add_edge_weighted("a", "L", "b", 5.0);

        assert!(weighted.estimated_memory_bytes() >= unweighted.estimated_memory_bytes());
    }
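
    // Prefetching is advisory, so the only contract testable from this
    // module is memory safety: ids past the end of `out_offsets` must
    // be ignored rather than dereferenced. A minimal smoke sketch:
    #[test]
    fn prefetch_ignores_out_of_range_ids() {
        let mut csr = CsrIndex::new();
        csr.add_edge("a", "L", "b");

        let a_id = csr.node_id("a").unwrap();
        csr.prefetch_node(a_id);
        // 1_000_000 is assumed to be well past this two-node index.
        csr.prefetch_batch(&[a_id, 1_000_000]);
    }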

    #[test]
    fn evaluate_memory_pressure_hysteresis() {
        let mut csr = CsrIndex::new();
        csr.add_edge("a", "L", "b");

        // Above the spill threshold: demote cold nodes, promote nothing.
        let (demote, promote) = csr.evaluate_memory_pressure(95, 90, 75);
        assert!(demote > 0);
        assert_eq!(promote, 0);

        // Below the restore threshold: promote everything back.
        let (demote, promote) = csr.evaluate_memory_pressure(60, 90, 75);
        assert_eq!(demote, 0);
        assert!(promote > 0);

        // Inside the hysteresis band: no movement in either direction.
        let (demote, promote) = csr.evaluate_memory_pressure(80, 90, 75);
        assert_eq!(demote, 0);
        assert_eq!(promote, 0);
    }
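
    // A sketch of the reset contract visible from this module alone:
    // `reset_access_counts` zeroes both the per-node counters and the
    // query epoch. How the epoch advances is owned elsewhere.
    #[test]
    fn reset_clears_epoch_and_counts() {
        let mut csr = CsrIndex::new();
        csr.add_edge("a", "L", "b");
        csr.record_access(csr.node_id("a").unwrap());

        csr.reset_access_counts();
        assert_eq!(csr.query_epoch(), 0);
        assert_eq!(csr.hot_node_count(), 0);
    }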
}