use std::collections::BTreeMap;
use datasynth_config::schema::ConcentrationConfig;
use datasynth_core::models::JournalEntry;
use rand::SeedableRng;
use rand_chacha::ChaCha8Rng;
use serde::Serialize;
pub mod account_pair_substitution;
pub mod consolidation_outlier;
pub mod source_blanking;
pub mod source_conditional_rarity_pass;
pub mod trading_partner_pool;
pub use account_pair_substitution::{AccountPairSubstitutionError, AccountPairSubstitutionPass};
pub use consolidation_outlier::ConsolidationOutlierPass;
pub use source_blanking::SourceBlankingPass;
pub use source_conditional_rarity_pass::SourceConditionalRarityPass;
pub use trading_partner_pool::TradingPartnerPoolPass;
/// Allowlist of GL account codes recognised by [`is_structural_bridge`].
///
/// NOTE(review): this module only uses the list for exact membership tests; what makes an
/// account a "structural bridge" (and which passes consult this list) is decided elsewhere —
/// confirm against the pass modules before adding or removing codes.
pub const STRUCTURAL_BRIDGE_ACCOUNTS: &[&str] = &[
    "1100",
    "2000",
    "2900",
    "1150",
    "2050",
    "1030",
    "1599",
];

/// Returns `true` when `account` exactly equals one of [`STRUCTURAL_BRIDGE_ACCOUNTS`].
///
/// The comparison is a plain, case-sensitive string equality: no trimming or other
/// normalisation is applied, so `" 1100"` or `"1100 "` are *not* bridge accounts.
#[inline]
pub fn is_structural_bridge(account: &str) -> bool {
    STRUCTURAL_BRIDGE_ACCOUNTS
        .iter()
        .copied()
        .any(|bridge| bridge == account)
}
/// One transformation step executed by a `ConcentrationPipeline` over a batch of
/// journal entries.
///
/// The `Send + Sync` supertraits mean `dyn ConcentrationPass` is itself `Send + Sync`, so
/// boxed passes (and containers holding them) can be moved or shared across threads.
pub trait ConcentrationPass: Send + Sync {
    /// Runs the pass over `entries`, mutating them in place, and returns a summary.
    ///
    /// `rng` is the random stream handed to this pass by the pipeline; each pass receives
    /// its own freshly-seeded generator rather than a shared one.
    fn apply(&self, entries: &mut [JournalEntry], rng: &mut ChaCha8Rng) -> ConcentrationStats;

    /// Static identifier for this pass.
    ///
    /// NOTE(review): presumably the same label the pass reports in
    /// `ConcentrationStats::pass` — confirm for each implementation.
    fn name(&self) -> &'static str;
}
#[derive(Debug, Clone, Default, Serialize)]
pub struct ConcentrationStats {
pub pass: &'static str,
pub entries_examined: usize,
pub entries_modified: usize,
#[serde(default)]
pub extra: BTreeMap<&'static str, u64>,
}
/// Errors that can occur while building a `ConcentrationPipeline` from configuration.
#[derive(Debug)]
pub enum ConcentrationPipelineError {
    /// Constructing the account-pair substitution pass failed; wraps the pass's own error.
    AccountPairSubstitution(AccountPairSubstitutionError),
}

impl std::fmt::Display for ConcentrationPipelineError {
    /// Renders the wrapped error prefixed with the name of the pass that failed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ConcentrationPipelineError::AccountPairSubstitution(inner) => {
                write!(f, "AccountPairSubstitutionPass: {inner}")
            }
        }
    }
}

// Uses the default `Error` methods; the wrapped error is exposed only through `Display`.
impl std::error::Error for ConcentrationPipelineError {}
/// Ordered collection of concentration passes built from a `ConcentrationConfig`.
pub struct ConcentrationPipeline {
    /// Passes in execution order.
    passes: Vec<Box<dyn ConcentrationPass>>,
}

impl ConcentrationPipeline {
    /// Builds a pipeline containing one pass for every sub-config that is `Some`.
    ///
    /// Passes are always registered (and later run) in this fixed order, independent of
    /// how the config was written:
    /// 1. source-conditional rarity
    /// 2. trading-partner pool
    /// 3. account-pair substitution
    /// 4. consolidation outlier
    /// 5. source blanking
    ///
    /// When `cfg.enabled` is `false` the result is an empty (inactive) pipeline, even if
    /// individual pass configs are present.
    ///
    /// # Errors
    ///
    /// Returns [`ConcentrationPipelineError::AccountPairSubstitution`] when
    /// `AccountPairSubstitutionPass::from_pmf_file` fails. NOTE(review): presumably that
    /// constructor loads a PMF from disk — confirm in `account_pair_substitution`.
    pub fn from_config(cfg: &ConcentrationConfig) -> Result<Self, ConcentrationPipelineError> {
        // Master switch wins over every per-pass config.
        if !cfg.enabled {
            return Ok(Self { passes: Vec::new() });
        }

        let mut registered: Vec<Box<dyn ConcentrationPass>> = Vec::new();

        if let Some(rarity_cfg) = &cfg.source_conditional_rarity {
            registered.push(Box::new(SourceConditionalRarityPass::new(rarity_cfg.clone())));
        }
        if let Some(pool_cfg) = &cfg.trading_partner_pool {
            registered.push(Box::new(TradingPartnerPoolPass::new(pool_cfg.clone())));
        }
        if let Some(pair_cfg) = &cfg.account_pair_substitution {
            // The only fallible constructor; its error is wrapped and returned to the caller.
            let substitution = AccountPairSubstitutionPass::from_pmf_file(pair_cfg.clone())
                .map_err(ConcentrationPipelineError::AccountPairSubstitution)?;
            registered.push(Box::new(substitution));
        }
        if let Some(outlier_cfg) = &cfg.consolidation_outlier {
            registered.push(Box::new(ConsolidationOutlierPass::new(outlier_cfg.clone())));
        }
        if let Some(blanking_cfg) = &cfg.source_blanking {
            registered.push(Box::new(SourceBlankingPass::new(blanking_cfg.clone())));
        }

        Ok(Self { passes: registered })
    }

    /// Applies every pass to `entries` in registration order and returns one
    /// [`ConcentrationStats`] per pass, in the same order.
    ///
    /// All passes mutate the same slice, so each pass sees the output of the passes before
    /// it. Each pass gets its own `ChaCha8Rng`, seeded with
    /// `seed + index * STREAM_STRIDE` (wrapping u64 arithmetic). RNG draws made by one pass
    /// therefore never advance another pass's stream.
    ///
    /// Streams are keyed by *position*, not by pass identity. Adding or removing an earlier
    /// pass shifts the seed of every later pass.
    pub fn run(&self, entries: &mut [JournalEntry], seed: u64) -> Vec<ConcentrationStats> {
        // 64-bit golden-ratio constant (2^64 / phi, the SplitMix64 increment); spreads the
        // per-pass seeds far apart.
        const STREAM_STRIDE: u64 = 0x9E37_79B9_7F4A_7C15;

        let mut report = Vec::with_capacity(self.passes.len());
        // Stepping by the stride is identical (mod 2^64) to `seed + index * STREAM_STRIDE`.
        let mut stream_seed = seed;
        for pass in &self.passes {
            let mut stream = ChaCha8Rng::seed_from_u64(stream_seed);
            report.push(pass.apply(entries, &mut stream));
            stream_seed = stream_seed.wrapping_add(STREAM_STRIDE);
        }
        report
    }

    /// Returns `true` when at least one pass is registered (the negation of [`Self::is_empty`]).
    pub fn is_active(&self) -> bool {
        !self.is_empty()
    }

    /// Number of registered passes.
    pub fn len(&self) -> usize {
        self.passes.len()
    }

    /// Returns `true` when no passes are registered.
    pub fn is_empty(&self) -> bool {
        self.passes.is_empty()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use datasynth_config::schema::{
        SourceConditionalRarityPassConfig, TradingPartnerPoolPassConfig,
    };

    #[test]
    fn empty_config_yields_inactive_pipeline() {
        let pipeline =
            ConcentrationPipeline::from_config(&ConcentrationConfig::default()).unwrap();
        assert!(!pipeline.is_active());
        assert_eq!(pipeline.len(), 0);
    }

    #[test]
    fn disabled_master_switch_yields_inactive_pipeline_even_with_passes() {
        let pool = TradingPartnerPoolPassConfig { target_size: 25 };
        let cfg = ConcentrationConfig {
            enabled: false,
            trading_partner_pool: Some(pool),
            ..Default::default()
        };
        let pipeline = ConcentrationPipeline::from_config(&cfg).unwrap();
        assert!(!pipeline.is_active(), "master-switch-off must override pass configs");
    }

    #[test]
    fn enabled_config_registers_each_configured_pass() {
        let rarity = SourceConditionalRarityPassConfig {
            rate: 0.01,
            min_surprise: None,
            min_per_source_lines: None,
        };
        let pool = TradingPartnerPoolPassConfig { target_size: 25 };
        let cfg = ConcentrationConfig {
            enabled: true,
            source_conditional_rarity: Some(rarity),
            trading_partner_pool: Some(pool),
            ..Default::default()
        };
        let pipeline = ConcentrationPipeline::from_config(&cfg).unwrap();
        assert!(pipeline.is_active());
        assert_eq!(pipeline.len(), 2);
    }

    #[test]
    fn structural_bridge_allowlist_recognises_seven_accounts() {
        assert_eq!(STRUCTURAL_BRIDGE_ACCOUNTS.len(), 7);

        let bridges = ["1100", "2000", "2900", "1150", "2050", "1030", "1599"];
        for account in bridges {
            assert!(is_structural_bridge(account));
        }

        let non_bridges = ["6000", "4000", ""];
        for account in non_bridges {
            assert!(!is_structural_bridge(account));
        }
    }

    #[test]
    fn rng_streams_are_isolated_across_passes() {
        use datasynth_core::models::{JournalEntry, JournalEntryLine};

        // Test-only pass: draws from its RNG stream but leaves the entries untouched.
        struct NoopRngConsumer;

        impl ConcentrationPass for NoopRngConsumer {
            fn name(&self) -> &'static str {
                "noop_rng_consumer"
            }

            fn apply(
                &self,
                entries: &mut [JournalEntry],
                rng: &mut ChaCha8Rng,
            ) -> ConcentrationStats {
                use rand::RngExt;
                // Advance this pass's stream so any leakage into later passes would show up.
                let _: u64 = rng.random();
                let _: u64 = rng.random();
                ConcentrationStats {
                    pass: "noop_rng_consumer",
                    entries_examined: entries.len(),
                    entries_modified: 0,
                    extra: BTreeMap::new(),
                }
            }
        }

        // Five single-line entries, each with a distinct trading partner on account 6000.
        fn make_batch() -> Vec<JournalEntry> {
            (0..5)
                .map(|i| {
                    let mut je = JournalEntry::new_simple(
                        format!("JE{i}"),
                        "C1".to_string(),
                        chrono::NaiveDate::from_ymd_opt(2024, 1, 1).unwrap(),
                        format!("desc {i}"),
                    );
                    je.lines.push(JournalEntryLine {
                        gl_account: "6000".to_string(),
                        trading_partner: Some(format!("TP-orig-{i}")),
                        ..JournalEntryLine::default()
                    });
                    je
                })
                .collect()
        }

        let seed = 12345;

        let solo = ConcentrationPipeline {
            passes: vec![Box::new(TradingPartnerPoolPass::new(
                TradingPartnerPoolPassConfig { target_size: 3 },
            ))],
        };
        let mut solo_batch = make_batch();
        solo.run(&mut solo_batch, seed);

        let prefixed = ConcentrationPipeline {
            passes: vec![
                Box::new(NoopRngConsumer),
                Box::new(TradingPartnerPoolPass::new(TradingPartnerPoolPassConfig {
                    target_size: 3,
                })),
            ],
        };
        let mut prefixed_batch = make_batch();
        prefixed.run(&mut prefixed_batch, seed);

        for (solo_je, prefixed_je) in solo_batch.iter().zip(prefixed_batch.iter()) {
            assert_eq!(
                solo_je.lines[0].trading_partner,
                prefixed_je.lines[0].trading_partner
            );
        }
    }
}