1use crate::types::*;
6use rustkernel_core::{domain::Domain, kernel::KernelMetadata, traits::GpuKernel};
7use std::collections::{HashMap, HashSet};
8
9#[derive(Debug, Clone)]
18pub struct FlowAnalysis {
19 metadata: KernelMetadata,
20}
21
22impl Default for FlowAnalysis {
23 fn default() -> Self {
24 Self::new()
25 }
26}
27
28impl FlowAnalysis {
29 #[must_use]
31 pub fn new() -> Self {
32 Self {
33 metadata: KernelMetadata::batch("payments/flow-analysis", Domain::PaymentProcessing)
34 .with_description("Payment flow network analysis and metrics")
35 .with_throughput(100_000)
36 .with_latency_us(50.0),
37 }
38 }
39
40 pub fn analyze(payments: &[Payment], config: &FlowAnalysisConfig) -> FlowAnalysisResult {
42 let flows = Self::build_flows(payments);
44
45 let node_metrics = Self::calculate_node_metrics(&flows, payments);
47
48 let overall_metrics = Self::calculate_overall_metrics(payments, &node_metrics);
50
51 let anomalies = if config.detect_anomalies {
53 Self::detect_anomalies(&flows, &node_metrics, config)
54 } else {
55 Vec::new()
56 };
57
58 FlowAnalysisResult {
59 flows,
60 node_metrics,
61 overall_metrics,
62 anomalies,
63 }
64 }
65
66 fn build_flows(payments: &[Payment]) -> Vec<PaymentFlow> {
68 let mut flow_map: HashMap<(String, String), (f64, usize)> = HashMap::new();
69
70 for payment in payments {
71 if payment.status == PaymentStatus::Completed
72 || payment.status == PaymentStatus::Processing
73 {
74 let key = (payment.payer_account.clone(), payment.payee_account.clone());
75 let entry = flow_map.entry(key).or_insert((0.0, 0));
76 entry.0 += payment.amount;
77 entry.1 += 1;
78 }
79 }
80
81 flow_map
82 .into_iter()
83 .map(|((source, target), (volume, count))| PaymentFlow {
84 source,
85 target,
86 volume,
87 count,
88 avg_amount: volume / count as f64,
89 })
90 .collect()
91 }
92
93 fn calculate_node_metrics(
95 flows: &[PaymentFlow],
96 payments: &[Payment],
97 ) -> HashMap<String, NodeMetrics> {
98 let mut metrics: HashMap<String, NodeMetrics> = HashMap::new();
99
100 let mut accounts: HashSet<String> = HashSet::new();
102 for payment in payments {
103 accounts.insert(payment.payer_account.clone());
104 accounts.insert(payment.payee_account.clone());
105 }
106
107 for account in accounts {
108 metrics.insert(
109 account.clone(),
110 NodeMetrics {
111 node_id: account,
112 total_inflow: 0.0,
113 total_outflow: 0.0,
114 net_flow: 0.0,
115 inbound_count: 0,
116 outbound_count: 0,
117 centrality: 0.0,
118 },
119 );
120 }
121
122 for flow in flows {
124 if let Some(source_metrics) = metrics.get_mut(&flow.source) {
125 source_metrics.total_outflow += flow.volume;
126 source_metrics.outbound_count += flow.count;
127 }
128 if let Some(target_metrics) = metrics.get_mut(&flow.target) {
129 target_metrics.total_inflow += flow.volume;
130 target_metrics.inbound_count += flow.count;
131 }
132 }
133
134 let total_flow: f64 = flows.iter().map(|f| f.volume).sum();
136 for (_, node) in metrics.iter_mut() {
137 node.net_flow = node.total_inflow - node.total_outflow;
138 if total_flow > 0.0 {
140 node.centrality = (node.total_inflow + node.total_outflow) / (2.0 * total_flow);
141 }
142 }
143
144 metrics
145 }
146
147 fn calculate_overall_metrics(
149 payments: &[Payment],
150 node_metrics: &HashMap<String, NodeMetrics>,
151 ) -> OverallMetrics {
152 let completed: Vec<_> = payments
153 .iter()
154 .filter(|p| {
155 p.status == PaymentStatus::Completed || p.status == PaymentStatus::Processing
156 })
157 .collect();
158
159 let total_volume: f64 = completed.iter().map(|p| p.amount).sum();
160 let total_transactions = completed.len();
161
162 let unique_payers: HashSet<_> = completed.iter().map(|p| &p.payer_account).collect();
163 let unique_payees: HashSet<_> = completed.iter().map(|p| &p.payee_account).collect();
164
165 let avg_transaction_size = if total_transactions > 0 {
166 total_volume / total_transactions as f64
167 } else {
168 0.0
169 };
170
171 let mut hour_volumes: HashMap<u32, f64> = HashMap::new();
173 for payment in completed.iter() {
174 let hour = ((payment.initiated_at / 3600) % 24) as u32;
176 *hour_volumes.entry(hour).or_insert(0.0) += payment.amount;
177 }
178 let peak_hour = hour_volumes
179 .into_iter()
180 .max_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal))
181 .map(|(h, _)| h);
182
183 let n = node_metrics.len();
185 let actual_edges: HashSet<_> = completed
186 .iter()
187 .map(|p| (&p.payer_account, &p.payee_account))
188 .collect();
189 let max_edges = if n > 1 { n * (n - 1) } else { 1 };
190 let network_density = actual_edges.len() as f64 / max_edges as f64;
191
192 OverallMetrics {
193 total_volume,
194 total_transactions,
195 unique_payers: unique_payers.len(),
196 unique_payees: unique_payees.len(),
197 avg_transaction_size,
198 peak_hour,
199 network_density,
200 }
201 }
202
203 fn detect_anomalies(
205 flows: &[PaymentFlow],
206 node_metrics: &HashMap<String, NodeMetrics>,
207 config: &FlowAnalysisConfig,
208 ) -> Vec<FlowAnomaly> {
209 let mut anomalies = Vec::new();
210
211 let avg_volume: f64 =
213 flows.iter().map(|f| f.volume).sum::<f64>() / flows.len().max(1) as f64;
214 let std_volume = Self::calculate_std(&flows.iter().map(|f| f.volume).collect::<Vec<_>>());
215
216 for flow in flows {
217 if flow.volume > avg_volume + config.volume_threshold_std * std_volume {
218 anomalies.push(FlowAnomaly {
219 anomaly_type: FlowAnomalyType::UnusualVolume,
220 entity: format!("{}->{}", flow.source, flow.target),
221 description: format!(
222 "Unusually high volume: ${:.2} (avg: ${:.2})",
223 flow.volume, avg_volume
224 ),
225 severity: ((flow.volume - avg_volume) / std_volume / 10.0).min(1.0),
226 timestamp: 0,
227 });
228 }
229 }
230
231 let avg_count: f64 =
233 flows.iter().map(|f| f.count as f64).sum::<f64>() / flows.len().max(1) as f64;
234 for flow in flows {
235 if flow.count as f64 > avg_count * config.frequency_threshold_multiple {
236 anomalies.push(FlowAnomaly {
237 anomaly_type: FlowAnomalyType::UnusualFrequency,
238 entity: format!("{}->{}", flow.source, flow.target),
239 description: format!(
240 "Unusually high frequency: {} transactions (avg: {:.1})",
241 flow.count, avg_count
242 ),
243 severity: ((flow.count as f64 / avg_count) / 5.0).min(1.0),
244 timestamp: 0,
245 });
246 }
247 }
248
249 if config.detect_circular_flows {
251 let circular = Self::detect_circular_flows(flows);
252 anomalies.extend(circular);
253 }
254
255 if config.detect_rapid_movement {
257 let rapid = Self::detect_rapid_movement(node_metrics);
258 anomalies.extend(rapid);
259 }
260
261 anomalies
262 }
263
264 fn detect_circular_flows(flows: &[PaymentFlow]) -> Vec<FlowAnomaly> {
266 let mut anomalies = Vec::new();
267
268 let mut graph: HashMap<&str, HashSet<&str>> = HashMap::new();
270 for flow in flows {
271 graph.entry(&flow.source).or_default().insert(&flow.target);
272 }
273
274 for flow in flows {
276 if let Some(targets) = graph.get(flow.target.as_str()) {
277 if targets.contains(flow.source.as_str()) {
278 anomalies.push(FlowAnomaly {
279 anomaly_type: FlowAnomalyType::CircularFlow,
280 entity: format!("{}<->{}", flow.source, flow.target),
281 description: format!(
282 "Circular flow detected between {} and {}",
283 flow.source, flow.target
284 ),
285 severity: 0.7,
286 timestamp: 0,
287 });
288 }
289 }
290 }
291
292 let mut seen: HashSet<String> = HashSet::new();
294 anomalies.retain(|a| {
295 let key = if a.entity.contains("<->") {
296 let parts: Vec<&str> = a.entity.split("<->").collect();
297 if parts.len() == 2 {
298 let mut sorted = [parts[0], parts[1]];
299 sorted.sort();
300 format!("{}<->{}", sorted[0], sorted[1])
301 } else {
302 a.entity.clone()
303 }
304 } else {
305 a.entity.clone()
306 };
307 seen.insert(key)
308 });
309
310 anomalies
311 }
312
313 fn detect_rapid_movement(node_metrics: &HashMap<String, NodeMetrics>) -> Vec<FlowAnomaly> {
315 let mut anomalies = Vec::new();
316
317 for (node_id, metrics) in node_metrics {
319 let total_flow = metrics.total_inflow + metrics.total_outflow;
320 if total_flow > 0.0 {
321 let pass_through_ratio =
322 metrics.total_inflow.min(metrics.total_outflow) / (total_flow / 2.0);
323
324 if pass_through_ratio > 0.8
326 && metrics.inbound_count >= 2
327 && metrics.outbound_count >= 2
328 {
329 anomalies.push(FlowAnomaly {
330 anomaly_type: FlowAnomalyType::RapidMovement,
331 entity: node_id.clone(),
332 description: format!(
333 "Pass-through account: ${:.2} in, ${:.2} out ({:.0}% pass-through)",
334 metrics.total_inflow,
335 metrics.total_outflow,
336 pass_through_ratio * 100.0
337 ),
338 severity: pass_through_ratio,
339 timestamp: 0,
340 });
341 }
342 }
343 }
344
345 anomalies
346 }
347
348 fn calculate_std(values: &[f64]) -> f64 {
350 if values.is_empty() {
351 return 0.0;
352 }
353 let mean = values.iter().sum::<f64>() / values.len() as f64;
354 let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / values.len() as f64;
355 variance.sqrt()
356 }
357
358 pub fn top_flows_by_volume(payments: &[Payment], limit: usize) -> Vec<PaymentFlow> {
360 let mut flows = Self::build_flows(payments);
361 flows.sort_by(|a, b| {
362 b.volume
363 .partial_cmp(&a.volume)
364 .unwrap_or(std::cmp::Ordering::Equal)
365 });
366 flows.truncate(limit);
367 flows
368 }
369
370 pub fn top_flows_by_count(payments: &[Payment], limit: usize) -> Vec<PaymentFlow> {
372 let mut flows = Self::build_flows(payments);
373 flows.sort_by(|a, b| b.count.cmp(&a.count));
374 flows.truncate(limit);
375 flows
376 }
377
378 pub fn analyze_account(payments: &[Payment], account_id: &str) -> AccountFlowAnalysis {
380 let inbound: Vec<_> = payments
381 .iter()
382 .filter(|p| p.payee_account == account_id)
383 .collect();
384 let outbound: Vec<_> = payments
385 .iter()
386 .filter(|p| p.payer_account == account_id)
387 .collect();
388
389 let total_inbound: f64 = inbound.iter().map(|p| p.amount).sum();
390 let total_outbound: f64 = outbound.iter().map(|p| p.amount).sum();
391
392 let unique_sources: HashSet<_> = inbound.iter().map(|p| &p.payer_account).collect();
393 let unique_destinations: HashSet<_> = outbound.iter().map(|p| &p.payee_account).collect();
394
395 let mut type_breakdown: HashMap<PaymentType, (usize, f64)> = HashMap::new();
397 for payment in inbound.iter().chain(outbound.iter()) {
398 let entry = type_breakdown
399 .entry(payment.payment_type)
400 .or_insert((0, 0.0));
401 entry.0 += 1;
402 entry.1 += payment.amount;
403 }
404
405 AccountFlowAnalysis {
406 account_id: account_id.to_string(),
407 total_inbound,
408 total_outbound,
409 net_flow: total_inbound - total_outbound,
410 inbound_count: inbound.len(),
411 outbound_count: outbound.len(),
412 unique_sources: unique_sources.len(),
413 unique_destinations: unique_destinations.len(),
414 payment_type_breakdown: type_breakdown,
415 }
416 }
417}
418
419impl GpuKernel for FlowAnalysis {
420 fn metadata(&self) -> &KernelMetadata {
421 &self.metadata
422 }
423}
424
425#[derive(Debug, Clone)]
431pub struct FlowAnalysisConfig {
432 pub detect_anomalies: bool,
434 pub volume_threshold_std: f64,
436 pub frequency_threshold_multiple: f64,
438 pub detect_circular_flows: bool,
440 pub structuring_threshold: f64,
442 pub detect_rapid_movement: bool,
444}
445
446impl Default for FlowAnalysisConfig {
447 fn default() -> Self {
448 Self {
449 detect_anomalies: true,
450 volume_threshold_std: 3.0,
451 frequency_threshold_multiple: 5.0,
452 detect_circular_flows: true,
453 structuring_threshold: 10000.0,
454 detect_rapid_movement: true,
455 }
456 }
457}
458
459#[derive(Debug, Clone)]
461pub struct AccountFlowAnalysis {
462 pub account_id: String,
464 pub total_inbound: f64,
466 pub total_outbound: f64,
468 pub net_flow: f64,
470 pub inbound_count: usize,
472 pub outbound_count: usize,
474 pub unique_sources: usize,
476 pub unique_destinations: usize,
478 pub payment_type_breakdown: HashMap<PaymentType, (usize, f64)>,
480}
481
482#[cfg(test)]
487mod tests {
488 use super::*;
489
490 fn create_test_payment(
491 id: &str,
492 payer: &str,
493 payee: &str,
494 amount: f64,
495 payment_type: PaymentType,
496 ) -> Payment {
497 Payment {
498 id: id.to_string(),
499 payer_account: payer.to_string(),
500 payee_account: payee.to_string(),
501 amount,
502 currency: "USD".to_string(),
503 payment_type,
504 status: PaymentStatus::Completed,
505 initiated_at: 1000,
506 completed_at: Some(1001),
507 reference: format!("REF-{}", id),
508 priority: PaymentPriority::Normal,
509 attributes: HashMap::new(),
510 }
511 }
512
513 fn create_test_payments() -> Vec<Payment> {
514 vec![
515 create_test_payment("P001", "A", "B", 1000.0, PaymentType::ACH),
516 create_test_payment("P002", "A", "B", 500.0, PaymentType::ACH),
517 create_test_payment("P003", "B", "C", 800.0, PaymentType::Wire),
518 create_test_payment("P004", "C", "A", 600.0, PaymentType::RealTime),
519 create_test_payment("P005", "A", "C", 300.0, PaymentType::ACH),
520 ]
521 }
522
523 #[test]
524 fn test_build_flows() {
525 let payments = create_test_payments();
526
527 let flows = FlowAnalysis::build_flows(&payments);
528
529 assert_eq!(flows.len(), 4); let ab_flow = flows
532 .iter()
533 .find(|f| f.source == "A" && f.target == "B")
534 .unwrap();
535 assert_eq!(ab_flow.volume, 1500.0);
536 assert_eq!(ab_flow.count, 2);
537 assert_eq!(ab_flow.avg_amount, 750.0);
538 }
539
540 #[test]
541 fn test_calculate_node_metrics() {
542 let payments = create_test_payments();
543 let flows = FlowAnalysis::build_flows(&payments);
544
545 let metrics = FlowAnalysis::calculate_node_metrics(&flows, &payments);
546
547 assert_eq!(metrics.len(), 3); let a_metrics = metrics.get("A").unwrap();
550 assert_eq!(a_metrics.total_outflow, 1800.0); assert_eq!(a_metrics.total_inflow, 600.0); assert_eq!(a_metrics.net_flow, -1200.0);
553 }
554
555 #[test]
556 fn test_overall_metrics() {
557 let payments = create_test_payments();
558 let config = FlowAnalysisConfig::default();
559
560 let result = FlowAnalysis::analyze(&payments, &config);
561
562 assert_eq!(result.overall_metrics.total_transactions, 5);
563 assert_eq!(result.overall_metrics.total_volume, 3200.0);
564 assert_eq!(result.overall_metrics.unique_payers, 3);
565 assert_eq!(result.overall_metrics.unique_payees, 3);
566 assert_eq!(result.overall_metrics.avg_transaction_size, 640.0);
567 }
568
569 #[test]
570 fn test_detect_circular_flows() {
571 let payments = vec![
572 create_test_payment("P001", "A", "B", 1000.0, PaymentType::ACH),
573 create_test_payment("P002", "B", "A", 900.0, PaymentType::ACH),
574 ];
575 let config = FlowAnalysisConfig::default();
576
577 let result = FlowAnalysis::analyze(&payments, &config);
578
579 let circular = result
580 .anomalies
581 .iter()
582 .filter(|a| a.anomaly_type == FlowAnomalyType::CircularFlow)
583 .count();
584 assert!(circular > 0);
585 }
586
587 #[test]
588 fn test_detect_rapid_movement() {
589 let payments = vec![
591 create_test_payment("P001", "A", "B", 10000.0, PaymentType::ACH),
592 create_test_payment("P002", "C", "B", 10000.0, PaymentType::ACH),
593 create_test_payment("P003", "B", "D", 9500.0, PaymentType::Wire),
594 create_test_payment("P004", "B", "E", 10000.0, PaymentType::Wire),
595 ];
596 let config = FlowAnalysisConfig::default();
597
598 let result = FlowAnalysis::analyze(&payments, &config);
599
600 let rapid = result
601 .anomalies
602 .iter()
603 .filter(|a| a.anomaly_type == FlowAnomalyType::RapidMovement)
604 .count();
605 assert!(rapid > 0);
606 }
607
608 #[test]
609 fn test_top_flows_by_volume() {
610 let payments = create_test_payments();
611
612 let top = FlowAnalysis::top_flows_by_volume(&payments, 2);
613
614 assert_eq!(top.len(), 2);
615 assert_eq!(top[0].volume, 1500.0); }
617
618 #[test]
619 fn test_top_flows_by_count() {
620 let payments = create_test_payments();
621
622 let top = FlowAnalysis::top_flows_by_count(&payments, 2);
623
624 assert_eq!(top.len(), 2);
625 assert_eq!(top[0].count, 2); }
627
628 #[test]
629 fn test_analyze_account() {
630 let payments = create_test_payments();
631
632 let analysis = FlowAnalysis::analyze_account(&payments, "A");
633
634 assert_eq!(analysis.account_id, "A");
635 assert_eq!(analysis.total_outbound, 1800.0);
636 assert_eq!(analysis.total_inbound, 600.0);
637 assert_eq!(analysis.outbound_count, 3);
638 assert_eq!(analysis.inbound_count, 1);
639 assert_eq!(analysis.unique_destinations, 2); assert_eq!(analysis.unique_sources, 1); }
642
643 #[test]
644 fn test_network_density() {
645 let payments = vec![
647 create_test_payment("P001", "A", "B", 1000.0, PaymentType::ACH),
648 create_test_payment("P002", "B", "C", 500.0, PaymentType::ACH),
649 ];
650 let config = FlowAnalysisConfig::default();
651
652 let result = FlowAnalysis::analyze(&payments, &config);
653
654 assert!((result.overall_metrics.network_density - 2.0 / 6.0).abs() < 0.01);
656 }
657
658 #[test]
659 fn test_unusual_volume_detection() {
660 let payments = vec![
662 create_test_payment("P001", "A", "B", 100.0, PaymentType::ACH),
663 create_test_payment("P002", "B", "C", 100.0, PaymentType::ACH),
664 create_test_payment("P003", "C", "D", 100.0, PaymentType::ACH),
665 create_test_payment("P004", "D", "E", 100.0, PaymentType::ACH),
666 create_test_payment("P005", "E", "F", 100.0, PaymentType::ACH),
667 create_test_payment("P006", "F", "G", 100.0, PaymentType::ACH),
668 create_test_payment("P007", "G", "H", 100.0, PaymentType::ACH),
669 create_test_payment("P008", "H", "I", 100.0, PaymentType::ACH),
670 create_test_payment("P009", "I", "J", 100.0, PaymentType::ACH),
671 create_test_payment("P010", "J", "K", 100000.0, PaymentType::Wire), ];
673 let config = FlowAnalysisConfig {
674 volume_threshold_std: 2.0,
675 ..Default::default()
676 };
677
678 let result = FlowAnalysis::analyze(&payments, &config);
679
680 let unusual_volume = result
681 .anomalies
682 .iter()
683 .filter(|a| a.anomaly_type == FlowAnomalyType::UnusualVolume)
684 .count();
685 assert!(unusual_volume > 0);
686 }
687
688 #[test]
689 fn test_payment_type_breakdown() {
690 let payments = vec![
691 create_test_payment("P001", "A", "B", 1000.0, PaymentType::ACH),
692 create_test_payment("P002", "A", "C", 500.0, PaymentType::ACH),
693 create_test_payment("P003", "D", "A", 800.0, PaymentType::Wire),
694 ];
695
696 let analysis = FlowAnalysis::analyze_account(&payments, "A");
697
698 assert!(
700 analysis
701 .payment_type_breakdown
702 .contains_key(&PaymentType::ACH)
703 );
704 assert!(
705 analysis
706 .payment_type_breakdown
707 .contains_key(&PaymentType::Wire)
708 );
709 assert_eq!(analysis.payment_type_breakdown[&PaymentType::ACH].0, 2);
710 }
711
712 #[test]
713 fn test_empty_payments() {
714 let payments: Vec<Payment> = vec![];
715 let config = FlowAnalysisConfig::default();
716
717 let result = FlowAnalysis::analyze(&payments, &config);
718
719 assert!(result.flows.is_empty());
720 assert!(result.node_metrics.is_empty());
721 assert_eq!(result.overall_metrics.total_transactions, 0);
722 assert_eq!(result.overall_metrics.total_volume, 0.0);
723 }
724
725 #[test]
726 fn test_no_anomaly_detection() {
727 let payments = create_test_payments();
728 let config = FlowAnalysisConfig {
729 detect_anomalies: false,
730 ..Default::default()
731 };
732
733 let result = FlowAnalysis::analyze(&payments, &config);
734
735 assert!(result.anomalies.is_empty());
736 }
737}