datasynth_graph/builders/
transaction_graph.rs1use std::collections::HashMap;
8
9use rust_decimal::Decimal;
10
11use datasynth_core::models::JournalEntry;
12
13use crate::models::{
14 AccountNode, EdgeType, Graph, GraphEdge, GraphNode, GraphType, NodeId, NodeType,
15 TransactionEdge,
16};
17
/// Options that control how journal entries are translated into a
/// transaction graph by `TransactionGraphBuilder`.
#[derive(Debug, Clone)]
pub struct TransactionGraphConfig {
    /// Request vendor nodes. The builder code in this file does not read
    /// this flag.
    pub include_vendors: bool,
    /// Request customer nodes. The builder code in this file does not read
    /// this flag.
    pub include_customers: bool,
    /// Create direct edges from debited accounts to credited accounts.
    /// Only consulted when `include_document_nodes` is `false`.
    pub create_debit_credit_edges: bool,
    /// Model each journal entry as its own node linked to the accounts it
    /// touches. Takes precedence over `create_debit_credit_edges`.
    pub include_document_nodes: bool,
    /// Debit/credit edges whose weight is below this value are skipped.
    pub min_edge_weight: f64,
    /// Accumulate debit/credit edges per `(source, target)` pair and emit a
    /// single edge per pair when the graph is built.
    pub aggregate_parallel_edges: bool,
}

impl Default for TransactionGraphConfig {
    /// Direct debit/credit edges only: no document nodes, no weight
    /// threshold, and no aggregation of parallel edges.
    fn default() -> Self {
        TransactionGraphConfig {
            create_debit_credit_edges: true,
            include_document_nodes: false,
            include_vendors: false,
            include_customers: false,
            aggregate_parallel_edges: false,
            min_edge_weight: 0.0,
        }
    }
}
47
48pub struct TransactionGraphBuilder {
50 config: TransactionGraphConfig,
51 graph: Graph,
52 account_nodes: HashMap<String, NodeId>,
54 document_nodes: HashMap<String, NodeId>,
56 edge_aggregation: HashMap<(NodeId, NodeId), AggregatedEdge>,
58}
59
60impl TransactionGraphBuilder {
61 pub fn new(config: TransactionGraphConfig) -> Self {
63 Self {
64 config,
65 graph: Graph::new("transaction_network", GraphType::Transaction),
66 account_nodes: HashMap::new(),
67 document_nodes: HashMap::new(),
68 edge_aggregation: HashMap::new(),
69 }
70 }
71
72 pub fn add_journal_entry(&mut self, entry: &JournalEntry) {
74 if self.config.include_document_nodes {
75 self.add_journal_entry_with_document(entry);
76 } else if self.config.create_debit_credit_edges {
77 self.add_journal_entry_debit_credit(entry);
78 }
79 }
80
81 fn add_journal_entry_debit_credit(&mut self, entry: &JournalEntry) {
83 let debits: Vec<_> = entry
85 .lines
86 .iter()
87 .filter(|l| l.debit_amount > Decimal::ZERO)
88 .collect();
89
90 let credits: Vec<_> = entry
91 .lines
92 .iter()
93 .filter(|l| l.credit_amount > Decimal::ZERO)
94 .collect();
95
96 for debit in &debits {
98 let source_id = self.get_or_create_account_node(
99 debit.account_code(),
100 debit.account_description(),
101 entry.company_code(),
102 );
103
104 for credit in &credits {
105 let target_id = self.get_or_create_account_node(
106 credit.account_code(),
107 credit.account_description(),
108 entry.company_code(),
109 );
110
111 let total_debit: Decimal = debits.iter().map(|d| d.debit_amount).sum();
113 let total_credit: Decimal = credits.iter().map(|c| c.credit_amount).sum();
114
115 let proportion =
116 (debit.debit_amount / total_debit) * (credit.credit_amount / total_credit);
117 let edge_amount = debit.debit_amount * proportion;
118 let edge_weight: f64 = edge_amount.try_into().unwrap_or(0.0);
119
120 if edge_weight < self.config.min_edge_weight {
121 continue;
122 }
123
124 if self.config.aggregate_parallel_edges {
125 self.aggregate_edge(source_id, target_id, edge_weight, entry);
126 } else {
127 let mut tx_edge = TransactionEdge::new(
128 0,
129 source_id,
130 target_id,
131 entry.document_number(),
132 entry.posting_date(),
133 edge_amount,
134 true,
135 );
136 tx_edge.company_code = entry.company_code().to_string();
137 tx_edge.cost_center = debit.cost_center.clone();
138 tx_edge.business_process = entry
139 .header
140 .business_process
141 .as_ref()
142 .map(|bp| format!("{:?}", bp));
143 tx_edge.compute_features();
144
145 if entry.header.is_anomaly {
147 tx_edge.edge.is_anomaly = true;
148 if let Some(ref anomaly_type) = entry.header.anomaly_type {
149 tx_edge.edge.anomaly_type = Some(format!("{:?}", anomaly_type));
150 }
151 }
152
153 self.graph.add_edge(tx_edge.edge);
154 }
155 }
156 }
157 }
158
159 fn add_journal_entry_with_document(&mut self, entry: &JournalEntry) {
161 let doc_id =
163 self.get_or_create_document_node(&entry.document_number(), entry.company_code());
164
165 for line in &entry.lines {
167 let account_id = self.get_or_create_account_node(
168 line.account_code(),
169 line.account_description(),
170 entry.company_code(),
171 );
172
173 let is_debit = line.debit_amount > Decimal::ZERO;
174 let amount = if is_debit {
175 line.debit_amount
176 } else {
177 line.credit_amount
178 };
179
180 let mut tx_edge = TransactionEdge::new(
181 0,
182 doc_id,
183 account_id,
184 entry.document_number(),
185 entry.posting_date(),
186 amount,
187 is_debit,
188 );
189 tx_edge.company_code = entry.company_code().to_string();
190 tx_edge.cost_center = line.cost_center.clone();
191 tx_edge.business_process = entry
192 .header
193 .business_process
194 .as_ref()
195 .map(|bp| format!("{:?}", bp));
196 tx_edge.compute_features();
197
198 if entry.header.is_anomaly {
200 tx_edge.edge.is_anomaly = true;
201 if let Some(ref anomaly_type) = entry.header.anomaly_type {
202 tx_edge.edge.anomaly_type = Some(format!("{:?}", anomaly_type));
203 }
204 }
205
206 self.graph.add_edge(tx_edge.edge);
207 }
208 }
209
210 fn get_or_create_account_node(
212 &mut self,
213 account_code: &str,
214 account_name: &str,
215 company_code: &str,
216 ) -> NodeId {
217 let key = format!("{}_{}", company_code, account_code);
218
219 if let Some(&id) = self.account_nodes.get(&key) {
220 return id;
221 }
222
223 let mut account = AccountNode::new(
224 0,
225 account_code.to_string(),
226 account_name.to_string(),
227 Self::infer_account_type(account_code),
228 company_code.to_string(),
229 );
230 account.is_balance_sheet = Self::is_balance_sheet_account(account_code);
231 account.normal_balance = Self::infer_normal_balance(account_code);
232 account.compute_features();
233
234 let id = self.graph.add_node(account.node);
235 self.account_nodes.insert(key, id);
236 id
237 }
238
239 fn get_or_create_document_node(&mut self, document_number: &str, company_code: &str) -> NodeId {
241 let key = format!("{}_{}", company_code, document_number);
242
243 if let Some(&id) = self.document_nodes.get(&key) {
244 return id;
245 }
246
247 let node = GraphNode::new(
248 0,
249 NodeType::JournalEntry,
250 document_number.to_string(),
251 document_number.to_string(),
252 );
253
254 let id = self.graph.add_node(node);
255 self.document_nodes.insert(key, id);
256 id
257 }
258
259 fn aggregate_edge(
261 &mut self,
262 source: NodeId,
263 target: NodeId,
264 weight: f64,
265 entry: &JournalEntry,
266 ) {
267 let key = (source, target);
268 let agg = self.edge_aggregation.entry(key).or_insert(AggregatedEdge {
269 source,
270 target,
271 total_weight: 0.0,
272 count: 0,
273 first_date: entry.posting_date(),
274 last_date: entry.posting_date(),
275 });
276
277 agg.total_weight += weight;
278 agg.count += 1;
279 if entry.posting_date() < agg.first_date {
280 agg.first_date = entry.posting_date();
281 }
282 if entry.posting_date() > agg.last_date {
283 agg.last_date = entry.posting_date();
284 }
285 }
286
/// Maps an account code to an account-type label based on its first
/// character.
///
/// `1` → `"Asset"`, `2` → `"Liability"`, `3` → `"Equity"`, `4` → `"Revenue"`,
/// `5`/`6`/`7` → `"Expense"`. An empty code or any other leading character
/// yields `"Unknown"`.
fn infer_account_type(account_code: &str) -> String {
    let label = match account_code.chars().next() {
        Some('1') => "Asset",
        Some('2') => "Liability",
        Some('3') => "Equity",
        Some('4') => "Revenue",
        Some('5' | '6' | '7') => "Expense",
        Some(_) | None => "Unknown",
    };
    label.to_string()
}
302
/// Reports whether an account code starts with `1`, `2` or `3`, the
/// prefixes this builder treats as balance-sheet accounts.
///
/// An empty code is not a balance-sheet account.
fn is_balance_sheet_account(account_code: &str) -> bool {
    matches!(account_code.chars().next(), Some('1'..='3'))
}
311
/// Returns the normal balance side for an account code based on its first
/// character.
///
/// Codes starting with `2`, `3` or `4` are `"Credit"`; everything else,
/// including an empty code, is `"Debit"`.
fn infer_normal_balance(account_code: &str) -> String {
    let credit_normal = matches!(account_code.chars().next(), Some('2' | '3' | '4'));
    let side = if credit_normal { "Credit" } else { "Debit" };
    side.to_string()
}
324
325 pub fn build(mut self) -> Graph {
327 if self.config.aggregate_parallel_edges {
329 for ((source, target), agg) in self.edge_aggregation {
330 let mut edge = GraphEdge::new(0, source, target, EdgeType::Transaction)
331 .with_weight(agg.total_weight)
332 .with_timestamp(agg.last_date);
333
334 edge.features.push((agg.total_weight + 1.0).ln());
336 edge.features.push(agg.count as f64);
337
338 let duration = (agg.last_date - agg.first_date).num_days() as f64;
339 edge.features.push(duration);
340
341 self.graph.add_edge(edge);
342 }
343 }
344
345 self.graph.compute_statistics();
346 self.graph
347 }
348
349 pub fn add_journal_entries(&mut self, entries: &[JournalEntry]) {
351 for entry in entries {
352 self.add_journal_entry(entry);
353 }
354 }
355}
356
357#[allow(dead_code)]
359struct AggregatedEdge {
360 source: NodeId,
361 target: NodeId,
362 total_weight: f64,
363 count: usize,
364 first_date: chrono::NaiveDate,
365 last_date: chrono::NaiveDate,
366}
367
#[cfg(test)]
mod tests {
    use super::*;
    use datasynth_core::models::{BusinessProcess, JournalEntryLine};
    use rust_decimal_macros::dec;

    /// Balanced two-line entry: a 1000 debit on account "1000" and a 1000
    /// credit on account "4000".
    fn create_test_entry() -> JournalEntry {
        let posting_date = chrono::NaiveDate::from_ymd_opt(2024, 1, 15).unwrap();
        let mut entry = JournalEntry::new_simple(
            "JE001".to_string(),
            "1000".to_string(),
            posting_date,
            "Test Entry".to_string(),
        );

        let doc_id = entry.header.document_id;
        let debit_line = JournalEntryLine::debit(doc_id, 1, "1000".to_string(), dec!(1000));
        let credit_line = JournalEntryLine::credit(doc_id, 2, "4000".to_string(), dec!(1000));

        entry.add_line(debit_line);
        entry.add_line(credit_line);
        entry
    }

    /// Same as `create_test_entry`, tagged with the given business process.
    fn create_test_entry_with_business_process(bp: BusinessProcess) -> JournalEntry {
        let mut entry = create_test_entry();
        entry.header.business_process = Some(bp);
        entry
    }

    /// Configuration that models each entry as a document node.
    fn document_node_config() -> TransactionGraphConfig {
        TransactionGraphConfig {
            include_document_nodes: true,
            create_debit_credit_edges: false,
            ..Default::default()
        }
    }

    /// Runs a single entry through a fresh builder and returns the graph.
    fn graph_for(config: TransactionGraphConfig, entry: &JournalEntry) -> Graph {
        let mut builder = TransactionGraphBuilder::new(config);
        builder.add_journal_entry(entry);
        builder.build()
    }

    #[test]
    fn test_build_transaction_graph() {
        let graph = graph_for(TransactionGraphConfig::default(), &create_test_entry());

        // Two account nodes joined by one debit -> credit edge.
        assert_eq!(graph.node_count(), 2);
        assert_eq!(graph.edge_count(), 1);
    }

    #[test]
    fn test_with_document_nodes() {
        let graph = graph_for(document_node_config(), &create_test_entry());

        // One document node plus two account nodes, one edge per line.
        assert_eq!(graph.node_count(), 3);
        assert_eq!(graph.edge_count(), 2);
    }

    #[test]
    fn test_business_process_edge_metadata() {
        let entry = create_test_entry_with_business_process(BusinessProcess::P2P);
        let graph = graph_for(TransactionGraphConfig::default(), &entry);

        assert!(graph
            .edges
            .values()
            .all(|edge| edge.properties.contains_key("document_number")));
        assert_eq!(graph.edge_count(), 1);
    }

    #[test]
    fn test_business_process_with_document_nodes() {
        let entry = create_test_entry_with_business_process(BusinessProcess::O2C);
        let graph = graph_for(document_node_config(), &entry);

        assert_eq!(graph.node_count(), 3);
        assert_eq!(graph.edge_count(), 2);
    }
}