1use std::collections::{HashMap, VecDeque};
4
5use super::config::TrendsConfig;
6use super::types::{
7 BucketSummary, CategoryTrendSummary, Signal, SignalBucketData, SignalTrend, SignalType,
8 TimeRange, TopSignalType, TrendHistogramBucket, TrendQueryOptions, TrendsSummary,
9};
10
/// One time window of recorded signals together with its running aggregate.
#[derive(Debug, Clone)]
pub struct SignalBucket {
    /// Start of the window, as passed to [`SignalBucket::new`].
    pub timestamp: i64,
    /// End of the window: `timestamp` plus the bucket width given at construction.
    pub end_timestamp: i64,
    /// Signals retained verbatim; `add_signal` stops retaining once its
    /// `max_signals` cap is reached.
    pub signals: Vec<Signal>,
    /// Aggregate counters updated for every added signal, including signals
    /// that were not retained in `signals`.
    pub summary: BucketSummary,
}
23
24impl SignalBucket {
25 pub fn new(timestamp: i64, bucket_size_ms: u64) -> Self {
27 Self {
28 timestamp,
29 end_timestamp: timestamp + bucket_size_ms as i64,
30 signals: Vec::new(),
31 summary: BucketSummary::default(),
32 }
33 }
34
35 pub fn add_signal(&mut self, signal: Signal, max_signals: usize) {
37 self.summary.total_count += 1;
39
40 let category_summary = self.summary.by_category.entry(signal.category).or_default();
41
42 category_summary.count += 1;
43 category_summary.unique_values.insert(signal.value.clone());
44 category_summary
45 .unique_entities
46 .insert(signal.entity_id.clone());
47 *category_summary
48 .by_type
49 .entry(signal.signal_type)
50 .or_insert(0) += 1;
51
52 if self.signals.len() < max_signals {
54 self.signals.push(signal);
55 }
56 }
57
58 pub fn to_data(&self) -> SignalBucketData {
60 SignalBucketData {
61 timestamp: self.timestamp,
62 end_timestamp: self.end_timestamp,
63 signals: self.signals.clone(),
64 summary: self.summary.clone(),
65 }
66 }
67}
68
/// Bucketed store of signals with a per-entity lookup index.
pub struct TimeStore {
    /// Configuration cloned from the value passed to `new`.
    config: TrendsConfig,
    /// Buckets in the order they were created; eviction pops from the front.
    buckets: VecDeque<SignalBucket>,
    /// Entity id -> positions in `buckets` recorded for that entity.
    /// Positions shift when a front bucket is removed, which is why the index
    /// is rebuilt after evictions.
    entity_index: HashMap<String, Vec<usize>>,
    /// Position of the bucket used by the most recent `record` call.
    current_bucket_idx: Option<usize>,
}
79
80impl TimeStore {
81 pub fn new(config: &TrendsConfig) -> Self {
83 Self {
84 config: config.clone(),
85 buckets: VecDeque::with_capacity(config.bucket_count()),
86 entity_index: HashMap::new(),
87 current_bucket_idx: None,
88 }
89 }
90
91 pub fn record(&mut self, signal: Signal) {
93 let bucket_timestamp = self.bucket_timestamp(signal.timestamp);
94
95 let bucket_idx = self.get_or_create_bucket(bucket_timestamp);
97
98 if let Some(bucket) = self.buckets.get_mut(bucket_idx) {
100 bucket.add_signal(signal.clone(), self.config.max_signals_per_bucket);
101
102 self.entity_index
104 .entry(signal.entity_id)
105 .or_default()
106 .push(bucket_idx);
107 }
108
109 self.current_bucket_idx = Some(bucket_idx);
110 }
111
112 fn bucket_timestamp(&self, timestamp: i64) -> i64 {
114 let bucket_size = self.config.bucket_size_ms as i64;
115 (timestamp / bucket_size) * bucket_size
116 }
117
118 fn get_or_create_bucket(&mut self, timestamp: i64) -> usize {
120 for (idx, bucket) in self.buckets.iter().enumerate() {
122 if bucket.timestamp == timestamp {
123 return idx;
124 }
125 }
126
127 let bucket = SignalBucket::new(timestamp, self.config.bucket_size_ms);
129
130 let max_buckets = self.config.bucket_count();
132 while self.buckets.len() >= max_buckets {
133 self.buckets.pop_front();
134 self.rebuild_entity_index();
136 }
137
138 self.buckets.push_back(bucket);
139 self.buckets.len() - 1
140 }
141
142 fn rebuild_entity_index(&mut self) {
144 self.entity_index.clear();
145 for (bucket_idx, bucket) in self.buckets.iter().enumerate() {
146 for signal in &bucket.signals {
147 self.entity_index
148 .entry(signal.entity_id.clone())
149 .or_default()
150 .push(bucket_idx);
151 }
152 }
153 }
154
155 pub fn get_recent_buckets(&self, count: usize) -> Vec<&SignalBucket> {
157 self.buckets.iter().rev().take(count).collect()
158 }
159
160 pub fn get_signals_for_entity(
162 &self,
163 entity_id: &str,
164 options: &TrendQueryOptions,
165 ) -> Vec<Signal> {
166 let mut signals = Vec::new();
167
168 let bucket_indices = match self.entity_index.get(entity_id) {
169 Some(indices) => indices.clone(),
170 None => return signals,
171 };
172
173 let unique_indices: std::collections::HashSet<usize> = bucket_indices.into_iter().collect();
175
176 for idx in unique_indices {
177 if let Some(bucket) = self.buckets.get(idx) {
178 if let Some(from) = options.from {
180 if bucket.end_timestamp < from {
181 continue;
182 }
183 }
184 if let Some(to) = options.to {
185 if bucket.timestamp > to {
186 continue;
187 }
188 }
189
190 for signal in &bucket.signals {
191 if signal.entity_id != entity_id {
192 continue;
193 }
194
195 if let Some(cat) = options.category {
197 if signal.category != cat {
198 continue;
199 }
200 }
201 if let Some(st) = options.signal_type {
202 if signal.signal_type != st {
203 continue;
204 }
205 }
206
207 signals.push(signal.clone());
208 }
209 }
210 }
211
212 if let Some(limit) = options.limit {
214 signals.truncate(limit);
215 }
216
217 signals
218 }
219
220 pub fn get_signals(&self, options: &TrendQueryOptions) -> Vec<Signal> {
222 let mut signals = Vec::new();
223
224 for bucket in &self.buckets {
225 if let Some(from) = options.from {
227 if bucket.end_timestamp < from {
228 continue;
229 }
230 }
231 if let Some(to) = options.to {
232 if bucket.timestamp > to {
233 continue;
234 }
235 }
236
237 for signal in &bucket.signals {
238 if let Some(ref entity_id) = options.entity_id {
240 if &signal.entity_id != entity_id {
241 continue;
242 }
243 }
244
245 if let Some(cat) = options.category {
247 if signal.category != cat {
248 continue;
249 }
250 }
251 if let Some(st) = options.signal_type {
252 if signal.signal_type != st {
253 continue;
254 }
255 }
256
257 signals.push(signal.clone());
258 }
259 }
260
261 if let Some(limit) = options.limit {
263 signals.truncate(limit);
264 }
265
266 signals
267 }
268
269 pub fn get_summary(&self, options: &TrendQueryOptions) -> TrendsSummary {
271 let mut summary = TrendsSummary::default();
272
273 let now = chrono::Utc::now().timestamp_millis();
274 summary.time_range = TimeRange {
275 from: options.from.unwrap_or(now - 3_600_000), to: options.to.unwrap_or(now),
277 };
278
279 let mut type_counts: HashMap<SignalType, usize> = HashMap::new();
280
281 for bucket in &self.buckets {
282 if bucket.timestamp < summary.time_range.from
284 || bucket.timestamp > summary.time_range.to
285 {
286 continue;
287 }
288
289 summary.total_signals += bucket.summary.total_count;
290
291 for (category, cat_summary) in &bucket.summary.by_category {
292 let trend_summary = summary
293 .by_category
294 .entry(*category)
295 .or_insert_with(CategoryTrendSummary::default);
296
297 trend_summary.count += cat_summary.count;
298 trend_summary.unique_values += cat_summary.unique_values.len();
299 trend_summary.unique_entities += cat_summary.unique_entities.len();
300
301 for (signal_type, count) in &cat_summary.by_type {
302 *type_counts.entry(*signal_type).or_insert(0) += count;
303 }
304 }
305 }
306
307 let mut sorted_types: Vec<_> = type_counts.into_iter().collect();
309 sorted_types.sort_by(|a, b| b.1.cmp(&a.1));
310
311 summary.top_signal_types = sorted_types
312 .into_iter()
313 .take(10)
314 .map(|(signal_type, count)| TopSignalType {
315 signal_type,
316 category: signal_type.category(),
317 count,
318 })
319 .collect();
320
321 summary
322 }
323
324 pub fn get_trends(&self, options: &TrendQueryOptions) -> Vec<SignalTrend> {
326 let mut trends: HashMap<SignalType, SignalTrend> = HashMap::new();
327
328 for bucket in &self.buckets {
329 if let Some(from) = options.from {
331 if bucket.end_timestamp < from {
332 continue;
333 }
334 }
335 if let Some(to) = options.to {
336 if bucket.timestamp > to {
337 continue;
338 }
339 }
340
341 for (category, cat_summary) in &bucket.summary.by_category {
342 if let Some(cat) = options.category {
344 if *category != cat {
345 continue;
346 }
347 }
348
349 for (signal_type, count) in &cat_summary.by_type {
350 if let Some(st) = options.signal_type {
352 if *signal_type != st {
353 continue;
354 }
355 }
356
357 let trend = trends.entry(*signal_type).or_insert_with(|| SignalTrend {
358 signal_type: *signal_type,
359 category: *category,
360 count: 0,
361 unique_values: 0,
362 unique_entities: 0,
363 first_seen: bucket.timestamp,
364 last_seen: bucket.timestamp,
365 histogram: Vec::new(),
366 change_rate: 0.0,
367 });
368
369 trend.count += count;
370 trend.unique_values += cat_summary.unique_values.len();
371 trend.unique_entities += cat_summary.unique_entities.len();
372 trend.first_seen = trend.first_seen.min(bucket.timestamp);
373 trend.last_seen = trend.last_seen.max(bucket.timestamp);
374
375 trend.histogram.push(TrendHistogramBucket {
376 timestamp: bucket.timestamp,
377 count: *count,
378 unique_values: cat_summary.unique_values.len(),
379 unique_entities: cat_summary.unique_entities.len(),
380 });
381 }
382 }
383 }
384
385 trends.into_values().collect()
386 }
387
388 pub fn get_stats(&self) -> TimeStoreStats {
390 TimeStoreStats {
391 bucket_count: self.buckets.len(),
392 total_signals: self.buckets.iter().map(|b| b.summary.total_count).sum(),
393 entity_count: self.entity_index.len(),
394 oldest_bucket: self.buckets.front().map(|b| b.timestamp),
395 newest_bucket: self.buckets.back().map(|b| b.timestamp),
396 }
397 }
398
399 pub fn export(&self) -> Vec<SignalBucketData> {
401 self.buckets.iter().map(|b| b.to_data()).collect()
402 }
403
404 pub fn import(&mut self, buckets: Vec<SignalBucketData>) {
406 self.clear();
407
408 for data in buckets {
409 let mut bucket = SignalBucket::new(data.timestamp, self.config.bucket_size_ms);
410 bucket.end_timestamp = data.end_timestamp;
411 bucket.signals = data.signals;
412 bucket.summary = data.summary;
413 self.buckets.push_back(bucket);
414 }
415
416 self.rebuild_entity_index();
417 }
418
419 pub fn clear(&mut self) {
421 self.buckets.clear();
422 self.entity_index.clear();
423 self.current_bucket_idx = None;
424 }
425
426 pub fn cleanup(&mut self) {
428 let cutoff = chrono::Utc::now().timestamp_millis()
429 - (self.config.retention_hours as i64 * 60 * 60 * 1000);
430
431 while let Some(bucket) = self.buckets.front() {
432 if bucket.end_timestamp < cutoff {
433 self.buckets.pop_front();
434 } else {
435 break;
436 }
437 }
438
439 self.rebuild_entity_index();
440 }
441
442 pub fn destroy(&mut self) {
444 self.clear();
445 }
446}
447
/// Point-in-time counters describing a `TimeStore`, as produced by
/// `TimeStore::get_stats`.
#[derive(Debug, Clone, Default)]
pub struct TimeStoreStats {
    /// Number of buckets currently held.
    pub bucket_count: usize,
    /// Sum of `summary.total_count` over all buckets.
    pub total_signals: usize,
    /// Number of distinct entity ids in the entity index.
    pub entity_count: usize,
    /// Start timestamp of the front bucket, or `None` when the store is empty.
    pub oldest_bucket: Option<i64>,
    /// Start timestamp of the back bucket, or `None` when the store is empty.
    pub newest_bucket: Option<i64>,
}
457
#[cfg(test)]
mod tests {
    use super::super::types::{SignalCategory, SignalMetadata};
    use super::*;

    /// Builds a network/IP signal for `entity_id` at `timestamp` with a fresh id.
    fn create_test_signal(entity_id: &str, timestamp: i64) -> Signal {
        let id = uuid::Uuid::new_v4().to_string();
        Signal {
            id,
            timestamp,
            category: SignalCategory::Network,
            signal_type: SignalType::Ip,
            value: "192.168.1.1".to_string(),
            entity_id: entity_id.to_string(),
            session_id: None,
            metadata: SignalMetadata::default(),
        }
    }

    /// A single recorded signal shows up in both the signal and entity counts.
    #[test]
    fn test_record_signal() {
        let mut store = TimeStore::new(&TrendsConfig::default());

        let recorded_at = chrono::Utc::now().timestamp_millis();
        store.record(create_test_signal("entity-1", recorded_at));

        let TimeStoreStats {
            total_signals,
            entity_count,
            ..
        } = store.get_stats();
        assert_eq!(total_signals, 1);
        assert_eq!(entity_count, 1);
    }

    /// Entity lookup returns only the signals of the requested entity.
    #[test]
    fn test_get_signals_for_entity() {
        let mut store = TimeStore::new(&TrendsConfig::default());

        let base = chrono::Utc::now().timestamp_millis();
        let fixtures = [("entity-1", 0), ("entity-1", 1000), ("entity-2", 2000)];
        for (entity, offset) in fixtures.iter() {
            store.record(create_test_signal(entity, base + offset));
        }

        let found = store.get_signals_for_entity("entity-1", &TrendQueryOptions::default());
        assert_eq!(found.len(), 2);
    }

    /// Recording into 100 distinct one-minute buckets with one hour of
    /// retention must leave no more than 60 buckets in the store.
    #[test]
    fn test_bucket_eviction() {
        let mut config = TrendsConfig::default();
        config.retention_hours = 1;
        config.bucket_size_ms = 60_000;
        let mut store = TimeStore::new(&config);

        let start = chrono::Utc::now().timestamp_millis();
        for minute in 0..100 {
            store.record(create_test_signal("entity-1", start + minute * 60_000));
        }

        assert!(store.get_stats().bucket_count <= 60);
    }
}