Skip to main content

datasynth_graph/builders/
banking_graph.rs

1//! Banking network graph builder.
2//!
3//! Builds graphs for AML/KYC analysis where:
4//! - Nodes are customers, accounts, and counterparties
5//! - Edges are transactions and ownership relationships
6//! - Ground truth labels identify suspicious activity
7
8use std::collections::HashMap;
9
10use chrono::{Datelike, Timelike};
11
12use datasynth_banking::models::{BankAccount, BankTransaction, BankingCustomer, CounterpartyPool};
13use datasynth_core::models::banking::{AmlTypology, Direction, RiskTier};
14
15use crate::models::{EdgeType, Graph, GraphEdge, GraphNode, GraphType, NodeId, NodeType};
16
/// Configuration for banking graph building.
///
/// Each flag switches one part of `BankingGraphBuilder`'s output on or off.
#[derive(Debug, Clone)]
pub struct BankingGraphConfig {
    /// Include customer nodes. When `false`, `add_customers` is a no-op.
    pub include_customers: bool,
    /// Include account nodes. When `false`, `add_accounts` is a no-op.
    pub include_accounts: bool,
    /// Include counterparty nodes. When `false`, `add_counterparties` is a no-op.
    pub include_counterparties: bool,
    /// Include beneficial ownership edges.
    ///
    /// NOTE(review): no builder method visible in this file reads this flag —
    /// confirm where it is consumed.
    pub include_beneficial_ownership: bool,
    /// Create transaction edges. When `false`, `add_transactions` is a no-op.
    pub create_transaction_edges: bool,
    /// Minimum transaction amount to include as edge; transactions strictly
    /// below this value are skipped.
    pub min_transaction_amount: f64,
    /// Aggregate parallel edges between same nodes into one edge emitted by `build`.
    pub aggregate_parallel_edges: bool,
    /// Include temporal features (weekday, hour, day of month, month, weekend
    /// and off-hours flags) on per-transaction edges.
    pub include_temporal_features: bool,
    /// Include risk features on customer nodes, account nodes and
    /// per-transaction edges.
    pub include_risk_features: bool,
}
39
40impl Default for BankingGraphConfig {
41    fn default() -> Self {
42        Self {
43            include_customers: true,
44            include_accounts: true,
45            include_counterparties: true,
46            include_beneficial_ownership: true,
47            create_transaction_edges: true,
48            min_transaction_amount: 0.0,
49            aggregate_parallel_edges: false,
50            include_temporal_features: true,
51            include_risk_features: true,
52        }
53    }
54}
55
/// Builder for banking network graphs.
///
/// Nodes and edges are accumulated through the `add_*` methods; `build`
/// consumes the builder and returns the finished graph.
pub struct BankingGraphBuilder {
    /// Settings that gate which nodes, edges and features are produced.
    config: BankingGraphConfig,
    /// Graph under construction.
    graph: Graph,
    /// Map from customer ID (string form) to node ID.
    customer_nodes: HashMap<String, NodeId>,
    /// Map from account ID (string form) to node ID.
    account_nodes: HashMap<String, NodeId>,
    /// Map from counterparty key to node ID. Keys have the form
    /// `{type}_{name}`, as built by `add_counterparty_node`.
    counterparty_nodes: HashMap<String, NodeId>,
    /// For edge aggregation: (source, target) -> aggregated data.
    edge_aggregation: HashMap<(NodeId, NodeId), AggregatedBankingEdge>,
}
69
70impl BankingGraphBuilder {
71    /// Creates a new banking graph builder.
72    pub fn new(config: BankingGraphConfig) -> Self {
73        Self {
74            config,
75            graph: Graph::new("banking_network", GraphType::Custom("banking".to_string())),
76            customer_nodes: HashMap::new(),
77            account_nodes: HashMap::new(),
78            counterparty_nodes: HashMap::new(),
79            edge_aggregation: HashMap::new(),
80        }
81    }
82
83    /// Adds customers to the graph.
84    pub fn add_customers(&mut self, customers: &[BankingCustomer]) {
85        if !self.config.include_customers {
86            return;
87        }
88
89        for customer in customers {
90            self.add_customer(customer);
91        }
92    }
93
94    /// Adds a single customer to the graph.
95    fn add_customer(&mut self, customer: &BankingCustomer) -> NodeId {
96        let key = customer.customer_id.to_string();
97
98        if let Some(&id) = self.customer_nodes.get(&key) {
99            return id;
100        }
101
102        let mut node = GraphNode::new(
103            0,
104            NodeType::Customer,
105            key.clone(),
106            customer.name.display_name().to_string(),
107        );
108
109        // Add categorical features
110        node.categorical_features.insert(
111            "customer_type".to_string(),
112            format!("{:?}", customer.customer_type),
113        );
114        node.categorical_features.insert(
115            "residence_country".to_string(),
116            customer.residence_country.clone(),
117        );
118        node.categorical_features
119            .insert("risk_tier".to_string(), format!("{:?}", customer.risk_tier));
120
121        // Add risk features
122        if self.config.include_risk_features {
123            // Risk tier encoding (0=Low to 1=Highest)
124            let risk_score = match customer.risk_tier {
125                RiskTier::Low => 0.0,
126                RiskTier::Medium => 0.33,
127                RiskTier::High => 0.67,
128                RiskTier::VeryHigh | RiskTier::Prohibited => 1.0,
129            };
130            node.features.push(risk_score);
131
132            // PEP indicator
133            node.features.push(if customer.is_pep { 1.0 } else { 0.0 });
134
135            // Number of accounts
136            node.features.push(customer.account_ids.len() as f64);
137
138            // KYC profile features
139            let kyc = &customer.kyc_profile;
140
141            // Expected monthly turnover encoding
142            let turnover_band = match format!("{:?}", kyc.expected_monthly_turnover).as_str() {
143                "VeryLow" => 1.0,
144                "Low" => 2.0,
145                "Medium" => 3.0,
146                "High" => 4.0,
147                "VeryHigh" => 5.0,
148                _ => 3.0,
149            };
150            node.features.push(turnover_band);
151
152            // Cash intensity
153            let cash_intensity: f64 = match format!("{:?}", kyc.cash_intensity).as_str() {
154                "VeryLow" => 0.0,
155                "Low" => 0.25,
156                "Moderate" => 0.5,
157                "High" => 0.75,
158                "VeryHigh" => 1.0,
159                _ => 0.5,
160            };
161            node.features.push(cash_intensity);
162        }
163
164        // Mark anomaly if customer is suspicious
165        if customer.is_mule {
166            node = node.as_anomaly("money_mule");
167            node.labels.push("mule".to_string());
168        }
169
170        let id = self.graph.add_node(node);
171        self.customer_nodes.insert(key, id);
172        id
173    }
174
175    /// Adds accounts to the graph.
176    pub fn add_accounts(&mut self, accounts: &[BankAccount], customers: &[BankingCustomer]) {
177        if !self.config.include_accounts {
178            return;
179        }
180
181        // Build customer lookup
182        let customer_map: HashMap<_, _> = customers.iter().map(|c| (c.customer_id, c)).collect();
183
184        for account in accounts {
185            let account_id = self.add_account(account);
186
187            // Create ownership edge from customer to account
188            if let Some(customer) = customer_map.get(&account.primary_owner_id) {
189                let customer_id = self.add_customer(customer);
190
191                let edge = GraphEdge::new(0, customer_id, account_id, EdgeType::Ownership)
192                    .with_weight(1.0)
193                    .with_property(
194                        "relationship",
195                        crate::models::EdgeProperty::String("account_owner".to_string()),
196                    );
197
198                self.graph.add_edge(edge);
199            }
200        }
201    }
202
203    /// Adds a single account to the graph.
204    fn add_account(&mut self, account: &BankAccount) -> NodeId {
205        let key = account.account_id.to_string();
206
207        if let Some(&id) = self.account_nodes.get(&key) {
208            return id;
209        }
210
211        let mut node = GraphNode::new(
212            0,
213            NodeType::Account,
214            key.clone(),
215            format!("{:?} - {}", account.account_type, account.account_number),
216        );
217
218        // Add categorical features
219        node.categorical_features.insert(
220            "account_type".to_string(),
221            format!("{:?}", account.account_type),
222        );
223        node.categorical_features
224            .insert("currency".to_string(), account.currency.clone());
225        node.categorical_features
226            .insert("status".to_string(), format!("{:?}", account.status));
227
228        // Add numeric features
229        if self.config.include_risk_features {
230            // Balance (log-scaled)
231            let balance: f64 = account.current_balance.try_into().unwrap_or(0.0);
232            node.features.push((balance.abs() + 1.0).ln());
233
234            // Overdraft limit (log-scaled)
235            let limit: f64 = account.overdraft_limit.try_into().unwrap_or(0.0);
236            node.features.push((limit + 1.0).ln());
237
238            // Has debit card
239            node.features.push(if account.features.debit_card {
240                1.0
241            } else {
242                0.0
243            });
244
245            // Has international capability
246            node.features
247                .push(if account.features.international_transfers {
248                    1.0
249                } else {
250                    0.0
251                });
252        }
253
254        let id = self.graph.add_node(node);
255        self.account_nodes.insert(key, id);
256        id
257    }
258
259    /// Adds counterparties to the graph.
260    pub fn add_counterparties(&mut self, pool: &CounterpartyPool) {
261        if !self.config.include_counterparties {
262            return;
263        }
264
265        for merchant in &pool.merchants {
266            self.add_counterparty_node(
267                &merchant.name,
268                "merchant",
269                Some(&format!("{:?}", merchant.mcc)),
270            );
271        }
272
273        for employer in &pool.employers {
274            let industry = employer.industry_code.as_deref().unwrap_or("Unknown");
275            self.add_counterparty_node(&employer.name, "employer", Some(industry));
276        }
277
278        for utility in &pool.utilities {
279            self.add_counterparty_node(
280                &utility.name,
281                "utility",
282                Some(&format!("{:?}", utility.utility_type)),
283            );
284        }
285    }
286
287    /// Adds a counterparty node.
288    fn add_counterparty_node(
289        &mut self,
290        name: &str,
291        cp_type: &str,
292        category: Option<&str>,
293    ) -> NodeId {
294        let key = format!("{}_{}", cp_type, name);
295
296        if let Some(&id) = self.counterparty_nodes.get(&key) {
297            return id;
298        }
299
300        let mut node = GraphNode::new(
301            0,
302            NodeType::Custom("Counterparty".to_string()),
303            key.clone(),
304            name.to_string(),
305        );
306
307        node.categorical_features
308            .insert("counterparty_type".to_string(), cp_type.to_string());
309
310        if let Some(cat) = category {
311            node.categorical_features
312                .insert("category".to_string(), cat.to_string());
313        }
314
315        let id = self.graph.add_node(node);
316        self.counterparty_nodes.insert(key, id);
317        id
318    }
319
320    /// Adds transactions to the graph.
321    pub fn add_transactions(&mut self, transactions: &[BankTransaction]) {
322        if !self.config.create_transaction_edges {
323            return;
324        }
325
326        for txn in transactions {
327            self.add_transaction(txn);
328        }
329    }
330
331    /// Adds a single transaction to the graph.
332    fn add_transaction(&mut self, txn: &BankTransaction) {
333        let amount: f64 = txn.amount.try_into().unwrap_or(0.0);
334        if amount < self.config.min_transaction_amount {
335            return;
336        }
337
338        // Get or create account node
339        let account_key = txn.account_id.to_string();
340        let account_node = *self.account_nodes.get(&account_key).unwrap_or(&0);
341        if account_node == 0 {
342            return; // Account not in graph
343        }
344
345        // Get or create counterparty node
346        let cp_key = format!("counterparty_{}", txn.counterparty.name);
347        let counterparty_node = if let Some(&id) = self.counterparty_nodes.get(&cp_key) {
348            id
349        } else {
350            self.add_counterparty_node(
351                &txn.counterparty.name,
352                &format!("{:?}", txn.counterparty.counterparty_type),
353                None,
354            )
355        };
356
357        // Determine edge direction based on transaction direction
358        let (source, target) = match txn.direction {
359            Direction::Inbound => (counterparty_node, account_node),
360            Direction::Outbound => (account_node, counterparty_node),
361        };
362
363        if self.config.aggregate_parallel_edges {
364            self.aggregate_transaction_edge(source, target, txn);
365        } else {
366            let edge = self.create_transaction_edge(source, target, txn);
367            self.graph.add_edge(edge);
368        }
369    }
370
371    /// Creates a transaction edge with features.
372    fn create_transaction_edge(
373        &self,
374        source: NodeId,
375        target: NodeId,
376        txn: &BankTransaction,
377    ) -> GraphEdge {
378        let amount: f64 = txn.amount.try_into().unwrap_or(0.0);
379
380        let mut edge = GraphEdge::new(0, source, target, EdgeType::Transaction)
381            .with_weight(amount)
382            .with_timestamp(txn.timestamp_initiated.date_naive());
383
384        // Add transaction properties
385        edge.properties.insert(
386            "transaction_id".to_string(),
387            crate::models::EdgeProperty::String(txn.transaction_id.to_string()),
388        );
389        edge.properties.insert(
390            "channel".to_string(),
391            crate::models::EdgeProperty::String(format!("{:?}", txn.channel)),
392        );
393        edge.properties.insert(
394            "category".to_string(),
395            crate::models::EdgeProperty::String(format!("{:?}", txn.category)),
396        );
397
398        // Add numeric features
399        // Log amount
400        edge.features.push((amount + 1.0).ln());
401
402        // Direction encoding
403        edge.features.push(match txn.direction {
404            Direction::Inbound => 1.0,
405            Direction::Outbound => 0.0,
406        });
407
408        // Channel encoding
409        let channel_code = match format!("{:?}", txn.channel).as_str() {
410            "CardPresent" => 0.0,
411            "CardNotPresent" => 1.0,
412            "Ach" => 2.0,
413            "Wire" => 3.0,
414            "Cash" => 4.0,
415            "Atm" => 5.0,
416            "Branch" => 6.0,
417            "Mobile" => 7.0,
418            "Online" => 8.0,
419            "Swift" => 9.0,
420            _ => 10.0,
421        };
422        edge.features.push(channel_code / 10.0); // Normalized
423
424        // Temporal features
425        if self.config.include_temporal_features {
426            let weekday = txn.timestamp_initiated.weekday().num_days_from_monday() as f64;
427            edge.features.push(weekday / 6.0);
428
429            let hour = txn.timestamp_initiated.hour() as f64;
430            edge.features.push(hour / 23.0);
431
432            let day = txn.timestamp_initiated.day() as f64;
433            edge.features.push(day / 31.0);
434
435            let month = txn.timestamp_initiated.month() as f64;
436            edge.features.push(month / 12.0);
437
438            // Is weekend
439            edge.features.push(if weekday >= 5.0 { 1.0 } else { 0.0 });
440
441            // Is off-hours (before 7am or after 10pm)
442            let is_off_hours = !(7.0..=22.0).contains(&hour);
443            edge.features.push(if is_off_hours { 1.0 } else { 0.0 });
444        }
445
446        // Risk features
447        if self.config.include_risk_features {
448            // Is cash transaction
449            edge.features.push(if txn.is_cash() { 1.0 } else { 0.0 });
450
451            // Is cross-border
452            edge.features
453                .push(if txn.is_cross_border() { 1.0 } else { 0.0 });
454
455            // Risk score from transaction
456            edge.features
457                .push(txn.calculate_risk_score() as f64 / 100.0);
458        }
459
460        // Ground truth labels
461        if txn.is_suspicious {
462            edge = edge.as_anomaly(&format!(
463                "{:?}",
464                txn.suspicion_reason.unwrap_or(AmlTypology::Structuring)
465            ));
466
467            if let Some(typology) = txn.suspicion_reason {
468                edge.labels.push(format!("{:?}", typology));
469            }
470
471            if let Some(stage) = txn.laundering_stage {
472                edge.labels.push(format!("{:?}", stage));
473            }
474
475            if txn.is_spoofed {
476                edge.labels.push("spoofed".to_string());
477            }
478        }
479
480        edge
481    }
482
483    /// Aggregates transaction edges between same source and target.
484    fn aggregate_transaction_edge(
485        &mut self,
486        source: NodeId,
487        target: NodeId,
488        txn: &BankTransaction,
489    ) {
490        let key = (source, target);
491        let amount: f64 = txn.amount.try_into().unwrap_or(0.0);
492        let date = txn.timestamp_initiated.date_naive();
493
494        let agg = self
495            .edge_aggregation
496            .entry(key)
497            .or_insert(AggregatedBankingEdge {
498                source,
499                target,
500                total_amount: 0.0,
501                count: 0,
502                suspicious_count: 0,
503                first_date: date,
504                last_date: date,
505                channels: HashMap::new(),
506            });
507
508        agg.total_amount += amount;
509        agg.count += 1;
510
511        if txn.is_suspicious {
512            agg.suspicious_count += 1;
513        }
514
515        if date < agg.first_date {
516            agg.first_date = date;
517        }
518        if date > agg.last_date {
519            agg.last_date = date;
520        }
521
522        let channel = format!("{:?}", txn.channel);
523        *agg.channels.entry(channel).or_insert(0) += 1;
524    }
525
526    /// Builds the final graph.
527    pub fn build(mut self) -> Graph {
528        // If aggregating, create the aggregated edges now
529        if self.config.aggregate_parallel_edges {
530            for ((source, target), agg) in self.edge_aggregation {
531                let mut edge = GraphEdge::new(0, source, target, EdgeType::Transaction)
532                    .with_weight(agg.total_amount)
533                    .with_timestamp(agg.last_date);
534
535                // Aggregation features
536                edge.features.push((agg.total_amount + 1.0).ln());
537                edge.features.push(agg.count as f64);
538                edge.features.push(agg.suspicious_count as f64);
539                edge.features
540                    .push(agg.suspicious_count as f64 / agg.count.max(1) as f64);
541
542                let duration = (agg.last_date - agg.first_date).num_days() as f64;
543                edge.features.push(duration);
544
545                // Average amount per transaction
546                edge.features
547                    .push(agg.total_amount / agg.count.max(1) as f64);
548
549                // Transaction frequency (per day)
550                edge.features.push(agg.count as f64 / duration.max(1.0));
551
552                // Number of unique channels
553                edge.features.push(agg.channels.len() as f64);
554
555                // Mark as anomaly if any suspicious transactions
556                if agg.suspicious_count > 0 {
557                    edge = edge.as_anomaly("suspicious_link");
558                }
559
560                self.graph.add_edge(edge);
561            }
562        }
563
564        self.graph.compute_statistics();
565        self.graph
566    }
567}
568
/// Running totals for all transactions sharing one `(source, target)` node pair.
///
/// Filled by `aggregate_transaction_edge` and turned into a single edge by
/// `build` when `aggregate_parallel_edges` is enabled.
// `source` and `target` duplicate the aggregation map key and are not read back
// in this file, hence the `dead_code` allowance.
#[allow(dead_code)]
struct AggregatedBankingEdge {
    /// Source node of the aggregated edge.
    source: NodeId,
    /// Target node of the aggregated edge.
    target: NodeId,
    /// Sum of the amounts of all aggregated transactions.
    total_amount: f64,
    /// Number of aggregated transactions.
    count: usize,
    /// Number of aggregated transactions flagged as suspicious.
    suspicious_count: usize,
    /// Earliest initiation date seen so far.
    first_date: chrono::NaiveDate,
    /// Latest initiation date seen so far.
    last_date: chrono::NaiveDate,
    /// Transaction count per channel, keyed by the channel's `Debug` name.
    channels: HashMap<String, usize>,
}
581
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::NaiveDate;
    use datasynth_banking::models::CounterpartyRef;
    use datasynth_core::models::banking::{
        BankAccountType, TransactionCategory, TransactionChannel,
    };
    use rust_decimal::Decimal;
    use uuid::Uuid;

    /// Date shared by the customer and account fixtures.
    fn fixture_date() -> NaiveDate {
        NaiveDate::from_ymd_opt(2024, 1, 1).unwrap()
    }

    /// A retail customer, "John Doe", resident in the US.
    fn create_test_customer() -> BankingCustomer {
        BankingCustomer::new_retail(Uuid::new_v4(), "John", "Doe", "US", fixture_date())
    }

    /// A USD checking account owned by `customer`.
    fn create_test_account(customer: &BankingCustomer) -> BankAccount {
        BankAccount::new(
            Uuid::new_v4(),
            "****1234".to_string(),
            BankAccountType::Checking,
            customer.customer_id,
            "USD",
            fixture_date(),
        )
    }

    /// An outbound 1000 USD card-present purchase at "Test Store" on `account`.
    fn create_test_transaction(account: &BankAccount) -> BankTransaction {
        let merchant = CounterpartyRef::merchant(Uuid::new_v4(), "Test Store");

        BankTransaction::new(
            Uuid::new_v4(),
            account.account_id,
            Decimal::from(1000),
            "USD",
            Direction::Outbound,
            TransactionChannel::CardPresent,
            TransactionCategory::Shopping,
            merchant,
            "Test purchase",
            chrono::Utc::now(),
        )
    }

    /// Runs the full customers -> accounts -> transactions pipeline with the
    /// default configuration and returns the built graph.
    fn build_default_graph(
        customer: BankingCustomer,
        account: BankAccount,
        txn: BankTransaction,
    ) -> Graph {
        let mut builder = BankingGraphBuilder::new(BankingGraphConfig::default());
        builder.add_customers(&[customer.clone()]);
        builder.add_accounts(&[account], &[customer]);
        builder.add_transactions(&[txn]);
        builder.build()
    }

    #[test]
    fn test_build_banking_graph() {
        let customer = create_test_customer();
        let account = create_test_account(&customer);
        let txn = create_test_transaction(&account);

        let graph = build_default_graph(customer, account, txn);

        // Should have customer, account, and counterparty nodes
        assert!(graph.node_count() >= 2);
        // Should have ownership and transaction edges
        assert!(graph.edge_count() >= 1);
    }

    #[test]
    fn test_suspicious_transaction_labels() {
        let customer = create_test_customer();
        let account = create_test_account(&customer);

        // Mark as suspicious
        let txn =
            create_test_transaction(&account).mark_suspicious(AmlTypology::Structuring, "CASE-001");

        let graph = build_default_graph(customer, account, txn);

        // Check that suspicious edge exists
        let suspicious_edges = graph.anomalous_edges();
        assert!(!suspicious_edges.is_empty());
    }
}