Skip to main content

heliosdb_proxy/distribcache/
heatmap.rs

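//! Cache heatmap analytics.
//!
//! Tracks cache hits, misses, and time saved per table and per query fingerprint, and turns them
//! into heatmap data (per-table temperatures and a time series) plus optimization recommendations.
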
use dashmap::DashMap;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::PoisonError;
use std::sync::RwLock;
use std::time::Duration;

use super::QueryFingerprint;
12
/// Hit/miss counters for a single table or query fingerprint.
///
/// Every field is an [`AtomicU64`], so the counters can be updated through a shared reference.
#[derive(Debug)]
pub struct AccessStats {
    /// Number of cache hits recorded.
    pub hits: AtomicU64,
    /// Number of cache misses recorded.
    pub misses: AtomicU64,
    /// Accumulated time saved by hits, in microseconds.
    pub total_time_saved_us: AtomicU64,
    /// Most recent hit or miss, as nanoseconds since the Unix epoch (0 until the first access).
    pub last_access: AtomicU64,
}
25
26impl Default for AccessStats {
27    fn default() -> Self {
28        Self {
29            hits: AtomicU64::new(0),
30            misses: AtomicU64::new(0),
31            total_time_saved_us: AtomicU64::new(0),
32            last_access: AtomicU64::new(0),
33        }
34    }
35}
36
37impl AccessStats {
38    fn record_hit(&self, time_saved: Duration) {
39        self.hits.fetch_add(1, Ordering::Relaxed);
40        self.total_time_saved_us
41            .fetch_add(time_saved.as_micros() as u64, Ordering::Relaxed);
42        self.update_last_access();
43    }
44
45    fn record_miss(&self) {
46        self.misses.fetch_add(1, Ordering::Relaxed);
47        self.update_last_access();
48    }
49
50    fn update_last_access(&self) {
51        let now = std::time::SystemTime::now()
52            .duration_since(std::time::SystemTime::UNIX_EPOCH)
53            .unwrap_or_default()
54            .as_nanos() as u64;
55        self.last_access.store(now, Ordering::Relaxed);
56    }
57
58    fn hit_ratio(&self) -> f64 {
59        let hits = self.hits.load(Ordering::Relaxed);
60        let misses = self.misses.load(Ordering::Relaxed);
61        let total = hits + misses;
62        if total > 0 {
63            hits as f64 / total as f64
64        } else {
65            0.0
66        }
67    }
68
69    fn total_accesses(&self) -> u64 {
70        self.hits.load(Ordering::Relaxed) + self.misses.load(Ordering::Relaxed)
71    }
72}
73
/// One fixed-width window of the access time series.
#[derive(Debug, Clone)]
pub struct TimeBucket {
    /// Window start, in whole seconds since the Unix epoch.
    pub start: u64,
    /// Window end, in whole seconds since the Unix epoch.
    pub end: u64,
    /// Number of accesses per table name within the window.
    pub accesses: HashMap<String, u64>,
    /// Hit ratio in `[0.0, 1.0]`, filled in when the bucket is closed (0.0 while it is open).
    pub hit_ratio: f64,
}
86
87/// Table heat information
88#[derive(Debug, Clone)]
89pub struct TableHeat {
90    /// Table name
91    pub name: String,
92    /// Total accesses
93    pub total_accesses: u64,
94    /// Hit ratio
95    pub hit_ratio: f64,
96    /// Time saved in milliseconds
97    pub time_saved_ms: u64,
98    /// Temperature classification
99    pub temperature: Temperature,
100}
101
/// Access-frequency band of a table relative to every other tracked table.
///
/// Bands are assigned by comparing a table's access count against the quartiles of all
/// tables' access counts, from hottest to coldest.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Temperature {
    /// At or above the 75th percentile: very frequently accessed.
    Hot,
    /// Between the 50th and 75th percentile: moderately accessed.
    Warm,
    /// Between the 25th and 50th percentile: infrequently accessed.
    Cold,
    /// Below the 25th percentile: rarely accessed.
    Frozen,
}
114
/// How urgently a [`Recommendation`] should be acted on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Priority {
    /// Address first.
    High,
    /// Worth addressing after high-priority items.
    Medium,
    /// Nice to have.
    Low,
}
122
123/// Cache optimization recommendation
124#[derive(Debug, Clone)]
125pub struct Recommendation {
126    /// Target table
127    pub table: String,
128    /// Issue description
129    pub issue: String,
130    /// Suggestion for improvement
131    pub suggestion: String,
132    /// Priority level
133    pub priority: Priority,
134}
135
136/// Heatmap visualization data
137#[derive(Debug, Clone)]
138pub struct HeatmapData {
139    /// Per-table heat information
140    pub tables: Vec<TableHeat>,
141    /// Time series data
142    pub time_series: Vec<TimeBucket>,
143    /// Optimization recommendations
144    pub recommendations: Vec<Recommendation>,
145}
146
147/// Cache heatmap analytics
148pub struct CacheHeatmap {
149    /// Access stats per table
150    table_accesses: DashMap<String, AccessStats>,
151
152    /// Access stats per query fingerprint
153    query_accesses: DashMap<QueryFingerprint, AccessStats>,
154
155    /// Time-bucketed data
156    time_buckets: RwLock<Vec<TimeBucket>>,
157
158    /// Current bucket
159    current_bucket: RwLock<TimeBucket>,
160
161    /// Bucket size in seconds
162    bucket_size_secs: u64,
163
164    /// Maximum buckets to retain
165    max_buckets: usize,
166}
167
168impl CacheHeatmap {
169    /// Create a new heatmap
170    pub fn new() -> Self {
171        let now = std::time::SystemTime::now()
172            .duration_since(std::time::SystemTime::UNIX_EPOCH)
173            .unwrap_or_default()
174            .as_secs();
175
176        Self {
177            table_accesses: DashMap::new(),
178            query_accesses: DashMap::new(),
179            time_buckets: RwLock::new(Vec::new()),
180            current_bucket: RwLock::new(TimeBucket {
181                start: now,
182                end: now + 300, // 5 minute default
183                accesses: HashMap::new(),
184                hit_ratio: 0.0,
185            }),
186            bucket_size_secs: 300,
187            max_buckets: 2016, // 7 days at 5-minute buckets
188        }
189    }
190
191    /// Record a cache access
192    pub fn record_access(&self, fingerprint: &QueryFingerprint, hit: bool, time_saved: Duration) {
193        // Update table stats
194        for table in &fingerprint.tables {
195            let stats = self.table_accesses.entry(table.clone()).or_default();
196
197            if hit {
198                stats.record_hit(time_saved);
199            } else {
200                stats.record_miss();
201            }
202        }
203
204        // Update query stats
205        let query_stats = self.query_accesses.entry(fingerprint.clone()).or_default();
206
207        if hit {
208            query_stats.record_hit(time_saved);
209        } else {
210            query_stats.record_miss();
211        }
212
213        // Update time bucket
214        self.update_time_bucket(&fingerprint.tables, hit);
215    }
216
217    /// Update time bucket
218    fn update_time_bucket(&self, tables: &[String], _hit: bool) {
219        let now = std::time::SystemTime::now()
220            .duration_since(std::time::SystemTime::UNIX_EPOCH)
221            .unwrap_or_default()
222            .as_secs();
223
224        let mut current = self.current_bucket.write().unwrap();
225
226        // Check if we need to roll to a new bucket
227        if now >= current.end {
228            // Finalize current bucket
229            let mut buckets = self.time_buckets.write().unwrap();
230
231            // Calculate hit ratio for completed bucket
232            let total_hits: u64 = self
233                .table_accesses
234                .iter()
235                .map(|e| e.value().hits.load(Ordering::Relaxed))
236                .sum();
237            let total_misses: u64 = self
238                .table_accesses
239                .iter()
240                .map(|e| e.value().misses.load(Ordering::Relaxed))
241                .sum();
242
243            let total = total_hits + total_misses;
244            current.hit_ratio = if total > 0 {
245                total_hits as f64 / total as f64
246            } else {
247                0.0
248            };
249
250            buckets.push(current.clone());
251
252            // Trim old buckets
253            while buckets.len() > self.max_buckets {
254                buckets.remove(0);
255            }
256
257            // Create new bucket
258            *current = TimeBucket {
259                start: now,
260                end: now + self.bucket_size_secs,
261                accesses: HashMap::new(),
262                hit_ratio: 0.0,
263            };
264        }
265
266        // Update current bucket
267        for table in tables {
268            *current.accesses.entry(table.clone()).or_default() += 1;
269        }
270    }
271
272    /// Calculate temperature from access count
273    fn calculate_temperature(&self, accesses: u64) -> Temperature {
274        // Get percentiles from all tables
275        let mut all_accesses: Vec<u64> = self
276            .table_accesses
277            .iter()
278            .map(|e| e.value().total_accesses())
279            .collect();
280        all_accesses.sort();
281
282        if all_accesses.is_empty() {
283            return Temperature::Cold;
284        }
285
286        let p75 = all_accesses
287            .get(all_accesses.len() * 3 / 4)
288            .copied()
289            .unwrap_or(0);
290        let p50 = all_accesses
291            .get(all_accesses.len() / 2)
292            .copied()
293            .unwrap_or(0);
294        let p25 = all_accesses
295            .get(all_accesses.len() / 4)
296            .copied()
297            .unwrap_or(0);
298
299        if accesses >= p75 {
300            Temperature::Hot
301        } else if accesses >= p50 {
302            Temperature::Warm
303        } else if accesses >= p25 {
304            Temperature::Cold
305        } else {
306            Temperature::Frozen
307        }
308    }
309
310    /// Generate heatmap visualization data
311    pub fn generate_heatmap(&self) -> HeatmapData {
312        let mut tables: Vec<TableHeat> = self
313            .table_accesses
314            .iter()
315            .map(|entry| {
316                let stats = entry.value();
317                let hits = stats.hits.load(Ordering::Relaxed);
318                let misses = stats.misses.load(Ordering::Relaxed);
319                let total = hits + misses;
320
321                TableHeat {
322                    name: entry.key().clone(),
323                    total_accesses: total,
324                    hit_ratio: stats.hit_ratio(),
325                    time_saved_ms: stats.total_time_saved_us.load(Ordering::Relaxed) / 1000,
326                    temperature: self.calculate_temperature(total),
327                }
328            })
329            .collect();
330
331        // Sort by total accesses (descending)
332        tables.sort_by_key(|b| std::cmp::Reverse(b.total_accesses));
333
334        let time_series = self.get_time_series();
335        let recommendations = self.generate_recommendations();
336
337        HeatmapData {
338            tables,
339            time_series,
340            recommendations,
341        }
342    }
343
344    /// Get time series data
345    fn get_time_series(&self) -> Vec<TimeBucket> {
346        let buckets = self.time_buckets.read().unwrap();
347        buckets.clone()
348    }
349
350    /// Generate optimization recommendations
351    fn generate_recommendations(&self) -> Vec<Recommendation> {
352        let mut recs = Vec::new();
353
354        for entry in self.table_accesses.iter() {
355            let table = entry.key();
356            let stats = entry.value();
357            let hits = stats.hits.load(Ordering::Relaxed);
358            let misses = stats.misses.load(Ordering::Relaxed);
359            let total = hits + misses;
360
361            if total < 100 {
362                continue; // Not enough data
363            }
364
365            let hit_ratio = stats.hit_ratio();
366
367            // Low hit ratio recommendation
368            if hit_ratio < 0.5 {
369                recs.push(Recommendation {
370                    table: table.clone(),
371                    issue: "Low cache hit ratio".to_string(),
372                    suggestion: format!(
373                        "Consider increasing TTL or cache size for '{}' (current hit ratio: {:.1}%)",
374                        table,
375                        hit_ratio * 100.0
376                    ),
377                    priority: Priority::High,
378                });
379            }
380
381            // Cold data in cache recommendation
382            let last_access = stats.last_access.load(Ordering::Relaxed);
383            if last_access > 0 {
384                let now = std::time::SystemTime::now()
385                    .duration_since(std::time::SystemTime::UNIX_EPOCH)
386                    .unwrap_or_default()
387                    .as_nanos() as u64;
388
389                let age_secs = (now - last_access) / 1_000_000_000;
390
391                if age_secs > 3600 && total < 1000 {
392                    recs.push(Recommendation {
393                        table: table.clone(),
394                        issue: "Cold data in cache".to_string(),
395                        suggestion: format!(
396                            "'{}' hasn't been accessed in {} minutes, consider reducing TTL",
397                            table,
398                            age_secs / 60
399                        ),
400                        priority: Priority::Medium,
401                    });
402                }
403            }
404        }
405
406        recs
407    }
408
409    /// Clear all heatmap data
410    pub fn clear(&self) {
411        self.table_accesses.clear();
412        self.query_accesses.clear();
413        self.time_buckets.write().unwrap().clear();
414    }
415}
416
417impl Default for CacheHeatmap {
418    fn default() -> Self {
419        Self::new()
420    }
421}
422
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_access_stats() {
        let stats = AccessStats::default();

        stats.record_hit(Duration::from_millis(10));
        stats.record_hit(Duration::from_millis(20));
        stats.record_miss();

        assert_eq!(stats.hits.load(Ordering::Relaxed), 2);
        assert_eq!(stats.misses.load(Ordering::Relaxed), 1);
        assert!((stats.hit_ratio() - 0.666).abs() < 0.01);
    }

    #[test]
    fn test_record_access() {
        let heatmap = CacheHeatmap::new();
        let fp = QueryFingerprint::from_query("SELECT * FROM users");

        heatmap.record_access(&fp, true, Duration::from_millis(10));
        heatmap.record_access(&fp, true, Duration::from_millis(15));
        heatmap.record_access(&fp, false, Duration::ZERO);

        let data = heatmap.generate_heatmap();
        assert!(!data.tables.is_empty());

        let users_heat = data.tables.iter().find(|t| t.name == "USERS").unwrap();

        assert_eq!(users_heat.total_accesses, 3);
        assert!((users_heat.hit_ratio - 0.666).abs() < 0.01);
    }

    #[test]
    fn test_temperature_classification() {
        let heatmap = CacheHeatmap::new();

        // Ten tables (table_0..table_9). table_k ends up with 100 * k + 4500 accesses, so all
        // counts are distinct and spread across every quartile band.
        for i in 0..100 {
            let fp = QueryFingerprint::from_query(&format!("SELECT * FROM table_{}", i % 10));
            for _ in 0..(i * 10) {
                heatmap.record_access(&fp, true, Duration::from_millis(1));
            }
        }

        let data = heatmap.generate_heatmap();

        // Should have hot, warm, cold, and frozen tables
        let temps: Vec<_> = data.tables.iter().map(|t| t.temperature).collect();
        for expected in [
            Temperature::Hot,
            Temperature::Warm,
            Temperature::Cold,
            Temperature::Frozen,
        ] {
            assert!(
                temps.contains(&expected),
                "expected {:?} among {:?}",
                expected,
                temps
            );
        }
    }

    #[test]
    fn test_recommendations() {
        let heatmap = CacheHeatmap::new();

        // Create low hit ratio scenario
        let fp = QueryFingerprint::from_query("SELECT * FROM slow_table");
        for _ in 0..50 {
            heatmap.record_access(&fp, true, Duration::from_millis(1));
        }
        for _ in 0..150 {
            heatmap.record_access(&fp, false, Duration::ZERO);
        }

        let data = heatmap.generate_heatmap();

        // Should have a recommendation for low hit ratio
        assert!(!data.recommendations.is_empty());
        let rec = data
            .recommendations
            .iter()
            .find(|r| r.issue.contains("hit ratio"));
        assert!(rec.is_some());
    }

    #[test]
    fn test_clear() {
        let heatmap = CacheHeatmap::new();
        let fp = QueryFingerprint::from_query("SELECT * FROM users");
        heatmap.record_access(&fp, true, Duration::from_millis(5));

        heatmap.clear();

        let data = heatmap.generate_heatmap();
        assert!(data.tables.is_empty());
        assert!(data.time_series.is_empty());
        assert!(data.recommendations.is_empty());
    }
}