rustkernel_payments/
flow.rs

1//! Payment flow analysis kernel.
2//!
3//! Batch-mode kernel for analyzing payment flows and network metrics.
4
5use crate::types::*;
6use rustkernel_core::{domain::Domain, kernel::KernelMetadata, traits::GpuKernel};
7use std::collections::{HashMap, HashSet};
8
9// ============================================================================
10// FlowAnalysis Kernel
11// ============================================================================
12
13/// Payment flow analysis kernel for network metrics and pattern detection.
14///
15/// Analyzes payment flows to identify patterns, calculate network metrics,
16/// and detect anomalies in payment behavior.
17#[derive(Debug, Clone)]
18pub struct FlowAnalysis {
19    metadata: KernelMetadata,
20}
21
22impl Default for FlowAnalysis {
23    fn default() -> Self {
24        Self::new()
25    }
26}
27
28impl FlowAnalysis {
29    /// Create a new flow analysis kernel.
30    #[must_use]
31    pub fn new() -> Self {
32        Self {
33            metadata: KernelMetadata::batch("payments/flow-analysis", Domain::PaymentProcessing)
34                .with_description("Payment flow network analysis and metrics")
35                .with_throughput(100_000)
36                .with_latency_us(50.0),
37        }
38    }
39
40    /// Analyze payment flows.
41    pub fn analyze(payments: &[Payment], config: &FlowAnalysisConfig) -> FlowAnalysisResult {
42        // Build flow graph
43        let flows = Self::build_flows(payments);
44
45        // Calculate node metrics
46        let node_metrics = Self::calculate_node_metrics(&flows, payments);
47
48        // Calculate overall metrics
49        let overall_metrics = Self::calculate_overall_metrics(payments, &node_metrics);
50
51        // Detect anomalies
52        let anomalies = if config.detect_anomalies {
53            Self::detect_anomalies(&flows, &node_metrics, config)
54        } else {
55            Vec::new()
56        };
57
58        FlowAnalysisResult {
59            flows,
60            node_metrics,
61            overall_metrics,
62            anomalies,
63        }
64    }
65
66    /// Build payment flows from transactions.
67    fn build_flows(payments: &[Payment]) -> Vec<PaymentFlow> {
68        let mut flow_map: HashMap<(String, String), (f64, usize)> = HashMap::new();
69
70        for payment in payments {
71            if payment.status == PaymentStatus::Completed
72                || payment.status == PaymentStatus::Processing
73            {
74                let key = (payment.payer_account.clone(), payment.payee_account.clone());
75                let entry = flow_map.entry(key).or_insert((0.0, 0));
76                entry.0 += payment.amount;
77                entry.1 += 1;
78            }
79        }
80
81        flow_map
82            .into_iter()
83            .map(|((source, target), (volume, count))| PaymentFlow {
84                source,
85                target,
86                volume,
87                count,
88                avg_amount: volume / count as f64,
89            })
90            .collect()
91    }
92
93    /// Calculate metrics for each node (account).
94    fn calculate_node_metrics(
95        flows: &[PaymentFlow],
96        payments: &[Payment],
97    ) -> HashMap<String, NodeMetrics> {
98        let mut metrics: HashMap<String, NodeMetrics> = HashMap::new();
99
100        // Initialize with all accounts seen
101        let mut accounts: HashSet<String> = HashSet::new();
102        for payment in payments {
103            accounts.insert(payment.payer_account.clone());
104            accounts.insert(payment.payee_account.clone());
105        }
106
107        for account in accounts {
108            metrics.insert(
109                account.clone(),
110                NodeMetrics {
111                    node_id: account,
112                    total_inflow: 0.0,
113                    total_outflow: 0.0,
114                    net_flow: 0.0,
115                    inbound_count: 0,
116                    outbound_count: 0,
117                    centrality: 0.0,
118                },
119            );
120        }
121
122        // Calculate in/out flows
123        for flow in flows {
124            if let Some(source_metrics) = metrics.get_mut(&flow.source) {
125                source_metrics.total_outflow += flow.volume;
126                source_metrics.outbound_count += flow.count;
127            }
128            if let Some(target_metrics) = metrics.get_mut(&flow.target) {
129                target_metrics.total_inflow += flow.volume;
130                target_metrics.inbound_count += flow.count;
131            }
132        }
133
134        // Calculate net flow and basic centrality
135        let total_flow: f64 = flows.iter().map(|f| f.volume).sum();
136        for (_, node) in metrics.iter_mut() {
137            node.net_flow = node.total_inflow - node.total_outflow;
138            // Simple degree centrality based on flow volume
139            if total_flow > 0.0 {
140                node.centrality = (node.total_inflow + node.total_outflow) / (2.0 * total_flow);
141            }
142        }
143
144        metrics
145    }
146
147    /// Calculate overall network metrics.
148    fn calculate_overall_metrics(
149        payments: &[Payment],
150        node_metrics: &HashMap<String, NodeMetrics>,
151    ) -> OverallMetrics {
152        let completed: Vec<_> = payments
153            .iter()
154            .filter(|p| {
155                p.status == PaymentStatus::Completed || p.status == PaymentStatus::Processing
156            })
157            .collect();
158
159        let total_volume: f64 = completed.iter().map(|p| p.amount).sum();
160        let total_transactions = completed.len();
161
162        let unique_payers: HashSet<_> = completed.iter().map(|p| &p.payer_account).collect();
163        let unique_payees: HashSet<_> = completed.iter().map(|p| &p.payee_account).collect();
164
165        let avg_transaction_size = if total_transactions > 0 {
166            total_volume / total_transactions as f64
167        } else {
168            0.0
169        };
170
171        // Calculate peak hour
172        let mut hour_volumes: HashMap<u32, f64> = HashMap::new();
173        for payment in completed.iter() {
174            // Extract hour from timestamp (simplified)
175            let hour = ((payment.initiated_at / 3600) % 24) as u32;
176            *hour_volumes.entry(hour).or_insert(0.0) += payment.amount;
177        }
178        let peak_hour = hour_volumes
179            .into_iter()
180            .max_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal))
181            .map(|(h, _)| h);
182
183        // Calculate network density
184        let n = node_metrics.len();
185        let actual_edges: HashSet<_> = completed
186            .iter()
187            .map(|p| (&p.payer_account, &p.payee_account))
188            .collect();
189        let max_edges = if n > 1 { n * (n - 1) } else { 1 };
190        let network_density = actual_edges.len() as f64 / max_edges as f64;
191
192        OverallMetrics {
193            total_volume,
194            total_transactions,
195            unique_payers: unique_payers.len(),
196            unique_payees: unique_payees.len(),
197            avg_transaction_size,
198            peak_hour,
199            network_density,
200        }
201    }
202
203    /// Detect anomalies in payment flows.
204    fn detect_anomalies(
205        flows: &[PaymentFlow],
206        node_metrics: &HashMap<String, NodeMetrics>,
207        config: &FlowAnalysisConfig,
208    ) -> Vec<FlowAnomaly> {
209        let mut anomalies = Vec::new();
210
211        // Detect unusual volume
212        let avg_volume: f64 =
213            flows.iter().map(|f| f.volume).sum::<f64>() / flows.len().max(1) as f64;
214        let std_volume = Self::calculate_std(&flows.iter().map(|f| f.volume).collect::<Vec<_>>());
215
216        for flow in flows {
217            if flow.volume > avg_volume + config.volume_threshold_std * std_volume {
218                anomalies.push(FlowAnomaly {
219                    anomaly_type: FlowAnomalyType::UnusualVolume,
220                    entity: format!("{}->{}", flow.source, flow.target),
221                    description: format!(
222                        "Unusually high volume: ${:.2} (avg: ${:.2})",
223                        flow.volume, avg_volume
224                    ),
225                    severity: ((flow.volume - avg_volume) / std_volume / 10.0).min(1.0),
226                    timestamp: 0,
227                });
228            }
229        }
230
231        // Detect unusual frequency
232        let avg_count: f64 =
233            flows.iter().map(|f| f.count as f64).sum::<f64>() / flows.len().max(1) as f64;
234        for flow in flows {
235            if flow.count as f64 > avg_count * config.frequency_threshold_multiple {
236                anomalies.push(FlowAnomaly {
237                    anomaly_type: FlowAnomalyType::UnusualFrequency,
238                    entity: format!("{}->{}", flow.source, flow.target),
239                    description: format!(
240                        "Unusually high frequency: {} transactions (avg: {:.1})",
241                        flow.count, avg_count
242                    ),
243                    severity: ((flow.count as f64 / avg_count) / 5.0).min(1.0),
244                    timestamp: 0,
245                });
246            }
247        }
248
249        // Detect circular flows
250        if config.detect_circular_flows {
251            let circular = Self::detect_circular_flows(flows);
252            anomalies.extend(circular);
253        }
254
255        // Detect rapid movement
256        if config.detect_rapid_movement {
257            let rapid = Self::detect_rapid_movement(node_metrics);
258            anomalies.extend(rapid);
259        }
260
261        anomalies
262    }
263
264    /// Detect circular payment flows.
265    fn detect_circular_flows(flows: &[PaymentFlow]) -> Vec<FlowAnomaly> {
266        let mut anomalies = Vec::new();
267
268        // Build adjacency list
269        let mut graph: HashMap<&str, HashSet<&str>> = HashMap::new();
270        for flow in flows {
271            graph.entry(&flow.source).or_default().insert(&flow.target);
272        }
273
274        // Check for direct cycles (A->B->A)
275        for flow in flows {
276            if let Some(targets) = graph.get(flow.target.as_str()) {
277                if targets.contains(flow.source.as_str()) {
278                    anomalies.push(FlowAnomaly {
279                        anomaly_type: FlowAnomalyType::CircularFlow,
280                        entity: format!("{}<->{}", flow.source, flow.target),
281                        description: format!(
282                            "Circular flow detected between {} and {}",
283                            flow.source, flow.target
284                        ),
285                        severity: 0.7,
286                        timestamp: 0,
287                    });
288                }
289            }
290        }
291
292        // Deduplicate (A->B and B->A would create two entries)
293        let mut seen: HashSet<String> = HashSet::new();
294        anomalies.retain(|a| {
295            let key = if a.entity.contains("<->") {
296                let parts: Vec<&str> = a.entity.split("<->").collect();
297                if parts.len() == 2 {
298                    let mut sorted = [parts[0], parts[1]];
299                    sorted.sort();
300                    format!("{}<->{}", sorted[0], sorted[1])
301                } else {
302                    a.entity.clone()
303                }
304            } else {
305                a.entity.clone()
306            };
307            seen.insert(key)
308        });
309
310        anomalies
311    }
312
313    /// Detect rapid money movement.
314    fn detect_rapid_movement(node_metrics: &HashMap<String, NodeMetrics>) -> Vec<FlowAnomaly> {
315        let mut anomalies = Vec::new();
316
317        // Find accounts that are pass-through (high inflow and outflow, low net)
318        for (node_id, metrics) in node_metrics {
319            let total_flow = metrics.total_inflow + metrics.total_outflow;
320            if total_flow > 0.0 {
321                let pass_through_ratio =
322                    metrics.total_inflow.min(metrics.total_outflow) / (total_flow / 2.0);
323
324                // If >80% of money flows through (in ~= out), it's suspicious
325                if pass_through_ratio > 0.8
326                    && metrics.inbound_count >= 2
327                    && metrics.outbound_count >= 2
328                {
329                    anomalies.push(FlowAnomaly {
330                        anomaly_type: FlowAnomalyType::RapidMovement,
331                        entity: node_id.clone(),
332                        description: format!(
333                            "Pass-through account: ${:.2} in, ${:.2} out ({:.0}% pass-through)",
334                            metrics.total_inflow,
335                            metrics.total_outflow,
336                            pass_through_ratio * 100.0
337                        ),
338                        severity: pass_through_ratio,
339                        timestamp: 0,
340                    });
341                }
342            }
343        }
344
345        anomalies
346    }
347
348    /// Calculate standard deviation.
349    fn calculate_std(values: &[f64]) -> f64 {
350        if values.is_empty() {
351            return 0.0;
352        }
353        let mean = values.iter().sum::<f64>() / values.len() as f64;
354        let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / values.len() as f64;
355        variance.sqrt()
356    }
357
358    /// Get top flows by volume.
359    pub fn top_flows_by_volume(payments: &[Payment], limit: usize) -> Vec<PaymentFlow> {
360        let mut flows = Self::build_flows(payments);
361        flows.sort_by(|a, b| {
362            b.volume
363                .partial_cmp(&a.volume)
364                .unwrap_or(std::cmp::Ordering::Equal)
365        });
366        flows.truncate(limit);
367        flows
368    }
369
370    /// Get top flows by count.
371    pub fn top_flows_by_count(payments: &[Payment], limit: usize) -> Vec<PaymentFlow> {
372        let mut flows = Self::build_flows(payments);
373        flows.sort_by(|a, b| b.count.cmp(&a.count));
374        flows.truncate(limit);
375        flows
376    }
377
378    /// Analyze flows for a specific account.
379    pub fn analyze_account(payments: &[Payment], account_id: &str) -> AccountFlowAnalysis {
380        let inbound: Vec<_> = payments
381            .iter()
382            .filter(|p| p.payee_account == account_id)
383            .collect();
384        let outbound: Vec<_> = payments
385            .iter()
386            .filter(|p| p.payer_account == account_id)
387            .collect();
388
389        let total_inbound: f64 = inbound.iter().map(|p| p.amount).sum();
390        let total_outbound: f64 = outbound.iter().map(|p| p.amount).sum();
391
392        let unique_sources: HashSet<_> = inbound.iter().map(|p| &p.payer_account).collect();
393        let unique_destinations: HashSet<_> = outbound.iter().map(|p| &p.payee_account).collect();
394
395        // Payment type breakdown
396        let mut type_breakdown: HashMap<PaymentType, (usize, f64)> = HashMap::new();
397        for payment in inbound.iter().chain(outbound.iter()) {
398            let entry = type_breakdown
399                .entry(payment.payment_type)
400                .or_insert((0, 0.0));
401            entry.0 += 1;
402            entry.1 += payment.amount;
403        }
404
405        AccountFlowAnalysis {
406            account_id: account_id.to_string(),
407            total_inbound,
408            total_outbound,
409            net_flow: total_inbound - total_outbound,
410            inbound_count: inbound.len(),
411            outbound_count: outbound.len(),
412            unique_sources: unique_sources.len(),
413            unique_destinations: unique_destinations.len(),
414            payment_type_breakdown: type_breakdown,
415        }
416    }
417}
418
419impl GpuKernel for FlowAnalysis {
420    fn metadata(&self) -> &KernelMetadata {
421        &self.metadata
422    }
423}
424
425// ============================================================================
426// Configuration Types
427// ============================================================================
428
/// Tuning knobs for [`FlowAnalysis::analyze`].
#[derive(Clone, Debug)]
pub struct FlowAnalysisConfig {
    /// Master switch: when `false`, no anomaly detector runs at all.
    pub detect_anomalies: bool,
    /// A flow is a volume outlier when its volume exceeds the mean flow
    /// volume by more than this many standard deviations.
    pub volume_threshold_std: f64,
    /// A flow is a frequency outlier when its payment count exceeds the mean
    /// count multiplied by this factor.
    pub frequency_threshold_multiple: f64,
    /// Enable detection of account pairs that pay each other in both directions.
    pub detect_circular_flows: bool,
    /// Threshold for structuring detection.
    ///
    /// NOTE(review): not read by any analysis code visible in this file.
    pub structuring_threshold: f64,
    /// Enable detection of pass-through (rapid movement) accounts.
    pub detect_rapid_movement: bool,
}
445
446impl Default for FlowAnalysisConfig {
447    fn default() -> Self {
448        Self {
449            detect_anomalies: true,
450            volume_threshold_std: 3.0,
451            frequency_threshold_multiple: 5.0,
452            detect_circular_flows: true,
453            structuring_threshold: 10000.0,
454            detect_rapid_movement: true,
455        }
456    }
457}
458
459/// Account flow analysis result.
460#[derive(Debug, Clone)]
461pub struct AccountFlowAnalysis {
462    /// Account ID.
463    pub account_id: String,
464    /// Total inbound volume.
465    pub total_inbound: f64,
466    /// Total outbound volume.
467    pub total_outbound: f64,
468    /// Net flow (inbound - outbound).
469    pub net_flow: f64,
470    /// Inbound transaction count.
471    pub inbound_count: usize,
472    /// Outbound transaction count.
473    pub outbound_count: usize,
474    /// Unique sources.
475    pub unique_sources: usize,
476    /// Unique destinations.
477    pub unique_destinations: usize,
478    /// Breakdown by payment type.
479    pub payment_type_breakdown: HashMap<PaymentType, (usize, f64)>,
480}
481
482// ============================================================================
483// Tests
484// ============================================================================
485
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a completed, normal-priority USD payment with fixed timestamps.
    fn payment(
        id: &str,
        payer: &str,
        payee: &str,
        amount: f64,
        payment_type: PaymentType,
    ) -> Payment {
        Payment {
            id: String::from(id),
            payer_account: String::from(payer),
            payee_account: String::from(payee),
            amount,
            currency: String::from("USD"),
            payment_type,
            status: PaymentStatus::Completed,
            initiated_at: 1000,
            completed_at: Some(1001),
            reference: format!("REF-{}", id),
            priority: PaymentPriority::Normal,
            attributes: HashMap::new(),
        }
    }

    /// Five payments among accounts A, B and C forming four distinct flows.
    fn sample_payments() -> Vec<Payment> {
        vec![
            ("P001", "A", "B", 1000.0, PaymentType::ACH),
            ("P002", "A", "B", 500.0, PaymentType::ACH),
            ("P003", "B", "C", 800.0, PaymentType::Wire),
            ("P004", "C", "A", 600.0, PaymentType::RealTime),
            ("P005", "A", "C", 300.0, PaymentType::ACH),
        ]
        .into_iter()
        .map(|(id, payer, payee, amount, kind)| payment(id, payer, payee, amount, kind))
        .collect()
    }

    /// Number of anomalies of the given kind in an analysis result.
    fn anomaly_count(result: &FlowAnalysisResult, kind: FlowAnomalyType) -> usize {
        result
            .anomalies
            .iter()
            .filter(|anomaly| anomaly.anomaly_type == kind)
            .count()
    }

    #[test]
    fn test_build_flows() {
        let flows = FlowAnalysis::build_flows(&sample_payments());

        // Distinct directed pairs: A->B, B->C, C->A, A->C.
        assert_eq!(flows.len(), 4);

        let a_to_b = flows
            .iter()
            .find(|flow| flow.source == "A" && flow.target == "B")
            .expect("A->B flow should exist");
        assert_eq!(a_to_b.volume, 1500.0);
        assert_eq!(a_to_b.count, 2);
        assert_eq!(a_to_b.avg_amount, 750.0);
    }

    #[test]
    fn test_calculate_node_metrics() {
        let payments = sample_payments();
        let flows = FlowAnalysis::build_flows(&payments);

        let metrics = FlowAnalysis::calculate_node_metrics(&flows, &payments);

        // Accounts A, B and C.
        assert_eq!(metrics.len(), 3);

        let account_a = &metrics["A"];
        assert_eq!(account_a.total_outflow, 1800.0); // 1000 + 500 + 300
        assert_eq!(account_a.total_inflow, 600.0); // from C
        assert_eq!(account_a.net_flow, -1200.0);
    }

    #[test]
    fn test_overall_metrics() {
        let result = FlowAnalysis::analyze(&sample_payments(), &FlowAnalysisConfig::default());
        let overall = &result.overall_metrics;

        assert_eq!(overall.total_transactions, 5);
        assert_eq!(overall.total_volume, 3200.0);
        assert_eq!(overall.unique_payers, 3);
        assert_eq!(overall.unique_payees, 3);
        assert_eq!(overall.avg_transaction_size, 640.0);
    }

    #[test]
    fn test_detect_circular_flows() {
        // A and B pay each other.
        let payments = vec![
            payment("P001", "A", "B", 1000.0, PaymentType::ACH),
            payment("P002", "B", "A", 900.0, PaymentType::ACH),
        ];

        let result = FlowAnalysis::analyze(&payments, &FlowAnalysisConfig::default());

        assert!(anomaly_count(&result, FlowAnomalyType::CircularFlow) > 0);
    }

    #[test]
    fn test_detect_rapid_movement() {
        // B relays money: it receives from A and C and forwards to D and E.
        let payments = vec![
            payment("P001", "A", "B", 10000.0, PaymentType::ACH),
            payment("P002", "C", "B", 10000.0, PaymentType::ACH),
            payment("P003", "B", "D", 9500.0, PaymentType::Wire),
            payment("P004", "B", "E", 10000.0, PaymentType::Wire),
        ];

        let result = FlowAnalysis::analyze(&payments, &FlowAnalysisConfig::default());

        assert!(anomaly_count(&result, FlowAnomalyType::RapidMovement) > 0);
    }

    #[test]
    fn test_top_flows_by_volume() {
        let top = FlowAnalysis::top_flows_by_volume(&sample_payments(), 2);

        assert_eq!(top.len(), 2);
        assert_eq!(top[0].volume, 1500.0); // A->B
    }

    #[test]
    fn test_top_flows_by_count() {
        let top = FlowAnalysis::top_flows_by_count(&sample_payments(), 2);

        assert_eq!(top.len(), 2);
        assert_eq!(top[0].count, 2); // A->B is the only pair with two payments
    }

    #[test]
    fn test_analyze_account() {
        let analysis = FlowAnalysis::analyze_account(&sample_payments(), "A");

        assert_eq!(analysis.account_id, "A");
        assert_eq!(analysis.total_outbound, 1800.0);
        assert_eq!(analysis.total_inbound, 600.0);
        assert_eq!(analysis.outbound_count, 3);
        assert_eq!(analysis.inbound_count, 1);
        assert_eq!(analysis.unique_destinations, 2); // B and C
        assert_eq!(analysis.unique_sources, 1); // C
    }

    #[test]
    fn test_network_density() {
        // Three accounts connected by only two directed edges.
        let payments = vec![
            payment("P001", "A", "B", 1000.0, PaymentType::ACH),
            payment("P002", "B", "C", 500.0, PaymentType::ACH),
        ];

        let result = FlowAnalysis::analyze(&payments, &FlowAnalysisConfig::default());

        // 2 edges out of 3 * 2 = 6 possible.
        let expected = 2.0 / 6.0;
        assert!((result.overall_metrics.network_density - expected).abs() < 0.01);
    }

    #[test]
    fn test_unusual_volume_detection() {
        // A chain A->B->...->K: nine hops of 100.0 and one massive final hop.
        let accounts = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K"];
        let payments: Vec<Payment> = accounts
            .windows(2)
            .enumerate()
            .map(|(index, hop)| {
                let id = format!("P{:03}", index + 1);
                if index == 9 {
                    payment(&id, hop[0], hop[1], 100000.0, PaymentType::Wire)
                } else {
                    payment(&id, hop[0], hop[1], 100.0, PaymentType::ACH)
                }
            })
            .collect();
        let config = FlowAnalysisConfig {
            volume_threshold_std: 2.0,
            ..Default::default()
        };

        let result = FlowAnalysis::analyze(&payments, &config);

        assert!(anomaly_count(&result, FlowAnomalyType::UnusualVolume) > 0);
    }

    #[test]
    fn test_payment_type_breakdown() {
        // A sends two ACH payments and receives one Wire payment.
        let payments = vec![
            payment("P001", "A", "B", 1000.0, PaymentType::ACH),
            payment("P002", "A", "C", 500.0, PaymentType::ACH),
            payment("P003", "D", "A", 800.0, PaymentType::Wire),
        ];

        let breakdown = FlowAnalysis::analyze_account(&payments, "A").payment_type_breakdown;

        assert!(breakdown.contains_key(&PaymentType::ACH));
        assert!(breakdown.contains_key(&PaymentType::Wire));
        assert_eq!(breakdown[&PaymentType::ACH].0, 2);
    }

    #[test]
    fn test_empty_payments() {
        let no_payments: Vec<Payment> = Vec::new();

        let result = FlowAnalysis::analyze(&no_payments, &FlowAnalysisConfig::default());

        assert!(result.flows.is_empty());
        assert!(result.node_metrics.is_empty());
        assert_eq!(result.overall_metrics.total_transactions, 0);
        assert_eq!(result.overall_metrics.total_volume, 0.0);
    }

    #[test]
    fn test_no_anomaly_detection() {
        let config = FlowAnalysisConfig {
            detect_anomalies: false,
            ..Default::default()
        };

        let result = FlowAnalysis::analyze(&sample_payments(), &config);

        assert!(result.anomalies.is_empty());
    }
}