1use crate::messages::{
10 AMLPatternInput, AMLPatternOutput, CircularFlowInput, CircularFlowOutput, RapidMovementInput,
11 RapidMovementOutput, ReciprocityFlowInput, ReciprocityFlowOutput,
12};
13use crate::ring_messages::{
14 AddGraphEdgeResponse, AddGraphEdgeRing, MatchPatternResponse, MatchPatternRing,
15 QueryCircularRatioResponse, QueryCircularRatioRing,
16};
17use crate::types::{
18 AMLPattern, AMLPatternResult, CircularFlowResult, PatternDetail, RapidMovementResult,
19 ReciprocityResult, TimeWindow, Transaction,
20};
21use async_trait::async_trait;
22use ringkernel_core::RingContext;
23use rustkernel_core::error::Result;
24use rustkernel_core::traits::{BatchKernel, RingKernelHandler};
25use rustkernel_core::{domain::Domain, kernel::KernelMetadata, traits::GpuKernel};
26use std::collections::{HashMap, HashSet};
27use std::time::Instant;
28
/// Per-entity counters maintained by the streaming circular-flow kernel.
#[derive(Debug, Clone, Default)]
pub struct EntityCircularState {
    /// Sum of the amounts on edges added with this entity as the source.
    pub outgoing_volume: f64,
    /// Sum of the amounts on edges added with this entity as the destination.
    pub incoming_volume: f64,
    /// Volume attributed to cycles; numerator of the entity's circular ratio.
    /// NOTE(review): no code visible in this part of the file writes this field.
    pub circular_volume: f64,
    /// Number of edges added with this entity as the source.
    pub out_degree: u32,
    /// Number of edges added with this entity as the destination.
    pub in_degree: u32,
    /// Presumably marks membership in a strongly connected component — TODO
    /// confirm; it is not read or written by the code visible here.
    pub in_scc: bool,
}
49
/// Mutable graph state behind [`CircularFlowRatio`]'s streaming (ring) API.
#[derive(Debug, Clone, Default)]
pub struct CircularFlowState {
    /// Adjacency list: source entity -> `(destination entity, amount)` per edge.
    pub graph: HashMap<u64, Vec<(u64, f64)>>,
    /// Per-entity volume and degree counters.
    pub entities: HashMap<u64, EntityCircularState>,
    /// Cached strongly connected components (each a list of entity ids).
    pub sccs: Vec<Vec<u64>>,
    /// Sum of the amounts of all edges added so far.
    pub total_volume: f64,
    /// Graph-wide circular volume.
    /// NOTE(review): not updated by any code visible in this part of the file.
    pub circular_volume: f64,
    /// Set to `true` whenever an edge is added, i.e. `sccs` may be out of date.
    pub sccs_stale: bool,
}
66
/// Circular-flow kernel: a batch SCC computation plus a streaming edge API
/// whose state is kept behind a read/write lock.
#[derive(Debug)]
pub struct CircularFlowRatio {
    /// Kernel registration metadata.
    metadata: KernelMetadata,
    /// Streaming graph state shared by `add_edge` and `query_entity`.
    state: std::sync::RwLock<CircularFlowState>,
}
77
// `RwLock` is not `Clone`, so cloning is implemented by hand.
impl Clone for CircularFlowRatio {
    /// Returns a kernel with a snapshot copy of the current state behind a
    /// fresh lock; the clone does not share state with `self`.
    ///
    /// # Panics
    /// Panics if the state lock is poisoned.
    fn clone(&self) -> Self {
        Self {
            metadata: self.metadata.clone(),
            state: std::sync::RwLock::new(self.state.read().unwrap().clone()),
        }
    }
}
86
impl Default for CircularFlowRatio {
    /// Equivalent to [`CircularFlowRatio::new`].
    fn default() -> Self {
        Self::new()
    }
}
92
93impl CircularFlowRatio {
    /// Creates a kernel with empty streaming state.
    ///
    /// The metadata is built with `KernelMetadata::ring` under the id
    /// `compliance/circular-flow` in the compliance domain.
    #[must_use]
    pub fn new() -> Self {
        Self {
            metadata: KernelMetadata::ring("compliance/circular-flow", Domain::Compliance)
                .with_description("Circular flow detection via SCC")
                .with_throughput(50_000)
                .with_latency_us(100.0),
            state: std::sync::RwLock::new(CircularFlowState::default()),
        }
    }
105
106 pub fn add_edge(&self, source_id: u64, dest_id: u64, amount: f64) -> (bool, u32, f64) {
109 let mut state = self.state.write().unwrap();
110
111 state
113 .graph
114 .entry(source_id)
115 .or_default()
116 .push((dest_id, amount));
117 state.total_volume += amount;
118 state.sccs_stale = true;
119
120 let source_state = state.entities.entry(source_id).or_default();
122 source_state.outgoing_volume += amount;
123 source_state.out_degree += 1;
124
125 let dest_state = state.entities.entry(dest_id).or_default();
126 dest_state.incoming_volume += amount;
127 dest_state.in_degree += 1;
128
129 let cycle_detected = Self::has_path(&state.graph, dest_id, source_id);
131
132 let cycle_size = if cycle_detected {
133 Self::estimate_cycle_size(&state.graph, source_id, dest_id)
135 } else {
136 0
137 };
138
139 let source_ratio = {
140 let s = state.entities.get(&source_id).unwrap();
141 if s.outgoing_volume > 0.0 {
142 s.circular_volume / s.outgoing_volume
143 } else {
144 0.0
145 }
146 };
147
148 (cycle_detected, cycle_size, source_ratio)
149 }
150
151 pub fn query_entity(&self, entity_id: u64) -> (f64, u32, u64) {
153 let state = self.state.read().unwrap();
154
155 if let Some(entity_state) = state.entities.get(&entity_id) {
156 let ratio = if entity_state.outgoing_volume > 0.0 {
157 entity_state.circular_volume / entity_state.outgoing_volume
158 } else {
159 0.0
160 };
161 let scc_count = state
162 .sccs
163 .iter()
164 .filter(|scc| scc.contains(&entity_id))
165 .count() as u32;
166 let cycle_volume = (entity_state.circular_volume * 100_000_000.0) as u64;
167 (ratio, scc_count, cycle_volume)
168 } else {
169 (0.0, 0, 0)
170 }
171 }
172
173 fn has_path(graph: &HashMap<u64, Vec<(u64, f64)>>, src: u64, dst: u64) -> bool {
175 let mut visited = HashSet::new();
176 let mut stack = vec![src];
177
178 while let Some(node) = stack.pop() {
179 if node == dst {
180 return true;
181 }
182 if visited.insert(node) {
183 if let Some(neighbors) = graph.get(&node) {
184 for &(neighbor, _) in neighbors {
185 if !visited.contains(&neighbor) {
186 stack.push(neighbor);
187 }
188 }
189 }
190 }
191 }
192 false
193 }
194
195 fn estimate_cycle_size(graph: &HashMap<u64, Vec<(u64, f64)>>, start: u64, end: u64) -> u32 {
197 let mut visited = HashSet::new();
198 let mut queue = std::collections::VecDeque::new();
199 queue.push_back((end, 1u32));
200
201 while let Some((node, depth)) = queue.pop_front() {
202 if node == start {
203 return depth;
204 }
205 if visited.insert(node) {
206 if let Some(neighbors) = graph.get(&node) {
207 for &(neighbor, _) in neighbors {
208 if !visited.contains(&neighbor) {
209 queue.push_back((neighbor, depth + 1));
210 }
211 }
212 }
213 }
214 }
215 0
216 }
217
218 pub fn compute(transactions: &[Transaction], min_amount: f64) -> CircularFlowResult {
224 if transactions.is_empty() {
225 return CircularFlowResult {
226 circular_ratio: 0.0,
227 sccs: Vec::new(),
228 circular_amount: 0.0,
229 total_amount: 0.0,
230 };
231 }
232
233 let mut graph: HashMap<u64, Vec<(u64, f64)>> = HashMap::new();
235 let mut total_amount = 0.0;
236
237 for tx in transactions {
238 graph
239 .entry(tx.source_id)
240 .or_default()
241 .push((tx.dest_id, tx.amount));
242 total_amount += tx.amount;
243 }
244
245 let sccs = Self::tarjan_scc(&graph);
247
248 let mut circular_amount = 0.0;
250 let mut significant_sccs = Vec::new();
251
252 for scc in &sccs {
253 if scc.len() > 1 {
254 let scc_set: HashSet<u64> = scc.iter().copied().collect();
256 let mut scc_amount = 0.0;
257
258 for tx in transactions {
259 if scc_set.contains(&tx.source_id) && scc_set.contains(&tx.dest_id) {
260 scc_amount += tx.amount;
261 }
262 }
263
264 if scc_amount >= min_amount {
265 circular_amount += scc_amount;
266 significant_sccs.push(scc.clone());
267 }
268 }
269 }
270
271 let circular_ratio = if total_amount > 0.0 {
272 circular_amount / total_amount
273 } else {
274 0.0
275 };
276
277 CircularFlowResult {
278 circular_ratio,
279 sccs: significant_sccs,
280 circular_amount,
281 total_amount,
282 }
283 }
284
285 fn tarjan_scc(graph: &HashMap<u64, Vec<(u64, f64)>>) -> Vec<Vec<u64>> {
287 let mut index_counter = 0u64;
288 let mut stack = Vec::new();
289 let mut on_stack: HashSet<u64> = HashSet::new();
290 let mut indices: HashMap<u64, u64> = HashMap::new();
291 let mut lowlinks: HashMap<u64, u64> = HashMap::new();
292 let mut sccs = Vec::new();
293
294 let mut nodes: HashSet<u64> = graph.keys().copied().collect();
296 for edges in graph.values() {
297 for (dest, _) in edges {
298 nodes.insert(*dest);
299 }
300 }
301
302 #[allow(clippy::too_many_arguments)]
303 fn strongconnect(
304 v: u64,
305 graph: &HashMap<u64, Vec<(u64, f64)>>,
306 index_counter: &mut u64,
307 stack: &mut Vec<u64>,
308 on_stack: &mut HashSet<u64>,
309 indices: &mut HashMap<u64, u64>,
310 lowlinks: &mut HashMap<u64, u64>,
311 sccs: &mut Vec<Vec<u64>>,
312 ) {
313 indices.insert(v, *index_counter);
314 lowlinks.insert(v, *index_counter);
315 *index_counter += 1;
316 stack.push(v);
317 on_stack.insert(v);
318
319 if let Some(neighbors) = graph.get(&v) {
320 for (w, _) in neighbors {
321 if !indices.contains_key(w) {
322 strongconnect(
323 *w,
324 graph,
325 index_counter,
326 stack,
327 on_stack,
328 indices,
329 lowlinks,
330 sccs,
331 );
332 let lowlink_w = lowlinks[w];
333 if let Some(ll) = lowlinks.get_mut(&v) {
334 *ll = (*ll).min(lowlink_w);
335 }
336 } else if on_stack.contains(w) {
337 let index_w = indices[w];
338 if let Some(ll) = lowlinks.get_mut(&v) {
339 *ll = (*ll).min(index_w);
340 }
341 }
342 }
343 }
344
345 if lowlinks[&v] == indices[&v] {
346 let mut scc = Vec::new();
347 loop {
348 let w = stack.pop().unwrap();
349 on_stack.remove(&w);
350 scc.push(w);
351 if w == v {
352 break;
353 }
354 }
355 sccs.push(scc);
356 }
357 }
358
359 for node in nodes {
360 if !indices.contains_key(&node) {
361 strongconnect(
362 node,
363 graph,
364 &mut index_counter,
365 &mut stack,
366 &mut on_stack,
367 &mut indices,
368 &mut lowlinks,
369 &mut sccs,
370 );
371 }
372 }
373
374 sccs
375 }
376}
377
impl GpuKernel for CircularFlowRatio {
    /// Returns the metadata built in [`CircularFlowRatio::new`].
    fn metadata(&self) -> &KernelMetadata {
        &self.metadata
    }
}
383
384#[async_trait]
385impl BatchKernel<CircularFlowInput, CircularFlowOutput> for CircularFlowRatio {
386 async fn execute(&self, input: CircularFlowInput) -> Result<CircularFlowOutput> {
387 let start = Instant::now();
388 let result = Self::compute(&input.transactions, input.min_amount);
389 Ok(CircularFlowOutput {
390 result,
391 compute_time_us: start.elapsed().as_micros() as u64,
392 })
393 }
394}
395
396#[async_trait]
402impl RingKernelHandler<AddGraphEdgeRing, AddGraphEdgeResponse> for CircularFlowRatio {
403 async fn handle(
404 &self,
405 _ctx: &mut RingContext,
406 msg: AddGraphEdgeRing,
407 ) -> Result<AddGraphEdgeResponse> {
408 let amount = msg.amount as f64 / 100_000_000.0;
410 let (cycle_detected, cycle_size, source_ratio) =
411 self.add_edge(msg.source_id, msg.dest_id, amount);
412
413 Ok(AddGraphEdgeResponse {
414 correlation_id: msg.correlation_id,
415 cycle_detected,
416 cycle_size,
417 source_ratio: source_ratio as f32,
418 })
419 }
420}
421
422#[async_trait]
424impl RingKernelHandler<QueryCircularRatioRing, QueryCircularRatioResponse> for CircularFlowRatio {
425 async fn handle(
426 &self,
427 _ctx: &mut RingContext,
428 msg: QueryCircularRatioRing,
429 ) -> Result<QueryCircularRatioResponse> {
430 let (ratio, scc_count, cycle_volume) = self.query_entity(msg.entity_id);
432
433 Ok(QueryCircularRatioResponse {
434 correlation_id: msg.correlation_id,
435 entity_id: msg.entity_id,
436 ratio: ratio as f32,
437 scc_count,
438 cycle_volume: cycle_volume as i64,
439 })
440 }
441}
442
/// Kernel that measures how much transaction volume is mirrored by a
/// transfer in the opposite direction between the same two entities.
#[derive(Debug, Clone)]
pub struct ReciprocityFlowRatio {
    /// Kernel registration metadata.
    metadata: KernelMetadata,
}
455
impl Default for ReciprocityFlowRatio {
    /// Equivalent to [`ReciprocityFlowRatio::new`].
    fn default() -> Self {
        Self::new()
    }
}
461
462impl ReciprocityFlowRatio {
    /// Creates the kernel; its metadata is built with `KernelMetadata::ring`
    /// under the id `compliance/reciprocity-flow` in the compliance domain.
    #[must_use]
    pub fn new() -> Self {
        Self {
            metadata: KernelMetadata::ring("compliance/reciprocity-flow", Domain::Compliance)
                .with_description("Reciprocal transaction detection")
                .with_throughput(100_000)
                .with_latency_us(50.0),
        }
    }
473
474 pub fn compute(
481 transactions: &[Transaction],
482 window: Option<TimeWindow>,
483 min_amount: f64,
484 ) -> ReciprocityResult {
485 if transactions.is_empty() {
486 return ReciprocityResult {
487 reciprocity_ratio: 0.0,
488 reciprocal_pairs: Vec::new(),
489 reciprocal_amount: 0.0,
490 };
491 }
492
493 let txs: Vec<&Transaction> = transactions
495 .iter()
496 .filter(|tx| window.map(|w| w.contains(tx.timestamp)).unwrap_or(true))
497 .collect();
498
499 let mut edge_amounts: HashMap<(u64, u64), f64> = HashMap::new();
501
502 for tx in &txs {
503 *edge_amounts.entry((tx.source_id, tx.dest_id)).or_default() += tx.amount;
504 }
505
506 let mut reciprocal_pairs = Vec::new();
508 let mut reciprocal_amount = 0.0;
509 let mut processed: HashSet<(u64, u64)> = HashSet::new();
510
511 for (&(src, dst), &amount) in &edge_amounts {
512 if processed.contains(&(src, dst)) || processed.contains(&(dst, src)) {
513 continue;
514 }
515
516 if let Some(&reverse_amount) = edge_amounts.get(&(dst, src)) {
517 let min_reciprocal = amount.min(reverse_amount);
518 if min_reciprocal >= min_amount {
519 reciprocal_pairs.push((src, dst));
520 reciprocal_amount += min_reciprocal * 2.0;
521 processed.insert((src, dst));
522 processed.insert((dst, src));
523 }
524 }
525 }
526
527 let total_amount: f64 = txs.iter().map(|tx| tx.amount).sum();
528 let reciprocity_ratio = if total_amount > 0.0 {
529 reciprocal_amount / total_amount
530 } else {
531 0.0
532 };
533
534 ReciprocityResult {
535 reciprocity_ratio,
536 reciprocal_pairs,
537 reciprocal_amount,
538 }
539 }
540}
541
impl GpuKernel for ReciprocityFlowRatio {
    /// Returns the metadata built in [`ReciprocityFlowRatio::new`].
    fn metadata(&self) -> &KernelMetadata {
        &self.metadata
    }
}
547
548#[async_trait]
549impl BatchKernel<ReciprocityFlowInput, ReciprocityFlowOutput> for ReciprocityFlowRatio {
550 async fn execute(&self, input: ReciprocityFlowInput) -> Result<ReciprocityFlowOutput> {
551 let start = Instant::now();
552 let result = Self::compute(&input.transactions, input.window, input.min_amount);
553 Ok(ReciprocityFlowOutput {
554 result,
555 compute_time_us: start.elapsed().as_micros() as u64,
556 })
557 }
558}
559
/// Kernel that flags entities with a high transaction rate inside a time window.
#[derive(Debug, Clone)]
pub struct RapidMovement {
    /// Kernel registration metadata.
    metadata: KernelMetadata,
}
572
impl Default for RapidMovement {
    /// Equivalent to [`RapidMovement::new`].
    fn default() -> Self {
        Self::new()
    }
}
578
579impl RapidMovement {
    /// Creates the kernel; its metadata is built with `KernelMetadata::ring`
    /// under the id `compliance/rapid-movement` in the compliance domain.
    #[must_use]
    pub fn new() -> Self {
        Self {
            metadata: KernelMetadata::ring("compliance/rapid-movement", Domain::Compliance)
                .with_description("Velocity analysis for rapid movement detection")
                .with_throughput(200_000)
                .with_latency_us(20.0),
        }
    }
590
591 pub fn compute(
599 transactions: &[Transaction],
600 window_hours: f64,
601 velocity_threshold: f64,
602 amount_threshold: f64,
603 ) -> RapidMovementResult {
604 if transactions.is_empty() || window_hours <= 0.0 {
605 return RapidMovementResult {
606 flagged_entities: Vec::new(),
607 velocity_metrics: Vec::new(),
608 rapid_amount: 0.0,
609 };
610 }
611
612 let window_seconds = (window_hours * 3600.0) as u64;
613
614 let mut entity_txs: HashMap<u64, Vec<&Transaction>> = HashMap::new();
616
617 for tx in transactions {
618 entity_txs.entry(tx.source_id).or_default().push(tx);
619 entity_txs.entry(tx.dest_id).or_default().push(tx);
620 }
621
622 let mut flagged_entities = Vec::new();
623 let mut velocity_metrics = Vec::new();
624 let mut rapid_amount = 0.0;
625
626 for (entity_id, txs) in entity_txs {
627 if txs.is_empty() {
628 continue;
629 }
630
631 let mut sorted_txs: Vec<_> = txs.into_iter().collect();
633 sorted_txs.sort_by_key(|tx| tx.timestamp);
634
635 let mut max_velocity = 0.0f64;
637 let mut max_window_amount = 0.0f64;
638
639 for (i, tx) in sorted_txs.iter().enumerate() {
640 let window_start = tx.timestamp;
641 let window_end = window_start + window_seconds;
642
643 let window_txs: Vec<_> = sorted_txs[i..]
644 .iter()
645 .take_while(|t| t.timestamp < window_end)
646 .collect();
647
648 let count = window_txs.len();
649 let velocity = count as f64 / window_hours;
650 let window_amount: f64 = window_txs.iter().map(|t| t.amount).sum();
651
652 if velocity > max_velocity {
653 max_velocity = velocity;
654 max_window_amount = window_amount;
655 }
656 }
657
658 velocity_metrics.push((entity_id, max_velocity));
659
660 if max_velocity >= velocity_threshold && max_window_amount >= amount_threshold {
661 flagged_entities.push(entity_id);
662 rapid_amount += max_window_amount;
663 }
664 }
665
666 velocity_metrics.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
668
669 RapidMovementResult {
670 flagged_entities,
671 velocity_metrics,
672 rapid_amount,
673 }
674 }
675}
676
impl GpuKernel for RapidMovement {
    /// Returns the metadata built in [`RapidMovement::new`].
    fn metadata(&self) -> &KernelMetadata {
        &self.metadata
    }
}
682
683#[async_trait]
684impl BatchKernel<RapidMovementInput, RapidMovementOutput> for RapidMovement {
685 async fn execute(&self, input: RapidMovementInput) -> Result<RapidMovementOutput> {
686 let start = Instant::now();
687 let result = Self::compute(
688 &input.transactions,
689 input.window_hours,
690 input.velocity_threshold,
691 input.amount_threshold,
692 );
693 Ok(RapidMovementOutput {
694 result,
695 compute_time_us: start.elapsed().as_micros() as u64,
696 })
697 }
698}
699
/// Kernel that looks for several AML patterns (structuring, funnel accounts,
/// fan-out) in a set of transactions.
#[derive(Debug, Clone)]
pub struct AMLPatternDetection {
    /// Kernel registration metadata.
    metadata: KernelMetadata,
}
712
impl Default for AMLPatternDetection {
    /// Equivalent to [`AMLPatternDetection::new`].
    fn default() -> Self {
        Self::new()
    }
}
718
719impl AMLPatternDetection {
    /// Creates the kernel; its metadata is built with `KernelMetadata::ring`
    /// under the id `compliance/aml-patterns` in the compliance domain.
    #[must_use]
    pub fn new() -> Self {
        Self {
            metadata: KernelMetadata::ring("compliance/aml-patterns", Domain::Compliance)
                .with_description("Multi-pattern AML detection")
                .with_throughput(30_000)
                .with_latency_us(200.0),
        }
    }
730
731 pub fn compute(
738 transactions: &[Transaction],
739 structuring_threshold: f64,
740 structuring_window_hours: f64,
741 ) -> AMLPatternResult {
742 if transactions.is_empty() {
743 return AMLPatternResult {
744 patterns: Vec::new(),
745 risk_score: 0.0,
746 pattern_details: Vec::new(),
747 };
748 }
749
750 let mut patterns = Vec::new();
751 let mut pattern_details = Vec::new();
752 let mut total_risk_score = 0.0;
753
754 let structuring = Self::detect_structuring(
756 transactions,
757 structuring_threshold,
758 structuring_window_hours,
759 );
760 if !structuring.is_empty() {
761 for detail in structuring {
762 total_risk_score += detail.confidence * 25.0;
763 patterns.push((AMLPattern::Structuring, detail.entities.clone()));
764 pattern_details.push(detail);
765 }
766 }
767
768 let funnels = Self::detect_funnel_accounts(transactions);
770 if !funnels.is_empty() {
771 for detail in funnels {
772 total_risk_score += detail.confidence * 20.0;
773 patterns.push((AMLPattern::FunnelAccount, detail.entities.clone()));
774 pattern_details.push(detail);
775 }
776 }
777
778 let fanouts = Self::detect_fan_out(transactions);
780 if !fanouts.is_empty() {
781 for detail in fanouts {
782 total_risk_score += detail.confidence * 15.0;
783 patterns.push((AMLPattern::FanOut, detail.entities.clone()));
784 pattern_details.push(detail);
785 }
786 }
787
788 let risk_score = total_risk_score.min(100.0);
789
790 AMLPatternResult {
791 patterns,
792 risk_score,
793 pattern_details,
794 }
795 }
796
797 fn detect_structuring(
799 transactions: &[Transaction],
800 threshold: f64,
801 window_hours: f64,
802 ) -> Vec<PatternDetail> {
803 let window_seconds = (window_hours * 3600.0) as u64;
804 let mut results = Vec::new();
805
806 let mut by_source: HashMap<u64, Vec<&Transaction>> = HashMap::new();
808 for tx in transactions {
809 by_source.entry(tx.source_id).or_default().push(tx);
810 }
811
812 for (source_id, txs) in by_source {
813 if txs.len() < 3 {
814 continue;
815 }
816
817 let mut sorted_txs: Vec<_> = txs.into_iter().collect();
818 sorted_txs.sort_by_key(|tx| tx.timestamp);
819
820 for (i, tx) in sorted_txs.iter().enumerate() {
822 let window_end = tx.timestamp + window_seconds;
823
824 let window_txs: Vec<_> = sorted_txs[i..]
825 .iter()
826 .take_while(|t| t.timestamp < window_end)
827 .filter(|t| t.amount < threshold && t.amount > threshold * 0.7)
828 .collect();
829
830 if window_txs.len() >= 3 {
831 let total_amount: f64 = window_txs.iter().map(|t| t.amount).sum();
832 if total_amount > threshold {
833 let confidence = (window_txs.len() as f64 / 10.0).min(1.0);
834 let dests: HashSet<u64> = window_txs.iter().map(|t| t.dest_id).collect();
835 let mut entities = vec![source_id];
836 entities.extend(dests);
837
838 results.push(PatternDetail {
839 pattern: AMLPattern::Structuring,
840 entities,
841 amount: total_amount,
842 confidence,
843 time_span: TimeWindow::new(
844 tx.timestamp,
845 window_txs
846 .last()
847 .map(|t| t.timestamp)
848 .unwrap_or(tx.timestamp),
849 ),
850 });
851 break; }
853 }
854 }
855 }
856
857 results
858 }
859
860 fn detect_funnel_accounts(transactions: &[Transaction]) -> Vec<PatternDetail> {
862 let mut results = Vec::new();
863
864 let mut incoming: HashMap<u64, Vec<&Transaction>> = HashMap::new();
866 for tx in transactions {
867 incoming.entry(tx.dest_id).or_default().push(tx);
868 }
869
870 for (dest_id, txs) in incoming {
871 let unique_sources: HashSet<u64> = txs.iter().map(|tx| tx.source_id).collect();
872
873 if unique_sources.len() >= 5 {
875 let total_amount: f64 = txs.iter().map(|t| t.amount).sum();
876 let confidence = (unique_sources.len() as f64 / 20.0).min(1.0);
877
878 let mut entities = vec![dest_id];
879 entities.extend(unique_sources.iter().take(10));
880
881 let timestamps: Vec<u64> = txs.iter().map(|t| t.timestamp).collect();
882
883 results.push(PatternDetail {
884 pattern: AMLPattern::FunnelAccount,
885 entities,
886 amount: total_amount,
887 confidence,
888 time_span: TimeWindow::new(
889 *timestamps.iter().min().unwrap_or(&0),
890 *timestamps.iter().max().unwrap_or(&0),
891 ),
892 });
893 }
894 }
895
896 results
897 }
898
899 fn detect_fan_out(transactions: &[Transaction]) -> Vec<PatternDetail> {
901 let mut results = Vec::new();
902
903 let mut outgoing: HashMap<u64, Vec<&Transaction>> = HashMap::new();
905 for tx in transactions {
906 outgoing.entry(tx.source_id).or_default().push(tx);
907 }
908
909 for (source_id, txs) in outgoing {
910 let unique_dests: HashSet<u64> = txs.iter().map(|tx| tx.dest_id).collect();
911
912 if unique_dests.len() >= 5 {
914 let total_amount: f64 = txs.iter().map(|t| t.amount).sum();
915 let confidence = (unique_dests.len() as f64 / 20.0).min(1.0);
916
917 let mut entities = vec![source_id];
918 entities.extend(unique_dests.iter().take(10));
919
920 let timestamps: Vec<u64> = txs.iter().map(|t| t.timestamp).collect();
921
922 results.push(PatternDetail {
923 pattern: AMLPattern::FanOut,
924 entities,
925 amount: total_amount,
926 confidence,
927 time_span: TimeWindow::new(
928 *timestamps.iter().min().unwrap_or(&0),
929 *timestamps.iter().max().unwrap_or(&0),
930 ),
931 });
932 }
933 }
934
935 results
936 }
937}
938
impl GpuKernel for AMLPatternDetection {
    /// Returns the metadata built in [`AMLPatternDetection::new`].
    fn metadata(&self) -> &KernelMetadata {
        &self.metadata
    }
}
944
945#[async_trait]
946impl BatchKernel<AMLPatternInput, AMLPatternOutput> for AMLPatternDetection {
947 async fn execute(&self, input: AMLPatternInput) -> Result<AMLPatternOutput> {
948 let start = Instant::now();
949 let result = Self::compute(
950 &input.transactions,
951 input.structuring_threshold,
952 input.structuring_window_hours,
953 );
954 Ok(AMLPatternOutput {
955 result,
956 compute_time_us: start.elapsed().as_micros() as u64,
957 })
958 }
959}
960
961#[async_trait]
967impl RingKernelHandler<MatchPatternRing, MatchPatternResponse> for AMLPatternDetection {
968 async fn handle(
969 &self,
970 _ctx: &mut RingContext,
971 msg: MatchPatternRing,
972 ) -> Result<MatchPatternResponse> {
973 let transaction = Transaction {
975 id: msg.tx_id,
976 source_id: msg.source_id,
977 dest_id: msg.dest_id,
978 amount: msg.amount as f64 / 100_000_000.0,
979 timestamp: msg.timestamp,
980 currency: "USD".to_string(),
981 tx_type: match msg.tx_type {
982 0 => "wire",
983 1 => "ach",
984 2 => "check",
985 _ => "other",
986 }
987 .to_string(),
988 };
989
990 let structuring_threshold = 10_000.0;
992 let structuring_window_hours = 24.0;
993
994 let result = Self::compute(
995 &[transaction],
996 structuring_threshold,
997 structuring_window_hours,
998 );
999
1000 let mut patterns_matched = 0u64;
1002 for (pattern, _) in &result.patterns {
1003 let bit = match pattern {
1004 AMLPattern::Structuring => 0,
1005 AMLPattern::Layering => 1,
1006 AMLPattern::RapidMovement => 2,
1007 AMLPattern::RoundTripping => 3,
1008 AMLPattern::FunnelAccount => 4,
1009 AMLPattern::FanOut => 5,
1010 };
1011 patterns_matched |= 1u64 << bit;
1012 }
1013
1014 let max_score = result
1015 .pattern_details
1016 .iter()
1017 .map(|d| d.confidence)
1018 .max_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal))
1019 .unwrap_or(0.0) as f32;
1020
1021 Ok(MatchPatternResponse {
1022 correlation_id: msg.correlation_id,
1023 tx_id: msg.tx_id,
1024 patterns_matched,
1025 max_score,
1026 match_count: result.patterns.len() as u32,
1027 })
1028 }
1029}
1030
/// Batch kernel that detects transfers which are sent back in the opposite
/// direction shortly afterwards.
#[derive(Debug, Clone)]
pub struct FlowReversalPattern {
    /// Kernel registration metadata.
    metadata: KernelMetadata,
}
1043
impl Default for FlowReversalPattern {
    /// Equivalent to [`FlowReversalPattern::new`].
    fn default() -> Self {
        Self::new()
    }
}
1049
1050impl FlowReversalPattern {
    /// Creates the kernel; its metadata is built with `KernelMetadata::batch`
    /// under the id `compliance/flow-reversal` in the compliance domain.
    #[must_use]
    pub fn new() -> Self {
        Self {
            metadata: KernelMetadata::batch("compliance/flow-reversal", Domain::Compliance)
                .with_description("Transaction reversal pattern detection")
                .with_throughput(80_000)
                .with_latency_us(50.0),
        }
    }
1061
1062 pub fn compute(
1068 transactions: &[Transaction],
1069 config: &FlowReversalConfig,
1070 ) -> crate::types::FlowReversalResult {
1071 use crate::types::{FlowReversalPair, FlowReversalResult, ReversalRiskLevel};
1072
1073 if transactions.is_empty() {
1074 return FlowReversalResult {
1075 reversals: Vec::new(),
1076 reversal_volume: 0.0,
1077 reversal_ratio: 0.0,
1078 repeat_offenders: Vec::new(),
1079 risk_score: 0.0,
1080 };
1081 }
1082
1083 let mut sorted_txs: Vec<_> = transactions.iter().collect();
1085 sorted_txs.sort_by_key(|tx| tx.timestamp);
1086
1087 #[allow(clippy::type_complexity)]
1089 let mut forward_edges: HashMap<(u64, u64), Vec<(u64, f64, u64)>> = HashMap::new();
1090 for tx in &sorted_txs {
1091 forward_edges
1092 .entry((tx.source_id, tx.dest_id))
1093 .or_default()
1094 .push((tx.id, tx.amount, tx.timestamp));
1095 }
1096
1097 let mut reversals = Vec::new();
1098 let mut processed_pairs: HashSet<(u64, u64)> = HashSet::new();
1099 let mut entity_reversal_count: HashMap<u64, u32> = HashMap::new();
1100
1101 for tx in &sorted_txs {
1103 let reverse_key = (tx.dest_id, tx.source_id);
1104
1105 if let Some(reverse_txs) = forward_edges.get(&reverse_key) {
1106 for &(rev_id, rev_amount, rev_timestamp) in reverse_txs {
1107 if processed_pairs.contains(&(tx.id, rev_id))
1109 || processed_pairs.contains(&(rev_id, tx.id))
1110 {
1111 continue;
1112 }
1113
1114 if rev_timestamp <= tx.timestamp {
1116 continue;
1117 }
1118
1119 let time_delta = rev_timestamp - tx.timestamp;
1120 if time_delta > config.max_window_seconds {
1121 continue;
1122 }
1123
1124 let amount_ratio = if tx.amount > 0.0 {
1126 (rev_amount / tx.amount).min(tx.amount / rev_amount)
1127 } else {
1128 0.0
1129 };
1130
1131 if amount_ratio < config.min_amount_match_ratio {
1132 continue;
1133 }
1134
1135 let risk_level = Self::calculate_risk_level(time_delta, amount_ratio, config);
1137
1138 reversals.push(FlowReversalPair {
1139 original_tx_id: tx.id,
1140 reversal_tx_id: rev_id,
1141 entity_a: tx.source_id,
1142 entity_b: tx.dest_id,
1143 original_amount: tx.amount,
1144 reversal_amount: rev_amount,
1145 time_delta,
1146 amount_match_ratio: amount_ratio,
1147 risk_level,
1148 });
1149
1150 processed_pairs.insert((tx.id, rev_id));
1151
1152 *entity_reversal_count.entry(tx.source_id).or_insert(0) += 1;
1154 *entity_reversal_count.entry(tx.dest_id).or_insert(0) += 1;
1155 }
1156 }
1157 }
1158
1159 let reversal_volume: f64 = reversals
1161 .iter()
1162 .map(|r| r.original_amount + r.reversal_amount)
1163 .sum();
1164 let total_volume: f64 = transactions.iter().map(|tx| tx.amount).sum();
1165 let reversal_ratio = if total_volume > 0.0 {
1166 reversal_volume / total_volume
1167 } else {
1168 0.0
1169 };
1170
1171 let mut repeat_offenders: Vec<_> = entity_reversal_count
1173 .into_iter()
1174 .filter(|(_, count)| *count >= 2)
1175 .collect();
1176 repeat_offenders.sort_by(|a, b| b.1.cmp(&a.1));
1177
1178 let high_risk_count = reversals
1180 .iter()
1181 .filter(|r| {
1182 matches!(
1183 r.risk_level,
1184 ReversalRiskLevel::High | ReversalRiskLevel::Critical
1185 )
1186 })
1187 .count();
1188
1189 let risk_score = ((reversals.len() as f64 * 5.0)
1190 + (high_risk_count as f64 * 15.0)
1191 + (repeat_offenders.len() as f64 * 10.0)
1192 + (reversal_ratio * 50.0))
1193 .min(100.0);
1194
1195 FlowReversalResult {
1196 reversals,
1197 reversal_volume,
1198 reversal_ratio,
1199 repeat_offenders,
1200 risk_score,
1201 }
1202 }
1203
1204 fn calculate_risk_level(
1206 time_delta: u64,
1207 amount_ratio: f64,
1208 config: &FlowReversalConfig,
1209 ) -> crate::types::ReversalRiskLevel {
1210 use crate::types::ReversalRiskLevel;
1211
1212 let is_exact_match = amount_ratio >= 0.99;
1213 let is_fast = time_delta < config.suspicious_window_seconds;
1214 let is_very_fast = time_delta < config.critical_window_seconds;
1215
1216 match (is_very_fast, is_fast, is_exact_match) {
1217 (true, _, true) => ReversalRiskLevel::Critical,
1218 (true, _, _) => ReversalRiskLevel::High,
1219 (_, true, true) => ReversalRiskLevel::High,
1220 (_, true, _) => ReversalRiskLevel::Suspicious,
1221 (_, _, true) => ReversalRiskLevel::Suspicious,
1222 _ => ReversalRiskLevel::Normal,
1223 }
1224 }
1225}
1226
impl GpuKernel for FlowReversalPattern {
    /// Returns the metadata built in [`FlowReversalPattern::new`].
    fn metadata(&self) -> &KernelMetadata {
        &self.metadata
    }
}
1232
/// Tuning parameters for [`FlowReversalPattern::compute`].
#[derive(Debug, Clone)]
pub struct FlowReversalConfig {
    /// Largest delay between a transfer and its reversal for the two to be
    /// paired at all.
    pub max_window_seconds: u64,
    /// Delays strictly below this are treated as fast when grading risk.
    pub suspicious_window_seconds: u64,
    /// Delays strictly below this are treated as very fast when grading risk.
    pub critical_window_seconds: u64,
    /// Minimum smaller-to-larger amount ratio for two transfers to be paired.
    pub min_amount_match_ratio: f64,
}
1245
impl Default for FlowReversalConfig {
    /// Default thresholds: pair within 24 hours, fast below 1 hour, very fast
    /// below 5 minutes, amounts matching to at least 90%.
    fn default() -> Self {
        Self {
            max_window_seconds: 86400,       // 24 h
            suspicious_window_seconds: 3600, // 1 h
            critical_window_seconds: 300,    // 5 min
            min_amount_match_ratio: 0.9,
        }
    }
}
1256
/// Batch kernel that detects amounts split into several transfers that each
/// stay below a reporting threshold.
#[derive(Debug, Clone)]
pub struct FlowSplitRatio {
    /// Kernel registration metadata.
    metadata: KernelMetadata,
}
1269
impl Default for FlowSplitRatio {
    /// Equivalent to [`FlowSplitRatio::new`].
    fn default() -> Self {
        Self::new()
    }
}
1275
1276impl FlowSplitRatio {
    /// Creates the kernel; its metadata is built with `KernelMetadata::batch`
    /// under the id `compliance/flow-split` in the compliance domain.
    #[must_use]
    pub fn new() -> Self {
        Self {
            metadata: KernelMetadata::batch("compliance/flow-split", Domain::Compliance)
                .with_description("Transaction splitting and structuring detection")
                .with_throughput(60_000)
                .with_latency_us(80.0),
        }
    }
1287
1288 pub fn compute(
1294 transactions: &[Transaction],
1295 config: &FlowSplitConfig,
1296 ) -> crate::types::FlowSplitResult {
1297 use crate::types::{FlowSplitPattern, FlowSplitResult, SplitRiskLevel};
1298
1299 if transactions.is_empty() {
1300 return FlowSplitResult {
1301 splits: Vec::new(),
1302 structuring_entities: Vec::new(),
1303 split_volume: 0.0,
1304 split_ratio: 0.0,
1305 risk_score: 0.0,
1306 };
1307 }
1308
1309 let mut by_source: HashMap<u64, Vec<&Transaction>> = HashMap::new();
1311 for tx in transactions {
1312 by_source.entry(tx.source_id).or_default().push(tx);
1313 }
1314
1315 let mut splits = Vec::new();
1316 let mut structuring_entities: HashSet<u64> = HashSet::new();
1317
1318 for (source_id, source_txs) in by_source {
1319 if source_txs.len() < config.min_split_count {
1320 continue;
1321 }
1322
1323 let mut sorted_txs: Vec<_> = source_txs.into_iter().collect();
1325 sorted_txs.sort_by_key(|tx| tx.timestamp);
1326
1327 for start_idx in 0..sorted_txs.len() {
1329 let start_tx = sorted_txs[start_idx];
1330 let window_end_time = start_tx.timestamp + config.window_seconds;
1331
1332 let window_txs: Vec<_> = sorted_txs[start_idx..]
1334 .iter()
1335 .take_while(|tx| tx.timestamp <= window_end_time)
1336 .copied()
1337 .collect();
1338
1339 if window_txs.len() < config.min_split_count {
1340 continue;
1341 }
1342
1343 let under_threshold: Vec<_> = window_txs
1345 .iter()
1346 .filter(|tx| tx.amount < config.reporting_threshold)
1347 .copied()
1348 .collect();
1349
1350 if under_threshold.len() < config.min_split_count {
1351 continue;
1352 }
1353
1354 let total_amount: f64 = under_threshold.iter().map(|tx| tx.amount).sum();
1355
1356 if total_amount < config.reporting_threshold * 0.8 {
1358 continue;
1359 }
1360
1361 let amounts: Vec<f64> = under_threshold.iter().map(|tx| tx.amount).collect();
1363 let avg_amount = total_amount / amounts.len() as f64;
1364 let is_uniform = amounts
1365 .iter()
1366 .all(|a| (*a - avg_amount).abs() / avg_amount < 0.3);
1367 let near_threshold = amounts
1368 .iter()
1369 .filter(|a| **a > config.reporting_threshold * 0.7)
1370 .count();
1371
1372 let risk_level = Self::calculate_risk_level(
1374 total_amount,
1375 &amounts,
1376 is_uniform,
1377 near_threshold,
1378 config,
1379 );
1380
1381 if matches!(risk_level, SplitRiskLevel::Normal) {
1382 continue;
1383 }
1384
1385 let time_span = under_threshold
1386 .last()
1387 .map(|tx| tx.timestamp)
1388 .unwrap_or(start_tx.timestamp)
1389 .saturating_sub(start_tx.timestamp);
1390
1391 let dest_entities: Vec<u64> = under_threshold
1392 .iter()
1393 .map(|tx| tx.dest_id)
1394 .collect::<HashSet<_>>()
1395 .into_iter()
1396 .collect();
1397
1398 splits.push(FlowSplitPattern {
1399 source_entity: source_id,
1400 dest_entities,
1401 transaction_ids: under_threshold.iter().map(|tx| tx.id).collect(),
1402 amounts: amounts.clone(),
1403 total_amount,
1404 time_span,
1405 estimated_threshold: config.reporting_threshold,
1406 risk_level,
1407 });
1408
1409 if matches!(risk_level, SplitRiskLevel::High | SplitRiskLevel::Critical) {
1410 structuring_entities.insert(source_id);
1411 }
1412 }
1413 }
1414
1415 let splits = Self::deduplicate_splits(splits);
1417
1418 let split_volume: f64 = splits.iter().map(|s| s.total_amount).sum();
1420 let total_volume: f64 = transactions.iter().map(|tx| tx.amount).sum();
1421 let split_ratio = if total_volume > 0.0 {
1422 split_volume / total_volume
1423 } else {
1424 0.0
1425 };
1426
1427 let critical_count = splits
1429 .iter()
1430 .filter(|s| matches!(s.risk_level, SplitRiskLevel::Critical))
1431 .count();
1432 let high_count = splits
1433 .iter()
1434 .filter(|s| matches!(s.risk_level, SplitRiskLevel::High))
1435 .count();
1436
1437 let risk_score = ((splits.len() as f64 * 5.0)
1438 + (high_count as f64 * 15.0)
1439 + (critical_count as f64 * 25.0)
1440 + (structuring_entities.len() as f64 * 10.0))
1441 .min(100.0);
1442
1443 FlowSplitResult {
1444 splits,
1445 structuring_entities: structuring_entities.into_iter().collect(),
1446 split_volume,
1447 split_ratio,
1448 risk_score,
1449 }
1450 }
1451
1452 fn calculate_risk_level(
1454 total_amount: f64,
1455 amounts: &[f64],
1456 is_uniform: bool,
1457 near_threshold_count: usize,
1458 config: &FlowSplitConfig,
1459 ) -> crate::types::SplitRiskLevel {
1460 use crate::types::SplitRiskLevel;
1461
1462 let threshold = config.reporting_threshold;
1463 let just_under = total_amount >= threshold * 0.95 && total_amount <= threshold * 1.05;
1464 let multiple_near = near_threshold_count >= 3;
1465
1466 match (just_under, multiple_near, is_uniform) {
1467 (true, true, true) => SplitRiskLevel::Critical,
1468 (true, true, _) => SplitRiskLevel::Critical,
1469 (true, _, true) => SplitRiskLevel::High,
1470 (true, _, _) => SplitRiskLevel::High,
1471 (_, true, _) => SplitRiskLevel::High,
1472 _ if amounts.len() >= 5 && total_amount > threshold => SplitRiskLevel::Elevated,
1473 _ => SplitRiskLevel::Normal,
1474 }
1475 }
1476
1477 fn deduplicate_splits(
1479 mut splits: Vec<crate::types::FlowSplitPattern>,
1480 ) -> Vec<crate::types::FlowSplitPattern> {
1481 splits.sort_by(|a, b| {
1483 let risk_ord =
1484 Self::risk_level_ord(&b.risk_level).cmp(&Self::risk_level_ord(&a.risk_level));
1485 if risk_ord == std::cmp::Ordering::Equal {
1486 b.total_amount
1487 .partial_cmp(&a.total_amount)
1488 .unwrap_or(std::cmp::Ordering::Equal)
1489 } else {
1490 risk_ord
1491 }
1492 });
1493
1494 let mut kept: Vec<crate::types::FlowSplitPattern> = Vec::new();
1495 let mut used_tx_ids: HashSet<u64> = HashSet::new();
1496
1497 for split in splits {
1498 let overlap = split
1500 .transaction_ids
1501 .iter()
1502 .filter(|id| used_tx_ids.contains(id))
1503 .count();
1504
1505 if overlap < split.transaction_ids.len() / 2 {
1507 for id in &split.transaction_ids {
1508 used_tx_ids.insert(*id);
1509 }
1510 kept.push(split);
1511 }
1512 }
1513
1514 kept
1515 }
1516
1517 fn risk_level_ord(level: &crate::types::SplitRiskLevel) -> u8 {
1519 use crate::types::SplitRiskLevel;
1520 match level {
1521 SplitRiskLevel::Normal => 0,
1522 SplitRiskLevel::Elevated => 1,
1523 SplitRiskLevel::High => 2,
1524 SplitRiskLevel::Critical => 3,
1525 }
1526 }
1527}
1528
/// `GpuKernel` implementation for the flow-split kernel.
impl GpuKernel for FlowSplitRatio {
    /// Returns a reference to the metadata stored on this kernel instance.
    fn metadata(&self) -> &KernelMetadata {
        &self.metadata
    }
}
1534
/// Tunable parameters for the flow-split (structuring) analysis.
#[derive(Debug, Clone)]
pub struct FlowSplitConfig {
    /// Reporting threshold. Only transactions with an amount strictly below
    /// this value are considered as pieces of a split, and a group is ignored
    /// when its combined amount is below 80% of this value.
    pub reporting_threshold: f64,
    /// Length of the sliding window. It is added to the timestamp of the
    /// first transaction of a window to obtain the window's inclusive end.
    pub window_seconds: u64,
    /// Minimum number of transactions a source, a window, and the set of
    /// sub-threshold transactions in that window must each contain.
    pub min_split_count: usize,
}
1545
1546impl Default for FlowSplitConfig {
1547 fn default() -> Self {
1548 Self {
1549 reporting_threshold: 10_000.0, window_seconds: 86400, min_split_count: 3,
1552 }
1553 }
1554}
1555
1556#[cfg(test)]
1557mod tests {
1558 use super::*;
1559
1560 fn create_circular_transactions() -> Vec<Transaction> {
1561 vec![
1563 Transaction {
1564 id: 1,
1565 source_id: 1,
1566 dest_id: 2,
1567 amount: 1000.0,
1568 timestamp: 100,
1569 currency: "USD".to_string(),
1570 tx_type: "wire".to_string(),
1571 },
1572 Transaction {
1573 id: 2,
1574 source_id: 2,
1575 dest_id: 3,
1576 amount: 950.0,
1577 timestamp: 200,
1578 currency: "USD".to_string(),
1579 tx_type: "wire".to_string(),
1580 },
1581 Transaction {
1582 id: 3,
1583 source_id: 3,
1584 dest_id: 1,
1585 amount: 900.0,
1586 timestamp: 300,
1587 currency: "USD".to_string(),
1588 tx_type: "wire".to_string(),
1589 },
1590 ]
1591 }
1592
1593 fn create_reciprocal_transactions() -> Vec<Transaction> {
1594 vec![
1595 Transaction {
1596 id: 1,
1597 source_id: 1,
1598 dest_id: 2,
1599 amount: 5000.0,
1600 timestamp: 100,
1601 currency: "USD".to_string(),
1602 tx_type: "wire".to_string(),
1603 },
1604 Transaction {
1605 id: 2,
1606 source_id: 2,
1607 dest_id: 1,
1608 amount: 4800.0,
1609 timestamp: 200,
1610 currency: "USD".to_string(),
1611 tx_type: "wire".to_string(),
1612 },
1613 ]
1614 }
1615
/// The circular-flow kernel reports its expected id and domain.
#[test]
fn test_circular_flow_metadata() {
    let circular_kernel = CircularFlowRatio::new();
    let meta = circular_kernel.metadata();

    assert_eq!(meta.id, "compliance/circular-flow");
    assert_eq!(meta.domain, Domain::Compliance);
}
1622
/// A three-node cycle yields a positive circular ratio, at least one SCC and
/// a positive circular amount.
#[test]
fn test_circular_flow_detection() {
    let cycle = create_circular_transactions();

    let outcome = CircularFlowRatio::compute(&cycle, 100.0);

    assert!(outcome.circular_ratio > 0.0);
    assert!(!outcome.sccs.is_empty());
    assert!(outcome.circular_amount > 0.0);
}
1632
/// A back-and-forth pair of transfers is reported as reciprocal.
#[test]
fn test_reciprocity_detection() {
    let pair = create_reciprocal_transactions();

    let outcome = ReciprocityFlowRatio::compute(&pair, None, 100.0);

    assert!(outcome.reciprocity_ratio > 0.0);
    assert!(!outcome.reciprocal_pairs.is_empty());
}
1641
/// Twenty 500-unit transfers from entity 1 to entity 2, spaced 60 timestamp
/// units apart, must flag at least one entity and a positive rapid amount.
#[test]
fn test_rapid_movement_detection() {
    let burst: Vec<Transaction> = (0u64..20)
        .map(|seq| Transaction {
            id: seq,
            source_id: 1,
            dest_id: 2,
            amount: 500.0,
            timestamp: 100 + seq * 60,
            currency: "USD".to_string(),
            tx_type: "wire".to_string(),
        })
        .collect();

    let outcome = RapidMovement::compute(&burst, 1.0, 10.0, 1000.0);

    assert!(!outcome.flagged_entities.is_empty());
    assert!(outcome.rapid_amount > 0.0);
}
1662
/// Ten distinct sources (100..=109) each paying 500 into entity 1 must
/// produce at least one `FunnelAccount` pattern.
#[test]
fn test_aml_pattern_funnel() {
    let inbound: Vec<Transaction> = (0u64..10)
        .map(|seq| Transaction {
            id: seq,
            source_id: seq + 100,
            dest_id: 1,
            amount: 500.0,
            timestamp: 100 + seq,
            currency: "USD".to_string(),
            tx_type: "wire".to_string(),
        })
        .collect();

    let outcome = AMLPatternDetection::compute(&inbound, 10000.0, 24.0);

    let found_funnel = outcome
        .patterns
        .iter()
        .any(|(kind, _)| *kind == AMLPattern::FunnelAccount);

    assert!(found_funnel);
}
1688
/// Each analysis exercised here returns an empty result collection when it is
/// given no transactions.
#[test]
fn test_empty_transactions() {
    let no_txs: Vec<Transaction> = Vec::new();

    let circular = CircularFlowRatio::compute(&no_txs, 100.0);
    assert!(circular.sccs.is_empty());

    let reciprocity = ReciprocityFlowRatio::compute(&no_txs, None, 100.0);
    assert!(reciprocity.reciprocal_pairs.is_empty());

    let rapid = RapidMovement::compute(&no_txs, 1.0, 10.0, 1000.0);
    assert!(rapid.flagged_entities.is_empty());

    let aml = AMLPatternDetection::compute(&no_txs, 10000.0, 24.0);
    assert!(aml.patterns.is_empty());

    let reversal_config = FlowReversalConfig::default();
    let reversal = FlowReversalPattern::compute(&no_txs, &reversal_config);
    assert!(reversal.reversals.is_empty());

    let split_config = FlowSplitConfig::default();
    let split = FlowSplitRatio::compute(&no_txs, &split_config);
    assert!(split.splits.is_empty());
}
1711
1712 fn create_reversal_transactions() -> Vec<Transaction> {
1717 vec![
1718 Transaction {
1720 id: 1,
1721 source_id: 1,
1722 dest_id: 2,
1723 amount: 5000.0,
1724 timestamp: 100,
1725 currency: "USD".to_string(),
1726 tx_type: "wire".to_string(),
1727 },
1728 Transaction {
1730 id: 2,
1731 source_id: 2,
1732 dest_id: 1,
1733 amount: 5000.0,
1734 timestamp: 200,
1735 currency: "USD".to_string(),
1736 tx_type: "wire".to_string(),
1737 },
1738 Transaction {
1740 id: 3,
1741 source_id: 3,
1742 dest_id: 4,
1743 amount: 3000.0,
1744 timestamp: 300,
1745 currency: "USD".to_string(),
1746 tx_type: "wire".to_string(),
1747 },
1748 Transaction {
1750 id: 4,
1751 source_id: 4,
1752 dest_id: 3,
1753 amount: 2800.0,
1754 timestamp: 400,
1755 currency: "USD".to_string(),
1756 tx_type: "wire".to_string(),
1757 },
1758 ]
1759 }
1760
/// The flow-reversal kernel reports its expected id and domain.
#[test]
fn test_flow_reversal_metadata() {
    let reversal_kernel = FlowReversalPattern::new();
    let meta = reversal_kernel.metadata();

    assert_eq!(meta.id, "compliance/flow-reversal");
    assert_eq!(meta.domain, Domain::Compliance);
}
1767
/// Both forward/return pairs of the fixture are reported as reversals, with
/// positive reversal volume and ratio.
#[test]
fn test_flow_reversal_detection() {
    let pairs = create_reversal_transactions();
    let defaults = FlowReversalConfig::default();

    let outcome = FlowReversalPattern::compute(&pairs, &defaults);

    assert_eq!(outcome.reversals.len(), 2);
    assert!(outcome.reversal_volume > 0.0);
    assert!(outcome.reversal_ratio > 0.0);
}
1778
/// A 1000-unit transfer returned in full 50 timestamp units later, with a
/// critical window of 100, is one reversal with match ratio 1.0 and
/// `Critical` risk.
#[test]
fn test_flow_reversal_exact_match() {
    // Local builder for a 1000-unit USD wire transfer.
    let wire = |id: u64, source_id: u64, dest_id: u64, timestamp: u64| Transaction {
        id,
        source_id,
        dest_id,
        amount: 1000.0,
        timestamp,
        currency: "USD".to_string(),
        tx_type: "wire".to_string(),
    };
    let round_trip = vec![wire(1, 1, 2, 100), wire(2, 2, 1, 150)];

    let tight_windows = FlowReversalConfig {
        critical_window_seconds: 100,
        suspicious_window_seconds: 600,
        ..Default::default()
    };
    let outcome = FlowReversalPattern::compute(&round_trip, &tight_windows);

    assert_eq!(outcome.reversals.len(), 1);
    assert_eq!(outcome.reversals[0].amount_match_ratio, 1.0);
    assert!(matches!(
        outcome.reversals[0].risk_level,
        crate::types::ReversalRiskLevel::Critical
    ));
}
1817
/// A return transfer at timestamp 100000, far beyond a 3600-second maximum
/// window from the forward transfer at 100, is not reported as a reversal.
#[test]
fn test_flow_reversal_outside_window() {
    // Local builder for a 1000-unit USD wire transfer.
    let wire = |id: u64, source_id: u64, dest_id: u64, timestamp: u64| Transaction {
        id,
        source_id,
        dest_id,
        amount: 1000.0,
        timestamp,
        currency: "USD".to_string(),
        tx_type: "wire".to_string(),
    };
    let late_return = vec![wire(1, 1, 2, 100), wire(2, 2, 1, 100000)];

    let one_hour_window = FlowReversalConfig {
        max_window_seconds: 3600,
        ..Default::default()
    };
    let outcome = FlowReversalPattern::compute(&late_return, &one_hour_window);

    assert!(outcome.reversals.is_empty());
}
1849
1850 fn create_structuring_transactions() -> Vec<Transaction> {
1855 (0..5)
1858 .map(|i| Transaction {
1859 id: i as u64,
1860 source_id: 1,
1861 dest_id: (i + 10) as u64,
1862 amount: 2500.0,
1863 timestamp: 1000 + i as u64 * 100,
1864 currency: "USD".to_string(),
1865 tx_type: "wire".to_string(),
1866 })
1867 .collect()
1868 }
1869
/// The flow-split kernel reports its expected id and domain.
#[test]
fn test_flow_split_metadata() {
    let split_kernel = FlowSplitRatio::new();
    let meta = split_kernel.metadata();

    assert_eq!(meta.id, "compliance/flow-split");
    assert_eq!(meta.domain, Domain::Compliance);
}
1876
/// The structuring fixture produces at least one split with positive volume
/// under the default configuration.
#[test]
fn test_flow_split_detection() {
    let structured = create_structuring_transactions();
    let defaults = FlowSplitConfig::default();

    let outcome = FlowSplitRatio::compute(&structured, &defaults);

    assert!(!outcome.splits.is_empty());
    assert!(outcome.split_volume > 0.0);
}
1887
/// Four 2450-unit transfers from entity 1 (9800 in total, against a 10000
/// threshold) must yield a split; if any structuring entity is reported,
/// entity 1 must be among them.
#[test]
fn test_flow_split_near_threshold() {
    let pieces: Vec<Transaction> = (0u64..4)
        .map(|seq| Transaction {
            id: seq,
            source_id: 1,
            dest_id: seq + 10,
            amount: 2450.0,
            timestamp: 1000 + seq * 100,
            currency: "USD".to_string(),
            tx_type: "wire".to_string(),
        })
        .collect();

    let settings = FlowSplitConfig {
        reporting_threshold: 10_000.0,
        min_split_count: 3,
        ..Default::default()
    };
    let outcome = FlowSplitRatio::compute(&pieces, &settings);

    assert!(!outcome.splits.is_empty());

    // The structuring check is only made when something was flagged.
    if outcome.structuring_entities.is_empty() {
        return;
    }
    assert!(outcome.structuring_entities.contains(&1));
}
1918
/// Two 500-unit transfers are fewer than the default minimum split count, so
/// no split is reported.
#[test]
fn test_flow_split_below_threshold() {
    let small_pair: Vec<Transaction> = (0u64..2)
        .map(|seq| Transaction {
            id: seq,
            source_id: 1,
            dest_id: seq + 10,
            amount: 500.0,
            timestamp: 1000 + seq * 100,
            currency: "USD".to_string(),
            tx_type: "wire".to_string(),
        })
        .collect();

    let defaults = FlowSplitConfig::default();
    let outcome = FlowSplitRatio::compute(&small_pair, &defaults);

    assert!(outcome.splits.is_empty());
}
1940
/// Four 2500-unit transfers totalling exactly the 10000 threshold: whenever
/// splits are reported, at least one must be `High` or `Critical`.
#[test]
fn test_flow_split_risk_levels() {
    let pieces: Vec<Transaction> = (0u64..4)
        .map(|seq| Transaction {
            id: seq,
            source_id: 1,
            dest_id: seq + 10,
            amount: 2500.0,
            timestamp: 1000 + seq * 100,
            currency: "USD".to_string(),
            tx_type: "wire".to_string(),
        })
        .collect();

    let settings = FlowSplitConfig {
        reporting_threshold: 10_000.0,
        min_split_count: 3,
        ..Default::default()
    };
    let outcome = FlowSplitRatio::compute(&pieces, &settings);

    // Nothing to check when no split was reported.
    if outcome.splits.is_empty() {
        return;
    }

    let has_high_risk = outcome.splits.iter().any(|split| {
        matches!(
            split.risk_level,
            crate::types::SplitRiskLevel::High | crate::types::SplitRiskLevel::Critical
        )
    });
    assert!(has_high_risk);
}
1974}