Skip to main content

datasynth_graph/builders/
transaction_graph.rs

1//! Transaction graph builder.
2//!
3//! Builds a graph where:
4//! - Nodes are GL accounts (or entities like vendors, customers)
5//! - Edges are transactions (journal entry lines)
6
7use std::collections::HashMap;
8
9use rust_decimal::Decimal;
10
11use datasynth_core::accounts::AccountCategory;
12use datasynth_core::framework_accounts::FrameworkAccounts;
13use datasynth_core::models::JournalEntry;
14
15use crate::models::{
16    AccountNode, EdgeType, Graph, GraphEdge, GraphNode, GraphType, NodeId, NodeType,
17    TransactionEdge,
18};
19
/// Options controlling how journal entries are turned into a transaction graph.
#[derive(Debug, Clone)]
pub struct TransactionGraphConfig {
    /// Whether to include vendor nodes.
    ///
    /// NOTE(review): not read by any builder method in this file — confirm where
    /// (or whether) it is honoured.
    pub include_vendors: bool,
    /// Whether to include customer nodes.
    ///
    /// NOTE(review): not read by any builder method in this file — confirm where
    /// (or whether) it is honoured.
    pub include_customers: bool,
    /// Whether to create edges between debit and credit accounts.
    ///
    /// Only consulted when `include_document_nodes` is `false`.
    pub create_debit_credit_edges: bool,
    /// Whether to create edges from/to document nodes.
    ///
    /// When `true`, each journal entry becomes a document node linked to the accounts
    /// of its lines, and no debit-to-credit edges are produced for that entry.
    pub include_document_nodes: bool,
    /// Minimum edge weight to include.
    ///
    /// Applied to debit-to-credit edges only; document edges are not filtered.
    pub min_edge_weight: f64,
    /// Whether to aggregate parallel edges.
    ///
    /// Affects debit-to-credit edges only: weights are accumulated per
    /// (source, target) pair and emitted as a single edge when the graph is built.
    pub aggregate_parallel_edges: bool,
    /// Accounting framework for account classification (e.g. `"us_gaap"`,
    /// `"french_gaap"`, `"german_gaap"`, `"ifrs"`). Defaults to `"us_gaap"`.
    pub framework: Option<String>,
}

impl Default for TransactionGraphConfig {
    /// Debit-to-credit edges only: no document nodes, no aggregation, no weight
    /// threshold, and no explicit framework.
    fn default() -> Self {
        TransactionGraphConfig {
            // The single switch that is on by default.
            create_debit_credit_edges: true,
            // Everything else starts disabled / unset.
            include_vendors: false,
            include_customers: false,
            include_document_nodes: false,
            aggregate_parallel_edges: false,
            min_edge_weight: 0.0,
            framework: None,
        }
    }
}
53
/// Builder for transaction graphs.
///
/// Journal entries are fed in through `add_journal_entry` / `add_journal_entries`;
/// `build` then consumes the builder and returns the finished [`Graph`].
pub struct TransactionGraphBuilder {
    /// Options supplied at construction time.
    config: TransactionGraphConfig,
    /// Graph under construction; handed back by `build`.
    graph: Graph,
    /// Framework-aware account classification.
    framework_accounts: FrameworkAccounts,
    /// Map from account code to node ID.
    ///
    /// Keys have the form `"{company_code}_{account_code}"`.
    account_nodes: HashMap<String, NodeId>,
    /// Map from document number to node ID (if document nodes enabled).
    ///
    /// Keys have the form `"{company_code}_{document_number}"`.
    document_nodes: HashMap<String, NodeId>,
    /// For edge aggregation: (source, target) -> aggregated amount.
    ///
    /// Only filled when `aggregate_parallel_edges` is set; drained into edges by `build`.
    edge_aggregation: HashMap<(NodeId, NodeId), AggregatedEdge>,
}
67
68impl TransactionGraphBuilder {
69    /// Creates a new transaction graph builder.
70    pub fn new(config: TransactionGraphConfig) -> Self {
71        let framework_accounts =
72            FrameworkAccounts::for_framework(config.framework.as_deref().unwrap_or("us_gaap"));
73        Self {
74            config,
75            graph: Graph::new("transaction_network", GraphType::Transaction),
76            framework_accounts,
77            account_nodes: HashMap::new(),
78            document_nodes: HashMap::new(),
79            edge_aggregation: HashMap::new(),
80        }
81    }
82
83    /// Adds a journal entry to the graph.
84    pub fn add_journal_entry(&mut self, entry: &JournalEntry) {
85        if self.config.include_document_nodes {
86            self.add_journal_entry_with_document(entry);
87        } else if self.config.create_debit_credit_edges {
88            self.add_journal_entry_debit_credit(entry);
89        }
90    }
91
92    /// Adds journal entry creating edges between debit and credit accounts.
93    fn add_journal_entry_debit_credit(&mut self, entry: &JournalEntry) {
94        // Collect debit and credit lines
95        let debits: Vec<_> = entry
96            .lines
97            .iter()
98            .filter(|l| l.debit_amount > Decimal::ZERO)
99            .collect();
100
101        let credits: Vec<_> = entry
102            .lines
103            .iter()
104            .filter(|l| l.credit_amount > Decimal::ZERO)
105            .collect();
106
107        // Create edges from debit accounts to credit accounts
108        for debit in &debits {
109            let source_id = self.get_or_create_account_node(
110                debit.account_code(),
111                debit.account_description(),
112                entry.company_code(),
113            );
114
115            for credit in &credits {
116                let target_id = self.get_or_create_account_node(
117                    credit.account_code(),
118                    credit.account_description(),
119                    entry.company_code(),
120                );
121
122                // Calculate edge weight (proportional allocation)
123                let total_debit: Decimal = debits.iter().map(|d| d.debit_amount).sum();
124                let total_credit: Decimal = credits.iter().map(|c| c.credit_amount).sum();
125
126                let proportion =
127                    (debit.debit_amount / total_debit) * (credit.credit_amount / total_credit);
128                let edge_amount = debit.debit_amount * proportion;
129                let edge_weight: f64 = edge_amount.try_into().unwrap_or(0.0);
130
131                if edge_weight < self.config.min_edge_weight {
132                    continue;
133                }
134
135                if self.config.aggregate_parallel_edges {
136                    self.aggregate_edge(source_id, target_id, edge_weight, entry);
137                } else {
138                    let mut tx_edge = TransactionEdge::new(
139                        0,
140                        source_id,
141                        target_id,
142                        entry.document_number(),
143                        entry.posting_date(),
144                        edge_amount,
145                        true,
146                    );
147                    tx_edge.company_code = entry.company_code().to_string();
148                    tx_edge.cost_center = debit.cost_center.clone();
149                    tx_edge.business_process = entry
150                        .header
151                        .business_process
152                        .as_ref()
153                        .map(|bp| format!("{:?}", bp));
154                    tx_edge.compute_features();
155
156                    // Propagate anomaly flag from journal entry to graph edge
157                    if entry.header.is_anomaly {
158                        tx_edge.edge.is_anomaly = true;
159                        if let Some(ref anomaly_type) = entry.header.anomaly_type {
160                            tx_edge.edge.anomaly_type = Some(format!("{:?}", anomaly_type));
161                        }
162                    }
163
164                    self.graph.add_edge(tx_edge.edge);
165                }
166            }
167        }
168    }
169
170    /// Adds journal entry with document nodes.
171    fn add_journal_entry_with_document(&mut self, entry: &JournalEntry) {
172        // Create or get document node
173        let doc_id =
174            self.get_or_create_document_node(&entry.document_number(), entry.company_code());
175
176        // Create edges from document to each account
177        for line in &entry.lines {
178            let account_id = self.get_or_create_account_node(
179                line.account_code(),
180                line.account_description(),
181                entry.company_code(),
182            );
183
184            let is_debit = line.debit_amount > Decimal::ZERO;
185            let amount = if is_debit {
186                line.debit_amount
187            } else {
188                line.credit_amount
189            };
190
191            let mut tx_edge = TransactionEdge::new(
192                0,
193                doc_id,
194                account_id,
195                entry.document_number(),
196                entry.posting_date(),
197                amount,
198                is_debit,
199            );
200            tx_edge.company_code = entry.company_code().to_string();
201            tx_edge.cost_center = line.cost_center.clone();
202            tx_edge.business_process = entry
203                .header
204                .business_process
205                .as_ref()
206                .map(|bp| format!("{:?}", bp));
207            tx_edge.compute_features();
208
209            // Propagate anomaly flag from journal entry to graph edge
210            if entry.header.is_anomaly {
211                tx_edge.edge.is_anomaly = true;
212                if let Some(ref anomaly_type) = entry.header.anomaly_type {
213                    tx_edge.edge.anomaly_type = Some(format!("{:?}", anomaly_type));
214                }
215            }
216
217            self.graph.add_edge(tx_edge.edge);
218        }
219    }
220
221    /// Gets or creates an account node.
222    fn get_or_create_account_node(
223        &mut self,
224        account_code: &str,
225        account_name: &str,
226        company_code: &str,
227    ) -> NodeId {
228        let key = format!("{}_{}", company_code, account_code);
229
230        if let Some(&id) = self.account_nodes.get(&key) {
231            return id;
232        }
233
234        let category = self.framework_accounts.classify(account_code);
235        let mut account = AccountNode::new(
236            0,
237            account_code.to_string(),
238            account_name.to_string(),
239            Self::account_type_label(category),
240            company_code.to_string(),
241        );
242        account.is_balance_sheet = category.is_balance_sheet();
243        account.normal_balance = if category.is_debit_normal() {
244            "Debit".to_string()
245        } else {
246            "Credit".to_string()
247        };
248        account.compute_features();
249
250        let id = self.graph.add_node(account.node);
251        self.account_nodes.insert(key, id);
252        id
253    }
254
255    /// Gets or creates a document node.
256    fn get_or_create_document_node(&mut self, document_number: &str, company_code: &str) -> NodeId {
257        let key = format!("{}_{}", company_code, document_number);
258
259        if let Some(&id) = self.document_nodes.get(&key) {
260            return id;
261        }
262
263        let node = GraphNode::new(
264            0,
265            NodeType::JournalEntry,
266            document_number.to_string(),
267            document_number.to_string(),
268        );
269
270        let id = self.graph.add_node(node);
271        self.document_nodes.insert(key, id);
272        id
273    }
274
275    /// Aggregates edges between the same source and target.
276    fn aggregate_edge(
277        &mut self,
278        source: NodeId,
279        target: NodeId,
280        weight: f64,
281        entry: &JournalEntry,
282    ) {
283        let key = (source, target);
284        let agg = self.edge_aggregation.entry(key).or_insert(AggregatedEdge {
285            source,
286            target,
287            total_weight: 0.0,
288            count: 0,
289            first_date: entry.posting_date(),
290            last_date: entry.posting_date(),
291        });
292
293        agg.total_weight += weight;
294        agg.count += 1;
295        if entry.posting_date() < agg.first_date {
296            agg.first_date = entry.posting_date();
297        }
298        if entry.posting_date() > agg.last_date {
299            agg.last_date = entry.posting_date();
300        }
301    }
302
303    /// Returns a human-readable account type label from an [`AccountCategory`].
304    fn account_type_label(category: AccountCategory) -> String {
305        match category {
306            AccountCategory::Asset => "Asset".to_string(),
307            AccountCategory::Liability => "Liability".to_string(),
308            AccountCategory::Equity => "Equity".to_string(),
309            AccountCategory::Revenue => "Revenue".to_string(),
310            AccountCategory::Cogs
311            | AccountCategory::OperatingExpense
312            | AccountCategory::OtherIncomeExpense
313            | AccountCategory::Tax => "Expense".to_string(),
314            AccountCategory::Suspense | AccountCategory::Unknown => "Unknown".to_string(),
315        }
316    }
317
318    /// Builds the final graph.
319    pub fn build(mut self) -> Graph {
320        // If aggregating, create the aggregated edges now
321        if self.config.aggregate_parallel_edges {
322            for ((source, target), agg) in self.edge_aggregation {
323                let mut edge = GraphEdge::new(0, source, target, EdgeType::Transaction)
324                    .with_weight(agg.total_weight)
325                    .with_timestamp(agg.last_date);
326
327                // Add aggregation features
328                edge.features.push((agg.total_weight + 1.0).ln());
329                edge.features.push(agg.count as f64);
330
331                let duration = (agg.last_date - agg.first_date).num_days() as f64;
332                edge.features.push(duration);
333
334                self.graph.add_edge(edge);
335            }
336        }
337
338        self.graph.compute_statistics();
339        self.graph
340    }
341
342    /// Adds multiple journal entries.
343    pub fn add_journal_entries(&mut self, entries: &[JournalEntry]) {
344        for entry in entries {
345            self.add_journal_entry(entry);
346        }
347    }
348}
349
/// Aggregated edge data.
///
/// Running totals for all transactions folded into one (source, target) node pair
/// while `aggregate_parallel_edges` is enabled.
// `source` and `target` duplicate the aggregation map key and are never read back in
// this file, which is why the dead-code lint is silenced.
#[allow(dead_code)]
struct AggregatedEdge {
    /// Source node of the pair (same value as the map key's first element).
    source: NodeId,
    /// Target node of the pair (same value as the map key's second element).
    target: NodeId,
    /// Sum of the weights of all aggregated transactions.
    total_weight: f64,
    /// Number of transactions aggregated so far.
    count: usize,
    /// Earliest posting date seen for this pair.
    first_date: chrono::NaiveDate,
    /// Latest posting date seen for this pair.
    last_date: chrono::NaiveDate,
}
360
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use datasynth_core::models::{BusinessProcess, JournalEntryLine};
    use rust_decimal_macros::dec;

    /// Balanced two-line entry: 1000 debited to account "1000", 1000 credited to "4000".
    fn create_test_entry() -> JournalEntry {
        let posting_date = chrono::NaiveDate::from_ymd_opt(2024, 1, 15).unwrap();
        let mut entry = JournalEntry::new_simple(
            "JE001".to_string(),
            "1000".to_string(),
            posting_date,
            "Test Entry".to_string(),
        );

        let doc_id = entry.header.document_id;
        let debit_line = JournalEntryLine::debit(doc_id, 1, "1000".to_string(), dec!(1000));
        let credit_line = JournalEntryLine::credit(doc_id, 2, "4000".to_string(), dec!(1000));

        entry.add_line(debit_line);
        entry.add_line(credit_line);
        entry
    }

    /// Same as [`create_test_entry`], tagged with the given business process.
    fn create_test_entry_with_business_process(bp: BusinessProcess) -> JournalEntry {
        let mut entry = create_test_entry();
        entry.header.business_process = Some(bp);
        entry
    }

    /// Configuration that switches the builder to document-node mode.
    fn document_node_config() -> TransactionGraphConfig {
        TransactionGraphConfig {
            include_document_nodes: true,
            create_debit_credit_edges: false,
            ..Default::default()
        }
    }

    /// Runs a single entry through a fresh builder and returns the built graph.
    fn graph_for(config: TransactionGraphConfig, entry: &JournalEntry) -> Graph {
        let mut builder = TransactionGraphBuilder::new(config);
        builder.add_journal_entry(entry);
        builder.build()
    }

    #[test]
    fn test_build_transaction_graph() {
        let graph = graph_for(TransactionGraphConfig::default(), &create_test_entry());

        assert_eq!(graph.node_count(), 2); // Cash and Revenue
        assert_eq!(graph.edge_count(), 1); // One transaction edge
    }

    #[test]
    fn test_with_document_nodes() {
        let graph = graph_for(document_node_config(), &create_test_entry());

        assert_eq!(graph.node_count(), 3); // Document + Cash + Revenue
        assert_eq!(graph.edge_count(), 2); // Document to each account
    }

    #[test]
    fn test_business_process_edge_metadata() {
        let entry = create_test_entry_with_business_process(BusinessProcess::P2P);
        let graph = graph_for(TransactionGraphConfig::default(), &entry);

        // All edges should have the document_number property set
        for edge in graph.edges.values() {
            assert!(edge.properties.contains_key("document_number"));
        }
        assert_eq!(graph.edge_count(), 1);
    }

    #[test]
    fn test_business_process_with_document_nodes() {
        let entry = create_test_entry_with_business_process(BusinessProcess::O2C);
        let graph = graph_for(document_node_config(), &entry);

        assert_eq!(graph.node_count(), 3);
        assert_eq!(graph.edge_count(), 2);
    }
}
461}