use crate::error::CredentialProxyError;
use crate::storage::models::BlindedShares;
use crate::ticketbook_manager::TicketbookManager;
use futures::{StreamExt, stream};
use nym_compact_ecash::Base58;
use nym_credential_proxy_requests::api::v1::ticketbook::models::{
TicketbookAsyncRequest, TicketbookObtainParams, TicketbookRequest,
TicketbookWalletSharesResponse, WalletShare, WebhookTicketbookWalletShares,
WebhookTicketbookWalletSharesRequest,
};
use nym_validator_client::client::NymApiClientExt;
use nym_validator_client::ecash::BlindSignRequestBody;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::sync::Mutex;
use tokio::time::timeout;
use tracing::{debug, error, info, instrument};
use uuid::Uuid;
impl TicketbookManager {
/// Attempts to obtain blinded partial wallet shares for the given ticketbook request.
///
/// Every ecash API client known for the current epoch is contacted concurrently
/// with the same blind-sign request. The outcome of each call (success or failure)
/// is handed to storage via `insert_partial_wallet_share`, and every successfully
/// returned blinded signature is collected, keyed by the issuing client's node id.
///
/// # Errors
///
/// - [`CredentialProxyError::UnavailableSigningQuorum`] if the quorum state reports
///   that signing is unavailable.
/// - Any error bubbled up while resolving the current epoch, the threshold, the
///   cached global data, the ecash clients or the deposit.
/// - [`CredentialProxyError::InsufficientNumberOfCredentials`] if fewer shares than
///   the threshold were obtained; in that case the error is also recorded against
///   the deposit through `insert_deposit_usage_error`.
#[instrument(
    skip(self, request_data, request, requested_on),
    fields(
        expiration_date = %request_data.expiration_date,
        ticketbook_type = %request_data.ticketbook_type
    )
)]
pub async fn try_obtain_wallet_shares(
    &self,
    request: Uuid,
    requested_on: OffsetDateTime,
    request_data: TicketbookRequest,
) -> Result<Vec<WalletShare>, CredentialProxyError> {
    // upper bound on how long a single api is given to answer the blind-sign request
    const BLIND_SIGN_TIMEOUT: Duration = Duration::from_secs(5);

    // don't attempt to use up a deposit if we can't possibly reach the signers
    if !self.state.ecash_state().quorum_state.available() {
        return Err(CredentialProxyError::UnavailableSigningQuorum);
    }

    let epoch = self.state.current_epoch_id().await?;
    let threshold = self.state.ecash_threshold(epoch).await?;
    let expiration_date = request_data.expiration_date;

    self.ensure_global_data_cached(epoch, expiration_date)
        .await?;

    let ecash_api_clients = self.state.ecash_clients(epoch).await?.clone();

    let deposit_data = self
        .state
        .get_deposit(request, requested_on, request_data.ecash_pubkey)
        .await?;
    let deposit_id = deposit_data.deposit_id;
    let signature = deposit_data.sign_ticketbook_plaintext(&request_data.withdrawal_request);

    let credential_request = BlindSignRequestBody::new(
        request_data.withdrawal_request.into(),
        deposit_id,
        signature,
        request_data.ecash_pubkey,
        request_data.expiration_date,
        request_data.ticketbook_type,
    );

    let wallet_shares = Arc::new(Mutex::new(HashMap::new()));

    info!("attempting to contact all nym-apis for the partial wallets...");
    stream::iter(ecash_api_clients)
        .for_each_concurrent(None, |client| async {
            // the async block is not `move`, so rebind `client` by value to make
            // the block own it instead of borrowing the closure's argument
            let client = client;

            debug!("contacting {client} for blinded partial wallet");
            let res = timeout(
                BLIND_SIGN_TIMEOUT,
                client.api_client.blind_sign(&credential_request),
            )
            .await
            .map_err(|_| CredentialProxyError::EcashApiRequestTimeout {
                client_repr: client.to_string(),
            })
            .and_then(|res| res.map_err(Into::into));

            // the outcome is persisted regardless of whether the call succeeded;
            // a storage failure is only logged so the share can still be returned
            if let Err(err) = self
                .state
                .storage()
                .insert_partial_wallet_share(
                    deposit_id,
                    epoch,
                    expiration_date,
                    client.node_id,
                    &res,
                )
                .await
            {
                error!("failed to persist issued partial share: {err}")
            }

            match res {
                Ok(share) => {
                    wallet_shares
                        .lock()
                        .await
                        .insert(client.node_id, share.blinded_signature);
                }
                Err(err) => {
                    error!("failed to obtain partial blinded wallet share from {client}: {err}")
                }
            }
        })
        .await;

    // the `Arc` is never cloned in this function (the futures above only borrowed it
    // and have completed by now), so this is the only strong reference and
    // `Arc::into_inner` returns `Some`
    #[allow(clippy::unwrap_used)]
    let wallet_shares = Arc::into_inner(wallet_shares).unwrap().into_inner();

    let shares = wallet_shares.len();
    if shares < threshold as usize {
        let err = CredentialProxyError::InsufficientNumberOfCredentials {
            available: shares,
            threshold,
        };
        self.state
            .insert_deposit_usage_error(deposit_id, err.to_string())
            .await;
        return Err(err);
    }

    Ok(wallet_shares
        .into_iter()
        .map(|(node_index, share)| WalletShare {
            node_index,
            bs58_encoded_share: share.to_bs58(),
        })
        .collect())
}
/// Wrapper around [`Self::try_obtain_wallet_shares`] used by the asynchronous flow.
///
/// On success the obtained shares are returned unchanged. On failure the error is
/// written to storage for the given `device_id` / `credential_id` pair, together
/// with the number of shares that were obtained (the `available` count of
/// [`CredentialProxyError::InsufficientNumberOfCredentials`], or `0` for any other
/// error), and the original error is then returned to the caller.
///
/// A failure to update storage is only logged; it never replaces the original error.
pub async fn try_obtain_wallet_shares_async(
    &self,
    request: Uuid,
    requested_on: OffsetDateTime,
    request_data: TicketbookRequest,
    device_id: &str,
    credential_id: &str,
) -> Result<Vec<WalletShare>, CredentialProxyError> {
    // happy path: nothing to record, hand the shares straight back
    let failure = match self
        .try_obtain_wallet_shares(request, requested_on, request_data)
        .await
    {
        Ok(shares) => return Ok(shares),
        Err(failure) => failure,
    };

    // only the "not enough shares" error carries a partial count
    let obtained = if let CredentialProxyError::InsufficientNumberOfCredentials {
        available, ..
    } = &failure
    {
        *available
    } else {
        0
    };

    let reason = failure.to_string();
    if let Err(err) = self
        .state
        .storage()
        .update_pending_async_blinded_shares_error(obtained, device_id, credential_id, &reason)
        .await
    {
        error!("failed to update database with the error information: {err}")
    }

    Err(failure)
}
async fn try_obtain_blinded_ticketbook_async_inner(
&self,
request: Uuid,
requested_on: OffsetDateTime,
request_data: TicketbookAsyncRequest,
params: TicketbookObtainParams,
pending: &BlindedShares,
) -> Result<(), CredentialProxyError> {
let epoch_id = self.state.current_epoch_id().await?;
let device_id = &request_data.device_id;
let credential_id = &request_data.credential_id;
let secret = request_data.secret.clone();
let (
master_verification_key,
aggregated_expiration_date_signatures,
aggregated_coin_index_signatures,
) = self
.state
.global_data(params.global, epoch_id, request_data.inner.expiration_date)
.await?;
let shares = self
.try_obtain_wallet_shares_async(
request,
requested_on,
request_data.inner,
device_id,
credential_id,
)
.await?;
if let Err(err) = self
.state
.storage()
.update_pending_async_blinded_shares_issued(shares.len(), device_id, credential_id)
.await
{
error!(uuid = %request, "failed to update db with issued information: {err}")
}
let data = Some(TicketbookWalletSharesResponse {
epoch_id,
shares,
master_verification_key,
aggregated_coin_index_signatures,
aggregated_expiration_date_signatures,
});
let ticketbook_wallet_shares = WebhookTicketbookWalletShares {
id: pending.id,
status: pending.status.to_string(),
device_id: device_id.clone(),
credential_id: credential_id.clone(),
data,
error_message: None,
created: pending.created,
updated: pending.updated,
};
let webhook_request = WebhookTicketbookWalletSharesRequest {
ticketbook_wallet_shares,
secret,
};
self.webhook.try_trigger(request, &webhook_request).await;
Ok(())
}
/// Notifies the webhook that the request identified by `request` has failed.
///
/// Builds a [`WebhookTicketbookWalletShares`] payload with status `"error"`, no data
/// and the provided `error_message`, copying `id`, `created` and `updated` from
/// `pending`, and passes it to the webhook trigger.
///
/// As written, this function always returns `Ok(())`; the `Result` return type is
/// kept for the benefit of existing callers.
async fn try_trigger_webhook_request_for_error(
    &self,
    request: Uuid,
    request_data: TicketbookAsyncRequest,
    pending: &BlindedShares,
    error_message: String,
) -> Result<(), CredentialProxyError> {
    // `request_data` is owned, so its fields are moved into the payload
    // rather than cloned through references
    let ticketbook_wallet_shares = WebhookTicketbookWalletShares {
        id: pending.id,
        status: "error".to_string(),
        device_id: request_data.device_id,
        credential_id: request_data.credential_id,
        data: None,
        error_message: Some(error_message),
        created: pending.created,
        updated: pending.updated,
    };

    let webhook_request = WebhookTicketbookWalletSharesRequest {
        ticketbook_wallet_shares,
        secret: request_data.secret,
    };

    self.webhook.try_trigger(request, &webhook_request).await;

    Ok(())
}
#[instrument(
skip_all,
fields(
credential_id = %request_data.credential_id,
device_id = %request_data.device_id)
)
]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn try_obtain_blinded_ticketbook_async(
&self,
request: Uuid,
requested_on: OffsetDateTime,
request_data: TicketbookAsyncRequest,
params: TicketbookObtainParams,
pending: BlindedShares,
) {
let skip_webhook = params.skip_webhook;
if let Err(err) = self
.try_obtain_blinded_ticketbook_async_inner(
request,
requested_on,
request_data.clone(),
params,
&pending,
)
.await
{
if skip_webhook {
info!(uuid = %request,"the webhook is not going to be called for this request");
return;
}
if let Err(webhook_err) = self
.try_trigger_webhook_request_for_error(
request,
request_data,
&pending,
format!("Failed to get ticketbook: {err}"),
)
.await
{
error!(uuid = %request, "failed to make webhook request to report error: {webhook_err}")
}
error!(uuid = %request, "failed to resolve the blinded ticketbook issuance: {err}")
} else {
info!(uuid = %request, "managed to resolve the blinded ticketbook issuance")
}
}
}