use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use arc_swap::ArcSwap;
use cdk_common::database::{self, WalletDatabase};
use cdk_common::mint_url::MintUrl;
use cdk_common::nuts::{KeySetInfo, Keys};
use cdk_common::parking_lot::RwLock;
use cdk_common::{CurrencyUnit, KeySet, MintInfo};
use tokio::sync::Mutex;
use web_time::Instant;
use crate::nuts::Id;
use crate::wallet::{AuthMintConnector, AuthWallet, MintConnector};
use crate::{Error, Wallet};
/// Tracks whether cached mint metadata has ever been populated and when it
/// was last refreshed.
#[derive(Clone, Debug)]
pub struct FreshnessStatus {
    /// True once metadata has been successfully fetched at least once.
    pub is_populated: bool,
    /// Instant of the most recent refresh (stamped at creation for an
    /// unpopulated status).
    pub updated_at: Instant,
    // Monotonic counter bumped on every refresh; compared against the
    // per-database sync table to decide whether a database sync is needed,
    // and used to detect refreshes performed by concurrent tasks.
    version: usize,
}
impl Default for FreshnessStatus {
fn default() -> Self {
Self {
is_populated: false,
updated_at: Instant::now(),
version: 0,
}
}
}
/// Immutable snapshot of everything the wallet caches about one mint:
/// its self-reported info, keyset listings, and the keys themselves.
#[derive(Clone, Debug, Default)]
pub struct MintMetadata {
    /// The mint's self-reported info, as returned by the mint connector.
    pub mint_info: MintInfo,
    /// All known keysets, keyed by keyset id.
    pub keysets: HashMap<Id, Arc<KeySetInfo>>,
    /// Public keys per keyset id (fetched lazily; only verified keys are
    /// inserted by the HTTP path).
    pub keys: HashMap<Id, Arc<Keys>>,
    /// Keysets currently marked active by the mint.
    pub active_keysets: Vec<Arc<KeySetInfo>>,
    // Freshness of the regular (non-auth) metadata.
    status: FreshnessStatus,
    // Freshness of the blind-auth metadata, tracked separately because it is
    // fetched through a different connector.
    auth_status: FreshnessStatus,
}
/// Shared cache of a single mint's metadata.
///
/// Readers take lock-free snapshots through `ArcSwap`; HTTP refreshes are
/// serialized behind `fetch_lock` so only one task hits the network at a
/// time. Cloning the cache shares all state.
#[derive(Clone)]
pub struct MintMetadataCache {
    // URL of the mint this cache describes.
    mint_url: MintUrl,
    // Current snapshot; swapped atomically on every refresh.
    metadata: Arc<ArcSwap<MintMetadata>>,
    // Last metadata version synced to each database, keyed by the Arc
    // pointer identity of the storage handle.
    db_sync_versions: Arc<RwLock<HashMap<usize, usize>>>,
    // Serializes HTTP fetches (double-checked-locking style).
    fetch_lock: Arc<Mutex<()>>,
}
impl std::fmt::Debug for MintMetadataCache {
    /// Compact debug view: mint URL plus cache-health fields.
    ///
    /// Takes a single snapshot of the metadata so the printed fields are
    /// mutually consistent; the previous implementation called
    /// `self.metadata.load()` twice, so a concurrent refresh could make
    /// `is_populated` and `keyset_count` come from different versions.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let snapshot = self.metadata.load();
        f.debug_struct("MintMetadataCache")
            .field("mint_url", &self.mint_url)
            .field("is_populated", &snapshot.status.is_populated)
            .field("keyset_count", &snapshot.keysets.len())
            .finish()
    }
}
impl Wallet {
    /// Sets the TTL used to decide when cached mint metadata is stale.
    ///
    /// `None` means the cache never expires by age (a refresh can still be
    /// forced through other paths).
    pub fn set_metadata_cache_ttl(&self, ttl: Option<Duration>) {
        let mut guarded_ttl = self.metadata_cache_ttl.write();
        *guarded_ttl = ttl;
    }
    /// Returns the freshness status of the cached (non-auth) mint metadata.
    pub fn get_metadata_cache_info(&self) -> FreshnessStatus {
        self.metadata_cache.metadata.load().status.clone()
    }
}
impl AuthWallet {
    /// Returns the freshness status of the cached blind-auth mint metadata.
    pub fn get_metadata_cache_info(&self) -> FreshnessStatus {
        self.metadata_cache.metadata.load().auth_status.clone()
    }
}
impl MintMetadataCache {
    /// Stable identity for an `Arc` allocation, derived from its heap pointer.
    ///
    /// Two `Arc` clones sharing one allocation map to the same id, so this is
    /// used to key the per-database sync-version table.
    fn arc_pointer_id<T>(arc: &Arc<T>) -> usize
    where
        T: ?Sized,
    {
        Arc::as_ptr(arc) as *const () as usize
    }

    /// Creates an empty, unpopulated cache for `mint_url`.
    pub fn new(mint_url: MintUrl) -> Self {
        Self {
            mint_url,
            metadata: Arc::new(ArcSwap::default()),
            db_sync_versions: Arc::new(Default::default()),
            fetch_lock: Arc::new(Mutex::new(())),
        }
    }

    /// Refreshes mint metadata over HTTP, seeding already-known keys from the
    /// local database first so the HTTP pass only fetches unknown keys.
    ///
    /// Fetches are serialized: if another task refreshed the cache while we
    /// waited for `fetch_lock`, that fresher snapshot is returned without
    /// hitting the network again.
    ///
    /// # Errors
    ///
    /// Returns an error if the database read or any HTTP request fails.
    #[inline(always)]
    pub async fn load_from_mint(
        &self,
        storage: &Arc<dyn WalletDatabase<database::Error> + Send + Sync>,
        client: &Arc<dyn MintConnector + Send + Sync>,
    ) -> Result<Arc<MintMetadata>, Error> {
        // Capture the version seen before taking the lock so we can detect a
        // refresh performed by a task that held the lock before us.
        let current_version = self.metadata.load().status.version;
        let _guard = self.fetch_lock.lock().await;
        let current_metadata = self.metadata.load_full();
        if current_metadata.status.is_populated && current_metadata.status.version > current_version
        {
            tracing::debug!(
                "Cache was updated while waiting for fetch lock, returning cached data"
            );
            return Ok(current_metadata);
        }
        // Pre-load any keys already stored locally.
        if let Some(keysets) = storage.get_mint_keysets(self.mint_url.clone()).await? {
            let mut updated_metadata = (*self.metadata.load_full()).clone();
            for keyset_info in keysets {
                if let Some(keys) = storage.get_keys(&keyset_info.id).await? {
                    tracing::trace!("Loaded keys for keyset {} from database", keyset_info.id);
                    updated_metadata.keys.insert(keyset_info.id, Arc::new(keys));
                }
            }
            self.metadata.store(Arc::new(updated_metadata));
        }
        let metadata = self.fetch_from_http(Some(client), None).await?;
        self.database_sync(storage.clone(), metadata.clone()).await;
        Ok(metadata)
    }

    /// Returns mint metadata, serving from cache when it is populated and
    /// within `ttl` (`None` = never expires by age), otherwise refreshing
    /// from the mint.
    ///
    /// Ensures `storage` has been synced with the snapshot that is returned.
    ///
    /// # Errors
    ///
    /// Returns an error only if a refresh is required and it fails.
    #[inline(always)]
    pub async fn load(
        &self,
        storage: &Arc<dyn WalletDatabase<database::Error> + Send + Sync>,
        client: &Arc<dyn MintConnector + Send + Sync>,
        ttl: Option<Duration>,
    ) -> Result<Arc<MintMetadata>, Error> {
        let cached_metadata = self.metadata.load_full();
        let storage_id = Self::arc_pointer_id(storage);
        let db_synced_version = self
            .db_sync_versions
            .read()
            .get(&storage_id)
            .cloned()
            .unwrap_or_default();
        if cached_metadata.status.is_populated
            && ttl
                .map(|ttl| cached_metadata.status.updated_at + ttl > Instant::now())
                .unwrap_or(true)
        {
            // Cache hit: make sure this particular database has seen the
            // current snapshot before handing it out.
            if db_synced_version != cached_metadata.status.version {
                self.database_sync(storage.clone(), cached_metadata.clone())
                    .await;
            }
            return Ok(cached_metadata);
        }
        self.load_from_mint(storage, client).await
    }

    /// Returns mint metadata including blind-auth keysets, fetching over
    /// HTTP only when the auth portion of the cache has never been populated.
    ///
    /// Cached auth metadata has no TTL on this path: once populated it is
    /// served directly (mirroring `load` with `ttl = None`).
    ///
    /// # Errors
    ///
    /// Returns an error if the database read or any HTTP request fails.
    pub async fn load_auth(
        &self,
        storage: &Arc<dyn WalletDatabase<database::Error> + Send + Sync>,
        auth_client: &Arc<dyn AuthMintConnector + Send + Sync>,
    ) -> Result<Arc<MintMetadata>, Error> {
        let cached_metadata = self.metadata.load_full();
        let storage_id = Self::arc_pointer_id(storage);
        let db_synced_version = self
            .db_sync_versions
            .read()
            .get(&storage_id)
            .cloned()
            .unwrap_or_default();
        // BUGFIX: this used to also require
        // `auth_status.updated_at > Instant::now()`, which is false for any
        // timestamp taken in the past — the populated auth cache was never
        // served and every call re-fetched over HTTP. Being populated is the
        // whole condition (there is no TTL on the auth path).
        if cached_metadata.auth_status.is_populated {
            if db_synced_version != cached_metadata.status.version {
                self.database_sync(storage.clone(), cached_metadata.clone())
                    .await;
            }
            return Ok(cached_metadata);
        }
        let _guard = self.fetch_lock.lock().await;
        // Double-check after acquiring the lock: another task may have
        // populated the auth cache while we waited.
        let current_metadata = self.metadata.load_full();
        if current_metadata.auth_status.is_populated {
            tracing::debug!(
                "Auth cache was updated while waiting for fetch lock, returning cached data"
            );
            return Ok(current_metadata);
        }
        // Pre-load any keys already stored locally.
        if let Some(keysets) = storage.get_mint_keysets(self.mint_url.clone()).await? {
            let mut updated_metadata = (*self.metadata.load_full()).clone();
            for keyset_info in keysets {
                if let Some(keys) = storage.get_keys(&keyset_info.id).await? {
                    tracing::trace!(
                        "Loaded keys for keyset {} from database (auth)",
                        keyset_info.id
                    );
                    updated_metadata.keys.insert(keyset_info.id, Arc::new(keys));
                }
            }
            self.metadata.store(Arc::new(updated_metadata));
        }
        let metadata = self.fetch_from_http(None, Some(auth_client)).await?;
        self.database_sync(storage.clone(), metadata.clone()).await;
        Ok(metadata)
    }

    /// Syncs `metadata` into `storage`, using the per-database version table
    /// to avoid writing the same snapshot twice.
    async fn database_sync(
        &self,
        storage: Arc<dyn WalletDatabase<database::Error> + Send + Sync>,
        metadata: Arc<MintMetadata>,
    ) {
        let mint_url = self.mint_url.clone();
        let db_sync_versions = self.db_sync_versions.clone();
        Self::persist_to_database(mint_url, storage, metadata, db_sync_versions).await
    }

    /// Best-effort persistence of a metadata snapshot: mint info, keysets,
    /// and any keys the database does not already hold. Write failures are
    /// logged and swallowed so the in-memory cache stays usable even when the
    /// database is not.
    async fn persist_to_database(
        mint_url: MintUrl,
        storage: Arc<dyn WalletDatabase<database::Error> + Send + Sync>,
        metadata: Arc<MintMetadata>,
        db_sync_versions: Arc<RwLock<HashMap<usize, usize>>>,
    ) {
        let storage_id = Self::arc_pointer_id(&storage);
        {
            // Claim this version for this database up front so concurrent
            // callers skip the write. NOTE(review): if the writes below fail
            // the version is still recorded as synced — persistence here is
            // deliberately best-effort.
            let mut versions = db_sync_versions.write();
            let current_synced_version = versions.get(&storage_id).cloned().unwrap_or_default();
            if metadata.status.version <= current_synced_version {
                return;
            }
            versions.insert(storage_id, metadata.status.version);
        }
        storage
            .add_mint(mint_url.clone(), Some(metadata.mint_info.clone()))
            .await
            .inspect_err(|e| tracing::warn!("Failed to save mint info for {}: {}", mint_url, e))
            .ok();
        let keysets: Vec<_> = metadata.keysets.values().map(|ks| (**ks).clone()).collect();
        if !keysets.is_empty() {
            storage
                .add_mint_keysets(mint_url.clone(), keysets)
                .await
                .inspect_err(|e| tracing::warn!("Failed to save keysets for {}: {}", mint_url, e))
                .ok();
        }
        for (keyset_id, keys) in &metadata.keys {
            if let Some(keyset_info) = metadata.keysets.get(keyset_id) {
                // Skip the insert when the database already has keys for
                // this id.
                if storage.get_keys(keyset_id).await.ok().flatten().is_some() {
                    tracing::trace!(
                        "Keys for keyset {} already in database, skipping insert",
                        keyset_id
                    );
                    continue;
                }
                let keyset = KeySet {
                    id: *keyset_id,
                    unit: keyset_info.unit.clone(),
                    active: Some(keyset_info.active),
                    input_fee_ppk: keyset_info.input_fee_ppk,
                    final_expiry: keyset_info.final_expiry,
                    keys: (**keys).clone(),
                };
                storage
                    .add_keys(keyset)
                    .await
                    .inspect_err(|e| {
                        tracing::warn!(
                            "Failed to save keys for keyset {} at {}: {}",
                            keyset_id,
                            mint_url,
                            e
                        )
                    })
                    .ok();
            }
        }
    }

    /// Fetches mint info and keysets over HTTP and atomically swaps in a new
    /// cache snapshot.
    ///
    /// `client` drives the regular endpoints and `auth_client` the blind-auth
    /// endpoints; either may be `None`, and only the freshness status
    /// corresponding to a supplied connector is bumped.
    ///
    /// # Errors
    ///
    /// Returns an error if any HTTP request fails, if a fetched keyset fails
    /// id verification, or `Error::Internal` when a keyset's unit requires a
    /// connector that was not supplied.
    async fn fetch_from_http(
        &self,
        client: Option<&Arc<dyn MintConnector + Send + Sync>>,
        auth_client: Option<&Arc<dyn AuthMintConnector + Send + Sync>>,
    ) -> Result<Arc<MintMetadata>, Error> {
        tracing::debug!("Fetching mint metadata from HTTP for {}", self.mint_url);
        let mut new_metadata = (*self.metadata.load_full()).clone();
        let mut keysets_to_fetch = Vec::new();
        if let Some(client) = client {
            new_metadata.mint_info = client.get_mint_info().await.inspect_err(|err| {
                tracing::error!("Failed to fetch mint info for {}: {}", self.mint_url, err);
            })?;
            keysets_to_fetch.extend(
                client
                    .get_mint_keysets()
                    .await
                    .inspect_err(|err| {
                        tracing::error!("Failed to fetch keysets for {}: {}", self.mint_url, err);
                    })?
                    .keysets,
            );
        }
        if let Some(auth_client) = auth_client {
            keysets_to_fetch.extend(auth_client.get_mint_blind_auth_keysets().await?.keysets);
        }
        tracing::debug!(
            "Fetched {} keysets for {}",
            keysets_to_fetch.len(),
            self.mint_url
        );
        for keyset_info in keysets_to_fetch {
            new_metadata
                .keysets
                .insert(keyset_info.id, Arc::new(keyset_info.clone()));
            // Only fetch keys we do not already have cached (or preloaded
            // from the database by the caller).
            if let std::collections::hash_map::Entry::Vacant(e) =
                new_metadata.keys.entry(keyset_info.id)
            {
                let keyset = if keyset_info.unit == CurrencyUnit::Auth {
                    auth_client
                        .ok_or(Error::Internal)?
                        .get_mint_blind_auth_keyset(keyset_info.id)
                        .await?
                } else {
                    client
                        .ok_or(Error::Internal)?
                        .get_mint_keyset(keyset_info.id)
                        .await?
                };
                // Reject keys whose derived id does not match the advertised
                // keyset id.
                keyset.verify_id()?;
                e.insert(Arc::new(keyset.keys));
            }
        }
        // BUGFIX: rebuild the active list from the merged keyset map instead
        // of pushing inside the loop. The old code appended to the list
        // carried over from the previous snapshot, so every refresh
        // accumulated duplicate entries and retained keysets that had since
        // gone inactive.
        new_metadata.active_keysets = new_metadata
            .keysets
            .values()
            .filter(|ks| ks.active)
            .cloned()
            .collect();
        if client.is_some() {
            new_metadata.status.is_populated = true;
            new_metadata.status.updated_at = Instant::now();
            new_metadata.status.version += 1;
        }
        if auth_client.is_some() {
            new_metadata.auth_status.is_populated = true;
            new_metadata.auth_status.updated_at = Instant::now();
            new_metadata.auth_status.version += 1;
        }
        tracing::info!(
            "Updated cache for {} with {} keysets (version {})",
            self.mint_url,
            new_metadata.keysets.len(),
            new_metadata.status.version
        );
        let metadata_arc = Arc::new(new_metadata);
        self.metadata.store(metadata_arc.clone());
        Ok(metadata_arc)
    }

    /// The URL of the mint this cache describes.
    pub fn mint_url(&self) -> &MintUrl {
        &self.mint_url
    }
}