use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, bail};
use fedimint_api_client::api::DynGlobalApi;
use fedimint_core::config::ClientConfig;
use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
use fedimint_core::encoding::{Decodable, Encodable};
use fedimint_core::envs::is_running_in_test_env;
use fedimint_core::net::api_announcement::SignedApiAnnouncement;
use fedimint_core::net::guardian_metadata::SignedGuardianMetadata;
use fedimint_core::runtime::{self, sleep};
use fedimint_core::secp256k1::SECP256K1;
use fedimint_core::util::backoff_util::custom_backoff;
use fedimint_core::util::{FmtCompactAnyhow as _, SafeUrl};
use fedimint_core::{NumPeersExt as _, PeerId, impl_db_lookup, impl_db_record};
use fedimint_logging::LOG_CLIENT;
use futures::stream::{FuturesUnordered, StreamExt as _};
use tracing::debug;
use crate::Client;
use crate::db::DbKeyPrefix;
use crate::guardian_metadata::GuardianMetadataPrefix;
#[derive(Clone, Debug, Encodable, Decodable)]
pub struct ApiAnnouncementKey(pub PeerId);
#[derive(Clone, Debug, Encodable, Decodable)]
pub struct ApiAnnouncementPrefix;
impl_db_record!(
key = ApiAnnouncementKey,
value = SignedApiAnnouncement,
db_prefix = DbKeyPrefix::ApiUrlAnnouncement,
notify_on_modify = false,
);
impl_db_lookup!(
key = ApiAnnouncementKey,
query_prefix = ApiAnnouncementPrefix
);
pub(crate) async fn run_api_announcement_refresh_task(client_inner: Arc<Client>) {
let guardian_pub_keys = client_inner.get_guardian_public_keys_blocking().await;
loop {
if let Err(err) = {
let api: &DynGlobalApi = &client_inner.api;
let results = fetch_api_announcements_from_at_least_num_of_peers(
1,
api,
&guardian_pub_keys,
if is_running_in_test_env() {
Duration::from_millis(1)
} else {
Duration::from_secs(30)
},
)
.await;
store_api_announcements_updates_from_peers(client_inner.db(), &results).await
} {
debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Refreshing api announcements failed");
}
let duration = if is_running_in_test_env() {
Duration::from_secs(1)
} else {
Duration::from_secs(3600)
};
sleep(duration).await;
}
}
pub(crate) async fn store_api_announcements_updates_from_peers(
db: &Database,
updates: &[BTreeMap<PeerId, SignedApiAnnouncement>],
) -> Result<(), anyhow::Error> {
for announcements in updates {
store_api_announcement_updates(db, announcements).await;
}
Ok(())
}
pub(crate) type PeersSignedApiAnnouncements = BTreeMap<PeerId, SignedApiAnnouncement>;
pub(crate) async fn fetch_api_announcements_from_at_least_num_of_peers(
num_responses_required: usize,
api: &DynGlobalApi,
guardian_pub_keys: &BTreeMap<PeerId, bitcoin::secp256k1::PublicKey>,
extra_response_wait: Duration,
) -> Vec<PeersSignedApiAnnouncements> {
let num_peers = guardian_pub_keys.to_num_peers();
let mut backoff = custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None);
async fn make_request(
delay: Duration,
peer_id: PeerId,
api: &DynGlobalApi,
guardian_pub_keys: &BTreeMap<PeerId, bitcoin::secp256k1::PublicKey>,
) -> (PeerId, anyhow::Result<PeersSignedApiAnnouncements>) {
runtime::sleep(delay).await;
let result = async {
let announcements = api.api_announcements(peer_id).await.with_context(move || {
format!("Fetching API announcements from peer {peer_id} failed")
})?;
for (peer_id, announcement) in &announcements {
let Some(guardian_pub_key) = guardian_pub_keys.get(peer_id) else {
bail!("Guardian public key not found for peer {}", peer_id);
};
if !announcement.verify(SECP256K1, guardian_pub_key) {
bail!("Failed to verify announcement for peer {}", peer_id);
}
}
Ok(announcements)
}
.await;
(peer_id, result)
}
let mut requests = FuturesUnordered::new();
for peer_id in num_peers.peer_ids() {
requests.push(make_request(
Duration::ZERO,
peer_id,
api,
guardian_pub_keys,
));
}
let mut responses = Vec::new();
loop {
let next_response = if responses.len() < num_responses_required {
requests.next().await
} else {
fedimint_core::runtime::timeout(extra_response_wait, requests.next())
.await
.ok()
.flatten()
};
let Some((peer_id, response)) = next_response else {
break;
};
match response {
Err(err) => {
debug!(
target: LOG_CLIENT,
%peer_id,
err = %err.fmt_compact_anyhow(),
"Failed to fetch API announcements from peer"
);
requests.push(make_request(
backoff.next().expect("Keeps retrying"),
peer_id,
api,
guardian_pub_keys,
));
}
Ok(announcements) => {
responses.push(announcements);
}
}
}
responses
}
pub(crate) async fn store_api_announcement_updates(
db: &Database,
announcements: &BTreeMap<PeerId, SignedApiAnnouncement>,
) {
db
.autocommit(
|dbtx, _|{
let announcements_inner = announcements.clone();
Box::pin(async move {
for (peer, new_announcement) in announcements_inner {
let replace_current_announcement = dbtx
.get_value(&ApiAnnouncementKey(peer))
.await.is_none_or(|current_announcement| {
current_announcement.api_announcement.nonce
< new_announcement.api_announcement.nonce
});
if replace_current_announcement {
debug!(target: LOG_CLIENT, ?peer, %new_announcement.api_announcement.api_url, "Updating API announcement");
dbtx.insert_entry(&ApiAnnouncementKey(peer), &new_announcement)
.await;
}
}
Result::<(), ()>::Ok(())
})},
None,
)
.await
.expect("Will never return an error");
}
pub async fn get_api_urls(db: &Database, cfg: &ClientConfig) -> BTreeMap<PeerId, SafeUrl> {
let mut dbtx = db.begin_transaction_nc().await;
let guardian_metadata: BTreeMap<PeerId, SignedGuardianMetadata> = dbtx
.find_by_prefix(&GuardianMetadataPrefix)
.await
.map(|(key, metadata)| (key.0, metadata))
.collect()
.await;
let api_announcements: BTreeMap<PeerId, SignedApiAnnouncement> = dbtx
.find_by_prefix(&ApiAnnouncementPrefix)
.await
.map(|(key, announcement)| (key.0, announcement))
.collect()
.await;
cfg.global
.api_endpoints
.iter()
.map(|(peer_id, peer_url)| {
let url = guardian_metadata
.get(peer_id)
.and_then(|m| m.guardian_metadata().api_urls.first().cloned())
.or_else(|| {
api_announcements
.get(peer_id)
.map(|a| a.api_announcement.api_url.clone())
})
.unwrap_or_else(|| peer_url.url.clone());
(*peer_id, url)
})
.collect()
}