1use crate::domain::entities::Event;
2use crate::error::{AllSourceError, Result};
3use crate::store::EventStore;
4use chrono::{DateTime, Datelike, Duration, Timelike, Utc};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7
/// Granularity used to group events into time buckets.
///
/// Serialized and deserialized as the lowercase variant name
/// (for example `"hour"`), per the `rename_all` attribute below.
#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum TimeWindow {
    /// One-minute buckets.
    Minute,
    /// One-hour buckets.
    Hour,
    /// One-day buckets starting at 00:00 UTC.
    Day,
    /// One-week buckets starting on Monday at 00:00 UTC.
    Week,
    /// Buckets starting on the first day of the month at 00:00 UTC.
    /// Note that `duration()` approximates a month as 30 days.
    Month,
}
18
19impl TimeWindow {
20 pub fn duration(&self) -> Duration {
21 match self {
22 TimeWindow::Minute => Duration::minutes(1),
23 TimeWindow::Hour => Duration::hours(1),
24 TimeWindow::Day => Duration::days(1),
25 TimeWindow::Week => Duration::weeks(1),
26 TimeWindow::Month => Duration::days(30),
27 }
28 }
29
30 pub fn truncate(&self, timestamp: DateTime<Utc>) -> DateTime<Utc> {
31 match self {
32 TimeWindow::Minute => timestamp
33 .with_second(0)
34 .unwrap()
35 .with_nanosecond(0)
36 .unwrap(),
37 TimeWindow::Hour => timestamp
38 .with_minute(0)
39 .unwrap()
40 .with_second(0)
41 .unwrap()
42 .with_nanosecond(0)
43 .unwrap(),
44 TimeWindow::Day => timestamp
45 .with_hour(0)
46 .unwrap()
47 .with_minute(0)
48 .unwrap()
49 .with_second(0)
50 .unwrap()
51 .with_nanosecond(0)
52 .unwrap(),
53 TimeWindow::Week => {
54 let days_from_monday = timestamp.weekday().num_days_from_monday();
55 (timestamp - Duration::days(days_from_monday as i64))
56 .with_hour(0)
57 .unwrap()
58 .with_minute(0)
59 .unwrap()
60 .with_second(0)
61 .unwrap()
62 .with_nanosecond(0)
63 .unwrap()
64 }
65 TimeWindow::Month => timestamp
66 .with_day(1)
67 .unwrap()
68 .with_hour(0)
69 .unwrap()
70 .with_minute(0)
71 .unwrap()
72 .with_second(0)
73 .unwrap()
74 .with_nanosecond(0)
75 .unwrap(),
76 }
77 }
78}
79
/// Parameters for `AnalyticsEngine::event_frequency`.
#[derive(Debug, Deserialize)]
pub struct EventFrequencyRequest {
    /// Optional entity filter, forwarded unchanged to the store query.
    pub entity_id: Option<String>,

    /// Optional event-type filter, forwarded unchanged to the store query.
    pub event_type: Option<String>,

    /// Start of the queried time range.
    pub since: DateTime<Utc>,

    /// End of the queried time range; the current time is used when omitted.
    pub until: Option<DateTime<Utc>>,

    /// Bucket granularity for the resulting histogram.
    pub window: TimeWindow,
}
98
/// One histogram bucket produced by the frequency analysis.
#[derive(Debug, Clone, Serialize)]
pub struct TimeBucket {
    /// Start of the bucket (a timestamp truncated to the request's window).
    pub timestamp: DateTime<Utc>,
    /// Total number of events in the bucket (sum of `event_types` values).
    pub count: usize,
    /// Per-event-type counts within the bucket; empty for gap-filled buckets.
    pub event_types: HashMap<String, usize>,
}
106
/// Result of `AnalyticsEngine::event_frequency`.
#[derive(Debug, Serialize)]
pub struct EventFrequencyResponse {
    /// Buckets in ascending time order; empty when no events matched.
    pub buckets: Vec<TimeBucket>,
    /// Number of events returned by the store query.
    pub total_events: usize,
    /// Window granularity echoed from the request.
    pub window: TimeWindow,
    /// The requested range (`since` to the resolved `until`).
    pub time_range: TimeRange,
}
115
/// A pair of timestamps delimiting a time range.
#[derive(Debug, Serialize)]
pub struct TimeRange {
    /// Start of the range.
    pub from: DateTime<Utc>,
    /// End of the range.
    pub to: DateTime<Utc>,
}
121
/// Parameters for `AnalyticsEngine::stats_summary`.
///
/// Every field is optional and is forwarded unchanged to the store query.
#[derive(Debug, Deserialize)]
pub struct StatsSummaryRequest {
    /// Optional entity filter.
    pub entity_id: Option<String>,

    /// Optional event-type filter.
    pub event_type: Option<String>,

    /// Optional start of the queried time range.
    pub since: Option<DateTime<Utc>>,

    /// Optional end of the queried time range.
    pub until: Option<DateTime<Utc>>,
}
137
/// Result of `AnalyticsEngine::stats_summary`.
#[derive(Debug, Serialize)]
pub struct StatsSummaryResponse {
    /// Number of events returned by the store query.
    pub total_events: usize,
    /// Entity count as computed by `AnalyticsEngine::stats_summary`.
    pub unique_entities: usize,
    /// Event-type count as computed by `AnalyticsEngine::stats_summary`.
    pub unique_event_types: usize,
    /// Range spanned by `first_event` and `last_event`.
    pub time_range: TimeRange,
    /// `total_events` divided by the whole days between the first and last
    /// event, using a minimum of one day.
    pub events_per_day: f64,
    /// Up to ten event types, ordered by descending count.
    pub top_event_types: Vec<EventTypeCount>,
    /// Up to ten entities, ordered by descending count.
    pub top_entities: Vec<EntityCount>,
    /// Timestamp reported for the first event.
    pub first_event: Option<DateTime<Utc>>,
    /// Timestamp reported for the last event.
    pub last_event: Option<DateTime<Utc>>,
}
151
/// Occurrence count for a single event type.
#[derive(Debug, Serialize)]
pub struct EventTypeCount {
    /// Event type name.
    pub event_type: String,
    /// Number of events of this type.
    pub count: usize,
    /// Share of all matched events, on a 0-100 scale.
    pub percentage: f64,
}
158
/// Occurrence count for a single entity.
#[derive(Debug, Serialize)]
pub struct EntityCount {
    /// Entity identifier.
    pub entity_id: String,
    /// Number of events recorded for this entity.
    pub count: usize,
    /// Share of all matched events, on a 0-100 scale.
    pub percentage: f64,
}
165
/// Parameters for `AnalyticsEngine::analyze_correlation`.
#[derive(Debug, Deserialize)]
pub struct CorrelationRequest {
    /// First event type of the pair being correlated.
    pub event_type_a: String,

    /// Second event type of the pair being correlated.
    pub event_type_b: String,

    /// Maximum absolute time difference, in seconds, for an A event and a B
    /// event of the same entity to count as a correlated pair.
    pub time_window_seconds: i64,

    /// Optional start of the queried time range.
    pub since: Option<DateTime<Utc>>,

    /// Optional end of the queried time range.
    pub until: Option<DateTime<Utc>>,
}
184
/// Result of `AnalyticsEngine::analyze_correlation`.
#[derive(Debug, Serialize)]
pub struct CorrelationResponse {
    /// First event type, echoed from the request.
    pub event_type_a: String,
    /// Second event type, echoed from the request.
    pub event_type_b: String,
    /// Number of events of type A returned by the store.
    pub total_a: usize,
    /// Number of events of type B returned by the store.
    pub total_b: usize,
    /// Number of (A, B) pairs on the same entity within the time window.
    pub correlated_pairs: usize,
    /// Correlation strength relative to `total_a`, on a percentage scale;
    /// `0.0` when there are no A events.
    pub correlation_percentage: f64,
    /// Mean absolute time difference across correlated pairs, in seconds;
    /// `0.0` when there are no pairs.
    pub avg_time_between_seconds: f64,
    /// Up to five sample correlated pairs.
    pub examples: Vec<CorrelationExample>,
}
197
/// A sample correlated pair of events belonging to the same entity.
#[derive(Debug, Serialize)]
pub struct CorrelationExample {
    /// Entity both events belong to.
    pub entity_id: String,
    /// Timestamp of the type-A event.
    pub event_a_timestamp: DateTime<Utc>,
    /// Timestamp of the type-B event.
    pub event_b_timestamp: DateTime<Utc>,
    /// Absolute time difference between the two events, in seconds.
    pub time_between_seconds: i64,
}
205
/// Stateless namespace for the analytics queries; every operation is an
/// associated function that receives the `EventStore` to read from.
pub struct AnalyticsEngine;
208
209impl AnalyticsEngine {
210 pub fn event_frequency(
212 store: &EventStore,
213 request: EventFrequencyRequest,
214 ) -> Result<EventFrequencyResponse> {
215 let until = request.until.unwrap_or_else(Utc::now);
216
217 let events = store.query(crate::application::dto::QueryEventsRequest {
219 entity_id: request.entity_id.clone(),
220 event_type: request.event_type.clone(),
221 tenant_id: None,
222 as_of: None,
223 since: Some(request.since),
224 until: Some(until),
225 limit: None,
226 })?;
227
228 if events.is_empty() {
229 return Ok(EventFrequencyResponse {
230 buckets: Vec::new(),
231 total_events: 0,
232 window: request.window,
233 time_range: TimeRange {
234 from: request.since,
235 to: until,
236 },
237 });
238 }
239
240 let mut buckets_map: HashMap<DateTime<Utc>, HashMap<String, usize>> = HashMap::new();
242
243 for event in &events {
244 let bucket_time = request.window.truncate(event.timestamp);
245 let bucket = buckets_map.entry(bucket_time).or_insert_with(HashMap::new);
246 *bucket
247 .entry(event.event_type_str().to_string())
248 .or_insert(0) += 1;
249 }
250
251 let mut buckets: Vec<TimeBucket> = buckets_map
253 .into_iter()
254 .map(|(timestamp, event_types)| {
255 let count = event_types.values().sum();
256 TimeBucket {
257 timestamp,
258 count,
259 event_types,
260 }
261 })
262 .collect();
263
264 buckets.sort_by_key(|b| b.timestamp);
265
266 let filled_buckets = Self::fill_time_gaps(&buckets, request.since, until, request.window);
268
269 Ok(EventFrequencyResponse {
270 total_events: events.len(),
271 buckets: filled_buckets,
272 window: request.window,
273 time_range: TimeRange {
274 from: request.since,
275 to: until,
276 },
277 })
278 }
279
280 fn fill_time_gaps(
282 buckets: &[TimeBucket],
283 start: DateTime<Utc>,
284 end: DateTime<Utc>,
285 window: TimeWindow,
286 ) -> Vec<TimeBucket> {
287 if buckets.is_empty() {
288 return Vec::new();
289 }
290
291 let mut filled = Vec::new();
292 let mut current = window.truncate(start);
293 let end = window.truncate(end);
294
295 let bucket_map: HashMap<DateTime<Utc>, &TimeBucket> =
296 buckets.iter().map(|b| (b.timestamp, b)).collect();
297
298 while current <= end {
299 if let Some(bucket) = bucket_map.get(¤t) {
300 filled.push((**bucket).clone());
301 } else {
302 filled.push(TimeBucket {
303 timestamp: current,
304 count: 0,
305 event_types: HashMap::new(),
306 });
307 }
308 current = current + window.duration();
309 }
310
311 filled
312 }
313
314 pub fn stats_summary(
316 store: &EventStore,
317 request: StatsSummaryRequest,
318 ) -> Result<StatsSummaryResponse> {
319 let events = store.query(crate::application::dto::QueryEventsRequest {
321 entity_id: request.entity_id.clone(),
322 event_type: request.event_type.clone(),
323 tenant_id: None,
324 as_of: None,
325 since: request.since,
326 until: request.until,
327 limit: None,
328 })?;
329
330 if events.is_empty() {
331 return Err(AllSourceError::ValidationError(
332 "No events found for the specified criteria".to_string(),
333 ));
334 }
335
336 let first_event = events.first().map(|e| e.timestamp);
338 let last_event = events.last().map(|e| e.timestamp);
339
340 let mut entity_counts: HashMap<String, usize> = HashMap::new();
341 let mut event_type_counts: HashMap<String, usize> = HashMap::new();
342
343 for event in &events {
344 *entity_counts
345 .entry(event.entity_id_str().to_string())
346 .or_insert(0) += 1;
347 *event_type_counts
348 .entry(event.event_type_str().to_string())
349 .or_insert(0) += 1;
350 }
351
352 let time_span = if let (Some(first), Some(last)) = (first_event, last_event) {
354 (last - first).num_days().max(1) as f64
355 } else {
356 1.0
357 };
358
359 let events_per_day = events.len() as f64 / time_span;
360
361 let mut top_event_types: Vec<EventTypeCount> = event_type_counts
363 .into_iter()
364 .map(|(event_type, count)| EventTypeCount {
365 event_type,
366 count,
367 percentage: (count as f64 / events.len() as f64) * 100.0,
368 })
369 .collect();
370 top_event_types.sort_by(|a, b| b.count.cmp(&a.count));
371 top_event_types.truncate(10);
372
373 let mut top_entities: Vec<EntityCount> = entity_counts
375 .into_iter()
376 .map(|(entity_id, count)| EntityCount {
377 entity_id,
378 count,
379 percentage: (count as f64 / events.len() as f64) * 100.0,
380 })
381 .collect();
382 top_entities.sort_by(|a, b| b.count.cmp(&a.count));
383 top_entities.truncate(10);
384
385 let time_range = TimeRange {
386 from: first_event.unwrap_or_else(Utc::now),
387 to: last_event.unwrap_or_else(Utc::now),
388 };
389
390 Ok(StatsSummaryResponse {
391 total_events: events.len(),
392 unique_entities: top_entities.len(),
393 unique_event_types: top_event_types.len(),
394 time_range,
395 events_per_day,
396 top_event_types,
397 top_entities,
398 first_event,
399 last_event,
400 })
401 }
402
403 pub fn analyze_correlation(
405 store: &EventStore,
406 request: CorrelationRequest,
407 ) -> Result<CorrelationResponse> {
408 let events_a = store.query(crate::application::dto::QueryEventsRequest {
410 entity_id: None,
411 event_type: Some(request.event_type_a.clone()),
412 tenant_id: None,
413 as_of: None,
414 since: request.since,
415 until: request.until,
416 limit: None,
417 })?;
418
419 let events_b = store.query(crate::application::dto::QueryEventsRequest {
420 entity_id: None,
421 event_type: Some(request.event_type_b.clone()),
422 tenant_id: None,
423 as_of: None,
424 since: request.since,
425 until: request.until,
426 limit: None,
427 })?;
428
429 let mut entity_events_a: HashMap<String, Vec<&Event>> = HashMap::new();
431 let mut entity_events_b: HashMap<String, Vec<&Event>> = HashMap::new();
432
433 for event in &events_a {
434 entity_events_a
435 .entry(event.entity_id_str().to_string())
436 .or_insert_with(Vec::new)
437 .push(event);
438 }
439
440 for event in &events_b {
441 entity_events_b
442 .entry(event.entity_id_str().to_string())
443 .or_insert_with(Vec::new)
444 .push(event);
445 }
446
447 let mut correlated_pairs = 0;
449 let mut total_time_between = 0i64;
450 let mut examples = Vec::new();
451
452 for (entity_id, a_events) in &entity_events_a {
453 if let Some(b_events) = entity_events_b.get(entity_id) {
454 for a_event in a_events {
455 for b_event in b_events {
456 let time_diff = (b_event.timestamp - a_event.timestamp).num_seconds().abs();
457
458 if time_diff <= request.time_window_seconds {
459 correlated_pairs += 1;
460 total_time_between += time_diff;
461
462 if examples.len() < 5 {
463 examples.push(CorrelationExample {
464 entity_id: entity_id.clone(),
465 event_a_timestamp: a_event.timestamp,
466 event_b_timestamp: b_event.timestamp,
467 time_between_seconds: time_diff,
468 });
469 }
470 }
471 }
472 }
473 }
474 }
475
476 let correlation_percentage = if !events_a.is_empty() {
477 (correlated_pairs as f64 / events_a.len() as f64) * 100.0
478 } else {
479 0.0
480 };
481
482 let avg_time_between = if correlated_pairs > 0 {
483 total_time_between as f64 / correlated_pairs as f64
484 } else {
485 0.0
486 };
487
488 Ok(CorrelationResponse {
489 event_type_a: request.event_type_a,
490 event_type_b: request.event_type_b,
491 total_a: events_a.len(),
492 total_b: events_b.len(),
493 correlated_pairs,
494 correlation_percentage,
495 avg_time_between_seconds: avg_time_between,
496 examples,
497 })
498 }
499}
500
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses an RFC 3339 literal into a UTC timestamp for use as a fixture.
    fn utc(s: &str) -> DateTime<Utc> {
        DateTime::parse_from_rfc3339(s)
            .expect("fixture timestamp must be valid RFC 3339")
            .with_timezone(&Utc)
    }

    /// Checks every window against a fixed timestamp so the test is
    /// reproducible and pins the exact truncated value, including
    /// sub-second precision and the Week/Month calendar rules.
    #[test]
    fn test_time_window_truncation() {
        // 2024-03-14 is a Thursday.
        let timestamp = utc("2024-03-14T15:09:26.535897Z");

        let minute_truncated = TimeWindow::Minute.truncate(timestamp);
        assert_eq!(minute_truncated.second(), 0);
        assert_eq!(minute_truncated.nanosecond(), 0);
        assert_eq!(minute_truncated, utc("2024-03-14T15:09:00Z"));

        let hour_truncated = TimeWindow::Hour.truncate(timestamp);
        assert_eq!(hour_truncated.minute(), 0);
        assert_eq!(hour_truncated.second(), 0);
        assert_eq!(hour_truncated, utc("2024-03-14T15:00:00Z"));

        let day_truncated = TimeWindow::Day.truncate(timestamp);
        assert_eq!(day_truncated.hour(), 0);
        assert_eq!(day_truncated.minute(), 0);
        assert_eq!(day_truncated, utc("2024-03-14T00:00:00Z"));

        // Weeks start on Monday.
        let week_truncated = TimeWindow::Week.truncate(timestamp);
        assert_eq!(week_truncated, utc("2024-03-11T00:00:00Z"));

        let month_truncated = TimeWindow::Month.truncate(timestamp);
        assert_eq!(month_truncated, utc("2024-03-01T00:00:00Z"));
    }
}
520}