1use crate::error::{AllSourceError, Result};
2use crate::domain::entities::Event;
3use crate::store::EventStore;
4use chrono::{DateTime, Datelike, Duration, Timelike, Utc};
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7
/// Granularity of the time buckets used when aggregating events.
///
/// Serialized and deserialized as a lowercase string (for example `"hour"`).
#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum TimeWindow {
    /// One-minute buckets.
    Minute,
    /// One-hour buckets.
    Hour,
    /// One-day buckets.
    Day,
    /// One-week buckets; `truncate` aligns them to Monday 00:00:00.
    Week,
    /// Monthly buckets; `truncate` aligns them to the first day of the month,
    /// while `duration` approximates a month as 30 days.
    Month,
}
18
19impl TimeWindow {
20 pub fn duration(&self) -> Duration {
21 match self {
22 TimeWindow::Minute => Duration::minutes(1),
23 TimeWindow::Hour => Duration::hours(1),
24 TimeWindow::Day => Duration::days(1),
25 TimeWindow::Week => Duration::weeks(1),
26 TimeWindow::Month => Duration::days(30),
27 }
28 }
29
30 pub fn truncate(&self, timestamp: DateTime<Utc>) -> DateTime<Utc> {
31 match self {
32 TimeWindow::Minute => timestamp
33 .with_second(0)
34 .unwrap()
35 .with_nanosecond(0)
36 .unwrap(),
37 TimeWindow::Hour => timestamp
38 .with_minute(0)
39 .unwrap()
40 .with_second(0)
41 .unwrap()
42 .with_nanosecond(0)
43 .unwrap(),
44 TimeWindow::Day => timestamp
45 .with_hour(0)
46 .unwrap()
47 .with_minute(0)
48 .unwrap()
49 .with_second(0)
50 .unwrap()
51 .with_nanosecond(0)
52 .unwrap(),
53 TimeWindow::Week => {
54 let days_from_monday = timestamp.weekday().num_days_from_monday();
55 (timestamp - Duration::days(days_from_monday as i64))
56 .with_hour(0)
57 .unwrap()
58 .with_minute(0)
59 .unwrap()
60 .with_second(0)
61 .unwrap()
62 .with_nanosecond(0)
63 .unwrap()
64 }
65 TimeWindow::Month => timestamp
66 .with_day(1)
67 .unwrap()
68 .with_hour(0)
69 .unwrap()
70 .with_minute(0)
71 .unwrap()
72 .with_second(0)
73 .unwrap()
74 .with_nanosecond(0)
75 .unwrap(),
76 }
77 }
78}
79
/// Parameters for `AnalyticsEngine::event_frequency`.
#[derive(Debug, Deserialize)]
pub struct EventFrequencyRequest {
    /// Optional entity filter, forwarded to the store query.
    pub entity_id: Option<String>,

    /// Optional event-type filter, forwarded to the store query.
    pub event_type: Option<String>,

    /// Start of the analysed time range (required).
    pub since: DateTime<Utc>,

    /// End of the analysed time range; the current time is used when omitted.
    pub until: Option<DateTime<Utc>>,

    /// Bucket granularity for the resulting histogram.
    pub window: TimeWindow,
}
98
/// Event counts for a single time window.
#[derive(Debug, Clone, Serialize)]
pub struct TimeBucket {
    /// Start of the window (a timestamp produced by `TimeWindow::truncate`).
    pub timestamp: DateTime<Utc>,
    /// Total number of events in the window (sum of `event_types` values).
    pub count: usize,
    /// Number of events in the window, keyed by event type.
    pub event_types: HashMap<String, usize>,
}
106
/// Result of `AnalyticsEngine::event_frequency`.
#[derive(Debug, Serialize)]
pub struct EventFrequencyResponse {
    /// Per-window counts in chronological order; empty when no events matched.
    pub buckets: Vec<TimeBucket>,
    /// Number of events returned by the store query.
    pub total_events: usize,
    /// Bucket granularity that was requested.
    pub window: TimeWindow,
    /// Time range that was queried.
    pub time_range: TimeRange,
}
115
/// A pair of timestamps delimiting a span of time.
#[derive(Debug, Serialize)]
pub struct TimeRange {
    /// Start of the range.
    pub from: DateTime<Utc>,
    /// End of the range.
    pub to: DateTime<Utc>,
}
121
/// Parameters for `AnalyticsEngine::stats_summary`.
///
/// Every field is optional and is forwarded unchanged to the store query.
#[derive(Debug, Deserialize)]
pub struct StatsSummaryRequest {
    /// Optional entity filter.
    pub entity_id: Option<String>,

    /// Optional event-type filter.
    pub event_type: Option<String>,

    /// Optional start of the time range.
    pub since: Option<DateTime<Utc>>,

    /// Optional end of the time range.
    pub until: Option<DateTime<Utc>>,
}
137
/// Result of `AnalyticsEngine::stats_summary`.
#[derive(Debug, Serialize)]
pub struct StatsSummaryResponse {
    /// Number of events that matched the request.
    pub total_events: usize,
    /// Distinct entity count reported by `AnalyticsEngine::stats_summary`.
    pub unique_entities: usize,
    /// Distinct event-type count reported by `AnalyticsEngine::stats_summary`.
    pub unique_event_types: usize,
    /// Span between the first and last reported event timestamps.
    pub time_range: TimeRange,
    /// `total_events` divided by the span in whole days (a span shorter than
    /// one day counts as one day).
    pub events_per_day: f64,
    /// Most frequent event types, highest count first (at most 10 entries).
    pub top_event_types: Vec<EventTypeCount>,
    /// Most active entities, highest count first (at most 10 entries).
    pub top_entities: Vec<EntityCount>,
    /// Timestamp reported for the first event in the result set.
    pub first_event: Option<DateTime<Utc>>,
    /// Timestamp reported for the last event in the result set.
    pub last_event: Option<DateTime<Utc>>,
}
151
/// Number of events observed for one event type.
#[derive(Debug, Serialize)]
pub struct EventTypeCount {
    /// Event type name.
    pub event_type: String,
    /// Number of matched events of this type.
    pub count: usize,
    /// `count` as a percentage (0-100) of all matched events.
    pub percentage: f64,
}
158
/// Number of events observed for one entity.
#[derive(Debug, Serialize)]
pub struct EntityCount {
    /// Entity identifier.
    pub entity_id: String,
    /// Number of matched events belonging to this entity.
    pub count: usize,
    /// `count` as a percentage (0-100) of all matched events.
    pub percentage: f64,
}
165
/// Parameters for `AnalyticsEngine::analyze_correlation`.
#[derive(Debug, Deserialize)]
pub struct CorrelationRequest {
    /// First event type of the pair being correlated.
    pub event_type_a: String,

    /// Second event type of the pair being correlated.
    pub event_type_b: String,

    /// Maximum absolute time difference, in seconds, for two events of the
    /// same entity to count as correlated.
    pub time_window_seconds: i64,

    /// Optional start of the time range, forwarded to the store query.
    pub since: Option<DateTime<Utc>>,

    /// Optional end of the time range, forwarded to the store query.
    pub until: Option<DateTime<Utc>>,
}
184
/// Result of `AnalyticsEngine::analyze_correlation`.
#[derive(Debug, Serialize)]
pub struct CorrelationResponse {
    /// First event type, echoed from the request.
    pub event_type_a: String,
    /// Second event type, echoed from the request.
    pub event_type_b: String,
    /// Number of `event_type_a` events returned by the store.
    pub total_a: usize,
    /// Number of `event_type_b` events returned by the store.
    pub total_b: usize,
    /// Number of same-entity (A, B) event pairs that fall within the window.
    pub correlated_pairs: usize,
    /// Correlation strength as a percentage relative to `total_a`, as computed
    /// by `AnalyticsEngine::analyze_correlation`; `0.0` when `total_a` is zero.
    pub correlation_percentage: f64,
    /// Mean absolute time difference of the correlated pairs, in seconds;
    /// `0.0` when there are none.
    pub avg_time_between_seconds: f64,
    /// A small sample (at most 5) of correlated pairs.
    pub examples: Vec<CorrelationExample>,
}
197
/// One correlated pair of events belonging to the same entity.
#[derive(Debug, Serialize)]
pub struct CorrelationExample {
    /// Entity both events belong to.
    pub entity_id: String,
    /// Timestamp of the `event_type_a` event.
    pub event_a_timestamp: DateTime<Utc>,
    /// Timestamp of the `event_type_b` event.
    pub event_b_timestamp: DateTime<Utc>,
    /// Absolute time difference between the two events, in whole seconds.
    pub time_between_seconds: i64,
}
205
206pub struct AnalyticsEngine;
208
209impl AnalyticsEngine {
210 pub fn event_frequency(
212 store: &EventStore,
213 request: EventFrequencyRequest,
214 ) -> Result<EventFrequencyResponse> {
215 let until = request.until.unwrap_or_else(Utc::now);
216
217 let events = store.query(crate::application::dto::QueryEventsRequest {
219 entity_id: request.entity_id.clone(),
220 event_type: request.event_type.clone(),
221 tenant_id: None,
222 as_of: None,
223 since: Some(request.since),
224 until: Some(until),
225 limit: None,
226 })?;
227
228 if events.is_empty() {
229 return Ok(EventFrequencyResponse {
230 buckets: Vec::new(),
231 total_events: 0,
232 window: request.window,
233 time_range: TimeRange {
234 from: request.since,
235 to: until,
236 },
237 });
238 }
239
240 let mut buckets_map: HashMap<DateTime<Utc>, HashMap<String, usize>> = HashMap::new();
242
243 for event in &events {
244 let bucket_time = request.window.truncate(event.timestamp);
245 let bucket = buckets_map.entry(bucket_time).or_insert_with(HashMap::new);
246 *bucket.entry(event.event_type_str().to_string()).or_insert(0) += 1;
247 }
248
249 let mut buckets: Vec<TimeBucket> = buckets_map
251 .into_iter()
252 .map(|(timestamp, event_types)| {
253 let count = event_types.values().sum();
254 TimeBucket {
255 timestamp,
256 count,
257 event_types,
258 }
259 })
260 .collect();
261
262 buckets.sort_by_key(|b| b.timestamp);
263
264 let filled_buckets = Self::fill_time_gaps(&buckets, request.since, until, request.window);
266
267 Ok(EventFrequencyResponse {
268 total_events: events.len(),
269 buckets: filled_buckets,
270 window: request.window,
271 time_range: TimeRange {
272 from: request.since,
273 to: until,
274 },
275 })
276 }
277
278 fn fill_time_gaps(
280 buckets: &[TimeBucket],
281 start: DateTime<Utc>,
282 end: DateTime<Utc>,
283 window: TimeWindow,
284 ) -> Vec<TimeBucket> {
285 if buckets.is_empty() {
286 return Vec::new();
287 }
288
289 let mut filled = Vec::new();
290 let mut current = window.truncate(start);
291 let end = window.truncate(end);
292
293 let bucket_map: HashMap<DateTime<Utc>, &TimeBucket> =
294 buckets.iter().map(|b| (b.timestamp, b)).collect();
295
296 while current <= end {
297 if let Some(bucket) = bucket_map.get(¤t) {
298 filled.push((**bucket).clone());
299 } else {
300 filled.push(TimeBucket {
301 timestamp: current,
302 count: 0,
303 event_types: HashMap::new(),
304 });
305 }
306 current = current + window.duration();
307 }
308
309 filled
310 }
311
312 pub fn stats_summary(
314 store: &EventStore,
315 request: StatsSummaryRequest,
316 ) -> Result<StatsSummaryResponse> {
317 let events = store.query(crate::application::dto::QueryEventsRequest {
319 entity_id: request.entity_id.clone(),
320 event_type: request.event_type.clone(),
321 tenant_id: None,
322 as_of: None,
323 since: request.since,
324 until: request.until,
325 limit: None,
326 })?;
327
328 if events.is_empty() {
329 return Err(AllSourceError::ValidationError(
330 "No events found for the specified criteria".to_string(),
331 ));
332 }
333
334 let first_event = events.first().map(|e| e.timestamp);
336 let last_event = events.last().map(|e| e.timestamp);
337
338 let mut entity_counts: HashMap<String, usize> = HashMap::new();
339 let mut event_type_counts: HashMap<String, usize> = HashMap::new();
340
341 for event in &events {
342 *entity_counts.entry(event.entity_id_str().to_string()).or_insert(0) += 1;
343 *event_type_counts.entry(event.event_type_str().to_string()).or_insert(0) += 1;
344 }
345
346 let time_span = if let (Some(first), Some(last)) = (first_event, last_event) {
348 (last - first).num_days().max(1) as f64
349 } else {
350 1.0
351 };
352
353 let events_per_day = events.len() as f64 / time_span;
354
355 let mut top_event_types: Vec<EventTypeCount> = event_type_counts
357 .into_iter()
358 .map(|(event_type, count)| EventTypeCount {
359 event_type,
360 count,
361 percentage: (count as f64 / events.len() as f64) * 100.0,
362 })
363 .collect();
364 top_event_types.sort_by(|a, b| b.count.cmp(&a.count));
365 top_event_types.truncate(10);
366
367 let mut top_entities: Vec<EntityCount> = entity_counts
369 .into_iter()
370 .map(|(entity_id, count)| EntityCount {
371 entity_id,
372 count,
373 percentage: (count as f64 / events.len() as f64) * 100.0,
374 })
375 .collect();
376 top_entities.sort_by(|a, b| b.count.cmp(&a.count));
377 top_entities.truncate(10);
378
379 let time_range = TimeRange {
380 from: first_event.unwrap_or_else(Utc::now),
381 to: last_event.unwrap_or_else(Utc::now),
382 };
383
384 Ok(StatsSummaryResponse {
385 total_events: events.len(),
386 unique_entities: top_entities.len(),
387 unique_event_types: top_event_types.len(),
388 time_range,
389 events_per_day,
390 top_event_types,
391 top_entities,
392 first_event,
393 last_event,
394 })
395 }
396
397 pub fn analyze_correlation(
399 store: &EventStore,
400 request: CorrelationRequest,
401 ) -> Result<CorrelationResponse> {
402 let events_a = store.query(crate::application::dto::QueryEventsRequest {
404 entity_id: None,
405 event_type: Some(request.event_type_a.clone()),
406 tenant_id: None,
407 as_of: None,
408 since: request.since,
409 until: request.until,
410 limit: None,
411 })?;
412
413 let events_b = store.query(crate::application::dto::QueryEventsRequest {
414 entity_id: None,
415 event_type: Some(request.event_type_b.clone()),
416 tenant_id: None,
417 as_of: None,
418 since: request.since,
419 until: request.until,
420 limit: None,
421 })?;
422
423 let mut entity_events_a: HashMap<String, Vec<&Event>> = HashMap::new();
425 let mut entity_events_b: HashMap<String, Vec<&Event>> = HashMap::new();
426
427 for event in &events_a {
428 entity_events_a
429 .entry(event.entity_id_str().to_string())
430 .or_insert_with(Vec::new)
431 .push(event);
432 }
433
434 for event in &events_b {
435 entity_events_b
436 .entry(event.entity_id_str().to_string())
437 .or_insert_with(Vec::new)
438 .push(event);
439 }
440
441 let mut correlated_pairs = 0;
443 let mut total_time_between = 0i64;
444 let mut examples = Vec::new();
445
446 for (entity_id, a_events) in &entity_events_a {
447 if let Some(b_events) = entity_events_b.get(entity_id) {
448 for a_event in a_events {
449 for b_event in b_events {
450 let time_diff = (b_event.timestamp - a_event.timestamp).num_seconds().abs();
451
452 if time_diff <= request.time_window_seconds {
453 correlated_pairs += 1;
454 total_time_between += time_diff;
455
456 if examples.len() < 5 {
457 examples.push(CorrelationExample {
458 entity_id: entity_id.clone(),
459 event_a_timestamp: a_event.timestamp,
460 event_b_timestamp: b_event.timestamp,
461 time_between_seconds: time_diff,
462 });
463 }
464 }
465 }
466 }
467 }
468 }
469
470 let correlation_percentage = if !events_a.is_empty() {
471 (correlated_pairs as f64 / events_a.len() as f64) * 100.0
472 } else {
473 0.0
474 };
475
476 let avg_time_between = if correlated_pairs > 0 {
477 total_time_between as f64 / correlated_pairs as f64
478 } else {
479 0.0
480 };
481
482 Ok(CorrelationResponse {
483 event_type_a: request.event_type_a,
484 event_type_b: request.event_type_b,
485 total_a: events_a.len(),
486 total_b: events_b.len(),
487 correlated_pairs,
488 correlation_percentage,
489 avg_time_between_seconds: avg_time_between,
490 examples,
491 })
492 }
493}
494
#[cfg(test)]
mod tests {
    use super::*;

    /// A Friday with non-zero values in every field below the day.
    const SAMPLE: &str = "2024-03-15T13:47:59.123456789Z";

    /// Parses an RFC 3339 string into a UTC timestamp so tests use fixed inputs.
    fn utc(rfc3339: &str) -> DateTime<Utc> {
        DateTime::parse_from_rfc3339(rfc3339)
            .expect("test timestamps are valid RFC 3339")
            .with_timezone(&Utc)
    }

    #[test]
    fn test_time_window_truncation() {
        let timestamp = utc(SAMPLE);

        assert_eq!(
            TimeWindow::Minute.truncate(timestamp),
            utc("2024-03-15T13:47:00Z")
        );
        assert_eq!(
            TimeWindow::Hour.truncate(timestamp),
            utc("2024-03-15T13:00:00Z")
        );
        assert_eq!(
            TimeWindow::Day.truncate(timestamp),
            utc("2024-03-15T00:00:00Z")
        );
        // 2024-03-11 is the Monday of the week containing the sample.
        assert_eq!(
            TimeWindow::Week.truncate(timestamp),
            utc("2024-03-11T00:00:00Z")
        );
        assert_eq!(
            TimeWindow::Month.truncate(timestamp),
            utc("2024-03-01T00:00:00Z")
        );
    }

    #[test]
    fn test_week_truncation_crosses_month_boundary() {
        // Saturday 2024-03-02 belongs to the week starting Monday 2024-02-26.
        assert_eq!(
            TimeWindow::Week.truncate(utc("2024-03-02T08:30:00Z")),
            utc("2024-02-26T00:00:00Z")
        );
    }

    #[test]
    fn test_truncation_is_idempotent_on_aligned_timestamps() {
        let monday_midnight = utc("2024-03-11T00:00:00Z");
        assert_eq!(TimeWindow::Week.truncate(monday_midnight), monday_midnight);
        assert_eq!(TimeWindow::Day.truncate(monday_midnight), monday_midnight);

        let month_start = utc("2024-03-01T00:00:00Z");
        assert_eq!(TimeWindow::Month.truncate(month_start), month_start);
    }

    #[test]
    fn test_time_window_duration() {
        assert_eq!(TimeWindow::Minute.duration(), Duration::seconds(60));
        assert_eq!(TimeWindow::Hour.duration(), Duration::minutes(60));
        assert_eq!(TimeWindow::Day.duration(), Duration::hours(24));
        assert_eq!(TimeWindow::Week.duration(), Duration::days(7));
        // A month is approximated as 30 days.
        assert_eq!(TimeWindow::Month.duration(), Duration::days(30));
    }
}