Skip to main content

ruvector_graph/optimization/
cache_hierarchy.rs

//! Cache-optimized data layouts with hot/cold data separation
//!
//! This module implements cache-friendly storage patterns to minimize
//! cache misses and maximize memory bandwidth utilization.

6use parking_lot::RwLock;
7use std::alloc::{alloc, dealloc, Layout};
8use std::sync::Arc;
9
/// Size of one cache line in bytes (64 on x86-64).
const CACHE_LINE_SIZE: usize = 64;

/// Rough L1 data-cache size: a typical 32 KiB.
const L1_CACHE_SIZE: usize = 32 << 10;

/// Rough L2 cache size: a typical 256 KiB.
const L2_CACHE_SIZE: usize = 256 << 10;

/// Rough L3 cache size: a typical 8 MiB.
const L3_CACHE_SIZE: usize = 8 << 20;

22/// Cache hierarchy manager for graph data
23pub struct CacheHierarchy {
24    /// Hot data stored in L1-friendly layout
25    hot_storage: Arc<RwLock<HotStorage>>,
26    /// Cold data stored in compressed format
27    cold_storage: Arc<RwLock<ColdStorage>>,
28    /// Access frequency tracker
29    access_tracker: Arc<RwLock<AccessTracker>>,
30}
31
32impl CacheHierarchy {
33    /// Create a new cache hierarchy
34    pub fn new(hot_capacity: usize, cold_capacity: usize) -> Self {
35        Self {
36            hot_storage: Arc::new(RwLock::new(HotStorage::new(hot_capacity))),
37            cold_storage: Arc::new(RwLock::new(ColdStorage::new(cold_capacity))),
38            access_tracker: Arc::new(RwLock::new(AccessTracker::new())),
39        }
40    }
41
42    /// Access node data with automatic hot/cold promotion
43    pub fn get_node(&self, node_id: u64) -> Option<NodeData> {
44        // Record access
45        self.access_tracker.write().record_access(node_id);
46
47        // Try hot storage first
48        if let Some(data) = self.hot_storage.read().get(node_id) {
49            return Some(data);
50        }
51
52        // Fall back to cold storage. Read into an owned value and drop the
53        // read guard before calling `promote_to_hot`, which acquires
54        // `cold_storage.write()` — `parking_lot::RwLock` is not re-entrant,
55        // so holding the read guard across the write would deadlock.
56        let cold_data = self.cold_storage.read().get(node_id);
57        if let Some(data) = cold_data {
58            if self.access_tracker.read().should_promote(node_id) {
59                self.promote_to_hot(node_id, data.clone());
60            }
61            return Some(data);
62        }
63
64        None
65    }
66
67    /// Insert node data with automatic placement
68    pub fn insert_node(&self, node_id: u64, data: NodeData) {
69        // Record initial access for the new node
70        self.access_tracker.write().record_access(node_id);
71
72        // Check if we need to evict before inserting (to avoid double eviction with HotStorage)
73        if self.hot_storage.read().is_at_capacity() {
74            self.evict_one_to_cold(node_id); // Don't evict the one we're about to insert
75        }
76
77        // New data goes to hot storage
78        self.hot_storage.write().insert(node_id, data.clone());
79    }
80
81    /// Promote node from cold to hot storage
82    fn promote_to_hot(&self, node_id: u64, data: NodeData) {
83        // First evict if needed to make room
84        if self.hot_storage.read().is_full() {
85            self.evict_one_to_cold(node_id); // Pass node_id to avoid evicting the one we're promoting
86        }
87
88        self.hot_storage.write().insert(node_id, data);
89        self.cold_storage.write().remove(node_id);
90    }
91
92    /// Evict least recently used hot data to cold storage
93    fn evict_cold(&self) {
94        let tracker = self.access_tracker.read();
95        let lru_nodes = tracker.get_lru_nodes_by_frequency(10);
96        drop(tracker);
97
98        let mut hot = self.hot_storage.write();
99        let mut cold = self.cold_storage.write();
100
101        for node_id in lru_nodes {
102            if let Some(data) = hot.remove(node_id) {
103                cold.insert(node_id, data);
104            }
105        }
106    }
107
108    /// Evict one node to cold storage, avoiding the protected node_id
109    fn evict_one_to_cold(&self, protected_id: u64) {
110        let tracker = self.access_tracker.read();
111        // Get nodes sorted by frequency (least frequently accessed first)
112        let candidates = tracker.get_lru_nodes_by_frequency(5);
113        drop(tracker);
114
115        let mut hot = self.hot_storage.write();
116        let mut cold = self.cold_storage.write();
117
118        for node_id in candidates {
119            if node_id != protected_id {
120                if let Some(data) = hot.remove(node_id) {
121                    cold.insert(node_id, data);
122                    return;
123                }
124            }
125        }
126    }
127
128    /// Prefetch nodes that are likely to be accessed soon
129    pub fn prefetch_neighbors(&self, node_ids: &[u64]) {
130        // Use software prefetching hints
131        for &node_id in node_ids {
132            #[cfg(target_arch = "x86_64")]
133            unsafe {
134                // Prefetch to L1 cache
135                std::arch::x86_64::_mm_prefetch(
136                    &node_id as *const u64 as *const i8,
137                    std::arch::x86_64::_MM_HINT_T0,
138                );
139            }
140        }
141    }
142}
143
144/// Hot storage with cache-line aligned entries
145#[repr(align(64))]
146struct HotStorage {
147    /// Cache-line aligned storage
148    entries: Vec<CacheLineEntry>,
149    /// Capacity in number of entries
150    capacity: usize,
151    /// Current size
152    size: usize,
153}
154
155impl HotStorage {
156    fn new(capacity: usize) -> Self {
157        Self {
158            entries: Vec::with_capacity(capacity),
159            capacity,
160            size: 0,
161        }
162    }
163
164    fn get(&self, node_id: u64) -> Option<NodeData> {
165        self.entries
166            .iter()
167            .find(|e| e.node_id == node_id)
168            .map(|e| e.data.clone())
169    }
170
171    fn insert(&mut self, node_id: u64, data: NodeData) {
172        // Remove old entry if exists
173        self.entries.retain(|e| e.node_id != node_id);
174
175        if self.entries.len() >= self.capacity {
176            self.entries.remove(0); // Simple FIFO eviction
177        }
178
179        self.entries.push(CacheLineEntry { node_id, data });
180        self.size = self.entries.len();
181    }
182
183    fn remove(&mut self, node_id: u64) -> Option<NodeData> {
184        if let Some(pos) = self.entries.iter().position(|e| e.node_id == node_id) {
185            let entry = self.entries.remove(pos);
186            self.size = self.entries.len();
187            Some(entry.data)
188        } else {
189            None
190        }
191    }
192
193    fn is_full(&self) -> bool {
194        self.size >= self.capacity
195    }
196
197    fn is_at_capacity(&self) -> bool {
198        self.size >= self.capacity
199    }
200}
201
202/// Cache-line aligned entry (64 bytes)
203#[repr(align(64))]
204#[derive(Clone)]
205struct CacheLineEntry {
206    node_id: u64,
207    data: NodeData,
208}
209
210/// Cold storage with compression
211struct ColdStorage {
212    /// Compressed data storage
213    entries: dashmap::DashMap<u64, Vec<u8>>,
214    capacity: usize,
215}
216
217impl ColdStorage {
218    fn new(capacity: usize) -> Self {
219        Self {
220            entries: dashmap::DashMap::new(),
221            capacity,
222        }
223    }
224
225    fn get(&self, node_id: u64) -> Option<NodeData> {
226        self.entries.get(&node_id).and_then(|compressed| {
227            // Decompress data using bincode 2.0 API
228            bincode::decode_from_slice(&compressed, bincode::config::standard())
229                .ok()
230                .map(|(data, _)| data)
231        })
232    }
233
234    fn insert(&mut self, node_id: u64, data: NodeData) {
235        // Compress data using bincode 2.0 API
236        if let Ok(compressed) = bincode::encode_to_vec(&data, bincode::config::standard()) {
237            self.entries.insert(node_id, compressed);
238        }
239    }
240
241    fn remove(&mut self, node_id: u64) -> Option<NodeData> {
242        self.entries.remove(&node_id).and_then(|(_, compressed)| {
243            bincode::decode_from_slice(&compressed, bincode::config::standard())
244                .ok()
245                .map(|(data, _)| data)
246        })
247    }
248}
249
250/// Access frequency tracker for hot/cold promotion
251struct AccessTracker {
252    /// Access counts per node
253    access_counts: dashmap::DashMap<u64, u32>,
254    /// Last access timestamp
255    last_access: dashmap::DashMap<u64, u64>,
256    /// Global timestamp
257    timestamp: u64,
258}
259
260impl AccessTracker {
261    fn new() -> Self {
262        Self {
263            access_counts: dashmap::DashMap::new(),
264            last_access: dashmap::DashMap::new(),
265            timestamp: 0,
266        }
267    }
268
269    fn record_access(&mut self, node_id: u64) {
270        self.timestamp += 1;
271
272        self.access_counts
273            .entry(node_id)
274            .and_modify(|count| *count += 1)
275            .or_insert(1);
276
277        self.last_access.insert(node_id, self.timestamp);
278    }
279
280    fn should_promote(&self, node_id: u64) -> bool {
281        // Promote if accessed more than 5 times
282        self.access_counts
283            .get(&node_id)
284            .map(|count| *count > 5)
285            .unwrap_or(false)
286    }
287
288    fn get_lru_nodes(&self, count: usize) -> Vec<u64> {
289        let mut nodes: Vec<_> = self
290            .last_access
291            .iter()
292            .map(|entry| (*entry.key(), *entry.value()))
293            .collect();
294
295        nodes.sort_by_key(|(_, timestamp)| *timestamp);
296        nodes
297            .into_iter()
298            .take(count)
299            .map(|(node_id, _)| node_id)
300            .collect()
301    }
302
303    /// Get least frequently accessed nodes (for smart eviction)
304    fn get_lru_nodes_by_frequency(&self, count: usize) -> Vec<u64> {
305        let mut nodes: Vec<_> = self
306            .access_counts
307            .iter()
308            .map(|entry| (*entry.key(), *entry.value()))
309            .collect();
310
311        // Sort by access count (ascending - least frequently accessed first)
312        nodes.sort_by_key(|(_, access_count)| *access_count);
313        nodes
314            .into_iter()
315            .take(count)
316            .map(|(node_id, _)| node_id)
317            .collect()
318    }
319}
320
321/// Node data structure
322#[derive(Clone, serde::Serialize, serde::Deserialize, bincode::Encode, bincode::Decode)]
323pub struct NodeData {
324    pub id: u64,
325    pub labels: Vec<String>,
326    pub properties: Vec<(String, CachePropertyValue)>,
327}
328
329/// Property value types for cache storage
330#[derive(Clone, serde::Serialize, serde::Deserialize, bincode::Encode, bincode::Decode)]
331pub enum CachePropertyValue {
332    String(String),
333    Integer(i64),
334    Float(f64),
335    Boolean(bool),
336}
337
338/// Hot/cold storage facade
339pub struct HotColdStorage {
340    cache_hierarchy: CacheHierarchy,
341}
342
343impl HotColdStorage {
344    pub fn new() -> Self {
345        Self {
346            cache_hierarchy: CacheHierarchy::new(1000, 10000),
347        }
348    }
349
350    pub fn get(&self, node_id: u64) -> Option<NodeData> {
351        self.cache_hierarchy.get_node(node_id)
352    }
353
354    pub fn insert(&self, node_id: u64, data: NodeData) {
355        self.cache_hierarchy.insert_node(node_id, data);
356    }
357
358    pub fn prefetch(&self, node_ids: &[u64]) {
359        self.cache_hierarchy.prefetch_neighbors(node_ids);
360    }
361}
362
363impl Default for HotColdStorage {
364    fn default() -> Self {
365        Self::new()
366    }
367}
368
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `NodeData` fixture.
    fn node(
        id: u64,
        labels: Vec<String>,
        properties: Vec<(String, CachePropertyValue)>,
    ) -> NodeData {
        NodeData {
            id,
            labels,
            properties,
        }
    }

    #[test]
    fn test_cache_hierarchy() {
        let cache = CacheHierarchy::new(10, 100);
        let name = CachePropertyValue::String("Alice".to_string());
        let alice = node(
            1,
            vec!["Person".to_string()],
            vec![("name".to_string(), name)],
        );

        cache.insert_node(1, alice);

        let retrieved = cache.get_node(1);
        assert!(retrieved.is_some());
    }

    #[test]
    fn test_hot_cold_promotion() {
        let cache = CacheHierarchy::new(2, 10);

        // Three inserts against a hot capacity of two force an eviction.
        (1..=3).for_each(|id| cache.insert_node(id, node(id, vec![], vec![])));

        // Repeated reads lift node 1's access count above the promotion
        // threshold, so a cold copy would be promoted back to the hot tier.
        for _ in 0..10 {
            let _ = cache.get_node(1);
        }

        // Node 1 should still be accessible
        assert!(cache.get_node(1).is_some());
    }
}