Skip to main content

datasynth_graph/builders/
banking_graph.rs

1//! Banking network graph builder.
2//!
3//! Builds graphs for AML/KYC analysis where:
4//! - Nodes are customers, accounts, and counterparties
5//! - Edges are transactions and ownership relationships
6//! - Ground truth labels identify suspicious activity
7
8use std::collections::HashMap;
9
10use chrono::{Datelike, Timelike};
11
12use datasynth_banking::models::{BankAccount, BankTransaction, BankingCustomer, CounterpartyPool};
13use datasynth_core::models::banking::{
14    AmlTypology, CashIntensity, Direction, RiskTier, TransactionChannel, TurnoverBand,
15};
16
17use crate::models::{EdgeType, Graph, GraphEdge, GraphNode, GraphType, NodeId, NodeType};
18
/// Switches and thresholds that decide what `BankingGraphBuilder` puts into the graph.
#[derive(Debug, Clone)]
pub struct BankingGraphConfig {
    /// When `false`, `add_customers` adds no customer nodes.
    pub include_customers: bool,
    /// When `false`, `add_accounts` adds neither account nodes nor ownership edges.
    pub include_accounts: bool,
    /// When `false`, `add_counterparties` adds no nodes from the counterparty pool.
    pub include_counterparties: bool,
    /// Whether beneficial-ownership edges should be part of the graph.
    pub include_beneficial_ownership: bool,
    /// When `false`, `add_transactions` creates no transaction edges.
    pub create_transaction_edges: bool,
    /// Transactions whose amount is below this value are not turned into edges.
    pub min_transaction_amount: f64,
    /// When `true`, transactions sharing a (source, target) pair are merged
    /// into a single edge that is emitted by `build`.
    pub aggregate_parallel_edges: bool,
    /// Appends calendar and time-of-day features to individual transaction edges.
    pub include_temporal_features: bool,
    /// Appends risk-related numeric features to nodes and transaction edges.
    pub include_risk_features: bool,
}
41
42impl Default for BankingGraphConfig {
43    fn default() -> Self {
44        Self {
45            include_customers: true,
46            include_accounts: true,
47            include_counterparties: true,
48            include_beneficial_ownership: true,
49            create_transaction_edges: true,
50            min_transaction_amount: 0.0,
51            aggregate_parallel_edges: false,
52            include_temporal_features: true,
53            include_risk_features: true,
54        }
55    }
56}
57
58/// Builder for banking network graphs.
59pub struct BankingGraphBuilder {
60    config: BankingGraphConfig,
61    graph: Graph,
62    /// Map from customer ID to node ID.
63    customer_nodes: HashMap<String, NodeId>,
64    /// Map from account ID to node ID.
65    account_nodes: HashMap<String, NodeId>,
66    /// Map from counterparty name to node ID.
67    counterparty_nodes: HashMap<String, NodeId>,
68    /// For edge aggregation: (source, target) -> aggregated data.
69    edge_aggregation: HashMap<(NodeId, NodeId), AggregatedBankingEdge>,
70}
71
72impl BankingGraphBuilder {
73    /// Creates a new banking graph builder.
74    pub fn new(config: BankingGraphConfig) -> Self {
75        Self {
76            config,
77            graph: Graph::new("banking_network", GraphType::Custom("banking".to_string())),
78            customer_nodes: HashMap::new(),
79            account_nodes: HashMap::new(),
80            counterparty_nodes: HashMap::new(),
81            edge_aggregation: HashMap::new(),
82        }
83    }
84
85    /// Adds customers to the graph.
86    pub fn add_customers(&mut self, customers: &[BankingCustomer]) {
87        if !self.config.include_customers {
88            return;
89        }
90
91        for customer in customers {
92            self.add_customer(customer);
93        }
94    }
95
96    /// Adds a single customer to the graph.
97    fn add_customer(&mut self, customer: &BankingCustomer) -> NodeId {
98        let key = customer.customer_id.to_string();
99
100        if let Some(&id) = self.customer_nodes.get(&key) {
101            return id;
102        }
103
104        let mut node = GraphNode::new(
105            0,
106            NodeType::Customer,
107            key.clone(),
108            customer.name.display_name().to_string(),
109        );
110
111        // Add categorical features
112        node.categorical_features.insert(
113            "customer_type".to_string(),
114            format!("{:?}", customer.customer_type),
115        );
116        node.categorical_features.insert(
117            "residence_country".to_string(),
118            customer.residence_country.clone(),
119        );
120        node.categorical_features
121            .insert("risk_tier".to_string(), format!("{:?}", customer.risk_tier));
122
123        // Add risk features
124        if self.config.include_risk_features {
125            // Risk tier encoding (0=Low to 1=Highest)
126            let risk_score = match customer.risk_tier {
127                RiskTier::Low => 0.0,
128                RiskTier::Medium => 0.33,
129                RiskTier::High => 0.67,
130                RiskTier::VeryHigh | RiskTier::Prohibited => 1.0,
131            };
132            node.features.push(risk_score);
133
134            // PEP indicator
135            node.features.push(if customer.is_pep { 1.0 } else { 0.0 });
136
137            // Number of accounts
138            node.features.push(customer.account_ids.len() as f64);
139
140            // KYC profile features
141            let kyc = &customer.kyc_profile;
142
143            // Expected monthly turnover encoding
144            let turnover_band = match kyc.expected_monthly_turnover {
145                TurnoverBand::VeryLow => 1.0,
146                TurnoverBand::Low => 2.0,
147                TurnoverBand::Medium => 3.0,
148                TurnoverBand::High => 4.0,
149                TurnoverBand::VeryHigh => 5.0,
150                TurnoverBand::UltraHigh => 6.0,
151            };
152            node.features.push(turnover_band);
153
154            // Cash intensity
155            let cash_intensity: f64 = match kyc.cash_intensity {
156                CashIntensity::VeryLow => 0.0,
157                CashIntensity::Low => 0.25,
158                CashIntensity::Moderate => 0.5,
159                CashIntensity::High => 0.75,
160                CashIntensity::VeryHigh => 1.0,
161            };
162            node.features.push(cash_intensity);
163        }
164
165        // Mark anomaly if customer is suspicious
166        if customer.is_mule {
167            node = node.as_anomaly("money_mule");
168            node.labels.push("mule".to_string());
169        }
170
171        let id = self.graph.add_node(node);
172        self.customer_nodes.insert(key, id);
173        id
174    }
175
176    /// Adds accounts to the graph.
177    pub fn add_accounts(&mut self, accounts: &[BankAccount], customers: &[BankingCustomer]) {
178        if !self.config.include_accounts {
179            return;
180        }
181
182        // Build customer lookup
183        let customer_map: HashMap<_, _> = customers.iter().map(|c| (c.customer_id, c)).collect();
184
185        for account in accounts {
186            let account_id = self.add_account(account);
187
188            // Create ownership edge from customer to account
189            if let Some(customer) = customer_map.get(&account.primary_owner_id) {
190                let customer_id = self.add_customer(customer);
191
192                let edge = GraphEdge::new(0, customer_id, account_id, EdgeType::Ownership)
193                    .with_weight(1.0)
194                    .with_property(
195                        "relationship",
196                        crate::models::EdgeProperty::String("account_owner".to_string()),
197                    );
198
199                self.graph.add_edge(edge);
200            }
201        }
202    }
203
204    /// Adds a single account to the graph.
205    fn add_account(&mut self, account: &BankAccount) -> NodeId {
206        let key = account.account_id.to_string();
207
208        if let Some(&id) = self.account_nodes.get(&key) {
209            return id;
210        }
211
212        let mut node = GraphNode::new(
213            0,
214            NodeType::Account,
215            key.clone(),
216            format!("{:?} - {}", account.account_type, account.account_number),
217        );
218
219        // Add categorical features
220        node.categorical_features.insert(
221            "account_type".to_string(),
222            format!("{:?}", account.account_type),
223        );
224        node.categorical_features
225            .insert("currency".to_string(), account.currency.clone());
226        node.categorical_features
227            .insert("status".to_string(), format!("{:?}", account.status));
228
229        // Add numeric features
230        if self.config.include_risk_features {
231            // Balance (log-scaled)
232            let balance: f64 = account.current_balance.try_into().unwrap_or(0.0);
233            node.features.push((balance.abs() + 1.0).ln());
234
235            // Overdraft limit (log-scaled)
236            let limit: f64 = account.overdraft_limit.try_into().unwrap_or(0.0);
237            node.features.push((limit + 1.0).ln());
238
239            // Has debit card
240            node.features.push(if account.features.debit_card {
241                1.0
242            } else {
243                0.0
244            });
245
246            // Has international capability
247            node.features
248                .push(if account.features.international_transfers {
249                    1.0
250                } else {
251                    0.0
252                });
253        }
254
255        let id = self.graph.add_node(node);
256        self.account_nodes.insert(key, id);
257        id
258    }
259
260    /// Adds counterparties to the graph.
261    pub fn add_counterparties(&mut self, pool: &CounterpartyPool) {
262        if !self.config.include_counterparties {
263            return;
264        }
265
266        for merchant in &pool.merchants {
267            self.add_counterparty_node(
268                &merchant.name,
269                "merchant",
270                Some(&format!("{:?}", merchant.mcc)),
271            );
272        }
273
274        for employer in &pool.employers {
275            let industry = employer.industry_code.as_deref().unwrap_or("Unknown");
276            self.add_counterparty_node(&employer.name, "employer", Some(industry));
277        }
278
279        for utility in &pool.utilities {
280            self.add_counterparty_node(
281                &utility.name,
282                "utility",
283                Some(&format!("{:?}", utility.utility_type)),
284            );
285        }
286    }
287
288    /// Adds a counterparty node.
289    fn add_counterparty_node(
290        &mut self,
291        name: &str,
292        cp_type: &str,
293        category: Option<&str>,
294    ) -> NodeId {
295        let key = format!("{}_{}", cp_type, name);
296
297        if let Some(&id) = self.counterparty_nodes.get(&key) {
298            return id;
299        }
300
301        let mut node = GraphNode::new(
302            0,
303            NodeType::Custom("Counterparty".to_string()),
304            key.clone(),
305            name.to_string(),
306        );
307
308        node.categorical_features
309            .insert("counterparty_type".to_string(), cp_type.to_string());
310
311        if let Some(cat) = category {
312            node.categorical_features
313                .insert("category".to_string(), cat.to_string());
314        }
315
316        let id = self.graph.add_node(node);
317        self.counterparty_nodes.insert(key, id);
318        id
319    }
320
321    /// Adds transactions to the graph.
322    pub fn add_transactions(&mut self, transactions: &[BankTransaction]) {
323        if !self.config.create_transaction_edges {
324            return;
325        }
326
327        for txn in transactions {
328            self.add_transaction(txn);
329        }
330    }
331
332    /// Adds a single transaction to the graph.
333    fn add_transaction(&mut self, txn: &BankTransaction) {
334        let amount: f64 = txn.amount.try_into().unwrap_or(0.0);
335        if amount < self.config.min_transaction_amount {
336            return;
337        }
338
339        // Get or create account node
340        let account_key = txn.account_id.to_string();
341        let account_node = *self.account_nodes.get(&account_key).unwrap_or(&0);
342        if account_node == 0 {
343            return; // Account not in graph
344        }
345
346        // Get or create counterparty node
347        let cp_key = format!("counterparty_{}", txn.counterparty.name);
348        let counterparty_node = if let Some(&id) = self.counterparty_nodes.get(&cp_key) {
349            id
350        } else {
351            self.add_counterparty_node(
352                &txn.counterparty.name,
353                &format!("{:?}", txn.counterparty.counterparty_type),
354                None,
355            )
356        };
357
358        // Determine edge direction based on transaction direction
359        let (source, target) = match txn.direction {
360            Direction::Inbound => (counterparty_node, account_node),
361            Direction::Outbound => (account_node, counterparty_node),
362        };
363
364        if self.config.aggregate_parallel_edges {
365            self.aggregate_transaction_edge(source, target, txn);
366        } else {
367            let edge = self.create_transaction_edge(source, target, txn);
368            self.graph.add_edge(edge);
369        }
370    }
371
372    /// Creates a transaction edge with features.
373    fn create_transaction_edge(
374        &self,
375        source: NodeId,
376        target: NodeId,
377        txn: &BankTransaction,
378    ) -> GraphEdge {
379        let amount: f64 = txn.amount.try_into().unwrap_or(0.0);
380
381        let mut edge = GraphEdge::new(0, source, target, EdgeType::Transaction)
382            .with_weight(amount)
383            .with_timestamp(txn.timestamp_initiated.date_naive());
384
385        // Add transaction properties
386        edge.properties.insert(
387            "transaction_id".to_string(),
388            crate::models::EdgeProperty::String(txn.transaction_id.to_string()),
389        );
390        edge.properties.insert(
391            "channel".to_string(),
392            crate::models::EdgeProperty::String(format!("{:?}", txn.channel)),
393        );
394        edge.properties.insert(
395            "category".to_string(),
396            crate::models::EdgeProperty::String(format!("{:?}", txn.category)),
397        );
398
399        // Add numeric features
400        // Log amount
401        edge.features.push((amount + 1.0).ln());
402
403        // Direction encoding
404        edge.features.push(match txn.direction {
405            Direction::Inbound => 1.0,
406            Direction::Outbound => 0.0,
407        });
408
409        // Channel encoding
410        let channel_code = match txn.channel {
411            TransactionChannel::CardPresent => 0.0,
412            TransactionChannel::CardNotPresent => 1.0,
413            TransactionChannel::Ach => 2.0,
414            TransactionChannel::Wire => 3.0,
415            TransactionChannel::Cash => 4.0,
416            TransactionChannel::Atm => 5.0,
417            TransactionChannel::Branch => 6.0,
418            TransactionChannel::Mobile => 7.0,
419            TransactionChannel::Online => 8.0,
420            TransactionChannel::Swift => 9.0,
421            TransactionChannel::InternalTransfer => 10.0,
422            TransactionChannel::Check => 11.0,
423            TransactionChannel::RealTimePayment => 12.0,
424            TransactionChannel::PeerToPeer => 13.0,
425        };
426        edge.features.push(channel_code / 13.0); // Normalized
427
428        // Temporal features
429        if self.config.include_temporal_features {
430            let weekday = txn.timestamp_initiated.weekday().num_days_from_monday() as f64;
431            edge.features.push(weekday / 6.0);
432
433            let hour = txn.timestamp_initiated.hour() as f64;
434            edge.features.push(hour / 23.0);
435
436            let day = txn.timestamp_initiated.day() as f64;
437            edge.features.push(day / 31.0);
438
439            let month = txn.timestamp_initiated.month() as f64;
440            edge.features.push(month / 12.0);
441
442            // Is weekend
443            edge.features.push(if weekday >= 5.0 { 1.0 } else { 0.0 });
444
445            // Is off-hours (before 7am or after 10pm)
446            let is_off_hours = !(7.0..=22.0).contains(&hour);
447            edge.features.push(if is_off_hours { 1.0 } else { 0.0 });
448        }
449
450        // Risk features
451        if self.config.include_risk_features {
452            // Is cash transaction
453            edge.features.push(if txn.is_cash() { 1.0 } else { 0.0 });
454
455            // Is cross-border
456            edge.features
457                .push(if txn.is_cross_border() { 1.0 } else { 0.0 });
458
459            // Risk score from transaction
460            edge.features
461                .push(txn.calculate_risk_score() as f64 / 100.0);
462        }
463
464        // Ground truth labels
465        if txn.is_suspicious {
466            edge = edge.as_anomaly(&format!(
467                "{:?}",
468                txn.suspicion_reason.unwrap_or(AmlTypology::Structuring)
469            ));
470
471            if let Some(typology) = txn.suspicion_reason {
472                edge.labels.push(format!("{:?}", typology));
473            }
474
475            if let Some(stage) = txn.laundering_stage {
476                edge.labels.push(format!("{:?}", stage));
477            }
478
479            if txn.is_spoofed {
480                edge.labels.push("spoofed".to_string());
481            }
482        }
483
484        edge
485    }
486
487    /// Aggregates transaction edges between same source and target.
488    fn aggregate_transaction_edge(
489        &mut self,
490        source: NodeId,
491        target: NodeId,
492        txn: &BankTransaction,
493    ) {
494        let key = (source, target);
495        let amount: f64 = txn.amount.try_into().unwrap_or(0.0);
496        let date = txn.timestamp_initiated.date_naive();
497
498        let agg = self
499            .edge_aggregation
500            .entry(key)
501            .or_insert(AggregatedBankingEdge {
502                total_amount: 0.0,
503                count: 0,
504                suspicious_count: 0,
505                first_date: date,
506                last_date: date,
507                channels: HashMap::new(),
508            });
509
510        agg.total_amount += amount;
511        agg.count += 1;
512
513        if txn.is_suspicious {
514            agg.suspicious_count += 1;
515        }
516
517        if date < agg.first_date {
518            agg.first_date = date;
519        }
520        if date > agg.last_date {
521            agg.last_date = date;
522        }
523
524        let channel = format!("{:?}", txn.channel);
525        *agg.channels.entry(channel).or_insert(0) += 1;
526    }
527
528    /// Builds the final graph.
529    pub fn build(mut self) -> Graph {
530        // If aggregating, create the aggregated edges now
531        if self.config.aggregate_parallel_edges {
532            for ((source, target), agg) in self.edge_aggregation {
533                let mut edge = GraphEdge::new(0, source, target, EdgeType::Transaction)
534                    .with_weight(agg.total_amount)
535                    .with_timestamp(agg.last_date);
536
537                // Aggregation features
538                edge.features.push((agg.total_amount + 1.0).ln());
539                edge.features.push(agg.count as f64);
540                edge.features.push(agg.suspicious_count as f64);
541                edge.features
542                    .push(agg.suspicious_count as f64 / agg.count.max(1) as f64);
543
544                let duration = (agg.last_date - agg.first_date).num_days() as f64;
545                edge.features.push(duration);
546
547                // Average amount per transaction
548                edge.features
549                    .push(agg.total_amount / agg.count.max(1) as f64);
550
551                // Transaction frequency (per day)
552                edge.features.push(agg.count as f64 / duration.max(1.0));
553
554                // Number of unique channels
555                edge.features.push(agg.channels.len() as f64);
556
557                // Mark as anomaly if any suspicious transactions
558                if agg.suspicious_count > 0 {
559                    edge = edge.as_anomaly("suspicious_link");
560                }
561
562                self.graph.add_edge(edge);
563            }
564        }
565
566        self.graph.compute_statistics();
567        self.graph
568    }
569}
570
571/// Aggregated banking edge data for combining multiple transactions between the same accounts.
572struct AggregatedBankingEdge {
573    total_amount: f64,
574    count: usize,
575    suspicious_count: usize,
576    first_date: chrono::NaiveDate,
577    last_date: chrono::NaiveDate,
578    channels: HashMap<String, usize>,
579}
580
#[cfg(test)]
// Tests unwrap freely: a failed unwrap is simply a failed test.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use chrono::NaiveDate;
    use datasynth_banking::models::CounterpartyRef;
    use datasynth_core::models::banking::{
        BankAccountType, TransactionCategory, TransactionChannel,
    };
    use rust_decimal::Decimal;
    use uuid::Uuid;

    /// Date shared by the customer and account fixtures.
    fn fixture_date() -> NaiveDate {
        NaiveDate::from_ymd_opt(2024, 1, 1).unwrap()
    }

    /// A retail customer with a random ID.
    fn create_test_customer() -> BankingCustomer {
        BankingCustomer::new_retail(Uuid::new_v4(), "John", "Doe", "US", fixture_date())
    }

    /// A USD checking account owned by `customer`.
    fn create_test_account(customer: &BankingCustomer) -> BankAccount {
        BankAccount::new(
            Uuid::new_v4(),
            "****1234".to_string(),
            BankAccountType::Checking,
            customer.customer_id,
            "USD",
            fixture_date(),
        )
    }

    /// An outbound card purchase of 1000 USD on `account`.
    fn create_test_transaction(account: &BankAccount) -> BankTransaction {
        BankTransaction::new(
            Uuid::new_v4(),
            account.account_id,
            Decimal::from(1000),
            "USD",
            Direction::Outbound,
            TransactionChannel::CardPresent,
            TransactionCategory::Shopping,
            CounterpartyRef::merchant(Uuid::new_v4(), "Test Store"),
            "Test purchase",
            chrono::Utc::now(),
        )
    }

    /// Runs the full pipeline with the default configuration on one
    /// customer, one account and one transaction.
    fn build_graph(
        customer: &BankingCustomer,
        account: &BankAccount,
        txn: &BankTransaction,
    ) -> Graph {
        let mut builder = BankingGraphBuilder::new(BankingGraphConfig::default());
        builder.add_customers(std::slice::from_ref(customer));
        builder.add_accounts(
            std::slice::from_ref(account),
            std::slice::from_ref(customer),
        );
        builder.add_transactions(std::slice::from_ref(txn));
        builder.build()
    }

    #[test]
    fn test_build_banking_graph() {
        let customer = create_test_customer();
        let account = create_test_account(&customer);
        let txn = create_test_transaction(&account);

        let graph = build_graph(&customer, &account, &txn);

        // Should have customer, account, and counterparty nodes
        assert!(graph.node_count() >= 2);
        // Should have ownership and transaction edges
        assert!(graph.edge_count() >= 1);
    }

    #[test]
    fn test_suspicious_transaction_labels() {
        let customer = create_test_customer();
        let account = create_test_account(&customer);
        // Mark as suspicious
        let txn =
            create_test_transaction(&account).mark_suspicious(AmlTypology::Structuring, "CASE-001");

        let graph = build_graph(&customer, &account, &txn);

        // Check that suspicious edge exists
        let suspicious_edges = graph.anomalous_edges();
        assert!(!suspicious_edges.is_empty());
    }
}
670        // Check that suspicious edge exists
671        let suspicious_edges = graph.anomalous_edges();
672        assert!(!suspicious_edges.is_empty());
673    }
674}