rustkernel_compliance/
monitoring.rs

1//! Transaction monitoring kernels.
2//!
3//! This module provides real-time transaction monitoring
4//! with configurable rules and thresholds.
5
6use crate::messages::{TransactionMonitoringInput, TransactionMonitoringOutput};
7use crate::ring_messages::{MonitorTransactionResponse, MonitorTransactionRing};
8use crate::types::{
9    Alert, MonitoringResult, MonitoringRule, RuleType, Severity, TimeWindow, Transaction,
10};
11use async_trait::async_trait;
12use ringkernel_core::RingContext;
13use rustkernel_core::error::Result;
14use rustkernel_core::traits::{BatchKernel, RingKernelHandler};
15use rustkernel_core::{domain::Domain, kernel::KernelMetadata, traits::GpuKernel};
16use std::collections::HashMap;
17use std::time::Instant;
18
19// ============================================================================
20// Transaction Monitoring Kernel
21// ============================================================================
22
/// Transaction monitoring kernel.
///
/// Monitors transactions in real-time against configurable rules
/// and generates alerts when thresholds are exceeded.
#[derive(Debug, Clone)]
pub struct TransactionMonitoring {
    /// Kernel descriptor (id, domain, description and performance hints)
    /// built once in `TransactionMonitoring::new` and handed out by
    /// `GpuKernel::metadata`.
    metadata: KernelMetadata,
}
31
32impl Default for TransactionMonitoring {
33    fn default() -> Self {
34        Self::new()
35    }
36}
37
38impl TransactionMonitoring {
39    /// Create a new transaction monitoring kernel.
40    #[must_use]
41    pub fn new() -> Self {
42        Self {
43            metadata: KernelMetadata::ring("compliance/transaction-monitoring", Domain::Compliance)
44                .with_description("Real-time transaction threshold monitoring")
45                .with_throughput(500_000)
46                .with_latency_us(1.0),
47        }
48    }
49
50    /// Monitor transactions against rules.
51    ///
52    /// # Arguments
53    /// * `transactions` - Transactions to analyze
54    /// * `rules` - Monitoring rules to apply
55    /// * `current_time` - Current timestamp for time window calculations
56    pub fn compute(
57        transactions: &[Transaction],
58        rules: &[MonitoringRule],
59        current_time: u64,
60    ) -> MonitoringResult {
61        if transactions.is_empty() || rules.is_empty() {
62            return MonitoringResult {
63                alerts: Vec::new(),
64                entities_checked: 0,
65                transactions_analyzed: 0,
66            };
67        }
68
69        let mut alerts = Vec::new();
70        let mut alert_id = 1u64;
71
72        // Group transactions by entity (both source and dest)
73        let mut entity_txs: HashMap<u64, Vec<&Transaction>> = HashMap::new();
74        for tx in transactions {
75            entity_txs.entry(tx.source_id).or_default().push(tx);
76            entity_txs.entry(tx.dest_id).or_default().push(tx);
77        }
78
79        let entities_checked = entity_txs.len();
80
81        // Apply each rule
82        for rule in rules {
83            match rule.rule_type {
84                RuleType::SingleAmount => {
85                    for tx in transactions {
86                        if tx.amount >= rule.threshold {
87                            alerts.push(Alert {
88                                id: alert_id,
89                                rule_id: rule.id,
90                                entity_id: tx.source_id,
91                                timestamp: current_time,
92                                severity: rule.severity,
93                                current_value: tx.amount,
94                                threshold: rule.threshold,
95                                transaction_ids: vec![tx.id],
96                                message: format!(
97                                    "Single transaction ${:.2} exceeds threshold ${:.2}",
98                                    tx.amount, rule.threshold
99                                ),
100                            });
101                            alert_id += 1;
102                        }
103                    }
104                }
105
106                RuleType::AggregateAmount => {
107                    let window = TimeWindow::new(
108                        current_time.saturating_sub(rule.window_seconds),
109                        current_time,
110                    );
111
112                    for (entity_id, txs) in &entity_txs {
113                        let window_txs: Vec<_> = txs
114                            .iter()
115                            .filter(|tx| window.contains(tx.timestamp))
116                            .collect();
117
118                        let total: f64 = window_txs.iter().map(|tx| tx.amount).sum();
119
120                        if total >= rule.threshold {
121                            alerts.push(Alert {
122                                id: alert_id,
123                                rule_id: rule.id,
124                                entity_id: *entity_id,
125                                timestamp: current_time,
126                                severity: rule.severity,
127                                current_value: total,
128                                threshold: rule.threshold,
129                                transaction_ids: window_txs.iter().map(|tx| tx.id).collect(),
130                                message: format!(
131                                    "Aggregate amount ${:.2} over {} hours exceeds ${:.2}",
132                                    total,
133                                    rule.window_seconds / 3600,
134                                    rule.threshold
135                                ),
136                            });
137                            alert_id += 1;
138                        }
139                    }
140                }
141
142                RuleType::TransactionCount => {
143                    let window = TimeWindow::new(
144                        current_time.saturating_sub(rule.window_seconds),
145                        current_time,
146                    );
147
148                    for (entity_id, txs) in &entity_txs {
149                        let window_txs: Vec<_> = txs
150                            .iter()
151                            .filter(|tx| window.contains(tx.timestamp))
152                            .collect();
153
154                        let count = window_txs.len() as f64;
155
156                        if count >= rule.threshold {
157                            alerts.push(Alert {
158                                id: alert_id,
159                                rule_id: rule.id,
160                                entity_id: *entity_id,
161                                timestamp: current_time,
162                                severity: rule.severity,
163                                current_value: count,
164                                threshold: rule.threshold,
165                                transaction_ids: window_txs.iter().map(|tx| tx.id).collect(),
166                                message: format!(
167                                    "{} transactions over {} hours exceeds threshold {}",
168                                    count as u64,
169                                    rule.window_seconds / 3600,
170                                    rule.threshold as u64
171                                ),
172                            });
173                            alert_id += 1;
174                        }
175                    }
176                }
177
178                RuleType::Velocity => {
179                    let window = TimeWindow::new(
180                        current_time.saturating_sub(rule.window_seconds),
181                        current_time,
182                    );
183                    let hours = rule.window_seconds as f64 / 3600.0;
184
185                    for (entity_id, txs) in &entity_txs {
186                        let window_txs: Vec<_> = txs
187                            .iter()
188                            .filter(|tx| window.contains(tx.timestamp))
189                            .collect();
190
191                        let total: f64 = window_txs.iter().map(|tx| tx.amount).sum();
192                        let velocity = if hours > 0.0 { total / hours } else { total };
193
194                        if velocity >= rule.threshold {
195                            alerts.push(Alert {
196                                id: alert_id,
197                                rule_id: rule.id,
198                                entity_id: *entity_id,
199                                timestamp: current_time,
200                                severity: rule.severity,
201                                current_value: velocity,
202                                threshold: rule.threshold,
203                                transaction_ids: window_txs.iter().map(|tx| tx.id).collect(),
204                                message: format!(
205                                    "Velocity ${:.2}/hour exceeds threshold ${:.2}/hour",
206                                    velocity, rule.threshold
207                                ),
208                            });
209                            alert_id += 1;
210                        }
211                    }
212                }
213
214                RuleType::GeographicRisk => {
215                    // For geographic risk, we would need country data on transactions
216                    // This is a simplified implementation
217                    // In practice, would check against high-risk country lists
218                }
219            }
220        }
221
222        // Sort alerts by severity (descending)
223        alerts.sort_by(|a, b| b.severity.cmp(&a.severity));
224
225        MonitoringResult {
226            alerts,
227            entities_checked,
228            transactions_analyzed: transactions.len(),
229        }
230    }
231
232    /// Create default monitoring rules.
233    pub fn default_rules() -> Vec<MonitoringRule> {
234        vec![
235            // Large single transaction
236            MonitoringRule {
237                id: 1,
238                name: "Large Single Transaction".to_string(),
239                rule_type: RuleType::SingleAmount,
240                threshold: 10_000.0,
241                window_seconds: 0,
242                severity: Severity::High,
243            },
244            // CTR threshold
245            MonitoringRule {
246                id: 2,
247                name: "CTR Threshold".to_string(),
248                rule_type: RuleType::AggregateAmount,
249                threshold: 10_000.0,
250                window_seconds: 86400, // 24 hours
251                severity: Severity::High,
252            },
253            // High transaction count
254            MonitoringRule {
255                id: 3,
256                name: "High Transaction Count".to_string(),
257                rule_type: RuleType::TransactionCount,
258                threshold: 50.0,
259                window_seconds: 86400, // 24 hours
260                severity: Severity::Medium,
261            },
262            // High velocity
263            MonitoringRule {
264                id: 4,
265                name: "High Velocity".to_string(),
266                rule_type: RuleType::Velocity,
267                threshold: 5000.0, // $5000/hour
268                window_seconds: 3600,
269                severity: Severity::High,
270            },
271        ]
272    }
273}
274
275impl GpuKernel for TransactionMonitoring {
276    fn metadata(&self) -> &KernelMetadata {
277        &self.metadata
278    }
279}
280
281#[async_trait]
282impl BatchKernel<TransactionMonitoringInput, TransactionMonitoringOutput>
283    for TransactionMonitoring
284{
285    async fn execute(
286        &self,
287        input: TransactionMonitoringInput,
288    ) -> Result<TransactionMonitoringOutput> {
289        let start = Instant::now();
290        let result = Self::compute(&input.transactions, &input.rules, input.current_time);
291        Ok(TransactionMonitoringOutput {
292            result,
293            compute_time_us: start.elapsed().as_micros() as u64,
294        })
295    }
296}
297
298// ============================================================================
299// Ring Kernel Handler Implementation
300// ============================================================================
301
302#[async_trait]
303impl RingKernelHandler<MonitorTransactionRing, MonitorTransactionResponse>
304    for TransactionMonitoring
305{
306    async fn handle(
307        &self,
308        _ctx: &mut RingContext,
309        msg: MonitorTransactionRing,
310    ) -> Result<MonitorTransactionResponse> {
311        // Convert Ring message to domain Transaction
312        let currency = std::str::from_utf8(&msg.currency[..3])
313            .unwrap_or("USD")
314            .trim_end_matches('\0')
315            .to_string();
316
317        let transaction = Transaction {
318            id: msg.tx_id,
319            source_id: msg.source_id,
320            dest_id: msg.dest_id,
321            amount: msg.amount as f64 / 100_000_000.0, // Convert from fixed-point
322            timestamp: msg.timestamp,
323            currency,
324            tx_type: match msg.tx_type {
325                0 => "wire",
326                1 => "ach",
327                2 => "check",
328                _ => "other",
329            }
330            .to_string(),
331        };
332
333        // Apply default monitoring rules
334        let rules = Self::default_rules();
335        let result = Self::compute(&[transaction], &rules, msg.timestamp);
336
337        // Calculate aggregated risk score
338        let risk_score = if result.alerts.is_empty() {
339            0u8
340        } else {
341            result
342                .alerts
343                .iter()
344                .map(|a| match a.severity {
345                    Severity::Info => 10u8,
346                    Severity::Low => 25,
347                    Severity::Medium => 50,
348                    Severity::High => 75,
349                    Severity::Critical => 100,
350                })
351                .max()
352                .unwrap_or(0)
353        };
354
355        // Build alert flags bitmask
356        let mut alert_flags = 0u64;
357        for alert in &result.alerts {
358            alert_flags |= 1u64 << (alert.rule_id % 64);
359        }
360
361        Ok(MonitorTransactionResponse {
362            correlation_id: msg.correlation_id,
363            tx_id: msg.tx_id,
364            alert_count: result.alerts.len() as u32,
365            risk_score,
366            alert_flags,
367        })
368    }
369}
370
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a USD wire transfer with the given ids, amount and timestamp.
    fn wire(id: u64, source_id: u64, dest_id: u64, amount: f64, timestamp: u64) -> Transaction {
        Transaction {
            id,
            source_id,
            dest_id,
            amount,
            timestamp,
            currency: "USD".to_string(),
            tx_type: "wire".to_string(),
        }
    }

    /// Build a one-element rule set whose only rule has id 1.
    fn only_rule(
        name: &str,
        rule_type: RuleType,
        threshold: f64,
        window_seconds: u64,
        severity: Severity,
    ) -> Vec<MonitoringRule> {
        vec![MonitoringRule {
            id: 1,
            name: name.to_string(),
            rule_type,
            threshold,
            window_seconds,
            severity,
        }]
    }

    /// Three transfers from entity 100 totalling 12,500.
    fn create_test_transactions() -> Vec<Transaction> {
        vec![
            wire(1, 100, 200, 5000.0, 1000),
            wire(2, 100, 201, 4500.0, 1100),
            wire(3, 100, 202, 3000.0, 1200),
        ]
    }

    #[test]
    fn test_monitoring_metadata() {
        let kernel = TransactionMonitoring::new();
        let meta = kernel.metadata();
        assert_eq!(meta.id, "compliance/transaction-monitoring");
        assert_eq!(meta.domain, Domain::Compliance);
    }

    #[test]
    fn test_single_amount_rule() {
        // 15,000 is above the 10,000 threshold.
        let txs = vec![wire(1, 100, 200, 15000.0, 1000)];
        let rules = only_rule(
            "Large Transaction",
            RuleType::SingleAmount,
            10000.0,
            0,
            Severity::High,
        );

        let result = TransactionMonitoring::compute(&txs, &rules, 2000);

        assert!(!result.alerts.is_empty());
        let alert = &result.alerts[0];
        assert_eq!(alert.current_value, 15000.0);
        assert_eq!(alert.severity, Severity::High);
    }

    #[test]
    fn test_aggregate_amount_rule() {
        let txs = create_test_transactions();
        // One-hour window.
        let rules = only_rule(
            "Aggregate Amount",
            RuleType::AggregateAmount,
            10000.0,
            3600,
            Severity::High,
        );

        let result = TransactionMonitoring::compute(&txs, &rules, 1500);

        // Total for entity 100 = 12500, should trigger
        assert!(!result.alerts.is_empty());
        assert!(result.alerts.iter().find(|a| a.entity_id == 100).is_some());
    }

    #[test]
    fn test_transaction_count_rule() {
        // Sixty small transfers between the same pair of entities.
        let txs: Vec<Transaction> = (0..60u64)
            .map(|i| wire(i, 100, 200, 100.0, 1000 + i))
            .collect();
        let rules = only_rule(
            "High Count",
            RuleType::TransactionCount,
            50.0,
            3600,
            Severity::Medium,
        );

        let result = TransactionMonitoring::compute(&txs, &rules, 2000);

        assert!(!result.alerts.is_empty());
        assert!(result.alerts[0].current_value >= 50.0);
    }

    #[test]
    fn test_velocity_rule() {
        // High velocity: $10000 in 1 hour = $10000/hour
        let txs = vec![
            wire(1, 100, 200, 5000.0, 1000),
            wire(2, 100, 201, 5000.0, 2000),
        ];
        // Threshold is $5000/hour over a one-hour window.
        let rules = only_rule(
            "High Velocity",
            RuleType::Velocity,
            5000.0,
            3600,
            Severity::High,
        );

        let result = TransactionMonitoring::compute(&txs, &rules, 4000);

        assert!(!result.alerts.is_empty());
    }

    #[test]
    fn test_no_alerts_below_threshold() {
        // 500 is well below the 10,000 threshold.
        let txs = vec![wire(1, 100, 200, 500.0, 1000)];
        let rules = only_rule(
            "Large Transaction",
            RuleType::SingleAmount,
            10000.0,
            0,
            Severity::High,
        );

        let result = TransactionMonitoring::compute(&txs, &rules, 2000);

        assert!(result.alerts.is_empty());
    }

    #[test]
    fn test_default_rules() {
        let rules = TransactionMonitoring::default_rules();
        let has = |wanted: RuleType| rules.iter().any(|r| r.rule_type == wanted);

        assert!(!rules.is_empty());
        assert!(has(RuleType::SingleAmount));
        assert!(has(RuleType::AggregateAmount));
    }

    #[test]
    fn test_empty_inputs() {
        let txs = create_test_transactions();
        let rules = TransactionMonitoring::default_rules();

        // No transactions.
        let without_txs = TransactionMonitoring::compute(&[], &rules, 1000);
        assert!(without_txs.alerts.is_empty());

        // No rules.
        let without_rules = TransactionMonitoring::compute(&txs, &[], 1000);
        assert!(without_rules.alerts.is_empty());
    }
}