// synapse_pingora/trends/time_store.rs
//! Time-bucketed signal storage with oldest-first (ring-buffer) eviction.
2
3use std::collections::{HashMap, VecDeque};
4
5use super::config::TrendsConfig;
6use super::types::{
7    BucketSummary, CategoryTrendSummary, Signal, SignalBucketData, SignalTrend, SignalType,
8    TimeRange, TopSignalType, TrendHistogramBucket, TrendQueryOptions, TrendsSummary,
9};
10
/// A time bucket for signal aggregation.
///
/// Holds the raw signals that fell into one fixed-size time window, plus
/// pre-aggregated per-category statistics ([`BucketSummary`]).
#[derive(Debug, Clone)]
pub struct SignalBucket {
    /// Bucket start time (Unix ms)
    pub timestamp: i64,
    /// Bucket end time (Unix ms): `timestamp + bucket_size_ms` at creation.
    pub end_timestamp: i64,
    /// Raw signals stored in this bucket. Capped by `add_signal`'s
    /// `max_signals` argument; `summary` keeps counting past the cap.
    pub signals: Vec<Signal>,
    /// Summary statistics over every signal ever added to this bucket,
    /// including signals dropped by the storage cap.
    pub summary: BucketSummary,
}
23
24impl SignalBucket {
25    /// Create a new bucket for the given timestamp.
26    pub fn new(timestamp: i64, bucket_size_ms: u64) -> Self {
27        Self {
28            timestamp,
29            end_timestamp: timestamp + bucket_size_ms as i64,
30            signals: Vec::new(),
31            summary: BucketSummary::default(),
32        }
33    }
34
35    /// Add a signal to the bucket.
36    pub fn add_signal(&mut self, signal: Signal, max_signals: usize) {
37        // Update summary
38        self.summary.total_count += 1;
39
40        let category_summary = self.summary.by_category.entry(signal.category).or_default();
41
42        category_summary.count += 1;
43        category_summary.unique_values.insert(signal.value.clone());
44        category_summary
45            .unique_entities
46            .insert(signal.entity_id.clone());
47        *category_summary
48            .by_type
49            .entry(signal.signal_type)
50            .or_insert(0) += 1;
51
52        // Only store signals up to max
53        if self.signals.len() < max_signals {
54            self.signals.push(signal);
55        }
56    }
57
58    /// Convert to serializable format.
59    pub fn to_data(&self) -> SignalBucketData {
60        SignalBucketData {
61            timestamp: self.timestamp,
62            end_timestamp: self.end_timestamp,
63            signals: self.signals.clone(),
64            summary: self.summary.clone(),
65        }
66    }
67}
68
/// Time-series signal store with ring buffer.
///
/// Buckets live in a `VecDeque`: new buckets are pushed to the back and
/// eviction pops from the front.
pub struct TimeStore {
    config: TrendsConfig,
    /// Ring buffer of time buckets (front = first inserted, back = latest).
    buckets: VecDeque<SignalBucket>,
    /// Index from entity ID to bucket indices. One entry is pushed per
    /// recorded signal, so the same index may appear repeatedly; readers
    /// deduplicate. Rebuilt wholesale after eviction shifts indices.
    entity_index: HashMap<String, Vec<usize>>,
    /// Index of the bucket most recently written to, if any.
    /// NOTE(review): not re-adjusted when eviction shifts indices — confirm
    /// it is only used as a "has data" marker, not dereferenced elsewhere.
    current_bucket_idx: Option<usize>,
}
79
80impl TimeStore {
81    /// Create a new time store.
82    pub fn new(config: &TrendsConfig) -> Self {
83        Self {
84            config: config.clone(),
85            buckets: VecDeque::with_capacity(config.bucket_count()),
86            entity_index: HashMap::new(),
87            current_bucket_idx: None,
88        }
89    }
90
91    /// Record a signal.
92    pub fn record(&mut self, signal: Signal) {
93        let bucket_timestamp = self.bucket_timestamp(signal.timestamp);
94
95        // Get or create the bucket
96        let bucket_idx = self.get_or_create_bucket(bucket_timestamp);
97
98        // Add signal to bucket
99        if let Some(bucket) = self.buckets.get_mut(bucket_idx) {
100            bucket.add_signal(signal.clone(), self.config.max_signals_per_bucket);
101
102            // Update entity index
103            self.entity_index
104                .entry(signal.entity_id)
105                .or_default()
106                .push(bucket_idx);
107        }
108
109        self.current_bucket_idx = Some(bucket_idx);
110    }
111
112    /// Get the bucket timestamp for a given time.
113    fn bucket_timestamp(&self, timestamp: i64) -> i64 {
114        let bucket_size = self.config.bucket_size_ms as i64;
115        (timestamp / bucket_size) * bucket_size
116    }
117
118    /// Get or create a bucket for the given timestamp.
119    fn get_or_create_bucket(&mut self, timestamp: i64) -> usize {
120        // Check if we already have this bucket
121        for (idx, bucket) in self.buckets.iter().enumerate() {
122            if bucket.timestamp == timestamp {
123                return idx;
124            }
125        }
126
127        // Create new bucket
128        let bucket = SignalBucket::new(timestamp, self.config.bucket_size_ms);
129
130        // Evict old buckets if necessary
131        let max_buckets = self.config.bucket_count();
132        while self.buckets.len() >= max_buckets {
133            self.buckets.pop_front();
134            // Rebuild entity index (indices shifted)
135            self.rebuild_entity_index();
136        }
137
138        self.buckets.push_back(bucket);
139        self.buckets.len() - 1
140    }
141
142    /// Rebuild the entity index after eviction.
143    fn rebuild_entity_index(&mut self) {
144        self.entity_index.clear();
145        for (bucket_idx, bucket) in self.buckets.iter().enumerate() {
146            for signal in &bucket.signals {
147                self.entity_index
148                    .entry(signal.entity_id.clone())
149                    .or_default()
150                    .push(bucket_idx);
151            }
152        }
153    }
154
155    /// Get recent buckets (most recent first).
156    pub fn get_recent_buckets(&self, count: usize) -> Vec<&SignalBucket> {
157        self.buckets.iter().rev().take(count).collect()
158    }
159
160    /// Get signals for an entity.
161    pub fn get_signals_for_entity(
162        &self,
163        entity_id: &str,
164        options: &TrendQueryOptions,
165    ) -> Vec<Signal> {
166        let mut signals = Vec::new();
167
168        let bucket_indices = match self.entity_index.get(entity_id) {
169            Some(indices) => indices.clone(),
170            None => return signals,
171        };
172
173        // Deduplicate bucket indices (same bucket may be added multiple times for different signals)
174        let unique_indices: std::collections::HashSet<usize> = bucket_indices.into_iter().collect();
175
176        for idx in unique_indices {
177            if let Some(bucket) = self.buckets.get(idx) {
178                // Apply time filters
179                if let Some(from) = options.from {
180                    if bucket.end_timestamp < from {
181                        continue;
182                    }
183                }
184                if let Some(to) = options.to {
185                    if bucket.timestamp > to {
186                        continue;
187                    }
188                }
189
190                for signal in &bucket.signals {
191                    if signal.entity_id != entity_id {
192                        continue;
193                    }
194
195                    // Apply type/category filters
196                    if let Some(cat) = options.category {
197                        if signal.category != cat {
198                            continue;
199                        }
200                    }
201                    if let Some(st) = options.signal_type {
202                        if signal.signal_type != st {
203                            continue;
204                        }
205                    }
206
207                    signals.push(signal.clone());
208                }
209            }
210        }
211
212        // Apply limit
213        if let Some(limit) = options.limit {
214            signals.truncate(limit);
215        }
216
217        signals
218    }
219
220    /// Get all signals matching criteria.
221    pub fn get_signals(&self, options: &TrendQueryOptions) -> Vec<Signal> {
222        let mut signals = Vec::new();
223
224        for bucket in &self.buckets {
225            // Apply time filters
226            if let Some(from) = options.from {
227                if bucket.end_timestamp < from {
228                    continue;
229                }
230            }
231            if let Some(to) = options.to {
232                if bucket.timestamp > to {
233                    continue;
234                }
235            }
236
237            for signal in &bucket.signals {
238                // Apply entity filter
239                if let Some(ref entity_id) = options.entity_id {
240                    if &signal.entity_id != entity_id {
241                        continue;
242                    }
243                }
244
245                // Apply type/category filters
246                if let Some(cat) = options.category {
247                    if signal.category != cat {
248                        continue;
249                    }
250                }
251                if let Some(st) = options.signal_type {
252                    if signal.signal_type != st {
253                        continue;
254                    }
255                }
256
257                signals.push(signal.clone());
258            }
259        }
260
261        // Apply limit
262        if let Some(limit) = options.limit {
263            signals.truncate(limit);
264        }
265
266        signals
267    }
268
269    /// Get trends summary.
270    pub fn get_summary(&self, options: &TrendQueryOptions) -> TrendsSummary {
271        let mut summary = TrendsSummary::default();
272
273        let now = chrono::Utc::now().timestamp_millis();
274        summary.time_range = TimeRange {
275            from: options.from.unwrap_or(now - 3_600_000), // 1 hour ago
276            to: options.to.unwrap_or(now),
277        };
278
279        let mut type_counts: HashMap<SignalType, usize> = HashMap::new();
280
281        for bucket in &self.buckets {
282            // Apply time filters
283            if bucket.timestamp < summary.time_range.from
284                || bucket.timestamp > summary.time_range.to
285            {
286                continue;
287            }
288
289            summary.total_signals += bucket.summary.total_count;
290
291            for (category, cat_summary) in &bucket.summary.by_category {
292                let trend_summary = summary
293                    .by_category
294                    .entry(*category)
295                    .or_insert_with(CategoryTrendSummary::default);
296
297                trend_summary.count += cat_summary.count;
298                trend_summary.unique_values += cat_summary.unique_values.len();
299                trend_summary.unique_entities += cat_summary.unique_entities.len();
300
301                for (signal_type, count) in &cat_summary.by_type {
302                    *type_counts.entry(*signal_type).or_insert(0) += count;
303                }
304            }
305        }
306
307        // Build top signal types
308        let mut sorted_types: Vec<_> = type_counts.into_iter().collect();
309        sorted_types.sort_by(|a, b| b.1.cmp(&a.1));
310
311        summary.top_signal_types = sorted_types
312            .into_iter()
313            .take(10)
314            .map(|(signal_type, count)| TopSignalType {
315                signal_type,
316                category: signal_type.category(),
317                count,
318            })
319            .collect();
320
321        summary
322    }
323
    /// Build one [`SignalTrend`] per signal type seen in the window.
    ///
    /// Each trend accumulates counts across buckets and carries one
    /// histogram point per contributing bucket. `change_rate` is always
    /// left at 0.0 here. Returned order follows `HashMap` iteration and is
    /// unspecified.
    ///
    /// NOTE(review): `unique_values`/`unique_entities` are summed from the
    /// *category-level* bucket summaries for every matching type, so they
    /// over-count when several types share a category — confirm this
    /// approximation is intended.
    pub fn get_trends(&self, options: &TrendQueryOptions) -> Vec<SignalTrend> {
        let mut trends: HashMap<SignalType, SignalTrend> = HashMap::new();

        for bucket in &self.buckets {
            // Skip buckets wholly outside the requested window (a bucket
            // overlapping a boundary is included).
            if let Some(from) = options.from {
                if bucket.end_timestamp < from {
                    continue;
                }
            }
            if let Some(to) = options.to {
                if bucket.timestamp > to {
                    continue;
                }
            }

            for (category, cat_summary) in &bucket.summary.by_category {
                // Apply category filter
                if let Some(cat) = options.category {
                    if *category != cat {
                        continue;
                    }
                }

                for (signal_type, count) in &cat_summary.by_type {
                    // Apply type filter
                    if let Some(st) = options.signal_type {
                        if *signal_type != st {
                            continue;
                        }
                    }

                    // First sighting of this type: seed the trend with this
                    // bucket's timestamp; min/max below widen the range.
                    let trend = trends.entry(*signal_type).or_insert_with(|| SignalTrend {
                        signal_type: *signal_type,
                        category: *category,
                        count: 0,
                        unique_values: 0,
                        unique_entities: 0,
                        first_seen: bucket.timestamp,
                        last_seen: bucket.timestamp,
                        histogram: Vec::new(),
                        change_rate: 0.0,
                    });

                    trend.count += count;
                    // Category-level uniques (see NOTE in the doc comment).
                    trend.unique_values += cat_summary.unique_values.len();
                    trend.unique_entities += cat_summary.unique_entities.len();
                    trend.first_seen = trend.first_seen.min(bucket.timestamp);
                    trend.last_seen = trend.last_seen.max(bucket.timestamp);

                    // One histogram point per bucket, in bucket storage order.
                    trend.histogram.push(TrendHistogramBucket {
                        timestamp: bucket.timestamp,
                        count: *count,
                        unique_values: cat_summary.unique_values.len(),
                        unique_entities: cat_summary.unique_entities.len(),
                    });
                }
            }
        }

        trends.into_values().collect()
    }
387
388    /// Get statistics.
389    pub fn get_stats(&self) -> TimeStoreStats {
390        TimeStoreStats {
391            bucket_count: self.buckets.len(),
392            total_signals: self.buckets.iter().map(|b| b.summary.total_count).sum(),
393            entity_count: self.entity_index.len(),
394            oldest_bucket: self.buckets.front().map(|b| b.timestamp),
395            newest_bucket: self.buckets.back().map(|b| b.timestamp),
396        }
397    }
398
399    /// Export buckets for persistence.
400    pub fn export(&self) -> Vec<SignalBucketData> {
401        self.buckets.iter().map(|b| b.to_data()).collect()
402    }
403
404    /// Import buckets from persistence.
405    pub fn import(&mut self, buckets: Vec<SignalBucketData>) {
406        self.clear();
407
408        for data in buckets {
409            let mut bucket = SignalBucket::new(data.timestamp, self.config.bucket_size_ms);
410            bucket.end_timestamp = data.end_timestamp;
411            bucket.signals = data.signals;
412            bucket.summary = data.summary;
413            self.buckets.push_back(bucket);
414        }
415
416        self.rebuild_entity_index();
417    }
418
419    /// Clear all data.
420    pub fn clear(&mut self) {
421        self.buckets.clear();
422        self.entity_index.clear();
423        self.current_bucket_idx = None;
424    }
425
426    /// Cleanup old data.
427    pub fn cleanup(&mut self) {
428        let cutoff = chrono::Utc::now().timestamp_millis()
429            - (self.config.retention_hours as i64 * 60 * 60 * 1000);
430
431        while let Some(bucket) = self.buckets.front() {
432            if bucket.end_timestamp < cutoff {
433                self.buckets.pop_front();
434            } else {
435                break;
436            }
437        }
438
439        self.rebuild_entity_index();
440    }
441
    /// Destroy the store.
    ///
    /// NOTE(review): currently identical to [`Self::clear`] — presumably a
    /// lifecycle hook; confirm callers do not expect additional teardown.
    pub fn destroy(&mut self) {
        self.clear();
    }
446}
447
/// Statistics for the time store.
#[derive(Debug, Clone, Default)]
pub struct TimeStoreStats {
    /// Number of time buckets currently held.
    pub bucket_count: usize,
    /// Total signals counted across all bucket summaries (includes signals
    /// dropped by the per-bucket storage cap).
    pub total_signals: usize,
    /// Number of distinct entity IDs in the entity index.
    pub entity_count: usize,
    /// Start timestamp (Unix ms) of the front (earliest-inserted) bucket.
    pub oldest_bucket: Option<i64>,
    /// Start timestamp (Unix ms) of the back (latest-inserted) bucket.
    pub newest_bucket: Option<i64>,
}
457
#[cfg(test)]
mod tests {
    use super::super::types::SignalCategory;
    use super::*;

    /// Build a minimal network/IP signal for `entity_id` at `timestamp`.
    fn create_test_signal(entity_id: &str, timestamp: i64) -> Signal {
        Signal {
            id: uuid::Uuid::new_v4().to_string(),
            timestamp,
            category: SignalCategory::Network,
            signal_type: SignalType::Ip,
            value: "192.168.1.1".to_string(),
            entity_id: entity_id.to_string(),
            session_id: None,
            metadata: super::super::types::SignalMetadata::default(),
        }
    }

    #[test]
    fn test_record_signal() {
        let config = TrendsConfig::default();
        let mut store = TimeStore::new(&config);

        let now = chrono::Utc::now().timestamp_millis();
        store.record(create_test_signal("entity-1", now));

        let stats = store.get_stats();
        assert_eq!(stats.total_signals, 1);
        assert_eq!(stats.entity_count, 1);
    }

    #[test]
    fn test_get_signals_for_entity() {
        let config = TrendsConfig::default();
        let mut store = TimeStore::new(&config);

        let base = chrono::Utc::now().timestamp_millis();
        for (entity, offset) in [("entity-1", 0), ("entity-1", 1000), ("entity-2", 2000)] {
            store.record(create_test_signal(entity, base + offset));
        }

        let matched = store.get_signals_for_entity("entity-1", &TrendQueryOptions::default());
        assert_eq!(matched.len(), 2);
    }

    #[test]
    fn test_bucket_eviction() {
        let mut config = TrendsConfig::default();
        config.retention_hours = 1;
        config.bucket_size_ms = 60_000; // 1-minute buckets

        let mut store = TimeStore::new(&config);

        // One signal per minute, spanning well past the retention window.
        let start = chrono::Utc::now().timestamp_millis();
        for minute in 0..100 {
            store.record(create_test_signal("entity-1", start + minute * 60_000));
        }

        // Ring capacity is 1 hour / 1 minute = 60 buckets.
        assert!(store.get_stats().bucket_count <= 60);
    }
}