1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
//! Memory estimation, access tracking, and prefetching for the CSR index.
//!
//! Provides RAM usage estimation for SpillController integration,
//! hot/cold node identification via access counting, and CPU cache
//! prefetch hints for BFS traversal optimization.
use super::index::CsrIndex;
impl CsrIndex {
    /// Record a node access (called during traversals for hot/cold tracking).
    ///
    /// Counters saturate at `u32::MAX` rather than wrapping; ids outside the
    /// counter range are ignored (the counter vec may lag newly added nodes).
    pub fn record_access(&self, node_id: u32) {
        if let Some(c) = self.access_counts.get(node_id as usize) {
            c.set(c.get().saturating_add(1));
        }
    }

    /// Get nodes with access count at or below `threshold` (cold nodes).
    ///
    /// The comparison is inclusive, so `cold_nodes(0)` returns exactly the
    /// nodes that were never accessed.
    pub fn cold_nodes(&self, threshold: u32) -> Vec<u32> {
        self.access_counts
            .iter()
            .enumerate()
            .filter(|(_, c)| c.get() <= threshold)
            .map(|(i, _)| i as u32)
            .collect()
    }

    /// Number of hot nodes (access count > 0).
    pub fn hot_node_count(&self) -> usize {
        self.access_counts.iter().filter(|c| c.get() > 0).count()
    }

    /// Current query epoch (incremented per traversal call).
    pub fn query_epoch(&self) -> u64 {
        self.query_epoch
    }

    /// Reset access counters and the query epoch (called during compaction or
    /// periodically, to age out stale heat information).
    pub fn reset_access_counts(&mut self) {
        self.access_counts.iter().for_each(|c| c.set(0));
        self.query_epoch = 0;
    }

    /// Predictive prefetch: hint the OS to load a node's adjacency data
    /// into the page cache before the traversal touches it.
    ///
    /// For in-memory dense CSR, this prefetches the cache line containing
    /// the node's offset/target entries. For future mmap'd cold segments,
    /// this would call `madvise(MADV_WILLNEED)` on the relevant pages.
    ///
    /// Called during BFS planning: when adding nodes to the next frontier,
    /// prefetch their neighbors' data so it's resident when the BFS loop
    /// reaches them on the next iteration.
    #[inline]
    pub fn prefetch_node(&self, node_id: u32) {
        let idx = node_id as usize;
        // `idx + 1` must also be valid: a node's adjacency range spans
        // `out_offsets[idx]..out_offsets[idx + 1]` in CSR form.
        if idx + 1 < self.out_offsets.len() {
            // SAFETY: We're just hinting the CPU to load this address.
            // The offset is within bounds (checked above), and prefetch
            // cannot fault; this is a performance hint, not a correctness
            // requirement.
            #[cfg(target_arch = "x86_64")]
            unsafe {
                use std::arch::x86_64::{_mm_prefetch, _MM_HINT_T0};
                let ptr = self.out_offsets.as_ptr().add(idx) as *const i8;
                // The locality hint is a const-generic parameter of
                // `_mm_prefetch`, not a runtime argument.
                _mm_prefetch::<_MM_HINT_T0>(ptr);
            }
        }
    }

    /// Prefetch a batch of nodes (called during BFS frontier expansion).
    pub fn prefetch_batch(&self, node_ids: &[u32]) {
        for &id in node_ids {
            self.prefetch_node(id);
        }
    }

    /// Evaluate graph memory pressure and return promotion/demotion hints.
    ///
    /// Uses SpillController-compatible thresholds (90%/75% hysteresis):
    /// - At/above spill threshold: demote cold nodes (spill to potential mmap)
    /// - At/below restore threshold: promote warm nodes back to hot RAM
    ///
    /// `utilization` = estimated memory usage as percentage (0-100).
    /// Returns `(nodes_to_demote, nodes_to_promote)` counts.
    pub fn evaluate_memory_pressure(
        &self,
        utilization: u8,
        spill_threshold: u8,
        restore_threshold: u8,
    ) -> (usize, usize) {
        if utilization >= spill_threshold {
            // Above spill threshold: never-accessed nodes are demotion candidates.
            (self.cold_nodes(0).len(), 0)
        } else if utilization <= restore_threshold {
            // Below restore threshold: every node can stay hot.
            (0, self.node_count())
        } else {
            // Inside the hysteresis band: take no action, avoiding thrashing
            // when utilization oscillates around a single threshold.
            (0, 0)
        }
    }

    /// Estimated memory usage of the dense CSR in bytes.
    ///
    /// An approximation used for SpillController utilization calculation:
    /// it counts array payloads plus a fixed per-`String` header, and does
    /// not account for allocator slack or `Vec` over-capacity.
    pub fn estimated_memory_bytes(&self) -> usize {
        // Dense arrays: 4-byte offsets/targets, 2-byte labels, 8-byte weights.
        let offsets = (self.out_offsets.len() + self.in_offsets.len()) * 4;
        let targets = (self.out_targets.len() + self.in_targets.len()) * 4;
        let labels = (self.out_labels.len() + self.in_labels.len()) * 2;
        let weights = self.out_weights.as_ref().map_or(0, |w| w.len() * 8)
            + self.in_weights.as_ref().map_or(0, |w| w.len() * 8);
        // Not-yet-compacted edge buffers: (u16 + u32) per entry.
        let buffer: usize = self
            .buffer_out
            .iter()
            .chain(self.buffer_in.iter())
            .map(|b| b.len() * 6)
            .sum();
        let buffer_weights: usize = self
            .buffer_out_weights
            .iter()
            .chain(self.buffer_in_weights.iter())
            .map(|b| b.len() * 8)
            .sum();
        // Interned strings: byte length plus 24 bytes for the `String`
        // header (ptr/len/cap on 64-bit targets).
        let interning = self.id_to_node.iter().map(|s| s.len() + 24).sum::<usize>()
            + self.id_to_label.iter().map(|s| s.len() + 24).sum::<usize>();
        offsets + targets + labels + weights + buffer + buffer_weights + interning
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn access_tracking() {
        let mut csr = CsrIndex::new();
        csr.add_edge("a", "L", "b");
        csr.add_edge("b", "L", "c");
        let a = csr.node_id("a").unwrap();

        // Nothing has been touched yet.
        assert_eq!(csr.hot_node_count(), 0);

        // Repeated hits on the same node still count as a single hot node.
        for _ in 0..2 {
            csr.record_access(a);
        }
        assert_eq!(csr.hot_node_count(), 1);

        let untouched = csr.cold_nodes(0);
        assert!(!untouched.contains(&a));
        assert_eq!(untouched.len(), 2); // "b" and "c" were never accessed

        // Resetting clears every counter.
        csr.reset_access_counts();
        assert_eq!(csr.hot_node_count(), 0);
    }

    #[test]
    fn memory_estimation_includes_weights() {
        let mut plain = CsrIndex::new();
        plain.add_edge("a", "L", "b");
        let mut weighted = CsrIndex::new();
        weighted.add_edge_weighted("a", "L", "b", 5.0);
        // Storing weights can only add to the footprint.
        assert!(weighted.estimated_memory_bytes() >= plain.estimated_memory_bytes());
    }

    #[test]
    fn evaluate_memory_pressure_hysteresis() {
        let mut csr = CsrIndex::new();
        csr.add_edge("a", "L", "b");

        // 95% >= 90% spill threshold: cold nodes get demoted, none promoted.
        assert!(matches!(csr.evaluate_memory_pressure(95, 90, 75), (d, 0) if d > 0));
        // 60% <= 75% restore threshold: nothing demoted, nodes may be promoted.
        assert!(matches!(csr.evaluate_memory_pressure(60, 90, 75), (0, p) if p > 0));
        // 80% sits inside the hysteresis band: no action either way.
        assert_eq!(csr.evaluate_memory_pressure(80, 90, 75), (0, 0));
    }
}