1use crate::types::{
9 AnomalyResult, AnomalyType, BehaviorProfile, FeatureDeviation, ProfilingResult, UserEvent,
10};
11use rustkernel_core::{domain::Domain, kernel::KernelMetadata, traits::GpuKernel};
12use std::collections::HashMap;
13
14#[derive(Debug, Clone)]
23pub struct BehavioralProfiling {
24 metadata: KernelMetadata,
25}
26
27impl Default for BehavioralProfiling {
28 fn default() -> Self {
29 Self::new()
30 }
31}
32
33impl BehavioralProfiling {
34 #[must_use]
36 pub fn new() -> Self {
37 Self {
38 metadata: KernelMetadata::ring("behavioral/profiling", Domain::BehavioralAnalytics)
39 .with_description("Behavioral feature extraction and profiling")
40 .with_throughput(100_000)
41 .with_latency_us(50.0),
42 }
43 }
44
45 pub fn compute(
52 user_id: u64,
53 events: &[UserEvent],
54 feature_config: &FeatureConfig,
55 ) -> ProfilingResult {
56 if events.is_empty() {
57 return ProfilingResult {
58 user_id,
59 features: Vec::new(),
60 stability: 0.0,
61 confidence: 0.0,
62 };
63 }
64
65 let mut features = Vec::new();
66
67 if feature_config.extract_temporal {
69 let temporal = Self::extract_temporal_features(events);
70 features.extend(temporal);
71 }
72
73 if feature_config.extract_frequency {
75 let frequency = Self::extract_frequency_features(events);
76 features.extend(frequency);
77 }
78
79 if feature_config.extract_session {
81 let session = Self::extract_session_features(events);
82 features.extend(session);
83 }
84
85 if feature_config.extract_device_location {
87 let device_loc = Self::extract_device_location_features(events);
88 features.extend(device_loc);
89 }
90
91 let stability = Self::calculate_stability(events, &features);
93
94 let confidence = Self::calculate_confidence(events);
96
97 ProfilingResult {
98 user_id,
99 features,
100 stability,
101 confidence,
102 }
103 }
104
105 pub fn build_profile(result: &ProfilingResult, timestamp: u64) -> BehaviorProfile {
107 let feature_names: Vec<String> = result.features.iter().map(|(n, _)| n.clone()).collect();
108 let feature_values: Vec<f64> = result.features.iter().map(|(_, v)| *v).collect();
109
110 BehaviorProfile {
111 user_id: result.user_id,
112 features: feature_values,
113 feature_names,
114 created_at: timestamp,
115 updated_at: timestamp,
116 event_count: 0, }
118 }
119
120 fn extract_temporal_features(events: &[UserEvent]) -> Vec<(String, f64)> {
122 let mut features = Vec::new();
123
124 let mut hour_counts = [0u32; 24];
126 for event in events {
127 let hour = ((event.timestamp / 3600) % 24) as usize;
128 hour_counts[hour] += 1;
129 }
130
131 let total = events.len() as f64;
132
133 let peak_hour = hour_counts
135 .iter()
136 .enumerate()
137 .max_by_key(|&(_, c)| *c)
138 .map(|(h, _)| h)
139 .unwrap_or(0);
140 features.push(("peak_hour".to_string(), peak_hour as f64));
141
142 let business_hours: u32 = hour_counts[9..18].iter().sum();
144 features.push((
145 "business_hours_ratio".to_string(),
146 business_hours as f64 / total,
147 ));
148
149 let night_hours: u32 =
151 hour_counts[22..24].iter().sum::<u32>() + hour_counts[0..6].iter().sum::<u32>();
152 features.push((
153 "night_activity_ratio".to_string(),
154 night_hours as f64 / total,
155 ));
156
157 let weekend_events = events
159 .iter()
160 .filter(|e| {
161 let day = (e.timestamp / 86400) % 7;
162 day == 5 || day == 6 })
164 .count();
165 features.push(("weekend_ratio".to_string(), weekend_events as f64 / total));
166
167 let hour_entropy = Self::calculate_entropy(&hour_counts);
169 features.push(("hour_entropy".to_string(), hour_entropy));
170
171 features
172 }
173
174 fn extract_frequency_features(events: &[UserEvent]) -> Vec<(String, f64)> {
176 let mut features = Vec::new();
177
178 if events.len() < 2 {
179 features.push(("avg_events_per_day".to_string(), 0.0));
180 features.push(("event_rate_variance".to_string(), 0.0));
181 return features;
182 }
183
184 let min_ts = events.iter().map(|e| e.timestamp).min().unwrap();
186 let max_ts = events.iter().map(|e| e.timestamp).max().unwrap();
187 let span_days = ((max_ts - min_ts) as f64 / 86400.0).max(1.0);
188
189 let avg_per_day = events.len() as f64 / span_days;
191 features.push(("avg_events_per_day".to_string(), avg_per_day));
192
193 let mut type_counts: HashMap<&str, u32> = HashMap::new();
195 for event in events {
196 *type_counts.entry(&event.event_type).or_insert(0) += 1;
197 }
198
199 let max_type_count = type_counts.values().max().copied().unwrap_or(0);
201 features.push((
202 "dominant_event_ratio".to_string(),
203 max_type_count as f64 / events.len() as f64,
204 ));
205
206 features.push((
208 "event_type_diversity".to_string(),
209 type_counts.len() as f64 / events.len() as f64,
210 ));
211
212 let mut inter_times: Vec<f64> = Vec::new();
214 let mut sorted_events: Vec<_> = events.iter().collect();
215 sorted_events.sort_by_key(|e| e.timestamp);
216
217 for window in sorted_events.windows(2) {
218 inter_times.push((window[1].timestamp - window[0].timestamp) as f64);
219 }
220
221 if !inter_times.is_empty() {
222 let mean_inter = inter_times.iter().sum::<f64>() / inter_times.len() as f64;
223 features.push(("mean_inter_event_time".to_string(), mean_inter));
224
225 let variance = inter_times
226 .iter()
227 .map(|t| (t - mean_inter).powi(2))
228 .sum::<f64>()
229 / inter_times.len() as f64;
230 features.push(("inter_event_variance".to_string(), variance.sqrt()));
231 }
232
233 features
234 }
235
236 fn extract_session_features(events: &[UserEvent]) -> Vec<(String, f64)> {
238 let mut features = Vec::new();
239
240 let mut sessions: HashMap<u64, Vec<&UserEvent>> = HashMap::new();
242 let mut no_session_count = 0;
243
244 for event in events {
245 if let Some(session_id) = event.session_id {
246 sessions.entry(session_id).or_default().push(event);
247 } else {
248 no_session_count += 1;
249 }
250 }
251
252 let session_count = sessions.len();
253 features.push(("session_count".to_string(), session_count as f64));
254
255 if session_count > 0 {
256 let avg_events_per_session =
258 events.len() as f64 / (session_count + (no_session_count > 0) as usize) as f64;
259 features.push(("avg_events_per_session".to_string(), avg_events_per_session));
260
261 let session_durations: Vec<f64> = sessions
263 .values()
264 .map(|session_events| {
265 let min_ts = session_events.iter().map(|e| e.timestamp).min().unwrap();
266 let max_ts = session_events.iter().map(|e| e.timestamp).max().unwrap();
267 (max_ts - min_ts) as f64
268 })
269 .collect();
270
271 let avg_duration =
272 session_durations.iter().sum::<f64>() / session_durations.len() as f64;
273 features.push(("avg_session_duration".to_string(), avg_duration));
274 } else {
275 features.push(("avg_events_per_session".to_string(), 0.0));
276 features.push(("avg_session_duration".to_string(), 0.0));
277 }
278
279 features
280 }
281
282 fn extract_device_location_features(events: &[UserEvent]) -> Vec<(String, f64)> {
284 let mut features = Vec::new();
285
286 let unique_devices: std::collections::HashSet<_> =
288 events.iter().filter_map(|e| e.device_id.as_ref()).collect();
289 features.push((
290 "unique_device_count".to_string(),
291 unique_devices.len() as f64,
292 ));
293
294 let unique_locations: std::collections::HashSet<_> =
296 events.iter().filter_map(|e| e.location.as_ref()).collect();
297 features.push((
298 "unique_location_count".to_string(),
299 unique_locations.len() as f64,
300 ));
301
302 let device_switches = Self::count_switches(
304 &events
305 .iter()
306 .filter_map(|e| e.device_id.as_deref())
307 .collect::<Vec<_>>(),
308 );
309 features.push((
310 "device_switch_rate".to_string(),
311 device_switches as f64 / events.len().max(1) as f64,
312 ));
313
314 let location_switches = Self::count_switches(
316 &events
317 .iter()
318 .filter_map(|e| e.location.as_deref())
319 .collect::<Vec<_>>(),
320 );
321 features.push((
322 "location_switch_rate".to_string(),
323 location_switches as f64 / events.len().max(1) as f64,
324 ));
325
326 features
327 }
328
/// Counts how many adjacent pairs in `sequence` hold different values.
///
/// Sequences with fewer than two elements have no pairs and yield `0`.
fn count_switches(sequence: &[&str]) -> usize {
    sequence
        .iter()
        .zip(sequence.iter().skip(1))
        .filter(|(previous, current)| previous != current)
        .count()
}
336
/// Shannon entropy, using the natural logarithm, of the distribution
/// described by `counts`.
///
/// Zero buckets contribute nothing. Returns `0.0` when all counts are zero
/// or the slice is empty.
fn calculate_entropy(counts: &[u32]) -> f64 {
    let total: u32 = counts.iter().sum();
    if total == 0 {
        return 0.0;
    }

    let total = f64::from(total);
    counts
        .iter()
        .filter(|&&count| count > 0)
        .fold(0.0, |entropy, &count| {
            let p = f64::from(count) / total;
            entropy - p * p.ln()
        })
}
353
354 fn calculate_stability(events: &[UserEvent], features: &[(String, f64)]) -> f64 {
356 if events.len() < 10 || features.is_empty() {
357 return 0.0;
358 }
359
360 let mid = events.len() / 2;
362 let config = FeatureConfig::default();
363
364 let first_half = Self::compute(0, &events[..mid], &config);
365 let second_half = Self::compute(0, &events[mid..], &config);
366
367 let first_map: HashMap<_, _> = first_half.features.into_iter().collect();
369 let second_map: HashMap<_, _> = second_half.features.into_iter().collect();
370
371 let mut correlations = Vec::new();
372 for (name, v1) in &first_map {
373 if let Some(&v2) = second_map.get(name) {
374 if v1.abs() > 0.001 || v2.abs() > 0.001 {
375 let similarity = 1.0 - (v1 - v2).abs() / (v1.abs() + v2.abs() + 0.001);
376 correlations.push(similarity);
377 }
378 }
379 }
380
381 if correlations.is_empty() {
382 return 0.5;
383 }
384
385 correlations.iter().sum::<f64>() / correlations.len() as f64
386 }
387
388 fn calculate_confidence(events: &[UserEvent]) -> f64 {
390 if events.is_empty() {
391 return 0.0;
392 }
393
394 let count_factor = (events.len() as f64 / 100.0).min(1.0);
396
397 let min_ts = events.iter().map(|e| e.timestamp).min().unwrap();
399 let max_ts = events.iter().map(|e| e.timestamp).max().unwrap();
400 let span_days = (max_ts - min_ts) as f64 / 86400.0;
401 let span_factor = (span_days / 30.0).min(1.0); let density = events.len() as f64 / span_days.max(1.0);
405 let density_factor = (density / 10.0).min(1.0); count_factor * 0.4 + span_factor * 0.3 + density_factor * 0.3
408 }
409}
410
411impl GpuKernel for BehavioralProfiling {
412 fn metadata(&self) -> &KernelMetadata {
413 &self.metadata
414 }
415}
416
/// Switches selecting which feature groups `BehavioralProfiling::compute`
/// extracts.
#[derive(Clone, Debug)]
pub struct FeatureConfig {
    /// Include the temporal group (e.g. `peak_hour`, `weekend_ratio`).
    pub extract_temporal: bool,
    /// Include the frequency group (e.g. `avg_events_per_day`).
    pub extract_frequency: bool,
    /// Include the session group (e.g. `session_count`).
    pub extract_session: bool,
    /// Include the device/location group (e.g. `unique_device_count`).
    pub extract_device_location: bool,
}
429
430impl Default for FeatureConfig {
431 fn default() -> Self {
432 Self {
433 extract_temporal: true,
434 extract_frequency: true,
435 extract_session: true,
436 extract_device_location: true,
437 }
438 }
439}
440
441#[derive(Debug, Clone)]
449pub struct AnomalyProfiling {
450 metadata: KernelMetadata,
451}
452
453impl Default for AnomalyProfiling {
454 fn default() -> Self {
455 Self::new()
456 }
457}
458
459impl AnomalyProfiling {
460 #[must_use]
462 pub fn new() -> Self {
463 Self {
464 metadata: KernelMetadata::ring("behavioral/anomaly", Domain::BehavioralAnalytics)
465 .with_description("Behavioral anomaly detection")
466 .with_throughput(200_000)
467 .with_latency_us(25.0),
468 }
469 }
470
471 pub fn compute(
479 event: &UserEvent,
480 profile: &BehaviorProfile,
481 recent_events: &[UserEvent],
482 threshold: f64,
483 ) -> AnomalyResult {
484 let mut deviations = Vec::new();
485 let mut total_score: f64 = 0.0;
486 let mut anomaly_types = Vec::new();
487
488 let hour = ((event.timestamp / 3600) % 24) as f64;
490 if let Some(expected_hour) = profile.get_feature("peak_hour") {
491 let hour_diff = (hour - expected_hour)
492 .abs()
493 .min(24.0 - (hour - expected_hour).abs());
494 let hour_score = (hour_diff / 12.0) * 100.0;
495
496 if hour_score > 30.0 {
497 deviations.push(FeatureDeviation {
498 feature_name: "hour".to_string(),
499 expected: expected_hour,
500 actual: hour,
501 z_score: hour_diff / 6.0,
502 contribution: hour_score * 0.2,
503 });
504 anomaly_types.push(AnomalyType::Temporal);
505 }
506 total_score += hour_score * 0.2;
507 }
508
509 if let Some(location) = &event.location {
511 if let Some(unique_locs) = profile.get_feature("unique_location_count") {
512 if unique_locs < 3.0 {
514 let known_locations: std::collections::HashSet<_> = recent_events
516 .iter()
517 .filter_map(|e| e.location.as_ref())
518 .collect();
519
520 if !known_locations.contains(location) {
521 let geo_score = 50.0;
522 deviations.push(FeatureDeviation {
523 feature_name: "location".to_string(),
524 expected: unique_locs,
525 actual: unique_locs + 1.0,
526 z_score: 2.0,
527 contribution: geo_score * 0.25,
528 });
529 anomaly_types.push(AnomalyType::Geographic);
530 total_score += geo_score * 0.25;
531 }
532 }
533 }
534 }
535
536 if let Some(device) = &event.device_id {
538 if let Some(unique_devices) = profile.get_feature("unique_device_count") {
539 if unique_devices < 3.0 {
540 let known_devices: std::collections::HashSet<_> = recent_events
541 .iter()
542 .filter_map(|e| e.device_id.as_ref())
543 .collect();
544
545 if !known_devices.contains(device) {
546 let device_score = 40.0;
547 deviations.push(FeatureDeviation {
548 feature_name: "device".to_string(),
549 expected: unique_devices,
550 actual: unique_devices + 1.0,
551 z_score: 1.5,
552 contribution: device_score * 0.2,
553 });
554 anomaly_types.push(AnomalyType::Device);
555 total_score += device_score * 0.2;
556 }
557 }
558 }
559 }
560
561 let window_start = event.timestamp.saturating_sub(3600); let recent_count = recent_events
564 .iter()
565 .filter(|e| e.timestamp >= window_start && e.timestamp <= event.timestamp)
566 .count();
567
568 if let Some(avg_per_day) = profile.get_feature("avg_events_per_day") {
569 let expected_per_hour = avg_per_day / 24.0;
570 if recent_count as f64 > expected_per_hour * 5.0 {
571 let velocity_score = ((recent_count as f64 / expected_per_hour) - 1.0).min(100.0);
572 deviations.push(FeatureDeviation {
573 feature_name: "velocity".to_string(),
574 expected: expected_per_hour,
575 actual: recent_count as f64,
576 z_score: (recent_count as f64 - expected_per_hour) / expected_per_hour.max(1.0),
577 contribution: velocity_score * 0.35,
578 });
579 anomaly_types.push(AnomalyType::Velocity);
580 total_score += velocity_score * 0.35;
581 }
582 }
583
584 let anomaly_type = if anomaly_types.is_empty() {
586 None
587 } else if anomaly_types.len() > 1 {
588 Some(AnomalyType::Mixed)
589 } else {
590 Some(anomaly_types[0])
591 };
592
593 AnomalyResult {
594 user_id: event.user_id,
595 event_id: event.id,
596 anomaly_score: total_score.min(100.0),
597 is_anomaly: total_score >= threshold,
598 deviations,
599 anomaly_type,
600 }
601 }
602
603 pub fn compute_batch(
605 events: &[UserEvent],
606 profile: &BehaviorProfile,
607 threshold: f64,
608 ) -> Vec<AnomalyResult> {
609 let mut results = Vec::new();
610
611 for (i, event) in events.iter().enumerate() {
612 let recent = &events[..i];
614 let result = Self::compute(event, profile, recent, threshold);
615 results.push(result);
616 }
617
618 results
619 }
620}
621
622impl GpuKernel for AnomalyProfiling {
623 fn metadata(&self) -> &KernelMetadata {
624 &self.metadata
625 }
626}
627
#[cfg(test)]
mod tests {
    use super::*;

    /// Reference timestamp; every fixture event is placed at an offset from it.
    const BASE_TS: u64 = 1_700_000_000;

    /// Builds an event for user 100 located in "US" with empty attributes.
    fn event(
        id: u64,
        event_type: &str,
        timestamp: u64,
        session: u64,
        device: &str,
        ip: Option<&str>,
    ) -> UserEvent {
        UserEvent {
            id,
            user_id: 100,
            event_type: event_type.to_string(),
            timestamp,
            attributes: HashMap::new(),
            session_id: Some(session),
            device_id: Some(device.to_string()),
            ip_address: ip.map(str::to_string),
            location: Some("US".to_string()),
        }
    }

    /// One session of four events from a single device, IP and location.
    fn create_test_events() -> Vec<UserEvent> {
        [
            (1, "login", 36000),
            (2, "view", 36300),
            (3, "purchase", 37800),
            (4, "logout", 39600),
        ]
        .iter()
        .map(|&(id, kind, offset)| {
            event(
                id,
                kind,
                BASE_TS + offset,
                1,
                "device_a",
                Some("192.168.1.1"),
            )
        })
        .collect()
    }

    /// The fixture events together with the profile built from them.
    fn fixture_with_profile() -> (Vec<UserEvent>, BehaviorProfile) {
        let events = create_test_events();
        let result = BehavioralProfiling::compute(100, &events, &FeatureConfig::default());
        let profile = BehavioralProfiling::build_profile(&result, BASE_TS);
        (events, profile)
    }

    #[test]
    fn test_behavioral_profiling_metadata() {
        let kernel = BehavioralProfiling::new();
        let metadata = kernel.metadata();
        assert_eq!(metadata.id, "behavioral/profiling");
        assert_eq!(metadata.domain, Domain::BehavioralAnalytics);
    }

    #[test]
    fn test_feature_extraction() {
        let events = create_test_events();

        let result = BehavioralProfiling::compute(100, &events, &FeatureConfig::default());

        assert_eq!(result.user_id, 100);
        assert!(!result.features.is_empty());
        assert!(result.confidence > 0.0);
    }

    #[test]
    fn test_temporal_features() {
        let events = create_test_events();
        let temporal_only = FeatureConfig {
            extract_temporal: true,
            extract_frequency: false,
            extract_session: false,
            extract_device_location: false,
        };

        let result = BehavioralProfiling::compute(100, &events, &temporal_only);

        let feature_map: HashMap<_, _> = result.features.into_iter().collect();
        assert!(feature_map.contains_key("peak_hour"));
        assert!(feature_map.contains_key("business_hours_ratio"));
    }

    #[test]
    fn test_empty_events() {
        let result = BehavioralProfiling::compute(100, &[], &FeatureConfig::default());

        assert_eq!(result.user_id, 100);
        assert!(result.features.is_empty());
        assert_eq!(result.stability, 0.0);
        assert_eq!(result.confidence, 0.0);
    }

    #[test]
    fn test_build_profile() {
        let (_, profile) = fixture_with_profile();

        assert_eq!(profile.user_id, 100);
        assert_eq!(profile.features.len(), profile.feature_names.len());
    }

    #[test]
    fn test_anomaly_profiling_metadata() {
        let kernel = AnomalyProfiling::new();
        assert_eq!(kernel.metadata().id, "behavioral/anomaly");
    }

    #[test]
    fn test_anomaly_detection_normal() {
        let (events, profile) = fixture_with_profile();

        // Same device, IP and location as the fixture, at a fixture timestamp.
        let normal_event = event(
            5,
            "view",
            BASE_TS + 36000,
            2,
            "device_a",
            Some("192.168.1.1"),
        );

        let anomaly = AnomalyProfiling::compute(&normal_event, &profile, &events, 50.0);

        assert_eq!(anomaly.user_id, 100);
        assert!(anomaly.anomaly_score < 50.0);
    }

    #[test]
    fn test_anomaly_detection_new_device() {
        let (events, profile) = fixture_with_profile();

        // Device and IP that never appear in the fixture.
        let suspicious_event = event(
            5,
            "login",
            BASE_TS + 36000,
            2,
            "unknown_device",
            Some("10.0.0.1"),
        );

        let anomaly = AnomalyProfiling::compute(&suspicious_event, &profile, &events, 30.0);

        let device_flagged = anomaly
            .deviations
            .iter()
            .any(|deviation| deviation.feature_name == "device");
        assert!(device_flagged, "Should detect device deviation");
    }

    #[test]
    fn test_batch_analysis() {
        let (_, profile) = fixture_with_profile();

        let new_events: Vec<UserEvent> = (0..5u64)
            .map(|i| event(10 + i, "view", BASE_TS + 40000 + i * 300, 3, "device_a", None))
            .collect();

        let results = AnomalyProfiling::compute_batch(&new_events, &profile, 50.0);

        assert_eq!(results.len(), 5);
    }
}