Skip to main content

datasynth_runtime/
je_network.rs

1//! Shared Method-A / Method-B/C edge-list builder for the JE network export.
2//!
3//! v5.10: extracted from `output_writer::write_je_network_csv` so the
4//! same logic can be reused by the `datasynth-group` aggregate emitter,
5//! which builds both per-entity and consolidated edge lists from many
6//! per-entity JE batches.
7//!
8//! The builder is pure — no I/O — and returns a `Vec<JeNetworkEdge>`.
9//! Single-entity callers feed the result into the existing CSV writer
10//! (preserving v5.8.0 byte-identical output); group callers serialise
11//! the same struct with extra contextual columns (entity_code,
12//! is_eliminated, eliminates_ic_pair_id) added at write time.
13
14use chrono::NaiveDate;
15use rust_decimal::Decimal;
16use std::collections::HashMap;
17use uuid::Uuid;
18
19use datasynth_config::JeNetworkMethod;
20use datasynth_core::models::JournalEntry;
21
22/// One row of the je_network output.
23///
24/// The 13 columns matching v5.8.0 CSV output are at the top; v5.10
25/// added the optional IC fields (Some only on group-context inputs).
26#[derive(Debug, Clone)]
27pub struct JeNetworkEdge {
28    pub edge_id: String,
29    pub document_id: Uuid,
30    pub posting_date: NaiveDate,
31    pub from_account: String,
32    pub to_account: String,
33    pub from_line_id: String,
34    pub to_line_id: String,
35    pub amount: Decimal,
36    pub confidence: f64,
37    pub predecessor_edge_id: String,
38    pub business_process: String,
39    pub is_fraud: bool,
40    pub is_anomaly: bool,
41    /// v5.27 — fine-grained fraud typology (the `FraudType` variant name,
42    /// e.g. `SuspenseAccountAbuse`), surfaced from the JE header so the edge
43    /// list carries the same label as the JE table. Empty on non-fraud edges.
44    pub fraud_type: Option<String>,
45    /// v5.10 — surfaces `JournalEntryHeader::ic_pair_id` when present.
46    /// `None` for non-IC postings (and on every edge from a single-entity run).
47    pub ic_pair_id: Option<String>,
48    /// v5.10 — surfaces `JournalEntryHeader::ic_partner_entity` when present.
49    pub ic_partner_entity: Option<String>,
50}
51
52/// Build the Method-A / Method-B/C edge list for a batch of JEs.
53///
54/// JEs are processed in the iteration order of the slice; predecessor
55/// edge ids are resolved against earlier JEs in the same batch (so the
56/// caller must pass JEs that share predecessor chains in the same call).
57///
58/// Method semantics — see Ivertowski et al. (2024) Methods A through E:
59/// * `JeNetworkMethod::A` — bijective on 2-line entries only; multi-line
60///   JEs are skipped. Confidence on every emitted edge = `1.0`.
61/// * `JeNetworkMethod::Cartesian` — full Cartesian debit × credit product
62///   with proportional amount allocation; confidence = `1 / (n × m)`.
63pub fn build_je_network_edges(jes: &[JournalEntry], method: JeNetworkMethod) -> Vec<JeNetworkEdge> {
64    let mut builder = JeNetworkEdgeBuilder::with_capacity(method, jes.len() * 2);
65    for je in jes {
66        builder.push_je(je);
67    }
68    builder.into_edges()
69}
70
71/// v5.31 C1 Phase 6 — stateful, JE-at-a-time edge builder.
72///
73/// Maintains the `line_id → edge_id` mapping across `push_je` calls so
74/// cross-JE predecessor chains within an entity (e.g. payment-JE →
75/// invoice-JE → PO-JE document chains) resolve correctly **even when
76/// the caller streams JEs one at a time** rather than passing them as
77/// a single slice to [`build_je_network_edges`].
78///
79/// Use this when the caller wants to feed JEs from a streaming JSON
80/// parse (or other streaming source) without materialising the whole
81/// `Vec<JournalEntry>` in memory. The accumulated `edges` Vec grows as
82/// `push_je` is called and can be drained at any point via
83/// [`Self::drain_edges`].
84///
85/// Equivalent to calling [`build_je_network_edges`] with the same JEs
86/// passed as one slice — byte-identical output (same edge IDs, same
87/// predecessor resolution, same iteration order).
88pub struct JeNetworkEdgeBuilder {
89    method: JeNetworkMethod,
90    edges: Vec<JeNetworkEdge>,
91    line_id_to_edge_id: HashMap<String, String>,
92}
93
94impl JeNetworkEdgeBuilder {
95    /// Create a builder with the given method and zero initial capacity.
96    pub fn new(method: JeNetworkMethod) -> Self {
97        Self::with_capacity(method, 0)
98    }
99
100    /// Create a builder pre-sized for the expected total edge count.
101    pub fn with_capacity(method: JeNetworkMethod, capacity: usize) -> Self {
102        Self {
103            method,
104            edges: Vec::with_capacity(capacity),
105            line_id_to_edge_id: HashMap::with_capacity(capacity),
106        }
107    }
108
109    /// Process one JE — append its edges to the internal `edges` vector,
110    /// resolving `predecessor_line_id` against the line→edge map built
111    /// from prior `push_je` calls.
112    pub fn push_je(&mut self, je: &JournalEntry) {
113        emit_je_edges(
114            je,
115            self.method,
116            &mut self.edges,
117            &mut self.line_id_to_edge_id,
118        );
119    }
120
121    /// Take the accumulated edges, leaving the builder empty (the
122    /// line→edge map is also cleared).
123    pub fn drain_edges(&mut self) -> Vec<JeNetworkEdge> {
124        self.line_id_to_edge_id.clear();
125        std::mem::take(&mut self.edges)
126    }
127
128    /// Consume the builder and return the accumulated edges.
129    pub fn into_edges(self) -> Vec<JeNetworkEdge> {
130        self.edges
131    }
132
133    /// Current edge count (useful for batched flushes from the streaming
134    /// caller).
135    pub fn edge_count(&self) -> usize {
136        self.edges.len()
137    }
138}
139
140/// Internal helper — emit edges for one JE into `out`, using
141/// `line_id_to_edge_id` for cross-JE predecessor resolution. Factored
142/// out so both [`build_je_network_edges`] (single-batch) and
143/// [`JeNetworkEdgeBuilder::push_je`] (streaming) share the exact same
144/// code path → byte-identical output.
145fn emit_je_edges(
146    je: &JournalEntry,
147    method: JeNetworkMethod,
148    out: &mut Vec<JeNetworkEdge>,
149    line_id_to_edge_id: &mut HashMap<String, String>,
150) {
151    let h = &je.header;
152
153    let line_ids: Vec<String> = je
154        .lines
155        .iter()
156        .map(|l| {
157            l.transaction_id.clone().unwrap_or_else(|| {
158                datasynth_core::models::JournalEntryLine::derive_transaction_id(
159                    l.document_id,
160                    l.line_number,
161                )
162            })
163        })
164        .collect();
165
166    let debits: Vec<usize> = je
167        .lines
168        .iter()
169        .enumerate()
170        .filter(|(_, l)| l.debit_amount > Decimal::ZERO)
171        .map(|(i, _)| i)
172        .collect();
173    let credits: Vec<usize> = je
174        .lines
175        .iter()
176        .enumerate()
177        .filter(|(_, l)| l.credit_amount > Decimal::ZERO)
178        .map(|(i, _)| i)
179        .collect();
180    if debits.is_empty() || credits.is_empty() {
181        return;
182    }
183
184    if method == JeNetworkMethod::A && !(debits.len() == 1 && credits.len() == 1) {
185        return;
186    }
187
188    let total_debit: Decimal = debits.iter().map(|i| je.lines[*i].debit_amount).sum();
189    let total_credit: Decimal = credits.iter().map(|i| je.lines[*i].credit_amount).sum();
190    if total_debit.is_zero() || total_credit.is_zero() {
191        return;
192    }
193
194    let confidence: f64 = if debits.len() == 1 && credits.len() == 1 {
195        1.0
196    } else {
197        1.0 / (debits.len() * credits.len()) as f64
198    };
199
200    let bp = h
201        .business_process
202        .map(|bp| format!("{bp:?}"))
203        .unwrap_or_default();
204    let ic_pair_id_str = h.ic_pair_id.as_ref().map(|id| id.to_string());
205    let ic_partner = h.ic_partner_entity.clone();
206    let fraud_type_str = h.fraud_type.map(|ft| format!("{ft:?}"));
207
208    for &di in &debits {
209        let debit_line = &je.lines[di];
210        let to_line_id = &line_ids[di];
211        for &ci in &credits {
212            let credit_line = &je.lines[ci];
213            let from_line_id = &line_ids[ci];
214
215            // Edge id = UUID v5 of (document_id, debit.line_number,
216            // credit.line_number). Stable across regenerations.
217            let mut input = Vec::with_capacity(16 + 8);
218            input.extend_from_slice(h.document_id.as_bytes());
219            input.extend_from_slice(&debit_line.line_number.to_le_bytes());
220            input.extend_from_slice(&credit_line.line_number.to_le_bytes());
221            let edge_id = Uuid::new_v5(&Uuid::NAMESPACE_OID, &input).to_string();
222
223            // Proportional allocation matches
224            // TransactionGraphBuilder::add_journal_entry_debit_credit.
225            let proportion = (debit_line.debit_amount / total_debit)
226                * (credit_line.credit_amount / total_credit);
227            let amount = debit_line.debit_amount * proportion;
228
229            let predecessor_edge_id: String = credit_line
230                .predecessor_line_id
231                .as_ref()
232                .or(debit_line.predecessor_line_id.as_ref())
233                .and_then(|tx_id| line_id_to_edge_id.get(tx_id).cloned())
234                .unwrap_or_default();
235
236            out.push(JeNetworkEdge {
237                edge_id: edge_id.clone(),
238                document_id: h.document_id,
239                posting_date: h.posting_date,
240                from_account: credit_line.gl_account.clone(),
241                to_account: debit_line.gl_account.clone(),
242                from_line_id: from_line_id.clone(),
243                to_line_id: to_line_id.clone(),
244                amount,
245                confidence,
246                predecessor_edge_id,
247                business_process: bp.clone(),
248                is_fraud: h.is_fraud,
249                is_anomaly: h.is_anomaly,
250                fraud_type: fraud_type_str.clone(),
251                ic_pair_id: ic_pair_id_str.clone(),
252                ic_partner_entity: ic_partner.clone(),
253            });
254
255            line_id_to_edge_id
256                .entry(from_line_id.clone())
257                .or_insert(edge_id);
258        }
259    }
260}
261
#[cfg(test)]
mod tests {
    use super::*;
    use datasynth_core::models::{JournalEntry, JournalEntryHeader, JournalEntryLine};

    fn dec(v: i64) -> Decimal {
        Decimal::from(v)
    }

    fn header_for(doc: Uuid) -> JournalEntryHeader {
        let mut h = JournalEntryHeader::new(
            "C001".to_string(),
            NaiveDate::from_ymd_opt(2026, 5, 9).expect("2026-05-09 is a valid date"),
        );
        h.document_id = doc;
        h
    }

    fn make_line(doc: Uuid, n: u32, account: &str, debit: i64, credit: i64) -> JournalEntryLine {
        JournalEntryLine {
            document_id: doc,
            line_number: n,
            gl_account: account.into(),
            debit_amount: dec(debit),
            credit_amount: dec(credit),
            ..Default::default()
        }
    }

    fn make_two_line_je(debit_account: &str, credit_account: &str, amount: i64) -> JournalEntry {
        let document_id = Uuid::new_v4();
        let header = header_for(document_id);
        let lines = smallvec::smallvec![
            make_line(document_id, 1, debit_account, amount, 0),
            make_line(document_id, 2, credit_account, 0, amount),
        ];
        JournalEntry { header, lines }
    }

    /// Two 2-line JEs where the second JE's debit line names the first JE's
    /// credit line as its predecessor.
    fn chained_pair() -> Vec<JournalEntry> {
        let doc1 = Uuid::new_v4();
        let mut credit1 = make_line(doc1, 2, "2000", 0, 1_000);
        credit1.transaction_id = Some("je1-credit".to_string());
        let je1 = JournalEntry {
            header: header_for(doc1),
            lines: smallvec::smallvec![make_line(doc1, 1, "1000", 1_000, 0), credit1],
        };

        let doc2 = Uuid::new_v4();
        let mut debit2 = make_line(doc2, 1, "2000", 1_000, 0);
        debit2.predecessor_line_id = Some("je1-credit".to_string());
        let je2 = JournalEntry {
            header: header_for(doc2),
            lines: smallvec::smallvec![debit2, make_line(doc2, 2, "3000", 0, 1_000)],
        };

        vec![je1, je2]
    }

    #[test]
    fn method_a_emits_one_edge_per_two_line_je() {
        let jes = vec![
            make_two_line_je("1000", "2000", 1_000),
            make_two_line_je("1000", "4000", 5_000),
        ];
        let edges = build_je_network_edges(&jes, JeNetworkMethod::A);
        assert_eq!(edges.len(), 2, "one edge per 2-line JE");
        for e in &edges {
            assert_eq!(e.confidence, 1.0, "Method A confidence is exactly 1.0");
            assert!(e.ic_pair_id.is_none());
            assert!(e.ic_partner_entity.is_none());
        }
    }

    #[test]
    fn method_a_skips_multi_line_jes() {
        let document_id = Uuid::new_v4();
        let header = header_for(document_id);
        let lines = smallvec::smallvec![
            make_line(document_id, 1, "1000", 1_000, 0),
            make_line(document_id, 2, "1010", 500, 0),
            make_line(document_id, 3, "2000", 0, 1_500),
        ];
        let je = JournalEntry { header, lines };
        let edges = build_je_network_edges(&[je], JeNetworkMethod::A);
        assert_eq!(edges.len(), 0, "3-line JE skipped under Method A");
    }

    #[test]
    fn cartesian_emits_n_times_m_edges_per_je() {
        let document_id = Uuid::new_v4();
        let header = header_for(document_id);
        let lines = smallvec::smallvec![
            make_line(document_id, 1, "D1", 100, 0),
            make_line(document_id, 2, "D2", 50, 0),
            make_line(document_id, 3, "C1", 0, 80),
            make_line(document_id, 4, "C2", 0, 70),
        ];
        let je = JournalEntry { header, lines };
        let edges = build_je_network_edges(&[je], JeNetworkMethod::Cartesian);
        assert_eq!(
            edges.len(),
            4,
            "2 debits × 2 credits = 4 edges under Cartesian"
        );
        for e in &edges {
            assert!((e.confidence - 0.25).abs() < 1e-9, "1/(n*m) = 0.25");
        }
    }

    #[test]
    fn cartesian_single_debit_splits_by_credit_share() {
        let document_id = Uuid::new_v4();
        let header = header_for(document_id);
        let lines = smallvec::smallvec![
            make_line(document_id, 1, "D1", 200, 0),
            make_line(document_id, 2, "C1", 0, 150),
            make_line(document_id, 3, "C2", 0, 50),
        ];
        let je = JournalEntry { header, lines };
        let edges = build_je_network_edges(&[je], JeNetworkMethod::Cartesian);
        assert_eq!(edges.len(), 2);
        assert_eq!(edges[0].from_account, "C1");
        assert_eq!(edges[0].amount, dec(150));
        assert_eq!(edges[1].from_account, "C2");
        assert_eq!(edges[1].amount, dec(50));
        for e in &edges {
            assert_eq!(e.to_account, "D1");
        }
    }

    #[test]
    fn predecessor_edge_resolves_against_earlier_je() {
        let jes = chained_pair();
        let edges = build_je_network_edges(&jes, JeNetworkMethod::A);
        assert_eq!(edges.len(), 2);
        assert!(
            edges[0].predecessor_edge_id.is_empty(),
            "first JE has no predecessor"
        );
        assert_eq!(
            edges[1].predecessor_edge_id, edges[0].edge_id,
            "second JE chains to the first JE's edge"
        );
    }

    #[test]
    fn streaming_builder_matches_batch_call() {
        let jes = chained_pair();
        let batch = build_je_network_edges(&jes, JeNetworkMethod::A);

        let mut builder = JeNetworkEdgeBuilder::new(JeNetworkMethod::A);
        for je in &jes {
            builder.push_je(je);
        }
        assert_eq!(builder.edge_count(), batch.len());
        let streamed = builder.into_edges();

        for (s, b) in streamed.iter().zip(&batch) {
            assert_eq!(s.edge_id, b.edge_id);
            assert_eq!(s.predecessor_edge_id, b.predecessor_edge_id);
            assert_eq!(s.amount, b.amount);
        }
    }

    #[test]
    fn ic_fields_surface_when_present_on_header() {
        let document_id = Uuid::new_v4();
        let mut header = header_for(document_id);
        header.ic_partner_entity = Some("ACME_EUR".to_string());

        let lines = smallvec::smallvec![
            make_line(document_id, 1, "1150", 1000, 0),
            make_line(document_id, 2, "4500", 0, 1000),
        ];
        let je = JournalEntry { header, lines };
        let edges = build_je_network_edges(&[je], JeNetworkMethod::A);
        assert_eq!(edges.len(), 1);
        assert_eq!(edges[0].ic_partner_entity, Some("ACME_EUR".to_string()));
        // ic_pair_id requires construction via IcPairId — covered in group tests.
    }
}