datasynth-graph 3.0.0

Graph/network export for synthetic accounting data - supports PyTorch Geometric, Neo4j, and DGL formats
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
//! Transaction graph builder.
//!
//! Builds a graph where:
//! - Nodes are GL accounts (or entities like vendors, customers)
//! - Edges are transactions (journal entry lines)

use std::collections::HashMap;

use rust_decimal::Decimal;

use datasynth_core::accounts::AccountCategory;
use datasynth_core::framework_accounts::FrameworkAccounts;
use datasynth_core::models::JournalEntry;

use crate::models::{
    AccountNode, EdgeType, Graph, GraphEdge, GraphNode, GraphType, NodeId, NodeType,
    TransactionEdge,
};

/// Configuration for transaction graph building.
///
/// Two construction modes are selected by the flags below (see
/// `TransactionGraphBuilder::add_journal_entry`):
///
/// - `include_document_nodes == true`: every journal entry becomes a document
///   node with one edge per line to the line's account node. This flag takes
///   precedence over `create_debit_credit_edges`.
/// - otherwise, `create_debit_credit_edges == true`: edges are created
///   directly from debit accounts to credit accounts.
///
/// When both flags are `false`, journal entries are ignored.
#[derive(Debug, Clone)]
pub struct TransactionGraphConfig {
    /// Whether to include vendor nodes.
    ///
    /// NOTE(review): this flag is not read by the builder code in this file;
    /// confirm whether it is consumed elsewhere.
    pub include_vendors: bool,
    /// Whether to include customer nodes.
    ///
    /// NOTE(review): this flag is not read by the builder code in this file;
    /// confirm whether it is consumed elsewhere.
    pub include_customers: bool,
    /// Whether to create edges between debit and credit accounts.
    ///
    /// Only consulted when `include_document_nodes` is `false`.
    pub create_debit_credit_edges: bool,
    /// Whether to create edges from/to document nodes.
    ///
    /// When `true`, the builder uses the document-node mode and the
    /// debit/credit mode is skipped.
    pub include_document_nodes: bool,
    /// Minimum edge weight to include.
    ///
    /// Applied only in the debit/credit mode: an edge whose amount (converted
    /// to `f64`) is below this threshold is dropped.
    pub min_edge_weight: f64,
    /// Whether to aggregate parallel edges.
    ///
    /// Applied only in the debit/credit mode: edges sharing the same
    /// (source, target) pair are accumulated and emitted once when the graph
    /// is built.
    pub aggregate_parallel_edges: bool,
    /// Accounting framework for account classification (e.g. `"us_gaap"`,
    /// `"french_gaap"`, `"german_gaap"`, `"ifrs"`). Defaults to `"us_gaap"`.
    ///
    /// `None` is resolved to `"us_gaap"` when the builder is constructed.
    pub framework: Option<String>,
}

impl Default for TransactionGraphConfig {
    fn default() -> Self {
        Self {
            include_vendors: false,
            include_customers: false,
            create_debit_credit_edges: true,
            include_document_nodes: false,
            min_edge_weight: 0.0,
            aggregate_parallel_edges: false,
            framework: None,
        }
    }
}

/// Builder for transaction graphs.
///
/// Typical use: construct with `TransactionGraphBuilder::new`, feed journal
/// entries through `add_journal_entry` / `add_journal_entries`, then call
/// `build` to obtain the finished [`Graph`].
pub struct TransactionGraphBuilder {
    /// Options controlling which nodes and edges are produced.
    config: TransactionGraphConfig,
    /// The graph under construction; returned by `build`.
    graph: Graph,
    /// Framework-aware account classification.
    framework_accounts: FrameworkAccounts,
    /// Map from account code to node ID.
    ///
    /// Keys have the form `"{company_code}_{account_code}"`, so the same
    /// account code under different company codes yields distinct nodes.
    account_nodes: HashMap<String, NodeId>,
    /// Map from document number to node ID (if document nodes enabled).
    ///
    /// Keys have the form `"{company_code}_{document_number}"`.
    document_nodes: HashMap<String, NodeId>,
    /// For edge aggregation: (source, target) -> aggregated amount.
    ///
    /// Only populated when `aggregate_parallel_edges` is enabled; its entries
    /// are turned into graph edges by `build`.
    edge_aggregation: HashMap<(NodeId, NodeId), AggregatedEdge>,
}

impl TransactionGraphBuilder {
    /// Creates a new transaction graph builder.
    pub fn new(config: TransactionGraphConfig) -> Self {
        let framework_accounts =
            FrameworkAccounts::for_framework(config.framework.as_deref().unwrap_or("us_gaap"));
        Self {
            config,
            graph: Graph::new("transaction_network", GraphType::Transaction),
            framework_accounts,
            account_nodes: HashMap::new(),
            document_nodes: HashMap::new(),
            edge_aggregation: HashMap::new(),
        }
    }

    /// Adds a journal entry to the graph.
    pub fn add_journal_entry(&mut self, entry: &JournalEntry) {
        if self.config.include_document_nodes {
            self.add_journal_entry_with_document(entry);
        } else if self.config.create_debit_credit_edges {
            self.add_journal_entry_debit_credit(entry);
        }
    }

    /// Adds journal entry creating edges between debit and credit accounts.
    fn add_journal_entry_debit_credit(&mut self, entry: &JournalEntry) {
        // Collect debit and credit lines
        let debits: Vec<_> = entry
            .lines
            .iter()
            .filter(|l| l.debit_amount > Decimal::ZERO)
            .collect();

        let credits: Vec<_> = entry
            .lines
            .iter()
            .filter(|l| l.credit_amount > Decimal::ZERO)
            .collect();

        // Create edges from debit accounts to credit accounts
        for debit in &debits {
            let source_id = self.get_or_create_account_node(
                debit.account_code(),
                debit.account_description(),
                entry.company_code(),
            );

            for credit in &credits {
                let target_id = self.get_or_create_account_node(
                    credit.account_code(),
                    credit.account_description(),
                    entry.company_code(),
                );

                // Calculate edge weight (proportional allocation)
                let total_debit: Decimal = debits.iter().map(|d| d.debit_amount).sum();
                let total_credit: Decimal = credits.iter().map(|c| c.credit_amount).sum();

                let proportion =
                    (debit.debit_amount / total_debit) * (credit.credit_amount / total_credit);
                let edge_amount = debit.debit_amount * proportion;
                let edge_weight: f64 = edge_amount.try_into().unwrap_or(0.0);

                if edge_weight < self.config.min_edge_weight {
                    continue;
                }

                if self.config.aggregate_parallel_edges {
                    self.aggregate_edge(source_id, target_id, edge_weight, entry);
                } else {
                    let mut tx_edge = TransactionEdge::new(
                        0,
                        source_id,
                        target_id,
                        entry.document_number(),
                        entry.posting_date(),
                        edge_amount,
                        true,
                    );
                    tx_edge.company_code = entry.company_code().to_string();
                    tx_edge.cost_center = debit.cost_center.clone();
                    tx_edge.business_process = entry
                        .header
                        .business_process
                        .as_ref()
                        .map(|bp| format!("{bp:?}"));
                    tx_edge.compute_features();

                    // Propagate anomaly flag from journal entry to graph edge
                    if entry.header.is_anomaly {
                        tx_edge.edge.is_anomaly = true;
                        if let Some(ref anomaly_type) = entry.header.anomaly_type {
                            tx_edge.edge.anomaly_type = Some(format!("{anomaly_type:?}"));
                        }
                    }

                    self.graph.add_edge(tx_edge.edge);
                }
            }
        }
    }

    /// Adds journal entry with document nodes.
    fn add_journal_entry_with_document(&mut self, entry: &JournalEntry) {
        // Create or get document node
        let doc_id =
            self.get_or_create_document_node(&entry.document_number(), entry.company_code());

        // Create edges from document to each account
        for line in &entry.lines {
            let account_id = self.get_or_create_account_node(
                line.account_code(),
                line.account_description(),
                entry.company_code(),
            );

            let is_debit = line.debit_amount > Decimal::ZERO;
            let amount = if is_debit {
                line.debit_amount
            } else {
                line.credit_amount
            };

            let mut tx_edge = TransactionEdge::new(
                0,
                doc_id,
                account_id,
                entry.document_number(),
                entry.posting_date(),
                amount,
                is_debit,
            );
            tx_edge.company_code = entry.company_code().to_string();
            tx_edge.cost_center = line.cost_center.clone();
            tx_edge.business_process = entry
                .header
                .business_process
                .as_ref()
                .map(|bp| format!("{bp:?}"));
            tx_edge.compute_features();

            // Propagate anomaly flag from journal entry to graph edge
            if entry.header.is_anomaly {
                tx_edge.edge.is_anomaly = true;
                if let Some(ref anomaly_type) = entry.header.anomaly_type {
                    tx_edge.edge.anomaly_type = Some(format!("{anomaly_type:?}"));
                }
            }

            self.graph.add_edge(tx_edge.edge);
        }
    }

    /// Gets or creates an account node.
    fn get_or_create_account_node(
        &mut self,
        account_code: &str,
        account_name: &str,
        company_code: &str,
    ) -> NodeId {
        let key = format!("{company_code}_{account_code}");

        if let Some(&id) = self.account_nodes.get(&key) {
            return id;
        }

        let category = self.framework_accounts.classify(account_code);
        let mut account = AccountNode::new(
            0,
            account_code.to_string(),
            account_name.to_string(),
            Self::account_type_label(category),
            company_code.to_string(),
        );
        account.is_balance_sheet = category.is_balance_sheet();
        account.normal_balance = if category.is_debit_normal() {
            "Debit".to_string()
        } else {
            "Credit".to_string()
        };
        account.compute_features();

        let id = self.graph.add_node(account.node);
        self.account_nodes.insert(key, id);
        id
    }

    /// Gets or creates a document node.
    fn get_or_create_document_node(&mut self, document_number: &str, company_code: &str) -> NodeId {
        let key = format!("{company_code}_{document_number}");

        if let Some(&id) = self.document_nodes.get(&key) {
            return id;
        }

        let node = GraphNode::new(
            0,
            NodeType::JournalEntry,
            document_number.to_string(),
            document_number.to_string(),
        );

        let id = self.graph.add_node(node);
        self.document_nodes.insert(key, id);
        id
    }

    /// Aggregates edges between the same source and target.
    fn aggregate_edge(
        &mut self,
        source: NodeId,
        target: NodeId,
        weight: f64,
        entry: &JournalEntry,
    ) {
        let key = (source, target);
        let agg = self.edge_aggregation.entry(key).or_insert(AggregatedEdge {
            total_weight: 0.0,
            count: 0,
            first_date: entry.posting_date(),
            last_date: entry.posting_date(),
        });

        agg.total_weight += weight;
        agg.count += 1;
        if entry.posting_date() < agg.first_date {
            agg.first_date = entry.posting_date();
        }
        if entry.posting_date() > agg.last_date {
            agg.last_date = entry.posting_date();
        }
    }

    /// Returns a human-readable account type label from an [`AccountCategory`].
    fn account_type_label(category: AccountCategory) -> String {
        match category {
            AccountCategory::Asset => "Asset".to_string(),
            AccountCategory::Liability => "Liability".to_string(),
            AccountCategory::Equity => "Equity".to_string(),
            AccountCategory::Revenue => "Revenue".to_string(),
            AccountCategory::Cogs
            | AccountCategory::OperatingExpense
            | AccountCategory::OtherIncomeExpense
            | AccountCategory::Tax => "Expense".to_string(),
            AccountCategory::Suspense | AccountCategory::Unknown => "Unknown".to_string(),
        }
    }

    /// Builds the final graph.
    pub fn build(mut self) -> Graph {
        // If aggregating, create the aggregated edges now
        if self.config.aggregate_parallel_edges {
            for ((source, target), agg) in self.edge_aggregation {
                let mut edge = GraphEdge::new(0, source, target, EdgeType::Transaction)
                    .with_weight(agg.total_weight)
                    .with_timestamp(agg.last_date);

                // Add aggregation features
                edge.features.push((agg.total_weight + 1.0).ln());
                edge.features.push(agg.count as f64);

                let duration = (agg.last_date - agg.first_date).num_days() as f64;
                edge.features.push(duration);

                self.graph.add_edge(edge);
            }
        }

        self.graph.compute_statistics();
        self.graph
    }

    /// Adds multiple journal entries.
    pub fn add_journal_entries(&mut self, entries: &[JournalEntry]) {
        for entry in entries {
            self.add_journal_entry(entry);
        }
    }
}

/// Aggregated edge data for combining multiple transactions between the same accounts.
///
/// One record exists per (source, target) node pair while the builder is
/// accumulating; each record becomes a single graph edge when the graph is
/// built.
struct AggregatedEdge {
    /// Sum of the `f64` weights of all edges folded into this record.
    total_weight: f64,
    /// Number of edges folded into this record.
    count: usize,
    /// Earliest posting date among the contributing journal entries.
    first_date: chrono::NaiveDate,
    /// Latest posting date among the contributing journal entries; used as
    /// the timestamp of the emitted edge.
    last_date: chrono::NaiveDate,
}

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use datasynth_core::models::{BusinessProcess, JournalEntryLine};
    use rust_decimal_macros::dec;

    /// Builds a balanced two-line entry: a 1000 debit on account "1000" and a
    /// 1000 credit on account "4000", posted on 2024-01-15.
    fn create_test_entry() -> JournalEntry {
        let posting_date = chrono::NaiveDate::from_ymd_opt(2024, 1, 15).unwrap();
        let mut entry = JournalEntry::new_simple(
            "JE001".to_string(),
            "1000".to_string(),
            posting_date,
            "Test Entry".to_string(),
        );

        let doc_id = entry.header.document_id;
        let debit_line = JournalEntryLine::debit(doc_id, 1, "1000".to_string(), dec!(1000));
        let credit_line = JournalEntryLine::credit(doc_id, 2, "4000".to_string(), dec!(1000));

        entry.add_line(debit_line);
        entry.add_line(credit_line);
        entry
    }

    /// Same fixture as `create_test_entry`, tagged with a business process.
    fn create_test_entry_with_business_process(bp: BusinessProcess) -> JournalEntry {
        let mut entry = create_test_entry();
        entry.header.business_process = Some(bp);
        entry
    }

    /// Configuration that routes entries through document nodes only.
    fn document_node_config() -> TransactionGraphConfig {
        TransactionGraphConfig {
            include_document_nodes: true,
            create_debit_credit_edges: false,
            ..Default::default()
        }
    }

    /// Feeds a single entry through a fresh builder and returns the graph.
    fn build_graph(config: TransactionGraphConfig, entry: &JournalEntry) -> Graph {
        let mut builder = TransactionGraphBuilder::new(config);
        builder.add_journal_entry(entry);
        builder.build()
    }

    #[test]
    fn test_build_transaction_graph() {
        let graph = build_graph(TransactionGraphConfig::default(), &create_test_entry());

        // Nodes: Cash and Revenue. Edges: one transaction edge.
        assert_eq!(graph.node_count(), 2);
        assert_eq!(graph.edge_count(), 1);
    }

    #[test]
    fn test_with_document_nodes() {
        let graph = build_graph(document_node_config(), &create_test_entry());

        // Nodes: Document + Cash + Revenue. Edges: document to each account.
        assert_eq!(graph.node_count(), 3);
        assert_eq!(graph.edge_count(), 2);
    }

    #[test]
    fn test_business_process_edge_metadata() {
        let entry = create_test_entry_with_business_process(BusinessProcess::P2P);
        let graph = build_graph(TransactionGraphConfig::default(), &entry);

        // Every edge must carry the document_number property.
        for edge in graph.edges.values() {
            assert!(edge.properties.contains_key("document_number"));
        }
        assert_eq!(graph.edge_count(), 1);
    }

    #[test]
    fn test_business_process_with_document_nodes() {
        let entry = create_test_entry_with_business_process(BusinessProcess::O2C);
        let graph = build_graph(document_node_config(), &entry);

        assert_eq!(graph.node_count(), 3);
        assert_eq!(graph.edge_count(), 2);
    }
}