use std::sync::Arc;
use async_trait::async_trait;
use base64::Engine as _;
use bsv::wallet::interfaces::{
AbortActionArgs, AbortActionResult, ListActionsArgs, ListActionsResult, ListCertificatesArgs,
ListCertificatesResult, ListOutputsArgs, ListOutputsResult, RelinquishCertificateArgs,
RelinquishOutputArgs,
};
use crate::error::{WalletError, WalletResult};
use crate::services::traits::WalletServices;
use crate::status::SyncStatus;
use crate::status::TransactionStatus;
use crate::storage::action_traits::StorageActionProvider;
use crate::storage::action_types::{
StorageCreateActionArgs, StorageCreateActionResult, StorageInternalizeActionArgs,
StorageInternalizeActionResult, StorageProcessActionArgs, StorageProcessActionResult,
};
use crate::storage::find_args::{
CertificatePartial, FindCertificateFieldsArgs, FindCertificatesArgs, FindMonitorEventsArgs,
FindOutputBasketsArgs, FindOutputsArgs, FindProvenTxReqsArgs, FindProvenTxsArgs,
FindSettingsArgs, FindSyncStatesArgs, FindTransactionsArgs, OutputPartial, ProvenTxPartial,
ProvenTxReqPartial, PurgeParams, SettingsPartial, SyncStatePartial, TransactionPartial,
UserPartial,
};
use crate::storage::sync::get_sync_chunk::{GetSyncChunkArgs, SyncChunkOffsets};
use crate::storage::sync::request_args::{RequestSyncChunkArgs, SyncChunkOffset};
use crate::storage::sync::sync_map::SyncMap;
use crate::storage::sync::{ProcessSyncChunkResult, SyncChunk};
use crate::storage::traits::provider::StorageProvider;
use crate::storage::traits::reader::StorageReader;
use crate::storage::traits::reader_writer::StorageReaderWriter;
use crate::storage::TrxToken;
use crate::storage::{verify_one, verify_one_or_none};
use crate::tables::{
Certificate, CertificateField, MonitorEvent, Output, OutputBasket, ProvenTx, ProvenTxReq,
Settings, SyncState, Transaction, User,
};
use crate::wallet::types::{AdminStatsResult, AuthId};
#[async_trait]
/// Storage interface consumed by the wallet layer.
///
/// Methods declared without a body are required. Methods with a body are
/// optional capabilities: unless noted otherwise on the method, the default
/// body returns `WalletError::NotImplemented` carrying the method's own name.
///
/// The blanket `impl<T: StorageProvider>` in this file overrides every
/// optional method except `get_endpoint_url`, `get_services`, `set_services`
/// and `find_outputs`.
pub trait WalletStorageProvider: Send + Sync {
/// Returns `true` by default.
///
/// NOTE(review): presumably implementors that are not backed by a local
/// `StorageProvider` override this to return `false` — confirm.
fn is_storage_provider(&self) -> bool {
true
}
/// Endpoint URL of this storage, if it has one. Defaults to `None`.
fn get_endpoint_url(&self) -> Option<String> {
None
}
/// Returns the wallet services attached to this storage.
///
/// The default fails with
/// `NotImplemented("services not available on storage providers")`.
async fn get_services(&self) -> WalletResult<Arc<dyn WalletServices>> {
Err(WalletError::NotImplemented(
"services not available on storage providers".into(),
))
}
/// Attaches wallet services to this storage.
///
/// The default discards `_services` and reports success.
async fn set_services(&self, _services: Arc<dyn WalletServices>) -> WalletResult<()> {
Ok(())
}
/// Required. Reports whether the storage is available; the exact meaning is
/// defined by the implementor.
fn is_available(&self) -> bool;
/// Required. Returns the storage's `Settings` record.
async fn get_settings(&self) -> WalletResult<Settings>;
/// Required. Finds certificates matching `args` on behalf of `auth`.
async fn find_certificates_auth(
&self,
auth: &AuthId,
args: &FindCertificatesArgs,
) -> WalletResult<Vec<Certificate>>;
/// Required. Finds output baskets matching `args` on behalf of `auth`.
async fn find_output_baskets_auth(
&self,
auth: &AuthId,
args: &FindOutputBasketsArgs,
) -> WalletResult<Vec<OutputBasket>>;
/// Required. Finds outputs matching `args` on behalf of `auth`.
async fn find_outputs_auth(
&self,
auth: &AuthId,
args: &FindOutputsArgs,
) -> WalletResult<Vec<Output>>;
/// Required. Finds proven-transaction requests matching `args`.
async fn find_proven_tx_reqs(
&self,
args: &FindProvenTxReqsArgs,
) -> WalletResult<Vec<ProvenTxReq>>;
/// Required. Lists actions for `auth` according to `args`.
async fn list_actions(
&self,
auth: &AuthId,
args: &ListActionsArgs,
) -> WalletResult<ListActionsResult>;
/// Required. Lists certificates for `auth` according to `args`.
async fn list_certificates(
&self,
auth: &AuthId,
args: &ListCertificatesArgs,
) -> WalletResult<ListCertificatesResult>;
/// Required. Lists outputs for `auth` according to `args`.
async fn list_outputs(
&self,
auth: &AuthId,
args: &ListOutputsArgs,
) -> WalletResult<ListOutputsResult>;
/// Required. Prepares the storage for use and returns its `Settings`.
async fn make_available(&self) -> WalletResult<Settings>;
/// Required. Runs storage migration for the given storage name and
/// identity key and returns a `String` result.
///
/// NOTE(review): the meaning of the returned string is not visible in this
/// file — confirm against the implementor.
async fn migrate(&self, storage_name: &str, storage_identity_key: &str)
-> WalletResult<String>;
/// Required. Destroys the storage.
async fn destroy(&self) -> WalletResult<()>;
/// Required. Returns the user for `identity_key`; the `bool` is presumably
/// `true` when the user was newly inserted (the name suggests so — confirm).
async fn find_or_insert_user(&self, identity_key: &str) -> WalletResult<(User, bool)>;
/// Required. Aborts the action described by `args` on behalf of `auth`.
async fn abort_action(
&self,
auth: &AuthId,
args: &AbortActionArgs,
) -> WalletResult<AbortActionResult>;
/// Required. Creates an action on behalf of `auth`.
async fn create_action(
&self,
auth: &AuthId,
args: &StorageCreateActionArgs,
) -> WalletResult<StorageCreateActionResult>;
/// Required. Processes an action on behalf of `auth`.
async fn process_action(
&self,
auth: &AuthId,
args: &StorageProcessActionArgs,
) -> WalletResult<StorageProcessActionResult>;
/// Required. Internalizes an action on behalf of `auth`, using the supplied
/// `services`.
async fn internalize_action(
&self,
auth: &AuthId,
args: &StorageInternalizeActionArgs,
services: &dyn WalletServices,
) -> WalletResult<StorageInternalizeActionResult>;
/// Required. Inserts `certificate` on behalf of `auth` and returns an `i64`
/// (the blanket impl returns the value of
/// `StorageReaderWriter::insert_certificate`).
async fn insert_certificate_auth(
&self,
auth: &AuthId,
certificate: &Certificate,
) -> WalletResult<i64>;
/// Required. Relinquishes the certificate identified by `args`.
async fn relinquish_certificate(
&self,
auth: &AuthId,
args: &RelinquishCertificateArgs,
) -> WalletResult<i64>;
/// Required. Relinquishes the output identified by `args`.
async fn relinquish_output(
&self,
auth: &AuthId,
args: &RelinquishOutputArgs,
) -> WalletResult<i64>;
/// Required. Returns the sync state for (`auth` user, `storage_identity_key`),
/// creating it when absent. In the blanket impl the `bool` is `true` only
/// when a new row was inserted.
async fn find_or_insert_sync_state_auth(
&self,
auth: &AuthId,
storage_identity_key: &str,
storage_name: &str,
) -> WalletResult<(SyncState, bool)>;
/// Required. Records `new_active_storage_identity_key` as the active storage
/// of the `auth` user.
async fn set_active(
&self,
auth: &AuthId,
new_active_storage_identity_key: &str,
) -> WalletResult<i64>;
/// Required. Produces the next `SyncChunk` for the request in `args`.
async fn get_sync_chunk(&self, args: &RequestSyncChunkArgs) -> WalletResult<SyncChunk>;
/// Required. Applies `chunk` (obtained for the request `args`) to this
/// storage.
async fn process_sync_chunk(
&self,
args: &RequestSyncChunkArgs,
chunk: &SyncChunk,
) -> WalletResult<ProcessSyncChunkResult>;
// ---------------------------------------------------------------------
// Optional capabilities. Every default below returns
// `WalletError::NotImplemented("<method name>")` unless stated otherwise.
// The `let _ = ...;` lines only mark the parameters as used.
// ---------------------------------------------------------------------
/// Looks up a user by identity key; `Ok(None)` is available to implementors
/// for "no such user".
async fn find_user_by_identity_key(&self, key: &str) -> WalletResult<Option<User>> {
let _ = key;
Err(WalletError::NotImplemented(
"find_user_by_identity_key".into(),
))
}
/// Finds certificates matching `args` (no `AuthId` parameter).
async fn find_certificates_storage(
&self,
args: &FindCertificatesArgs,
) -> WalletResult<Vec<Certificate>> {
let _ = args;
Err(WalletError::NotImplemented(
"find_certificates_storage".into(),
))
}
/// Finds certificate fields matching `args`.
async fn find_certificate_fields(
&self,
args: &FindCertificateFieldsArgs,
) -> WalletResult<Vec<CertificateField>> {
let _ = args;
Err(WalletError::NotImplemented(
"find_certificate_fields".into(),
))
}
/// Finds outputs matching `args` (no `AuthId` parameter).
async fn find_outputs_storage(&self, args: &FindOutputsArgs) -> WalletResult<Vec<Output>> {
let _ = args;
Err(WalletError::NotImplemented("find_outputs_storage".into()))
}
/// Alias: the default forwards to `find_outputs_storage`.
async fn find_outputs(&self, args: &FindOutputsArgs) -> WalletResult<Vec<Output>> {
self.find_outputs_storage(args).await
}
/// Applies `update` to the output with the given `id`.
async fn update_output(&self, id: i64, update: &OutputPartial) -> WalletResult<i64> {
let _ = (id, update);
Err(WalletError::NotImplemented("update_output".into()))
}
/// Inserts `cert` (no `AuthId` parameter).
async fn insert_certificate_storage(&self, cert: &Certificate) -> WalletResult<i64> {
let _ = cert;
Err(WalletError::NotImplemented(
"insert_certificate_storage".into(),
))
}
/// Inserts a certificate field.
async fn insert_certificate_field_storage(&self, field: &CertificateField) -> WalletResult<()> {
let _ = field;
Err(WalletError::NotImplemented(
"insert_certificate_field_storage".into(),
))
}
/// Finds settings records matching `args`.
async fn find_settings_storage(&self, args: &FindSettingsArgs) -> WalletResult<Vec<Settings>> {
let _ = args;
Err(WalletError::NotImplemented("find_settings_storage".into()))
}
/// Applies `update` to the settings record.
async fn update_settings_storage(&self, update: &SettingsPartial) -> WalletResult<i64> {
let _ = update;
Err(WalletError::NotImplemented(
"update_settings_storage".into(),
))
}
/// Finds proven transactions matching the arguments.
async fn find_proven_txs(&self, _args: &FindProvenTxsArgs) -> WalletResult<Vec<ProvenTx>> {
Err(WalletError::NotImplemented("find_proven_txs".into()))
}
/// Finds transactions matching the arguments.
async fn find_transactions(
&self,
_args: &FindTransactionsArgs,
) -> WalletResult<Vec<Transaction>> {
Err(WalletError::NotImplemented("find_transactions".into()))
}
/// Applies `update` to the proven-transaction request with the given `id`.
async fn update_proven_tx_req(
&self,
id: i64,
update: &ProvenTxReqPartial,
) -> WalletResult<i64> {
let _ = (id, update);
Err(WalletError::NotImplemented("update_proven_tx_req".into()))
}
/// Applies `update` to the proven transaction with the given `id`.
async fn update_proven_tx(&self, id: i64, update: &ProvenTxPartial) -> WalletResult<i64> {
let _ = (id, update);
Err(WalletError::NotImplemented("update_proven_tx".into()))
}
/// Applies `update` to the transaction with the given `id`.
async fn update_transaction(&self, id: i64, update: &TransactionPartial) -> WalletResult<i64> {
let _ = (id, update);
Err(WalletError::NotImplemented("update_transaction".into()))
}
/// Sets the status of the transaction identified by `txid`.
async fn update_transaction_status(
&self,
txid: &str,
new_status: TransactionStatus,
) -> WalletResult<()> {
let _ = (txid, new_status);
Err(WalletError::NotImplemented(
"update_transaction_status".into(),
))
}
/// Updates the request `req_id` with a newly obtained `proven_tx`.
async fn update_proven_tx_req_with_new_proven_tx(
&self,
req_id: i64,
proven_tx: &ProvenTx,
) -> WalletResult<i64> {
let _ = (req_id, proven_tx);
Err(WalletError::NotImplemented(
"update_proven_tx_req_with_new_proven_tx".into(),
))
}
/// Finds output baskets matching `args` (no `AuthId` parameter).
async fn find_output_baskets(
&self,
args: &FindOutputBasketsArgs,
) -> WalletResult<Vec<OutputBasket>> {
let _ = args;
Err(WalletError::NotImplemented("find_output_baskets".into()))
}
/// Inserts an output basket.
async fn insert_output_basket(&self, basket: &OutputBasket) -> WalletResult<i64> {
let _ = basket;
Err(WalletError::NotImplemented("insert_output_basket".into()))
}
/// Inserts a transaction.
async fn insert_transaction(&self, tx: &Transaction) -> WalletResult<i64> {
let _ = tx;
Err(WalletError::NotImplemented("insert_transaction".into()))
}
/// Inserts an output.
async fn insert_output(&self, output: &Output) -> WalletResult<i64> {
let _ = output;
Err(WalletError::NotImplemented("insert_output".into()))
}
/// Inserts a monitor event.
async fn insert_monitor_event(&self, event: &MonitorEvent) -> WalletResult<i64> {
let _ = event;
Err(WalletError::NotImplemented("insert_monitor_event".into()))
}
/// Deletes monitor events named `event_name` relative to `before_id`.
///
/// NOTE(review): presumably deletes ids strictly below `before_id` and
/// returns the number of rows removed — confirm against the implementor.
async fn delete_monitor_events_before_id(
&self,
event_name: &str,
before_id: i64,
) -> WalletResult<u64> {
let _ = (event_name, before_id);
Err(WalletError::NotImplemented(
"delete_monitor_events_before_id".into(),
))
}
/// Finds monitor events matching `args`.
async fn find_monitor_events(
&self,
args: &FindMonitorEventsArgs,
) -> WalletResult<Vec<MonitorEvent>> {
let _ = args;
Err(WalletError::NotImplemented("find_monitor_events".into()))
}
/// Counts monitor events matching `args`.
async fn count_monitor_events(&self, args: &FindMonitorEventsArgs) -> WalletResult<i64> {
let _ = args;
Err(WalletError::NotImplemented("count_monitor_events".into()))
}
/// Produces administrative statistics for the caller identified by `auth_id`.
async fn admin_stats(&self, auth_id: &str) -> WalletResult<AdminStatsResult> {
let _ = auth_id;
Err(WalletError::NotImplemented("admin_stats".into()))
}
/// Purges data according to `params` and returns a `String` result.
async fn purge_data(&self, params: &PurgeParams) -> WalletResult<String> {
let _ = params;
Err(WalletError::NotImplemented("purge_data".into()))
}
/// Reviews record statuses using `aged_limit` and returns a `String` result.
async fn review_status(&self, aged_limit: chrono::NaiveDateTime) -> WalletResult<String> {
let _ = aged_limit;
Err(WalletError::NotImplemented("review_status".into()))
}
/// Returns the storage identity key.
///
/// Unlike the other optional methods, the default is functional: it awaits
/// `get_settings()` and returns that record's `storage_identity_key`.
async fn get_storage_identity_key(&self) -> WalletResult<String> {
let settings = self.get_settings().await?;
Ok(settings.storage_identity_key)
}
/// Begins a storage transaction and returns its token.
async fn begin_transaction(&self) -> WalletResult<TrxToken> {
Err(WalletError::NotImplemented("begin_transaction".into()))
}
/// Commits the transaction identified by `trx` (the token is consumed).
async fn commit_transaction(&self, trx: TrxToken) -> WalletResult<()> {
let _ = trx;
Err(WalletError::NotImplemented("commit_transaction".into()))
}
/// Rolls back the transaction identified by `trx` (the token is consumed).
async fn rollback_transaction(&self, trx: TrxToken) -> WalletResult<()> {
let _ = trx;
Err(WalletError::NotImplemented("rollback_transaction".into()))
}
// ---------------------------------------------------------------------
// `*_trx` variants: same operations as their non-suffixed counterparts but
// taking an optional transaction token.
// ---------------------------------------------------------------------
/// Finds outputs matching `args`, optionally within `trx`.
async fn find_outputs_trx(
&self,
args: &FindOutputsArgs,
trx: Option<&TrxToken>,
) -> WalletResult<Vec<Output>> {
let _ = (args, trx);
Err(WalletError::NotImplemented("find_outputs_trx".into()))
}
/// Finds transactions matching `args`, optionally within `trx`.
async fn find_transactions_trx(
&self,
args: &FindTransactionsArgs,
trx: Option<&TrxToken>,
) -> WalletResult<Vec<Transaction>> {
let _ = (args, trx);
Err(WalletError::NotImplemented("find_transactions_trx".into()))
}
/// Finds proven-transaction requests matching `args`, optionally within `trx`.
async fn find_proven_tx_reqs_trx(
&self,
args: &FindProvenTxReqsArgs,
trx: Option<&TrxToken>,
) -> WalletResult<Vec<ProvenTxReq>> {
let _ = (args, trx);
Err(WalletError::NotImplemented(
"find_proven_tx_reqs_trx".into(),
))
}
/// Applies `update` to output `id`, optionally within `trx`.
async fn update_output_trx(
&self,
id: i64,
update: &OutputPartial,
trx: Option<&TrxToken>,
) -> WalletResult<i64> {
let _ = (id, update, trx);
Err(WalletError::NotImplemented("update_output_trx".into()))
}
/// Applies `update` to proven-transaction request `id`, optionally within `trx`.
async fn update_proven_tx_req_trx(
&self,
id: i64,
update: &ProvenTxReqPartial,
trx: Option<&TrxToken>,
) -> WalletResult<i64> {
let _ = (id, update, trx);
Err(WalletError::NotImplemented(
"update_proven_tx_req_trx".into(),
))
}
/// Sets the status of transaction `txid`, optionally within `trx`.
async fn update_transaction_status_trx(
&self,
txid: &str,
new_status: TransactionStatus,
trx: Option<&TrxToken>,
) -> WalletResult<()> {
let _ = (txid, new_status, trx);
Err(WalletError::NotImplemented(
"update_transaction_status_trx".into(),
))
}
/// Restores the inputs consumed by transaction `tx_id`, optionally within `trx`.
///
/// NOTE(review): the `u64` is presumably the number of restored inputs —
/// confirm against the implementor.
async fn restore_consumed_inputs_trx(
&self,
tx_id: i64,
trx: Option<&TrxToken>,
) -> WalletResult<u64> {
let _ = (tx_id, trx);
Err(WalletError::NotImplemented(
"restore_consumed_inputs_trx".into(),
))
}
}
#[async_trait]
// Blanket adapter: every `StorageProvider` is exposed as a
// `WalletStorageProvider`.
//
// Calls are forwarded to the `StorageProvider`, `StorageReader`,
// `StorageReaderWriter` and `StorageActionProvider` traits. Wherever the
// underlying method accepts a transaction token, `None` is passed, except in
// the `*_trx` methods, which forward the caller's token.
impl<T: StorageProvider> WalletStorageProvider for T {
    /// Always `true` for `StorageProvider`-backed storage.
    fn is_storage_provider(&self) -> bool {
        true
    }

    /// Forwards to `StorageProvider::is_available`.
    fn is_available(&self) -> bool {
        StorageProvider::is_available(self)
    }

    /// Forwards to `StorageProvider::get_settings` without a transaction.
    async fn get_settings(&self) -> WalletResult<Settings> {
        StorageProvider::get_settings(self, None).await
    }

    /// Forwards to `StorageProvider::make_available`.
    async fn make_available(&self) -> WalletResult<Settings> {
        StorageProvider::make_available(self).await
    }

    /// Runs `StorageProvider::migrate_database`.
    ///
    /// Both name arguments are ignored here; the underlying call takes none.
    async fn migrate(
        &self,
        _storage_name: &str,
        _storage_identity_key: &str,
    ) -> WalletResult<String> {
        StorageProvider::migrate_database(self).await
    }

    /// Forwards to `StorageProvider::destroy`.
    async fn destroy(&self) -> WalletResult<()> {
        StorageProvider::destroy(self).await
    }

    /// Forwards to `StorageReaderWriter::find_or_insert_user`.
    async fn find_or_insert_user(&self, identity_key: &str) -> WalletResult<(User, bool)> {
        StorageReaderWriter::find_or_insert_user(self, identity_key, None).await
    }

    /// Finds certificates belonging to the `auth` user.
    ///
    /// The query is rebuilt rather than forwarded: `user_id` is always forced
    /// to the authenticated user, and only `cert_type`, `serial_number`,
    /// `certifier`, `subject` and `is_deleted` are carried over from
    /// `args.partial`. Any other filter in `args.partial` is dropped.
    async fn find_certificates_auth(
        &self,
        auth: &AuthId,
        args: &FindCertificatesArgs,
    ) -> WalletResult<Vec<Certificate>> {
        let (user, _) =
            StorageReaderWriter::find_or_insert_user(self, &auth.identity_key, None).await?;
        let scoped_args = FindCertificatesArgs {
            partial: CertificatePartial {
                user_id: Some(user.user_id),
                cert_type: args.partial.cert_type.clone(),
                serial_number: args.partial.serial_number.clone(),
                certifier: args.partial.certifier.clone(),
                subject: args.partial.subject.clone(),
                is_deleted: args.partial.is_deleted,
                ..Default::default()
            },
            since: args.since,
            paged: args.paged.clone(),
        };
        StorageReader::find_certificates(self, &scoped_args, None).await
    }

    /// Forwards `args` unchanged to `StorageReader::find_output_baskets`.
    ///
    /// NOTE(review): `auth` is not used, so results are scoped to a user only
    /// if the caller already set a user filter in `args` — confirm callers do.
    async fn find_output_baskets_auth(
        &self,
        _auth: &AuthId,
        args: &FindOutputBasketsArgs,
    ) -> WalletResult<Vec<OutputBasket>> {
        StorageReader::find_output_baskets(self, args, None).await
    }

    /// Forwards `args` unchanged to `StorageReader::find_outputs`.
    ///
    /// NOTE(review): `auth` is not used, so results are scoped to a user only
    /// if the caller already set a user filter in `args` — confirm callers do.
    async fn find_outputs_auth(
        &self,
        _auth: &AuthId,
        args: &FindOutputsArgs,
    ) -> WalletResult<Vec<Output>> {
        StorageReader::find_outputs(self, args, None).await
    }

    /// Forwards to `StorageReader::find_proven_tx_reqs`.
    async fn find_proven_tx_reqs(
        &self,
        args: &FindProvenTxReqsArgs,
    ) -> WalletResult<Vec<ProvenTxReq>> {
        StorageReader::find_proven_tx_reqs(self, args, None).await
    }

    /// Resolves (or creates) the `auth` user, then runs the shared
    /// `list_actions` storage method for that user.
    async fn list_actions(
        &self,
        auth: &AuthId,
        args: &ListActionsArgs,
    ) -> WalletResult<ListActionsResult> {
        let (user, _) =
            StorageReaderWriter::find_or_insert_user(self, &auth.identity_key, None).await?;
        crate::storage::methods::list_actions::list_actions(
            self as &dyn StorageReader,
            &auth.identity_key,
            user.user_id,
            args,
            None,
        )
        .await
    }

    /// Resolves (or creates) the `auth` user, then runs the shared
    /// `list_certificates` storage method for that user.
    async fn list_certificates(
        &self,
        auth: &AuthId,
        args: &ListCertificatesArgs,
    ) -> WalletResult<ListCertificatesResult> {
        let (user, _) =
            StorageReaderWriter::find_or_insert_user(self, &auth.identity_key, None).await?;
        crate::storage::methods::list_certificates::list_certificates(
            self as &dyn StorageReader,
            &auth.identity_key,
            user.user_id,
            args,
            None,
        )
        .await
    }

    /// Resolves (or creates) the `auth` user, then runs the shared
    /// `list_outputs_rw` storage method for that user.
    async fn list_outputs(
        &self,
        auth: &AuthId,
        args: &ListOutputsArgs,
    ) -> WalletResult<ListOutputsResult> {
        let (user, _) =
            StorageReaderWriter::find_or_insert_user(self, &auth.identity_key, None).await?;
        crate::storage::methods::list_outputs::list_outputs_rw(
            self as &dyn StorageReaderWriter,
            &auth.identity_key,
            user.user_id,
            args,
            None,
        )
        .await
    }

    /// Runs the shared `abort_action` storage method for `auth.identity_key`.
    async fn abort_action(
        &self,
        auth: &AuthId,
        args: &AbortActionArgs,
    ) -> WalletResult<AbortActionResult> {
        crate::storage::methods::abort_action::abort_action(self, &auth.identity_key, args, None)
            .await
    }

    /// Forwards to `StorageActionProvider::create_action`.
    async fn create_action(
        &self,
        auth: &AuthId,
        args: &StorageCreateActionArgs,
    ) -> WalletResult<StorageCreateActionResult> {
        StorageActionProvider::create_action(self, &auth.identity_key, args, None).await
    }

    /// Forwards to `StorageActionProvider::process_action`.
    async fn process_action(
        &self,
        auth: &AuthId,
        args: &StorageProcessActionArgs,
    ) -> WalletResult<StorageProcessActionResult> {
        StorageActionProvider::process_action(self, &auth.identity_key, args, None).await
    }

    /// Forwards to `StorageActionProvider::internalize_action`.
    async fn internalize_action(
        &self,
        auth: &AuthId,
        args: &StorageInternalizeActionArgs,
        services: &dyn WalletServices,
    ) -> WalletResult<StorageInternalizeActionResult> {
        StorageActionProvider::internalize_action(self, &auth.identity_key, args, services, None)
            .await
    }

    /// Inserts `certificate` as given via `StorageReaderWriter::insert_certificate`.
    ///
    /// NOTE(review): `auth` is not used; the caller is presumably responsible
    /// for setting the certificate's owner — confirm.
    async fn insert_certificate_auth(
        &self,
        _auth: &AuthId,
        certificate: &Certificate,
    ) -> WalletResult<i64> {
        StorageReaderWriter::insert_certificate(self, certificate, None).await
    }

    /// Soft-deletes (sets `is_deleted`) the certificate identified by
    /// certifier, serial number and type that belongs to the `auth` user.
    ///
    /// The lookup is restricted to the authenticated user's certificates so
    /// that one identity cannot relinquish a certificate stored for another,
    /// and so that identical certificates held by different users do not make
    /// the single-match check fail.
    ///
    /// # Errors
    /// Propagates storage errors, and whatever `verify_one` returns when the
    /// lookup does not yield exactly one certificate.
    ///
    /// Returns `Ok(1)` on success; the update's own return value is discarded.
    async fn relinquish_certificate(
        &self,
        auth: &AuthId,
        args: &RelinquishCertificateArgs,
    ) -> WalletResult<i64> {
        let (user, _) =
            StorageReaderWriter::find_or_insert_user(self, &auth.identity_key, None).await?;

        let certifier_hex = args.certifier.to_der_hex();
        // Type and serial number are matched as lossy-UTF-8 text with trailing
        // NUL padding removed.
        let cert_type_str = String::from_utf8_lossy(args.cert_type.bytes())
            .trim_end_matches('\0')
            .to_string();
        let serial_number_str = String::from_utf8_lossy(&args.serial_number.0)
            .trim_end_matches('\0')
            .to_string();

        let cert = verify_one(
            StorageReader::find_certificates(
                self,
                &FindCertificatesArgs {
                    partial: CertificatePartial {
                        user_id: Some(user.user_id),
                        certifier: Some(certifier_hex),
                        serial_number: Some(serial_number_str),
                        cert_type: Some(cert_type_str),
                        ..Default::default()
                    },
                    ..Default::default()
                },
                None,
            )
            .await?,
        )?;

        StorageReaderWriter::update_certificate(
            self,
            cert.certificate_id,
            &CertificatePartial {
                is_deleted: Some(true),
                ..Default::default()
            },
            None,
        )
        .await?;
        Ok(1)
    }

    /// Removes the output named by `args.output` from its basket.
    ///
    /// `args.output` is rendered with `to_string()` and split at the last
    /// `.` into `txid` and `vout`.
    ///
    /// NOTE(review): `auth` is not used, so the lookup is not restricted to
    /// the caller's outputs — confirm whether it should be.
    ///
    /// # Errors
    /// `WalletError::InvalidParameter` when the outpoint has no `.` or the
    /// part after it is not an `i32`; otherwise storage errors and whatever
    /// `verify_one` returns when the lookup does not yield exactly one output.
    ///
    /// Returns `Ok(1)` on success; the update's own return value is discarded.
    async fn relinquish_output(
        &self,
        _auth: &AuthId,
        args: &RelinquishOutputArgs,
    ) -> WalletResult<i64> {
        let outpoint_str = args.output.to_string();
        let Some((txid, vout_str)) = outpoint_str.rsplit_once('.') else {
            return Err(WalletError::InvalidParameter {
                parameter: "output".to_string(),
                must_be: "a valid outpoint string (txid.vout)".to_string(),
            });
        };
        let vout: i32 = vout_str
            .parse()
            .map_err(|_| WalletError::InvalidParameter {
                parameter: "output".to_string(),
                must_be: "a valid outpoint string with numeric vout".to_string(),
            })?;

        let output = verify_one(
            StorageReader::find_outputs(
                self,
                &FindOutputsArgs {
                    partial: OutputPartial {
                        txid: Some(txid.to_string()),
                        vout: Some(vout),
                        ..Default::default()
                    },
                    ..Default::default()
                },
                None,
            )
            .await?,
        )?;

        StorageReaderWriter::update_output(
            self,
            output.output_id,
            &OutputPartial {
                // NOTE(review): writes basket id 0 rather than clearing the
                // column; presumably 0 stands for "no basket" — confirm.
                basket_id: Some(0),
                ..Default::default()
            },
            None,
        )
        .await?;
        Ok(1)
    }

    /// Returns the sync state for (`auth` user, `storage_identity_key`),
    /// inserting a fresh one when none exists.
    ///
    /// A new state starts with status `Unknown`, `init = false`, an empty
    /// JSON object as `sync_map`, and a `ref_num` made of 12 random bytes,
    /// base64-encoded. The `bool` is `true` only when a row was inserted.
    async fn find_or_insert_sync_state_auth(
        &self,
        auth: &AuthId,
        storage_identity_key: &str,
        storage_name: &str,
    ) -> WalletResult<(SyncState, bool)> {
        let (user, _) =
            StorageReaderWriter::find_or_insert_user(self, &auth.identity_key, None).await?;

        let existing = verify_one_or_none(
            StorageReader::find_sync_states(
                self,
                &FindSyncStatesArgs {
                    partial: SyncStatePartial {
                        user_id: Some(user.user_id),
                        storage_identity_key: Some(storage_identity_key.to_string()),
                        ..Default::default()
                    },
                    ..Default::default()
                },
                None,
            )
            .await?,
        )?;
        if let Some(state) = existing {
            return Ok((state, false));
        }

        let ref_num = {
            use rand::RngCore;
            let mut bytes = [0u8; 12];
            rand::thread_rng().fill_bytes(&mut bytes);
            base64::engine::general_purpose::STANDARD.encode(bytes)
        };
        let now = chrono::Utc::now().naive_utc();
        let new_state = SyncState {
            created_at: now,
            updated_at: now,
            // Placeholder; replaced below with the id returned by the insert.
            sync_state_id: 0,
            user_id: user.user_id,
            storage_identity_key: storage_identity_key.to_string(),
            storage_name: storage_name.to_string(),
            status: SyncStatus::Unknown,
            init: false,
            ref_num,
            sync_map: "{}".to_string(),
            when: None,
            satoshis: None,
            error_local: None,
            error_other: None,
        };
        let id = StorageReaderWriter::insert_sync_state(self, &new_state, None).await?;
        let mut inserted = new_state;
        inserted.sync_state_id = id;
        Ok((inserted, true))
    }

    /// Stores `new_active_storage_identity_key` as the `active_storage` of the
    /// `auth` user and returns the result of `update_user`.
    async fn set_active(
        &self,
        auth: &AuthId,
        new_active_storage_identity_key: &str,
    ) -> WalletResult<i64> {
        let (user, _) =
            StorageReaderWriter::find_or_insert_user(self, &auth.identity_key, None).await?;
        StorageReaderWriter::update_user(
            self,
            user.user_id,
            &UserPartial {
                active_storage: Some(new_active_storage_identity_key.to_string()),
                ..Default::default()
            },
            None,
        )
        .await
    }

    /// Translates the wire-level request into `GetSyncChunkArgs` (sync map and
    /// offsets derived by `build_sync_map_and_offsets`) and runs the shared
    /// `get_sync_chunk` routine.
    async fn get_sync_chunk(&self, args: &RequestSyncChunkArgs) -> WalletResult<SyncChunk> {
        let (sync_map, offsets) = build_sync_map_and_offsets(args);
        let internal_args = GetSyncChunkArgs {
            from_storage_identity_key: args.from_storage_identity_key.clone(),
            to_storage_identity_key: args.to_storage_identity_key.clone(),
            user_identity_key: args.identity_key.clone(),
            sync_map: &sync_map,
            max_items_per_entity: args.max_items,
            offsets,
        };
        crate::storage::sync::get_sync_chunk::get_sync_chunk(self, internal_args, None).await
    }

    /// Applies `chunk` to this storage and records sync progress.
    ///
    /// * The chunk counts as empty when it carries no user and every entity
    ///   list is absent or empty; `result.done` is set to that flag.
    /// * For a non-empty chunk, the sync state of
    ///   (`chunk.user_identity_key` user, `args.from_storage_identity_key`) is
    ///   updated, if it exists, with the serialized sync map and a `when`
    ///   timestamp (`result.max_updated_at`, falling back to the current UTC
    ///   time). A missing sync state is silently skipped.
    ///
    /// # Errors
    /// Propagates storage errors; `WalletError::Internal` if the sync map
    /// cannot be serialized to JSON.
    async fn process_sync_chunk(
        &self,
        args: &RequestSyncChunkArgs,
        chunk: &SyncChunk,
    ) -> WalletResult<ProcessSyncChunkResult> {
        let chunk_is_empty = chunk.user.is_none()
            && chunk.proven_txs.as_ref().is_none_or(|v| v.is_empty())
            && chunk.output_baskets.as_ref().is_none_or(|v| v.is_empty())
            && chunk.transactions.as_ref().is_none_or(|v| v.is_empty())
            && chunk.outputs.as_ref().is_none_or(|v| v.is_empty())
            && chunk.tx_labels.as_ref().is_none_or(|v| v.is_empty())
            && chunk.tx_label_maps.as_ref().is_none_or(|v| v.is_empty())
            && chunk.output_tags.as_ref().is_none_or(|v| v.is_empty())
            && chunk.output_tag_maps.as_ref().is_none_or(|v| v.is_empty())
            && chunk.certificates.as_ref().is_none_or(|v| v.is_empty())
            && chunk
                .certificate_fields
                .as_ref()
                .is_none_or(|v| v.is_empty())
            && chunk.commissions.as_ref().is_none_or(|v| v.is_empty())
            && chunk.proven_tx_reqs.as_ref().is_none_or(|v| v.is_empty());

        // Offsets are irrelevant when applying a chunk; only the map is used.
        let (mut sync_map, _) = build_sync_map_and_offsets(args);
        let mut result = crate::storage::sync::process_sync_chunk::process_sync_chunk(
            self,
            chunk.clone(),
            &mut sync_map,
            None,
        )
        .await?;
        result.done = chunk_is_empty;

        if !chunk_is_empty {
            let (user, _) =
                StorageReaderWriter::find_or_insert_user(self, &chunk.user_identity_key, None)
                    .await?;
            let existing = verify_one_or_none(
                StorageReader::find_sync_states(
                    self,
                    &FindSyncStatesArgs {
                        partial: SyncStatePartial {
                            user_id: Some(user.user_id),
                            storage_identity_key: Some(args.from_storage_identity_key.clone()),
                            ..Default::default()
                        },
                        ..Default::default()
                    },
                    None,
                )
                .await?,
            )?;
            if let Some(state) = existing {
                let new_sync_map_json = serde_json::to_string(&sync_map).map_err(|e| {
                    WalletError::Internal(format!("failed to serialize sync_map: {e}"))
                })?;
                let new_when = result
                    .max_updated_at
                    .or_else(|| Some(chrono::Utc::now().naive_utc()));
                StorageReaderWriter::update_sync_state(
                    self,
                    state.sync_state_id,
                    &SyncStatePartial {
                        sync_map: Some(new_sync_map_json),
                        when: new_when,
                        ..Default::default()
                    },
                    None,
                )
                .await?;
            }
        }
        Ok(result)
    }

    // -----------------------------------------------------------------
    // Direct pass-throughs to StorageReader / StorageReaderWriter, always
    // without a transaction token.
    // -----------------------------------------------------------------

    /// Forwards to `StorageReader::find_user_by_identity_key`.
    async fn find_user_by_identity_key(&self, key: &str) -> WalletResult<Option<User>> {
        StorageReader::find_user_by_identity_key(self, key, None).await
    }

    /// Forwards to `StorageReader::find_certificates`.
    async fn find_certificates_storage(
        &self,
        args: &FindCertificatesArgs,
    ) -> WalletResult<Vec<Certificate>> {
        StorageReader::find_certificates(self, args, None).await
    }

    /// Forwards to `StorageReader::find_certificate_fields`.
    async fn find_certificate_fields(
        &self,
        args: &FindCertificateFieldsArgs,
    ) -> WalletResult<Vec<CertificateField>> {
        StorageReader::find_certificate_fields(self, args, None).await
    }

    /// Forwards to `StorageReader::find_outputs`.
    async fn find_outputs_storage(&self, args: &FindOutputsArgs) -> WalletResult<Vec<Output>> {
        StorageReader::find_outputs(self, args, None).await
    }

    /// Forwards to `StorageReaderWriter::update_output`.
    async fn update_output(&self, id: i64, update: &OutputPartial) -> WalletResult<i64> {
        StorageReaderWriter::update_output(self, id, update, None).await
    }

    /// Forwards to `StorageReaderWriter::insert_certificate`.
    async fn insert_certificate_storage(&self, cert: &Certificate) -> WalletResult<i64> {
        StorageReaderWriter::insert_certificate(self, cert, None).await
    }

    /// Forwards to `StorageReaderWriter::insert_certificate_field`.
    async fn insert_certificate_field_storage(&self, field: &CertificateField) -> WalletResult<()> {
        StorageReaderWriter::insert_certificate_field(self, field, None).await
    }

    /// Forwards to `StorageReader::find_settings`.
    async fn find_settings_storage(&self, args: &FindSettingsArgs) -> WalletResult<Vec<Settings>> {
        StorageReader::find_settings(self, args, None).await
    }

    /// Forwards to `StorageReaderWriter::update_settings`.
    async fn update_settings_storage(&self, update: &SettingsPartial) -> WalletResult<i64> {
        StorageReaderWriter::update_settings(self, update, None).await
    }

    /// Forwards to `StorageReader::find_proven_txs`.
    async fn find_proven_txs(&self, args: &FindProvenTxsArgs) -> WalletResult<Vec<ProvenTx>> {
        StorageReader::find_proven_txs(self, args, None).await
    }

    /// Forwards to `StorageReader::find_transactions`.
    async fn find_transactions(
        &self,
        args: &FindTransactionsArgs,
    ) -> WalletResult<Vec<Transaction>> {
        StorageReader::find_transactions(self, args, None).await
    }

    /// Forwards to `StorageReaderWriter::update_proven_tx_req`.
    async fn update_proven_tx_req(
        &self,
        id: i64,
        update: &ProvenTxReqPartial,
    ) -> WalletResult<i64> {
        StorageReaderWriter::update_proven_tx_req(self, id, update, None).await
    }

    /// Forwards to `StorageReaderWriter::update_proven_tx`.
    async fn update_proven_tx(&self, id: i64, update: &ProvenTxPartial) -> WalletResult<i64> {
        StorageReaderWriter::update_proven_tx(self, id, update, None).await
    }

    /// Forwards to `StorageReaderWriter::update_transaction`.
    async fn update_transaction(&self, id: i64, update: &TransactionPartial) -> WalletResult<i64> {
        StorageReaderWriter::update_transaction(self, id, update, None).await
    }

    /// Forwards to `StorageReaderWriter::update_transaction_status`.
    async fn update_transaction_status(
        &self,
        txid: &str,
        new_status: TransactionStatus,
    ) -> WalletResult<()> {
        StorageReaderWriter::update_transaction_status(self, txid, new_status, None).await
    }

    /// Forwards to `StorageReaderWriter::update_proven_tx_req_with_new_proven_tx`.
    async fn update_proven_tx_req_with_new_proven_tx(
        &self,
        req_id: i64,
        proven_tx: &ProvenTx,
    ) -> WalletResult<i64> {
        StorageReaderWriter::update_proven_tx_req_with_new_proven_tx(self, req_id, proven_tx, None)
            .await
    }

    /// Forwards to `StorageReader::find_output_baskets`.
    async fn find_output_baskets(
        &self,
        args: &FindOutputBasketsArgs,
    ) -> WalletResult<Vec<OutputBasket>> {
        StorageReader::find_output_baskets(self, args, None).await
    }

    /// Forwards to `StorageReaderWriter::insert_output_basket`.
    async fn insert_output_basket(&self, basket: &OutputBasket) -> WalletResult<i64> {
        StorageReaderWriter::insert_output_basket(self, basket, None).await
    }

    /// Forwards to `StorageReaderWriter::insert_transaction`.
    async fn insert_transaction(&self, tx: &Transaction) -> WalletResult<i64> {
        StorageReaderWriter::insert_transaction(self, tx, None).await
    }

    /// Forwards to `StorageReaderWriter::insert_output`.
    async fn insert_output(&self, output: &Output) -> WalletResult<i64> {
        StorageReaderWriter::insert_output(self, output, None).await
    }

    /// Forwards to `StorageReaderWriter::insert_monitor_event`.
    async fn insert_monitor_event(&self, event: &MonitorEvent) -> WalletResult<i64> {
        StorageReaderWriter::insert_monitor_event(self, event, None).await
    }

    /// Forwards to `StorageReaderWriter::delete_monitor_events_before_id`.
    async fn delete_monitor_events_before_id(
        &self,
        event_name: &str,
        before_id: i64,
    ) -> WalletResult<u64> {
        StorageReaderWriter::delete_monitor_events_before_id(self, event_name, before_id, None)
            .await
    }

    /// Forwards to `StorageReader::find_monitor_events`.
    async fn find_monitor_events(
        &self,
        args: &FindMonitorEventsArgs,
    ) -> WalletResult<Vec<MonitorEvent>> {
        StorageReader::find_monitor_events(self, args, None).await
    }

    /// Forwards to `StorageReader::count_monitor_events`.
    async fn count_monitor_events(&self, args: &FindMonitorEventsArgs) -> WalletResult<i64> {
        StorageReader::count_monitor_events(self, args, None).await
    }

    /// Forwards to `StorageReaderWriter::admin_stats`.
    async fn admin_stats(&self, auth_id: &str) -> WalletResult<AdminStatsResult> {
        StorageReaderWriter::admin_stats(self, auth_id).await
    }

    /// Forwards to `StorageReaderWriter::purge_data`.
    async fn purge_data(&self, params: &PurgeParams) -> WalletResult<String> {
        StorageReaderWriter::purge_data(self, params, None).await
    }

    /// Forwards to `StorageReaderWriter::review_status`.
    async fn review_status(&self, aged_limit: chrono::NaiveDateTime) -> WalletResult<String> {
        StorageReaderWriter::review_status(self, aged_limit, None).await
    }

    /// Returns `StorageProvider::get_storage_identity_key` (a synchronous
    /// call) instead of going through `get_settings` like the trait default.
    async fn get_storage_identity_key(&self) -> WalletResult<String> {
        StorageProvider::get_storage_identity_key(self)
    }

    /// Forwards to `StorageReaderWriter::begin_transaction`.
    async fn begin_transaction(&self) -> WalletResult<TrxToken> {
        StorageReaderWriter::begin_transaction(self).await
    }

    /// Forwards to `StorageReaderWriter::commit_transaction`.
    async fn commit_transaction(&self, trx: TrxToken) -> WalletResult<()> {
        StorageReaderWriter::commit_transaction(self, trx).await
    }

    /// Forwards to `StorageReaderWriter::rollback_transaction`.
    async fn rollback_transaction(&self, trx: TrxToken) -> WalletResult<()> {
        StorageReaderWriter::rollback_transaction(self, trx).await
    }

    // -----------------------------------------------------------------
    // Transaction-aware pass-throughs: same targets as above, but the
    // caller's optional token is forwarded.
    // -----------------------------------------------------------------

    /// Forwards to `StorageReader::find_outputs` with `trx`.
    async fn find_outputs_trx(
        &self,
        args: &FindOutputsArgs,
        trx: Option<&TrxToken>,
    ) -> WalletResult<Vec<Output>> {
        StorageReader::find_outputs(self, args, trx).await
    }

    /// Forwards to `StorageReader::find_transactions` with `trx`.
    async fn find_transactions_trx(
        &self,
        args: &FindTransactionsArgs,
        trx: Option<&TrxToken>,
    ) -> WalletResult<Vec<Transaction>> {
        StorageReader::find_transactions(self, args, trx).await
    }

    /// Forwards to `StorageReader::find_proven_tx_reqs` with `trx`.
    async fn find_proven_tx_reqs_trx(
        &self,
        args: &FindProvenTxReqsArgs,
        trx: Option<&TrxToken>,
    ) -> WalletResult<Vec<ProvenTxReq>> {
        StorageReader::find_proven_tx_reqs(self, args, trx).await
    }

    /// Forwards to `StorageReaderWriter::update_output` with `trx`.
    async fn update_output_trx(
        &self,
        id: i64,
        update: &OutputPartial,
        trx: Option<&TrxToken>,
    ) -> WalletResult<i64> {
        StorageReaderWriter::update_output(self, id, update, trx).await
    }

    /// Forwards to `StorageReaderWriter::update_proven_tx_req` with `trx`.
    async fn update_proven_tx_req_trx(
        &self,
        id: i64,
        update: &ProvenTxReqPartial,
        trx: Option<&TrxToken>,
    ) -> WalletResult<i64> {
        StorageReaderWriter::update_proven_tx_req(self, id, update, trx).await
    }

    /// Forwards to `StorageReaderWriter::update_transaction_status` with `trx`.
    async fn update_transaction_status_trx(
        &self,
        txid: &str,
        new_status: TransactionStatus,
        trx: Option<&TrxToken>,
    ) -> WalletResult<()> {
        StorageReaderWriter::update_transaction_status(self, txid, new_status, trx).await
    }

    /// Forwards to `StorageReaderWriter::restore_consumed_inputs` with `trx`.
    async fn restore_consumed_inputs_trx(
        &self,
        tx_id: i64,
        trx: Option<&TrxToken>,
    ) -> WalletResult<u64> {
        StorageReaderWriter::restore_consumed_inputs(self, tx_id, trx).await
    }
}
/// Derives the internal sync bookkeeping from a wire-level sync request.
///
/// * Sync map: starts from `SyncMap::new()`. When `args.since` is set, it is
///   written to the `max_updated_at` watermark of every entity.
/// * Offsets: start from `SyncChunkOffsets::default()`. Each entry of
///   `args.offsets` is applied to the field selected by its camelCase entity
///   name; entries with an unrecognised name are skipped, and a later entry
///   for the same name overwrites an earlier one.
fn build_sync_map_and_offsets(args: &RequestSyncChunkArgs) -> (SyncMap, SyncChunkOffsets) {
    let mut map = SyncMap::new();
    if let Some(since) = args.since {
        let watermarks = [
            &mut map.proven_tx.max_updated_at,
            &mut map.output_basket.max_updated_at,
            &mut map.transaction.max_updated_at,
            &mut map.output.max_updated_at,
            &mut map.tx_label.max_updated_at,
            &mut map.tx_label_map.max_updated_at,
            &mut map.output_tag.max_updated_at,
            &mut map.output_tag_map.max_updated_at,
            &mut map.certificate.max_updated_at,
            &mut map.certificate_field.max_updated_at,
            &mut map.commission.max_updated_at,
            &mut map.proven_tx_req.max_updated_at,
        ];
        for watermark in watermarks {
            *watermark = Some(since);
        }
    }

    let mut resolved = SyncChunkOffsets::default();
    for entry in &args.offsets {
        // Pick the destination field first, then do the single assignment.
        let slot = match entry.name.as_str() {
            "provenTx" => &mut resolved.proven_tx,
            "outputBasket" => &mut resolved.output_basket,
            "outputTag" => &mut resolved.output_tag,
            "txLabel" => &mut resolved.tx_label,
            "transaction" => &mut resolved.transaction,
            "output" => &mut resolved.output,
            "txLabelMap" => &mut resolved.tx_label_map,
            "outputTagMap" => &mut resolved.output_tag_map,
            "certificate" => &mut resolved.certificate,
            "certificateField" => &mut resolved.certificate_field,
            "commission" => &mut resolved.commission,
            "provenTxReq" => &mut resolved.proven_tx_req,
            // Unknown entity names are ignored.
            _ => continue,
        };
        *slot = entry.offset;
    }

    (map, resolved)
}