1use crate::{
2 domain::entities::Event,
3 error::{AllSourceError, Result},
4 store::EventStore,
5};
6use chrono::{DateTime, Datelike, Duration, Timelike, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9
10#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
12#[serde(rename_all = "lowercase")]
13pub enum TimeWindow {
14 Minute,
15 Hour,
16 Day,
17 Week,
18 Month,
19}
20
21impl TimeWindow {
22 pub fn duration(&self) -> Duration {
23 match self {
24 TimeWindow::Minute => Duration::minutes(1),
25 TimeWindow::Hour => Duration::hours(1),
26 TimeWindow::Day => Duration::days(1),
27 TimeWindow::Week => Duration::weeks(1),
28 TimeWindow::Month => Duration::days(30),
29 }
30 }
31
32 pub fn truncate(&self, timestamp: DateTime<Utc>) -> DateTime<Utc> {
33 match self {
34 TimeWindow::Minute => timestamp
35 .with_second(0)
36 .unwrap()
37 .with_nanosecond(0)
38 .unwrap(),
39 TimeWindow::Hour => timestamp
40 .with_minute(0)
41 .unwrap()
42 .with_second(0)
43 .unwrap()
44 .with_nanosecond(0)
45 .unwrap(),
46 TimeWindow::Day => timestamp
47 .with_hour(0)
48 .unwrap()
49 .with_minute(0)
50 .unwrap()
51 .with_second(0)
52 .unwrap()
53 .with_nanosecond(0)
54 .unwrap(),
55 TimeWindow::Week => {
56 let days_from_monday = timestamp.weekday().num_days_from_monday();
57 (timestamp - Duration::days(days_from_monday as i64))
58 .with_hour(0)
59 .unwrap()
60 .with_minute(0)
61 .unwrap()
62 .with_second(0)
63 .unwrap()
64 .with_nanosecond(0)
65 .unwrap()
66 }
67 TimeWindow::Month => timestamp
68 .with_day(1)
69 .unwrap()
70 .with_hour(0)
71 .unwrap()
72 .with_minute(0)
73 .unwrap()
74 .with_second(0)
75 .unwrap()
76 .with_nanosecond(0)
77 .unwrap(),
78 }
79 }
80}
81
82#[derive(Debug, Deserialize)]
84pub struct EventFrequencyRequest {
85 pub entity_id: Option<String>,
87
88 pub event_type: Option<String>,
90
91 pub since: DateTime<Utc>,
93
94 pub until: Option<DateTime<Utc>>,
96
97 pub window: TimeWindow,
99}
100
101#[derive(Debug, Clone, Serialize)]
103pub struct TimeBucket {
104 pub timestamp: DateTime<Utc>,
105 pub count: usize,
106 pub event_types: HashMap<String, usize>,
107}
108
109#[derive(Debug, Serialize)]
111pub struct EventFrequencyResponse {
112 pub buckets: Vec<TimeBucket>,
113 pub total_events: usize,
114 pub window: TimeWindow,
115 pub time_range: TimeRange,
116}
117
118#[derive(Debug, Serialize)]
119pub struct TimeRange {
120 pub from: DateTime<Utc>,
121 pub to: DateTime<Utc>,
122}
123
124#[derive(Debug, Deserialize)]
126pub struct StatsSummaryRequest {
127 pub entity_id: Option<String>,
129
130 pub event_type: Option<String>,
132
133 pub since: Option<DateTime<Utc>>,
135
136 pub until: Option<DateTime<Utc>>,
138}
139
140#[derive(Debug, Serialize)]
142pub struct StatsSummaryResponse {
143 pub total_events: usize,
144 pub unique_entities: usize,
145 pub unique_event_types: usize,
146 pub time_range: TimeRange,
147 pub events_per_day: f64,
148 pub top_event_types: Vec<EventTypeCount>,
149 pub top_entities: Vec<EntityCount>,
150 pub first_event: Option<DateTime<Utc>>,
151 pub last_event: Option<DateTime<Utc>>,
152}
153
154#[derive(Debug, Serialize)]
155pub struct EventTypeCount {
156 pub event_type: String,
157 pub count: usize,
158 pub percentage: f64,
159}
160
161#[derive(Debug, Serialize)]
162pub struct EntityCount {
163 pub entity_id: String,
164 pub count: usize,
165 pub percentage: f64,
166}
167
168#[derive(Debug, Deserialize)]
170pub struct CorrelationRequest {
171 pub event_type_a: String,
173
174 pub event_type_b: String,
176
177 pub time_window_seconds: i64,
179
180 pub since: Option<DateTime<Utc>>,
182
183 pub until: Option<DateTime<Utc>>,
185}
186
187#[derive(Debug, Serialize)]
189pub struct CorrelationResponse {
190 pub event_type_a: String,
191 pub event_type_b: String,
192 pub total_a: usize,
193 pub total_b: usize,
194 pub correlated_pairs: usize,
195 pub correlation_percentage: f64,
196 pub avg_time_between_seconds: f64,
197 pub examples: Vec<CorrelationExample>,
198}
199
200#[derive(Debug, Serialize)]
201pub struct CorrelationExample {
202 pub entity_id: String,
203 pub event_a_timestamp: DateTime<Utc>,
204 pub event_b_timestamp: DateTime<Utc>,
205 pub time_between_seconds: i64,
206}
207
/// Stateless namespace for analytics queries.
///
/// The type holds no data; its associated functions take the `EventStore`
/// to read from as an explicit argument.
pub struct AnalyticsEngine;
210
211impl AnalyticsEngine {
212 pub fn event_frequency(
214 store: &EventStore,
215 request: EventFrequencyRequest,
216 ) -> Result<EventFrequencyResponse> {
217 let until = request.until.unwrap_or_else(Utc::now);
218
219 let events = store.query(crate::application::dto::QueryEventsRequest {
221 entity_id: request.entity_id.clone(),
222 event_type: request.event_type.clone(),
223 tenant_id: None,
224 as_of: None,
225 since: Some(request.since),
226 until: Some(until),
227 limit: None,
228 event_type_prefix: None,
229 payload_filter: None,
230 })?;
231
232 if events.is_empty() {
233 return Ok(EventFrequencyResponse {
234 buckets: Vec::new(),
235 total_events: 0,
236 window: request.window,
237 time_range: TimeRange {
238 from: request.since,
239 to: until,
240 },
241 });
242 }
243
244 let mut buckets_map: HashMap<DateTime<Utc>, HashMap<String, usize>> = HashMap::new();
246
247 for event in &events {
248 let bucket_time = request.window.truncate(event.timestamp);
249 let bucket = buckets_map.entry(bucket_time).or_default();
250 *bucket
251 .entry(event.event_type_str().to_string())
252 .or_insert(0) += 1;
253 }
254
255 let mut buckets: Vec<TimeBucket> = buckets_map
257 .into_iter()
258 .map(|(timestamp, event_types)| {
259 let count = event_types.values().sum();
260 TimeBucket {
261 timestamp,
262 count,
263 event_types,
264 }
265 })
266 .collect();
267
268 buckets.sort_by_key(|b| b.timestamp);
269
270 let filled_buckets = Self::fill_time_gaps(&buckets, request.since, until, request.window);
272
273 Ok(EventFrequencyResponse {
274 total_events: events.len(),
275 buckets: filled_buckets,
276 window: request.window,
277 time_range: TimeRange {
278 from: request.since,
279 to: until,
280 },
281 })
282 }
283
284 fn fill_time_gaps(
286 buckets: &[TimeBucket],
287 start: DateTime<Utc>,
288 end: DateTime<Utc>,
289 window: TimeWindow,
290 ) -> Vec<TimeBucket> {
291 if buckets.is_empty() {
292 return Vec::new();
293 }
294
295 let mut filled = Vec::new();
296 let mut current = window.truncate(start);
297 let end = window.truncate(end);
298
299 let bucket_map: HashMap<DateTime<Utc>, &TimeBucket> =
300 buckets.iter().map(|b| (b.timestamp, b)).collect();
301
302 while current <= end {
303 if let Some(bucket) = bucket_map.get(¤t) {
304 filled.push((**bucket).clone());
305 } else {
306 filled.push(TimeBucket {
307 timestamp: current,
308 count: 0,
309 event_types: HashMap::new(),
310 });
311 }
312 current += window.duration();
313 }
314
315 filled
316 }
317
318 pub fn stats_summary(
320 store: &EventStore,
321 request: StatsSummaryRequest,
322 ) -> Result<StatsSummaryResponse> {
323 let events = store.query(crate::application::dto::QueryEventsRequest {
325 entity_id: request.entity_id.clone(),
326 event_type: request.event_type.clone(),
327 tenant_id: None,
328 as_of: None,
329 since: request.since,
330 until: request.until,
331 limit: None,
332 event_type_prefix: None,
333 payload_filter: None,
334 })?;
335
336 if events.is_empty() {
337 return Err(AllSourceError::ValidationError(
338 "No events found for the specified criteria".to_string(),
339 ));
340 }
341
342 let first_event = events.first().map(|e| e.timestamp);
344 let last_event = events.last().map(|e| e.timestamp);
345
346 let mut entity_counts: HashMap<String, usize> = HashMap::new();
347 let mut event_type_counts: HashMap<String, usize> = HashMap::new();
348
349 for event in &events {
350 *entity_counts
351 .entry(event.entity_id_str().to_string())
352 .or_insert(0) += 1;
353 *event_type_counts
354 .entry(event.event_type_str().to_string())
355 .or_insert(0) += 1;
356 }
357
358 let time_span = if let (Some(first), Some(last)) = (first_event, last_event) {
360 (last - first).num_days().max(1) as f64
361 } else {
362 1.0
363 };
364
365 let events_per_day = events.len() as f64 / time_span;
366
367 let mut top_event_types: Vec<EventTypeCount> = event_type_counts
369 .into_iter()
370 .map(|(event_type, count)| EventTypeCount {
371 event_type,
372 count,
373 percentage: (count as f64 / events.len() as f64) * 100.0,
374 })
375 .collect();
376 top_event_types.sort_by_key(|x| std::cmp::Reverse(x.count));
377 top_event_types.truncate(10);
378
379 let mut top_entities: Vec<EntityCount> = entity_counts
381 .into_iter()
382 .map(|(entity_id, count)| EntityCount {
383 entity_id,
384 count,
385 percentage: (count as f64 / events.len() as f64) * 100.0,
386 })
387 .collect();
388 top_entities.sort_by_key(|x| std::cmp::Reverse(x.count));
389 top_entities.truncate(10);
390
391 let time_range = TimeRange {
392 from: first_event.unwrap_or_else(Utc::now),
393 to: last_event.unwrap_or_else(Utc::now),
394 };
395
396 Ok(StatsSummaryResponse {
397 total_events: events.len(),
398 unique_entities: top_entities.len(),
399 unique_event_types: top_event_types.len(),
400 time_range,
401 events_per_day,
402 top_event_types,
403 top_entities,
404 first_event,
405 last_event,
406 })
407 }
408
409 pub fn analyze_correlation(
411 store: &EventStore,
412 request: CorrelationRequest,
413 ) -> Result<CorrelationResponse> {
414 let events_a = store.query(crate::application::dto::QueryEventsRequest {
416 entity_id: None,
417 event_type: Some(request.event_type_a.clone()),
418 tenant_id: None,
419 as_of: None,
420 since: request.since,
421 until: request.until,
422 limit: None,
423 event_type_prefix: None,
424 payload_filter: None,
425 })?;
426
427 let events_b = store.query(crate::application::dto::QueryEventsRequest {
428 entity_id: None,
429 event_type: Some(request.event_type_b.clone()),
430 tenant_id: None,
431 as_of: None,
432 since: request.since,
433 until: request.until,
434 limit: None,
435 event_type_prefix: None,
436 payload_filter: None,
437 })?;
438
439 let mut entity_events_a: HashMap<String, Vec<&Event>> = HashMap::new();
441 let mut entity_events_b: HashMap<String, Vec<&Event>> = HashMap::new();
442
443 for event in &events_a {
444 entity_events_a
445 .entry(event.entity_id_str().to_string())
446 .or_default()
447 .push(event);
448 }
449
450 for event in &events_b {
451 entity_events_b
452 .entry(event.entity_id_str().to_string())
453 .or_default()
454 .push(event);
455 }
456
457 let mut correlated_pairs = 0;
459 let mut total_time_between = 0i64;
460 let mut examples = Vec::new();
461
462 for (entity_id, a_events) in &entity_events_a {
463 if let Some(b_events) = entity_events_b.get(entity_id) {
464 for a_event in a_events {
465 for b_event in b_events {
466 let time_diff = (b_event.timestamp - a_event.timestamp).num_seconds().abs();
467
468 if time_diff <= request.time_window_seconds {
469 correlated_pairs += 1;
470 total_time_between += time_diff;
471
472 if examples.len() < 5 {
473 examples.push(CorrelationExample {
474 entity_id: entity_id.clone(),
475 event_a_timestamp: a_event.timestamp,
476 event_b_timestamp: b_event.timestamp,
477 time_between_seconds: time_diff,
478 });
479 }
480 }
481 }
482 }
483 }
484 }
485
486 let correlation_percentage = if !events_a.is_empty() {
487 (correlated_pairs as f64 / events_a.len() as f64) * 100.0
488 } else {
489 0.0
490 };
491
492 let avg_time_between = if correlated_pairs > 0 {
493 total_time_between as f64 / correlated_pairs as f64
494 } else {
495 0.0
496 };
497
498 Ok(CorrelationResponse {
499 event_type_a: request.event_type_a,
500 event_type_b: request.event_type_b,
501 total_a: events_a.len(),
502 total_b: events_b.len(),
503 correlated_pairs,
504 correlation_percentage,
505 avg_time_between_seconds: avg_time_between,
506 examples,
507 })
508 }
509}
510
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses an RFC 3339 literal into a UTC timestamp for use as a fixture.
    fn utc(raw: &str) -> DateTime<Utc> {
        raw.parse().expect("test fixture must be valid RFC 3339")
    }

    /// Checks every window against a fixed timestamp so the test is
    /// deterministic and also covers sub-second, week and month truncation.
    #[test]
    fn test_time_window_truncation() {
        // A Thursday in mid-March with non-zero minutes, seconds and millis.
        let timestamp = utc("2024-03-14T15:09:26.535Z");

        assert_eq!(
            TimeWindow::Minute.truncate(timestamp),
            utc("2024-03-14T15:09:00Z")
        );
        assert_eq!(
            TimeWindow::Hour.truncate(timestamp),
            utc("2024-03-14T15:00:00Z")
        );
        assert_eq!(
            TimeWindow::Day.truncate(timestamp),
            utc("2024-03-14T00:00:00Z")
        );
        // Weeks start on Monday: 2024-03-11.
        assert_eq!(
            TimeWindow::Week.truncate(timestamp),
            utc("2024-03-11T00:00:00Z")
        );
        assert_eq!(
            TimeWindow::Month.truncate(timestamp),
            utc("2024-03-01T00:00:00Z")
        );
    }
}
530}