heliosdb_proxy/distribcache/
heatmap.rs1use dashmap::DashMap;
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::RwLock;
9use std::time::Duration;
10
11use super::QueryFingerprint;
12
/// Atomic hit/miss counters for one cache key (a table or a query fingerprint).
///
/// Every counter is updated with `Ordering::Relaxed`: they are independent
/// statistics, so readers may observe them momentarily out of sync.
#[derive(Debug, Default)]
pub struct AccessStats {
    /// Number of recorded cache hits.
    pub hits: AtomicU64,
    /// Number of recorded cache misses.
    pub misses: AtomicU64,
    /// Sum of the `time_saved` reported with each hit, in whole microseconds.
    pub total_time_saved_us: AtomicU64,
    /// Wall-clock time of the latest hit or miss, in nanoseconds since the Unix
    /// epoch; 0 until the first access.
    pub last_access: AtomicU64,
}

impl AccessStats {
    /// Counts one hit and accumulates the time it saved (truncated to whole
    /// microseconds), then refreshes `last_access`.
    fn record_hit(&self, time_saved: Duration) {
        let saved_us = time_saved.as_micros() as u64;
        self.hits.fetch_add(1, Ordering::Relaxed);
        self.total_time_saved_us.fetch_add(saved_us, Ordering::Relaxed);
        self.update_last_access();
    }

    /// Counts one miss, then refreshes `last_access`.
    fn record_miss(&self) {
        self.misses.fetch_add(1, Ordering::Relaxed);
        self.update_last_access();
    }

    /// Stores the current wall-clock time (nanoseconds since the Unix epoch,
    /// or 0 if the clock reports a time before the epoch) in `last_access`.
    fn update_last_access(&self) {
        let since_epoch = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default();
        self.last_access
            .store(since_epoch.as_nanos() as u64, Ordering::Relaxed);
    }

    /// Fraction of recorded accesses that were hits; 0.0 when nothing has
    /// been recorded yet.
    fn hit_ratio(&self) -> f64 {
        let hits = self.hits.load(Ordering::Relaxed);
        match hits + self.misses.load(Ordering::Relaxed) {
            0 => 0.0,
            total => hits as f64 / total as f64,
        }
    }

    /// Total number of recorded accesses (hits plus misses).
    fn total_accesses(&self) -> u64 {
        let hits = self.hits.load(Ordering::Relaxed);
        let misses = self.misses.load(Ordering::Relaxed);
        hits + misses
    }
}
75
/// Access counts for one fixed-width window of the heatmap's time series.
#[derive(Debug, Clone)]
pub struct TimeBucket {
    /// Window start, in whole seconds since the Unix epoch.
    pub start: u64,
    /// Window end, in seconds since the Unix epoch. The bucket is closed on
    /// the first access recorded at or after this time.
    pub end: u64,
    /// Number of accesses recorded per table name while this bucket was open.
    pub accesses: HashMap<String, u64>,
    /// Hit ratio assigned when the bucket is closed; 0.0 while it is still open.
    pub hit_ratio: f64,
}
88
/// Aggregated cache statistics for a single table, as reported by the heatmap.
#[derive(Debug, Clone)]
pub struct TableHeat {
    /// Table name as it appears in the query fingerprints' table lists.
    pub name: String,
    /// Hits plus misses recorded for this table.
    pub total_accesses: u64,
    /// `hits / (hits + misses)`, or 0.0 when there were no accesses.
    pub hit_ratio: f64,
    /// Accumulated time saved by cache hits, in milliseconds (truncated).
    pub time_saved_ms: u64,
    /// Popularity of this table relative to all other tracked tables.
    pub temperature: Temperature,
}
103
/// Relative popularity of a table, derived from the quartiles of all tracked
/// tables' total access counts.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Temperature {
    /// At or above the upper quartile (≈ 75th percentile) of access counts.
    Hot,
    /// At or above the median, below the upper quartile.
    Warm,
    /// At or above the lower quartile (≈ 25th percentile), below the median.
    /// Also the fallback when there are no tables to compare against.
    Cold,
    /// Below the lower quartile.
    Frozen,
}
116
/// Urgency of a [`Recommendation`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Priority {
    /// Should be addressed first (used for a low cache hit ratio).
    High,
    /// Worth addressing (used for cold data lingering in the cache).
    Medium,
    /// Lowest urgency.
    Low,
}
124
/// A tuning suggestion produced from a table's access statistics.
#[derive(Debug, Clone)]
pub struct Recommendation {
    /// Table the recommendation applies to.
    pub table: String,
    /// Short label for the detected problem (e.g. "Low cache hit ratio").
    pub issue: String,
    /// Human-readable suggested action, including the relevant measurement.
    pub suggestion: String,
    /// How urgently the suggestion should be acted upon.
    pub priority: Priority,
}
137
/// Point-in-time report produced by `CacheHeatmap::generate_heatmap`.
#[derive(Debug, Clone)]
pub struct HeatmapData {
    /// Per-table statistics, sorted by total accesses, busiest first.
    pub tables: Vec<TableHeat>,
    /// Closed time buckets, oldest first; the bucket still being filled is
    /// not included.
    pub time_series: Vec<TimeBucket>,
    /// Tuning suggestions derived from the per-table statistics.
    pub recommendations: Vec<Recommendation>,
}
148
149pub struct CacheHeatmap {
151 table_accesses: DashMap<String, AccessStats>,
153
154 query_accesses: DashMap<QueryFingerprint, AccessStats>,
156
157 time_buckets: RwLock<Vec<TimeBucket>>,
159
160 current_bucket: RwLock<TimeBucket>,
162
163 bucket_size_secs: u64,
165
166 max_buckets: usize,
168}
169
170impl CacheHeatmap {
171 pub fn new() -> Self {
173 let now = std::time::SystemTime::now()
174 .duration_since(std::time::SystemTime::UNIX_EPOCH)
175 .unwrap_or_default()
176 .as_secs();
177
178 Self {
179 table_accesses: DashMap::new(),
180 query_accesses: DashMap::new(),
181 time_buckets: RwLock::new(Vec::new()),
182 current_bucket: RwLock::new(TimeBucket {
183 start: now,
184 end: now + 300, accesses: HashMap::new(),
186 hit_ratio: 0.0,
187 }),
188 bucket_size_secs: 300,
189 max_buckets: 2016, }
191 }
192
193 pub fn record_access(&self, fingerprint: &QueryFingerprint, hit: bool, time_saved: Duration) {
195 for table in &fingerprint.tables {
197 let stats = self.table_accesses
198 .entry(table.clone())
199 .or_default();
200
201 if hit {
202 stats.record_hit(time_saved);
203 } else {
204 stats.record_miss();
205 }
206 }
207
208 let query_stats = self.query_accesses
210 .entry(fingerprint.clone())
211 .or_default();
212
213 if hit {
214 query_stats.record_hit(time_saved);
215 } else {
216 query_stats.record_miss();
217 }
218
219 self.update_time_bucket(&fingerprint.tables, hit);
221 }
222
223 fn update_time_bucket(&self, tables: &[String], _hit: bool) {
225 let now = std::time::SystemTime::now()
226 .duration_since(std::time::SystemTime::UNIX_EPOCH)
227 .unwrap_or_default()
228 .as_secs();
229
230 let mut current = self.current_bucket.write().unwrap();
231
232 if now >= current.end {
234 let mut buckets = self.time_buckets.write().unwrap();
236
237 let total_hits: u64 = self.table_accesses.iter()
239 .map(|e| e.value().hits.load(Ordering::Relaxed))
240 .sum();
241 let total_misses: u64 = self.table_accesses.iter()
242 .map(|e| e.value().misses.load(Ordering::Relaxed))
243 .sum();
244
245 let total = total_hits + total_misses;
246 current.hit_ratio = if total > 0 {
247 total_hits as f64 / total as f64
248 } else {
249 0.0
250 };
251
252 buckets.push(current.clone());
253
254 while buckets.len() > self.max_buckets {
256 buckets.remove(0);
257 }
258
259 *current = TimeBucket {
261 start: now,
262 end: now + self.bucket_size_secs,
263 accesses: HashMap::new(),
264 hit_ratio: 0.0,
265 };
266 }
267
268 for table in tables {
270 *current.accesses.entry(table.clone()).or_default() += 1;
271 }
272 }
273
274 fn calculate_temperature(&self, accesses: u64) -> Temperature {
276 let mut all_accesses: Vec<u64> = self.table_accesses.iter()
278 .map(|e| e.value().total_accesses())
279 .collect();
280 all_accesses.sort();
281
282 if all_accesses.is_empty() {
283 return Temperature::Cold;
284 }
285
286 let p75 = all_accesses.get(all_accesses.len() * 3 / 4).copied().unwrap_or(0);
287 let p50 = all_accesses.get(all_accesses.len() / 2).copied().unwrap_or(0);
288 let p25 = all_accesses.get(all_accesses.len() / 4).copied().unwrap_or(0);
289
290 if accesses >= p75 {
291 Temperature::Hot
292 } else if accesses >= p50 {
293 Temperature::Warm
294 } else if accesses >= p25 {
295 Temperature::Cold
296 } else {
297 Temperature::Frozen
298 }
299 }
300
301 pub fn generate_heatmap(&self) -> HeatmapData {
303 let mut tables: Vec<TableHeat> = self.table_accesses.iter()
304 .map(|entry| {
305 let stats = entry.value();
306 let hits = stats.hits.load(Ordering::Relaxed);
307 let misses = stats.misses.load(Ordering::Relaxed);
308 let total = hits + misses;
309
310 TableHeat {
311 name: entry.key().clone(),
312 total_accesses: total,
313 hit_ratio: stats.hit_ratio(),
314 time_saved_ms: stats.total_time_saved_us.load(Ordering::Relaxed) / 1000,
315 temperature: self.calculate_temperature(total),
316 }
317 })
318 .collect();
319
320 tables.sort_by(|a, b| b.total_accesses.cmp(&a.total_accesses));
322
323 let time_series = self.get_time_series();
324 let recommendations = self.generate_recommendations();
325
326 HeatmapData {
327 tables,
328 time_series,
329 recommendations,
330 }
331 }
332
333 fn get_time_series(&self) -> Vec<TimeBucket> {
335 let buckets = self.time_buckets.read().unwrap();
336 buckets.clone()
337 }
338
339 fn generate_recommendations(&self) -> Vec<Recommendation> {
341 let mut recs = Vec::new();
342
343 for entry in self.table_accesses.iter() {
344 let table = entry.key();
345 let stats = entry.value();
346 let hits = stats.hits.load(Ordering::Relaxed);
347 let misses = stats.misses.load(Ordering::Relaxed);
348 let total = hits + misses;
349
350 if total < 100 {
351 continue; }
353
354 let hit_ratio = stats.hit_ratio();
355
356 if hit_ratio < 0.5 {
358 recs.push(Recommendation {
359 table: table.clone(),
360 issue: "Low cache hit ratio".to_string(),
361 suggestion: format!(
362 "Consider increasing TTL or cache size for '{}' (current hit ratio: {:.1}%)",
363 table,
364 hit_ratio * 100.0
365 ),
366 priority: Priority::High,
367 });
368 }
369
370 let last_access = stats.last_access.load(Ordering::Relaxed);
372 if last_access > 0 {
373 let now = std::time::SystemTime::now()
374 .duration_since(std::time::SystemTime::UNIX_EPOCH)
375 .unwrap_or_default()
376 .as_nanos() as u64;
377
378 let age_secs = (now - last_access) / 1_000_000_000;
379
380 if age_secs > 3600 && total < 1000 {
381 recs.push(Recommendation {
382 table: table.clone(),
383 issue: "Cold data in cache".to_string(),
384 suggestion: format!(
385 "'{}' hasn't been accessed in {} minutes, consider reducing TTL",
386 table,
387 age_secs / 60
388 ),
389 priority: Priority::Medium,
390 });
391 }
392 }
393 }
394
395 recs
396 }
397
398 pub fn clear(&self) {
400 self.table_accesses.clear();
401 self.query_accesses.clear();
402 self.time_buckets.write().unwrap().clear();
403 }
404}
405
406impl Default for CacheHeatmap {
407 fn default() -> Self {
408 Self::new()
409 }
410}
411
#[cfg(test)]
mod tests {
    use super::*;

    /// Hits and misses are tallied separately and combine into a hit ratio.
    #[test]
    fn test_access_stats() {
        let stats = AccessStats::default();

        for ms in [10, 20] {
            stats.record_hit(Duration::from_millis(ms));
        }
        stats.record_miss();

        assert_eq!(stats.hits.load(Ordering::Relaxed), 2);
        assert_eq!(stats.misses.load(Ordering::Relaxed), 1);
        assert!((stats.hit_ratio() - 0.666).abs() < 0.01);
    }

    /// Accesses through a fingerprint show up under its (normalised) table name.
    #[test]
    fn test_record_access() {
        let heatmap = CacheHeatmap::new();
        let fp = QueryFingerprint::from_query("SELECT * FROM users");

        let outcomes = [
            (true, Duration::from_millis(10)),
            (true, Duration::from_millis(15)),
            (false, Duration::ZERO),
        ];
        for (hit, saved) in outcomes {
            heatmap.record_access(&fp, hit, saved);
        }

        let data = heatmap.generate_heatmap();
        assert!(!data.tables.is_empty());

        let users_heat = data
            .tables
            .iter()
            .find(|t| t.name == "USERS")
            .expect("USERS table should be tracked");

        assert_eq!(users_heat.total_accesses, 3);
        assert!((users_heat.hit_ratio - 0.666).abs() < 0.01);
    }

    /// With skewed traffic across ten tables, at least one is classified Hot.
    #[test]
    fn test_temperature_classification() {
        let heatmap = CacheHeatmap::new();

        for i in 0..100 {
            let fp = QueryFingerprint::from_query(&format!("SELECT * FROM table_{}", i % 10));
            (0..i * 10).for_each(|_| heatmap.record_access(&fp, true, Duration::from_millis(1)));
        }

        let data = heatmap.generate_heatmap();

        assert!(data.tables.iter().any(|t| t.temperature == Temperature::Hot));
    }

    /// 50 hits followed by 150 misses (25% hit ratio) yields a hit-ratio warning.
    #[test]
    fn test_recommendations() {
        let heatmap = CacheHeatmap::new();
        let fp = QueryFingerprint::from_query("SELECT * FROM slow_table");

        let batches = [
            (true, 50, Duration::from_millis(1)),
            (false, 150, Duration::ZERO),
        ];
        for (hit, count, saved) in batches {
            for _ in 0..count {
                heatmap.record_access(&fp, hit, saved);
            }
        }

        let data = heatmap.generate_heatmap();

        assert!(!data.recommendations.is_empty());
        let has_hit_ratio_rec = data
            .recommendations
            .iter()
            .any(|r| r.issue.contains("hit ratio"));
        assert!(has_hit_ratio_rec);
    }
}