Skip to main content

allsource_core/application/services/
analytics.rs

1use crate::{
2    domain::entities::Event,
3    error::{AllSourceError, Result},
4    store::EventStore,
5};
6use chrono::{DateTime, Datelike, Duration, Timelike, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9
10/// Time window granularity for analytics
11#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
12#[serde(rename_all = "lowercase")]
13pub enum TimeWindow {
14    Minute,
15    Hour,
16    Day,
17    Week,
18    Month,
19}
20
21impl TimeWindow {
22    pub fn duration(&self) -> Duration {
23        match self {
24            TimeWindow::Minute => Duration::minutes(1),
25            TimeWindow::Hour => Duration::hours(1),
26            TimeWindow::Day => Duration::days(1),
27            TimeWindow::Week => Duration::weeks(1),
28            TimeWindow::Month => Duration::days(30),
29        }
30    }
31
32    pub fn truncate(&self, timestamp: DateTime<Utc>) -> DateTime<Utc> {
33        match self {
34            TimeWindow::Minute => timestamp
35                .with_second(0)
36                .unwrap()
37                .with_nanosecond(0)
38                .unwrap(),
39            TimeWindow::Hour => timestamp
40                .with_minute(0)
41                .unwrap()
42                .with_second(0)
43                .unwrap()
44                .with_nanosecond(0)
45                .unwrap(),
46            TimeWindow::Day => timestamp
47                .with_hour(0)
48                .unwrap()
49                .with_minute(0)
50                .unwrap()
51                .with_second(0)
52                .unwrap()
53                .with_nanosecond(0)
54                .unwrap(),
55            TimeWindow::Week => {
56                let days_from_monday = timestamp.weekday().num_days_from_monday();
57                (timestamp - Duration::days(days_from_monday as i64))
58                    .with_hour(0)
59                    .unwrap()
60                    .with_minute(0)
61                    .unwrap()
62                    .with_second(0)
63                    .unwrap()
64                    .with_nanosecond(0)
65                    .unwrap()
66            }
67            TimeWindow::Month => timestamp
68                .with_day(1)
69                .unwrap()
70                .with_hour(0)
71                .unwrap()
72                .with_minute(0)
73                .unwrap()
74                .with_second(0)
75                .unwrap()
76                .with_nanosecond(0)
77                .unwrap(),
78        }
79    }
80}
81
82/// Request for event frequency analysis
83#[derive(Debug, Deserialize)]
84pub struct EventFrequencyRequest {
85    /// Filter by entity ID
86    pub entity_id: Option<String>,
87
88    /// Filter by event type
89    pub event_type: Option<String>,
90
91    /// Start time for analysis
92    pub since: DateTime<Utc>,
93
94    /// End time for analysis (defaults to now)
95    pub until: Option<DateTime<Utc>>,
96
97    /// Time window granularity
98    pub window: TimeWindow,
99}
100
101/// Time bucket with event count
102#[derive(Debug, Clone, Serialize)]
103pub struct TimeBucket {
104    pub timestamp: DateTime<Utc>,
105    pub count: usize,
106    pub event_types: HashMap<String, usize>,
107}
108
109/// Response containing time-series frequency data
110#[derive(Debug, Serialize)]
111pub struct EventFrequencyResponse {
112    pub buckets: Vec<TimeBucket>,
113    pub total_events: usize,
114    pub window: TimeWindow,
115    pub time_range: TimeRange,
116}
117
118#[derive(Debug, Serialize)]
119pub struct TimeRange {
120    pub from: DateTime<Utc>,
121    pub to: DateTime<Utc>,
122}
123
124/// Request for statistical summary
125#[derive(Debug, Deserialize)]
126pub struct StatsSummaryRequest {
127    /// Filter by entity ID
128    pub entity_id: Option<String>,
129
130    /// Filter by event type
131    pub event_type: Option<String>,
132
133    /// Start time for analysis
134    pub since: Option<DateTime<Utc>>,
135
136    /// End time for analysis
137    pub until: Option<DateTime<Utc>>,
138}
139
140/// Statistical summary response
141#[derive(Debug, Serialize)]
142pub struct StatsSummaryResponse {
143    pub total_events: usize,
144    pub unique_entities: usize,
145    pub unique_event_types: usize,
146    pub time_range: TimeRange,
147    pub events_per_day: f64,
148    pub top_event_types: Vec<EventTypeCount>,
149    pub top_entities: Vec<EntityCount>,
150    pub first_event: Option<DateTime<Utc>>,
151    pub last_event: Option<DateTime<Utc>>,
152}
153
154#[derive(Debug, Serialize)]
155pub struct EventTypeCount {
156    pub event_type: String,
157    pub count: usize,
158    pub percentage: f64,
159}
160
161#[derive(Debug, Serialize)]
162pub struct EntityCount {
163    pub entity_id: String,
164    pub count: usize,
165    pub percentage: f64,
166}
167
168/// Request for event correlation analysis
169#[derive(Debug, Deserialize)]
170pub struct CorrelationRequest {
171    /// First event type
172    pub event_type_a: String,
173
174    /// Second event type
175    pub event_type_b: String,
176
177    /// Maximum time window to consider events correlated
178    pub time_window_seconds: i64,
179
180    /// Start time for analysis
181    pub since: Option<DateTime<Utc>>,
182
183    /// End time for analysis
184    pub until: Option<DateTime<Utc>>,
185}
186
187/// Correlation analysis response
188#[derive(Debug, Serialize)]
189pub struct CorrelationResponse {
190    pub event_type_a: String,
191    pub event_type_b: String,
192    pub total_a: usize,
193    pub total_b: usize,
194    pub correlated_pairs: usize,
195    pub correlation_percentage: f64,
196    pub avg_time_between_seconds: f64,
197    pub examples: Vec<CorrelationExample>,
198}
199
200#[derive(Debug, Serialize)]
201pub struct CorrelationExample {
202    pub entity_id: String,
203    pub event_a_timestamp: DateTime<Utc>,
204    pub event_b_timestamp: DateTime<Utc>,
205    pub time_between_seconds: i64,
206}
207
/// Stateless entry point for analytics queries (event frequency time series, summary
/// statistics and event-type correlation) evaluated against an [`EventStore`].
pub struct AnalyticsEngine;
210
211impl AnalyticsEngine {
212    /// Analyze event frequency over time windows
213    pub fn event_frequency(
214        store: &EventStore,
215        request: EventFrequencyRequest,
216    ) -> Result<EventFrequencyResponse> {
217        let until = request.until.unwrap_or_else(Utc::now);
218
219        // Query events in the time range
220        let events = store.query(crate::application::dto::QueryEventsRequest {
221            entity_id: request.entity_id.clone(),
222            event_type: request.event_type.clone(),
223            tenant_id: None,
224            as_of: None,
225            since: Some(request.since),
226            until: Some(until),
227            limit: None,
228            event_type_prefix: None,
229            payload_filter: None,
230        })?;
231
232        if events.is_empty() {
233            return Ok(EventFrequencyResponse {
234                buckets: Vec::new(),
235                total_events: 0,
236                window: request.window,
237                time_range: TimeRange {
238                    from: request.since,
239                    to: until,
240                },
241            });
242        }
243
244        // Create time buckets
245        let mut buckets_map: HashMap<DateTime<Utc>, HashMap<String, usize>> = HashMap::new();
246
247        for event in &events {
248            let bucket_time = request.window.truncate(event.timestamp);
249            let bucket = buckets_map.entry(bucket_time).or_default();
250            *bucket
251                .entry(event.event_type_str().to_string())
252                .or_insert(0) += 1;
253        }
254
255        // Convert to sorted vector
256        let mut buckets: Vec<TimeBucket> = buckets_map
257            .into_iter()
258            .map(|(timestamp, event_types)| {
259                let count = event_types.values().sum();
260                TimeBucket {
261                    timestamp,
262                    count,
263                    event_types,
264                }
265            })
266            .collect();
267
268        buckets.sort_by_key(|b| b.timestamp);
269
270        // Fill gaps in the timeline
271        let filled_buckets = Self::fill_time_gaps(&buckets, request.since, until, request.window);
272
273        Ok(EventFrequencyResponse {
274            total_events: events.len(),
275            buckets: filled_buckets,
276            window: request.window,
277            time_range: TimeRange {
278                from: request.since,
279                to: until,
280            },
281        })
282    }
283
284    /// Fill gaps in time buckets for continuous timeline
285    fn fill_time_gaps(
286        buckets: &[TimeBucket],
287        start: DateTime<Utc>,
288        end: DateTime<Utc>,
289        window: TimeWindow,
290    ) -> Vec<TimeBucket> {
291        if buckets.is_empty() {
292            return Vec::new();
293        }
294
295        let mut filled = Vec::new();
296        let mut current = window.truncate(start);
297        let end = window.truncate(end);
298
299        let bucket_map: HashMap<DateTime<Utc>, &TimeBucket> =
300            buckets.iter().map(|b| (b.timestamp, b)).collect();
301
302        while current <= end {
303            if let Some(bucket) = bucket_map.get(&current) {
304                filled.push((**bucket).clone());
305            } else {
306                filled.push(TimeBucket {
307                    timestamp: current,
308                    count: 0,
309                    event_types: HashMap::new(),
310                });
311            }
312            current += window.duration();
313        }
314
315        filled
316    }
317
318    /// Generate comprehensive statistical summary
319    pub fn stats_summary(
320        store: &EventStore,
321        request: StatsSummaryRequest,
322    ) -> Result<StatsSummaryResponse> {
323        // Query events based on filters
324        let events = store.query(crate::application::dto::QueryEventsRequest {
325            entity_id: request.entity_id.clone(),
326            event_type: request.event_type.clone(),
327            tenant_id: None,
328            as_of: None,
329            since: request.since,
330            until: request.until,
331            limit: None,
332            event_type_prefix: None,
333            payload_filter: None,
334        })?;
335
336        if events.is_empty() {
337            return Err(AllSourceError::ValidationError(
338                "No events found for the specified criteria".to_string(),
339            ));
340        }
341
342        // Calculate statistics
343        let first_event = events.first().map(|e| e.timestamp);
344        let last_event = events.last().map(|e| e.timestamp);
345
346        let mut entity_counts: HashMap<String, usize> = HashMap::new();
347        let mut event_type_counts: HashMap<String, usize> = HashMap::new();
348
349        for event in &events {
350            *entity_counts
351                .entry(event.entity_id_str().to_string())
352                .or_insert(0) += 1;
353            *event_type_counts
354                .entry(event.event_type_str().to_string())
355                .or_insert(0) += 1;
356        }
357
358        // Calculate events per day
359        let time_span = if let (Some(first), Some(last)) = (first_event, last_event) {
360            (last - first).num_days().max(1) as f64
361        } else {
362            1.0
363        };
364
365        let events_per_day = events.len() as f64 / time_span;
366
367        // Top event types
368        let mut top_event_types: Vec<EventTypeCount> = event_type_counts
369            .into_iter()
370            .map(|(event_type, count)| EventTypeCount {
371                event_type,
372                count,
373                percentage: (count as f64 / events.len() as f64) * 100.0,
374            })
375            .collect();
376        top_event_types.sort_by_key(|x| std::cmp::Reverse(x.count));
377        top_event_types.truncate(10);
378
379        // Top entities
380        let mut top_entities: Vec<EntityCount> = entity_counts
381            .into_iter()
382            .map(|(entity_id, count)| EntityCount {
383                entity_id,
384                count,
385                percentage: (count as f64 / events.len() as f64) * 100.0,
386            })
387            .collect();
388        top_entities.sort_by_key(|x| std::cmp::Reverse(x.count));
389        top_entities.truncate(10);
390
391        let time_range = TimeRange {
392            from: first_event.unwrap_or_else(Utc::now),
393            to: last_event.unwrap_or_else(Utc::now),
394        };
395
396        Ok(StatsSummaryResponse {
397            total_events: events.len(),
398            unique_entities: top_entities.len(),
399            unique_event_types: top_event_types.len(),
400            time_range,
401            events_per_day,
402            top_event_types,
403            top_entities,
404            first_event,
405            last_event,
406        })
407    }
408
409    /// Analyze correlation between two event types
410    pub fn analyze_correlation(
411        store: &EventStore,
412        request: CorrelationRequest,
413    ) -> Result<CorrelationResponse> {
414        // Query both event types
415        let events_a = store.query(crate::application::dto::QueryEventsRequest {
416            entity_id: None,
417            event_type: Some(request.event_type_a.clone()),
418            tenant_id: None,
419            as_of: None,
420            since: request.since,
421            until: request.until,
422            limit: None,
423            event_type_prefix: None,
424            payload_filter: None,
425        })?;
426
427        let events_b = store.query(crate::application::dto::QueryEventsRequest {
428            entity_id: None,
429            event_type: Some(request.event_type_b.clone()),
430            tenant_id: None,
431            as_of: None,
432            since: request.since,
433            until: request.until,
434            limit: None,
435            event_type_prefix: None,
436            payload_filter: None,
437        })?;
438
439        // Group events by entity
440        let mut entity_events_a: HashMap<String, Vec<&Event>> = HashMap::new();
441        let mut entity_events_b: HashMap<String, Vec<&Event>> = HashMap::new();
442
443        for event in &events_a {
444            entity_events_a
445                .entry(event.entity_id_str().to_string())
446                .or_default()
447                .push(event);
448        }
449
450        for event in &events_b {
451            entity_events_b
452                .entry(event.entity_id_str().to_string())
453                .or_default()
454                .push(event);
455        }
456
457        // Find correlated pairs
458        let mut correlated_pairs = 0;
459        let mut total_time_between = 0i64;
460        let mut examples = Vec::new();
461
462        for (entity_id, a_events) in &entity_events_a {
463            if let Some(b_events) = entity_events_b.get(entity_id) {
464                for a_event in a_events {
465                    for b_event in b_events {
466                        let time_diff = (b_event.timestamp - a_event.timestamp).num_seconds().abs();
467
468                        if time_diff <= request.time_window_seconds {
469                            correlated_pairs += 1;
470                            total_time_between += time_diff;
471
472                            if examples.len() < 5 {
473                                examples.push(CorrelationExample {
474                                    entity_id: entity_id.clone(),
475                                    event_a_timestamp: a_event.timestamp,
476                                    event_b_timestamp: b_event.timestamp,
477                                    time_between_seconds: time_diff,
478                                });
479                            }
480                        }
481                    }
482                }
483            }
484        }
485
486        let correlation_percentage = if !events_a.is_empty() {
487            (correlated_pairs as f64 / events_a.len() as f64) * 100.0
488        } else {
489            0.0
490        };
491
492        let avg_time_between = if correlated_pairs > 0 {
493            total_time_between as f64 / correlated_pairs as f64
494        } else {
495            0.0
496        };
497
498        Ok(CorrelationResponse {
499            event_type_a: request.event_type_a,
500            event_type_b: request.event_type_b,
501            total_a: events_a.len(),
502            total_b: events_b.len(),
503            correlated_pairs,
504            correlation_percentage,
505            avg_time_between_seconds: avg_time_between,
506            examples,
507        })
508    }
509}
510
#[cfg(test)]
mod tests {
    use super::*;

    /// Parse an RFC 3339 UTC timestamp; inputs are fixed literals, so failure is a test bug.
    fn utc(s: &str) -> DateTime<Utc> {
        s.parse().expect("valid RFC 3339 timestamp")
    }

    #[test]
    fn test_time_window_truncation() {
        // Thursday, 14 March 2024, with non-zero seconds and sub-second precision.
        let timestamp = utc("2024-03-14T15:09:26.535Z");

        assert_eq!(TimeWindow::Minute.truncate(timestamp), utc("2024-03-14T15:09:00Z"));
        assert_eq!(TimeWindow::Hour.truncate(timestamp), utc("2024-03-14T15:00:00Z"));
        assert_eq!(TimeWindow::Day.truncate(timestamp), utc("2024-03-14T00:00:00Z"));
        assert_eq!(TimeWindow::Week.truncate(timestamp), utc("2024-03-11T00:00:00Z"));
        assert_eq!(TimeWindow::Month.truncate(timestamp), utc("2024-03-01T00:00:00Z"));
    }

    #[test]
    fn test_week_truncation_crosses_month_boundary() {
        // Saturday, 2 March 2024 belongs to the week starting Monday, 26 February (leap year).
        let timestamp = utc("2024-03-02T23:59:59Z");
        assert_eq!(TimeWindow::Week.truncate(timestamp), utc("2024-02-26T00:00:00Z"));
    }

    #[test]
    fn test_truncation_is_idempotent() {
        let timestamp = utc("2024-03-14T15:09:26.535Z");
        for window in [
            TimeWindow::Minute,
            TimeWindow::Hour,
            TimeWindow::Day,
            TimeWindow::Week,
            TimeWindow::Month,
        ] {
            let once = window.truncate(timestamp);
            assert_eq!(window.truncate(once), once, "{:?}", window);
        }
    }

    #[test]
    fn test_time_window_durations() {
        assert_eq!(TimeWindow::Minute.duration(), Duration::minutes(1));
        assert_eq!(TimeWindow::Hour.duration(), Duration::hours(1));
        assert_eq!(TimeWindow::Day.duration(), Duration::days(1));
        assert_eq!(TimeWindow::Week.duration(), Duration::days(7));
        assert_eq!(TimeWindow::Month.duration(), Duration::days(30));
    }

    #[test]
    fn test_fill_time_gaps_inserts_empty_days() {
        let bucket = |ts: &str, count: usize| TimeBucket {
            timestamp: utc(ts),
            count,
            event_types: std::iter::once(("order.placed".to_string(), count)).collect(),
        };
        let buckets = vec![
            bucket("2024-03-01T00:00:00Z", 2),
            bucket("2024-03-03T00:00:00Z", 1),
        ];

        let filled = AnalyticsEngine::fill_time_gaps(
            &buckets,
            utc("2024-03-01T08:00:00Z"),
            utc("2024-03-03T20:00:00Z"),
            TimeWindow::Day,
        );
        let summary: Vec<(DateTime<Utc>, usize)> =
            filled.iter().map(|b| (b.timestamp, b.count)).collect();

        assert_eq!(
            summary,
            vec![
                (utc("2024-03-01T00:00:00Z"), 2),
                (utc("2024-03-02T00:00:00Z"), 0),
                (utc("2024-03-03T00:00:00Z"), 1),
            ]
        );
    }

    #[test]
    fn test_fill_time_gaps_empty_input() {
        let filled = AnalyticsEngine::fill_time_gaps(
            &[],
            utc("2024-03-01T00:00:00Z"),
            utc("2024-03-03T00:00:00Z"),
            TimeWindow::Day,
        );
        assert!(filled.is_empty());
    }
}