datasynth_generators/concentration/mod.rs
1//! Central concentration abstraction (#143, Phase 1).
2//!
3//! Post-generation passes that reshape the JE batch's distributional structure
4//! toward a corpus-derived target. Single integration point in the orchestrator
5//! sidesteps the multi-generator coverage problem that blocked SOTA-8 (#141) and
6//! SOTA-11 (#142) during the SOTA-N round.
7//!
8//! Design reference: `docs/superpowers/specs/2026-05-23-concentration-pass-INDEX.md`
9//!
10//! Invariants every implementor MUST preserve:
11//! 1. Per-JE balance: `sum(debits) == sum(credits)`
12//! 2. Subledger-bridge accounts unchanged (see [`STRUCTURAL_BRIDGE_ACCOUNTS`])
//! 3. Document-chain refs stay valid (`DocumentReference` is keyed by
//!    `document_id`, not by `gl_account`, so substituting non-bridge accounts
//!    is safe)
15//! 4. Determinism: same RNG seed + same input batch ⇒ same output
16//!
//! # Phase 1
//! - [`ConcentrationPass`] trait + [`ConcentrationStats`] aggregate
//! - [`ConcentrationPipeline`] runs passes in order with per-pass RNG isolation
//! - [`SourceConditionalRarityPass`] — wraps shipped SOTA-12 tagger
//! - [`TradingPartnerPoolPass`] — closes SOTA-11.1 / #142 coverage gap
//!
//! # Phase 2
//! - [`AccountPairSubstitutionPass`] — corpus-PMF-driven account rewriting
//!   respecting the bridge allowlist + AccountType invariant
//!
//! # Later additions
//! - [`ConsolidationOutlierPass`] (v5.30 B2, #154) — appends bridge-account
//!   DR/CR pairs to a small fraction of JEs
//! - [`SourceBlankingPass`] — registered last so that earlier passes reading
//!   `sap_source_code` see full coverage
26
27use std::collections::BTreeMap;
28
29use datasynth_config::schema::ConcentrationConfig;
30use datasynth_core::models::JournalEntry;
31use rand::SeedableRng;
32use rand_chacha::ChaCha8Rng;
33use serde::Serialize;
34
35pub mod account_pair_substitution;
36pub mod consolidation_outlier;
37pub mod source_blanking;
38pub mod source_conditional_rarity_pass;
39pub mod trading_partner_pool;
40
41pub use account_pair_substitution::{AccountPairSubstitutionError, AccountPairSubstitutionPass};
42pub use consolidation_outlier::ConsolidationOutlierPass;
43pub use source_blanking::SourceBlankingPass;
44pub use source_conditional_rarity_pass::SourceConditionalRarityPass;
45pub use trading_partner_pool::TradingPartnerPoolPass;
46
/// Allowlist of the seven subledger-bridge GL accounts that no post-process
/// substitution may rewrite.
///
/// These accounts either aggregate to subledger totals (AR/AP aging) or are
/// netted across matching pairs (GR/IR, IC, wire, fixed-asset acquisition).
/// Substituting them silently breaks reconciliation reports.
///
/// Source: chain-invariants addendum
/// (`docs/superpowers/specs/2026-05-23-central-abstraction-addendum-chain-invariants.md`).
pub const STRUCTURAL_BRIDGE_ACCOUNTS: &[&str] = &[
    "1100", // AR_CONTROL: GL counterpart of the AR subledger
    "2000", // AP_CONTROL: GL counterpart of the AP subledger
    "2900", // GR_IR_CLEARING: goods-receipt / invoice-receipt bridge
    "1150", // IC_AR_CLEARING: intercompany AR matching
    "2050", // IC_AP_CLEARING: intercompany AP matching
    "1030", // WIRE_CLEARING: wire-transfer reconciliation
    "1599", // ACQUISITION_CLEARING: fixed-asset acquisition clearing
];
62
63/// True iff the account code is a subledger-bridge account that must be
64/// preserved across post-process substitution. Phase-1 passes don't substitute
65/// accounts at all so this is currently informational; Phase-2's account-pair
66/// substitution pass will gate substitution on this.
67#[inline]
68pub fn is_structural_bridge(account: &str) -> bool {
69 STRUCTURAL_BRIDGE_ACCOUNTS.contains(&account)
70}
71
72// ---------------------------------------------------------------------------
73// Public trait
74// ---------------------------------------------------------------------------
75
76/// A post-generation transformation reshaping the JE batch's concentration /
77/// distributional structure toward a target. See module-level docs for the
78/// invariants every implementor must preserve.
79pub trait ConcentrationPass: Send + Sync {
80 /// Short stable identifier — used in stats + logs.
81 fn name(&self) -> &'static str;
82
83 /// Apply the transformation in place. Returns aggregate counters.
84 /// `rng` is a dedicated per-pass ChaCha8 substream split by the pipeline,
85 /// so adding/removing a pass doesn't perturb downstream passes' RNG state.
86 fn apply(&self, entries: &mut [JournalEntry], rng: &mut ChaCha8Rng) -> ConcentrationStats;
87}
88
89/// Per-pass aggregate counters. Serialised into the orchestrator's run report.
90#[derive(Debug, Clone, Default, Serialize)]
91pub struct ConcentrationStats {
92 pub pass: &'static str,
93 pub entries_examined: usize,
94 pub entries_modified: usize,
95 /// Pass-specific counters (e.g. `lines_substituted`, `tps_remapped`).
96 #[serde(default)]
97 pub extra: BTreeMap<&'static str, u64>,
98}
99
100// ---------------------------------------------------------------------------
101// Pipeline
102// ---------------------------------------------------------------------------
103
104/// Errors from constructing a `ConcentrationPipeline`. Currently only one
105/// fallible source (Phase-2 AccountPairSubstitutionPass loads its PMF file);
106/// the enum is open for future pass-construction errors.
107#[derive(Debug)]
108pub enum ConcentrationPipelineError {
109 AccountPairSubstitution(AccountPairSubstitutionError),
110}
111
112impl std::fmt::Display for ConcentrationPipelineError {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 match self {
115 Self::AccountPairSubstitution(e) => {
116 write!(f, "AccountPairSubstitutionPass: {e}")
117 }
118 }
119 }
120}
121
122impl std::error::Error for ConcentrationPipelineError {}
123
124/// Ordered, deterministic execution of zero or more `ConcentrationPass`
125/// instances. Each pass receives a dedicated ChaCha8 substream so adding or
126/// removing a pass does not perturb the RNG state of any other pass.
127pub struct ConcentrationPipeline {
128 passes: Vec<Box<dyn ConcentrationPass>>,
129}
130
131impl ConcentrationPipeline {
132 /// Build a pipeline from config. Returns an empty pipeline if
133 /// `cfg.enabled == false` or no passes are configured. Returns
134 /// `Err(ConcentrationPipelineError)` if a configured pass fails to
135 /// construct (e.g. Phase-2 AccountPairSubstitutionPass can't read its
136 /// PMF file).
137 pub fn from_config(cfg: &ConcentrationConfig) -> Result<Self, ConcentrationPipelineError> {
138 let mut passes: Vec<Box<dyn ConcentrationPass>> = Vec::new();
139
140 if !cfg.enabled {
141 return Ok(Self { passes });
142 }
143
144 if let Some(c) = cfg.source_conditional_rarity.as_ref() {
145 passes.push(Box::new(SourceConditionalRarityPass::new(c.clone())));
146 }
147 if let Some(c) = cfg.trading_partner_pool.as_ref() {
148 passes.push(Box::new(TradingPartnerPoolPass::new(c.clone())));
149 }
150 if let Some(c) = cfg.account_pair_substitution.as_ref() {
151 let pass = AccountPairSubstitutionPass::from_pmf_file(c.clone())
152 .map_err(ConcentrationPipelineError::AccountPairSubstitution)?;
153 passes.push(Box::new(pass));
154 }
155 // v5.30 B2 (#154): ConsolidationOutlier expands a small fraction
156 // of JEs by appending bridge-account DR/CR pairs. Registered
157 // BEFORE SourceBlankingPass so the appended bridge lines inherit
158 // whatever source-code state the original JE carried (and remain
159 // subject to subsequent blanking if configured).
160 if let Some(c) = cfg.consolidation_outlier.as_ref() {
161 passes.push(Box::new(ConsolidationOutlierPass::new(c.clone())));
162 }
163 // SourceBlankingPass registered LAST — earlier passes that read
164 // sap_source_code must see full coverage. See module-level docs.
165 if let Some(c) = cfg.source_blanking.as_ref() {
166 passes.push(Box::new(SourceBlankingPass::new(c.clone())));
167 }
168
169 Ok(Self { passes })
170 }
171
172 /// Execute every pass in order, each with its own ChaCha8 substream
173 /// derived deterministically from `seed`. Returns one `ConcentrationStats`
174 /// per pass in execution order.
175 pub fn run(&self, entries: &mut [JournalEntry], seed: u64) -> Vec<ConcentrationStats> {
176 // Golden-ratio multiplier keeps substreams well-separated across passes.
177 const STREAM_STRIDE: u64 = 0x9E37_79B9_7F4A_7C15;
178
179 self.passes
180 .iter()
181 .enumerate()
182 .map(|(idx, pass)| {
183 let stream_seed = seed.wrapping_add((idx as u64).wrapping_mul(STREAM_STRIDE));
184 let mut rng = ChaCha8Rng::seed_from_u64(stream_seed);
185 pass.apply(entries, &mut rng)
186 })
187 .collect()
188 }
189
190 /// True iff the pipeline has at least one configured pass.
191 pub fn is_active(&self) -> bool {
192 !self.passes.is_empty()
193 }
194
195 /// Number of configured passes (for tests / observability).
196 pub fn len(&self) -> usize {
197 self.passes.len()
198 }
199
200 /// True iff zero passes are configured.
201 pub fn is_empty(&self) -> bool {
202 self.passes.is_empty()
203 }
204}
205
206// ---------------------------------------------------------------------------
207// Tests
208// ---------------------------------------------------------------------------
209
#[cfg(test)]
mod tests {
    use super::*;
    use datasynth_config::schema::{
        SourceConditionalRarityPassConfig, TradingPartnerPoolPassConfig,
    };

    /// A default config registers nothing.
    #[test]
    fn empty_config_yields_inactive_pipeline() {
        let cfg = ConcentrationConfig::default();
        let pipeline = ConcentrationPipeline::from_config(&cfg).unwrap();
        assert!(!pipeline.is_active());
        assert!(pipeline.is_empty());
        assert_eq!(pipeline.len(), 0);
    }

    /// `enabled: false` wins over any per-pass section that is present.
    #[test]
    fn disabled_master_switch_yields_inactive_pipeline_even_with_passes() {
        let cfg = ConcentrationConfig {
            enabled: false, // master switch off
            trading_partner_pool: Some(TradingPartnerPoolPassConfig { target_size: 25 }),
            ..Default::default()
        };
        let pipeline = ConcentrationPipeline::from_config(&cfg).unwrap();
        assert!(
            !pipeline.is_active(),
            "master-switch-off must override pass configs"
        );
    }

    /// With the master switch on, each configured pass is registered once.
    #[test]
    fn enabled_config_registers_each_configured_pass() {
        let cfg = ConcentrationConfig {
            enabled: true,
            source_conditional_rarity: Some(SourceConditionalRarityPassConfig {
                rate: 0.01,
                min_surprise: None,
                min_per_source_lines: None,
            }),
            trading_partner_pool: Some(TradingPartnerPoolPassConfig { target_size: 25 }),
            ..Default::default()
        };
        let pipeline = ConcentrationPipeline::from_config(&cfg).unwrap();
        assert!(pipeline.is_active());
        assert_eq!(pipeline.len(), 2);
    }

    /// The allowlist contains exactly the seven documented bridge accounts.
    #[test]
    fn structural_bridge_allowlist_recognises_seven_accounts() {
        assert_eq!(STRUCTURAL_BRIDGE_ACCOUNTS.len(), 7);
        assert!(is_structural_bridge("1100")); // AR_CONTROL
        assert!(is_structural_bridge("2000")); // AP_CONTROL
        assert!(is_structural_bridge("2900")); // GR_IR_CLEARING
        assert!(is_structural_bridge("1150")); // IC_AR_CLEARING
        assert!(is_structural_bridge("2050")); // IC_AP_CLEARING
        assert!(is_structural_bridge("1030")); // WIRE_CLEARING
        assert!(is_structural_bridge("1599")); // ACQUISITION_CLEARING
        assert!(!is_structural_bridge("6000")); // expense account — safe
        assert!(!is_structural_bridge("4000")); // revenue — safe
        assert!(!is_structural_bridge("")); // empty — safe
    }

    /// Inserting an RNG-consuming no-op pass ahead of the TP-pool pass must
    /// not change the TP-pool pass's output. Verified by running the TP-pool
    /// pass alone vs after the no-op and comparing the resulting JEs.
    ///
    /// This only checks the TP-pool output; the pipeline's RNG streams are
    /// observed directly in `run_is_reproducible_with_distinct_streams_per_pass`.
    #[test]
    fn rng_streams_are_isolated_across_passes() {
        use datasynth_core::models::{JournalEntry, JournalEntryLine};

        // Pass that consumes some RNG state but doesn't touch entries.
        struct NoopRngConsumer;
        impl ConcentrationPass for NoopRngConsumer {
            fn name(&self) -> &'static str {
                "noop_rng_consumer"
            }
            fn apply(
                &self,
                entries: &mut [JournalEntry],
                rng: &mut ChaCha8Rng,
            ) -> ConcentrationStats {
                use rand::RngExt;
                // Consume some entropy so this pass has observable RNG effect.
                let _: u64 = rng.random();
                let _: u64 = rng.random();
                ConcentrationStats {
                    pass: "noop_rng_consumer",
                    entries_examined: entries.len(),
                    entries_modified: 0,
                    extra: BTreeMap::new(),
                }
            }
        }

        // Build two identical JE batches.
        let make_batch = || {
            let mut jes = Vec::new();
            for i in 0..5 {
                let mut je = JournalEntry::new_simple(
                    format!("JE{i}"),
                    "C1".to_string(),
                    chrono::NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
                    format!("desc {i}"),
                );
                let line = JournalEntryLine {
                    gl_account: "6000".to_string(),
                    trading_partner: Some(format!("TP-orig-{i}")),
                    ..JournalEntryLine::default()
                };
                je.lines.push(line);
                jes.push(je);
            }
            jes
        };

        // Pipeline A: just the TP pool pass.
        let pipe_a = ConcentrationPipeline {
            passes: vec![Box::new(TradingPartnerPoolPass::new(
                TradingPartnerPoolPassConfig { target_size: 3 },
            ))],
        };
        let mut batch_a = make_batch();
        pipe_a.run(&mut batch_a, 12345);

        // Pipeline B: no-op consumer + TP pool pass.
        let pipe_b = ConcentrationPipeline {
            passes: vec![
                Box::new(NoopRngConsumer),
                Box::new(TradingPartnerPoolPass::new(TradingPartnerPoolPassConfig {
                    target_size: 3,
                })),
            ],
        };
        let mut batch_b = make_batch();
        pipe_b.run(&mut batch_b, 12345);

        // The TP pool pass is hash-based deterministic — it shouldn't read
        // its rng at all. So the outputs must match regardless of which
        // passes ran before it.
        assert_eq!(batch_a.len(), batch_b.len());
        for (a, b) in batch_a.iter().zip(batch_b.iter()) {
            assert_eq!(a.lines[0].trading_partner, b.lines[0].trading_partner);
        }
    }

    /// Observes the RNG streams handed out by `run` directly: a probe pass
    /// reports its first draw through `ConcentrationStats::extra`.
    ///
    /// Checks that (1) the same seed reproduces the same draws, (2) two
    /// differently named passes in one pipeline do not share a stream, and
    /// (3) changing the master seed changes the draws.
    #[test]
    fn run_is_reproducible_with_distinct_streams_per_pass() {
        use rand::RngExt;

        struct DrawProbe(&'static str);
        impl ConcentrationPass for DrawProbe {
            fn name(&self) -> &'static str {
                self.0
            }
            fn apply(
                &self,
                entries: &mut [JournalEntry],
                rng: &mut ChaCha8Rng,
            ) -> ConcentrationStats {
                let first_draw: u64 = rng.random();
                let mut extra = BTreeMap::new();
                extra.insert("first_draw", first_draw);
                ConcentrationStats {
                    pass: self.0,
                    entries_examined: entries.len(),
                    entries_modified: 0,
                    extra,
                }
            }
        }

        let pipeline = ConcentrationPipeline {
            passes: vec![
                Box::new(DrawProbe("probe_a")),
                Box::new(DrawProbe("probe_b")),
            ],
        };
        let mut batch: Vec<JournalEntry> = Vec::new();

        let draws = |stats: &[ConcentrationStats]| -> Vec<u64> {
            stats
                .iter()
                .map(|s| *s.extra.get("first_draw").expect("probe records its draw"))
                .collect()
        };

        let first = draws(&pipeline.run(&mut batch, 42));
        let again = draws(&pipeline.run(&mut batch, 42));
        let other_seed = draws(&pipeline.run(&mut batch, 43));

        assert_eq!(first.len(), 2);
        assert_eq!(first, again, "same seed must reproduce the same draws");
        assert_ne!(first[0], first[1], "passes must not share an RNG stream");
        assert_ne!(first, other_seed, "master seed must influence the streams");
    }
}
351}