Skip to main content

allsource_core/application/services/
analytics.rs

1use crate::{
2    domain::entities::Event,
3    error::{AllSourceError, Result},
4    store::EventStore,
5};
6use chrono::{DateTime, Datelike, Duration, Timelike, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9
/// Time window granularity for analytics.
///
/// Serialized and deserialized using the lowercase variant name
/// (for example `"minute"` or `"week"`).
#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum TimeWindow {
    /// One-minute buckets.
    Minute,
    /// One-hour buckets.
    Hour,
    /// One-day buckets (midnight-aligned in UTC by `truncate`).
    Day,
    /// One-week buckets; `truncate` aligns these to Monday 00:00 UTC.
    Week,
    /// Month buckets; `truncate` aligns these to the first day of the
    /// calendar month, while `duration` approximates a month as 30 days.
    Month,
}
20
21impl TimeWindow {
22    pub fn duration(&self) -> Duration {
23        match self {
24            TimeWindow::Minute => Duration::minutes(1),
25            TimeWindow::Hour => Duration::hours(1),
26            TimeWindow::Day => Duration::days(1),
27            TimeWindow::Week => Duration::weeks(1),
28            TimeWindow::Month => Duration::days(30),
29        }
30    }
31
32    pub fn truncate(&self, timestamp: DateTime<Utc>) -> DateTime<Utc> {
33        match self {
34            TimeWindow::Minute => timestamp
35                .with_second(0)
36                .unwrap()
37                .with_nanosecond(0)
38                .unwrap(),
39            TimeWindow::Hour => timestamp
40                .with_minute(0)
41                .unwrap()
42                .with_second(0)
43                .unwrap()
44                .with_nanosecond(0)
45                .unwrap(),
46            TimeWindow::Day => timestamp
47                .with_hour(0)
48                .unwrap()
49                .with_minute(0)
50                .unwrap()
51                .with_second(0)
52                .unwrap()
53                .with_nanosecond(0)
54                .unwrap(),
55            TimeWindow::Week => {
56                let days_from_monday = timestamp.weekday().num_days_from_monday();
57                (timestamp - Duration::days(days_from_monday as i64))
58                    .with_hour(0)
59                    .unwrap()
60                    .with_minute(0)
61                    .unwrap()
62                    .with_second(0)
63                    .unwrap()
64                    .with_nanosecond(0)
65                    .unwrap()
66            }
67            TimeWindow::Month => timestamp
68                .with_day(1)
69                .unwrap()
70                .with_hour(0)
71                .unwrap()
72                .with_minute(0)
73                .unwrap()
74                .with_second(0)
75                .unwrap()
76                .with_nanosecond(0)
77                .unwrap(),
78        }
79    }
80}
81
/// Request for event frequency analysis.
///
/// Consumed by `AnalyticsEngine::event_frequency`, which counts matching
/// events per time bucket between `since` and `until`.
#[derive(Debug, Deserialize)]
pub struct EventFrequencyRequest {
    /// Filter by entity ID (`None` applies no entity filter to the query).
    pub entity_id: Option<String>,

    /// Filter by event type (`None` applies no event-type filter to the query).
    pub event_type: Option<String>,

    /// Start time for analysis (required).
    pub since: DateTime<Utc>,

    /// End time for analysis (defaults to the current time when omitted).
    pub until: Option<DateTime<Utc>>,

    /// Time window granularity used to bucket events.
    pub window: TimeWindow,
}
100
/// Time bucket with event count.
#[derive(Debug, Clone, Serialize)]
pub struct TimeBucket {
    /// Start of the bucket (an event timestamp truncated to the window).
    pub timestamp: DateTime<Utc>,
    /// Total number of events in the bucket (sum of `event_types` values).
    pub count: usize,
    /// Number of events in the bucket, keyed by event type.
    pub event_types: HashMap<String, usize>,
}
108
/// Response containing time-series frequency data.
#[derive(Debug, Serialize)]
pub struct EventFrequencyResponse {
    /// Buckets in chronological order; windows without events are included
    /// with a zero count. Empty when no events matched.
    pub buckets: Vec<TimeBucket>,
    /// Number of events returned by the query.
    pub total_events: usize,
    /// Window granularity echoed from the request.
    pub window: TimeWindow,
    /// Analyzed range: the requested `since` and the effective `until`.
    pub time_range: TimeRange,
}
117
/// A pair of timestamps describing the span covered by an analysis.
#[derive(Debug, Serialize)]
pub struct TimeRange {
    /// Start of the range.
    pub from: DateTime<Utc>,
    /// End of the range.
    pub to: DateTime<Utc>,
}
123
/// Request for statistical summary.
///
/// Every field is optional and is forwarded as-is to the event store query.
#[derive(Debug, Deserialize)]
pub struct StatsSummaryRequest {
    /// Filter by entity ID.
    pub entity_id: Option<String>,

    /// Filter by event type.
    pub event_type: Option<String>,

    /// Start time for analysis.
    pub since: Option<DateTime<Utc>>,

    /// End time for analysis.
    pub until: Option<DateTime<Utc>>,
}
139
/// Statistical summary response.
#[derive(Debug, Serialize)]
pub struct StatsSummaryResponse {
    /// Number of events matched by the query.
    pub total_events: usize,
    /// Entity count as reported by `AnalyticsEngine::stats_summary`.
    pub unique_entities: usize,
    /// Event-type count as reported by `AnalyticsEngine::stats_summary`.
    pub unique_event_types: usize,
    /// Span between the first and last event timestamps.
    pub time_range: TimeRange,
    /// `total_events` divided by the number of whole days between the first
    /// and last event (treated as at least one day).
    pub events_per_day: f64,
    /// Most frequent event types, highest count first (at most 10 entries).
    pub top_event_types: Vec<EventTypeCount>,
    /// Most active entities, highest count first (at most 10 entries).
    pub top_entities: Vec<EntityCount>,
    /// Timestamp of the first event in the result set.
    pub first_event: Option<DateTime<Utc>>,
    /// Timestamp of the last event in the result set.
    pub last_event: Option<DateTime<Utc>>,
}
153
/// Occurrence count for a single event type within a statistics summary.
#[derive(Debug, Serialize)]
pub struct EventTypeCount {
    /// Event type name.
    pub event_type: String,
    /// Number of matched events of this type.
    pub count: usize,
    /// Share of all matched events, on a 0-100 scale.
    pub percentage: f64,
}
160
/// Occurrence count for a single entity within a statistics summary.
#[derive(Debug, Serialize)]
pub struct EntityCount {
    /// Entity identifier.
    pub entity_id: String,
    /// Number of matched events belonging to this entity.
    pub count: usize,
    /// Share of all matched events, on a 0-100 scale.
    pub percentage: f64,
}
167
/// Request for event correlation analysis.
///
/// Consumed by `AnalyticsEngine::analyze_correlation`, which pairs events of
/// the two types that belong to the same entity.
#[derive(Debug, Deserialize)]
pub struct CorrelationRequest {
    /// First event type
    pub event_type_a: String,

    /// Second event type
    pub event_type_b: String,

    /// Maximum time window to consider events correlated: the largest
    /// absolute difference, in seconds (inclusive), between the timestamps of
    /// an A event and a B event.
    pub time_window_seconds: i64,

    /// Start time for analysis
    pub since: Option<DateTime<Utc>>,

    /// End time for analysis
    pub until: Option<DateTime<Utc>>,
}
186
/// Correlation analysis response.
#[derive(Debug, Serialize)]
pub struct CorrelationResponse {
    /// First event type, echoed from the request.
    pub event_type_a: String,
    /// Second event type, echoed from the request.
    pub event_type_b: String,
    /// Number of events of type A returned by the query.
    pub total_a: usize,
    /// Number of events of type B returned by the query.
    pub total_b: usize,
    /// Number of (A, B) event pairs on the same entity whose timestamps lie
    /// within the requested window.
    pub correlated_pairs: usize,
    /// Correlation strength as a percentage relative to `total_a`
    /// (`0.0` when there are no A events).
    pub correlation_percentage: f64,
    /// Mean absolute time gap, in seconds, across all correlated pairs
    /// (`0.0` when there are none).
    pub avg_time_between_seconds: f64,
    /// A small sample of correlated pairs (at most 5).
    pub examples: Vec<CorrelationExample>,
}
199
/// One concrete correlated (A, B) event pair.
#[derive(Debug, Serialize)]
pub struct CorrelationExample {
    /// Entity both events belong to.
    pub entity_id: String,
    /// Timestamp of the type-A event.
    pub event_a_timestamp: DateTime<Utc>,
    /// Timestamp of the type-B event.
    pub event_b_timestamp: DateTime<Utc>,
    /// Absolute difference between the two timestamps, in whole seconds.
    pub time_between_seconds: i64,
}
207
/// Analytics engine for time-series and statistical analysis.
///
/// A unit struct used purely as a namespace: its analyses are associated
/// functions that take the `EventStore` to query as an argument.
pub struct AnalyticsEngine;
210
211impl AnalyticsEngine {
212    /// Analyze event frequency over time windows
213    pub fn event_frequency(
214        store: &EventStore,
215        request: EventFrequencyRequest,
216    ) -> Result<EventFrequencyResponse> {
217        let until = request.until.unwrap_or_else(Utc::now);
218
219        // Query events in the time range
220        let events = store.query(crate::application::dto::QueryEventsRequest {
221            entity_id: request.entity_id.clone(),
222            event_type: request.event_type.clone(),
223            tenant_id: None,
224            as_of: None,
225            since: Some(request.since),
226            until: Some(until),
227            limit: None,
228        })?;
229
230        if events.is_empty() {
231            return Ok(EventFrequencyResponse {
232                buckets: Vec::new(),
233                total_events: 0,
234                window: request.window,
235                time_range: TimeRange {
236                    from: request.since,
237                    to: until,
238                },
239            });
240        }
241
242        // Create time buckets
243        let mut buckets_map: HashMap<DateTime<Utc>, HashMap<String, usize>> = HashMap::new();
244
245        for event in &events {
246            let bucket_time = request.window.truncate(event.timestamp);
247            let bucket = buckets_map.entry(bucket_time).or_default();
248            *bucket
249                .entry(event.event_type_str().to_string())
250                .or_insert(0) += 1;
251        }
252
253        // Convert to sorted vector
254        let mut buckets: Vec<TimeBucket> = buckets_map
255            .into_iter()
256            .map(|(timestamp, event_types)| {
257                let count = event_types.values().sum();
258                TimeBucket {
259                    timestamp,
260                    count,
261                    event_types,
262                }
263            })
264            .collect();
265
266        buckets.sort_by_key(|b| b.timestamp);
267
268        // Fill gaps in the timeline
269        let filled_buckets = Self::fill_time_gaps(&buckets, request.since, until, request.window);
270
271        Ok(EventFrequencyResponse {
272            total_events: events.len(),
273            buckets: filled_buckets,
274            window: request.window,
275            time_range: TimeRange {
276                from: request.since,
277                to: until,
278            },
279        })
280    }
281
282    /// Fill gaps in time buckets for continuous timeline
283    fn fill_time_gaps(
284        buckets: &[TimeBucket],
285        start: DateTime<Utc>,
286        end: DateTime<Utc>,
287        window: TimeWindow,
288    ) -> Vec<TimeBucket> {
289        if buckets.is_empty() {
290            return Vec::new();
291        }
292
293        let mut filled = Vec::new();
294        let mut current = window.truncate(start);
295        let end = window.truncate(end);
296
297        let bucket_map: HashMap<DateTime<Utc>, &TimeBucket> =
298            buckets.iter().map(|b| (b.timestamp, b)).collect();
299
300        while current <= end {
301            if let Some(bucket) = bucket_map.get(&current) {
302                filled.push((**bucket).clone());
303            } else {
304                filled.push(TimeBucket {
305                    timestamp: current,
306                    count: 0,
307                    event_types: HashMap::new(),
308                });
309            }
310            current += window.duration();
311        }
312
313        filled
314    }
315
316    /// Generate comprehensive statistical summary
317    pub fn stats_summary(
318        store: &EventStore,
319        request: StatsSummaryRequest,
320    ) -> Result<StatsSummaryResponse> {
321        // Query events based on filters
322        let events = store.query(crate::application::dto::QueryEventsRequest {
323            entity_id: request.entity_id.clone(),
324            event_type: request.event_type.clone(),
325            tenant_id: None,
326            as_of: None,
327            since: request.since,
328            until: request.until,
329            limit: None,
330        })?;
331
332        if events.is_empty() {
333            return Err(AllSourceError::ValidationError(
334                "No events found for the specified criteria".to_string(),
335            ));
336        }
337
338        // Calculate statistics
339        let first_event = events.first().map(|e| e.timestamp);
340        let last_event = events.last().map(|e| e.timestamp);
341
342        let mut entity_counts: HashMap<String, usize> = HashMap::new();
343        let mut event_type_counts: HashMap<String, usize> = HashMap::new();
344
345        for event in &events {
346            *entity_counts
347                .entry(event.entity_id_str().to_string())
348                .or_insert(0) += 1;
349            *event_type_counts
350                .entry(event.event_type_str().to_string())
351                .or_insert(0) += 1;
352        }
353
354        // Calculate events per day
355        let time_span = if let (Some(first), Some(last)) = (first_event, last_event) {
356            (last - first).num_days().max(1) as f64
357        } else {
358            1.0
359        };
360
361        let events_per_day = events.len() as f64 / time_span;
362
363        // Top event types
364        let mut top_event_types: Vec<EventTypeCount> = event_type_counts
365            .into_iter()
366            .map(|(event_type, count)| EventTypeCount {
367                event_type,
368                count,
369                percentage: (count as f64 / events.len() as f64) * 100.0,
370            })
371            .collect();
372        top_event_types.sort_by_key(|x| std::cmp::Reverse(x.count));
373        top_event_types.truncate(10);
374
375        // Top entities
376        let mut top_entities: Vec<EntityCount> = entity_counts
377            .into_iter()
378            .map(|(entity_id, count)| EntityCount {
379                entity_id,
380                count,
381                percentage: (count as f64 / events.len() as f64) * 100.0,
382            })
383            .collect();
384        top_entities.sort_by_key(|x| std::cmp::Reverse(x.count));
385        top_entities.truncate(10);
386
387        let time_range = TimeRange {
388            from: first_event.unwrap_or_else(Utc::now),
389            to: last_event.unwrap_or_else(Utc::now),
390        };
391
392        Ok(StatsSummaryResponse {
393            total_events: events.len(),
394            unique_entities: top_entities.len(),
395            unique_event_types: top_event_types.len(),
396            time_range,
397            events_per_day,
398            top_event_types,
399            top_entities,
400            first_event,
401            last_event,
402        })
403    }
404
405    /// Analyze correlation between two event types
406    pub fn analyze_correlation(
407        store: &EventStore,
408        request: CorrelationRequest,
409    ) -> Result<CorrelationResponse> {
410        // Query both event types
411        let events_a = store.query(crate::application::dto::QueryEventsRequest {
412            entity_id: None,
413            event_type: Some(request.event_type_a.clone()),
414            tenant_id: None,
415            as_of: None,
416            since: request.since,
417            until: request.until,
418            limit: None,
419        })?;
420
421        let events_b = store.query(crate::application::dto::QueryEventsRequest {
422            entity_id: None,
423            event_type: Some(request.event_type_b.clone()),
424            tenant_id: None,
425            as_of: None,
426            since: request.since,
427            until: request.until,
428            limit: None,
429        })?;
430
431        // Group events by entity
432        let mut entity_events_a: HashMap<String, Vec<&Event>> = HashMap::new();
433        let mut entity_events_b: HashMap<String, Vec<&Event>> = HashMap::new();
434
435        for event in &events_a {
436            entity_events_a
437                .entry(event.entity_id_str().to_string())
438                .or_default()
439                .push(event);
440        }
441
442        for event in &events_b {
443            entity_events_b
444                .entry(event.entity_id_str().to_string())
445                .or_default()
446                .push(event);
447        }
448
449        // Find correlated pairs
450        let mut correlated_pairs = 0;
451        let mut total_time_between = 0i64;
452        let mut examples = Vec::new();
453
454        for (entity_id, a_events) in &entity_events_a {
455            if let Some(b_events) = entity_events_b.get(entity_id) {
456                for a_event in a_events {
457                    for b_event in b_events {
458                        let time_diff = (b_event.timestamp - a_event.timestamp).num_seconds().abs();
459
460                        if time_diff <= request.time_window_seconds {
461                            correlated_pairs += 1;
462                            total_time_between += time_diff;
463
464                            if examples.len() < 5 {
465                                examples.push(CorrelationExample {
466                                    entity_id: entity_id.clone(),
467                                    event_a_timestamp: a_event.timestamp,
468                                    event_b_timestamp: b_event.timestamp,
469                                    time_between_seconds: time_diff,
470                                });
471                            }
472                        }
473                    }
474                }
475            }
476        }
477
478        let correlation_percentage = if !events_a.is_empty() {
479            (correlated_pairs as f64 / events_a.len() as f64) * 100.0
480        } else {
481            0.0
482        };
483
484        let avg_time_between = if correlated_pairs > 0 {
485            total_time_between as f64 / correlated_pairs as f64
486        } else {
487            0.0
488        };
489
490        Ok(CorrelationResponse {
491            event_type_a: request.event_type_a,
492            event_type_b: request.event_type_b,
493            total_a: events_a.len(),
494            total_b: events_b.len(),
495            correlated_pairs,
496            correlation_percentage,
497            avg_time_between_seconds: avg_time_between,
498            examples,
499        })
500    }
501}
502
#[cfg(test)]
mod tests {
    use super::*;

    /// Parse an RFC 3339 timestamp into a UTC `DateTime` for use in tests.
    fn ts(text: &str) -> DateTime<Utc> {
        text.parse().expect("test timestamp must be valid RFC 3339")
    }

    /// Truncation is checked against a fixed timestamp (a Thursday) so the
    /// test is reproducible and covers every window, including Week and Month.
    #[test]
    fn test_time_window_truncation() {
        let timestamp = ts("2024-03-14T15:09:26.535Z");

        let minute_truncated = TimeWindow::Minute.truncate(timestamp);
        assert_eq!(minute_truncated.second(), 0);
        assert_eq!(minute_truncated.nanosecond(), 0);
        assert_eq!(minute_truncated, ts("2024-03-14T15:09:00Z"));

        let hour_truncated = TimeWindow::Hour.truncate(timestamp);
        assert_eq!(hour_truncated.minute(), 0);
        assert_eq!(hour_truncated.second(), 0);
        assert_eq!(hour_truncated, ts("2024-03-14T15:00:00Z"));

        let day_truncated = TimeWindow::Day.truncate(timestamp);
        assert_eq!(day_truncated.hour(), 0);
        assert_eq!(day_truncated.minute(), 0);
        assert_eq!(day_truncated, ts("2024-03-14T00:00:00Z"));

        // 2024-03-14 is a Thursday; the week starts on Monday 2024-03-11.
        let week_truncated = TimeWindow::Week.truncate(timestamp);
        assert_eq!(week_truncated, ts("2024-03-11T00:00:00Z"));

        let month_truncated = TimeWindow::Month.truncate(timestamp);
        assert_eq!(month_truncated, ts("2024-03-01T00:00:00Z"));
    }
}
522}