1use crate::deposits_buffer::helpers::{BufferedDeposit, PerformedDeposits};
5use crate::error::CredentialProxyError;
6use crate::storage::manager::SqliteStorageManager;
7use crate::storage::models::{BlindedShares, MinimalWalletShare};
8use nym_compact_ecash::PublicKeyUser;
9use nym_credentials::ecash::bandwidth::serialiser::VersionedSerialise;
10use nym_credentials::{
11 AggregatedCoinIndicesSignatures, AggregatedExpirationDateSignatures, EpochVerificationKey,
12};
13use nym_validator_client::ecash::BlindedSignatureResponse;
14use nym_validator_client::nym_api::EpochId;
15use nym_validator_client::nyxd::contract_traits::ecash_query_client::DepositId;
16use sqlx::ConnectOptions;
17use sqlx::sqlite::{SqliteAutoVacuum, SqliteSynchronous};
18use std::fmt::Debug;
19use std::path::Path;
20use std::time::Duration;
21use time::{Date, OffsetDateTime};
22use tracing::log::LevelFilter;
23use tracing::{debug, error, info, instrument};
24use uuid::Uuid;
25
// Low-level query layer; defines `SqliteStorageManager`.
mod manager;
// Data types used by the storage layer (e.g. `BlindedShares`, `MinimalWalletShare`).
pub mod models;
// NOTE(review): presumably the task that periodically prunes old data - confirm.
pub(crate) mod pruner;
// NOTE(review): presumably storage-related trait definitions - confirm.
pub mod traits;

// Identifier type of the `node_id` argument of `insert_partial_wallet_share`.
type NodeId = u64;
33
/// Cloneable handle to the credential proxy's SQLite-backed storage.
///
/// The methods on this type delegate to the wrapped `SqliteStorageManager`,
/// converting its errors (and, where needed, its raw rows) into the crate's own types.
#[derive(Clone)]
pub struct CredentialProxyStorage {
    /// Low-level query layer wrapping the connection pool created in `init`.
    pub(crate) storage_manager: SqliteStorageManager,
}
38
39impl CredentialProxyStorage {
40 #[instrument]
41 pub async fn init<P: AsRef<Path> + Debug>(
42 database_path: P,
43 ) -> Result<Self, CredentialProxyError> {
44 debug!("Attempting to connect to database");
45
46 let opts = sqlx::sqlite::SqliteConnectOptions::new()
47 .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
48 .synchronous(SqliteSynchronous::Normal)
49 .auto_vacuum(SqliteAutoVacuum::Incremental)
50 .filename(database_path)
51 .create_if_missing(true)
52 .log_statements(LevelFilter::Trace)
53 .log_slow_statements(LevelFilter::Warn, Duration::from_millis(250));
54
55 let pool_opts = sqlx::sqlite::SqlitePoolOptions::new()
56 .min_connections(5)
57 .max_connections(25)
58 .acquire_timeout(Duration::from_secs(60));
59
60 let connection_pool = match pool_opts.connect_with(opts).await {
61 Ok(db) => db,
62 Err(err) => {
63 error!("Failed to connect to SQLx database: {err}");
64 return Err(err.into());
65 }
66 };
67
68 if let Err(err) = sqlx::migrate!("./migrations").run(&connection_pool).await {
69 error!("Failed to initialize SQLx database: {err}");
70 return Err(err.into());
71 }
72
73 info!("Database migration finished!");
74
75 Ok(CredentialProxyStorage {
76 storage_manager: SqliteStorageManager { connection_pool },
77 })
78 }
79
80 #[allow(dead_code)]
81 pub async fn load_blinded_shares_status_by_shares_id(
82 &self,
83 id: i64,
84 ) -> Result<Option<BlindedShares>, CredentialProxyError> {
85 Ok(self
86 .storage_manager
87 .load_blinded_shares_status_by_shares_id(id)
88 .await?)
89 }
90
91 pub async fn load_wallet_shares_by_shares_id(
92 &self,
93 id: i64,
94 ) -> Result<Vec<MinimalWalletShare>, CredentialProxyError> {
95 Ok(self
96 .storage_manager
97 .load_wallet_shares_by_shares_id(id)
98 .await?)
99 }
100
101 pub async fn load_shares_error_by_shares_id(
102 &self,
103 id: i64,
104 ) -> Result<Option<String>, CredentialProxyError> {
105 Ok(self
106 .storage_manager
107 .load_shares_error_by_device_by_shares_id(id)
108 .await?)
109 }
110
111 #[allow(dead_code)]
112 pub async fn load_blinded_shares_status_by_device_and_credential_id(
113 &self,
114 device_id: &str,
115 credential_id: &str,
116 ) -> Result<Option<BlindedShares>, CredentialProxyError> {
117 Ok(self
118 .storage_manager
119 .load_blinded_shares_status_by_device_and_credential_id(device_id, credential_id)
120 .await?)
121 }
122
123 pub async fn load_wallet_shares_by_device_and_credential_id(
124 &self,
125 device_id: &str,
126 credential_id: &str,
127 ) -> Result<Vec<MinimalWalletShare>, CredentialProxyError> {
128 Ok(self
129 .storage_manager
130 .load_wallet_shares_by_device_and_credential_id(device_id, credential_id)
131 .await?)
132 }
133
134 pub async fn load_shares_error_by_device_and_credential_id(
135 &self,
136 device_id: &str,
137 credential_id: &str,
138 ) -> Result<Option<String>, CredentialProxyError> {
139 Ok(self
140 .storage_manager
141 .load_shares_error_by_device_and_credential_id(device_id, credential_id)
142 .await?)
143 }
144
145 pub async fn insert_new_pending_async_shares_request(
146 &self,
147 request: Uuid,
148 device_id: &str,
149 credential_id: &str,
150 ) -> Result<BlindedShares, CredentialProxyError> {
151 Ok(self
152 .storage_manager
153 .insert_new_pending_async_shares_request(request.to_string(), device_id, credential_id)
154 .await?)
155 }
156
157 pub async fn update_pending_async_blinded_shares_issued(
158 &self,
159 available_shares: usize,
160 device_id: &str,
161 credential_id: &str,
162 ) -> Result<BlindedShares, CredentialProxyError> {
163 Ok(self
164 .storage_manager
165 .update_pending_async_blinded_shares_issued(
166 available_shares as i64,
167 device_id,
168 credential_id,
169 )
170 .await?)
171 }
172
173 pub async fn update_pending_async_blinded_shares_error(
174 &self,
175 available_shares: usize,
176 device_id: &str,
177 credential_id: &str,
178 error: &str,
179 ) -> Result<BlindedShares, CredentialProxyError> {
180 Ok(self
181 .storage_manager
182 .update_pending_async_blinded_shares_error(
183 available_shares as i64,
184 device_id,
185 credential_id,
186 error,
187 )
188 .await?)
189 }
190
191 pub async fn prune_old_blinded_shares(&self) -> Result<(), CredentialProxyError> {
192 let max_age = OffsetDateTime::now_utc() - time::Duration::days(31);
193
194 self.storage_manager
195 .prune_old_partial_blinded_wallets(max_age)
196 .await?;
197 self.storage_manager
198 .prune_old_partial_blinded_wallet_failures(max_age)
199 .await?;
200 self.storage_manager
201 .prune_old_blinded_shares(max_age)
202 .await?;
203 Ok(())
204 }
205
206 pub async fn insert_new_deposits(
207 &self,
208 deposits: &PerformedDeposits,
209 ) -> Result<(), CredentialProxyError> {
210 debug!("inserting {} deposits data", deposits.deposits_data.len());
211
212 self.storage_manager
213 .insert_new_deposits(deposits.to_storable())
214 .await?;
215 Ok(())
216 }
217
218 pub async fn load_unused_deposits(&self) -> Result<Vec<BufferedDeposit>, CredentialProxyError> {
219 self.storage_manager
220 .load_unused_deposits()
221 .await?
222 .into_iter()
223 .map(|deposit| deposit.try_into())
224 .collect()
225 }
226
227 pub async fn insert_deposit_usage(
228 &self,
229 deposit_id: DepositId,
230 requested_on: OffsetDateTime,
231 client_pubkey: PublicKeyUser,
232 request_uuid: Uuid,
233 ) -> Result<(), CredentialProxyError> {
234 self.storage_manager
235 .insert_deposit_usage(
236 deposit_id,
237 requested_on,
238 client_pubkey.to_bytes(),
239 request_uuid.to_string(),
240 )
241 .await?;
242 Ok(())
243 }
244
245 pub async fn insert_deposit_usage_error(
246 &self,
247 deposit_id: DepositId,
248 error: String,
249 ) -> Result<(), CredentialProxyError> {
250 self.storage_manager
251 .insert_deposit_usage_error(deposit_id, error)
252 .await?;
253 Ok(())
254 }
255
256 pub async fn insert_partial_wallet_share(
257 &self,
258 deposit_id: DepositId,
259 epoch_id: EpochId,
260 expiration_date: Date,
261 node_id: NodeId,
262 res: &Result<BlindedSignatureResponse, CredentialProxyError>,
263 ) -> Result<(), CredentialProxyError> {
264 debug!("inserting partial wallet share");
265 let now = OffsetDateTime::now_utc();
266
267 match res {
268 Ok(share) => {
269 self.storage_manager
270 .insert_partial_wallet_share(
271 deposit_id,
272 epoch_id as i64,
273 expiration_date,
274 node_id as i64,
275 now,
276 &share.blinded_signature.to_bytes(),
277 )
278 .await?;
279 }
280 Err(err) => {
281 self.storage_manager
282 .insert_partial_wallet_issuance_failure(
283 deposit_id,
284 epoch_id as i64,
285 expiration_date,
286 node_id as i64,
287 now,
288 err.to_string(),
289 )
290 .await?
291 }
292 }
293 Ok(())
294 }
295
296 pub async fn get_master_verification_key(
297 &self,
298 epoch_id: EpochId,
299 ) -> Result<Option<EpochVerificationKey>, CredentialProxyError> {
300 let Some(raw) = self
301 .storage_manager
302 .get_master_verification_key(epoch_id as i64)
303 .await?
304 else {
305 return Ok(None);
306 };
307
308 let deserialised =
309 EpochVerificationKey::try_unpack(&raw.serialised_key, raw.serialization_revision)
310 .map_err(|err| CredentialProxyError::database_inconsistency(err.to_string()))?;
311 Ok(Some(deserialised))
312 }
313
314 pub async fn insert_master_verification_key(
315 &self,
316 key: &EpochVerificationKey,
317 ) -> Result<(), CredentialProxyError> {
318 let packed = key.pack();
319 Ok(self
320 .storage_manager
321 .insert_master_verification_key(packed.revision, key.epoch_id as i64, &packed.data)
322 .await?)
323 }
324
325 pub async fn get_master_coin_index_signatures(
326 &self,
327 epoch_id: EpochId,
328 ) -> Result<Option<AggregatedCoinIndicesSignatures>, CredentialProxyError> {
329 let Some(raw) = self
330 .storage_manager
331 .get_master_coin_index_signatures(epoch_id as i64)
332 .await?
333 else {
334 return Ok(None);
335 };
336
337 let deserialised = AggregatedCoinIndicesSignatures::try_unpack(
338 &raw.serialised_signatures,
339 raw.serialization_revision,
340 )
341 .map_err(|err| CredentialProxyError::database_inconsistency(err.to_string()))?;
342 Ok(Some(deserialised))
343 }
344
345 pub async fn insert_master_coin_index_signatures(
346 &self,
347 signatures: &AggregatedCoinIndicesSignatures,
348 ) -> Result<(), CredentialProxyError> {
349 let packed = signatures.pack();
350 self.storage_manager
351 .insert_master_coin_index_signatures(
352 packed.revision,
353 signatures.epoch_id as i64,
354 &packed.data,
355 )
356 .await?;
357 Ok(())
358 }
359
360 pub async fn get_master_expiration_date_signatures(
361 &self,
362 expiration_date: Date,
363 epoch_id: EpochId,
364 ) -> Result<Option<AggregatedExpirationDateSignatures>, CredentialProxyError> {
365 let Some(raw) = self
366 .storage_manager
367 .get_master_expiration_date_signatures(expiration_date, epoch_id as i64)
368 .await?
369 else {
370 return Ok(None);
371 };
372
373 let deserialised = AggregatedExpirationDateSignatures::try_unpack(
374 &raw.serialised_signatures,
375 raw.serialization_revision,
376 )
377 .map_err(|err| CredentialProxyError::database_inconsistency(err.to_string()))?;
378 Ok(Some(deserialised))
379 }
380
381 pub async fn insert_master_expiration_date_signatures(
382 &self,
383 signatures: &AggregatedExpirationDateSignatures,
384 ) -> Result<(), CredentialProxyError> {
385 let packed = signatures.pack();
386 self.storage_manager
387 .insert_master_expiration_date_signatures(
388 packed.revision,
389 signatures.epoch_id as i64,
390 signatures.expiration_date,
391 &packed.data,
392 )
393 .await?;
394 Ok(())
395 }
396}
397
398#[allow(clippy::expect_used)]
399#[allow(clippy::unwrap_used)]
400#[cfg(test)]
401mod tests {
402 use super::*;
403 use crate::helpers::random_uuid;
404 use crate::storage::models::BlindedSharesStatus;
405 use nym_compact_ecash::scheme::keygen::KeyPairUser;
406 use nym_crypto::asymmetric::ed25519;
407 use nym_validator_client::nyxd::{Coin, Hash};
408 use rand::RngCore;
409 use rand::rngs::OsRng;
410 use std::ops::Deref;
411 use tempfile::{NamedTempFile, TempPath};
412
    /// Test fixture bundling a storage instance with the temporary file backing it.
    struct StorageTestWrapper {
        /// Storage initialised on top of the temporary database file.
        inner: CredentialProxyStorage,
        // Never read; held so the `TempPath` lives exactly as long as the wrapper
        // (per the `tempfile` docs, the file is removed when the `TempPath` is dropped).
        _path: TempPath,
    }
418
419 impl StorageTestWrapper {
420 async fn new() -> anyhow::Result<Self> {
421 let file = NamedTempFile::new()?;
422 let path = file.into_temp_path();
423
424 println!("Creating database at {path:?}...");
425
426 Ok(StorageTestWrapper {
427 inner: CredentialProxyStorage::init(&path).await?,
428 _path: path,
429 })
430 }
431
432 async fn insert_dummy_used_deposit(&self, uuid: Uuid) -> anyhow::Result<DepositId> {
433 let mut rng = OsRng;
434 let deposit_id = rng.next_u32();
435 let tx_hash = Hash::Sha256(Default::default());
436 let requested_on = OffsetDateTime::now_utc();
437 let deposit_amount = Coin::new(1, "ufoomp");
438 let client_keypair = KeyPairUser::new();
439 let client_ecash_pubkey = &client_keypair.public_key();
440
441 let deposit_key = ed25519::PrivateKey::new(&mut rng);
442
443 self.inner
444 .insert_new_deposits(&PerformedDeposits {
445 deposits_data: vec![BufferedDeposit {
446 deposit_id,
447 ed25519_private_key: deposit_key,
448 }],
449 tx_hash,
450 requested_on,
451 deposit_amount,
452 })
453 .await?;
454 self.inner
455 .insert_deposit_usage(deposit_id, requested_on, *client_ecash_pubkey, uuid)
456 .await?;
457
458 Ok(deposit_id)
459 }
460 }
461
462 impl Deref for StorageTestWrapper {
463 type Target = CredentialProxyStorage;
464 fn deref(&self) -> &Self::Target {
465 &self.inner
466 }
467 }
468
469 async fn get_storage() -> anyhow::Result<StorageTestWrapper> {
470 StorageTestWrapper::new().await
471 }
472
473 #[tokio::test]
474 async fn test_creation() -> anyhow::Result<()> {
475 let storage = get_storage().await;
476 assert!(storage.is_ok());
477
478 Ok(())
479 }
480
481 #[tokio::test]
482 async fn test_add() -> anyhow::Result<()> {
483 let storage = get_storage().await?;
484
485 let dummy_uuid = random_uuid();
486 println!("🚀 insert_pending_blinded_share...");
487
488 storage.insert_dummy_used_deposit(dummy_uuid).await?;
489 let res = storage
490 .insert_new_pending_async_shares_request(dummy_uuid, "1234", "1234")
491 .await;
492 if let Err(e) = &res {
493 println!("❌ {e}");
494 }
495 assert!(res.is_ok());
496 let res = res?;
497 println!("res = {res:?}");
498 assert_eq!(res.status, BlindedSharesStatus::Pending);
499
500 println!("🚀 update_pending_blinded_share_error...");
501 let res = storage
502 .update_pending_async_blinded_shares_error(0, "1234", "1234", "this is an error")
503 .await;
504 if let Err(e) = &res {
505 println!("❌ {e}");
506 }
507 assert!(res.is_ok());
508 let res = res?;
509 println!("res = {res:?}");
510 assert!(res.error_message.is_some());
511 assert_eq!(res.status, BlindedSharesStatus::Error);
512
513 println!("🚀 update_pending_blinded_share_data...");
514 let res = storage
515 .update_pending_async_blinded_shares_issued(42, "1234", "1234")
516 .await;
517 if let Err(e) = &res {
518 println!("❌ {e}");
519 }
520 assert!(res.is_ok());
521 let res = res?;
522 println!("res = {res:?}");
523 assert_eq!(res.status, BlindedSharesStatus::Issued);
524 assert!(res.error_message.is_none());
525
526 Ok(())
527 }
528}