1use crate::types::{ForensicQuery, ForensicResult, QueryType, UserEvent};
10use rustkernel_core::{domain::Domain, kernel::KernelMetadata, traits::GpuKernel};
11use std::collections::HashMap;
12
13#[derive(Debug, Clone)]
22pub struct ForensicQueryExecution {
23 metadata: KernelMetadata,
24}
25
26impl Default for ForensicQueryExecution {
27 fn default() -> Self {
28 Self::new()
29 }
30}
31
32impl ForensicQueryExecution {
33 #[must_use]
35 pub fn new() -> Self {
36 Self {
37 metadata: KernelMetadata::batch(
38 "behavioral/forensic-query",
39 Domain::BehavioralAnalytics,
40 )
41 .with_description("Forensic query execution for historical analysis")
42 .with_throughput(5_000)
43 .with_latency_us(1000.0),
44 }
45 }
46
47 pub fn compute(query: &ForensicQuery, events: &[UserEvent]) -> ForensicResult {
53 let start_time = std::time::Instant::now();
54
55 let filtered_events: Vec<_> = events
57 .iter()
58 .filter(|e| e.timestamp >= query.start_time && e.timestamp <= query.end_time)
59 .collect();
60
61 let filtered_events: Vec<_> = if let Some(ref user_ids) = query.user_ids {
63 filtered_events
64 .into_iter()
65 .filter(|e| user_ids.contains(&e.user_id))
66 .collect()
67 } else {
68 filtered_events
69 };
70
71 let filtered_events: Vec<_> = if let Some(ref event_types) = query.event_types {
73 filtered_events
74 .into_iter()
75 .filter(|e| event_types.contains(&e.event_type))
76 .collect()
77 } else {
78 filtered_events
79 };
80
81 let (event_ids, summary) = match query.query_type {
83 QueryType::PatternSearch => Self::pattern_search(&filtered_events, &query.filters),
84 QueryType::Timeline => Self::timeline_reconstruction(&filtered_events),
85 QueryType::ActivitySummary => Self::activity_summary(&filtered_events),
86 QueryType::AnomalyHunt => Self::anomaly_hunt(&filtered_events, &query.filters),
87 QueryType::Correlation => Self::correlation_analysis(&filtered_events),
88 };
89
90 let execution_time_ms = start_time.elapsed().as_millis() as u64;
91
92 ForensicResult {
93 query_id: query.id,
94 events: event_ids.clone(),
95 total_matches: event_ids.len() as u64,
96 summary,
97 execution_time_ms,
98 }
99 }
100
101 pub fn compute_batch(queries: &[ForensicQuery], events: &[UserEvent]) -> Vec<ForensicResult> {
103 queries.iter().map(|q| Self::compute(q, events)).collect()
104 }
105
106 fn pattern_search(
108 events: &[&UserEvent],
109 filters: &HashMap<String, String>,
110 ) -> (Vec<u64>, HashMap<String, f64>) {
111 let mut matched_ids = Vec::new();
112 let mut summary = HashMap::new();
113
114 for event in events {
116 let mut matches = true;
117
118 for (key, expected) in filters {
119 match key.as_str() {
120 "event_type_pattern" => {
121 if !event.event_type.contains(expected) {
122 matches = false;
123 }
124 }
125 "device_id" => {
126 if event.device_id.as_ref() != Some(expected) {
127 matches = false;
128 }
129 }
130 "location" => {
131 if event.location.as_ref() != Some(expected) {
132 matches = false;
133 }
134 }
135 "ip_pattern" => {
136 if let Some(ref ip) = event.ip_address {
137 if !ip.contains(expected) {
138 matches = false;
139 }
140 } else {
141 matches = false;
142 }
143 }
144 _ => {}
145 }
146
147 if !matches {
148 break;
149 }
150 }
151
152 if matches {
153 matched_ids.push(event.id);
154 }
155 }
156
157 summary.insert("match_count".to_string(), matched_ids.len() as f64);
158 summary.insert("total_searched".to_string(), events.len() as f64);
159 summary.insert(
160 "match_rate".to_string(),
161 matched_ids.len() as f64 / events.len().max(1) as f64,
162 );
163
164 (matched_ids, summary)
165 }
166
167 fn timeline_reconstruction(events: &[&UserEvent]) -> (Vec<u64>, HashMap<String, f64>) {
169 let mut sorted: Vec<_> = events.iter().collect();
171 sorted.sort_by_key(|e| e.timestamp);
172
173 let event_ids: Vec<_> = sorted.iter().map(|e| e.id).collect();
174 let mut summary = HashMap::new();
175
176 if !sorted.is_empty() {
177 let first_ts = sorted.first().unwrap().timestamp;
178 let last_ts = sorted.last().unwrap().timestamp;
179 let duration = (last_ts - first_ts) as f64;
180
181 summary.insert("timeline_start".to_string(), first_ts as f64);
182 summary.insert("timeline_end".to_string(), last_ts as f64);
183 summary.insert("duration_seconds".to_string(), duration);
184 summary.insert("event_count".to_string(), sorted.len() as f64);
185 summary.insert(
186 "events_per_hour".to_string(),
187 sorted.len() as f64 / (duration / 3600.0).max(1.0),
188 );
189
190 let unique_users: std::collections::HashSet<_> =
192 sorted.iter().map(|e| e.user_id).collect();
193 summary.insert("unique_users".to_string(), unique_users.len() as f64);
194
195 let unique_sessions: std::collections::HashSet<_> =
197 sorted.iter().filter_map(|e| e.session_id).collect();
198 summary.insert("unique_sessions".to_string(), unique_sessions.len() as f64);
199 }
200
201 (event_ids, summary)
202 }
203
204 fn activity_summary(events: &[&UserEvent]) -> (Vec<u64>, HashMap<String, f64>) {
206 let event_ids: Vec<_> = events.iter().map(|e| e.id).collect();
207 let mut summary = HashMap::new();
208
209 let mut type_counts: HashMap<&str, u64> = HashMap::new();
211 for event in events {
212 *type_counts.entry(&event.event_type).or_insert(0) += 1;
213 }
214
215 let total = events.len() as f64;
216 for (event_type, count) in &type_counts {
217 let key = format!("type_{}_count", event_type);
218 summary.insert(key, *count as f64);
219
220 let ratio_key = format!("type_{}_ratio", event_type);
221 summary.insert(ratio_key, *count as f64 / total);
222 }
223
224 let mut hour_counts = [0u64; 24];
226 for event in events {
227 let hour = ((event.timestamp / 3600) % 24) as usize;
228 hour_counts[hour] += 1;
229 }
230
231 let peak_hour = hour_counts
232 .iter()
233 .enumerate()
234 .max_by_key(|&(_, c)| *c)
235 .map(|(h, _)| h)
236 .unwrap_or(0);
237 summary.insert("peak_activity_hour".to_string(), peak_hour as f64);
238
239 let mut user_counts: HashMap<u64, u64> = HashMap::new();
241 for event in events {
242 *user_counts.entry(event.user_id).or_insert(0) += 1;
243 }
244
245 summary.insert("unique_users".to_string(), user_counts.len() as f64);
246
247 if !user_counts.is_empty() {
248 let avg_events_per_user = total / user_counts.len() as f64;
249 summary.insert("avg_events_per_user".to_string(), avg_events_per_user);
250
251 let max_user_events = *user_counts.values().max().unwrap_or(&0);
252 summary.insert("max_user_events".to_string(), max_user_events as f64);
253 }
254
255 let unique_locations: std::collections::HashSet<_> =
257 events.iter().filter_map(|e| e.location.as_ref()).collect();
258 summary.insert(
259 "unique_locations".to_string(),
260 unique_locations.len() as f64,
261 );
262
263 let unique_devices: std::collections::HashSet<_> =
265 events.iter().filter_map(|e| e.device_id.as_ref()).collect();
266 summary.insert("unique_devices".to_string(), unique_devices.len() as f64);
267
268 summary.insert("total_events".to_string(), total);
269
270 (event_ids, summary)
271 }
272
273 fn anomaly_hunt(
275 events: &[&UserEvent],
276 filters: &HashMap<String, String>,
277 ) -> (Vec<u64>, HashMap<String, f64>) {
278 let mut anomalous_ids = Vec::new();
279 let mut summary = HashMap::new();
280
281 let velocity_threshold: f64 = filters
283 .get("velocity_threshold")
284 .and_then(|v| v.parse().ok())
285 .unwrap_or(10.0);
286
287 let _time_anomaly_hours: Vec<u8> = filters
288 .get("unusual_hours")
289 .map(|h| h.split(',').filter_map(|s| s.trim().parse().ok()).collect())
290 .unwrap_or_else(|| vec![0, 1, 2, 3, 4, 5]);
291
292 let mut user_stats: HashMap<u64, UserStats> = HashMap::new();
294
295 for event in events {
296 user_stats
297 .entry(event.user_id)
298 .or_default()
299 .add_event(event);
300 }
301
302 let mut velocity_anomalies = 0u64;
304 let mut time_anomalies = 0u64;
305 let mut location_anomalies = 0u64;
306
307 for event in events {
308 let stats = user_stats.get(&event.user_id).unwrap();
309 let mut is_anomaly = false;
310
311 let hour_window = events
313 .iter()
314 .filter(|e| {
315 e.user_id == event.user_id
316 && e.timestamp <= event.timestamp
317 && e.timestamp > event.timestamp.saturating_sub(3600)
318 })
319 .count();
320
321 if hour_window as f64 > velocity_threshold {
322 is_anomaly = true;
323 velocity_anomalies += 1;
324 }
325
326 let hour = ((event.timestamp / 3600) % 24) as u8;
328 if hour < 6 {
329 is_anomaly = true;
330 time_anomalies += 1;
331 }
332
333 if let Some(ref location) = event.location {
335 if stats.unique_locations.len() > 1
336 && stats.unique_locations.len() < 3
337 && !stats.location_counts.contains_key(location.as_str())
338 {
339 is_anomaly = true;
340 location_anomalies += 1;
341 }
342 }
343
344 if is_anomaly {
345 anomalous_ids.push(event.id);
346 }
347 }
348
349 summary.insert("total_events".to_string(), events.len() as f64);
350 summary.insert("anomalous_events".to_string(), anomalous_ids.len() as f64);
351 summary.insert(
352 "anomaly_rate".to_string(),
353 anomalous_ids.len() as f64 / events.len().max(1) as f64,
354 );
355 summary.insert("velocity_anomalies".to_string(), velocity_anomalies as f64);
356 summary.insert("time_anomalies".to_string(), time_anomalies as f64);
357 summary.insert("location_anomalies".to_string(), location_anomalies as f64);
358
359 (anomalous_ids, summary)
360 }
361
362 fn correlation_analysis(events: &[&UserEvent]) -> (Vec<u64>, HashMap<String, f64>) {
364 let event_ids: Vec<_> = events.iter().map(|e| e.id).collect();
365 let mut summary = HashMap::new();
366
367 if events.len() < 2 {
368 summary.insert("correlation_count".to_string(), 0.0);
369 return (event_ids, summary);
370 }
371
372 let mut sorted: Vec<_> = events.iter().collect();
374 sorted.sort_by_key(|e| e.timestamp);
375
376 let mut pair_counts: HashMap<(&str, &str), u64> = HashMap::new();
378 let mut single_counts: HashMap<&str, u64> = HashMap::new();
379
380 for event in &sorted {
381 *single_counts.entry(&event.event_type).or_insert(0) += 1;
382 }
383
384 for window in sorted.windows(2) {
385 *pair_counts
386 .entry((&window[0].event_type, &window[1].event_type))
387 .or_insert(0) += 1;
388 }
389
390 let total_pairs = (sorted.len() - 1) as f64;
392 let mut correlations: Vec<_> = pair_counts
393 .iter()
394 .map(|((a, b), &count)| {
395 let a_count = single_counts.get(a).copied().unwrap_or(1) as f64;
396 let b_count = single_counts.get(b).copied().unwrap_or(1) as f64;
397
398 let p_ab = count as f64 / total_pairs;
400 let p_a = a_count / sorted.len() as f64;
401 let p_b = b_count / sorted.len() as f64;
402 let lift = p_ab / (p_a * p_b);
403
404 (format!("{}->{}", a, b), lift, count)
405 })
406 .collect();
407
408 correlations.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
409
410 for (i, (pair, lift, count)) in correlations.iter().take(5).enumerate() {
412 summary.insert(format!("top{}_pair", i + 1), 0.0); summary.insert(format!("top{}_lift", i + 1), *lift);
414 summary.insert(format!("top{}_count", i + 1), *count as f64);
415 let _ = pair; }
418
419 summary.insert("unique_event_types".to_string(), single_counts.len() as f64);
420 summary.insert("unique_pairs".to_string(), pair_counts.len() as f64);
421 summary.insert("total_transitions".to_string(), total_pairs);
422
423 (event_ids, summary)
424 }
425
426 pub fn pattern_search_query(
428 id: u64,
429 start_time: u64,
430 end_time: u64,
431 pattern: &str,
432 ) -> ForensicQuery {
433 let mut filters = HashMap::new();
434 filters.insert("event_type_pattern".to_string(), pattern.to_string());
435
436 ForensicQuery {
437 id,
438 query_type: QueryType::PatternSearch,
439 start_time,
440 end_time,
441 user_ids: None,
442 event_types: None,
443 filters,
444 }
445 }
446
447 pub fn timeline_query(
449 id: u64,
450 start_time: u64,
451 end_time: u64,
452 user_ids: Option<Vec<u64>>,
453 ) -> ForensicQuery {
454 ForensicQuery {
455 id,
456 query_type: QueryType::Timeline,
457 start_time,
458 end_time,
459 user_ids,
460 event_types: None,
461 filters: HashMap::new(),
462 }
463 }
464}
465
466impl GpuKernel for ForensicQueryExecution {
467 fn metadata(&self) -> &KernelMetadata {
468 &self.metadata
469 }
470}
471
/// Running per-user aggregates; starts out empty via `Default`.
#[derive(Default, Debug)]
struct UserStats {
    /// Number of events recorded for the user.
    event_count: u64,
    /// Distinct locations seen on the user's events.
    unique_locations: std::collections::HashSet<String>,
    /// Number of events seen per location.
    location_counts: HashMap<String, u64>,
    /// Distinct device ids seen on the user's events.
    unique_devices: std::collections::HashSet<String>,
}
480
481impl UserStats {
482 fn add_event(&mut self, event: &UserEvent) {
483 self.event_count += 1;
484
485 if let Some(ref loc) = event.location {
486 self.unique_locations.insert(loc.clone());
487 *self.location_counts.entry(loc.clone()).or_insert(0) += 1;
488 }
489
490 if let Some(ref dev) = event.device_id {
491 self.unique_devices.insert(dev.clone());
492 }
493 }
494}
495
#[cfg(test)]
mod tests {
    use super::*;

    /// Timestamp of the earliest fixture event.
    const BASE_TS: u64 = 1_700_000_000;
    /// End of a query window that covers every fixture event.
    const WINDOW_END: u64 = BASE_TS + 500;

    /// Builds one fixture event. User 100 is the US / `device_a` / session 1
    /// profile; any other user gets the UK / `device_b` / session 2 profile.
    fn event(id: u64, user_id: u64, event_type: &str, offset_secs: u64) -> UserEvent {
        let (session_id, device_id, ip_address, location) = match user_id {
            100 => (1, "device_a", "192.168.1.1", "US"),
            _ => (2, "device_b", "10.0.0.1", "UK"),
        };

        UserEvent {
            id,
            user_id,
            event_type: event_type.to_string(),
            timestamp: BASE_TS + offset_secs,
            attributes: HashMap::new(),
            session_id: Some(session_id),
            device_id: Some(device_id.to_string()),
            ip_address: Some(ip_address.to_string()),
            location: Some(location.to_string()),
        }
    }

    /// Five events from two users, deliberately not in timestamp order.
    fn create_test_events() -> Vec<UserEvent> {
        vec![
            event(1, 100, "login", 0),
            event(2, 100, "view", 60),
            event(3, 100, "purchase", 120),
            event(4, 200, "login", 30),
            event(5, 200, "logout", 180),
        ]
    }

    /// Query of the given type spanning all fixture events, with no user,
    /// event-type or key/value filters.
    fn full_window_query(id: u64, query_type: QueryType) -> ForensicQuery {
        ForensicQuery {
            id,
            query_type,
            start_time: BASE_TS,
            end_time: WINDOW_END,
            user_ids: None,
            event_types: None,
            filters: HashMap::new(),
        }
    }

    fn stat(result: &ForensicResult, key: &str) -> Option<f64> {
        result.summary.get(key).copied()
    }

    #[test]
    fn test_forensic_query_metadata() {
        let kernel = ForensicQueryExecution::new();
        assert_eq!(kernel.metadata().id, "behavioral/forensic-query");
        assert_eq!(kernel.metadata().domain, Domain::BehavioralAnalytics);
    }

    #[test]
    fn test_pattern_search() {
        let events = create_test_events();
        let query = ForensicQueryExecution::pattern_search_query(1, BASE_TS, WINDOW_END, "login");

        let result = ForensicQueryExecution::compute(&query, &events);

        assert_eq!(result.query_id, 1);
        // "logout" does not contain "login", so only the two logins match.
        assert_eq!(result.events, vec![1, 4]);
        assert_eq!(result.total_matches, 2);
        assert_eq!(stat(&result, "match_count"), Some(2.0));
        assert_eq!(stat(&result, "total_searched"), Some(5.0));
    }

    #[test]
    fn test_timeline_reconstruction() {
        let events = create_test_events();
        let query = ForensicQuery {
            user_ids: Some(vec![100]),
            ..full_window_query(2, QueryType::Timeline)
        };

        let result = ForensicQueryExecution::compute(&query, &events);

        assert_eq!(result.query_id, 2);
        assert_eq!(result.events, vec![1, 2, 3]);
        assert_eq!(result.total_matches, 3);
        assert_eq!(stat(&result, "duration_seconds"), Some(120.0));
        assert_eq!(stat(&result, "unique_users"), Some(1.0));
        assert_eq!(stat(&result, "unique_sessions"), Some(1.0));
    }

    #[test]
    fn test_activity_summary() {
        let events = create_test_events();
        let query = full_window_query(3, QueryType::ActivitySummary);

        let result = ForensicQueryExecution::compute(&query, &events);

        assert_eq!(result.total_matches, 5);
        assert_eq!(stat(&result, "unique_users"), Some(2.0));
        assert_eq!(stat(&result, "total_events"), Some(5.0));
        assert_eq!(stat(&result, "type_login_count"), Some(2.0));
        assert_eq!(stat(&result, "max_user_events"), Some(3.0));
        assert_eq!(stat(&result, "unique_locations"), Some(2.0));
        assert_eq!(stat(&result, "unique_devices"), Some(2.0));
    }

    #[test]
    fn test_anomaly_hunt() {
        let events = create_test_events();
        let query = full_window_query(4, QueryType::AnomalyHunt);

        let result = ForensicQueryExecution::compute(&query, &events);

        // All fixture events fall in hour bucket 22 and no user exceeds the
        // default velocity threshold, so nothing is flagged.
        assert!(result.events.is_empty());
        assert_eq!(stat(&result, "total_events"), Some(5.0));
        assert_eq!(stat(&result, "anomaly_rate"), Some(0.0));
        assert_eq!(stat(&result, "velocity_anomalies"), Some(0.0));
        assert_eq!(stat(&result, "time_anomalies"), Some(0.0));
    }

    #[test]
    fn test_anomaly_hunt_velocity_threshold() {
        let events = create_test_events();
        let mut query = full_window_query(9, QueryType::AnomalyHunt);
        query
            .filters
            .insert("velocity_threshold".to_string(), "2".to_string());

        let result = ForensicQueryExecution::compute(&query, &events);

        // Only user 100's third event has more than two events in its window.
        assert_eq!(result.events, vec![3]);
        assert_eq!(stat(&result, "velocity_anomalies"), Some(1.0));
    }

    #[test]
    fn test_correlation_analysis() {
        let events = create_test_events();
        let query = full_window_query(5, QueryType::Correlation);

        let result = ForensicQueryExecution::compute(&query, &events);

        // Time order: login, login, view, purchase, logout.
        assert_eq!(result.total_matches, 5);
        assert_eq!(stat(&result, "unique_event_types"), Some(4.0));
        assert_eq!(stat(&result, "unique_pairs"), Some(4.0));
        assert_eq!(stat(&result, "total_transitions"), Some(4.0));

        let top_lift = stat(&result, "top1_lift").expect("top1_lift is reported");
        assert!((top_lift - 6.25).abs() < 1e-9);
    }

    #[test]
    fn test_correlation_needs_two_events() {
        let query = full_window_query(10, QueryType::Correlation);

        let result = ForensicQueryExecution::compute(&query, &[]);

        assert_eq!(result.total_matches, 0);
        assert_eq!(stat(&result, "correlation_count"), Some(0.0));
    }

    #[test]
    fn test_user_filter() {
        let events = create_test_events();
        let query = ForensicQuery {
            user_ids: Some(vec![100]),
            ..full_window_query(6, QueryType::ActivitySummary)
        };

        let result = ForensicQueryExecution::compute(&query, &events);

        assert_eq!(result.events, vec![1, 2, 3]);
        assert_eq!(stat(&result, "unique_users"), Some(1.0));
    }

    #[test]
    fn test_event_type_filter() {
        let events = create_test_events();
        let query = ForensicQuery {
            event_types: Some(vec!["login".to_string()]),
            ..full_window_query(7, QueryType::ActivitySummary)
        };

        let result = ForensicQueryExecution::compute(&query, &events);

        assert_eq!(result.events, vec![1, 4]);
        assert_eq!(result.total_matches, 2);
    }

    #[test]
    fn test_time_filter() {
        let events = create_test_events();
        let query = ForensicQuery {
            start_time: BASE_TS + 50,
            end_time: BASE_TS + 130,
            ..full_window_query(8, QueryType::Timeline)
        };

        let result = ForensicQueryExecution::compute(&query, &events);

        // Only the events at +60 s and +120 s fall inside the window.
        assert_eq!(result.events, vec![2, 3]);
        assert_eq!(result.total_matches, 2);
    }

    #[test]
    fn test_batch_queries() {
        let events = create_test_events();
        let queries = vec![
            full_window_query(1, QueryType::ActivitySummary),
            ForensicQuery {
                user_ids: Some(vec![100]),
                ..full_window_query(2, QueryType::Timeline)
            },
        ];

        let results = ForensicQueryExecution::compute_batch(&queries, &events);

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].query_id, 1);
        assert_eq!(results[0].total_matches, 5);
        assert_eq!(results[1].query_id, 2);
        assert_eq!(results[1].total_matches, 3);
    }

    #[test]
    fn test_execution_time_tracking() {
        let events = create_test_events();
        let query = full_window_query(1, QueryType::ActivitySummary);

        let result = ForensicQueryExecution::compute(&query, &events);

        // Generous bound: five events should never take a full second.
        assert!(result.execution_time_ms < 1000);
    }
}