Skip to main content

datasynth_generators/concentration/
mod.rs

1//! Central concentration abstraction (#143, Phase 1).
2//!
3//! Post-generation passes that reshape the JE batch's distributional structure
4//! toward a corpus-derived target. Single integration point in the orchestrator
5//! sidesteps the multi-generator coverage problem that blocked SOTA-8 (#141) and
6//! SOTA-11 (#142) during the SOTA-N round.
7//!
8//! Design reference: `docs/superpowers/specs/2026-05-23-concentration-pass-INDEX.md`
9//!
10//! Invariants every implementor MUST preserve:
11//!   1. Per-JE balance: `sum(debits) == sum(credits)`
12//!   2. Subledger-bridge accounts unchanged (see [`STRUCTURAL_BRIDGE_ACCOUNTS`])
13//!   3. Document-chain refs (`DocumentReference` is keyed by `document_id`,
14//!      not by `gl_account` — substituting non-bridge accounts is safe)
15//!   4. Determinism: same RNG seed + same input batch ⇒ same output
16//!
17//! # Phase 1 (this commit)
18//!   - [`ConcentrationPass`] trait + [`ConcentrationStats`] aggregate
19//!   - [`ConcentrationPipeline`] runs passes in order with per-pass RNG isolation
20//!   - [`SourceConditionalRarityPass`] — wraps shipped SOTA-12 tagger
21//!   - [`TradingPartnerPoolPass`] — closes SOTA-11.1 / #142 coverage gap
22//!
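//! # Phase 2 (registered by [`ConcentrationPipeline::from_config`])
//!   - [`AccountPairSubstitutionPass`] — corpus-PMF-driven account rewriting
//!     respecting the bridge allowlist + AccountType invariant
//!   - [`SourceBlankingPass`] — registered last so that earlier passes reading
//!     `sap_source_code` still see full coverage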
26
27use std::collections::BTreeMap;
28
29use datasynth_config::schema::ConcentrationConfig;
30use datasynth_core::models::JournalEntry;
31use rand::SeedableRng;
32use rand_chacha::ChaCha8Rng;
33use serde::Serialize;
34
35pub mod account_pair_substitution;
36pub mod source_blanking;
37pub mod source_conditional_rarity_pass;
38pub mod trading_partner_pool;
39
40pub use account_pair_substitution::{AccountPairSubstitutionError, AccountPairSubstitutionPass};
41pub use source_blanking::SourceBlankingPass;
42pub use source_conditional_rarity_pass::SourceConditionalRarityPass;
43pub use trading_partner_pool::TradingPartnerPoolPass;
44
/// The 7 subledger-bridge accounts whose balances must be preserved across any
/// post-process substitution. These accounts are either aggregated to
/// subledger totals (AR/AP aging) or netted across matching pairs (GR/IR, IC,
/// wire, fixed-asset acquisition), so substituting them silently breaks
/// reconciliation reports.
///
/// Codes are compared as exact strings by [`is_structural_bridge`].
///
/// Source: chain-invariants addendum (`docs/superpowers/specs/2026-05-23-central-abstraction-addendum-chain-invariants.md`).
pub const STRUCTURAL_BRIDGE_ACCOUNTS: &[&str] = &[
    "1100", // AR_CONTROL          — GL counterpart of AR subledger
    "2000", // AP_CONTROL          — GL counterpart of AP subledger
    "2900", // GR_IR_CLEARING      — Goods Receipt ↔ Invoice Receipt bridge
    "1150", // IC_AR_CLEARING      — Intercompany AR matching
    "2050", // IC_AP_CLEARING      — Intercompany AP matching
    "1030", // WIRE_CLEARING       — Wire-transfer reconciliation
    "1599", // ACQUISITION_CLEARING — Fixed-asset acquisition clearing
];
60
61/// True iff the account code is a subledger-bridge account that must be
62/// preserved across post-process substitution. Phase-1 passes don't substitute
63/// accounts at all so this is currently informational; Phase-2's account-pair
64/// substitution pass will gate substitution on this.
65#[inline]
66pub fn is_structural_bridge(account: &str) -> bool {
67    STRUCTURAL_BRIDGE_ACCOUNTS.contains(&account)
68}
69
70// ---------------------------------------------------------------------------
71// Public trait
72// ---------------------------------------------------------------------------
73
/// A post-generation transformation reshaping the JE batch's concentration /
/// distributional structure toward a target. See module-level docs for the
/// invariants every implementor must preserve.
///
/// Implementors must be `Send + Sync`; the pipeline stores them as boxed
/// trait objects.
pub trait ConcentrationPass: Send + Sync {
    /// Short stable identifier — used in stats + logs.
    fn name(&self) -> &'static str;

    /// Apply the transformation in place and return aggregate counters.
    ///
    /// * `entries` — the journal-entry batch, mutated in place.
    /// * `rng` — a ChaCha8 generator the pipeline creates for this pass alone,
    ///   so the amount of randomness other passes draw never affects it.
    fn apply(&self, entries: &mut [JournalEntry], rng: &mut ChaCha8Rng) -> ConcentrationStats;
}
86
87/// Per-pass aggregate counters. Serialised into the orchestrator's run report.
88#[derive(Debug, Clone, Default, Serialize)]
89pub struct ConcentrationStats {
90    pub pass: &'static str,
91    pub entries_examined: usize,
92    pub entries_modified: usize,
93    /// Pass-specific counters (e.g. `lines_substituted`, `tps_remapped`).
94    #[serde(default)]
95    pub extra: BTreeMap<&'static str, u64>,
96}
97
98// ---------------------------------------------------------------------------
99// Pipeline
100// ---------------------------------------------------------------------------
101
/// Errors from constructing a `ConcentrationPipeline`. Currently only one
/// fallible source (Phase-2 AccountPairSubstitutionPass loads its PMF file);
/// the enum is open for future pass-construction errors.
///
/// NOTE(review): the enum is not marked `#[non_exhaustive]`, so adding a
/// variant is a breaking change for exhaustive matches in other crates —
/// confirm whether that is intended.
#[derive(Debug)]
pub enum ConcentrationPipelineError {
    /// Constructing `AccountPairSubstitutionPass` failed; wraps the
    /// underlying error.
    AccountPairSubstitution(AccountPairSubstitutionError),
}
109
110impl std::fmt::Display for ConcentrationPipelineError {
111    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112        match self {
113            Self::AccountPairSubstitution(e) => {
114                write!(f, "AccountPairSubstitutionPass: {e}")
115            }
116        }
117    }
118}
119
// Marker impl so the error can be used with `?` and `Box<dyn Error>`.
// `source()` is not overridden, so the wrapped error is surfaced only through
// the `Display` output.
impl std::error::Error for ConcentrationPipelineError {}
121
/// Ordered, deterministic execution of zero or more `ConcentrationPass`
/// instances. Each pass receives its own freshly seeded ChaCha8 generator, so
/// the randomness one pass consumes never leaks into another pass.
pub struct ConcentrationPipeline {
    /// Passes in execution order.
    passes: Vec<Box<dyn ConcentrationPass>>,
}
128
129impl ConcentrationPipeline {
130    /// Build a pipeline from config. Returns an empty pipeline if
131    /// `cfg.enabled == false` or no passes are configured. Returns
132    /// `Err(ConcentrationPipelineError)` if a configured pass fails to
133    /// construct (e.g. Phase-2 AccountPairSubstitutionPass can't read its
134    /// PMF file).
135    pub fn from_config(cfg: &ConcentrationConfig) -> Result<Self, ConcentrationPipelineError> {
136        let mut passes: Vec<Box<dyn ConcentrationPass>> = Vec::new();
137
138        if !cfg.enabled {
139            return Ok(Self { passes });
140        }
141
142        if let Some(c) = cfg.source_conditional_rarity.as_ref() {
143            passes.push(Box::new(SourceConditionalRarityPass::new(c.clone())));
144        }
145        if let Some(c) = cfg.trading_partner_pool.as_ref() {
146            passes.push(Box::new(TradingPartnerPoolPass::new(c.clone())));
147        }
148        if let Some(c) = cfg.account_pair_substitution.as_ref() {
149            let pass = AccountPairSubstitutionPass::from_pmf_file(c.clone())
150                .map_err(ConcentrationPipelineError::AccountPairSubstitution)?;
151            passes.push(Box::new(pass));
152        }
153        // SourceBlankingPass registered LAST — earlier passes that read
154        // sap_source_code must see full coverage. See module-level docs.
155        if let Some(c) = cfg.source_blanking.as_ref() {
156            passes.push(Box::new(SourceBlankingPass::new(c.clone())));
157        }
158
159        Ok(Self { passes })
160    }
161
162    /// Execute every pass in order, each with its own ChaCha8 substream
163    /// derived deterministically from `seed`. Returns one `ConcentrationStats`
164    /// per pass in execution order.
165    pub fn run(&self, entries: &mut [JournalEntry], seed: u64) -> Vec<ConcentrationStats> {
166        // Golden-ratio multiplier keeps substreams well-separated across passes.
167        const STREAM_STRIDE: u64 = 0x9E37_79B9_7F4A_7C15;
168
169        self.passes
170            .iter()
171            .enumerate()
172            .map(|(idx, pass)| {
173                let stream_seed = seed.wrapping_add((idx as u64).wrapping_mul(STREAM_STRIDE));
174                let mut rng = ChaCha8Rng::seed_from_u64(stream_seed);
175                pass.apply(entries, &mut rng)
176            })
177            .collect()
178    }
179
180    /// True iff the pipeline has at least one configured pass.
181    pub fn is_active(&self) -> bool {
182        !self.passes.is_empty()
183    }
184
185    /// Number of configured passes (for tests / observability).
186    pub fn len(&self) -> usize {
187        self.passes.len()
188    }
189
190    /// True iff zero passes are configured.
191    pub fn is_empty(&self) -> bool {
192        self.passes.is_empty()
193    }
194}
195
196// ---------------------------------------------------------------------------
197// Tests
198// ---------------------------------------------------------------------------
199
#[cfg(test)]
mod tests {
    use super::*;
    use datasynth_config::schema::{
        SourceConditionalRarityPassConfig, TradingPartnerPoolPassConfig,
    };

    /// A default config must produce a pipeline with no passes.
    #[test]
    fn empty_config_yields_inactive_pipeline() {
        let pipeline =
            ConcentrationPipeline::from_config(&ConcentrationConfig::default()).unwrap();
        assert!(!pipeline.is_active());
        assert_eq!(pipeline.len(), 0);
    }

    /// `enabled: false` wins even when an individual pass is configured.
    #[test]
    fn disabled_master_switch_yields_inactive_pipeline_even_with_passes() {
        let tp_pool = TradingPartnerPoolPassConfig { target_size: 25 };
        let cfg = ConcentrationConfig {
            enabled: false, // master switch off
            trading_partner_pool: Some(tp_pool),
            ..Default::default()
        };

        let pipeline = ConcentrationPipeline::from_config(&cfg).unwrap();

        assert!(
            !pipeline.is_active(),
            "master-switch-off must override pass configs"
        );
    }

    /// With the master switch on, every configured pass is registered.
    #[test]
    fn enabled_config_registers_each_configured_pass() {
        let rarity = SourceConditionalRarityPassConfig {
            rate: 0.01,
            min_surprise: None,
            min_per_source_lines: None,
        };
        let tp_pool = TradingPartnerPoolPassConfig { target_size: 25 };
        let cfg = ConcentrationConfig {
            enabled: true,
            source_conditional_rarity: Some(rarity),
            trading_partner_pool: Some(tp_pool),
            ..Default::default()
        };

        let pipeline = ConcentrationPipeline::from_config(&cfg).unwrap();

        assert!(pipeline.is_active());
        assert_eq!(pipeline.len(), 2);
    }

    /// The allowlist holds exactly the seven bridge codes and nothing else.
    #[test]
    fn structural_bridge_allowlist_recognises_seven_accounts() {
        // AR_CONTROL, AP_CONTROL, GR_IR_CLEARING, IC_AR_CLEARING,
        // IC_AP_CLEARING, WIRE_CLEARING, ACQUISITION_CLEARING.
        let bridges = ["1100", "2000", "2900", "1150", "2050", "1030", "1599"];
        // Expense account, revenue account, empty code — all safe to rewrite.
        let non_bridges = ["6000", "4000", ""];

        assert_eq!(STRUCTURAL_BRIDGE_ACCOUNTS.len(), 7);
        for &code in &bridges {
            assert!(is_structural_bridge(code));
        }
        for &code in &non_bridges {
            assert!(!is_structural_bridge(code));
        }
    }

    /// Pass-isolation invariant: putting an RNG-consuming no-op pass in front
    /// of the TP-pool pass must not change the TP-pool pass's output. Checked
    /// by running the TP-pool pass alone and after the no-op on identical
    /// batches and comparing the resulting trading partners.
    #[test]
    fn rng_streams_are_isolated_across_passes() {
        use datasynth_core::models::{JournalEntry, JournalEntryLine};

        // Draws from its RNG but leaves the entries untouched.
        struct NoopRngConsumer;
        impl ConcentrationPass for NoopRngConsumer {
            fn name(&self) -> &'static str {
                "noop_rng_consumer"
            }
            fn apply(
                &self,
                entries: &mut [JournalEntry],
                rng: &mut ChaCha8Rng,
            ) -> ConcentrationStats {
                use rand::RngExt;
                // Two draws, so the pass has an observable effect on its RNG.
                for _ in 0..2 {
                    let _: u64 = rng.random();
                }
                ConcentrationStats {
                    pass: self.name(),
                    entries_examined: entries.len(),
                    ..ConcentrationStats::default()
                }
            }
        }

        // Five single-line JEs, each with a distinct trading partner.
        let make_batch = || -> Vec<JournalEntry> {
            (0..5)
                .map(|i| {
                    let mut je = JournalEntry::new_simple(
                        format!("JE{i}"),
                        "C1".to_string(),
                        chrono::NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
                        format!("desc {i}"),
                    );
                    je.lines.push(JournalEntryLine {
                        gl_account: "6000".to_string(),
                        trading_partner: Some(format!("TP-orig-{i}")),
                        ..JournalEntryLine::default()
                    });
                    je
                })
                .collect()
        };

        // Both pipelines use an identically configured TP-pool pass.
        fn tp_pool_pass() -> Box<dyn ConcentrationPass> {
            Box::new(TradingPartnerPoolPass::new(TradingPartnerPoolPassConfig {
                target_size: 3,
            }))
        }

        // Pipeline A: the TP-pool pass on its own.
        let alone = ConcentrationPipeline {
            passes: vec![tp_pool_pass()],
        };
        let mut batch_alone = make_batch();
        alone.run(&mut batch_alone, 12345);

        // Pipeline B: the no-op consumer first, then the TP-pool pass.
        let after_noop = ConcentrationPipeline {
            passes: vec![Box::new(NoopRngConsumer), tp_pool_pass()],
        };
        let mut batch_after_noop = make_batch();
        after_noop.run(&mut batch_after_noop, 12345);

        // The TP-pool pass is meant to be hash-based and not to read its RNG
        // at all, so the two outputs must agree regardless of RNG state.
        for (lhs, rhs) in batch_alone.iter().zip(batch_after_noop.iter()) {
            assert_eq!(lhs.lines[0].trading_partner, rhs.lines[0].trading_partner);
        }
    }
}