pub(crate) mod addresses;
pub(crate) mod foundries;
pub(crate) mod options;
pub(crate) mod outputs;
pub(crate) mod transactions;
use std::collections::{HashMap, HashSet};
pub use self::options::SyncOptions;
use crate::{
client::{secret::SecretManage, ClientError},
types::block::{
address::{AccountAddress, Address, Bech32Address, NftAddress},
output::{FoundryId, Output, OutputId, OutputMetadata},
},
wallet::{
constants::MIN_SYNC_INTERVAL,
types::{
address::{AddressWithUnspentOutputIds, AddressWithUnspentOutputs, SpentOutputId},
Balance, OutputData,
},
Wallet, WalletError,
},
};
impl<S: 'static + SecretManage> Wallet<S> {
    /// Sets the sync options that `sync` falls back to when the caller passes none.
    ///
    /// With the `storage` feature enabled the options are handed to the storage manager first;
    /// because that call is propagated with `?`, a storage error leaves the in-memory default
    /// untouched.
    ///
    /// # Errors
    ///
    /// Returns the error of the storage manager call (only with the `storage` feature).
    pub async fn set_default_sync_options(&self, options: SyncOptions) -> Result<(), WalletError> {
        #[cfg(feature = "storage")]
        {
            self.storage_manager().set_default_sync_options(&options).await?;
        }

        *self.default_sync_options.lock().await = options;

        Ok(())
    }

    /// Returns a copy of the currently configured default sync options.
    pub async fn default_sync_options(&self) -> SyncOptions {
        *self.default_sync_options.lock().await
    }

    /// Requests the unspent outputs of `addresses` and then, repeatedly, the outputs of every
    /// account and NFT address derived from the account and NFT outputs found so far, until a
    /// pass discovers no further account or NFT output.
    ///
    /// Returns a tuple of:
    /// - the addresses with their unspent output ids; ids found behind an account or NFT address
    ///   are appended to the entry of the address holding that account/NFT output. An address
    ///   can appear more than once in this list, because every scanned account/NFT address adds
    ///   another snapshot of its owning address's entry,
    /// - the output ids the initial lookup reported as spent or not synced, without any id that
    ///   turned out to be unspent,
    /// - the data of all unspent outputs that were found.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned by any of the client or wallet requests.
    async fn request_outputs_recursively(
        &self,
        addresses: &[AddressWithUnspentOutputIds],
        options: &SyncOptions,
    ) -> Result<(Vec<AddressWithUnspentOutputIds>, Vec<SpentOutputId>, Vec<OutputData>), WalletError> {
        let bech32_hrp = self.client().get_bech32_hrp().await?;
        let network_id = self.client().get_network_id().await?;

        let (addresses_with_unspent_output_ids, mut spent_or_not_synced_output_ids) =
            self.get_output_ids_for_addresses(addresses, options).await?;

        let mut addresses_with_unspent_outputs = self
            .get_outputs_from_address_output_ids(&addresses_with_unspent_output_ids)
            .await?;

        // Maps an account/NFT address that still has to be scanned to the address whose entry
        // receives the output ids found there.
        let mut addresses_to_scan: HashMap<Address, Address> = HashMap::new();
        let mut new_addresses_with_unspent_output_ids = Vec::new();
        let mut unspent_outputs_data = Vec::new();

        loop {
            // `drain(..)` instead of `into_iter()`: the vector is refilled further down and
            // processed again in the next pass of the outer loop.
            #[allow(clippy::iter_with_drain)]
            for AddressWithUnspentOutputs {
                address_with_unspent_output_ids,
                unspent_outputs,
            } in addresses_with_unspent_outputs.drain(..)
            {
                for unspent_output in &unspent_outputs {
                    match &unspent_output.output {
                        Output::Account(account) => {
                            addresses_to_scan.insert(
                                AccountAddress::from(account.account_id_non_null(&unspent_output.output_id)).into(),
                                (*address_with_unspent_output_ids).inner().clone(),
                            );
                        }
                        Output::Nft(nft) => {
                            addresses_to_scan.insert(
                                NftAddress::from(nft.nft_id_non_null(&unspent_output.output_id)).into(),
                                (*address_with_unspent_output_ids).inner().clone(),
                            );
                        }
                        // Other output kinds do not lead to further addresses to scan.
                        _ => {}
                    }
                }

                new_addresses_with_unspent_output_ids.push(address_with_unspent_output_ids);
                unspent_outputs_data.extend(unspent_outputs);
            }

            log::debug!("[SYNC] new_addresses: {addresses_to_scan:?}");

            // Nothing new was discovered in this pass, so the scan is complete.
            if addresses_to_scan.is_empty() {
                break;
            }

            for (account_or_nft_address, output_address) in addresses_to_scan.drain() {
                // The owning address was pushed to `new_addresses_with_unspent_output_ids` in the
                // same pass that queued this account/NFT address, so the lookup cannot fail.
                let address_with_unspent_output_ids = new_addresses_with_unspent_output_ids
                    .iter_mut()
                    .find(|address| address.address.inner() == &output_address)
                    .expect("owning address is recorded before its account/NFT address is queued");

                let account_or_nft_output_ids = self
                    .get_output_ids_for_address(&Bech32Address::new(bech32_hrp, account_or_nft_address), options)
                    .await?;

                // Copy the ids element-wise; the collection itself is still needed below.
                address_with_unspent_output_ids
                    .unspent_output_ids
                    .extend(account_or_nft_output_ids.iter().copied());

                let account_or_nft_outputs_with_metadata =
                    self.get_outputs_request_unknown(&account_or_nft_output_ids).await?;
                let account_or_nft_outputs_data = self
                    .output_response_to_output_data(account_or_nft_outputs_with_metadata, network_id)
                    .await?;

                // Queue the freshly fetched outputs so the next pass inspects them for further
                // account/NFT outputs.
                addresses_with_unspent_outputs.push(AddressWithUnspentOutputs {
                    address_with_unspent_output_ids: address_with_unspent_output_ids.clone(),
                    unspent_outputs: account_or_nft_outputs_data,
                });
            }
        }

        // Ids that were found unspent during the scan must not be reported as spent/not synced.
        let unspent_output_ids_all: HashSet<OutputId> =
            unspent_outputs_data.iter().map(|o| o.output_id).collect();
        spent_or_not_synced_output_ids.retain(|o| !unspent_output_ids_all.contains(o));

        Ok((
            new_addresses_with_unspent_output_ids,
            spent_or_not_synced_output_ids,
            unspent_outputs_data,
        ))
    }
}
impl<S: 'static + SecretManage> Wallet<S>
where
    WalletError: From<S::Error>,
    ClientError: From<S::Error>,
{
    /// Syncs the wallet and returns its balance.
    ///
    /// `options` falls back to the wallet's default sync options when it converts to `None`.
    /// If the previous sync finished less than `MIN_SYNC_INTERVAL` milliseconds ago and
    /// `force_syncing` is not set, nothing is synced and only the balance is calculated.
    ///
    /// # Errors
    ///
    /// Propagates any error from syncing outputs, syncing pending transactions or calculating
    /// the balance. The last-synced timestamp is only updated when everything succeeded.
    pub async fn sync(&self, options: impl Into<Option<SyncOptions>> + Send) -> Result<Balance, WalletError> {
        let options = match options.into() {
            Some(opt) => opt,
            None => self.default_sync_options().await,
        };

        log::debug!("[SYNC] start syncing with {:?}", options);
        let sync_start_time = instant::Instant::now();

        let time_now = crate::client::unix_timestamp_now().as_millis();
        // The guard stays alive until this function returns (it is written to at the end).
        // NOTE(review): presumably this also makes concurrent `sync` calls run one at a time.
        let mut last_synced = self.last_synced.lock().await;

        // The timestamps are wall-clock based, so `time_now` can be smaller than the stored
        // value (e.g. after a clock adjustment). A plain subtraction would overflow in that
        // case; `None` is treated as "interval elapsed" so the wallet still syncs.
        let since_last_sync = time_now.checked_sub(*last_synced);
        log::debug!(
            "[SYNC] last time synced before {}ms",
            since_last_sync.unwrap_or(0)
        );
        let synced_recently = matches!(since_last_sync, Some(elapsed) if elapsed < MIN_SYNC_INTERVAL);

        if !options.force_syncing && synced_recently {
            log::debug!(
                "[SYNC] synced within the latest {} ms, only calculating balance",
                MIN_SYNC_INTERVAL
            );
            return self.balance().await;
        }

        self.sync_internal(&options).await?;

        if options.sync_pending_transactions {
            let confirmed_tx_with_unknown_output = self.sync_pending_transactions().await?;
            // A confirmed transaction created outputs the wallet has not seen yet, so the
            // outputs are requested a second time.
            if confirmed_tx_with_unknown_output {
                log::debug!("[SYNC] a transaction for which no output is known got confirmed, syncing outputs again");
                self.sync_internal(&options).await?;
            }
        }

        let balance = self.balance().await?;

        // Take a fresh timestamp: syncing itself may have taken a noticeable amount of time.
        *last_synced = crate::client::unix_timestamp_now().as_millis();

        log::debug!("[SYNC] finished syncing in {:.2?}", sync_start_time.elapsed());
        Ok(balance)
    }

    /// Runs one sync pass: requests the outputs for the wallet address (and, if enabled, the
    /// implicit account creation address), looks up metadata for outputs that are no longer
    /// reported as unspent, optionally requests incoming transactions and native token
    /// foundries, and finally hands everything to `update_after_sync`.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned by any of the requests or by `update_after_sync`.
    async fn sync_internal(&self, options: &SyncOptions) -> Result<(), WalletError> {
        log::debug!("[SYNC] sync_internal");

        // Seed the request with the output ids the wallet currently knows as unspent.
        let wallet_address_with_unspent_outputs = AddressWithUnspentOutputIds {
            address: self.address().await,
            unspent_output_ids: self.ledger().await.unspent_outputs().keys().copied().collect(),
        };

        let mut addresses_to_sync = vec![wallet_address_with_unspent_outputs];

        if options.sync_implicit_accounts {
            // An error here is deliberately ignored: the implicit account address is then simply
            // not synced.
            if let Ok(implicit_account_creation_address) = self.implicit_account_creation_address().await {
                addresses_to_sync.push(AddressWithUnspentOutputIds {
                    // Evaluated before `address` below, which moves the address out.
                    unspent_output_ids: self
                        .ledger()
                        .await
                        .implicit_accounts()
                        .filter(|output_data| {
                            output_data.output.as_basic().address() == implicit_account_creation_address.inner()
                        })
                        .map(|output_data| output_data.output_id)
                        .collect(),
                    address: implicit_account_creation_address,
                });
            }
        }

        let (_addresses_with_unspent_outputs, spent_or_not_synced_output_ids, outputs_data) =
            self.request_outputs_recursively(&addresses_to_sync, options).await?;

        log::debug!("[SYNC] spent_or_not_synced_outputs: {spent_or_not_synced_output_ids:?}");
        let spent_or_unsynced_output_metadata_responses = self
            .client()
            .get_outputs_metadata_ignore_not_found(&spent_or_not_synced_output_ids)
            .await?;

        // Every id starts out as `None`; ids for which a metadata response came back are
        // overwritten with `Some(metadata)`, the others stay `None`.
        let mut spent_or_unsynced_output_metadata: HashMap<OutputId, Option<OutputMetadata>> =
            spent_or_not_synced_output_ids.into_iter().map(|o| (o, None)).collect();
        for output_metadata_response in spent_or_unsynced_output_metadata_responses {
            let output_id = output_metadata_response.output_id();
            spent_or_unsynced_output_metadata.insert(*output_id, Some(output_metadata_response));
        }

        if options.sync_incoming_transactions {
            let transaction_ids = outputs_data
                .iter()
                .map(|output| *output.output_id.transaction_id())
                .collect();
            self.request_incoming_transaction_data(transaction_ids).await?;
        }

        if options.sync_native_token_foundries {
            // Collected into a set so each foundry is requested only once.
            let native_token_foundry_ids = outputs_data
                .iter()
                .filter_map(|output| {
                    output
                        .output
                        .native_token()
                        .map(|native_token| FoundryId::from(*native_token.token_id()))
                })
                .collect::<HashSet<_>>();
            self.request_and_store_foundry_outputs(native_token_foundry_ids).await?;
        }

        self.update_after_sync(outputs_data, spent_or_unsynced_output_metadata)
            .await
    }
}