use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex, OnceLock, RwLock};
use std::time::Duration;
use chrono::Utc;
use mostro_core::error::{MostroError, ServiceError};
use nostr_sdk::prelude::*;
use tracing::{debug, error, info, warn};
use super::aggregate::{aggregate_tick, AggregateResult};
use super::config::{PriceSettings, ProviderConfig};
use super::fiat::is_known_fiat;
use super::provider::{PriceProvider, ProviderError, ProviderHealth, ProviderId, ProviderQuotes};
use super::providers::blockchain::BlockchainProvider;
use super::providers::coingecko::CoinGeckoProvider;
use super::providers::currency_api::CurrencyApiProvider;
use super::providers::yadio::YadioProvider;
use super::store::{PriceError, PriceStore};
const PROVIDER_COOLDOWN_CAP_SECONDS: u64 = 1800;
static PRICE_MANAGER: OnceLock<PriceManager> = OnceLock::new();
struct EnabledProvider {
id: ProviderId,
provider: Box<dyn PriceProvider>,
health: Mutex<ProviderHealth>,
}
type TimeoutResult = Result<Result<ProviderQuotes, ProviderError>, tokio::time::error::Elapsed>;
pub struct PriceManager {
providers: Vec<EnabledProvider>,
store: Arc<PriceStore>,
settings: PriceSettings,
http: reqwest::Client,
warned_stale: RwLock<HashSet<String>>,
warned_single_source: RwLock<HashSet<String>>,
}
impl PriceManager {
pub fn from_settings(settings: PriceSettings) -> Result<Self, String> {
settings.validate()?;
let mut providers: Vec<EnabledProvider> = Vec::new();
for (id_str, cfg) in &settings.providers {
if !cfg.enabled {
continue;
}
match id_str.parse::<ProviderId>() {
Ok(id) => {
let provider = build_provider(id, cfg)?;
providers.push(EnabledProvider {
id,
provider,
health: Mutex::new(ProviderHealth::new()),
});
}
Err(_) => {
warn!(
"price: unknown provider id `{id_str}` — ignoring (binary is older than the config?)"
);
}
}
}
let http = reqwest::Client::builder()
.timeout(Duration::from_secs(settings.provider_timeout_seconds))
.user_agent(concat!("mostro/", env!("CARGO_PKG_VERSION")))
.build()
.map_err(|e| format!("price: building HTTP client: {e}"))?;
Ok(Self {
providers,
store: Arc::new(PriceStore::new()),
settings,
http,
warned_stale: RwLock::new(HashSet::new()),
warned_single_source: RwLock::new(HashSet::new()),
})
}
pub fn install_global(self) -> Result<(), InstallError> {
PRICE_MANAGER
.set(self)
.map_err(|_| InstallError::AlreadyInstalled)
}
pub fn global() -> Option<&'static PriceManager> {
PRICE_MANAGER.get()
}
pub fn settings(&self) -> &PriceSettings {
&self.settings
}
pub async fn update_all(&self) -> TickReport {
let mut report = TickReport::default();
if self.providers.is_empty() {
warn!("price: no providers enabled — skipping tick");
return report;
}
let now = Utc::now().timestamp();
let mut pollable: Vec<&EnabledProvider> = Vec::with_capacity(self.providers.len());
for p in &self.providers {
let available = p.health.lock().map(|h| h.is_available(now)).unwrap_or(true);
if available {
pollable.push(p);
} else {
info!("price: {} skipped: cooldown (circuit breaker open)", p.id);
report.skipped.push(p.id);
}
}
let outcomes: Vec<(ProviderId, TimeoutResult)> =
futures::future::join_all(pollable.iter().map(|p| async move {
let res =
tokio::time::timeout(self.poll_budget(p.id), p.provider.fetch(&self.http))
.await;
(p.id, res)
}))
.await;
let mut quotes_by_provider: Vec<(ProviderId, ProviderQuotes)> =
Vec::with_capacity(pollable.len());
let failed_at = Utc::now().timestamp();
for (p, (id, outcome)) in pollable.iter().zip(outcomes) {
let ok = match outcome {
Ok(Ok(quotes)) => {
info!("price: {} ok ({} currencies)", id, quotes.len());
quotes_by_provider.push((id, quotes));
report.successes.push(id);
true
}
Ok(Err(e)) => {
warn!("price: {} error: {}", id, e);
report.failures.push((id, e.to_string()));
false
}
Err(_) => {
warn!(
"price: {} timed out after {}s (full mirror budget)",
id,
self.poll_budget(id).as_secs()
);
report.failures.push((id, "timeout".to_string()));
false
}
};
if let Ok(mut health) = p.health.lock() {
if ok {
health.record_success();
} else {
health.record_failure(
failed_at,
self.settings.provider_failure_threshold,
self.settings.provider_failure_cooldown_seconds,
PROVIDER_COOLDOWN_CAP_SECONDS,
);
}
}
}
let filtered_with_ids: Vec<(ProviderId, ProviderQuotes)> = quotes_by_provider
.into_iter()
.map(|(id, quotes)| {
let before = quotes.len();
let fiat_only: ProviderQuotes = quotes
.into_iter()
.filter(|(code, _)| is_known_fiat(code))
.collect();
let dropped = before - fiat_only.len();
if dropped > 0 {
debug!(
"price: {} dropped {} non-fiat codes (allowlist)",
id, dropped
);
}
(id, self.scope_quotes(id, fiat_only))
})
.collect();
let aggregates = aggregate_tick(&filtered_with_ids, self.settings.outlier_threshold_pct);
if aggregates.is_empty() {
warn!("price: tick produced no fresh aggregates — keeping last-known-good");
return report;
}
let mut contributor_set: std::collections::BTreeSet<ProviderId> =
std::collections::BTreeSet::new();
for agg in aggregates.values() {
contributor_set.extend(agg.contributors.iter().copied());
}
let contributors: Vec<ProviderId> = contributor_set.into_iter().collect();
let now = Utc::now().timestamp();
self.observe_warnings(&aggregates);
self.store.update(aggregates.clone(), now);
report.fresh_currencies = aggregates.len();
report.contributors = contributors;
if self.settings.publish_to_nostr {
self.publish_rates_to_nostr(&aggregates, &report.contributors)
.await;
}
report
}
fn poll_budget(&self, id: ProviderId) -> Duration {
let attempts = self
.settings
.providers
.get(&id.to_string())
.map(|c| 1 + c.fallback_urls.len() as u64)
.unwrap_or(1)
.max(1);
Duration::from_secs(
self.settings
.provider_timeout_seconds
.saturating_mul(attempts)
.saturating_add(1),
)
}
fn scope_quotes(&self, id: ProviderId, quotes: ProviderQuotes) -> ProviderQuotes {
let cfg = match self.settings.providers.get(&id.to_string()) {
Some(c) => c,
None => return quotes,
};
if cfg.only.is_none() && cfg.except.is_none() {
return quotes;
}
quotes
.into_iter()
.filter(|(currency, _)| cfg.allows_currency(currency))
.collect()
}
fn observe_warnings(&self, aggregates: &HashMap<String, AggregateResult>) {
for (currency, agg) in aggregates {
let key = currency.to_uppercase();
if agg.sources <= 1 {
if self.mark_warned(&self.warned_single_source, &key) {
warn!("price: {} now has a single source", currency);
}
} else {
self.clear_warned(&self.warned_single_source, &key);
}
}
}
pub fn get_price(&self, currency: &str) -> Result<f64, MostroError> {
let now = Utc::now().timestamp();
let key = currency.to_uppercase();
match self
.store
.get(currency, self.settings.max_price_staleness_seconds, now)
{
Ok(value) => {
self.observe_freshness(currency, &key, now);
Ok(value)
}
Err(PriceError::TooStale) => {
let snap = self.store.snapshot(currency);
if let Some(entry) = snap {
let age = now.saturating_sub(entry.as_of);
if self.mark_warned(&self.warned_stale, &key) {
warn!(
"price: {} is past staleness window ({}s old) — Phase 1 still serves it",
currency, age
);
}
Ok(entry.value)
} else {
Err(MostroError::MostroInternalErr(ServiceError::NoAPIResponse))
}
}
Err(PriceError::NoCurrency) => {
Err(MostroError::MostroInternalErr(ServiceError::NoAPIResponse))
}
}
}
#[cfg(test)]
pub fn store(&self) -> &PriceStore {
&self.store
}
fn observe_freshness(&self, currency: &str, key: &str, now: i64) {
let Some(entry) = self.store.snapshot(currency) else {
return;
};
let age = now.saturating_sub(entry.as_of);
let one_interval = self.settings.update_interval_seconds as i64;
if age <= one_interval {
self.clear_warned(&self.warned_stale, key);
return;
}
if self.mark_warned(&self.warned_stale, key) {
warn!(
"price: {} is stale ({}s old, > {}s interval)",
currency, age, one_interval
);
}
}
fn mark_warned(&self, set: &RwLock<HashSet<String>>, key: &str) -> bool {
match set.write() {
Ok(mut w) => w.insert(key.to_string()),
Err(_) => false,
}
}
fn clear_warned(&self, set: &RwLock<HashSet<String>>, key: &str) {
if let Ok(mut w) = set.write() {
w.remove(key);
}
}
async fn publish_rates_to_nostr(
&self,
aggregates: &HashMap<String, AggregateResult>,
successes: &[ProviderId],
) {
let rates: HashMap<String, f64> = aggregates
.iter()
.map(|(c, a)| (c.clone(), a.value))
.collect();
let mut wrapper: HashMap<String, HashMap<String, f64>> = HashMap::new();
wrapper.insert("BTC".to_string(), rates);
let content = match serde_json::to_string(&wrapper) {
Ok(c) => c,
Err(e) => {
error!("price: failed to serialise rates for Nostr: {e}");
return;
}
};
let keys = match crate::util::get_keys() {
Ok(k) => k,
Err(e) => {
error!("price: failed to get Mostro keys for Nostr publish: {e}");
return;
}
};
let timestamp = Utc::now().timestamp();
let expiration_seconds = std::cmp::min(self.settings.update_interval_seconds * 2, 3600);
let expiration = timestamp + expiration_seconds as i64;
let source_tag = sources_to_tag(successes);
let tags = Tags::from_list(vec![
Tag::custom(
TagKind::Custom("published_at".into()),
vec![timestamp.to_string()],
),
Tag::custom(TagKind::Custom("source".into()), vec![source_tag]),
Tag::expiration(Timestamp::from(expiration as u64)),
]);
let event = match crate::nip33::new_exchange_rates_event(&keys, &content, tags) {
Ok(e) => e,
Err(e) => {
error!("price: failed to build exchange-rates event: {e}");
return;
}
};
let client = match crate::util::get_nostr_client() {
Ok(c) => c,
Err(e) => {
error!("price: failed to get Nostr client: {e}");
return;
}
};
let timeout_duration = Duration::from_secs(30);
match tokio::time::timeout(timeout_duration, client.send_event(&event)).await {
Ok(Ok(output)) => info!(
"price: published exchange rates to Nostr ({} currencies). Output: {:?}",
aggregates.len(),
output
),
Ok(Err(e)) => error!("price: send_event to relays failed: {e}"),
Err(_) => error!("price: timeout publishing exchange rates to Nostr (30s exceeded)"),
}
}
}
fn sources_to_tag(ids: &[ProviderId]) -> String {
let mut names: Vec<String> = ids.iter().map(|i| i.to_string()).collect();
names.sort();
names.join(",")
}
fn build_provider(id: ProviderId, cfg: &ProviderConfig) -> Result<Box<dyn PriceProvider>, String> {
match id {
ProviderId::Yadio => Ok(Box::new(YadioProvider::new(cfg))),
ProviderId::CoinGecko => Ok(Box::new(CoinGeckoProvider::new(cfg))),
ProviderId::CurrencyApi => Ok(Box::new(CurrencyApiProvider::new(cfg))),
ProviderId::Blockchain => Ok(Box::new(BlockchainProvider::new(cfg))),
ProviderId::ElToque => Err(format!(
"price: provider `{id}` is configured (enabled) but not yet implemented in \
this release — disable it or remove it from `[price.providers]` \
(see docs/PRICE_PROVIDERS.md §7)"
)),
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InstallError {
AlreadyInstalled,
}
impl std::fmt::Display for InstallError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
InstallError::AlreadyInstalled => f.write_str("PriceManager already installed"),
}
}
}
impl std::error::Error for InstallError {}
#[derive(Debug, Default)]
pub struct TickReport {
pub successes: Vec<ProviderId>,
pub failures: Vec<(ProviderId, String)>,
pub skipped: Vec<ProviderId>,
pub contributors: Vec<ProviderId>,
pub fresh_currencies: usize,
}
pub fn synthesise_legacy_price_settings(
bitcoin_price_api_url: &str,
exchange_rates_update_interval_seconds: u64,
publish_exchange_rates_to_nostr: bool,
) -> PriceSettings {
let mut providers = HashMap::new();
providers.insert(
ProviderId::Yadio.to_string(),
ProviderConfig {
enabled: true,
url: bitcoin_price_api_url.to_string(),
fallback_urls: Vec::new(),
api_key: None,
token: None,
only: None,
except: None,
},
);
PriceSettings {
update_interval_seconds: exchange_rates_update_interval_seconds,
publish_to_nostr: publish_exchange_rates_to_nostr,
providers,
..PriceSettings::default()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::price::provider::{ProviderQuotes, Quote};
use async_trait::async_trait;
struct ScriptedProvider {
id: ProviderId,
outcomes: std::sync::Mutex<Vec<Result<ProviderQuotes, ProviderError>>>,
}
impl ScriptedProvider {
fn new(id: ProviderId, outcomes: Vec<Result<ProviderQuotes, ProviderError>>) -> Self {
Self {
id,
outcomes: std::sync::Mutex::new(outcomes),
}
}
}
#[async_trait]
impl PriceProvider for ScriptedProvider {
fn id(&self) -> ProviderId {
self.id
}
async fn fetch(&self, _http: &reqwest::Client) -> Result<ProviderQuotes, ProviderError> {
let mut q = self.outcomes.lock().unwrap();
if q.is_empty() {
return Ok(ProviderQuotes::new());
}
q.remove(0)
}
}
fn manager_with_many(scripted: Vec<ScriptedProvider>) -> PriceManager {
let mut settings = PriceSettings {
publish_to_nostr: false,
provider_timeout_seconds: 5,
..PriceSettings::default()
};
let mut providers = Vec::new();
for s in scripted {
settings.providers.insert(
s.id.to_string(),
ProviderConfig {
enabled: true,
url: "http://test".into(),
fallback_urls: vec![],
api_key: None,
token: None,
only: None,
except: None,
},
);
providers.push(EnabledProvider {
id: s.id,
provider: Box::new(s),
health: Mutex::new(ProviderHealth::new()),
});
}
PriceManager {
providers,
store: Arc::new(PriceStore::new()),
settings,
http: reqwest::Client::new(),
warned_stale: RwLock::new(HashSet::new()),
warned_single_source: RwLock::new(HashSet::new()),
}
}
fn manager_with(scripted: ScriptedProvider) -> PriceManager {
manager_with_many(vec![scripted])
}
#[tokio::test]
async fn single_yadio_tick_matches_today() {
let mut quotes = ProviderQuotes::new();
quotes.insert("USD".into(), Quote::PerBtc(75_899.55));
quotes.insert("EUR".into(), Quote::PerBtc(65_393.99));
quotes.insert("ARS".into(), Quote::PerBtc(75_899_550.0));
let scripted = ScriptedProvider::new(ProviderId::Yadio, vec![Ok(quotes)]);
let manager = manager_with(scripted);
let report = manager.update_all().await;
assert_eq!(report.successes, vec![ProviderId::Yadio]);
assert_eq!(
report.contributors,
vec![ProviderId::Yadio],
"yadio's quotes all survived scoping, so it contributes"
);
assert!(report.failures.is_empty());
assert_eq!(report.fresh_currencies, 3);
assert!(
(manager.get_price("USD").unwrap() - 75_899.55).abs() < 1e-6,
"USD matches Yadio's value verbatim"
);
assert!((manager.get_price("eur").unwrap() - 65_393.99).abs() < 1e-6);
}
#[tokio::test]
async fn yadio_down_keeps_prior_values() {
let mut quotes = ProviderQuotes::new();
quotes.insert("USD".into(), Quote::PerBtc(50_000.0));
let scripted = ScriptedProvider::new(
ProviderId::Yadio,
vec![Ok(quotes), Err(ProviderError::Http("down".into()))],
);
let manager = manager_with(scripted);
manager.update_all().await;
let r = manager.update_all().await;
assert_eq!(r.successes, Vec::<ProviderId>::new());
assert_eq!(r.failures.len(), 1);
assert!((manager.get_price("USD").unwrap() - 50_000.0).abs() < 1e-6);
}
#[tokio::test]
async fn no_providers_returns_no_currency() {
let settings = PriceSettings {
publish_to_nostr: false,
..PriceSettings::default()
};
let manager = PriceManager {
providers: vec![],
store: Arc::new(PriceStore::new()),
settings,
http: reqwest::Client::new(),
warned_stale: RwLock::new(HashSet::new()),
warned_single_source: RwLock::new(HashSet::new()),
};
let r = manager.update_all().await;
assert_eq!(r.fresh_currencies, 0);
assert!(manager.get_price("USD").is_err());
}
#[tokio::test]
async fn scoping_only_keeps_in_scope_currencies() {
let mut quotes = ProviderQuotes::new();
quotes.insert("USD".into(), Quote::PerBtc(50_000.0));
quotes.insert("CUP".into(), Quote::PerBtc(20_000_000.0));
let scripted = ScriptedProvider::new(ProviderId::Yadio, vec![Ok(quotes)]);
let mut manager = manager_with(scripted);
manager
.settings
.providers
.get_mut(&ProviderId::Yadio.to_string())
.unwrap()
.only = Some(vec!["CUP".into()]);
manager.update_all().await;
assert!(manager.get_price("CUP").is_ok());
assert!(manager.get_price("USD").is_err());
}
#[test]
fn synthesise_legacy_builds_single_yadio_provider() {
let cfg = synthesise_legacy_price_settings("https://api.yadio.io", 600, false);
assert_eq!(cfg.update_interval_seconds, 600);
assert!(!cfg.publish_to_nostr);
let yadio = cfg
.providers
.get("yadio")
.expect("legacy migration must enable yadio");
assert!(yadio.enabled);
assert_eq!(yadio.url, "https://api.yadio.io");
cfg.validate().expect("synthesised config must validate");
}
#[test]
fn from_settings_rejects_unimplemented_provider_id() {
let mut settings = PriceSettings::default();
settings.providers.insert(
ProviderId::ElToque.to_string(),
ProviderConfig {
enabled: true,
url: "https://tasas.eltoque.com".into(),
fallback_urls: vec![],
api_key: None,
token: Some("x".into()),
only: None,
except: None,
},
);
assert!(PriceManager::from_settings(settings).is_err());
}
#[test]
fn from_settings_builds_all_phase2_providers() {
let mut settings = PriceSettings::default();
for (id, url) in [
(ProviderId::Yadio, "https://api.yadio.io"),
(ProviderId::CoinGecko, "https://api.coingecko.com/api/v3"),
(ProviderId::CurrencyApi, "https://currency-api.pages.dev/v1"),
(ProviderId::Blockchain, "https://blockchain.info"),
] {
settings.providers.insert(
id.to_string(),
ProviderConfig {
enabled: true,
url: url.into(),
fallback_urls: vec![],
api_key: None,
token: None,
only: None,
except: None,
},
);
}
let m = PriceManager::from_settings(settings).expect("phase 2 registry builds");
assert_eq!(m.providers.len(), 4);
}
#[test]
fn from_settings_ignores_unknown_id() {
let mut settings = PriceSettings::default();
settings.providers.insert(
"future_provider".to_string(),
ProviderConfig {
enabled: true,
url: "http://x".into(),
fallback_urls: vec![],
api_key: None,
token: None,
only: None,
except: None,
},
);
let m = PriceManager::from_settings(settings).expect("unknown id is non-fatal");
assert!(m.providers.is_empty());
}
#[test]
fn from_settings_skips_disabled_providers() {
let mut settings = PriceSettings::default();
settings.providers.insert(
ProviderId::Yadio.to_string(),
ProviderConfig {
enabled: false,
url: "https://api.yadio.io".into(),
fallback_urls: vec![],
api_key: None,
token: None,
only: None,
except: None,
},
);
let m = PriceManager::from_settings(settings).unwrap();
assert!(m.providers.is_empty());
}
#[test]
fn sources_to_tag_is_deterministic() {
let tag = sources_to_tag(&[ProviderId::CoinGecko, ProviderId::Yadio]);
assert_eq!(tag, "coingecko,yadio");
}
#[tokio::test]
async fn scoped_out_provider_is_success_but_not_contributor() {
let mut quotes = ProviderQuotes::new();
quotes.insert("USD".into(), Quote::PerBtc(50_000.0));
let scripted = ScriptedProvider::new(ProviderId::Yadio, vec![Ok(quotes)]);
let mut manager = manager_with(scripted);
manager
.settings
.providers
.get_mut(&ProviderId::Yadio.to_string())
.unwrap()
.only = Some(vec!["MLC".into()]);
let report = manager.update_all().await;
assert_eq!(report.successes, vec![ProviderId::Yadio]);
assert!(
report.contributors.is_empty(),
"scoped-out provider must not appear in the Nostr source tag"
);
assert_eq!(report.fresh_currencies, 0);
}
fn quotes_of(pairs: &[(&str, f64)]) -> ProviderQuotes {
pairs
.iter()
.map(|(c, v)| (c.to_string(), Quote::PerBtc(*v)))
.collect()
}
#[tokio::test]
async fn multi_source_aggregate_is_median_plus_outlier_mean() {
let providers = vec![
ScriptedProvider::new(
ProviderId::Yadio,
vec![Ok(quotes_of(&[("USD", 50_000.0), ("EUR", 43_000.0)]))],
),
ScriptedProvider::new(
ProviderId::CoinGecko,
vec![Ok(quotes_of(&[("USD", 50_500.0), ("EUR", 43_200.0)]))],
),
ScriptedProvider::new(
ProviderId::Blockchain,
vec![Ok(quotes_of(&[("USD", 49_500.0), ("EUR", 42_800.0)]))],
),
ScriptedProvider::new(
ProviderId::CurrencyApi,
vec![Ok(quotes_of(&[("USD", 80_000.0)]))], ),
];
let manager = manager_with_many(providers);
let report = manager.update_all().await;
assert_eq!(report.successes.len(), 4);
let usd = manager.get_price("USD").unwrap();
assert!(
(usd - 50_000.0).abs() < 1e-6,
"outlier must be discarded before the mean, got {usd}"
);
let eur = manager.get_price("EUR").unwrap();
assert!(
(eur - 43_000.0).abs() < 1e-6,
"median-anchored mean, got {eur}"
);
assert!(report.contributors.contains(&ProviderId::Yadio));
assert!(!report.contributors.contains(&ProviderId::CurrencyApi));
}
#[tokio::test]
async fn lowercase_and_uppercase_codes_combine() {
let providers = vec![
ScriptedProvider::new(ProviderId::Yadio, vec![Ok(quotes_of(&[("USD", 50_000.0)]))]),
ScriptedProvider::new(
ProviderId::CurrencyApi,
vec![Ok(quotes_of(&[("usd", 51_000.0)]))],
),
];
let manager = manager_with_many(providers);
manager.update_all().await;
let usd = manager.get_price("USD").unwrap();
assert!(
(usd - 50_500.0).abs() < 1e-6,
"two casings must form ONE two-source aggregate (mean), got {usd}"
);
}
#[tokio::test]
async fn non_fiat_codes_are_dropped_by_allowlist() {
let providers = vec![ScriptedProvider::new(
ProviderId::CurrencyApi,
vec![Ok(quotes_of(&[
("usd", 50_000.0),
("eth", 37.8),
("bnb", 150.0),
("xau", 25.0),
("btc", 1.0),
]))],
)];
let manager = manager_with_many(providers);
let report = manager.update_all().await;
assert_eq!(
report.fresh_currencies, 1,
"only USD survives the allowlist"
);
assert!(manager.get_price("USD").is_ok());
assert!(manager.get_price("ETH").is_err());
assert!(manager.get_price("BTC").is_err());
}
#[tokio::test]
async fn official_cup_is_scoped_out_by_except() {
let providers = vec![
ScriptedProvider::new(
ProviderId::Yadio,
vec![Ok(quotes_of(&[("USD", 50_000.0), ("CUP", 20_000_000.0)]))],
),
ScriptedProvider::new(
ProviderId::CurrencyApi,
vec![Ok(quotes_of(&[("usd", 50_100.0), ("cup", 1_300_000.0)]))],
),
];
let mut manager = manager_with_many(providers);
manager
.settings
.providers
.get_mut(&ProviderId::CurrencyApi.to_string())
.unwrap()
.except = Some(vec!["CUP".into(), "MLC".into()]);
manager.update_all().await;
let cup = manager.get_price("CUP").unwrap();
assert!(
(cup - 20_000_000.0).abs() < 1e-6,
"official-rate CUP must never enter the aggregate (got {cup}); \
with only 2 sources the outlier guard cannot save us — scoping must"
);
let usd = manager.get_price("USD").unwrap();
assert!((usd - 50_050.0).abs() < 1e-6);
}
#[tokio::test]
async fn provider_down_falls_back_to_remaining_sources() {
let providers = vec![
ScriptedProvider::new(ProviderId::Yadio, vec![Ok(quotes_of(&[("USD", 50_000.0)]))]),
ScriptedProvider::new(
ProviderId::CoinGecko,
vec![Err(ProviderError::Http("down".into()))],
),
];
let manager = manager_with_many(providers);
let report = manager.update_all().await;
assert_eq!(report.successes, vec![ProviderId::Yadio]);
assert_eq!(report.failures.len(), 1);
assert!((manager.get_price("USD").unwrap() - 50_000.0).abs() < 1e-6);
}
#[tokio::test]
async fn circuit_breaker_skips_after_threshold_failures() {
let scripted = ScriptedProvider::new(
ProviderId::CoinGecko,
vec![
Err(ProviderError::Http("down".into())),
Ok(quotes_of(&[("USD", 50_000.0)])),
],
);
let mut manager = manager_with(scripted);
manager.settings.provider_failure_threshold = 1;
manager.settings.provider_failure_cooldown_seconds = 3_600;
let first = manager.update_all().await;
assert_eq!(
first.failures.len(),
1,
"tick 1: the failure trips the breaker"
);
assert!(first.skipped.is_empty());
let second = manager.update_all().await;
assert_eq!(
second.skipped,
vec![ProviderId::CoinGecko],
"tick 2: open breaker → skipped without polling"
);
assert!(
second.successes.is_empty(),
"the scripted Ok was never consumed"
);
assert!(second.failures.is_empty(), "skipped ≠ failed");
}
#[test]
fn poll_budget_scales_with_fallback_urls() {
let scripted = ScriptedProvider::new(ProviderId::CurrencyApi, vec![]);
let mut manager = manager_with(scripted);
manager.settings.provider_timeout_seconds = 10;
assert_eq!(
manager.poll_budget(ProviderId::CurrencyApi),
Duration::from_secs(11)
);
manager
.settings
.providers
.get_mut(&ProviderId::CurrencyApi.to_string())
.unwrap()
.fallback_urls = vec!["http://m1".into(), "http://m2".into()];
assert_eq!(
manager.poll_budget(ProviderId::CurrencyApi),
Duration::from_secs(31)
);
assert_eq!(
manager.poll_budget(ProviderId::Blockchain),
Duration::from_secs(11)
);
}
#[tokio::test]
async fn breaker_success_after_cooldown_resets() {
let scripted = ScriptedProvider::new(
ProviderId::CoinGecko,
vec![
Err(ProviderError::Http("down".into())),
Ok(quotes_of(&[("USD", 50_000.0)])),
Ok(quotes_of(&[("USD", 50_100.0)])),
],
);
let mut manager = manager_with(scripted);
manager.settings.provider_failure_threshold = 1;
manager.settings.provider_failure_cooldown_seconds = 0;
manager.update_all().await; let second = manager.update_all().await; assert_eq!(second.successes, vec![ProviderId::CoinGecko]);
let third = manager.update_all().await;
assert_eq!(third.successes, vec![ProviderId::CoinGecko]);
assert!(third.skipped.is_empty());
}
#[tokio::test]
async fn stale_warning_is_one_shot_then_re_arms_on_fresh_read() {
let mut quotes = ProviderQuotes::new();
quotes.insert("USD".into(), Quote::PerBtc(50_000.0));
let scripted = ScriptedProvider::new(ProviderId::Yadio, vec![Ok(quotes.clone())]);
let mut manager = manager_with(scripted);
manager.settings.max_price_staleness_seconds = 1;
manager.settings.update_interval_seconds = 1;
let mut agg = HashMap::new();
agg.insert(
"USD".to_string(),
AggregateResult {
value: 50_000.0,
sources: 1,
contributors: vec![ProviderId::Yadio],
},
);
let now = Utc::now().timestamp();
manager.store.update(agg, now - 1_000_000);
for _ in 0..10 {
let _ = manager.get_price("USD");
}
assert_eq!(
manager.warned_stale.read().unwrap().len(),
1,
"TooStale must warn at most once between fresh reads"
);
let mut fresh = HashMap::new();
fresh.insert(
"USD".to_string(),
AggregateResult {
value: 50_000.0,
sources: 1,
contributors: vec![ProviderId::Yadio],
},
);
let fresh_now = Utc::now().timestamp();
manager.store.update(fresh, fresh_now);
let _ = manager.get_price("USD");
assert!(
manager.warned_stale.read().unwrap().is_empty(),
"fresh read must re-arm the stale guard"
);
}
}