Skip to main content

heliosdb_proxy/distribcache/
heatmap.rs

1//! Cache heatmap analytics
2//!
//! Collects per-table and per-query cache hit/miss statistics and time-bucketed access counts for heatmap visualization, and derives optimization recommendations from them.
4
5use dashmap::DashMap;
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::RwLock;
9use std::time::Duration;
10
11use super::QueryFingerprint;
12
/// Access statistics for a table or query.
///
/// Every counter is an independent relaxed atomic, so a reader may observe
/// `hits` and `misses` from slightly different moments; that is acceptable
/// for analytics purposes.
#[derive(Debug, Default)]
pub struct AccessStats {
    /// Total hits
    pub hits: AtomicU64,
    /// Total misses
    pub misses: AtomicU64,
    /// Total time saved in microseconds (each hit's contribution is clamped
    /// to `u64::MAX` rather than truncated)
    pub total_time_saved_us: AtomicU64,
    /// Last access timestamp (Unix nanos); 0 until the first recorded access
    pub last_access: AtomicU64,
}

impl AccessStats {
    /// Count a cache hit that saved `time_saved` of work.
    fn record_hit(&self, time_saved: Duration) {
        self.hits.fetch_add(1, Ordering::Relaxed);
        // `as_micros` is u128; clamp first so the cast below cannot drop the
        // high bits and record a nonsensical (wrapped) value.
        let saved_us = time_saved.as_micros().min(u128::from(u64::MAX)) as u64;
        self.total_time_saved_us.fetch_add(saved_us, Ordering::Relaxed);
        self.update_last_access();
    }

    /// Count a cache miss.
    fn record_miss(&self) {
        self.misses.fetch_add(1, Ordering::Relaxed);
        self.update_last_access();
    }

    /// Stamp `last_access` with the current wall-clock time in Unix nanos.
    ///
    /// A clock before the epoch yields 0; a time beyond `u64::MAX` nanos is
    /// clamped instead of truncated.
    fn update_last_access(&self) {
        let since_epoch = std::time::SystemTime::now()
            .duration_since(std::time::SystemTime::UNIX_EPOCH)
            .unwrap_or_default();
        let nanos = since_epoch.as_nanos().min(u128::from(u64::MAX)) as u64;
        self.last_access.store(nanos, Ordering::Relaxed);
    }

    /// Fraction of accesses that were hits, or 0.0 when nothing was recorded.
    fn hit_ratio(&self) -> f64 {
        let hits = self.hits.load(Ordering::Relaxed);
        let total = hits.saturating_add(self.misses.load(Ordering::Relaxed));
        if total > 0 {
            hits as f64 / total as f64
        } else {
            0.0
        }
    }

    /// Hits plus misses (saturating, so it never overflow-panics).
    fn total_accesses(&self) -> u64 {
        self.hits
            .load(Ordering::Relaxed)
            .saturating_add(self.misses.load(Ordering::Relaxed))
    }
}
75
/// One fixed-width slice of the access time series.
#[derive(Debug, Clone)]
pub struct TimeBucket {
    /// Bucket start time, in Unix seconds (inclusive)
    pub start: u64,
    /// Bucket end time, in Unix seconds (exclusive: the bucket is rolled
    /// over as soon as the clock reaches this value)
    pub end: u64,
    /// Number of accesses per table name recorded while the bucket was open
    pub accesses: HashMap<String, u64>,
    /// Hit ratio in `[0.0, 1.0]`, stamped when the bucket is finalized
    /// (0.0 while the bucket is still being filled)
    pub hit_ratio: f64,
}
88
89/// Table heat information
90#[derive(Debug, Clone)]
91pub struct TableHeat {
92    /// Table name
93    pub name: String,
94    /// Total accesses
95    pub total_accesses: u64,
96    /// Hit ratio
97    pub hit_ratio: f64,
98    /// Time saved in milliseconds
99    pub time_saved_ms: u64,
100    /// Temperature classification
101    pub temperature: Temperature,
102}
103
/// Relative access-frequency class of a table, assigned from the quartiles
/// of total accesses across all tracked tables.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Temperature {
    /// At or above the upper-quartile (p75) access count
    Hot,
    /// At or above the median but below the upper quartile
    Warm,
    /// At or above the lower quartile (p25) but below the median
    Cold,
    /// Below the lower quartile
    Frozen,
}
116
/// Urgency attached to a [`Recommendation`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Priority {
    /// Most urgent (used, for example, for a persistently low hit ratio)
    High,
    /// Moderately urgent (used, for example, for cold data lingering in cache)
    Medium,
    /// Least urgent
    Low,
}
124
125/// Cache optimization recommendation
126#[derive(Debug, Clone)]
127pub struct Recommendation {
128    /// Target table
129    pub table: String,
130    /// Issue description
131    pub issue: String,
132    /// Suggestion for improvement
133    pub suggestion: String,
134    /// Priority level
135    pub priority: Priority,
136}
137
138/// Heatmap visualization data
139#[derive(Debug, Clone)]
140pub struct HeatmapData {
141    /// Per-table heat information
142    pub tables: Vec<TableHeat>,
143    /// Time series data
144    pub time_series: Vec<TimeBucket>,
145    /// Optimization recommendations
146    pub recommendations: Vec<Recommendation>,
147}
148
149/// Cache heatmap analytics
150pub struct CacheHeatmap {
151    /// Access stats per table
152    table_accesses: DashMap<String, AccessStats>,
153
154    /// Access stats per query fingerprint
155    query_accesses: DashMap<QueryFingerprint, AccessStats>,
156
157    /// Time-bucketed data
158    time_buckets: RwLock<Vec<TimeBucket>>,
159
160    /// Current bucket
161    current_bucket: RwLock<TimeBucket>,
162
163    /// Bucket size in seconds
164    bucket_size_secs: u64,
165
166    /// Maximum buckets to retain
167    max_buckets: usize,
168}
169
170impl CacheHeatmap {
171    /// Create a new heatmap
172    pub fn new() -> Self {
173        let now = std::time::SystemTime::now()
174            .duration_since(std::time::SystemTime::UNIX_EPOCH)
175            .unwrap_or_default()
176            .as_secs();
177
178        Self {
179            table_accesses: DashMap::new(),
180            query_accesses: DashMap::new(),
181            time_buckets: RwLock::new(Vec::new()),
182            current_bucket: RwLock::new(TimeBucket {
183                start: now,
184                end: now + 300, // 5 minute default
185                accesses: HashMap::new(),
186                hit_ratio: 0.0,
187            }),
188            bucket_size_secs: 300,
189            max_buckets: 2016, // 7 days at 5-minute buckets
190        }
191    }
192
193    /// Record a cache access
194    pub fn record_access(&self, fingerprint: &QueryFingerprint, hit: bool, time_saved: Duration) {
195        // Update table stats
196        for table in &fingerprint.tables {
197            let stats = self.table_accesses
198                .entry(table.clone())
199                .or_default();
200
201            if hit {
202                stats.record_hit(time_saved);
203            } else {
204                stats.record_miss();
205            }
206        }
207
208        // Update query stats
209        let query_stats = self.query_accesses
210            .entry(fingerprint.clone())
211            .or_default();
212
213        if hit {
214            query_stats.record_hit(time_saved);
215        } else {
216            query_stats.record_miss();
217        }
218
219        // Update time bucket
220        self.update_time_bucket(&fingerprint.tables, hit);
221    }
222
223    /// Update time bucket
224    fn update_time_bucket(&self, tables: &[String], _hit: bool) {
225        let now = std::time::SystemTime::now()
226            .duration_since(std::time::SystemTime::UNIX_EPOCH)
227            .unwrap_or_default()
228            .as_secs();
229
230        let mut current = self.current_bucket.write().unwrap();
231
232        // Check if we need to roll to a new bucket
233        if now >= current.end {
234            // Finalize current bucket
235            let mut buckets = self.time_buckets.write().unwrap();
236
237            // Calculate hit ratio for completed bucket
238            let total_hits: u64 = self.table_accesses.iter()
239                .map(|e| e.value().hits.load(Ordering::Relaxed))
240                .sum();
241            let total_misses: u64 = self.table_accesses.iter()
242                .map(|e| e.value().misses.load(Ordering::Relaxed))
243                .sum();
244
245            let total = total_hits + total_misses;
246            current.hit_ratio = if total > 0 {
247                total_hits as f64 / total as f64
248            } else {
249                0.0
250            };
251
252            buckets.push(current.clone());
253
254            // Trim old buckets
255            while buckets.len() > self.max_buckets {
256                buckets.remove(0);
257            }
258
259            // Create new bucket
260            *current = TimeBucket {
261                start: now,
262                end: now + self.bucket_size_secs,
263                accesses: HashMap::new(),
264                hit_ratio: 0.0,
265            };
266        }
267
268        // Update current bucket
269        for table in tables {
270            *current.accesses.entry(table.clone()).or_default() += 1;
271        }
272    }
273
274    /// Calculate temperature from access count
275    fn calculate_temperature(&self, accesses: u64) -> Temperature {
276        // Get percentiles from all tables
277        let mut all_accesses: Vec<u64> = self.table_accesses.iter()
278            .map(|e| e.value().total_accesses())
279            .collect();
280        all_accesses.sort();
281
282        if all_accesses.is_empty() {
283            return Temperature::Cold;
284        }
285
286        let p75 = all_accesses.get(all_accesses.len() * 3 / 4).copied().unwrap_or(0);
287        let p50 = all_accesses.get(all_accesses.len() / 2).copied().unwrap_or(0);
288        let p25 = all_accesses.get(all_accesses.len() / 4).copied().unwrap_or(0);
289
290        if accesses >= p75 {
291            Temperature::Hot
292        } else if accesses >= p50 {
293            Temperature::Warm
294        } else if accesses >= p25 {
295            Temperature::Cold
296        } else {
297            Temperature::Frozen
298        }
299    }
300
301    /// Generate heatmap visualization data
302    pub fn generate_heatmap(&self) -> HeatmapData {
303        let mut tables: Vec<TableHeat> = self.table_accesses.iter()
304            .map(|entry| {
305                let stats = entry.value();
306                let hits = stats.hits.load(Ordering::Relaxed);
307                let misses = stats.misses.load(Ordering::Relaxed);
308                let total = hits + misses;
309
310                TableHeat {
311                    name: entry.key().clone(),
312                    total_accesses: total,
313                    hit_ratio: stats.hit_ratio(),
314                    time_saved_ms: stats.total_time_saved_us.load(Ordering::Relaxed) / 1000,
315                    temperature: self.calculate_temperature(total),
316                }
317            })
318            .collect();
319
320        // Sort by total accesses (descending)
321        tables.sort_by(|a, b| b.total_accesses.cmp(&a.total_accesses));
322
323        let time_series = self.get_time_series();
324        let recommendations = self.generate_recommendations();
325
326        HeatmapData {
327            tables,
328            time_series,
329            recommendations,
330        }
331    }
332
333    /// Get time series data
334    fn get_time_series(&self) -> Vec<TimeBucket> {
335        let buckets = self.time_buckets.read().unwrap();
336        buckets.clone()
337    }
338
339    /// Generate optimization recommendations
340    fn generate_recommendations(&self) -> Vec<Recommendation> {
341        let mut recs = Vec::new();
342
343        for entry in self.table_accesses.iter() {
344            let table = entry.key();
345            let stats = entry.value();
346            let hits = stats.hits.load(Ordering::Relaxed);
347            let misses = stats.misses.load(Ordering::Relaxed);
348            let total = hits + misses;
349
350            if total < 100 {
351                continue; // Not enough data
352            }
353
354            let hit_ratio = stats.hit_ratio();
355
356            // Low hit ratio recommendation
357            if hit_ratio < 0.5 {
358                recs.push(Recommendation {
359                    table: table.clone(),
360                    issue: "Low cache hit ratio".to_string(),
361                    suggestion: format!(
362                        "Consider increasing TTL or cache size for '{}' (current hit ratio: {:.1}%)",
363                        table,
364                        hit_ratio * 100.0
365                    ),
366                    priority: Priority::High,
367                });
368            }
369
370            // Cold data in cache recommendation
371            let last_access = stats.last_access.load(Ordering::Relaxed);
372            if last_access > 0 {
373                let now = std::time::SystemTime::now()
374                    .duration_since(std::time::SystemTime::UNIX_EPOCH)
375                    .unwrap_or_default()
376                    .as_nanos() as u64;
377
378                let age_secs = (now - last_access) / 1_000_000_000;
379
380                if age_secs > 3600 && total < 1000 {
381                    recs.push(Recommendation {
382                        table: table.clone(),
383                        issue: "Cold data in cache".to_string(),
384                        suggestion: format!(
385                            "'{}' hasn't been accessed in {} minutes, consider reducing TTL",
386                            table,
387                            age_secs / 60
388                        ),
389                        priority: Priority::Medium,
390                    });
391                }
392            }
393        }
394
395        recs
396    }
397
398    /// Clear all heatmap data
399    pub fn clear(&self) {
400        self.table_accesses.clear();
401        self.query_accesses.clear();
402        self.time_buckets.write().unwrap().clear();
403    }
404}
405
406impl Default for CacheHeatmap {
407    fn default() -> Self {
408        Self::new()
409    }
410}
411
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_access_stats() {
        let stats = AccessStats::default();

        stats.record_hit(Duration::from_millis(10));
        stats.record_hit(Duration::from_millis(20));
        stats.record_miss();

        assert_eq!(stats.hits.load(Ordering::Relaxed), 2);
        assert_eq!(stats.misses.load(Ordering::Relaxed), 1);
        assert!((stats.hit_ratio() - 0.666).abs() < 0.01);
    }

    #[test]
    fn test_record_access() {
        let heatmap = CacheHeatmap::new();
        let fp = QueryFingerprint::from_query("SELECT * FROM users");

        heatmap.record_access(&fp, true, Duration::from_millis(10));
        heatmap.record_access(&fp, true, Duration::from_millis(15));
        heatmap.record_access(&fp, false, Duration::ZERO);

        let data = heatmap.generate_heatmap();
        assert!(!data.tables.is_empty());

        let users_heat = data.tables.iter()
            .find(|t| t.name == "USERS")
            .unwrap();

        assert_eq!(users_heat.total_accesses, 3);
        assert!((users_heat.hit_ratio - 0.666).abs() < 0.01);
    }

    #[test]
    fn test_temperature_classification() {
        let heatmap = CacheHeatmap::new();

        // table_k receives sum over i = k, k+10, ..., k+90 of 10*i = 100k + 4500
        // accesses: ten distinct totals spanning all four quartiles.
        for i in 0..100 {
            let fp = QueryFingerprint::from_query(&format!("SELECT * FROM table_{}", i % 10));
            for _ in 0..(i * 10) {
                heatmap.record_access(&fp, true, Duration::from_millis(1));
            }
        }

        let data = heatmap.generate_heatmap();

        // Tables are reported busiest first.
        assert!(data
            .tables
            .windows(2)
            .all(|w| w[0].total_accesses >= w[1].total_accesses));

        // Should have hot, warm, cold, and frozen tables
        let temps: Vec<_> = data.tables.iter().map(|t| t.temperature).collect();
        for expected in &[
            Temperature::Hot,
            Temperature::Warm,
            Temperature::Cold,
            Temperature::Frozen,
        ] {
            assert!(temps.contains(expected), "missing {:?} in {:?}", expected, temps);
        }
    }

    #[test]
    fn test_recommendations() {
        let heatmap = CacheHeatmap::new();

        // Create low hit ratio scenario
        let fp = QueryFingerprint::from_query("SELECT * FROM slow_table");
        for _ in 0..50 {
            heatmap.record_access(&fp, true, Duration::from_millis(1));
        }
        for _ in 0..150 {
            heatmap.record_access(&fp, false, Duration::ZERO);
        }

        let data = heatmap.generate_heatmap();

        // Should have a recommendation for low hit ratio
        assert!(!data.recommendations.is_empty());
        let rec = data.recommendations.iter()
            .find(|r| r.issue.contains("hit ratio"));
        assert!(rec.is_some());
    }

    #[test]
    fn test_clear() {
        let heatmap = CacheHeatmap::new();
        let fp = QueryFingerprint::from_query("SELECT * FROM users");
        heatmap.record_access(&fp, true, Duration::from_millis(5));

        heatmap.clear();

        let data = heatmap.generate_heatmap();
        assert!(data.tables.is_empty());
        assert!(data.time_series.is_empty());
        assert!(data.recommendations.is_empty());
    }
}