Skip to main content

datasynth_graph/builders/
transaction_graph.rs

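//! Transaction graph builder.
//!
//! Builds a graph where:
//! - Nodes are GL accounts and, when document nodes are enabled, journal
//!   entries (vendor/customer flags exist in the config but are not used here)
//! - Edges are transactions: debit→credit flows between accounts, or links
//!   from a journal entry's document node to each account it posts to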
6
7use std::collections::HashMap;
8
9use rust_decimal::Decimal;
10
11use datasynth_core::models::JournalEntry;
12
13use crate::models::{
14    AccountNode, EdgeType, Graph, GraphEdge, GraphNode, GraphType, NodeId, NodeType,
15    TransactionEdge,
16};
17
/// Configuration for transaction graph building.
///
/// `include_document_nodes` takes precedence over `create_debit_credit_edges`.
/// When it is set, each journal entry becomes a document node linked to the
/// accounts it posts to, and no direct debit→credit edges are created.
#[derive(Debug, Clone, PartialEq)]
pub struct TransactionGraphConfig {
    /// Whether to include vendor nodes.
    ///
    /// Note: not currently consulted by `TransactionGraphBuilder`.
    pub include_vendors: bool,
    /// Whether to include customer nodes.
    ///
    /// Note: not currently consulted by `TransactionGraphBuilder`.
    pub include_customers: bool,
    /// Whether to create edges from each debited account to each credited
    /// account of an entry. Ignored when `include_document_nodes` is set.
    pub create_debit_credit_edges: bool,
    /// Whether to represent each journal entry as a document node with one
    /// edge per line to the affected account.
    pub include_document_nodes: bool,
    /// Minimum edge weight to include.
    ///
    /// Compared against each debit→credit edge's amount (converted to `f64`)
    /// before the edge is added or aggregated; edges below it are skipped.
    pub min_edge_weight: f64,
    /// Whether to merge all debit→credit edges between the same pair of
    /// accounts into a single edge carrying the summed weight, the number of
    /// merged edges, and the posting-date span.
    pub aggregate_parallel_edges: bool,
}
34
35impl Default for TransactionGraphConfig {
36    fn default() -> Self {
37        Self {
38            include_vendors: false,
39            include_customers: false,
40            create_debit_credit_edges: true,
41            include_document_nodes: false,
42            min_edge_weight: 0.0,
43            aggregate_parallel_edges: false,
44        }
45    }
46}
47
/// Builder that accumulates journal entries into a transaction [`Graph`].
///
/// Account and document nodes are deduplicated through the lookup maps below,
/// so repeated references to the same account or document reuse one node.
pub struct TransactionGraphBuilder {
    /// Options controlling which nodes and edges are produced.
    config: TransactionGraphConfig,
    /// Graph under construction; returned by `build`.
    graph: Graph,
    /// Map from `"{company_code}_{account_code}"` to the account's node ID.
    // NOTE(review): the `_`-joined key could collide if codes themselves
    // contain `_` — confirm company/account codes never do.
    account_nodes: HashMap<String, NodeId>,
    /// Map from `"{company_code}_{document_number}"` to the document's node ID.
    /// Populated only when `include_document_nodes` is enabled.
    document_nodes: HashMap<String, NodeId>,
    /// For edge aggregation: (source, target) -> running totals for that
    /// account pair. Populated only when `aggregate_parallel_edges` is enabled.
    edge_aggregation: HashMap<(NodeId, NodeId), AggregatedEdge>,
}
59
60impl TransactionGraphBuilder {
61    /// Creates a new transaction graph builder.
62    pub fn new(config: TransactionGraphConfig) -> Self {
63        Self {
64            config,
65            graph: Graph::new("transaction_network", GraphType::Transaction),
66            account_nodes: HashMap::new(),
67            document_nodes: HashMap::new(),
68            edge_aggregation: HashMap::new(),
69        }
70    }
71
72    /// Adds a journal entry to the graph.
73    pub fn add_journal_entry(&mut self, entry: &JournalEntry) {
74        if self.config.include_document_nodes {
75            self.add_journal_entry_with_document(entry);
76        } else if self.config.create_debit_credit_edges {
77            self.add_journal_entry_debit_credit(entry);
78        }
79    }
80
81    /// Adds journal entry creating edges between debit and credit accounts.
82    fn add_journal_entry_debit_credit(&mut self, entry: &JournalEntry) {
83        // Collect debit and credit lines
84        let debits: Vec<_> = entry
85            .lines
86            .iter()
87            .filter(|l| l.debit_amount > Decimal::ZERO)
88            .collect();
89
90        let credits: Vec<_> = entry
91            .lines
92            .iter()
93            .filter(|l| l.credit_amount > Decimal::ZERO)
94            .collect();
95
96        // Create edges from debit accounts to credit accounts
97        for debit in &debits {
98            let source_id = self.get_or_create_account_node(
99                debit.account_code(),
100                debit.account_description(),
101                entry.company_code(),
102            );
103
104            for credit in &credits {
105                let target_id = self.get_or_create_account_node(
106                    credit.account_code(),
107                    credit.account_description(),
108                    entry.company_code(),
109                );
110
111                // Calculate edge weight (proportional allocation)
112                let total_debit: Decimal = debits.iter().map(|d| d.debit_amount).sum();
113                let total_credit: Decimal = credits.iter().map(|c| c.credit_amount).sum();
114
115                let proportion =
116                    (debit.debit_amount / total_debit) * (credit.credit_amount / total_credit);
117                let edge_amount = debit.debit_amount * proportion;
118                let edge_weight: f64 = edge_amount.try_into().unwrap_or(0.0);
119
120                if edge_weight < self.config.min_edge_weight {
121                    continue;
122                }
123
124                if self.config.aggregate_parallel_edges {
125                    self.aggregate_edge(source_id, target_id, edge_weight, entry);
126                } else {
127                    let mut tx_edge = TransactionEdge::new(
128                        0,
129                        source_id,
130                        target_id,
131                        entry.document_number(),
132                        entry.posting_date(),
133                        edge_amount,
134                        true,
135                    );
136                    tx_edge.company_code = entry.company_code().to_string();
137                    tx_edge.cost_center = debit.cost_center.clone();
138                    tx_edge.business_process = entry
139                        .header
140                        .business_process
141                        .as_ref()
142                        .map(|bp| format!("{:?}", bp));
143                    tx_edge.compute_features();
144
145                    // Propagate anomaly flag from journal entry to graph edge
146                    if entry.header.is_anomaly {
147                        tx_edge.edge.is_anomaly = true;
148                        if let Some(ref anomaly_type) = entry.header.anomaly_type {
149                            tx_edge.edge.anomaly_type = Some(format!("{:?}", anomaly_type));
150                        }
151                    }
152
153                    self.graph.add_edge(tx_edge.edge);
154                }
155            }
156        }
157    }
158
159    /// Adds journal entry with document nodes.
160    fn add_journal_entry_with_document(&mut self, entry: &JournalEntry) {
161        // Create or get document node
162        let doc_id =
163            self.get_or_create_document_node(&entry.document_number(), entry.company_code());
164
165        // Create edges from document to each account
166        for line in &entry.lines {
167            let account_id = self.get_or_create_account_node(
168                line.account_code(),
169                line.account_description(),
170                entry.company_code(),
171            );
172
173            let is_debit = line.debit_amount > Decimal::ZERO;
174            let amount = if is_debit {
175                line.debit_amount
176            } else {
177                line.credit_amount
178            };
179
180            let mut tx_edge = TransactionEdge::new(
181                0,
182                doc_id,
183                account_id,
184                entry.document_number(),
185                entry.posting_date(),
186                amount,
187                is_debit,
188            );
189            tx_edge.company_code = entry.company_code().to_string();
190            tx_edge.cost_center = line.cost_center.clone();
191            tx_edge.business_process = entry
192                .header
193                .business_process
194                .as_ref()
195                .map(|bp| format!("{:?}", bp));
196            tx_edge.compute_features();
197
198            // Propagate anomaly flag from journal entry to graph edge
199            if entry.header.is_anomaly {
200                tx_edge.edge.is_anomaly = true;
201                if let Some(ref anomaly_type) = entry.header.anomaly_type {
202                    tx_edge.edge.anomaly_type = Some(format!("{:?}", anomaly_type));
203                }
204            }
205
206            self.graph.add_edge(tx_edge.edge);
207        }
208    }
209
210    /// Gets or creates an account node.
211    fn get_or_create_account_node(
212        &mut self,
213        account_code: &str,
214        account_name: &str,
215        company_code: &str,
216    ) -> NodeId {
217        let key = format!("{}_{}", company_code, account_code);
218
219        if let Some(&id) = self.account_nodes.get(&key) {
220            return id;
221        }
222
223        let mut account = AccountNode::new(
224            0,
225            account_code.to_string(),
226            account_name.to_string(),
227            Self::infer_account_type(account_code),
228            company_code.to_string(),
229        );
230        account.is_balance_sheet = Self::is_balance_sheet_account(account_code);
231        account.normal_balance = Self::infer_normal_balance(account_code);
232        account.compute_features();
233
234        let id = self.graph.add_node(account.node);
235        self.account_nodes.insert(key, id);
236        id
237    }
238
239    /// Gets or creates a document node.
240    fn get_or_create_document_node(&mut self, document_number: &str, company_code: &str) -> NodeId {
241        let key = format!("{}_{}", company_code, document_number);
242
243        if let Some(&id) = self.document_nodes.get(&key) {
244            return id;
245        }
246
247        let node = GraphNode::new(
248            0,
249            NodeType::JournalEntry,
250            document_number.to_string(),
251            document_number.to_string(),
252        );
253
254        let id = self.graph.add_node(node);
255        self.document_nodes.insert(key, id);
256        id
257    }
258
259    /// Aggregates edges between the same source and target.
260    fn aggregate_edge(
261        &mut self,
262        source: NodeId,
263        target: NodeId,
264        weight: f64,
265        entry: &JournalEntry,
266    ) {
267        let key = (source, target);
268        let agg = self.edge_aggregation.entry(key).or_insert(AggregatedEdge {
269            source,
270            target,
271            total_weight: 0.0,
272            count: 0,
273            first_date: entry.posting_date(),
274            last_date: entry.posting_date(),
275        });
276
277        agg.total_weight += weight;
278        agg.count += 1;
279        if entry.posting_date() < agg.first_date {
280            agg.first_date = entry.posting_date();
281        }
282        if entry.posting_date() > agg.last_date {
283            agg.last_date = entry.posting_date();
284        }
285    }
286
287    /// Infers account type from account code.
288    fn infer_account_type(account_code: &str) -> String {
289        if account_code.is_empty() {
290            return "Unknown".to_string();
291        }
292
293        match account_code
294            .chars()
295            .next()
296            .expect("non-empty checked above")
297        {
298            '1' => "Asset".to_string(),
299            '2' => "Liability".to_string(),
300            '3' => "Equity".to_string(),
301            '4' => "Revenue".to_string(),
302            '5' | '6' | '7' => "Expense".to_string(),
303            _ => "Unknown".to_string(),
304        }
305    }
306
307    /// Checks if account is balance sheet.
308    fn is_balance_sheet_account(account_code: &str) -> bool {
309        if account_code.is_empty() {
310            return false;
311        }
312
313        matches!(
314            account_code
315                .chars()
316                .next()
317                .expect("non-empty checked above"),
318            '1' | '2' | '3'
319        )
320    }
321
322    /// Infers normal balance from account code.
323    fn infer_normal_balance(account_code: &str) -> String {
324        if account_code.is_empty() {
325            return "Debit".to_string();
326        }
327
328        match account_code
329            .chars()
330            .next()
331            .expect("non-empty checked above")
332        {
333            '1' | '5' | '6' | '7' => "Debit".to_string(),
334            '2' | '3' | '4' => "Credit".to_string(),
335            _ => "Debit".to_string(),
336        }
337    }
338
339    /// Builds the final graph.
340    pub fn build(mut self) -> Graph {
341        // If aggregating, create the aggregated edges now
342        if self.config.aggregate_parallel_edges {
343            for ((source, target), agg) in self.edge_aggregation {
344                let mut edge = GraphEdge::new(0, source, target, EdgeType::Transaction)
345                    .with_weight(agg.total_weight)
346                    .with_timestamp(agg.last_date);
347
348                // Add aggregation features
349                edge.features.push((agg.total_weight + 1.0).ln());
350                edge.features.push(agg.count as f64);
351
352                let duration = (agg.last_date - agg.first_date).num_days() as f64;
353                edge.features.push(duration);
354
355                self.graph.add_edge(edge);
356            }
357        }
358
359        self.graph.compute_statistics();
360        self.graph
361    }
362
363    /// Adds multiple journal entries.
364    pub fn add_journal_entries(&mut self, entries: &[JournalEntry]) {
365        for entry in entries {
366            self.add_journal_entry(entry);
367        }
368    }
369}
370
/// Running totals for all debit→credit edges between one (source, target)
/// account pair.
///
/// These are collected when `aggregate_parallel_edges` is enabled and turned
/// into a single graph edge by `TransactionGraphBuilder::build`.
// `source` and `target` duplicate the aggregation map key and are written but
// never read in this file, hence the `dead_code` allowance.
#[allow(dead_code)]
struct AggregatedEdge {
    /// Source (debit-side) account node.
    source: NodeId,
    /// Target (credit-side) account node.
    target: NodeId,
    /// Sum of the per-entry edge weights (edge amounts converted to `f64`).
    total_weight: f64,
    /// Number of per-entry edges folded into this aggregate.
    count: usize,
    /// Earliest posting date among the aggregated entries.
    first_date: chrono::NaiveDate,
    /// Latest posting date among the aggregated entries.
    last_date: chrono::NaiveDate,
}
381
#[cfg(test)]
// Tests may unwrap: a panic here is a test failure, not a production crash.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use datasynth_core::models::{BusinessProcess, JournalEntryLine};
    use rust_decimal_macros::dec;

    /// Entry `JE001` with a debit of 1000 to account `1000` and a credit of
    /// 1000 to account `4000`, posted on 2024-01-15.
    fn create_test_entry() -> JournalEntry {
        let posting_date = chrono::NaiveDate::from_ymd_opt(2024, 1, 15).unwrap();
        let mut entry = JournalEntry::new_simple(
            "JE001".to_string(),
            "1000".to_string(),
            posting_date,
            "Test Entry".to_string(),
        );

        let document_id = entry.header.document_id;
        let debit_line = JournalEntryLine::debit(document_id, 1, "1000".to_string(), dec!(1000));
        let credit_line = JournalEntryLine::credit(document_id, 2, "4000".to_string(), dec!(1000));
        entry.add_line(debit_line);
        entry.add_line(credit_line);

        entry
    }

    /// The standard test entry tagged with the given business process.
    fn create_test_entry_with_business_process(process: BusinessProcess) -> JournalEntry {
        let mut entry = create_test_entry();
        entry.header.business_process = Some(process);
        entry
    }

    /// Config that routes entries through document nodes instead of direct
    /// debit→credit edges.
    fn document_node_config() -> TransactionGraphConfig {
        TransactionGraphConfig {
            include_document_nodes: true,
            create_debit_credit_edges: false,
            ..Default::default()
        }
    }

    /// Builds a graph from a single journal entry.
    fn graph_for(config: TransactionGraphConfig, entry: &JournalEntry) -> Graph {
        let mut builder = TransactionGraphBuilder::new(config);
        builder.add_journal_entry(entry);
        builder.build()
    }

    #[test]
    fn test_build_transaction_graph() {
        let graph = graph_for(TransactionGraphConfig::default(), &create_test_entry());

        // One node per account (1000 and 4000) and a single debit→credit edge.
        assert_eq!(graph.node_count(), 2);
        assert_eq!(graph.edge_count(), 1);
    }

    #[test]
    fn test_with_document_nodes() {
        let graph = graph_for(document_node_config(), &create_test_entry());

        // Document node plus both account nodes; one edge per entry line.
        assert_eq!(graph.node_count(), 3);
        assert_eq!(graph.edge_count(), 2);
    }

    #[test]
    fn test_business_process_edge_metadata() {
        let entry = create_test_entry_with_business_process(BusinessProcess::P2P);
        let graph = graph_for(TransactionGraphConfig::default(), &entry);

        // Every edge must carry the document_number property.
        assert!(graph
            .edges
            .values()
            .all(|edge| edge.properties.contains_key("document_number")));
        assert_eq!(graph.edge_count(), 1);
    }

    #[test]
    fn test_business_process_with_document_nodes() {
        let entry = create_test_entry_with_business_process(BusinessProcess::O2C);
        let graph = graph_for(document_node_config(), &entry);

        assert_eq!(graph.node_count(), 3);
        assert_eq!(graph.edge_count(), 2);
    }
}