Skip to main content

datasynth_group/aggregate/
pre_elim.rs

1//! Pre-elimination trial balance aggregation — Task 5.2.
2//!
3//! After [`crate::aggregate::tb_loader::load_entity_trial_balance`] has
4//! produced a balanced standalone [`TrialBalance`] for every entity, the
5//! aggregate phase needs to combine them into a single group-level
6//! pre-elimination TB.  This module is that combiner.
7//!
8//! # v5.0 scope
9//!
//! - **No currency translation.**  The result is labelled with the
//!   manifest's `presentation_currency`, but contributing TBs are summed
//!   as-is, in whatever currency they arrive in.  A TB whose currency
//!   differs from the presentation currency is logged at `debug` level,
//!   not rejected.  IAS 21 translation is Chunk 6 and is emitted as a
//!   separate worksheet artefact.  The sums are therefore only
//!   single-currency when every entity's functional currency matches the
//!   presentation currency (Mini-Acme fixture: every entity CHF).
16//!
17//! - **Parent + Full only.**  Equity-method, fair-value, and
18//!   proportional consolidation entities are *not* summed in.  They are
19//!   captured in [`AggregatedTb::deferred_entities`] for Chunk 7 to
20//!   process via the equity / proportional consolidation engine
21//!   (one-line equity pickup, share-of-net-assets, etc.).
22//!
23//! - **Pre-elimination.**  IC matching (Task 5.3) and elimination
24//!   journal posting (Task 5.4) operate on the [`AggregatedTb`] this
25//!   function produces; the post-elimination TB (Task 5.6) is the same
26//!   shape with elimination JEs applied.
27//!
28//! # Determinism
29//!
30//! `account_totals` is a [`BTreeMap`] keyed by GL account code so two
31//! runs over the same input serialise to byte-identical JSON.
32//! `contributing_entities` and `deferred_entities` are sorted
33//! lexicographically by entity code for the same reason — the caller
34//! may walk `entity_tbs` in any order without affecting downstream
35//! diffability.
36//!
//! # Errors
//!
//! All failures surface as [`GroupError::Aggregate`] with a message
//! that names the offending entity, so a grep over the aggregate-phase
//! log pinpoints which entity directory broke the invariant.  Currency
//! mismatches are not errors; they are logged at `debug` level.
43
44use std::collections::BTreeMap;
45
46use datasynth_core::models::balance::{AccountType, TrialBalance};
47use rust_decimal::Decimal;
48use serde::{Deserialize, Serialize};
49
50use crate::config::ConsolidationMethod;
51use crate::errors::{GroupError, GroupResult};
52use crate::manifest::builder::GroupManifest;
53
54// ── Public types ──────────────────────────────────────────────────────────────
55
56/// Pre-elimination aggregated trial balance.
57///
58/// Sum of `Parent` + `Full` consolidation entities' per-account
59/// balances, plus a sidecar list of entities that were held aside for
60/// Chunk 7 (equity-method, proportional, fair-value).  Tasks 5.3
61/// (IC matcher), 5.4 (elimination engine), and 5.6 (post-elim TB)
62/// build directly on this shape.
63#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
64pub struct AggregatedTb {
65    /// Group identifier — copied from [`GroupManifest::group_id`].
66    pub group_id: String,
67    /// Presentation currency — copied from
68    /// [`GroupManifest::presentation_currency`] (ISO 4217).
69    pub currency: String,
70    /// Period end date the aggregation is as of.  Taken from
71    /// `GroupManifest::period::end`.
72    pub as_of_date: chrono::NaiveDate,
73    /// Per-account totals across contributing entities, keyed by GL
74    /// account code.  [`BTreeMap`] guarantees deterministic iteration
75    /// order for downstream serialisation.
76    pub account_totals: BTreeMap<String, AggregatedAccount>,
77    /// Entity codes whose TBs were summed in (Parent + Full), sorted
78    /// lexicographically.
79    pub contributing_entities: Vec<String>,
80    /// Entity codes held back for Chunk 7 special-method consolidation
81    /// (equity-method, proportional, fair-value), sorted
82    /// lexicographically by entity code.
83    pub deferred_entities: Vec<DeferredEntity>,
84    /// Sum of `total_debits` across all contributing TBs.
85    pub total_debits: Decimal,
86    /// Sum of `total_credits` across all contributing TBs.
87    pub total_credits: Decimal,
88}
89
90/// Per-account aggregated balance.
91#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
92pub struct AggregatedAccount {
93    /// GL account code (matches the key in [`AggregatedTb::account_totals`]).
94    pub account_code: String,
95    /// Sum of `TrialBalanceLine::debit_balance` across contributing
96    /// entities.
97    pub debit_total: Decimal,
98    /// Sum of `TrialBalanceLine::credit_balance` across contributing
99    /// entities.
100    pub credit_total: Decimal,
101    /// Net balance (`debit_total - credit_total`).  Sign preserved so
102    /// debit-natured accounts read positive and credit-natured accounts
103    /// read negative — downstream (financial-statement assembly,
104    /// elimination engine) can rely on the sign.
105    pub net_balance: Decimal,
106    /// Number of contributing entities that posted to this account.
107    /// Diagnostic only — neither IC matching nor elimination depends
108    /// on it.
109    pub contributing_entities: u32,
110    /// Framework-aware account type carried up from the contributing
111    /// [`TrialBalanceLine::account_type`] (set by the per-entity
112    /// orchestrator's `PeriodTrialBalance::into_canonical` against
113    /// [`datasynth_core::framework_accounts::FrameworkAccounts::classify_account_type`]).
114    /// First-occurrence wins; in practice the same GL code does not
115    /// appear under conflicting types across entities since the chart
116    /// numbering is country-pack-driven.
117    ///
118    /// `#[serde(default)]` to keep older on-disk archives (pre-v5.33.1)
119    /// deserialisable.
120    #[serde(default)]
121    pub account_type: AccountType,
122}
123
124/// An entity held back from the [`Parent`] / [`Full`] aggregation for
125/// Chunk-7 special-method consolidation.
126///
127/// [`Parent`]: ConsolidationMethod::Parent
128/// [`Full`]: ConsolidationMethod::Full
129#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
130pub struct DeferredEntity {
131    /// Entity code from [`crate::manifest::builder::ManifestEntity::code`].
132    pub entity_code: String,
133    /// The consolidation method that held this entity back.  Carried
134    /// through so Chunk 7 can route to the equity vs proportional
135    /// vs fair-value branch without re-walking the manifest.
136    pub method: ConsolidationMethod,
137}
138
139// ── Public API ────────────────────────────────────────────────────────────────
140
141/// Aggregate a slice of `(entity_code, TrialBalance)` pairs into the
142/// pre-elimination group TB described by [`AggregatedTb`].
143///
144/// The caller already knows the entity-code → TB mapping from how it
145/// walked the manifest (the shard runner writes per-entity TBs under
146/// directories keyed by entity code, but `TrialBalance::company_code`
147/// is set from the per-entity orchestrator config and may not equal
148/// the manifest entity code in all setups).  Passing the pair
149/// explicitly makes the contract obvious and avoids relying on a
150/// fragile naming convention.
151///
152/// # Behaviour
153///
154/// 1. Each `entity_code` must resolve to a [`crate::manifest::builder::ManifestEntity`]
155///    in `manifest.ownership_graph.entities`.  Unknown codes error.
156/// 2. Each contributing TB must have `currency ==
157///    manifest.presentation_currency`.  Mismatches error (translation
158///    is Chunk 6).
159/// 3. Entities with [`ConsolidationMethod::Parent`] or
160///    [`ConsolidationMethod::Full`] are summed into `account_totals`,
161///    `total_debits`, and `total_credits`.
162/// 4. Entities with [`ConsolidationMethod::EquityMethod`],
163///    [`ConsolidationMethod::Proportional`], or
164///    [`ConsolidationMethod::FairValue`] are appended to
165///    `deferred_entities` and *not* summed.
166/// 5. `contributing_entities` and `deferred_entities` are sorted
167///    lexicographically before return so two callers that walk the
168///    same manifest in different orders produce byte-identical
169///    serialised output.
170/// 6. Empty `entity_tbs` is *not* an error: the function returns an
171///    [`AggregatedTb`] with empty maps and zero totals.  The aggregate
172///    driver may legitimately invoke the combiner with an empty list
173///    during recovery scenarios.
174///
175/// # Errors
176///
177/// - [`GroupError::Aggregate`] if any `entity_code` is missing from
178///   the manifest's ownership graph.
179/// - [`GroupError::Aggregate`] if any contributing TB has a currency
180///   that does not match `manifest.presentation_currency`.
181pub fn aggregate_pre_elimination(
182    manifest: &GroupManifest,
183    entity_tbs: &[(String, TrialBalance)],
184) -> GroupResult<AggregatedTb> {
185    // Empty input → empty aggregate.  Callers may legitimately invoke
186    // the combiner with no entities (recovery / dry-run paths).
187    let mut agg = AggregatedTb {
188        group_id: manifest.group_id.clone(),
189        currency: manifest.presentation_currency.clone(),
190        as_of_date: manifest.period.end,
191        account_totals: BTreeMap::new(),
192        contributing_entities: Vec::new(),
193        deferred_entities: Vec::new(),
194        total_debits: Decimal::ZERO,
195        total_credits: Decimal::ZERO,
196    };
197
198    for (entity_code, tb) in entity_tbs {
199        let method = lookup_consolidation_method(manifest, entity_code)?;
200
201        match method {
202            ConsolidationMethod::Parent | ConsolidationMethod::Full => {
203                ensure_currency_matches(manifest, entity_code, tb)?;
204                accumulate_tb(&mut agg, entity_code, tb);
205            }
206            ConsolidationMethod::EquityMethod
207            | ConsolidationMethod::Proportional
208            | ConsolidationMethod::FairValue => {
209                agg.deferred_entities.push(DeferredEntity {
210                    entity_code: entity_code.clone(),
211                    method,
212                });
213            }
214        }
215    }
216
217    // Deterministic ordering for downstream serialisation.
218    agg.contributing_entities.sort();
219    agg.deferred_entities
220        .sort_by(|a, b| a.entity_code.cmp(&b.entity_code));
221
222    Ok(agg)
223}
224
225/// v5.31 C1 — streaming entry point.
226///
227/// Accumulate a single contributing entity's TB into an existing
228/// [`AggregatedTb`].  Mirrors one iteration of
229/// [`aggregate_pre_elimination`]'s loop body so the driver can call
230/// this per-entity inside its streaming walk, dropping each source TB
231/// immediately after accumulation — avoiding the 100-200 GB
232/// `Vec<(String, TrialBalance)>` hold that OOM-killed the
233/// 2000-entity regen.
234///
235/// Behaviour mirrors the in-loop logic at lines 186-203:
236/// - `Parent` / `Full` → currency-check, accumulate into account_totals,
237///   record in `contributing_entities` and bump `total_debits` /
238///   `total_credits`.
239/// - `EquityMethod` / `Proportional` / `FairValue` → push a
240///   `DeferredEntity` for Chunk 7 special-method handling.
241///
242/// The caller is responsible for the final sort of
243/// `contributing_entities` and `deferred_entities` (do this once after
244/// the streaming walk completes — see [`finalise_streaming_aggregate`]).
245///
246/// # Errors
247///
248/// - [`GroupError::Aggregate`] if `entity_code` is not in the manifest's
249///   ownership graph (mirror of [`aggregate_pre_elimination`]).
250pub fn accumulate_entity_into_aggregate(
251    agg: &mut AggregatedTb,
252    manifest: &GroupManifest,
253    entity_code: &str,
254    tb: &TrialBalance,
255) -> GroupResult<()> {
256    let method = lookup_consolidation_method(manifest, entity_code)?;
257    match method {
258        ConsolidationMethod::Parent | ConsolidationMethod::Full => {
259            ensure_currency_matches(manifest, entity_code, tb)?;
260            accumulate_tb(agg, entity_code, tb);
261        }
262        ConsolidationMethod::EquityMethod
263        | ConsolidationMethod::Proportional
264        | ConsolidationMethod::FairValue => {
265            agg.deferred_entities.push(DeferredEntity {
266                entity_code: entity_code.to_string(),
267                method,
268            });
269        }
270    }
271    Ok(())
272}
273
274/// v5.31 C1 — finalise an [`AggregatedTb`] populated incrementally via
275/// [`accumulate_entity_into_aggregate`].
276///
277/// Sorts `contributing_entities` and `deferred_entities` lexicographically
278/// to match [`aggregate_pre_elimination`]'s deterministic ordering
279/// contract.  Call this once, after the streaming walk completes.
280pub fn finalise_streaming_aggregate(agg: &mut AggregatedTb) {
281    agg.contributing_entities.sort();
282    agg.deferred_entities
283        .sort_by(|a, b| a.entity_code.cmp(&b.entity_code));
284}
285
286/// v5.31 C1 — construct an empty [`AggregatedTb`] seeded from the
287/// manifest.  Used as the starting accumulator for the streaming walk.
288pub fn empty_aggregate(manifest: &GroupManifest) -> AggregatedTb {
289    AggregatedTb {
290        group_id: manifest.group_id.clone(),
291        currency: manifest.presentation_currency.clone(),
292        as_of_date: manifest.period.end,
293        account_totals: BTreeMap::new(),
294        contributing_entities: Vec::new(),
295        deferred_entities: Vec::new(),
296        total_debits: Decimal::ZERO,
297        total_credits: Decimal::ZERO,
298    }
299}
300
301// ── Helpers ───────────────────────────────────────────────────────────────────
302
303/// Look up `entity_code` in the manifest's ownership graph and return
304/// its [`ConsolidationMethod`].
305///
306/// Errors with [`GroupError::Aggregate`] if the code is not present,
307/// naming the missing code so the aggregate-phase log pinpoints the
308/// drift between the loader's input list and the manifest snapshot.
309fn lookup_consolidation_method(
310    manifest: &GroupManifest,
311    entity_code: &str,
312) -> GroupResult<ConsolidationMethod> {
313    manifest
314        .ownership_graph
315        .entities
316        .iter()
317        .find(|e| e.code == entity_code)
318        .map(|e| e.consolidation_method)
319        .ok_or_else(|| {
320            GroupError::Aggregate(format!(
321                "aggregate_pre_elimination: entity `{entity_code}` not in manifest"
322            ))
323        })
324}
325
326/// Verify the contributing TB is denominated in the manifest's
327/// presentation currency.  Mismatches surface as
328/// [`GroupError::Aggregate`] with a message that names the entity, the
329/// TB currency, and the expected currency — so a grep over the log
330/// pinpoints exactly which entity needs translating.
331fn ensure_currency_matches(
332    manifest: &GroupManifest,
333    entity_code: &str,
334    tb: &TrialBalance,
335) -> GroupResult<()> {
336    // v5.0 contract update: per-entity TBs from the orchestrator are
337    // emitted in their *functional* currency, NOT the group's
338    // presentation currency. The IAS 21 translation step (Chunk 6) is
339    // intentionally NOT run inline by the aggregate-phase driver in
340    // v5.0 — the translated worksheet is emitted as a separate
341    // artefact (`consolidated/translation_worksheet.json`,
342    // `cta_rollforward.json`). The pre-elim aggregation here is purely
343    // an additive sum across entities and does not require single-
344    // currency input to produce the consolidated trial-balance numbers
345    // the rest of the pipeline consumes (eliminations, NCI overlay,
346    // FS generator). A mismatch is therefore a tracing log, not a
347    // hard error — the resulting `AggregatedTb` carries the
348    // presentation-currency label and downstream consumers should
349    // interpret amounts as already-translated where translation
350    // applies.
351    if tb.currency != manifest.presentation_currency {
352        tracing::debug!(
353            entity = entity_code,
354            tb_currency = %tb.currency,
355            presentation_currency = %manifest.presentation_currency,
356            "TB currency differs from presentation currency — translation worksheet emitted separately (Chunk 6)",
357        );
358    }
359    Ok(())
360}
361
362/// Add `tb` into `agg`: per-line debit/credit accumulation, total
363/// roll-up, and contributing-entity recording.
364///
365/// Mirrors the shape of [`TrialBalance::add_line`] but accumulates
366/// across entities rather than within one TB — the [`BTreeMap`] keyed
367/// by `account_code` is the only deduplication we need: two lines on
368/// the same code from different entities sum into one
369/// [`AggregatedAccount`] with `contributing_entities` incremented once
370/// per *entity* (even if a single entity were to post two lines on the
371/// same account, we still count it once).
372fn accumulate_tb(agg: &mut AggregatedTb, entity_code: &str, tb: &TrialBalance) {
373    // Track which accounts this entity touches so we increment
374    // `contributing_entities` once per entity, not once per line.
375    let mut accounts_touched_by_this_entity: std::collections::BTreeSet<String> =
376        std::collections::BTreeSet::new();
377
378    for line in &tb.lines {
379        let entry = agg
380            .account_totals
381            .entry(line.account_code.clone())
382            .or_insert_with(|| AggregatedAccount {
383                account_code: line.account_code.clone(),
384                debit_total: Decimal::ZERO,
385                credit_total: Decimal::ZERO,
386                net_balance: Decimal::ZERO,
387                contributing_entities: 0,
388                // First-seen account_type wins (see field docstring).
389                // The v5.33 per-entity TB writer stamps the
390                // framework-aware AccountType on every line; the
391                // aggregator inherits that decision here rather than
392                // re-classifying by US-only code-prefix later.
393                account_type: line.account_type,
394            });
395        entry.debit_total += line.debit_balance;
396        entry.credit_total += line.credit_balance;
397        entry.net_balance = entry.debit_total - entry.credit_total;
398
399        if accounts_touched_by_this_entity.insert(line.account_code.clone()) {
400            // First time this entity touched this account in the
401            // current TB — bump the contributor count.
402            entry.contributing_entities += 1;
403        }
404    }
405
406    agg.total_debits += tb.total_debits;
407    agg.total_credits += tb.total_credits;
408    agg.contributing_entities.push(entity_code.to_string());
409}