allsource_core/application/services/
analytics.rs

1use crate::domain::entities::Event;
2use crate::error::{AllSourceError, Result};
3use crate::store::EventStore;
4use chrono::{DateTime, Datelike, Duration, Timelike, Utc};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7
8/// Time window granularity for analytics
9#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
10#[serde(rename_all = "lowercase")]
11pub enum TimeWindow {
12    Minute,
13    Hour,
14    Day,
15    Week,
16    Month,
17}
18
19impl TimeWindow {
20    pub fn duration(&self) -> Duration {
21        match self {
22            TimeWindow::Minute => Duration::minutes(1),
23            TimeWindow::Hour => Duration::hours(1),
24            TimeWindow::Day => Duration::days(1),
25            TimeWindow::Week => Duration::weeks(1),
26            TimeWindow::Month => Duration::days(30),
27        }
28    }
29
30    pub fn truncate(&self, timestamp: DateTime<Utc>) -> DateTime<Utc> {
31        match self {
32            TimeWindow::Minute => timestamp
33                .with_second(0)
34                .unwrap()
35                .with_nanosecond(0)
36                .unwrap(),
37            TimeWindow::Hour => timestamp
38                .with_minute(0)
39                .unwrap()
40                .with_second(0)
41                .unwrap()
42                .with_nanosecond(0)
43                .unwrap(),
44            TimeWindow::Day => timestamp
45                .with_hour(0)
46                .unwrap()
47                .with_minute(0)
48                .unwrap()
49                .with_second(0)
50                .unwrap()
51                .with_nanosecond(0)
52                .unwrap(),
53            TimeWindow::Week => {
54                let days_from_monday = timestamp.weekday().num_days_from_monday();
55                (timestamp - Duration::days(days_from_monday as i64))
56                    .with_hour(0)
57                    .unwrap()
58                    .with_minute(0)
59                    .unwrap()
60                    .with_second(0)
61                    .unwrap()
62                    .with_nanosecond(0)
63                    .unwrap()
64            }
65            TimeWindow::Month => timestamp
66                .with_day(1)
67                .unwrap()
68                .with_hour(0)
69                .unwrap()
70                .with_minute(0)
71                .unwrap()
72                .with_second(0)
73                .unwrap()
74                .with_nanosecond(0)
75                .unwrap(),
76        }
77    }
78}
79
80/// Request for event frequency analysis
81#[derive(Debug, Deserialize)]
82pub struct EventFrequencyRequest {
83    /// Filter by entity ID
84    pub entity_id: Option<String>,
85
86    /// Filter by event type
87    pub event_type: Option<String>,
88
89    /// Start time for analysis
90    pub since: DateTime<Utc>,
91
92    /// End time for analysis (defaults to now)
93    pub until: Option<DateTime<Utc>>,
94
95    /// Time window granularity
96    pub window: TimeWindow,
97}
98
99/// Time bucket with event count
100#[derive(Debug, Clone, Serialize)]
101pub struct TimeBucket {
102    pub timestamp: DateTime<Utc>,
103    pub count: usize,
104    pub event_types: HashMap<String, usize>,
105}
106
107/// Response containing time-series frequency data
108#[derive(Debug, Serialize)]
109pub struct EventFrequencyResponse {
110    pub buckets: Vec<TimeBucket>,
111    pub total_events: usize,
112    pub window: TimeWindow,
113    pub time_range: TimeRange,
114}
115
116#[derive(Debug, Serialize)]
117pub struct TimeRange {
118    pub from: DateTime<Utc>,
119    pub to: DateTime<Utc>,
120}
121
122/// Request for statistical summary
123#[derive(Debug, Deserialize)]
124pub struct StatsSummaryRequest {
125    /// Filter by entity ID
126    pub entity_id: Option<String>,
127
128    /// Filter by event type
129    pub event_type: Option<String>,
130
131    /// Start time for analysis
132    pub since: Option<DateTime<Utc>>,
133
134    /// End time for analysis
135    pub until: Option<DateTime<Utc>>,
136}
137
138/// Statistical summary response
139#[derive(Debug, Serialize)]
140pub struct StatsSummaryResponse {
141    pub total_events: usize,
142    pub unique_entities: usize,
143    pub unique_event_types: usize,
144    pub time_range: TimeRange,
145    pub events_per_day: f64,
146    pub top_event_types: Vec<EventTypeCount>,
147    pub top_entities: Vec<EntityCount>,
148    pub first_event: Option<DateTime<Utc>>,
149    pub last_event: Option<DateTime<Utc>>,
150}
151
152#[derive(Debug, Serialize)]
153pub struct EventTypeCount {
154    pub event_type: String,
155    pub count: usize,
156    pub percentage: f64,
157}
158
159#[derive(Debug, Serialize)]
160pub struct EntityCount {
161    pub entity_id: String,
162    pub count: usize,
163    pub percentage: f64,
164}
165
166/// Request for event correlation analysis
167#[derive(Debug, Deserialize)]
168pub struct CorrelationRequest {
169    /// First event type
170    pub event_type_a: String,
171
172    /// Second event type
173    pub event_type_b: String,
174
175    /// Maximum time window to consider events correlated
176    pub time_window_seconds: i64,
177
178    /// Start time for analysis
179    pub since: Option<DateTime<Utc>>,
180
181    /// End time for analysis
182    pub until: Option<DateTime<Utc>>,
183}
184
185/// Correlation analysis response
186#[derive(Debug, Serialize)]
187pub struct CorrelationResponse {
188    pub event_type_a: String,
189    pub event_type_b: String,
190    pub total_a: usize,
191    pub total_b: usize,
192    pub correlated_pairs: usize,
193    pub correlation_percentage: f64,
194    pub avg_time_between_seconds: f64,
195    pub examples: Vec<CorrelationExample>,
196}
197
198#[derive(Debug, Serialize)]
199pub struct CorrelationExample {
200    pub entity_id: String,
201    pub event_a_timestamp: DateTime<Utc>,
202    pub event_b_timestamp: DateTime<Utc>,
203    pub time_between_seconds: i64,
204}
205
/// Namespace for time-series and statistical analyses over an `EventStore`.
///
/// The type carries no state: every operation is an associated function
/// that receives the store explicitly.
pub struct AnalyticsEngine;
208
209impl AnalyticsEngine {
210    /// Analyze event frequency over time windows
211    pub fn event_frequency(
212        store: &EventStore,
213        request: EventFrequencyRequest,
214    ) -> Result<EventFrequencyResponse> {
215        let until = request.until.unwrap_or_else(Utc::now);
216
217        // Query events in the time range
218        let events = store.query(crate::application::dto::QueryEventsRequest {
219            entity_id: request.entity_id.clone(),
220            event_type: request.event_type.clone(),
221            tenant_id: None,
222            as_of: None,
223            since: Some(request.since),
224            until: Some(until),
225            limit: None,
226        })?;
227
228        if events.is_empty() {
229            return Ok(EventFrequencyResponse {
230                buckets: Vec::new(),
231                total_events: 0,
232                window: request.window,
233                time_range: TimeRange {
234                    from: request.since,
235                    to: until,
236                },
237            });
238        }
239
240        // Create time buckets
241        let mut buckets_map: HashMap<DateTime<Utc>, HashMap<String, usize>> = HashMap::new();
242
243        for event in &events {
244            let bucket_time = request.window.truncate(event.timestamp);
245            let bucket = buckets_map.entry(bucket_time).or_insert_with(HashMap::new);
246            *bucket
247                .entry(event.event_type_str().to_string())
248                .or_insert(0) += 1;
249        }
250
251        // Convert to sorted vector
252        let mut buckets: Vec<TimeBucket> = buckets_map
253            .into_iter()
254            .map(|(timestamp, event_types)| {
255                let count = event_types.values().sum();
256                TimeBucket {
257                    timestamp,
258                    count,
259                    event_types,
260                }
261            })
262            .collect();
263
264        buckets.sort_by_key(|b| b.timestamp);
265
266        // Fill gaps in the timeline
267        let filled_buckets = Self::fill_time_gaps(&buckets, request.since, until, request.window);
268
269        Ok(EventFrequencyResponse {
270            total_events: events.len(),
271            buckets: filled_buckets,
272            window: request.window,
273            time_range: TimeRange {
274                from: request.since,
275                to: until,
276            },
277        })
278    }
279
280    /// Fill gaps in time buckets for continuous timeline
281    fn fill_time_gaps(
282        buckets: &[TimeBucket],
283        start: DateTime<Utc>,
284        end: DateTime<Utc>,
285        window: TimeWindow,
286    ) -> Vec<TimeBucket> {
287        if buckets.is_empty() {
288            return Vec::new();
289        }
290
291        let mut filled = Vec::new();
292        let mut current = window.truncate(start);
293        let end = window.truncate(end);
294
295        let bucket_map: HashMap<DateTime<Utc>, &TimeBucket> =
296            buckets.iter().map(|b| (b.timestamp, b)).collect();
297
298        while current <= end {
299            if let Some(bucket) = bucket_map.get(&current) {
300                filled.push((**bucket).clone());
301            } else {
302                filled.push(TimeBucket {
303                    timestamp: current,
304                    count: 0,
305                    event_types: HashMap::new(),
306                });
307            }
308            current = current + window.duration();
309        }
310
311        filled
312    }
313
314    /// Generate comprehensive statistical summary
315    pub fn stats_summary(
316        store: &EventStore,
317        request: StatsSummaryRequest,
318    ) -> Result<StatsSummaryResponse> {
319        // Query events based on filters
320        let events = store.query(crate::application::dto::QueryEventsRequest {
321            entity_id: request.entity_id.clone(),
322            event_type: request.event_type.clone(),
323            tenant_id: None,
324            as_of: None,
325            since: request.since,
326            until: request.until,
327            limit: None,
328        })?;
329
330        if events.is_empty() {
331            return Err(AllSourceError::ValidationError(
332                "No events found for the specified criteria".to_string(),
333            ));
334        }
335
336        // Calculate statistics
337        let first_event = events.first().map(|e| e.timestamp);
338        let last_event = events.last().map(|e| e.timestamp);
339
340        let mut entity_counts: HashMap<String, usize> = HashMap::new();
341        let mut event_type_counts: HashMap<String, usize> = HashMap::new();
342
343        for event in &events {
344            *entity_counts
345                .entry(event.entity_id_str().to_string())
346                .or_insert(0) += 1;
347            *event_type_counts
348                .entry(event.event_type_str().to_string())
349                .or_insert(0) += 1;
350        }
351
352        // Calculate events per day
353        let time_span = if let (Some(first), Some(last)) = (first_event, last_event) {
354            (last - first).num_days().max(1) as f64
355        } else {
356            1.0
357        };
358
359        let events_per_day = events.len() as f64 / time_span;
360
361        // Top event types
362        let mut top_event_types: Vec<EventTypeCount> = event_type_counts
363            .into_iter()
364            .map(|(event_type, count)| EventTypeCount {
365                event_type,
366                count,
367                percentage: (count as f64 / events.len() as f64) * 100.0,
368            })
369            .collect();
370        top_event_types.sort_by(|a, b| b.count.cmp(&a.count));
371        top_event_types.truncate(10);
372
373        // Top entities
374        let mut top_entities: Vec<EntityCount> = entity_counts
375            .into_iter()
376            .map(|(entity_id, count)| EntityCount {
377                entity_id,
378                count,
379                percentage: (count as f64 / events.len() as f64) * 100.0,
380            })
381            .collect();
382        top_entities.sort_by(|a, b| b.count.cmp(&a.count));
383        top_entities.truncate(10);
384
385        let time_range = TimeRange {
386            from: first_event.unwrap_or_else(Utc::now),
387            to: last_event.unwrap_or_else(Utc::now),
388        };
389
390        Ok(StatsSummaryResponse {
391            total_events: events.len(),
392            unique_entities: top_entities.len(),
393            unique_event_types: top_event_types.len(),
394            time_range,
395            events_per_day,
396            top_event_types,
397            top_entities,
398            first_event,
399            last_event,
400        })
401    }
402
403    /// Analyze correlation between two event types
404    pub fn analyze_correlation(
405        store: &EventStore,
406        request: CorrelationRequest,
407    ) -> Result<CorrelationResponse> {
408        // Query both event types
409        let events_a = store.query(crate::application::dto::QueryEventsRequest {
410            entity_id: None,
411            event_type: Some(request.event_type_a.clone()),
412            tenant_id: None,
413            as_of: None,
414            since: request.since,
415            until: request.until,
416            limit: None,
417        })?;
418
419        let events_b = store.query(crate::application::dto::QueryEventsRequest {
420            entity_id: None,
421            event_type: Some(request.event_type_b.clone()),
422            tenant_id: None,
423            as_of: None,
424            since: request.since,
425            until: request.until,
426            limit: None,
427        })?;
428
429        // Group events by entity
430        let mut entity_events_a: HashMap<String, Vec<&Event>> = HashMap::new();
431        let mut entity_events_b: HashMap<String, Vec<&Event>> = HashMap::new();
432
433        for event in &events_a {
434            entity_events_a
435                .entry(event.entity_id_str().to_string())
436                .or_insert_with(Vec::new)
437                .push(event);
438        }
439
440        for event in &events_b {
441            entity_events_b
442                .entry(event.entity_id_str().to_string())
443                .or_insert_with(Vec::new)
444                .push(event);
445        }
446
447        // Find correlated pairs
448        let mut correlated_pairs = 0;
449        let mut total_time_between = 0i64;
450        let mut examples = Vec::new();
451
452        for (entity_id, a_events) in &entity_events_a {
453            if let Some(b_events) = entity_events_b.get(entity_id) {
454                for a_event in a_events {
455                    for b_event in b_events {
456                        let time_diff = (b_event.timestamp - a_event.timestamp).num_seconds().abs();
457
458                        if time_diff <= request.time_window_seconds {
459                            correlated_pairs += 1;
460                            total_time_between += time_diff;
461
462                            if examples.len() < 5 {
463                                examples.push(CorrelationExample {
464                                    entity_id: entity_id.clone(),
465                                    event_a_timestamp: a_event.timestamp,
466                                    event_b_timestamp: b_event.timestamp,
467                                    time_between_seconds: time_diff,
468                                });
469                            }
470                        }
471                    }
472                }
473            }
474        }
475
476        let correlation_percentage = if !events_a.is_empty() {
477            (correlated_pairs as f64 / events_a.len() as f64) * 100.0
478        } else {
479            0.0
480        };
481
482        let avg_time_between = if correlated_pairs > 0 {
483            total_time_between as f64 / correlated_pairs as f64
484        } else {
485            0.0
486        };
487
488        Ok(CorrelationResponse {
489            event_type_a: request.event_type_a,
490            event_type_b: request.event_type_b,
491            total_a: events_a.len(),
492            total_b: events_b.len(),
493            correlated_pairs,
494            correlation_percentage,
495            avg_time_between_seconds: avg_time_between,
496            examples,
497        })
498    }
499}
500
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses a fixed RFC 3339 literal into a UTC timestamp for use in tests.
    fn utc(literal: &str) -> DateTime<Utc> {
        literal
            .parse()
            .expect("test timestamp literal must be valid RFC 3339")
    }

    /// Truncation is checked against a fixed instant rather than the current
    /// time, so the test is deterministic and can assert exact results.
    #[test]
    fn test_time_window_truncation() {
        // 2024-03-14 is a Thursday; the Monday of that week is 2024-03-11.
        let timestamp = utc("2024-03-14T15:09:26.535Z");

        let minute_truncated = TimeWindow::Minute.truncate(timestamp);
        assert_eq!(minute_truncated, utc("2024-03-14T15:09:00Z"));
        assert_eq!(minute_truncated.second(), 0);

        let hour_truncated = TimeWindow::Hour.truncate(timestamp);
        assert_eq!(hour_truncated, utc("2024-03-14T15:00:00Z"));
        assert_eq!(hour_truncated.minute(), 0);
        assert_eq!(hour_truncated.second(), 0);

        let day_truncated = TimeWindow::Day.truncate(timestamp);
        assert_eq!(day_truncated, utc("2024-03-14T00:00:00Z"));
        assert_eq!(day_truncated.hour(), 0);
        assert_eq!(day_truncated.minute(), 0);

        let week_truncated = TimeWindow::Week.truncate(timestamp);
        assert_eq!(week_truncated, utc("2024-03-11T00:00:00Z"));

        let month_truncated = TimeWindow::Month.truncate(timestamp);
        assert_eq!(month_truncated, utc("2024-03-01T00:00:00Z"));
    }

    /// Truncating an already-aligned timestamp must leave it unchanged.
    #[test]
    fn test_time_window_truncation_is_idempotent() {
        let timestamp = utc("2024-03-14T15:09:26.535Z");

        for window in [
            TimeWindow::Minute,
            TimeWindow::Hour,
            TimeWindow::Day,
            TimeWindow::Week,
            TimeWindow::Month,
        ] {
            let once = window.truncate(timestamp);
            assert_eq!(window.truncate(once), once);
        }
    }
}