rustkernel_compliance/
aml.rs

1//! Anti-Money Laundering (AML) kernels.
2//!
3//! This module provides AML detection algorithms:
4//! - Circular flow detection (SCC-based)
5//! - Reciprocity analysis
6//! - Rapid movement (velocity) analysis
7//! - Multi-pattern AML detection
8
9use crate::messages::{
10    AMLPatternInput, AMLPatternOutput, CircularFlowInput, CircularFlowOutput, RapidMovementInput,
11    RapidMovementOutput, ReciprocityFlowInput, ReciprocityFlowOutput,
12};
13use crate::ring_messages::{
14    AddGraphEdgeResponse, AddGraphEdgeRing, MatchPatternResponse, MatchPatternRing,
15    QueryCircularRatioResponse, QueryCircularRatioRing,
16};
17use crate::types::{
18    AMLPattern, AMLPatternResult, CircularFlowResult, PatternDetail, RapidMovementResult,
19    ReciprocityResult, TimeWindow, Transaction,
20};
21use async_trait::async_trait;
22use ringkernel_core::RingContext;
23use rustkernel_core::error::Result;
24use rustkernel_core::traits::{BatchKernel, RingKernelHandler};
25use rustkernel_core::{domain::Domain, kernel::KernelMetadata, traits::GpuKernel};
26use std::collections::{HashMap, HashSet};
27use std::time::Instant;
28
29// ============================================================================
30// Circular Flow Ratio Kernel
31// ============================================================================
32
/// Running counters tracked for a single entity by the ring-mode circular-flow kernel.
#[derive(Clone, Debug, Default)]
pub struct EntityCircularState {
    /// Sum of all amounts sent by this entity.
    pub outgoing_volume: f64,
    /// Sum of all amounts received by this entity.
    pub incoming_volume: f64,
    /// Amount attributed to circular flows.
    pub circular_volume: f64,
    /// How many outgoing edges have been recorded.
    pub out_degree: u32,
    /// How many incoming edges have been recorded.
    pub in_degree: u32,
    /// Marks membership in a strongly connected component.
    pub in_scc: bool,
}
49
50/// Circular flow state for Ring mode operations.
51#[derive(Debug, Clone, Default)]
52pub struct CircularFlowState {
53    /// Transaction graph: source -> [(dest, amount)]
54    pub graph: HashMap<u64, Vec<(u64, f64)>>,
55    /// Per-entity state.
56    pub entities: HashMap<u64, EntityCircularState>,
57    /// Cached SCCs.
58    pub sccs: Vec<Vec<u64>>,
59    /// Total transaction volume.
60    pub total_volume: f64,
61    /// Total circular volume.
62    pub circular_volume: f64,
63    /// Whether SCCs need recalculation.
64    pub sccs_stale: bool,
65}
66
67/// Circular flow detection kernel.
68///
69/// Detects circular transactions using Strongly Connected Components (SCC).
70/// High circular flow ratio indicates potential money laundering.
71#[derive(Debug)]
72pub struct CircularFlowRatio {
73    metadata: KernelMetadata,
74    /// Internal state for Ring mode operations.
75    state: std::sync::RwLock<CircularFlowState>,
76}
77
78impl Clone for CircularFlowRatio {
79    fn clone(&self) -> Self {
80        Self {
81            metadata: self.metadata.clone(),
82            state: std::sync::RwLock::new(self.state.read().unwrap().clone()),
83        }
84    }
85}
86
87impl Default for CircularFlowRatio {
88    fn default() -> Self {
89        Self::new()
90    }
91}
92
93impl CircularFlowRatio {
94    /// Create a new circular flow detection kernel.
95    #[must_use]
96    pub fn new() -> Self {
97        Self {
98            metadata: KernelMetadata::ring("compliance/circular-flow", Domain::Compliance)
99                .with_description("Circular flow detection via SCC")
100                .with_throughput(50_000)
101                .with_latency_us(100.0),
102            state: std::sync::RwLock::new(CircularFlowState::default()),
103        }
104    }
105
106    /// Add an edge to the transaction graph.
107    /// Returns (cycle_detected, cycle_size, source_ratio).
108    pub fn add_edge(&self, source_id: u64, dest_id: u64, amount: f64) -> (bool, u32, f64) {
109        let mut state = self.state.write().unwrap();
110
111        // Add edge to graph
112        state
113            .graph
114            .entry(source_id)
115            .or_default()
116            .push((dest_id, amount));
117        state.total_volume += amount;
118        state.sccs_stale = true;
119
120        // Update entity states
121        let source_state = state.entities.entry(source_id).or_default();
122        source_state.outgoing_volume += amount;
123        source_state.out_degree += 1;
124
125        let dest_state = state.entities.entry(dest_id).or_default();
126        dest_state.incoming_volume += amount;
127        dest_state.in_degree += 1;
128
129        // Quick cycle check using DFS from dest back to source
130        let cycle_detected = Self::has_path(&state.graph, dest_id, source_id);
131
132        let cycle_size = if cycle_detected {
133            // Estimate cycle size (full SCC would be more accurate)
134            Self::estimate_cycle_size(&state.graph, source_id, dest_id)
135        } else {
136            0
137        };
138
139        let source_ratio = {
140            let s = state.entities.get(&source_id).unwrap();
141            if s.outgoing_volume > 0.0 {
142                s.circular_volume / s.outgoing_volume
143            } else {
144                0.0
145            }
146        };
147
148        (cycle_detected, cycle_size, source_ratio)
149    }
150
151    /// Query circular flow ratio for an entity.
152    pub fn query_entity(&self, entity_id: u64) -> (f64, u32, u64) {
153        let state = self.state.read().unwrap();
154
155        if let Some(entity_state) = state.entities.get(&entity_id) {
156            let ratio = if entity_state.outgoing_volume > 0.0 {
157                entity_state.circular_volume / entity_state.outgoing_volume
158            } else {
159                0.0
160            };
161            let scc_count = state
162                .sccs
163                .iter()
164                .filter(|scc| scc.contains(&entity_id))
165                .count() as u32;
166            let cycle_volume = (entity_state.circular_volume * 100_000_000.0) as u64;
167            (ratio, scc_count, cycle_volume)
168        } else {
169            (0.0, 0, 0)
170        }
171    }
172
173    /// Check if there's a path from src to dst in the graph.
174    fn has_path(graph: &HashMap<u64, Vec<(u64, f64)>>, src: u64, dst: u64) -> bool {
175        let mut visited = HashSet::new();
176        let mut stack = vec![src];
177
178        while let Some(node) = stack.pop() {
179            if node == dst {
180                return true;
181            }
182            if visited.insert(node) {
183                if let Some(neighbors) = graph.get(&node) {
184                    for &(neighbor, _) in neighbors {
185                        if !visited.contains(&neighbor) {
186                            stack.push(neighbor);
187                        }
188                    }
189                }
190            }
191        }
192        false
193    }
194
195    /// Estimate cycle size using BFS.
196    fn estimate_cycle_size(graph: &HashMap<u64, Vec<(u64, f64)>>, start: u64, end: u64) -> u32 {
197        let mut visited = HashSet::new();
198        let mut queue = std::collections::VecDeque::new();
199        queue.push_back((end, 1u32));
200
201        while let Some((node, depth)) = queue.pop_front() {
202            if node == start {
203                return depth;
204            }
205            if visited.insert(node) {
206                if let Some(neighbors) = graph.get(&node) {
207                    for &(neighbor, _) in neighbors {
208                        if !visited.contains(&neighbor) {
209                            queue.push_back((neighbor, depth + 1));
210                        }
211                    }
212                }
213            }
214        }
215        0
216    }
217
218    /// Detect circular flows in transactions.
219    ///
220    /// # Arguments
221    /// * `transactions` - List of transactions to analyze
222    /// * `min_amount` - Minimum total amount for a cycle to be flagged
223    pub fn compute(transactions: &[Transaction], min_amount: f64) -> CircularFlowResult {
224        if transactions.is_empty() {
225            return CircularFlowResult {
226                circular_ratio: 0.0,
227                sccs: Vec::new(),
228                circular_amount: 0.0,
229                total_amount: 0.0,
230            };
231        }
232
233        // Build transaction graph
234        let mut graph: HashMap<u64, Vec<(u64, f64)>> = HashMap::new();
235        let mut total_amount = 0.0;
236
237        for tx in transactions {
238            graph
239                .entry(tx.source_id)
240                .or_default()
241                .push((tx.dest_id, tx.amount));
242            total_amount += tx.amount;
243        }
244
245        // Find SCCs using Tarjan's algorithm
246        let sccs = Self::tarjan_scc(&graph);
247
248        // Calculate circular amount (amount in non-trivial SCCs)
249        let mut circular_amount = 0.0;
250        let mut significant_sccs = Vec::new();
251
252        for scc in &sccs {
253            if scc.len() > 1 {
254                // Calculate amount flowing within this SCC
255                let scc_set: HashSet<u64> = scc.iter().copied().collect();
256                let mut scc_amount = 0.0;
257
258                for tx in transactions {
259                    if scc_set.contains(&tx.source_id) && scc_set.contains(&tx.dest_id) {
260                        scc_amount += tx.amount;
261                    }
262                }
263
264                if scc_amount >= min_amount {
265                    circular_amount += scc_amount;
266                    significant_sccs.push(scc.clone());
267                }
268            }
269        }
270
271        let circular_ratio = if total_amount > 0.0 {
272            circular_amount / total_amount
273        } else {
274            0.0
275        };
276
277        CircularFlowResult {
278            circular_ratio,
279            sccs: significant_sccs,
280            circular_amount,
281            total_amount,
282        }
283    }
284
285    /// Tarjan's SCC algorithm.
286    fn tarjan_scc(graph: &HashMap<u64, Vec<(u64, f64)>>) -> Vec<Vec<u64>> {
287        let mut index_counter = 0u64;
288        let mut stack = Vec::new();
289        let mut on_stack: HashSet<u64> = HashSet::new();
290        let mut indices: HashMap<u64, u64> = HashMap::new();
291        let mut lowlinks: HashMap<u64, u64> = HashMap::new();
292        let mut sccs = Vec::new();
293
294        // Get all nodes
295        let mut nodes: HashSet<u64> = graph.keys().copied().collect();
296        for edges in graph.values() {
297            for (dest, _) in edges {
298                nodes.insert(*dest);
299            }
300        }
301
302        #[allow(clippy::too_many_arguments)]
303        fn strongconnect(
304            v: u64,
305            graph: &HashMap<u64, Vec<(u64, f64)>>,
306            index_counter: &mut u64,
307            stack: &mut Vec<u64>,
308            on_stack: &mut HashSet<u64>,
309            indices: &mut HashMap<u64, u64>,
310            lowlinks: &mut HashMap<u64, u64>,
311            sccs: &mut Vec<Vec<u64>>,
312        ) {
313            indices.insert(v, *index_counter);
314            lowlinks.insert(v, *index_counter);
315            *index_counter += 1;
316            stack.push(v);
317            on_stack.insert(v);
318
319            if let Some(neighbors) = graph.get(&v) {
320                for (w, _) in neighbors {
321                    if !indices.contains_key(w) {
322                        strongconnect(
323                            *w,
324                            graph,
325                            index_counter,
326                            stack,
327                            on_stack,
328                            indices,
329                            lowlinks,
330                            sccs,
331                        );
332                        let lowlink_w = lowlinks[w];
333                        if let Some(ll) = lowlinks.get_mut(&v) {
334                            *ll = (*ll).min(lowlink_w);
335                        }
336                    } else if on_stack.contains(w) {
337                        let index_w = indices[w];
338                        if let Some(ll) = lowlinks.get_mut(&v) {
339                            *ll = (*ll).min(index_w);
340                        }
341                    }
342                }
343            }
344
345            if lowlinks[&v] == indices[&v] {
346                let mut scc = Vec::new();
347                loop {
348                    let w = stack.pop().unwrap();
349                    on_stack.remove(&w);
350                    scc.push(w);
351                    if w == v {
352                        break;
353                    }
354                }
355                sccs.push(scc);
356            }
357        }
358
359        for node in nodes {
360            if !indices.contains_key(&node) {
361                strongconnect(
362                    node,
363                    graph,
364                    &mut index_counter,
365                    &mut stack,
366                    &mut on_stack,
367                    &mut indices,
368                    &mut lowlinks,
369                    &mut sccs,
370                );
371            }
372        }
373
374        sccs
375    }
376}
377
378impl GpuKernel for CircularFlowRatio {
379    fn metadata(&self) -> &KernelMetadata {
380        &self.metadata
381    }
382}
383
384#[async_trait]
385impl BatchKernel<CircularFlowInput, CircularFlowOutput> for CircularFlowRatio {
386    async fn execute(&self, input: CircularFlowInput) -> Result<CircularFlowOutput> {
387        let start = Instant::now();
388        let result = Self::compute(&input.transactions, input.min_amount);
389        Ok(CircularFlowOutput {
390            result,
391            compute_time_us: start.elapsed().as_micros() as u64,
392        })
393    }
394}
395
396// ----------------------------------------------------------------------------
397// Ring Kernel Handler Implementation for CircularFlowRatio
398// ----------------------------------------------------------------------------
399
400/// Ring handler for adding edges to the transaction graph.
401#[async_trait]
402impl RingKernelHandler<AddGraphEdgeRing, AddGraphEdgeResponse> for CircularFlowRatio {
403    async fn handle(
404        &self,
405        _ctx: &mut RingContext,
406        msg: AddGraphEdgeRing,
407    ) -> Result<AddGraphEdgeResponse> {
408        // Add edge to internal graph state
409        let amount = msg.amount as f64 / 100_000_000.0;
410        let (cycle_detected, cycle_size, source_ratio) =
411            self.add_edge(msg.source_id, msg.dest_id, amount);
412
413        Ok(AddGraphEdgeResponse {
414            correlation_id: msg.correlation_id,
415            cycle_detected,
416            cycle_size,
417            source_ratio: source_ratio as f32,
418        })
419    }
420}
421
422/// Ring handler for querying circular flow ratio.
423#[async_trait]
424impl RingKernelHandler<QueryCircularRatioRing, QueryCircularRatioResponse> for CircularFlowRatio {
425    async fn handle(
426        &self,
427        _ctx: &mut RingContext,
428        msg: QueryCircularRatioRing,
429    ) -> Result<QueryCircularRatioResponse> {
430        // Query internal state for this entity
431        let (ratio, scc_count, cycle_volume) = self.query_entity(msg.entity_id);
432
433        Ok(QueryCircularRatioResponse {
434            correlation_id: msg.correlation_id,
435            entity_id: msg.entity_id,
436            ratio: ratio as f32,
437            scc_count,
438            cycle_volume: cycle_volume as i64,
439        })
440    }
441}
442
443// ============================================================================
444// Reciprocity Flow Ratio Kernel
445// ============================================================================
446
447/// Reciprocity flow detection kernel.
448///
449/// Detects mutual/reciprocal transactions between entities.
450/// High reciprocity can indicate layering or round-tripping.
451#[derive(Debug, Clone)]
452pub struct ReciprocityFlowRatio {
453    metadata: KernelMetadata,
454}
455
456impl Default for ReciprocityFlowRatio {
457    fn default() -> Self {
458        Self::new()
459    }
460}
461
462impl ReciprocityFlowRatio {
463    /// Create a new reciprocity detection kernel.
464    #[must_use]
465    pub fn new() -> Self {
466        Self {
467            metadata: KernelMetadata::ring("compliance/reciprocity-flow", Domain::Compliance)
468                .with_description("Reciprocal transaction detection")
469                .with_throughput(100_000)
470                .with_latency_us(50.0),
471        }
472    }
473
474    /// Detect reciprocal transactions.
475    ///
476    /// # Arguments
477    /// * `transactions` - List of transactions
478    /// * `window` - Time window for reciprocity (None = all time)
479    /// * `min_amount` - Minimum amount to consider
480    pub fn compute(
481        transactions: &[Transaction],
482        window: Option<TimeWindow>,
483        min_amount: f64,
484    ) -> ReciprocityResult {
485        if transactions.is_empty() {
486            return ReciprocityResult {
487                reciprocity_ratio: 0.0,
488                reciprocal_pairs: Vec::new(),
489                reciprocal_amount: 0.0,
490            };
491        }
492
493        // Filter by time window if specified
494        let txs: Vec<&Transaction> = transactions
495            .iter()
496            .filter(|tx| window.map(|w| w.contains(tx.timestamp)).unwrap_or(true))
497            .collect();
498
499        // Build directed edge map: (src, dst) -> total amount
500        let mut edge_amounts: HashMap<(u64, u64), f64> = HashMap::new();
501
502        for tx in &txs {
503            *edge_amounts.entry((tx.source_id, tx.dest_id)).or_default() += tx.amount;
504        }
505
506        // Find reciprocal pairs
507        let mut reciprocal_pairs = Vec::new();
508        let mut reciprocal_amount = 0.0;
509        let mut processed: HashSet<(u64, u64)> = HashSet::new();
510
511        for (&(src, dst), &amount) in &edge_amounts {
512            if processed.contains(&(src, dst)) || processed.contains(&(dst, src)) {
513                continue;
514            }
515
516            if let Some(&reverse_amount) = edge_amounts.get(&(dst, src)) {
517                let min_reciprocal = amount.min(reverse_amount);
518                if min_reciprocal >= min_amount {
519                    reciprocal_pairs.push((src, dst));
520                    reciprocal_amount += min_reciprocal * 2.0;
521                    processed.insert((src, dst));
522                    processed.insert((dst, src));
523                }
524            }
525        }
526
527        let total_amount: f64 = txs.iter().map(|tx| tx.amount).sum();
528        let reciprocity_ratio = if total_amount > 0.0 {
529            reciprocal_amount / total_amount
530        } else {
531            0.0
532        };
533
534        ReciprocityResult {
535            reciprocity_ratio,
536            reciprocal_pairs,
537            reciprocal_amount,
538        }
539    }
540}
541
542impl GpuKernel for ReciprocityFlowRatio {
543    fn metadata(&self) -> &KernelMetadata {
544        &self.metadata
545    }
546}
547
548#[async_trait]
549impl BatchKernel<ReciprocityFlowInput, ReciprocityFlowOutput> for ReciprocityFlowRatio {
550    async fn execute(&self, input: ReciprocityFlowInput) -> Result<ReciprocityFlowOutput> {
551        let start = Instant::now();
552        let result = Self::compute(&input.transactions, input.window, input.min_amount);
553        Ok(ReciprocityFlowOutput {
554            result,
555            compute_time_us: start.elapsed().as_micros() as u64,
556        })
557    }
558}
559
560// ============================================================================
561// Rapid Movement Kernel
562// ============================================================================
563
564/// Rapid movement (velocity) analysis kernel.
565///
566/// Detects accounts with unusually high transaction velocity,
567/// which can indicate structuring or rapid movement schemes.
568#[derive(Debug, Clone)]
569pub struct RapidMovement {
570    metadata: KernelMetadata,
571}
572
573impl Default for RapidMovement {
574    fn default() -> Self {
575        Self::new()
576    }
577}
578
579impl RapidMovement {
580    /// Create a new rapid movement kernel.
581    #[must_use]
582    pub fn new() -> Self {
583        Self {
584            metadata: KernelMetadata::ring("compliance/rapid-movement", Domain::Compliance)
585                .with_description("Velocity analysis for rapid movement detection")
586                .with_throughput(200_000)
587                .with_latency_us(20.0),
588        }
589    }
590
591    /// Detect rapid movement patterns.
592    ///
593    /// # Arguments
594    /// * `transactions` - List of transactions
595    /// * `window_hours` - Time window in hours for velocity calculation
596    /// * `velocity_threshold` - Transactions per hour threshold
597    /// * `amount_threshold` - Minimum amount threshold
598    pub fn compute(
599        transactions: &[Transaction],
600        window_hours: f64,
601        velocity_threshold: f64,
602        amount_threshold: f64,
603    ) -> RapidMovementResult {
604        if transactions.is_empty() || window_hours <= 0.0 {
605            return RapidMovementResult {
606                flagged_entities: Vec::new(),
607                velocity_metrics: Vec::new(),
608                rapid_amount: 0.0,
609            };
610        }
611
612        let window_seconds = (window_hours * 3600.0) as u64;
613
614        // Group transactions by entity (both as source and dest)
615        let mut entity_txs: HashMap<u64, Vec<&Transaction>> = HashMap::new();
616
617        for tx in transactions {
618            entity_txs.entry(tx.source_id).or_default().push(tx);
619            entity_txs.entry(tx.dest_id).or_default().push(tx);
620        }
621
622        let mut flagged_entities = Vec::new();
623        let mut velocity_metrics = Vec::new();
624        let mut rapid_amount = 0.0;
625
626        for (entity_id, txs) in entity_txs {
627            if txs.is_empty() {
628                continue;
629            }
630
631            // Sort by timestamp
632            let mut sorted_txs: Vec<_> = txs.into_iter().collect();
633            sorted_txs.sort_by_key(|tx| tx.timestamp);
634
635            // Calculate velocity using sliding window
636            let mut max_velocity = 0.0f64;
637            let mut max_window_amount = 0.0f64;
638
639            for (i, tx) in sorted_txs.iter().enumerate() {
640                let window_start = tx.timestamp;
641                let window_end = window_start + window_seconds;
642
643                let window_txs: Vec<_> = sorted_txs[i..]
644                    .iter()
645                    .take_while(|t| t.timestamp < window_end)
646                    .collect();
647
648                let count = window_txs.len();
649                let velocity = count as f64 / window_hours;
650                let window_amount: f64 = window_txs.iter().map(|t| t.amount).sum();
651
652                if velocity > max_velocity {
653                    max_velocity = velocity;
654                    max_window_amount = window_amount;
655                }
656            }
657
658            velocity_metrics.push((entity_id, max_velocity));
659
660            if max_velocity >= velocity_threshold && max_window_amount >= amount_threshold {
661                flagged_entities.push(entity_id);
662                rapid_amount += max_window_amount;
663            }
664        }
665
666        // Sort by velocity descending
667        velocity_metrics.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
668
669        RapidMovementResult {
670            flagged_entities,
671            velocity_metrics,
672            rapid_amount,
673        }
674    }
675}
676
677impl GpuKernel for RapidMovement {
678    fn metadata(&self) -> &KernelMetadata {
679        &self.metadata
680    }
681}
682
683#[async_trait]
684impl BatchKernel<RapidMovementInput, RapidMovementOutput> for RapidMovement {
685    async fn execute(&self, input: RapidMovementInput) -> Result<RapidMovementOutput> {
686        let start = Instant::now();
687        let result = Self::compute(
688            &input.transactions,
689            input.window_hours,
690            input.velocity_threshold,
691            input.amount_threshold,
692        );
693        Ok(RapidMovementOutput {
694            result,
695            compute_time_us: start.elapsed().as_micros() as u64,
696        })
697    }
698}
699
700// ============================================================================
701// AML Pattern Detection Kernel
702// ============================================================================
703
704/// Multi-pattern AML detection kernel.
705///
706/// Detects various AML patterns including structuring, layering,
707/// funnel accounts, and fan-out patterns.
708#[derive(Debug, Clone)]
709pub struct AMLPatternDetection {
710    metadata: KernelMetadata,
711}
712
713impl Default for AMLPatternDetection {
714    fn default() -> Self {
715        Self::new()
716    }
717}
718
719impl AMLPatternDetection {
720    /// Create a new AML pattern detection kernel.
721    #[must_use]
722    pub fn new() -> Self {
723        Self {
724            metadata: KernelMetadata::ring("compliance/aml-patterns", Domain::Compliance)
725                .with_description("Multi-pattern AML detection")
726                .with_throughput(30_000)
727                .with_latency_us(200.0),
728        }
729    }
730
731    /// Detect AML patterns in transactions.
732    ///
733    /// # Arguments
734    /// * `transactions` - List of transactions
735    /// * `structuring_threshold` - Amount threshold for structuring (e.g., 10000)
736    /// * `structuring_window_hours` - Time window for structuring detection
737    pub fn compute(
738        transactions: &[Transaction],
739        structuring_threshold: f64,
740        structuring_window_hours: f64,
741    ) -> AMLPatternResult {
742        if transactions.is_empty() {
743            return AMLPatternResult {
744                patterns: Vec::new(),
745                risk_score: 0.0,
746                pattern_details: Vec::new(),
747            };
748        }
749
750        let mut patterns = Vec::new();
751        let mut pattern_details = Vec::new();
752        let mut total_risk_score = 0.0;
753
754        // Detect structuring
755        let structuring = Self::detect_structuring(
756            transactions,
757            structuring_threshold,
758            structuring_window_hours,
759        );
760        if !structuring.is_empty() {
761            for detail in structuring {
762                total_risk_score += detail.confidence * 25.0;
763                patterns.push((AMLPattern::Structuring, detail.entities.clone()));
764                pattern_details.push(detail);
765            }
766        }
767
768        // Detect funnel accounts
769        let funnels = Self::detect_funnel_accounts(transactions);
770        if !funnels.is_empty() {
771            for detail in funnels {
772                total_risk_score += detail.confidence * 20.0;
773                patterns.push((AMLPattern::FunnelAccount, detail.entities.clone()));
774                pattern_details.push(detail);
775            }
776        }
777
778        // Detect fan-out patterns
779        let fanouts = Self::detect_fan_out(transactions);
780        if !fanouts.is_empty() {
781            for detail in fanouts {
782                total_risk_score += detail.confidence * 15.0;
783                patterns.push((AMLPattern::FanOut, detail.entities.clone()));
784                pattern_details.push(detail);
785            }
786        }
787
788        let risk_score = total_risk_score.min(100.0);
789
790        AMLPatternResult {
791            patterns,
792            risk_score,
793            pattern_details,
794        }
795    }
796
797    /// Detect structuring (smurfing) patterns.
798    fn detect_structuring(
799        transactions: &[Transaction],
800        threshold: f64,
801        window_hours: f64,
802    ) -> Vec<PatternDetail> {
803        let window_seconds = (window_hours * 3600.0) as u64;
804        let mut results = Vec::new();
805
806        // Group by source entity
807        let mut by_source: HashMap<u64, Vec<&Transaction>> = HashMap::new();
808        for tx in transactions {
809            by_source.entry(tx.source_id).or_default().push(tx);
810        }
811
812        for (source_id, txs) in by_source {
813            if txs.len() < 3 {
814                continue;
815            }
816
817            let mut sorted_txs: Vec<_> = txs.into_iter().collect();
818            sorted_txs.sort_by_key(|tx| tx.timestamp);
819
820            // Look for multiple transactions just under threshold
821            for (i, tx) in sorted_txs.iter().enumerate() {
822                let window_end = tx.timestamp + window_seconds;
823
824                let window_txs: Vec<_> = sorted_txs[i..]
825                    .iter()
826                    .take_while(|t| t.timestamp < window_end)
827                    .filter(|t| t.amount < threshold && t.amount > threshold * 0.7)
828                    .collect();
829
830                if window_txs.len() >= 3 {
831                    let total_amount: f64 = window_txs.iter().map(|t| t.amount).sum();
832                    if total_amount > threshold {
833                        let confidence = (window_txs.len() as f64 / 10.0).min(1.0);
834                        let dests: HashSet<u64> = window_txs.iter().map(|t| t.dest_id).collect();
835                        let mut entities = vec![source_id];
836                        entities.extend(dests);
837
838                        results.push(PatternDetail {
839                            pattern: AMLPattern::Structuring,
840                            entities,
841                            amount: total_amount,
842                            confidence,
843                            time_span: TimeWindow::new(
844                                tx.timestamp,
845                                window_txs
846                                    .last()
847                                    .map(|t| t.timestamp)
848                                    .unwrap_or(tx.timestamp),
849                            ),
850                        });
851                        break; // One pattern per source
852                    }
853                }
854            }
855        }
856
857        results
858    }
859
860    /// Detect funnel accounts (many sources to one destination).
861    fn detect_funnel_accounts(transactions: &[Transaction]) -> Vec<PatternDetail> {
862        let mut results = Vec::new();
863
864        // Count incoming transactions per destination
865        let mut incoming: HashMap<u64, Vec<&Transaction>> = HashMap::new();
866        for tx in transactions {
867            incoming.entry(tx.dest_id).or_default().push(tx);
868        }
869
870        for (dest_id, txs) in incoming {
871            let unique_sources: HashSet<u64> = txs.iter().map(|tx| tx.source_id).collect();
872
873            // Flag if many sources funnel to one destination
874            if unique_sources.len() >= 5 {
875                let total_amount: f64 = txs.iter().map(|t| t.amount).sum();
876                let confidence = (unique_sources.len() as f64 / 20.0).min(1.0);
877
878                let mut entities = vec![dest_id];
879                entities.extend(unique_sources.iter().take(10));
880
881                let timestamps: Vec<u64> = txs.iter().map(|t| t.timestamp).collect();
882
883                results.push(PatternDetail {
884                    pattern: AMLPattern::FunnelAccount,
885                    entities,
886                    amount: total_amount,
887                    confidence,
888                    time_span: TimeWindow::new(
889                        *timestamps.iter().min().unwrap_or(&0),
890                        *timestamps.iter().max().unwrap_or(&0),
891                    ),
892                });
893            }
894        }
895
896        results
897    }
898
899    /// Detect fan-out patterns (one source to many destinations).
900    fn detect_fan_out(transactions: &[Transaction]) -> Vec<PatternDetail> {
901        let mut results = Vec::new();
902
903        // Count outgoing transactions per source
904        let mut outgoing: HashMap<u64, Vec<&Transaction>> = HashMap::new();
905        for tx in transactions {
906            outgoing.entry(tx.source_id).or_default().push(tx);
907        }
908
909        for (source_id, txs) in outgoing {
910            let unique_dests: HashSet<u64> = txs.iter().map(|tx| tx.dest_id).collect();
911
912            // Flag if one source fans out to many destinations
913            if unique_dests.len() >= 5 {
914                let total_amount: f64 = txs.iter().map(|t| t.amount).sum();
915                let confidence = (unique_dests.len() as f64 / 20.0).min(1.0);
916
917                let mut entities = vec![source_id];
918                entities.extend(unique_dests.iter().take(10));
919
920                let timestamps: Vec<u64> = txs.iter().map(|t| t.timestamp).collect();
921
922                results.push(PatternDetail {
923                    pattern: AMLPattern::FanOut,
924                    entities,
925                    amount: total_amount,
926                    confidence,
927                    time_span: TimeWindow::new(
928                        *timestamps.iter().min().unwrap_or(&0),
929                        *timestamps.iter().max().unwrap_or(&0),
930                    ),
931                });
932            }
933        }
934
935        results
936    }
937}
938
939impl GpuKernel for AMLPatternDetection {
940    fn metadata(&self) -> &KernelMetadata {
941        &self.metadata
942    }
943}
944
945#[async_trait]
946impl BatchKernel<AMLPatternInput, AMLPatternOutput> for AMLPatternDetection {
947    async fn execute(&self, input: AMLPatternInput) -> Result<AMLPatternOutput> {
948        let start = Instant::now();
949        let result = Self::compute(
950            &input.transactions,
951            input.structuring_threshold,
952            input.structuring_window_hours,
953        );
954        Ok(AMLPatternOutput {
955            result,
956            compute_time_us: start.elapsed().as_micros() as u64,
957        })
958    }
959}
960
961// ----------------------------------------------------------------------------
962// Ring Kernel Handler Implementation for AMLPatternDetection
963// ----------------------------------------------------------------------------
964
965/// Ring handler for pattern matching on streaming transactions.
966#[async_trait]
967impl RingKernelHandler<MatchPatternRing, MatchPatternResponse> for AMLPatternDetection {
968    async fn handle(
969        &self,
970        _ctx: &mut RingContext,
971        msg: MatchPatternRing,
972    ) -> Result<MatchPatternResponse> {
973        // Convert Ring message to domain Transaction
974        let transaction = Transaction {
975            id: msg.tx_id,
976            source_id: msg.source_id,
977            dest_id: msg.dest_id,
978            amount: msg.amount as f64 / 100_000_000.0,
979            timestamp: msg.timestamp,
980            currency: "USD".to_string(),
981            tx_type: match msg.tx_type {
982                0 => "wire",
983                1 => "ach",
984                2 => "check",
985                _ => "other",
986            }
987            .to_string(),
988        };
989
990        // Default thresholds for streaming analysis
991        let structuring_threshold = 10_000.0;
992        let structuring_window_hours = 24.0;
993
994        let result = Self::compute(
995            &[transaction],
996            structuring_threshold,
997            structuring_window_hours,
998        );
999
1000        // Build patterns_matched bitmask
1001        let mut patterns_matched = 0u64;
1002        for (pattern, _) in &result.patterns {
1003            let bit = match pattern {
1004                AMLPattern::Structuring => 0,
1005                AMLPattern::Layering => 1,
1006                AMLPattern::RapidMovement => 2,
1007                AMLPattern::RoundTripping => 3,
1008                AMLPattern::FunnelAccount => 4,
1009                AMLPattern::FanOut => 5,
1010            };
1011            patterns_matched |= 1u64 << bit;
1012        }
1013
1014        let max_score = result
1015            .pattern_details
1016            .iter()
1017            .map(|d| d.confidence)
1018            .max_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
1019            .unwrap_or(0.0) as f32;
1020
1021        Ok(MatchPatternResponse {
1022            correlation_id: msg.correlation_id,
1023            tx_id: msg.tx_id,
1024            patterns_matched,
1025            max_score,
1026            match_count: result.patterns.len() as u32,
1027        })
1028    }
1029}
1030
1031// ============================================================================
1032// Flow Reversal Pattern Kernel
1033// ============================================================================
1034
1035/// Flow reversal pattern detection kernel.
1036///
1037/// Detects transactions that are reversed (A->B followed by B->A).
1038/// Critical for detecting wash trading, round-tripping, and layering.
1039#[derive(Debug, Clone)]
1040pub struct FlowReversalPattern {
1041    metadata: KernelMetadata,
1042}
1043
1044impl Default for FlowReversalPattern {
1045    fn default() -> Self {
1046        Self::new()
1047    }
1048}
1049
1050impl FlowReversalPattern {
1051    /// Create a new flow reversal pattern kernel.
1052    #[must_use]
1053    pub fn new() -> Self {
1054        Self {
1055            metadata: KernelMetadata::batch("compliance/flow-reversal", Domain::Compliance)
1056                .with_description("Transaction reversal pattern detection")
1057                .with_throughput(80_000)
1058                .with_latency_us(50.0),
1059        }
1060    }
1061
1062    /// Detect flow reversal patterns.
1063    ///
1064    /// # Arguments
1065    /// * `transactions` - List of transactions to analyze
1066    /// * `config` - Reversal detection configuration
1067    pub fn compute(
1068        transactions: &[Transaction],
1069        config: &FlowReversalConfig,
1070    ) -> crate::types::FlowReversalResult {
1071        use crate::types::{FlowReversalPair, FlowReversalResult, ReversalRiskLevel};
1072
1073        if transactions.is_empty() {
1074            return FlowReversalResult {
1075                reversals: Vec::new(),
1076                reversal_volume: 0.0,
1077                reversal_ratio: 0.0,
1078                repeat_offenders: Vec::new(),
1079                risk_score: 0.0,
1080            };
1081        }
1082
1083        // Sort transactions by timestamp
1084        let mut sorted_txs: Vec<_> = transactions.iter().collect();
1085        sorted_txs.sort_by_key(|tx| tx.timestamp);
1086
1087        // Build edge map for quick lookup: (src, dst) -> Vec<(tx_id, amount, timestamp)>
1088        #[allow(clippy::type_complexity)]
1089        let mut forward_edges: HashMap<(u64, u64), Vec<(u64, f64, u64)>> = HashMap::new();
1090        for tx in &sorted_txs {
1091            forward_edges
1092                .entry((tx.source_id, tx.dest_id))
1093                .or_default()
1094                .push((tx.id, tx.amount, tx.timestamp));
1095        }
1096
1097        let mut reversals = Vec::new();
1098        let mut processed_pairs: HashSet<(u64, u64)> = HashSet::new();
1099        let mut entity_reversal_count: HashMap<u64, u32> = HashMap::new();
1100
1101        // For each transaction A->B, look for B->A within the time window
1102        for tx in &sorted_txs {
1103            let reverse_key = (tx.dest_id, tx.source_id);
1104
1105            if let Some(reverse_txs) = forward_edges.get(&reverse_key) {
1106                for &(rev_id, rev_amount, rev_timestamp) in reverse_txs {
1107                    // Skip if already processed this pair
1108                    if processed_pairs.contains(&(tx.id, rev_id))
1109                        || processed_pairs.contains(&(rev_id, tx.id))
1110                    {
1111                        continue;
1112                    }
1113
1114                    // Check time window (reversal must come after original)
1115                    if rev_timestamp <= tx.timestamp {
1116                        continue;
1117                    }
1118
1119                    let time_delta = rev_timestamp - tx.timestamp;
1120                    if time_delta > config.max_window_seconds {
1121                        continue;
1122                    }
1123
1124                    // Check amount tolerance
1125                    let amount_ratio = if tx.amount > 0.0 {
1126                        (rev_amount / tx.amount).min(tx.amount / rev_amount)
1127                    } else {
1128                        0.0
1129                    };
1130
1131                    if amount_ratio < config.min_amount_match_ratio {
1132                        continue;
1133                    }
1134
1135                    // Calculate risk level
1136                    let risk_level = Self::calculate_risk_level(time_delta, amount_ratio, config);
1137
1138                    reversals.push(FlowReversalPair {
1139                        original_tx_id: tx.id,
1140                        reversal_tx_id: rev_id,
1141                        entity_a: tx.source_id,
1142                        entity_b: tx.dest_id,
1143                        original_amount: tx.amount,
1144                        reversal_amount: rev_amount,
1145                        time_delta,
1146                        amount_match_ratio: amount_ratio,
1147                        risk_level,
1148                    });
1149
1150                    processed_pairs.insert((tx.id, rev_id));
1151
1152                    // Track repeat offenders
1153                    *entity_reversal_count.entry(tx.source_id).or_insert(0) += 1;
1154                    *entity_reversal_count.entry(tx.dest_id).or_insert(0) += 1;
1155                }
1156            }
1157        }
1158
1159        // Calculate metrics
1160        let reversal_volume: f64 = reversals
1161            .iter()
1162            .map(|r| r.original_amount + r.reversal_amount)
1163            .sum();
1164        let total_volume: f64 = transactions.iter().map(|tx| tx.amount).sum();
1165        let reversal_ratio = if total_volume > 0.0 {
1166            reversal_volume / total_volume
1167        } else {
1168            0.0
1169        };
1170
1171        // Find repeat offenders (entities with 2+ reversals)
1172        let mut repeat_offenders: Vec<_> = entity_reversal_count
1173            .into_iter()
1174            .filter(|(_, count)| *count >= 2)
1175            .collect();
1176        repeat_offenders.sort_by(|a, b| b.1.cmp(&a.1));
1177
1178        // Calculate overall risk score
1179        let high_risk_count = reversals
1180            .iter()
1181            .filter(|r| {
1182                matches!(
1183                    r.risk_level,
1184                    ReversalRiskLevel::High | ReversalRiskLevel::Critical
1185                )
1186            })
1187            .count();
1188
1189        let risk_score = ((reversals.len() as f64 * 5.0)
1190            + (high_risk_count as f64 * 15.0)
1191            + (repeat_offenders.len() as f64 * 10.0)
1192            + (reversal_ratio * 50.0))
1193            .min(100.0);
1194
1195        FlowReversalResult {
1196            reversals,
1197            reversal_volume,
1198            reversal_ratio,
1199            repeat_offenders,
1200            risk_score,
1201        }
1202    }
1203
1204    /// Calculate risk level for a reversal.
1205    fn calculate_risk_level(
1206        time_delta: u64,
1207        amount_ratio: f64,
1208        config: &FlowReversalConfig,
1209    ) -> crate::types::ReversalRiskLevel {
1210        use crate::types::ReversalRiskLevel;
1211
1212        let is_exact_match = amount_ratio >= 0.99;
1213        let is_fast = time_delta < config.suspicious_window_seconds;
1214        let is_very_fast = time_delta < config.critical_window_seconds;
1215
1216        match (is_very_fast, is_fast, is_exact_match) {
1217            (true, _, true) => ReversalRiskLevel::Critical,
1218            (true, _, _) => ReversalRiskLevel::High,
1219            (_, true, true) => ReversalRiskLevel::High,
1220            (_, true, _) => ReversalRiskLevel::Suspicious,
1221            (_, _, true) => ReversalRiskLevel::Suspicious,
1222            _ => ReversalRiskLevel::Normal,
1223        }
1224    }
1225}
1226
1227impl GpuKernel for FlowReversalPattern {
1228    fn metadata(&self) -> &KernelMetadata {
1229        &self.metadata
1230    }
1231}
1232
/// Tuning parameters for flow reversal detection.
///
/// All durations are in seconds. By default:
/// - reversals within 24 hours are considered;
/// - those under 1 hour are at least suspicious;
/// - those under 5 minutes are high or critical;
/// - the two amounts must agree to at least 90%.
#[derive(Debug, Clone)]
pub struct FlowReversalConfig {
    /// Longest gap between a transfer and its reversal that is still considered (seconds).
    pub max_window_seconds: u64,
    /// Reversals faster than this are at least suspicious (seconds).
    pub suspicious_window_seconds: u64,
    /// Reversals faster than this are high or critical risk (seconds).
    pub critical_window_seconds: u64,
    /// Minimum smaller-to-larger amount ratio (0-1) for a pair to count as a reversal.
    pub min_amount_match_ratio: f64,
}

impl Default for FlowReversalConfig {
    fn default() -> Self {
        const MINUTE: u64 = 60;
        const HOUR: u64 = 60 * MINUTE;

        Self {
            max_window_seconds: 24 * HOUR,
            suspicious_window_seconds: HOUR,
            critical_window_seconds: 5 * MINUTE,
            min_amount_match_ratio: 0.90,
        }
    }
}
1256
1257// ============================================================================
1258// Flow Split Ratio Kernel
1259// ============================================================================
1260
1261/// Flow split ratio detection kernel.
1262///
1263/// Detects structuring patterns where transactions are split to avoid
1264/// reporting thresholds (e.g., BSA $10,000 threshold).
1265#[derive(Debug, Clone)]
1266pub struct FlowSplitRatio {
1267    metadata: KernelMetadata,
1268}
1269
1270impl Default for FlowSplitRatio {
1271    fn default() -> Self {
1272        Self::new()
1273    }
1274}
1275
1276impl FlowSplitRatio {
1277    /// Create a new flow split ratio kernel.
1278    #[must_use]
1279    pub fn new() -> Self {
1280        Self {
1281            metadata: KernelMetadata::batch("compliance/flow-split", Domain::Compliance)
1282                .with_description("Transaction splitting and structuring detection")
1283                .with_throughput(60_000)
1284                .with_latency_us(80.0),
1285        }
1286    }
1287
1288    /// Detect flow split patterns.
1289    ///
1290    /// # Arguments
1291    /// * `transactions` - List of transactions to analyze
1292    /// * `config` - Split detection configuration
1293    pub fn compute(
1294        transactions: &[Transaction],
1295        config: &FlowSplitConfig,
1296    ) -> crate::types::FlowSplitResult {
1297        use crate::types::{FlowSplitPattern, FlowSplitResult, SplitRiskLevel};
1298
1299        if transactions.is_empty() {
1300            return FlowSplitResult {
1301                splits: Vec::new(),
1302                structuring_entities: Vec::new(),
1303                split_volume: 0.0,
1304                split_ratio: 0.0,
1305                risk_score: 0.0,
1306            };
1307        }
1308
1309        // Group transactions by source entity
1310        let mut by_source: HashMap<u64, Vec<&Transaction>> = HashMap::new();
1311        for tx in transactions {
1312            by_source.entry(tx.source_id).or_default().push(tx);
1313        }
1314
1315        let mut splits = Vec::new();
1316        let mut structuring_entities: HashSet<u64> = HashSet::new();
1317
1318        for (source_id, source_txs) in by_source {
1319            if source_txs.len() < config.min_split_count {
1320                continue;
1321            }
1322
1323            // Sort by timestamp
1324            let mut sorted_txs: Vec<_> = source_txs.into_iter().collect();
1325            sorted_txs.sort_by_key(|tx| tx.timestamp);
1326
1327            // Sliding window to find split patterns
1328            for start_idx in 0..sorted_txs.len() {
1329                let start_tx = sorted_txs[start_idx];
1330                let window_end_time = start_tx.timestamp + config.window_seconds;
1331
1332                // Collect transactions in window
1333                let window_txs: Vec<_> = sorted_txs[start_idx..]
1334                    .iter()
1335                    .take_while(|tx| tx.timestamp <= window_end_time)
1336                    .copied()
1337                    .collect();
1338
1339                if window_txs.len() < config.min_split_count {
1340                    continue;
1341                }
1342
1343                // Check if transactions are below threshold but sum near/above it
1344                let under_threshold: Vec<_> = window_txs
1345                    .iter()
1346                    .filter(|tx| tx.amount < config.reporting_threshold)
1347                    .copied()
1348                    .collect();
1349
1350                if under_threshold.len() < config.min_split_count {
1351                    continue;
1352                }
1353
1354                let total_amount: f64 = under_threshold.iter().map(|tx| tx.amount).sum();
1355
1356                // Check if total is near or above reporting threshold
1357                if total_amount < config.reporting_threshold * 0.8 {
1358                    continue;
1359                }
1360
1361                // Check for structuring indicators
1362                let amounts: Vec<f64> = under_threshold.iter().map(|tx| tx.amount).collect();
1363                let avg_amount = total_amount / amounts.len() as f64;
1364                let is_uniform = amounts
1365                    .iter()
1366                    .all(|a| (*a - avg_amount).abs() / avg_amount < 0.3);
1367                let near_threshold = amounts
1368                    .iter()
1369                    .filter(|a| **a > config.reporting_threshold * 0.7)
1370                    .count();
1371
1372                // Calculate risk level
1373                let risk_level = Self::calculate_risk_level(
1374                    total_amount,
1375                    &amounts,
1376                    is_uniform,
1377                    near_threshold,
1378                    config,
1379                );
1380
1381                if matches!(risk_level, SplitRiskLevel::Normal) {
1382                    continue;
1383                }
1384
1385                let time_span = under_threshold
1386                    .last()
1387                    .map(|tx| tx.timestamp)
1388                    .unwrap_or(start_tx.timestamp)
1389                    .saturating_sub(start_tx.timestamp);
1390
1391                let dest_entities: Vec<u64> = under_threshold
1392                    .iter()
1393                    .map(|tx| tx.dest_id)
1394                    .collect::<HashSet<_>>()
1395                    .into_iter()
1396                    .collect();
1397
1398                splits.push(FlowSplitPattern {
1399                    source_entity: source_id,
1400                    dest_entities,
1401                    transaction_ids: under_threshold.iter().map(|tx| tx.id).collect(),
1402                    amounts: amounts.clone(),
1403                    total_amount,
1404                    time_span,
1405                    estimated_threshold: config.reporting_threshold,
1406                    risk_level,
1407                });
1408
1409                if matches!(risk_level, SplitRiskLevel::High | SplitRiskLevel::Critical) {
1410                    structuring_entities.insert(source_id);
1411                }
1412            }
1413        }
1414
1415        // Deduplicate overlapping patterns (keep highest risk)
1416        let splits = Self::deduplicate_splits(splits);
1417
1418        // Calculate metrics
1419        let split_volume: f64 = splits.iter().map(|s| s.total_amount).sum();
1420        let total_volume: f64 = transactions.iter().map(|tx| tx.amount).sum();
1421        let split_ratio = if total_volume > 0.0 {
1422            split_volume / total_volume
1423        } else {
1424            0.0
1425        };
1426
1427        // Calculate risk score
1428        let critical_count = splits
1429            .iter()
1430            .filter(|s| matches!(s.risk_level, SplitRiskLevel::Critical))
1431            .count();
1432        let high_count = splits
1433            .iter()
1434            .filter(|s| matches!(s.risk_level, SplitRiskLevel::High))
1435            .count();
1436
1437        let risk_score = ((splits.len() as f64 * 5.0)
1438            + (high_count as f64 * 15.0)
1439            + (critical_count as f64 * 25.0)
1440            + (structuring_entities.len() as f64 * 10.0))
1441            .min(100.0);
1442
1443        FlowSplitResult {
1444            splits,
1445            structuring_entities: structuring_entities.into_iter().collect(),
1446            split_volume,
1447            split_ratio,
1448            risk_score,
1449        }
1450    }
1451
1452    /// Calculate risk level for a split pattern.
1453    fn calculate_risk_level(
1454        total_amount: f64,
1455        amounts: &[f64],
1456        is_uniform: bool,
1457        near_threshold_count: usize,
1458        config: &FlowSplitConfig,
1459    ) -> crate::types::SplitRiskLevel {
1460        use crate::types::SplitRiskLevel;
1461
1462        let threshold = config.reporting_threshold;
1463        let just_under = total_amount >= threshold * 0.95 && total_amount <= threshold * 1.05;
1464        let multiple_near = near_threshold_count >= 3;
1465
1466        match (just_under, multiple_near, is_uniform) {
1467            (true, true, true) => SplitRiskLevel::Critical,
1468            (true, true, _) => SplitRiskLevel::Critical,
1469            (true, _, true) => SplitRiskLevel::High,
1470            (true, _, _) => SplitRiskLevel::High,
1471            (_, true, _) => SplitRiskLevel::High,
1472            _ if amounts.len() >= 5 && total_amount > threshold => SplitRiskLevel::Elevated,
1473            _ => SplitRiskLevel::Normal,
1474        }
1475    }
1476
1477    /// Deduplicate overlapping split patterns.
1478    fn deduplicate_splits(
1479        mut splits: Vec<crate::types::FlowSplitPattern>,
1480    ) -> Vec<crate::types::FlowSplitPattern> {
1481        // Sort by risk level (highest first), then by total amount
1482        splits.sort_by(|a, b| {
1483            let risk_ord =
1484                Self::risk_level_ord(&b.risk_level).cmp(&Self::risk_level_ord(&a.risk_level));
1485            if risk_ord == std::cmp::Ordering::Equal {
1486                b.total_amount
1487                    .partial_cmp(&a.total_amount)
1488                    .unwrap_or(std::cmp::Ordering::Equal)
1489            } else {
1490                risk_ord
1491            }
1492        });
1493
1494        let mut kept: Vec<crate::types::FlowSplitPattern> = Vec::new();
1495        let mut used_tx_ids: HashSet<u64> = HashSet::new();
1496
1497        for split in splits {
1498            // Check overlap with already kept patterns
1499            let overlap = split
1500                .transaction_ids
1501                .iter()
1502                .filter(|id| used_tx_ids.contains(id))
1503                .count();
1504
1505            // Keep if less than 50% overlap
1506            if overlap < split.transaction_ids.len() / 2 {
1507                for id in &split.transaction_ids {
1508                    used_tx_ids.insert(*id);
1509                }
1510                kept.push(split);
1511            }
1512        }
1513
1514        kept
1515    }
1516
1517    /// Convert risk level to ordinal for sorting.
1518    fn risk_level_ord(level: &crate::types::SplitRiskLevel) -> u8 {
1519        use crate::types::SplitRiskLevel;
1520        match level {
1521            SplitRiskLevel::Normal => 0,
1522            SplitRiskLevel::Elevated => 1,
1523            SplitRiskLevel::High => 2,
1524            SplitRiskLevel::Critical => 3,
1525        }
1526    }
1527}
1528
1529impl GpuKernel for FlowSplitRatio {
1530    fn metadata(&self) -> &KernelMetadata {
1531        &self.metadata
1532    }
1533}
1534
/// Tuning parameters for flow split (structuring) detection.
///
/// By default:
/// - the threshold is the BSA $10,000 level;
/// - the window is 24 hours;
/// - at least 3 transactions are needed to form a split.
#[derive(Debug, Clone)]
pub struct FlowSplitConfig {
    /// Reporting threshold that split transactions try to stay under (e.g. $10,000).
    pub reporting_threshold: f64,
    /// Length of the look-ahead window from each anchoring transaction (seconds).
    pub window_seconds: u64,
    /// Minimum number of under-threshold transactions that constitute a split.
    pub min_split_count: usize,
}

impl Default for FlowSplitConfig {
    fn default() -> Self {
        const SECONDS_PER_DAY: u64 = 24 * 60 * 60;

        Self {
            reporting_threshold: 10_000.0,
            window_seconds: SECONDS_PER_DAY,
            min_split_count: 3,
        }
    }
}
1555
1556#[cfg(test)]
1557mod tests {
1558    use super::*;
1559
1560    fn create_circular_transactions() -> Vec<Transaction> {
1561        // A -> B -> C -> A circular flow
1562        vec![
1563            Transaction {
1564                id: 1,
1565                source_id: 1,
1566                dest_id: 2,
1567                amount: 1000.0,
1568                timestamp: 100,
1569                currency: "USD".to_string(),
1570                tx_type: "wire".to_string(),
1571            },
1572            Transaction {
1573                id: 2,
1574                source_id: 2,
1575                dest_id: 3,
1576                amount: 950.0,
1577                timestamp: 200,
1578                currency: "USD".to_string(),
1579                tx_type: "wire".to_string(),
1580            },
1581            Transaction {
1582                id: 3,
1583                source_id: 3,
1584                dest_id: 1,
1585                amount: 900.0,
1586                timestamp: 300,
1587                currency: "USD".to_string(),
1588                tx_type: "wire".to_string(),
1589            },
1590        ]
1591    }
1592
1593    fn create_reciprocal_transactions() -> Vec<Transaction> {
1594        vec![
1595            Transaction {
1596                id: 1,
1597                source_id: 1,
1598                dest_id: 2,
1599                amount: 5000.0,
1600                timestamp: 100,
1601                currency: "USD".to_string(),
1602                tx_type: "wire".to_string(),
1603            },
1604            Transaction {
1605                id: 2,
1606                source_id: 2,
1607                dest_id: 1,
1608                amount: 4800.0,
1609                timestamp: 200,
1610                currency: "USD".to_string(),
1611                tx_type: "wire".to_string(),
1612            },
1613        ]
1614    }
1615
1616    #[test]
1617    fn test_circular_flow_metadata() {
1618        let kernel = CircularFlowRatio::new();
1619        assert_eq!(kernel.metadata().id, "compliance/circular-flow");
1620        assert_eq!(kernel.metadata().domain, Domain::Compliance);
1621    }
1622
1623    #[test]
1624    fn test_circular_flow_detection() {
1625        let txs = create_circular_transactions();
1626        let result = CircularFlowRatio::compute(&txs, 100.0);
1627
1628        assert!(result.circular_ratio > 0.0);
1629        assert!(!result.sccs.is_empty());
1630        assert!(result.circular_amount > 0.0);
1631    }
1632
1633    #[test]
1634    fn test_reciprocity_detection() {
1635        let txs = create_reciprocal_transactions();
1636        let result = ReciprocityFlowRatio::compute(&txs, None, 100.0);
1637
1638        assert!(result.reciprocity_ratio > 0.0);
1639        assert!(!result.reciprocal_pairs.is_empty());
1640    }
1641
1642    #[test]
1643    fn test_rapid_movement_detection() {
1644        // Many transactions in short time
1645        let txs: Vec<Transaction> = (0..20)
1646            .map(|i| Transaction {
1647                id: i as u64,
1648                source_id: 1,
1649                dest_id: 2,
1650                amount: 500.0,
1651                timestamp: 100 + i as u64 * 60, // One minute apart
1652                currency: "USD".to_string(),
1653                tx_type: "wire".to_string(),
1654            })
1655            .collect();
1656
1657        let result = RapidMovement::compute(&txs, 1.0, 10.0, 1000.0);
1658
1659        assert!(!result.flagged_entities.is_empty());
1660        assert!(result.rapid_amount > 0.0);
1661    }
1662
1663    #[test]
1664    fn test_aml_pattern_funnel() {
1665        // Many sources to one destination
1666        let txs: Vec<Transaction> = (0..10)
1667            .map(|i| Transaction {
1668                id: i as u64,
1669                source_id: i as u64 + 100, // Different sources
1670                dest_id: 1,                // Same destination
1671                amount: 500.0,
1672                timestamp: 100 + i as u64,
1673                currency: "USD".to_string(),
1674                tx_type: "wire".to_string(),
1675            })
1676            .collect();
1677
1678        let result = AMLPatternDetection::compute(&txs, 10000.0, 24.0);
1679
1680        let funnel_patterns: Vec<_> = result
1681            .patterns
1682            .iter()
1683            .filter(|(p, _)| *p == AMLPattern::FunnelAccount)
1684            .collect();
1685
1686        assert!(!funnel_patterns.is_empty());
1687    }
1688
1689    #[test]
1690    fn test_empty_transactions() {
1691        let empty: Vec<Transaction> = vec![];
1692
1693        let circ = CircularFlowRatio::compute(&empty, 100.0);
1694        assert!(circ.sccs.is_empty());
1695
1696        let recip = ReciprocityFlowRatio::compute(&empty, None, 100.0);
1697        assert!(recip.reciprocal_pairs.is_empty());
1698
1699        let rapid = RapidMovement::compute(&empty, 1.0, 10.0, 1000.0);
1700        assert!(rapid.flagged_entities.is_empty());
1701
1702        let aml = AMLPatternDetection::compute(&empty, 10000.0, 24.0);
1703        assert!(aml.patterns.is_empty());
1704
1705        let reversal = FlowReversalPattern::compute(&empty, &FlowReversalConfig::default());
1706        assert!(reversal.reversals.is_empty());
1707
1708        let split = FlowSplitRatio::compute(&empty, &FlowSplitConfig::default());
1709        assert!(split.splits.is_empty());
1710    }
1711
1712    // ========================================================================
1713    // Flow Reversal Pattern Tests
1714    // ========================================================================
1715
1716    fn create_reversal_transactions() -> Vec<Transaction> {
1717        vec![
1718            // A -> B: 5000 at t=100
1719            Transaction {
1720                id: 1,
1721                source_id: 1,
1722                dest_id: 2,
1723                amount: 5000.0,
1724                timestamp: 100,
1725                currency: "USD".to_string(),
1726                tx_type: "wire".to_string(),
1727            },
1728            // B -> A: 5000 at t=200 (reversal with exact match)
1729            Transaction {
1730                id: 2,
1731                source_id: 2,
1732                dest_id: 1,
1733                amount: 5000.0,
1734                timestamp: 200,
1735                currency: "USD".to_string(),
1736                tx_type: "wire".to_string(),
1737            },
1738            // C -> D: 3000 at t=300
1739            Transaction {
1740                id: 3,
1741                source_id: 3,
1742                dest_id: 4,
1743                amount: 3000.0,
1744                timestamp: 300,
1745                currency: "USD".to_string(),
1746                tx_type: "wire".to_string(),
1747            },
1748            // D -> C: 2800 at t=400 (reversal with ~93% match)
1749            Transaction {
1750                id: 4,
1751                source_id: 4,
1752                dest_id: 3,
1753                amount: 2800.0,
1754                timestamp: 400,
1755                currency: "USD".to_string(),
1756                tx_type: "wire".to_string(),
1757            },
1758        ]
1759    }
1760
1761    #[test]
1762    fn test_flow_reversal_metadata() {
1763        let kernel = FlowReversalPattern::new();
1764        assert_eq!(kernel.metadata().id, "compliance/flow-reversal");
1765        assert_eq!(kernel.metadata().domain, Domain::Compliance);
1766    }
1767
1768    #[test]
1769    fn test_flow_reversal_detection() {
1770        let txs = create_reversal_transactions();
1771        let config = FlowReversalConfig::default();
1772        let result = FlowReversalPattern::compute(&txs, &config);
1773
1774        assert_eq!(result.reversals.len(), 2);
1775        assert!(result.reversal_volume > 0.0);
1776        assert!(result.reversal_ratio > 0.0);
1777    }
1778
1779    #[test]
1780    fn test_flow_reversal_exact_match() {
1781        let txs = vec![
1782            Transaction {
1783                id: 1,
1784                source_id: 1,
1785                dest_id: 2,
1786                amount: 1000.0,
1787                timestamp: 100,
1788                currency: "USD".to_string(),
1789                tx_type: "wire".to_string(),
1790            },
1791            Transaction {
1792                id: 2,
1793                source_id: 2,
1794                dest_id: 1,
1795                amount: 1000.0,
1796                timestamp: 150, // Very fast reversal
1797                currency: "USD".to_string(),
1798                tx_type: "wire".to_string(),
1799            },
1800        ];
1801
1802        let config = FlowReversalConfig {
1803            critical_window_seconds: 100,
1804            suspicious_window_seconds: 600,
1805            ..Default::default()
1806        };
1807        let result = FlowReversalPattern::compute(&txs, &config);
1808
1809        assert_eq!(result.reversals.len(), 1);
1810        assert_eq!(result.reversals[0].amount_match_ratio, 1.0);
1811        // Should be Critical due to very fast + exact match
1812        assert!(matches!(
1813            result.reversals[0].risk_level,
1814            crate::types::ReversalRiskLevel::Critical
1815        ));
1816    }
1817
1818    #[test]
1819    fn test_flow_reversal_outside_window() {
1820        let txs = vec![
1821            Transaction {
1822                id: 1,
1823                source_id: 1,
1824                dest_id: 2,
1825                amount: 1000.0,
1826                timestamp: 100,
1827                currency: "USD".to_string(),
1828                tx_type: "wire".to_string(),
1829            },
1830            Transaction {
1831                id: 2,
1832                source_id: 2,
1833                dest_id: 1,
1834                amount: 1000.0,
1835                timestamp: 100000, // Way outside window
1836                currency: "USD".to_string(),
1837                tx_type: "wire".to_string(),
1838            },
1839        ];
1840
1841        let config = FlowReversalConfig {
1842            max_window_seconds: 3600, // 1 hour window
1843            ..Default::default()
1844        };
1845        let result = FlowReversalPattern::compute(&txs, &config);
1846
1847        assert!(result.reversals.is_empty());
1848    }
1849
1850    // ========================================================================
1851    // Flow Split Ratio Tests
1852    // ========================================================================
1853
1854    fn create_structuring_transactions() -> Vec<Transaction> {
1855        // Classic structuring: 5 transactions of $2500 each = $12,500 total
1856        // All just under the $10,000 threshold indicator
1857        (0..5)
1858            .map(|i| Transaction {
1859                id: i as u64,
1860                source_id: 1,
1861                dest_id: (i + 10) as u64,
1862                amount: 2500.0,
1863                timestamp: 1000 + i as u64 * 100,
1864                currency: "USD".to_string(),
1865                tx_type: "wire".to_string(),
1866            })
1867            .collect()
1868    }
1869
1870    #[test]
1871    fn test_flow_split_metadata() {
1872        let kernel = FlowSplitRatio::new();
1873        assert_eq!(kernel.metadata().id, "compliance/flow-split");
1874        assert_eq!(kernel.metadata().domain, Domain::Compliance);
1875    }
1876
1877    #[test]
1878    fn test_flow_split_detection() {
1879        let txs = create_structuring_transactions();
1880        let config = FlowSplitConfig::default();
1881        let result = FlowSplitRatio::compute(&txs, &config);
1882
1883        // Should detect structuring pattern
1884        assert!(!result.splits.is_empty());
1885        assert!(result.split_volume > 0.0);
1886    }
1887
1888    #[test]
1889    fn test_flow_split_near_threshold() {
1890        // 4 transactions of $2450 each = $9800, just under $10k
1891        let txs: Vec<Transaction> = (0..4)
1892            .map(|i| Transaction {
1893                id: i as u64,
1894                source_id: 1,
1895                dest_id: (i + 10) as u64,
1896                amount: 2450.0,
1897                timestamp: 1000 + i as u64 * 100,
1898                currency: "USD".to_string(),
1899                tx_type: "wire".to_string(),
1900            })
1901            .collect();
1902
1903        let config = FlowSplitConfig {
1904            reporting_threshold: 10_000.0,
1905            min_split_count: 3,
1906            ..Default::default()
1907        };
1908        let result = FlowSplitRatio::compute(&txs, &config);
1909
1910        // Total is $9800, should detect as structuring
1911        assert!(!result.splits.is_empty());
1912
1913        // Should flag the source entity
1914        if !result.structuring_entities.is_empty() {
1915            assert!(result.structuring_entities.contains(&1));
1916        }
1917    }
1918
1919    #[test]
1920    fn test_flow_split_below_threshold() {
1921        // Only 2 small transactions - should NOT be flagged
1922        let txs: Vec<Transaction> = (0..2)
1923            .map(|i| Transaction {
1924                id: i as u64,
1925                source_id: 1,
1926                dest_id: (i + 10) as u64,
1927                amount: 500.0, // Small amounts
1928                timestamp: 1000 + i as u64 * 100,
1929                currency: "USD".to_string(),
1930                tx_type: "wire".to_string(),
1931            })
1932            .collect();
1933
1934        let config = FlowSplitConfig::default();
1935        let result = FlowSplitRatio::compute(&txs, &config);
1936
1937        // Should not detect any structuring
1938        assert!(result.splits.is_empty());
1939    }
1940
1941    #[test]
1942    fn test_flow_split_risk_levels() {
1943        // Create transactions targeting exactly $10k (critical structuring)
1944        let txs: Vec<Transaction> = (0..4)
1945            .map(|i| Transaction {
1946                id: i as u64,
1947                source_id: 1,
1948                dest_id: (i + 10) as u64,
1949                amount: 2500.0, // 4 x 2500 = 10000 exactly
1950                timestamp: 1000 + i as u64 * 100,
1951                currency: "USD".to_string(),
1952                tx_type: "wire".to_string(),
1953            })
1954            .collect();
1955
1956        let config = FlowSplitConfig {
1957            reporting_threshold: 10_000.0,
1958            min_split_count: 3,
1959            ..Default::default()
1960        };
1961        let result = FlowSplitRatio::compute(&txs, &config);
1962
1963        // Should detect high/critical risk
1964        if !result.splits.is_empty() {
1965            let has_high_risk = result.splits.iter().any(|s| {
1966                matches!(
1967                    s.risk_level,
1968                    crate::types::SplitRiskLevel::High | crate::types::SplitRiskLevel::Critical
1969                )
1970            });
1971            assert!(has_high_risk);
1972        }
1973    }
1974}