use std::fs;
use std::path::Path;
use serde::{Deserialize, Serialize};
use datasynth_config::{ExportLayout, FileFormat};
use datasynth_output::OutputRootConfig;
use datasynth_runtime::output_writer::write_all_output_with_root;
use datasynth_runtime::{EnhancedOrchestrator, PhaseConfig};
use crate::errors::{GroupError, GroupResult};
use crate::manifest::builder::{GroupManifest, ManifestEntity};
use crate::shard::context::build_shard_context;
use crate::shard::per_entity_config::build_entity_generator_config;
/// Per-entity result record for one shard run.
///
/// One value is built by `run_one_entity` for each entity it processes and is
/// collected into [`ShardSummary::entity_summaries`]. The serde derives let it
/// be written out as part of `shard_summary.json`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct EntitySummary {
/// Entity code, copied from the manifest entity's `code`.
pub entity_code: String,
/// Number of journal entries in the generation result for this entity.
pub journal_entry_count: usize,
/// Number of `extra_journal_entries` carried by the entity's shard context,
/// measured before generation runs (presumably the intercompany entries
/// injected into this entity — TODO confirm against `build_shard_context`).
pub ic_journal_entry_count: usize,
/// Output location recorded as `entities/<entity_code>` (presumably relative
/// to the shard's output directory — verify against `OutputRootConfig::per_entity`).
pub output_subdir: String,
}
/// Result of running one shard: the shard id plus one summary per entity.
///
/// Returned by `run_shard_with_opening_balances`, which also serialises it as
/// pretty-printed JSON to `shard_summary.json` in the output directory.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ShardSummary {
/// The shard id the run was invoked with.
pub shard_id: String,
/// One entry per processed entity, in the order the entities appear in the
/// manifest's ownership graph.
pub entity_summaries: Vec<EntitySummary>,
}
/// Runs every entity assigned to `shard_id` with no caller-supplied opening balances.
///
/// Convenience wrapper around [`run_shard_with_opening_balances`] that passes
/// an empty opening-balance map, so each entity keeps whatever opening balances
/// its shard context already carries.
///
/// # Errors
///
/// Propagates any error returned by [`run_shard_with_opening_balances`].
pub fn run_shard(
    manifest: &GroupManifest,
    shard_id: &str,
    out_dir: &Path,
) -> GroupResult<ShardSummary> {
    // Key/value types are inferred from the callee's parameter type.
    let no_opening_balances = std::collections::BTreeMap::new();
    run_shard_with_opening_balances(manifest, shard_id, out_dir, &no_opening_balances)
}
/// Generates and writes output for every manifest entity assigned to `shard_id`.
///
/// Entities are selected from `manifest.ownership_graph.entities` by matching
/// their `shard_id`. Each selected entity is processed in manifest order by
/// `run_one_entity`; processing stops at the first entity that fails. After all
/// entities succeed, the resulting [`ShardSummary`] is written as pretty-printed
/// JSON to `<out_dir>/shard_summary.json` and returned.
///
/// # Parameters
///
/// * `manifest` - group manifest that lists the entities and their shards.
/// * `shard_id` - id of the shard to run.
/// * `out_dir` - output directory; created (with parents) if it does not exist.
/// * `entity_opening_balances` - opening balances keyed by entity code. An
///   entity with no entry is run with an empty slice.
///
/// # Errors
///
/// Returns [`GroupError::Shard`] when no entity matches `shard_id`, when
/// `out_dir` cannot be created, when any entity run fails, or when
/// `shard_summary.json` cannot be written. A JSON serialisation failure of the
/// summary is converted into a [`GroupError`] by `?`.
pub fn run_shard_with_opening_balances(
    manifest: &GroupManifest,
    shard_id: &str,
    out_dir: &Path,
    entity_opening_balances: &std::collections::BTreeMap<
        String,
        Vec<datasynth_core::models::balance::EntityOpeningBalance>,
    >,
) -> GroupResult<ShardSummary> {
    let entities: Vec<&ManifestEntity> = manifest
        .ownership_graph
        .entities
        .iter()
        .filter(|e| e.shard_id == shard_id)
        .collect();
    if entities.is_empty() {
        return Err(GroupError::Shard(format!(
            "run_shard: no entities matched shard_id `{shard_id}` — \
check it against `manifest.shard_plan.shards[*].shard_id`"
        )));
    }

    fs::create_dir_all(out_dir).map_err(|e| {
        GroupError::Shard(format!(
            "run_shard: failed to create out_dir `{}`: {e}",
            out_dir.display()
        ))
    })?;

    let mut entity_summaries: Vec<EntitySummary> = Vec::with_capacity(entities.len());
    for &entity in &entities {
        // Borrow the caller's balances instead of cloning the whole Vec; the
        // callee only needs a slice, and a missing entry becomes an empty slice.
        let openings = entity_opening_balances
            .get(&entity.code)
            .map(Vec::as_slice)
            .unwrap_or_default();
        entity_summaries.push(run_one_entity(manifest, entity, out_dir, openings)?);
    }

    let summary = ShardSummary {
        shard_id: shard_id.to_string(),
        entity_summaries,
    };

    let summary_path = out_dir.join("shard_summary.json");
    let summary_json = serde_json::to_string_pretty(&summary)?;
    fs::write(&summary_path, &summary_json).map_err(|e| {
        GroupError::Shard(format!(
            "run_shard: failed to write shard_summary.json at `{}`: {e}",
            summary_path.display()
        ))
    })?;

    Ok(summary)
}
/// Generates one entity's data and writes it beneath `out_dir`.
///
/// Steps, in order:
/// 1. Build the entity's generator config and an orchestrator for it.
/// 2. Build the entity's shard context; if `opening_balances` is non-empty it
///    replaces the context's own opening balances.
/// 3. Run generation and write the result as nested JSON under the entity's
///    output root.
/// 4. Write the entity's ownership-change events, if it has any.
///
/// The returned [`EntitySummary`] records the generated journal entry count and
/// the number of `extra_journal_entries` the shard context carried.
///
/// # Errors
///
/// Every failure is reported as [`GroupError::Shard`], prefixed with the entity code.
fn run_one_entity(
    manifest: &GroupManifest,
    entity: &ManifestEntity,
    out_dir: &Path,
    opening_balances: &[datasynth_core::models::balance::EntityOpeningBalance],
) -> GroupResult<EntitySummary> {
    let code = &entity.code;

    let generator_config = build_entity_generator_config(manifest, entity).map_err(|e| {
        GroupError::Shard(format!(
            "{}: build_entity_generator_config failed: {e}",
            code
        ))
    })?;
    let phases = PhaseConfig::from_config(&generator_config);
    let mut orchestrator = EnhancedOrchestrator::new(generator_config, phases).map_err(|e| {
        GroupError::Shard(format!("{}: EnhancedOrchestrator::new failed: {e}", code))
    })?;

    let mut shard_ctx = build_shard_context(manifest, code).map_err(|e| {
        GroupError::Shard(format!("{}: build_shard_context failed: {e}", code))
    })?;
    // Read the count now: the context is moved into the orchestrator below.
    let ic_journal_entry_count = shard_ctx.extra_journal_entries.len();
    if !opening_balances.is_empty() {
        shard_ctx.opening_balances = opening_balances.to_vec();
    }
    orchestrator.set_shard_context(shard_ctx);

    let generated = orchestrator
        .generate()
        .map_err(|e| GroupError::Shard(format!("{}: generate failed: {e}", code)))?;
    let journal_entry_count = generated.journal_entries.len();

    let entity_root = OutputRootConfig::per_entity(out_dir, code);
    let formats = [FileFormat::Json];
    write_all_output_with_root(&generated, &entity_root, ExportLayout::Nested, &formats).map_err(
        |e| {
            GroupError::Shard(format!(
                "{}: write_all_output_with_root failed: {e}",
                code
            ))
        },
    )?;

    write_ownership_change_events(entity, out_dir)?;

    Ok(EntitySummary {
        entity_code: code.clone(),
        journal_entry_count,
        ic_journal_entry_count,
        output_subdir: format!("entities/{}", code),
    })
}

/// Writes `entity.ownership_changes` as pretty-printed JSON to
/// `<out_dir>/entities/<code>/intercompany/ownership_change_events.json`.
///
/// Does nothing when the entity has no ownership changes.
fn write_ownership_change_events(entity: &ManifestEntity, out_dir: &Path) -> GroupResult<()> {
    if entity.ownership_changes.is_empty() {
        return Ok(());
    }

    let events_dir = out_dir
        .join("entities")
        .join(&entity.code)
        .join("intercompany");
    fs::create_dir_all(&events_dir).map_err(|e| {
        GroupError::Shard(format!(
            "{}: cannot create intercompany dir `{}`: {e}",
            entity.code,
            events_dir.display()
        ))
    })?;

    let events_path = events_dir.join("ownership_change_events.json");
    let events_json = serde_json::to_string_pretty(&entity.ownership_changes).map_err(|e| {
        GroupError::Shard(format!(
            "{}: failed to serialise ownership_change_events to JSON: {e}",
            entity.code
        ))
    })?;
    fs::write(&events_path, events_json).map_err(|e| {
        GroupError::Shard(format!(
            "{}: cannot write `{}`: {e}",
            entity.code,
            events_path.display()
        ))
    })?;

    Ok(())
}