use std::collections::BTreeMap;
use std::pin::pin;
use std::sync::Arc;
use async_stream::stream;
use fedimint_client_module::meta::{FetchKind, MetaSource, MetaValue, MetaValues};
use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
use fedimint_core::task::waiter::Waiter;
use fedimint_core::util::{FmtCompact as _, FmtCompactAnyhow as _};
use fedimint_logging::LOG_CLIENT;
use futures::StreamExt as _;
use serde::de::DeserializeOwned;
use tokio::sync::Notify;
use tokio_stream::Stream;
use tracing::warn;
use crate::Client;
use crate::db::{
MetaFieldKey, MetaFieldPrefix, MetaFieldValue, MetaServiceInfo, MetaServiceInfoKey,
};
/// Service that fetches meta values from a [`MetaSource`] and stores them in
/// the client database, where they can be read back by field or as a whole.
///
/// `S` defaults to `dyn MetaSource`, so the bare name `MetaService` refers to
/// the type-erased form.
pub struct MetaService<S: ?Sized = dyn MetaSource> {
/// Marked done once the first fetch attempt in `update_continuously` has
/// finished, whether it succeeded or failed.
initial_fetch_waiter: Waiter,
/// Notified (via `notify_waiters`) after a new set of meta values has been
/// committed to the database.
meta_update_notify: Notify,
/// The source the meta values are fetched from.
source: S,
}
/// All stored meta fields, keyed by field name, with their raw JSON values.
pub type MetaEntries = BTreeMap<String, serde_json::Value>;
impl<S: MetaSource + ?Sized> MetaService<S> {
/// Creates a new service backed by `source`.
///
/// The service starts with a fresh initial-fetch waiter and update notifier.
/// It is returned as a type-erased `Arc<MetaService>` (that is,
/// `MetaService<dyn MetaSource>`).
pub fn new(source: S) -> Arc<MetaService>
where
    S: Sized,
{
    let service = MetaService {
        initial_fetch_waiter: Waiter::new(),
        meta_update_notify: Notify::new(),
        source,
    };
    // The concrete `Arc<MetaService<S>>` is unsized to `Arc<MetaService>` at
    // the return site.
    Arc::new(service)
}
/// Reads the meta field `field` and deserializes it as `V`.
///
/// The database is consulted first. If it has nothing to return yet, this
/// waits for the initial fetch attempt to finish and then reads the database
/// one more time.
///
/// Returns `None` if the second read still finds no stored meta information.
pub async fn get_field<V: DeserializeOwned + 'static>(
    &self,
    db: &Database,
    field: &str,
) -> Option<MetaValue<V>> {
    if let Some(found) = self.get_field_from_db(db, field).await {
        return Some(found);
    }

    // Nothing stored yet: block until the first fetch attempt is over, then
    // retry exactly once.
    self.wait_initialization().await;
    self.get_field_from_db(db, field).await
}
/// Reads the meta field `field` straight from the database, without waiting
/// for any fetch.
///
/// Returns `None` when no `MetaServiceInfo` is stored. Otherwise returns a
/// `MetaValue` whose `fetch_time` is the stored `last_updated` timestamp and
/// whose `value` is `None` if the field is missing or cannot be deserialized
/// as `V` (the latter is logged as a warning).
async fn get_field_from_db<V: DeserializeOwned + 'static>(
    &self,
    db: &Database,
    field: &str,
) -> Option<MetaValue<V>> {
    let mut dbtx = db.begin_transaction_nc().await;
    let info = dbtx.get_value(&MetaServiceInfoKey).await?;

    let key = MetaFieldKey(fedimint_client_module::meta::MetaFieldKey(
        field.to_string(),
    ));
    let value = match dbtx.get_value(&key).await {
        Some(stored) => match serde_json::from_value(stored.0.0) {
            Ok(parsed) => Some(parsed),
            Err(err) => {
                warn!(target: LOG_CLIENT,
                    %field,
                    type = std::any::type_name::<V>(),
                    err = %err.fmt_compact(),
                    "Failed to deserialize meta value as the requested type");
                None
            }
        },
        None => None,
    };

    Some(MetaValue {
        fetch_time: info.last_updated,
        value,
    })
}
/// Returns every stored meta field.
///
/// The database is consulted first. If it has nothing to return yet, this
/// waits for the initial fetch attempt to finish and then reads the database
/// one more time. Returns `None` if that second read also comes back empty.
pub async fn entries(&self, db: &Database) -> Option<MetaEntries> {
    match self.entries_from_db(db).await {
        Some(found) => Some(found),
        None => {
            self.wait_initialization().await;
            self.entries_from_db(db).await
        }
    }
}
async fn entries_from_db(&self, db: &Database) -> Option<MetaEntries> {
let dbtx = &mut db.begin_transaction_nc().await;
let info = dbtx.get_value(&MetaServiceInfoKey).await;
#[allow(clippy::question_mark)] if info.is_none() {
return None;
}
let entries: MetaEntries = dbtx
.find_by_prefix(&MetaFieldPrefix)
.await
.map(|(k, v)| (k.0.0, v.0.0))
.collect()
.await;
Some(entries)
}
/// Returns the revision recorded in the stored `MetaServiceInfo`, or `None`
/// if no service info is stored.
async fn current_revision(&self, dbtx: &mut DatabaseTransaction<'_>) -> Option<u64> {
    let info = dbtx.get_value(&MetaServiceInfoKey).await?;
    Some(info.revision)
}
/// Waits until the initial fetch attempt has finished.
///
/// The underlying waiter is marked done by `update_continuously` whether the
/// first fetch succeeded or failed.
pub async fn wait_initialization(&self) {
    let waiter = &self.initial_fetch_waiter;
    waiter.wait().await;
}
/// Returns a stream that yields `()` each time `meta_update_notify` is
/// notified, which `save_meta_values` does after committing new values.
///
/// The stream body is an endless loop, so the stream does not finish on its
/// own.
pub fn subscribe_to_updates(&self) -> impl Stream<Item = ()> + '_ {
stream! {
let mut notify = pin!(self.meta_update_notify.notified());
loop {
notify.as_mut().await;
// Re-arm with a fresh `Notified` future and enable it *before* yielding.
// NOTE(review): the intent appears to be that a notification sent while
// the consumer is handling the yielded item is not missed - confirm
// against the tokio `Notified::enable` documentation.
notify.set(self.meta_update_notify.notified());
notify.as_mut().enable();
yield ();
}
}
}
/// Returns a stream of the value of meta field `name`, deserialized as `V`.
///
/// The stream first yields whatever the database currently holds for the
/// field. It then yields a freshly read value each time the update stream
/// from `subscribe_to_updates` produces an item. Each item is the result of
/// `get_field_from_db`, so it can be `None`.
pub fn subscribe_to_field<'a, V: DeserializeOwned + 'static>(
    &'a self,
    db: &'a Database,
    name: &'a str,
) -> impl Stream<Item = Option<MetaValue<V>>> + 'a {
    stream! {
        let mut updates = pin!(self.subscribe_to_updates());
        loop {
            let current = self.get_field_from_db(db, name).await;
            yield current;
            // Stop only if the update stream itself ends.
            match updates.next().await {
                Some(()) => {}
                None => break,
            }
        }
    }
}
/// Keeps the stored meta values up to date; never returns.
///
/// Steps:
/// 1. Read the currently stored revision (if any) from the database.
/// 2. Perform one `FetchKind::Initial` fetch. On success the values are
///    saved; on failure a warning is logged.
/// 3. Mark the initial-fetch waiter as done, regardless of the outcome, so
///    readers blocked in `wait_initialization` can proceed.
/// 4. Loop forever: perform a `FetchKind::Background` fetch, save on
///    success, then wait for the source to signal the next update.
///
/// If the initial fetch failed, the first background fetch is attempted
/// immediately instead of first waiting for an update.
pub(crate) async fn update_continuously(&self, client: &Client) -> ! {
    let mut current_revision = self
        .current_revision(&mut client.db().begin_transaction_nc().await)
        .await;
    let client_config = client.config().await;

    let initial_fetch = self
        .source
        .fetch(
            &client_config,
            &client.api,
            FetchKind::Initial,
            current_revision,
        )
        .await;
    let failed_initial = initial_fetch.is_err();
    match initial_fetch {
        Ok(meta_values) => {
            // Track the revision that is about to be persisted, exactly as
            // the background loop does; otherwise the first background
            // fetch would be handed a stale revision.
            current_revision = Some(meta_values.revision);
            self.save_meta_values(client, &meta_values).await;
        }
        Err(error) => {
            warn!(target: LOG_CLIENT, err = %error.fmt_compact_anyhow(), "failed to fetch source");
        }
    }
    self.initial_fetch_waiter.done();

    // Retry right away if the initial fetch failed; otherwise wait for the
    // source to signal that an update is due.
    if !failed_initial {
        self.source.wait_for_update().await;
    }

    loop {
        match self
            .source
            .fetch(
                &client_config,
                &client.api,
                FetchKind::Background,
                current_revision,
            )
            .await
        {
            Ok(meta_values) => {
                current_revision = Some(meta_values.revision);
                self.save_meta_values(client, &meta_values).await;
            }
            Err(error) => {
                // Best effort: keep the previously stored values and try
                // again after the next update signal, but do not hide the
                // failure.
                warn!(target: LOG_CLIENT, err = %error.fmt_compact_anyhow(), "failed to fetch source");
            }
        }
        self.source.wait_for_update().await;
    }
}
/// Replaces the stored meta fields with `meta_values` and notifies
/// subscribers.
///
/// Within a single database transaction this removes every entry under
/// `MetaFieldPrefix`, writes a new `MetaServiceInfo` (current time plus the
/// revision from `meta_values`), and inserts each field. After the commit,
/// all current waiters on `meta_update_notify` are notified.
async fn save_meta_values(&self, client: &Client, meta_values: &MetaValues) {
    let mut dbtx = client.db().begin_transaction().await;

    // Drop the old field set first so fields missing from the new values do
    // not linger.
    dbtx.remove_by_prefix(&MetaFieldPrefix).await;

    let info = MetaServiceInfo {
        last_updated: fedimint_core::time::now(),
        revision: meta_values.revision,
    };
    dbtx.insert_entry(&MetaServiceInfoKey, &info).await;

    for (field, value) in &meta_values.values {
        let db_key = MetaFieldKey(field.clone());
        let db_value = MetaFieldValue(value.clone());
        dbtx.insert_entry(&db_key, &db_value).await;
    }

    dbtx.commit_tx().await;
    self.meta_update_notify.notify_waiters();
}
}