datasynth_generators/concentration/mod.rs
1//! Central concentration abstraction (#143, Phase 1).
2//!
3//! Post-generation passes that reshape the JE batch's distributional structure
4//! toward a corpus-derived target. Single integration point in the orchestrator
5//! sidesteps the multi-generator coverage problem that blocked SOTA-8 (#141) and
6//! SOTA-11 (#142) during the SOTA-N round.
7//!
8//! Design reference: `docs/superpowers/specs/2026-05-23-concentration-pass-INDEX.md`
9//!
10//! Invariants every implementor MUST preserve:
11//! 1. Per-JE balance: `sum(debits) == sum(credits)`
12//! 2. Subledger-bridge accounts unchanged (see [`STRUCTURAL_BRIDGE_ACCOUNTS`])
//! 3. Document-chain refs stay valid (`DocumentReference` is keyed by
//!    `document_id`, not by `gl_account`, so substituting non-bridge accounts
//!    is safe)
15//! 4. Determinism: same RNG seed + same input batch ⇒ same output
16//!
17//! # Phase 1 (this commit)
18//! - [`ConcentrationPass`] trait + [`ConcentrationStats`] aggregate
19//! - [`ConcentrationPipeline`] runs passes in order with per-pass RNG isolation
20//! - [`SourceConditionalRarityPass`] — wraps shipped SOTA-12 tagger
21//! - [`TradingPartnerPoolPass`] — closes SOTA-11.1 / #142 coverage gap
22//!
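//! # Phase 2
//! - [`AccountPairSubstitutionPass`] — corpus-PMF-driven account rewriting
//!   respecting the bridge allowlist + AccountType invariant
//! - [`SourceBlankingPass`] — always registered last, so that earlier passes
//!   that read `sap_source_code` see full coverage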
26
27use std::collections::BTreeMap;
28
29use datasynth_config::schema::ConcentrationConfig;
30use datasynth_core::models::JournalEntry;
31use rand::SeedableRng;
32use rand_chacha::ChaCha8Rng;
33use serde::Serialize;
34
35pub mod account_pair_substitution;
36pub mod source_blanking;
37pub mod source_conditional_rarity_pass;
38pub mod trading_partner_pool;
39
40pub use account_pair_substitution::{AccountPairSubstitutionError, AccountPairSubstitutionPass};
41pub use source_blanking::SourceBlankingPass;
42pub use source_conditional_rarity_pass::SourceConditionalRarityPass;
43pub use trading_partner_pool::TradingPartnerPoolPass;
44
/// Account codes of the 7 subledger-bridge accounts that no post-process
/// substitution may touch.
///
/// These accounts either aggregate to subledger totals (AR/AP aging) or net
/// across matching pairs (GR/IR, IC, wire, fixed-asset acquisition), so
/// substituting them silently breaks reconciliation reports.
///
/// Source: chain-invariants addendum
/// (`docs/superpowers/specs/2026-05-23-central-abstraction-addendum-chain-invariants.md`).
pub const STRUCTURAL_BRIDGE_ACCOUNTS: &[&str] = &[
    "1100", // AR_CONTROL: GL counterpart of the AR subledger
    "2000", // AP_CONTROL: GL counterpart of the AP subledger
    "2900", // GR_IR_CLEARING: Goods Receipt <-> Invoice Receipt bridge
    "1150", // IC_AR_CLEARING: intercompany AR matching
    "2050", // IC_AP_CLEARING: intercompany AP matching
    "1030", // WIRE_CLEARING: wire-transfer reconciliation
    "1599", // ACQUISITION_CLEARING: fixed-asset acquisition clearing
];
60
61/// True iff the account code is a subledger-bridge account that must be
62/// preserved across post-process substitution. Phase-1 passes don't substitute
63/// accounts at all so this is currently informational; Phase-2's account-pair
64/// substitution pass will gate substitution on this.
65#[inline]
66pub fn is_structural_bridge(account: &str) -> bool {
67 STRUCTURAL_BRIDGE_ACCOUNTS.contains(&account)
68}
69
70// ---------------------------------------------------------------------------
71// Public trait
72// ---------------------------------------------------------------------------
73
74/// A post-generation transformation reshaping the JE batch's concentration /
75/// distributional structure toward a target. See module-level docs for the
76/// invariants every implementor must preserve.
77pub trait ConcentrationPass: Send + Sync {
78 /// Short stable identifier — used in stats + logs.
79 fn name(&self) -> &'static str;
80
81 /// Apply the transformation in place. Returns aggregate counters.
82 /// `rng` is a dedicated per-pass ChaCha8 substream split by the pipeline,
83 /// so adding/removing a pass doesn't perturb downstream passes' RNG state.
84 fn apply(&self, entries: &mut [JournalEntry], rng: &mut ChaCha8Rng) -> ConcentrationStats;
85}
86
87/// Per-pass aggregate counters. Serialised into the orchestrator's run report.
88#[derive(Debug, Clone, Default, Serialize)]
89pub struct ConcentrationStats {
90 pub pass: &'static str,
91 pub entries_examined: usize,
92 pub entries_modified: usize,
93 /// Pass-specific counters (e.g. `lines_substituted`, `tps_remapped`).
94 #[serde(default)]
95 pub extra: BTreeMap<&'static str, u64>,
96}
97
98// ---------------------------------------------------------------------------
99// Pipeline
100// ---------------------------------------------------------------------------
101
102/// Errors from constructing a `ConcentrationPipeline`. Currently only one
103/// fallible source (Phase-2 AccountPairSubstitutionPass loads its PMF file);
104/// the enum is open for future pass-construction errors.
105#[derive(Debug)]
106pub enum ConcentrationPipelineError {
107 AccountPairSubstitution(AccountPairSubstitutionError),
108}
109
110impl std::fmt::Display for ConcentrationPipelineError {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 match self {
113 Self::AccountPairSubstitution(e) => {
114 write!(f, "AccountPairSubstitutionPass: {e}")
115 }
116 }
117 }
118}
119
120impl std::error::Error for ConcentrationPipelineError {}
121
122/// Ordered, deterministic execution of zero or more `ConcentrationPass`
123/// instances. Each pass receives a dedicated ChaCha8 substream so adding or
124/// removing a pass does not perturb the RNG state of any other pass.
125pub struct ConcentrationPipeline {
126 passes: Vec<Box<dyn ConcentrationPass>>,
127}
128
129impl ConcentrationPipeline {
130 /// Build a pipeline from config. Returns an empty pipeline if
131 /// `cfg.enabled == false` or no passes are configured. Returns
132 /// `Err(ConcentrationPipelineError)` if a configured pass fails to
133 /// construct (e.g. Phase-2 AccountPairSubstitutionPass can't read its
134 /// PMF file).
135 pub fn from_config(cfg: &ConcentrationConfig) -> Result<Self, ConcentrationPipelineError> {
136 let mut passes: Vec<Box<dyn ConcentrationPass>> = Vec::new();
137
138 if !cfg.enabled {
139 return Ok(Self { passes });
140 }
141
142 if let Some(c) = cfg.source_conditional_rarity.as_ref() {
143 passes.push(Box::new(SourceConditionalRarityPass::new(c.clone())));
144 }
145 if let Some(c) = cfg.trading_partner_pool.as_ref() {
146 passes.push(Box::new(TradingPartnerPoolPass::new(c.clone())));
147 }
148 if let Some(c) = cfg.account_pair_substitution.as_ref() {
149 let pass = AccountPairSubstitutionPass::from_pmf_file(c.clone())
150 .map_err(ConcentrationPipelineError::AccountPairSubstitution)?;
151 passes.push(Box::new(pass));
152 }
153 // SourceBlankingPass registered LAST — earlier passes that read
154 // sap_source_code must see full coverage. See module-level docs.
155 if let Some(c) = cfg.source_blanking.as_ref() {
156 passes.push(Box::new(SourceBlankingPass::new(c.clone())));
157 }
158
159 Ok(Self { passes })
160 }
161
162 /// Execute every pass in order, each with its own ChaCha8 substream
163 /// derived deterministically from `seed`. Returns one `ConcentrationStats`
164 /// per pass in execution order.
165 pub fn run(&self, entries: &mut [JournalEntry], seed: u64) -> Vec<ConcentrationStats> {
166 // Golden-ratio multiplier keeps substreams well-separated across passes.
167 const STREAM_STRIDE: u64 = 0x9E37_79B9_7F4A_7C15;
168
169 self.passes
170 .iter()
171 .enumerate()
172 .map(|(idx, pass)| {
173 let stream_seed = seed.wrapping_add((idx as u64).wrapping_mul(STREAM_STRIDE));
174 let mut rng = ChaCha8Rng::seed_from_u64(stream_seed);
175 pass.apply(entries, &mut rng)
176 })
177 .collect()
178 }
179
180 /// True iff the pipeline has at least one configured pass.
181 pub fn is_active(&self) -> bool {
182 !self.passes.is_empty()
183 }
184
185 /// Number of configured passes (for tests / observability).
186 pub fn len(&self) -> usize {
187 self.passes.len()
188 }
189
190 /// True iff zero passes are configured.
191 pub fn is_empty(&self) -> bool {
192 self.passes.is_empty()
193 }
194}
195
196// ---------------------------------------------------------------------------
197// Tests
198// ---------------------------------------------------------------------------
199
#[cfg(test)]
mod tests {
    use super::*;
    use datasynth_config::schema::{
        SourceConditionalRarityPassConfig, TradingPartnerPoolPassConfig,
    };

    /// The default config must produce a pipeline with nothing to run.
    #[test]
    fn empty_config_yields_inactive_pipeline() {
        let pipeline =
            ConcentrationPipeline::from_config(&ConcentrationConfig::default()).unwrap();
        assert!(!pipeline.is_active());
        assert_eq!(pipeline.len(), 0);
    }

    /// `enabled: false` wins over any individually configured pass.
    #[test]
    fn disabled_master_switch_yields_inactive_pipeline_even_with_passes() {
        let tp_pool = TradingPartnerPoolPassConfig { target_size: 25 };
        let cfg = ConcentrationConfig {
            enabled: false, // master switch off
            trading_partner_pool: Some(tp_pool),
            ..Default::default()
        };

        let pipeline = ConcentrationPipeline::from_config(&cfg).unwrap();

        assert!(
            !pipeline.is_active(),
            "master-switch-off must override pass configs"
        );
    }

    /// With the master switch on, each configured section yields one pass.
    #[test]
    fn enabled_config_registers_each_configured_pass() {
        let rarity = SourceConditionalRarityPassConfig {
            rate: 0.01,
            min_surprise: None,
            min_per_source_lines: None,
        };
        let tp_pool = TradingPartnerPoolPassConfig { target_size: 25 };
        let cfg = ConcentrationConfig {
            enabled: true,
            source_conditional_rarity: Some(rarity),
            trading_partner_pool: Some(tp_pool),
            ..Default::default()
        };

        let pipeline = ConcentrationPipeline::from_config(&cfg).unwrap();

        assert!(pipeline.is_active());
        assert_eq!(pipeline.len(), 2);
    }

    /// The allowlist holds exactly the seven bridge codes and nothing else
    /// is reported as a bridge.
    #[test]
    fn structural_bridge_allowlist_recognises_seven_accounts() {
        assert_eq!(STRUCTURAL_BRIDGE_ACCOUNTS.len(), 7);

        let bridges = [
            "1100", // AR_CONTROL
            "2000", // AP_CONTROL
            "2900", // GR_IR_CLEARING
            "1150", // IC_AR_CLEARING
            "2050", // IC_AP_CLEARING
            "1030", // WIRE_CLEARING
            "1599", // ACQUISITION_CLEARING
        ];
        for code in bridges {
            assert!(is_structural_bridge(code));
        }

        // Expense account, revenue account, empty string: all safe.
        for code in ["6000", "4000", ""] {
            assert!(!is_structural_bridge(code));
        }
    }

    /// Pass-isolation check: putting an RNG-consuming no-op pass FIRST must
    /// not change what the TP-pool pass produces. Runs the TP-pool pass alone
    /// and after the no-op, then compares the resulting trading partners.
    #[test]
    fn rng_streams_are_isolated_across_passes() {
        use datasynth_core::models::{JournalEntry, JournalEntryLine};

        // Pass that draws from its RNG but leaves the entries untouched.
        struct NoopRngConsumer;
        impl ConcentrationPass for NoopRngConsumer {
            fn name(&self) -> &'static str {
                "noop_rng_consumer"
            }
            fn apply(
                &self,
                entries: &mut [JournalEntry],
                rng: &mut ChaCha8Rng,
            ) -> ConcentrationStats {
                use rand::RngExt;
                // Two draws, so this pass has an observable effect on its RNG.
                let _: u64 = rng.random();
                let _: u64 = rng.random();
                ConcentrationStats {
                    pass: "noop_rng_consumer",
                    entries_examined: entries.len(),
                    entries_modified: 0,
                    extra: BTreeMap::new(),
                }
            }
        }

        // Produces a fresh, identical batch of five single-line JEs per call.
        let make_batch = || -> Vec<JournalEntry> {
            (0..5)
                .map(|i| {
                    let mut je = JournalEntry::new_simple(
                        format!("JE{i}"),
                        "C1".to_string(),
                        chrono::NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
                        format!("desc {i}"),
                    );
                    je.lines.push(JournalEntryLine {
                        gl_account: "6000".to_string(),
                        trading_partner: Some(format!("TP-orig-{i}")),
                        ..JournalEntryLine::default()
                    });
                    je
                })
                .collect()
        };
        let tp_pool_pass = || -> Box<dyn ConcentrationPass> {
            Box::new(TradingPartnerPoolPass::new(TradingPartnerPoolPassConfig {
                target_size: 3,
            }))
        };

        // Pipeline A: the TP pool pass on its own.
        let pipe_a = ConcentrationPipeline {
            passes: vec![tp_pool_pass()],
        };
        let mut batch_a = make_batch();
        pipe_a.run(&mut batch_a, 12345);

        // Pipeline B: the RNG-consuming no-op, then the TP pool pass.
        let pipe_b = ConcentrationPipeline {
            passes: vec![Box::new(NoopRngConsumer), tp_pool_pass()],
        };
        let mut batch_b = make_batch();
        pipe_b.run(&mut batch_b, 12345);

        // The TP pool pass is described as hash-based and is not expected to
        // read its rng at all, so the outputs must match exactly no matter
        // what ran before it.
        batch_a.iter().zip(batch_b.iter()).for_each(|(a, b)| {
            assert_eq!(a.lines[0].trading_partner, b.lines[0].trading_partner);
        });
    }
}