ruvector_graph/optimization/
cache_hierarchy.rs1use parking_lot::RwLock;
7use std::alloc::{alloc, dealloc, Layout};
8use std::sync::Arc;
9
/// Cache line width in bytes assumed by this module.
const CACHE_LINE_SIZE: usize = 64;

/// L1 size budget in bytes (32 KiB).
// NOTE(review): presumably a typical per-core L1 data cache size; nothing in
// the visible code reads this constant -- confirm it is still needed.
const L1_CACHE_SIZE: usize = 32 << 10;

/// L2 size budget in bytes (256 KiB).
// NOTE(review): not referenced by the visible code -- confirm intended use.
const L2_CACHE_SIZE: usize = 256 << 10;

/// L3 size budget in bytes (8 MiB).
// NOTE(review): not referenced by the visible code -- confirm intended use.
const L3_CACHE_SIZE: usize = 8 << 20;
21
22pub struct CacheHierarchy {
24 hot_storage: Arc<RwLock<HotStorage>>,
26 cold_storage: Arc<RwLock<ColdStorage>>,
28 access_tracker: Arc<RwLock<AccessTracker>>,
30}
31
32impl CacheHierarchy {
33 pub fn new(hot_capacity: usize, cold_capacity: usize) -> Self {
35 Self {
36 hot_storage: Arc::new(RwLock::new(HotStorage::new(hot_capacity))),
37 cold_storage: Arc::new(RwLock::new(ColdStorage::new(cold_capacity))),
38 access_tracker: Arc::new(RwLock::new(AccessTracker::new())),
39 }
40 }
41
42 pub fn get_node(&self, node_id: u64) -> Option<NodeData> {
44 self.access_tracker.write().record_access(node_id);
46
47 if let Some(data) = self.hot_storage.read().get(node_id) {
49 return Some(data);
50 }
51
52 if let Some(data) = self.cold_storage.read().get(node_id) {
54 if self.access_tracker.read().should_promote(node_id) {
56 self.promote_to_hot(node_id, data.clone());
57 }
58 return Some(data);
59 }
60
61 None
62 }
63
64 pub fn insert_node(&self, node_id: u64, data: NodeData) {
66 self.access_tracker.write().record_access(node_id);
68
69 if self.hot_storage.read().is_at_capacity() {
71 self.evict_one_to_cold(node_id); }
73
74 self.hot_storage.write().insert(node_id, data.clone());
76 }
77
78 fn promote_to_hot(&self, node_id: u64, data: NodeData) {
80 if self.hot_storage.read().is_full() {
82 self.evict_one_to_cold(node_id); }
84
85 self.hot_storage.write().insert(node_id, data);
86 self.cold_storage.write().remove(node_id);
87 }
88
89 fn evict_cold(&self) {
91 let tracker = self.access_tracker.read();
92 let lru_nodes = tracker.get_lru_nodes_by_frequency(10);
93 drop(tracker);
94
95 let mut hot = self.hot_storage.write();
96 let mut cold = self.cold_storage.write();
97
98 for node_id in lru_nodes {
99 if let Some(data) = hot.remove(node_id) {
100 cold.insert(node_id, data);
101 }
102 }
103 }
104
105 fn evict_one_to_cold(&self, protected_id: u64) {
107 let tracker = self.access_tracker.read();
108 let candidates = tracker.get_lru_nodes_by_frequency(5);
110 drop(tracker);
111
112 let mut hot = self.hot_storage.write();
113 let mut cold = self.cold_storage.write();
114
115 for node_id in candidates {
116 if node_id != protected_id {
117 if let Some(data) = hot.remove(node_id) {
118 cold.insert(node_id, data);
119 return;
120 }
121 }
122 }
123 }
124
125 pub fn prefetch_neighbors(&self, node_ids: &[u64]) {
127 for &node_id in node_ids {
129 #[cfg(target_arch = "x86_64")]
130 unsafe {
131 std::arch::x86_64::_mm_prefetch(
133 &node_id as *const u64 as *const i8,
134 std::arch::x86_64::_MM_HINT_T0,
135 );
136 }
137 }
138 }
139}
140
141#[repr(align(64))]
143struct HotStorage {
144 entries: Vec<CacheLineEntry>,
146 capacity: usize,
148 size: usize,
150}
151
152impl HotStorage {
153 fn new(capacity: usize) -> Self {
154 Self {
155 entries: Vec::with_capacity(capacity),
156 capacity,
157 size: 0,
158 }
159 }
160
161 fn get(&self, node_id: u64) -> Option<NodeData> {
162 self.entries
163 .iter()
164 .find(|e| e.node_id == node_id)
165 .map(|e| e.data.clone())
166 }
167
168 fn insert(&mut self, node_id: u64, data: NodeData) {
169 self.entries.retain(|e| e.node_id != node_id);
171
172 if self.entries.len() >= self.capacity {
173 self.entries.remove(0); }
175
176 self.entries.push(CacheLineEntry { node_id, data });
177 self.size = self.entries.len();
178 }
179
180 fn remove(&mut self, node_id: u64) -> Option<NodeData> {
181 if let Some(pos) = self.entries.iter().position(|e| e.node_id == node_id) {
182 let entry = self.entries.remove(pos);
183 self.size = self.entries.len();
184 Some(entry.data)
185 } else {
186 None
187 }
188 }
189
190 fn is_full(&self) -> bool {
191 self.size >= self.capacity
192 }
193
194 fn is_at_capacity(&self) -> bool {
195 self.size >= self.capacity
196 }
197}
198
199#[repr(align(64))]
201#[derive(Clone)]
202struct CacheLineEntry {
203 node_id: u64,
204 data: NodeData,
205}
206
207struct ColdStorage {
209 entries: dashmap::DashMap<u64, Vec<u8>>,
211 capacity: usize,
212}
213
214impl ColdStorage {
215 fn new(capacity: usize) -> Self {
216 Self {
217 entries: dashmap::DashMap::new(),
218 capacity,
219 }
220 }
221
222 fn get(&self, node_id: u64) -> Option<NodeData> {
223 self.entries.get(&node_id).and_then(|compressed| {
224 bincode::decode_from_slice(&compressed, bincode::config::standard())
226 .ok()
227 .map(|(data, _)| data)
228 })
229 }
230
231 fn insert(&mut self, node_id: u64, data: NodeData) {
232 if let Ok(compressed) = bincode::encode_to_vec(&data, bincode::config::standard()) {
234 self.entries.insert(node_id, compressed);
235 }
236 }
237
238 fn remove(&mut self, node_id: u64) -> Option<NodeData> {
239 self.entries.remove(&node_id).and_then(|(_, compressed)| {
240 bincode::decode_from_slice(&compressed, bincode::config::standard())
241 .ok()
242 .map(|(data, _)| data)
243 })
244 }
245}
246
247struct AccessTracker {
249 access_counts: dashmap::DashMap<u64, u32>,
251 last_access: dashmap::DashMap<u64, u64>,
253 timestamp: u64,
255}
256
257impl AccessTracker {
258 fn new() -> Self {
259 Self {
260 access_counts: dashmap::DashMap::new(),
261 last_access: dashmap::DashMap::new(),
262 timestamp: 0,
263 }
264 }
265
266 fn record_access(&mut self, node_id: u64) {
267 self.timestamp += 1;
268
269 self.access_counts
270 .entry(node_id)
271 .and_modify(|count| *count += 1)
272 .or_insert(1);
273
274 self.last_access.insert(node_id, self.timestamp);
275 }
276
277 fn should_promote(&self, node_id: u64) -> bool {
278 self.access_counts
280 .get(&node_id)
281 .map(|count| *count > 5)
282 .unwrap_or(false)
283 }
284
285 fn get_lru_nodes(&self, count: usize) -> Vec<u64> {
286 let mut nodes: Vec<_> = self
287 .last_access
288 .iter()
289 .map(|entry| (*entry.key(), *entry.value()))
290 .collect();
291
292 nodes.sort_by_key(|(_, timestamp)| *timestamp);
293 nodes
294 .into_iter()
295 .take(count)
296 .map(|(node_id, _)| node_id)
297 .collect()
298 }
299
300 fn get_lru_nodes_by_frequency(&self, count: usize) -> Vec<u64> {
302 let mut nodes: Vec<_> = self
303 .access_counts
304 .iter()
305 .map(|entry| (*entry.key(), *entry.value()))
306 .collect();
307
308 nodes.sort_by_key(|(_, access_count)| *access_count);
310 nodes
311 .into_iter()
312 .take(count)
313 .map(|(node_id, _)| node_id)
314 .collect()
315 }
316}
317
318#[derive(Clone, serde::Serialize, serde::Deserialize, bincode::Encode, bincode::Decode)]
320pub struct NodeData {
321 pub id: u64,
322 pub labels: Vec<String>,
323 pub properties: Vec<(String, CachePropertyValue)>,
324}
325
326#[derive(Clone, serde::Serialize, serde::Deserialize, bincode::Encode, bincode::Decode)]
328pub enum CachePropertyValue {
329 String(String),
330 Integer(i64),
331 Float(f64),
332 Boolean(bool),
333}
334
335pub struct HotColdStorage {
337 cache_hierarchy: CacheHierarchy,
338}
339
340impl HotColdStorage {
341 pub fn new() -> Self {
342 Self {
343 cache_hierarchy: CacheHierarchy::new(1000, 10000),
344 }
345 }
346
347 pub fn get(&self, node_id: u64) -> Option<NodeData> {
348 self.cache_hierarchy.get_node(node_id)
349 }
350
351 pub fn insert(&self, node_id: u64, data: NodeData) {
352 self.cache_hierarchy.insert_node(node_id, data);
353 }
354
355 pub fn prefetch(&self, node_ids: &[u64]) {
356 self.cache_hierarchy.prefetch_neighbors(node_ids);
357 }
358}
359
360impl Default for HotColdStorage {
361 fn default() -> Self {
362 Self::new()
363 }
364}
365
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a node with the given id and no labels or properties.
    fn bare_node(id: u64) -> NodeData {
        NodeData {
            id,
            labels: Vec::new(),
            properties: Vec::new(),
        }
    }

    /// A node that was inserted can be read back.
    #[test]
    fn test_cache_hierarchy() {
        let cache = CacheHierarchy::new(10, 100);

        let alice = NodeData {
            id: 1,
            labels: vec![String::from("Person")],
            properties: vec![(
                String::from("name"),
                CachePropertyValue::String(String::from("Alice")),
            )],
        };

        cache.insert_node(1, alice.clone());

        assert!(cache.get_node(1).is_some());
    }

    /// Three inserts into a two-slot hot tier push one node out of it; node 1
    /// must stay retrievable through repeated reads.
    #[test]
    fn test_hot_cold_promotion() {
        let cache = CacheHierarchy::new(2, 10);

        (1..=3u64).for_each(|id| cache.insert_node(id, bare_node(id)));

        (0..10).for_each(|_| {
            cache.get_node(1);
        });

        assert!(cache.get_node(1).is_some());
    }
}