mod output_writer;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use anyhow::Result;
use clap::{Parser, Subcommand};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use datasynth_config::schema::AccountingFrameworkConfig;
use datasynth_config::{presets, GeneratorConfig};
use datasynth_core::memory_guard::{MemoryGuard, MemoryGuardConfig};
use datasynth_core::models::{CoAComplexity, IndustrySector};
use datasynth_fingerprint::{
evaluation::FidelityEvaluator,
extraction::{CsvDataSource, DataSource, ExtractionConfig, FingerprintExtractor},
io::{validate_dsf, FingerprintReader, FingerprintWriter},
models::PrivacyLevel,
privacy::PrivacyConfig,
};
use datasynth_output::{write_fec_csv, SapExportConfig, SapExporter};
use datasynth_runtime::{
export_labels_all_formats, EnhancedOrchestrator, LabelExportConfig, LabelExportSummary,
OutputFileInfo, PhaseConfig, RunManifest,
};
#[cfg(unix)]
use signal_hook::consts::SIGUSR1;
// Top-level CLI definition, parsed via clap's derive API.
//
// NOTE: plain `//` comments are used deliberately throughout the clap items.
// `///` doc comments on clap-derived structs/fields are picked up by clap as
// user-visible help text, so adding them would change the CLI's `--help`
// output.
#[derive(Parser)]
#[command(name = "datasynth-data")]
#[command(about = "Synthetic Enterprise Accounting Data Generator")]
#[command(version)]
struct Cli {
    // Which subcommand to run (see `Commands`).
    #[command(subcommand)]
    command: Commands,
    // When set, raises the default tracing filter from "info" to "debug"
    // (see `main`). `global = true` makes it usable after any subcommand.
    #[arg(short, long, global = true)]
    verbose: bool,
}
// All top-level subcommands of the binary.
//
// `large_enum_variant` is allowed because `Generate` carries its many CLI
// flags inline; boxing them would complicate the clap derive for no benefit.
#[derive(Subcommand)]
#[allow(clippy::large_enum_variant)] enum Commands {
    // Generate synthetic accounting data (the primary subcommand).
    Generate {
        // Path to a YAML `GeneratorConfig`; when absent, a safe demo preset is used.
        #[arg(short, long)]
        config: Option<PathBuf>,
        // Directory where generated output is written.
        #[arg(short, long, default_value = "./output")]
        output: PathBuf,
        // Use the built-in conservative demo preset, ignoring `--config`.
        #[arg(long)]
        demo: bool,
        // Named config overlay; currently only "audit-group" is recognized.
        #[arg(long)]
        preset: Option<String>,
        // Name of a scenario pack whose YAML config should be loaded.
        #[arg(long)]
        scenario_pack: Option<String>,
        // Path to a fingerprint file; switches to fingerprint-based generation.
        #[arg(long)]
        fingerprint: Option<PathBuf>,
        // Scale factor applied to fingerprint-based generation.
        #[arg(long, default_value = "1.0")]
        scale: f64,
        // RNG seed override; falls back to the config's seed (or 42).
        #[arg(short, long)]
        seed: Option<u64>,
        // Enable banking KYC/AML data generation (conservative caps applied).
        #[arg(long)]
        banking: bool,
        // Enable audit artifact generation.
        #[arg(long)]
        audit: bool,
        // Memory limit in MB for the memory guard; 0 falls back to 1024.
        #[arg(long, default_value = "1024")]
        memory_limit: usize,
        // Rayon thread-pool size; defaults to half the CPUs, clamped to 1..=4.
        #[arg(long)]
        max_threads: Option<usize>,
        // Enable graph export (PyTorch Geometric format).
        #[arg(long)]
        graph_export: bool,
        // Target for streaming the unified hypergraph; enables hypergraph export.
        #[arg(long)]
        stream_target: Option<String>,
        // API key for the stream target; exported as RUSTGRAPH_API_KEY.
        #[arg(long)]
        stream_api_key: Option<String>,
        // Batch size used when streaming the hypergraph.
        #[arg(long, default_value = "1000")]
        stream_batch_size: usize,
        // Quality-gate mode; "none" disables it. NOTE(review): accepted values
        // are not visible in this chunk — confirm against the handler code.
        #[arg(long, default_value = "none")]
        quality_gate: String,
        // Fiscal-year length override in months.
        #[arg(long)]
        fiscal_year_months: Option<u32>,
        // Append to an existing session (requires `session.dss` in `--output`).
        #[arg(long)]
        append: bool,
        // Number of additional months for `--append` (defaults to 12).
        #[arg(long)]
        months: Option<u32>,
        // Repeatable: fraud scenario pack names to apply to the config.
        #[arg(long, action = clap::ArgAction::Append)]
        fraud_scenario: Vec<String>,
        // When set, enables fraud generation at this rate.
        #[arg(long)]
        fraud_rate: Option<f64>,
        // Optional file path for streaming output.
        #[arg(long)]
        stream_file: Option<std::path::PathBuf>,
        // Repeatable: additional export formats ("sap", "fec", "gobd").
        #[arg(long = "export-format", action = clap::ArgAction::Append)]
        export_format: Vec<String>,
        // Run the AI auto-tuner after generation and log suggested patches.
        #[arg(long)]
        auto_tune: bool,
        // Iteration budget reported by the auto-tuner.
        #[arg(long, default_value = "3")]
        max_iterations: usize,
    },
    // Validate a generator config file.
    Validate {
        #[arg(short, long)]
        config: PathBuf,
    },
    // Write a starter config file for the given industry/complexity.
    Init {
        #[arg(short, long, default_value = "datasynth_config.yaml")]
        output: PathBuf,
        #[arg(short, long, default_value = "manufacturing")]
        industry: String,
        #[arg(short, long, default_value = "medium")]
        complexity: String,
        // Optional natural-language description to derive the config from.
        #[arg(long)]
        from_description: Option<String>,
    },
    // Print tool/build information.
    Info,
    // Verify a previously generated output directory.
    Verify {
        #[arg(short, long, default_value = "./output")]
        output: PathBuf,
        // Verify file checksums.
        #[arg(long)]
        checksums: bool,
        // Verify record counts.
        #[arg(long)]
        record_counts: bool,
    },
    // Fingerprint extraction/validation/evaluation (see `FingerprintCommands`).
    Fingerprint {
        #[command(subcommand)]
        command: FingerprintCommands,
    },
    // Scenario-pack operations (see `ScenarioCommands`).
    Scenario {
        #[command(subcommand)]
        command: ScenarioCommands,
    },
    // Probe a detection model with adversarial perturbations.
    Adversarial {
        // Path to the model under test.
        #[arg(short, long)]
        model: PathBuf,
        // Number of probe samples to generate.
        #[arg(short, long, default_value = "1000")]
        probes: usize,
        // Feature-vector dimensionality expected by the model.
        #[arg(short, long)]
        features: usize,
        // Decision threshold applied to model scores.
        #[arg(short, long, default_value = "0.5")]
        threshold: f64,
        // Perturbation magnitude for adversarial probes.
        #[arg(long, default_value = "0.05")]
        perturbation: f64,
        #[arg(short, long)]
        output: Option<PathBuf>,
        #[arg(short, long, default_value = "42")]
        seed: u64,
    },
    // Audit-blueprint operations (see `AuditCommands`).
    Audit {
        #[command(subcommand)]
        command: AuditCommands,
    },
}
// Subcommands under `datasynth-data scenario`.
// `//` comments only: `///` would become clap help text.
#[derive(Subcommand)]
enum ScenarioCommands {
    // List scenarios defined in a config file.
    List {
        #[arg(short, long)]
        config: PathBuf,
    },
    // Validate all scenarios in a config, or a single named one.
    Validate {
        #[arg(short, long)]
        config: PathBuf,
        // When absent, presumably all scenarios are validated — confirm in handler.
        #[arg(short, long)]
        scenario: Option<String>,
    },
    // Generate data for scenarios from a config.
    Generate {
        #[arg(short, long)]
        config: PathBuf,
        #[arg(short, long, default_value = "./output")]
        output: PathBuf,
        #[arg(short, long)]
        scenario: Option<String>,
    },
    // Diff a baseline run against a counterfactual run.
    Diff {
        #[arg(short, long)]
        baseline: PathBuf,
        // Long-only flag: `-c` is not assigned here.
        #[arg(long)]
        counterfactual: PathBuf,
        // Output format of the diff (default "summary").
        #[arg(short, long, default_value = "summary")]
        format: String,
        // Optional file to write the diff to; stdout otherwise (presumably).
        #[arg(short, long)]
        output: Option<PathBuf>,
    },
    // Export a single named scenario from a config to a file.
    Export {
        #[arg(short, long)]
        config: PathBuf,
        #[arg(short, long)]
        scenario: String,
        #[arg(short, long)]
        output: PathBuf,
    },
    // Import a scenario file into a config.
    Import {
        // Positional, required scenario file.
        #[arg(required = true)]
        file: PathBuf,
        #[arg(short, long, default_value = "config.yaml")]
        config: PathBuf,
    },
}
// Subcommands under `datasynth-data fingerprint`.
// `//` comments only: `///` would become clap help text.
#[derive(Subcommand)]
enum FingerprintCommands {
    // Extract a fingerprint from source data, with optional privacy and signing.
    Extract {
        // Input data to fingerprint.
        #[arg(short, long)]
        input: PathBuf,
        // Destination fingerprint file.
        #[arg(short, long)]
        output: PathBuf,
        // Privacy preset name (default "standard").
        #[arg(long, default_value = "standard")]
        privacy_level: String,
        // Differential-privacy epsilon override.
        #[arg(long)]
        privacy_epsilon: Option<f64>,
        // k-anonymity parameter override.
        #[arg(long)]
        privacy_k: Option<u32>,
        // Sign the resulting fingerprint.
        #[arg(long)]
        sign: bool,
        // Signing key as hex; clap enforces that `--sign` is also given.
        #[arg(long, requires = "sign")]
        sign_key_hex: Option<String>,
        // Signing key read from a file; also requires `--sign`.
        #[arg(long, requires = "sign")]
        sign_key_file: Option<PathBuf>,
        // Identifier recorded for the signing key.
        #[arg(long, default_value = "default")]
        sign_key_id: String,
    },
    // Validate a fingerprint file.
    Validate {
        #[arg(required = true)]
        file: PathBuf,
    },
    // Print information about a fingerprint file.
    Info {
        #[arg(required = true)]
        file: PathBuf,
        // Include detailed output.
        #[arg(long)]
        detailed: bool,
    },
    // Compare two fingerprint files.
    Diff {
        #[arg(required = true)]
        file1: PathBuf,
        #[arg(required = true)]
        file2: PathBuf,
    },
    // Evaluate synthetic data fidelity against a fingerprint.
    Evaluate {
        #[arg(short, long)]
        fingerprint: PathBuf,
        // Synthetic dataset to score.
        #[arg(short, long)]
        synthetic: PathBuf,
        #[arg(short, long)]
        output: Option<PathBuf>,
        // Pass/fail fidelity threshold.
        #[arg(long, default_value = "0.8")]
        threshold: f64,
    },
    // Generate synthetic rows directly from a fingerprint.
    Synthesize {
        #[arg(short, long)]
        fingerprint: PathBuf,
        #[arg(short, long, default_value = "./synthetic")]
        output: PathBuf,
        // Number of rows to synthesize.
        #[arg(short, long, default_value = "10000")]
        rows: usize,
        // Use the neural synthesis path.
        #[arg(long)]
        neural: bool,
        #[arg(short, long, default_value = "42")]
        seed: u64,
    },
}
// Subcommands under `datasynth-data audit`. Blueprints are referenced by a
// string id; "builtin:…" names select bundled blueprints/overlays.
// `//` comments only: `///` would become clap help text.
#[derive(Subcommand)]
enum AuditCommands {
    // Validate an audit blueprint.
    Validate {
        #[arg(long, default_value = "builtin:fsa")]
        blueprint: String,
    },
    // Show information about an audit blueprint.
    Info {
        #[arg(long, default_value = "builtin:fsa")]
        blueprint: String,
    },
    // Run an audit blueprint with an overlay and write results.
    Run {
        #[arg(long, default_value = "builtin:fsa")]
        blueprint: String,
        #[arg(long, default_value = "builtin:default")]
        overlay: String,
        #[arg(short, long, default_value = "./audit_output")]
        output: PathBuf,
        #[arg(long, default_value = "42")]
        seed: u64,
    },
    // Diff two audit blueprints.
    Diff {
        #[arg(long)]
        blueprint_a: String,
        #[arg(long)]
        blueprint_b: String,
    },
    // Generate an audit benchmark dataset.
    Benchmark {
        // Benchmark complexity preset (default "simple").
        #[arg(long, default_value = "simple")]
        complexity: String,
        // Optional anomaly-rate override.
        #[arg(long)]
        anomaly_rate: Option<f64>,
        #[arg(short, long, default_value = "./audit_benchmark")]
        output: PathBuf,
        #[arg(long, default_value = "42")]
        seed: u64,
    },
}
fn main() -> Result<()> {
let cli = Cli::parse();
let filter = if cli.verbose { "debug" } else { "info" };
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.with(
tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| filter.into()),
)
.init();
match cli.command {
Commands::Generate {
config,
output,
demo,
preset,
scenario_pack,
fingerprint,
scale,
seed,
banking,
audit,
memory_limit,
max_threads,
graph_export,
stream_target,
stream_api_key,
stream_batch_size,
quality_gate,
fiscal_year_months,
append,
months,
fraud_scenario,
fraud_rate,
stream_file,
export_format,
auto_tune,
max_iterations,
} => {
let available_cpus = num_cpus::get();
let effective_threads = max_threads.unwrap_or_else(|| {
(available_cpus / 2).clamp(1, 4)
});
if let Err(e) = rayon::ThreadPoolBuilder::new()
.num_threads(effective_threads)
.build_global()
{
eprintln!(
"Warning: failed to configure thread pool with {effective_threads} threads: {e}"
);
}
tracing::info!(
"CPU safeguard: using {} threads (of {} available)",
effective_threads,
available_cpus
);
let effective_memory_limit = if memory_limit > 0 {
memory_limit.min(get_safe_memory_limit()) } else {
1024 };
let memory_config =
MemoryGuardConfig::with_limit_mb(effective_memory_limit).aggressive();
let memory_guard = Arc::new(MemoryGuard::new(memory_config));
tracing::info!(
"Memory safeguard: {} MB limit ({} MB soft limit)",
effective_memory_limit,
(effective_memory_limit * 80) / 100
);
let initial_memory = memory_guard.current_usage_mb();
tracing::info!("Initial memory usage: {} MB", initial_memory);
#[allow(clippy::large_enum_variant)] enum ConfigOrOrchestrator {
Config(GeneratorConfig),
Orchestrator(Box<EnhancedOrchestrator>),
}
let config_or_orchestrator = if demo {
tracing::info!("Using demo preset (conservative settings)");
ConfigOrOrchestrator::Config(create_safe_demo_preset())
} else if let Some(ref fp_path) = fingerprint {
tracing::info!("Generating from fingerprint: {}", fp_path.display());
tracing::info!("Scale factor: {:.2}", scale);
let phase_config = PhaseConfig {
generate_banking: banking,
generate_audit: audit,
generate_graph_export: graph_export,
show_progress: true,
inject_anomalies: true, inject_data_quality: true,
..PhaseConfig::default()
};
let orchestrator =
EnhancedOrchestrator::from_fingerprint(fp_path, phase_config, scale)?;
ConfigOrOrchestrator::Orchestrator(Box::new(orchestrator))
} else if let Some(ref pack) = scenario_pack {
tracing::info!("Loading scenario pack: {}", pack);
let scenario_path = find_scenario_pack(pack)?;
let content = std::fs::read_to_string(&scenario_path)?;
let mut cfg: GeneratorConfig = serde_yaml::from_str(&content)?;
apply_safety_limits(&mut cfg);
ConfigOrOrchestrator::Config(cfg)
} else if let Some(config_path) = config {
let content = std::fs::read_to_string(&config_path)?;
let mut cfg: GeneratorConfig = serde_yaml::from_str(&content)?;
apply_safety_limits(&mut cfg);
ConfigOrOrchestrator::Config(cfg)
} else {
tracing::info!("No config specified, using safe demo preset");
ConfigOrOrchestrator::Config(create_safe_demo_preset())
};
let config_or_orchestrator = match config_or_orchestrator {
ConfigOrOrchestrator::Config(mut cfg) => {
if let Some(s) = seed {
cfg.global.seed = Some(s);
}
if banking {
cfg.banking.enabled = true;
cfg.banking.population.retail_customers =
cfg.banking.population.retail_customers.min(100);
cfg.banking.population.business_customers =
cfg.banking.population.business_customers.min(20);
cfg.banking.population.trusts = cfg.banking.population.trusts.min(5);
tracing::info!("Banking KYC/AML generation enabled (conservative mode)");
}
if graph_export {
cfg.graph_export.enabled = true;
tracing::info!("Graph export enabled (PyTorch Geometric format)");
}
if let Some(ref target) = stream_target {
cfg.graph_export.enabled = true;
cfg.graph_export.hypergraph.enabled = true;
cfg.graph_export.hypergraph.output_format = "unified".to_string();
cfg.graph_export.hypergraph.stream_target = Some(target.clone());
cfg.graph_export.hypergraph.stream_batch_size = stream_batch_size;
if let Some(ref key) = stream_api_key {
std::env::set_var("RUSTGRAPH_API_KEY", key);
tracing::debug!("API key set from CLI argument");
}
tracing::info!("Streaming unified hypergraph to: {}", target);
}
if let Some(fy_months) = fiscal_year_months {
cfg.global.fiscal_year_months = Some(fy_months);
}
if let Some(ref preset_name) = preset {
match preset_name.as_str() {
"audit-group" => {
cfg = presets::audit_group_overlay(cfg);
tracing::info!("Applied 'audit-group' overlay preset");
}
other => {
tracing::warn!(
"Unknown preset '{}'; supported: audit-group",
other
);
}
}
}
if !fraud_scenario.is_empty() {
cfg =
datasynth_config::fraud_packs::apply_fraud_packs(&cfg, &fraud_scenario)
.map_err(|e| anyhow::anyhow!("Failed to apply fraud packs: {e}"))?;
tracing::info!("Applied fraud packs: {:?}", fraud_scenario);
}
if let Some(rate) = fraud_rate {
cfg.fraud.enabled = true;
cfg.fraud.fraud_rate = rate;
}
if let Some(ref stream_path) = stream_file {
tracing::info!("Streaming to: {}", stream_path.display());
}
cfg.output.output_directory = output.clone();
cfg.global.parallel = false;
cfg.global.worker_threads = effective_threads;
cfg.global.memory_limit_mb = effective_memory_limit;
ConfigOrOrchestrator::Config(cfg)
}
orch @ ConfigOrOrchestrator::Orchestrator(_) => {
orch
}
};
let generator_config = match &config_or_orchestrator {
ConfigOrOrchestrator::Config(cfg) => cfg.clone(),
ConfigOrOrchestrator::Orchestrator(_) => {
tracing::warn!(
"Fingerprint-based generation: manifest uses approximate config metadata"
);
create_safe_demo_preset()
}
};
tracing::info!("Starting generation...");
match &config_or_orchestrator {
ConfigOrOrchestrator::Config(cfg) => {
tracing::info!("Industry: {:?}", cfg.global.industry);
tracing::info!("Period: {} months", cfg.global.period_months);
tracing::info!("Companies: {}", cfg.companies.len());
}
ConfigOrOrchestrator::Orchestrator(_) => {
tracing::info!("Mode: Fingerprint-based generation (scale: {:.2})", scale);
}
}
let pause_flag = Arc::new(AtomicBool::new(false));
#[cfg(unix)]
{
let pause_flag_clone = Arc::clone(&pause_flag);
let signal_flag = Arc::new(AtomicBool::new(false));
let signal_flag_clone = Arc::clone(&signal_flag);
if signal_hook::flag::register(SIGUSR1, signal_flag_clone).is_ok() {
let pid = std::process::id();
tracing::info!("Pause/resume: send SIGUSR1 to toggle (kill -USR1 {})", pid);
std::thread::spawn(move || loop {
if signal_flag.swap(false, Ordering::Relaxed) {
let was_paused = pause_flag_clone.load(Ordering::Relaxed);
pause_flag_clone.store(!was_paused, Ordering::Relaxed);
if was_paused {
eprintln!("\n>>> RESUMED");
} else {
eprintln!("\n>>> PAUSED - send SIGUSR1 again to resume");
}
}
std::thread::sleep(std::time::Duration::from_millis(100));
});
}
}
if let Err(e) = memory_guard.check_now() {
tracing::error!("Memory limit already exceeded before generation: {}", e);
return Err(anyhow::anyhow!("Insufficient memory to start generation"));
}
if append {
if let ConfigOrOrchestrator::Config(cfg) = config_or_orchestrator {
use datasynth_runtime::generation_session::GenerationSession;
let dss_path = output.join("session.dss");
if !dss_path.exists() {
eprintln!(
"Error: No session.dss found in output directory. Cannot append."
);
std::process::exit(1);
}
let additional = months.unwrap_or(12);
let mut session = GenerationSession::resume(&dss_path, cfg)?;
let results = session.generate_delta(additional)?;
session.save(&dss_path)?;
println!("\nIncremental generation complete ({additional} new months):");
for r in &results {
println!(
" {} - {} JEs, {:.1}s",
r.period.label, r.journal_entry_count, r.duration_secs
);
}
return Ok(());
} else {
return Err(anyhow::anyhow!(
"--append is not supported with fingerprint-based generation"
));
}
}
if let ConfigOrOrchestrator::Config(ref cfg) = config_or_orchestrator {
let fy_months = cfg.global.fiscal_year_months;
let total_months = cfg.global.period_months;
let use_session = fy_months.is_some() && fy_months.unwrap() < total_months;
if use_session {
if let ConfigOrOrchestrator::Config(cfg) = config_or_orchestrator {
use datasynth_runtime::generation_session::GenerationSession;
let mut session = GenerationSession::new(cfg, output.clone())?;
let results = session.generate_all()?;
let dss_path = output.join("session.dss");
session.save(&dss_path)?;
println!("\nMulti-period generation complete:");
for r in &results {
println!(
" {} - {} JEs, {} docs, {:.1}s",
r.period.label,
r.journal_entry_count,
r.document_count,
r.duration_secs
);
}
return Ok(());
}
}
}
let effective_seed = generator_config.global.seed.unwrap_or(42);
let config_for_manifest = generator_config.clone();
let mut orchestrator = match config_or_orchestrator {
ConfigOrOrchestrator::Orchestrator(orch) => {
tracing::info!("Using orchestrator from fingerprint");
*orch
}
ConfigOrOrchestrator::Config(cfg) => {
let mut phase_config = PhaseConfig::from_config(&cfg);
if banking {
phase_config.generate_banking = true;
}
if audit {
phase_config.generate_audit = true;
}
if graph_export {
phase_config.generate_graph_export = true;
}
phase_config.show_progress = true;
phase_config.p2p_chains = phase_config.p2p_chains.min(50);
phase_config.o2c_chains = phase_config.o2c_chains.min(50);
phase_config.vendors_per_company = phase_config.vendors_per_company.min(20);
phase_config.customers_per_company = phase_config.customers_per_company.min(30);
phase_config.materials_per_company = phase_config.materials_per_company.min(50);
phase_config.assets_per_company = phase_config.assets_per_company.min(20);
phase_config.employees_per_company = phase_config.employees_per_company.min(30);
EnhancedOrchestrator::new(cfg, phase_config)?
}
};
let result = orchestrator.generate()?;
if auto_tune {
use datasynth_eval::{AiTuner, AiTunerConfig};
let llm_provider = datasynth_core::llm::MockLlmProvider::new(42);
let tuner_config = AiTunerConfig {
max_iterations,
use_llm: true,
..AiTunerConfig::default()
};
let mut tuner = AiTuner::new(&llm_provider, tuner_config);
let eval = datasynth_eval::ComprehensiveEvaluation::new();
let iteration = tuner.analyze_iteration(&eval, 1);
tracing::info!(
"Auto-tune iteration 1: health={:.2}, rule_patches={}, ai_patches={}, applied={}",
iteration.health_score,
iteration.rule_patches.len(),
iteration.ai_patches.len(),
iteration.applied_patches.len(),
);
if !iteration.applied_patches.is_empty() {
tracing::info!("Suggested config patches:");
for patch in &iteration.applied_patches {
tracing::info!(
" {} = {} (confidence: {:.2})",
patch.path,
patch.suggested_value,
patch.confidence
);
}
}
tracing::info!("Auto-tune complete ({} iterations)", max_iterations);
}
tracing::info!("Generation complete!");
tracing::info!("Total entries: {}", result.statistics.total_entries);
tracing::info!("Total line items: {}", result.statistics.total_line_items);
tracing::info!("Accounts in CoA: {}", result.statistics.accounts_count);
let stats = memory_guard.stats();
let peak_mb = stats.peak_resident_bytes / (1024 * 1024);
let current_mb = stats.resident_bytes / (1024 * 1024);
tracing::info!(
"Memory usage: current {} MB, peak {} MB",
current_mb,
peak_mb
);
if stats.soft_limit_warnings > 0 {
tracing::warn!(
"Memory soft limit was exceeded {} times during generation",
stats.soft_limit_warnings
);
}
if result.statistics.banking_customer_count > 0 {
tracing::info!(
"Banking: {} customers, {} accounts, {} transactions ({} suspicious)",
result.statistics.banking_customer_count,
result.statistics.banking_account_count,
result.statistics.banking_transaction_count,
result.statistics.banking_suspicious_count
);
}
if result.statistics.audit_engagement_count > 0 {
tracing::info!(
"Audit: {} engagements, {} workpapers, {} findings",
result.statistics.audit_engagement_count,
result.statistics.audit_workpaper_count,
result.statistics.audit_finding_count
);
}
std::fs::create_dir_all(&output)?;
if memory_guard.check_now().is_err() {
tracing::warn!("Memory limit reached, writing minimal output");
}
datasynth_core::serde_decimal::set_numeric_native(
generator_config.output.numeric_mode == datasynth_config::NumericMode::Native,
);
if let Err(e) = output_writer::write_all_output_with_layout(
&result,
&output,
generator_config.output.export_layout,
&generator_config.output.formats,
) {
tracing::warn!("Some output files may not have been written: {}", e);
}
datasynth_core::serde_decimal::set_numeric_native(false);
if matches!(
config_for_manifest.accounting_standards.framework,
Some(AccountingFrameworkConfig::FrenchGaap)
) && !result.journal_entries.is_empty()
{
let fec_path = output.join("fec.csv");
match write_fec_csv(
&fec_path,
&result.journal_entries,
&result.chart_of_accounts,
) {
Ok(()) => tracing::info!(
"FEC (18 columns) written to: {} ({} entries, {} lines)",
fec_path.display(),
result.journal_entries.len(),
result
.journal_entries
.iter()
.map(|e| e.lines.len())
.sum::<usize>()
),
Err(e) => tracing::warn!("Could not write FEC file: {}", e),
}
}
if matches!(
config_for_manifest.accounting_standards.framework,
Some(AccountingFrameworkConfig::GermanGaap)
) && !result.journal_entries.is_empty()
{
let gobd_dir = output.join("gobd_export");
if let Err(e) = std::fs::create_dir_all(&gobd_dir) {
tracing::warn!("Could not create gobd_export directory: {}", e);
} else {
match datasynth_output::write_gobd_journal_csv(
&gobd_dir.join("gobd_journal.csv"),
&result.journal_entries,
&result.chart_of_accounts,
) {
Ok(()) => tracing::info!(
"GoBD journal (13 columns) written: {} entries",
result.journal_entries.len()
),
Err(e) => tracing::warn!("Could not write GoBD journal: {}", e),
}
match datasynth_output::write_gobd_accounts_csv(
&gobd_dir.join("gobd_accounts.csv"),
&result.chart_of_accounts,
) {
Ok(()) => tracing::info!(
"GoBD accounts written: {} accounts",
result.chart_of_accounts.accounts.len()
),
Err(e) => tracing::warn!("Could not write GoBD accounts: {}", e),
}
let company_code = config_for_manifest
.companies
.first()
.map(|c| c.code.as_str())
.unwrap_or("UNKNOWN");
let fiscal_year: i32 = config_for_manifest
.global
.start_date
.split('-')
.next()
.and_then(|y| y.parse().ok())
.unwrap_or(2024);
let tables = vec![
("gobd_journal.csv", "Buchungsjournal"),
("gobd_accounts.csv", "Kontenplan"),
];
match datasynth_output::write_gobd_index_xml(
&gobd_dir.join("index.xml"),
company_code,
fiscal_year,
&tables,
) {
Ok(()) => tracing::info!("GoBD index.xml written"),
Err(e) => tracing::warn!("Could not write GoBD index.xml: {}", e),
}
}
}
for fmt in &export_format {
match fmt.to_ascii_lowercase().as_str() {
"sap" => {
let sap_dir = output.join("sap_export");
if let Err(e) = std::fs::create_dir_all(&sap_dir) {
tracing::warn!("Could not create sap_export directory: {}", e);
} else if result.journal_entries.is_empty() {
tracing::warn!("SAP export skipped: no journal entries");
} else {
let sap_config = SapExportConfig::default();
let mut sap_exporter = SapExporter::new(sap_config);
match sap_exporter.export_to_files(&result.journal_entries, &sap_dir) {
Ok(files) => {
tracing::info!(
"SAP export: {} tables written to {}",
files.len(),
sap_dir.display()
);
for (table, path) in &files {
tracing::info!(
" SAP {}: {}",
format!("{:?}", table),
path
);
}
}
Err(e) => tracing::warn!("SAP export failed: {}", e),
}
}
}
"fec" => {
if result.journal_entries.is_empty() {
tracing::warn!("FEC export skipped: no journal entries");
} else {
let fec_path = output.join("fec_export.csv");
match write_fec_csv(
&fec_path,
&result.journal_entries,
&result.chart_of_accounts,
) {
Ok(()) => tracing::info!(
"FEC export written to: {} ({} entries)",
fec_path.display(),
result.journal_entries.len()
),
Err(e) => tracing::warn!("FEC export failed: {}", e),
}
}
}
"gobd" => {
let gobd_dir = output.join("gobd_explicit");
if let Err(e) = std::fs::create_dir_all(&gobd_dir) {
tracing::warn!("Could not create gobd_explicit directory: {}", e);
} else if result.journal_entries.is_empty() {
tracing::warn!("GoBD export skipped: no journal entries");
} else {
match datasynth_output::write_gobd_journal_csv(
&gobd_dir.join("gobd_journal.csv"),
&result.journal_entries,
&result.chart_of_accounts,
) {
Ok(()) => tracing::info!(
"GoBD journal written: {} entries",
result.journal_entries.len()
),
Err(e) => tracing::warn!("GoBD journal export failed: {}", e),
}
match datasynth_output::write_gobd_accounts_csv(
&gobd_dir.join("gobd_accounts.csv"),
&result.chart_of_accounts,
) {
Ok(()) => tracing::info!(
"GoBD accounts written: {} accounts",
result.chart_of_accounts.accounts.len()
),
Err(e) => tracing::warn!("GoBD accounts export failed: {}", e),
}
let company_code_exp = config_for_manifest
.companies
.first()
.map(|c| c.code.as_str())
.unwrap_or("UNKNOWN");
let fiscal_year_exp: i32 = config_for_manifest
.global
.start_date
.split('-')
.next()
.and_then(|y| y.parse().ok())
.unwrap_or(2024);
let tables_exp = vec![
("gobd_journal.csv", "Buchungsjournal"),
("gobd_accounts.csv", "Kontenplan"),
];
match datasynth_output::write_gobd_index_xml(
&gobd_dir.join("index.xml"),
company_code_exp,
fiscal_year_exp,
&tables_exp,
) {
Ok(()) => tracing::info!("GoBD explicit index.xml written"),
Err(e) => {
tracing::warn!("GoBD explicit index.xml failed: {}", e)
}
}
}
}
unknown => {
tracing::warn!(
"Unknown --export-format value '{}'; valid options: sap, fec, gobd",
unknown
);
}
}
}
if !result.anomaly_labels.labels.is_empty() {
let labels_dir = output.join("labels");
std::fs::create_dir_all(&labels_dir)?;
let export_config = LabelExportConfig::default();
match export_labels_all_formats(
&result.anomaly_labels.labels,
&labels_dir,
"anomaly_labels",
&export_config,
) {
Ok(results) => {
for (path, count) in &results {
tracing::info!(
"Anomaly labels written to: {} ({} labels)",
path,
count
);
}
}
Err(e) => {
tracing::warn!("Failed to write anomaly labels: {}", e);
}
}
let summary = LabelExportSummary::from_labels(&result.anomaly_labels.labels);
if let Err(e) =
summary.write_to_file(&labels_dir.join("anomaly_labels_summary.json"))
{
tracing::warn!("Failed to write anomaly label summary: {}", e);
}
tracing::info!(
"Anomaly labels: {} total, {} with provenance, {} in clusters",
summary.total_labels,
summary.with_provenance,
summary.in_clusters
);
}
let mut manifest = RunManifest::new(&config_for_manifest, effective_seed);
manifest.set_output_directory(&output);
manifest.complete(result.statistics.clone());
if !result.journal_entries.is_empty() {
let total_lines: usize =
result.journal_entries.iter().map(|je| je.lines.len()).sum();
manifest.add_output_file(OutputFileInfo {
path: "journal_entries.csv".to_string(),
format: "csv".to_string(),
record_count: Some(total_lines),
size_bytes: None,
sha256_checksum: None,
first_record_index: None,
last_record_index: None,
});
manifest.add_output_file(OutputFileInfo {
path: "journal_entries.json".to_string(),
format: "json".to_string(),
record_count: Some(result.journal_entries.len()),
size_bytes: None,
sha256_checksum: None,
first_record_index: None,
last_record_index: None,
});
}
for (name, count) in [
("master_data/vendors.json", result.master_data.vendors.len()),
(
"master_data/customers.json",
result.master_data.customers.len(),
),
(
"master_data/materials.json",
result.master_data.materials.len(),
),
(
"master_data/fixed_assets.json",
result.master_data.assets.len(),
),
(
"master_data/employees.json",
result.master_data.employees.len(),
),
] {
if count > 0 {
manifest.add_output_file(OutputFileInfo {
path: name.to_string(),
format: "json".to_string(),
record_count: Some(count),
size_bytes: None,
sha256_checksum: None,
first_record_index: None,
last_record_index: None,
});
}
}
for (name, count) in [
(
"document_flows/purchase_orders.json",
result.document_flows.purchase_orders.len(),
),
(
"document_flows/goods_receipts.json",
result.document_flows.goods_receipts.len(),
),
(
"document_flows/vendor_invoices.json",
result.document_flows.vendor_invoices.len(),
),
(
"document_flows/payments.json",
result.document_flows.payments.len(),
),
(
"document_flows/sales_orders.json",
result.document_flows.sales_orders.len(),
),
(
"document_flows/deliveries.json",
result.document_flows.deliveries.len(),
),
(
"document_flows/customer_invoices.json",
result.document_flows.customer_invoices.len(),
),
] {
if count > 0 {
manifest.add_output_file(OutputFileInfo {
path: name.to_string(),
format: "json".to_string(),
record_count: Some(count),
size_bytes: None,
sha256_checksum: None,
first_record_index: None,
last_record_index: None,
});
}
}
if !result.anomaly_labels.labels.is_empty() {
manifest.add_output_file(OutputFileInfo {
path: "labels/anomaly_labels.csv".to_string(),
format: "csv".to_string(),
record_count: Some(result.anomaly_labels.labels.len()),
size_bytes: None,
sha256_checksum: None,
first_record_index: None,
last_record_index: None,
});
}
let mut register = |path: &str, count: usize| {
if count > 0 {
manifest.add_output_file(OutputFileInfo {
path: path.to_string(),
format: "json".to_string(),
record_count: Some(count),
size_bytes: None,
sha256_checksum: None,
first_record_index: None,
last_record_index: None,
});
}
};
register(
"subledger/ar_invoices.json",
result.subledger.ar_invoices.len(),
);
register(
"subledger/ap_invoices.json",
result.subledger.ap_invoices.len(),
);
register(
"subledger/fa_records.json",
result.subledger.fa_records.len(),
);
register(
"subledger/inventory_positions.json",
result.subledger.inventory_positions.len(),
);
register(
"subledger/inventory_movements.json",
result.subledger.inventory_movements.len(),
);
register(
"subledger/ar_aging.json",
result.subledger.ar_aging_reports.len(),
);
register(
"subledger/ap_aging.json",
result.subledger.ap_aging_reports.len(),
);
register(
"subledger/depreciation_runs.json",
result.subledger.depreciation_runs.len(),
);
register(
"subledger/inventory_valuation.json",
result.subledger.inventory_valuations.len(),
);
register(
"audit/audit_engagements.json",
result.audit.engagements.len(),
);
register("audit/audit_workpapers.json", result.audit.workpapers.len());
register("audit/audit_evidence.json", result.audit.evidence.len());
register(
"audit/audit_risk_assessments.json",
result.audit.risk_assessments.len(),
);
register("audit/audit_findings.json", result.audit.findings.len());
register("audit/audit_judgments.json", result.audit.judgments.len());
register(
"audit/audit_opinions.json",
result.audit.audit_opinions.len(),
);
register(
"audit/key_audit_matters.json",
result.audit.key_audit_matters.len(),
);
register(
"audit/sox_302_certifications.json",
result.audit.sox_302_certifications.len(),
);
register(
"audit/sox_404_assessments.json",
result.audit.sox_404_assessments.len(),
);
register(
"audit/materiality_calculations.json",
result.audit.materiality_calculations.len(),
);
register(
"audit/combined_risk_assessments.json",
result.audit.combined_risk_assessments.len(),
);
register(
"banking/banking_customers.json",
result.banking.customers.len(),
);
register(
"banking/banking_transactions.json",
result.banking.transactions.len(),
);
register(
"banking/banking_accounts.json",
result.banking.accounts.len(),
);
register(
"banking/aml_transaction_labels.json",
result.banking.transaction_labels.len(),
);
register(
"banking/aml_customer_labels.json",
result.banking.customer_labels.len(),
);
register(
"banking/aml_account_labels.json",
result.banking.account_labels.len(),
);
register(
"banking/aml_relationship_labels.json",
result.banking.relationship_labels.len(),
);
register(
"banking/aml_narratives.json",
result.banking.narratives.len(),
);
register(
"sourcing/sourcing_projects.json",
result.sourcing.sourcing_projects.len(),
);
register(
"sourcing/spend_analyses.json",
result.sourcing.spend_analyses.len(),
);
register(
"sourcing/supplier_qualifications.json",
result.sourcing.qualifications.len(),
);
register("sourcing/rfx_events.json", result.sourcing.rfx_events.len());
register("sourcing/supplier_bids.json", result.sourcing.bids.len());
register(
"sourcing/bid_evaluations.json",
result.sourcing.bid_evaluations.len(),
);
register(
"sourcing/procurement_contracts.json",
result.sourcing.contracts.len(),
);
register(
"sourcing/catalog_items.json",
result.sourcing.catalog_items.len(),
);
register(
"sourcing/supplier_scorecards.json",
result.sourcing.scorecards.len(),
);
register(
"intercompany/ic_matched_pairs.json",
result.intercompany.matched_pairs.len(),
);
register(
"intercompany/ic_elimination_entries.json",
result.intercompany.elimination_entries.len(),
);
register(
"intercompany/ic_seller_journal_entries.json",
result.intercompany.seller_journal_entries.len(),
);
register(
"intercompany/ic_buyer_journal_entries.json",
result.intercompany.buyer_journal_entries.len(),
);
register(
"financial_reporting/financial_statements.json",
result.financial_reporting.financial_statements.len(),
);
register(
"financial_reporting/bank_reconciliations.json",
result.financial_reporting.bank_reconciliations.len(),
);
register(
"period_close/trial_balances.json",
result.financial_reporting.trial_balances.len(),
);
register("hr/payroll_runs.json", result.hr.payroll_runs.len());
register("hr/time_entries.json", result.hr.time_entries.len());
register("hr/expense_reports.json", result.hr.expense_reports.len());
register(
"hr/payroll_line_items.json",
result.hr.payroll_line_items.len(),
);
register(
"manufacturing/production_orders.json",
result.manufacturing.production_orders.len(),
);
register(
"manufacturing/quality_inspections.json",
result.manufacturing.quality_inspections.len(),
);
register(
"manufacturing/cycle_counts.json",
result.manufacturing.cycle_counts.len(),
);
register(
"sales_kpi_budgets/sales_quotes.json",
result.sales_kpi_budgets.sales_quotes.len(),
);
register(
"sales_kpi_budgets/management_kpis.json",
result.sales_kpi_budgets.kpis.len(),
);
register(
"sales_kpi_budgets/budgets.json",
result.sales_kpi_budgets.budgets.len(),
);
register(
"internal_controls/internal_controls.json",
result.internal_controls.len(),
);
register(
"accounting_standards/customer_contracts.json",
result.accounting_standards.contracts.len(),
);
register(
"accounting_standards/impairment_tests.json",
result.accounting_standards.impairment_tests.len(),
);
register(
"accounting_standards/business_combinations.json",
result.accounting_standards.business_combinations.len(),
);
register(
"accounting_standards/business_combination_journal_entries.json",
result
.accounting_standards
.business_combination_journal_entries
.len(),
);
register(
"treasury/debt_instruments.json",
result.treasury.debt_instruments.len(),
);
register(
"treasury/hedging_instruments.json",
result.treasury.hedging_instruments.len(),
);
register(
"treasury/hedge_relationships.json",
result.treasury.hedge_relationships.len(),
);
register(
"treasury/cash_positions.json",
result.treasury.cash_positions.len(),
);
register(
"treasury/cash_forecasts.json",
result.treasury.cash_forecasts.len(),
);
register("treasury/cash_pools.json", result.treasury.cash_pools.len());
register(
"treasury/cash_pool_sweeps.json",
result.treasury.cash_pool_sweeps.len(),
);
register(
"treasury/treasury_anomaly_labels.json",
result.treasury.treasury_anomaly_labels.len(),
);
register(
"project_accounting/projects.json",
result.project_accounting.projects.len(),
);
register(
"project_accounting/change_orders.json",
result.project_accounting.change_orders.len(),
);
register(
"project_accounting/milestones.json",
result.project_accounting.milestones.len(),
);
register(
"project_accounting/cost_lines.json",
result.project_accounting.cost_lines.len(),
);
register(
"project_accounting/revenue_records.json",
result.project_accounting.revenue_records.len(),
);
register(
"project_accounting/earned_value_metrics.json",
result.project_accounting.earned_value_metrics.len(),
);
register("tax/tax_provisions.json", result.tax.tax_provisions.len());
register("tax/tax_jurisdictions.json", result.tax.jurisdictions.len());
register("tax/tax_codes.json", result.tax.codes.len());
register("tax/tax_lines.json", result.tax.tax_lines.len());
register("tax/tax_returns.json", result.tax.tax_returns.len());
register(
"tax/withholding_records.json",
result.tax.withholding_records.len(),
);
register(
"tax/tax_anomaly_labels.json",
result.tax.tax_anomaly_labels.len(),
);
register(
"tax/temporary_differences.json",
result.tax.deferred_tax.temporary_differences.len(),
);
register(
"tax/etr_reconciliation.json",
result.tax.deferred_tax.etr_reconciliations.len(),
);
register(
"tax/deferred_tax_rollforward.json",
result.tax.deferred_tax.rollforwards.len(),
);
register(
"tax/deferred_tax_journal_entries.json",
result.tax.deferred_tax.journal_entries.len(),
);
register("esg/emission_records.json", result.esg.emissions.len());
register("esg/energy_consumption.json", result.esg.energy.len());
register("esg/water_usage.json", result.esg.water.len());
register("esg/waste_records.json", result.esg.waste.len());
register("esg/workforce_diversity.json", result.esg.diversity.len());
register("esg/pay_equity.json", result.esg.pay_equity.len());
register(
"esg/safety_incidents.json",
result.esg.safety_incidents.len(),
);
register("esg/safety_metrics.json", result.esg.safety_metrics.len());
register("esg/governance_metrics.json", result.esg.governance.len());
register(
"esg/supplier_esg_assessments.json",
result.esg.supplier_assessments.len(),
);
register(
"esg/materiality_assessments.json",
result.esg.materiality.len(),
);
register("esg/esg_disclosures.json", result.esg.disclosures.len());
register(
"esg/climate_scenarios.json",
result.esg.climate_scenarios.len(),
);
register(
"esg/esg_anomaly_labels.json",
result.esg.anomaly_labels.len(),
);
register(
"balance/opening_balances.json",
result.opening_balances.len(),
);
register(
"balance/subledger_reconciliation.json",
result.subledger_reconciliation.len(),
);
register("process_mining/event_log.json", result.ocpm.event_count);
register("chart_of_accounts.json", 1);
register("generation_statistics.json", 1);
if let Some(ref lineage) = result.lineage {
manifest.lineage = Some(lineage.clone());
let lineage_path = output.join("lineage_graph.json");
if let Ok(json) = lineage.to_json() {
if let Err(e) = std::fs::write(&lineage_path, json) {
tracing::warn!("Failed to write lineage graph: {}", e);
} else {
tracing::info!(
"Lineage graph written to: {} ({} nodes, {} edges)",
lineage_path.display(),
lineage.node_count(),
lineage.edge_count()
);
}
}
}
{
let prov_path = output.join("prov.json");
let prov_doc = datasynth_runtime::prov::manifest_to_prov(&manifest);
match serde_json::to_string_pretty(&prov_doc) {
Ok(json) => {
if let Err(e) = std::fs::write(&prov_path, json) {
tracing::warn!("Failed to write PROV-JSON: {}", e);
} else {
tracing::info!("PROV-JSON written to: {}", prov_path.display());
}
}
Err(e) => tracing::warn!("Failed to serialize PROV-JSON: {}", e),
}
}
manifest.populate_file_checksums(&output);
let manifest_path = output.join("run_manifest.json");
if let Err(e) = manifest.write_to_file(&manifest_path) {
tracing::warn!("Failed to write run manifest: {}", e);
} else {
tracing::info!(
"Run manifest written to: {} (run_id: {})",
manifest_path.display(),
manifest.run_id()
);
}
if quality_gate != "none" {
if let Some(profile) = datasynth_eval::gates::get_profile(&quality_gate) {
tracing::warn!(
"Quality gate evaluation not yet integrated with generation output — requires ComprehensiveEvaluation population"
);
let evaluation = datasynth_eval::ComprehensiveEvaluation::new();
let gate_result =
datasynth_eval::gates::GateEngine::evaluate(&evaluation, &profile);
println!();
println!(
"Quality Gate Evaluation (profile: {})",
gate_result.profile_name
);
println!("==========================================");
for check in &gate_result.results {
let status = if check.passed { "PASS" } else { "FAIL" };
println!(" [{}] {}: {}", status, check.gate_name, check.message);
}
println!();
println!(
"Result: {}/{} gates passed",
gate_result.gates_passed, gate_result.gates_total
);
println!("{}", gate_result.summary);
if !gate_result.passed {
tracing::error!(
"Quality gates FAILED: {}/{}",
gate_result.gates_total - gate_result.gates_passed,
gate_result.gates_total
);
std::process::exit(2);
}
} else {
tracing::warn!(
"Unknown quality gate profile '{}'. Valid profiles: none, lenient, default, strict",
quality_gate
);
}
}
Ok(())
}
Commands::Validate { config } => {
let content = std::fs::read_to_string(&config)?;
let generator_config: GeneratorConfig = serde_yaml::from_str(&content)?;
datasynth_config::validate_config(&generator_config)?;
tracing::info!("Configuration is valid!");
Ok(())
}
Commands::Init {
output,
industry,
complexity,
from_description,
} => {
if let Some(desc) = from_description {
#[cfg(feature = "llm")]
let provider: Box<dyn datasynth_core::llm::LlmProvider> = {
let (env_var_name, provider_type, base_url) =
if std::env::var("ANTHROPIC_API_KEY").is_ok() {
(
"ANTHROPIC_API_KEY",
datasynth_core::llm::LlmProviderType::Anthropic,
None,
)
} else if std::env::var("OPENROUTER_API_KEY").is_ok() {
(
"OPENROUTER_API_KEY",
datasynth_core::llm::LlmProviderType::OpenAi,
Some("https://openrouter.ai/api".to_string()),
)
} else if let Ok(k) = std::env::var("OPENAI_API_KEY") {
let base = if k.starts_with("sk-or-") {
Some("https://openrouter.ai/api".to_string())
} else {
None
};
(
"OPENAI_API_KEY",
datasynth_core::llm::LlmProviderType::OpenAi,
base,
)
} else {
("", datasynth_core::llm::LlmProviderType::Mock, None)
};
if env_var_name.is_empty() {
Box::new(datasynth_core::llm::MockLlmProvider::new(42))
} else {
let config = datasynth_core::llm::LlmConfig {
provider: provider_type,
api_key_env: env_var_name.to_string(),
base_url,
..Default::default()
};
match datasynth_core::llm::HttpLlmProvider::new(config) {
Ok(p) => {
tracing::info!("Using real LLM provider for config generation");
Box::new(p)
}
Err(e) => {
tracing::warn!("Failed to init LLM provider: {e}, using fallback");
Box::new(datasynth_core::llm::MockLlmProvider::new(42))
}
}
}
};
#[cfg(not(feature = "llm"))]
let provider: Box<dyn datasynth_core::llm::LlmProvider> =
Box::new(datasynth_core::llm::MockLlmProvider::new(42));
let yaml = datasynth_core::llm::nl_config::NlConfigGenerator::generate_full(
&desc,
provider.as_ref(),
)
.map_err(|e| anyhow::anyhow!("{e}"))?;
std::fs::write(&output, &yaml)?;
tracing::info!(
"Configuration generated from description and written to: {}",
output.display()
);
return Ok(());
}
let industry_lower = industry.to_lowercase();
let industry_sector = match industry_lower.as_str() {
"manufacturing" => IndustrySector::Manufacturing,
"retail" => IndustrySector::Retail,
"financial" | "financial_services" => IndustrySector::FinancialServices,
"healthcare" => IndustrySector::Healthcare,
"technology" | "tech" => IndustrySector::Technology,
_ => {
eprintln!(
"Warning: unrecognized industry '{industry}'. Valid values: manufacturing, retail, financial_services, healthcare, technology. Defaulting to manufacturing."
);
IndustrySector::Manufacturing
}
};
let complexity_lower = complexity.to_lowercase();
let coa_complexity = match complexity_lower.as_str() {
"small" => CoAComplexity::Small,
"medium" => CoAComplexity::Medium,
"large" => CoAComplexity::Large,
_ => {
eprintln!(
"Warning: unrecognized complexity '{complexity}'. Valid values: small, medium, large. Defaulting to medium."
);
CoAComplexity::Medium
}
};
let config = presets::create_preset(
industry_sector,
2,
12,
coa_complexity,
datasynth_config::TransactionVolume::TenK, );
let yaml = serde_yaml::to_string(&config)?;
std::fs::write(&output, yaml)?;
tracing::info!("Configuration written to: {}", output.display());
Ok(())
}
Commands::Info => {
println!("Available Industry Presets:");
println!(" - manufacturing: Manufacturing industry");
println!(" - retail: Retail industry");
println!(" - financial_services: Financial services");
println!(" - healthcare: Healthcare industry");
println!(" - technology: Technology industry");
println!();
println!("Chart of Accounts Complexity:");
println!(" - small: ~100 accounts");
println!(" - medium: ~400 accounts");
println!(" - large: ~2500 accounts");
println!();
println!("Transaction Volumes:");
println!(" - ten_k: 10,000 transactions/year");
println!(" - hundred_k: 100,000 transactions/year");
println!(" - one_m: 1,000,000 transactions/year");
println!(" - ten_m: 10,000,000 transactions/year");
println!(" - hundred_m: 100,000,000 transactions/year");
println!();
println!("Resource Safeguards:");
println!(" --memory-limit <MB> : Set memory limit (default: 1024 MB)");
println!(" --max-threads <N> : Limit CPU threads (default: half of cores, max 4)");
Ok(())
}
Commands::Verify {
output,
checksums,
record_counts,
} => {
let manifest_path = output.join("run_manifest.json");
if !manifest_path.exists() {
anyhow::bail!("No run_manifest.json found in {}", output.display());
}
let manifest_json = std::fs::read_to_string(&manifest_path)?;
let manifest: RunManifest = serde_json::from_str(&manifest_json)?;
println!("Verifying output: {}", output.display());
println!(" Manifest version: {}", manifest.manifest_version);
println!(" Run ID: {}", manifest.run_id);
println!(" Generator version: {}", manifest.generator_version);
println!(" Output files: {}", manifest.output_files.len());
println!();
let mut all_pass = true;
let mut checked = 0;
let mut passed = 0;
let mut failed = 0;
for file_info in &manifest.output_files {
let file_path = output.join(&file_info.path);
checked += 1;
if file_path.exists() {
passed += 1;
println!(" [PASS] {} exists", file_info.path);
} else {
failed += 1;
all_pass = false;
println!(" [FAIL] {} missing", file_info.path);
}
}
if checksums {
println!();
println!("Checksum verification:");
let results = manifest.verify_file_checksums(&output);
for result in &results {
match result.status {
datasynth_runtime::ChecksumStatus::Ok => {
println!(" [PASS] {} checksum OK", result.path);
passed += 1;
}
datasynth_runtime::ChecksumStatus::Mismatch => {
println!(" [FAIL] {} checksum MISMATCH", result.path);
if let (Some(ref exp), Some(ref act)) =
(&result.expected, &result.actual)
{
println!(" expected: {exp}");
println!(" actual: {act}");
}
failed += 1;
all_pass = false;
}
datasynth_runtime::ChecksumStatus::Missing => {
println!(" [FAIL] {} file missing", result.path);
failed += 1;
all_pass = false;
}
datasynth_runtime::ChecksumStatus::NoChecksum => {
println!(" [SKIP] {} no checksum recorded", result.path);
}
}
checked += 1;
}
}
if record_counts {
println!();
println!("Record count verification:");
for file_info in &manifest.output_files {
let file_path = output.join(&file_info.path);
if let Some(expected_count) = file_info.record_count {
checked += 1;
if file_path.exists() {
let content = std::fs::read_to_string(&file_path).unwrap_or_default();
let line_count = if file_info.format == "csv" {
content.lines().count().saturating_sub(1) } else if file_info.format == "json" {
if let Ok(arr) =
serde_json::from_str::<Vec<serde_json::Value>>(&content)
{
arr.len()
} else {
content.lines().count()
}
} else {
content.lines().count()
};
if line_count == expected_count {
println!(
" [PASS] {} count: {} records",
file_info.path, expected_count
);
passed += 1;
} else {
println!(
" [WARN] {} count: expected {}, found {}",
file_info.path, expected_count, line_count
);
passed += 1;
}
} else {
println!(" [SKIP] {} file missing", file_info.path);
}
}
}
}
println!();
println!("Summary: {checked} checked, {passed} passed, {failed} failed");
if all_pass {
println!("Verification: PASSED");
Ok(())
} else {
anyhow::bail!("Verification: FAILED ({failed} failures)");
}
}
Commands::Fingerprint { command } => handle_fingerprint_command(command),
Commands::Scenario { command } => handle_scenario_command(command),
Commands::Adversarial {
model,
probes,
features,
threshold,
perturbation,
output: out_path,
seed: adv_seed,
} => {
tracing::info!("Adversarial model probing: {}", model.display());
tracing::info!(
"Probes: {}, features: {}, threshold: {}, perturbation: {}",
probes,
features,
threshold,
perturbation
);
#[cfg(feature = "adversarial")]
{
use datasynth_eval::adversarial::{ModelProbe, ModelProbeConfig};
let config = ModelProbeConfig {
n_features: features,
n_probes: probes,
perturbation_budget: perturbation,
threshold,
target_class: 0,
};
let mut probe =
ModelProbe::load(&model, config).map_err(|e| anyhow::anyhow!("{e}"))?;
let result = probe
.probe(&[], adv_seed)
.map_err(|e| anyhow::anyhow!("{e}"))?;
tracing::info!("Probe results:");
tracing::info!(" Mean score: {:.4}", result.stats.mean_score);
tracing::info!(
" Positive rate: {:.2}%",
result.stats.positive_rate * 100.0
);
tracing::info!(
" Boundary samples (<0.1 margin): {}",
result.stats.boundary_samples
);
tracing::info!(" Mean margin: {:.4}", result.stats.mean_margin);
if let Some(ref path) = out_path {
let json = serde_json::to_string_pretty(&result)?;
std::fs::write(path, json)?;
tracing::info!("Results written to: {}", path.display());
}
}
#[cfg(not(feature = "adversarial"))]
{
let _ = (
model,
probes,
features,
threshold,
perturbation,
out_path,
adv_seed,
);
tracing::error!(
"Adversarial testing requires the 'adversarial' feature. \
Build with: cargo build --features adversarial"
);
Err(anyhow::anyhow!("adversarial feature not enabled"))
}
}
Commands::Audit { command } => match command {
AuditCommands::Validate { blueprint } => handle_audit_validate(&blueprint),
AuditCommands::Info { blueprint } => handle_audit_info(&blueprint),
AuditCommands::Run {
blueprint,
overlay,
output,
seed,
} => handle_audit_run(&blueprint, &overlay, &output, seed),
AuditCommands::Diff {
blueprint_a,
blueprint_b,
} => handle_audit_diff(&blueprint_a, &blueprint_b),
AuditCommands::Benchmark {
complexity,
anomaly_rate,
output,
seed,
} => handle_audit_benchmark(&complexity, anomaly_rate, &output, seed),
},
}
}
/// Resolve the HMAC signing key for fingerprint output.
///
/// Precedence: `--sign-key-hex` flag, then `--sign-key-file`, then the
/// `DATASYNTH_FINGERPRINT_KEY` environment variable. When none is supplied,
/// an ephemeral key is generated and its hex form is logged so the user can
/// still verify the fingerprint later.
///
/// # Errors
/// Returns an error when a provided key (flag, file, or env var) is not
/// valid hex, or when the key file cannot be read.
fn resolve_signing_key(
    hex: Option<&str>,
    file: Option<&std::path::Path>,
    key_id: &str,
) -> Result<datasynth_fingerprint::io::signing::SigningKey> {
    use datasynth_fingerprint::io::signing::SigningKey;
    match (hex, file) {
        // Inline hex on the command line wins over every other source.
        (Some(inline_hex), _) => SigningKey::from_hex(key_id, inline_hex.trim())
            .map_err(|e| anyhow::anyhow!("invalid --sign-key-hex: {e}")),
        // Next, a key file containing the hex-encoded key.
        (None, Some(key_path)) => {
            let contents = std::fs::read_to_string(key_path)
                .map_err(|e| anyhow::anyhow!("reading sign-key-file {}: {e}", key_path.display()))?;
            SigningKey::from_hex(key_id, contents.trim())
                .map_err(|e| anyhow::anyhow!("invalid key in {}: {e}", key_path.display()))
        }
        // Finally the environment, falling back to an ephemeral key.
        (None, None) => match std::env::var("DATASYNTH_FINGERPRINT_KEY") {
            Ok(env_hex) => SigningKey::from_hex(key_id, env_hex.trim())
                .map_err(|e| anyhow::anyhow!("invalid DATASYNTH_FINGERPRINT_KEY: {e}")),
            Err(_) => {
                let ephemeral = SigningKey::generate(key_id);
                tracing::warn!(
                    "No signing key provided; generated an ephemeral HMAC-SHA256 key. \
                    Save this hex-encoded key to verify the fingerprint later: {}",
                    ephemeral.to_hex()
                );
                Ok(ephemeral)
            }
        },
    }
}
/// Dispatches the `fingerprint` subcommand family: extract a privacy-preserving
/// fingerprint from real data, validate/inspect/diff fingerprint files,
/// evaluate synthetic-data fidelity against a fingerprint, and synthesize
/// numeric data directly from a fingerprint via a diffusion model.
fn handle_fingerprint_command(command: FingerprintCommands) -> Result<()> {
    match command {
        // --- Extract: real CSV data -> .dsf fingerprint (optionally signed) ---
        FingerprintCommands::Extract {
            input,
            output,
            privacy_level,
            privacy_epsilon,
            privacy_k,
            sign,
            sign_key_hex,
            sign_key_file,
            sign_key_id,
        } => {
            tracing::info!("Extracting fingerprint from: {}", input.display());
            // Map the CLI string to a PrivacyLevel; unknown values fall back
            // to Standard with a warning instead of aborting.
            let level = match privacy_level.to_lowercase().as_str() {
                "minimal" => PrivacyLevel::Minimal,
                "standard" => PrivacyLevel::Standard,
                "high" => PrivacyLevel::High,
                "maximum" => PrivacyLevel::Maximum,
                _ => {
                    tracing::warn!("Unknown privacy level '{}', using standard", privacy_level);
                    PrivacyLevel::Standard
                }
            };
            // Explicit --privacy-epsilon / --privacy-k flags override the
            // defaults implied by the chosen level.
            let mut privacy_config = PrivacyConfig::from_level(level);
            if let Some(eps) = privacy_epsilon {
                privacy_config.epsilon = eps;
            }
            if let Some(k) = privacy_k {
                privacy_config.k_anonymity = k;
            }
            let extraction_config = ExtractionConfig {
                privacy: privacy_config,
                ..Default::default()
            };
            // Accept either a single CSV file or a directory. For a directory,
            // only the first CSV found is used.
            // NOTE(review): read_dir iteration order is platform-dependent, so
            // "first" may vary between runs — confirm whether this is intended.
            let data_source = if input.is_file() {
                DataSource::Csv(CsvDataSource::new(input.clone()))
            } else {
                let csv_files: Vec<_> = std::fs::read_dir(&input)?
                    .filter_map(std::result::Result::ok)
                    .filter(|e| e.path().extension().is_some_and(|ext| ext == "csv"))
                    .collect();
                if csv_files.is_empty() {
                    anyhow::bail!("No CSV files found in directory: {}", input.display());
                }
                let first_csv = csv_files[0].path();
                tracing::info!("Using CSV file: {}", first_csv.display());
                DataSource::Csv(CsvDataSource::new(first_csv))
            };
            let extractor = FingerprintExtractor::with_config(extraction_config);
            let fingerprint = extractor.extract(&data_source)?;
            let writer = FingerprintWriter::new();
            if sign {
                use datasynth_fingerprint::io::signing::DsfSigner;
                // Key precedence is handled by resolve_signing_key:
                // --sign-key-hex, then --sign-key-file, then env, then ephemeral.
                let key = resolve_signing_key(
                    sign_key_hex.as_deref(),
                    sign_key_file.as_deref(),
                    &sign_key_id,
                )?;
                let signer = DsfSigner::new(key);
                writer.write_to_file_signed(&fingerprint, &output, &signer)?;
                tracing::info!(
                    "Signed fingerprint (key_id={}) written to: {}",
                    signer.key_id(),
                    output.display()
                );
            } else {
                writer.write_to_file(&fingerprint, &output)?;
                tracing::info!("Fingerprint written to: {}", output.display());
            }
            // Surface the privacy budget spent during extraction.
            tracing::info!(
                "Privacy audit: {} actions recorded",
                fingerprint.privacy_audit.actions.len()
            );
            tracing::info!(
                "Epsilon spent: {:.3} of {:.3} budget",
                fingerprint.privacy_audit.total_epsilon_spent,
                fingerprint.privacy_audit.epsilon_budget
            );
            Ok(())
        }
        // --- Validate: structural validation of a .dsf file ---
        FingerprintCommands::Validate { file } => {
            tracing::info!("Validating fingerprint: {}", file.display());
            match validate_dsf(&file) {
                Ok(report) => {
                    if report.is_valid {
                        println!("✓ Fingerprint is valid");
                        println!(" Version: {}", report.version);
                        println!(" Components: {:?}", report.components);
                        if !report.warnings.is_empty() {
                            println!(" Warnings:");
                            for warning in &report.warnings {
                                println!(" - {warning}");
                            }
                        }
                    } else {
                        // Invalid file: list errors but still exit Ok(()) —
                        // only an unreadable/corrupt file is a hard error below.
                        println!("✗ Fingerprint validation failed");
                        for error in &report.errors {
                            println!(" Error: {error}");
                        }
                    }
                }
                Err(e) => {
                    println!("✗ Failed to validate fingerprint: {e}");
                    return Err(e.into());
                }
            }
            Ok(())
        }
        // --- Info: human-readable summary of a fingerprint file ---
        FingerprintCommands::Info { file, detailed } => {
            let reader = FingerprintReader::new();
            let fingerprint = reader.read_from_file(&file)?;
            println!("Fingerprint Information");
            println!("=======================");
            println!();
            println!("Manifest:");
            println!(" Version: {}", fingerprint.manifest.version);
            println!(" Format: {}", fingerprint.manifest.format);
            println!(" Created: {}", fingerprint.manifest.created_at);
            println!();
            println!("Source:");
            println!(" Description: {}", fingerprint.manifest.source.description);
            println!(" Tables: {}", fingerprint.manifest.source.table_count);
            println!(" Total Rows: {}", fingerprint.manifest.source.total_rows);
            if let Some(ref industry) = fingerprint.manifest.source.industry {
                println!(" Industry: {industry}");
            }
            println!();
            println!("Privacy:");
            println!(" Level: {:?}", fingerprint.manifest.privacy.level);
            println!(" Epsilon: {}", fingerprint.manifest.privacy.epsilon);
            println!(
                " K-Anonymity: {}",
                fingerprint.manifest.privacy.k_anonymity
            );
            println!();
            println!("Schema:");
            println!(" Tables: {}", fingerprint.schema.tables.len());
            for (name, table) in &fingerprint.schema.tables {
                println!(" - {} ({} columns)", name, table.columns.len());
            }
            println!();
            println!("Statistics:");
            println!(
                " Numeric columns: {}",
                fingerprint.statistics.numeric_columns.len()
            );
            println!(
                " Categorical columns: {}",
                fingerprint.statistics.categorical_columns.len()
            );
            // --detailed expands per-column statistics.
            if detailed {
                println!();
                println!("Detailed Statistics:");
                for (name, stats) in &fingerprint.statistics.numeric_columns {
                    println!(" {name}:");
                    println!(" Count: {}", stats.count);
                    println!(" Min: {:.2}, Max: {:.2}", stats.min, stats.max);
                    println!(" Mean: {:.2}, StdDev: {:.2}", stats.mean, stats.std_dev);
                    println!(" Distribution: {:?}", stats.distribution);
                }
                for (name, stats) in &fingerprint.statistics.categorical_columns {
                    println!(" {name}:");
                    println!(" Count: {}", stats.count);
                    println!(" Cardinality: {}", stats.cardinality);
                    println!(" Top values: {}", stats.top_values.len());
                }
            }
            println!();
            println!("Privacy Audit:");
            println!(
                " Total actions: {}",
                fingerprint.privacy_audit.actions.len()
            );
            println!(
                " Epsilon spent: {:.3}",
                fingerprint.privacy_audit.total_epsilon_spent
            );
            println!(" Warnings: {}", fingerprint.privacy_audit.warnings.len());
            Ok(())
        }
        // --- Diff: compare two fingerprint files ---
        FingerprintCommands::Diff { file1, file2 } => {
            let reader = FingerprintReader::new();
            let fp1 = reader.read_from_file(&file1)?;
            let fp2 = reader.read_from_file(&file2)?;
            println!("Fingerprint Comparison");
            println!("======================");
            println!();
            println!("Manifests:");
            // Manifest fields are printed only when they differ.
            if fp1.manifest.version != fp2.manifest.version {
                println!(
                    " Version: {} vs {}",
                    fp1.manifest.version, fp2.manifest.version
                );
            }
            if fp1.manifest.privacy.level != fp2.manifest.privacy.level {
                println!(
                    " Privacy Level: {:?} vs {:?}",
                    fp1.manifest.privacy.level, fp2.manifest.privacy.level
                );
            }
            if fp1.manifest.privacy.epsilon != fp2.manifest.privacy.epsilon {
                println!(
                    " Epsilon: {} vs {}",
                    fp1.manifest.privacy.epsilon, fp2.manifest.privacy.epsilon
                );
            }
            println!();
            println!("Schema:");
            // Set difference of table names shows schema drift between files.
            let tables1: std::collections::HashSet<_> = fp1.schema.tables.keys().collect();
            let tables2: std::collections::HashSet<_> = fp2.schema.tables.keys().collect();
            let only_in_1: Vec<_> = tables1.difference(&tables2).collect();
            let only_in_2: Vec<_> = tables2.difference(&tables1).collect();
            let common: Vec<_> = tables1.intersection(&tables2).collect();
            if !only_in_1.is_empty() {
                println!(" Only in {}: {:?}", file1.display(), only_in_1);
            }
            if !only_in_2.is_empty() {
                println!(" Only in {}: {:?}", file2.display(), only_in_2);
            }
            println!(" Common tables: {}", common.len());
            println!();
            println!("Statistics:");
            println!(
                " Numeric columns: {} vs {}",
                fp1.statistics.numeric_columns.len(),
                fp2.statistics.numeric_columns.len()
            );
            println!(
                " Categorical columns: {} vs {}",
                fp1.statistics.categorical_columns.len(),
                fp2.statistics.categorical_columns.len()
            );
            // Report numeric columns whose mean or std deviation drifted by
            // more than the 0.01 display threshold.
            for col in fp1.statistics.numeric_columns.keys() {
                if let (Some(s1), Some(s2)) = (
                    fp1.statistics.numeric_columns.get(col),
                    fp2.statistics.numeric_columns.get(col),
                ) {
                    let mean_diff = (s1.mean - s2.mean).abs();
                    let std_diff = (s1.std_dev - s2.std_dev).abs();
                    if mean_diff > 0.01 || std_diff > 0.01 {
                        println!(" {col}:");
                        println!(
                            " Mean: {:.2} vs {:.2} (diff: {:.2})",
                            s1.mean, s2.mean, mean_diff
                        );
                        println!(
                            " StdDev: {:.2} vs {:.2} (diff: {:.2})",
                            s1.std_dev, s2.std_dev, std_diff
                        );
                    }
                }
            }
            Ok(())
        }
        // --- Evaluate: fidelity of synthetic CSVs vs a reference fingerprint ---
        FingerprintCommands::Evaluate {
            fingerprint,
            synthetic,
            output,
            threshold,
        } => {
            tracing::info!("Evaluating fidelity of synthetic data");
            tracing::info!(" Fingerprint: {}", fingerprint.display());
            tracing::info!(" Synthetic data: {}", synthetic.display());
            let reader = FingerprintReader::new();
            let fp = reader.read_from_file(&fingerprint)?;
            let csv_files: Vec<PathBuf> = std::fs::read_dir(&synthetic)?
                .filter_map(std::result::Result::ok)
                .filter(|e| e.path().extension().is_some_and(|ext| ext == "csv"))
                .map(|e| e.path())
                .collect();
            if csv_files.is_empty() {
                anyhow::bail!(
                    "No CSV files found in synthetic directory: {}",
                    synthetic.display()
                );
            }
            tracing::info!(" Found {} CSV file(s) to evaluate", csv_files.len());
            let extractor = FingerprintExtractor::new();
            let evaluator = FidelityEvaluator::with_threshold(threshold);
            let mut all_reports = Vec::with_capacity(csv_files.len());
            // Each synthetic CSV gets its own fingerprint, compared against
            // the reference; extraction/evaluation failures skip that file
            // with a warning instead of aborting the whole run.
            for csv_path in &csv_files {
                tracing::info!(" Evaluating: {}", csv_path.display());
                let data_source = DataSource::Csv(CsvDataSource::new(csv_path.clone()));
                match extractor.extract(&data_source) {
                    Ok(synthetic_fp) => match evaluator.evaluate_fingerprints(&fp, &synthetic_fp) {
                        Ok(r) => all_reports.push(r),
                        Err(e) => {
                            tracing::warn!(
                                " Skipping {} — evaluation error: {}",
                                csv_path.display(),
                                e
                            );
                        }
                    },
                    Err(e) => {
                        tracing::warn!(
                            " Skipping {} — extraction error: {}",
                            csv_path.display(),
                            e
                        );
                    }
                }
            }
            if all_reports.is_empty() {
                anyhow::bail!("No CSV files could be evaluated successfully");
            }
            // Aggregate per-file reports: scores are unweighted means,
            // `passes` is the conjunction across files.
            // NOTE(review): `details` is taken from the first report only,
            // so per-file detail from the rest is dropped — confirm intent.
            let n = all_reports.len() as f64;
            use datasynth_fingerprint::evaluation::FidelityReport;
            let report = FidelityReport {
                overall_score: all_reports.iter().map(|r| r.overall_score).sum::<f64>() / n,
                statistical_fidelity: all_reports
                    .iter()
                    .map(|r| r.statistical_fidelity)
                    .sum::<f64>()
                    / n,
                correlation_fidelity: all_reports
                    .iter()
                    .map(|r| r.correlation_fidelity)
                    .sum::<f64>()
                    / n,
                schema_fidelity: all_reports.iter().map(|r| r.schema_fidelity).sum::<f64>() / n,
                rule_compliance: all_reports.iter().map(|r| r.rule_compliance).sum::<f64>() / n,
                anomaly_fidelity: all_reports.iter().map(|r| r.anomaly_fidelity).sum::<f64>() / n,
                passes: all_reports.iter().all(|r| r.passes),
                details: all_reports
                    .into_iter()
                    .next()
                    .map(|r| r.details)
                    .unwrap_or_default(),
            };
            println!();
            println!("Fidelity Report");
            println!("===============");
            println!();
            println!("Overall Score: {:.1}%", report.overall_score * 100.0);
            println!("Threshold: {:.1}%", threshold * 100.0);
            println!(
                "Status: {}",
                if report.passes {
                    "PASS ✓"
                } else {
                    "FAIL ✗"
                }
            );
            println!();
            println!("Component Scores:");
            println!(
                " Statistical Fidelity: {:.1}%",
                report.statistical_fidelity * 100.0
            );
            println!(
                " Correlation Fidelity: {:.1}%",
                report.correlation_fidelity * 100.0
            );
            println!(
                " Schema Fidelity: {:.1}%",
                report.schema_fidelity * 100.0
            );
            println!(
                " Rule Compliance: {:.1}%",
                report.rule_compliance * 100.0
            );
            println!(
                " Anomaly Fidelity: {:.1}%",
                report.anomaly_fidelity * 100.0
            );
            if let Some(output_path) = output {
                let json = serde_json::to_string_pretty(&report)?;
                std::fs::write(&output_path, json)?;
                tracing::info!("Report written to: {}", output_path.display());
            }
            // Failing the threshold is a hard error so CI pipelines fail.
            if !report.passes {
                anyhow::bail!(
                    "Fidelity check failed: {:.1}% < {:.1}%",
                    report.overall_score * 100.0,
                    threshold * 100.0
                );
            }
            Ok(())
        }
        // --- Synthesize: generate numeric CSV data from a fingerprint ---
        FingerprintCommands::Synthesize {
            fingerprint,
            output: synth_output,
            rows,
            neural,
            seed: synth_seed,
        } => {
            tracing::info!("Fingerprint → Synthesize pipeline");
            tracing::info!(" Fingerprint: {}", fingerprint.display());
            tracing::info!(" Output: {}", synth_output.display());
            tracing::info!(" Rows: {}, Neural: {}, Seed: {}", rows, neural, synth_seed);
            let reader = FingerprintReader::new();
            let fp = reader.read_from_file(&fingerprint)?;
            // Only numeric columns are synthesized; categorical columns in
            // the fingerprint are ignored here.
            let col_names: Vec<String> = fp.statistics.numeric_columns.keys().cloned().collect();
            let n_cols = col_names.len();
            if n_cols == 0 {
                anyhow::bail!("Fingerprint has no numeric columns to synthesize from");
            }
            tracing::info!(" Columns: {} ({})", n_cols, col_names.join(", "));
            use datasynth_core::diffusion::{
                ColumnDiffusionParams, ColumnType, DiffusionConfig, DiffusionTrainer,
            };
            // Per-column moments from the fingerprint drive the diffusion
            // model; std is floored at 1e-8 to avoid degenerate columns.
            let column_params: Vec<ColumnDiffusionParams> = col_names
                .iter()
                .map(|name| {
                    let stats = &fp.statistics.numeric_columns[name];
                    ColumnDiffusionParams {
                        name: name.clone(),
                        mean: stats.mean,
                        std: stats.std_dev.max(1e-8),
                        min: stats.min,
                        max: stats.max,
                        col_type: ColumnType::Continuous,
                    }
                })
                .collect();
            // Identity correlation matrix: columns are synthesized as
            // independent (cross-column correlations are not reproduced).
            let corr: Vec<Vec<f64>> = (0..n_cols)
                .map(|i| {
                    (0..n_cols)
                        .map(|j| if i == j { 1.0 } else { 0.0 })
                        .collect()
                })
                .collect();
            let diffusion_config = DiffusionConfig {
                n_steps: 100,
                schedule: datasynth_core::diffusion::NoiseScheduleType::Cosine,
                seed: synth_seed,
            };
            let model = DiffusionTrainer::fit(column_params, corr, diffusion_config);
            let samples = model.generate(rows, synth_seed);
            std::fs::create_dir_all(&synth_output)?;
            let csv_path = synth_output.join("synthesized.csv");
            let mut writer = csv::Writer::from_path(&csv_path)?;
            writer.write_record(&col_names)?;
            for row in &samples {
                let fields: Vec<String> = row.iter().map(|v| format!("{v:.6}")).collect();
                writer.write_record(&fields)?;
            }
            writer.flush()?;
            tracing::info!(
                "Synthesized {} rows x {} columns → {}",
                samples.len(),
                n_cols,
                csv_path.display()
            );
            if neural {
                tracing::info!(
                    "Neural enhancement requested. Build with --features neural for \
                    NeuralDiffusionTrainer-based synthesis."
                );
            }
            Ok(())
        }
    }
}
/// Resolve a scenario pack name (with or without a trailing `.yaml`) to an
/// existing file under `templates/scenarios/`.
///
/// Search order: `templates/scenarios/` relative to the working directory,
/// the explicit `./` variant, then `templates/scenarios/` next to the
/// running executable.
///
/// # Errors
/// Bails with the list of available packs when no candidate path exists.
fn find_scenario_pack(pack: &str) -> Result<PathBuf> {
    // strip_suffix removes at most one ".yaml"; the previous
    // trim_end_matches stripped repeated suffixes, so a pack named
    // "x.yaml.yaml" was mangled to "x".
    let pack_name = pack.strip_suffix(".yaml").unwrap_or(pack);
    let search_paths = [
        PathBuf::from(format!("templates/scenarios/{pack_name}.yaml")),
        PathBuf::from(format!("./templates/scenarios/{pack_name}.yaml")),
        std::env::current_exe()
            .ok()
            .and_then(|p| p.parent().map(std::path::Path::to_path_buf))
            .map(|p| p.join(format!("templates/scenarios/{pack_name}.yaml")))
            .unwrap_or_default(),
    ];
    for path in &search_paths {
        if path.exists() {
            tracing::info!("Found scenario pack at: {}", path.display());
            return Ok(path.clone());
        }
    }
    let available = list_available_scenarios();
    anyhow::bail!(
        "Scenario pack '{}' not found.\n\nAvailable scenario packs:\n{}",
        pack,
        available.join("\n")
    );
}
/// Scan `templates/scenarios/<industry>/<pack>.yaml` relative to the current
/// working directory and return one display line per discovered pack.
///
/// Returns a single placeholder entry when nothing is found (missing base
/// directory, unreadable entries, or no `.yaml` files).
fn list_available_scenarios() -> Vec<String> {
    let mut scenarios = Vec::new();
    let base_path = PathBuf::from("templates/scenarios");
    // Unreadable directories/entries are silently skipped: this listing is
    // best-effort help text for an error message, not a hard requirement.
    if let Ok(industries) = std::fs::read_dir(&base_path) {
        for industry in industries.flatten() {
            if industry.path().is_dir() {
                let industry_name = industry.file_name().to_string_lossy().to_string();
                if let Ok(files) = std::fs::read_dir(industry.path()) {
                    for file in files.flatten() {
                        let file_name = file.file_name().to_string_lossy().to_string();
                        // strip_suffix removes exactly one ".yaml"; the
                        // previous ends_with + trim_end_matches pair stripped
                        // repeated suffixes, mangling "x.yaml.yaml" to "x".
                        if let Some(scenario_name) = file_name.strip_suffix(".yaml") {
                            scenarios.push(format!(" - {industry_name}/{scenario_name}"));
                        }
                    }
                }
            }
        }
    }
    if scenarios.is_empty() {
        scenarios.push(" (no scenario packs found in templates/scenarios/)".to_string());
    }
    scenarios
}
/// Build the hard-coded configuration used by `--demo` mode.
///
/// The preset is deliberately tiny and deterministic: one company, one
/// month of data, a fixed seed (42), serial execution with 2 worker
/// threads, and a 512 MB memory cap, so a demo run finishes quickly on
/// modest hardware. Fraud injection is disabled; audit generation is
/// enabled with the builtin FSA blueprint and default overlay. All
/// remaining subsystems use their `Default` configurations.
fn create_safe_demo_preset() -> GeneratorConfig {
    use datasynth_config::schema::*;
    GeneratorConfig {
        // Deterministic, single-threaded run bounded to one month.
        global: GlobalConfig {
            industry: IndustrySector::Manufacturing,
            start_date: "2024-01-01".to_string(),
            period_months: 1, seed: Some(42),
            parallel: false,
            group_currency: "USD".to_string(),
            presentation_currency: None,
            worker_threads: 2,
            memory_limit_mb: 512,
            fiscal_year_months: None,
        },
        // A single small US company in the ~10K transactions/year tier.
        companies: vec![CompanyConfig {
            code: "DEMO".to_string(),
            name: "Demo Company".to_string(),
            currency: "USD".to_string(),
            functional_currency: None,
            country: "US".to_string(),
            annual_transaction_volume: TransactionVolume::TenK, volume_weight: 1.0,
            fiscal_year_variant: "K4".to_string(),
        }],
        // Shallow, generic chart of accounts (2-3 hierarchy levels).
        chart_of_accounts: ChartOfAccountsConfig {
            complexity: CoAComplexity::Small,
            industry_specific: false,
            custom_accounts: None,
            min_hierarchy_depth: 2,
            max_hierarchy_depth: 3,
        },
        transactions: TransactionConfig::default(),
        output: OutputConfig::default(),
        // No fraud patterns in the demo dataset.
        fraud: FraudConfig {
            enabled: false,
            ..Default::default()
        },
        internal_controls: InternalControlsConfig::default(),
        business_processes: BusinessProcessConfig::default(),
        user_personas: UserPersonaConfig::default(),
        templates: TemplateConfig::default(),
        approval: ApprovalConfig::default(),
        departments: DepartmentConfig::default(),
        master_data: MasterDataConfig::default(),
        document_flows: DocumentFlowConfig::default(),
        intercompany: IntercompanyConfig::default(),
        balance: BalanceConfig::default(),
        ocpm: OcpmConfig::default(),
        // Audit trail generation on, driven by the builtin FSA blueprint
        // with the default overlay.
        audit: AuditGenerationConfig {
            enabled: true,
            fsm: Some(AuditFsmConfig {
                enabled: true,
                blueprint: "builtin:fsa".into(),
                overlay: "builtin:default".into(),
                ..Default::default()
            }),
            ..Default::default()
        },
        // Small banking module preset; everything below uses defaults.
        banking: datasynth_banking::BankingConfig::small(), data_quality: DataQualitySchemaConfig::default(),
        scenario: datasynth_config::schema::ScenarioConfig::default(),
        temporal: datasynth_config::schema::TemporalDriftConfig::default(),
        graph_export: datasynth_config::schema::GraphExportConfig::default(),
        streaming: datasynth_config::schema::StreamingSchemaConfig::default(),
        rate_limit: datasynth_config::schema::RateLimitSchemaConfig::default(),
        temporal_attributes: datasynth_config::schema::TemporalAttributeSchemaConfig::default(),
        relationships: datasynth_config::schema::RelationshipSchemaConfig::default(),
        accounting_standards: datasynth_config::schema::AccountingStandardsConfig::default(),
        audit_standards: datasynth_config::schema::AuditStandardsConfig::default(),
        distributions: datasynth_config::schema::AdvancedDistributionConfig::default(),
        temporal_patterns: datasynth_config::schema::TemporalPatternsConfig::default(),
        vendor_network: datasynth_config::schema::VendorNetworkSchemaConfig::default(),
        customer_segmentation: datasynth_config::schema::CustomerSegmentationSchemaConfig::default(),
        relationship_strength: datasynth_config::schema::RelationshipStrengthSchemaConfig::default(),
        cross_process_links: datasynth_config::schema::CrossProcessLinksSchemaConfig::default(),
        organizational_events: datasynth_config::schema::OrganizationalEventsSchemaConfig::default(),
        behavioral_drift: datasynth_config::schema::BehavioralDriftSchemaConfig::default(),
        market_drift: datasynth_config::schema::MarketDriftSchemaConfig::default(),
        drift_labeling: datasynth_config::schema::DriftLabelingSchemaConfig::default(),
        anomaly_injection: Default::default(),
        industry_specific: Default::default(),
        fingerprint_privacy: Default::default(),
        quality_gates: Default::default(),
        compliance: Default::default(),
        webhooks: Default::default(),
        llm: Default::default(),
        diffusion: Default::default(),
        causal: Default::default(),
        source_to_pay: Default::default(),
        financial_reporting: Default::default(),
        hr: Default::default(),
        manufacturing: Default::default(),
        sales_quotes: Default::default(),
        tax: Default::default(),
        treasury: Default::default(),
        project_accounting: Default::default(),
        esg: Default::default(),
        country_packs: None,
        scenarios: Default::default(),
        session: Default::default(),
        compliance_regulations: Default::default(),
    }
}
fn apply_safety_limits(config: &mut GeneratorConfig) {
if config.global.period_months > 12 {
tracing::warn!(
"Safety limit: period_months truncated from {} to 12",
config.global.period_months
);
config.global.period_months = 12;
}
for company in &mut config.companies {
let original = company.annual_transaction_volume;
company.annual_transaction_volume = match company.annual_transaction_volume {
datasynth_config::TransactionVolume::OneM
| datasynth_config::TransactionVolume::TenM
| datasynth_config::TransactionVolume::HundredM => {
tracing::warn!(
"Safety limit: transaction volume for company '{}' capped from {:?} to HundredK",
company.code,
original
);
datasynth_config::TransactionVolume::HundredK
}
other => other,
};
}
if config.banking.enabled {
let orig_retail = config.banking.population.retail_customers;
let orig_business = config.banking.population.business_customers;
let orig_trusts = config.banking.population.trusts;
config.banking.population.retail_customers = orig_retail.min(500);
config.banking.population.business_customers = orig_business.min(100);
config.banking.population.trusts = orig_trusts.min(20);
if orig_retail > 500 || orig_business > 100 || orig_trusts > 20 {
tracing::warn!(
"Safety limit: banking population capped (retail: {} -> {}, business: {} -> {}, trusts: {} -> {})",
orig_retail,
config.banking.population.retail_customers,
orig_business,
config.banking.population.business_customers,
orig_trusts,
config.banking.population.trusts,
);
}
}
config.global.parallel = false;
config.global.worker_threads = config.global.worker_threads.min(4);
}
/// Pick a conservative memory limit in MB for demo runs.
///
/// On Linux this is half of `MemAvailable` from `/proc/meminfo`, capped
/// at 4096 MB. On other platforms — or whenever `/proc/meminfo` cannot
/// be read or parsed — it falls back to 1024 MB.
fn get_safe_memory_limit() -> usize {
    #[cfg(target_os = "linux")]
    {
        // Only the first MemAvailable line is consulted; a parse failure
        // anywhere in the chain drops through to the 1024 MB fallback.
        let available_kb = std::fs::read_to_string("/proc/meminfo")
            .ok()
            .and_then(|content| {
                content
                    .lines()
                    .find(|line| line.starts_with("MemAvailable:"))
                    .and_then(|line| line.split_whitespace().nth(1))
                    .and_then(|kb| kb.parse::<usize>().ok())
            });
        if let Some(kb) = available_kb {
            let mb = kb / 1024;
            return (mb / 2).min(4096);
        }
    }
    1024
}
/// Map a blueprint specifier to a loaded blueprint.
///
/// `builtin:<name>` selects one of the bundled methodologies; any other
/// string is treated as a filesystem path to a custom blueprint file.
fn resolve_blueprint(s: &str) -> Result<datasynth_audit_fsm::loader::BlueprintWithPreconditions> {
    use datasynth_audit_fsm::loader::BlueprintWithPreconditions as Bwp;
    let loaded = match s {
        "builtin:fsa" => Bwp::load_builtin_fsa(),
        "builtin:ia" => Bwp::load_builtin_ia(),
        "builtin:kpmg" => Bwp::load_builtin_kpmg(),
        "builtin:pwc" => Bwp::load_builtin_pwc(),
        "builtin:deloitte" => Bwp::load_builtin_deloitte(),
        // Accepted with or without the `builtin:` prefix.
        "builtin:ey_gam_lite" | "ey_gam_lite" => Bwp::load_builtin_ey_gam_lite(),
        "builtin:soc2" => Bwp::load_builtin_soc2(),
        "builtin:pcaob" => Bwp::load_builtin_pcaob(),
        "builtin:regulatory" => Bwp::load_builtin_regulatory(),
        path => Bwp::load_from_file(PathBuf::from(path)),
    };
    loaded.map_err(|e| anyhow::anyhow!("{e}"))
}
/// Map an overlay specifier to a loaded generation overlay.
///
/// Recognized `builtin:` names select a bundled overlay; any other
/// string is interpreted as a path to a custom overlay file.
fn resolve_overlay(s: &str) -> Result<datasynth_audit_fsm::schema::GenerationOverlay> {
    use datasynth_audit_fsm::loader::{load_overlay, BuiltinOverlay, OverlaySource};
    // First resolve the builtin name, if any; everything else is a path.
    let builtin = match s {
        "builtin:default" => Some(BuiltinOverlay::Default),
        "builtin:thorough" => Some(BuiltinOverlay::Thorough),
        "builtin:rushed" => Some(BuiltinOverlay::Rushed),
        "builtin:retail" => Some(BuiltinOverlay::IndustryRetail),
        "builtin:manufacturing" => Some(BuiltinOverlay::IndustryManufacturing),
        "builtin:financial_services" => Some(BuiltinOverlay::IndustryFinancialServices),
        _ => None,
    };
    let source = match builtin {
        Some(overlay) => OverlaySource::Builtin(overlay),
        None => OverlaySource::Custom(PathBuf::from(s)),
    };
    load_overlay(&source).map_err(|e| anyhow::anyhow!("{e}"))
}
fn handle_audit_validate(blueprint_str: &str) -> Result<()> {
let bwp = resolve_blueprint(blueprint_str)?;
let bp = &bwp.blueprint;
match bwp.validate() {
Ok(()) => {
let total_procedures: usize = bp.phases.iter().map(|p| p.procedures.len()).sum();
let total_steps: usize = bp
.phases
.iter()
.flat_map(|p| &p.procedures)
.map(|proc| proc.steps.len())
.sum();
println!("Blueprint is valid.");
println!();
println!(" Framework: {}", bp.methodology.framework);
println!(" Phases: {}", bp.phases.len());
println!(" Procedures: {}", total_procedures);
println!(" Steps: {}", total_steps);
println!(" Standards: {}", bp.standards.len());
println!(" Actors: {}", bp.actors.len());
Ok(())
}
Err(datasynth_audit_fsm::error::AuditFsmError::BlueprintValidation { violations }) => {
eprintln!(
"Blueprint validation FAILED ({} violation(s)):",
violations.len()
);
for v in &violations {
eprintln!(" - {v}");
}
std::process::exit(1);
}
Err(e) => Err(anyhow::anyhow!("{e}")),
}
}
fn handle_audit_info(blueprint_str: &str) -> Result<()> {
let bwp = resolve_blueprint(blueprint_str)?;
let bp = &bwp.blueprint;
let total_procedures: usize = bp.phases.iter().map(|p| p.procedures.len()).sum();
let total_steps: usize = bp
.phases
.iter()
.flat_map(|p| &p.procedures)
.map(|proc| proc.steps.len())
.sum();
let evidence_ids: std::collections::HashSet<&str> = bp
.evidence_templates
.iter()
.map(|e| e.id.as_str())
.collect();
println!("Audit Blueprint Information");
println!("===========================");
println!();
println!(" Name: {}", bp.name);
println!(" Version: {}", bp.version);
println!(" Framework: {}", bp.methodology.framework);
println!(" Phases: {}", bp.phases.len());
println!(" Procedures: {}", total_procedures);
println!(" Steps: {}", total_steps);
println!(" Standards: {}", bp.standards.len());
println!(" Actors: {}", bp.actors.len());
println!(" Evidence: {} template(s)", evidence_ids.len());
println!();
let continuous: Vec<_> = bp
.phases
.iter()
.filter(|p| p.order.is_some_and(|o| o < 0))
.collect();
let sequential: Vec<_> = bp
.phases
.iter()
.filter(|p| p.order.is_none_or(|o| o >= 0))
.collect();
if !continuous.is_empty() {
println!(" Continuous phases ({}):", continuous.len());
for phase in &continuous {
let proc_count = phase.procedures.len();
let step_count: usize = phase.procedures.iter().map(|p| p.steps.len()).sum();
println!(
" - {} ({} procedure(s), {} step(s))",
phase.name, proc_count, step_count
);
}
println!();
}
println!(" Sequential phases ({}):", sequential.len());
for phase in &sequential {
let proc_count = phase.procedures.len();
let step_count: usize = phase.procedures.iter().map(|p| p.steps.len()).sum();
println!(
" - {} ({} procedure(s), {} step(s))",
phase.name, proc_count, step_count
);
}
println!();
if !bp.actors.is_empty() {
println!(" Actors:");
for actor in &bp.actors {
println!(" - {} ({})", actor.label, actor.id);
}
}
Ok(())
}
/// Run one synthetic audit engagement and write its event trail to
/// `<output>/audit_event_trail.json`, then print run statistics.
fn handle_audit_run(
    blueprint_str: &str,
    overlay_str: &str,
    output: &std::path::Path,
    seed: u64,
) -> Result<()> {
    use datasynth_audit_fsm::context::EngagementContext;
    use datasynth_audit_fsm::engine::AuditFsmEngine;
    use datasynth_audit_fsm::export::flat_log::export_events_to_file;
    use rand::SeedableRng;
    use rand_chacha::ChaCha8Rng;
    let blueprint = resolve_blueprint(blueprint_str)?;
    let overlay = resolve_overlay(overlay_str)?;
    // Fail fast on an invalid blueprint before constructing the engine.
    blueprint.validate().map_err(|e| anyhow::anyhow!("{e}"))?;
    // A seeded RNG keeps the engagement reproducible across runs.
    let mut fsm = AuditFsmEngine::new(blueprint, overlay, ChaCha8Rng::seed_from_u64(seed));
    let context = EngagementContext::demo();
    let started = std::time::Instant::now();
    let outcome = fsm
        .run_engagement(&context)
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    let wall = started.elapsed();
    std::fs::create_dir_all(output)?;
    let trail = output.join("audit_event_trail.json");
    export_events_to_file(&outcome.event_log, &trail)
        .map_err(|e| anyhow::anyhow!("Failed to write event trail: {e}"))?;
    println!("Audit FSM engagement complete.");
    println!();
    println!(" Events: {}", outcome.event_log.len());
    println!(" Artifacts: {}", outcome.artifacts.total_artifacts());
    println!(" Phases: {}", outcome.phases_completed.len());
    println!(" Anomalies: {}", outcome.anomalies.len());
    println!(
        " Duration: {:.1} simulated hours",
        outcome.total_duration_hours
    );
    println!(" Wall clock: {:.2}s", wall.as_secs_f64());
    println!();
    println!(" Event trail: {}", trail.display());
    Ok(())
}
/// Generate a labeled benchmark dataset at the requested complexity and
/// export it (JSON/CSV/OCEL trails plus anomaly labels and metadata)
/// into `output`, printing a summary of what was produced.
fn handle_audit_benchmark(
    complexity_str: &str,
    anomaly_rate: Option<f64>,
    output: &std::path::Path,
    seed: u64,
) -> Result<()> {
    use datasynth_audit_fsm::benchmark::{
        export_benchmark, generate_benchmark, BenchmarkComplexity, BenchmarkConfig,
    };
    // Complexity names are matched case-insensitively.
    let complexity = match complexity_str.to_lowercase().as_str() {
        "simple" => BenchmarkComplexity::Simple,
        "medium" => BenchmarkComplexity::Medium,
        "complex" => BenchmarkComplexity::Complex,
        other => {
            anyhow::bail!(
                "Unknown complexity '{}'. Use: simple, medium, complex",
                other
            );
        }
    };
    let config = BenchmarkConfig {
        complexity,
        anomaly_rate,
        seed,
    };
    let started = std::time::Instant::now();
    let bench = generate_benchmark(&config).map_err(|e| anyhow::anyhow!("{e}"))?;
    let wall = started.elapsed();
    export_benchmark(&bench, output)
        .map_err(|e| anyhow::anyhow!("Failed to export benchmark: {e}"))?;
    let meta = &bench.metadata;
    println!("Benchmark dataset generated.");
    println!();
    println!(" Complexity: {}", meta.complexity);
    println!(" Blueprint: {}", meta.blueprint);
    println!(" Overlay: {}", meta.overlay);
    println!(" Events: {}", meta.event_count);
    println!(" Anomalies: {}", meta.anomaly_count);
    println!(" Anomaly rate: {:.4}", meta.anomaly_rate);
    println!(" Procedures: {}", meta.procedure_count);
    println!(" Artifacts: {}", meta.artifact_count);
    println!(" Seed: {}", meta.seed);
    println!(" Wall clock: {:.2}s", wall.as_secs_f64());
    println!();
    println!(" Output: {}", output.display());
    println!(" - event_trail.json");
    println!(" - event_trail.csv");
    println!(" - event_trail_ocel.json");
    println!(" - anomaly_labels.json");
    println!(" - metadata.json");
    Ok(())
}
/// Compare two blueprints by simulating an engagement of A (fixed seed),
/// mining a blueprint back out of the resulting event log, and diffing
/// that discovered model against B's declared structure.
fn handle_audit_diff(blueprint_a_str: &str, blueprint_b_str: &str) -> Result<()> {
    let blueprint_a = resolve_blueprint(blueprint_a_str)?;
    let blueprint_b = resolve_blueprint(blueprint_b_str)?;
    use datasynth_audit_fsm::context::EngagementContext;
    use datasynth_audit_fsm::engine::AuditFsmEngine;
    use datasynth_audit_fsm::loader::default_overlay;
    use datasynth_audit_optimizer::discovery::{compare_blueprints, discover_blueprint};
    use rand::SeedableRng;
    use rand_chacha::ChaCha8Rng;
    let ctx = EngagementContext::demo();
    // Fixed seed keeps the diff stable between invocations.
    let mut engine = AuditFsmEngine::new(
        blueprint_a,
        default_overlay(),
        ChaCha8Rng::seed_from_u64(42),
    );
    let simulated = engine
        .run_engagement(&ctx)
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    let discovered = discover_blueprint(&simulated.event_log);
    let diff = compare_blueprints(&discovered, &blueprint_b.blueprint);
    println!("Blueprint Diff: {} vs {}", blueprint_a_str, blueprint_b_str);
    println!("============================================");
    println!();
    println!("Conformance score: {:.2}%", diff.conformance_score * 100.0);
    println!();
    if !diff.matching_procedures.is_empty() {
        println!("Matching procedures ({}):", diff.matching_procedures.len());
        for proc_id in &diff.matching_procedures {
            println!(" + {}", proc_id);
        }
        println!();
    }
    if !diff.missing_procedures.is_empty() {
        println!(
            "Missing from A (in B only) ({}):",
            diff.missing_procedures.len()
        );
        for proc_id in &diff.missing_procedures {
            println!(" - {}", proc_id);
        }
        println!();
    }
    if !diff.extra_procedures.is_empty() {
        println!("Extra in A (not in B) ({}):", diff.extra_procedures.len());
        for proc_id in &diff.extra_procedures {
            println!(" ~ {}", proc_id);
        }
        println!();
    }
    if !diff.transition_diffs.is_empty() {
        println!("Transition differences ({}):", diff.transition_diffs.len());
        for td in &diff.transition_diffs {
            // "missing" transitions render as '-', everything else as '+'.
            let marker = if td.diff_type == "missing" { "-" } else { "+" };
            println!(
                " {} [{}] {} -> {}",
                marker, td.procedure_id, td.from_state, td.to_state
            );
        }
    }
    Ok(())
}
fn handle_scenario_command(command: ScenarioCommands) -> Result<()> {
use datasynth_eval::diff_engine::{DiffConfig, DiffEngine, DiffFormat};
use datasynth_runtime::scenario_engine::ScenarioEngine;
match command {
ScenarioCommands::List { config } => {
let config_str = std::fs::read_to_string(&config)?;
let gen_config: GeneratorConfig = serde_yaml::from_str(&config_str)?;
if !gen_config.scenarios.enabled {
println!("Scenarios are disabled in this config.");
return Ok(());
}
let engine = ScenarioEngine::new(gen_config)?;
let scenarios = engine.list_scenarios();
if scenarios.is_empty() {
println!("No scenarios defined.");
return Ok(());
}
println!("Scenarios ({}):", scenarios.len());
println!("{:-<60}", "");
for s in &scenarios {
println!(" Name: {}", s.name);
println!(" Description: {}", s.description);
if !s.tags.is_empty() {
println!(" Tags: {}", s.tags.join(", "));
}
println!(" Interventions: {}", s.intervention_count);
if let Some(w) = s.probability_weight {
println!(" Probability Weight: {w:.2}");
}
println!("{:-<60}", "");
}
Ok(())
}
ScenarioCommands::Validate { config, scenario } => {
let config_str = std::fs::read_to_string(&config)?;
let gen_config: GeneratorConfig = serde_yaml::from_str(&config_str)?;
if !gen_config.scenarios.enabled {
anyhow::bail!("Scenarios are disabled in this config.");
}
let engine = ScenarioEngine::new(gen_config)?;
let results = engine.validate_all();
let filtered: Vec<_> = if let Some(ref name) = scenario {
results.into_iter().filter(|r| r.name == *name).collect()
} else {
results
};
if filtered.is_empty() {
if let Some(name) = scenario {
anyhow::bail!("Scenario '{name}' not found.");
}
println!("No scenarios to validate.");
return Ok(());
}
let mut all_valid = true;
for r in &filtered {
if r.valid {
println!(" [PASS] {}", r.name);
} else {
println!(
" [FAIL] {}: {}",
r.name,
r.error.as_deref().unwrap_or("unknown")
);
all_valid = false;
}
}
if all_valid {
println!("\nAll {} scenario(s) valid.", filtered.len());
Ok(())
} else {
anyhow::bail!("Some scenarios failed validation.")
}
}
ScenarioCommands::Generate {
config,
output,
scenario: _scenario_filter,
} => {
let config_str = std::fs::read_to_string(&config)?;
let gen_config: GeneratorConfig = serde_yaml::from_str(&config_str)?;
if !gen_config.scenarios.enabled {
anyhow::bail!("Scenarios are disabled in this config.");
}
let engine = ScenarioEngine::new(gen_config)?;
let results = engine.generate_all(&output)?;
println!("Generated {} scenario(s):", results.len());
for r in &results {
println!(
" {} — {} interventions, {} months affected",
r.scenario_name, r.interventions_applied, r.months_affected
);
println!(" Baseline: {}", r.baseline_path.display());
println!(" Counterfactual: {}", r.counterfactual_path.display());
}
Ok(())
}
ScenarioCommands::Diff {
baseline,
counterfactual,
format,
output,
} => {
let formats = match format.as_str() {
"summary" => vec![DiffFormat::Summary],
"record_level" => vec![DiffFormat::RecordLevel],
"aggregate" => vec![DiffFormat::Aggregate],
"all" => vec![
DiffFormat::Summary,
DiffFormat::RecordLevel,
DiffFormat::Aggregate,
],
other => anyhow::bail!(
"Unknown diff format: '{other}'. Use: summary, record_level, aggregate, all"
),
};
let diff_config = DiffConfig {
formats,
..Default::default()
};
let diff = DiffEngine::compute(&baseline, &counterfactual, &diff_config)?;
let json = serde_json::to_string_pretty(&diff)?;
if let Some(out_path) = output {
std::fs::write(&out_path, &json)?;
println!("Diff written to {}", out_path.display());
} else {
println!("{json}");
}
Ok(())
}
ScenarioCommands::Export {
config,
scenario,
output,
} => {
let config_str = std::fs::read_to_string(&config)?;
let gen_config: GeneratorConfig = serde_yaml::from_str(&config_str)?;
let found = gen_config
.scenarios
.scenarios
.iter()
.find(|s| s.name == scenario);
match found {
Some(s) => {
let yaml = serde_yaml::to_string(s)?;
let dss = format!(
"# DataSynth Scenario (.dss)\n\
# format_version: 1.0\n\
# exported_from: {}\n\
# datasynth_version: {}\n\n\
{yaml}",
config.display(),
env!("CARGO_PKG_VERSION"),
);
std::fs::write(&output, dss)?;
println!("Scenario '{}' exported to {}", scenario, output.display());
Ok(())
}
None => {
anyhow::bail!(
"Scenario '{}' not found. Available: {}",
scenario,
gen_config
.scenarios
.scenarios
.iter()
.map(|s| s.name.as_str())
.collect::<Vec<_>>()
.join(", ")
);
}
}
}
ScenarioCommands::Import { file, config } => {
let dss_content = std::fs::read_to_string(&file)?;
let yaml_content: String = dss_content
.lines()
.filter(|line| !line.starts_with('#'))
.collect::<Vec<_>>()
.join("\n");
let imported: datasynth_config::ScenarioSchemaConfig =
serde_yaml::from_str(&yaml_content)?;
if !config.exists() {
anyhow::bail!(
"Config file {} does not exist. Create one first with: datasynth-data init",
config.display()
);
}
let existing = std::fs::read_to_string(&config)?;
let mut gen_config: GeneratorConfig = serde_yaml::from_str(&existing)?;
if gen_config
.scenarios
.scenarios
.iter()
.any(|s| s.name == imported.name)
{
anyhow::bail!("Scenario '{}' already exists in config", imported.name);
}
gen_config.scenarios.enabled = true;
let name = imported.name.clone();
gen_config.scenarios.scenarios.push(imported);
let yaml = serde_yaml::to_string(&gen_config)?;
std::fs::write(&config, yaml)?;
println!("Scenario '{}' imported into {}", name, config.display());
Ok(())
}
}
}