allsource_core/
analytics.rs

1use crate::error::{AllSourceError, Result};
2use crate::domain::entities::Event;
3use crate::store::EventStore;
4use chrono::{DateTime, Datelike, Duration, Timelike, Utc};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7
8/// Time window granularity for analytics
9#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
10#[serde(rename_all = "lowercase")]
11pub enum TimeWindow {
12    Minute,
13    Hour,
14    Day,
15    Week,
16    Month,
17}
18
19impl TimeWindow {
20    pub fn duration(&self) -> Duration {
21        match self {
22            TimeWindow::Minute => Duration::minutes(1),
23            TimeWindow::Hour => Duration::hours(1),
24            TimeWindow::Day => Duration::days(1),
25            TimeWindow::Week => Duration::weeks(1),
26            TimeWindow::Month => Duration::days(30),
27        }
28    }
29
30    pub fn truncate(&self, timestamp: DateTime<Utc>) -> DateTime<Utc> {
31        match self {
32            TimeWindow::Minute => timestamp
33                .with_second(0)
34                .unwrap()
35                .with_nanosecond(0)
36                .unwrap(),
37            TimeWindow::Hour => timestamp
38                .with_minute(0)
39                .unwrap()
40                .with_second(0)
41                .unwrap()
42                .with_nanosecond(0)
43                .unwrap(),
44            TimeWindow::Day => timestamp
45                .with_hour(0)
46                .unwrap()
47                .with_minute(0)
48                .unwrap()
49                .with_second(0)
50                .unwrap()
51                .with_nanosecond(0)
52                .unwrap(),
53            TimeWindow::Week => {
54                let days_from_monday = timestamp.weekday().num_days_from_monday();
55                (timestamp - Duration::days(days_from_monday as i64))
56                    .with_hour(0)
57                    .unwrap()
58                    .with_minute(0)
59                    .unwrap()
60                    .with_second(0)
61                    .unwrap()
62                    .with_nanosecond(0)
63                    .unwrap()
64            }
65            TimeWindow::Month => timestamp
66                .with_day(1)
67                .unwrap()
68                .with_hour(0)
69                .unwrap()
70                .with_minute(0)
71                .unwrap()
72                .with_second(0)
73                .unwrap()
74                .with_nanosecond(0)
75                .unwrap(),
76        }
77    }
78}
79
80/// Request for event frequency analysis
81#[derive(Debug, Deserialize)]
82pub struct EventFrequencyRequest {
83    /// Filter by entity ID
84    pub entity_id: Option<String>,
85
86    /// Filter by event type
87    pub event_type: Option<String>,
88
89    /// Start time for analysis
90    pub since: DateTime<Utc>,
91
92    /// End time for analysis (defaults to now)
93    pub until: Option<DateTime<Utc>>,
94
95    /// Time window granularity
96    pub window: TimeWindow,
97}
98
99/// Time bucket with event count
100#[derive(Debug, Clone, Serialize)]
101pub struct TimeBucket {
102    pub timestamp: DateTime<Utc>,
103    pub count: usize,
104    pub event_types: HashMap<String, usize>,
105}
106
107/// Response containing time-series frequency data
108#[derive(Debug, Serialize)]
109pub struct EventFrequencyResponse {
110    pub buckets: Vec<TimeBucket>,
111    pub total_events: usize,
112    pub window: TimeWindow,
113    pub time_range: TimeRange,
114}
115
116#[derive(Debug, Serialize)]
117pub struct TimeRange {
118    pub from: DateTime<Utc>,
119    pub to: DateTime<Utc>,
120}
121
122/// Request for statistical summary
123#[derive(Debug, Deserialize)]
124pub struct StatsSummaryRequest {
125    /// Filter by entity ID
126    pub entity_id: Option<String>,
127
128    /// Filter by event type
129    pub event_type: Option<String>,
130
131    /// Start time for analysis
132    pub since: Option<DateTime<Utc>>,
133
134    /// End time for analysis
135    pub until: Option<DateTime<Utc>>,
136}
137
138/// Statistical summary response
139#[derive(Debug, Serialize)]
140pub struct StatsSummaryResponse {
141    pub total_events: usize,
142    pub unique_entities: usize,
143    pub unique_event_types: usize,
144    pub time_range: TimeRange,
145    pub events_per_day: f64,
146    pub top_event_types: Vec<EventTypeCount>,
147    pub top_entities: Vec<EntityCount>,
148    pub first_event: Option<DateTime<Utc>>,
149    pub last_event: Option<DateTime<Utc>>,
150}
151
152#[derive(Debug, Serialize)]
153pub struct EventTypeCount {
154    pub event_type: String,
155    pub count: usize,
156    pub percentage: f64,
157}
158
159#[derive(Debug, Serialize)]
160pub struct EntityCount {
161    pub entity_id: String,
162    pub count: usize,
163    pub percentage: f64,
164}
165
166/// Request for event correlation analysis
167#[derive(Debug, Deserialize)]
168pub struct CorrelationRequest {
169    /// First event type
170    pub event_type_a: String,
171
172    /// Second event type
173    pub event_type_b: String,
174
175    /// Maximum time window to consider events correlated
176    pub time_window_seconds: i64,
177
178    /// Start time for analysis
179    pub since: Option<DateTime<Utc>>,
180
181    /// End time for analysis
182    pub until: Option<DateTime<Utc>>,
183}
184
185/// Correlation analysis response
186#[derive(Debug, Serialize)]
187pub struct CorrelationResponse {
188    pub event_type_a: String,
189    pub event_type_b: String,
190    pub total_a: usize,
191    pub total_b: usize,
192    pub correlated_pairs: usize,
193    pub correlation_percentage: f64,
194    pub avg_time_between_seconds: f64,
195    pub examples: Vec<CorrelationExample>,
196}
197
198#[derive(Debug, Serialize)]
199pub struct CorrelationExample {
200    pub entity_id: String,
201    pub event_a_timestamp: DateTime<Utc>,
202    pub event_b_timestamp: DateTime<Utc>,
203    pub time_between_seconds: i64,
204}
205
/// Stateless entry point for time-series and statistical analysis of events.
///
/// All operations are associated functions that take the event store to read
/// from as an argument.
pub struct AnalyticsEngine;
208
209impl AnalyticsEngine {
210    /// Analyze event frequency over time windows
211    pub fn event_frequency(
212        store: &EventStore,
213        request: EventFrequencyRequest,
214    ) -> Result<EventFrequencyResponse> {
215        let until = request.until.unwrap_or_else(Utc::now);
216
217        // Query events in the time range
218        let events = store.query(crate::application::dto::QueryEventsRequest {
219            entity_id: request.entity_id.clone(),
220            event_type: request.event_type.clone(),
221            tenant_id: None,
222            as_of: None,
223            since: Some(request.since),
224            until: Some(until),
225            limit: None,
226        })?;
227
228        if events.is_empty() {
229            return Ok(EventFrequencyResponse {
230                buckets: Vec::new(),
231                total_events: 0,
232                window: request.window,
233                time_range: TimeRange {
234                    from: request.since,
235                    to: until,
236                },
237            });
238        }
239
240        // Create time buckets
241        let mut buckets_map: HashMap<DateTime<Utc>, HashMap<String, usize>> = HashMap::new();
242
243        for event in &events {
244            let bucket_time = request.window.truncate(event.timestamp);
245            let bucket = buckets_map.entry(bucket_time).or_insert_with(HashMap::new);
246            *bucket.entry(event.event_type_str().to_string()).or_insert(0) += 1;
247        }
248
249        // Convert to sorted vector
250        let mut buckets: Vec<TimeBucket> = buckets_map
251            .into_iter()
252            .map(|(timestamp, event_types)| {
253                let count = event_types.values().sum();
254                TimeBucket {
255                    timestamp,
256                    count,
257                    event_types,
258                }
259            })
260            .collect();
261
262        buckets.sort_by_key(|b| b.timestamp);
263
264        // Fill gaps in the timeline
265        let filled_buckets = Self::fill_time_gaps(&buckets, request.since, until, request.window);
266
267        Ok(EventFrequencyResponse {
268            total_events: events.len(),
269            buckets: filled_buckets,
270            window: request.window,
271            time_range: TimeRange {
272                from: request.since,
273                to: until,
274            },
275        })
276    }
277
278    /// Fill gaps in time buckets for continuous timeline
279    fn fill_time_gaps(
280        buckets: &[TimeBucket],
281        start: DateTime<Utc>,
282        end: DateTime<Utc>,
283        window: TimeWindow,
284    ) -> Vec<TimeBucket> {
285        if buckets.is_empty() {
286            return Vec::new();
287        }
288
289        let mut filled = Vec::new();
290        let mut current = window.truncate(start);
291        let end = window.truncate(end);
292
293        let bucket_map: HashMap<DateTime<Utc>, &TimeBucket> =
294            buckets.iter().map(|b| (b.timestamp, b)).collect();
295
296        while current <= end {
297            if let Some(bucket) = bucket_map.get(&current) {
298                filled.push((**bucket).clone());
299            } else {
300                filled.push(TimeBucket {
301                    timestamp: current,
302                    count: 0,
303                    event_types: HashMap::new(),
304                });
305            }
306            current = current + window.duration();
307        }
308
309        filled
310    }
311
312    /// Generate comprehensive statistical summary
313    pub fn stats_summary(
314        store: &EventStore,
315        request: StatsSummaryRequest,
316    ) -> Result<StatsSummaryResponse> {
317        // Query events based on filters
318        let events = store.query(crate::application::dto::QueryEventsRequest {
319            entity_id: request.entity_id.clone(),
320            event_type: request.event_type.clone(),
321            tenant_id: None,
322            as_of: None,
323            since: request.since,
324            until: request.until,
325            limit: None,
326        })?;
327
328        if events.is_empty() {
329            return Err(AllSourceError::ValidationError(
330                "No events found for the specified criteria".to_string(),
331            ));
332        }
333
334        // Calculate statistics
335        let first_event = events.first().map(|e| e.timestamp);
336        let last_event = events.last().map(|e| e.timestamp);
337
338        let mut entity_counts: HashMap<String, usize> = HashMap::new();
339        let mut event_type_counts: HashMap<String, usize> = HashMap::new();
340
341        for event in &events {
342            *entity_counts.entry(event.entity_id_str().to_string()).or_insert(0) += 1;
343            *event_type_counts.entry(event.event_type_str().to_string()).or_insert(0) += 1;
344        }
345
346        // Calculate events per day
347        let time_span = if let (Some(first), Some(last)) = (first_event, last_event) {
348            (last - first).num_days().max(1) as f64
349        } else {
350            1.0
351        };
352
353        let events_per_day = events.len() as f64 / time_span;
354
355        // Top event types
356        let mut top_event_types: Vec<EventTypeCount> = event_type_counts
357            .into_iter()
358            .map(|(event_type, count)| EventTypeCount {
359                event_type,
360                count,
361                percentage: (count as f64 / events.len() as f64) * 100.0,
362            })
363            .collect();
364        top_event_types.sort_by(|a, b| b.count.cmp(&a.count));
365        top_event_types.truncate(10);
366
367        // Top entities
368        let mut top_entities: Vec<EntityCount> = entity_counts
369            .into_iter()
370            .map(|(entity_id, count)| EntityCount {
371                entity_id,
372                count,
373                percentage: (count as f64 / events.len() as f64) * 100.0,
374            })
375            .collect();
376        top_entities.sort_by(|a, b| b.count.cmp(&a.count));
377        top_entities.truncate(10);
378
379        let time_range = TimeRange {
380            from: first_event.unwrap_or_else(Utc::now),
381            to: last_event.unwrap_or_else(Utc::now),
382        };
383
384        Ok(StatsSummaryResponse {
385            total_events: events.len(),
386            unique_entities: top_entities.len(),
387            unique_event_types: top_event_types.len(),
388            time_range,
389            events_per_day,
390            top_event_types,
391            top_entities,
392            first_event,
393            last_event,
394        })
395    }
396
397    /// Analyze correlation between two event types
398    pub fn analyze_correlation(
399        store: &EventStore,
400        request: CorrelationRequest,
401    ) -> Result<CorrelationResponse> {
402        // Query both event types
403        let events_a = store.query(crate::application::dto::QueryEventsRequest {
404            entity_id: None,
405            event_type: Some(request.event_type_a.clone()),
406            tenant_id: None,
407            as_of: None,
408            since: request.since,
409            until: request.until,
410            limit: None,
411        })?;
412
413        let events_b = store.query(crate::application::dto::QueryEventsRequest {
414            entity_id: None,
415            event_type: Some(request.event_type_b.clone()),
416            tenant_id: None,
417            as_of: None,
418            since: request.since,
419            until: request.until,
420            limit: None,
421        })?;
422
423        // Group events by entity
424        let mut entity_events_a: HashMap<String, Vec<&Event>> = HashMap::new();
425        let mut entity_events_b: HashMap<String, Vec<&Event>> = HashMap::new();
426
427        for event in &events_a {
428            entity_events_a
429                .entry(event.entity_id_str().to_string())
430                .or_insert_with(Vec::new)
431                .push(event);
432        }
433
434        for event in &events_b {
435            entity_events_b
436                .entry(event.entity_id_str().to_string())
437                .or_insert_with(Vec::new)
438                .push(event);
439        }
440
441        // Find correlated pairs
442        let mut correlated_pairs = 0;
443        let mut total_time_between = 0i64;
444        let mut examples = Vec::new();
445
446        for (entity_id, a_events) in &entity_events_a {
447            if let Some(b_events) = entity_events_b.get(entity_id) {
448                for a_event in a_events {
449                    for b_event in b_events {
450                        let time_diff = (b_event.timestamp - a_event.timestamp).num_seconds().abs();
451
452                        if time_diff <= request.time_window_seconds {
453                            correlated_pairs += 1;
454                            total_time_between += time_diff;
455
456                            if examples.len() < 5 {
457                                examples.push(CorrelationExample {
458                                    entity_id: entity_id.clone(),
459                                    event_a_timestamp: a_event.timestamp,
460                                    event_b_timestamp: b_event.timestamp,
461                                    time_between_seconds: time_diff,
462                                });
463                            }
464                        }
465                    }
466                }
467            }
468        }
469
470        let correlation_percentage = if !events_a.is_empty() {
471            (correlated_pairs as f64 / events_a.len() as f64) * 100.0
472        } else {
473            0.0
474        };
475
476        let avg_time_between = if correlated_pairs > 0 {
477            total_time_between as f64 / correlated_pairs as f64
478        } else {
479            0.0
480        };
481
482        Ok(CorrelationResponse {
483            event_type_a: request.event_type_a,
484            event_type_b: request.event_type_b,
485            total_a: events_a.len(),
486            total_b: events_b.len(),
487            correlated_pairs,
488            correlation_percentage,
489            avg_time_between_seconds: avg_time_between,
490            examples,
491        })
492    }
493}
494
#[cfg(test)]
mod tests {
    use super::*;

    /// Parse an RFC 3339 literal into a UTC timestamp for use as a fixture.
    fn utc(literal: &str) -> DateTime<Utc> {
        literal
            .parse()
            .expect("fixture must be a valid RFC 3339 timestamp")
    }

    #[test]
    fn test_time_window_truncation() {
        // Fixed instant instead of the wall clock so the test is reproducible.
        // 2024-03-14 is a Thursday.
        let timestamp = utc("2024-03-14T15:09:26.535897932Z");

        assert_eq!(
            TimeWindow::Minute.truncate(timestamp),
            utc("2024-03-14T15:09:00Z")
        );
        assert_eq!(
            TimeWindow::Hour.truncate(timestamp),
            utc("2024-03-14T15:00:00Z")
        );
        assert_eq!(
            TimeWindow::Day.truncate(timestamp),
            utc("2024-03-14T00:00:00Z")
        );
        assert_eq!(
            TimeWindow::Week.truncate(timestamp),
            utc("2024-03-11T00:00:00Z")
        );
        assert_eq!(
            TimeWindow::Month.truncate(timestamp),
            utc("2024-03-01T00:00:00Z")
        );
    }

    #[test]
    fn test_week_truncation_crosses_month_boundary() {
        // 2024-03-02 is a Saturday; the Monday of that week is in February.
        let timestamp = utc("2024-03-02T08:30:00Z");

        assert_eq!(
            TimeWindow::Week.truncate(timestamp),
            utc("2024-02-26T00:00:00Z")
        );
    }

    #[test]
    fn test_truncation_is_idempotent() {
        let timestamp = utc("2024-03-14T15:09:26.535897932Z");
        let windows = [
            TimeWindow::Minute,
            TimeWindow::Hour,
            TimeWindow::Day,
            TimeWindow::Week,
            TimeWindow::Month,
        ];

        for window in windows.iter() {
            let once = window.truncate(timestamp);
            assert_eq!(window.truncate(once), once);
        }
    }
}
514}