Skip to main content

datasynth_graph/builders/
transaction_graph.rs

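//! Transaction graph builder.
//!
//! Builds a graph where:
//! - Nodes are GL accounts (one per company code and account code) and, when document nodes
//!   are enabled, journal-entry documents. Vendor/customer nodes are configurable flags but are
//!   not created by this builder yet.
//! - Edges are transactions (journal entry lines): either debit-account → credit-account flows,
//!   or document → account edges (one per line).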
6
7use std::collections::HashMap;
8
9use rust_decimal::Decimal;
10
11use datasynth_core::models::JournalEntry;
12
13use crate::models::{
14    AccountNode, EdgeType, Graph, GraphEdge, GraphNode, GraphType, NodeId, NodeType,
15    TransactionEdge,
16};
17
/// Configuration for transaction graph building.
///
/// The builder runs in one of two edge modes:
/// - **document mode** (`include_document_nodes = true`): one document → account edge per
///   journal entry line. This takes precedence over `create_debit_credit_edges`.
/// - **debit/credit mode** (`create_debit_credit_edges = true`): debit-account →
///   credit-account edges carrying a proportional share of the entry amount.
///
/// If neither flag is set, journal entries add nothing to the graph.
#[derive(Debug, Clone)]
pub struct TransactionGraphConfig {
    /// Intended to add vendor nodes. `TransactionGraphBuilder` does not currently read this flag.
    pub include_vendors: bool,
    /// Intended to add customer nodes. `TransactionGraphBuilder` does not currently read this flag.
    pub include_customers: bool,
    /// Whether to create edges between debit and credit accounts.
    ///
    /// Ignored when `include_document_nodes` is `true`.
    pub create_debit_credit_edges: bool,
    /// Whether to create edges from document nodes to the accounts of each line.
    pub include_document_nodes: bool,
    /// Minimum edge weight to include. Edges with a smaller weight are dropped.
    ///
    /// Only applied in debit/credit mode.
    pub min_edge_weight: f64,
    /// Whether to merge parallel debit → credit edges between the same account pair into a
    /// single edge with aggregate features.
    ///
    /// Only applied in debit/credit mode.
    pub aggregate_parallel_edges: bool,
}

impl Default for TransactionGraphConfig {
    /// Debit/credit mode with no filtering and no aggregation. All optional node kinds are off.
    fn default() -> Self {
        Self {
            include_vendors: false,
            include_customers: false,
            create_debit_credit_edges: true,
            include_document_nodes: false,
            min_edge_weight: 0.0,
            aggregate_parallel_edges: false,
        }
    }
}
47
48/// Builder for transaction graphs.
49pub struct TransactionGraphBuilder {
50    config: TransactionGraphConfig,
51    graph: Graph,
52    /// Map from account code to node ID.
53    account_nodes: HashMap<String, NodeId>,
54    /// Map from document number to node ID (if document nodes enabled).
55    document_nodes: HashMap<String, NodeId>,
56    /// For edge aggregation: (source, target) -> aggregated amount.
57    edge_aggregation: HashMap<(NodeId, NodeId), AggregatedEdge>,
58}
59
60impl TransactionGraphBuilder {
61    /// Creates a new transaction graph builder.
62    pub fn new(config: TransactionGraphConfig) -> Self {
63        Self {
64            config,
65            graph: Graph::new("transaction_network", GraphType::Transaction),
66            account_nodes: HashMap::new(),
67            document_nodes: HashMap::new(),
68            edge_aggregation: HashMap::new(),
69        }
70    }
71
72    /// Adds a journal entry to the graph.
73    pub fn add_journal_entry(&mut self, entry: &JournalEntry) {
74        if self.config.include_document_nodes {
75            self.add_journal_entry_with_document(entry);
76        } else if self.config.create_debit_credit_edges {
77            self.add_journal_entry_debit_credit(entry);
78        }
79    }
80
81    /// Adds journal entry creating edges between debit and credit accounts.
82    fn add_journal_entry_debit_credit(&mut self, entry: &JournalEntry) {
83        // Collect debit and credit lines
84        let debits: Vec<_> = entry
85            .lines
86            .iter()
87            .filter(|l| l.debit_amount > Decimal::ZERO)
88            .collect();
89
90        let credits: Vec<_> = entry
91            .lines
92            .iter()
93            .filter(|l| l.credit_amount > Decimal::ZERO)
94            .collect();
95
96        // Create edges from debit accounts to credit accounts
97        for debit in &debits {
98            let source_id = self.get_or_create_account_node(
99                debit.account_code(),
100                debit.account_description(),
101                entry.company_code(),
102            );
103
104            for credit in &credits {
105                let target_id = self.get_or_create_account_node(
106                    credit.account_code(),
107                    credit.account_description(),
108                    entry.company_code(),
109                );
110
111                // Calculate edge weight (proportional allocation)
112                let total_debit: Decimal = debits.iter().map(|d| d.debit_amount).sum();
113                let total_credit: Decimal = credits.iter().map(|c| c.credit_amount).sum();
114
115                let proportion =
116                    (debit.debit_amount / total_debit) * (credit.credit_amount / total_credit);
117                let edge_amount = debit.debit_amount * proportion;
118                let edge_weight: f64 = edge_amount.try_into().unwrap_or(0.0);
119
120                if edge_weight < self.config.min_edge_weight {
121                    continue;
122                }
123
124                if self.config.aggregate_parallel_edges {
125                    self.aggregate_edge(source_id, target_id, edge_weight, entry);
126                } else {
127                    let mut tx_edge = TransactionEdge::new(
128                        0,
129                        source_id,
130                        target_id,
131                        entry.document_number(),
132                        entry.posting_date(),
133                        edge_amount,
134                        true,
135                    );
136                    tx_edge.company_code = entry.company_code().to_string();
137                    tx_edge.cost_center = debit.cost_center.clone();
138                    tx_edge.business_process = entry
139                        .header
140                        .business_process
141                        .as_ref()
142                        .map(|bp| format!("{:?}", bp));
143                    tx_edge.compute_features();
144
145                    // Propagate anomaly flag from journal entry to graph edge
146                    if entry.header.is_anomaly {
147                        tx_edge.edge.is_anomaly = true;
148                        if let Some(ref anomaly_type) = entry.header.anomaly_type {
149                            tx_edge.edge.anomaly_type = Some(format!("{:?}", anomaly_type));
150                        }
151                    }
152
153                    self.graph.add_edge(tx_edge.edge);
154                }
155            }
156        }
157    }
158
159    /// Adds journal entry with document nodes.
160    fn add_journal_entry_with_document(&mut self, entry: &JournalEntry) {
161        // Create or get document node
162        let doc_id =
163            self.get_or_create_document_node(&entry.document_number(), entry.company_code());
164
165        // Create edges from document to each account
166        for line in &entry.lines {
167            let account_id = self.get_or_create_account_node(
168                line.account_code(),
169                line.account_description(),
170                entry.company_code(),
171            );
172
173            let is_debit = line.debit_amount > Decimal::ZERO;
174            let amount = if is_debit {
175                line.debit_amount
176            } else {
177                line.credit_amount
178            };
179
180            let mut tx_edge = TransactionEdge::new(
181                0,
182                doc_id,
183                account_id,
184                entry.document_number(),
185                entry.posting_date(),
186                amount,
187                is_debit,
188            );
189            tx_edge.company_code = entry.company_code().to_string();
190            tx_edge.cost_center = line.cost_center.clone();
191            tx_edge.business_process = entry
192                .header
193                .business_process
194                .as_ref()
195                .map(|bp| format!("{:?}", bp));
196            tx_edge.compute_features();
197
198            // Propagate anomaly flag from journal entry to graph edge
199            if entry.header.is_anomaly {
200                tx_edge.edge.is_anomaly = true;
201                if let Some(ref anomaly_type) = entry.header.anomaly_type {
202                    tx_edge.edge.anomaly_type = Some(format!("{:?}", anomaly_type));
203                }
204            }
205
206            self.graph.add_edge(tx_edge.edge);
207        }
208    }
209
210    /// Gets or creates an account node.
211    fn get_or_create_account_node(
212        &mut self,
213        account_code: &str,
214        account_name: &str,
215        company_code: &str,
216    ) -> NodeId {
217        let key = format!("{}_{}", company_code, account_code);
218
219        if let Some(&id) = self.account_nodes.get(&key) {
220            return id;
221        }
222
223        let mut account = AccountNode::new(
224            0,
225            account_code.to_string(),
226            account_name.to_string(),
227            Self::infer_account_type(account_code),
228            company_code.to_string(),
229        );
230        account.is_balance_sheet = Self::is_balance_sheet_account(account_code);
231        account.normal_balance = Self::infer_normal_balance(account_code);
232        account.compute_features();
233
234        let id = self.graph.add_node(account.node);
235        self.account_nodes.insert(key, id);
236        id
237    }
238
239    /// Gets or creates a document node.
240    fn get_or_create_document_node(&mut self, document_number: &str, company_code: &str) -> NodeId {
241        let key = format!("{}_{}", company_code, document_number);
242
243        if let Some(&id) = self.document_nodes.get(&key) {
244            return id;
245        }
246
247        let node = GraphNode::new(
248            0,
249            NodeType::JournalEntry,
250            document_number.to_string(),
251            document_number.to_string(),
252        );
253
254        let id = self.graph.add_node(node);
255        self.document_nodes.insert(key, id);
256        id
257    }
258
259    /// Aggregates edges between the same source and target.
260    fn aggregate_edge(
261        &mut self,
262        source: NodeId,
263        target: NodeId,
264        weight: f64,
265        entry: &JournalEntry,
266    ) {
267        let key = (source, target);
268        let agg = self.edge_aggregation.entry(key).or_insert(AggregatedEdge {
269            source,
270            target,
271            total_weight: 0.0,
272            count: 0,
273            first_date: entry.posting_date(),
274            last_date: entry.posting_date(),
275        });
276
277        agg.total_weight += weight;
278        agg.count += 1;
279        if entry.posting_date() < agg.first_date {
280            agg.first_date = entry.posting_date();
281        }
282        if entry.posting_date() > agg.last_date {
283            agg.last_date = entry.posting_date();
284        }
285    }
286
287    /// Infers account type from account code.
288    fn infer_account_type(account_code: &str) -> String {
289        if account_code.is_empty() {
290            return "Unknown".to_string();
291        }
292
293        match account_code.chars().next().unwrap() {
294            '1' => "Asset".to_string(),
295            '2' => "Liability".to_string(),
296            '3' => "Equity".to_string(),
297            '4' => "Revenue".to_string(),
298            '5' | '6' | '7' => "Expense".to_string(),
299            _ => "Unknown".to_string(),
300        }
301    }
302
303    /// Checks if account is balance sheet.
304    fn is_balance_sheet_account(account_code: &str) -> bool {
305        if account_code.is_empty() {
306            return false;
307        }
308
309        matches!(account_code.chars().next().unwrap(), '1' | '2' | '3')
310    }
311
312    /// Infers normal balance from account code.
313    fn infer_normal_balance(account_code: &str) -> String {
314        if account_code.is_empty() {
315            return "Debit".to_string();
316        }
317
318        match account_code.chars().next().unwrap() {
319            '1' | '5' | '6' | '7' => "Debit".to_string(),
320            '2' | '3' | '4' => "Credit".to_string(),
321            _ => "Debit".to_string(),
322        }
323    }
324
325    /// Builds the final graph.
326    pub fn build(mut self) -> Graph {
327        // If aggregating, create the aggregated edges now
328        if self.config.aggregate_parallel_edges {
329            for ((source, target), agg) in self.edge_aggregation {
330                let mut edge = GraphEdge::new(0, source, target, EdgeType::Transaction)
331                    .with_weight(agg.total_weight)
332                    .with_timestamp(agg.last_date);
333
334                // Add aggregation features
335                edge.features.push((agg.total_weight + 1.0).ln());
336                edge.features.push(agg.count as f64);
337
338                let duration = (agg.last_date - agg.first_date).num_days() as f64;
339                edge.features.push(duration);
340
341                self.graph.add_edge(edge);
342            }
343        }
344
345        self.graph.compute_statistics();
346        self.graph
347    }
348
349    /// Adds multiple journal entries.
350    pub fn add_journal_entries(&mut self, entries: &[JournalEntry]) {
351        for entry in entries {
352            self.add_journal_entry(entry);
353        }
354    }
355}
356
357/// Aggregated edge data.
358#[allow(dead_code)]
359struct AggregatedEdge {
360    source: NodeId,
361    target: NodeId,
362    total_weight: f64,
363    count: usize,
364    first_date: chrono::NaiveDate,
365    last_date: chrono::NaiveDate,
366}
367
#[cfg(test)]
mod tests {
    use super::*;
    use datasynth_core::models::{BusinessProcess, JournalEntryLine};
    use rust_decimal_macros::dec;

    /// Balanced two-line entry: debit 1000 to account "1000", credit 1000 to account "4000".
    fn create_test_entry() -> JournalEntry {
        let posting_date = chrono::NaiveDate::from_ymd_opt(2024, 1, 15).unwrap();
        let mut je = JournalEntry::new_simple(
            "JE001".to_string(),
            "1000".to_string(),
            posting_date,
            "Test Entry".to_string(),
        );

        let document_id = je.header.document_id;
        je.add_line(JournalEntryLine::debit(
            document_id,
            1,
            "1000".to_string(),
            dec!(1000),
        ));
        je.add_line(JournalEntryLine::credit(
            document_id,
            2,
            "4000".to_string(),
            dec!(1000),
        ));
        je
    }

    /// Same as `create_test_entry`, tagged with the given business process.
    fn create_test_entry_with_business_process(bp: BusinessProcess) -> JournalEntry {
        let mut je = create_test_entry();
        je.header.business_process = Some(bp);
        je
    }

    /// Config that enables document nodes and disables debit/credit edges.
    fn document_mode_config() -> TransactionGraphConfig {
        TransactionGraphConfig {
            include_document_nodes: true,
            create_debit_credit_edges: false,
            ..Default::default()
        }
    }

    /// Runs a single entry through a fresh builder and returns the built graph.
    fn build_graph(config: TransactionGraphConfig, entry: &JournalEntry) -> Graph {
        let mut builder = TransactionGraphBuilder::new(config);
        builder.add_journal_entry(entry);
        builder.build()
    }

    #[test]
    fn test_build_transaction_graph() {
        let graph = build_graph(TransactionGraphConfig::default(), &create_test_entry());

        // Two account nodes joined by a single debit -> credit edge.
        assert_eq!(graph.node_count(), 2);
        assert_eq!(graph.edge_count(), 1);
    }

    #[test]
    fn test_with_document_nodes() {
        let graph = build_graph(document_mode_config(), &create_test_entry());

        // The document node plus both accounts; one edge per entry line.
        assert_eq!(graph.node_count(), 3);
        assert_eq!(graph.edge_count(), 2);
    }

    #[test]
    fn test_business_process_edge_metadata() {
        let graph = build_graph(
            TransactionGraphConfig::default(),
            &create_test_entry_with_business_process(BusinessProcess::P2P),
        );

        // Every edge carries its source document number as a property.
        assert!(graph
            .edges
            .values()
            .all(|edge| edge.properties.contains_key("document_number")));
        assert_eq!(graph.edge_count(), 1);
    }

    #[test]
    fn test_business_process_with_document_nodes() {
        let graph = build_graph(
            document_mode_config(),
            &create_test_entry_with_business_process(BusinessProcess::O2C),
        );

        assert_eq!(graph.node_count(), 3);
        assert_eq!(graph.edge_count(), 2);
    }
}