1use crate::{
2 domain::entities::Event,
3 error::{AllSourceError, Result},
4 store::EventStore,
5};
6use chrono::{DateTime, Datelike, Duration, Timelike, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9
10#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
12#[serde(rename_all = "lowercase")]
13pub enum TimeWindow {
14 Minute,
15 Hour,
16 Day,
17 Week,
18 Month,
19}
20
21impl TimeWindow {
22 pub fn duration(&self) -> Duration {
23 match self {
24 TimeWindow::Minute => Duration::minutes(1),
25 TimeWindow::Hour => Duration::hours(1),
26 TimeWindow::Day => Duration::days(1),
27 TimeWindow::Week => Duration::weeks(1),
28 TimeWindow::Month => Duration::days(30),
29 }
30 }
31
32 pub fn truncate(&self, timestamp: DateTime<Utc>) -> DateTime<Utc> {
33 match self {
34 TimeWindow::Minute => timestamp
35 .with_second(0)
36 .unwrap()
37 .with_nanosecond(0)
38 .unwrap(),
39 TimeWindow::Hour => timestamp
40 .with_minute(0)
41 .unwrap()
42 .with_second(0)
43 .unwrap()
44 .with_nanosecond(0)
45 .unwrap(),
46 TimeWindow::Day => timestamp
47 .with_hour(0)
48 .unwrap()
49 .with_minute(0)
50 .unwrap()
51 .with_second(0)
52 .unwrap()
53 .with_nanosecond(0)
54 .unwrap(),
55 TimeWindow::Week => {
56 let days_from_monday = timestamp.weekday().num_days_from_monday();
57 (timestamp - Duration::days(days_from_monday as i64))
58 .with_hour(0)
59 .unwrap()
60 .with_minute(0)
61 .unwrap()
62 .with_second(0)
63 .unwrap()
64 .with_nanosecond(0)
65 .unwrap()
66 }
67 TimeWindow::Month => timestamp
68 .with_day(1)
69 .unwrap()
70 .with_hour(0)
71 .unwrap()
72 .with_minute(0)
73 .unwrap()
74 .with_second(0)
75 .unwrap()
76 .with_nanosecond(0)
77 .unwrap(),
78 }
79 }
80}
81
82#[derive(Debug, Deserialize)]
84pub struct EventFrequencyRequest {
85 pub entity_id: Option<String>,
87
88 pub event_type: Option<String>,
90
91 pub since: DateTime<Utc>,
93
94 pub until: Option<DateTime<Utc>>,
96
97 pub window: TimeWindow,
99}
100
101#[derive(Debug, Clone, Serialize)]
103pub struct TimeBucket {
104 pub timestamp: DateTime<Utc>,
105 pub count: usize,
106 pub event_types: HashMap<String, usize>,
107}
108
109#[derive(Debug, Serialize)]
111pub struct EventFrequencyResponse {
112 pub buckets: Vec<TimeBucket>,
113 pub total_events: usize,
114 pub window: TimeWindow,
115 pub time_range: TimeRange,
116}
117
118#[derive(Debug, Serialize)]
119pub struct TimeRange {
120 pub from: DateTime<Utc>,
121 pub to: DateTime<Utc>,
122}
123
124#[derive(Debug, Deserialize)]
126pub struct StatsSummaryRequest {
127 pub entity_id: Option<String>,
129
130 pub event_type: Option<String>,
132
133 pub since: Option<DateTime<Utc>>,
135
136 pub until: Option<DateTime<Utc>>,
138}
139
140#[derive(Debug, Serialize)]
142pub struct StatsSummaryResponse {
143 pub total_events: usize,
144 pub unique_entities: usize,
145 pub unique_event_types: usize,
146 pub time_range: TimeRange,
147 pub events_per_day: f64,
148 pub top_event_types: Vec<EventTypeCount>,
149 pub top_entities: Vec<EntityCount>,
150 pub first_event: Option<DateTime<Utc>>,
151 pub last_event: Option<DateTime<Utc>>,
152}
153
154#[derive(Debug, Serialize)]
155pub struct EventTypeCount {
156 pub event_type: String,
157 pub count: usize,
158 pub percentage: f64,
159}
160
161#[derive(Debug, Serialize)]
162pub struct EntityCount {
163 pub entity_id: String,
164 pub count: usize,
165 pub percentage: f64,
166}
167
168#[derive(Debug, Deserialize)]
170pub struct CorrelationRequest {
171 pub event_type_a: String,
173
174 pub event_type_b: String,
176
177 pub time_window_seconds: i64,
179
180 pub since: Option<DateTime<Utc>>,
182
183 pub until: Option<DateTime<Utc>>,
185}
186
187#[derive(Debug, Serialize)]
189pub struct CorrelationResponse {
190 pub event_type_a: String,
191 pub event_type_b: String,
192 pub total_a: usize,
193 pub total_b: usize,
194 pub correlated_pairs: usize,
195 pub correlation_percentage: f64,
196 pub avg_time_between_seconds: f64,
197 pub examples: Vec<CorrelationExample>,
198}
199
200#[derive(Debug, Serialize)]
201pub struct CorrelationExample {
202 pub entity_id: String,
203 pub event_a_timestamp: DateTime<Utc>,
204 pub event_b_timestamp: DateTime<Utc>,
205 pub time_between_seconds: i64,
206}
207
208pub struct AnalyticsEngine;
210
211impl AnalyticsEngine {
212 pub fn event_frequency(
214 store: &EventStore,
215 request: EventFrequencyRequest,
216 ) -> Result<EventFrequencyResponse> {
217 let until = request.until.unwrap_or_else(Utc::now);
218
219 let events = store.query(crate::application::dto::QueryEventsRequest {
221 entity_id: request.entity_id.clone(),
222 event_type: request.event_type.clone(),
223 tenant_id: None,
224 as_of: None,
225 since: Some(request.since),
226 until: Some(until),
227 limit: None,
228 })?;
229
230 if events.is_empty() {
231 return Ok(EventFrequencyResponse {
232 buckets: Vec::new(),
233 total_events: 0,
234 window: request.window,
235 time_range: TimeRange {
236 from: request.since,
237 to: until,
238 },
239 });
240 }
241
242 let mut buckets_map: HashMap<DateTime<Utc>, HashMap<String, usize>> = HashMap::new();
244
245 for event in &events {
246 let bucket_time = request.window.truncate(event.timestamp);
247 let bucket = buckets_map.entry(bucket_time).or_default();
248 *bucket
249 .entry(event.event_type_str().to_string())
250 .or_insert(0) += 1;
251 }
252
253 let mut buckets: Vec<TimeBucket> = buckets_map
255 .into_iter()
256 .map(|(timestamp, event_types)| {
257 let count = event_types.values().sum();
258 TimeBucket {
259 timestamp,
260 count,
261 event_types,
262 }
263 })
264 .collect();
265
266 buckets.sort_by_key(|b| b.timestamp);
267
268 let filled_buckets = Self::fill_time_gaps(&buckets, request.since, until, request.window);
270
271 Ok(EventFrequencyResponse {
272 total_events: events.len(),
273 buckets: filled_buckets,
274 window: request.window,
275 time_range: TimeRange {
276 from: request.since,
277 to: until,
278 },
279 })
280 }
281
282 fn fill_time_gaps(
284 buckets: &[TimeBucket],
285 start: DateTime<Utc>,
286 end: DateTime<Utc>,
287 window: TimeWindow,
288 ) -> Vec<TimeBucket> {
289 if buckets.is_empty() {
290 return Vec::new();
291 }
292
293 let mut filled = Vec::new();
294 let mut current = window.truncate(start);
295 let end = window.truncate(end);
296
297 let bucket_map: HashMap<DateTime<Utc>, &TimeBucket> =
298 buckets.iter().map(|b| (b.timestamp, b)).collect();
299
300 while current <= end {
301 if let Some(bucket) = bucket_map.get(¤t) {
302 filled.push((**bucket).clone());
303 } else {
304 filled.push(TimeBucket {
305 timestamp: current,
306 count: 0,
307 event_types: HashMap::new(),
308 });
309 }
310 current += window.duration();
311 }
312
313 filled
314 }
315
316 pub fn stats_summary(
318 store: &EventStore,
319 request: StatsSummaryRequest,
320 ) -> Result<StatsSummaryResponse> {
321 let events = store.query(crate::application::dto::QueryEventsRequest {
323 entity_id: request.entity_id.clone(),
324 event_type: request.event_type.clone(),
325 tenant_id: None,
326 as_of: None,
327 since: request.since,
328 until: request.until,
329 limit: None,
330 })?;
331
332 if events.is_empty() {
333 return Err(AllSourceError::ValidationError(
334 "No events found for the specified criteria".to_string(),
335 ));
336 }
337
338 let first_event = events.first().map(|e| e.timestamp);
340 let last_event = events.last().map(|e| e.timestamp);
341
342 let mut entity_counts: HashMap<String, usize> = HashMap::new();
343 let mut event_type_counts: HashMap<String, usize> = HashMap::new();
344
345 for event in &events {
346 *entity_counts
347 .entry(event.entity_id_str().to_string())
348 .or_insert(0) += 1;
349 *event_type_counts
350 .entry(event.event_type_str().to_string())
351 .or_insert(0) += 1;
352 }
353
354 let time_span = if let (Some(first), Some(last)) = (first_event, last_event) {
356 (last - first).num_days().max(1) as f64
357 } else {
358 1.0
359 };
360
361 let events_per_day = events.len() as f64 / time_span;
362
363 let mut top_event_types: Vec<EventTypeCount> = event_type_counts
365 .into_iter()
366 .map(|(event_type, count)| EventTypeCount {
367 event_type,
368 count,
369 percentage: (count as f64 / events.len() as f64) * 100.0,
370 })
371 .collect();
372 top_event_types.sort_by_key(|x| std::cmp::Reverse(x.count));
373 top_event_types.truncate(10);
374
375 let mut top_entities: Vec<EntityCount> = entity_counts
377 .into_iter()
378 .map(|(entity_id, count)| EntityCount {
379 entity_id,
380 count,
381 percentage: (count as f64 / events.len() as f64) * 100.0,
382 })
383 .collect();
384 top_entities.sort_by_key(|x| std::cmp::Reverse(x.count));
385 top_entities.truncate(10);
386
387 let time_range = TimeRange {
388 from: first_event.unwrap_or_else(Utc::now),
389 to: last_event.unwrap_or_else(Utc::now),
390 };
391
392 Ok(StatsSummaryResponse {
393 total_events: events.len(),
394 unique_entities: top_entities.len(),
395 unique_event_types: top_event_types.len(),
396 time_range,
397 events_per_day,
398 top_event_types,
399 top_entities,
400 first_event,
401 last_event,
402 })
403 }
404
405 pub fn analyze_correlation(
407 store: &EventStore,
408 request: CorrelationRequest,
409 ) -> Result<CorrelationResponse> {
410 let events_a = store.query(crate::application::dto::QueryEventsRequest {
412 entity_id: None,
413 event_type: Some(request.event_type_a.clone()),
414 tenant_id: None,
415 as_of: None,
416 since: request.since,
417 until: request.until,
418 limit: None,
419 })?;
420
421 let events_b = store.query(crate::application::dto::QueryEventsRequest {
422 entity_id: None,
423 event_type: Some(request.event_type_b.clone()),
424 tenant_id: None,
425 as_of: None,
426 since: request.since,
427 until: request.until,
428 limit: None,
429 })?;
430
431 let mut entity_events_a: HashMap<String, Vec<&Event>> = HashMap::new();
433 let mut entity_events_b: HashMap<String, Vec<&Event>> = HashMap::new();
434
435 for event in &events_a {
436 entity_events_a
437 .entry(event.entity_id_str().to_string())
438 .or_default()
439 .push(event);
440 }
441
442 for event in &events_b {
443 entity_events_b
444 .entry(event.entity_id_str().to_string())
445 .or_default()
446 .push(event);
447 }
448
449 let mut correlated_pairs = 0;
451 let mut total_time_between = 0i64;
452 let mut examples = Vec::new();
453
454 for (entity_id, a_events) in &entity_events_a {
455 if let Some(b_events) = entity_events_b.get(entity_id) {
456 for a_event in a_events {
457 for b_event in b_events {
458 let time_diff = (b_event.timestamp - a_event.timestamp).num_seconds().abs();
459
460 if time_diff <= request.time_window_seconds {
461 correlated_pairs += 1;
462 total_time_between += time_diff;
463
464 if examples.len() < 5 {
465 examples.push(CorrelationExample {
466 entity_id: entity_id.clone(),
467 event_a_timestamp: a_event.timestamp,
468 event_b_timestamp: b_event.timestamp,
469 time_between_seconds: time_diff,
470 });
471 }
472 }
473 }
474 }
475 }
476 }
477
478 let correlation_percentage = if !events_a.is_empty() {
479 (correlated_pairs as f64 / events_a.len() as f64) * 100.0
480 } else {
481 0.0
482 };
483
484 let avg_time_between = if correlated_pairs > 0 {
485 total_time_between as f64 / correlated_pairs as f64
486 } else {
487 0.0
488 };
489
490 Ok(CorrelationResponse {
491 event_type_a: request.event_type_a,
492 event_type_b: request.event_type_b,
493 total_a: events_a.len(),
494 total_b: events_b.len(),
495 correlated_pairs,
496 correlation_percentage,
497 avg_time_between_seconds: avg_time_between,
498 examples,
499 })
500 }
501}
502
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses an RFC 3339 fixture timestamp.
    fn ts(s: &str) -> DateTime<Utc> {
        s.parse().expect("fixture timestamps are valid RFC 3339")
    }

    #[test]
    fn test_time_window_truncation() {
        let timestamp = chrono::Utc::now();

        let minute_truncated = TimeWindow::Minute.truncate(timestamp);
        assert_eq!(minute_truncated.second(), 0);
        assert_eq!(minute_truncated.nanosecond(), 0);

        let hour_truncated = TimeWindow::Hour.truncate(timestamp);
        assert_eq!(hour_truncated.minute(), 0);
        assert_eq!(hour_truncated.second(), 0);

        let day_truncated = TimeWindow::Day.truncate(timestamp);
        assert_eq!(day_truncated.hour(), 0);
        assert_eq!(day_truncated.minute(), 0);
    }

    #[test]
    fn truncation_of_fixed_timestamp() {
        // 2024-03-14 is a Thursday, so its week starts on Monday 2024-03-11.
        let t = ts("2024-03-14T15:09:26.535897Z");
        assert_eq!(TimeWindow::Minute.truncate(t), ts("2024-03-14T15:09:00Z"));
        assert_eq!(TimeWindow::Hour.truncate(t), ts("2024-03-14T15:00:00Z"));
        assert_eq!(TimeWindow::Day.truncate(t), ts("2024-03-14T00:00:00Z"));
        assert_eq!(TimeWindow::Week.truncate(t), ts("2024-03-11T00:00:00Z"));
        assert_eq!(TimeWindow::Month.truncate(t), ts("2024-03-01T00:00:00Z"));
    }

    #[test]
    fn truncation_is_idempotent() {
        let t = ts("2024-12-31T23:59:59.999Z");
        let windows = [
            TimeWindow::Minute,
            TimeWindow::Hour,
            TimeWindow::Day,
            TimeWindow::Week,
            TimeWindow::Month,
        ];
        for window in &windows {
            let once = window.truncate(t);
            assert_eq!(window.truncate(once), once);
        }
    }

    #[test]
    fn durations_match_window_lengths() {
        assert_eq!(TimeWindow::Minute.duration(), Duration::seconds(60));
        assert_eq!(TimeWindow::Hour.duration(), Duration::minutes(60));
        assert_eq!(TimeWindow::Day.duration(), Duration::hours(24));
        assert_eq!(TimeWindow::Week.duration(), Duration::days(7));
        assert_eq!(TimeWindow::Month.duration(), Duration::days(30));
    }
}
522}