ruvector_graph/optimization/
cache_hierarchy.rs1use parking_lot::RwLock;
7use std::alloc::{alloc, dealloc, Layout};
8use std::sync::Arc;
9
/// Cache-line width in bytes assumed by this module.
///
/// Same value as the `align(64)` attributes used on the hot-tier types.
const CACHE_LINE_SIZE: usize = 64;

/// 32 KiB. Presumably a typical per-core L1 data-cache size (TODO confirm);
/// not referenced in the visible part of this file.
const L1_CACHE_SIZE: usize = 32 << 10;

/// 256 KiB. Presumably a typical L2 cache size (TODO confirm);
/// not referenced in the visible part of this file.
const L2_CACHE_SIZE: usize = 256 << 10;

/// 8 MiB. Presumably a typical shared L3 cache size (TODO confirm);
/// not referenced in the visible part of this file.
const L3_CACHE_SIZE: usize = 8 << 20;
21
22pub struct CacheHierarchy {
24 hot_storage: Arc<RwLock<HotStorage>>,
26 cold_storage: Arc<RwLock<ColdStorage>>,
28 access_tracker: Arc<RwLock<AccessTracker>>,
30}
31
32impl CacheHierarchy {
33 pub fn new(hot_capacity: usize, cold_capacity: usize) -> Self {
35 Self {
36 hot_storage: Arc::new(RwLock::new(HotStorage::new(hot_capacity))),
37 cold_storage: Arc::new(RwLock::new(ColdStorage::new(cold_capacity))),
38 access_tracker: Arc::new(RwLock::new(AccessTracker::new())),
39 }
40 }
41
42 pub fn get_node(&self, node_id: u64) -> Option<NodeData> {
44 self.access_tracker.write().record_access(node_id);
46
47 if let Some(data) = self.hot_storage.read().get(node_id) {
49 return Some(data);
50 }
51
52 let cold_data = self.cold_storage.read().get(node_id);
57 if let Some(data) = cold_data {
58 if self.access_tracker.read().should_promote(node_id) {
59 self.promote_to_hot(node_id, data.clone());
60 }
61 return Some(data);
62 }
63
64 None
65 }
66
67 pub fn insert_node(&self, node_id: u64, data: NodeData) {
69 self.access_tracker.write().record_access(node_id);
71
72 if self.hot_storage.read().is_at_capacity() {
74 self.evict_one_to_cold(node_id); }
76
77 self.hot_storage.write().insert(node_id, data.clone());
79 }
80
81 fn promote_to_hot(&self, node_id: u64, data: NodeData) {
83 if self.hot_storage.read().is_full() {
85 self.evict_one_to_cold(node_id); }
87
88 self.hot_storage.write().insert(node_id, data);
89 self.cold_storage.write().remove(node_id);
90 }
91
92 fn evict_cold(&self) {
94 let tracker = self.access_tracker.read();
95 let lru_nodes = tracker.get_lru_nodes_by_frequency(10);
96 drop(tracker);
97
98 let mut hot = self.hot_storage.write();
99 let mut cold = self.cold_storage.write();
100
101 for node_id in lru_nodes {
102 if let Some(data) = hot.remove(node_id) {
103 cold.insert(node_id, data);
104 }
105 }
106 }
107
108 fn evict_one_to_cold(&self, protected_id: u64) {
110 let tracker = self.access_tracker.read();
111 let candidates = tracker.get_lru_nodes_by_frequency(5);
113 drop(tracker);
114
115 let mut hot = self.hot_storage.write();
116 let mut cold = self.cold_storage.write();
117
118 for node_id in candidates {
119 if node_id != protected_id {
120 if let Some(data) = hot.remove(node_id) {
121 cold.insert(node_id, data);
122 return;
123 }
124 }
125 }
126 }
127
128 pub fn prefetch_neighbors(&self, node_ids: &[u64]) {
130 for &node_id in node_ids {
132 #[cfg(target_arch = "x86_64")]
133 unsafe {
134 std::arch::x86_64::_mm_prefetch(
136 &node_id as *const u64 as *const i8,
137 std::arch::x86_64::_MM_HINT_T0,
138 );
139 }
140 }
141 }
142}
143
144#[repr(align(64))]
146struct HotStorage {
147 entries: Vec<CacheLineEntry>,
149 capacity: usize,
151 size: usize,
153}
154
155impl HotStorage {
156 fn new(capacity: usize) -> Self {
157 Self {
158 entries: Vec::with_capacity(capacity),
159 capacity,
160 size: 0,
161 }
162 }
163
164 fn get(&self, node_id: u64) -> Option<NodeData> {
165 self.entries
166 .iter()
167 .find(|e| e.node_id == node_id)
168 .map(|e| e.data.clone())
169 }
170
171 fn insert(&mut self, node_id: u64, data: NodeData) {
172 self.entries.retain(|e| e.node_id != node_id);
174
175 if self.entries.len() >= self.capacity {
176 self.entries.remove(0); }
178
179 self.entries.push(CacheLineEntry { node_id, data });
180 self.size = self.entries.len();
181 }
182
183 fn remove(&mut self, node_id: u64) -> Option<NodeData> {
184 if let Some(pos) = self.entries.iter().position(|e| e.node_id == node_id) {
185 let entry = self.entries.remove(pos);
186 self.size = self.entries.len();
187 Some(entry.data)
188 } else {
189 None
190 }
191 }
192
193 fn is_full(&self) -> bool {
194 self.size >= self.capacity
195 }
196
197 fn is_at_capacity(&self) -> bool {
198 self.size >= self.capacity
199 }
200}
201
202#[repr(align(64))]
204#[derive(Clone)]
205struct CacheLineEntry {
206 node_id: u64,
207 data: NodeData,
208}
209
210struct ColdStorage {
212 entries: dashmap::DashMap<u64, Vec<u8>>,
214 capacity: usize,
215}
216
217impl ColdStorage {
218 fn new(capacity: usize) -> Self {
219 Self {
220 entries: dashmap::DashMap::new(),
221 capacity,
222 }
223 }
224
225 fn get(&self, node_id: u64) -> Option<NodeData> {
226 self.entries.get(&node_id).and_then(|compressed| {
227 bincode::decode_from_slice(&compressed, bincode::config::standard())
229 .ok()
230 .map(|(data, _)| data)
231 })
232 }
233
234 fn insert(&mut self, node_id: u64, data: NodeData) {
235 if let Ok(compressed) = bincode::encode_to_vec(&data, bincode::config::standard()) {
237 self.entries.insert(node_id, compressed);
238 }
239 }
240
241 fn remove(&mut self, node_id: u64) -> Option<NodeData> {
242 self.entries.remove(&node_id).and_then(|(_, compressed)| {
243 bincode::decode_from_slice(&compressed, bincode::config::standard())
244 .ok()
245 .map(|(data, _)| data)
246 })
247 }
248}
249
250struct AccessTracker {
252 access_counts: dashmap::DashMap<u64, u32>,
254 last_access: dashmap::DashMap<u64, u64>,
256 timestamp: u64,
258}
259
260impl AccessTracker {
261 fn new() -> Self {
262 Self {
263 access_counts: dashmap::DashMap::new(),
264 last_access: dashmap::DashMap::new(),
265 timestamp: 0,
266 }
267 }
268
269 fn record_access(&mut self, node_id: u64) {
270 self.timestamp += 1;
271
272 self.access_counts
273 .entry(node_id)
274 .and_modify(|count| *count += 1)
275 .or_insert(1);
276
277 self.last_access.insert(node_id, self.timestamp);
278 }
279
280 fn should_promote(&self, node_id: u64) -> bool {
281 self.access_counts
283 .get(&node_id)
284 .map(|count| *count > 5)
285 .unwrap_or(false)
286 }
287
288 fn get_lru_nodes(&self, count: usize) -> Vec<u64> {
289 let mut nodes: Vec<_> = self
290 .last_access
291 .iter()
292 .map(|entry| (*entry.key(), *entry.value()))
293 .collect();
294
295 nodes.sort_by_key(|(_, timestamp)| *timestamp);
296 nodes
297 .into_iter()
298 .take(count)
299 .map(|(node_id, _)| node_id)
300 .collect()
301 }
302
303 fn get_lru_nodes_by_frequency(&self, count: usize) -> Vec<u64> {
305 let mut nodes: Vec<_> = self
306 .access_counts
307 .iter()
308 .map(|entry| (*entry.key(), *entry.value()))
309 .collect();
310
311 nodes.sort_by_key(|(_, access_count)| *access_count);
313 nodes
314 .into_iter()
315 .take(count)
316 .map(|(node_id, _)| node_id)
317 .collect()
318 }
319}
320
321#[derive(Clone, serde::Serialize, serde::Deserialize, bincode::Encode, bincode::Decode)]
323pub struct NodeData {
324 pub id: u64,
325 pub labels: Vec<String>,
326 pub properties: Vec<(String, CachePropertyValue)>,
327}
328
329#[derive(Clone, serde::Serialize, serde::Deserialize, bincode::Encode, bincode::Decode)]
331pub enum CachePropertyValue {
332 String(String),
333 Integer(i64),
334 Float(f64),
335 Boolean(bool),
336}
337
338pub struct HotColdStorage {
340 cache_hierarchy: CacheHierarchy,
341}
342
343impl HotColdStorage {
344 pub fn new() -> Self {
345 Self {
346 cache_hierarchy: CacheHierarchy::new(1000, 10000),
347 }
348 }
349
350 pub fn get(&self, node_id: u64) -> Option<NodeData> {
351 self.cache_hierarchy.get_node(node_id)
352 }
353
354 pub fn insert(&self, node_id: u64, data: NodeData) {
355 self.cache_hierarchy.insert_node(node_id, data);
356 }
357
358 pub fn prefetch(&self, node_ids: &[u64]) {
359 self.cache_hierarchy.prefetch_neighbors(node_ids);
360 }
361}
362
363impl Default for HotColdStorage {
364 fn default() -> Self {
365 Self::new()
366 }
367}
368
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a node with no labels or properties whose payload id is `id`.
    fn bare_node(id: u64) -> NodeData {
        NodeData {
            id,
            labels: vec![],
            properties: vec![],
        }
    }

    /// A stored node comes back with the same contents; an unknown id misses.
    #[test]
    fn test_cache_hierarchy() {
        let cache = CacheHierarchy::new(10, 100);

        let data = NodeData {
            id: 1,
            labels: vec!["Person".to_string()],
            properties: vec![(
                "name".to_string(),
                CachePropertyValue::String("Alice".to_string()),
            )],
        };

        cache.insert_node(1, data.clone());

        let retrieved = cache.get_node(1).expect("node 1 was just inserted");
        assert_eq!(retrieved.id, data.id);
        assert_eq!(retrieved.labels, data.labels);
        assert_eq!(retrieved.properties.len(), 1);
        assert_eq!(retrieved.properties[0].0, "name");
        assert!(matches!(
            &retrieved.properties[0].1,
            CachePropertyValue::String(name) if name.as_str() == "Alice"
        ));

        assert!(cache.get_node(2).is_none());
    }

    /// Overflowing the hot tier demotes a node rather than losing it, and a
    /// frequently read node ends up in the hot tier.
    #[test]
    fn test_hot_cold_promotion() {
        let cache = CacheHierarchy::new(2, 10);

        for i in 1..=3 {
            cache.insert_node(i, bare_node(i));
        }

        // Three nodes were inserted into a two-slot hot tier.
        assert_eq!(cache.hot_storage.read().entries.len(), 2);

        // Ten reads push node 1 past the promotion threshold.
        for _ in 0..10 {
            assert!(cache.get_node(1).is_some());
        }
        assert!(cache.hot_storage.read().get(1).is_some());
        assert_eq!(cache.hot_storage.read().entries.len(), 2);

        // Nothing was dropped along the way.
        for i in 1..=3 {
            assert_eq!(cache.get_node(i).map(|node| node.id), Some(i));
        }
    }
}