oxirs_vec/tiering/
access_tracker.rs

1//! Access pattern tracking for tiering decisions
2
3use super::types::{AccessPattern, AccessStatistics, IndexMetadata};
4use std::collections::{HashMap, VecDeque};
5use std::time::{Duration, SystemTime};
6
/// Access pattern tracker.
///
/// Keeps, per index id, a history of access timestamps and a history of query
/// latencies. Samples are appended at the back and evicted from the front, so
/// each history is ordered oldest-first.
#[derive(Debug, Clone)]
pub struct AccessTracker {
    /// Access history (index_id -> timestamps, oldest first)
    access_history: HashMap<String, VecDeque<SystemTime>>,
    /// Query latencies (index_id -> latencies in microseconds, oldest first)
    latency_history: HashMap<String, VecDeque<u64>>,
    /// Maximum number of samples retained per index in each history
    max_history_size: usize,
    /// Time window for access pattern analysis.
    ///
    /// NOTE(review): stored by the constructor but not read by any method
    /// visible in this file — confirm whether it is still needed.
    analysis_window: Duration,
}
18
19impl AccessTracker {
20    /// Create a new access tracker
21    pub fn new(max_history_size: usize, analysis_window: Duration) -> Self {
22        Self {
23            access_history: HashMap::new(),
24            latency_history: HashMap::new(),
25            max_history_size,
26            analysis_window,
27        }
28    }
29
30    /// Record an access to an index
31    pub fn record_access(&mut self, index_id: &str, latency_us: u64) {
32        let now = SystemTime::now();
33
34        // Record access timestamp
35        let history = self.access_history.entry(index_id.to_string()).or_default();
36        history.push_back(now);
37
38        // Maintain history size
39        while history.len() > self.max_history_size {
40            history.pop_front();
41        }
42
43        // Record latency
44        let latencies = self
45            .latency_history
46            .entry(index_id.to_string())
47            .or_default();
48        latencies.push_back(latency_us);
49
50        // Maintain latency history size
51        while latencies.len() > self.max_history_size {
52            latencies.pop_front();
53        }
54    }
55
56    /// Get access statistics for an index
57    pub fn get_statistics(&self, index_id: &str) -> AccessStatistics {
58        let now = SystemTime::now();
59
60        let history = self.access_history.get(index_id);
61        let latencies = self.latency_history.get(index_id);
62
63        if let Some(hist) = history {
64            let total_queries = hist.len() as u64;
65
66            // Count queries in different time windows
67            let queries_last_hour =
68                self.count_queries_in_window(hist, now, Duration::from_secs(3600));
69            let queries_last_day =
70                self.count_queries_in_window(hist, now, Duration::from_secs(86400));
71            let queries_last_week =
72                self.count_queries_in_window(hist, now, Duration::from_secs(604800));
73
74            // Calculate QPS
75            let avg_qps = if !hist.is_empty() {
76                let time_span = now
77                    .duration_since(*hist.front().unwrap())
78                    .unwrap_or(Duration::from_secs(1))
79                    .as_secs_f64();
80                total_queries as f64 / time_span.max(1.0)
81            } else {
82                0.0
83            };
84
85            // Calculate peak QPS (in 1-minute windows)
86            let peak_qps = self.calculate_peak_qps(hist, now);
87
88            // Calculate latency percentiles
89            let query_latencies = if let Some(lats) = latencies {
90                self.calculate_latency_percentiles(lats)
91            } else {
92                Default::default()
93            };
94
95            // Determine access pattern
96            let access_pattern = self.classify_access_pattern(
97                avg_qps,
98                queries_last_hour,
99                queries_last_day,
100                queries_last_week,
101            );
102
103            AccessStatistics {
104                total_queries,
105                queries_last_hour,
106                queries_last_day,
107                queries_last_week,
108                avg_qps,
109                peak_qps,
110                last_access_time: hist.back().copied(),
111                access_pattern,
112                query_latencies,
113            }
114        } else {
115            Default::default()
116        }
117    }
118
119    /// Update metadata with current access statistics
120    pub fn update_metadata(&self, metadata: &mut IndexMetadata) {
121        metadata.access_stats = self.get_statistics(&metadata.index_id);
122        metadata.last_accessed = metadata
123            .access_stats
124            .last_access_time
125            .unwrap_or_else(SystemTime::now);
126    }
127
128    /// Count queries in a time window
129    fn count_queries_in_window(
130        &self,
131        history: &VecDeque<SystemTime>,
132        now: SystemTime,
133        window: Duration,
134    ) -> u64 {
135        history
136            .iter()
137            .filter(|&&t| now.duration_since(t).unwrap_or(Duration::MAX) <= window)
138            .count() as u64
139    }
140
141    /// Calculate peak QPS in 1-minute windows
142    fn calculate_peak_qps(&self, history: &VecDeque<SystemTime>, _now: SystemTime) -> f64 {
143        if history.is_empty() {
144            return 0.0;
145        }
146
147        let window = Duration::from_secs(60);
148        let mut max_qps: f64 = 0.0;
149
150        // Sample windows to avoid quadratic complexity
151        let sample_size = history.len().min(100);
152        for i in (0..history.len()).step_by(history.len() / sample_size.max(1)) {
153            if let Some(&time) = history.get(i) {
154                let count = self.count_queries_in_window(history, time + window, window);
155                let qps = count as f64 / 60.0;
156                max_qps = max_qps.max(qps);
157            }
158        }
159
160        max_qps
161    }
162
163    /// Calculate latency percentiles
164    fn calculate_latency_percentiles(
165        &self,
166        latencies: &VecDeque<u64>,
167    ) -> super::types::LatencyPercentiles {
168        if latencies.is_empty() {
169            return Default::default();
170        }
171
172        let mut sorted: Vec<u64> = latencies.iter().copied().collect();
173        sorted.sort_unstable();
174
175        let p50 = sorted[sorted.len() * 50 / 100];
176        let p95 = sorted[sorted.len() * 95 / 100];
177        let p99 = sorted[sorted.len() * 99 / 100];
178        let max = *sorted.last().unwrap();
179
180        super::types::LatencyPercentiles { p50, p95, p99, max }
181    }
182
183    /// Classify access pattern
184    fn classify_access_pattern(
185        &self,
186        avg_qps: f64,
187        queries_last_hour: u64,
188        queries_last_day: u64,
189        queries_last_week: u64,
190    ) -> AccessPattern {
191        // Hot: > 10 QPS sustained
192        if avg_qps > 10.0 {
193            return AccessPattern::Hot;
194        }
195
196        // Warm: 1-10 QPS
197        if avg_qps > 1.0 {
198            return AccessPattern::Warm;
199        }
200
201        // Cold: < 1 QPS
202        if avg_qps < 1.0 && queries_last_day < 100 {
203            return AccessPattern::Cold;
204        }
205
206        // Bursty: High variance in access rates
207        let hour_rate = queries_last_hour as f64 / 1.0;
208        let day_rate = queries_last_day as f64 / 24.0;
209        if hour_rate > day_rate * 3.0 {
210            return AccessPattern::Bursty;
211        }
212
213        // Seasonal: Access patterns vary by time
214        // (Simplified heuristic - could use ML for better detection)
215        let week_rate = queries_last_week as f64 / (7.0 * 24.0);
216        if day_rate > week_rate * 2.0 || day_rate < week_rate * 0.5 {
217            return AccessPattern::Seasonal;
218        }
219
220        AccessPattern::Unknown
221    }
222
223    /// Predict future access based on historical patterns
224    pub fn predict_future_access(&self, index_id: &str, horizon: Duration) -> f64 {
225        let stats = self.get_statistics(index_id);
226
227        // Simple prediction: exponentially weighted moving average
228        let recent_qps = stats.avg_qps;
229        let historical_qps = if stats.total_queries > 0 {
230            stats.total_queries as f64
231                / stats
232                    .last_access_time
233                    .and_then(|t| SystemTime::now().duration_since(t).ok())
234                    .unwrap_or(Duration::from_secs(1))
235                    .as_secs_f64()
236        } else {
237            0.0
238        };
239
240        // Weight recent activity more heavily
241        let alpha = 0.7;
242        let predicted_qps = alpha * recent_qps + (1.0 - alpha) * historical_qps;
243
244        // Add some uncertainty (simplified, without using distribution)
245        let uncertainty = predicted_qps * 0.05; // 5% uncertainty
246        (predicted_qps + uncertainty) * horizon.as_secs_f64()
247    }
248
249    /// Clear old history entries
250    pub fn cleanup_old_entries(&mut self, retention_period: Duration) {
251        let now = SystemTime::now();
252
253        for history in self.access_history.values_mut() {
254            while let Some(&front) = history.front() {
255                if now.duration_since(front).unwrap_or(Duration::ZERO) > retention_period {
256                    history.pop_front();
257                } else {
258                    break;
259                }
260            }
261        }
262
263        for latencies in self.latency_history.values_mut() {
264            // Keep same size as access history
265            while latencies.len() > self.max_history_size {
266                latencies.pop_front();
267            }
268        }
269
270        // Remove empty entries
271        self.access_history.retain(|_, v| !v.is_empty());
272        self.latency_history.retain(|_, v| !v.is_empty());
273    }
274
275    /// Get indices sorted by access score (descending)
276    pub fn get_hot_indices(&self, limit: usize) -> Vec<(String, f64)> {
277        let mut indices: Vec<(String, f64)> = self
278            .access_history
279            .keys()
280            .map(|id| {
281                let stats = self.get_statistics(id);
282                let score = super::policies::calculate_access_score(&stats);
283                (id.clone(), score)
284            })
285            .collect();
286
287        indices.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
288        indices.truncate(limit);
289        indices
290    }
291}
292
#[cfg(test)]
mod tests {
    use super::*;

    /// Tracker configuration shared by every test: 1000 samples per index and
    /// a one-hour analysis window.
    fn new_tracker() -> AccessTracker {
        AccessTracker::new(1000, Duration::from_secs(3600))
    }

    /// Record `count` accesses to `index_id`, all with the same latency.
    fn record_many(tracker: &mut AccessTracker, index_id: &str, count: usize, latency_us: u64) {
        (0..count).for_each(|_| tracker.record_access(index_id, latency_us));
    }

    #[test]
    fn test_access_tracker_basic() {
        let mut tracker = new_tracker();
        record_many(&mut tracker, "index1", 10, 1000);

        let stats = tracker.get_statistics("index1");
        assert_eq!(stats.total_queries, 10);
        assert!(stats.avg_qps > 0.0);
    }

    #[test]
    fn test_access_pattern_classification() {
        let mut tracker = new_tracker();

        // A burst of 1000 accesses should look like sustained high QPS.
        record_many(&mut tracker, "hot_index", 1000, 500);

        let pattern = tracker.get_statistics("hot_index").access_pattern;
        assert!(matches!(
            pattern,
            AccessPattern::Hot | AccessPattern::Warm
        ));
    }

    #[test]
    fn test_latency_percentiles() {
        let mut tracker = new_tracker();

        // Spread of latencies so the upper percentiles exceed the median.
        let samples: [u64; 7] = [100, 200, 300, 500, 1000, 2000, 5000];
        samples
            .iter()
            .for_each(|&latency_us| tracker.record_access("index1", latency_us));

        let percentiles = tracker.get_statistics("index1").query_latencies;
        assert!(percentiles.p50 > 0);
        assert!(percentiles.p99 > percentiles.p50);
    }

    #[test]
    fn test_hot_indices() {
        let mut tracker = new_tracker();

        // Three indices with decreasing amounts of traffic.
        record_many(&mut tracker, "hot_index", 100, 100);
        record_many(&mut tracker, "warm_index", 10, 200);
        tracker.record_access("cold_index", 300);

        let ranked = tracker.get_hot_indices(3);
        assert_eq!(ranked.len(), 3);
        assert_eq!(ranked[0].0, "hot_index");
    }
}