// datasynth_generators/concentration/mod.rs

//! Central concentration abstraction (#143).
//!
//! Post-generation passes that reshape the JE batch's distributional structure
//! toward a corpus-derived target. A single integration point in the
//! orchestrator sidesteps the multi-generator coverage problem that blocked
//! SOTA-8 (#141) and SOTA-11 (#142) during the SOTA-N round.
//!
//! Design reference: `docs/superpowers/specs/2026-05-23-concentration-pass-INDEX.md`
//!
//! Invariants every implementor MUST preserve:
//!   1. Per-JE balance: `sum(debits) == sum(credits)`
//!   2. Subledger-bridge accounts unchanged (see [`STRUCTURAL_BRIDGE_ACCOUNTS`])
//!   3. Document-chain refs (`DocumentReference` is keyed by `document_id`,
//!      not by `gl_account` — substituting non-bridge accounts is safe)
//!   4. Determinism: same RNG seed + same input batch ⇒ same output
//!
//! # Building blocks
//!   - [`ConcentrationPass`] trait + [`ConcentrationStats`] aggregate
//!   - [`ConcentrationPipeline`] runs passes in order, handing each pass its
//!     own RNG stream
//!
//! # Passes (in the order `ConcentrationPipeline::from_config` registers them)
//!   1. [`SourceConditionalRarityPass`] — wraps the shipped SOTA-12 tagger
//!   2. [`TradingPartnerPoolPass`] — closes the SOTA-11.1 / #142 coverage gap
//!   3. [`AccountPairSubstitutionPass`] — corpus-PMF-driven account rewriting
//!      respecting the bridge allowlist + `AccountType` invariant
//!   4. [`ConsolidationOutlierPass`] — expands a small fraction of JEs by
//!      appending bridge-account DR/CR pairs (v5.30 B2, #154); registered
//!      before source blanking so the appended lines inherit the original
//!      JE's source-code state
//!   5. [`SourceBlankingPass`] — registered last so that every earlier pass
//!      that reads `sap_source_code` sees full coverage
26
27use std::collections::BTreeMap;
28
29use datasynth_config::schema::ConcentrationConfig;
30use datasynth_core::models::JournalEntry;
31use rand::SeedableRng;
32use rand_chacha::ChaCha8Rng;
33use serde::Serialize;
34
35pub mod account_pair_substitution;
36pub mod consolidation_outlier;
37pub mod source_blanking;
38pub mod source_conditional_rarity_pass;
39pub mod trading_partner_pool;
40
41pub use account_pair_substitution::{AccountPairSubstitutionError, AccountPairSubstitutionPass};
42pub use consolidation_outlier::ConsolidationOutlierPass;
43pub use source_blanking::SourceBlankingPass;
44pub use source_conditional_rarity_pass::SourceConditionalRarityPass;
45pub use trading_partner_pool::TradingPartnerPoolPass;
46
/// The seven subledger-bridge accounts whose balances must be preserved by
/// any post-process substitution.
///
/// Each of these accounts either aggregates to a subledger total (AR/AP
/// aging) or nets against a matching counterpart (GR/IR, intercompany,
/// wire transfer, fixed-asset acquisition). Substituting any of them silently
/// breaks the corresponding reconciliation reports.
///
/// Source: chain-invariants addendum
/// (`docs/superpowers/specs/2026-05-23-central-abstraction-addendum-chain-invariants.md`).
pub const STRUCTURAL_BRIDGE_ACCOUNTS: &[&str] = &[
    "1100", // AR_CONTROL           — GL counterpart of AR subledger
    "2000", // AP_CONTROL           — GL counterpart of AP subledger
    "2900", // GR_IR_CLEARING       — Goods Receipt ↔ Invoice Receipt bridge
    "1150", // IC_AR_CLEARING       — Intercompany AR matching
    "2050", // IC_AP_CLEARING       — Intercompany AP matching
    "1030", // WIRE_CLEARING        — Wire-transfer reconciliation
    "1599", // ACQUISITION_CLEARING — Fixed-asset acquisition clearing
];

/// Returns `true` iff `account` is one of [`STRUCTURAL_BRIDGE_ACCOUNTS`].
///
/// Passes that rewrite account codes must leave these accounts untouched.
/// Account-substituting passes such as [`AccountPairSubstitutionPass`] are
/// expected to gate substitution on this check.
///
/// The comparison is an exact, case-sensitive string match: no trimming or
/// other normalisation is applied, so `" 1100"` is *not* a bridge account.
#[inline]
pub fn is_structural_bridge(account: &str) -> bool {
    STRUCTURAL_BRIDGE_ACCOUNTS.contains(&account)
}
71
72// ---------------------------------------------------------------------------
73// Public trait
74// ---------------------------------------------------------------------------
75
76/// A post-generation transformation reshaping the JE batch's concentration /
77/// distributional structure toward a target. See module-level docs for the
78/// invariants every implementor must preserve.
79pub trait ConcentrationPass: Send + Sync {
80    /// Short stable identifier — used in stats + logs.
81    fn name(&self) -> &'static str;
82
83    /// Apply the transformation in place. Returns aggregate counters.
84    /// `rng` is a dedicated per-pass ChaCha8 substream split by the pipeline,
85    /// so adding/removing a pass doesn't perturb downstream passes' RNG state.
86    fn apply(&self, entries: &mut [JournalEntry], rng: &mut ChaCha8Rng) -> ConcentrationStats;
87}
88
89/// Per-pass aggregate counters. Serialised into the orchestrator's run report.
90#[derive(Debug, Clone, Default, Serialize)]
91pub struct ConcentrationStats {
92    pub pass: &'static str,
93    pub entries_examined: usize,
94    pub entries_modified: usize,
95    /// Pass-specific counters (e.g. `lines_substituted`, `tps_remapped`).
96    #[serde(default)]
97    pub extra: BTreeMap<&'static str, u64>,
98}
99
100// ---------------------------------------------------------------------------
101// Pipeline
102// ---------------------------------------------------------------------------
103
104/// Errors from constructing a `ConcentrationPipeline`. Currently only one
105/// fallible source (Phase-2 AccountPairSubstitutionPass loads its PMF file);
106/// the enum is open for future pass-construction errors.
107#[derive(Debug)]
108pub enum ConcentrationPipelineError {
109    AccountPairSubstitution(AccountPairSubstitutionError),
110}
111
112impl std::fmt::Display for ConcentrationPipelineError {
113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114        match self {
115            Self::AccountPairSubstitution(e) => {
116                write!(f, "AccountPairSubstitutionPass: {e}")
117            }
118        }
119    }
120}
121
122impl std::error::Error for ConcentrationPipelineError {}
123
124/// Ordered, deterministic execution of zero or more `ConcentrationPass`
125/// instances. Each pass receives a dedicated ChaCha8 substream so adding or
126/// removing a pass does not perturb the RNG state of any other pass.
127pub struct ConcentrationPipeline {
128    passes: Vec<Box<dyn ConcentrationPass>>,
129}
130
131impl ConcentrationPipeline {
132    /// Build a pipeline from config. Returns an empty pipeline if
133    /// `cfg.enabled == false` or no passes are configured. Returns
134    /// `Err(ConcentrationPipelineError)` if a configured pass fails to
135    /// construct (e.g. Phase-2 AccountPairSubstitutionPass can't read its
136    /// PMF file).
137    pub fn from_config(cfg: &ConcentrationConfig) -> Result<Self, ConcentrationPipelineError> {
138        let mut passes: Vec<Box<dyn ConcentrationPass>> = Vec::new();
139
140        if !cfg.enabled {
141            return Ok(Self { passes });
142        }
143
144        if let Some(c) = cfg.source_conditional_rarity.as_ref() {
145            passes.push(Box::new(SourceConditionalRarityPass::new(c.clone())));
146        }
147        if let Some(c) = cfg.trading_partner_pool.as_ref() {
148            passes.push(Box::new(TradingPartnerPoolPass::new(c.clone())));
149        }
150        if let Some(c) = cfg.account_pair_substitution.as_ref() {
151            let pass = AccountPairSubstitutionPass::from_pmf_file(c.clone())
152                .map_err(ConcentrationPipelineError::AccountPairSubstitution)?;
153            passes.push(Box::new(pass));
154        }
155        // v5.30 B2 (#154): ConsolidationOutlier expands a small fraction
156        // of JEs by appending bridge-account DR/CR pairs. Registered
157        // BEFORE SourceBlankingPass so the appended bridge lines inherit
158        // whatever source-code state the original JE carried (and remain
159        // subject to subsequent blanking if configured).
160        if let Some(c) = cfg.consolidation_outlier.as_ref() {
161            passes.push(Box::new(ConsolidationOutlierPass::new(c.clone())));
162        }
163        // SourceBlankingPass registered LAST — earlier passes that read
164        // sap_source_code must see full coverage. See module-level docs.
165        if let Some(c) = cfg.source_blanking.as_ref() {
166            passes.push(Box::new(SourceBlankingPass::new(c.clone())));
167        }
168
169        Ok(Self { passes })
170    }
171
172    /// Execute every pass in order, each with its own ChaCha8 substream
173    /// derived deterministically from `seed`. Returns one `ConcentrationStats`
174    /// per pass in execution order.
175    pub fn run(&self, entries: &mut [JournalEntry], seed: u64) -> Vec<ConcentrationStats> {
176        // Golden-ratio multiplier keeps substreams well-separated across passes.
177        const STREAM_STRIDE: u64 = 0x9E37_79B9_7F4A_7C15;
178
179        self.passes
180            .iter()
181            .enumerate()
182            .map(|(idx, pass)| {
183                let stream_seed = seed.wrapping_add((idx as u64).wrapping_mul(STREAM_STRIDE));
184                let mut rng = ChaCha8Rng::seed_from_u64(stream_seed);
185                pass.apply(entries, &mut rng)
186            })
187            .collect()
188    }
189
190    /// True iff the pipeline has at least one configured pass.
191    pub fn is_active(&self) -> bool {
192        !self.passes.is_empty()
193    }
194
195    /// Number of configured passes (for tests / observability).
196    pub fn len(&self) -> usize {
197        self.passes.len()
198    }
199
200    /// True iff zero passes are configured.
201    pub fn is_empty(&self) -> bool {
202        self.passes.is_empty()
203    }
204}
205
206// ---------------------------------------------------------------------------
207// Tests
208// ---------------------------------------------------------------------------
209
#[cfg(test)]
mod tests {
    use super::*;
    use datasynth_config::schema::{
        SourceConditionalRarityPassConfig, TradingPartnerPoolPassConfig,
    };

    /// A default `ConcentrationConfig` builds no passes.
    #[test]
    fn empty_config_yields_inactive_pipeline() {
        let pipeline =
            ConcentrationPipeline::from_config(&ConcentrationConfig::default()).unwrap();
        assert!(!pipeline.is_active());
        assert_eq!(pipeline.len(), 0);
    }

    /// `enabled: false` wins even when individual passes are configured.
    #[test]
    fn disabled_master_switch_yields_inactive_pipeline_even_with_passes() {
        let pool = TradingPartnerPoolPassConfig { target_size: 25 };
        let cfg = ConcentrationConfig {
            enabled: false,
            trading_partner_pool: Some(pool),
            ..Default::default()
        };
        let pipeline = ConcentrationPipeline::from_config(&cfg).unwrap();
        assert!(
            !pipeline.is_active(),
            "master-switch-off must override pass configs"
        );
    }

    /// With the master switch on, each configured pass contributes one entry.
    #[test]
    fn enabled_config_registers_each_configured_pass() {
        let rarity = SourceConditionalRarityPassConfig {
            rate: 0.01,
            min_surprise: None,
            min_per_source_lines: None,
        };
        let pool = TradingPartnerPoolPassConfig { target_size: 25 };
        let cfg = ConcentrationConfig {
            enabled: true,
            source_conditional_rarity: Some(rarity),
            trading_partner_pool: Some(pool),
            ..Default::default()
        };
        let pipeline = ConcentrationPipeline::from_config(&cfg).unwrap();
        assert!(pipeline.is_active());
        assert_eq!(pipeline.len(), 2);
    }

    #[test]
    fn structural_bridge_allowlist_recognises_seven_accounts() {
        assert_eq!(STRUCTURAL_BRIDGE_ACCOUNTS.len(), 7);

        let bridges = [
            "1100", // AR_CONTROL
            "2000", // AP_CONTROL
            "2900", // GR_IR_CLEARING
            "1150", // IC_AR_CLEARING
            "2050", // IC_AP_CLEARING
            "1030", // WIRE_CLEARING
            "1599", // ACQUISITION_CLEARING
        ];
        for code in bridges {
            assert!(is_structural_bridge(code));
        }

        // Expense, revenue and empty codes are all safe to substitute.
        for code in ["6000", "4000", ""] {
            assert!(!is_structural_bridge(code));
        }
    }

    /// Prepending an RNG-consuming no-op pass must not change the TP-pool
    /// pass's output.
    ///
    /// NOTE(review): the TP-pool pass is hash-based and does not read its
    /// `rng` (see the comment before the final loop). This test therefore
    /// checks only that the preceding pass leaves entries untouched and that
    /// the TP-pool result does not depend on its position. It does not, by
    /// itself, prove that a pass receives the same RNG stream regardless of
    /// position.
    #[test]
    fn rng_streams_are_isolated_across_passes() {
        use datasynth_core::models::{JournalEntry, JournalEntryLine};

        /// Draws two `u64`s from its stream and leaves the entries untouched.
        struct NoopRngConsumer;

        impl ConcentrationPass for NoopRngConsumer {
            fn name(&self) -> &'static str {
                "noop_rng_consumer"
            }

            fn apply(
                &self,
                entries: &mut [JournalEntry],
                rng: &mut ChaCha8Rng,
            ) -> ConcentrationStats {
                use rand::RngExt;
                for _ in 0..2 {
                    let _: u64 = rng.random();
                }
                ConcentrationStats {
                    pass: "noop_rng_consumer",
                    entries_examined: entries.len(),
                    entries_modified: 0,
                    extra: BTreeMap::new(),
                }
            }
        }

        // Five single-line JEs, each with its own trading partner.
        let make_batch = || -> Vec<JournalEntry> {
            (0..5)
                .map(|i| {
                    let mut je = JournalEntry::new_simple(
                        format!("JE{i}"),
                        "C1".to_string(),
                        chrono::NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
                        format!("desc {i}"),
                    );
                    je.lines.push(JournalEntryLine {
                        gl_account: "6000".to_string(),
                        trading_partner: Some(format!("TP-orig-{i}")),
                        ..JournalEntryLine::default()
                    });
                    je
                })
                .collect()
        };

        let tp_pool = || -> Box<dyn ConcentrationPass> {
            Box::new(TradingPartnerPoolPass::new(TradingPartnerPoolPassConfig {
                target_size: 3,
            }))
        };

        // TP pool pass on its own.
        let solo = ConcentrationPipeline {
            passes: vec![tp_pool()],
        };
        let mut solo_batch = make_batch();
        solo.run(&mut solo_batch, 12345);

        // Same TP pool pass, but behind an RNG-consuming no-op.
        let prefixed = ConcentrationPipeline {
            passes: vec![Box::new(NoopRngConsumer), tp_pool()],
        };
        let mut prefixed_batch = make_batch();
        prefixed.run(&mut prefixed_batch, 12345);

        // The TP pool pass is hash-based deterministic — it shouldn't read
        // its rng at all. So the outputs must match byte-for-byte regardless.
        for (lhs, rhs) in solo_batch.iter().zip(&prefixed_batch) {
            assert_eq!(lhs.lines[0].trading_partner, rhs.lines[0].trading_partner);
        }
    }
}