use std::collections::BTreeMap;
use std::fs;
use std::path::{Path, PathBuf};
use chrono::NaiveDate;
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use datasynth_core::models::balance::TrialBalance;
use datasynth_core::models::business_combination::BusinessCombination;
use datasynth_core::models::JournalEntry;
use datasynth_standards::framework::AccountingFramework;
use crate::aggregate::coverage_report::{build_coverage_report, write_coverage_report};
use crate::aggregate::elimination::{eliminations_to_journal_entries, generate_eliminations};
use crate::aggregate::equity_method::{
compute_equity_method_investment, ingest_opening_equity_method_carrying_values,
ingest_opening_suppressed_losses, write_equity_method_investments, write_suppressed_losses,
EquityMethodInputs, EquityMethodInvestment,
};
use crate::aggregate::fs::{
build_consolidated_balance_sheet_with_names, build_consolidated_cash_flow,
build_consolidated_income_statement_with_names, build_consolidation_schedule,
build_notes_to_consolidated_fs, build_statement_of_changes_in_equity, write_consolidated_fs,
AccountNameDictionary, CashFlowInputs, ConsolidatedFinancialStatements, EquityChangesInputs,
NotesInputs,
};
use crate::aggregate::ic_matcher::match_ic_pairs;
use crate::aggregate::nci::{
compute_nci_rollforward, ingest_opening_nci_balances, write_nci_rollforward, NciInputs,
NciRollforward,
};
use crate::aggregate::post_elim::{apply_eliminations_to_tb, apply_nci_and_equity_method};
use crate::aggregate::pre_elim::aggregate_pre_elimination;
use crate::aggregate::tb_loader::load_entity_trial_balance;
use crate::aggregate::translation::cta::{
cta_rollforward, write_cta_rollforward, CtaRollforward, CONSOLIDATED_SUBDIR,
CTA_ROLLFORWARD_FILENAME,
};
use crate::aggregate::translation::restatement::{select_restatement_path, RestatementPath};
use crate::aggregate::translation::translate::{
translate_entity_tb_with_hyperinflation, translate_entity_tb_with_indexed_restatement, DrCr,
TranslatedTb,
};
use crate::aggregate::translation::worksheet::write_translation_worksheet;
use crate::config::ConsolidationMethod;
use crate::errors::{GroupError, GroupResult};
use crate::manifest::builder::{GroupManifest, ManifestEntity};
/// Caller-supplied options for a single [`run_aggregate`] invocation.
///
/// The derived `Default` yields: no prior period, fail-fast on missing
/// shards, and empty CGU / CPI inputs.
#[derive(Debug, Clone, Default)]
pub struct AggregateOptions {
/// Output directory of a previous aggregate run. When set, opening CTA,
/// NCI and equity-method balances are ingested from it; when `None`, the
/// opening balances default to zero.
pub prior_period_aggregate: Option<PathBuf>,
/// When `true`, an entity whose `period_close/trial_balances.json` is absent
/// is logged, recorded as missing and skipped. When `false`, the first such
/// entity aborts the run with `GroupError::Aggregate`.
pub tolerate_missing_shards: bool,
/// Inputs forwarded unchanged to `cgu_impairment::run_cgu_impairment_tests`.
pub cgu_test_inputs: Vec<crate::aggregate::cgu_impairment::CguTestInputs>,
/// General price index series forwarded to the translation step. An empty
/// map is treated as "no CPI data available".
/// NOTE(review): keys are presumably functional-currency codes — confirm
/// against `select_restatement_path`.
pub cpi_series_by_currency:
BTreeMap<String, datasynth_core::models::hyperinflation::GeneralPriceIndex>,
}
/// Headline result of one [`run_aggregate`] invocation.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct AggregateSummary {
/// Copied from the manifest's `group_id`.
pub group_id: String,
/// Copied from the manifest's `presentation_currency`.
pub presentation_currency: String,
/// The manifest's period end date.
pub as_of_date: NaiveDate,
/// Sorted, de-duplicated codes of every entity whose trial balance was
/// loaded (both fully consolidated and deferred entities).
pub entities_processed: Vec<String>,
/// Sorted codes of entities skipped because their shard archive was missing
/// (only populated when `tolerate_missing_shards` is set).
pub entities_missing: Vec<String>,
/// Sorted, de-duplicated codes of entities whose consolidation method is
/// equity-method, proportional or fair-value.
pub deferred_entities: Vec<String>,
/// Number of matched intercompany pairs reported by the IC matcher.
pub matched_pairs: usize,
/// Coverage figure reported by the IC matcher.
pub coverage: f64,
/// `total_assets` of the consolidated balance sheet.
pub total_assets: Decimal,
/// `total_liabilities` of the consolidated balance sheet.
pub total_liabilities: Decimal,
/// `total_equity` of the consolidated balance sheet.
pub total_equity: Decimal,
/// `total_nci` of the consolidated balance sheet.
pub total_nci: Decimal,
/// Paths of the artifacts recorded by the run, in the order they were
/// collected.
pub artifacts_written: Vec<PathBuf>,
}
pub fn run_aggregate(
manifest: &GroupManifest,
shards_dir: &Path,
out_dir: &Path,
opts: &AggregateOptions,
) -> GroupResult<AggregateSummary> {
let framework = resolve_primary_framework(manifest);
let WalkOutcome {
contributing_tbs,
contributing_jes,
deferred_tbs,
entities_missing,
} = walk_entity_archives(manifest, shards_dir, opts.tolerate_missing_shards)?;
let entities_processed: Vec<String> = contributing_tbs
.iter()
.map(|(c, _)| c.clone())
.chain(deferred_tbs.iter().map(|(c, _)| c.clone()))
.collect();
let mut entities_processed_sorted = entities_processed.clone();
entities_processed_sorted.sort();
entities_processed_sorted.dedup();
let pre_elim = aggregate_pre_elimination(manifest, &contributing_tbs)?;
let match_result = match_ic_pairs(manifest, &contributing_jes)?;
let coverage_report = build_coverage_report(&match_result);
let coverage_path = write_coverage_report(&coverage_report, out_dir)?;
let elim_result = generate_eliminations(&match_result.matched, manifest)?;
let elim_jes = eliminations_to_journal_entries(&elim_result);
let post_elim = apply_eliminations_to_tb(&pre_elim, &elim_jes)?;
let translated_tbs = translate_all_contributing(
&contributing_tbs,
manifest,
framework,
&entity_lookup(manifest),
&opts.cpi_series_by_currency,
)?;
let cta_rolls = build_cta_rollforwards(
&translated_tbs,
&manifest.presentation_currency,
opts.prior_period_aggregate.as_deref(),
)?;
let cta_path = write_cta_rollforward(&cta_rolls, out_dir)?;
let worksheet_path = write_translation_worksheet(&translated_tbs, out_dir)?;
let acquisition_fv_map = ingest_acquisition_date_nci_fair_values(manifest, shards_dir);
let ownership_changes_map = ingest_ownership_change_events(manifest, shards_dir);
let nci_rolls = build_nci_rollforwards(
manifest,
&translated_tbs,
opts.prior_period_aggregate.as_deref(),
&acquisition_fv_map,
&ownership_changes_map,
)?;
let nci_path = write_nci_rollforward(&nci_rolls, out_dir)?;
let eq_method_invs = build_equity_method_investments(
manifest,
&deferred_tbs,
framework,
opts.prior_period_aggregate.as_deref(),
)?;
let eq_method_path = write_equity_method_investments(&eq_method_invs, out_dir)?;
let _ = write_suppressed_losses(&eq_method_invs, out_dir)?;
let post_overlay = apply_nci_and_equity_method(&post_elim, &nci_rolls, &eq_method_invs)?;
let account_names = AccountNameDictionary::from_coa_master(
&manifest.chart_of_accounts_master,
&manifest.chart_of_accounts_master.primary_framework,
);
let bs = build_consolidated_balance_sheet_with_names(
&post_overlay,
&manifest.group_id,
manifest.period.end,
&account_names,
)?;
let is = build_consolidated_income_statement_with_names(
&post_overlay,
&nci_rolls,
&manifest.group_id,
manifest.period.end,
&account_names,
)?;
let cf_inputs = CashFlowInputs {
post_elim_tb_current: &post_overlay,
post_elim_tb_prior: None,
net_income: is.net_income,
depreciation_amortization: Decimal::ZERO,
impairment: Decimal::ZERO,
capex: Decimal::ZERO,
debt_issuance: Decimal::ZERO,
debt_repayment: Decimal::ZERO,
dividends_paid_to_owners: Decimal::ZERO,
dividends_paid_to_nci: Decimal::ZERO,
equity_issuance: Decimal::ZERO,
};
let cf = build_consolidated_cash_flow(
&cf_inputs,
&manifest.group_id,
manifest.period.start,
manifest.period.end,
)?;
let eq_changes_inputs = EquityChangesInputs {
opening_owners_equity: Decimal::ZERO,
opening_nci: nci_rolls
.iter()
.map(|rf| rf.opening_nci)
.fold(Decimal::ZERO, |acc, v| acc + v),
net_income_to_owners: is.net_income_to_owners,
net_income_to_nci: is.net_income_to_nci,
oci_to_owners: cta_rolls
.iter()
.map(|rf| rf.period_cta)
.fold(Decimal::ZERO, |acc, v| acc + v),
oci_to_nci: nci_rolls
.iter()
.map(|rf| rf.nci_share_of_oci)
.fold(Decimal::ZERO, |acc, v| acc + v),
dividends_to_owners: Decimal::ZERO,
dividends_to_nci: nci_rolls
.iter()
.map(|rf| rf.nci_dividends)
.fold(Decimal::ZERO, |acc, v| acc + v),
other_owners: Decimal::ZERO,
other_nci: Decimal::ZERO,
};
let changes_in_equity = build_statement_of_changes_in_equity(
&eq_changes_inputs,
&manifest.group_id,
manifest.period.start,
manifest.period.end,
&manifest.presentation_currency,
);
let fs_bundle = ConsolidatedFinancialStatements {
balance_sheet: bs,
income_statement: is,
cash_flow: cf,
changes_in_equity,
};
let schedule = build_consolidation_schedule(
&pre_elim,
&post_overlay,
&contributing_tbs,
&manifest.group_id,
manifest.period.end,
)?;
let notes_inputs = NotesInputs {
manifest,
framework,
ic_coverage: &coverage_report,
nci_rollforwards: &nci_rolls,
cta_rollforwards: &cta_rolls,
equity_method_investments: &eq_method_invs,
};
let notes = build_notes_to_consolidated_fs(¬es_inputs, manifest.period.end);
let fs_paths = write_consolidated_fs(&fs_bundle, &schedule, ¬es, out_dir)?;
let cgu_results = crate::aggregate::cgu_impairment::run_cgu_impairment_tests(
&manifest.cgu_plan,
&opts.cgu_test_inputs,
manifest.period.end,
&manifest.presentation_currency,
)?;
let cgu_path =
crate::aggregate::cgu_impairment::write_cgu_impairment_tests(out_dir, &cgu_results)?;
let mut artifacts_written: Vec<PathBuf> = Vec::with_capacity(9);
artifacts_written.push(coverage_path);
artifacts_written.push(cta_path);
artifacts_written.push(worksheet_path);
artifacts_written.push(nci_path);
artifacts_written.push(eq_method_path);
artifacts_written.extend(fs_paths);
if let Some(p) = cgu_path {
artifacts_written.push(p);
}
let mut deferred_codes: Vec<String> = deferred_tbs.iter().map(|(c, _)| c.clone()).collect();
deferred_codes.sort();
deferred_codes.dedup();
Ok(AggregateSummary {
group_id: manifest.group_id.clone(),
presentation_currency: manifest.presentation_currency.clone(),
as_of_date: manifest.period.end,
entities_processed: entities_processed_sorted,
entities_missing,
deferred_entities: deferred_codes,
matched_pairs: match_result.matched.len(),
coverage: match_result.coverage,
total_assets: fs_bundle.balance_sheet.total_assets,
total_liabilities: fs_bundle.balance_sheet.total_liabilities,
total_equity: fs_bundle.balance_sheet.total_equity,
total_nci: fs_bundle.balance_sheet.total_nci,
artifacts_written,
})
}
/// Result of walking the per-entity shard archives.
struct WalkOutcome {
/// `(entity code, trial balance)` for entities consolidated as `Parent` or
/// `Full`.
contributing_tbs: Vec<(String, TrialBalance)>,
/// `(entity code, journal entries)` for the same `Parent` / `Full` entities.
contributing_jes: Vec<(String, Vec<JournalEntry>)>,
/// `(entity code, trial balance)` for `EquityMethod`, `Proportional` and
/// `FairValue` entities.
deferred_tbs: Vec<(String, TrialBalance)>,
/// Sorted codes of entities whose `trial_balances.json` was absent.
entities_missing: Vec<String>,
}
/// Loads the trial balance and journal entries of every entity in the
/// manifest's ownership graph from `<shards_dir>/entities/<code>/`.
///
/// Entities are bucketed by consolidation method: `Parent` and `Full` feed the
/// contributing lists, all other methods are deferred (their journal entries
/// are loaded but not retained).
///
/// # Errors
///
/// Fails with `GroupError::Aggregate` when an entity's
/// `period_close/trial_balances.json` is absent and `tolerate_missing_shards`
/// is `false`; also propagates any loader error.
fn walk_entity_archives(
    manifest: &GroupManifest,
    shards_dir: &Path,
    tolerate_missing_shards: bool,
) -> GroupResult<WalkOutcome> {
    let mut outcome = WalkOutcome {
        contributing_tbs: Vec::new(),
        contributing_jes: Vec::new(),
        deferred_tbs: Vec::new(),
        entities_missing: Vec::new(),
    };
    let entities_root = shards_dir.join("entities");

    for entity in &manifest.ownership_graph.entities {
        let entity_dir = entities_root.join(&entity.code);
        let tb_path = entity_dir.join("period_close").join("trial_balances.json");

        if !tb_path.exists() {
            // Fail fast unless the caller opted into tolerant mode.
            if !tolerate_missing_shards {
                return Err(GroupError::Aggregate(format!(
                    "run_aggregate: missing shard archive for `{}` at `{}`",
                    entity.code,
                    tb_path.display()
                )));
            }
            tracing::warn!(
                entity = %entity.code,
                path = %tb_path.display(),
                "missing shard archive — continuing in tolerate_missing_shards mode",
            );
            outcome.entities_missing.push(entity.code.clone());
            continue;
        }

        let trial_balance = load_entity_trial_balance(&entity_dir)?;
        let journal_entries = load_entity_journal_entries(&entity_dir, &entity.code)?;
        match entity.consolidation_method {
            ConsolidationMethod::Parent | ConsolidationMethod::Full => {
                outcome
                    .contributing_tbs
                    .push((entity.code.clone(), trial_balance));
                outcome
                    .contributing_jes
                    .push((entity.code.clone(), journal_entries));
            }
            ConsolidationMethod::EquityMethod
            | ConsolidationMethod::Proportional
            | ConsolidationMethod::FairValue => {
                outcome
                    .deferred_tbs
                    .push((entity.code.clone(), trial_balance));
            }
        }
    }

    outcome.entities_missing.sort();
    Ok(outcome)
}
/// Reads `<entity_dir>/journal_entries.json` as a list of journal entries.
///
/// A missing file is not an error: a warning is logged and an empty list is
/// returned.
///
/// # Errors
///
/// Returns `GroupError::Io` when the file exists but cannot be read, and the
/// converted `serde_json` error when its contents do not deserialize.
fn load_entity_journal_entries(
    entity_dir: &Path,
    entity_code: &str,
) -> GroupResult<Vec<JournalEntry>> {
    let je_path = entity_dir.join("journal_entries.json");
    if je_path.exists() {
        let raw = std::fs::read(&je_path).map_err(GroupError::Io)?;
        let entries: Vec<JournalEntry> = serde_json::from_slice(&raw)?;
        return Ok(entries);
    }

    tracing::warn!(
        entity = %entity_code,
        path = %je_path.display(),
        "no journal_entries.json found — treating as empty",
    );
    Ok(Vec::new())
}
/// Translates every contributing trial balance into the presentation
/// currency, in input order.
///
/// For each entity the restatement path is chosen by
/// `select_restatement_path`; the `Indexed` path uses the indexed-restatement
/// translator, while `Standard` and `ClosingRate` use the hyperinflation-aware
/// translator. An empty `cpi_series_by_currency` is passed on as "no CPI data".
///
/// # Errors
///
/// Fails with `GroupError::Aggregate` when a trial balance's entity code is
/// absent from `entity_lookup`, and propagates translator errors. Processing
/// stops at the first failure.
fn translate_all_contributing(
    contributing_tbs: &[(String, TrialBalance)],
    manifest: &GroupManifest,
    framework: AccountingFramework,
    entity_lookup: &BTreeMap<String, ManifestEntity>,
    cpi_series_by_currency: &BTreeMap<
        String,
        datasynth_core::models::hyperinflation::GeneralPriceIndex,
    >,
) -> GroupResult<Vec<TranslatedTb>> {
    let cpi_opt = (!cpi_series_by_currency.is_empty()).then_some(cpi_series_by_currency);

    contributing_tbs
        .iter()
        .map(|(code, tb)| {
            let entity = entity_lookup.get(code).ok_or_else(|| {
                GroupError::Aggregate(format!(
                    "run_aggregate: entity `{code}` not in manifest's ownership graph",
                ))
            })?;
            let functional_currency = entity.functional_currency.as_str();
            let path = select_restatement_path(
                entity.hyperinflation_status,
                functional_currency,
                cpi_opt,
                manifest.period.start,
                manifest.period.end,
            );

            match &path {
                RestatementPath::Indexed(ir) => translate_entity_tb_with_indexed_restatement(
                    tb,
                    functional_currency,
                    &manifest.fx_rate_master,
                    manifest.period.end,
                    &manifest.presentation_currency,
                    framework,
                    entity.hyperinflation_status,
                    Some(ir),
                ),
                RestatementPath::Standard | RestatementPath::ClosingRate => {
                    // CPI data was supplied, yet the closing-rate path was
                    // selected for this entity: surface that in the log.
                    let fell_back_despite_cpi =
                        matches!(path, RestatementPath::ClosingRate) && cpi_opt.is_some();
                    if fell_back_despite_cpi {
                        tracing::warn!(
                            entity = %code,
                            functional_currency = %entity.functional_currency,
                            period_start = %manifest.period.start,
                            period_end = %manifest.period.end,
                            "hyperinflationary entity has no matching CPI series — falling back to IAS 21 § 42(b) closing-rate translation only (no IAS 29 § 12 indexed restatement)",
                        );
                    }
                    translate_entity_tb_with_hyperinflation(
                        tb,
                        functional_currency,
                        &manifest.fx_rate_master,
                        manifest.period.end,
                        &manifest.presentation_currency,
                        framework,
                        entity.hyperinflation_status,
                    )
                }
            }
        })
        .collect()
}
/// Builds one CTA roll-forward per translated entity whose functional currency
/// differs from `presentation_currency`.
///
/// Opening balances come from the prior-period aggregate when one is supplied;
/// entities without a prior entry open at zero.
///
/// # Errors
///
/// Propagates errors from `ingest_opening_cta_balances`.
fn build_cta_rollforwards(
    translated_tbs: &[TranslatedTb],
    presentation_currency: &str,
    prior_period_aggregate: Option<&Path>,
) -> GroupResult<Vec<CtaRollforward>> {
    let opening_by_entity = ingest_opening_cta_balances(prior_period_aggregate)?;

    let rollforwards = translated_tbs
        .iter()
        // Entities already in the presentation currency get no roll-forward.
        .filter(|t| !(t.functional_currency == presentation_currency))
        .map(|t| {
            let opening_cta = opening_by_entity
                .get(&t.entity_code)
                .copied()
                .unwrap_or(Decimal::ZERO);
            cta_rollforward(
                &t.entity_code,
                &t.functional_currency,
                &t.presentation_currency,
                opening_cta,
                t.cta,
            )
        })
        .collect();

    Ok(rollforwards)
}
/// Reads the prior period's CTA roll-forward file and returns each entity's
/// closing CTA, keyed by entity code, for use as this period's opening balance.
///
/// Returns an empty map when no prior aggregate is given, or when the file is
/// absent (a warning is logged in the latter case).
///
/// # Errors
///
/// Returns `GroupError::Io` on read failure, the converted `serde_json` error
/// on malformed content, and `GroupError::Aggregate` when the file lists the
/// same entity more than once.
fn ingest_opening_cta_balances(
    prior_period_aggregate: Option<&Path>,
) -> GroupResult<BTreeMap<String, Decimal>> {
    use std::collections::btree_map::Entry;

    let path = match prior_period_aggregate {
        Some(prior) => prior
            .join(CONSOLIDATED_SUBDIR)
            .join(CTA_ROLLFORWARD_FILENAME),
        None => return Ok(BTreeMap::new()),
    };
    if !path.exists() {
        tracing::warn!(
            path = %path.display(),
            "opening CTA file not found; defaulting to zero opening balance per entity",
        );
        return Ok(BTreeMap::new());
    }

    let raw = std::fs::read(&path).map_err(GroupError::Io)?;
    let prior_rolls: Vec<CtaRollforward> = serde_json::from_slice(&raw)?;

    let mut opening: BTreeMap<String, Decimal> = BTreeMap::new();
    for rf in prior_rolls {
        match opening.entry(rf.entity_code) {
            Entry::Vacant(slot) => {
                slot.insert(rf.closing_cta);
            }
            // A repeated entity makes the opening balance ambiguous.
            Entry::Occupied(duplicate) => {
                return Err(GroupError::Aggregate(format!(
                    "ingest_opening_cta_balances: duplicate entity `{}` in opening CTA file {}",
                    duplicate.key(),
                    path.display(),
                )));
            }
        }
    }
    Ok(opening)
}
/// Collects acquisition-date NCI fair values from each entity's
/// `accounting_standards/business_combinations.json`, keyed by acquiree
/// entity code.
///
/// This is best-effort: a missing, unreadable or unparseable file is skipped
/// (read/parse failures are logged at debug level). Combinations lacking
/// either the acquiree code or the fair value are ignored, and the first value
/// seen for an acquiree wins.
fn ingest_acquisition_date_nci_fair_values(
    manifest: &GroupManifest,
    shards_dir: &Path,
) -> BTreeMap<String, Decimal> {
    let entities_root = shards_dir.join("entities");
    let mut fair_values: BTreeMap<String, Decimal> = BTreeMap::new();

    for entity in &manifest.ownership_graph.entities {
        let bc_path = entities_root
            .join(&entity.code)
            .join("accounting_standards")
            .join("business_combinations.json");
        if !bc_path.exists() {
            continue;
        }

        let read = fs::read(&bc_path).map_err(|e| {
            tracing::debug!(
                path = %bc_path.display(),
                error = %e,
                "failed to read business_combinations.json — skipping",
            );
        });
        let Ok(raw) = read else {
            continue;
        };

        let parsed = serde_json::from_slice::<Vec<BusinessCombination>>(&raw).map_err(|e| {
            tracing::debug!(
                path = %bc_path.display(),
                error = %e,
                "failed to parse business_combinations.json — skipping",
            );
        });
        let Ok(combinations) = parsed else {
            continue;
        };

        for bc in combinations {
            let Some(acquiree) = bc.acquiree_entity_code.as_deref() else {
                continue;
            };
            let Some(fv) = bc.acquisition_date_nci_fair_value else {
                continue;
            };
            // Keep the first fair value recorded for an acquiree.
            fair_values.entry(acquiree.to_string()).or_insert(fv);
        }
    }

    fair_values
}
/// Collects ownership-change events from each entity's
/// `intercompany/ownership_change_events.json`, keyed by the owning entity's
/// code.
///
/// This is best-effort: a missing, unreadable or unparseable file is skipped
/// (read/parse failures are logged at debug level). Entities whose file holds
/// an empty list get no map entry.
fn ingest_ownership_change_events(
    manifest: &GroupManifest,
    shards_dir: &Path,
) -> BTreeMap<String, Vec<datasynth_core::models::intercompany::OwnershipChangeEvent>> {
    use datasynth_core::models::intercompany::OwnershipChangeEvent;

    let entities_root = shards_dir.join("entities");
    let mut events_by_entity: BTreeMap<String, Vec<OwnershipChangeEvent>> = BTreeMap::new();

    for entity in &manifest.ownership_graph.entities {
        let events_path = entities_root
            .join(&entity.code)
            .join("intercompany")
            .join("ownership_change_events.json");
        if !events_path.exists() {
            continue;
        }

        let read = fs::read(&events_path).map_err(|e| {
            tracing::debug!(
                path = %events_path.display(),
                error = %e,
                "failed to read ownership_change_events.json — skipping",
            );
        });
        let Ok(raw) = read else {
            continue;
        };

        let parsed = serde_json::from_slice::<Vec<OwnershipChangeEvent>>(&raw).map_err(|e| {
            tracing::debug!(
                path = %events_path.display(),
                error = %e,
                "failed to parse ownership_change_events.json — skipping",
            );
        });
        let Ok(events) = parsed else {
            continue;
        };

        if events.is_empty() {
            continue;
        }
        events_by_entity.insert(entity.code.clone(), events);
    }

    events_by_entity
}
/// Builds one NCI roll-forward per fully consolidated, partially owned entity.
///
/// An entity is skipped when its consolidation method is not `Full`, when it
/// has no `ownership_percent`, or when that percentage is at least one (no
/// non-controlling interest). Opening NCI comes from the prior-period
/// aggregate when supplied and otherwise defaults to zero.
///
/// # Errors
///
/// Fails with `GroupError::Aggregate` when a qualifying entity has no
/// translated trial balance, and propagates errors from
/// `ingest_opening_nci_balances` and `compute_nci_rollforward`.
fn build_nci_rollforwards(
    manifest: &GroupManifest,
    translated_tbs: &[TranslatedTb],
    prior_period_aggregate: Option<&Path>,
    acquisition_fv_map: &BTreeMap<String, Decimal>,
    ownership_changes_map: &BTreeMap<
        String,
        Vec<datasynth_core::models::intercompany::OwnershipChangeEvent>,
    >,
) -> GroupResult<Vec<NciRollforward>> {
    let opening_map = match prior_period_aggregate {
        Some(p) => ingest_opening_nci_balances(p)?,
        None => BTreeMap::new(),
    };
    let translated_lookup: BTreeMap<&str, &TranslatedTb> = translated_tbs
        .iter()
        .map(|t| (t.entity_code.as_str(), t))
        .collect();

    let mut rolls: Vec<NciRollforward> = Vec::new();
    for entity in &manifest.ownership_graph.entities {
        if entity.consolidation_method != ConsolidationMethod::Full {
            continue;
        }
        let Some(ownership) = entity.ownership_percent else {
            continue;
        };
        // Wholly owned: there is no non-controlling interest to roll forward.
        if ownership >= Decimal::ONE {
            continue;
        }
        let translated = translated_lookup.get(entity.code.as_str()).ok_or_else(|| {
            GroupError::Aggregate(format!(
                "run_aggregate: NCI computation needs translated TB for `{}` but none was produced",
                entity.code,
            ))
        })?;
        let inputs = NciInputs {
            entity,
            period_net_income: net_income_from_translated(translated),
            period_oci: oci_from_translated(translated),
            // NOTE(review): dividends are hard-coded to zero here.
            total_dividends_paid: Decimal::ZERO,
            opening_nci: opening_map
                .get(&entity.code)
                .copied()
                .unwrap_or(Decimal::ZERO),
            acquisition_date_nci_fair_value: acquisition_fv_map.get(&entity.code).copied(),
            ownership_changes: ownership_changes_map
                .get(&entity.code)
                .map(|v| v.as_slice())
                .unwrap_or(&[]),
            // The window must span the whole reporting period; passing the
            // period end for both bounds collapsed it to a single day.
            period_start: manifest.period.start,
            period_end: manifest.period.end,
            currency: manifest.presentation_currency.clone(),
        };
        rolls.push(compute_nci_rollforward(&inputs)?);
    }
    Ok(rolls)
}
/// Builds one equity-method investment record per entity whose consolidation
/// method is `EquityMethod` and which has a parent code.
///
/// Investee net income and dividends are derived from the entity's deferred
/// trial balance; both are zero when no such trial balance was loaded.
/// Opening carrying values and suppressed losses come from the prior-period
/// aggregate when supplied and otherwise default to zero.
///
/// # Errors
///
/// Propagates errors from the two opening-balance ingestion helpers and from
/// `compute_equity_method_investment`.
fn build_equity_method_investments(
    manifest: &GroupManifest,
    deferred_tbs: &[(String, TrialBalance)],
    framework: AccountingFramework,
    prior_period_aggregate: Option<&Path>,
) -> GroupResult<Vec<EquityMethodInvestment>> {
    let (opening_carrying, opening_suppressed) = if let Some(prior) = prior_period_aggregate {
        let carrying = ingest_opening_equity_method_carrying_values(prior)?;
        let suppressed = ingest_opening_suppressed_losses(prior)?;
        (carrying, suppressed)
    } else {
        (BTreeMap::new(), BTreeMap::new())
    };

    let tb_by_code: BTreeMap<&str, &TrialBalance> = deferred_tbs
        .iter()
        .map(|(code, tb)| (code.as_str(), tb))
        .collect();

    let mut investments: Vec<EquityMethodInvestment> = Vec::new();
    let equity_method_entities = manifest
        .ownership_graph
        .entities
        .iter()
        .filter(|e| !(e.consolidation_method != ConsolidationMethod::EquityMethod));
    for entity in equity_method_entities {
        // Without a parent there is no investor to attribute the holding to.
        let Some(parent_code) = entity.parent_code.as_ref() else {
            continue;
        };

        let (investee_net_income, investee_dividends_paid) = tb_by_code
            .get(entity.code.as_str())
            .map(|tb| (net_income_from_tb(tb, framework), dividends_from_tb(tb)))
            .unwrap_or((Decimal::ZERO, Decimal::ZERO));

        let opening_carrying_value = opening_carrying
            .get(&entity.code)
            .copied()
            .unwrap_or(Decimal::ZERO);
        let opening_suppressed_loss = opening_suppressed
            .get(&entity.code)
            .copied()
            .unwrap_or(Decimal::ZERO);

        let inputs = EquityMethodInputs {
            investee: entity,
            investor_entity_code: parent_code.clone(),
            investee_net_income,
            investee_dividends_paid,
            opening_carrying_value,
            opening_suppressed_loss,
            impairment: Decimal::ZERO,
            period_end: manifest.period.end,
            currency: manifest.presentation_currency.clone(),
        };
        investments.push(compute_equity_method_investment(&inputs)?);
    }
    Ok(investments)
}
/// Maps the manifest's `primary_framework` label to an [`AccountingFramework`].
///
/// Matching is case-insensitive (the label is lower-cased first). Labels that
/// match none of the known aliases resolve to `AccountingFramework::default()`.
fn resolve_primary_framework(manifest: &GroupManifest) -> AccountingFramework {
    // Lower-case alias -> framework. Aliases are unique, so lookup order does
    // not affect the result.
    const ALIASES: &[(&str, AccountingFramework)] = &[
        ("ifrs", AccountingFramework::Ifrs),
        ("us_gaap", AccountingFramework::UsGaap),
        ("usgaap", AccountingFramework::UsGaap),
        ("us-gaap", AccountingFramework::UsGaap),
        ("dual_reporting", AccountingFramework::DualReporting),
        ("dual", AccountingFramework::DualReporting),
        ("french_gaap", AccountingFramework::FrenchGaap),
        ("frenchgaap", AccountingFramework::FrenchGaap),
        ("pcg", AccountingFramework::FrenchGaap),
        ("german_gaap", AccountingFramework::GermanGaap),
        ("germangaap", AccountingFramework::GermanGaap),
        ("hgb", AccountingFramework::GermanGaap),
    ];

    let normalized = manifest
        .chart_of_accounts_master
        .primary_framework
        .to_lowercase();
    ALIASES
        .iter()
        .find(|(alias, _)| *alias == normalized.as_str())
        .map(|(_, framework)| *framework)
        .unwrap_or_else(AccountingFramework::default)
}
/// Indexes the manifest's ownership-graph entities by entity code.
///
/// Each entity is cloned into the map; if a code occurs more than once, the
/// later entity replaces the earlier one.
fn entity_lookup(manifest: &GroupManifest) -> BTreeMap<String, ManifestEntity> {
    let mut by_code: BTreeMap<String, ManifestEntity> = BTreeMap::new();
    for entity in &manifest.ownership_graph.entities {
        by_code.insert(entity.code.clone(), entity.clone());
    }
    by_code
}
/// Derives net income from a translated trial balance.
///
/// Only revenue and expense lines participate. Each line's translated amount
/// is signed debit-positive / credit-negative and then subtracted, so credit
/// balances raise the result and debit balances lower it.
fn net_income_from_translated(translated: &TranslatedTb) -> Decimal {
    use crate::aggregate::translation::classify::TranslationAccountType as T;

    translated
        .lines
        .iter()
        .filter(|line| matches!(line.account_type, T::PlRevenue | T::PlExpense))
        .fold(Decimal::ZERO, |net, line| {
            let debit_positive = match line.local_dr_cr {
                DrCr::Debit => line.translated_amount,
                DrCr::Credit => -line.translated_amount,
            };
            net - debit_positive
        })
}
/// Derives other comprehensive income from a translated trial balance.
///
/// Only OCI lines participate. Each line's translated amount is signed
/// debit-positive / credit-negative and then subtracted, so credit balances
/// raise the result and debit balances lower it.
fn oci_from_translated(translated: &TranslatedTb) -> Decimal {
    use crate::aggregate::translation::classify::TranslationAccountType as T;

    translated
        .lines
        .iter()
        .filter(|line| line.account_type == T::PlOci)
        .fold(Decimal::ZERO, |oci, line| {
            let debit_positive = match line.local_dr_cr {
                DrCr::Debit => line.translated_amount,
                DrCr::Credit => -line.translated_amount,
            };
            oci - debit_positive
        })
}
/// Derives net income from an untranslated trial balance.
///
/// Each line is classified by account code under `framework`. Revenue lines
/// add `credit - debit`; expense lines subtract `debit - credit`; every other
/// classification is ignored.
fn net_income_from_tb(tb: &TrialBalance, framework: AccountingFramework) -> Decimal {
    use crate::aggregate::translation::classify::{classify_account, TranslationAccountType as T};

    tb.lines.iter().fold(Decimal::ZERO, |net, line| {
        match classify_account(&line.account_code, framework) {
            T::PlRevenue => net + (line.credit_balance - line.debit_balance),
            T::PlExpense => net - (line.debit_balance - line.credit_balance),
            _ => net,
        }
    })
}
/// Sums `debit - credit` over every trial-balance line posted to the
/// `DIVIDENDS_PAID` equity account.
fn dividends_from_tb(tb: &TrialBalance) -> Decimal {
    use datasynth_core::accounts::equity_accounts;

    let mut dividends_paid = Decimal::ZERO;
    for line in &tb.lines {
        if line.account_code == equity_accounts::DIVIDENDS_PAID {
            dividends_paid = dividends_paid + (line.debit_balance - line.credit_balance);
        }
    }
    dividends_paid
}
/// Inputs for one period of a [`run_aggregate_chain`] run; the first three
/// fields are passed to [`run_aggregate`] as-is.
#[derive(Debug, Clone)]
pub struct PeriodSpec {
/// Group manifest describing the period being aggregated.
pub manifest: GroupManifest,
/// Root directory holding the per-entity shard archives.
pub shards_dir: PathBuf,
/// Directory the period's artifacts are written to; it also becomes the
/// next period's `prior_period_aggregate`.
pub out_dir: PathBuf,
/// Options for this period. For every period after the first,
/// `prior_period_aggregate` is overwritten by the chain runner.
pub options: AggregateOptions,
}
pub fn run_aggregate_chain(periods: Vec<PeriodSpec>) -> GroupResult<Vec<AggregateSummary>> {
let mut summaries: Vec<AggregateSummary> = Vec::with_capacity(periods.len());
let mut prior_out: Option<PathBuf> = None;
for (idx, mut spec) in periods.into_iter().enumerate() {
if idx > 0 {
spec.options.prior_period_aggregate = prior_out.clone();
}
let summary = run_aggregate(
&spec.manifest,
&spec.shards_dir,
&spec.out_dir,
&spec.options,
)?;
prior_out = Some(spec.out_dir.clone());
summaries.push(summary);
}
Ok(summaries)
}
#[cfg(test)]
mod tests {
    use super::{resolve_primary_framework, AggregateOptions, PeriodSpec};
    use datasynth_standards::framework::AccountingFramework;
    use std::collections::BTreeMap;
    use std::path::PathBuf;

    /// `AggregateOptions` can be built field-by-field with an owned prior-period
    /// path, and `PeriodSpec` is nameable from the test module.
    #[test]
    fn period_spec_construction_uses_owned_paths() {
        let seeded = AggregateOptions {
            prior_period_aggregate: Some(PathBuf::from("/tmp/seed-from-engagement")),
            tolerate_missing_shards: false,
            cgu_test_inputs: Vec::new(),
            cpi_series_by_currency: BTreeMap::new(),
        };
        assert!(seeded.prior_period_aggregate.is_some());
        // Type-only check: never initialised, never called.
        let _: fn() -> PeriodSpec;
    }

    /// The derived default fails fast on missing shards and has no prior period.
    #[test]
    fn aggregate_options_default_is_fail_fast_no_prior() {
        let defaults = AggregateOptions::default();
        assert!(!defaults.tolerate_missing_shards, "default fails fast");
        assert!(defaults.prior_period_aggregate.is_none(), "no prior period");
    }

    /// Compile-time smoke check that the resolver and the framework default are
    /// reachable from here; nothing is asserted at runtime.
    #[test]
    fn _resolve_primary_framework_is_pure_string_match() {
        let _ = resolve_primary_framework;
        let _ = AccountingFramework::default();
    }
}