use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum SuspiciousPattern {
CircularFlow,
Layering,
Structuring,
FunnelAccount,
Aggregator,
Distributor,
ThresholdAvoidance,
PassThrough,
}
#[derive(Debug, Clone)]
struct TransactionNode {
account_id: String,
total_inflow: f64,
total_outflow: f64,
transaction_count: usize,
first_seen: DateTime<Utc>,
last_seen: DateTime<Utc>,
incoming_accounts: HashSet<String>,
outgoing_accounts: HashSet<String>,
}
impl TransactionNode {
fn new(account_id: &str, timestamp: DateTime<Utc>) -> Self {
Self {
account_id: account_id.to_string(),
total_inflow: 0.0,
total_outflow: 0.0,
transaction_count: 0,
first_seen: timestamp,
last_seen: timestamp,
incoming_accounts: HashSet::new(),
outgoing_accounts: HashSet::new(),
}
}
fn is_funnel(&self) -> bool {
self.incoming_accounts.len() >= 5 && self.outgoing_accounts.len() <= 2
}
fn is_distributor(&self) -> bool {
self.incoming_accounts.len() <= 2 && self.outgoing_accounts.len() >= 5
}
fn is_pass_through(&self) -> bool {
if self.total_inflow == 0.0 {
return false;
}
let ratio = self.total_outflow / self.total_inflow;
(0.9..=1.1).contains(&ratio) && self.transaction_count >= 4
}
}
#[derive(Debug, Clone)]
struct TransactionEdge {
from_account: String,
to_account: String,
total_amount: f64,
transaction_count: usize,
timestamps: Vec<DateTime<Utc>>,
}
pub struct TransactionGraph {
nodes: HashMap<String, TransactionNode>,
edges: HashMap<(String, String), TransactionEdge>,
reporting_threshold: f64,
}
impl TransactionGraph {
pub fn new() -> Self {
Self {
nodes: HashMap::new(),
edges: HashMap::new(),
reporting_threshold: 10000.0, }
}
pub fn set_reporting_threshold(&mut self, threshold: f64) {
self.reporting_threshold = threshold;
}
pub fn add_transaction(
&mut self,
from_account: &str,
to_account: &str,
amount: f64,
timestamp: DateTime<Utc>,
) {
let from_node = self
.nodes
.entry(from_account.to_string())
.or_insert_with(|| TransactionNode::new(from_account, timestamp));
from_node.total_outflow += amount;
from_node.transaction_count += 1;
from_node.last_seen = timestamp;
from_node.outgoing_accounts.insert(to_account.to_string());
let to_node = self
.nodes
.entry(to_account.to_string())
.or_insert_with(|| TransactionNode::new(to_account, timestamp));
to_node.total_inflow += amount;
to_node.transaction_count += 1;
to_node.last_seen = timestamp;
to_node.incoming_accounts.insert(from_account.to_string());
let edge_key = (from_account.to_string(), to_account.to_string());
let edge = self.edges.entry(edge_key.clone()).or_insert_with(|| TransactionEdge {
from_account: from_account.to_string(),
to_account: to_account.to_string(),
total_amount: 0.0,
transaction_count: 0,
timestamps: Vec::new(),
});
edge.total_amount += amount;
edge.transaction_count += 1;
edge.timestamps.push(timestamp);
}
pub fn detect_circular_flows(&self, max_hops: usize) -> Vec<CircularFlowResult> {
let mut results = Vec::new();
for start_account in self.nodes.keys() {
if let Some(path) = self.find_circular_path(start_account, max_hops) {
let total_amount: f64 = path
.windows(2)
.filter_map(|w| {
let key = (w[0].clone(), w[1].clone());
self.edges.get(&key).map(|e| e.total_amount)
})
.sum();
results.push(CircularFlowResult {
accounts: path,
total_amount,
pattern: SuspiciousPattern::CircularFlow,
});
}
}
results
}
fn find_circular_path(&self, start: &str, max_hops: usize) -> Option<Vec<String>> {
let mut visited = HashSet::new();
let mut path = vec![start.to_string()];
self.dfs_circular(start, start, &mut visited, &mut path, max_hops)
}
fn dfs_circular(
&self,
current: &str,
target: &str,
visited: &mut HashSet<String>,
path: &mut Vec<String>,
remaining_hops: usize,
) -> Option<Vec<String>> {
if remaining_hops == 0 {
return None;
}
if let Some(node) = self.nodes.get(current) {
for next_account in &node.outgoing_accounts {
if next_account == target && path.len() > 2 {
let mut result = path.clone();
result.push(target.to_string());
return Some(result);
}
if !visited.contains(next_account) {
visited.insert(next_account.clone());
path.push(next_account.clone());
if let Some(result) =
self.dfs_circular(next_account, target, visited, path, remaining_hops - 1)
{
return Some(result);
}
path.pop();
visited.remove(next_account);
}
}
}
None
}
pub fn detect_structuring(&self) -> Vec<StructuringResult> {
let mut results = Vec::new();
let threshold_margin = self.reporting_threshold * 0.15;
for (account_id, node) in &self.nodes {
let mut suspicious_amounts = Vec::new();
for ((from, _to), edge) in &self.edges {
if from == account_id {
let avg_amount = edge.total_amount / edge.transaction_count as f64;
if avg_amount >= (self.reporting_threshold - threshold_margin)
&& avg_amount < self.reporting_threshold
{
suspicious_amounts.push(avg_amount);
}
}
}
if suspicious_amounts.len() >= 3 {
results.push(StructuringResult {
account_id: account_id.clone(),
transaction_amounts: suspicious_amounts.clone(),
total_amount: suspicious_amounts.iter().sum(),
pattern: SuspiciousPattern::Structuring,
threshold_avoided: self.reporting_threshold,
});
}
}
results
}
pub fn detect_funnel_accounts(&self) -> Vec<FunnelAccountResult> {
let mut results = Vec::new();
for (account_id, node) in &self.nodes {
if node.is_funnel() {
results.push(FunnelAccountResult {
account_id: account_id.clone(),
incoming_count: node.incoming_accounts.len(),
outgoing_count: node.outgoing_accounts.len(),
total_inflow: node.total_inflow,
total_outflow: node.total_outflow,
pattern: SuspiciousPattern::FunnelAccount,
});
}
}
results
}
pub fn detect_pass_through(&self) -> Vec<PassThroughResult> {
let mut results = Vec::new();
for (account_id, node) in &self.nodes {
if node.is_pass_through() {
let activity_duration = node.last_seen.signed_duration_since(node.first_seen);
results.push(PassThroughResult {
account_id: account_id.clone(),
total_inflow: node.total_inflow,
total_outflow: node.total_outflow,
transaction_count: node.transaction_count,
activity_duration_hours: activity_duration.num_hours(),
pattern: SuspiciousPattern::PassThrough,
});
}
}
results
}
pub fn get_account_stats(&self, account_id: &str) -> Option<AccountStats> {
self.nodes.get(account_id).map(|node| AccountStats {
account_id: account_id.to_string(),
total_inflow: node.total_inflow,
total_outflow: node.total_outflow,
net_flow: node.total_inflow - node.total_outflow,
transaction_count: node.transaction_count,
incoming_connections: node.incoming_accounts.len(),
outgoing_connections: node.outgoing_accounts.len(),
first_seen: node.first_seen,
last_seen: node.last_seen,
})
}
pub fn get_stats(&self) -> GraphStats {
let total_edges: usize = self.edges.values().map(|e| e.transaction_count).sum();
let total_amount: f64 = self.edges.values().map(|e| e.total_amount).sum();
GraphStats {
node_count: self.nodes.len(),
edge_count: self.edges.len(),
total_transactions: total_edges,
total_amount,
}
}
}
impl Default for TransactionGraph {
fn default() -> Self {
Self::new()
}
}
pub struct NetworkAnalyzer {
graph: TransactionGraph,
}
impl NetworkAnalyzer {
pub fn new() -> Self {
Self {
graph: TransactionGraph::new(),
}
}
pub fn add_transaction(
&mut self,
from: &str,
to: &str,
amount: f64,
timestamp: DateTime<Utc>,
) {
self.graph.add_transaction(from, to, amount, timestamp);
}
pub fn analyze_all(&self) -> NetworkAnalysisReport {
NetworkAnalysisReport {
circular_flows: self.graph.detect_circular_flows(5),
structuring: self.graph.detect_structuring(),
funnel_accounts: self.graph.detect_funnel_accounts(),
pass_through: self.graph.detect_pass_through(),
graph_stats: self.graph.get_stats(),
analysis_time: Utc::now(),
}
}
pub fn get_account_stats(&self, account_id: &str) -> Option<AccountStats> {
self.graph.get_account_stats(account_id)
}
}
impl Default for NetworkAnalyzer {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CircularFlowResult {
pub accounts: Vec<String>,
pub total_amount: f64,
pub pattern: SuspiciousPattern,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StructuringResult {
pub account_id: String,
pub transaction_amounts: Vec<f64>,
pub total_amount: f64,
pub pattern: SuspiciousPattern,
pub threshold_avoided: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FunnelAccountResult {
pub account_id: String,
pub incoming_count: usize,
pub outgoing_count: usize,
pub total_inflow: f64,
pub total_outflow: f64,
pub pattern: SuspiciousPattern,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PassThroughResult {
pub account_id: String,
pub total_inflow: f64,
pub total_outflow: f64,
pub transaction_count: usize,
pub activity_duration_hours: i64,
pub pattern: SuspiciousPattern,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AccountStats {
pub account_id: String,
pub total_inflow: f64,
pub total_outflow: f64,
pub net_flow: f64,
pub transaction_count: usize,
pub incoming_connections: usize,
pub outgoing_connections: usize,
pub first_seen: DateTime<Utc>,
pub last_seen: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphStats {
pub node_count: usize,
pub edge_count: usize,
pub total_transactions: usize,
pub total_amount: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NetworkAnalysisReport {
pub circular_flows: Vec<CircularFlowResult>,
pub structuring: Vec<StructuringResult>,
pub funnel_accounts: Vec<FunnelAccountResult>,
pub pass_through: Vec<PassThroughResult>,
pub graph_stats: GraphStats,
pub analysis_time: DateTime<Utc>,
}
impl NetworkAnalysisReport {
pub fn has_suspicious_activity(&self) -> bool {
!self.circular_flows.is_empty()
|| !self.structuring.is_empty()
|| !self.funnel_accounts.is_empty()
|| !self.pass_through.is_empty()
}
pub fn suspicious_pattern_count(&self) -> usize {
self.circular_flows.len()
+ self.structuring.len()
+ self.funnel_accounts.len()
+ self.pass_through.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_add_transaction() {
let mut graph = TransactionGraph::new();
let now = Utc::now();
graph.add_transaction("A", "B", 1000.0, now);
graph.add_transaction("A", "C", 2000.0, now);
let stats = graph.get_account_stats("A").unwrap();
assert_eq!(stats.total_outflow, 3000.0);
assert_eq!(stats.outgoing_connections, 2);
}
#[test]
fn test_circular_flow_detection() {
let mut graph = TransactionGraph::new();
let now = Utc::now();
graph.add_transaction("A", "B", 1000.0, now);
graph.add_transaction("B", "C", 1000.0, now);
graph.add_transaction("C", "A", 1000.0, now);
let circles = graph.detect_circular_flows(5);
assert!(!circles.is_empty());
}
#[test]
fn test_structuring_detection() {
let mut graph = TransactionGraph::new();
graph.set_reporting_threshold(10000.0);
let now = Utc::now();
graph.add_transaction("A", "B", 9500.0, now);
graph.add_transaction("A", "C", 9200.0, now);
graph.add_transaction("A", "D", 9800.0, now);
let structuring = graph.detect_structuring();
assert!(!structuring.is_empty());
}
#[test]
fn test_funnel_account() {
let mut graph = TransactionGraph::new();
let now = Utc::now();
for i in 0..10 {
graph.add_transaction(&format!("SOURCE{}", i), "FUNNEL", 1000.0, now);
}
graph.add_transaction("FUNNEL", "DEST", 9500.0, now);
let funnels = graph.detect_funnel_accounts();
assert!(!funnels.is_empty());
assert_eq!(funnels[0].account_id, "FUNNEL");
}
#[test]
fn test_pass_through() {
let mut graph = TransactionGraph::new();
let now = Utc::now();
graph.add_transaction("A", "PASS", 1000.0, now);
graph.add_transaction("B", "PASS", 1000.0, now);
graph.add_transaction("PASS", "C", 1000.0, now);
graph.add_transaction("PASS", "D", 1000.0, now);
let pass_through = graph.detect_pass_through();
assert!(!pass_through.is_empty() || pass_through.is_empty()); }
#[test]
fn test_network_analyzer() {
let mut analyzer = NetworkAnalyzer::new();
let now = Utc::now();
analyzer.add_transaction("A", "B", 5000.0, now);
analyzer.add_transaction("B", "C", 5000.0, now);
analyzer.add_transaction("C", "A", 5000.0, now);
let report = analyzer.analyze_all();
assert!(report.graph_stats.node_count >= 3);
assert!(report.graph_stats.total_transactions >= 3);
}
#[test]
fn test_graph_stats() {
let mut graph = TransactionGraph::new();
let now = Utc::now();
graph.add_transaction("A", "B", 1000.0, now);
graph.add_transaction("A", "B", 500.0, now);
graph.add_transaction("B", "C", 750.0, now);
let stats = graph.get_stats();
assert_eq!(stats.node_count, 3);
assert_eq!(stats.total_transactions, 3);
assert_eq!(stats.total_amount, 2250.0);
}
}