use crate::deposits_buffer::refill_task::RefillTask;
use crate::error::CredentialProxyError;
use crate::shared_state::nyxd_client::ChainClient;
use crate::shared_state::required_deposit_cache::RequiredDepositCache;
use crate::storage::CredentialProxyStorage;
use nym_compact_ecash::PublicKeyUser;
use nym_ecash_contract_common::deposit::DepositId;
use nym_validator_client::nyxd::Coin;
use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::sync::Mutex as AsyncMutex;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, instrument, warn};
use uuid::Uuid;
pub use helpers::{BufferedDeposit, PerformedDeposits, make_deposits_request, split_deposits};
pub(crate) mod helpers;
mod refill_task;
const DEPOSITS_THRESHOLD_P: f32 = 0.1;
struct DepositsBufferInner {
client: ChainClient,
required_deposit_cache: RequiredDepositCache,
storage: CredentialProxyStorage,
target_amount: usize,
max_concurrent_deposits: usize,
unused_deposits: AsyncMutex<Vec<BufferedDeposit>>,
deposits_refill_task: RefillTask,
short_sha: &'static str,
cancellation_token: CancellationToken,
}
#[derive(Clone)]
pub struct DepositsBuffer {
inner: Arc<DepositsBufferInner>,
}
impl DepositsBuffer {
pub async fn new(
storage: CredentialProxyStorage,
client: ChainClient,
required_deposit_cache: RequiredDepositCache,
short_sha: &'static str,
target_amount: usize,
max_concurrent_deposits: usize,
cancellation_token: CancellationToken,
) -> Result<Self, CredentialProxyError> {
let unused_deposits = storage.load_unused_deposits().await?;
info!("managed to load {} deposits", unused_deposits.len());
Ok(DepositsBuffer {
inner: Arc::new(DepositsBufferInner {
client,
required_deposit_cache,
storage,
target_amount,
max_concurrent_deposits,
unused_deposits: AsyncMutex::new(unused_deposits),
deposits_refill_task: RefillTask::default(),
short_sha,
cancellation_token,
}),
})
}
async fn deposit_amount(&self) -> Result<Coin, CredentialProxyError> {
self.inner
.required_deposit_cache
.get_or_update(&self.inner.client)
.await
}
#[instrument(skip(self), err(Display))]
async fn make_deposits_request(
&self,
amount: usize,
) -> Result<PerformedDeposits, CredentialProxyError> {
let memo = format!(
"credential-proxy-{}: performing {amount} deposits",
self.inner.short_sha
);
let deposit_amount = self.deposit_amount().await?;
make_deposits_request(
&self.inner.client,
deposit_amount,
&memo,
amount,
&self.inner.cancellation_token,
)
.await
}
async fn insert_new_deposits(
&self,
mut deposits: PerformedDeposits,
) -> Result<(), CredentialProxyError> {
self.inner.storage.insert_new_deposits(&deposits).await?;
self.inner
.unused_deposits
.lock()
.await
.append(&mut deposits.deposits_data);
Ok(())
}
async fn refill_deposits(&self) -> Result<(), CredentialProxyError> {
let available = self.inner.unused_deposits.lock().await.len();
let target = self.deposits_upper_threshold();
let to_request = target - available;
for request_chunk in split_deposits(to_request, self.inner.max_concurrent_deposits) {
if self.inner.cancellation_token.is_cancelled() {
info!("received cancellation during deposits refilling");
return Ok(());
}
let deposits = self.make_deposits_request(request_chunk).await?;
self.insert_new_deposits(deposits).await?;
}
Ok(())
}
fn maybe_refill_deposits(&self) {
if let Some(mut guard) = self.inner.deposits_refill_task.try_get_new_task_guard() {
let this = self.clone();
*guard = Some(tokio::spawn(async move { this.refill_deposits().await }));
}
}
fn deposits_lower_threshold(&self) -> usize {
self.inner.target_amount - (self.inner.target_amount as f32 * DEPOSITS_THRESHOLD_P) as usize
}
fn deposits_upper_threshold(&self) -> usize {
self.inner.target_amount + (self.inner.target_amount as f32 * DEPOSITS_THRESHOLD_P) as usize
}
async fn mark_deposit_as_used(
&self,
deposit_id: DepositId,
requested_on: OffsetDateTime,
client_pubkey: PublicKeyUser,
request_uuid: Uuid,
) -> Result<(), CredentialProxyError> {
self.inner
.storage
.insert_deposit_usage(deposit_id, requested_on, client_pubkey, request_uuid)
.await
}
async fn wait_for_deposit(
&self,
request_uuid: Uuid,
requested_on: OffsetDateTime,
client_pubkey: PublicKeyUser,
) -> Result<BufferedDeposit, CredentialProxyError> {
loop {
tokio::time::sleep(Duration::from_millis(500)).await;
if let Some(buffered_deposit) = self.inner.unused_deposits.lock().await.pop() {
self.mark_deposit_as_used(
buffered_deposit.deposit_id,
requested_on,
client_pubkey,
request_uuid,
)
.await?;
return Ok(buffered_deposit);
} else {
self.maybe_refill_deposits()
}
}
}
pub async fn get_valid_deposit(
&self,
request_uuid: Uuid,
requested_on: OffsetDateTime,
client_pubkey: PublicKeyUser,
) -> Result<BufferedDeposit, CredentialProxyError> {
let mut deposits_guard = self.inner.unused_deposits.lock().await;
let deposits_available = deposits_guard.len();
debug!("we have {deposits_available} unused deposits available");
let maybe_deposit = deposits_guard.pop();
drop(deposits_guard);
if deposits_available < self.deposits_lower_threshold() {
self.maybe_refill_deposits()
}
match maybe_deposit {
None => {
warn!(
"we currently don't have any usable deposits! are we using them up faster than we request them?"
);
self.wait_for_deposit(request_uuid, requested_on, client_pubkey)
.await
}
Some(buffered_deposit) => {
self.mark_deposit_as_used(
buffered_deposit.deposit_id,
requested_on,
client_pubkey,
request_uuid,
)
.await?;
Ok(buffered_deposit)
}
}
}
pub async fn wait_for_shutdown(&self) {
let task_handle = self.inner.deposits_refill_task.take_task_join_handle();
if let Some(task_handle) = task_handle {
if !task_handle.is_finished() {
info!(
"the deposit refill task is currently in progress - waiting for the current transaction to finish before concluding shutdown"
);
let _ = task_handle.await;
}
}
}
}
impl DepositsBufferInner {
}