Skip to main content

datasynth_runtime/
je_network.rs

1//! Shared Method-A / Method-B/C edge-list builder for the JE network export.
2//!
3//! v5.10: extracted from `output_writer::write_je_network_csv` so the
4//! same logic can be reused by the `datasynth-group` aggregate emitter,
5//! which builds both per-entity and consolidated edge lists from many
6//! per-entity JE batches.
7//!
8//! The builder is pure — no I/O — and returns a `Vec<JeNetworkEdge>`.
9//! Single-entity callers feed the result into the existing CSV writer
10//! (preserving v5.8.0 byte-identical output); group callers serialise
11//! the same struct with extra contextual columns (entity_code,
12//! is_eliminated, eliminates_ic_pair_id) added at write time.
13
14use chrono::NaiveDate;
15use rust_decimal::Decimal;
16use std::collections::HashMap;
17use uuid::Uuid;
18
19use datasynth_config::JeNetworkMethod;
20use datasynth_core::models::JournalEntry;
21
22/// One row of the je_network output.
23///
24/// The 13 columns matching v5.8.0 CSV output are at the top; v5.10
25/// added the optional IC fields (Some only on group-context inputs).
26#[derive(Debug, Clone)]
27pub struct JeNetworkEdge {
28    pub edge_id: String,
29    pub document_id: Uuid,
30    pub posting_date: NaiveDate,
31    pub from_account: String,
32    pub to_account: String,
33    pub from_line_id: String,
34    pub to_line_id: String,
35    pub amount: Decimal,
36    pub confidence: f64,
37    pub predecessor_edge_id: String,
38    pub business_process: String,
39    pub is_fraud: bool,
40    pub is_anomaly: bool,
41    /// v5.27 — fine-grained fraud typology (the `FraudType` variant name,
42    /// e.g. `SuspenseAccountAbuse`), surfaced from the JE header so the edge
43    /// list carries the same label as the JE table. Empty on non-fraud edges.
44    pub fraud_type: Option<String>,
45    /// v5.10 — surfaces `JournalEntryHeader::ic_pair_id` when present.
46    /// `None` for non-IC postings (and on every edge from a single-entity run).
47    pub ic_pair_id: Option<String>,
48    /// v5.10 — surfaces `JournalEntryHeader::ic_partner_entity` when present.
49    pub ic_partner_entity: Option<String>,
50}
51
52/// Build the Method-A / Method-B/C edge list for a batch of JEs.
53///
54/// JEs are processed in the iteration order of the slice; predecessor
55/// edge ids are resolved against earlier JEs in the same batch (so the
56/// caller must pass JEs that share predecessor chains in the same call).
57///
58/// Method semantics — see Ivertowski et al. (2024) Methods A through E:
59/// * `JeNetworkMethod::A` — bijective on 2-line entries only; multi-line
60///   JEs are skipped. Confidence on every emitted edge = `1.0`.
61/// * `JeNetworkMethod::Cartesian` — full Cartesian debit × credit product
62///   with proportional amount allocation; confidence = `1 / (n × m)`.
63pub fn build_je_network_edges(jes: &[JournalEntry], method: JeNetworkMethod) -> Vec<JeNetworkEdge> {
64    let mut edges = Vec::with_capacity(jes.len() * 2);
65    let mut line_id_to_edge_id: HashMap<String, String> = HashMap::with_capacity(jes.len() * 2);
66
67    for je in jes {
68        let h = &je.header;
69
70        let line_ids: Vec<String> = je
71            .lines
72            .iter()
73            .map(|l| {
74                l.transaction_id.clone().unwrap_or_else(|| {
75                    datasynth_core::models::JournalEntryLine::derive_transaction_id(
76                        l.document_id,
77                        l.line_number,
78                    )
79                })
80            })
81            .collect();
82
83        let debits: Vec<usize> = je
84            .lines
85            .iter()
86            .enumerate()
87            .filter(|(_, l)| l.debit_amount > Decimal::ZERO)
88            .map(|(i, _)| i)
89            .collect();
90        let credits: Vec<usize> = je
91            .lines
92            .iter()
93            .enumerate()
94            .filter(|(_, l)| l.credit_amount > Decimal::ZERO)
95            .map(|(i, _)| i)
96            .collect();
97        if debits.is_empty() || credits.is_empty() {
98            continue;
99        }
100
101        if method == JeNetworkMethod::A && !(debits.len() == 1 && credits.len() == 1) {
102            continue;
103        }
104
105        let total_debit: Decimal = debits.iter().map(|i| je.lines[*i].debit_amount).sum();
106        let total_credit: Decimal = credits.iter().map(|i| je.lines[*i].credit_amount).sum();
107        if total_debit.is_zero() || total_credit.is_zero() {
108            continue;
109        }
110
111        let confidence: f64 = if debits.len() == 1 && credits.len() == 1 {
112            1.0
113        } else {
114            1.0 / (debits.len() * credits.len()) as f64
115        };
116
117        let bp = h
118            .business_process
119            .map(|bp| format!("{bp:?}"))
120            .unwrap_or_default();
121        let ic_pair_id_str = h.ic_pair_id.as_ref().map(|id| id.to_string());
122        let ic_partner = h.ic_partner_entity.clone();
123        let fraud_type_str = h.fraud_type.map(|ft| format!("{ft:?}"));
124
125        for &di in &debits {
126            let debit_line = &je.lines[di];
127            let to_line_id = &line_ids[di];
128            for &ci in &credits {
129                let credit_line = &je.lines[ci];
130                let from_line_id = &line_ids[ci];
131
132                // Edge id = UUID v5 of (document_id, debit.line_number,
133                // credit.line_number). Stable across regenerations.
134                let mut input = Vec::with_capacity(16 + 8);
135                input.extend_from_slice(h.document_id.as_bytes());
136                input.extend_from_slice(&debit_line.line_number.to_le_bytes());
137                input.extend_from_slice(&credit_line.line_number.to_le_bytes());
138                let edge_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, &input).to_string();
139
140                // Proportional allocation matches
141                // TransactionGraphBuilder::add_journal_entry_debit_credit.
142                let proportion = (debit_line.debit_amount / total_debit)
143                    * (credit_line.credit_amount / total_credit);
144                let amount = debit_line.debit_amount * proportion;
145
146                let predecessor_edge_id: String = credit_line
147                    .predecessor_line_id
148                    .as_ref()
149                    .or(debit_line.predecessor_line_id.as_ref())
150                    .and_then(|tx_id| line_id_to_edge_id.get(tx_id).cloned())
151                    .unwrap_or_default();
152
153                edges.push(JeNetworkEdge {
154                    edge_id: edge_id.clone(),
155                    document_id: h.document_id,
156                    posting_date: h.posting_date,
157                    from_account: credit_line.gl_account.clone(),
158                    to_account: debit_line.gl_account.clone(),
159                    from_line_id: from_line_id.clone(),
160                    to_line_id: to_line_id.clone(),
161                    amount,
162                    confidence,
163                    predecessor_edge_id,
164                    business_process: bp.clone(),
165                    is_fraud: h.is_fraud,
166                    is_anomaly: h.is_anomaly,
167                    fraud_type: fraud_type_str.clone(),
168                    ic_pair_id: ic_pair_id_str.clone(),
169                    ic_partner_entity: ic_partner.clone(),
170                });
171
172                line_id_to_edge_id
173                    .entry(from_line_id.clone())
174                    .or_insert(edge_id);
175            }
176        }
177    }
178
179    edges
180}
181
#[cfg(test)]
mod tests {
    //! Unit tests for `build_je_network_edges`: per-method edge counts and
    //! confidence, edge direction and amount, skip rules, edge-id stability,
    //! predecessor resolution and IC field pass-through.

    use super::*;
    use datasynth_core::models::{JournalEntry, JournalEntryHeader, JournalEntryLine};

    /// Shorthand for an integer-valued `Decimal`.
    fn dec(v: i64) -> Decimal {
        Decimal::from(v)
    }

    /// Header for company `C001` on a fixed posting date, carrying `doc` as
    /// its document id.
    fn header_for(doc: Uuid) -> JournalEntryHeader {
        let mut h = JournalEntryHeader::new(
            "C001".to_string(),
            NaiveDate::from_ymd_opt(2026, 5, 9).expect("2026-05-09 is a valid date"),
        );
        h.document_id = doc;
        h
    }

    /// One JE line with the given number, account and amounts; every other
    /// field takes its `Default` value.
    fn make_line(doc: Uuid, n: u32, account: &str, debit: i64, credit: i64) -> JournalEntryLine {
        JournalEntryLine {
            document_id: doc,
            line_number: n,
            gl_account: account.into(),
            debit_amount: dec(debit),
            credit_amount: dec(credit),
            ..Default::default()
        }
    }

    /// Balanced two-line JE: line 1 debits `debit_account`, line 2 credits
    /// `credit_account`, both for `amount`.
    fn make_two_line_je(debit_account: &str, credit_account: &str, amount: i64) -> JournalEntry {
        let document_id = Uuid::new_v4();
        let header = header_for(document_id);
        let lines = smallvec::smallvec![
            make_line(document_id, 1, debit_account, amount, 0),
            make_line(document_id, 2, credit_account, 0, amount),
        ];
        JournalEntry { header, lines }
    }

    #[test]
    fn method_a_emits_one_edge_per_two_line_je() {
        let jes = vec![
            make_two_line_je("1000", "2000", 1_000),
            make_two_line_je("1000", "4000", 5_000),
        ];
        let edges = build_je_network_edges(&jes, JeNetworkMethod::A);
        assert_eq!(edges.len(), 2, "one edge per 2-line JE");
        for e in &edges {
            assert_eq!(e.confidence, 1.0, "Method A confidence is exactly 1.0");
            assert!(e.ic_pair_id.is_none());
            assert!(e.ic_partner_entity.is_none());
        }
    }

    #[test]
    fn method_a_skips_multi_line_jes() {
        let document_id = Uuid::new_v4();
        let header = header_for(document_id);
        let lines = smallvec::smallvec![
            make_line(document_id, 1, "1000", 1_000, 0),
            make_line(document_id, 2, "1010", 500, 0),
            make_line(document_id, 3, "2000", 0, 1_500),
        ];
        let je = JournalEntry { header, lines };
        let edges = build_je_network_edges(&[je], JeNetworkMethod::A);
        assert_eq!(edges.len(), 0, "3-line JE skipped under Method A");
    }

    #[test]
    fn cartesian_emits_n_times_m_edges_per_je() {
        let document_id = Uuid::new_v4();
        let header = header_for(document_id);
        let lines = smallvec::smallvec![
            make_line(document_id, 1, "D1", 100, 0),
            make_line(document_id, 2, "D2", 50, 0),
            make_line(document_id, 3, "C1", 0, 80),
            make_line(document_id, 4, "C2", 0, 70),
        ];
        let je = JournalEntry { header, lines };
        let edges = build_je_network_edges(&[je], JeNetworkMethod::Cartesian);
        assert_eq!(
            edges.len(),
            4,
            "2 debits × 2 credits = 4 edges under Cartesian"
        );
        for e in &edges {
            assert!((e.confidence - 0.25).abs() < 1e-9, "1/(n*m) = 0.25");
        }
    }

    #[test]
    fn two_line_edge_flows_from_credit_to_debit_with_full_amount() {
        let jes = vec![make_two_line_je("1000", "2000", 1_000)];
        let edges = build_je_network_edges(&jes, JeNetworkMethod::Cartesian);
        assert_eq!(edges.len(), 1);

        let e = &edges[0];
        assert_eq!(e.from_account, "2000", "edge starts at the credited account");
        assert_eq!(e.to_account, "1000", "edge ends at the debited account");
        assert_eq!(e.amount, dec(1_000), "1×1 JE carries the full amount");
        assert_eq!(e.confidence, 1.0, "1×1 JE is unambiguous under any method");
        assert_eq!(e.document_id, jes[0].header.document_id);
        assert!(e.predecessor_edge_id.is_empty());
    }

    #[test]
    fn one_sided_je_emits_no_edges() {
        let document_id = Uuid::new_v4();
        let header = header_for(document_id);
        let lines = smallvec::smallvec![
            make_line(document_id, 1, "1000", 1_000, 0),
            make_line(document_id, 2, "1010", 500, 0),
        ];
        let jes = [JournalEntry { header, lines }];
        assert!(build_je_network_edges(&jes, JeNetworkMethod::A).is_empty());
        assert!(build_je_network_edges(&jes, JeNetworkMethod::Cartesian).is_empty());
    }

    #[test]
    fn edge_ids_are_deterministic_and_distinct() {
        let document_id = Uuid::new_v4();
        let header = header_for(document_id);
        let lines = smallvec::smallvec![
            make_line(document_id, 1, "D1", 100, 0),
            make_line(document_id, 2, "D2", 50, 0),
            make_line(document_id, 3, "C1", 0, 80),
            make_line(document_id, 4, "C2", 0, 70),
        ];
        let jes = [JournalEntry { header, lines }];

        let first = build_je_network_edges(&jes, JeNetworkMethod::Cartesian);
        let second = build_je_network_edges(&jes, JeNetworkMethod::Cartesian);

        let ids: Vec<&str> = first.iter().map(|e| e.edge_id.as_str()).collect();
        let ids_again: Vec<&str> = second.iter().map(|e| e.edge_id.as_str()).collect();
        assert_eq!(ids, ids_again, "same input yields the same edge ids");

        let unique: std::collections::HashSet<&str> = ids.iter().copied().collect();
        assert_eq!(unique.len(), 4, "each debit/credit pair gets its own id");
    }

    #[test]
    fn predecessor_edge_resolves_against_earlier_credit_line() {
        let mut first = make_two_line_je("1000", "2000", 1_000);
        first.lines[1].transaction_id = Some("TX-CREDIT-1".to_string());

        let mut second = make_two_line_je("3000", "1000", 400);
        second.lines[1].predecessor_line_id = Some("TX-CREDIT-1".to_string());

        let edges = build_je_network_edges(&[first, second], JeNetworkMethod::A);
        assert_eq!(edges.len(), 2);
        assert_eq!(edges[0].from_line_id, "TX-CREDIT-1");
        assert!(
            edges[0].predecessor_edge_id.is_empty(),
            "first JE has no predecessor"
        );
        assert_eq!(
            edges[1].predecessor_edge_id, edges[0].edge_id,
            "second JE points back at the edge that left TX-CREDIT-1"
        );
    }

    #[test]
    fn ic_fields_surface_when_present_on_header() {
        let document_id = Uuid::new_v4();
        let mut header = header_for(document_id);
        header.ic_partner_entity = Some("ACME_EUR".to_string());

        let lines = smallvec::smallvec![
            make_line(document_id, 1, "1150", 1000, 0),
            make_line(document_id, 2, "4500", 0, 1000),
        ];
        let je = JournalEntry { header, lines };
        let edges = build_je_network_edges(&[je], JeNetworkMethod::A);
        assert_eq!(edges.len(), 1);
        assert_eq!(edges[0].ic_partner_entity, Some("ACME_EUR".to_string()));
        // ic_pair_id requires construction via IcPairId — covered in group tests.
    }
}