ringkernel_txmon/monitoring/
engine.rs

1//! Monitoring engine abstraction.
2
3use super::rules::{monitor_transaction, MonitoringConfig};
4use crate::types::{CustomerRiskProfile, MonitoringAlert, Transaction};
5use std::sync::atomic::{AtomicU64, Ordering};
6
7/// Monitoring engine that processes transactions and generates alerts.
8pub struct MonitoringEngine {
9    config: MonitoringConfig,
10    alert_counter: AtomicU64,
11}
12
13impl MonitoringEngine {
14    /// Create a new CPU-based monitoring engine.
15    pub fn new_cpu(config: MonitoringConfig) -> Self {
16        Self {
17            config,
18            alert_counter: AtomicU64::new(1),
19        }
20    }
21
22    /// Get a reference to the configuration.
23    pub fn config(&self) -> &MonitoringConfig {
24        &self.config
25    }
26
27    /// Update the configuration.
28    pub fn set_config(&mut self, config: MonitoringConfig) {
29        self.config = config;
30    }
31
32    /// Process a batch of transactions and return generated alerts.
33    ///
34    /// Each transaction is paired with its corresponding customer profile.
35    pub fn process_batch(
36        &self,
37        transactions: &[Transaction],
38        profiles: &[CustomerRiskProfile],
39    ) -> Vec<MonitoringAlert> {
40        let mut all_alerts = Vec::new();
41
42        for (tx, profile) in transactions.iter().zip(profiles.iter()) {
43            let alert_id = self.alert_counter.fetch_add(10, Ordering::Relaxed);
44            let alerts = monitor_transaction(tx, profile, &self.config, alert_id);
45            all_alerts.extend(alerts);
46        }
47
48        all_alerts
49    }
50
51    /// Process a single transaction.
52    pub fn process_single(
53        &self,
54        tx: &Transaction,
55        profile: &CustomerRiskProfile,
56    ) -> Vec<MonitoringAlert> {
57        let alert_id = self.alert_counter.fetch_add(10, Ordering::Relaxed);
58        monitor_transaction(tx, profile, &self.config, alert_id)
59    }
60
61    /// Get the current alert counter value.
62    pub fn alert_count(&self) -> u64 {
63        self.alert_counter.load(Ordering::Relaxed)
64    }
65
66    /// Reset the alert counter.
67    pub fn reset_counter(&self) {
68        self.alert_counter.store(1, Ordering::Relaxed);
69    }
70}
71
72impl Default for MonitoringEngine {
73    fn default() -> Self {
74        Self::new_cpu(MonitoringConfig::default())
75    }
76}
77
78#[cfg(test)]
79mod tests {
80    use super::*;
81
82    #[test]
83    fn test_process_batch() {
84        let engine = MonitoringEngine::default();
85
86        let transactions = vec![
87            Transaction::new(1, 1, 1_500_000, 1, 0), // Over threshold
88            Transaction::new(2, 2, 50_000, 1, 0),    // Normal
89            Transaction::new(3, 3, 950_000, 1, 0),   // Just under threshold
90        ];
91
92        let mut profiles: Vec<CustomerRiskProfile> =
93            (1..=3).map(|id| CustomerRiskProfile::new(id, 1)).collect();
94
95        // Set up profile 3 for structuring detection
96        profiles[2].velocity_count = 5;
97
98        let alerts = engine.process_batch(&transactions, &profiles);
99
100        // Should have alerts for tx 1 (amount) and tx 3 (structuring)
101        assert!(alerts.len() >= 2);
102    }
103
104    #[test]
105    fn test_alert_counter_increment() {
106        let engine = MonitoringEngine::default();
107
108        let initial = engine.alert_count();
109
110        let tx = Transaction::new(1, 1, 1_500_000, 1, 0);
111        let profile = CustomerRiskProfile::new(1, 1);
112        let _ = engine.process_single(&tx, &profile);
113
114        assert!(engine.alert_count() > initial);
115    }
116}