use std::collections::HashMap;
use super::provider::{ProviderId, ProviderQuotes, Quote};
/// Consolidated outcome for a single currency, as produced by [`aggregate_tick`].
#[derive(Debug, Clone, PartialEq)]
pub struct AggregateResult {
/// Combined value returned by [`combine`] over all candidates for the currency.
pub value: f64,
/// Number of finite, strictly positive candidates that were considered.
/// Candidates later rejected as outliers are still counted; the count
/// saturates at `u8::MAX`.
pub sources: u8,
/// Sorted, de-duplicated providers whose candidates survived outlier
/// filtering (or every clean provider when the filter rejected everything).
pub contributors: Vec<ProviderId>,
}
/// Collapses several candidate values into one representative value.
///
/// Values that are not finite or not strictly positive are ignored. What
/// happens next depends on how many usable values remain:
///
/// * none – returns `None`;
/// * one – that value is returned unchanged;
/// * two – their arithmetic mean;
/// * three or more – the median is computed, every value further than
///   `outlier_pct` percent of the median away from it is dropped, and the
///   mean of the survivors is returned. If nothing survives (the median sits
///   between two clusters, or `outlier_pct` is negative/NaN) the median
///   itself is returned.
///
/// The tolerance comparison is inclusive: a value exactly `outlier_pct`
/// percent away from the median is kept.
pub fn combine(xs: &[f64], outlier_pct: f64) -> Option<f64> {
    let mut usable: Vec<f64> = xs
        .iter()
        .copied()
        .filter(|v| v.is_finite() && *v > 0.0)
        .collect();
    // Only finite values are left, so `partial_cmp` always yields an ordering.
    usable.sort_by(|lhs, rhs| lhs.partial_cmp(rhs).expect("finite values sort"));

    match usable.as_slice() {
        [] => None,
        [only] => Some(*only),
        [_, _] => Some(mean(&usable)),
        _ => {
            let center = median_sorted(&usable);
            let band = center * (outlier_pct / 100.0);
            let inliers: Vec<f64> = usable
                .iter()
                .copied()
                .filter(|v| (v - center).abs() <= band)
                .collect();
            Some(if inliers.is_empty() {
                // Nothing lies near the median: fall back to the median itself.
                center
            } else {
                mean(&inliers)
            })
        }
    }
}
/// Converts quotes expressed against a base currency into absolute candidates.
///
/// Each `per_base` entry is `(currency, base, value)`. When `anchors` holds an
/// entry for `base`, the candidate `value * anchor` is computed and, if it is
/// finite and strictly positive, appended to the list for `currency`.
/// Entries whose base has no anchor, or whose product is unusable, are
/// skipped. Keys are matched exactly as given (no case normalisation here).
///
/// Returns the candidates grouped by currency, in input order within each
/// group. Currencies with no usable candidate do not appear in the map.
pub fn resolve_per_base(
    per_base: &[(String, String, f64)],
    anchors: &HashMap<String, f64>,
) -> HashMap<String, Vec<f64>> {
    let usable = per_base
        .iter()
        .filter_map(|(currency, base, value)| {
            anchors.get(base).map(|anchor| (currency, value * anchor))
        })
        .filter(|(_, candidate)| candidate.is_finite() && *candidate > 0.0);

    let mut grouped: HashMap<String, Vec<f64>> = HashMap::new();
    for (currency, candidate) in usable {
        grouped.entry(currency.clone()).or_default().push(candidate);
    }
    grouped
}
/// Merges one round of provider quotes into a single result per currency.
///
/// The work happens in four passes:
///
/// 1. Every quote is bucketed under its upper-cased currency code, either as
///    a direct (`Quote::PerBtc`) value or as a cross quote (`Quote::PerBase`)
///    that still needs an anchor. The base code is upper-cased as well.
/// 2. The direct values of each currency are combined with [`combine`]; the
///    result becomes the anchor for that currency.
/// 3. Each cross quote is multiplied by the anchor of its base. Quotes whose
///    base has no anchor, or whose product is not finite and strictly
///    positive, are discarded.
/// 4. For every currency seen, direct and resolved candidates are pooled and
///    combined again. Currencies for which [`combine`] yields nothing are
///    omitted from the output.
///
/// In each result, `sources` counts the finite, strictly positive candidates
/// (outliers included, saturating at `u8::MAX`), while `contributors` lists
/// only the providers that `kept_contributors` reports for the same pool.
pub fn aggregate_tick(
    provider_results: &[(ProviderId, ProviderQuotes)],
    outlier_pct: f64,
) -> HashMap<String, AggregateResult> {
    // Pass 1: split quotes into direct values and unresolved cross quotes.
    let mut direct: HashMap<String, Vec<(ProviderId, f64)>> = HashMap::new();
    let mut crosses: Vec<(ProviderId, String, String, f64)> = Vec::new();
    for (provider, quotes) in provider_results {
        for (code, quote) in quotes {
            let code = code.to_uppercase();
            match quote {
                Quote::PerBtc(v) => direct.entry(code).or_default().push((*provider, *v)),
                Quote::PerBase { base, value } => {
                    crosses.push((*provider, code, base.to_uppercase(), *value))
                }
            }
        }
    }

    // Pass 2: one anchor per currency that has at least one usable direct value.
    let anchors: HashMap<String, f64> = direct
        .iter()
        .filter_map(|(code, tagged)| {
            let values: Vec<f64> = tagged.iter().map(|&(_, v)| v).collect();
            combine(&values, outlier_pct).map(|anchor| (code.clone(), anchor))
        })
        .collect();

    // Pass 3: turn cross quotes into absolute candidates via their base's anchor.
    let mut resolved: HashMap<String, Vec<(ProviderId, f64)>> = HashMap::new();
    let anchored = crosses.iter().filter_map(|(provider, code, base, value)| {
        anchors
            .get(base)
            .map(|anchor| (*provider, code, value * anchor))
    });
    for (provider, code, candidate) in anchored {
        if candidate.is_finite() && candidate > 0.0 {
            resolved
                .entry(code.clone())
                .or_default()
                .push((provider, candidate));
        }
    }

    // Pass 4: pool direct + resolved candidates per currency and combine them.
    let currencies: std::collections::HashSet<&String> =
        direct.keys().chain(resolved.keys()).collect();
    currencies
        .into_iter()
        .filter_map(|code| {
            // Direct candidates first, then resolved ones.
            let pairs: Vec<(ProviderId, f64)> = direct
                .get(code)
                .into_iter()
                .chain(resolved.get(code))
                .flatten()
                .copied()
                .collect();
            let candidates: Vec<f64> = pairs.iter().map(|&(_, v)| v).collect();
            let value = combine(&candidates, outlier_pct)?;
            let contributors = kept_contributors(&pairs, outlier_pct);
            let clean = candidates
                .iter()
                .filter(|v| v.is_finite() && **v > 0.0)
                .count();
            // Saturate rather than wrap if there are more than 255 candidates.
            let sources = clean.min(u8::MAX as usize) as u8;
            Some((
                code.clone(),
                AggregateResult {
                    value,
                    sources,
                    contributors,
                },
            ))
        })
        .collect()
}
/// Lists the providers whose candidates take part in the combined value.
///
/// Applies the same rules as `combine`, but tracks provider ids instead of
/// values: pairs with a non-finite or non-positive value are ignored; with
/// one or two clean pairs every provider is kept; with three or more, only
/// providers having a value within `outlier_pct` percent of the median are
/// kept. If that filter rejects everything, all clean providers are returned.
///
/// The result is sorted and free of duplicates (see `dedup_sort`).
fn kept_contributors(pairs: &[(ProviderId, f64)], outlier_pct: f64) -> Vec<ProviderId> {
    let usable: Vec<(ProviderId, f64)> = pairs
        .iter()
        .copied()
        .filter(|&(_, v)| v.is_finite() && v > 0.0)
        .collect();

    // Every provider behind the given rows, sorted and de-duplicated.
    let everyone = |rows: &[(ProviderId, f64)]| -> Vec<ProviderId> {
        dedup_sort(rows.iter().map(|&(id, _)| id).collect())
    };

    match usable.len() {
        0 => Vec::new(),
        1 | 2 => everyone(&usable),
        _ => {
            let mut ordered: Vec<f64> = usable.iter().map(|&(_, v)| v).collect();
            // Only finite values are left, so `partial_cmp` always yields an ordering.
            ordered.sort_by(|lhs, rhs| lhs.partial_cmp(rhs).expect("finite values sort"));
            let center = median_sorted(&ordered);
            let band = center * (outlier_pct / 100.0);

            let inliers: Vec<ProviderId> = usable
                .iter()
                .filter(|&&(_, v)| (v - center).abs() <= band)
                .map(|&(id, _)| id)
                .collect();
            if inliers.is_empty() {
                // Nothing near the median: credit every clean provider.
                everyone(&usable)
            } else {
                dedup_sort(inliers)
            }
        }
    }
}
/// Returns the given provider ids in ascending order with duplicates removed.
///
/// Sorting first makes equal ids adjacent, which is what `Vec::dedup` needs
/// in order to drop all repeats.
fn dedup_sort(ids: Vec<ProviderId>) -> Vec<ProviderId> {
    let mut unique = ids;
    unique.sort();
    unique.dedup();
    unique
}
/// Arithmetic mean of `xs`.
///
/// An empty slice yields NaN (zero divided by zero); the callers in this file
/// only pass non-empty slices.
fn mean(xs: &[f64]) -> f64 {
    let total: f64 = xs.iter().sum();
    let count = xs.len() as f64;
    total / count
}
/// Median of a slice that is already sorted in ascending order.
///
/// Odd lengths return the middle element; even lengths return the average of
/// the two middle elements. The slice must not be empty: an empty slice
/// panics. The callers in this file only pass three or more elements.
fn median_sorted(sorted: &[f64]) -> f64 {
    let mid = sorted.len() / 2;
    match sorted.len() % 2 {
        1 => sorted[mid],
        _ => (sorted[mid - 1] + sorted[mid]) / 2.0,
    }
}
// Unit tests for the aggregation helpers in this file.
#[cfg(test)]
mod tests {
use super::*;
// Outlier tolerance, in percent of the median, used by every test below.
const PCT: f64 = 5.0;
// Asserts that `a` is within an absolute 1e-6 of the expected value `b`.
fn approx(a: f64, b: f64) {
assert!((a - b).abs() < 1e-6, "expected {b}, got {a}");
}
// No inputs at all: there is nothing to combine.
#[test]
fn combine_zero_sources_is_none() {
assert!(combine(&[], PCT).is_none());
}
// One value passes through unchanged; two values are averaged without any
// outlier filtering.
#[test]
fn combine_one_two_sources() {
approx(combine(&[100.0], PCT).unwrap(), 100.0);
approx(combine(&[100.0, 102.0], PCT).unwrap(), 101.0); }
// Median is 101, so the 5% band is about 5.05 wide on each side: 200 is
// rejected and the mean of 100 and 101 remains.
#[test]
fn combine_three_discards_outlier() {
approx(combine(&[100.0, 101.0, 200.0], PCT).unwrap(), 100.5);
}
// Identical values: everything sits on the median and is kept.
#[test]
fn combine_all_equal() {
approx(combine(&[50.0, 50.0, 50.0, 50.0], PCT).unwrap(), 50.0);
}
// Median 100 gives a tolerance of 5. A deviation of exactly 5 is kept
// (the comparison is `<=`); a deviation of 6 is dropped.
#[test]
fn combine_outlier_boundary_inclusive() {
approx(combine(&[100.0, 100.0, 105.0], PCT).unwrap(), 305.0 / 3.0);
approx(combine(&[100.0, 100.0, 106.0], PCT).unwrap(), 100.0);
}
// The even-length median (2 + 100) / 2 = 51 lies between two clusters, so no
// value is within 5% of it. `combine` must return the median itself rather
// than the mean of an empty set (which would be NaN).
#[test]
fn combine_even_length_no_value_near_median_falls_back_to_median() {
let out = combine(&[1.0, 2.0, 100.0, 101.0], PCT).unwrap();
assert!(out.is_finite(), "must not be NaN");
approx(out, 51.0);
}
// NaN, infinities, zero and negative values are filtered out before any
// statistics are computed; if nothing usable remains the result is `None`.
#[test]
fn combine_rejects_non_finite_and_non_positive() {
approx(combine(&[f64::NAN, 100.0, 100.0], PCT).unwrap(), 100.0);
approx(combine(&[0.0, -5.0, 100.0], PCT).unwrap(), 100.0);
approx(combine(&[f64::INFINITY, 100.0], PCT).unwrap(), 100.0);
assert!(combine(&[f64::NAN, 0.0, -1.0], PCT).is_none());
}
// 400 (CUP per USD) times the USD anchor of 50,000 gives 20,000,000.
// A quote whose base has no anchor produces no entry at all.
#[test]
fn resolve_per_base_with_and_without_anchor() {
let mut anchors = HashMap::new();
anchors.insert("USD".to_string(), 50_000.0);
let pb = vec![("CUP".to_string(), "USD".to_string(), 400.0)];
let resolved = resolve_per_base(&pb, &anchors);
approx(resolved["CUP"][0], 20_000_000.0);
let pb_missing = vec![("CUP".to_string(), "EUR".to_string(), 400.0)];
assert!(resolve_per_base(&pb_missing, &anchors).is_empty());
}
// Providers cover different currencies. CUP gets one direct quote (Yadio)
// and one cross quote (ElToque, per USD) that is resolved through the USD
// anchor of 50,000; both count as sources.
// NOTE(review): the expected contributor order is whatever `dedup_sort`
// produces, i.e. `ProviderId`'s `Ord` — presumably declaration order in the
// provider module; confirm there.
#[test]
fn aggregate_tick_unions_partial_coverage() {
let mut yadio = ProviderQuotes::new();
yadio.insert("USD".into(), Quote::PerBtc(50_000.0));
yadio.insert("EUR".into(), Quote::PerBtc(45_000.0));
yadio.insert("CUP".into(), Quote::PerBtc(20_000_000.0));
let mut coingecko = ProviderQuotes::new();
coingecko.insert("USD".into(), Quote::PerBtc(50_000.0));
coingecko.insert("EUR".into(), Quote::PerBtc(45_000.0));
let mut eltoque = ProviderQuotes::new();
eltoque.insert(
"CUP".into(),
Quote::PerBase {
base: "USD".into(),
value: 400.0,
},
);
let out = aggregate_tick(
&[
(ProviderId::Yadio, yadio),
(ProviderId::CoinGecko, coingecko),
(ProviderId::ElToque, eltoque),
],
PCT,
);
approx(out["USD"].value, 50_000.0);
assert_eq!(out["USD"].sources, 2);
assert_eq!(
out["USD"].contributors,
vec![ProviderId::Yadio, ProviderId::CoinGecko],
"USD: both direct quoters survived"
);
approx(out["EUR"].value, 45_000.0);
assert_eq!(out["EUR"].sources, 2);
approx(out["CUP"].value, 20_000_000.0);
assert_eq!(out["CUP"].sources, 2);
assert_eq!(
out["CUP"].contributors,
vec![ProviderId::Yadio, ProviderId::ElToque]
);
}
// Only one provider is present in the input, so its single quote passes
// through as the value with exactly one source.
// NOTE(review): a failed provider is presumably just left out of the slice
// by the caller; confirm against the call site.
#[test]
fn aggregate_tick_failed_provider_contributes_nothing() {
let mut yadio = ProviderQuotes::new();
yadio.insert("USD".into(), Quote::PerBtc(50_000.0));
let out = aggregate_tick(&[(ProviderId::Yadio, yadio)], PCT);
approx(out["USD"].value, 50_000.0);
assert_eq!(out["USD"].sources, 1);
assert_eq!(out["USD"].contributors, vec![ProviderId::Yadio]);
}
// "usd" and "USD" must land in the same bucket: (50,000 + 50,200) / 2 = 50,100.
// With two candidates no outlier filtering applies, so both providers stay.
#[test]
fn aggregate_tick_uppercases_currency_codes() {
let mut a = ProviderQuotes::new();
a.insert("usd".into(), Quote::PerBtc(50_000.0));
let mut b = ProviderQuotes::new();
b.insert("USD".into(), Quote::PerBtc(50_200.0));
let out = aggregate_tick(&[(ProviderId::CurrencyApi, a), (ProviderId::Yadio, b)], PCT);
assert_eq!(out.len(), 1, "lowercase and uppercase must merge");
approx(out["USD"].value, 50_100.0);
assert_eq!(out["USD"].sources, 2);
assert_eq!(
out["USD"].contributors,
vec![ProviderId::Yadio, ProviderId::CurrencyApi],
"n=2: both contributors keep their seat"
);
}
// A cross quote whose base has no direct quote cannot be resolved, so the
// currency does not appear in the output at all.
#[test]
fn aggregate_tick_cross_only_currency_without_anchor_is_absent() {
let mut eltoque = ProviderQuotes::new();
eltoque.insert(
"CUP".into(),
Quote::PerBase {
base: "USD".into(),
value: 400.0,
},
);
let out = aggregate_tick(&[(ProviderId::ElToque, eltoque)], PCT);
assert!(out.is_empty(), "no USD anchor → CUP cannot resolve");
}
// Median is 50,200 (tolerance 2,510), so 75,000 is rejected and the value is
// the mean of the other two. `sources` still reports all three clean
// candidates, while `contributors` omits the rejected provider.
#[test]
fn aggregate_tick_outlier_drops_provider_from_contributors() {
let mut yadio = ProviderQuotes::new();
yadio.insert("USD".into(), Quote::PerBtc(50_000.0));
let mut coingecko = ProviderQuotes::new();
coingecko.insert("USD".into(), Quote::PerBtc(50_200.0));
let mut blockchain = ProviderQuotes::new();
blockchain.insert("USD".into(), Quote::PerBtc(75_000.0));
let out = aggregate_tick(
&[
(ProviderId::Yadio, yadio),
(ProviderId::CoinGecko, coingecko),
(ProviderId::Blockchain, blockchain),
],
PCT,
);
approx(out["USD"].value, 50_100.0);
assert_eq!(out["USD"].sources, 3);
assert_eq!(
out["USD"].contributors,
vec![ProviderId::Yadio, ProviderId::CoinGecko],
"outlier provider must not be advertised in the source tag"
);
}
// Two clusters (1, 2) and (100, 101) put the median at 51 with nothing
// within 5% of it. The value falls back to the median and, because no
// provider can be singled out, every clean provider is listed.
#[test]
fn aggregate_tick_bimodal_fallback_keeps_all_clean_contributors() {
let mk = |v: f64| {
let mut q = ProviderQuotes::new();
q.insert("USD".into(), Quote::PerBtc(v));
q
};
let out = aggregate_tick(
&[
(ProviderId::Yadio, mk(1.0)),
(ProviderId::CoinGecko, mk(2.0)),
(ProviderId::CurrencyApi, mk(100.0)),
(ProviderId::Blockchain, mk(101.0)),
],
PCT,
);
approx(out["USD"].value, 51.0); assert_eq!(
out["USD"].contributors,
vec![
ProviderId::Yadio,
ProviderId::CoinGecko,
ProviderId::CurrencyApi,
ProviderId::Blockchain,
]
);
}
// A NaN quote is discarded during cleaning, so its provider neither affects
// the value nor appears among the contributors.
#[test]
fn aggregate_tick_non_finite_value_drops_provider_from_contributors() {
let mut bad = ProviderQuotes::new();
bad.insert("USD".into(), Quote::PerBtc(f64::NAN));
let mut good = ProviderQuotes::new();
good.insert("USD".into(), Quote::PerBtc(50_000.0));
let out = aggregate_tick(
&[(ProviderId::Yadio, bad), (ProviderId::CoinGecko, good)],
PCT,
);
approx(out["USD"].value, 50_000.0);
assert_eq!(
out["USD"].contributors,
vec![ProviderId::CoinGecko],
"NaN must drop the provider from contributors, not silently survive"
);
}
}