datasynth_group/aggregate/driver.rs
1//! Aggregate phase driver — Task 9.1.
2//!
3//! Wires Chunks 5–8 into a single entrypoint: [`run_aggregate`] walks the
4//! per-entity shard archives produced by
5//! [`crate::shard::run_shard`], folds them through pre-elimination,
6//! IC matching, eliminations, IAS 21 translation, NCI / equity-method
7//! overlays, and consolidated FS assembly, and emits every
8//! aggregate-phase artefact under `{out_dir}/consolidated/` and
9//! `{out_dir}/ic_eliminations/`.
10//!
11//! # v5.0 ordering
12//!
13//! The driver runs the chunks **in this strict order** because each
14//! step depends on the prior step's outputs:
15//!
16//! 1. **Chunk 5** — Pre-elimination aggregation, IC matching,
17//! elimination JEs, post-elimination consolidated TB. The
18//! presentation currency check on the pre-elim aggregator means
19//! every contributing entity must already be denominated in the
20//! presentation currency at this point — the v5.0 fixture (Mini-
21//! Acme) is single-currency CHF so this trivially holds. Multi-
22//! currency engagements will need to translate first; that path is
23//! documented in the spec but not exercised in v5.0 driver-level
24//! tests.
25//! 2. **Chunk 6** — IAS 21 per-entity translation, CTA rollforward,
26//! translation worksheet emission. Runs *after* the post-elim TB
27//! so the consolidated TB and the translation worksheet are both
28//! grounded in the same set of contributing entities.
29//! 3. **Chunk 7** — NCI rollforward per Full-method subsidiary, and
30//! equity-method investment rollforward per EquityMethod investee.
31//! These overlays sit on top of the post-elim TB; the driver applies
32//! them via `apply_nci_and_equity_method` before assembling the FS.
33//! 4. **Chunk 8** — Consolidated FS assembly (BS / IS / CF / Changes
34//! in Equity), consolidation schedule, notes, and JSON writer.
35//!
36//! # Missing-shard handling
37//!
38//! [`AggregateOptions::tolerate_missing_shards`] toggles between fail-
39//! fast and best-effort recovery semantics:
40//!
41//! - `false` (default) — any missing entity directory or
42//! `period_close/trial_balances.json` produces a
43//! [`crate::errors::GroupError::Aggregate`] error naming the entity
44//! and the path the driver expected. This is the production
45//! contract: a complete shard archive is required for a complete
46//! consolidation.
47//! - `true` — missing entities are pushed to
48//! [`AggregateSummary::entities_missing`] and a `tracing::warn!` is
49//! logged. The driver then continues with the remaining entities,
50//! which is useful for partial-archive recovery scenarios.
51//!
//! Equity-method / fair-value / proportional entities are still
//! **expected** to have shards on disk (the runner generated them) but
//! their TBs are captured separately as deferred TBs (their codes are
//! reported in [`AggregateSummary::deferred_entities`]) for Chunk 7
//! to consume — the driver does not fold them into the consolidated TB.
57//!
58//! # Prior-period plumbing
59//!
60//! When [`AggregateOptions::prior_period_aggregate`] is `Some(path)`,
61//! the driver reads opening NCI and equity-method carrying values from
62//! `path/consolidated/{nci_rollforward,equity_method_investments}.json`
63//! via the existing ingestion helpers in
64//! [`crate::aggregate::nci::opening`] and
65//! [`crate::aggregate::equity_method`]. Opening CTA balances are
66//! similarly loaded from the prior `cta_rollforward.json`. When `None`,
67//! every opening defaults to zero — the engagement's first period.
68//!
69//! # Determinism
70//!
71//! Every step the driver invokes is deterministic given identical
72//! inputs. The driver itself walks `manifest.ownership_graph.entities`
73//! in declaration order; sub-steps that need lexicographic ordering
74//! (pre-elim aggregator, IC matcher) sort internally. Two runs of
75//! `run_aggregate` over the same shard archive produce byte-identical
76//! `consolidated/*.json` and `ic_eliminations/*.json` files.
77
78use std::collections::BTreeMap;
79use std::fs;
80use std::path::{Path, PathBuf};
81
82use chrono::NaiveDate;
83use rust_decimal::Decimal;
84use serde::{Deserialize, Serialize};
85
86use datasynth_core::models::balance::TrialBalance;
87use datasynth_core::models::business_combination::BusinessCombination;
88use datasynth_core::models::JournalEntry;
89use datasynth_standards::framework::AccountingFramework;
90
91use crate::aggregate::coverage_report::{build_coverage_report, write_coverage_report};
92use crate::aggregate::elimination::{eliminations_to_journal_entries, generate_eliminations};
93use crate::aggregate::equity_method::{
94 compute_equity_method_investment, ingest_opening_equity_method_carrying_values,
95 ingest_opening_suppressed_losses, write_equity_method_investments, write_suppressed_losses,
96 EquityMethodInputs, EquityMethodInvestment,
97};
98use crate::aggregate::fs::{
99 build_consolidated_balance_sheet_with_names, build_consolidated_cash_flow,
100 build_consolidated_income_statement_with_names,
101 build_consolidation_schedule_with_contributions, build_notes_to_consolidated_fs,
102 build_statement_of_changes_in_equity, write_consolidated_fs, AccountNameDictionary,
103 CashFlowInputs, ConsolidatedFinancialStatements, EquityChangesInputs, NotesInputs,
104};
105#[allow(unused_imports)]
106use crate::aggregate::ic_matcher::match_ic_pairs;
107use crate::aggregate::nci::{
108 compute_nci_rollforward, ingest_opening_nci_balances, write_nci_rollforward, NciInputs,
109 NciRollforward,
110};
111use crate::aggregate::post_elim::{apply_eliminations_to_tb, apply_nci_and_equity_method};
112use crate::aggregate::pre_elim::{
113 accumulate_entity_into_aggregate, empty_aggregate, finalise_streaming_aggregate, AggregatedTb,
114};
115use crate::aggregate::tb_loader::load_entity_trial_balance;
116use crate::aggregate::translation::cta::{
117 cta_rollforward, write_cta_rollforward, CtaRollforward, CONSOLIDATED_SUBDIR,
118 CTA_ROLLFORWARD_FILENAME,
119};
120use crate::aggregate::translation::restatement::{select_restatement_path, RestatementPath};
121use crate::aggregate::translation::translate::{
122 translate_entity_tb_with_hyperinflation, translate_entity_tb_with_indexed_restatement, DrCr,
123 TranslatedTb,
124};
125use crate::aggregate::translation::worksheet::write_translation_worksheet;
126use crate::config::ConsolidationMethod;
127use crate::errors::{GroupError, GroupResult};
128use crate::manifest::builder::{GroupManifest, ManifestEntity};
129
130// ── Public types ──────────────────────────────────────────────────────────────
131
/// Caller-supplied knobs that tune [`run_aggregate`] behaviour.
///
/// Every field has a usable default; the derived [`Default`] impl gives
/// the production fail-fast contract (no prior period, missing shards
/// are errors, no CGU tests, no CPI series).
#[derive(Debug, Clone, Default)]
pub struct AggregateOptions {
    /// Optional path to the prior period's `out_dir` — i.e. the
    /// **parent** of that period's `consolidated/` directory. When
    /// supplied, opening NCI, equity-method carrying values, and CTA
    /// balances are loaded from
    /// `prior_period_aggregate/consolidated/{nci_rollforward,equity_method_investments,cta_rollforward}.json`.
    /// When `None`, every opening defaults to zero per entity.
    pub prior_period_aggregate: Option<PathBuf>,
    /// When `true`, a missing per-entity shard archive produces a
    /// warning rather than an error — useful for partial-archive
    /// recovery scenarios. Defaults to `false`: missing shards fail fast.
    pub tolerate_missing_shards: bool,
    /// **v5.2 IAS 36 § 10** — per-period CGU goodwill impairment test
    /// inputs. Each entry references a CGU defined in
    /// [`crate::manifest::GroupManifest::cgu_plan`] and supplies the
    /// CGU's other-asset carrying amount, fair-value-less-costs, and
    /// value-in-use estimates for this period. When non-empty,
    /// [`run_aggregate`] joins these against the manifest plan, runs
    /// [`datasynth_core::models::CguImpairmentTest::run`] per CGU, and
    /// emits `consolidated/cgu_impairment_tests.json`. When empty
    /// (default), no impairment tests run and no artefact is emitted —
    /// this preserves backwards compatibility byte-for-byte for
    /// engagements without CGU configuration.
    pub cgu_test_inputs: Vec<crate::aggregate::cgu_impairment::CguTestInputs>,
    /// **v5.5.2 IAS 29 § 12** — per-currency general price index (CPI)
    /// series, keyed by ISO 4217 currency code. When supplied, the
    /// aggregate driver matches each entity in
    /// [`datasynth_core::models::HyperinflationStatus::Hyperinflationary`]
    /// against this map by its functional currency. Matched entities
    /// are translated via [`translate_entity_tb_with_indexed_restatement`]
    /// (composing IAS 29 § 12 indexed restatement with IAS 21 § 42(b)
    /// closing-rate translation). Hyperinflationary entities without a
    /// matching currency entry log a warning and fall back to the
    /// closing-rate-only path (the v5.5.0 behaviour).
    /// Non-hyperinflationary entities ignore this map. An empty map
    /// (default) preserves backwards-compatible behaviour byte-for-byte.
    pub cpi_series_by_currency:
        BTreeMap<String, datasynth_core::models::hyperinflation::GeneralPriceIndex>,
}
176
/// Top-level result returned by [`run_aggregate`].
///
/// The numeric totals are copied from the consolidated balance sheet
/// built on the post-overlay TB; `artifacts_written` lists the paths of
/// the files the driver recorded, in the order they were written.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct AggregateSummary {
    /// Group identifier, mirrors [`GroupManifest::group_id`].
    pub group_id: String,
    /// Group presentation currency.
    pub presentation_currency: String,
    /// Period end date.
    pub as_of_date: NaiveDate,
    /// Codes of every entity whose shard archive was loaded: the
    /// entities that contributed to the pre-elimination aggregation
    /// **plus** the deferred (equity-method / fair-value /
    /// proportional) entities whose TBs were loaded for Chunk 7 use
    /// even though they were not folded into the pre-elim totals.
    /// Sorted lexicographically and de-duplicated.
    pub entities_processed: Vec<String>,
    /// Codes of entities the driver expected but could not find on
    /// disk. Always empty when [`AggregateOptions::tolerate_missing_shards`]
    /// is `false`. Sorted lexicographically.
    pub entities_missing: Vec<String>,
    /// Codes of entities held back from the pre-elim aggregation per
    /// their consolidation method (equity-method / fair-value /
    /// proportional). Sorted lexicographically and de-duplicated.
    pub deferred_entities: Vec<String>,
    /// Number of IC pairs successfully matched.
    pub matched_pairs: usize,
    /// IC matching coverage = matched / planned.
    pub coverage: f64,
    /// `total_assets` from the consolidated balance sheet.
    pub total_assets: Decimal,
    /// `total_liabilities` from the consolidated balance sheet.
    pub total_liabilities: Decimal,
    /// `total_equity` from the consolidated balance sheet (equity
    /// attributable to owners of the parent).
    pub total_equity: Decimal,
    /// `total_nci` from the consolidated balance sheet
    /// (non-controlling interest equity).
    pub total_nci: Decimal,
    /// Paths of the artefacts the driver recorded, in emission order:
    ///
    /// 1. `ic_eliminations/ic_matching_coverage.json`
    /// 2. `consolidated/cta_rollforward.json`
    /// 3. `consolidated/translation_worksheet.json`
    /// 4. `consolidated/nci_rollforward.json`
    /// 5. `consolidated/equity_method_investments.json`
    /// 6. `consolidated/consolidated_financial_statements.json`
    /// 7. `consolidated/consolidation_schedule.json`
    /// 8. `consolidated/notes_to_consolidated_fs.json`
    /// 9. `consolidated/cgu_impairment_tests.json` — only when the CGU
    ///    impairment writer reports that it emitted a file.
    ///
    /// The IAS 28.38 suppressed-loss memorandum and the JE-network
    /// outputs are written by [`run_aggregate`] but are not recorded
    /// in this list.
    pub artifacts_written: Vec<PathBuf>,
}
229
230// ── Public API ────────────────────────────────────────────────────────────────
231
232/// Drive the aggregate phase end-to-end.
233///
234/// Walks the per-entity shard archives under `shards_dir/entities/`,
235/// folds them through Chunks 5–8, and writes every artefact under
236/// `out_dir`. Returns an [`AggregateSummary`] describing the
237/// consolidation outcome and the absolute paths of every emitted file.
238///
239/// `shards_dir == out_dir` is supported (and used by
240/// [`crate::standalone::generate_standalone`] — the runner writes
241/// shards directly into the same root the aggregate driver consumes).
242///
243/// # Errors
244///
245/// - [`GroupError::Aggregate`] if a missing shard archive is fatal
246/// (`tolerate_missing_shards == false`).
247/// - [`GroupError::Aggregate`] propagated from the sub-modules
248/// (currency mismatch, balance regressions, FX rate gaps, etc.).
249/// - [`GroupError::Io`] if the per-entity TB / JE files cannot be read
250/// or the consolidated artefacts cannot be written.
251/// - [`GroupError::Serde`] if any file fails to deserialise / serialise.
252pub fn run_aggregate(
253 manifest: &GroupManifest,
254 shards_dir: &Path,
255 out_dir: &Path,
256 opts: &AggregateOptions,
257) -> GroupResult<AggregateSummary> {
258 // Resolve framework once for downstream consumers (translation +
259 // notes) so we don't re-parse it per-entity.
260 let framework = resolve_primary_framework(manifest);
261
262 // ── 1+2+9. v5.31 C1 streaming walk ──────────────────────────────────
263 //
264 // The previous v5.0–v5.30 driver loaded all 1800–2000 contributing
265 // TBs into a `Vec<(String, TrialBalance)>` (`walk_entity_archives`),
266 // ran pre-elim accumulation, then translated *all* TBs into a
267 // second 1800–2000-element vector (`translate_all_contributing`).
268 // Combined hold peaked at ~400 GB on the 2026-05-26 2000-entity
269 // run, OOM-killing the aggregate phase at 226 GB anon-rss.
270 //
271 // The streaming variant fuses the walk, pre-elim accumulation,
272 // translation, and consolidation-contribution-map build into ONE
273 // pass per entity directory. Each source TB drops at the end of
274 // its loop iteration; the only structures that survive past the
275 // walk are the (small) derived aggregates.
276 let StreamingWalkOutcome {
277 pre_elim,
278 translated_tbs,
279 entity_contributions,
280 deferred_tbs,
281 ic_journal_entries,
282 mut je_network_writer,
283 entities_missing,
284 } = walk_aggregate_streaming(
285 manifest,
286 shards_dir,
287 out_dir,
288 framework,
289 &opts.cpi_series_by_currency,
290 opts.tolerate_missing_shards,
291 )?;
292
293 let entities_processed: Vec<String> = pre_elim
294 .contributing_entities
295 .iter()
296 .cloned()
297 .chain(deferred_tbs.iter().map(|(c, _)| c.clone()))
298 .collect();
299 let mut entities_processed_sorted = entities_processed.clone();
300 entities_processed_sorted.sort();
301 entities_processed_sorted.dedup();
302
303 // ── 3. (already covered above by walk_aggregate_streaming) ──────────
304
305 // ── 4. Match IC pairs (Task 5.3) — v5.31 C1 Phase 2: input is the
306 // IC-filtered JE subset (~5 % of total) instead of all JEs.
307 // v5.31 C1 Phase 3: input consumed by value to eliminate the
308 // double-clone hold in the IC matcher (~45 GB saving at 2k).
309 let match_result =
310 crate::aggregate::ic_matcher::match_ic_pairs_consuming(manifest, ic_journal_entries)?;
311
312 // ── 5. Build + write coverage report (Task 5.7) ─────────────────────
313 let coverage_report = build_coverage_report(&match_result);
314 let coverage_path = write_coverage_report(&coverage_report, out_dir)?;
315
316 // ── 6. Generate eliminations (Task 5.4) ─────────────────────────────
317 let elim_result = generate_eliminations(&match_result.matched, manifest)?;
318
319 // ── 7. Convert to elimination JEs (Task 5.5) ────────────────────────
320 let elim_jes = eliminations_to_journal_entries(&elim_result);
321
322 // ── 7b. v5.31 C1 Phase 2: write elimination edges to the streaming
323 // JE-network writer, then finalise. Per-entity edges already
324 // flowed through the writer during walk_aggregate_streaming;
325 // the elim edges only land in the consolidated CSV (per the
326 // v5.0 contract — no per-entity emit for eliminations).
327 je_network_writer.write_elim_edges(&elim_jes)?;
328 let je_network_summary = je_network_writer.finalize()?;
329 tracing::info!(
330 "v5.10 je_network: {} per-entity files, {} elim edges, {} consolidated edges -> {:?}",
331 je_network_summary.per_entity_edge_count.len(),
332 je_network_summary.elim_edge_count,
333 je_network_summary.consolidated_edge_count,
334 je_network_summary.consolidated_csv_path,
335 );
336
337 // ── 8. Apply eliminations to pre-elim TB (Task 5.6) ─────────────────
338 let post_elim = apply_eliminations_to_tb(&pre_elim, &elim_jes)?;
339
340 // ── 9. Per-entity translation (Task 6.2) ────────────────────────────
341 // v5.31 C1: translated_tbs was built during the streaming walk
342 // above (no second pass over contributing_tbs).
343
344 // ── 10. CTA rollforward (Task 6.3) ─────────────────────────────────
345 let cta_rolls = build_cta_rollforwards(
346 &translated_tbs,
347 &manifest.presentation_currency,
348 opts.prior_period_aggregate.as_deref(),
349 )?;
350 let cta_path = write_cta_rollforward(&cta_rolls, out_dir)?;
351
352 // ── 11. Translation worksheet (Task 6.4) ───────────────────────────
353 let worksheet_path = write_translation_worksheet(&translated_tbs, out_dir)?;
354
355 // ── 12. NCI rollforward (Tasks 7.1 + 7.2) ──────────────────────────
356 // v5.2: walk the contributing shards' BusinessCombination files
357 // up-front so the NCI rollforward can seed period-1 opening from
358 // the IFRS 3.19(a) acquisition-date fair value when present.
359 let acquisition_fv_map = ingest_acquisition_date_nci_fair_values(manifest, shards_dir);
360 // v5.2: read each entity's `intercompany/ownership_change_events.json`
361 // (PR #155) so the NCI rollforward can apply IFRS 10.23 equity-
362 // transaction adjustments for `ControlIncreased` / `ControlDecreased`
363 // events. Empty map when no entity declares ownership changes —
364 // preserves v5.0–v5.1 byte-identical NCI behaviour.
365 let ownership_changes_map = ingest_ownership_change_events(manifest, shards_dir);
366 let nci_rolls = build_nci_rollforwards(
367 manifest,
368 &translated_tbs,
369 opts.prior_period_aggregate.as_deref(),
370 &acquisition_fv_map,
371 &ownership_changes_map,
372 )?;
373 let nci_path = write_nci_rollforward(&nci_rolls, out_dir)?;
374
375 // ── 13. Equity-method investments (Task 7.3) ───────────────────────
376 let eq_method_invs = build_equity_method_investments(
377 manifest,
378 &deferred_tbs,
379 framework,
380 opts.prior_period_aggregate.as_deref(),
381 )?;
382 let eq_method_path = write_equity_method_investments(&eq_method_invs, out_dir)?;
383 // v5.1: side-by-side IAS 28.38 suppressed-loss memorandum
384 // (filtered to records with closing_suppressed_loss > 0).
385 let _ = write_suppressed_losses(&eq_method_invs, out_dir)?;
386
387 // ── 14. Apply NCI + equity-method overlay (Task 7.4) ───────────────
388 let post_overlay = apply_nci_and_equity_method(&post_elim, &nci_rolls, &eq_method_invs)?;
389
390 // ── 15. Build consolidated FS (Tasks 8.1–8.4) ──────────────────────
391 // v5.1: thread the engagement's CoA-master labels through the FS
392 // builders so localised account names (SKR04 / PCG / custom client
393 // chart) flow into the consolidated BS / IS line labels. Falls
394 // back to the canonical built-in English labels for any code the
395 // master doesn't carry, then to the bare code itself.
396 let account_names = AccountNameDictionary::from_coa_master(
397 &manifest.chart_of_accounts_master,
398 &manifest.chart_of_accounts_master.primary_framework,
399 );
400 let bs = build_consolidated_balance_sheet_with_names(
401 &post_overlay,
402 &manifest.group_id,
403 manifest.period.end,
404 &account_names,
405 )?;
406 let is = build_consolidated_income_statement_with_names(
407 &post_overlay,
408 &nci_rolls,
409 &manifest.group_id,
410 manifest.period.end,
411 &account_names,
412 )?;
413 // For v5.0 the cash flow statement gets a minimal input set: net
414 // income from the IS, no non-cash adjustments / capex / financing
415 // activity. The fully derived cash flow statement is on the v5.1
416 // roadmap. `post_elim_tb_prior = None` because the driver does not
417 // re-run the prior-period aggregation pipeline (see module rustdoc
418 // step 15 in the plan — leave at `None`).
419 let cf_inputs = CashFlowInputs {
420 post_elim_tb_current: &post_overlay,
421 post_elim_tb_prior: None,
422 net_income: is.net_income,
423 depreciation_amortization: Decimal::ZERO,
424 impairment: Decimal::ZERO,
425 capex: Decimal::ZERO,
426 debt_issuance: Decimal::ZERO,
427 debt_repayment: Decimal::ZERO,
428 dividends_paid_to_owners: Decimal::ZERO,
429 dividends_paid_to_nci: Decimal::ZERO,
430 equity_issuance: Decimal::ZERO,
431 };
432 let cf = build_consolidated_cash_flow(
433 &cf_inputs,
434 &manifest.group_id,
435 manifest.period.start,
436 manifest.period.end,
437 )?;
438 let eq_changes_inputs = EquityChangesInputs {
439 opening_owners_equity: Decimal::ZERO,
440 opening_nci: nci_rolls
441 .iter()
442 .map(|rf| rf.opening_nci)
443 .fold(Decimal::ZERO, |acc, v| acc + v),
444 net_income_to_owners: is.net_income_to_owners,
445 net_income_to_nci: is.net_income_to_nci,
446 oci_to_owners: cta_rolls
447 .iter()
448 .map(|rf| rf.period_cta)
449 .fold(Decimal::ZERO, |acc, v| acc + v),
450 oci_to_nci: nci_rolls
451 .iter()
452 .map(|rf| rf.nci_share_of_oci)
453 .fold(Decimal::ZERO, |acc, v| acc + v),
454 dividends_to_owners: Decimal::ZERO,
455 dividends_to_nci: nci_rolls
456 .iter()
457 .map(|rf| rf.nci_dividends)
458 .fold(Decimal::ZERO, |acc, v| acc + v),
459 other_owners: Decimal::ZERO,
460 other_nci: Decimal::ZERO,
461 };
462 let changes_in_equity = build_statement_of_changes_in_equity(
463 &eq_changes_inputs,
464 &manifest.group_id,
465 manifest.period.start,
466 manifest.period.end,
467 &manifest.presentation_currency,
468 );
469
470 let fs_bundle = ConsolidatedFinancialStatements {
471 balance_sheet: bs,
472 income_statement: is,
473 cash_flow: cf,
474 changes_in_equity,
475 };
476
477 // ── 16. Build consolidation schedule (Task 8.5) ────────────────────
478 // v5.31 C1: pass the pre-built entity_contributions map (built
479 // during the streaming walk) instead of re-iterating all TBs.
480 let schedule = build_consolidation_schedule_with_contributions(
481 &pre_elim,
482 &post_overlay,
483 &entity_contributions,
484 &manifest.group_id,
485 manifest.period.end,
486 )?;
487
488 // ── 17. Build notes (Task 8.6) ─────────────────────────────────────
489 let notes_inputs = NotesInputs {
490 manifest,
491 framework,
492 ic_coverage: &coverage_report,
493 nci_rollforwards: &nci_rolls,
494 cta_rollforwards: &cta_rolls,
495 equity_method_investments: &eq_method_invs,
496 };
497 let notes = build_notes_to_consolidated_fs(¬es_inputs, manifest.period.end);
498
499 // ── 18. Write FS artefacts (Task 8.7) ──────────────────────────────
500 let fs_paths = write_consolidated_fs(&fs_bundle, &schedule, ¬es, out_dir)?;
501
502 // ── 18b. CGU goodwill impairment tests (IAS 36 § 10) ───────────────
503 // No-op when caller supplies no test inputs OR manifest has no
504 // CGU plan — preserves backwards compatibility byte-for-byte.
505 // Per-entity net assets (presentation currency) = sum of balance-sheet
506 // lines (assets are debit-side, liabilities credit-side) from the translated
507 // TBs, so a CGU's carrying derives from — and reconciles to — the generated
508 // financials (IAS 36 CGU carrying <-> balance sheet). Consumed by
509 // `run_cgu_impairment_tests` for any CGU whose `other_carrying` is `None`.
510 let entity_net_assets: std::collections::BTreeMap<String, Decimal> = {
511 use crate::aggregate::translation::TranslationAccountType as T;
512 use crate::DrCr;
513 let mut m = std::collections::BTreeMap::new();
514 for t in &translated_tbs {
515 let net: Decimal = t
516 .lines
517 .iter()
518 .filter(|l| matches!(l.account_type, T::BsMonetary | T::BsNonMonetary))
519 .map(|l| match l.local_dr_cr {
520 DrCr::Debit => l.translated_amount,
521 DrCr::Credit => -l.translated_amount,
522 })
523 .sum();
524 m.insert(t.entity_code.clone(), net);
525 }
526 m
527 };
528 let cgu_results = crate::aggregate::cgu_impairment::run_cgu_impairment_tests(
529 &manifest.cgu_plan,
530 &opts.cgu_test_inputs,
531 &entity_net_assets,
532 manifest.period.end,
533 &manifest.presentation_currency,
534 )?;
535 let cgu_path =
536 crate::aggregate::cgu_impairment::write_cgu_impairment_tests(out_dir, &cgu_results)?;
537
538 // ── 19. Build summary ───────────────────────────────────────────────
539 let mut artifacts_written: Vec<PathBuf> = Vec::with_capacity(9);
540 artifacts_written.push(coverage_path);
541 artifacts_written.push(cta_path);
542 artifacts_written.push(worksheet_path);
543 artifacts_written.push(nci_path);
544 artifacts_written.push(eq_method_path);
545 artifacts_written.extend(fs_paths);
546 if let Some(p) = cgu_path {
547 artifacts_written.push(p);
548 }
549
550 let mut deferred_codes: Vec<String> = deferred_tbs.iter().map(|(c, _)| c.clone()).collect();
551 deferred_codes.sort();
552 deferred_codes.dedup();
553
554 Ok(AggregateSummary {
555 group_id: manifest.group_id.clone(),
556 presentation_currency: manifest.presentation_currency.clone(),
557 as_of_date: manifest.period.end,
558 entities_processed: entities_processed_sorted,
559 entities_missing,
560 deferred_entities: deferred_codes,
561 matched_pairs: match_result.matched.len(),
562 coverage: match_result.coverage,
563 total_assets: fs_bundle.balance_sheet.total_assets,
564 total_liabilities: fs_bundle.balance_sheet.total_liabilities,
565 total_equity: fs_bundle.balance_sheet.total_equity,
566 total_nci: fs_bundle.balance_sheet.total_nci,
567 artifacts_written,
568 })
569}
570
571// ── Internal helpers ──────────────────────────────────────────────────────────
572
/// Outcome of the legacy per-entity archive walk
/// ([`walk_entity_archives`]): TBs partitioned by consolidation
/// method, the JEs loaded for the contributing entities, and the codes
/// of entities that were missing from disk.
///
/// **v5.31 C1**: superseded by [`StreamingWalkOutcome`] + [`walk_aggregate_streaming`].
/// Kept for now so the legacy path can be revived for differential-test fixtures
/// without re-introducing the symbol; marked `#[allow(dead_code)]` to silence the
/// dead-code lint while the streaming path bakes.
#[allow(dead_code)]
struct WalkOutcome {
    /// Parent + Full entity TBs, paired with their entity codes. Fed
    /// into the pre-elim aggregator.
    contributing_tbs: Vec<(String, TrialBalance)>,
    /// Parent + Full entity JEs, paired with their entity codes. Fed
    /// into the IC matcher.
    contributing_jes: Vec<(String, Vec<JournalEntry>)>,
    /// Equity-method / fair-value / proportional entity TBs, kept
    /// separate so Chunk 7 (equity-method rollforward) can consume them
    /// without re-walking the disk.
    deferred_tbs: Vec<(String, TrialBalance)>,
    /// Codes of entities the driver expected but could not find on
    /// disk (only populated when `tolerate_missing_shards == true`).
    /// Sorted lexicographically by the walk before it returns.
    entities_missing: Vec<String>,
}
597
598/// Walk every entity in `manifest.ownership_graph.entities` and load
599/// its `period_close/trial_balances.json` + `journal_entries.json` from
600/// `shards_dir/entities/{code}/`.
601///
602/// Per the plan, equity-method / fair-value / proportional entities
603/// **are** expected to have shards on disk — the runner generated them
604/// — but their TBs go into `deferred_tbs` rather than `contributing_tbs`
605/// so the pre-elim aggregator only sees Parent + Full inputs.
606///
607/// A missing entity directory or `period_close/trial_balances.json` is
608/// either fatal (`tolerate_missing_shards = false`) or pushed to
609/// `entities_missing` with a `tracing::warn!`. A missing
610/// `journal_entries.json` for a present entity is treated as "no JEs"
611/// (empty Vec) — the runner always writes the file even when the
612/// entity emits zero JEs (the orchestrator at minimum produces opening-
613/// balance entries for any seeded TB), so the missing-file case is
614/// effectively unreachable under v5.0 but defended here defensively.
615///
616/// **v5.31 C1**: superseded by [`walk_aggregate_streaming`]. Kept for
617/// the differential-test fixture path; marked dead-code-allowed.
618#[allow(dead_code)]
619fn walk_entity_archives(
620 manifest: &GroupManifest,
621 shards_dir: &Path,
622 tolerate_missing_shards: bool,
623) -> GroupResult<WalkOutcome> {
624 let mut contributing_tbs: Vec<(String, TrialBalance)> = Vec::new();
625 let mut contributing_jes: Vec<(String, Vec<JournalEntry>)> = Vec::new();
626 let mut deferred_tbs: Vec<(String, TrialBalance)> = Vec::new();
627 let mut entities_missing: Vec<String> = Vec::new();
628
629 for entity in &manifest.ownership_graph.entities {
630 let entity_dir = shards_dir.join("entities").join(&entity.code);
631 let tb_path = entity_dir.join("period_close").join("trial_balances.json");
632
633 if !tb_path.exists() {
634 if tolerate_missing_shards {
635 tracing::warn!(
636 entity = %entity.code,
637 path = %tb_path.display(),
638 "missing shard archive — continuing in tolerate_missing_shards mode",
639 );
640 entities_missing.push(entity.code.clone());
641 continue;
642 }
643 return Err(GroupError::Aggregate(format!(
644 "run_aggregate: missing shard archive for `{}` at `{}`",
645 entity.code,
646 tb_path.display()
647 )));
648 }
649
650 let tb = load_entity_trial_balance(&entity_dir)?;
651 let jes = load_entity_journal_entries(&entity_dir, &entity.code)?;
652
653 match entity.consolidation_method {
654 ConsolidationMethod::Parent | ConsolidationMethod::Full => {
655 contributing_tbs.push((entity.code.clone(), tb));
656 contributing_jes.push((entity.code.clone(), jes));
657 }
658 ConsolidationMethod::EquityMethod
659 | ConsolidationMethod::Proportional
660 | ConsolidationMethod::FairValue => {
661 // Capture the deferred TB for Chunk 7 consumption; we
662 // intentionally do *not* push the JEs into the
663 // contributing slice because IC matching only spans
664 // line-by-line consolidated entities. v5.0 IC pair plans
665 // for equity-method investees are out of scope; v5.3
666 // will revisit.
667 deferred_tbs.push((entity.code.clone(), tb));
668 }
669 }
670 }
671
672 entities_missing.sort();
673 Ok(WalkOutcome {
674 contributing_tbs,
675 contributing_jes,
676 deferred_tbs,
677 entities_missing,
678 })
679}
680
681/// Read every JE the orchestrator emitted for `entity_code` from
682/// `entity_dir/journal_entries.json`. Treats a missing file as zero
683/// JEs (defensive — see `walk_entity_archives` rustdoc).
684#[allow(dead_code)]
685fn load_entity_journal_entries(
686 entity_dir: &Path,
687 entity_code: &str,
688) -> GroupResult<Vec<JournalEntry>> {
689 let path = entity_dir.join("journal_entries.json");
690 if !path.exists() {
691 tracing::warn!(
692 entity = %entity_code,
693 path = %path.display(),
694 "no journal_entries.json found — treating as empty",
695 );
696 return Ok(Vec::new());
697 }
698 let bytes = std::fs::read(&path).map_err(GroupError::Io)?;
699 let jes: Vec<JournalEntry> = serde_json::from_slice(&bytes)?;
700 Ok(jes)
701}
702
703/// v5.31 C1 Phase 6 — streaming JSON-array parse for journal_entries.json.
704///
705/// Reads `entity_dir/journal_entries.json` (a JSON array of [`JournalEntry`])
706/// via `serde_json::Deserializer::from_reader` and yields one JE at a time
707/// through the `on_je` callback. **Never materialises the full
708/// `Vec<JournalEntry>`** — peak per-iteration allocation is one JE
709/// (~25 KB) rather than the legacy `load_entity_journal_entries`'s
710/// ~25-300 MB.
711///
712/// Why this matters: the Phase 5 diagnostic
713/// (`docs/baselines/2026-05-27-v5.31-c1-phase5-diagnostic/COMPARISON.md`)
714/// found that 178 MB of RSS climbed per entity even though tracked
715/// data structures only grew ~75 KB per entity — a 2 400× gap.
716/// Allocator-fragmentation from the per-iteration spike of JSON-parse
717/// String allocations was eating ~178 MB / entity even with mimalloc
718/// (since the small String allocations leave the heap too fragmented
719/// for the OS to reclaim pages). Streaming the parse eliminates the
720/// spike → no fragmentation → RSS stays flat.
721///
722/// Treats a missing file as zero JEs (defensive — see
723/// `walk_entity_archives` rustdoc).
724///
725/// # Errors
726///
727/// - [`GroupError::Io`] if the file cannot be opened.
728/// - [`GroupError::Serde`] if the JSON is malformed at any point.
729/// - Whatever error `on_je` returns (propagated through serde's
730/// error-conversion machinery).
731fn stream_entity_journal_entries<F>(
732 entity_dir: &Path,
733 entity_code: &str,
734 mut on_je: F,
735) -> GroupResult<usize>
736where
737 F: FnMut(JournalEntry) -> GroupResult<()>,
738{
739 let path = entity_dir.join("journal_entries.json");
740 if !path.exists() {
741 tracing::warn!(
742 entity = %entity_code,
743 path = %path.display(),
744 "no journal_entries.json found — treating as empty",
745 );
746 return Ok(0);
747 }
748 let file = std::fs::File::open(&path).map_err(GroupError::Io)?;
749 let reader = std::io::BufReader::with_capacity(64 * 1024, file);
750 let mut de = serde_json::Deserializer::from_reader(reader);
751
752 // Custom Visitor that drives the array iteration without ever
753 // materialising the whole sequence into a Vec.
754 struct JeStreamVisitor<F> {
755 on_je: F,
756 count: usize,
757 }
758 impl<'de, F> serde::de::Visitor<'de> for JeStreamVisitor<F>
759 where
760 F: FnMut(JournalEntry) -> GroupResult<()>,
761 {
762 type Value = usize;
763 fn expecting(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
764 fmt.write_str("a JSON array of JournalEntry")
765 }
766 fn visit_seq<A>(mut self, mut seq: A) -> Result<usize, A::Error>
767 where
768 A: serde::de::SeqAccess<'de>,
769 {
770 while let Some(je) = seq.next_element::<JournalEntry>()? {
771 (self.on_je)(je).map_err(serde::de::Error::custom)?;
772 self.count += 1;
773 }
774 Ok(self.count)
775 }
776 }
777
778 let count = serde::de::Deserializer::deserialize_seq(
779 &mut de,
780 JeStreamVisitor {
781 on_je: &mut on_je,
782 count: 0,
783 },
784 )?;
785 Ok(count)
786}
787
/// v5.31 C1 Phase 5 — read this process's resident-set size, in kB.
///
/// The value comes from the `VmRSS:` line of `/proc/self/status`, which
/// the kernel already reports in kB. (The earlier implementation read
/// the resident page count from `/proc/self/statm` and multiplied by a
/// hard-coded 4 KB page size, which misreports on kernels configured
/// with larger pages.)
///
/// Returns `None` on non-Linux platforms, on read failure, or if the
/// `VmRSS:` line is missing or unparsable; the walk-loop RSS
/// checkpoints log `0` in that case so the tracing line is still
/// emitted.
fn read_self_rss_kb() -> Option<u64> {
    let status = std::fs::read_to_string("/proc/self/status").ok()?;
    parse_vm_rss_kb(&status)
}

/// Extract the numeric kB value from the `VmRSS:` line of a
/// `/proc/<pid>/status` dump (line shape: `VmRSS:	   12345 kB`).
fn parse_vm_rss_kb(status: &str) -> Option<u64> {
    status
        .lines()
        .find_map(|line| line.strip_prefix("VmRSS:"))?
        .split_ascii_whitespace()
        .next()?
        .parse()
        .ok()
}
804
805/// Factored out of [`translate_all_contributing`] so the streaming
806/// aggregate walk can call it per-entity without materialising a
807/// `Vec<(String, TrialBalance)>`. The signature mirrors the inner
808/// loop body verbatim — the only structural change is the explicit
809/// `entity: &ManifestEntity` parameter (the caller already has the
810/// lookup in hand, no need to repeat it inside the helper).
811fn translate_one_entity(
812 code: &str,
813 tb: &TrialBalance,
814 entity: &ManifestEntity,
815 manifest: &GroupManifest,
816 framework: AccountingFramework,
817 cpi_opt: Option<&BTreeMap<String, datasynth_core::models::hyperinflation::GeneralPriceIndex>>,
818) -> GroupResult<TranslatedTb> {
819 let path = select_restatement_path(
820 entity.hyperinflation_status,
821 entity.functional_currency.as_str(),
822 cpi_opt,
823 manifest.period.start,
824 manifest.period.end,
825 );
826 let translated = match &path {
827 RestatementPath::Indexed(ir) => translate_entity_tb_with_indexed_restatement(
828 tb,
829 entity.functional_currency.as_str(),
830 &manifest.fx_rate_master,
831 manifest.period.end,
832 &manifest.presentation_currency,
833 framework,
834 entity.hyperinflation_status,
835 Some(ir),
836 )?,
837 RestatementPath::Standard | RestatementPath::ClosingRate => {
838 if matches!(path, RestatementPath::ClosingRate) && cpi_opt.is_some() {
839 tracing::warn!(
840 entity = %code,
841 functional_currency = %entity.functional_currency,
842 period_start = %manifest.period.start,
843 period_end = %manifest.period.end,
844 "hyperinflationary entity has no matching CPI series — falling back to IAS 21 § 42(b) closing-rate translation only (no IAS 29 § 12 indexed restatement)",
845 );
846 }
847 translate_entity_tb_with_hyperinflation(
848 tb,
849 entity.functional_currency.as_str(),
850 &manifest.fx_rate_master,
851 manifest.period.end,
852 &manifest.presentation_currency,
853 framework,
854 entity.hyperinflation_status,
855 )?
856 }
857 };
858 Ok(translated)
859}
860
/// v5.31 C1 — outcome of the streaming aggregate walk.
///
/// Eliminates the `contributing_tbs: Vec<(String, TrialBalance)>` hold
/// (100-200 GB on a 2000-entity run) by fusing the walk, pre-elimination
/// accumulation, translation, and consolidation-contribution-map build
/// into one pass per entity directory. After the walk completes, only
/// the **derived** structures live in memory:
///
/// - `pre_elim` — small (~50 GL accounts × few KB each)
/// - `translated_tbs` — ~20 MB × N entities (one per contributing
///   entity); needed by Chunk 6 + Chunk 7 + Chunk 8 downstream
/// - `entity_contributions` — `account_code → entity_code → net` map
///   used by Chunk 8 consolidation_schedule (~200 MB at 2000 entities)
/// - `deferred_tbs` — small (~200 entities, kept for equity-method
///   in Chunk 7; refactoring this is a future phase)
/// - `ic_journal_entries` — **v5.31 C1 Phase 2**: only IC-pair-tagged
///   JEs (typically <5 % of total emitted JEs). All non-IC JEs are
///   dropped inside the walk after their edges flow through the
///   streaming JE-network writer.
/// - `je_network_writer` — the `JeNetworkStreamingWriter` itself,
///   handed out **still open** so the driver can append the post-walk
///   elimination edges and then finalise it.
/// - `entities_missing` — mirror of [`WalkOutcome`]
///
/// The source TBs and the full JE vectors are **never materialised
/// across entities** — each entity's TB + JE vec drops at the end of
/// its loop iteration; only the IC-filtered subset persists.
struct StreamingWalkOutcome {
    /// Running pre-elimination aggregate, finalised at the end of the walk.
    pre_elim: AggregatedTb,
    /// One translated TB per Parent/Full entity, in manifest order.
    translated_tbs: Vec<TranslatedTb>,
    /// `account_code → entity_code → net balance`
    entity_contributions: BTreeMap<String, BTreeMap<String, Decimal>>,
    /// Source TBs of entities whose consolidation method is not
    /// Parent/Full, keyed by entity code.
    deferred_tbs: Vec<(String, TrialBalance)>,
    /// v5.31 C1 Phase 2: only IC-pair-tagged JEs survive the walk.
    /// Non-IC JEs flow through the JE-network writer + drop.
    ic_journal_entries: Vec<(String, Vec<JournalEntry>)>,
    /// v5.31 C1 Phase 2: the streaming JE-network writer. Stays open
    /// past the walk so the driver can write the post-walk elimination
    /// edges before calling `finalize()` to flush + close.
    je_network_writer: crate::aggregate::je_network::JeNetworkStreamingWriter,
    /// Sorted codes of entities whose shard archive was absent (only
    /// populated in `tolerate_missing_shards` mode).
    entities_missing: Vec<String>,
}
903
/// v5.31 C1 — streaming aggregate walk.
///
/// Combines the three former passes (walk → pre-elim → translate)
/// into one pass per entity, dropping each source TB immediately after
/// it has been accumulated into the running [`AggregatedTb`] and
/// translated into a [`TranslatedTb`].
///
/// Memory profile on a 2000-entity run:
/// - Peak source-TB hold: **1 TB at a time** (was 1800–2000 TBs)
/// - Net hold after walk: `pre_elim + translated_tbs +
///   entity_contributions + deferred_tbs + ic_journal_entries`
///   (plus the still-open `je_network_writer`)
/// - Eliminates the ~200 GB that drove the OOM on the 2026-05-26 run.
///
/// Behaviour is byte-equivalent to the legacy three-step path: the
/// pre-elim aggregator, the translation function, and the consolidation
/// schedule's contribution-map all see identical inputs in identical
/// order; `pre_elim` is `finalise`d at the end to match the legacy
/// deterministic sort.
///
/// # Parameters
///
/// - `manifest` — supplies the entity list (iterated in manifest order).
/// - `shards_dir` — root under which `entities/<code>/` archives live.
/// - `out_dir` — where the streaming JE-network writer is opened.
/// - `framework` — forwarded to the per-entity translation.
/// - `cpi_series_by_currency` — CPI map; an empty map is passed to the
///   translation as `None`.
/// - `tolerate_missing_shards` — when `true`, an entity without
///   `period_close/trial_balances.json` is logged, recorded in
///   `entities_missing`, and skipped; when `false` it is a hard error.
///
/// # Errors
///
/// - [`GroupError::Aggregate`] when a shard's trial balance file is
///   missing and `tolerate_missing_shards` is `false`, or when an
///   entity code cannot be found in the lookup map.
/// - Any error propagated from opening/writing the JE-network writer,
///   loading the trial balance, accumulating, translating, or streaming
///   the journal entries.
fn walk_aggregate_streaming(
    manifest: &GroupManifest,
    shards_dir: &Path,
    out_dir: &Path,
    framework: AccountingFramework,
    cpi_series_by_currency: &BTreeMap<
        String,
        datasynth_core::models::hyperinflation::GeneralPriceIndex,
    >,
    tolerate_missing_shards: bool,
) -> GroupResult<StreamingWalkOutcome> {
    let entity_lookup_map = entity_lookup(manifest);
    // An empty CPI map is normalised to `None` before being handed to
    // the per-entity translation.
    let cpi_opt = if cpi_series_by_currency.is_empty() {
        None
    } else {
        Some(cpi_series_by_currency)
    };

    let mut pre_elim = empty_aggregate(manifest);
    let mut translated_tbs: Vec<TranslatedTb> =
        Vec::with_capacity(manifest.ownership_graph.entities.len());
    let mut entity_contributions: BTreeMap<String, BTreeMap<String, Decimal>> = BTreeMap::new();
    let mut deferred_tbs: Vec<(String, TrialBalance)> = Vec::new();
    let mut ic_journal_entries: Vec<(String, Vec<JournalEntry>)> = Vec::new();
    let mut entities_missing: Vec<String> = Vec::new();

    // v5.31 C1 Phase 2: open the streaming JE-network writer once;
    // each per-entity loop iteration writes its edges directly to
    // disk (per-entity + consolidated CSV). No consolidated edge vec
    // ever materialises in memory.
    let mut je_network_writer =
        crate::aggregate::je_network::JeNetworkStreamingWriter::open(out_dir)?;

    // v5.31 C1 Phase 5 — RSS profiling.
    //
    // Periodically log peak RSS + the sizes of every long-lived
    // accumulator. The Phase 4 OOM analysis surfaced a 218 GB linear
    // climb at ~6 GB/min over 38 min, but the IC subset (the holding
    // structure originally suspected) is only 0.03 % of edges;
    // ic_journal_entries is ~100 MB, not the 50 GB first estimated. The
    // actual hotspot is unknown. These checkpoint logs let the post-OOM
    // log inspection tell us which structure grows fastest.
    let entity_count = manifest.ownership_graph.entities.len();
    let log_every = (entity_count / 20).max(1); // ~20 samples across the walk

    for (idx, entity) in manifest.ownership_graph.entities.iter().enumerate() {
        let entity_dir = shards_dir.join("entities").join(&entity.code);
        let tb_path = entity_dir.join("period_close").join("trial_balances.json");

        // The trial balance file is the presence marker for a shard.
        // Note: `continue` below also skips the RSS checkpoint for this
        // index.
        if !tb_path.exists() {
            if tolerate_missing_shards {
                tracing::warn!(
                    entity = %entity.code,
                    path = %tb_path.display(),
                    "missing shard archive — continuing in tolerate_missing_shards mode",
                );
                entities_missing.push(entity.code.clone());
                continue;
            }
            return Err(GroupError::Aggregate(format!(
                "run_aggregate: missing shard archive for `{}` at `{}`",
                entity.code,
                tb_path.display()
            )));
        }

        let tb = load_entity_trial_balance(&entity_dir)?;

        match entity.consolidation_method {
            ConsolidationMethod::Parent | ConsolidationMethod::Full => {
                // 1. Accumulate into running pre-elim aggregate (small).
                accumulate_entity_into_aggregate(&mut pre_elim, manifest, &entity.code, &tb)?;

                // 2. Build per-account contribution map for the
                //    consolidation schedule (Chunk 8.5). Replaces the
                //    O(N) re-scan of all TBs that used to live inside
                //    `build_consolidation_schedule`.
                //
                //    NOTE(review): `insert` overwrites, so if a TB can
                //    carry more than one line for the same
                //    `account_code`, only the last line's net survives
                //    for this entity — confirm TB lines are unique per
                //    account.
                for line in &tb.lines {
                    let net = line.debit_balance - line.credit_balance;
                    entity_contributions
                        .entry(line.account_code.clone())
                        .or_default()
                        .insert(entity.code.clone(), net);
                }

                // 3. Translate to presentation currency (Chunk 6).
                //    `translated_tbs` stays in memory — downstream Chunks
                //    6 + 7 + 8 iterate it; refactoring those to lazy
                //    iteration is Phase 2.
                let entity_meta = entity_lookup_map.get(&entity.code).ok_or_else(|| {
                    GroupError::Aggregate(format!(
                        "run_aggregate: entity `{}` not in manifest's ownership graph",
                        entity.code,
                    ))
                })?;
                let translated = translate_one_entity(
                    &entity.code,
                    &tb,
                    entity_meta,
                    manifest,
                    framework,
                    cpi_opt,
                )?;
                translated_tbs.push(translated);

                // 4. v5.31 C1 Phase 6 — stream the entity's
                //    journal_entries.json one JE at a time, building
                //    edges incrementally + filtering the IC subset
                //    inline. NEVER materialises the full
                //    `Vec<JournalEntry>` — Phase 5 diagnostic showed
                //    that the per-iteration JSON-parse spike (~150-
                //    300 MB per entity) fragmented the heap to 218 GB
                //    by entity ~1200 at 2k scale. Streaming drops the
                //    peak per-iteration alloc to ~25 KB (one JE), so
                //    the same loop runs at ~10 GB RSS.
                //
                //    The `JeNetworkEdgeBuilder` maintains the
                //    `line_id → edge_id` map across `push_je` calls
                //    so cross-JE predecessor chains within this
                //    entity (PO → invoice → payment doc chains)
                //    resolve identically to the legacy
                //    `build_je_network_edges(&jes, …)` slice call.
                let mut edge_builder = datasynth_runtime::je_network::JeNetworkEdgeBuilder::new(
                    datasynth_config::JeNetworkMethod::A,
                );
                let mut ic_jes: Vec<JournalEntry> = Vec::new();
                let _je_count = stream_entity_journal_entries(&entity_dir, &entity.code, |je| {
                    edge_builder.push_je(&je);
                    if je.header.ic_pair_id.is_some() {
                        ic_jes.push(je);
                    }
                    // `je` drops here on the non-IC path —
                    // single-JE allocation reclaimed before the
                    // next stream iteration → no fragmentation
                    // spike.
                    Ok(())
                })?;
                let edges = edge_builder.into_edges();
                je_network_writer.write_entity_edges_prebuilt(&entity.code, &edges)?;

                // 5. IC subset retained for downstream IC matcher
                //    (per v5.30 SOTA defaults this is ~0.03 % of all
                //    JEs ≈ ~5-10 GB across all 2k entities, vs the
                //    100-400 GB of holding every JE). An entry is
                //    pushed even when the entity has no IC JEs.
                ic_journal_entries.push((entity.code.clone(), ic_jes));

                // 6. `tb`, `edges` drop here — never held past this
                //    iteration.
            }
            ConsolidationMethod::EquityMethod
            | ConsolidationMethod::Proportional
            | ConsolidationMethod::FairValue => {
                // `accumulate_entity_into_aggregate` is still called for
                // these methods (the original note says it records the
                // DeferredEntity in `pre_elim`); we additionally keep
                // the source TB in `deferred_tbs` for equity-method
                // downstream (~200 entities × ~20 MB = ~4 GB max;
                // manageable).
                accumulate_entity_into_aggregate(&mut pre_elim, manifest, &entity.code, &tb)?;
                deferred_tbs.push((entity.code.clone(), tb));
                // Deferred-method entities do not feed IC matching or
                // the consolidated je_network (per v5.0 contract — IC
                // pair plans only span Parent/Full entities). v5.31 C1
                // Phase 6 — we no longer load JEs for these entities
                // at all, saving the JSON-parse cost (~10-30 MB / entity).
            }
        }

        // v5.31 C1 Phase 5 RSS checkpoint — every ~5 % of entities
        // (and always on the last index), emit a structured log so the
        // Phase 4 OOM trajectory can be attributed to a specific
        // accumulator. `rss_gb` is integer-truncated.
        if (idx + 1) % log_every == 0 || idx + 1 == entity_count {
            let rss_kb = read_self_rss_kb().unwrap_or(0);
            let ic_je_count: usize = ic_journal_entries.iter().map(|(_, v)| v.len()).sum();
            let pre_elim_accounts = pre_elim.account_totals.len();
            tracing::info!(
                target: "datasynth_group::c1_phase5",
                entity_idx = idx + 1,
                total_entities = entity_count,
                rss_gb = rss_kb / 1024 / 1024,
                pre_elim_accounts,
                pre_elim_contributing = pre_elim.contributing_entities.len(),
                pre_elim_deferred = pre_elim.deferred_entities.len(),
                translated_tbs_n = translated_tbs.len(),
                translated_tbs_total_lines = translated_tbs.iter().map(|t| t.lines.len()).sum::<usize>(),
                entity_contributions_keys = entity_contributions.len(),
                entity_contributions_total_pairs = entity_contributions.values().map(|m| m.len()).sum::<usize>(),
                deferred_tbs_n = deferred_tbs.len(),
                ic_journal_entries_entries = ic_journal_entries.len(),
                ic_journal_entries_total_jes = ic_je_count,
                "c1_phase5 checkpoint"
            );
        }
    }

    entities_missing.sort();
    finalise_streaming_aggregate(&mut pre_elim);

    // v5.31 C1 Phase 5 — final per-component sizes after walk completes
    // (before IC matching runs).
    let final_rss_kb = read_self_rss_kb().unwrap_or(0);
    let final_ic_jes: usize = ic_journal_entries.iter().map(|(_, v)| v.len()).sum();
    tracing::info!(
        target: "datasynth_group::c1_phase5",
        rss_gb = final_rss_kb / 1024 / 1024,
        rss_mb = final_rss_kb / 1024,
        translated_tbs_n = translated_tbs.len(),
        translated_tbs_lines = translated_tbs.iter().map(|t| t.lines.len()).sum::<usize>(),
        entity_contributions_keys = entity_contributions.len(),
        entity_contributions_total_pairs = entity_contributions.values().map(|m| m.len()).sum::<usize>(),
        deferred_tbs_n = deferred_tbs.len(),
        ic_journal_entries_n = ic_journal_entries.len(),
        ic_journal_entries_total = final_ic_jes,
        pre_elim_accounts = pre_elim.account_totals.len(),
        "c1_phase5 walk-complete summary"
    );

    // v5.31 C1 Phase 2: the writer stays open — the caller writes the
    // elim edges + finalises after IC matching produces them. We
    // hand the writer out of the walk via the outcome struct.
    Ok(StreamingWalkOutcome {
        pre_elim,
        translated_tbs,
        entity_contributions,
        deferred_tbs,
        ic_journal_entries,
        je_network_writer,
        entities_missing,
    })
}
1150
1151/// Translate every contributing entity's TB to the presentation
1152/// currency. Returns one [`TranslatedTb`] per `(entity_code, tb)` in
1153/// the contributing slice.
1154///
1155/// **v5.5.2 routing:** when `cpi_series_by_currency` is non-empty,
1156/// hyperinflationary entities whose functional currency matches an
1157/// entry in the map are translated via the indexed-restatement path
1158/// (IAS 29 § 12 + IAS 21 § 42(b)); hyperinflationary entities without a
1159/// matching series log a warning and fall back to the closing-rate-only
1160/// path. Non-hyperinflationary entities always use the standard IAS 21
1161/// multi-rate path regardless of the map.
1162///
1163/// **v5.31 C1**: superseded by [`translate_one_entity`] + the inline
1164/// per-entity translation in [`walk_aggregate_streaming`]. Kept for
1165/// the differential-test fixture path; marked dead-code-allowed.
1166#[allow(dead_code)]
1167fn translate_all_contributing(
1168 contributing_tbs: &[(String, TrialBalance)],
1169 manifest: &GroupManifest,
1170 framework: AccountingFramework,
1171 entity_lookup: &BTreeMap<String, ManifestEntity>,
1172 cpi_series_by_currency: &BTreeMap<
1173 String,
1174 datasynth_core::models::hyperinflation::GeneralPriceIndex,
1175 >,
1176) -> GroupResult<Vec<TranslatedTb>> {
1177 let mut out: Vec<TranslatedTb> = Vec::with_capacity(contributing_tbs.len());
1178 let cpi_opt = if cpi_series_by_currency.is_empty() {
1179 None
1180 } else {
1181 Some(cpi_series_by_currency)
1182 };
1183 for (code, tb) in contributing_tbs {
1184 let entity = entity_lookup.get(code).ok_or_else(|| {
1185 GroupError::Aggregate(format!(
1186 "run_aggregate: entity `{code}` not in manifest's ownership graph",
1187 ))
1188 })?;
1189 // v5.2 / v5.5.2: pick the IAS 21 / IAS 29 translation path per
1190 // entity based on its hyperinflation status + the optional CPI
1191 // series map. See [`select_restatement_path`] for the routing
1192 // logic.
1193 let path = select_restatement_path(
1194 entity.hyperinflation_status,
1195 entity.functional_currency.as_str(),
1196 cpi_opt,
1197 manifest.period.start,
1198 manifest.period.end,
1199 );
1200 let translated = match &path {
1201 RestatementPath::Indexed(ir) => translate_entity_tb_with_indexed_restatement(
1202 tb,
1203 entity.functional_currency.as_str(),
1204 &manifest.fx_rate_master,
1205 manifest.period.end,
1206 &manifest.presentation_currency,
1207 framework,
1208 entity.hyperinflation_status,
1209 Some(ir),
1210 )?,
1211 RestatementPath::Standard | RestatementPath::ClosingRate => {
1212 if matches!(path, RestatementPath::ClosingRate) && cpi_opt.is_some() {
1213 // Hyperinflationary entity but the CPI map missed
1214 // (no entry for currency, or lookup outside the
1215 // observation range). Warn so the operator can
1216 // notice the data gap, but continue with the
1217 // closing-rate fallback rather than failing the
1218 // aggregate run.
1219 tracing::warn!(
1220 entity = %code,
1221 functional_currency = %entity.functional_currency,
1222 period_start = %manifest.period.start,
1223 period_end = %manifest.period.end,
1224 "hyperinflationary entity has no matching CPI series — falling back to IAS 21 § 42(b) closing-rate translation only (no IAS 29 § 12 indexed restatement)",
1225 );
1226 }
1227 translate_entity_tb_with_hyperinflation(
1228 tb,
1229 entity.functional_currency.as_str(),
1230 &manifest.fx_rate_master,
1231 manifest.period.end,
1232 &manifest.presentation_currency,
1233 framework,
1234 entity.hyperinflation_status,
1235 )?
1236 }
1237 };
1238 out.push(translated);
1239 }
1240 Ok(out)
1241}
1242
1243/// Build a [`CtaRollforward`] for every non-presentation-currency
1244/// entity. Reads opening CTA from
1245/// `prior_period.consolidated/cta_rollforward.json` when supplied,
1246/// otherwise defaults to zero.
1247fn build_cta_rollforwards(
1248 translated_tbs: &[TranslatedTb],
1249 presentation_currency: &str,
1250 prior_period_aggregate: Option<&Path>,
1251) -> GroupResult<Vec<CtaRollforward>> {
1252 let opening_map = ingest_opening_cta_balances(prior_period_aggregate)?;
1253
1254 let mut rolls: Vec<CtaRollforward> = Vec::new();
1255 for t in translated_tbs {
1256 if t.functional_currency == presentation_currency {
1257 // No CTA for entities already in the presentation currency.
1258 continue;
1259 }
1260 let opening = opening_map
1261 .get(&t.entity_code)
1262 .copied()
1263 .unwrap_or(Decimal::ZERO);
1264 rolls.push(cta_rollforward(
1265 &t.entity_code,
1266 &t.functional_currency,
1267 &t.presentation_currency,
1268 opening,
1269 t.cta,
1270 ));
1271 }
1272 Ok(rolls)
1273}
1274
1275/// Read prior-period closing CTA balances by entity code. Mirrors
1276/// [`crate::aggregate::nci::opening::ingest_opening_nci_balances`]
1277/// semantics (missing file → empty map + warn, duplicate entity →
1278/// error).
1279fn ingest_opening_cta_balances(
1280 prior_period_aggregate: Option<&Path>,
1281) -> GroupResult<BTreeMap<String, Decimal>> {
1282 let Some(prior) = prior_period_aggregate else {
1283 return Ok(BTreeMap::new());
1284 };
1285 let path = prior
1286 .join(CONSOLIDATED_SUBDIR)
1287 .join(CTA_ROLLFORWARD_FILENAME);
1288 if !path.exists() {
1289 tracing::warn!(
1290 path = %path.display(),
1291 "opening CTA file not found; defaulting to zero opening balance per entity",
1292 );
1293 return Ok(BTreeMap::new());
1294 }
1295 let bytes = std::fs::read(&path).map_err(GroupError::Io)?;
1296 let rolls: Vec<CtaRollforward> = serde_json::from_slice(&bytes)?;
1297 let mut map: BTreeMap<String, Decimal> = BTreeMap::new();
1298 for rf in rolls {
1299 if map.contains_key(&rf.entity_code) {
1300 return Err(GroupError::Aggregate(format!(
1301 "ingest_opening_cta_balances: duplicate entity `{}` in opening CTA file {}",
1302 rf.entity_code,
1303 path.display(),
1304 )));
1305 }
1306 map.insert(rf.entity_code, rf.closing_cta);
1307 }
1308 Ok(map)
1309}
1310
1311/// Walk every contributing shard's
1312/// `accounting_standards/business_combinations.json` and build a
1313/// map of `acquiree_entity_code → acquisition_date_nci_fair_value`
1314/// for use as period-1 opening NCI in the rollforward (v5.2 IFRS
1315/// 3.19(a) full-goodwill basis).
1316///
1317/// - Missing per-shard files are tolerated (most engagements don't
1318/// have an acquisition every period).
1319/// - BC records without `acquiree_entity_code` or without a fair
1320/// value contribute nothing — those acquisitions either have no
1321/// consolidated counterpart or use the proportionate basis.
1322/// - When two shards both list a BC for the same acquiree, the
1323/// first one wins (deterministic — entities iterate in manifest
1324/// order).
1325fn ingest_acquisition_date_nci_fair_values(
1326 manifest: &GroupManifest,
1327 shards_dir: &Path,
1328) -> BTreeMap<String, Decimal> {
1329 let mut map: BTreeMap<String, Decimal> = BTreeMap::new();
1330 for entity in &manifest.ownership_graph.entities {
1331 let path = shards_dir
1332 .join("entities")
1333 .join(&entity.code)
1334 .join("accounting_standards")
1335 .join("business_combinations.json");
1336 if !path.exists() {
1337 continue;
1338 }
1339 let bytes = match fs::read(&path) {
1340 Ok(b) => b,
1341 Err(e) => {
1342 tracing::debug!(
1343 path = %path.display(),
1344 error = %e,
1345 "failed to read business_combinations.json — skipping",
1346 );
1347 continue;
1348 }
1349 };
1350 let combinations: Vec<BusinessCombination> = match serde_json::from_slice(&bytes) {
1351 Ok(v) => v,
1352 Err(e) => {
1353 tracing::debug!(
1354 path = %path.display(),
1355 error = %e,
1356 "failed to parse business_combinations.json — skipping",
1357 );
1358 continue;
1359 }
1360 };
1361 for bc in combinations {
1362 if let (Some(acquiree), Some(fv)) = (
1363 bc.acquiree_entity_code.as_deref(),
1364 bc.acquisition_date_nci_fair_value,
1365 ) {
1366 map.entry(acquiree.to_string()).or_insert(fv);
1367 }
1368 }
1369 }
1370 map
1371}
1372
1373/// **v5.2** — Walk every entity's per-shard archive, read its
1374/// `intercompany/ownership_change_events.json` (PR #155 emission),
1375/// and build an `entity_code → events` map for the NCI rollforward
1376/// to consume.
1377///
1378/// Best-effort: malformed or missing files are skipped with a
1379/// `tracing::debug!` rather than failing the aggregate — engagements
1380/// without ownership-change events have no file at all, and that's
1381/// the v5.0–v5.1 byte-identical baseline. A malformed events file
1382/// downgrades to "no events for this entity" rather than a hard
1383/// error, mirroring the BC ingestor pattern.
1384fn ingest_ownership_change_events(
1385 manifest: &GroupManifest,
1386 shards_dir: &Path,
1387) -> BTreeMap<String, Vec<datasynth_core::models::intercompany::OwnershipChangeEvent>> {
1388 let mut map: BTreeMap<String, Vec<datasynth_core::models::intercompany::OwnershipChangeEvent>> =
1389 BTreeMap::new();
1390 for entity in &manifest.ownership_graph.entities {
1391 let path = shards_dir
1392 .join("entities")
1393 .join(&entity.code)
1394 .join("intercompany")
1395 .join("ownership_change_events.json");
1396 if !path.exists() {
1397 continue;
1398 }
1399 let bytes = match fs::read(&path) {
1400 Ok(b) => b,
1401 Err(e) => {
1402 tracing::debug!(
1403 path = %path.display(),
1404 error = %e,
1405 "failed to read ownership_change_events.json — skipping",
1406 );
1407 continue;
1408 }
1409 };
1410 let events: Vec<datasynth_core::models::intercompany::OwnershipChangeEvent> =
1411 match serde_json::from_slice(&bytes) {
1412 Ok(v) => v,
1413 Err(e) => {
1414 tracing::debug!(
1415 path = %path.display(),
1416 error = %e,
1417 "failed to parse ownership_change_events.json — skipping",
1418 );
1419 continue;
1420 }
1421 };
1422 if !events.is_empty() {
1423 map.insert(entity.code.clone(), events);
1424 }
1425 }
1426 map
1427}
1428
1429/// Build an [`NciRollforward`] for every Full-method, non-wholly-owned
1430/// entity. v5.0 sources the period P&L / OCI numbers from the entity's
1431/// translated TB; dividends paid is left at zero (the manifest does not
1432/// yet model it).
1433fn build_nci_rollforwards(
1434 manifest: &GroupManifest,
1435 translated_tbs: &[TranslatedTb],
1436 prior_period_aggregate: Option<&Path>,
1437 acquisition_fv_map: &BTreeMap<String, Decimal>,
1438 ownership_changes_map: &BTreeMap<
1439 String,
1440 Vec<datasynth_core::models::intercompany::OwnershipChangeEvent>,
1441 >,
1442) -> GroupResult<Vec<NciRollforward>> {
1443 let opening_map = match prior_period_aggregate {
1444 Some(p) => ingest_opening_nci_balances(p)?,
1445 None => BTreeMap::new(),
1446 };
1447
1448 let translated_lookup: BTreeMap<&str, &TranslatedTb> = translated_tbs
1449 .iter()
1450 .map(|t| (t.entity_code.as_str(), t))
1451 .collect();
1452
1453 let mut rolls: Vec<NciRollforward> = Vec::new();
1454 for entity in &manifest.ownership_graph.entities {
1455 if entity.consolidation_method != ConsolidationMethod::Full {
1456 continue;
1457 }
1458 let Some(ownership) = entity.ownership_percent else {
1459 continue;
1460 };
1461 if ownership >= Decimal::ONE {
1462 // Wholly-owned `Full` entities have no NCI to measure. We
1463 // skip them silently here rather than surfacing an error
1464 // because mini_acme.yaml has both 100%-owned (ACME_USA,
1465 // ACME_BR) and partially-owned (ACME_DE 80%) Full
1466 // subsidiaries. The NCI rollforward computer would reject
1467 // 100% ownership as a caller bug, so the filter must happen
1468 // at the driver level.
1469 continue;
1470 }
1471 let translated = translated_lookup.get(entity.code.as_str()).ok_or_else(|| {
1472 GroupError::Aggregate(format!(
1473 "run_aggregate: NCI computation needs translated TB for `{}` but none was produced",
1474 entity.code,
1475 ))
1476 })?;
1477
1478 let inputs = NciInputs {
1479 entity,
1480 period_net_income: net_income_from_translated(translated),
1481 period_oci: oci_from_translated(translated),
1482 total_dividends_paid: Decimal::ZERO,
1483 opening_nci: opening_map
1484 .get(&entity.code)
1485 .copied()
1486 .unwrap_or(Decimal::ZERO),
1487 // v5.2: acquisition-date NCI fair value (IFRS 3.19(a))
1488 // looked up from the per-shard
1489 // `accounting_standards/business_combinations.json` files
1490 // by `acquiree_entity_code`. Engagements without an
1491 // acquisition record produce `None`, preserving v5.0–v5.1
1492 // proportionate-basis behaviour byte-for-byte.
1493 acquisition_date_nci_fair_value: acquisition_fv_map.get(&entity.code).copied(),
1494 ownership_changes: ownership_changes_map
1495 .get(&entity.code)
1496 .map(|v| v.as_slice())
1497 .unwrap_or(&[]),
1498 period_start: manifest.period.end,
1499 period_end: manifest.period.end,
1500 currency: manifest.presentation_currency.clone(),
1501 };
1502 rolls.push(compute_nci_rollforward(&inputs)?);
1503 }
1504 Ok(rolls)
1505}
1506
1507/// Build an [`EquityMethodInvestment`] for every EquityMethod investee
1508/// that has a deferred shard TB on disk. Sources `investee_net_income`
1509/// and `investee_dividends_paid` from the deferred TB; impairment is
1510/// left at zero (the manifest does not yet model it).
1511fn build_equity_method_investments(
1512 manifest: &GroupManifest,
1513 deferred_tbs: &[(String, TrialBalance)],
1514 framework: AccountingFramework,
1515 prior_period_aggregate: Option<&Path>,
1516) -> GroupResult<Vec<EquityMethodInvestment>> {
1517 let (opening_map, opening_suppressed_map) = match prior_period_aggregate {
1518 Some(p) => (
1519 ingest_opening_equity_method_carrying_values(p)?,
1520 ingest_opening_suppressed_losses(p)?,
1521 ),
1522 None => (BTreeMap::new(), BTreeMap::new()),
1523 };
1524
1525 let deferred_lookup: BTreeMap<&str, &TrialBalance> = deferred_tbs
1526 .iter()
1527 .map(|(c, tb)| (c.as_str(), tb))
1528 .collect();
1529
1530 let mut invs: Vec<EquityMethodInvestment> = Vec::new();
1531 for entity in &manifest.ownership_graph.entities {
1532 if entity.consolidation_method != ConsolidationMethod::EquityMethod {
1533 continue;
1534 }
1535 let Some(parent_code) = &entity.parent_code else {
1536 // No parent declared — equity-method requires an investor
1537 // entity. Skip silently rather than erroring since v5.0's
1538 // upstream validators already reject ownership graphs
1539 // missing parent declarations.
1540 continue;
1541 };
1542
1543 // The investee's deferred TB may be absent if its shard was
1544 // missing and `tolerate_missing_shards` was enabled. Treat
1545 // that as "no income / dividends this period" (zero) so the
1546 // rollforward still produces a stable opening → opening record.
1547 let (investee_net_income, investee_dividends_paid) =
1548 match deferred_lookup.get(entity.code.as_str()) {
1549 Some(tb) => (net_income_from_tb(tb, framework), dividends_from_tb(tb)),
1550 None => (Decimal::ZERO, Decimal::ZERO),
1551 };
1552
1553 let inputs = EquityMethodInputs {
1554 investee: entity,
1555 investor_entity_code: parent_code.clone(),
1556 investee_net_income,
1557 investee_dividends_paid,
1558 opening_carrying_value: opening_map
1559 .get(&entity.code)
1560 .copied()
1561 .unwrap_or(Decimal::ZERO),
1562 opening_suppressed_loss: opening_suppressed_map
1563 .get(&entity.code)
1564 .copied()
1565 .unwrap_or(Decimal::ZERO),
1566 impairment: Decimal::ZERO,
1567 period_end: manifest.period.end,
1568 currency: manifest.presentation_currency.clone(),
1569 };
1570 invs.push(compute_equity_method_investment(&inputs)?);
1571 }
1572 Ok(invs)
1573}
1574
1575/// Resolve the presentation framework once. Maps the manifest's
1576/// primary CoA framework string back to the typed
1577/// [`AccountingFramework`]. Anything unrecognised falls through to the
1578/// default (UsGaap) — matches the [`AccountingFramework::Default`] impl.
1579fn resolve_primary_framework(manifest: &GroupManifest) -> AccountingFramework {
1580 let label = manifest
1581 .chart_of_accounts_master
1582 .primary_framework
1583 .to_lowercase();
1584 match label.as_str() {
1585 "ifrs" => AccountingFramework::Ifrs,
1586 "us_gaap" | "usgaap" | "us-gaap" => AccountingFramework::UsGaap,
1587 "dual_reporting" | "dual" => AccountingFramework::DualReporting,
1588 "french_gaap" | "frenchgaap" | "pcg" => AccountingFramework::FrenchGaap,
1589 "german_gaap" | "germangaap" | "hgb" => AccountingFramework::GermanGaap,
1590 _ => AccountingFramework::default(),
1591 }
1592}
1593
1594/// O(1) lookup of `entity_code → ManifestEntity` from the manifest's
1595/// ownership graph. Built once per `run_aggregate` call and shared
1596/// across the per-entity translation step.
1597fn entity_lookup(manifest: &GroupManifest) -> BTreeMap<String, ManifestEntity> {
1598 manifest
1599 .ownership_graph
1600 .entities
1601 .iter()
1602 .map(|e| (e.code.clone(), e.clone()))
1603 .collect()
1604}
1605
1606/// Sum a translated TB's revenue minus expense (P&L lines) to derive
1607/// the entity's translated period net income. Sign convention: revenue
1608/// (credit-natural) is added, expenses (debit-natural) subtracted, so
1609/// the result is positive on profit, negative on loss.
1610fn net_income_from_translated(translated: &TranslatedTb) -> Decimal {
1611 use crate::aggregate::translation::classify::TranslationAccountType as T;
1612 let mut net = Decimal::ZERO;
1613 for line in &translated.lines {
1614 let signed = match line.local_dr_cr {
1615 DrCr::Debit => line.translated_amount,
1616 DrCr::Credit => -line.translated_amount,
1617 };
1618 match line.account_type {
1619 // Revenue is credit-natural → contributes positively to net
1620 // income. We negate the signed (debit positive, credit
1621 // negative) value so a CR revenue posting increments NI.
1622 T::PlRevenue => net -= signed,
1623 // Expense is debit-natural → subtracts from net income. A
1624 // DR expense posting reduces NI; we subtract the (positive)
1625 // signed value.
1626 T::PlExpense => net -= signed,
1627 _ => {}
1628 }
1629 }
1630 net
1631}
1632
1633/// Sum the OCI lines from a translated TB. Sign convention follows
1634/// IAS 1 — OCI gains are credit-natural so we negate the signed amount.
1635fn oci_from_translated(translated: &TranslatedTb) -> Decimal {
1636 use crate::aggregate::translation::classify::TranslationAccountType as T;
1637 let mut oci = Decimal::ZERO;
1638 for line in &translated.lines {
1639 if line.account_type == T::PlOci {
1640 let signed = match line.local_dr_cr {
1641 DrCr::Debit => line.translated_amount,
1642 DrCr::Credit => -line.translated_amount,
1643 };
1644 oci -= signed;
1645 }
1646 }
1647 oci
1648}
1649
1650/// Equivalent of [`net_income_from_translated`] but for an untranslated
1651/// [`TrialBalance`] — used for equity-method investees whose deferred
1652/// TB is consumed before translation.
1653///
1654/// Classifies each line by GL account → translation type so we can
1655/// pick out P&L revenue / expense lines without relying on the
1656/// AccountType enum (which the orchestrator may have populated with
1657/// different values for non-IFRS frameworks).
1658fn net_income_from_tb(tb: &TrialBalance, framework: AccountingFramework) -> Decimal {
1659 use crate::aggregate::translation::classify::{classify_account, TranslationAccountType as T};
1660 let mut net = Decimal::ZERO;
1661 for line in &tb.lines {
1662 let ty = classify_account(&line.account_code, framework);
1663 match ty {
1664 T::PlRevenue => net += line.credit_balance - line.debit_balance,
1665 T::PlExpense => net -= line.debit_balance - line.credit_balance,
1666 _ => {}
1667 }
1668 }
1669 net
1670}
1671
1672/// Sum the dividends-paid balance from a TB. Dividends paid lives in
1673/// the equity sub-account `equity_accounts::DIVIDENDS_PAID` (3500 in
1674/// some charts; the named constant is the source of truth). We look up
1675/// that exact code rather than scan a range to avoid pulling in
1676/// unrelated equity movements.
1677fn dividends_from_tb(tb: &TrialBalance) -> Decimal {
1678 use datasynth_core::accounts::equity_accounts;
1679 tb.lines
1680 .iter()
1681 .filter(|l| l.account_code == equity_accounts::DIVIDENDS_PAID)
1682 .map(|l| l.debit_balance - l.credit_balance)
1683 .fold(Decimal::ZERO, |acc, v| acc + v)
1684}
1685
// ── Unit tests ────────────────────────────────────────────────────────────────
//
// The driver itself is end-to-end tested by `tests/aggregate_e2e.rs`
// (which exercises the full pipeline against a Mini-Acme fixture).
// `resolve_primary_framework` is the only branch worth exercising at
// the unit-test layer; the remainder of the helpers are linear glue
// over already-tested sub-modules. The `tests` module itself sits at
// the end of this file, after the multi-period chain section below.
1693
1694// ── v5.3 multi-period chain ────────────────────────────────────────────────
1695
/// One period's input to [`run_aggregate_chain`]. Each `PeriodSpec`
/// describes a single aggregate run: its own manifest, the shards
/// directory to read from, the `out_dir` to write to, and the options
/// for that run.
///
/// The chain runner wires `prior_period_aggregate` between periods
/// automatically: the N-th period's `out_dir` becomes the (N+1)-th
/// period's `prior_period_aggregate`. Only the first period keeps the
/// value the caller set on its `AggregateOptions` (typically `None`).
#[derive(Debug, Clone)]
pub struct PeriodSpec {
    /// Manifest for this period. Each period has its own manifest
    /// because IAS 21 closing rates, scoping, and entity ownership
    /// can change period-on-period.
    pub manifest: GroupManifest,
    /// Per-entity shard directory root for this period.
    pub shards_dir: PathBuf,
    /// Where to write this period's consolidated artefacts. For every
    /// period except the last, this path is also what the chain runner
    /// passes to the next period as its `prior_period_aggregate`.
    pub out_dir: PathBuf,
    /// Per-period options. `prior_period_aggregate` is overridden
    /// by the chain runner for every period after the first — set it
    /// on the FIRST period to seed the chain, and leave it `None` on
    /// subsequent periods.
    pub options: AggregateOptions,
}
1719
1720/// **v5.3** — drive multi-period consolidation by chaining
1721/// [`run_aggregate`] calls in sequence. Period N's `out_dir`
1722/// becomes period N+1's `prior_period_aggregate`, so opening NCI /
1723/// CTA / equity-method carrying values stitch automatically across
1724/// the chain.
1725///
1726/// v5.0–v5.2 already supported manual chaining via the
1727/// `--prior-period-aggregate` flag (PRs #141, #146 used it for the
1728/// acquisition-date NCI rollforward). This helper formalises that
1729/// pattern so callers don't have to weave the paths by hand.
1730///
1731/// # Behaviour
1732///
1733/// - Periods run in the order they appear in `periods`.
1734/// - The first period uses its own `options.prior_period_aggregate`
1735/// (typically `None` for a fresh engagement, `Some(path)` when
1736/// resuming from an external archive).
1737/// - Each subsequent period gets `options.prior_period_aggregate`
1738/// **forcibly overridden** to the previous period's `out_dir` —
1739/// any value the caller set on `periods[n].options` for n ≥ 1 is
1740/// silently ignored. This keeps the chain semantics intuitive.
1741/// - On any per-period failure the chain stops, returning the error
1742/// from `run_aggregate` for that period. Output already written
1743/// for prior periods is preserved on disk.
1744///
1745/// # Returns
1746///
1747/// A `Vec<AggregateSummary>` matching `periods` in input order.
1748///
1749/// # Errors
1750///
1751/// Whatever [`run_aggregate`] returns for the first failing period.
1752pub fn run_aggregate_chain(periods: Vec<PeriodSpec>) -> GroupResult<Vec<AggregateSummary>> {
1753 let mut summaries: Vec<AggregateSummary> = Vec::with_capacity(periods.len());
1754 let mut prior_out: Option<PathBuf> = None;
1755 for (idx, mut spec) in periods.into_iter().enumerate() {
1756 // For periods 1..N, override prior_period_aggregate with the
1757 // previous period's out_dir. Period 0 keeps whatever the
1758 // caller set.
1759 if idx > 0 {
1760 spec.options.prior_period_aggregate = prior_out.clone();
1761 }
1762 let summary = run_aggregate(
1763 &spec.manifest,
1764 &spec.shards_dir,
1765 &spec.out_dir,
1766 &spec.options,
1767 )?;
1768 prior_out = Some(spec.out_dir.clone());
1769 summaries.push(summary);
1770 }
1771 Ok(summaries)
1772}
1773
#[cfg(test)]
mod tests {
    use super::{resolve_primary_framework, AggregateOptions, PeriodSpec};
    use datasynth_standards::framework::AccountingFramework;
    use std::collections::BTreeMap;
    use std::path::PathBuf;

    /// Pins the field shape of `AggregateOptions` (every field spelled
    /// out, owned `PathBuf` for the seed path) and keeps `PeriodSpec`
    /// nameable from this module.
    #[test]
    fn period_spec_construction_uses_owned_paths() {
        // Every field is written explicitly so that a refactor which
        // moves or renames one surfaces as a compile break. Actual
        // chain execution is covered by `tests/aggregate_e2e.rs` once
        // a multi-period fixture is loaded.
        let seeded = AggregateOptions {
            prior_period_aggregate: Some(PathBuf::from("/tmp/seed-from-engagement")),
            tolerate_missing_shards: false,
            cgu_test_inputs: Vec::new(),
            cpi_series_by_currency: BTreeMap::new(),
        };
        assert!(seeded.prior_period_aggregate.is_some());

        // A runnable `PeriodSpec` is not built here because
        // `GroupManifest` requires a fully-populated graph. Naming the
        // type is enough for a refactor that hides it to break this
        // test.
        let _: fn() -> PeriodSpec;
    }

    /// The default options fail fast on missing shards and carry no
    /// prior-period aggregate.
    #[test]
    fn aggregate_options_default_is_fail_fast_no_prior() {
        let defaults = AggregateOptions::default();
        assert!(!defaults.tolerate_missing_shards, "default fails fast");
        assert!(defaults.prior_period_aggregate.is_none(), "no prior period");
    }

    /// Type-only sanity check: references `resolve_primary_framework`
    /// and `AccountingFramework::default()` so both must stay in scope
    /// and compile. The label-to-enum mapping itself is exercised in
    /// `tests/aggregate_e2e.rs` once a fixture is loaded.
    #[test]
    fn _resolve_primary_framework_is_pure_string_match() {
        let _ = resolve_primary_framework;
        let _ = AccountingFramework::default();
    }
}