use datasynth_config::schema::TransactionVolume;
use datasynth_runtime::{EnhancedOrchestrator, PhaseConfig};
use datasynth_test_utils::{
assertions::{benford_mad, check_benford_distribution, is_balanced},
fixtures::minimal_config,
};
use rust_decimal::Decimal;
use std::collections::{HashMap, HashSet};
fn sequential_config(seed: u64) -> datasynth_config::schema::GeneratorConfig {
let mut config = minimal_config();
config.global.seed = Some(seed);
config.global.period_months = 1; config.companies[0].annual_transaction_volume = TransactionVolume::TenK;
config.fraud.enabled = false;
config
}
fn parallel_config(seed: u64) -> datasynth_config::schema::GeneratorConfig {
let mut config = minimal_config();
config.global.seed = Some(seed);
config.global.period_months = 12; config.companies[0].annual_transaction_volume = TransactionVolume::TenK;
config.fraud.enabled = false;
config
}
fn phase_config_je_only() -> PhaseConfig {
PhaseConfig {
generate_master_data: false,
generate_document_flows: false,
generate_journal_entries: true,
inject_anomalies: false,
show_progress: false,
..Default::default()
}
}
fn generate(
config: datasynth_config::schema::GeneratorConfig,
) -> datasynth_runtime::EnhancedGenerationResult {
let phase = phase_config_je_only();
let mut orchestrator = EnhancedOrchestrator::new(config, phase).expect("orchestrator");
orchestrator.generate().expect("generation")
}
fn analyze_line_item_distribution(
entries: &[datasynth_core::models::JournalEntry],
) -> (f64, f64, f64, f64) {
let total = entries.len() as f64;
if total == 0.0 {
return (0.0, 0.0, 0.0, 0.0);
}
let mut two_count = 0usize;
let mut four_count = 0usize;
let mut even_count = 0usize;
let mut equal_dc_count = 0usize;
for entry in entries {
let n = entry.lines.len();
if n == 2 {
two_count += 1;
}
if n == 4 {
four_count += 1;
}
if n % 2 == 0 {
even_count += 1;
}
let debits = entry
.lines
.iter()
.filter(|l| l.debit_amount > Decimal::ZERO)
.count();
let credits = entry
.lines
.iter()
.filter(|l| l.credit_amount > Decimal::ZERO)
.count();
if debits == credits {
equal_dc_count += 1;
}
}
(
two_count as f64 / total,
four_count as f64 / total,
even_count as f64 / total,
equal_dc_count as f64 / total,
)
}
fn analyze_batch_patterns(entries: &[datasynth_core::models::JournalEntry]) -> f64 {
if entries.len() < 2 {
return 0.0;
}
let mut batch_count = 0usize;
for window in entries.windows(2) {
let prev = &window[0];
let curr = &window[1];
if prev.header.posting_date == curr.header.posting_date
&& prev.header.business_process == curr.header.business_process
&& prev.header.company_code == curr.header.company_code
{
batch_count += 1;
}
}
batch_count as f64 / entries.len() as f64
}
#[test]
fn test_sequential_all_entries_balanced() {
let result = generate(sequential_config(42));
assert!(!result.journal_entries.is_empty(), "should have entries");
let non_error_entries: Vec<_> = result
.journal_entries
.iter()
.filter(|e| {
e.header
.header_text
.as_ref()
.map(|t| !t.contains("[HUMAN_ERROR:"))
.unwrap_or(true)
})
.collect();
let mut unbalanced_count = 0;
for (i, entry) in non_error_entries.iter().enumerate() {
if !is_balanced(entry) {
let total_debits: Decimal = entry.lines.iter().map(|l| l.debit_amount).sum();
let total_credits: Decimal = entry.lines.iter().map(|l| l.credit_amount).sum();
eprintln!(
"UNBALANCED entry {} (doc_id={}): {} lines, debits={}, credits={}, diff={}, source={:?}",
i,
entry.header.document_id,
entry.lines.len(),
total_debits,
total_credits,
total_debits - total_credits,
entry.header.source,
);
unbalanced_count += 1;
}
}
assert_eq!(
unbalanced_count,
0,
"{} of {} non-error entries are unbalanced",
unbalanced_count,
non_error_entries.len()
);
}
#[test]
fn test_parallel_all_entries_balanced() {
let result = generate(parallel_config(42));
assert!(
result.journal_entries.len() >= 10_000,
"expected >= 10K entries for parallel path, got {}",
result.journal_entries.len()
);
let non_error_entries: Vec<_> = result
.journal_entries
.iter()
.filter(|e| {
e.header
.header_text
.as_ref()
.map(|t| !t.contains("[HUMAN_ERROR:"))
.unwrap_or(true)
})
.collect();
let mut unbalanced = 0;
for entry in &non_error_entries {
if !is_balanced(entry) {
unbalanced += 1;
}
}
assert_eq!(
unbalanced,
0,
"Parallel path: {} of {} non-error entries are unbalanced",
unbalanced,
non_error_entries.len()
);
}
#[test]
fn test_sequential_line_item_distribution() {
let result = generate(sequential_config(12345));
let (two_ratio, four_ratio, even_ratio, equal_dc_ratio) =
analyze_line_item_distribution(&result.journal_entries);
let n = result.journal_entries.len();
println!(
"Sequential ({} entries): two={:.1}%, four={:.1}%, even={:.1}%, equal_dc={:.1}%",
n,
two_ratio * 100.0,
four_ratio * 100.0,
even_ratio * 100.0,
equal_dc_ratio * 100.0
);
assert!(
two_ratio > 0.30,
"Expected >30% two-line entries, got {:.1}%",
two_ratio * 100.0
);
assert!(
even_ratio > 0.65,
"Expected >65% even-line entries, got {:.1}%",
even_ratio * 100.0
);
}
#[test]
fn test_parallel_line_item_distribution() {
let result = generate(parallel_config(12345));
let (two_ratio, four_ratio, even_ratio, equal_dc_ratio) =
analyze_line_item_distribution(&result.journal_entries);
let n = result.journal_entries.len();
println!(
"Parallel ({} entries): two={:.1}%, four={:.1}%, even={:.1}%, equal_dc={:.1}%",
n,
two_ratio * 100.0,
four_ratio * 100.0,
even_ratio * 100.0,
equal_dc_ratio * 100.0
);
assert!(
two_ratio > 0.40 && two_ratio < 0.80,
"Expected 40-80% two-line entries, got {:.1}%",
two_ratio * 100.0
);
assert!(
four_ratio > 0.05 && four_ratio < 0.30,
"Expected 5-30% four-line entries, got {:.1}%",
four_ratio * 100.0
);
assert!(
even_ratio > 0.70,
"Expected >70% even-line entries, got {:.1}%",
even_ratio * 100.0
);
assert!(
equal_dc_ratio > 0.60,
"Expected >60% equal debit/credit count entries, got {:.1}%",
equal_dc_ratio * 100.0
);
}
#[test]
fn test_line_item_distribution_seq_vs_par() {
let mut seq_config = minimal_config();
seq_config.global.seed = Some(55555);
seq_config.global.period_months = 1;
seq_config.companies[0].annual_transaction_volume = TransactionVolume::HundredK;
seq_config.fraud.enabled = false;
let par_config = parallel_config(55555);
let seq_result = generate(seq_config);
let par_result = generate(par_config);
let (seq_two, seq_four, seq_even, seq_dc) =
analyze_line_item_distribution(&seq_result.journal_entries);
let (par_two, par_four, par_even, par_dc) =
analyze_line_item_distribution(&par_result.journal_entries);
println!(
"Sequential ({} entries): two={:.1}%, four={:.1}%, even={:.1}%, dc={:.1}%",
seq_result.journal_entries.len(),
seq_two * 100.0,
seq_four * 100.0,
seq_even * 100.0,
seq_dc * 100.0
);
println!(
"Parallel ({} entries): two={:.1}%, four={:.1}%, even={:.1}%, dc={:.1}%",
par_result.journal_entries.len(),
par_two * 100.0,
par_four * 100.0,
par_even * 100.0,
par_dc * 100.0
);
let tolerance = 0.15;
assert!(
(seq_two - par_two).abs() < tolerance,
"Two-line ratio diverges: seq={:.3} par={:.3}",
seq_two,
par_two
);
assert!(
(seq_four - par_four).abs() < tolerance,
"Four-line ratio diverges: seq={:.3} par={:.3}",
seq_four,
par_four
);
assert!(
(seq_even - par_even).abs() < tolerance,
"Even-line ratio diverges: seq={:.3} par={:.3}",
seq_even,
par_even
);
assert!(
(seq_dc - par_dc).abs() < tolerance,
"Equal D/C ratio diverges: seq={:.3} par={:.3}",
seq_dc,
par_dc
);
}
#[test]
fn test_parallel_benford_compliance() {
let result = generate(parallel_config(99999));
let amounts: Vec<Decimal> = result
.journal_entries
.iter()
.flat_map(|e| e.lines.iter().map(|l| l.debit_amount + l.credit_amount))
.filter(|&a| a > Decimal::ZERO)
.collect();
assert!(
amounts.len() >= 10_000,
"Need sufficient amounts for Benford test, got {}",
amounts.len()
);
let (chi_squared, passes) = check_benford_distribution(&amounts);
let mad = benford_mad(&amounts);
println!(
"Parallel Benford: chi-squared={:.2}, MAD={:.4}, passes={}",
chi_squared, mad, passes
);
assert!(
mad < 0.025,
"Parallel path MAD too high: {:.4} (expected < 0.025)",
mad
);
}
#[test]
fn test_benford_seq_vs_par() {
let mut seq_config = minimal_config();
seq_config.global.seed = Some(77777);
seq_config.global.period_months = 1;
seq_config.companies[0].annual_transaction_volume = TransactionVolume::HundredK;
seq_config.fraud.enabled = false;
let par_config = parallel_config(77777);
let seq_result = generate(seq_config);
let par_result = generate(par_config);
let seq_amounts: Vec<Decimal> = seq_result
.journal_entries
.iter()
.flat_map(|e| e.lines.iter().map(|l| l.debit_amount + l.credit_amount))
.filter(|&a| a > Decimal::ZERO)
.collect();
let par_amounts: Vec<Decimal> = par_result
.journal_entries
.iter()
.flat_map(|e| e.lines.iter().map(|l| l.debit_amount + l.credit_amount))
.filter(|&a| a > Decimal::ZERO)
.collect();
let seq_mad = benford_mad(&seq_amounts);
let par_mad = benford_mad(&par_amounts);
println!(
"Benford MAD: sequential={:.4} ({} amounts), parallel={:.4} ({} amounts)",
seq_mad,
seq_amounts.len(),
par_mad,
par_amounts.len()
);
assert!(seq_mad < 0.025, "Sequential MAD too high: {}", seq_mad);
assert!(par_mad < 0.025, "Parallel MAD too high: {}", par_mad);
assert!(
(seq_mad - par_mad).abs() < 0.015,
"Benford MAD diverges: seq={:.4} par={:.4}",
seq_mad,
par_mad
);
}
#[test]
fn test_parallel_no_uuid_collisions() {
let result = generate(parallel_config(11111));
let mut doc_ids = HashSet::new();
let mut line_ids = HashSet::new();
let mut dup_docs = 0;
let mut dup_lines = 0;
for entry in &result.journal_entries {
if !doc_ids.insert(entry.header.document_id) {
dup_docs += 1;
}
for line in &entry.lines {
if !line_ids.insert((line.document_id, line.line_number)) {
dup_lines += 1;
}
}
}
assert_eq!(
dup_docs,
0,
"Found {} duplicate document IDs in {} entries",
dup_docs,
result.journal_entries.len()
);
assert_eq!(
dup_lines,
0,
"Found {} duplicate line IDs in {} lines",
dup_lines,
line_ids.len()
);
}
#[test]
#[ignore = "non-deterministic UUIDs in period-close/elimination JEs — see TODO above"]
fn test_parallel_deterministic() {
let result1 = generate(parallel_config(33333));
let result2 = generate(parallel_config(33333));
assert_eq!(
result1.journal_entries.len(),
result2.journal_entries.len(),
"Same seed should produce same count"
);
for (e1, e2) in result1
.journal_entries
.iter()
.zip(result2.journal_entries.iter())
{
assert_eq!(
e1.header.document_id, e2.header.document_id,
"Document IDs should match for deterministic generation"
);
assert_eq!(e1.header.company_code, e2.header.company_code);
assert_eq!(e1.header.posting_date, e2.header.posting_date);
assert_eq!(e1.lines.len(), e2.lines.len());
}
}
#[test]
fn test_parallel_source_distribution() {
let result = generate(parallel_config(44444));
let mut source_counts: HashMap<String, usize> = HashMap::new();
for entry in &result.journal_entries {
*source_counts
.entry(format!("{:?}", entry.header.source))
.or_default() += 1;
}
let total = result.journal_entries.len() as f64;
println!(
"Source distribution ({} entries):",
result.journal_entries.len()
);
for (source, count) in &source_counts {
println!(
" {}: {} ({:.1}%)",
source,
count,
*count as f64 / total * 100.0
);
}
assert!(
source_counts.len() >= 2,
"Expected at least 2 source types, got {}",
source_counts.len()
);
assert!(
source_counts.contains_key("Manual"),
"Expected Manual source type in output"
);
}
#[test]
fn test_parallel_batch_patterns() {
let result = generate(parallel_config(66666));
let batch_ratio = analyze_batch_patterns(&result.journal_entries);
println!(
"Parallel batch adjacency ratio: {:.1}% ({} entries)",
batch_ratio * 100.0,
result.journal_entries.len()
);
}
#[test]
fn test_parallel_line_item_histogram() {
let result = generate(parallel_config(88888));
let mut histogram: HashMap<usize, usize> = HashMap::new();
for entry in &result.journal_entries {
*histogram.entry(entry.lines.len()).or_default() += 1;
}
let total = result.journal_entries.len() as f64;
let mut items: Vec<_> = histogram.iter().collect();
items.sort_by_key(|(k, _)| *k);
println!(
"Line item count histogram ({} entries):",
result.journal_entries.len()
);
for (count, freq) in &items {
if **count <= 10 || **freq as f64 / total > 0.005 {
println!(
" {} lines: {} ({:.2}%)",
count,
freq,
**freq as f64 / total * 100.0
);
}
}
let two = *histogram.get(&2).unwrap_or(&0) as f64 / total;
assert!(
two > 0.35,
"2-line entries should be >35%, got {:.1}%",
two * 100.0
);
let unique_counts = histogram.len();
assert!(
unique_counts >= 4,
"Expected at least 4 different line counts, got {}",
unique_counts
);
let ten_plus: usize = histogram
.iter()
.filter(|(k, _)| **k >= 10)
.map(|(_, v)| v)
.sum();
let ten_plus_ratio = ten_plus as f64 / total;
assert!(
ten_plus_ratio > 0.01,
"Expected >1% entries with >=10 lines, got {:.2}%",
ten_plus_ratio * 100.0
);
}
#[test]
fn test_parallel_line_validity() {
let result = generate(parallel_config(22222));
let mut zero_count = 0;
let mut total_lines = 0;
for entry in &result.journal_entries {
assert!(
entry.lines.len() >= 2,
"Entry {} has fewer than 2 lines",
entry.header.document_id
);
for line in &entry.lines {
total_lines += 1;
let amount = line.debit_amount + line.credit_amount;
if amount == Decimal::ZERO {
zero_count += 1;
}
}
}
let zero_ratio = zero_count as f64 / total_lines as f64;
println!(
"Parallel zero-amount lines: {} of {} ({:.4}%)",
zero_count,
total_lines,
zero_ratio * 100.0
);
assert!(
zero_ratio < 0.05,
"Too many zero-amount lines: {} of {} ({:.2}%)",
zero_count,
total_lines,
zero_ratio * 100.0
);
}
#[test]
fn test_parallel_company_distribution() {
let mut config = parallel_config(54321);
config.companies = vec![
datasynth_config::schema::CompanyConfig {
code: "1000".to_string(),
name: "Parent Corp".to_string(),
currency: "USD".to_string(),
functional_currency: None,
country: "US".to_string(),
annual_transaction_volume: TransactionVolume::TenK,
volume_weight: 0.6,
fiscal_year_variant: "K4".to_string(),
},
datasynth_config::schema::CompanyConfig {
code: "2000".to_string(),
name: "Sub EU".to_string(),
currency: "EUR".to_string(),
functional_currency: None,
country: "DE".to_string(),
annual_transaction_volume: TransactionVolume::TenK,
volume_weight: 0.4,
fiscal_year_variant: "K4".to_string(),
},
];
let result = generate(config);
let mut company_counts: HashMap<String, usize> = HashMap::new();
for entry in &result.journal_entries {
*company_counts
.entry(entry.header.company_code.clone())
.or_default() += 1;
}
let total = result.journal_entries.len() as f64;
println!(
"Company distribution ({} entries):",
result.journal_entries.len()
);
for (company, count) in &company_counts {
println!(
" {}: {} ({:.1}%)",
company,
count,
*count as f64 / total * 100.0
);
}
assert!(
company_counts.contains_key("1000"),
"Company 1000 missing from output"
);
assert!(
company_counts.contains_key("2000"),
"Company 2000 missing from output"
);
let c1000_ratio = *company_counts.get("1000").unwrap_or(&0) as f64 / total;
assert!(
c1000_ratio > 0.30 && c1000_ratio < 0.70,
"Company 1000 should be represented, got {:.1}%",
c1000_ratio * 100.0
);
}