// ruvector_graph/optimization/cache_hierarchy.rs

1//! Cache-optimized data layouts with hot/cold data separation
2//!
3//! This module implements cache-friendly storage patterns to minimize
4//! cache misses and maximize memory bandwidth utilization.
5
6use parking_lot::RwLock;
7use std::alloc::{alloc, dealloc, Layout};
8use std::sync::Arc;
9
/// Cache line size (64 bytes on x86-64).
/// NOTE(review): informational only — the `#[repr(align(64))]` attributes
/// below use the literal `64` (attributes cannot take a const expression),
/// so this constant is never referenced.
const CACHE_LINE_SIZE: usize = 64;

/// L1 cache size estimate (32KB typical); informational, not yet used.
const L1_CACHE_SIZE: usize = 32 * 1024;

/// L2 cache size estimate (256KB typical); informational, not yet used.
const L2_CACHE_SIZE: usize = 256 * 1024;

/// L3 cache size estimate (8MB typical); informational, not yet used.
const L3_CACHE_SIZE: usize = 8 * 1024 * 1024;
21
/// Cache hierarchy manager for graph data.
///
/// Splits node storage into a small, frequently-accessed "hot" tier and a
/// larger bincode-encoded "cold" tier; an access tracker decides when cold
/// entries are promoted back to hot.
pub struct CacheHierarchy {
    /// Hot data stored in a cache-line-aligned, linearly scanned layout
    hot_storage: Arc<RwLock<HotStorage>>,
    /// Cold data stored bincode-encoded (labelled "compressed" elsewhere,
    /// but no compression codec is applied — see `ColdStorage`)
    cold_storage: Arc<RwLock<ColdStorage>>,
    /// Per-node access counts and timestamps driving promotion/eviction
    access_tracker: Arc<RwLock<AccessTracker>>,
}
31
32impl CacheHierarchy {
33    /// Create a new cache hierarchy
34    pub fn new(hot_capacity: usize, cold_capacity: usize) -> Self {
35        Self {
36            hot_storage: Arc::new(RwLock::new(HotStorage::new(hot_capacity))),
37            cold_storage: Arc::new(RwLock::new(ColdStorage::new(cold_capacity))),
38            access_tracker: Arc::new(RwLock::new(AccessTracker::new())),
39        }
40    }
41
42    /// Access node data with automatic hot/cold promotion
43    pub fn get_node(&self, node_id: u64) -> Option<NodeData> {
44        // Record access
45        self.access_tracker.write().record_access(node_id);
46
47        // Try hot storage first
48        if let Some(data) = self.hot_storage.read().get(node_id) {
49            return Some(data);
50        }
51
52        // Fall back to cold storage
53        if let Some(data) = self.cold_storage.read().get(node_id) {
54            // Promote to hot if frequently accessed
55            if self.access_tracker.read().should_promote(node_id) {
56                self.promote_to_hot(node_id, data.clone());
57            }
58            return Some(data);
59        }
60
61        None
62    }
63
64    /// Insert node data with automatic placement
65    pub fn insert_node(&self, node_id: u64, data: NodeData) {
66        // Record initial access for the new node
67        self.access_tracker.write().record_access(node_id);
68
69        // Check if we need to evict before inserting (to avoid double eviction with HotStorage)
70        if self.hot_storage.read().is_at_capacity() {
71            self.evict_one_to_cold(node_id); // Don't evict the one we're about to insert
72        }
73
74        // New data goes to hot storage
75        self.hot_storage.write().insert(node_id, data.clone());
76    }
77
78    /// Promote node from cold to hot storage
79    fn promote_to_hot(&self, node_id: u64, data: NodeData) {
80        // First evict if needed to make room
81        if self.hot_storage.read().is_full() {
82            self.evict_one_to_cold(node_id); // Pass node_id to avoid evicting the one we're promoting
83        }
84
85        self.hot_storage.write().insert(node_id, data);
86        self.cold_storage.write().remove(node_id);
87    }
88
89    /// Evict least recently used hot data to cold storage
90    fn evict_cold(&self) {
91        let tracker = self.access_tracker.read();
92        let lru_nodes = tracker.get_lru_nodes_by_frequency(10);
93        drop(tracker);
94
95        let mut hot = self.hot_storage.write();
96        let mut cold = self.cold_storage.write();
97
98        for node_id in lru_nodes {
99            if let Some(data) = hot.remove(node_id) {
100                cold.insert(node_id, data);
101            }
102        }
103    }
104
105    /// Evict one node to cold storage, avoiding the protected node_id
106    fn evict_one_to_cold(&self, protected_id: u64) {
107        let tracker = self.access_tracker.read();
108        // Get nodes sorted by frequency (least frequently accessed first)
109        let candidates = tracker.get_lru_nodes_by_frequency(5);
110        drop(tracker);
111
112        let mut hot = self.hot_storage.write();
113        let mut cold = self.cold_storage.write();
114
115        for node_id in candidates {
116            if node_id != protected_id {
117                if let Some(data) = hot.remove(node_id) {
118                    cold.insert(node_id, data);
119                    return;
120                }
121            }
122        }
123    }
124
125    /// Prefetch nodes that are likely to be accessed soon
126    pub fn prefetch_neighbors(&self, node_ids: &[u64]) {
127        // Use software prefetching hints
128        for &node_id in node_ids {
129            #[cfg(target_arch = "x86_64")]
130            unsafe {
131                // Prefetch to L1 cache
132                std::arch::x86_64::_mm_prefetch(
133                    &node_id as *const u64 as *const i8,
134                    std::arch::x86_64::_MM_HINT_T0,
135                );
136            }
137        }
138    }
139}
140
/// Hot storage with cache-line aligned entries.
///
/// Backed by a flat `Vec` that is scanned linearly on every lookup; intended
/// to stay small enough to remain cache-resident.
#[repr(align(64))]
struct HotStorage {
    /// Cache-line aligned storage, in insertion (FIFO) order
    entries: Vec<CacheLineEntry>,
    /// Capacity in number of entries (not bytes)
    capacity: usize,
    /// Current size; kept in sync with `entries.len()` after each mutation
    size: usize,
}
151
152impl HotStorage {
153    fn new(capacity: usize) -> Self {
154        Self {
155            entries: Vec::with_capacity(capacity),
156            capacity,
157            size: 0,
158        }
159    }
160
161    fn get(&self, node_id: u64) -> Option<NodeData> {
162        self.entries
163            .iter()
164            .find(|e| e.node_id == node_id)
165            .map(|e| e.data.clone())
166    }
167
168    fn insert(&mut self, node_id: u64, data: NodeData) {
169        // Remove old entry if exists
170        self.entries.retain(|e| e.node_id != node_id);
171
172        if self.entries.len() >= self.capacity {
173            self.entries.remove(0); // Simple FIFO eviction
174        }
175
176        self.entries.push(CacheLineEntry { node_id, data });
177        self.size = self.entries.len();
178    }
179
180    fn remove(&mut self, node_id: u64) -> Option<NodeData> {
181        if let Some(pos) = self.entries.iter().position(|e| e.node_id == node_id) {
182            let entry = self.entries.remove(pos);
183            self.size = self.entries.len();
184            Some(entry.data)
185        } else {
186            None
187        }
188    }
189
190    fn is_full(&self) -> bool {
191        self.size >= self.capacity
192    }
193
194    fn is_at_capacity(&self) -> bool {
195        self.size >= self.capacity
196    }
197}
198
/// Cache-line aligned hot-store entry.
///
/// NOTE(review): only the 64-byte *alignment* is guaranteed by the repr; the
/// actual size depends on `NodeData` (which holds heap-backed `Vec`s), so
/// "64 bytes" should be verified with `size_of` rather than assumed.
#[repr(align(64))]
#[derive(Clone)]
struct CacheLineEntry {
    // Id stored inline so lookups can scan without touching `data`
    node_id: u64,
    data: NodeData,
}
206
/// Cold storage tier holding bincode-encoded node data.
///
/// NOTE(review): despite the "compression" naming used elsewhere, entries
/// are only bincode-encoded — no compression codec is applied.
struct ColdStorage {
    /// Encoded bytes per node id
    entries: dashmap::DashMap<u64, Vec<u8>>,
    // NOTE(review): recorded but never enforced — no cold-tier eviction
    // exists yet; confirm whether that is intended.
    capacity: usize,
}
213
214impl ColdStorage {
215    fn new(capacity: usize) -> Self {
216        Self {
217            entries: dashmap::DashMap::new(),
218            capacity,
219        }
220    }
221
222    fn get(&self, node_id: u64) -> Option<NodeData> {
223        self.entries.get(&node_id).and_then(|compressed| {
224            // Decompress data using bincode 2.0 API
225            bincode::decode_from_slice(&compressed, bincode::config::standard())
226                .ok()
227                .map(|(data, _)| data)
228        })
229    }
230
231    fn insert(&mut self, node_id: u64, data: NodeData) {
232        // Compress data using bincode 2.0 API
233        if let Ok(compressed) = bincode::encode_to_vec(&data, bincode::config::standard()) {
234            self.entries.insert(node_id, compressed);
235        }
236    }
237
238    fn remove(&mut self, node_id: u64) -> Option<NodeData> {
239        self.entries.remove(&node_id).and_then(|(_, compressed)| {
240            bincode::decode_from_slice(&compressed, bincode::config::standard())
241                .ok()
242                .map(|(data, _)| data)
243        })
244    }
245}
246
/// Access frequency tracker driving hot/cold promotion and eviction choices.
struct AccessTracker {
    /// Access counts per node; drives promotion (`should_promote`) and
    /// frequency-ordered eviction candidates
    access_counts: dashmap::DashMap<u64, u32>,
    /// Last access time per node, in logical-clock ticks
    last_access: dashmap::DashMap<u64, u64>,
    /// Global logical clock, bumped once per recorded access
    timestamp: u64,
}
256
257impl AccessTracker {
258    fn new() -> Self {
259        Self {
260            access_counts: dashmap::DashMap::new(),
261            last_access: dashmap::DashMap::new(),
262            timestamp: 0,
263        }
264    }
265
266    fn record_access(&mut self, node_id: u64) {
267        self.timestamp += 1;
268
269        self.access_counts
270            .entry(node_id)
271            .and_modify(|count| *count += 1)
272            .or_insert(1);
273
274        self.last_access.insert(node_id, self.timestamp);
275    }
276
277    fn should_promote(&self, node_id: u64) -> bool {
278        // Promote if accessed more than 5 times
279        self.access_counts
280            .get(&node_id)
281            .map(|count| *count > 5)
282            .unwrap_or(false)
283    }
284
285    fn get_lru_nodes(&self, count: usize) -> Vec<u64> {
286        let mut nodes: Vec<_> = self
287            .last_access
288            .iter()
289            .map(|entry| (*entry.key(), *entry.value()))
290            .collect();
291
292        nodes.sort_by_key(|(_, timestamp)| *timestamp);
293        nodes
294            .into_iter()
295            .take(count)
296            .map(|(node_id, _)| node_id)
297            .collect()
298    }
299
300    /// Get least frequently accessed nodes (for smart eviction)
301    fn get_lru_nodes_by_frequency(&self, count: usize) -> Vec<u64> {
302        let mut nodes: Vec<_> = self
303            .access_counts
304            .iter()
305            .map(|entry| (*entry.key(), *entry.value()))
306            .collect();
307
308        // Sort by access count (ascending - least frequently accessed first)
309        nodes.sort_by_key(|(_, access_count)| *access_count);
310        nodes
311            .into_iter()
312            .take(count)
313            .map(|(node_id, _)| node_id)
314            .collect()
315    }
316}
317
/// Node data structure cached by the hierarchy.
///
/// Derives both serde and bincode traits: serde for external serialization,
/// bincode `Encode`/`Decode` for the cold tier's byte encoding.
#[derive(Clone, serde::Serialize, serde::Deserialize, bincode::Encode, bincode::Decode)]
pub struct NodeData {
    /// Graph node identifier
    pub id: u64,
    /// Node labels (e.g. "Person")
    pub labels: Vec<String>,
    /// Property key/value pairs, stored as a flat list (not a map)
    pub properties: Vec<(String, CachePropertyValue)>,
}
325
/// Property value variants supported by cache storage; mirrors the derives
/// on [`NodeData`] so properties encode/decode with the node.
#[derive(Clone, serde::Serialize, serde::Deserialize, bincode::Encode, bincode::Decode)]
pub enum CachePropertyValue {
    String(String),
    Integer(i64),
    Float(f64),
    Boolean(bool),
}
334
/// Hot/cold storage facade exposing a simplified get/insert/prefetch API
/// over [`CacheHierarchy`].
pub struct HotColdStorage {
    // Owns the underlying two-tier hierarchy
    cache_hierarchy: CacheHierarchy,
}
339
340impl HotColdStorage {
341    pub fn new() -> Self {
342        Self {
343            cache_hierarchy: CacheHierarchy::new(1000, 10000),
344        }
345    }
346
347    pub fn get(&self, node_id: u64) -> Option<NodeData> {
348        self.cache_hierarchy.get_node(node_id)
349    }
350
351    pub fn insert(&self, node_id: u64, data: NodeData) {
352        self.cache_hierarchy.insert_node(node_id, data);
353    }
354
355    pub fn prefetch(&self, node_ids: &[u64]) {
356        self.cache_hierarchy.prefetch_neighbors(node_ids);
357    }
358}
359
// `Default` is the no-argument construction path: delegates to `new()`,
// which uses the facade's default hot/cold capacities.
impl Default for HotColdStorage {
    fn default() -> Self {
        Self::new()
    }
}
365
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trip a node through the hierarchy and verify the payload
    /// content survives, not merely that *something* comes back.
    #[test]
    fn test_cache_hierarchy() {
        let cache = CacheHierarchy::new(10, 100);

        let data = NodeData {
            id: 1,
            labels: vec!["Person".to_string()],
            properties: vec![(
                "name".to_string(),
                CachePropertyValue::String("Alice".to_string()),
            )],
        };

        cache.insert_node(1, data.clone());

        // Strengthened: check the stored fields round-trip intact.
        let retrieved = cache.get_node(1).expect("node 1 should be retrievable");
        assert_eq!(retrieved.id, 1);
        assert_eq!(retrieved.labels, vec!["Person".to_string()]);
        assert_eq!(retrieved.properties.len(), 1);
        assert_eq!(retrieved.properties[0].0, "name");

        // A never-inserted id must come back as None, not stale data.
        assert!(cache.get_node(42).is_none());
    }

    /// Overflow the hot tier, then hammer one node past the promotion
    /// threshold; no node may be silently lost along the way.
    #[test]
    fn test_hot_cold_promotion() {
        let cache = CacheHierarchy::new(2, 10);

        // Insert 3 nodes (exceeds hot capacity of 2, demoting one to cold).
        for i in 1..=3 {
            cache.insert_node(
                i,
                NodeData {
                    id: i,
                    labels: vec![],
                    properties: vec![],
                },
            );
        }

        // Access node 1 repeatedly to push it past the promotion threshold;
        // it must stay visible through every intermediate state.
        for _ in 0..10 {
            assert!(cache.get_node(1).is_some(), "node 1 lost during promotion");
        }

        // Eviction demotes to cold rather than dropping, so every node must
        // still be reachable through some tier.
        for i in 1..=3 {
            assert!(cache.get_node(i).is_some(), "node lost after eviction");
        }
    }
}