use crate::deposits_buffer::DepositsBuffer;
use crate::error::CredentialProxyError;
use crate::quorum_checker::QuorumStateChecker;
use crate::shared_state::CredentialProxyState;
use crate::shared_state::ecash_state::EcashState;
use crate::shared_state::nyxd_client::ChainClient;
use crate::shared_state::required_deposit_cache::RequiredDepositCache;
use crate::storage::CredentialProxyStorage;
use crate::storage::pruner::StoragePruner;
use crate::webhook::ZkNymWebhook;
use nym_credentials::ecash::utils::ecash_default_expiration_date;
use nym_validator_client::nym_api::EpochId;
use std::future::Future;
use std::time::Duration;
use time::Date;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
mod shares_handlers;
pub mod ticketbook_handlers;
pub mod wallet_shares;
#[derive(Clone, Default)]
pub struct ShutdownTracker {
pub shutdown_token: CancellationToken,
pub tracker: TaskTracker,
}
#[derive(Clone)]
pub struct TicketbookManager {
pub(crate) state: CredentialProxyState,
pub(crate) webhook: ZkNymWebhook,
pub(crate) shutdown_tracker: ShutdownTracker,
}
impl TicketbookManager {
pub async fn new(
build_sha: &'static str,
quorum_check_interval: Duration,
deposits_buffer_size: usize,
max_concurrent_deposits: usize,
storage: CredentialProxyStorage,
mnemonic: bip39::Mnemonic,
webhook: ZkNymWebhook,
) -> Result<Self, CredentialProxyError> {
let chain_client = ChainClient::new(mnemonic)?;
let shutdown_tracker = ShutdownTracker::default();
let quorum_state_checker = QuorumStateChecker::new(
chain_client.clone(),
quorum_check_interval,
shutdown_tracker.shutdown_token.clone(),
)
.await?;
let required_deposit_cache = RequiredDepositCache::default();
let deposits_buffer = DepositsBuffer::new(
storage.clone(),
chain_client.clone(),
required_deposit_cache.clone(),
build_sha,
deposits_buffer_size,
max_concurrent_deposits,
shutdown_tracker.shutdown_token.clone(),
)
.await?;
let storage_pruner =
StoragePruner::new(shutdown_tracker.shutdown_token.clone(), storage.clone());
let this = TicketbookManager {
state: CredentialProxyState::new(
storage.clone(),
chain_client,
deposits_buffer,
EcashState::new(
required_deposit_cache,
quorum_state_checker.quorum_state_ref(),
),
),
webhook,
shutdown_tracker,
};
this.build_initial_cache().await?;
this.try_spawn_in_background(quorum_state_checker.run_forever());
this.try_spawn_in_background(storage_pruner.run_forever());
Ok(this)
}
async fn build_initial_cache(&self) -> Result<(), CredentialProxyError> {
let default_expiration = ecash_default_expiration_date();
let epoch_id = self.state.current_epoch_id().await?;
let _ = self.state.deposit_amount().await?;
let _ = self.state.master_verification_key(Some(epoch_id)).await?;
let _ = self.state.ecash_threshold(epoch_id).await?;
let _ = self.state.ecash_clients(epoch_id).await?;
let _ = self
.state
.master_coin_index_signatures(Some(epoch_id))
.await?;
let _ = self
.state
.master_expiration_date_signatures(epoch_id, default_expiration)
.await?;
Ok(())
}
pub async fn cancel_and_wait(&self) {
self.shutdown_tracker.shutdown_token.cancel();
self.state.deposits_buffer().wait_for_shutdown().await;
self.shutdown_tracker.tracker.wait().await
}
pub fn shutdown_token(&self) -> CancellationToken {
self.shutdown_tracker.shutdown_token.clone()
}
async fn ensure_global_data_cached(
&self,
epoch: EpochId,
expiration_date: Date,
) -> Result<(), CredentialProxyError> {
let _ = self.state.master_verification_key(Some(epoch)).await?;
let _ = self.state.master_coin_index_signatures(Some(epoch)).await?;
let _ = self
.state
.master_expiration_date_signatures(epoch, expiration_date)
.await?;
Ok(())
}
pub fn try_spawn_in_background<F>(&self, task: F) -> Option<JoinHandle<F::Output>>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
if self.shutdown_tracker.shutdown_token.is_cancelled() {
None
} else {
self.shutdown_tracker.tracker.reopen();
let join_handle = self.shutdown_tracker.tracker.spawn(task);
self.shutdown_tracker.tracker.close();
Some(join_handle)
}
}
}