// datasynth_graph/builders/transaction_graph.rs
use std::collections::HashMap;

use rust_decimal::Decimal;

use datasynth_core::accounts::AccountCategory;
use datasynth_core::framework_accounts::FrameworkAccounts;
use datasynth_core::models::JournalEntry;

use crate::models::{
    AccountNode, EdgeType, Graph, GraphEdge, GraphNode, GraphType, NodeId, NodeType,
    TransactionEdge,
};

/// Options controlling how [`TransactionGraphBuilder`] turns journal entries into a graph.
#[derive(Debug, Clone)]
pub struct TransactionGraphConfig {
    /// Vendor-node flag. Not consulted by the builder in this module.
    pub include_vendors: bool,
    /// Customer-node flag. Not consulted by the builder in this module.
    pub include_customers: bool,
    /// Create debit-account → credit-account edges.
    /// Only used when `include_document_nodes` is off.
    pub create_debit_credit_edges: bool,
    /// Model each journal entry as its own node linked to every account it posts to.
    /// Takes precedence over `create_debit_credit_edges`.
    pub include_document_nodes: bool,
    /// Debit/credit flows whose amount (converted to `f64`) is below this are skipped.
    /// Not applied to document-node edges.
    pub min_edge_weight: f64,
    /// Collapse debit/credit flows between the same pair of accounts into a single edge.
    /// The collapsed edges are materialized when the graph is built.
    pub aggregate_parallel_edges: bool,
    /// Accounting framework used to classify accounts; `None` falls back to `"us_gaap"`.
    pub framework: Option<String>,
}

impl Default for TransactionGraphConfig {
    /// Debit/credit edges only, no document nodes, no filtering, no aggregation,
    /// default framework.
    fn default() -> Self {
        Self {
            include_vendors: false,
            include_customers: false,
            create_debit_credit_edges: true,
            include_document_nodes: false,
            min_edge_weight: 0.0,
            aggregate_parallel_edges: false,
            framework: None,
        }
    }
}

54pub struct TransactionGraphBuilder {
56 config: TransactionGraphConfig,
57 graph: Graph,
58 framework_accounts: FrameworkAccounts,
60 account_nodes: HashMap<String, NodeId>,
62 document_nodes: HashMap<String, NodeId>,
64 edge_aggregation: HashMap<(NodeId, NodeId), AggregatedEdge>,
66}
67
68impl TransactionGraphBuilder {
69 pub fn new(config: TransactionGraphConfig) -> Self {
71 let framework_accounts =
72 FrameworkAccounts::for_framework(config.framework.as_deref().unwrap_or("us_gaap"));
73 Self {
74 config,
75 graph: Graph::new("transaction_network", GraphType::Transaction),
76 framework_accounts,
77 account_nodes: HashMap::new(),
78 document_nodes: HashMap::new(),
79 edge_aggregation: HashMap::new(),
80 }
81 }
82
83 pub fn add_journal_entry(&mut self, entry: &JournalEntry) {
85 if self.config.include_document_nodes {
86 self.add_journal_entry_with_document(entry);
87 } else if self.config.create_debit_credit_edges {
88 self.add_journal_entry_debit_credit(entry);
89 }
90 }
91
92 fn add_journal_entry_debit_credit(&mut self, entry: &JournalEntry) {
94 let debits: Vec<_> = entry
96 .lines
97 .iter()
98 .filter(|l| l.debit_amount > Decimal::ZERO)
99 .collect();
100
101 let credits: Vec<_> = entry
102 .lines
103 .iter()
104 .filter(|l| l.credit_amount > Decimal::ZERO)
105 .collect();
106
107 for debit in &debits {
109 let source_id = self.get_or_create_account_node(
110 debit.account_code(),
111 debit.account_description(),
112 entry.company_code(),
113 );
114
115 for credit in &credits {
116 let target_id = self.get_or_create_account_node(
117 credit.account_code(),
118 credit.account_description(),
119 entry.company_code(),
120 );
121
122 let total_debit: Decimal = debits.iter().map(|d| d.debit_amount).sum();
124 let total_credit: Decimal = credits.iter().map(|c| c.credit_amount).sum();
125
126 let proportion =
127 (debit.debit_amount / total_debit) * (credit.credit_amount / total_credit);
128 let edge_amount = debit.debit_amount * proportion;
129 let edge_weight: f64 = edge_amount.try_into().unwrap_or(0.0);
130
131 if edge_weight < self.config.min_edge_weight {
132 continue;
133 }
134
135 if self.config.aggregate_parallel_edges {
136 self.aggregate_edge(source_id, target_id, edge_weight, entry);
137 } else {
138 let mut tx_edge = TransactionEdge::new(
139 0,
140 source_id,
141 target_id,
142 entry.document_number(),
143 entry.posting_date(),
144 edge_amount,
145 true,
146 );
147 tx_edge.company_code = entry.company_code().to_string();
148 tx_edge.cost_center = debit.cost_center.clone();
149 tx_edge.business_process = entry
150 .header
151 .business_process
152 .as_ref()
153 .map(|bp| format!("{:?}", bp));
154 tx_edge.compute_features();
155
156 if entry.header.is_anomaly {
158 tx_edge.edge.is_anomaly = true;
159 if let Some(ref anomaly_type) = entry.header.anomaly_type {
160 tx_edge.edge.anomaly_type = Some(format!("{:?}", anomaly_type));
161 }
162 }
163
164 self.graph.add_edge(tx_edge.edge);
165 }
166 }
167 }
168 }
169
170 fn add_journal_entry_with_document(&mut self, entry: &JournalEntry) {
172 let doc_id =
174 self.get_or_create_document_node(&entry.document_number(), entry.company_code());
175
176 for line in &entry.lines {
178 let account_id = self.get_or_create_account_node(
179 line.account_code(),
180 line.account_description(),
181 entry.company_code(),
182 );
183
184 let is_debit = line.debit_amount > Decimal::ZERO;
185 let amount = if is_debit {
186 line.debit_amount
187 } else {
188 line.credit_amount
189 };
190
191 let mut tx_edge = TransactionEdge::new(
192 0,
193 doc_id,
194 account_id,
195 entry.document_number(),
196 entry.posting_date(),
197 amount,
198 is_debit,
199 );
200 tx_edge.company_code = entry.company_code().to_string();
201 tx_edge.cost_center = line.cost_center.clone();
202 tx_edge.business_process = entry
203 .header
204 .business_process
205 .as_ref()
206 .map(|bp| format!("{:?}", bp));
207 tx_edge.compute_features();
208
209 if entry.header.is_anomaly {
211 tx_edge.edge.is_anomaly = true;
212 if let Some(ref anomaly_type) = entry.header.anomaly_type {
213 tx_edge.edge.anomaly_type = Some(format!("{:?}", anomaly_type));
214 }
215 }
216
217 self.graph.add_edge(tx_edge.edge);
218 }
219 }
220
221 fn get_or_create_account_node(
223 &mut self,
224 account_code: &str,
225 account_name: &str,
226 company_code: &str,
227 ) -> NodeId {
228 let key = format!("{}_{}", company_code, account_code);
229
230 if let Some(&id) = self.account_nodes.get(&key) {
231 return id;
232 }
233
234 let category = self.framework_accounts.classify(account_code);
235 let mut account = AccountNode::new(
236 0,
237 account_code.to_string(),
238 account_name.to_string(),
239 Self::account_type_label(category),
240 company_code.to_string(),
241 );
242 account.is_balance_sheet = category.is_balance_sheet();
243 account.normal_balance = if category.is_debit_normal() {
244 "Debit".to_string()
245 } else {
246 "Credit".to_string()
247 };
248 account.compute_features();
249
250 let id = self.graph.add_node(account.node);
251 self.account_nodes.insert(key, id);
252 id
253 }
254
255 fn get_or_create_document_node(&mut self, document_number: &str, company_code: &str) -> NodeId {
257 let key = format!("{}_{}", company_code, document_number);
258
259 if let Some(&id) = self.document_nodes.get(&key) {
260 return id;
261 }
262
263 let node = GraphNode::new(
264 0,
265 NodeType::JournalEntry,
266 document_number.to_string(),
267 document_number.to_string(),
268 );
269
270 let id = self.graph.add_node(node);
271 self.document_nodes.insert(key, id);
272 id
273 }
274
275 fn aggregate_edge(
277 &mut self,
278 source: NodeId,
279 target: NodeId,
280 weight: f64,
281 entry: &JournalEntry,
282 ) {
283 let key = (source, target);
284 let agg = self.edge_aggregation.entry(key).or_insert(AggregatedEdge {
285 source,
286 target,
287 total_weight: 0.0,
288 count: 0,
289 first_date: entry.posting_date(),
290 last_date: entry.posting_date(),
291 });
292
293 agg.total_weight += weight;
294 agg.count += 1;
295 if entry.posting_date() < agg.first_date {
296 agg.first_date = entry.posting_date();
297 }
298 if entry.posting_date() > agg.last_date {
299 agg.last_date = entry.posting_date();
300 }
301 }
302
303 fn account_type_label(category: AccountCategory) -> String {
305 match category {
306 AccountCategory::Asset => "Asset".to_string(),
307 AccountCategory::Liability => "Liability".to_string(),
308 AccountCategory::Equity => "Equity".to_string(),
309 AccountCategory::Revenue => "Revenue".to_string(),
310 AccountCategory::Cogs
311 | AccountCategory::OperatingExpense
312 | AccountCategory::OtherIncomeExpense
313 | AccountCategory::Tax => "Expense".to_string(),
314 AccountCategory::Suspense | AccountCategory::Unknown => "Unknown".to_string(),
315 }
316 }
317
318 pub fn build(mut self) -> Graph {
320 if self.config.aggregate_parallel_edges {
322 for ((source, target), agg) in self.edge_aggregation {
323 let mut edge = GraphEdge::new(0, source, target, EdgeType::Transaction)
324 .with_weight(agg.total_weight)
325 .with_timestamp(agg.last_date);
326
327 edge.features.push((agg.total_weight + 1.0).ln());
329 edge.features.push(agg.count as f64);
330
331 let duration = (agg.last_date - agg.first_date).num_days() as f64;
332 edge.features.push(duration);
333
334 self.graph.add_edge(edge);
335 }
336 }
337
338 self.graph.compute_statistics();
339 self.graph
340 }
341
342 pub fn add_journal_entries(&mut self, entries: &[JournalEntry]) {
344 for entry in entries {
345 self.add_journal_entry(entry);
346 }
347 }
348}
349
350#[allow(dead_code)]
352struct AggregatedEdge {
353 source: NodeId,
354 target: NodeId,
355 total_weight: f64,
356 count: usize,
357 first_date: chrono::NaiveDate,
358 last_date: chrono::NaiveDate,
359}
360
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use datasynth_core::models::{BusinessProcess, JournalEntryLine};
    use rust_decimal_macros::dec;

    /// Balanced entry "JE001" with one debit line (account "1000") and one credit line
    /// (account "4000"), 1000 each, dated 2024-01-15.
    fn create_test_entry() -> JournalEntry {
        let mut entry = JournalEntry::new_simple(
            "JE001".to_string(),
            "1000".to_string(),
            chrono::NaiveDate::from_ymd_opt(2024, 1, 15).unwrap(),
            "Test Entry".to_string(),
        );

        let doc_id = entry.header.document_id;

        entry.add_line(JournalEntryLine::debit(
            doc_id,
            1,
            "1000".to_string(),
            dec!(1000),
        ));

        entry.add_line(JournalEntryLine::credit(
            doc_id,
            2,
            "4000".to_string(),
            dec!(1000),
        ));

        entry
    }

    /// Same as [`create_test_entry`], tagged with business process `bp`.
    fn create_test_entry_with_business_process(bp: BusinessProcess) -> JournalEntry {
        let mut entry = create_test_entry();
        entry.header.business_process = Some(bp);
        entry
    }

    #[test]
    fn test_build_transaction_graph() {
        let mut builder = TransactionGraphBuilder::new(TransactionGraphConfig::default());
        builder.add_journal_entry(&create_test_entry());

        let graph = builder.build();

        // Two account nodes joined by a single debit → credit edge.
        assert_eq!(graph.node_count(), 2);
        assert_eq!(graph.edge_count(), 1);
    }

    #[test]
    fn test_with_document_nodes() {
        let config = TransactionGraphConfig {
            include_document_nodes: true,
            create_debit_credit_edges: false,
            ..Default::default()
        };

        let mut builder = TransactionGraphBuilder::new(config);
        builder.add_journal_entry(&create_test_entry());

        let graph = builder.build();

        // One document node plus two account nodes; one edge per entry line.
        assert_eq!(graph.node_count(), 3);
        assert_eq!(graph.edge_count(), 2);
    }

    #[test]
    fn test_business_process_edge_metadata() {
        let mut builder = TransactionGraphBuilder::new(TransactionGraphConfig::default());
        let entry = create_test_entry_with_business_process(BusinessProcess::P2P);
        builder.add_journal_entry(&entry);

        let graph = builder.build();

        for edge in graph.edges.values() {
            assert!(edge.properties.contains_key("document_number"));
        }
        assert_eq!(graph.edge_count(), 1);
    }

    #[test]
    fn test_business_process_with_document_nodes() {
        let config = TransactionGraphConfig {
            include_document_nodes: true,
            create_debit_credit_edges: false,
            ..Default::default()
        };

        let mut builder = TransactionGraphBuilder::new(config);
        let entry = create_test_entry_with_business_process(BusinessProcess::O2C);
        builder.add_journal_entry(&entry);

        let graph = builder.build();

        assert_eq!(graph.node_count(), 3);
        assert_eq!(graph.edge_count(), 2);
    }
}