1use crate::messages::{TransactionMonitoringInput, TransactionMonitoringOutput};
7use crate::ring_messages::{MonitorTransactionResponse, MonitorTransactionRing};
8use crate::types::{
9 Alert, MonitoringResult, MonitoringRule, RuleType, Severity, TimeWindow, Transaction,
10};
11use async_trait::async_trait;
12use ringkernel_core::RingContext;
13use rustkernel_core::error::Result;
14use rustkernel_core::traits::{BatchKernel, RingKernelHandler};
15use rustkernel_core::{domain::Domain, kernel::KernelMetadata, traits::GpuKernel};
16use std::collections::HashMap;
17use std::time::Instant;
18
19#[derive(Debug, Clone)]
28pub struct TransactionMonitoring {
29 metadata: KernelMetadata,
30}
31
32impl Default for TransactionMonitoring {
33 fn default() -> Self {
34 Self::new()
35 }
36}
37
38impl TransactionMonitoring {
39 #[must_use]
41 pub fn new() -> Self {
42 Self {
43 metadata: KernelMetadata::ring("compliance/transaction-monitoring", Domain::Compliance)
44 .with_description("Real-time transaction threshold monitoring")
45 .with_throughput(500_000)
46 .with_latency_us(1.0),
47 }
48 }
49
50 pub fn compute(
57 transactions: &[Transaction],
58 rules: &[MonitoringRule],
59 current_time: u64,
60 ) -> MonitoringResult {
61 if transactions.is_empty() || rules.is_empty() {
62 return MonitoringResult {
63 alerts: Vec::new(),
64 entities_checked: 0,
65 transactions_analyzed: 0,
66 };
67 }
68
69 let mut alerts = Vec::new();
70 let mut alert_id = 1u64;
71
72 let mut entity_txs: HashMap<u64, Vec<&Transaction>> = HashMap::new();
74 for tx in transactions {
75 entity_txs.entry(tx.source_id).or_default().push(tx);
76 entity_txs.entry(tx.dest_id).or_default().push(tx);
77 }
78
79 let entities_checked = entity_txs.len();
80
81 for rule in rules {
83 match rule.rule_type {
84 RuleType::SingleAmount => {
85 for tx in transactions {
86 if tx.amount >= rule.threshold {
87 alerts.push(Alert {
88 id: alert_id,
89 rule_id: rule.id,
90 entity_id: tx.source_id,
91 timestamp: current_time,
92 severity: rule.severity,
93 current_value: tx.amount,
94 threshold: rule.threshold,
95 transaction_ids: vec![tx.id],
96 message: format!(
97 "Single transaction ${:.2} exceeds threshold ${:.2}",
98 tx.amount, rule.threshold
99 ),
100 });
101 alert_id += 1;
102 }
103 }
104 }
105
106 RuleType::AggregateAmount => {
107 let window = TimeWindow::new(
108 current_time.saturating_sub(rule.window_seconds),
109 current_time,
110 );
111
112 for (entity_id, txs) in &entity_txs {
113 let window_txs: Vec<_> = txs
114 .iter()
115 .filter(|tx| window.contains(tx.timestamp))
116 .collect();
117
118 let total: f64 = window_txs.iter().map(|tx| tx.amount).sum();
119
120 if total >= rule.threshold {
121 alerts.push(Alert {
122 id: alert_id,
123 rule_id: rule.id,
124 entity_id: *entity_id,
125 timestamp: current_time,
126 severity: rule.severity,
127 current_value: total,
128 threshold: rule.threshold,
129 transaction_ids: window_txs.iter().map(|tx| tx.id).collect(),
130 message: format!(
131 "Aggregate amount ${:.2} over {} hours exceeds ${:.2}",
132 total,
133 rule.window_seconds / 3600,
134 rule.threshold
135 ),
136 });
137 alert_id += 1;
138 }
139 }
140 }
141
142 RuleType::TransactionCount => {
143 let window = TimeWindow::new(
144 current_time.saturating_sub(rule.window_seconds),
145 current_time,
146 );
147
148 for (entity_id, txs) in &entity_txs {
149 let window_txs: Vec<_> = txs
150 .iter()
151 .filter(|tx| window.contains(tx.timestamp))
152 .collect();
153
154 let count = window_txs.len() as f64;
155
156 if count >= rule.threshold {
157 alerts.push(Alert {
158 id: alert_id,
159 rule_id: rule.id,
160 entity_id: *entity_id,
161 timestamp: current_time,
162 severity: rule.severity,
163 current_value: count,
164 threshold: rule.threshold,
165 transaction_ids: window_txs.iter().map(|tx| tx.id).collect(),
166 message: format!(
167 "{} transactions over {} hours exceeds threshold {}",
168 count as u64,
169 rule.window_seconds / 3600,
170 rule.threshold as u64
171 ),
172 });
173 alert_id += 1;
174 }
175 }
176 }
177
178 RuleType::Velocity => {
179 let window = TimeWindow::new(
180 current_time.saturating_sub(rule.window_seconds),
181 current_time,
182 );
183 let hours = rule.window_seconds as f64 / 3600.0;
184
185 for (entity_id, txs) in &entity_txs {
186 let window_txs: Vec<_> = txs
187 .iter()
188 .filter(|tx| window.contains(tx.timestamp))
189 .collect();
190
191 let total: f64 = window_txs.iter().map(|tx| tx.amount).sum();
192 let velocity = if hours > 0.0 { total / hours } else { total };
193
194 if velocity >= rule.threshold {
195 alerts.push(Alert {
196 id: alert_id,
197 rule_id: rule.id,
198 entity_id: *entity_id,
199 timestamp: current_time,
200 severity: rule.severity,
201 current_value: velocity,
202 threshold: rule.threshold,
203 transaction_ids: window_txs.iter().map(|tx| tx.id).collect(),
204 message: format!(
205 "Velocity ${:.2}/hour exceeds threshold ${:.2}/hour",
206 velocity, rule.threshold
207 ),
208 });
209 alert_id += 1;
210 }
211 }
212 }
213
214 RuleType::GeographicRisk => {
215 }
219 }
220 }
221
222 alerts.sort_by(|a, b| b.severity.cmp(&a.severity));
224
225 MonitoringResult {
226 alerts,
227 entities_checked,
228 transactions_analyzed: transactions.len(),
229 }
230 }
231
232 pub fn default_rules() -> Vec<MonitoringRule> {
234 vec![
235 MonitoringRule {
237 id: 1,
238 name: "Large Single Transaction".to_string(),
239 rule_type: RuleType::SingleAmount,
240 threshold: 10_000.0,
241 window_seconds: 0,
242 severity: Severity::High,
243 },
244 MonitoringRule {
246 id: 2,
247 name: "CTR Threshold".to_string(),
248 rule_type: RuleType::AggregateAmount,
249 threshold: 10_000.0,
250 window_seconds: 86400, severity: Severity::High,
252 },
253 MonitoringRule {
255 id: 3,
256 name: "High Transaction Count".to_string(),
257 rule_type: RuleType::TransactionCount,
258 threshold: 50.0,
259 window_seconds: 86400, severity: Severity::Medium,
261 },
262 MonitoringRule {
264 id: 4,
265 name: "High Velocity".to_string(),
266 rule_type: RuleType::Velocity,
267 threshold: 5000.0, window_seconds: 3600,
269 severity: Severity::High,
270 },
271 ]
272 }
273}
274
275impl GpuKernel for TransactionMonitoring {
276 fn metadata(&self) -> &KernelMetadata {
277 &self.metadata
278 }
279}
280
281#[async_trait]
282impl BatchKernel<TransactionMonitoringInput, TransactionMonitoringOutput>
283 for TransactionMonitoring
284{
285 async fn execute(
286 &self,
287 input: TransactionMonitoringInput,
288 ) -> Result<TransactionMonitoringOutput> {
289 let start = Instant::now();
290 let result = Self::compute(&input.transactions, &input.rules, input.current_time);
291 Ok(TransactionMonitoringOutput {
292 result,
293 compute_time_us: start.elapsed().as_micros() as u64,
294 })
295 }
296}
297
298#[async_trait]
303impl RingKernelHandler<MonitorTransactionRing, MonitorTransactionResponse>
304 for TransactionMonitoring
305{
306 async fn handle(
307 &self,
308 _ctx: &mut RingContext,
309 msg: MonitorTransactionRing,
310 ) -> Result<MonitorTransactionResponse> {
311 let currency = std::str::from_utf8(&msg.currency[..3])
313 .unwrap_or("USD")
314 .trim_end_matches('\0')
315 .to_string();
316
317 let transaction = Transaction {
318 id: msg.tx_id,
319 source_id: msg.source_id,
320 dest_id: msg.dest_id,
321 amount: msg.amount as f64 / 100_000_000.0, timestamp: msg.timestamp,
323 currency,
324 tx_type: match msg.tx_type {
325 0 => "wire",
326 1 => "ach",
327 2 => "check",
328 _ => "other",
329 }
330 .to_string(),
331 };
332
333 let rules = Self::default_rules();
335 let result = Self::compute(&[transaction], &rules, msg.timestamp);
336
337 let risk_score = if result.alerts.is_empty() {
339 0u8
340 } else {
341 result
342 .alerts
343 .iter()
344 .map(|a| match a.severity {
345 Severity::Info => 10u8,
346 Severity::Low => 25,
347 Severity::Medium => 50,
348 Severity::High => 75,
349 Severity::Critical => 100,
350 })
351 .max()
352 .unwrap_or(0)
353 };
354
355 let mut alert_flags = 0u64;
357 for alert in &result.alerts {
358 alert_flags |= 1u64 << (alert.rule_id % 64);
359 }
360
361 Ok(MonitorTransactionResponse {
362 correlation_id: msg.correlation_id,
363 tx_id: msg.tx_id,
364 alert_count: result.alerts.len() as u32,
365 risk_score,
366 alert_flags,
367 })
368 }
369}
370
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a USD wire transaction with the given identifiers, amount and timestamp.
    fn wire_tx(id: u64, source_id: u64, dest_id: u64, amount: f64, timestamp: u64) -> Transaction {
        Transaction {
            id,
            source_id,
            dest_id,
            amount,
            timestamp,
            currency: "USD".to_string(),
            tx_type: "wire".to_string(),
        }
    }

    /// Builds a one-element rule set with rule id 1.
    fn single_rule(
        name: &str,
        rule_type: RuleType,
        threshold: f64,
        window_seconds: u64,
        severity: Severity,
    ) -> Vec<MonitoringRule> {
        vec![MonitoringRule {
            id: 1,
            name: name.to_string(),
            rule_type,
            threshold,
            window_seconds,
            severity,
        }]
    }

    /// Three transactions from entity 100 totalling 12 500.
    fn create_test_transactions() -> Vec<Transaction> {
        vec![
            wire_tx(1, 100, 200, 5000.0, 1000),
            wire_tx(2, 100, 201, 4500.0, 1100),
            wire_tx(3, 100, 202, 3000.0, 1200),
        ]
    }

    #[test]
    fn test_monitoring_metadata() {
        let kernel = TransactionMonitoring::new();
        let metadata = kernel.metadata();
        assert_eq!(metadata.id, "compliance/transaction-monitoring");
        assert_eq!(metadata.domain, Domain::Compliance);
    }

    #[test]
    fn test_single_amount_rule() {
        // Amount is above the threshold.
        let txs = vec![wire_tx(1, 100, 200, 15000.0, 1000)];
        let rules = single_rule(
            "Large Transaction",
            RuleType::SingleAmount,
            10000.0,
            0,
            Severity::High,
        );

        let result = TransactionMonitoring::compute(&txs, &rules, 2000);

        assert!(!result.alerts.is_empty());
        let first = &result.alerts[0];
        assert_eq!(first.current_value, 15000.0);
        assert_eq!(first.severity, Severity::High);
    }

    #[test]
    fn test_aggregate_amount_rule() {
        let txs = create_test_transactions();
        // One-hour window.
        let rules = single_rule(
            "Aggregate Amount",
            RuleType::AggregateAmount,
            10000.0,
            3600,
            Severity::High,
        );

        let result = TransactionMonitoring::compute(&txs, &rules, 1500);

        assert!(!result.alerts.is_empty());
        // Entity 100 is the source of all three transactions.
        assert!(result.alerts.iter().any(|a| a.entity_id == 100));
    }

    #[test]
    fn test_transaction_count_rule() {
        // Sixty small transactions between the same pair of entities.
        let txs: Vec<Transaction> = (0..60u64)
            .map(|i| wire_tx(i, 100, 200, 100.0, 1000 + i))
            .collect();
        let rules = single_rule(
            "High Count",
            RuleType::TransactionCount,
            50.0,
            3600,
            Severity::Medium,
        );

        let result = TransactionMonitoring::compute(&txs, &rules, 2000);

        assert!(!result.alerts.is_empty());
        assert!(result.alerts[0].current_value >= 50.0);
    }

    #[test]
    fn test_velocity_rule() {
        let txs = vec![
            wire_tx(1, 100, 200, 5000.0, 1000),
            wire_tx(2, 100, 201, 5000.0, 2000),
        ];
        // Threshold is an amount per hour over a one-hour window.
        let rules = single_rule(
            "High Velocity",
            RuleType::Velocity,
            5000.0,
            3600,
            Severity::High,
        );

        let result = TransactionMonitoring::compute(&txs, &rules, 4000);

        assert!(!result.alerts.is_empty());
    }

    #[test]
    fn test_no_alerts_below_threshold() {
        // Amount is below the threshold.
        let txs = vec![wire_tx(1, 100, 200, 500.0, 1000)];
        let rules = single_rule(
            "Large Transaction",
            RuleType::SingleAmount,
            10000.0,
            0,
            Severity::High,
        );

        let result = TransactionMonitoring::compute(&txs, &rules, 2000);

        assert!(result.alerts.is_empty());
    }

    #[test]
    fn test_default_rules() {
        let rules = TransactionMonitoring::default_rules();
        let has_rule = |wanted: RuleType| rules.iter().any(|r| r.rule_type == wanted);

        assert!(!rules.is_empty());
        assert!(has_rule(RuleType::SingleAmount));
        assert!(has_rule(RuleType::AggregateAmount));
    }

    #[test]
    fn test_empty_inputs() {
        let txs = create_test_transactions();
        let rules = TransactionMonitoring::default_rules();

        let no_transactions = TransactionMonitoring::compute(&[], &rules, 1000);
        assert!(no_transactions.alerts.is_empty());

        let no_rules = TransactionMonitoring::compute(&txs, &[], 1000);
        assert!(no_rules.alerts.is_empty());
    }
}