Skip to main content

sidereon_core/sp3/
combine.rs

1//! Multi-source SP3 combination: clock-datum alignment across analysis centers.
2//!
3//! Precise clock products from different analysis centers are referenced to
4//! different station/ensemble clocks, so their raw clock values differ by a
5//! per-epoch common offset - the reference-clock difference - that drifts over
6//! the day. Before clocks from two centers can be compared or combined, that
7//! datum must be removed. [`clock_reference_offset`] estimates it robustly (the
8//! median, over the satellites both products report at each epoch, of
9//! `other - reference`); subtract it from `other`'s clocks to put both products
10//! on `reference`'s datum.
11//!
12//! Orbit positions need no such treatment: every center reports ITRF
13//! center-of-mass coordinates, so cross-center position differences are already
14//! directly comparable.
15
16use std::collections::{BTreeMap, BTreeSet};
17
18use crate::astro::math::vec3;
19use crate::astro::time::civil::mjd_from_jd;
20use crate::astro::time::gnss;
21use crate::astro::time::model::Instant;
22
23use super::interp::instant_to_j2000_seconds;
24use super::{RawNode, Sp3, Sp3DataType, Sp3Flags, Sp3Header, Sp3State};
25use crate::constants::{GPS_EPOCH_TO_J2000_S, KM_TO_M};
26use crate::frame::ItrfPositionM;
27use crate::id::{GnssSatelliteId, GnssSystem};
28use crate::tolerances::WHOLE_SECOND_EPS_S;
29use crate::validate;
30use crate::{Error, Result};
31
32const MAX_EXACT_CLIQUE_NODES: usize = 32;
33
34/// One epoch's reference-clock offset of `other` relative to `reference`.
35#[derive(Debug, Clone, Copy, PartialEq)]
36pub struct ClockReferenceOffset {
37    /// The matched epoch.
38    pub epoch: Instant,
39    /// `other - reference` clock datum at this epoch, in seconds. Positive means
40    /// `other`'s clock datum runs ahead of `reference`'s; subtract it from
41    /// `other`'s clocks to align them to `reference`.
42    pub offset_s: f64,
43    /// Number of satellites that contributed to the (median) estimate.
44    pub satellites: usize,
45}
46
47/// Estimate the per-epoch reference-clock offset of `other` relative to
48/// `reference`.
49///
50/// For each epoch present in both products, the offset is the median over the
51/// satellites both report (each with a finite clock) of
52/// `other_clock - reference_clock`. The median makes the estimate robust to a
53/// single satellite whose clock one center has wrong - but only with enough
54/// satellites, so `min_common` is the minimum number of common clocked
55/// satellites required to emit an offset for an epoch (a sound robust median
56/// wants at least three, so one outlier can be outvoted). Epochs with fewer
57/// common clocks are omitted rather than reported as a fragile one- or
58/// two-satellite estimate.
59///
60/// Epochs are matched by their J2000 second floored to a whole second (the same
61/// node-axis convention the interpolator uses). Non-finite clock differences are
62/// skipped. Epochs present in only one product, or below `min_common`, are
63/// omitted from the result.
64///
65/// The floored-whole-second key assumes the input cadence is at least one second,
66/// which holds for every standard SP3 product (15 min, 5 min, 1 min, ... down to
67/// 1 s). Two distinct epochs less than a second apart would collapse onto the
68/// same key and be matched as one; the same applies to the floored key in
69/// [`MergeReport::per_epoch_agreement`]. This is kept deliberately aligned with
70/// the interpolator's node axis rather than refined to sub-second resolution, so
71/// that matching here and interpolation downstream use one consistent grid.
72pub fn clock_reference_offset(
73    reference: &Sp3,
74    other: &Sp3,
75    min_common: usize,
76) -> Vec<ClockReferenceOffset> {
77    let mut other_index: std::collections::HashMap<i64, usize> = std::collections::HashMap::new();
78    for (idx, epoch) in other.epochs.iter().enumerate() {
79        if let Some(seconds) = instant_to_j2000_seconds(epoch) {
80            other_index.insert(seconds.floor() as i64, idx);
81        }
82    }
83
84    let mut offsets = Vec::new();
85
86    for (ref_idx, epoch) in reference.epochs.iter().enumerate() {
87        let Some(ref_seconds) = instant_to_j2000_seconds(epoch) else {
88            continue;
89        };
90        let Some(&other_idx) = other_index.get(&(ref_seconds.floor() as i64)) else {
91            continue;
92        };
93
94        let (Ok(ref_states), Ok(other_states)) =
95            (reference.states_at(ref_idx), other.states_at(other_idx))
96        else {
97            continue;
98        };
99
100        let mut diffs: Vec<f64> = Vec::new();
101        for (sat, ref_state) in ref_states.iter() {
102            let Some(ref_clock) = ref_state.clock_s else {
103                continue;
104            };
105            if let Some(other_state) = other_states.get(sat) {
106                if let Some(other_clock) = other_state.clock_s {
107                    let diff = other_clock - ref_clock;
108                    // SP3 should not carry NaN/inf clocks, but the parser can
109                    // accept them; merge infrastructure must not panic on data.
110                    if diff.is_finite() {
111                        diffs.push(diff);
112                    }
113                }
114            }
115        }
116
117        if diffs.len() >= min_common.max(1) {
118            if let Some(offset_s) = median(&mut diffs) {
119                offsets.push(ClockReferenceOffset {
120                    epoch: *epoch,
121                    offset_s,
122                    satellites: diffs.len(),
123                });
124            }
125        }
126    }
127
128    offsets
129}
130
131fn median(values: &mut [f64]) -> Option<f64> {
132    if values.is_empty() {
133        return None;
134    }
135
136    // Inputs are pre-filtered to finite values; total_cmp never panics regardless.
137    values.sort_by(f64::total_cmp);
138
139    let n = values.len();
140    if n % 2 == 1 {
141        Some(values[n / 2])
142    } else {
143        Some((values[n / 2 - 1] + values[n / 2]) / 2.0)
144    }
145}
146
147// ===========================================================================
148// Multi-source merge
149// ===========================================================================
150
151/// How the agreeing (consensus) sources for a cell are combined into the merged
152/// value.
153#[derive(Debug, Clone, Copy, PartialEq, Eq)]
154pub enum MergeCombine {
155    /// Arithmetic mean of the consensus sources. The clustering step has already
156    /// removed outliers, so the mean uses every agreeing measurement. Default.
157    Mean,
158    /// Component-wise median of the consensus sources.
159    Median,
160    /// The value from the highest-precedence (earliest-listed) consensus source.
161    Precedence,
162}
163
164/// Options for [`merge`].
165#[derive(Debug, Clone, PartialEq)]
166pub struct MergeOptions {
167    /// Maximum 3D position difference (meters) for two sources to be in
168    /// agreement.
169    pub position_tolerance_m: f64,
170    /// Maximum clock difference (seconds, after datum alignment) for two sources
171    /// to be in agreement.
172    pub clock_tolerance_s: f64,
173    /// Minimum number of mutually-agreeing sources required to accept a cell that
174    /// has two or more sources. A cell with a single source is always carried
175    /// through (gap fill, recorded as `single_source`); a cell with several
176    /// sources but no agreeing subset this large is quarantined rather than
177    /// averaged across disagreeing centers.
178    pub min_agree: usize,
179    /// Minimum common clocked satellites for the per-epoch clock-datum estimate
180    /// between two sources (see [`clock_reference_offset`]).
181    pub clock_min_common: usize,
182    /// How to combine the agreeing sources.
183    pub combine: MergeCombine,
184    /// Optional target epoch interval, in seconds. When unset the coarsest input
185    /// interval is used. Finer inputs are decimated onto this grid by exact
186    /// subset selection (never interpolated); inputs whose interval does not
187    /// evenly divide it are rejected.
188    pub target_epoch_interval_s: Option<f64>,
189    /// Optional constellation/system filter. When set, only satellites whose
190    /// system is in this set are considered for the merged product.
191    pub systems: Option<BTreeSet<GnssSystem>>,
192}
193
194impl Default for MergeOptions {
195    /// Defaults tuned for the common case of ~3 analysis centers: agreement is a
196    /// 2-of-3 majority (`min_agree = 2`); combine the agreeing subset by mean.
197    fn default() -> Self {
198        Self {
199            position_tolerance_m: 0.5,
200            clock_tolerance_s: 5.0e-9,
201            min_agree: 2,
202            clock_min_common: 5,
203            combine: MergeCombine::Mean,
204            target_epoch_interval_s: None,
205            systems: None,
206        }
207    }
208}
209
210/// One (epoch, satellite) cell the merge handled with a caveat. Nothing is
211/// dropped or averaged silently - every such cell is recorded here.
212#[derive(Debug, Clone, PartialEq)]
213pub struct MergeFlag {
214    /// The epoch.
215    pub epoch: Instant,
216    /// The satellite.
217    pub satellite: GnssSatelliteId,
218    /// The source indices (into the input slice) this flag refers to: for
219    /// `single_source`, the lone contributor; for `quarantined`, all sources
220    /// that disagreed; for `position_outliers`, the sources rejected from an
221    /// otherwise-accepted consensus.
222    pub sources: Vec<usize>,
223}
224
225/// Per-(epoch, satellite) agreement statistics for one accepted consensus cell:
226/// how tightly the consensus member values cluster about the combined value that
227/// was actually written to the merged product.
228///
229/// The dispersion is measured about the *combined* value (the mean, median, or
230/// precedence pick - whatever the strategy wrote), not about the cluster centroid,
231/// so it reflects the agreement of the product the merge emitted. A single-source
232/// cell has one member and zero dispersion.
233#[derive(Debug, Clone, Copy, PartialEq)]
234pub struct AgreementMetric {
235    /// The epoch.
236    pub epoch: Instant,
237    /// The satellite.
238    pub satellite: GnssSatelliteId,
239    /// Number of sources in the accepted position consensus (>= 1).
240    pub position_members: usize,
241    /// RMS, over the position-consensus members, of the 3D distance from the
242    /// combined position, meters. Zero for a single-source cell.
243    pub position_rms_m: f64,
244    /// Largest 3D distance of any position-consensus member from the combined
245    /// position, meters.
246    pub position_max_m: f64,
247    /// Number of sources in the accepted clock consensus (0 when the cell carries
248    /// no clock).
249    pub clock_members: usize,
250    /// RMS, over the clock-consensus members, of the deviation from the combined
251    /// clock, seconds; `None` when the cell carries no clock.
252    pub clock_rms_s: Option<f64>,
253    /// Largest absolute clock deviation from the combined clock, seconds; `None`
254    /// when the cell carries no clock.
255    pub clock_max_s: Option<f64>,
256}
257
258/// Per-epoch aggregate of [`AgreementMetric`] over the satellites combined at that
259/// epoch, restricted to cells with a *multi-source* consensus (a single source
260/// has no measurable dispersion, so it is excluded from the aggregate spread).
261#[derive(Debug, Clone, Copy, PartialEq)]
262pub struct EpochAgreement {
263    /// The epoch.
264    pub epoch: Instant,
265    /// Satellites at this epoch with a multi-source position consensus.
266    pub satellites: usize,
267    /// Member-count-weighted pooled RMS of the per-cell position dispersion over
268    /// those satellites, meters (i.e. the RMS of every member-to-combined 3D
269    /// distance pooled across the epoch).
270    pub position_rms_m: f64,
271    /// Worst per-cell position dispersion at this epoch, meters.
272    pub position_max_m: f64,
273    /// As `position_rms_m` for the clock channel; `None` when no multi-source
274    /// clock consensus existed at this epoch.
275    pub clock_rms_s: Option<f64>,
276    /// Worst per-cell clock dispersion at this epoch, seconds; `None` as above.
277    pub clock_max_s: Option<f64>,
278}
279
280/// Audit trail for a [`merge`].
281#[derive(Debug, Clone, Default, PartialEq)]
282pub struct MergeReport {
283    /// Cells where two or more sources disagreed beyond tolerance with no
284    /// agreeing subset of `min_agree` - omitted from the merged product.
285    pub quarantined: Vec<MergeFlag>,
286    /// Cells carried from a single source (no cross-check was possible).
287    pub single_source: Vec<MergeFlag>,
288    /// Cells accepted by consensus where one or more sources were rejected as
289    /// position outliers.
290    pub position_outliers: Vec<MergeFlag>,
291    /// Per-(epoch, satellite) agreement statistics for every accepted cell, in
292    /// output (epoch, then satellite) order - one entry per cell written to the
293    /// merged product. Quantifies how tightly the consensus sources clustered
294    /// about the combined value (Gap: per-epoch quality metrics).
295    pub agreement: Vec<AgreementMetric>,
296}
297
298impl MergeReport {
299    /// Fraction of accepted cells that were carried from a single source, in
300    /// `0.0..=1.0`; `None` when no cells were accepted.
301    ///
302    /// This is the blind-spot companion to the agreement-RMS accessors, which
303    /// quantify dispersion only over *multi-source* cells. A product can show a
304    /// tight (or `None`) agreement RMS yet be largely un-cross-checked: those
305    /// gap-fill cells (also enumerated in [`MergeReport::single_source`]) had no
306    /// second source to compare against. Read this alongside the RMS so a clean
307    /// dispersion is not mistaken for a fully corroborated product.
308    pub fn single_source_fraction(&self) -> Option<f64> {
309        let accepted = self.agreement.len();
310        (accepted > 0).then(|| self.single_source.len() as f64 / accepted as f64)
311    }
312
313    /// Member-count-weighted pooled RMS of the per-cell position dispersion over
314    /// every accepted cell with a multi-source consensus, meters. `None` when no
315    /// cell had two or more position-consensus members.
316    ///
317    /// The pool is exact: each cell contributes its summed squared member-to-
318    /// combined distances (`position_rms_m^2 * position_members`), normalised by
319    /// the total member count, so the result is the RMS of all member-to-combined
320    /// distances across the whole product.
321    ///
322    /// This covers only multi-source cells; single-source gap-fill cells are
323    /// excluded (they have no dispersion). A small or `None` result therefore does
324    /// not by itself mean the whole product was corroborated - check
325    /// [`MergeReport::single_source_fraction`] for the un-cross-checked share.
326    pub fn position_agreement_rms_m(&self) -> Option<f64> {
327        pooled_rms(
328            self.agreement
329                .iter()
330                .filter(|m| m.position_members >= 2)
331                .map(|m| (m.position_rms_m, m.position_members)),
332        )
333    }
334
335    /// Largest single-cell position dispersion over all accepted cells, meters.
336    /// `None` when there are no accepted cells.
337    pub fn position_agreement_max_m(&self) -> Option<f64> {
338        self.agreement
339            .iter()
340            .map(|m| m.position_max_m)
341            .fold(None, |acc, v| Some(fold_max(acc, v)))
342    }
343
344    /// As [`Self::position_agreement_rms_m`] for the clock channel, seconds.
345    pub fn clock_agreement_rms_s(&self) -> Option<f64> {
346        pooled_rms(self.agreement.iter().filter_map(|m| {
347            m.clock_rms_s
348                .filter(|_| m.clock_members >= 2)
349                .map(|rms| (rms, m.clock_members))
350        }))
351    }
352
353    /// Largest single-cell clock dispersion over all accepted cells, seconds.
354    pub fn clock_agreement_max_s(&self) -> Option<f64> {
355        self.agreement
356            .iter()
357            .filter_map(|m| m.clock_max_s)
358            .fold(None, |acc, v| Some(fold_max(acc, v)))
359    }
360
361    /// Per-epoch aggregate agreement, in output-epoch order. Each entry pools the
362    /// multi-source cells at that epoch (see [`EpochAgreement`]); epochs whose
363    /// cells were all single-source are still listed with `satellites == 0` and a
364    /// zero position spread so the caller sees every output epoch.
365    pub fn per_epoch_agreement(&self) -> Vec<EpochAgreement> {
366        let mut out: Vec<EpochAgreement> = Vec::new();
367        let mut current_key: Option<i64> = None;
368        for m in &self.agreement {
369            let key = instant_to_j2000_seconds(&m.epoch).map(|s| s.floor() as i64);
370            if current_key != key || out.is_empty() {
371                out.push(EpochAgreement {
372                    epoch: m.epoch,
373                    satellites: 0,
374                    position_rms_m: 0.0,
375                    position_max_m: 0.0,
376                    clock_rms_s: None,
377                    clock_max_s: None,
378                });
379                current_key = key;
380            }
381            let agg = out.last_mut().expect("just pushed");
382            agg.position_max_m = agg.position_max_m.max(m.position_max_m);
383            if m.position_members >= 2 {
384                agg.satellites += 1;
385            }
386            // Only multi-source clock cells contribute to the epoch clock max,
387            // matching the RMS path: a single-member cell has zero dispersion and
388            // must not leave clock_max_s = Some(0.0) while clock_rms_s is None.
389            if let Some(max) = m.clock_max_s.filter(|_| m.clock_members >= 2) {
390                agg.clock_max_s = Some(fold_max(agg.clock_max_s, max));
391            }
392        }
393
394        // Pooled RMS per epoch needs the sum of squared distances, which the per
395        // entry RMS encodes; recompute it in a second pass grouped by epoch key.
396        for agg in &mut out {
397            let key = instant_to_j2000_seconds(&agg.epoch).map(|s| s.floor() as i64);
398            agg.position_rms_m = pooled_rms(
399                self.agreement
400                    .iter()
401                    .filter(|m| {
402                        m.position_members >= 2
403                            && instant_to_j2000_seconds(&m.epoch).map(|s| s.floor() as i64) == key
404                    })
405                    .map(|m| (m.position_rms_m, m.position_members)),
406            )
407            .unwrap_or(0.0);
408            agg.clock_rms_s = pooled_rms(
409                self.agreement
410                    .iter()
411                    .filter(|m| instant_to_j2000_seconds(&m.epoch).map(|s| s.floor() as i64) == key)
412                    .filter_map(|m| {
413                        m.clock_rms_s
414                            .filter(|_| m.clock_members >= 2)
415                            .map(|rms| (rms, m.clock_members))
416                    }),
417            );
418        }
419
420        out
421    }
422}
423
424/// Pool per-cell RMS values weighted by member count into one RMS:
425/// `sqrt(sum(rms_i^2 * n_i) / sum(n_i))`. `None` when the iterator is empty.
426fn pooled_rms(cells: impl Iterator<Item = (f64, usize)>) -> Option<f64> {
427    let mut sumsq = 0.0_f64;
428    let mut total = 0_usize;
429    for (rms, n) in cells {
430        sumsq += rms * rms * n as f64;
431        total += n;
432    }
433    (total > 0).then(|| (sumsq / total as f64).sqrt())
434}
435
436/// `max` reduction over an `Option` accumulator (`None` is the empty identity).
437fn fold_max(acc: Option<f64>, value: f64) -> f64 {
438    match acc {
439        Some(current) if current >= value => current,
440        _ => value,
441    }
442}
443
444/// Merge several SP3 products from different analysis centers into one
445/// consistent precise-ephemeris dataset.
446///
447/// Orthogonal to time-stitching: this combines providers at the **same** epochs.
448/// Inputs must be on one common, uniform epoch grid. Mixed-cadence products are
449/// rejected rather than unioned onto a finer grid; callers that need that must
450/// resample first. For every (epoch, satellite) cell on the common grid:
451///
452/// - **Union satellite coverage.** A satellite present in any input may appear
453///   in the output, but only on the shared grid and only when doing so preserves
454///   a coherent source/consensus arc.
455/// - **Position consensus.** With one source the value is carried through
456///   (`single_source`). With several, the largest subset of sources mutually
457///   within `position_tolerance_m` is found; if it has at least `min_agree`
458///   members it is combined per `combine` and any sources outside it are recorded
459///   as `position_outliers`. If no such subset exists the cell is `quarantined`
460///   (omitted) - never averaged across disagreeing centers.
461/// - **Clock consensus.** Clocks are first put on a common datum (each source
462///   aligned to the first via [`clock_reference_offset`]), then combined by the
463///   same agreement rule; a cell with no clock consensus carries no clock. A
464///   non-reference source whose datum cannot be estimated at an epoch (below
465///   `clock_min_common` common clocks) contributes **no** clock there rather than
466///   an unaligned one - its position is still merged.
467///
468/// `Precedence` is resolved per satellite arc: once a satellite is assigned to
469/// the highest-precedence source that carries it, that satellite never switches
470/// centers at adjacent epochs. If that source is missing a cell, the cell is
471/// omitted rather than filled from a lower-precedence source.
472///
473/// All inputs must share an exact SP3 time-system label and exact
474/// coordinate-system label (epochs are matched across products in that scale);
475/// mismatches are rejected. The merged record flags are the union (OR) of the
476/// contributing sources' flags - in particular a `clock_event` on any
477/// clock-consensus member is preserved, so the interpolator still splits the
478/// clock arc. The merged header is **synthetic**: its first-epoch fields
479/// describe the union's first epoch and its data type is position-only.
480///
481/// Pure and deterministic: order the inputs by center precedence and ties (equal
482/// cluster sizes, `Precedence` combine) resolve to the earliest-listed source.
483/// The merged product's interpolation nodes are the consensus values, so it
484/// samples and interpolates like any other [`Sp3`] (it is a derived combination,
485/// not a byte-faithful copy of any one center). Consensus is exact max-clique for
486/// normal source counts and uses a deterministic greedy fallback above the exact
487/// search cap, so hostile disagreement graphs remain bounded.
488pub fn merge(sources: &[Sp3], opts: &MergeOptions) -> Result<(Sp3, MergeReport)> {
489    if sources.is_empty() {
490        return Err(Error::InvalidInput(
491            "merge requires at least one SP3 product".into(),
492        ));
493    }
494
495    // Inputs must be combinable: epochs are matched in one exact product time
496    // system, and positions are only comparable in an exactly common coordinate
497    // system / frame. Do not silently alias labels such as QZS/GPS or
498    // IGS20/IGc20 here: without an explicit transform, a differently labeled
499    // product contract is a different product contract.
500    let base = &sources[0].header;
501    for s in &sources[1..] {
502        if s.header.time_system != base.time_system {
503            return Err(Error::InvalidInput(format!(
504                "merge inputs have mismatched SP3 time systems ({:?} vs {:?})",
505                base.time_system, s.header.time_system
506            )));
507        }
508        if s.header.coordinate_system.trim() != base.coordinate_system.trim() {
509            return Err(Error::InvalidInput(format!(
510                "merge inputs have mismatched coordinate systems ({:?} vs {:?})",
511                base.coordinate_system, s.header.coordinate_system
512            )));
513        }
514    }
515
516    // floored-J2000-second -> epoch index, per source.
517    let epoch_index: Vec<BTreeMap<i64, usize>> = sources
518        .iter()
519        .map(|s| {
520            s.epochs
521                .iter()
522                .enumerate()
523                .filter_map(|(i, ep)| {
524                    instant_to_j2000_seconds(ep).map(|sec| (sec.floor() as i64, i))
525                })
526                .collect()
527        })
528        .collect();
529
530    let epoch_interval_s = resolve_common_epoch_interval(sources, opts.target_epoch_interval_s)?;
531
532    // Per-source per-epoch clock-datum offset relative to source 0. Source 0 is
533    // the datum, so its offset is identically zero.
534    let clock_offset: Vec<BTreeMap<i64, f64>> = sources
535        .iter()
536        .enumerate()
537        .map(|(idx, s)| {
538            if idx == 0 {
539                BTreeMap::new()
540            } else {
541                clock_reference_offset(&sources[0], s, opts.clock_min_common)
542                    .into_iter()
543                    .filter_map(|o| {
544                        instant_to_j2000_seconds(&o.epoch)
545                            .map(|sec| (sec.floor() as i64, o.offset_s))
546                    })
547                    .collect()
548            }
549        })
550        .collect();
551
552    // Intersection of epochs (by floored second), keeping source 0's
553    // representative Instant. Mixing a 15-minute product with 5-minute products
554    // must not emit the union grid; if all inputs share a cadence but differ in
555    // coverage, only the epochs present in every product are combined.
556    let mut epoch_keys: BTreeMap<i64, Instant> = sources[0]
557        .epochs
558        .iter()
559        .filter_map(|ep| instant_to_j2000_seconds(ep).map(|sec| (sec.floor() as i64, *ep)))
560        .collect();
561
562    for index in epoch_index.iter().skip(1) {
563        epoch_keys.retain(|key, _| index.contains_key(key));
564    }
565
566    // Decimate onto the resolved common-interval grid (anchored at the earliest
567    // common epoch): keep only epochs that land on the grid, dropping off-grid
568    // epochs by exact subset selection (never interpolation). A no-op when the
569    // inputs already share the interval; the real decimation when finer inputs
570    // are mixed with a coarser one, or an explicit coarser target is requested.
571    if let Some((&anchor, _)) = epoch_keys.iter().next() {
572        let step = epoch_interval_s.round() as i64;
573        if step > 0 {
574            epoch_keys.retain(|&key, _| (key - anchor).rem_euclid(step) == 0);
575        }
576    }
577
578    if epoch_keys.is_empty() {
579        return Err(Error::InvalidInput(
580            "merge inputs have no common epochs on a shared time grid".into(),
581        ));
582    }
583
584    let precedence_source_for_sat = if opts.combine == MergeCombine::Precedence {
585        Some(precedence_sources_for_satellites(
586            sources,
587            &epoch_index,
588            &epoch_keys,
589            opts.systems.as_ref(),
590        ))
591    } else {
592        None
593    };
594
595    let allowed_system = |sat: &GnssSatelliteId| {
596        opts.systems
597            .as_ref()
598            .is_none_or(|systems| systems.contains(&sat.system))
599    };
600
601    if let Some(systems) = &opts.systems {
602        if systems.is_empty() {
603            return Err(Error::InvalidInput(
604                "merge systems filter must not be empty".into(),
605            ));
606        }
607    }
608
609    let mut out_epochs: Vec<Instant> = Vec::with_capacity(epoch_keys.len());
610    let mut out_states: Vec<BTreeMap<GnssSatelliteId, Sp3State>> =
611        Vec::with_capacity(epoch_keys.len());
612    let mut out_raw: Vec<BTreeMap<GnssSatelliteId, RawNode>> = Vec::with_capacity(epoch_keys.len());
613    let mut report = MergeReport::default();
614    let mut all_sats: BTreeSet<GnssSatelliteId> = BTreeSet::new();
615
616    for (&key, &epoch) in &epoch_keys {
617        out_epochs.push(epoch);
618        let mut states: BTreeMap<GnssSatelliteId, Sp3State> = BTreeMap::new();
619        let mut raws: BTreeMap<GnssSatelliteId, RawNode> = BTreeMap::new();
620
621        // Satellites present at this epoch in any source, after any requested
622        // constellation filter.
623        let mut sats: BTreeSet<GnssSatelliteId> = BTreeSet::new();
624        for (idx, s) in sources.iter().enumerate() {
625            if let Some(&ei) = epoch_index[idx].get(&key) {
626                if let Ok(map) = s.states_at(ei) {
627                    sats.extend(map.keys().copied().filter(|sat| allowed_system(sat)));
628                }
629            }
630        }
631
632        for sat in sats {
633            // (source_idx, position_m, flags) and (source_idx, datum-aligned
634            // clock_s, flags). A non-reference source contributes a clock only
635            // when its datum offset could be estimated at this epoch; otherwise
636            // its clock would be unaligned, so it is omitted (the position is
637            // still gathered).
638            let preferred_source = precedence_source_for_sat
639                .as_ref()
640                .and_then(|by_sat| by_sat.get(&sat).copied());
641
642            let mut pos: Vec<(usize, [f64; 3], Sp3Flags)> = Vec::new();
643            let mut clk: Vec<(usize, f64, Sp3Flags)> = Vec::new();
644            for (idx, s) in sources.iter().enumerate() {
645                let Some(&ei) = epoch_index[idx].get(&key) else {
646                    continue;
647                };
648                let Ok(map) = s.states_at(ei) else { continue };
649                let Some(state) = map.get(&sat) else { continue };
650                pos.push((idx, state.position.as_array(), state.flags));
651                if let Some(c) = state.clock_s {
652                    let offset = if idx == 0 {
653                        Some(0.0)
654                    } else {
655                        clock_offset[idx].get(&key).copied()
656                    };
657                    if let Some(off) = offset {
658                        let aligned = c - off;
659                        if aligned.is_finite() {
660                            clk.push((idx, aligned, state.flags));
661                        }
662                    }
663                }
664            }
665
666            let flag = |srcs: Vec<usize>| MergeFlag {
667                epoch,
668                satellite: sat,
669                sources: srcs,
670            };
671
672            // Position consensus -> the merged position and the indices (into
673            // `pos`) of the sources that contributed it. In precedence mode the
674            // preferred source is fixed per satellite arc; never switch to a
675            // lower-precedence source just because the preferred source is
676            // missing or outside a different consensus cluster at this epoch.
677            let (position_m, pos_members) = if opts.combine == MergeCombine::Precedence {
678                let Some(preferred_source) = preferred_source else {
679                    continue;
680                };
681                let Some(preferred_idx) =
682                    pos.iter().position(|(src, _, _)| *src == preferred_source)
683                else {
684                    continue;
685                };
686
687                if pos.len() == 1 {
688                    report.single_source.push(flag(vec![pos[preferred_idx].0]));
689                    (pos[preferred_idx].1, vec![preferred_idx])
690                } else {
691                    let pts: Vec<[f64; 3]> = pos.iter().map(|(_, p, _)| *p).collect();
692                    let cluster = largest_within_containing(&pts, preferred_idx, |a, b| {
693                        dist3(a, b) <= opts.position_tolerance_m
694                    });
695                    if cluster.len() >= opts.min_agree {
696                        let rejected: Vec<usize> = (0..pos.len())
697                            .filter(|i| !cluster.contains(i))
698                            .map(|i| pos[i].0)
699                            .collect();
700                        if !rejected.is_empty() {
701                            report.position_outliers.push(flag(rejected));
702                        }
703                        (pos[preferred_idx].1, cluster)
704                    } else {
705                        report
706                            .quarantined
707                            .push(flag(pos.iter().map(|(i, _, _)| *i).collect()));
708                        continue;
709                    }
710                }
711            } else if pos.len() == 1 {
712                report.single_source.push(flag(vec![pos[0].0]));
713                (pos[0].1, vec![0usize])
714            } else {
715                let pts: Vec<[f64; 3]> = pos.iter().map(|(_, p, _)| *p).collect();
716                let cluster = largest_within(&pts, |a, b| dist3(a, b) <= opts.position_tolerance_m);
717                if cluster.len() >= opts.min_agree {
718                    let rejected: Vec<usize> = (0..pos.len())
719                        .filter(|i| !cluster.contains(i))
720                        .map(|i| pos[i].0)
721                        .collect();
722                    if !rejected.is_empty() {
723                        report.position_outliers.push(flag(rejected));
724                    }
725                    let members: Vec<(usize, [f64; 3])> =
726                        cluster.iter().map(|&i| (pos[i].0, pos[i].1)).collect();
727                    (combine3(&members, opts.combine), cluster)
728                } else {
729                    report
730                        .quarantined
731                        .push(flag(pos.iter().map(|(i, _, _)| *i).collect()));
732                    continue;
733                }
734            };
735
736            // Clock consensus, independent of position -> the merged clock and the
737            // indices (into `clk`) of the sources that contributed it.
738            let (clock_s, clk_members): (Option<f64>, Vec<usize>) = if clk.is_empty() {
739                (None, Vec::new())
740            } else if opts.combine == MergeCombine::Precedence {
741                match preferred_source
742                    .and_then(|src| clk.iter().position(|(clock_src, _, _)| *clock_src == src))
743                {
744                    None => (None, Vec::new()),
745                    Some(preferred_idx) if clk.len() == 1 => {
746                        (Some(clk[preferred_idx].1), vec![preferred_idx])
747                    }
748                    Some(preferred_idx) => {
749                        let vals: Vec<f64> = clk.iter().map(|(_, c, _)| *c).collect();
750                        let cluster = largest_within_containing(&vals, preferred_idx, |a, b| {
751                            (a - b).abs() <= opts.clock_tolerance_s
752                        });
753                        if cluster.len() >= opts.min_agree {
754                            (Some(clk[preferred_idx].1), cluster)
755                        } else {
756                            (None, Vec::new())
757                        }
758                    }
759                }
760            } else if clk.len() == 1 {
761                (Some(clk[0].1), vec![0usize])
762            } else {
763                let vals: Vec<f64> = clk.iter().map(|(_, c, _)| *c).collect();
764                let cluster = largest_within(&vals, |a, b| (a - b).abs() <= opts.clock_tolerance_s);
765                if cluster.len() >= opts.min_agree {
766                    let members: Vec<(usize, f64)> =
767                        cluster.iter().map(|&i| (clk[i].0, clk[i].1)).collect();
768                    (Some(combine_axis(&members, opts.combine)), cluster)
769                } else {
770                    (None, Vec::new())
771                }
772            };
773
774            // Preserve record flags: OR the orbit flags across the position
775            // members and the clock flags across the clock members, so a
776            // `clock_event` (clock reset) or maneuver on any contributing source
777            // survives into the merged product.
778            let mut flags = Sp3Flags::default();
779            for &i in &pos_members {
780                flags.maneuver |= pos[i].2.maneuver;
781                flags.orbit_predicted |= pos[i].2.orbit_predicted;
782            }
783            for &i in &clk_members {
784                flags.clock_event |= clk[i].2.clock_event;
785                flags.clock_predicted |= clk[i].2.clock_predicted;
786            }
787
788            // Per-cell agreement: dispersion of the accepted consensus members
789            // about the combined value actually written below.
790            let (position_rms_m, position_max_m) =
791                position_dispersion(&pos, &pos_members, &position_m);
792            let (clock_members_n, clock_rms_s, clock_max_s) = match clock_s {
793                Some(c) => {
794                    let (rms, max) = clock_dispersion(&clk, &clk_members, c);
795                    (clk_members.len(), Some(rms), Some(max))
796                }
797                None => (0, None, None),
798            };
799            report.agreement.push(AgreementMetric {
800                epoch,
801                satellite: sat,
802                position_members: pos_members.len(),
803                position_rms_m,
804                position_max_m,
805                clock_members: clock_members_n,
806                clock_rms_s,
807                clock_max_s,
808            });
809
810            all_sats.insert(sat);
811            states.insert(
812                sat,
813                Sp3State {
814                    position: ItrfPositionM::new(position_m[0], position_m[1], position_m[2])
815                        .expect("valid ITRF position"),
816                    clock_s,
817                    velocity: None,
818                    clock_rate_s_s: None,
819                    flags,
820                },
821            );
822            raws.insert(
823                sat,
824                RawNode {
825                    km: [
826                        position_m[0] / KM_TO_M,
827                        position_m[1] / KM_TO_M,
828                        position_m[2] / KM_TO_M,
829                    ],
830                    clock_us: clock_s.map(|c| c * 1.0e6),
831                    clock_event: flags.clock_event,
832                },
833            );
834        }
835
836        out_states.push(states);
837        out_raw.push(raws);
838    }
839
840    // Base the non-epoch metadata on a source product, but derive the first-epoch
841    // header fields from the merged grid itself. Mixed cadence / coverage can make
842    // the merged first epoch later than every input's first epoch, so cloning
843    // those fields from any input would make the `##` line stale.
844    let first_key = instant_to_j2000_seconds(&out_epochs[0]).map(|s| s.floor() as i64);
845    let base_idx = sources
846        .iter()
847        .position(|s| {
848            s.epochs
849                .first()
850                .and_then(instant_to_j2000_seconds)
851                .map(|s| s.floor() as i64)
852                == first_key
853        })
854        .or_else(|| {
855            sources
856                .iter()
857                .enumerate()
858                .filter_map(|(i, s)| {
859                    s.epochs
860                        .first()
861                        .and_then(instant_to_j2000_seconds)
862                        .map(|sec| (sec, i))
863                })
864                .min_by(|a, b| a.0.total_cmp(&b.0).then(a.1.cmp(&b.1)))
865                .map(|(_, i)| i)
866        })
867        .unwrap_or(0);
868    let first_epoch_header = first_epoch_header_fields(&out_epochs[0]).ok_or_else(|| {
869        Error::InvalidInput("merged SP3 first epoch cannot be represented in header fields".into())
870    })?;
871
872    let satellites: Vec<_> = all_sats.into_iter().collect();
873    let satellite_accuracy_codes = satellites
874        .iter()
875        .map(|sat| {
876            sources[base_idx]
877                .header
878                .satellites
879                .iter()
880                .position(|base_sat| base_sat == sat)
881                .and_then(|idx| {
882                    sources[base_idx]
883                        .header
884                        .satellite_accuracy_codes
885                        .get(idx)
886                        .copied()
887                })
888                .unwrap_or(0)
889        })
890        .collect();
891
892    let header = Sp3Header {
893        num_epochs: out_epochs.len() as u64,
894        satellites,
895        satellite_accuracy_codes,
896        data_type: Sp3DataType::Position,
897        gnss_week: first_epoch_header.gnss_week,
898        seconds_of_week: first_epoch_header.seconds_of_week,
899        epoch_interval_s,
900        mjd: first_epoch_header.mjd,
901        mjd_fraction: first_epoch_header.mjd_fraction,
902        ..sources[base_idx].header.clone()
903    };
904
905    let merged = Sp3 {
906        header,
907        epochs: out_epochs,
908        states: out_states,
909        interp_raw: out_raw,
910        comments: vec![format!("MERGED from {} SP3 products", sources.len())],
911        skipped_records: sources.iter().map(|s| s.skipped_records).sum(),
912    };
913
914    Ok((merged, report))
915}
916
917#[derive(Debug, Clone, Copy)]
918struct FirstEpochHeaderFields {
919    gnss_week: u32,
920    seconds_of_week: f64,
921    mjd: u32,
922    mjd_fraction: f64,
923}
924
925fn first_epoch_header_fields(epoch: &Instant) -> Option<FirstEpochHeaderFields> {
926    let split = epoch.julian_date()?;
927
928    let mjd_day = mjd_from_jd(split.jd_whole);
929    let mut mjd = mjd_day.floor();
930    let mut mjd_fraction = split.fraction + (mjd_day - mjd);
931    let fraction_days = mjd_fraction.floor();
932    if fraction_days != 0.0 {
933        mjd += fraction_days;
934        mjd_fraction -= fraction_days;
935    }
936    if !(0.0..=u32::MAX as f64).contains(&mjd) {
937        return None;
938    }
939
940    let gps_seconds = instant_to_j2000_seconds(epoch)? + GPS_EPOCH_TO_J2000_S;
941    let (gnss_week, seconds_of_week) = gnss::week_and_seconds_of_week(gps_seconds);
942    if !(0.0..=u32::MAX as f64).contains(&gnss_week) {
943        return None;
944    }
945
946    Some(FirstEpochHeaderFields {
947        gnss_week: gnss_week as u32,
948        seconds_of_week,
949        mjd: mjd as u32,
950        mjd_fraction,
951    })
952}
953
954fn dist3(a: &[f64; 3], b: &[f64; 3]) -> f64 {
955    vec3::norm3(vec3::sub3(*a, *b))
956}
957
958/// RMS and max of the 3D distance of each `members` position (indices into `pos`)
959/// from `combined`. `members` is the accepted consensus, always non-empty.
960fn position_dispersion(
961    pos: &[(usize, [f64; 3], Sp3Flags)],
962    members: &[usize],
963    combined: &[f64; 3],
964) -> (f64, f64) {
965    let mut sumsq = 0.0;
966    let mut max = 0.0_f64;
967    for &i in members {
968        let d = dist3(&pos[i].1, combined);
969        sumsq += d * d;
970        max = max.max(d);
971    }
972    ((sumsq / members.len().max(1) as f64).sqrt(), max)
973}
974
975/// RMS and max of the absolute deviation of each `members` clock (indices into
976/// `clk`) from `combined`. `members` is the accepted consensus, always non-empty.
977fn clock_dispersion(
978    clk: &[(usize, f64, Sp3Flags)],
979    members: &[usize],
980    combined: f64,
981) -> (f64, f64) {
982    let mut sumsq = 0.0;
983    let mut max = 0.0_f64;
984    for &i in members {
985        let d = (clk[i].1 - combined).abs();
986        sumsq += d * d;
987        max = max.max(d);
988    }
989    ((sumsq / members.len().max(1) as f64).sqrt(), max)
990}
991
992fn precedence_sources_for_satellites(
993    sources: &[Sp3],
994    epoch_index: &[BTreeMap<i64, usize>],
995    epoch_keys: &BTreeMap<i64, Instant>,
996    systems: Option<&BTreeSet<GnssSystem>>,
997) -> BTreeMap<GnssSatelliteId, usize> {
998    let mut by_sat = BTreeMap::new();
999
1000    for (idx, source) in sources.iter().enumerate() {
1001        for key in epoch_keys.keys() {
1002            let Some(&epoch_idx) = epoch_index[idx].get(key) else {
1003                continue;
1004            };
1005            let Ok(states) = source.states_at(epoch_idx) else {
1006                continue;
1007            };
1008
1009            for sat in states.keys() {
1010                if systems.is_none_or(|allowed| allowed.contains(&sat.system)) {
1011                    by_sat.entry(*sat).or_insert(idx);
1012                }
1013            }
1014        }
1015    }
1016
1017    by_sat
1018}
1019
1020/// Resolve the common (output) epoch interval and validate that every input can
1021/// be decimated onto it without interpolation.
1022///
1023/// The common interval is the caller's `target` if given, otherwise the
1024/// **coarsest** native interval among the inputs (the finest grid every input
1025/// can supply). An input is compatible only when the common interval is a
1026/// positive-integer multiple of that input's native interval: then the
1027/// common-grid epochs are an exact subset of the input's epochs, and the merge's
1028/// epoch intersection performs the decimation (e.g. a 5-minute product
1029/// contributes its :00/:15/:30/:45 epochs to a 15-minute merge - no orbit/clock
1030/// interpolation is introduced). Inputs whose interval does not evenly divide the
1031/// common interval - a coarser input than the requested grid, or a non-divisible
1032/// cadence - are rejected as incompatible. Equal-interval inputs (multiple 1) are
1033/// the same-interval fast path and behave exactly as before.
1034fn resolve_common_epoch_interval(sources: &[Sp3], target: Option<f64>) -> Result<f64> {
1035    let intervals: Vec<f64> = sources
1036        .iter()
1037        .enumerate()
1038        .map(|(idx, source)| {
1039            effective_epoch_interval_s(source)?.ok_or_else(|| {
1040                Error::InvalidInput(format!(
1041                    "merge input {idx} has no usable positive epoch interval"
1042                ))
1043            })
1044        })
1045        .collect::<Result<Vec<_>>>()?;
1046
1047    let common = match target {
1048        Some(t) if t.is_finite() && t > 0.0 => t,
1049        Some(t) => {
1050            return Err(Error::InvalidInput(format!(
1051                "merge target epoch interval must be positive and finite, got {t}"
1052            )))
1053        }
1054        None => intervals.iter().copied().fold(0.0_f64, f64::max),
1055    };
1056
1057    // The merge matches and decimates epochs on whole-second J2000 keys, so the
1058    // common grid must fall on whole seconds for the decimation lattice to be
1059    // exact. SP3 grids are integer-second; reject a fractional common interval
1060    // rather than decimate on a mismatched (rounded) lattice.
1061    if (common - common.round()).abs() > WHOLE_SECOND_EPS_S || common.round() < 1.0 {
1062        return Err(Error::InvalidInput(format!(
1063            "merge common epoch interval {common:.6} s must be a positive whole number of seconds"
1064        )));
1065    }
1066
1067    for (idx, interval) in intervals.iter().copied().enumerate() {
1068        if !divides_evenly(interval, common) {
1069            return Err(Error::InvalidInput(format!(
1070                "merge inputs have mismatched epoch intervals: common {common:.6} s is not an integer multiple of input {idx} {interval:.6} s (no exact-subset decimation; positional interpolation is not performed)"
1071            )));
1072        }
1073    }
1074
1075    Ok(common)
1076}
1077
1078/// True when `common` is a positive-integer multiple of `interval` (within the
1079/// interval tolerance), i.e. `interval`'s grid is a superset of the common grid.
1080fn divides_evenly(interval: f64, common: f64) -> bool {
1081    if !(interval.is_finite() && interval > 0.0 && common.is_finite() && common > 0.0) {
1082        return false;
1083    }
1084    let k = (common / interval).round();
1085    k >= 1.0 && same_interval(k * interval, common)
1086}
1087
1088fn effective_epoch_interval_s(source: &Sp3) -> Result<Option<f64>> {
1089    let secs: Vec<f64> = source
1090        .epochs
1091        .iter()
1092        .filter_map(instant_to_j2000_seconds)
1093        .collect();
1094    validate::require_strictly_increasing(secs.iter().copied(), "merge input epochs").map_err(
1095        |error| Error::InvalidInput(format!("{} must be strictly increasing", error.field())),
1096    )?;
1097    let gaps: Vec<f64> = secs.windows(2).map(|w| w[1] - w[0]).collect();
1098
1099    if gaps.is_empty() {
1100        let header = source.header.epoch_interval_s;
1101        return Ok((header.is_finite() && header > 0.0).then_some(header));
1102    }
1103
1104    let interval = gaps[0];
1105    if gaps.iter().all(|g| same_interval(*g, interval)) {
1106        Ok(Some(interval))
1107    } else {
1108        Ok(None)
1109    }
1110}
1111
1112fn same_interval(a: f64, b: f64) -> bool {
1113    (a - b).abs() <= WHOLE_SECOND_EPS_S
1114}
1115
1116/// Indices of the largest subset of `items` whose members are *mutually* within
1117/// `within`. Exact max-clique over normal source counts; deterministic greedy
1118/// fallback above [`MAX_EXACT_CLIQUE_NODES`] keeps hostile overlap graphs bounded.
1119/// Ties resolve to the lowest-indexed subset (precedence).
1120fn largest_within<T>(items: &[T], within: impl Fn(&T, &T) -> bool) -> Vec<usize> {
1121    let n = items.len();
1122    if n <= 1 {
1123        return (0..n).collect();
1124    }
1125    let graph = agreement_graph(items, within);
1126    if n > MAX_EXACT_CLIQUE_NODES {
1127        return greedy_largest_clique(&graph);
1128    }
1129    let mut best = vec![0];
1130    let mut current = Vec::new();
1131    max_clique_search(&graph, &mut current, (0..n).collect(), &mut best);
1132    best
1133}
1134
1135fn largest_within_containing<T>(
1136    items: &[T],
1137    required: usize,
1138    within: impl Fn(&T, &T) -> bool,
1139) -> Vec<usize> {
1140    let n = items.len();
1141    if n == 0 || required >= n {
1142        return Vec::new();
1143    }
1144    if n == 1 {
1145        return vec![required];
1146    }
1147
1148    let graph = agreement_graph(items, within);
1149    if n > MAX_EXACT_CLIQUE_NODES {
1150        return greedy_largest_clique_containing(&graph, required);
1151    }
1152    let candidates = (0..n)
1153        .filter(|&idx| idx != required && graph[required][idx])
1154        .collect();
1155    let mut best = vec![required];
1156    let mut current = vec![required];
1157    max_clique_search(&graph, &mut current, candidates, &mut best);
1158    best
1159}
1160
1161fn agreement_graph<T>(items: &[T], within: impl Fn(&T, &T) -> bool) -> Vec<Vec<bool>> {
1162    let n = items.len();
1163    let mut graph = vec![vec![false; n]; n];
1164    for i in 0..n {
1165        graph[i][i] = true;
1166        for j in i + 1..n {
1167            let agrees = within(&items[i], &items[j]);
1168            graph[i][j] = agrees;
1169            graph[j][i] = agrees;
1170        }
1171    }
1172    graph
1173}
1174
1175fn greedy_largest_clique(graph: &[Vec<bool>]) -> Vec<usize> {
1176    let mut best = Vec::new();
1177    for seed in 0..graph.len() {
1178        let candidate = greedy_clique_from_seed(graph, seed);
1179        update_best_clique(&candidate, &mut best);
1180    }
1181    best
1182}
1183
1184fn greedy_largest_clique_containing(graph: &[Vec<bool>], required: usize) -> Vec<usize> {
1185    if required >= graph.len() {
1186        return Vec::new();
1187    }
1188    greedy_clique_from_seed(graph, required)
1189}
1190
1191fn greedy_clique_from_seed(graph: &[Vec<bool>], seed: usize) -> Vec<usize> {
1192    let mut clique = vec![seed];
1193    for (idx, _) in graph.iter().enumerate() {
1194        if idx == seed {
1195            continue;
1196        }
1197        if clique.iter().all(|&member| graph[member][idx]) {
1198            clique.push(idx);
1199        }
1200    }
1201    clique.sort_unstable();
1202    clique
1203}
1204
1205fn max_clique_search(
1206    graph: &[Vec<bool>],
1207    current: &mut Vec<usize>,
1208    mut candidates: Vec<usize>,
1209    best: &mut Vec<usize>,
1210) {
1211    candidates.sort_unstable();
1212    for (pos, &candidate) in candidates.iter().enumerate() {
1213        let remaining = candidates.len() - pos;
1214        if current.len() + remaining < best.len() {
1215            break;
1216        }
1217
1218        let next_candidates = candidates[pos + 1..]
1219            .iter()
1220            .copied()
1221            .filter(|&idx| graph[candidate][idx])
1222            .collect();
1223
1224        current.push(candidate);
1225        update_best_clique(current, best);
1226        max_clique_search(graph, current, next_candidates, best);
1227        current.pop();
1228    }
1229}
1230
1231fn update_best_clique(current: &[usize], best: &mut Vec<usize>) {
1232    let mut candidate = current.to_vec();
1233    candidate.sort_unstable();
1234    if candidate.len() > best.len()
1235        || (candidate.len() == best.len() && candidate.as_slice() < best.as_slice())
1236    {
1237        *best = candidate;
1238    }
1239}
1240
1241fn combine3(members: &[(usize, [f64; 3])], how: MergeCombine) -> [f64; 3] {
1242    [0usize, 1, 2].map(|axis| {
1243        let axis_members: Vec<(usize, f64)> = members.iter().map(|(s, v)| (*s, v[axis])).collect();
1244        combine_axis(&axis_members, how)
1245    })
1246}
1247
1248fn combine_axis(members: &[(usize, f64)], how: MergeCombine) -> f64 {
1249    match how {
1250        MergeCombine::Mean => members.iter().map(|(_, v)| *v).sum::<f64>() / members.len() as f64,
1251        MergeCombine::Median => {
1252            let mut vals: Vec<f64> = members.iter().map(|(_, v)| *v).collect();
1253            median(&mut vals).expect("consensus cluster is non-empty")
1254        }
1255        MergeCombine::Precedence => members
1256            .iter()
1257            .min_by_key(|(s, _)| *s)
1258            .map(|(_, v)| *v)
1259            .expect("consensus cluster is non-empty"),
1260    }
1261}
1262
1263/// Return a copy of `other` with its clocks shifted onto `reference`'s clock
1264/// datum.
1265///
1266/// This applies the per-epoch reference-clock offset from
1267/// [`clock_reference_offset`]: at each epoch where the offset could be estimated
1268/// (at least `min_common` common clocked satellites), every clocked satellite's
1269/// offset has the datum subtracted, so the result's clocks are directly
1270/// comparable to `reference`'s. Positions are untouched (already comparable).
1271///
1272/// Epochs where the offset could not be estimated are left unchanged - they are
1273/// *not* on `reference`'s datum, so a caller mixing aligned and unaligned epochs
1274/// should consult [`clock_reference_offset`] to see which epochs were aligned.
1275/// The returned product interpolates like any other [`Sp3`].
1276pub fn align_clock_reference(reference: &Sp3, other: &Sp3, min_common: usize) -> Sp3 {
1277    let offsets: BTreeMap<i64, f64> = clock_reference_offset(reference, other, min_common)
1278        .into_iter()
1279        .filter_map(|o| {
1280            instant_to_j2000_seconds(&o.epoch).map(|sec| (sec.floor() as i64, o.offset_s))
1281        })
1282        .collect();
1283
1284    let mut aligned = other.clone();
1285    for ei in 0..aligned.epochs.len() {
1286        let Some(sec) = instant_to_j2000_seconds(&aligned.epochs[ei]) else {
1287            continue;
1288        };
1289        let Some(&off) = offsets.get(&(sec.floor() as i64)) else {
1290            continue;
1291        };
1292        for state in aligned.states[ei].values_mut() {
1293            if let Some(c) = state.clock_s.as_mut() {
1294                *c -= off;
1295            }
1296        }
1297        for node in aligned.interp_raw[ei].values_mut() {
1298            if let Some(us) = node.clock_us.as_mut() {
1299                *us -= off * 1.0e6;
1300            }
1301        }
1302    }
1303    aligned
1304}
1305
1306#[cfg(test)]
1307mod tests {
1308    use super::super::Sp3;
1309    use super::{
1310        align_clock_reference, clock_reference_offset, merge, MergeCombine, MergeOptions,
1311        MergeReport,
1312    };
1313    use crate::constants::SECONDS_PER_DAY;
1314    use crate::id::{GnssSatelliteId, GnssSystem};
1315    use std::collections::BTreeSet;
1316
1317    /// One satellite sample in a synthetic SP3 epoch: token, ECEF position
1318    /// (km), and optional clock (microseconds).
1319    type SatSample<'a> = (&'a str, [f64; 3], Option<f64>);
1320
1321    fn gps(prn: u8) -> GnssSatelliteId {
1322        GnssSatelliteId::new(GnssSystem::Gps, prn).expect("valid satellite id")
1323    }
1324
1325    // Single-epoch SP3-c from explicit `(satellite, [x,y,z] km, clock us, flag
1326    // suffix)` records under coordinate system `cs` (5 chars, e.g. `"IGS14"`).
1327    // `flags` is appended verbatim after the 60-column record body, so a test can
1328    // place an SP3 flag (e.g. `"              E"` -> the `E` clock-event flag at
1329    // column 75). A `None` clock writes the SP3 bad-clock sentinel.
1330    fn sp3_build(records: &[(&str, [f64; 3], Option<f64>, &str)], cs: &str) -> Sp3 {
1331        let n = records.len();
1332        let mut sats = String::new();
1333        for (sat, _, _, _) in records {
1334            sats.push_str(sat);
1335        }
1336        for _ in n..17 {
1337            sats.push_str("  0");
1338        }
1339        let mut body = String::new();
1340        body.push_str(&format!(
1341            "#cP2020  6 25  0  0  0.00000000       1 ORBIT {cs} FIT  TST\n"
1342        ));
1343        body.push_str("## 2111 432000.00000000   900.00000000 59025 0.0000000000000\n");
1344        body.push_str(&format!("+   {n:2}   {sats}\n"));
1345        body.push_str("++         0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0\n");
1346        body.push_str("%c G  cc GPS ccc cccc cccc cccc cccc ccccc ccccc ccccc ccccc\n");
1347        body.push_str("%c cc cc ccc ccc cccc cccc cccc cccc ccccc ccccc ccccc ccccc\n");
1348        body.push_str("%f  1.2500000  1.025000000  0.00000000000  0.000000000000000\n");
1349        body.push_str("%f  0.0000000  0.000000000  0.00000000000  0.000000000000000\n");
1350        body.push_str("%i    0    0    0    0      0      0      0      0         0\n");
1351        body.push_str("%i    0    0    0    0      0      0      0      0         0\n");
1352        body.push_str("/* TEST SP3-c FIXTURE\n");
1353        body.push_str("*  2020  6 25  0  0  0.00000000\n");
1354        for (sat, p, clk, flags) in records {
1355            let c = clk.unwrap_or(999_999.999_999);
1356            body.push_str(&format!(
1357                "P{sat}{:14.6}{:14.6}{:14.6}{c:14.6}{flags}\n",
1358                p[0], p[1], p[2]
1359            ));
1360        }
1361        body.push_str("EOF\n");
1362        Sp3::parse(body.as_bytes()).expect("parse test sp3")
1363    }
1364
1365    // The common case: IGS14, no flags.
1366    fn sp3_records(records: &[(&str, [f64; 3], Option<f64>)]) -> Sp3 {
1367        let full: Vec<(&str, [f64; 3], Option<f64>, &str)> =
1368            records.iter().map(|(s, p, c)| (*s, *p, *c, "")).collect();
1369        sp3_build(&full, "IGS14")
1370    }
1371
1372    fn sp3_two_epochs(
1373        epoch0: &[(&str, [f64; 3], Option<f64>)],
1374        epoch1: &[(&str, [f64; 3], Option<f64>)],
1375        interval_s: f64,
1376        cs: &str,
1377    ) -> Sp3 {
1378        let mut sats: Vec<&str> = epoch0
1379            .iter()
1380            .chain(epoch1.iter())
1381            .map(|(sat, _, _)| *sat)
1382            .collect();
1383        sats.sort_unstable();
1384        sats.dedup();
1385        let n = sats.len();
1386        let mut sat_field = String::new();
1387        for sat in &sats {
1388            sat_field.push_str(sat);
1389        }
1390        for _ in n..17 {
1391            sat_field.push_str("  0");
1392        }
1393
1394        let mut body = String::new();
1395        body.push_str(&format!(
1396            "#cP2020  6 25  0  0  0.00000000       2 ORBIT {cs} FIT  TST\n"
1397        ));
1398        body.push_str(&format!(
1399            "## 2111 432000.00000000 {interval_s:14.8} 59025 0.0000000000000\n"
1400        ));
1401        body.push_str(&format!("+   {n:2}   {sat_field}\n"));
1402        body.push_str("++         0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0\n");
1403        body.push_str("%c G  cc GPS ccc cccc cccc cccc cccc ccccc ccccc ccccc ccccc\n");
1404        body.push_str("%c cc cc ccc ccc cccc cccc cccc cccc ccccc ccccc ccccc ccccc\n");
1405        body.push_str("%f  1.2500000  1.025000000  0.00000000000  0.000000000000000\n");
1406        body.push_str("%f  0.0000000  0.000000000  0.00000000000  0.000000000000000\n");
1407        body.push_str("%i    0    0    0    0      0      0      0      0         0\n");
1408        body.push_str("%i    0    0    0    0      0      0      0      0         0\n");
1409        body.push_str("/* TEST SP3-c FIXTURE\n");
1410        body.push_str("*  2020  6 25  0  0  0.00000000\n");
1411        for (sat, p, clk) in epoch0 {
1412            let c = clk.unwrap_or(999_999.999_999);
1413            body.push_str(&format!(
1414                "P{sat}{:14.6}{:14.6}{:14.6}{c:14.6}\n",
1415                p[0], p[1], p[2]
1416            ));
1417        }
1418        let second_hour = (interval_s as i64) / 3600;
1419        let second_minute = ((interval_s as i64) % 3600) / 60;
1420        let second_second = (interval_s as i64) % 60;
1421        body.push_str(&format!(
1422            "*  2020  6 25 {second_hour:2} {second_minute:2} {second_second:2}.00000000\n"
1423        ));
1424        for (sat, p, clk) in epoch1 {
1425            let c = clk.unwrap_or(999_999.999_999);
1426            body.push_str(&format!(
1427                "P{sat}{:14.6}{:14.6}{:14.6}{c:14.6}\n",
1428                p[0], p[1], p[2]
1429            ));
1430        }
1431        body.push_str("EOF\n");
1432        Sp3::parse(body.as_bytes()).expect("parse test sp3")
1433    }
1434
1435    // N consecutive epochs spaced `interval_s` apart from 2020-06-25 00:00:00.
1436    fn sp3_epochs(
1437        start_offset_s: f64,
1438        epochs: &[&[SatSample<'_>]],
1439        interval_s: f64,
1440        cs: &str,
1441    ) -> Sp3 {
1442        let mut sats: Vec<&str> = epochs
1443            .iter()
1444            .flat_map(|e| e.iter().map(|(sat, _, _)| *sat))
1445            .collect();
1446        sats.sort_unstable();
1447        sats.dedup();
1448        let n = sats.len();
1449        let mut sat_field = String::new();
1450        for sat in &sats {
1451            sat_field.push_str(sat);
1452        }
1453        for _ in n..17 {
1454            sat_field.push_str("  0");
1455        }
1456
1457        let hms = |t: i64| (t / 3600, (t % 3600) / 60, t % 60);
1458        let start = start_offset_s as i64;
1459        let (sh, sm, ss0) = hms(start);
1460
1461        let mut body = String::new();
1462        body.push_str(&format!(
1463            "#cP2020  6 25 {sh:2} {sm:2} {ss0:2}.00000000      {:2} ORBIT {cs} FIT  TST\n",
1464            epochs.len()
1465        ));
1466        // Seconds-of-week and MJD fraction of the first epoch shift with the start.
1467        let sow = 432_000.0 + start_offset_s;
1468        let mjd_frac = start_offset_s / SECONDS_PER_DAY;
1469        body.push_str(&format!(
1470            "## 2111 {sow:15.8} {interval_s:14.8} 59025 {mjd_frac:.13}\n"
1471        ));
1472        body.push_str(&format!("+   {n:2}   {sat_field}\n"));
1473        body.push_str("++         0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0\n");
1474        body.push_str("%c G  cc GPS ccc cccc cccc cccc cccc ccccc ccccc ccccc ccccc\n");
1475        body.push_str("%c cc cc ccc ccc cccc cccc cccc cccc ccccc ccccc ccccc ccccc\n");
1476        body.push_str("%f  1.2500000  1.025000000  0.00000000000  0.000000000000000\n");
1477        body.push_str("%f  0.0000000  0.000000000  0.00000000000  0.000000000000000\n");
1478        body.push_str("%i    0    0    0    0      0      0      0      0         0\n");
1479        body.push_str("%i    0    0    0    0      0      0      0      0         0\n");
1480        body.push_str("/* TEST SP3-c FIXTURE\n");
1481        for (k, recs) in epochs.iter().enumerate() {
1482            let (hh, mm, ss) = hms(start + (k as i64) * (interval_s as i64));
1483            body.push_str(&format!("*  2020  6 25 {hh:2} {mm:2} {ss:2}.00000000\n"));
1484            for (sat, p, clk) in recs.iter() {
1485                let c = clk.unwrap_or(999_999.999_999);
1486                body.push_str(&format!(
1487                    "P{sat}{:14.6}{:14.6}{:14.6}{c:14.6}\n",
1488                    p[0], p[1], p[2]
1489                ));
1490            }
1491        }
1492        body.push_str("EOF\n");
1493        Sp3::parse(body.as_bytes()).expect("parse test sp3")
1494    }
1495
1496    #[test]
1497    fn merge_unions_coverage_when_one_center_misses_a_satellite() {
1498        // Center A reports G01/G02/G03; center B is missing G03. The merged
1499        // product must still cover G03 at that epoch (filled from A).
1500        let a = sp3_records(&[
1501            ("G01", [15000.0, -20000.0, 5000.0], Some(100.0)),
1502            ("G02", [16000.0, -21000.0, 6000.0], Some(200.0)),
1503            ("G03", [17000.0, -22000.0, 7000.0], Some(300.0)),
1504        ]);
1505        let b = sp3_records(&[
1506            ("G01", [15000.0, -20000.0, 5000.0], Some(100.0)),
1507            ("G02", [16000.0, -21000.0, 6000.0], Some(200.0)),
1508        ]);
1509
1510        let (merged, report) = merge(&[a, b], &MergeOptions::default()).expect("merge");
1511
1512        let states = merged.states_at(0).expect("epoch 0");
1513        assert!(
1514            states.contains_key(&gps(3)),
1515            "merged output must cover G03 from the center that has it"
1516        );
1517        assert_eq!(states.len(), 3, "union is G01/G02/G03");
1518        // G01 agreed across both centers -> consensus clock is their value.
1519        let g01 = states[&gps(1)];
1520        assert!((g01.clock_s.unwrap() - 100.0e-6).abs() < 1.0e-15);
1521        // G03 had a single source -> carried through, recorded, not quarantined.
1522        assert!(report.quarantined.is_empty());
1523        assert_eq!(report.single_source.len(), 1);
1524        assert_eq!(report.single_source[0].satellite, gps(3));
1525
1526        // The un-cross-checked share is surfaced: 1 of 3 accepted cells (G03) was
1527        // single-source, so a clean multi-source agreement RMS is not the whole
1528        // story. An empty report reports None.
1529        let frac = report
1530            .single_source_fraction()
1531            .expect("accepted cells present");
1532        assert!(
1533            (frac - 1.0 / 3.0).abs() < 1.0e-12,
1534            "single-source fraction {frac}"
1535        );
1536        assert_eq!(MergeReport::default().single_source_fraction(), None);
1537    }
1538
1539    #[test]
1540    fn merge_combines_two_of_three_agreeing_sources_and_rejects_the_outlier() {
1541        // A and B agree on G01; C is 10 m off in X (> the default 0.5 m tolerance).
1542        let a = sp3_records(&[("G01", [15000.0, -20000.0, 5000.0], Some(100.0))]);
1543        let b = sp3_records(&[("G01", [15000.0, -20000.0, 5000.0], Some(100.0))]);
1544        let c = sp3_records(&[("G01", [15000.010, -20000.0, 5000.0], Some(100.0))]);
1545
1546        let (merged, report) = merge(&[a, b, c], &MergeOptions::default()).expect("merge");
1547
1548        let states = merged.states_at(0).expect("epoch 0");
1549        let g01 = states[&gps(1)];
1550        // Consensus is A/B (15000 km == 1.5e7 m); not dragged toward C.
1551        assert!(
1552            (g01.position.as_array()[0] - 15_000_000.0).abs() < 1.0e-3,
1553            "got {}",
1554            g01.position.as_array()[0]
1555        );
1556        // C is source index 2 -> recorded as the rejected position outlier.
1557        assert_eq!(report.position_outliers.len(), 1);
1558        assert_eq!(report.position_outliers[0].sources, vec![2]);
1559        assert!(report.quarantined.is_empty());
1560    }
1561
1562    #[test]
1563    fn merge_consensus_handles_more_than_u32_mask_bits() {
1564        // Thirty-two centers agree and the 33rd is 10 m off in X. This used to
1565        // overflow the u32 subset mask before any consensus could be found.
1566        let sources: Vec<Sp3> = (0..33)
1567            .map(|idx| {
1568                let x_km = if idx < 32 { 15000.0 } else { 15000.010 };
1569                sp3_records(&[("G01", [x_km, -20000.0, 5000.0], Some(100.0))])
1570            })
1571            .collect();
1572
1573        for combine in [MergeCombine::Mean, MergeCombine::Precedence] {
1574            let opts = MergeOptions {
1575                combine,
1576                min_agree: 32,
1577                ..MergeOptions::default()
1578            };
1579
1580            let (merged, report) = merge(&sources, &opts).expect("33-source merge");
1581
1582            let states = merged.states_at(0).expect("epoch 0");
1583            let g01 = states[&gps(1)];
1584            assert!(
1585                (g01.position.as_array()[0] - 15_000_000.0).abs() < 1.0e-3,
1586                "{combine:?}: got {}",
1587                g01.position.as_array()[0]
1588            );
1589            assert_eq!(
1590                report.position_outliers.len(),
1591                1,
1592                "{combine:?}: expected one outlier report"
1593            );
1594            assert_eq!(report.position_outliers[0].sources, vec![32]);
1595            assert!(report.quarantined.is_empty(), "{combine:?}");
1596        }
1597    }
1598
1599    #[test]
1600    fn merge_bounds_large_overlap_clique_search() {
1601        let sources: Vec<Sp3> = (0..40)
1602            .map(|idx| {
1603                let x_km = if idx % 2 == 0 { 15000.0 } else { 15000.010 };
1604                sp3_records(&[("G01", [x_km, -20000.0, 5000.0], Some(100.0))])
1605            })
1606            .collect();
1607        let opts = MergeOptions {
1608            min_agree: 20,
1609            ..MergeOptions::default()
1610        };
1611
1612        let (merged, report) = merge(&sources, &opts).expect("bounded large-source merge");
1613
1614        let states = merged.states_at(0).expect("epoch 0");
1615        let g01 = states[&gps(1)];
1616        assert!(
1617            (g01.position.as_array()[0] - 15_000_000.0).abs() < 1.0e-3,
1618            "got {}",
1619            g01.position.as_array()[0]
1620        );
1621        assert_eq!(report.position_outliers.len(), 1);
1622        assert_eq!(
1623            report.position_outliers[0].sources,
1624            (1..40).step_by(2).collect::<Vec<_>>()
1625        );
1626        assert!(report.quarantined.is_empty());
1627    }
1628
1629    #[test]
1630    fn merge_quarantines_a_satellite_all_centers_disagree_on() {
1631        // Three sources, mutually beyond tolerance on G01: no 2-of-3 consensus.
1632        let a = sp3_records(&[("G01", [15000.000, -20000.0, 5000.0], Some(100.0))]);
1633        let b = sp3_records(&[("G01", [15000.010, -20000.0, 5000.0], Some(100.0))]);
1634        let c = sp3_records(&[("G01", [15000.020, -20000.0, 5000.0], Some(100.0))]);
1635
1636        let (merged, report) = merge(&[a, b, c], &MergeOptions::default()).expect("merge");
1637
1638        assert!(
1639            merged.states_at(0).expect("epoch 0").is_empty(),
1640            "no consensus -> G01 omitted, not averaged across disagreeing centers"
1641        );
1642        assert_eq!(report.quarantined.len(), 1);
1643        assert_eq!(report.quarantined[0].satellite, gps(1));
1644    }
1645
1646    #[test]
1647    fn merge_rejects_an_empty_input() {
1648        assert!(merge(&[], &MergeOptions::default()).is_err());
1649    }
1650
1651    #[test]
1652    fn merge_omits_an_unalignable_secondary_clock() {
1653        // Only 3 common satellites, but the default clock datum needs 5, so
1654        // center B's clocks cannot be put on A's datum. They must be dropped
1655        // rather than emitted raw, and a B-only satellite gets a position but no
1656        // clock.
1657        let a = sp3_records(&[
1658            ("G01", [15000.0, -20000.0, 5000.0], Some(100.0)),
1659            ("G02", [16000.0, -21000.0, 6000.0], Some(200.0)),
1660            ("G03", [17000.0, -22000.0, 7000.0], Some(300.0)),
1661        ]);
1662        let b = sp3_records(&[
1663            ("G01", [15000.0, -20000.0, 5000.0], Some(150.0)),
1664            ("G02", [16000.0, -21000.0, 6000.0], Some(250.0)),
1665            ("G03", [17000.0, -22000.0, 7000.0], Some(350.0)),
1666            ("G04", [18000.0, -23000.0, 8000.0], Some(450.0)),
1667        ]);
1668
1669        let (merged, _) = merge(&[a, b], &MergeOptions::default()).expect("merge");
1670        let states = merged.states_at(0).expect("epoch 0");
1671
1672        // G04 is B-only (gap fill): position carried, clock unalignable -> dropped.
1673        assert!(states.contains_key(&gps(4)));
1674        assert!(
1675            states[&gps(4)].clock_s.is_none(),
1676            "an unalignable secondary clock must be dropped, not emitted raw"
1677        );
1678        // G01's clock comes from the reference (source 0), which is on its own datum.
1679        let g01_clock = states[&gps(1)]
1680            .clock_s
1681            .expect("G01 carries the reference clock");
1682        assert!((g01_clock - 100.0e-6).abs() < 1.0e-12, "got {g01_clock}");
1683    }
1684
1685    #[test]
1686    fn merge_rejects_mismatched_coordinate_systems() {
1687        let a = sp3_build(
1688            &[("G01", [15000.0, -20000.0, 5000.0], Some(100.0), "")],
1689            "IGS14",
1690        );
1691        let b = sp3_build(
1692            &[("G01", [15000.0, -20000.0, 5000.0], Some(100.0), "")],
1693            "IGS20",
1694        );
1695
1696        assert!(merge(&[a, b], &MergeOptions::default()).is_err());
1697    }
1698
1699    #[test]
1700    fn merge_rejects_different_igs_frame_labels_without_a_transform() {
1701        let a = sp3_build(
1702            &[("G01", [15000.0, -20000.0, 5000.0], Some(100.0), "")],
1703            "IGS20",
1704        );
1705        let b = sp3_build(
1706            &[("G01", [15000.0, -20000.0, 5000.0], Some(100.0), "")],
1707            "IGc20",
1708        );
1709
1710        let err = merge(&[a, b], &MergeOptions::default()).expect_err("frame mismatch");
1711        assert!(
1712            err.to_string().contains("mismatched coordinate systems"),
1713            "{err}"
1714        );
1715    }
1716
1717    #[test]
1718    fn merge_decimates_finer_interval_onto_coarse_common_grid() {
1719        // 15-min (900 s) center A and 5-min (300 s) center B over the same span.
1720        // The merge must decimate B onto the 900 s grid (exact subset of B's :00
1721        // and :15 epochs; the :05/:10 epochs are dropped, not interpolated),
1722        // output at 900 s. Under precedence, A (source 0) wins G01's whole arc and
1723        // B's distinct values must never be substituted mid-arc.
1724        let a = sp3_two_epochs(
1725            &[("G01", [15000.0, -20000.0, 5000.0], Some(100.0))],
1726            &[("G01", [15003.0, -20003.0, 5003.0], Some(103.0))],
1727            900.0,
1728            "IGS14",
1729        );
1730        let b = sp3_epochs(
1731            0.0,
1732            &[
1733                &[("G01", [26000.0, -20000.0, 5000.0], Some(200.0))],
1734                &[("G01", [26001.0, -20001.0, 5001.0], Some(201.0))],
1735                &[("G01", [26002.0, -20002.0, 5002.0], Some(202.0))],
1736                &[("G01", [26003.0, -20003.0, 5003.0], Some(203.0))],
1737            ],
1738            300.0,
1739            "IGS14",
1740        );
1741
1742        let opts = MergeOptions {
1743            combine: MergeCombine::Precedence,
1744            min_agree: 1,
1745            ..MergeOptions::default()
1746        };
1747        let (merged, _report) =
1748            merge(&[a, b], &opts).expect("mixed-interval merge decimates onto the coarse grid");
1749
1750        assert_eq!(
1751            merged.header.epoch_interval_s, 900.0,
1752            "output is on the coarse (900 s) common grid"
1753        );
1754        assert_eq!(
1755            merged.epochs.len(),
1756            2,
1757            "only the two aligned epochs (:00, :15), not B's four"
1758        );
1759        // Per-arc precedence intact across the decimated grid: A (source 0) wins
1760        // both epochs; B's :00/:15 values (26000xxx km) are never substituted.
1761        for idx in 0..2 {
1762            let g01 = merged.states_at(idx).expect("epoch")[&gps(1)];
1763            assert!(
1764                (g01.position.as_array()[0] - 15_000_000.0 - (idx as f64) * 3000.0).abs() < 1.0,
1765                "epoch {idx}: expected A's value, got {}",
1766                g01.position.as_array()[0]
1767            );
1768        }
1769        assert!(merged.states_at(0).expect("epoch 0").contains_key(&gps(1)));
1770        assert!(merged.states_at(1).expect("epoch 1").contains_key(&gps(1)));
1771    }
1772
1773    #[test]
1774    fn merge_decimates_with_explicit_coarser_target_interval() {
1775        // Two 5-min inputs, explicit 900 s target: both decimate to the 15-min grid.
1776        let recs = |x: f64| vec![("G01", [x, -20000.0, 5000.0], Some(100.0))];
1777        let make = || {
1778            sp3_epochs(
1779                0.0,
1780                &[
1781                    &recs(15000.0),
1782                    &recs(15001.0),
1783                    &recs(15002.0),
1784                    &recs(15003.0),
1785                ],
1786                300.0,
1787                "IGS14",
1788            )
1789        };
1790        let opts = MergeOptions {
1791            min_agree: 1,
1792            target_epoch_interval_s: Some(900.0),
1793            ..MergeOptions::default()
1794        };
1795        let (merged, _) = merge(&[make(), make()], &opts).expect("explicit coarse target");
1796        assert_eq!(merged.header.epoch_interval_s, 900.0);
1797        assert_eq!(
1798            merged.epochs.len(),
1799            2,
1800            "decimated 5-min inputs to the 900 s target"
1801        );
1802    }
1803
1804    #[test]
1805    fn merge_rejects_non_divisible_epoch_intervals() {
1806        // 900 s and 400 s: 900 is not an integer multiple of 400, so no exact
1807        // subset of the 400 s grid lands on the 900 s grid -> still rejected
1808        // (positional interpolation is never performed).
1809        let a = sp3_two_epochs(
1810            &[("G01", [15000.0, -20000.0, 5000.0], Some(100.0))],
1811            &[("G01", [15001.0, -20001.0, 5001.0], Some(101.0))],
1812            900.0,
1813            "IGS14",
1814        );
1815        let b = sp3_two_epochs(
1816            &[("G01", [15000.0, -20000.0, 5000.0], Some(100.0))],
1817            &[("G01", [15001.0, -20001.0, 5001.0], Some(101.0))],
1818            400.0,
1819            "IGS14",
1820        );
1821
1822        let err = merge(&[a, b], &MergeOptions::default()).expect_err("non-divisible intervals");
1823        assert!(
1824            err.to_string().contains("mismatched epoch intervals"),
1825            "{err}"
1826        );
1827    }
1828
1829    #[test]
1830    fn merge_rejects_a_non_whole_second_common_interval() {
1831        // The decimation lattice is whole-second J2000 keys, so a fractional
1832        // common interval must be rejected rather than silently rounded.
1833        let mk = || {
1834            sp3_two_epochs(
1835                &[("G01", [15000.0, -20000.0, 5000.0], Some(100.0))],
1836                &[("G01", [15001.0, -20001.0, 5001.0], Some(101.0))],
1837                900.0,
1838                "IGS14",
1839            )
1840        };
1841        let opts = MergeOptions {
1842            target_epoch_interval_s: Some(450.5),
1843            ..MergeOptions::default()
1844        };
1845        let err = merge(&[mk(), mk()], &opts).expect_err("fractional target");
1846        assert!(err.to_string().contains("whole number of seconds"), "{err}");
1847    }
1848
1849    #[test]
1850    fn merge_header_first_epoch_describes_the_decimated_grid_start() {
1851        // Source A starts at 00:00, source B at 00:15 (both 15-min). The merged
1852        // grid's first epoch is the first COMMON epoch, 00:15, so the output
1853        // header's seconds-of-week / MJD fraction must describe 00:15 (source B),
1854        // not source A's earlier 00:00 start.
1855        let a = sp3_epochs(
1856            0.0,
1857            &[
1858                &[("G01", [15000.0, -20000.0, 5000.0], Some(100.0))],
1859                &[("G01", [15001.0, -20001.0, 5001.0], Some(101.0))],
1860                &[("G01", [15002.0, -20002.0, 5002.0], Some(102.0))],
1861            ],
1862            900.0,
1863            "IGS14",
1864        );
1865        let b = sp3_epochs(
1866            900.0,
1867            &[
1868                &[("G01", [15001.0, -20001.0, 5001.0], Some(101.0))],
1869                &[("G01", [15002.0, -20002.0, 5002.0], Some(102.0))],
1870                &[("G01", [15003.0, -20003.0, 5003.0], Some(103.0))],
1871            ],
1872            900.0,
1873            "IGS14",
1874        );
1875
1876        let opts = MergeOptions {
1877            min_agree: 1,
1878            ..MergeOptions::default()
1879        };
1880        let (merged, _) = merge(&[a, b], &opts).expect("merge");
1881
1882        assert_eq!(merged.epochs.len(), 2, "common epochs are 00:15 and 00:30");
1883        assert!(
1884            (merged.header.seconds_of_week - 346_500.0).abs() < 1.0e-6,
1885            "header sow must describe the merged first epoch 00:15 (346500 s), got {}",
1886            merged.header.seconds_of_week
1887        );
1888        assert!(
1889            (merged.header.mjd_fraction - 900.0 / SECONDS_PER_DAY).abs() < 1.0e-9,
1890            "header MJD fraction must describe 00:15, got {}",
1891            merged.header.mjd_fraction
1892        );
1893    }
1894
1895    #[test]
1896    fn merge_writer_recomputes_header_when_common_grid_starts_after_all_inputs() {
1897        // A starts on the 15-minute grid at 00:00. B starts on a 7.5-minute grid
1898        // at 00:07:30. Their coarsened common grid starts at 00:15, which is not
1899        // the first epoch of either input, so the merged `##` header must be
1900        // derived from the output epoch rather than cloned from a source header.
1901        let a = sp3_epochs(
1902            0.0,
1903            &[
1904                &[("G01", [15000.0, -20000.0, 5000.0], Some(100.0))],
1905                &[("G01", [15001.0, -20001.0, 5001.0], Some(101.0))],
1906                &[("G01", [15002.0, -20002.0, 5002.0], Some(102.0))],
1907            ],
1908            900.0,
1909            "IGS14",
1910        );
1911        let b = sp3_epochs(
1912            450.0,
1913            &[
1914                &[("G01", [15010.0, -20010.0, 5010.0], Some(110.0))],
1915                &[("G01", [15001.0, -20001.0, 5001.0], Some(101.0))],
1916                &[("G01", [15011.0, -20011.0, 5011.0], Some(111.0))],
1917                &[("G01", [15002.0, -20002.0, 5002.0], Some(102.0))],
1918            ],
1919            450.0,
1920            "IGS14",
1921        );
1922
1923        let opts = MergeOptions {
1924            min_agree: 1,
1925            ..MergeOptions::default()
1926        };
1927        let (merged, _) = merge(&[a, b], &opts).expect("mixed-cadence merge");
1928
1929        assert_eq!(merged.epochs.len(), 2, "common epochs are 00:15 and 00:30");
1930        let text = merged.to_sp3_string();
1931        let header = text
1932            .lines()
1933            .find(|line| line.starts_with("## "))
1934            .expect("written ## header");
1935        let first_epoch = text
1936            .lines()
1937            .find(|line| line.starts_with("*  "))
1938            .expect("written first epoch");
1939
1940        assert_eq!(first_epoch, "*  2020  6 25  0 15  0.00000000");
1941        assert_eq!(
1942            header,
1943            "## 2111 346500.00000000   900.00000000 59025 0.0104166666667"
1944        );
1945    }
1946
1947    #[test]
1948    fn precedence_merge_never_switches_source_within_one_satellite_arc() {
1949        let a = sp3_two_epochs(
1950            &[("G01", [15000.0, -20000.0, 5000.0], Some(100.0))],
1951            &[],
1952            900.0,
1953            "IGS14",
1954        );
1955        let b = sp3_two_epochs(
1956            &[("G01", [15000.001, -20000.0, 5000.0], Some(100.0))],
1957            &[("G01", [15001.0, -20001.0, 5001.0], Some(101.0))],
1958            900.0,
1959            "IGS14",
1960        );
1961        let opts = MergeOptions {
1962            combine: MergeCombine::Precedence,
1963            min_agree: 1,
1964            ..MergeOptions::default()
1965        };
1966
1967        let (merged, _report) = merge(&[a, b], &opts).expect("merge");
1968        let epoch0 = merged.states_at(0).expect("epoch 0");
1969        let epoch1 = merged.states_at(1).expect("epoch 1");
1970
1971        assert!(epoch0.contains_key(&gps(1)));
1972        assert!(
1973            !epoch1.contains_key(&gps(1)),
1974            "G01 must not switch from source 0 at epoch 0 to source 1 at epoch 1"
1975        );
1976        assert_eq!(merged.header.epoch_interval_s, 900.0);
1977    }
1978
1979    #[test]
1980    fn merge_filters_requested_constellations_and_header_satellites() {
1981        let a = sp3_two_epochs(
1982            &[
1983                ("G01", [15000.0, -20000.0, 5000.0], Some(100.0)),
1984                ("E01", [21000.0, -1000.0, 13000.0], Some(120.0)),
1985            ],
1986            &[
1987                ("G01", [15001.0, -20001.0, 5001.0], Some(101.0)),
1988                ("E01", [21001.0, -1001.0, 13001.0], Some(121.0)),
1989            ],
1990            900.0,
1991            "IGS14",
1992        );
1993        let systems = BTreeSet::from([GnssSystem::Gps]);
1994        let opts = MergeOptions {
1995            systems: Some(systems),
1996            ..MergeOptions::default()
1997        };
1998
1999        let (merged, _report) = merge(&[a], &opts).expect("merge");
2000
2001        assert_eq!(merged.header.satellites, vec![gps(1)]);
2002        for idx in 0..merged.epochs.len() {
2003            let states = merged.states_at(idx).expect("epoch");
2004            assert_eq!(states.keys().copied().collect::<Vec<_>>(), vec![gps(1)]);
2005        }
2006    }
2007
2008    #[test]
2009    fn merge_preserves_a_clock_event_flag() {
2010        // Source A carries an `E` clock-event flag on G01 (column 75); the merged
2011        // product must keep it so the interpolator still splits the clock arc.
2012        let a = sp3_build(
2013            &[(
2014                "G01",
2015                [15000.0, -20000.0, 5000.0],
2016                Some(100.0),
2017                "              E",
2018            )],
2019            "IGS14",
2020        );
2021        let b = sp3_build(
2022            &[("G01", [15000.0, -20000.0, 5000.0], Some(100.0), "")],
2023            "IGS14",
2024        );
2025
2026        let (merged, _) = merge(&[a, b], &MergeOptions::default()).expect("merge");
2027        let g01 = merged.states_at(0).expect("epoch 0")[&gps(1)];
2028
2029        assert!(
2030            g01.flags.clock_event,
2031            "merged cell must preserve a contributing source's clock-event flag"
2032        );
2033    }
2034
2035    #[test]
2036    fn merge_reports_effective_epoch_interval_from_actual_epochs() {
2037        // The header DECLARES a 300 s interval, but the two epochs are 15 min
2038        // (900 s) apart. The synthetic merged header must report the spacing of
2039        // the actual merged epochs, not inherit the stale declared value.
2040        let body = "#cP2020  6 25  0  0  0.00000000       2 ORBIT IGS14 FIT  TST\n\
2041            ## 2111 432000.00000000   300.00000000 59025 0.0000000000000\n\
2042            +    1   G01  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0\n\
2043            ++         0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0\n\
2044            %c G  cc GPS ccc cccc cccc cccc cccc ccccc ccccc ccccc ccccc\n\
2045            %c cc cc ccc ccc cccc cccc cccc cccc ccccc ccccc ccccc ccccc\n\
2046            %f  1.2500000  1.025000000  0.00000000000  0.000000000000000\n\
2047            %f  0.0000000  0.000000000  0.00000000000  0.000000000000000\n\
2048            %i    0    0    0    0      0      0      0      0         0\n\
2049            %i    0    0    0    0      0      0      0      0         0\n\
2050            /* TEST SP3-c FIXTURE\n\
2051            *  2020  6 25  0  0  0.00000000\n\
2052            PG01  15000.000000 -20000.000000   5000.000000    100.000000\n\
2053            *  2020  6 25  0 15  0.00000000\n\
2054            PG01  15001.000000 -20001.000000   5001.000000    101.000000\n\
2055            EOF\n";
2056        let a = Sp3::parse(body.as_bytes()).expect("parse test sp3");
2057
2058        let (merged, _) = merge(&[a], &MergeOptions::default()).expect("merge");
2059
2060        assert!(
2061            (merged.header.epoch_interval_s - 900.0).abs() < 1.0e-6,
2062            "got {}",
2063            merged.header.epoch_interval_s
2064        );
2065    }
2066
2067    #[test]
2068    fn merge_rejects_unsorted_input_epochs_before_cadence_inference() {
2069        let body = "#cP2020  6 25  0  0  0.00000000       2 ORBIT IGS14 FIT  TST\n\
2070            ## 2111 432000.00000000   900.00000000 59025 0.0000000000000\n\
2071            +    1   G01  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0\n\
2072            ++         0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0\n\
2073            %c G  cc GPS ccc cccc cccc cccc cccc ccccc ccccc ccccc ccccc\n\
2074            %c cc cc ccc ccc cccc cccc cccc cccc ccccc ccccc ccccc ccccc\n\
2075            %f  1.2500000  1.025000000  0.00000000000  0.000000000000000\n\
2076            %f  0.0000000  0.000000000  0.00000000000  0.000000000000000\n\
2077            %i    0    0    0    0      0      0      0      0         0\n\
2078            %i    0    0    0    0      0      0      0      0         0\n\
2079            /* TEST SP3-c FIXTURE\n\
2080            *  2020  6 25  0 15  0.00000000\n\
2081            PG01  15001.000000 -20001.000000   5001.000000    101.000000\n\
2082            *  2020  6 25  0  0  0.00000000\n\
2083            PG01  15000.000000 -20000.000000   5000.000000    100.000000\n\
2084            EOF\n";
2085        let source = Sp3::parse(body.as_bytes()).expect("parse unsorted test sp3");
2086
2087        let err = merge(&[source], &MergeOptions::default()).expect_err("unsorted epochs");
2088
2089        assert!(
2090            err.to_string()
2091                .contains("merge input epochs must be strictly increasing"),
2092            "{err}"
2093        );
2094    }
2095
2096    #[test]
2097    fn align_clock_reference_puts_other_on_the_reference_datum() {
2098        // `other`'s clocks all run +50 us ahead; after alignment they should sit
2099        // on `reference`'s datum (G01: 150 us - 50 us = 100 us = 1e-4 s).
2100        let reference = sp3([100.0, 200.0, 300.0]);
2101        let other = sp3([150.0, 250.0, 350.0]);
2102
2103        let aligned = align_clock_reference(&reference, &other, 3);
2104
2105        let g01 = aligned.states_at(0).expect("epoch 0")[&gps(1)];
2106        assert!(
2107            (g01.clock_s.unwrap() - 100.0e-6).abs() < 1.0e-15,
2108            "got {}",
2109            g01.clock_s.unwrap()
2110        );
2111        // Positions are untouched by clock alignment.
2112        let original = other.states_at(0).expect("epoch 0")[&gps(1)];
2113        assert_eq!(g01.position.as_array(), original.position.as_array());
2114    }
2115
2116    // Minimal single-epoch SP3-c with three satellites; each `clocks_us` entry is
2117    // that satellite's clock in microseconds (positions are arbitrary but non-zero
2118    // so they parse as valid records).
2119    fn sp3(clocks_us: [f64; 3]) -> Sp3 {
2120        let body = format!(
2121            "#cP2020  6 25  0  0  0.00000000       1 ORBIT IGS14 FIT  TST\n\
2122             ## 2111 432000.00000000   900.00000000 59025 0.0000000000000\n\
2123             +    3   G01G02G03  0  0  0  0  0  0  0  0  0  0  0  0  0  0\n\
2124             ++         0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0\n\
2125             %c G  cc GPS ccc cccc cccc cccc cccc ccccc ccccc ccccc ccccc\n\
2126             %c cc cc ccc ccc cccc cccc cccc cccc ccccc ccccc ccccc ccccc\n\
2127             %f  1.2500000  1.025000000  0.00000000000  0.000000000000000\n\
2128             %f  0.0000000  0.000000000  0.00000000000  0.000000000000000\n\
2129             %i    0    0    0    0      0      0      0      0         0\n\
2130             %i    0    0    0    0      0      0      0      0         0\n\
2131             /* TEST SP3-c FIXTURE\n\
2132             *  2020  6 25  0  0  0.00000000\n\
2133             PG01  15000.000000 -20000.000000   5000.000000 {:13.6}\n\
2134             PG02  -1234.567890   2345.678901  -3456.789012 {:13.6}\n\
2135             PG03   8000.000000  12000.000000 -19000.000000 {:13.6}\n\
2136             EOF\n",
2137            clocks_us[0], clocks_us[1], clocks_us[2]
2138        );
2139        Sp3::parse(body.as_bytes()).expect("parse test sp3")
2140    }
2141
2142    #[test]
2143    fn recovers_a_uniform_datum_shift() {
2144        // every `other` clock is +50 us (= 5e-5 s) from `reference`.
2145        let reference = sp3([100.0, 200.0, 300.0]);
2146        let other = sp3([150.0, 250.0, 350.0]);
2147
2148        let offsets = clock_reference_offset(&reference, &other, 3);
2149
2150        assert_eq!(offsets.len(), 1);
2151        assert_eq!(offsets[0].satellites, 3);
2152        assert!(
2153            (offsets[0].offset_s - 5.0e-5).abs() < 1.0e-12,
2154            "got {}",
2155            offsets[0].offset_s
2156        );
2157    }
2158
2159    #[test]
2160    fn median_rejects_a_single_outlier_clock() {
2161        // Two satellites agree (+50 us); one is a wild outlier (+9000 us). The
2162        // median over the three tracks the consensus instead of being dragged out.
2163        let reference = sp3([100.0, 200.0, 300.0]);
2164        let other = sp3([150.0, 250.0, 9_300.0]);
2165
2166        let offsets = clock_reference_offset(&reference, &other, 3);
2167
2168        assert_eq!(offsets.len(), 1);
2169        assert!(
2170            (offsets[0].offset_s - 5.0e-5).abs() < 1.0e-12,
2171            "got {}",
2172            offsets[0].offset_s
2173        );
2174    }
2175
2176    #[test]
2177    fn omits_epochs_below_min_common() {
2178        // Three common clocked satellites, but require four: the fragile estimate
2179        // is omitted rather than reported.
2180        let reference = sp3([100.0, 200.0, 300.0]);
2181        let other = sp3([150.0, 250.0, 350.0]);
2182
2183        assert!(clock_reference_offset(&reference, &other, 4).is_empty());
2184    }
2185
2186    #[test]
2187    fn merge_agreement_metric_reports_known_position_dispersion() {
2188        // Three centers place G01 on a line, 0 / +3 m / +6 m in X, all within a
2189        // wide consensus tolerance. The mean combine writes +3 m, so the member
2190        // distances from the combined value are {3, 0, 3} m:
2191        //   RMS = sqrt((9 + 0 + 9) / 3) = sqrt(6) m,  max = 3 m.
2192        let a = sp3_records(&[("G01", [15000.000, -20000.0, 5000.0], Some(100.0))]);
2193        let b = sp3_records(&[("G01", [15000.003, -20000.0, 5000.0], Some(100.0))]);
2194        let c = sp3_records(&[("G01", [15000.006, -20000.0, 5000.0], Some(100.0))]);
2195        let opts = MergeOptions {
2196            position_tolerance_m: 10.0,
2197            min_agree: 3,
2198            combine: MergeCombine::Mean,
2199            ..MergeOptions::default()
2200        };
2201
2202        let (_merged, report) = merge(&[a, b, c], &opts).expect("merge");
2203
2204        assert_eq!(report.agreement.len(), 1, "one accepted cell");
2205        let m = report.agreement[0];
2206        assert_eq!(m.satellite, gps(1));
2207        assert_eq!(m.position_members, 3);
2208        assert!(
2209            (m.position_rms_m - 6.0_f64.sqrt()).abs() < 1.0e-6,
2210            "got rms {}",
2211            m.position_rms_m
2212        );
2213        assert!(
2214            (m.position_max_m - 3.0).abs() < 1.0e-6,
2215            "got max {}",
2216            m.position_max_m
2217        );
2218
2219        // The pooled summaries over the single cell reproduce the cell values.
2220        assert!((report.position_agreement_rms_m().unwrap() - 6.0_f64.sqrt()).abs() < 1.0e-6);
2221        assert!((report.position_agreement_max_m().unwrap() - 3.0).abs() < 1.0e-6);
2222
2223        // Per-epoch aggregate: one epoch, one multi-source satellite.
2224        let per_epoch = report.per_epoch_agreement();
2225        assert_eq!(per_epoch.len(), 1);
2226        assert_eq!(per_epoch[0].satellites, 1);
2227        assert!((per_epoch[0].position_rms_m - 6.0_f64.sqrt()).abs() < 1.0e-6);
2228        assert!((per_epoch[0].position_max_m - 3.0).abs() < 1.0e-6);
2229    }
2230
2231    #[test]
2232    fn merge_agreement_metric_reports_known_clock_dispersion() {
2233        // Same positions across A/B/C (zero position spread); the three centers
2234        // share a clock datum (G01/G02 identical) so the per-epoch datum offset is
2235        // zero and G03's clocks stay as authored: 300 / 330 / 270 us. The mean
2236        // combine writes 300 us, so the deviations are {0, +30, -30} us:
2237        //   RMS = sqrt((0 + 30^2 + 30^2)/3) us = sqrt(600) us,  max = 30 us.
2238        let a = sp3([100.0, 200.0, 300.0]);
2239        let b = sp3([100.0, 200.0, 330.0]);
2240        let c = sp3([100.0, 200.0, 270.0]);
2241        let opts = MergeOptions {
2242            clock_min_common: 1,
2243            clock_tolerance_s: 1.0e-3,
2244            min_agree: 3,
2245            combine: MergeCombine::Mean,
2246            ..MergeOptions::default()
2247        };
2248
2249        let (_merged, report) = merge(&[a, b, c], &opts).expect("merge");
2250
2251        let g03 = report
2252            .agreement
2253            .iter()
2254            .find(|m| m.satellite == gps(3))
2255            .expect("G03 agreement metric");
2256        assert_eq!(g03.clock_members, 3);
2257        let expected_rms_s = 600.0_f64.sqrt() * 1.0e-6;
2258        assert!(
2259            (g03.clock_rms_s.unwrap() - expected_rms_s).abs() < 1.0e-15,
2260            "got clock rms {:?}",
2261            g03.clock_rms_s
2262        );
2263        assert!(
2264            (g03.clock_max_s.unwrap() - 30.0e-6).abs() < 1.0e-15,
2265            "got clock max {:?}",
2266            g03.clock_max_s
2267        );
2268        // G01/G02 agree exactly -> zero clock dispersion.
2269        for prn in [1u8, 2] {
2270            let m = report
2271                .agreement
2272                .iter()
2273                .find(|m| m.satellite == gps(prn))
2274                .expect("metric");
2275            assert!(m.clock_rms_s.unwrap().abs() < 1.0e-18, "prn {prn}");
2276            // Positions identical across centers -> zero position dispersion too.
2277            assert!(m.position_rms_m.abs() < 1.0e-9, "prn {prn}");
2278        }
2279
2280        // The clock pooled summary is the RMS over the three multi-source cells
2281        // (G01=0, G02=0, G03), each with 3 members:
2282        //   sqrt((0 + 0 + 3*expected^2) / 9) = expected / sqrt(3).
2283        let pooled = report.clock_agreement_rms_s().expect("clock pool");
2284        assert!(
2285            (pooled - expected_rms_s / 3.0_f64.sqrt()).abs() < 1.0e-15,
2286            "got pooled {pooled}"
2287        );
2288        assert!((report.clock_agreement_max_s().unwrap() - 30.0e-6).abs() < 1.0e-15);
2289    }
2290
2291    // Real-data oracle: combine published individual analysis-center final
2292    // products (COD/GFZ/JPL, 2026-04-30, GPS week 2416 DOY 120) and compare to the
2293    // published IGS official combined for the same day. The IGS combination is a
2294    // specific weighted algorithm, so the crate's mean combine is not a bit-match;
2295    // the gate is agreement at the inter-center spread level (cm-level bound), gated
2296    // at RMS < 2 cm and max < 5 cm (observed RMS ~0.7 cm, max ~1.6 cm over 88 cells).
2297    //
2298    // Fixture provenance: the COD/GFZ/JPL `_trim.SP3` files are the final precise
2299    // orbit products of CODE (AIUB Bern), GFZ Potsdam, and JPL, all frame IGc20 /
2300    // time system GPS (ESA/GRG excluded for IGS20 frame labelling). From the Wuhan
2301    // University IGS mirror `ftp://igs.gnsswhu.cn/pub/gps/products/2416/`, full-day
2302    // `.gz`: COD0OPSFIN_20261200000_01D_05M_ORB.SP3.gz (634569 B, sha256
2303    // 90393acaed691cd4d19cd4ade7153873eb41ef38585df177d9d540eac6316112);
2304    // GFZ0OPSFIN…05M_ORB.SP3.gz (647028 B, sha256
2305    // a51a04ab283a981ddec20ae77d575cd05f4f8249202e0ee4f73e7243b7817e88);
2306    // JPL0OPSFIN…05M_ORB.SP3.gz (482973 B, sha256
2307    // 3a39ccb2d097eddb139047532b2b93c5d538abc39255fc779278ac64f10cd185). Each trim
2308    // keeps the verbatim header and only the 11 epochs 09:45..12:15 landing on the
2309    // combined's 900 s grid plus the 8-sat subset common to all three centers and
2310    // the combined (G02,G03,G04,G05,G09,G17,G25,G31); velocity/correlation records
2311    // dropped, no values altered. Trim sha256: COD…_trim.SP3 (7227 B)
2312    // f3ad3f637134651d086815345f3e5f531a9dbacb6f739b7dddf664e0ab3a1795;
2313    // GFZ…_trim.SP3 (9805 B)
2314    // 9e50edc53ac42791923fd71c39b49a97bf516084f1d2b1dcb260685d2a8f11cc;
2315    // JPL…_trim.SP3 (8210 B)
2316    // 9ac5aafdabed38679892f57b42864cc3716d997400280f29ee8049a37057adf4. The oracle
2317    // IGS0OPSFIN combined product provenance is in `sp3/tests.rs`.
2318    #[cfg(sidereon_repo_tests)]
2319    #[test]
2320    fn merge_agrees_with_published_igs_combined_within_cm() {
2321        fn load(name: &str) -> Sp3 {
2322            let path = format!("{}/tests/fixtures/sp3/{}", env!("CARGO_MANIFEST_DIR"), name);
2323            let bytes = std::fs::read(&path).unwrap_or_else(|e| panic!("read {path}: {e}"));
2324            Sp3::parse(&bytes).unwrap_or_else(|e| panic!("parse {name}: {e}"))
2325        }
2326
2327        let cod = load("COD0OPSFIN_20261200945_02H30M_15M_ORB_trim.SP3");
2328        let gfz = load("GFZ0OPSFIN_20261200945_02H30M_15M_ORB_trim.SP3");
2329        let jpl = load("JPL0OPSFIN_20261200945_02H30M_15M_ORB_trim.SP3");
2330        let igs = load("IGS0OPSFIN_20261200945_02H30M_15M_ORB.SP3");
2331
2332        let (merged, report) =
2333            merge(&[cod, gfz, jpl], &MergeOptions::default()).expect("multi-center merge");
2334
2335        // All three centers agree at the 0.5 m position tolerance: nothing
2336        // quarantined, every cell a 3-source consensus.
2337        assert!(
2338            report.quarantined.is_empty(),
2339            "centers should agree: {:?}",
2340            report.quarantined
2341        );
2342        // A clean 3-source consensus everywhere: no gap-fills, no rejected
2343        // outliers, and every accepted cell backed by all three centers.
2344        assert!(
2345            report.single_source.is_empty(),
2346            "{:?}",
2347            report.single_source
2348        );
2349        assert!(
2350            report.position_outliers.is_empty(),
2351            "{:?}",
2352            report.position_outliers
2353        );
2354        assert!(
2355            report.agreement.iter().all(|a| a.position_members == 3),
2356            "every agreement cell should be a 3-source consensus"
2357        );
2358
2359        let mut igs_idx: std::collections::BTreeMap<i64, usize> = std::collections::BTreeMap::new();
2360        for (i, ep) in igs.epochs.iter().enumerate() {
2361            if let Some(s) = super::instant_to_j2000_seconds(ep) {
2362                igs_idx.insert(s.floor() as i64, i);
2363            }
2364        }
2365
2366        let mut sumsq = 0.0_f64;
2367        let mut max = 0.0_f64;
2368        let mut n = 0usize;
2369        for (mi, ep) in merged.epochs.iter().enumerate() {
2370            let key = super::instant_to_j2000_seconds(ep)
2371                .expect("merged epoch key")
2372                .floor() as i64;
2373            let ii = *igs_idx.get(&key).expect("IGS combined covers merged epoch");
2374            let merged_states = merged.states_at(mi).expect("merged states");
2375            let igs_states = igs.states_at(ii).expect("IGS states");
2376            for (sat, mst) in merged_states.iter() {
2377                let ist = igs_states
2378                    .get(sat)
2379                    .unwrap_or_else(|| panic!("merged sat {sat} missing from IGS combined"));
2380                let d = super::dist3(&mst.position.as_array(), &ist.position.as_array());
2381                sumsq += d * d;
2382                max = max.max(d);
2383                n += 1;
2384            }
2385        }
2386
2387        // Exact coverage: 8 satellites x 11 epochs, every merged cell present in
2388        // the IGS combined (proves same epochs/sats, not a lucky subset).
2389        assert_eq!(n, 88, "expected exactly 88 compared cells, got {n}");
2390        let rms = (sumsq / n as f64).sqrt();
2391        // Observed on this day: RMS ~0.7 cm, max ~1.6 cm. Gate at a cm-level bound.
2392        assert!(
2393            rms < 0.02,
2394            "combine-vs-IGS RMS {:.4} m ({} cells) exceeds the 2 cm gate",
2395            rms,
2396            n
2397        );
2398        assert!(
2399            max < 0.05,
2400            "combine-vs-IGS max {max:.4} m exceeds the 5 cm gate"
2401        );
2402
2403        // The internal inter-center agreement metric is also cm-level.
2404        let dispersion = report
2405            .position_agreement_rms_m()
2406            .expect("multi-source cells present");
2407        assert!(
2408            dispersion < 0.05,
2409            "inter-center position dispersion {dispersion:.4} m"
2410        );
2411    }
2412}