Skip to main content

sidereon_core/sp3/
combine.rs

1//! Multi-source SP3 combination: clock-datum alignment across analysis centers.
2//!
3//! Precise clock products from different analysis centers are referenced to
4//! different station/ensemble clocks, so their raw clock values differ by a
5//! per-epoch common offset - the reference-clock difference - that drifts over
6//! the day. Before clocks from two centers can be compared or combined, that
7//! datum must be removed. [`clock_reference_offset`] estimates it robustly (the
8//! median, over the satellites both products report at each epoch, of
9//! `other - reference`); subtract it from `other`'s clocks to put both products
10//! on `reference`'s datum.
11//!
12//! Orbit positions need no such treatment: every center reports ITRF
13//! center-of-mass coordinates, so cross-center position differences are already
14//! directly comparable.
15
16use std::collections::{BTreeMap, BTreeSet};
17
18use crate::astro::math::vec3;
19use crate::astro::time::civil::mjd_from_jd;
20use crate::astro::time::gnss;
21use crate::astro::time::model::Instant;
22
23use super::interp::{instant_to_j2000_seconds, sp3_epoch_j2000_seconds};
24use super::{RawNode, Sp3, Sp3DataType, Sp3Flags, Sp3Header, Sp3State};
25use crate::constants::{GPS_EPOCH_TO_J2000_S, KM_TO_M};
26use crate::frame::ItrfPositionM;
27use crate::id::{GnssSatelliteId, GnssSystem};
28use crate::tolerances::WHOLE_SECOND_EPS_S;
29use crate::validate;
30use crate::{Error, Result};
31
32const MAX_EXACT_CLIQUE_NODES: usize = 32;
33
34/// One epoch's reference-clock offset of `other` relative to `reference`.
35#[derive(Debug, Clone, Copy, PartialEq)]
36pub struct ClockReferenceOffset {
37    /// The matched epoch.
38    pub epoch: Instant,
39    /// `other - reference` clock datum at this epoch, in seconds. Positive means
40    /// `other`'s clock datum runs ahead of `reference`'s; subtract it from
41    /// `other`'s clocks to align them to `reference`.
42    pub offset_s: f64,
43    /// Number of satellites that contributed to the (median) estimate.
44    pub satellites: usize,
45}
46
47/// Estimate the per-epoch reference-clock offset of `other` relative to
48/// `reference`.
49///
50/// For each epoch present in both products, the offset is the median over the
51/// satellites both report (each with a finite clock) of
52/// `other_clock - reference_clock`. The median makes the estimate robust to a
53/// single satellite whose clock one center has wrong - but only with enough
54/// satellites, so `min_common` is the minimum number of common clocked
55/// satellites required to emit an offset for an epoch (a sound robust median
56/// wants at least three, so one outlier can be outvoted). Epochs with fewer
57/// common clocks are omitted rather than reported as a fragile one- or
58/// two-satellite estimate.
59///
60/// Epochs are matched by their J2000 second floored to a whole second (the same
61/// node-axis convention the interpolator uses). Non-finite clock differences are
62/// skipped. Epochs present in only one product, or below `min_common`, are
63/// omitted from the result.
64///
65/// The floored-whole-second key assumes the input cadence is at least one second,
66/// which holds for every standard SP3 product (15 min, 5 min, 1 min, ... down to
67/// 1 s). Two distinct epochs less than a second apart would collapse onto the
68/// same key and be matched as one; the same applies to the floored key in
69/// [`MergeReport::per_epoch_agreement`]. This is kept deliberately aligned with
70/// the interpolator's node axis rather than refined to sub-second resolution, so
71/// that matching here and interpolation downstream use one consistent grid.
72pub fn clock_reference_offset(
73    reference: &Sp3,
74    other: &Sp3,
75    min_common: usize,
76) -> Vec<ClockReferenceOffset> {
77    let mut other_index: std::collections::HashMap<i64, usize> = std::collections::HashMap::new();
78    for (idx, epoch) in other.epochs.iter().enumerate() {
79        if let Some(seconds) = sp3_epoch_j2000_seconds(other, idx, epoch) {
80            other_index.insert(seconds.floor() as i64, idx);
81        }
82    }
83
84    let mut offsets = Vec::new();
85
86    for (ref_idx, epoch) in reference.epochs.iter().enumerate() {
87        let Some(ref_seconds) = sp3_epoch_j2000_seconds(reference, ref_idx, epoch) else {
88            continue;
89        };
90        let Some(&other_idx) = other_index.get(&(ref_seconds.floor() as i64)) else {
91            continue;
92        };
93
94        let (Ok(ref_states), Ok(other_states)) =
95            (reference.states_at(ref_idx), other.states_at(other_idx))
96        else {
97            continue;
98        };
99
100        let mut diffs: Vec<f64> = Vec::new();
101        for (sat, ref_state) in ref_states.iter() {
102            let Some(ref_clock) = ref_state.clock_s else {
103                continue;
104            };
105            if let Some(other_state) = other_states.get(sat) {
106                if let Some(other_clock) = other_state.clock_s {
107                    let diff = other_clock - ref_clock;
108                    // SP3 should not carry NaN/inf clocks, but the parser can
109                    // accept them; merge infrastructure must not panic on data.
110                    if diff.is_finite() {
111                        diffs.push(diff);
112                    }
113                }
114            }
115        }
116
117        if diffs.len() >= min_common.max(1) {
118            if let Some(offset_s) = median(&mut diffs) {
119                offsets.push(ClockReferenceOffset {
120                    epoch: *epoch,
121                    offset_s,
122                    satellites: diffs.len(),
123                });
124            }
125        }
126    }
127
128    offsets
129}
130
131fn median(values: &mut [f64]) -> Option<f64> {
132    // Inputs are pre-filtered to finite values; total_cmp never panics regardless.
133    crate::astro::math::robust::median_sorting_in_place(values)
134}
135
136// ===========================================================================
137// Multi-source merge
138// ===========================================================================
139
140/// How the agreeing (consensus) sources for a cell are combined into the merged
141/// value.
142#[derive(Debug, Clone, Copy, PartialEq, Eq)]
143pub enum MergeCombine {
144    /// Arithmetic mean of the consensus sources. The clustering step has already
145    /// removed outliers, so the mean uses every agreeing measurement. Default.
146    Mean,
147    /// Component-wise median of the consensus sources.
148    Median,
149    /// The value from the highest-precedence (earliest-listed) consensus source.
150    Precedence,
151}
152
153/// Options for [`merge`].
154#[derive(Debug, Clone, PartialEq)]
155pub struct MergeOptions {
156    /// Maximum 3D position difference (meters) for two sources to be in
157    /// agreement.
158    pub position_tolerance_m: f64,
159    /// Maximum clock difference (seconds, after datum alignment) for two sources
160    /// to be in agreement.
161    pub clock_tolerance_s: f64,
162    /// Minimum number of mutually-agreeing sources required to accept a cell that
163    /// has two or more sources. A cell with a single source is always carried
164    /// through (gap fill, recorded as `single_source`); a cell with several
165    /// sources but no agreeing subset this large is quarantined rather than
166    /// averaged across disagreeing centers.
167    pub min_agree: usize,
168    /// Minimum common clocked satellites for the per-epoch clock-datum estimate
169    /// between two sources (see [`clock_reference_offset`]).
170    pub clock_min_common: usize,
171    /// How to combine the agreeing sources.
172    pub combine: MergeCombine,
173    /// Optional target epoch interval, in seconds. When unset the coarsest input
174    /// interval is used. Finer inputs are decimated onto this grid by exact
175    /// subset selection (never interpolated); inputs whose interval does not
176    /// evenly divide it are rejected.
177    pub target_epoch_interval_s: Option<f64>,
178    /// Optional constellation/system filter. When set, only satellites whose
179    /// system is in this set are considered for the merged product.
180    pub systems: Option<BTreeSet<GnssSystem>>,
181}
182
183impl Default for MergeOptions {
184    /// Defaults tuned for the common case of ~3 analysis centers: agreement is a
185    /// 2-of-3 majority (`min_agree = 2`); combine the agreeing subset by mean.
186    fn default() -> Self {
187        Self {
188            position_tolerance_m: 0.5,
189            clock_tolerance_s: 5.0e-9,
190            min_agree: 2,
191            clock_min_common: 5,
192            combine: MergeCombine::Mean,
193            target_epoch_interval_s: None,
194            systems: None,
195        }
196    }
197}
198
199/// One (epoch, satellite) cell the merge handled with a caveat. Nothing is
200/// dropped or averaged silently - every such cell is recorded here.
201#[derive(Debug, Clone, PartialEq)]
202pub struct MergeFlag {
203    /// The epoch.
204    pub epoch: Instant,
205    /// The satellite.
206    pub satellite: GnssSatelliteId,
207    /// The source indices (into the input slice) this flag refers to: for
208    /// `single_source`, the lone contributor; for `quarantined`, all sources
209    /// that disagreed; for `position_outliers`, the sources rejected from an
210    /// otherwise-accepted consensus.
211    pub sources: Vec<usize>,
212}
213
214/// Per-(epoch, satellite) agreement statistics for one accepted consensus cell:
215/// how tightly the consensus member values cluster about the combined value that
216/// was actually written to the merged product.
217///
218/// The dispersion is measured about the *combined* value (the mean, median, or
219/// precedence pick - whatever the strategy wrote), not about the cluster centroid,
220/// so it reflects the agreement of the product the merge emitted. A single-source
221/// cell has one member and zero dispersion.
222#[derive(Debug, Clone, Copy, PartialEq)]
223pub struct AgreementMetric {
224    /// The epoch.
225    pub epoch: Instant,
226    /// The satellite.
227    pub satellite: GnssSatelliteId,
228    /// Number of sources in the accepted position consensus (>= 1).
229    pub position_members: usize,
230    /// RMS, over the position-consensus members, of the 3D distance from the
231    /// combined position, meters. Zero for a single-source cell.
232    pub position_rms_m: f64,
233    /// Largest 3D distance of any position-consensus member from the combined
234    /// position, meters.
235    pub position_max_m: f64,
236    /// Number of sources in the accepted clock consensus (0 when the cell carries
237    /// no clock).
238    pub clock_members: usize,
239    /// RMS, over the clock-consensus members, of the deviation from the combined
240    /// clock, seconds; `None` when the cell carries no clock.
241    pub clock_rms_s: Option<f64>,
242    /// Largest absolute clock deviation from the combined clock, seconds; `None`
243    /// when the cell carries no clock.
244    pub clock_max_s: Option<f64>,
245}
246
247/// Per-epoch aggregate of [`AgreementMetric`] over the satellites combined at that
248/// epoch, restricted to cells with a *multi-source* consensus (a single source
249/// has no measurable dispersion, so it is excluded from the aggregate spread).
250#[derive(Debug, Clone, Copy, PartialEq)]
251pub struct EpochAgreement {
252    /// The epoch.
253    pub epoch: Instant,
254    /// Satellites at this epoch with a multi-source position consensus.
255    pub satellites: usize,
256    /// Member-count-weighted pooled RMS of the per-cell position dispersion over
257    /// those satellites, meters (i.e. the RMS of every member-to-combined 3D
258    /// distance pooled across the epoch).
259    pub position_rms_m: f64,
260    /// Worst per-cell position dispersion at this epoch, meters.
261    pub position_max_m: f64,
262    /// As `position_rms_m` for the clock channel; `None` when no multi-source
263    /// clock consensus existed at this epoch.
264    pub clock_rms_s: Option<f64>,
265    /// Worst per-cell clock dispersion at this epoch, seconds; `None` as above.
266    pub clock_max_s: Option<f64>,
267}
268
269/// Audit trail for a [`merge`].
270#[derive(Debug, Clone, Default, PartialEq)]
271pub struct MergeReport {
272    /// Cells where two or more sources disagreed beyond tolerance with no
273    /// agreeing subset of `min_agree` - omitted from the merged product.
274    pub quarantined: Vec<MergeFlag>,
275    /// Cells carried from a single source (no cross-check was possible).
276    pub single_source: Vec<MergeFlag>,
277    /// Cells accepted by consensus where one or more sources were rejected as
278    /// position outliers.
279    pub position_outliers: Vec<MergeFlag>,
280    /// Per-(epoch, satellite) agreement statistics for every accepted cell, in
281    /// output (epoch, then satellite) order - one entry per cell written to the
282    /// merged product. Quantifies how tightly the consensus sources clustered
283    /// about the combined value (Gap: per-epoch quality metrics).
284    pub agreement: Vec<AgreementMetric>,
285}
286
287impl MergeReport {
288    /// Fraction of accepted cells that were carried from a single source, in
289    /// `0.0..=1.0`; `None` when no cells were accepted.
290    ///
291    /// This is the blind-spot companion to the agreement-RMS accessors, which
292    /// quantify dispersion only over *multi-source* cells. A product can show a
293    /// tight (or `None`) agreement RMS yet be largely un-cross-checked: those
294    /// gap-fill cells (also enumerated in [`MergeReport::single_source`]) had no
295    /// second source to compare against. Read this alongside the RMS so a clean
296    /// dispersion is not mistaken for a fully corroborated product.
297    pub fn single_source_fraction(&self) -> Option<f64> {
298        let accepted = self.agreement.len();
299        (accepted > 0).then(|| self.single_source.len() as f64 / accepted as f64)
300    }
301
302    /// Member-count-weighted pooled RMS of the per-cell position dispersion over
303    /// every accepted cell with a multi-source consensus, meters. `None` when no
304    /// cell had two or more position-consensus members.
305    ///
306    /// The pool is exact: each cell contributes its summed squared member-to-
307    /// combined distances (`position_rms_m^2 * position_members`), normalised by
308    /// the total member count, so the result is the RMS of all member-to-combined
309    /// distances across the whole product.
310    ///
311    /// This covers only multi-source cells; single-source gap-fill cells are
312    /// excluded (they have no dispersion). A small or `None` result therefore does
313    /// not by itself mean the whole product was corroborated - check
314    /// [`MergeReport::single_source_fraction`] for the un-cross-checked share.
315    pub fn position_agreement_rms_m(&self) -> Option<f64> {
316        pooled_rms(
317            self.agreement
318                .iter()
319                .filter(|m| m.position_members >= 2)
320                .map(|m| (m.position_rms_m, m.position_members)),
321        )
322    }
323
324    /// Largest single-cell position dispersion over all accepted cells, meters.
325    /// `None` when there are no accepted cells.
326    pub fn position_agreement_max_m(&self) -> Option<f64> {
327        self.agreement
328            .iter()
329            .map(|m| m.position_max_m)
330            .fold(None, |acc, v| Some(fold_max(acc, v)))
331    }
332
333    /// As [`Self::position_agreement_rms_m`] for the clock channel, seconds.
334    pub fn clock_agreement_rms_s(&self) -> Option<f64> {
335        pooled_rms(self.agreement.iter().filter_map(|m| {
336            m.clock_rms_s
337                .filter(|_| m.clock_members >= 2)
338                .map(|rms| (rms, m.clock_members))
339        }))
340    }
341
342    /// Largest single-cell clock dispersion over all accepted cells, seconds.
343    pub fn clock_agreement_max_s(&self) -> Option<f64> {
344        self.agreement
345            .iter()
346            .filter_map(|m| m.clock_max_s)
347            .fold(None, |acc, v| Some(fold_max(acc, v)))
348    }
349
350    /// Per-epoch aggregate agreement, in output-epoch order. Each entry pools the
351    /// multi-source cells at that epoch (see [`EpochAgreement`]); epochs whose
352    /// cells were all single-source are still listed with `satellites == 0` and a
353    /// zero position spread so the caller sees every output epoch.
354    pub fn per_epoch_agreement(&self) -> Vec<EpochAgreement> {
355        let mut out: Vec<EpochAgreement> = Vec::new();
356        let mut current_key: Option<i64> = None;
357        for m in &self.agreement {
358            let key = instant_to_j2000_seconds(&m.epoch).map(|s| s.floor() as i64);
359            if current_key != key || out.is_empty() {
360                out.push(EpochAgreement {
361                    epoch: m.epoch,
362                    satellites: 0,
363                    position_rms_m: 0.0,
364                    position_max_m: 0.0,
365                    clock_rms_s: None,
366                    clock_max_s: None,
367                });
368                current_key = key;
369            }
370            let agg = out.last_mut().expect("just pushed");
371            agg.position_max_m = agg.position_max_m.max(m.position_max_m);
372            if m.position_members >= 2 {
373                agg.satellites += 1;
374            }
375            // Only multi-source clock cells contribute to the epoch clock max,
376            // matching the RMS path: a single-member cell has zero dispersion and
377            // must not leave clock_max_s = Some(0.0) while clock_rms_s is None.
378            if let Some(max) = m.clock_max_s.filter(|_| m.clock_members >= 2) {
379                agg.clock_max_s = Some(fold_max(agg.clock_max_s, max));
380            }
381        }
382
383        // Pooled RMS per epoch needs the sum of squared distances, which the per
384        // entry RMS encodes; recompute it in a second pass grouped by epoch key.
385        for agg in &mut out {
386            let key = instant_to_j2000_seconds(&agg.epoch).map(|s| s.floor() as i64);
387            agg.position_rms_m = pooled_rms(
388                self.agreement
389                    .iter()
390                    .filter(|m| {
391                        m.position_members >= 2
392                            && instant_to_j2000_seconds(&m.epoch).map(|s| s.floor() as i64) == key
393                    })
394                    .map(|m| (m.position_rms_m, m.position_members)),
395            )
396            .unwrap_or(0.0);
397            agg.clock_rms_s = pooled_rms(
398                self.agreement
399                    .iter()
400                    .filter(|m| instant_to_j2000_seconds(&m.epoch).map(|s| s.floor() as i64) == key)
401                    .filter_map(|m| {
402                        m.clock_rms_s
403                            .filter(|_| m.clock_members >= 2)
404                            .map(|rms| (rms, m.clock_members))
405                    }),
406            );
407        }
408
409        out
410    }
411}
412
413/// Pool per-cell RMS values weighted by member count into one RMS:
414/// `sqrt(sum(rms_i^2 * n_i) / sum(n_i))`. `None` when the iterator is empty.
415fn pooled_rms(cells: impl Iterator<Item = (f64, usize)>) -> Option<f64> {
416    let mut sumsq = 0.0_f64;
417    let mut total = 0_usize;
418    for (rms, n) in cells {
419        sumsq += rms * rms * n as f64;
420        total += n;
421    }
422    (total > 0).then(|| (sumsq / total as f64).sqrt())
423}
424
425/// `max` reduction over an `Option` accumulator (`None` is the empty identity).
426fn fold_max(acc: Option<f64>, value: f64) -> f64 {
427    match acc {
428        Some(current) if current >= value => current,
429        _ => value,
430    }
431}
432
433/// Merge several SP3 products from different analysis centers into one
434/// consistent precise-ephemeris dataset.
435///
436/// Orthogonal to time-stitching: this combines providers at the **same** epochs.
437/// Inputs must be on one common, uniform epoch grid. Mixed-cadence products are
438/// rejected rather than unioned onto a finer grid; callers that need that must
439/// resample first. For every (epoch, satellite) cell on the common grid:
440///
441/// - **Union satellite coverage.** A satellite present in any input may appear
442///   in the output, but only on the shared grid and only when doing so preserves
443///   a coherent source/consensus arc.
444/// - **Position consensus.** With one source the value is carried through
445///   (`single_source`). With several, the largest subset of sources mutually
446///   within `position_tolerance_m` is found; if it has at least `min_agree`
447///   members it is combined per `combine` and any sources outside it are recorded
448///   as `position_outliers`. If no such subset exists the cell is `quarantined`
449///   (omitted) - never averaged across disagreeing centers.
450/// - **Clock consensus.** Clocks are first put on a common datum (each source
451///   aligned to the first via [`clock_reference_offset`]), then combined by the
452///   same agreement rule; a cell with no clock consensus carries no clock. A
453///   non-reference source whose datum cannot be estimated at an epoch (below
454///   `clock_min_common` common clocks) contributes **no** clock there rather than
455///   an unaligned one - its position is still merged.
456///
457/// `Precedence` is resolved per satellite arc: once a satellite is assigned to
458/// the highest-precedence source that carries it, that satellite never switches
459/// centers at adjacent epochs. If that source is missing a cell, the cell is
460/// omitted rather than filled from a lower-precedence source.
461///
462/// All inputs must share an exact SP3 time-system label and exact
463/// coordinate-system label (epochs are matched across products in that scale);
464/// mismatches are rejected. The merged record flags are the union (OR) of the
465/// contributing sources' flags - in particular a `clock_event` on any
466/// clock-consensus member is preserved, so the interpolator still splits the
467/// clock arc. The merged header is **synthetic**: its first-epoch fields
468/// describe the union's first epoch and its data type is position-only.
469///
470/// Pure and deterministic: order the inputs by center precedence and ties (equal
471/// cluster sizes, `Precedence` combine) resolve to the earliest-listed source.
472/// The merged product's interpolation nodes are the consensus values, so it
473/// samples and interpolates like any other [`Sp3`] (it is a derived combination,
474/// not a byte-faithful copy of any one center). Consensus is exact max-clique for
475/// normal source counts and uses a deterministic greedy fallback above the exact
476/// search cap, so hostile disagreement graphs remain bounded.
477pub fn merge(sources: &[Sp3], opts: &MergeOptions) -> Result<(Sp3, MergeReport)> {
478    if sources.is_empty() {
479        return Err(Error::InvalidInput(
480            "merge requires at least one SP3 product".into(),
481        ));
482    }
483
484    // Inputs must be combinable: epochs are matched in one exact product time
485    // system, and positions are only comparable in an exactly common coordinate
486    // system / frame. Do not silently alias labels such as QZS/GPS or
487    // IGS20/IGc20 here: without an explicit transform, a differently labeled
488    // product contract is a different product contract.
489    let base = &sources[0].header;
490    for s in &sources[1..] {
491        if s.header.time_system != base.time_system {
492            return Err(Error::InvalidInput(format!(
493                "merge inputs have mismatched SP3 time systems ({:?} vs {:?})",
494                base.time_system, s.header.time_system
495            )));
496        }
497        if s.header.coordinate_system.trim() != base.coordinate_system.trim() {
498            return Err(Error::InvalidInput(format!(
499                "merge inputs have mismatched coordinate systems ({:?} vs {:?})",
500                base.coordinate_system, s.header.coordinate_system
501            )));
502        }
503    }
504
505    // floored-J2000-second -> epoch index, per source.
506    let epoch_index: Vec<BTreeMap<i64, usize>> = sources
507        .iter()
508        .map(|s| {
509            s.epochs
510                .iter()
511                .enumerate()
512                .filter_map(|(i, ep)| {
513                    sp3_epoch_j2000_seconds(s, i, ep).map(|sec| (sec.floor() as i64, i))
514                })
515                .collect()
516        })
517        .collect();
518
519    let epoch_interval_s = resolve_common_epoch_interval(sources, opts.target_epoch_interval_s)?;
520
521    // Per-source per-epoch clock-datum offset relative to source 0. Source 0 is
522    // the datum, so its offset is identically zero.
523    let clock_offset: Vec<BTreeMap<i64, f64>> = sources
524        .iter()
525        .enumerate()
526        .map(|(idx, s)| {
527            if idx == 0 {
528                BTreeMap::new()
529            } else {
530                clock_reference_offset(&sources[0], s, opts.clock_min_common)
531                    .into_iter()
532                    .filter_map(|o| {
533                        instant_to_j2000_seconds(&o.epoch)
534                            .map(|sec| (sec.floor() as i64, o.offset_s))
535                    })
536                    .collect()
537            }
538        })
539        .collect();
540
541    // Intersection of epochs (by floored second), keeping source 0's
542    // representative Instant. Mixing a 15-minute product with 5-minute products
543    // must not emit the union grid; if all inputs share a cadence but differ in
544    // coverage, only the epochs present in every product are combined.
545    let mut epoch_keys: BTreeMap<i64, Instant> = sources[0]
546        .epochs
547        .iter()
548        .enumerate()
549        .filter_map(|(idx, ep)| {
550            sp3_epoch_j2000_seconds(&sources[0], idx, ep).map(|sec| (sec.floor() as i64, *ep))
551        })
552        .collect();
553
554    for index in epoch_index.iter().skip(1) {
555        epoch_keys.retain(|key, _| index.contains_key(key));
556    }
557
558    // Decimate onto the resolved common-interval grid (anchored at the earliest
559    // common epoch): keep only epochs that land on the grid, dropping off-grid
560    // epochs by exact subset selection (never interpolation). A no-op when the
561    // inputs already share the interval; the real decimation when finer inputs
562    // are mixed with a coarser one, or an explicit coarser target is requested.
563    if let Some((&anchor, _)) = epoch_keys.iter().next() {
564        let step = epoch_interval_s.round() as i64;
565        if step > 0 {
566            epoch_keys.retain(|&key, _| (key - anchor).rem_euclid(step) == 0);
567        }
568    }
569
570    if epoch_keys.is_empty() {
571        return Err(Error::InvalidInput(
572            "merge inputs have no common epochs on a shared time grid".into(),
573        ));
574    }
575
576    let precedence_source_for_sat = if opts.combine == MergeCombine::Precedence {
577        Some(precedence_sources_for_satellites(
578            sources,
579            &epoch_index,
580            &epoch_keys,
581            opts.systems.as_ref(),
582        ))
583    } else {
584        None
585    };
586
587    let allowed_system = |sat: &GnssSatelliteId| {
588        opts.systems
589            .as_ref()
590            .is_none_or(|systems| systems.contains(&sat.system))
591    };
592
593    if let Some(systems) = &opts.systems {
594        if systems.is_empty() {
595            return Err(Error::InvalidInput(
596                "merge systems filter must not be empty".into(),
597            ));
598        }
599    }
600
601    let mut out_epochs: Vec<Instant> = Vec::with_capacity(epoch_keys.len());
602    let mut out_epoch_j2000_s: Vec<f64> = Vec::with_capacity(epoch_keys.len());
603    let mut out_states: Vec<BTreeMap<GnssSatelliteId, Sp3State>> =
604        Vec::with_capacity(epoch_keys.len());
605    let mut out_raw: Vec<BTreeMap<GnssSatelliteId, RawNode>> = Vec::with_capacity(epoch_keys.len());
606    let mut report = MergeReport::default();
607    let mut all_sats: BTreeSet<GnssSatelliteId> = BTreeSet::new();
608
609    for (&key, &epoch) in &epoch_keys {
610        out_epochs.push(epoch);
611        out_epoch_j2000_s.push(key as f64);
612        let mut states: BTreeMap<GnssSatelliteId, Sp3State> = BTreeMap::new();
613        let mut raws: BTreeMap<GnssSatelliteId, RawNode> = BTreeMap::new();
614
615        // Satellites present at this epoch in any source, after any requested
616        // constellation filter.
617        let mut sats: BTreeSet<GnssSatelliteId> = BTreeSet::new();
618        for (idx, s) in sources.iter().enumerate() {
619            if let Some(&ei) = epoch_index[idx].get(&key) {
620                if let Ok(map) = s.states_at(ei) {
621                    sats.extend(map.keys().copied().filter(|sat| allowed_system(sat)));
622                }
623            }
624        }
625
626        for sat in sats {
627            // (source_idx, position_m, flags) and (source_idx, datum-aligned
628            // clock_s, flags). A non-reference source contributes a clock only
629            // when its datum offset could be estimated at this epoch; otherwise
630            // its clock would be unaligned, so it is omitted (the position is
631            // still gathered).
632            let preferred_source = precedence_source_for_sat
633                .as_ref()
634                .and_then(|by_sat| by_sat.get(&sat).copied());
635
636            let mut pos: Vec<(usize, [f64; 3], Sp3Flags)> = Vec::new();
637            let mut clk: Vec<(usize, f64, Sp3Flags)> = Vec::new();
638            for (idx, s) in sources.iter().enumerate() {
639                let Some(&ei) = epoch_index[idx].get(&key) else {
640                    continue;
641                };
642                let Ok(map) = s.states_at(ei) else { continue };
643                let Some(state) = map.get(&sat) else { continue };
644                pos.push((idx, state.position.as_array(), state.flags));
645                if let Some(c) = state.clock_s {
646                    let offset = if idx == 0 {
647                        Some(0.0)
648                    } else {
649                        clock_offset[idx].get(&key).copied()
650                    };
651                    if let Some(off) = offset {
652                        let aligned = c - off;
653                        if aligned.is_finite() {
654                            clk.push((idx, aligned, state.flags));
655                        }
656                    }
657                }
658            }
659
660            let flag = |srcs: Vec<usize>| MergeFlag {
661                epoch,
662                satellite: sat,
663                sources: srcs,
664            };
665
666            // Position consensus -> the merged position and the indices (into
667            // `pos`) of the sources that contributed it. In precedence mode the
668            // preferred source is fixed per satellite arc; never switch to a
669            // lower-precedence source just because the preferred source is
670            // missing or outside a different consensus cluster at this epoch.
671            let (position_m, pos_members) = if opts.combine == MergeCombine::Precedence {
672                let Some(preferred_source) = preferred_source else {
673                    continue;
674                };
675                let Some(preferred_idx) =
676                    pos.iter().position(|(src, _, _)| *src == preferred_source)
677                else {
678                    continue;
679                };
680
681                if pos.len() == 1 {
682                    report.single_source.push(flag(vec![pos[preferred_idx].0]));
683                    (pos[preferred_idx].1, vec![preferred_idx])
684                } else {
685                    let pts: Vec<[f64; 3]> = pos.iter().map(|(_, p, _)| *p).collect();
686                    let cluster = largest_within_containing(&pts, preferred_idx, |a, b| {
687                        dist3(a, b) <= opts.position_tolerance_m
688                    });
689                    if cluster.len() >= opts.min_agree {
690                        let rejected: Vec<usize> = (0..pos.len())
691                            .filter(|i| !cluster.contains(i))
692                            .map(|i| pos[i].0)
693                            .collect();
694                        if !rejected.is_empty() {
695                            report.position_outliers.push(flag(rejected));
696                        }
697                        (pos[preferred_idx].1, cluster)
698                    } else {
699                        report
700                            .quarantined
701                            .push(flag(pos.iter().map(|(i, _, _)| *i).collect()));
702                        continue;
703                    }
704                }
705            } else if pos.len() == 1 {
706                report.single_source.push(flag(vec![pos[0].0]));
707                (pos[0].1, vec![0usize])
708            } else {
709                let pts: Vec<[f64; 3]> = pos.iter().map(|(_, p, _)| *p).collect();
710                let cluster = largest_within(&pts, |a, b| dist3(a, b) <= opts.position_tolerance_m);
711                if cluster.len() >= opts.min_agree {
712                    let rejected: Vec<usize> = (0..pos.len())
713                        .filter(|i| !cluster.contains(i))
714                        .map(|i| pos[i].0)
715                        .collect();
716                    if !rejected.is_empty() {
717                        report.position_outliers.push(flag(rejected));
718                    }
719                    let members: Vec<(usize, [f64; 3])> =
720                        cluster.iter().map(|&i| (pos[i].0, pos[i].1)).collect();
721                    (combine3(&members, opts.combine), cluster)
722                } else {
723                    report
724                        .quarantined
725                        .push(flag(pos.iter().map(|(i, _, _)| *i).collect()));
726                    continue;
727                }
728            };
729
730            // Clock consensus, independent of position -> the merged clock and the
731            // indices (into `clk`) of the sources that contributed it.
732            let (clock_s, clk_members): (Option<f64>, Vec<usize>) = if clk.is_empty() {
733                (None, Vec::new())
734            } else if opts.combine == MergeCombine::Precedence {
735                match preferred_source
736                    .and_then(|src| clk.iter().position(|(clock_src, _, _)| *clock_src == src))
737                {
738                    None => (None, Vec::new()),
739                    Some(preferred_idx) if clk.len() == 1 => {
740                        (Some(clk[preferred_idx].1), vec![preferred_idx])
741                    }
742                    Some(preferred_idx) => {
743                        let vals: Vec<f64> = clk.iter().map(|(_, c, _)| *c).collect();
744                        let cluster = largest_within_containing(&vals, preferred_idx, |a, b| {
745                            (a - b).abs() <= opts.clock_tolerance_s
746                        });
747                        if cluster.len() >= opts.min_agree {
748                            (Some(clk[preferred_idx].1), cluster)
749                        } else {
750                            (None, Vec::new())
751                        }
752                    }
753                }
754            } else if clk.len() == 1 {
755                (Some(clk[0].1), vec![0usize])
756            } else {
757                let vals: Vec<f64> = clk.iter().map(|(_, c, _)| *c).collect();
758                let cluster = largest_within(&vals, |a, b| (a - b).abs() <= opts.clock_tolerance_s);
759                if cluster.len() >= opts.min_agree {
760                    let members: Vec<(usize, f64)> =
761                        cluster.iter().map(|&i| (clk[i].0, clk[i].1)).collect();
762                    (Some(combine_axis(&members, opts.combine)), cluster)
763                } else {
764                    (None, Vec::new())
765                }
766            };
767
768            // Preserve record flags: OR the orbit flags across the position
769            // members and the clock flags across the clock members, so a
770            // `clock_event` (clock reset) or maneuver on any contributing source
771            // survives into the merged product.
772            let mut flags = Sp3Flags::default();
773            for &i in &pos_members {
774                flags.maneuver |= pos[i].2.maneuver;
775                flags.orbit_predicted |= pos[i].2.orbit_predicted;
776            }
777            for &i in &clk_members {
778                flags.clock_event |= clk[i].2.clock_event;
779                flags.clock_predicted |= clk[i].2.clock_predicted;
780            }
781
782            // Per-cell agreement: dispersion of the accepted consensus members
783            // about the combined value actually written below.
784            let (position_rms_m, position_max_m) =
785                position_dispersion(&pos, &pos_members, &position_m);
786            let (clock_members_n, clock_rms_s, clock_max_s) = match clock_s {
787                Some(c) => {
788                    let (rms, max) = clock_dispersion(&clk, &clk_members, c);
789                    (clk_members.len(), Some(rms), Some(max))
790                }
791                None => (0, None, None),
792            };
793            report.agreement.push(AgreementMetric {
794                epoch,
795                satellite: sat,
796                position_members: pos_members.len(),
797                position_rms_m,
798                position_max_m,
799                clock_members: clock_members_n,
800                clock_rms_s,
801                clock_max_s,
802            });
803
804            all_sats.insert(sat);
805            states.insert(
806                sat,
807                Sp3State {
808                    position: ItrfPositionM::new(position_m[0], position_m[1], position_m[2])
809                        .expect("valid ITRF position"),
810                    clock_s,
811                    velocity: None,
812                    clock_rate_s_s: None,
813                    flags,
814                },
815            );
816            raws.insert(
817                sat,
818                RawNode {
819                    km: [
820                        position_m[0] / KM_TO_M,
821                        position_m[1] / KM_TO_M,
822                        position_m[2] / KM_TO_M,
823                    ],
824                    clock_us: clock_s.map(|c| c * 1.0e6),
825                    clock_event: flags.clock_event,
826                },
827            );
828        }
829
830        out_states.push(states);
831        out_raw.push(raws);
832    }
833
834    // Base the non-epoch metadata on a source product, but derive the first-epoch
835    // header fields from the merged grid itself. Mixed cadence / coverage can make
836    // the merged first epoch later than every input's first epoch, so cloning
837    // those fields from any input would make the `##` line stale.
838    let first_key = Some(out_epoch_j2000_s[0].floor() as i64);
839    let base_idx = sources
840        .iter()
841        .position(|s| {
842            s.epochs
843                .first()
844                .and_then(|ep| sp3_epoch_j2000_seconds(s, 0, ep))
845                .map(|sec| sec.floor() as i64)
846                == first_key
847        })
848        .or_else(|| {
849            sources
850                .iter()
851                .enumerate()
852                .filter_map(|(i, s)| {
853                    s.epochs
854                        .first()
855                        .and_then(|ep| sp3_epoch_j2000_seconds(s, 0, ep))
856                        .map(|sec| (sec, i))
857                })
858                .min_by(|a, b| a.0.total_cmp(&b.0).then(a.1.cmp(&b.1)))
859                .map(|(_, i)| i)
860        })
861        .unwrap_or(0);
862    let first_epoch_header = first_epoch_header_fields(&out_epochs[0]).ok_or_else(|| {
863        Error::InvalidInput("merged SP3 first epoch cannot be represented in header fields".into())
864    })?;
865
866    let satellites: Vec<_> = all_sats.into_iter().collect();
867    let satellite_accuracy_codes = satellites
868        .iter()
869        .map(|sat| {
870            sources[base_idx]
871                .header
872                .satellites
873                .iter()
874                .position(|base_sat| base_sat == sat)
875                .and_then(|idx| {
876                    sources[base_idx]
877                        .header
878                        .satellite_accuracy_codes
879                        .get(idx)
880                        .copied()
881                })
882                .unwrap_or(0)
883        })
884        .collect();
885
886    let header = Sp3Header {
887        num_epochs: out_epochs.len() as u64,
888        satellites,
889        satellite_accuracy_codes,
890        data_type: Sp3DataType::Position,
891        gnss_week: first_epoch_header.gnss_week,
892        seconds_of_week: first_epoch_header.seconds_of_week,
893        epoch_interval_s,
894        mjd: first_epoch_header.mjd,
895        mjd_fraction: first_epoch_header.mjd_fraction,
896        ..sources[base_idx].header.clone()
897    };
898
899    let merged = Sp3 {
900        header,
901        epochs: out_epochs,
902        epoch_j2000_s: out_epoch_j2000_s,
903        states: out_states,
904        interp_raw: out_raw,
905        comments: vec![format!("MERGED from {} SP3 products", sources.len())],
906        skipped_records: sources.iter().map(|s| s.skipped_records).sum(),
907    };
908
909    Ok((merged, report))
910}
911
912#[derive(Debug, Clone, Copy)]
913struct FirstEpochHeaderFields {
914    gnss_week: u32,
915    seconds_of_week: f64,
916    mjd: u32,
917    mjd_fraction: f64,
918}
919
920fn first_epoch_header_fields(epoch: &Instant) -> Option<FirstEpochHeaderFields> {
921    let split = epoch.julian_date()?;
922
923    let mjd_day = mjd_from_jd(split.jd_whole);
924    let mut mjd = mjd_day.floor();
925    let mut mjd_fraction = split.fraction + (mjd_day - mjd);
926    let fraction_days = mjd_fraction.floor();
927    if fraction_days != 0.0 {
928        mjd += fraction_days;
929        mjd_fraction -= fraction_days;
930    }
931    if !(0.0..=u32::MAX as f64).contains(&mjd) {
932        return None;
933    }
934
935    let gps_seconds = instant_to_j2000_seconds(epoch)? + GPS_EPOCH_TO_J2000_S;
936    let (gnss_week, seconds_of_week) = gnss::week_and_seconds_of_week(gps_seconds);
937    if !(0.0..=u32::MAX as f64).contains(&gnss_week) {
938        return None;
939    }
940
941    Some(FirstEpochHeaderFields {
942        gnss_week: gnss_week as u32,
943        seconds_of_week,
944        mjd: mjd as u32,
945        mjd_fraction,
946    })
947}
948
949fn dist3(a: &[f64; 3], b: &[f64; 3]) -> f64 {
950    vec3::norm3(vec3::sub3(*a, *b))
951}
952
953/// RMS and max of the 3D distance of each `members` position (indices into `pos`)
954/// from `combined`. `members` is the accepted consensus, always non-empty.
955fn position_dispersion(
956    pos: &[(usize, [f64; 3], Sp3Flags)],
957    members: &[usize],
958    combined: &[f64; 3],
959) -> (f64, f64) {
960    let mut sumsq = 0.0;
961    let mut max = 0.0_f64;
962    for &i in members {
963        let d = dist3(&pos[i].1, combined);
964        sumsq += d * d;
965        max = max.max(d);
966    }
967    ((sumsq / members.len().max(1) as f64).sqrt(), max)
968}
969
970/// RMS and max of the absolute deviation of each `members` clock (indices into
971/// `clk`) from `combined`. `members` is the accepted consensus, always non-empty.
972fn clock_dispersion(
973    clk: &[(usize, f64, Sp3Flags)],
974    members: &[usize],
975    combined: f64,
976) -> (f64, f64) {
977    let mut sumsq = 0.0;
978    let mut max = 0.0_f64;
979    for &i in members {
980        let d = (clk[i].1 - combined).abs();
981        sumsq += d * d;
982        max = max.max(d);
983    }
984    ((sumsq / members.len().max(1) as f64).sqrt(), max)
985}
986
987fn precedence_sources_for_satellites(
988    sources: &[Sp3],
989    epoch_index: &[BTreeMap<i64, usize>],
990    epoch_keys: &BTreeMap<i64, Instant>,
991    systems: Option<&BTreeSet<GnssSystem>>,
992) -> BTreeMap<GnssSatelliteId, usize> {
993    let mut by_sat = BTreeMap::new();
994
995    for (idx, source) in sources.iter().enumerate() {
996        for key in epoch_keys.keys() {
997            let Some(&epoch_idx) = epoch_index[idx].get(key) else {
998                continue;
999            };
1000            let Ok(states) = source.states_at(epoch_idx) else {
1001                continue;
1002            };
1003
1004            for sat in states.keys() {
1005                if systems.is_none_or(|allowed| allowed.contains(&sat.system)) {
1006                    by_sat.entry(*sat).or_insert(idx);
1007                }
1008            }
1009        }
1010    }
1011
1012    by_sat
1013}
1014
1015/// Resolve the common (output) epoch interval and validate that every input can
1016/// be decimated onto it without interpolation.
1017///
1018/// The common interval is the caller's `target` if given, otherwise the
1019/// **coarsest** native interval among the inputs (the finest grid every input
1020/// can supply). An input is compatible only when the common interval is a
1021/// positive-integer multiple of that input's native interval: then the
1022/// common-grid epochs are an exact subset of the input's epochs, and the merge's
1023/// epoch intersection performs the decimation (e.g. a 5-minute product
1024/// contributes its :00/:15/:30/:45 epochs to a 15-minute merge - no orbit/clock
1025/// interpolation is introduced). Inputs whose interval does not evenly divide the
1026/// common interval - a coarser input than the requested grid, or a non-divisible
1027/// cadence - are rejected as incompatible. Equal-interval inputs (multiple 1) are
1028/// the same-interval fast path and behave exactly as before.
1029fn resolve_common_epoch_interval(sources: &[Sp3], target: Option<f64>) -> Result<f64> {
1030    let intervals: Vec<f64> = sources
1031        .iter()
1032        .enumerate()
1033        .map(|(idx, source)| {
1034            effective_epoch_interval_s(source)?.ok_or_else(|| {
1035                Error::InvalidInput(format!(
1036                    "merge input {idx} has no usable positive epoch interval"
1037                ))
1038            })
1039        })
1040        .collect::<Result<Vec<_>>>()?;
1041
1042    let common = match target {
1043        Some(t) if t.is_finite() && t > 0.0 => t,
1044        Some(t) => {
1045            return Err(Error::InvalidInput(format!(
1046                "merge target epoch interval must be positive and finite, got {t}"
1047            )))
1048        }
1049        None => intervals.iter().copied().fold(0.0_f64, f64::max),
1050    };
1051
1052    // The merge matches and decimates epochs on whole-second J2000 keys, so the
1053    // common grid must fall on whole seconds for the decimation lattice to be
1054    // exact. SP3 grids are integer-second; reject a fractional common interval
1055    // rather than decimate on a mismatched (rounded) lattice.
1056    if (common - common.round()).abs() > WHOLE_SECOND_EPS_S || common.round() < 1.0 {
1057        return Err(Error::InvalidInput(format!(
1058            "merge common epoch interval {common:.6} s must be a positive whole number of seconds"
1059        )));
1060    }
1061
1062    for (idx, interval) in intervals.iter().copied().enumerate() {
1063        if !divides_evenly(interval, common) {
1064            return Err(Error::InvalidInput(format!(
1065                "merge inputs have mismatched epoch intervals: common {common:.6} s is not an integer multiple of input {idx} {interval:.6} s (no exact-subset decimation; positional interpolation is not performed)"
1066            )));
1067        }
1068    }
1069
1070    Ok(common)
1071}
1072
1073/// True when `common` is a positive-integer multiple of `interval` (within the
1074/// interval tolerance), i.e. `interval`'s grid is a superset of the common grid.
1075fn divides_evenly(interval: f64, common: f64) -> bool {
1076    if !(interval.is_finite() && interval > 0.0 && common.is_finite() && common > 0.0) {
1077        return false;
1078    }
1079    let k = (common / interval).round();
1080    k >= 1.0 && same_interval(k * interval, common)
1081}
1082
1083fn effective_epoch_interval_s(source: &Sp3) -> Result<Option<f64>> {
1084    let secs: Vec<f64> = source
1085        .epochs
1086        .iter()
1087        .filter_map(instant_to_j2000_seconds)
1088        .collect();
1089    validate::require_strictly_increasing(secs.iter().copied(), "merge input epochs").map_err(
1090        |error| Error::InvalidInput(format!("{} must be strictly increasing", error.field())),
1091    )?;
1092    let gaps: Vec<f64> = secs.windows(2).map(|w| w[1] - w[0]).collect();
1093
1094    if gaps.is_empty() {
1095        let header = source.header.epoch_interval_s;
1096        return Ok((header.is_finite() && header > 0.0).then_some(header));
1097    }
1098
1099    let interval = gaps[0];
1100    if gaps.iter().all(|g| same_interval(*g, interval)) {
1101        Ok(Some(interval))
1102    } else {
1103        Ok(None)
1104    }
1105}
1106
1107fn same_interval(a: f64, b: f64) -> bool {
1108    (a - b).abs() <= WHOLE_SECOND_EPS_S
1109}
1110
1111/// Indices of the largest subset of `items` whose members are *mutually* within
1112/// `within`. Exact max-clique over normal source counts; deterministic greedy
1113/// fallback above [`MAX_EXACT_CLIQUE_NODES`] keeps hostile overlap graphs bounded.
1114/// Ties resolve to the lowest-indexed subset (precedence).
1115fn largest_within<T>(items: &[T], within: impl Fn(&T, &T) -> bool) -> Vec<usize> {
1116    let n = items.len();
1117    if n <= 1 {
1118        return (0..n).collect();
1119    }
1120    let graph = agreement_graph(items, within);
1121    if n > MAX_EXACT_CLIQUE_NODES {
1122        return greedy_largest_clique(&graph);
1123    }
1124    let mut best = vec![0];
1125    let mut current = Vec::new();
1126    max_clique_search(&graph, &mut current, (0..n).collect(), &mut best);
1127    best
1128}
1129
1130fn largest_within_containing<T>(
1131    items: &[T],
1132    required: usize,
1133    within: impl Fn(&T, &T) -> bool,
1134) -> Vec<usize> {
1135    let n = items.len();
1136    if n == 0 || required >= n {
1137        return Vec::new();
1138    }
1139    if n == 1 {
1140        return vec![required];
1141    }
1142
1143    let graph = agreement_graph(items, within);
1144    if n > MAX_EXACT_CLIQUE_NODES {
1145        return greedy_largest_clique_containing(&graph, required);
1146    }
1147    let candidates = (0..n)
1148        .filter(|&idx| idx != required && graph[required][idx])
1149        .collect();
1150    let mut best = vec![required];
1151    let mut current = vec![required];
1152    max_clique_search(&graph, &mut current, candidates, &mut best);
1153    best
1154}
1155
1156fn agreement_graph<T>(items: &[T], within: impl Fn(&T, &T) -> bool) -> Vec<Vec<bool>> {
1157    let n = items.len();
1158    let mut graph = vec![vec![false; n]; n];
1159    for i in 0..n {
1160        graph[i][i] = true;
1161        for j in i + 1..n {
1162            let agrees = within(&items[i], &items[j]);
1163            graph[i][j] = agrees;
1164            graph[j][i] = agrees;
1165        }
1166    }
1167    graph
1168}
1169
1170fn greedy_largest_clique(graph: &[Vec<bool>]) -> Vec<usize> {
1171    let mut best = Vec::new();
1172    for seed in 0..graph.len() {
1173        let candidate = greedy_clique_from_seed(graph, seed);
1174        update_best_clique(&candidate, &mut best);
1175    }
1176    best
1177}
1178
1179fn greedy_largest_clique_containing(graph: &[Vec<bool>], required: usize) -> Vec<usize> {
1180    if required >= graph.len() {
1181        return Vec::new();
1182    }
1183    greedy_clique_from_seed(graph, required)
1184}
1185
1186fn greedy_clique_from_seed(graph: &[Vec<bool>], seed: usize) -> Vec<usize> {
1187    let mut clique = vec![seed];
1188    for (idx, _) in graph.iter().enumerate() {
1189        if idx == seed {
1190            continue;
1191        }
1192        if clique.iter().all(|&member| graph[member][idx]) {
1193            clique.push(idx);
1194        }
1195    }
1196    clique.sort_unstable();
1197    clique
1198}
1199
1200fn max_clique_search(
1201    graph: &[Vec<bool>],
1202    current: &mut Vec<usize>,
1203    mut candidates: Vec<usize>,
1204    best: &mut Vec<usize>,
1205) {
1206    candidates.sort_unstable();
1207    for (pos, &candidate) in candidates.iter().enumerate() {
1208        let remaining = candidates.len() - pos;
1209        if current.len() + remaining < best.len() {
1210            break;
1211        }
1212
1213        let next_candidates = candidates[pos + 1..]
1214            .iter()
1215            .copied()
1216            .filter(|&idx| graph[candidate][idx])
1217            .collect();
1218
1219        current.push(candidate);
1220        update_best_clique(current, best);
1221        max_clique_search(graph, current, next_candidates, best);
1222        current.pop();
1223    }
1224}
1225
1226fn update_best_clique(current: &[usize], best: &mut Vec<usize>) {
1227    let mut candidate = current.to_vec();
1228    candidate.sort_unstable();
1229    if candidate.len() > best.len()
1230        || (candidate.len() == best.len() && candidate.as_slice() < best.as_slice())
1231    {
1232        *best = candidate;
1233    }
1234}
1235
1236fn combine3(members: &[(usize, [f64; 3])], how: MergeCombine) -> [f64; 3] {
1237    [0usize, 1, 2].map(|axis| {
1238        let axis_members: Vec<(usize, f64)> = members.iter().map(|(s, v)| (*s, v[axis])).collect();
1239        combine_axis(&axis_members, how)
1240    })
1241}
1242
1243fn combine_axis(members: &[(usize, f64)], how: MergeCombine) -> f64 {
1244    match how {
1245        MergeCombine::Mean => members.iter().map(|(_, v)| *v).sum::<f64>() / members.len() as f64,
1246        MergeCombine::Median => {
1247            let mut vals: Vec<f64> = members.iter().map(|(_, v)| *v).collect();
1248            median(&mut vals).expect("consensus cluster is non-empty")
1249        }
1250        MergeCombine::Precedence => members
1251            .iter()
1252            .min_by_key(|(s, _)| *s)
1253            .map(|(_, v)| *v)
1254            .expect("consensus cluster is non-empty"),
1255    }
1256}
1257
1258/// Return a copy of `other` with its clocks shifted onto `reference`'s clock
1259/// datum.
1260///
1261/// This applies the per-epoch reference-clock offset from
1262/// [`clock_reference_offset`]: at each epoch where the offset could be estimated
1263/// (at least `min_common` common clocked satellites), every clocked satellite's
1264/// offset has the datum subtracted, so the result's clocks are directly
1265/// comparable to `reference`'s. Positions are untouched (already comparable).
1266///
1267/// Epochs where the offset could not be estimated are left unchanged - they are
1268/// *not* on `reference`'s datum, so a caller mixing aligned and unaligned epochs
1269/// should consult [`clock_reference_offset`] to see which epochs were aligned.
1270/// The returned product interpolates like any other [`Sp3`].
1271pub fn align_clock_reference(reference: &Sp3, other: &Sp3, min_common: usize) -> Sp3 {
1272    let offsets: BTreeMap<i64, f64> = clock_reference_offset(reference, other, min_common)
1273        .into_iter()
1274        .filter_map(|o| {
1275            instant_to_j2000_seconds(&o.epoch).map(|sec| (sec.floor() as i64, o.offset_s))
1276        })
1277        .collect();
1278
1279    let mut aligned = other.clone();
1280    for ei in 0..aligned.epochs.len() {
1281        let Some(sec) = sp3_epoch_j2000_seconds(&aligned, ei, &aligned.epochs[ei]) else {
1282            continue;
1283        };
1284        let Some(&off) = offsets.get(&(sec.floor() as i64)) else {
1285            continue;
1286        };
1287        for state in aligned.states[ei].values_mut() {
1288            if let Some(c) = state.clock_s.as_mut() {
1289                *c -= off;
1290            }
1291        }
1292        for node in aligned.interp_raw[ei].values_mut() {
1293            if let Some(us) = node.clock_us.as_mut() {
1294                *us -= off * 1.0e6;
1295            }
1296        }
1297    }
1298    aligned
1299}
1300
1301#[cfg(test)]
1302mod tests {
1303    use super::super::Sp3;
1304    use super::{
1305        align_clock_reference, clock_reference_offset, merge, MergeCombine, MergeOptions,
1306        MergeReport,
1307    };
1308    use crate::constants::SECONDS_PER_DAY;
1309    use crate::id::{GnssSatelliteId, GnssSystem};
1310    use std::collections::BTreeSet;
1311
1312    /// One satellite sample in a synthetic SP3 epoch: token, ECEF position
1313    /// (km), and optional clock (microseconds).
1314    type SatSample<'a> = (&'a str, [f64; 3], Option<f64>);
1315
1316    fn gps(prn: u8) -> GnssSatelliteId {
1317        GnssSatelliteId::new(GnssSystem::Gps, prn).expect("valid satellite id")
1318    }
1319
1320    // Single-epoch SP3-c from explicit `(satellite, [x,y,z] km, clock us, flag
1321    // suffix)` records under coordinate system `cs` (5 chars, e.g. `"IGS14"`).
1322    // `flags` is appended verbatim after the 60-column record body, so a test can
1323    // place an SP3 flag (e.g. `"              E"` -> the `E` clock-event flag at
1324    // column 75). A `None` clock writes the SP3 bad-clock sentinel.
1325    fn sp3_build(records: &[(&str, [f64; 3], Option<f64>, &str)], cs: &str) -> Sp3 {
1326        let n = records.len();
1327        let mut sats = String::new();
1328        for (sat, _, _, _) in records {
1329            sats.push_str(sat);
1330        }
1331        for _ in n..17 {
1332            sats.push_str("  0");
1333        }
1334        let mut body = String::new();
1335        body.push_str(&format!(
1336            "#cP2020  6 25  0  0  0.00000000       1 ORBIT {cs} FIT  TST\n"
1337        ));
1338        body.push_str("## 2111 432000.00000000   900.00000000 59025 0.0000000000000\n");
1339        body.push_str(&format!("+   {n:2}   {sats}\n"));
1340        body.push_str("++         0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0\n");
1341        body.push_str("%c G  cc GPS ccc cccc cccc cccc cccc ccccc ccccc ccccc ccccc\n");
1342        body.push_str("%c cc cc ccc ccc cccc cccc cccc cccc ccccc ccccc ccccc ccccc\n");
1343        body.push_str("%f  1.2500000  1.025000000  0.00000000000  0.000000000000000\n");
1344        body.push_str("%f  0.0000000  0.000000000  0.00000000000  0.000000000000000\n");
1345        body.push_str("%i    0    0    0    0      0      0      0      0         0\n");
1346        body.push_str("%i    0    0    0    0      0      0      0      0         0\n");
1347        body.push_str("/* TEST SP3-c FIXTURE\n");
1348        body.push_str("*  2020  6 25  0  0  0.00000000\n");
1349        for (sat, p, clk, flags) in records {
1350            let c = clk.unwrap_or(999_999.999_999);
1351            body.push_str(&format!(
1352                "P{sat}{:14.6}{:14.6}{:14.6}{c:14.6}{flags}\n",
1353                p[0], p[1], p[2]
1354            ));
1355        }
1356        body.push_str("EOF\n");
1357        Sp3::parse(body.as_bytes()).expect("parse test sp3")
1358    }
1359
1360    // The common case: IGS14, no flags.
1361    fn sp3_records(records: &[(&str, [f64; 3], Option<f64>)]) -> Sp3 {
1362        let full: Vec<(&str, [f64; 3], Option<f64>, &str)> =
1363            records.iter().map(|(s, p, c)| (*s, *p, *c, "")).collect();
1364        sp3_build(&full, "IGS14")
1365    }
1366
1367    fn sp3_two_epochs(
1368        epoch0: &[(&str, [f64; 3], Option<f64>)],
1369        epoch1: &[(&str, [f64; 3], Option<f64>)],
1370        interval_s: f64,
1371        cs: &str,
1372    ) -> Sp3 {
1373        let mut sats: Vec<&str> = epoch0
1374            .iter()
1375            .chain(epoch1.iter())
1376            .map(|(sat, _, _)| *sat)
1377            .collect();
1378        sats.sort_unstable();
1379        sats.dedup();
1380        let n = sats.len();
1381        let mut sat_field = String::new();
1382        for sat in &sats {
1383            sat_field.push_str(sat);
1384        }
1385        for _ in n..17 {
1386            sat_field.push_str("  0");
1387        }
1388
1389        let mut body = String::new();
1390        body.push_str(&format!(
1391            "#cP2020  6 25  0  0  0.00000000       2 ORBIT {cs} FIT  TST\n"
1392        ));
1393        body.push_str(&format!(
1394            "## 2111 432000.00000000 {interval_s:14.8} 59025 0.0000000000000\n"
1395        ));
1396        body.push_str(&format!("+   {n:2}   {sat_field}\n"));
1397        body.push_str("++         0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0\n");
1398        body.push_str("%c G  cc GPS ccc cccc cccc cccc cccc ccccc ccccc ccccc ccccc\n");
1399        body.push_str("%c cc cc ccc ccc cccc cccc cccc cccc ccccc ccccc ccccc ccccc\n");
1400        body.push_str("%f  1.2500000  1.025000000  0.00000000000  0.000000000000000\n");
1401        body.push_str("%f  0.0000000  0.000000000  0.00000000000  0.000000000000000\n");
1402        body.push_str("%i    0    0    0    0      0      0      0      0         0\n");
1403        body.push_str("%i    0    0    0    0      0      0      0      0         0\n");
1404        body.push_str("/* TEST SP3-c FIXTURE\n");
1405        body.push_str("*  2020  6 25  0  0  0.00000000\n");
1406        for (sat, p, clk) in epoch0 {
1407            let c = clk.unwrap_or(999_999.999_999);
1408            body.push_str(&format!(
1409                "P{sat}{:14.6}{:14.6}{:14.6}{c:14.6}\n",
1410                p[0], p[1], p[2]
1411            ));
1412        }
1413        let second_hour = (interval_s as i64) / 3600;
1414        let second_minute = ((interval_s as i64) % 3600) / 60;
1415        let second_second = (interval_s as i64) % 60;
1416        body.push_str(&format!(
1417            "*  2020  6 25 {second_hour:2} {second_minute:2} {second_second:2}.00000000\n"
1418        ));
1419        for (sat, p, clk) in epoch1 {
1420            let c = clk.unwrap_or(999_999.999_999);
1421            body.push_str(&format!(
1422                "P{sat}{:14.6}{:14.6}{:14.6}{c:14.6}\n",
1423                p[0], p[1], p[2]
1424            ));
1425        }
1426        body.push_str("EOF\n");
1427        Sp3::parse(body.as_bytes()).expect("parse test sp3")
1428    }
1429
1430    // N consecutive epochs spaced `interval_s` apart from 2020-06-25 00:00:00.
1431    fn sp3_epochs(
1432        start_offset_s: f64,
1433        epochs: &[&[SatSample<'_>]],
1434        interval_s: f64,
1435        cs: &str,
1436    ) -> Sp3 {
1437        let mut sats: Vec<&str> = epochs
1438            .iter()
1439            .flat_map(|e| e.iter().map(|(sat, _, _)| *sat))
1440            .collect();
1441        sats.sort_unstable();
1442        sats.dedup();
1443        let n = sats.len();
1444        let mut sat_field = String::new();
1445        for sat in &sats {
1446            sat_field.push_str(sat);
1447        }
1448        for _ in n..17 {
1449            sat_field.push_str("  0");
1450        }
1451
1452        let hms = |t: i64| (t / 3600, (t % 3600) / 60, t % 60);
1453        let start = start_offset_s as i64;
1454        let (sh, sm, ss0) = hms(start);
1455
1456        let mut body = String::new();
1457        body.push_str(&format!(
1458            "#cP2020  6 25 {sh:2} {sm:2} {ss0:2}.00000000      {:2} ORBIT {cs} FIT  TST\n",
1459            epochs.len()
1460        ));
1461        // Seconds-of-week and MJD fraction of the first epoch shift with the start.
1462        let sow = 432_000.0 + start_offset_s;
1463        let mjd_frac = start_offset_s / SECONDS_PER_DAY;
1464        body.push_str(&format!(
1465            "## 2111 {sow:15.8} {interval_s:14.8} 59025 {mjd_frac:.13}\n"
1466        ));
1467        body.push_str(&format!("+   {n:2}   {sat_field}\n"));
1468        body.push_str("++         0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0\n");
1469        body.push_str("%c G  cc GPS ccc cccc cccc cccc cccc ccccc ccccc ccccc ccccc\n");
1470        body.push_str("%c cc cc ccc ccc cccc cccc cccc cccc ccccc ccccc ccccc ccccc\n");
1471        body.push_str("%f  1.2500000  1.025000000  0.00000000000  0.000000000000000\n");
1472        body.push_str("%f  0.0000000  0.000000000  0.00000000000  0.000000000000000\n");
1473        body.push_str("%i    0    0    0    0      0      0      0      0         0\n");
1474        body.push_str("%i    0    0    0    0      0      0      0      0         0\n");
1475        body.push_str("/* TEST SP3-c FIXTURE\n");
1476        for (k, recs) in epochs.iter().enumerate() {
1477            let (hh, mm, ss) = hms(start + (k as i64) * (interval_s as i64));
1478            body.push_str(&format!("*  2020  6 25 {hh:2} {mm:2} {ss:2}.00000000\n"));
1479            for (sat, p, clk) in recs.iter() {
1480                let c = clk.unwrap_or(999_999.999_999);
1481                body.push_str(&format!(
1482                    "P{sat}{:14.6}{:14.6}{:14.6}{c:14.6}\n",
1483                    p[0], p[1], p[2]
1484                ));
1485            }
1486        }
1487        body.push_str("EOF\n");
1488        Sp3::parse(body.as_bytes()).expect("parse test sp3")
1489    }
1490
1491    #[test]
1492    fn merge_unions_coverage_when_one_center_misses_a_satellite() {
1493        // Center A reports G01/G02/G03; center B is missing G03. The merged
1494        // product must still cover G03 at that epoch (filled from A).
1495        let a = sp3_records(&[
1496            ("G01", [15000.0, -20000.0, 5000.0], Some(100.0)),
1497            ("G02", [16000.0, -21000.0, 6000.0], Some(200.0)),
1498            ("G03", [17000.0, -22000.0, 7000.0], Some(300.0)),
1499        ]);
1500        let b = sp3_records(&[
1501            ("G01", [15000.0, -20000.0, 5000.0], Some(100.0)),
1502            ("G02", [16000.0, -21000.0, 6000.0], Some(200.0)),
1503        ]);
1504
1505        let (merged, report) = merge(&[a, b], &MergeOptions::default()).expect("merge");
1506
1507        let states = merged.states_at(0).expect("epoch 0");
1508        assert!(
1509            states.contains_key(&gps(3)),
1510            "merged output must cover G03 from the center that has it"
1511        );
1512        assert_eq!(states.len(), 3, "union is G01/G02/G03");
1513        // G01 agreed across both centers -> consensus clock is their value.
1514        let g01 = states[&gps(1)];
1515        assert!((g01.clock_s.unwrap() - 100.0e-6).abs() < 1.0e-15);
1516        // G03 had a single source -> carried through, recorded, not quarantined.
1517        assert!(report.quarantined.is_empty());
1518        assert_eq!(report.single_source.len(), 1);
1519        assert_eq!(report.single_source[0].satellite, gps(3));
1520
1521        // The un-cross-checked share is surfaced: 1 of 3 accepted cells (G03) was
1522        // single-source, so a clean multi-source agreement RMS is not the whole
1523        // story. An empty report reports None.
1524        let frac = report
1525            .single_source_fraction()
1526            .expect("accepted cells present");
1527        assert!(
1528            (frac - 1.0 / 3.0).abs() < 1.0e-12,
1529            "single-source fraction {frac}"
1530        );
1531        assert_eq!(MergeReport::default().single_source_fraction(), None);
1532    }
1533
1534    #[test]
1535    fn merge_combines_two_of_three_agreeing_sources_and_rejects_the_outlier() {
1536        // A and B agree on G01; C is 10 m off in X (> the default 0.5 m tolerance).
1537        let a = sp3_records(&[("G01", [15000.0, -20000.0, 5000.0], Some(100.0))]);
1538        let b = sp3_records(&[("G01", [15000.0, -20000.0, 5000.0], Some(100.0))]);
1539        let c = sp3_records(&[("G01", [15000.010, -20000.0, 5000.0], Some(100.0))]);
1540
1541        let (merged, report) = merge(&[a, b, c], &MergeOptions::default()).expect("merge");
1542
1543        let states = merged.states_at(0).expect("epoch 0");
1544        let g01 = states[&gps(1)];
1545        // Consensus is A/B (15000 km == 1.5e7 m); not dragged toward C.
1546        assert!(
1547            (g01.position.as_array()[0] - 15_000_000.0).abs() < 1.0e-3,
1548            "got {}",
1549            g01.position.as_array()[0]
1550        );
1551        // C is source index 2 -> recorded as the rejected position outlier.
1552        assert_eq!(report.position_outliers.len(), 1);
1553        assert_eq!(report.position_outliers[0].sources, vec![2]);
1554        assert!(report.quarantined.is_empty());
1555    }
1556
1557    #[test]
1558    fn merge_consensus_handles_more_than_u32_mask_bits() {
1559        // Thirty-two centers agree and the 33rd is 10 m off in X. This used to
1560        // overflow the u32 subset mask before any consensus could be found.
1561        let sources: Vec<Sp3> = (0..33)
1562            .map(|idx| {
1563                let x_km = if idx < 32 { 15000.0 } else { 15000.010 };
1564                sp3_records(&[("G01", [x_km, -20000.0, 5000.0], Some(100.0))])
1565            })
1566            .collect();
1567
1568        for combine in [MergeCombine::Mean, MergeCombine::Precedence] {
1569            let opts = MergeOptions {
1570                combine,
1571                min_agree: 32,
1572                ..MergeOptions::default()
1573            };
1574
1575            let (merged, report) = merge(&sources, &opts).expect("33-source merge");
1576
1577            let states = merged.states_at(0).expect("epoch 0");
1578            let g01 = states[&gps(1)];
1579            assert!(
1580                (g01.position.as_array()[0] - 15_000_000.0).abs() < 1.0e-3,
1581                "{combine:?}: got {}",
1582                g01.position.as_array()[0]
1583            );
1584            assert_eq!(
1585                report.position_outliers.len(),
1586                1,
1587                "{combine:?}: expected one outlier report"
1588            );
1589            assert_eq!(report.position_outliers[0].sources, vec![32]);
1590            assert!(report.quarantined.is_empty(), "{combine:?}");
1591        }
1592    }
1593
1594    #[test]
1595    fn merge_bounds_large_overlap_clique_search() {
1596        let sources: Vec<Sp3> = (0..40)
1597            .map(|idx| {
1598                let x_km = if idx % 2 == 0 { 15000.0 } else { 15000.010 };
1599                sp3_records(&[("G01", [x_km, -20000.0, 5000.0], Some(100.0))])
1600            })
1601            .collect();
1602        let opts = MergeOptions {
1603            min_agree: 20,
1604            ..MergeOptions::default()
1605        };
1606
1607        let (merged, report) = merge(&sources, &opts).expect("bounded large-source merge");
1608
1609        let states = merged.states_at(0).expect("epoch 0");
1610        let g01 = states[&gps(1)];
1611        assert!(
1612            (g01.position.as_array()[0] - 15_000_000.0).abs() < 1.0e-3,
1613            "got {}",
1614            g01.position.as_array()[0]
1615        );
1616        assert_eq!(report.position_outliers.len(), 1);
1617        assert_eq!(
1618            report.position_outliers[0].sources,
1619            (1..40).step_by(2).collect::<Vec<_>>()
1620        );
1621        assert!(report.quarantined.is_empty());
1622    }
1623
1624    #[test]
1625    fn merge_quarantines_a_satellite_all_centers_disagree_on() {
1626        // Three sources, mutually beyond tolerance on G01: no 2-of-3 consensus.
1627        let a = sp3_records(&[("G01", [15000.000, -20000.0, 5000.0], Some(100.0))]);
1628        let b = sp3_records(&[("G01", [15000.010, -20000.0, 5000.0], Some(100.0))]);
1629        let c = sp3_records(&[("G01", [15000.020, -20000.0, 5000.0], Some(100.0))]);
1630
1631        let (merged, report) = merge(&[a, b, c], &MergeOptions::default()).expect("merge");
1632
1633        assert!(
1634            merged.states_at(0).expect("epoch 0").is_empty(),
1635            "no consensus -> G01 omitted, not averaged across disagreeing centers"
1636        );
1637        assert_eq!(report.quarantined.len(), 1);
1638        assert_eq!(report.quarantined[0].satellite, gps(1));
1639    }
1640
1641    #[test]
1642    fn merge_rejects_an_empty_input() {
1643        assert!(merge(&[], &MergeOptions::default()).is_err());
1644    }
1645
1646    #[test]
1647    fn merge_omits_an_unalignable_secondary_clock() {
1648        // Only 3 common satellites, but the default clock datum needs 5, so
1649        // center B's clocks cannot be put on A's datum. They must be dropped
1650        // rather than emitted raw, and a B-only satellite gets a position but no
1651        // clock.
1652        let a = sp3_records(&[
1653            ("G01", [15000.0, -20000.0, 5000.0], Some(100.0)),
1654            ("G02", [16000.0, -21000.0, 6000.0], Some(200.0)),
1655            ("G03", [17000.0, -22000.0, 7000.0], Some(300.0)),
1656        ]);
1657        let b = sp3_records(&[
1658            ("G01", [15000.0, -20000.0, 5000.0], Some(150.0)),
1659            ("G02", [16000.0, -21000.0, 6000.0], Some(250.0)),
1660            ("G03", [17000.0, -22000.0, 7000.0], Some(350.0)),
1661            ("G04", [18000.0, -23000.0, 8000.0], Some(450.0)),
1662        ]);
1663
1664        let (merged, _) = merge(&[a, b], &MergeOptions::default()).expect("merge");
1665        let states = merged.states_at(0).expect("epoch 0");
1666
1667        // G04 is B-only (gap fill): position carried, clock unalignable -> dropped.
1668        assert!(states.contains_key(&gps(4)));
1669        assert!(
1670            states[&gps(4)].clock_s.is_none(),
1671            "an unalignable secondary clock must be dropped, not emitted raw"
1672        );
1673        // G01's clock comes from the reference (source 0), which is on its own datum.
1674        let g01_clock = states[&gps(1)]
1675            .clock_s
1676            .expect("G01 carries the reference clock");
1677        assert!((g01_clock - 100.0e-6).abs() < 1.0e-12, "got {g01_clock}");
1678    }
1679
1680    #[test]
1681    fn merge_rejects_mismatched_coordinate_systems() {
1682        let a = sp3_build(
1683            &[("G01", [15000.0, -20000.0, 5000.0], Some(100.0), "")],
1684            "IGS14",
1685        );
1686        let b = sp3_build(
1687            &[("G01", [15000.0, -20000.0, 5000.0], Some(100.0), "")],
1688            "IGS20",
1689        );
1690
1691        assert!(merge(&[a, b], &MergeOptions::default()).is_err());
1692    }
1693
1694    #[test]
1695    fn merge_rejects_different_igs_frame_labels_without_a_transform() {
1696        let a = sp3_build(
1697            &[("G01", [15000.0, -20000.0, 5000.0], Some(100.0), "")],
1698            "IGS20",
1699        );
1700        let b = sp3_build(
1701            &[("G01", [15000.0, -20000.0, 5000.0], Some(100.0), "")],
1702            "IGc20",
1703        );
1704
1705        let err = merge(&[a, b], &MergeOptions::default()).expect_err("frame mismatch");
1706        assert!(
1707            err.to_string().contains("mismatched coordinate systems"),
1708            "{err}"
1709        );
1710    }
1711
1712    #[test]
1713    fn merge_decimates_finer_interval_onto_coarse_common_grid() {
1714        // 15-min (900 s) center A and 5-min (300 s) center B over the same span.
1715        // The merge must decimate B onto the 900 s grid (exact subset of B's :00
1716        // and :15 epochs; the :05/:10 epochs are dropped, not interpolated),
1717        // output at 900 s. Under precedence, A (source 0) wins G01's whole arc and
1718        // B's distinct values must never be substituted mid-arc.
1719        let a = sp3_two_epochs(
1720            &[("G01", [15000.0, -20000.0, 5000.0], Some(100.0))],
1721            &[("G01", [15003.0, -20003.0, 5003.0], Some(103.0))],
1722            900.0,
1723            "IGS14",
1724        );
1725        let b = sp3_epochs(
1726            0.0,
1727            &[
1728                &[("G01", [26000.0, -20000.0, 5000.0], Some(200.0))],
1729                &[("G01", [26001.0, -20001.0, 5001.0], Some(201.0))],
1730                &[("G01", [26002.0, -20002.0, 5002.0], Some(202.0))],
1731                &[("G01", [26003.0, -20003.0, 5003.0], Some(203.0))],
1732            ],
1733            300.0,
1734            "IGS14",
1735        );
1736
1737        let opts = MergeOptions {
1738            combine: MergeCombine::Precedence,
1739            min_agree: 1,
1740            ..MergeOptions::default()
1741        };
1742        let (merged, _report) =
1743            merge(&[a, b], &opts).expect("mixed-interval merge decimates onto the coarse grid");
1744
1745        assert_eq!(
1746            merged.header.epoch_interval_s, 900.0,
1747            "output is on the coarse (900 s) common grid"
1748        );
1749        assert_eq!(
1750            merged.epochs.len(),
1751            2,
1752            "only the two aligned epochs (:00, :15), not B's four"
1753        );
1754        // Per-arc precedence intact across the decimated grid: A (source 0) wins
1755        // both epochs; B's :00/:15 values (26000xxx km) are never substituted.
1756        for idx in 0..2 {
1757            let g01 = merged.states_at(idx).expect("epoch")[&gps(1)];
1758            assert!(
1759                (g01.position.as_array()[0] - 15_000_000.0 - (idx as f64) * 3000.0).abs() < 1.0,
1760                "epoch {idx}: expected A's value, got {}",
1761                g01.position.as_array()[0]
1762            );
1763        }
1764        assert!(merged.states_at(0).expect("epoch 0").contains_key(&gps(1)));
1765        assert!(merged.states_at(1).expect("epoch 1").contains_key(&gps(1)));
1766    }
1767
1768    #[test]
1769    fn merge_decimates_with_explicit_coarser_target_interval() {
1770        // Two 5-min inputs, explicit 900 s target: both decimate to the 15-min grid.
1771        let recs = |x: f64| vec![("G01", [x, -20000.0, 5000.0], Some(100.0))];
1772        let make = || {
1773            sp3_epochs(
1774                0.0,
1775                &[
1776                    &recs(15000.0),
1777                    &recs(15001.0),
1778                    &recs(15002.0),
1779                    &recs(15003.0),
1780                ],
1781                300.0,
1782                "IGS14",
1783            )
1784        };
1785        let opts = MergeOptions {
1786            min_agree: 1,
1787            target_epoch_interval_s: Some(900.0),
1788            ..MergeOptions::default()
1789        };
1790        let (merged, _) = merge(&[make(), make()], &opts).expect("explicit coarse target");
1791        assert_eq!(merged.header.epoch_interval_s, 900.0);
1792        assert_eq!(
1793            merged.epochs.len(),
1794            2,
1795            "decimated 5-min inputs to the 900 s target"
1796        );
1797    }
1798
1799    #[test]
1800    fn merge_rejects_non_divisible_epoch_intervals() {
1801        // 900 s and 400 s: 900 is not an integer multiple of 400, so no exact
1802        // subset of the 400 s grid lands on the 900 s grid -> still rejected
1803        // (positional interpolation is never performed).
1804        let a = sp3_two_epochs(
1805            &[("G01", [15000.0, -20000.0, 5000.0], Some(100.0))],
1806            &[("G01", [15001.0, -20001.0, 5001.0], Some(101.0))],
1807            900.0,
1808            "IGS14",
1809        );
1810        let b = sp3_two_epochs(
1811            &[("G01", [15000.0, -20000.0, 5000.0], Some(100.0))],
1812            &[("G01", [15001.0, -20001.0, 5001.0], Some(101.0))],
1813            400.0,
1814            "IGS14",
1815        );
1816
1817        let err = merge(&[a, b], &MergeOptions::default()).expect_err("non-divisible intervals");
1818        assert!(
1819            err.to_string().contains("mismatched epoch intervals"),
1820            "{err}"
1821        );
1822    }
1823
1824    #[test]
1825    fn merge_rejects_a_non_whole_second_common_interval() {
1826        // The decimation lattice is whole-second J2000 keys, so a fractional
1827        // common interval must be rejected rather than silently rounded.
1828        let mk = || {
1829            sp3_two_epochs(
1830                &[("G01", [15000.0, -20000.0, 5000.0], Some(100.0))],
1831                &[("G01", [15001.0, -20001.0, 5001.0], Some(101.0))],
1832                900.0,
1833                "IGS14",
1834            )
1835        };
1836        let opts = MergeOptions {
1837            target_epoch_interval_s: Some(450.5),
1838            ..MergeOptions::default()
1839        };
1840        let err = merge(&[mk(), mk()], &opts).expect_err("fractional target");
1841        assert!(err.to_string().contains("whole number of seconds"), "{err}");
1842    }
1843
1844    #[test]
1845    fn merge_header_first_epoch_describes_the_decimated_grid_start() {
1846        // Source A starts at 00:00, source B at 00:15 (both 15-min). The merged
1847        // grid's first epoch is the first COMMON epoch, 00:15, so the output
1848        // header's seconds-of-week / MJD fraction must describe 00:15 (source B),
1849        // not source A's earlier 00:00 start.
1850        let a = sp3_epochs(
1851            0.0,
1852            &[
1853                &[("G01", [15000.0, -20000.0, 5000.0], Some(100.0))],
1854                &[("G01", [15001.0, -20001.0, 5001.0], Some(101.0))],
1855                &[("G01", [15002.0, -20002.0, 5002.0], Some(102.0))],
1856            ],
1857            900.0,
1858            "IGS14",
1859        );
1860        let b = sp3_epochs(
1861            900.0,
1862            &[
1863                &[("G01", [15001.0, -20001.0, 5001.0], Some(101.0))],
1864                &[("G01", [15002.0, -20002.0, 5002.0], Some(102.0))],
1865                &[("G01", [15003.0, -20003.0, 5003.0], Some(103.0))],
1866            ],
1867            900.0,
1868            "IGS14",
1869        );
1870
1871        let opts = MergeOptions {
1872            min_agree: 1,
1873            ..MergeOptions::default()
1874        };
1875        let (merged, _) = merge(&[a, b], &opts).expect("merge");
1876
1877        assert_eq!(merged.epochs.len(), 2, "common epochs are 00:15 and 00:30");
1878        assert!(
1879            (merged.header.seconds_of_week - 346_500.0).abs() < 1.0e-6,
1880            "header sow must describe the merged first epoch 00:15 (346500 s), got {}",
1881            merged.header.seconds_of_week
1882        );
1883        assert!(
1884            (merged.header.mjd_fraction - 900.0 / SECONDS_PER_DAY).abs() < 1.0e-9,
1885            "header MJD fraction must describe 00:15, got {}",
1886            merged.header.mjd_fraction
1887        );
1888    }
1889
1890    #[test]
1891    fn merge_writer_recomputes_header_when_common_grid_starts_after_all_inputs() {
1892        // A starts on the 15-minute grid at 00:00. B starts on a 7.5-minute grid
1893        // at 00:07:30. Their coarsened common grid starts at 00:15, which is not
1894        // the first epoch of either input, so the merged `##` header must be
1895        // derived from the output epoch rather than cloned from a source header.
1896        let a = sp3_epochs(
1897            0.0,
1898            &[
1899                &[("G01", [15000.0, -20000.0, 5000.0], Some(100.0))],
1900                &[("G01", [15001.0, -20001.0, 5001.0], Some(101.0))],
1901                &[("G01", [15002.0, -20002.0, 5002.0], Some(102.0))],
1902            ],
1903            900.0,
1904            "IGS14",
1905        );
1906        let b = sp3_epochs(
1907            450.0,
1908            &[
1909                &[("G01", [15010.0, -20010.0, 5010.0], Some(110.0))],
1910                &[("G01", [15001.0, -20001.0, 5001.0], Some(101.0))],
1911                &[("G01", [15011.0, -20011.0, 5011.0], Some(111.0))],
1912                &[("G01", [15002.0, -20002.0, 5002.0], Some(102.0))],
1913            ],
1914            450.0,
1915            "IGS14",
1916        );
1917
1918        let opts = MergeOptions {
1919            min_agree: 1,
1920            ..MergeOptions::default()
1921        };
1922        let (merged, _) = merge(&[a, b], &opts).expect("mixed-cadence merge");
1923
1924        assert_eq!(merged.epochs.len(), 2, "common epochs are 00:15 and 00:30");
1925        let text = merged.to_sp3_string();
1926        let header = text
1927            .lines()
1928            .find(|line| line.starts_with("## "))
1929            .expect("written ## header");
1930        let first_epoch = text
1931            .lines()
1932            .find(|line| line.starts_with("*  "))
1933            .expect("written first epoch");
1934
1935        assert_eq!(first_epoch, "*  2020  6 25  0 15  0.00000000");
1936        assert_eq!(
1937            header,
1938            "## 2111 346500.00000000   900.00000000 59025 0.0104166666667"
1939        );
1940    }
1941
1942    #[test]
1943    fn precedence_merge_never_switches_source_within_one_satellite_arc() {
1944        let a = sp3_two_epochs(
1945            &[("G01", [15000.0, -20000.0, 5000.0], Some(100.0))],
1946            &[],
1947            900.0,
1948            "IGS14",
1949        );
1950        let b = sp3_two_epochs(
1951            &[("G01", [15000.001, -20000.0, 5000.0], Some(100.0))],
1952            &[("G01", [15001.0, -20001.0, 5001.0], Some(101.0))],
1953            900.0,
1954            "IGS14",
1955        );
1956        let opts = MergeOptions {
1957            combine: MergeCombine::Precedence,
1958            min_agree: 1,
1959            ..MergeOptions::default()
1960        };
1961
1962        let (merged, _report) = merge(&[a, b], &opts).expect("merge");
1963        let epoch0 = merged.states_at(0).expect("epoch 0");
1964        let epoch1 = merged.states_at(1).expect("epoch 1");
1965
1966        assert!(epoch0.contains_key(&gps(1)));
1967        assert!(
1968            !epoch1.contains_key(&gps(1)),
1969            "G01 must not switch from source 0 at epoch 0 to source 1 at epoch 1"
1970        );
1971        assert_eq!(merged.header.epoch_interval_s, 900.0);
1972    }
1973
1974    #[test]
1975    fn merge_filters_requested_constellations_and_header_satellites() {
1976        let a = sp3_two_epochs(
1977            &[
1978                ("G01", [15000.0, -20000.0, 5000.0], Some(100.0)),
1979                ("E01", [21000.0, -1000.0, 13000.0], Some(120.0)),
1980            ],
1981            &[
1982                ("G01", [15001.0, -20001.0, 5001.0], Some(101.0)),
1983                ("E01", [21001.0, -1001.0, 13001.0], Some(121.0)),
1984            ],
1985            900.0,
1986            "IGS14",
1987        );
1988        let systems = BTreeSet::from([GnssSystem::Gps]);
1989        let opts = MergeOptions {
1990            systems: Some(systems),
1991            ..MergeOptions::default()
1992        };
1993
1994        let (merged, _report) = merge(&[a], &opts).expect("merge");
1995
1996        assert_eq!(merged.header.satellites, vec![gps(1)]);
1997        for idx in 0..merged.epochs.len() {
1998            let states = merged.states_at(idx).expect("epoch");
1999            assert_eq!(states.keys().copied().collect::<Vec<_>>(), vec![gps(1)]);
2000        }
2001    }
2002
2003    #[test]
2004    fn merge_preserves_a_clock_event_flag() {
2005        // Source A carries an `E` clock-event flag on G01 (column 75); the merged
2006        // product must keep it so the interpolator still splits the clock arc.
2007        let a = sp3_build(
2008            &[(
2009                "G01",
2010                [15000.0, -20000.0, 5000.0],
2011                Some(100.0),
2012                "              E",
2013            )],
2014            "IGS14",
2015        );
2016        let b = sp3_build(
2017            &[("G01", [15000.0, -20000.0, 5000.0], Some(100.0), "")],
2018            "IGS14",
2019        );
2020
2021        let (merged, _) = merge(&[a, b], &MergeOptions::default()).expect("merge");
2022        let g01 = merged.states_at(0).expect("epoch 0")[&gps(1)];
2023
2024        assert!(
2025            g01.flags.clock_event,
2026            "merged cell must preserve a contributing source's clock-event flag"
2027        );
2028    }
2029
2030    #[test]
2031    fn merge_reports_effective_epoch_interval_from_actual_epochs() {
2032        // The header DECLARES a 300 s interval, but the two epochs are 15 min
2033        // (900 s) apart. The synthetic merged header must report the spacing of
2034        // the actual merged epochs, not inherit the stale declared value.
2035        let body = "#cP2020  6 25  0  0  0.00000000       2 ORBIT IGS14 FIT  TST\n\
2036            ## 2111 432000.00000000   300.00000000 59025 0.0000000000000\n\
2037            +    1   G01  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0\n\
2038            ++         0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0\n\
2039            %c G  cc GPS ccc cccc cccc cccc cccc ccccc ccccc ccccc ccccc\n\
2040            %c cc cc ccc ccc cccc cccc cccc cccc ccccc ccccc ccccc ccccc\n\
2041            %f  1.2500000  1.025000000  0.00000000000  0.000000000000000\n\
2042            %f  0.0000000  0.000000000  0.00000000000  0.000000000000000\n\
2043            %i    0    0    0    0      0      0      0      0         0\n\
2044            %i    0    0    0    0      0      0      0      0         0\n\
2045            /* TEST SP3-c FIXTURE\n\
2046            *  2020  6 25  0  0  0.00000000\n\
2047            PG01  15000.000000 -20000.000000   5000.000000    100.000000\n\
2048            *  2020  6 25  0 15  0.00000000\n\
2049            PG01  15001.000000 -20001.000000   5001.000000    101.000000\n\
2050            EOF\n";
2051        let a = Sp3::parse(body.as_bytes()).expect("parse test sp3");
2052
2053        let (merged, _) = merge(&[a], &MergeOptions::default()).expect("merge");
2054
2055        assert!(
2056            (merged.header.epoch_interval_s - 900.0).abs() < 1.0e-6,
2057            "got {}",
2058            merged.header.epoch_interval_s
2059        );
2060    }
2061
2062    #[test]
2063    fn merge_rejects_unsorted_input_epochs_before_cadence_inference() {
2064        let body = "#cP2020  6 25  0  0  0.00000000       2 ORBIT IGS14 FIT  TST\n\
2065            ## 2111 432000.00000000   900.00000000 59025 0.0000000000000\n\
2066            +    1   G01  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0\n\
2067            ++         0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0\n\
2068            %c G  cc GPS ccc cccc cccc cccc cccc ccccc ccccc ccccc ccccc\n\
2069            %c cc cc ccc ccc cccc cccc cccc cccc ccccc ccccc ccccc ccccc\n\
2070            %f  1.2500000  1.025000000  0.00000000000  0.000000000000000\n\
2071            %f  0.0000000  0.000000000  0.00000000000  0.000000000000000\n\
2072            %i    0    0    0    0      0      0      0      0         0\n\
2073            %i    0    0    0    0      0      0      0      0         0\n\
2074            /* TEST SP3-c FIXTURE\n\
2075            *  2020  6 25  0 15  0.00000000\n\
2076            PG01  15001.000000 -20001.000000   5001.000000    101.000000\n\
2077            *  2020  6 25  0  0  0.00000000\n\
2078            PG01  15000.000000 -20000.000000   5000.000000    100.000000\n\
2079            EOF\n";
2080        let source = Sp3::parse(body.as_bytes()).expect("parse unsorted test sp3");
2081
2082        let err = merge(&[source], &MergeOptions::default()).expect_err("unsorted epochs");
2083
2084        assert!(
2085            err.to_string()
2086                .contains("merge input epochs must be strictly increasing"),
2087            "{err}"
2088        );
2089    }
2090
2091    #[test]
2092    fn align_clock_reference_puts_other_on_the_reference_datum() {
2093        // `other`'s clocks all run +50 us ahead; after alignment they should sit
2094        // on `reference`'s datum (G01: 150 us - 50 us = 100 us = 1e-4 s).
2095        let reference = sp3([100.0, 200.0, 300.0]);
2096        let other = sp3([150.0, 250.0, 350.0]);
2097
2098        let aligned = align_clock_reference(&reference, &other, 3);
2099
2100        let g01 = aligned.states_at(0).expect("epoch 0")[&gps(1)];
2101        assert!(
2102            (g01.clock_s.unwrap() - 100.0e-6).abs() < 1.0e-15,
2103            "got {}",
2104            g01.clock_s.unwrap()
2105        );
2106        // Positions are untouched by clock alignment.
2107        let original = other.states_at(0).expect("epoch 0")[&gps(1)];
2108        assert_eq!(g01.position.as_array(), original.position.as_array());
2109    }
2110
2111    // Minimal single-epoch SP3-c with three satellites; each `clocks_us` entry is
2112    // that satellite's clock in microseconds (positions are arbitrary but non-zero
2113    // so they parse as valid records).
2114    fn sp3(clocks_us: [f64; 3]) -> Sp3 {
2115        let body = format!(
2116            "#cP2020  6 25  0  0  0.00000000       1 ORBIT IGS14 FIT  TST\n\
2117             ## 2111 432000.00000000   900.00000000 59025 0.0000000000000\n\
2118             +    3   G01G02G03  0  0  0  0  0  0  0  0  0  0  0  0  0  0\n\
2119             ++         0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0  0\n\
2120             %c G  cc GPS ccc cccc cccc cccc cccc ccccc ccccc ccccc ccccc\n\
2121             %c cc cc ccc ccc cccc cccc cccc cccc ccccc ccccc ccccc ccccc\n\
2122             %f  1.2500000  1.025000000  0.00000000000  0.000000000000000\n\
2123             %f  0.0000000  0.000000000  0.00000000000  0.000000000000000\n\
2124             %i    0    0    0    0      0      0      0      0         0\n\
2125             %i    0    0    0    0      0      0      0      0         0\n\
2126             /* TEST SP3-c FIXTURE\n\
2127             *  2020  6 25  0  0  0.00000000\n\
2128             PG01  15000.000000 -20000.000000   5000.000000 {:13.6}\n\
2129             PG02  -1234.567890   2345.678901  -3456.789012 {:13.6}\n\
2130             PG03   8000.000000  12000.000000 -19000.000000 {:13.6}\n\
2131             EOF\n",
2132            clocks_us[0], clocks_us[1], clocks_us[2]
2133        );
2134        Sp3::parse(body.as_bytes()).expect("parse test sp3")
2135    }
2136
2137    #[test]
2138    fn recovers_a_uniform_datum_shift() {
2139        // every `other` clock is +50 us (= 5e-5 s) from `reference`.
2140        let reference = sp3([100.0, 200.0, 300.0]);
2141        let other = sp3([150.0, 250.0, 350.0]);
2142
2143        let offsets = clock_reference_offset(&reference, &other, 3);
2144
2145        assert_eq!(offsets.len(), 1);
2146        assert_eq!(offsets[0].satellites, 3);
2147        assert!(
2148            (offsets[0].offset_s - 5.0e-5).abs() < 1.0e-12,
2149            "got {}",
2150            offsets[0].offset_s
2151        );
2152    }
2153
2154    #[test]
2155    fn median_rejects_a_single_outlier_clock() {
2156        // Two satellites agree (+50 us); one is a wild outlier (+9000 us). The
2157        // median over the three tracks the consensus instead of being dragged out.
2158        let reference = sp3([100.0, 200.0, 300.0]);
2159        let other = sp3([150.0, 250.0, 9_300.0]);
2160
2161        let offsets = clock_reference_offset(&reference, &other, 3);
2162
2163        assert_eq!(offsets.len(), 1);
2164        assert!(
2165            (offsets[0].offset_s - 5.0e-5).abs() < 1.0e-12,
2166            "got {}",
2167            offsets[0].offset_s
2168        );
2169    }
2170
2171    #[test]
2172    fn omits_epochs_below_min_common() {
2173        // Three common clocked satellites, but require four: the fragile estimate
2174        // is omitted rather than reported.
2175        let reference = sp3([100.0, 200.0, 300.0]);
2176        let other = sp3([150.0, 250.0, 350.0]);
2177
2178        assert!(clock_reference_offset(&reference, &other, 4).is_empty());
2179    }
2180
2181    #[test]
2182    fn merge_agreement_metric_reports_known_position_dispersion() {
2183        // Three centers place G01 on a line, 0 / +3 m / +6 m in X, all within a
2184        // wide consensus tolerance. The mean combine writes +3 m, so the member
2185        // distances from the combined value are {3, 0, 3} m:
2186        //   RMS = sqrt((9 + 0 + 9) / 3) = sqrt(6) m,  max = 3 m.
2187        let a = sp3_records(&[("G01", [15000.000, -20000.0, 5000.0], Some(100.0))]);
2188        let b = sp3_records(&[("G01", [15000.003, -20000.0, 5000.0], Some(100.0))]);
2189        let c = sp3_records(&[("G01", [15000.006, -20000.0, 5000.0], Some(100.0))]);
2190        let opts = MergeOptions {
2191            position_tolerance_m: 10.0,
2192            min_agree: 3,
2193            combine: MergeCombine::Mean,
2194            ..MergeOptions::default()
2195        };
2196
2197        let (_merged, report) = merge(&[a, b, c], &opts).expect("merge");
2198
2199        assert_eq!(report.agreement.len(), 1, "one accepted cell");
2200        let m = report.agreement[0];
2201        assert_eq!(m.satellite, gps(1));
2202        assert_eq!(m.position_members, 3);
2203        assert!(
2204            (m.position_rms_m - 6.0_f64.sqrt()).abs() < 1.0e-6,
2205            "got rms {}",
2206            m.position_rms_m
2207        );
2208        assert!(
2209            (m.position_max_m - 3.0).abs() < 1.0e-6,
2210            "got max {}",
2211            m.position_max_m
2212        );
2213
2214        // The pooled summaries over the single cell reproduce the cell values.
2215        assert!((report.position_agreement_rms_m().unwrap() - 6.0_f64.sqrt()).abs() < 1.0e-6);
2216        assert!((report.position_agreement_max_m().unwrap() - 3.0).abs() < 1.0e-6);
2217
2218        // Per-epoch aggregate: one epoch, one multi-source satellite.
2219        let per_epoch = report.per_epoch_agreement();
2220        assert_eq!(per_epoch.len(), 1);
2221        assert_eq!(per_epoch[0].satellites, 1);
2222        assert!((per_epoch[0].position_rms_m - 6.0_f64.sqrt()).abs() < 1.0e-6);
2223        assert!((per_epoch[0].position_max_m - 3.0).abs() < 1.0e-6);
2224    }
2225
2226    #[test]
2227    fn merge_agreement_metric_reports_known_clock_dispersion() {
2228        // Same positions across A/B/C (zero position spread); the three centers
2229        // share a clock datum (G01/G02 identical) so the per-epoch datum offset is
2230        // zero and G03's clocks stay as authored: 300 / 330 / 270 us. The mean
2231        // combine writes 300 us, so the deviations are {0, +30, -30} us:
2232        //   RMS = sqrt((0 + 30^2 + 30^2)/3) us = sqrt(600) us,  max = 30 us.
2233        let a = sp3([100.0, 200.0, 300.0]);
2234        let b = sp3([100.0, 200.0, 330.0]);
2235        let c = sp3([100.0, 200.0, 270.0]);
2236        let opts = MergeOptions {
2237            clock_min_common: 1,
2238            clock_tolerance_s: 1.0e-3,
2239            min_agree: 3,
2240            combine: MergeCombine::Mean,
2241            ..MergeOptions::default()
2242        };
2243
2244        let (_merged, report) = merge(&[a, b, c], &opts).expect("merge");
2245
2246        let g03 = report
2247            .agreement
2248            .iter()
2249            .find(|m| m.satellite == gps(3))
2250            .expect("G03 agreement metric");
2251        assert_eq!(g03.clock_members, 3);
2252        let expected_rms_s = 600.0_f64.sqrt() * 1.0e-6;
2253        assert!(
2254            (g03.clock_rms_s.unwrap() - expected_rms_s).abs() < 1.0e-15,
2255            "got clock rms {:?}",
2256            g03.clock_rms_s
2257        );
2258        assert!(
2259            (g03.clock_max_s.unwrap() - 30.0e-6).abs() < 1.0e-15,
2260            "got clock max {:?}",
2261            g03.clock_max_s
2262        );
2263        // G01/G02 agree exactly -> zero clock dispersion.
2264        for prn in [1u8, 2] {
2265            let m = report
2266                .agreement
2267                .iter()
2268                .find(|m| m.satellite == gps(prn))
2269                .expect("metric");
2270            assert!(m.clock_rms_s.unwrap().abs() < 1.0e-18, "prn {prn}");
2271            // Positions identical across centers -> zero position dispersion too.
2272            assert!(m.position_rms_m.abs() < 1.0e-9, "prn {prn}");
2273        }
2274
2275        // The clock pooled summary is the RMS over the three multi-source cells
2276        // (G01=0, G02=0, G03), each with 3 members:
2277        //   sqrt((0 + 0 + 3*expected^2) / 9) = expected / sqrt(3).
2278        let pooled = report.clock_agreement_rms_s().expect("clock pool");
2279        assert!(
2280            (pooled - expected_rms_s / 3.0_f64.sqrt()).abs() < 1.0e-15,
2281            "got pooled {pooled}"
2282        );
2283        assert!((report.clock_agreement_max_s().unwrap() - 30.0e-6).abs() < 1.0e-15);
2284    }
2285
2286    // Real-data oracle: combine published individual analysis-center final
2287    // products (COD/GFZ/JPL, 2026-04-30, GPS week 2416 DOY 120) and compare to the
2288    // published IGS official combined for the same day. The IGS combination is a
2289    // specific weighted algorithm, so the crate's mean combine is not a bit-match;
2290    // the gate is agreement at the inter-center spread level (cm-level bound), gated
2291    // at RMS < 2 cm and max < 5 cm (observed RMS ~0.7 cm, max ~1.6 cm over 88 cells).
2292    //
2293    // Fixture provenance: the COD/GFZ/JPL `_trim.SP3` files are the final precise
2294    // orbit products of CODE (AIUB Bern), GFZ Potsdam, and JPL, all frame IGc20 /
2295    // time system GPS (ESA/GRG excluded for IGS20 frame labelling). From the Wuhan
2296    // University IGS mirror `ftp://igs.gnsswhu.cn/pub/gps/products/2416/`, full-day
2297    // `.gz`: COD0OPSFIN_20261200000_01D_05M_ORB.SP3.gz (634569 B, sha256
2298    // 90393acaed691cd4d19cd4ade7153873eb41ef38585df177d9d540eac6316112);
2299    // GFZ0OPSFIN…05M_ORB.SP3.gz (647028 B, sha256
2300    // a51a04ab283a981ddec20ae77d575cd05f4f8249202e0ee4f73e7243b7817e88);
2301    // JPL0OPSFIN…05M_ORB.SP3.gz (482973 B, sha256
2302    // 3a39ccb2d097eddb139047532b2b93c5d538abc39255fc779278ac64f10cd185). Each trim
2303    // keeps the verbatim header and only the 11 epochs 09:45..12:15 landing on the
2304    // combined's 900 s grid plus the 8-sat subset common to all three centers and
2305    // the combined (G02,G03,G04,G05,G09,G17,G25,G31); velocity/correlation records
2306    // dropped, no values altered. Trim sha256: COD…_trim.SP3 (7227 B)
2307    // f3ad3f637134651d086815345f3e5f531a9dbacb6f739b7dddf664e0ab3a1795;
2308    // GFZ…_trim.SP3 (9805 B)
2309    // 9e50edc53ac42791923fd71c39b49a97bf516084f1d2b1dcb260685d2a8f11cc;
2310    // JPL…_trim.SP3 (8210 B)
2311    // 9ac5aafdabed38679892f57b42864cc3716d997400280f29ee8049a37057adf4. The oracle
2312    // IGS0OPSFIN combined product provenance is in `sp3/tests.rs`.
2313    #[cfg(sidereon_repo_tests)]
2314    #[test]
2315    fn merge_agrees_with_published_igs_combined_within_cm() {
2316        fn load(name: &str) -> Sp3 {
2317            let path = format!("{}/tests/fixtures/sp3/{}", env!("CARGO_MANIFEST_DIR"), name);
2318            let bytes = std::fs::read(&path).unwrap_or_else(|e| panic!("read {path}: {e}"));
2319            Sp3::parse(&bytes).unwrap_or_else(|e| panic!("parse {name}: {e}"))
2320        }
2321
2322        let cod = load("COD0OPSFIN_20261200945_02H30M_15M_ORB_trim.SP3");
2323        let gfz = load("GFZ0OPSFIN_20261200945_02H30M_15M_ORB_trim.SP3");
2324        let jpl = load("JPL0OPSFIN_20261200945_02H30M_15M_ORB_trim.SP3");
2325        let igs = load("IGS0OPSFIN_20261200945_02H30M_15M_ORB.SP3");
2326
2327        let (merged, report) =
2328            merge(&[cod, gfz, jpl], &MergeOptions::default()).expect("multi-center merge");
2329
2330        // All three centers agree at the 0.5 m position tolerance: nothing
2331        // quarantined, every cell a 3-source consensus.
2332        assert!(
2333            report.quarantined.is_empty(),
2334            "centers should agree: {:?}",
2335            report.quarantined
2336        );
2337        // A clean 3-source consensus everywhere: no gap-fills, no rejected
2338        // outliers, and every accepted cell backed by all three centers.
2339        assert!(
2340            report.single_source.is_empty(),
2341            "{:?}",
2342            report.single_source
2343        );
2344        assert!(
2345            report.position_outliers.is_empty(),
2346            "{:?}",
2347            report.position_outliers
2348        );
2349        assert!(
2350            report.agreement.iter().all(|a| a.position_members == 3),
2351            "every agreement cell should be a 3-source consensus"
2352        );
2353
2354        let mut igs_idx: std::collections::BTreeMap<i64, usize> = std::collections::BTreeMap::new();
2355        for (i, ep) in igs.epochs.iter().enumerate() {
2356            if let Some(s) = super::instant_to_j2000_seconds(ep) {
2357                igs_idx.insert(s.floor() as i64, i);
2358            }
2359        }
2360
2361        let mut sumsq = 0.0_f64;
2362        let mut max = 0.0_f64;
2363        let mut n = 0usize;
2364        for (mi, ep) in merged.epochs.iter().enumerate() {
2365            let key = super::instant_to_j2000_seconds(ep)
2366                .expect("merged epoch key")
2367                .floor() as i64;
2368            let ii = *igs_idx.get(&key).expect("IGS combined covers merged epoch");
2369            let merged_states = merged.states_at(mi).expect("merged states");
2370            let igs_states = igs.states_at(ii).expect("IGS states");
2371            for (sat, mst) in merged_states.iter() {
2372                let ist = igs_states
2373                    .get(sat)
2374                    .unwrap_or_else(|| panic!("merged sat {sat} missing from IGS combined"));
2375                let d = super::dist3(&mst.position.as_array(), &ist.position.as_array());
2376                sumsq += d * d;
2377                max = max.max(d);
2378                n += 1;
2379            }
2380        }
2381
2382        // Exact coverage: 8 satellites x 11 epochs, every merged cell present in
2383        // the IGS combined (proves same epochs/sats, not a lucky subset).
2384        assert_eq!(n, 88, "expected exactly 88 compared cells, got {n}");
2385        let rms = (sumsq / n as f64).sqrt();
2386        // Observed on this day: RMS ~0.7 cm, max ~1.6 cm. Gate at a cm-level bound.
2387        assert!(
2388            rms < 0.02,
2389            "combine-vs-IGS RMS {:.4} m ({} cells) exceeds the 2 cm gate",
2390            rms,
2391            n
2392        );
2393        assert!(
2394            max < 0.05,
2395            "combine-vs-IGS max {max:.4} m exceeds the 5 cm gate"
2396        );
2397
2398        // The internal inter-center agreement metric is also cm-level.
2399        let dispersion = report
2400            .position_agreement_rms_m()
2401            .expect("multi-source cells present");
2402        assert!(
2403            dispersion < 0.05,
2404            "inter-center position dispersion {dispersion:.4} m"
2405        );
2406    }
2407}