use crate::{node::Node, Error, Marker, Result};
use libp2p::kad::{Record, RecordKey};
use sn_evm::ProofOfPayment;
use sn_networking::{get_raw_signed_spends_from_record, GetRecordError, NetworkError};
use sn_protocol::{
storage::{
try_deserialize_record, try_serialize_record, Chunk, RecordHeader, RecordKind, RecordType,
Scratchpad, SpendAddress,
},
NetworkAddress, PrettyPrintRecordKey,
};
use sn_registers::SignedRegister;
use sn_transfers::{SignedSpend, TransferError, UniquePubkey, QUOTE_EXPIRATION_SECS};
use std::collections::BTreeSet;
use std::time::{Duration, UNIX_EPOCH};
use tokio::task::JoinSet;
use xor_name::XorName;
impl Node {
pub(crate) async fn validate_and_store_record(&self, record: Record) -> Result<()> {
let record_header = RecordHeader::from_record(&record)?;
match record_header.kind {
RecordKind::ChunkWithPayment => {
let record_key = record.key.clone();
let (payment, chunk) = try_deserialize_record::<(ProofOfPayment, Chunk)>(&record)?;
let already_exists = self
.validate_key_and_existence(&chunk.network_address(), &record_key)
.await?;
let payment_res = self
.payment_for_us_exists_and_is_still_valid(&chunk.network_address(), payment)
.await;
if already_exists {
self.replicate_valid_fresh_record(record_key, RecordType::Chunk);
self.network()
.notify_fetch_completed(record.key.clone(), RecordType::Chunk);
debug!(
"Chunk with addr {:?} already exists: {already_exists}, payment extracted.",
chunk.network_address()
);
return Ok(());
}
payment_res?;
let store_chunk_result = self.store_chunk(&chunk);
if store_chunk_result.is_ok() {
Marker::ValidPaidChunkPutFromClient(&PrettyPrintRecordKey::from(&record.key))
.log();
self.replicate_valid_fresh_record(record_key, RecordType::Chunk);
self.network()
.notify_fetch_completed(record.key.clone(), RecordType::Chunk);
}
store_chunk_result
}
RecordKind::Chunk => {
error!("Chunk should not be validated at this point");
Err(Error::InvalidPutWithoutPayment(
PrettyPrintRecordKey::from(&record.key).into_owned(),
))
}
RecordKind::ScratchpadWithPayment => {
let record_key = record.key.clone();
let (payment, scratchpad) =
try_deserialize_record::<(ProofOfPayment, Scratchpad)>(&record)?;
let _already_exists = self
.validate_key_and_existence(&scratchpad.network_address(), &record_key)
.await?;
let payment_res = self
.payment_for_us_exists_and_is_still_valid(
&scratchpad.network_address(),
payment,
)
.await;
payment_res?;
let store_scratchpad_result = self
.validate_and_store_scratchpad_record(scratchpad, record_key.clone(), true)
.await;
if store_scratchpad_result.is_ok() {
Marker::ValidScratchpadRecordPutFromClient(&PrettyPrintRecordKey::from(
&record_key,
))
.log();
self.replicate_valid_fresh_record(record_key.clone(), RecordType::Scratchpad);
self.network()
.notify_fetch_completed(record_key, RecordType::Scratchpad);
}
store_scratchpad_result
}
RecordKind::Scratchpad => {
let key = record.key.clone();
let scratchpad = try_deserialize_record::<Scratchpad>(&record)?;
let net_addr = NetworkAddress::ScratchpadAddress(*scratchpad.address());
let pretty_key = PrettyPrintRecordKey::from(&key);
trace!("Got record to store without payment for scratchpad at {pretty_key:?}");
if !self.validate_key_and_existence(&net_addr, &key).await? {
warn!("Ignore store without payment for scratchpad at {pretty_key:?}");
return Err(Error::InvalidPutWithoutPayment(
PrettyPrintRecordKey::from(&record.key).into_owned(),
));
}
self.validate_and_store_scratchpad_record(scratchpad, key, false)
.await
}
RecordKind::Spend => {
let record_key = record.key.clone();
let value_to_hash = record.value.clone();
let spends = try_deserialize_record::<Vec<SignedSpend>>(&record)?;
let result = self
.validate_merge_and_store_spends(spends, &record_key)
.await;
if result.is_ok() {
Marker::ValidSpendPutFromClient(&PrettyPrintRecordKey::from(&record_key)).log();
let content_hash = XorName::from_content(&value_to_hash);
self.replicate_valid_fresh_record(
record_key,
RecordType::NonChunk(content_hash),
);
self.network().notify_fetch_completed(
record.key.clone(),
RecordType::NonChunk(content_hash),
);
}
result
}
RecordKind::Register => {
let register = try_deserialize_record::<SignedRegister>(&record)?;
let net_addr = NetworkAddress::from_register_address(*register.address());
let key = net_addr.to_record_key();
let pretty_key = PrettyPrintRecordKey::from(&key);
debug!("Got record to store without payment for register at {pretty_key:?}");
if !self.validate_key_and_existence(&net_addr, &key).await? {
debug!("Ignore store without payment for register at {pretty_key:?}");
return Err(Error::InvalidPutWithoutPayment(
PrettyPrintRecordKey::from(&record.key).into_owned(),
));
}
debug!("Store update without payment as we already had register at {pretty_key:?}");
let result = self.validate_and_store_register(register, true).await;
if result.is_ok() {
debug!("Successfully stored register update at {pretty_key:?}");
Marker::ValidPaidRegisterPutFromClient(&pretty_key).log();
let content_hash = XorName::from_content(&record.value);
self.network().notify_fetch_completed(
record.key.clone(),
RecordType::NonChunk(content_hash),
);
} else {
warn!("Failed to store register update at {pretty_key:?}");
}
result
}
RecordKind::RegisterWithPayment => {
let (payment, register) =
try_deserialize_record::<(ProofOfPayment, SignedRegister)>(&record)?;
let net_addr = NetworkAddress::from_register_address(*register.address());
let key = net_addr.to_record_key();
let pretty_key = PrettyPrintRecordKey::from(&key);
if record.key != key {
warn!(
"Record's key {pretty_key:?} does not match with the value's RegisterAddress, ignoring PUT."
);
return Err(Error::RecordKeyMismatch);
}
let already_exists = self.validate_key_and_existence(&net_addr, &key).await?;
if let Err(err) = self
.payment_for_us_exists_and_is_still_valid(&net_addr, payment)
.await
{
if already_exists {
debug!("Payment of the incoming exists register {pretty_key:?} having error {err:?}");
} else {
error!("Payment of the incoming non-exist register {pretty_key:?} having error {err:?}");
return Err(err);
}
}
let res = self.validate_and_store_register(register, true).await;
if res.is_ok() {
let content_hash = XorName::from_content(&record.value);
self.network().notify_fetch_completed(
record.key.clone(),
RecordType::NonChunk(content_hash),
);
}
res
}
}
}
pub(crate) async fn store_replicated_in_record(&self, record: Record) -> Result<()> {
debug!("Storing record which was replicated to us {:?}", record.key);
let record_header = RecordHeader::from_record(&record)?;
match record_header.kind {
RecordKind::ChunkWithPayment
| RecordKind::RegisterWithPayment
| RecordKind::ScratchpadWithPayment => {
warn!("Prepaid record came with Payment, which should be handled in another flow");
Err(Error::UnexpectedRecordWithPayment(
PrettyPrintRecordKey::from(&record.key).into_owned(),
))
}
RecordKind::Chunk => {
let chunk = try_deserialize_record::<Chunk>(&record)?;
let record_key = record.key.clone();
let already_exists = self
.validate_key_and_existence(&chunk.network_address(), &record_key)
.await?;
if already_exists {
debug!(
"Chunk with addr {:?} already exists?: {already_exists}, do nothing",
chunk.network_address()
);
return Ok(());
}
self.store_chunk(&chunk)
}
RecordKind::Scratchpad => {
let key = record.key.clone();
let scratchpad = try_deserialize_record::<Scratchpad>(&record)?;
self.validate_and_store_scratchpad_record(scratchpad, key, false)
.await
}
RecordKind::Spend => {
let record_key = record.key.clone();
let spends = try_deserialize_record::<Vec<SignedSpend>>(&record)?;
self.validate_merge_and_store_spends(spends, &record_key)
.await
}
RecordKind::Register => {
let register = try_deserialize_record::<SignedRegister>(&record)?;
let key =
NetworkAddress::from_register_address(*register.address()).to_record_key();
if record.key != key {
warn!(
"Record's key does not match with the value's RegisterAddress, ignoring PUT."
);
return Err(Error::RecordKeyMismatch);
}
self.validate_and_store_register(register, false).await
}
}
}
/// Checks that `expected_record_key` equals the key derived from `address`, then reports
/// whether a record with that key is already present locally.
///
/// Returns `Ok(true)` when the record exists locally and `Ok(false)` otherwise.
///
/// # Errors
/// `Error::RecordKeyMismatch` when the two keys differ; errors from the local presence
/// lookup are propagated.
async fn validate_key_and_existence(
    &self,
    address: &NetworkAddress,
    expected_record_key: &RecordKey,
) -> Result<bool> {
    let data_key = address.to_record_key();

    if *expected_record_key != data_key {
        warn!(
            "record key: {:?}, key: {:?}",
            PrettyPrintRecordKey::from(expected_record_key),
            PrettyPrintRecordKey::from(&data_key)
        );
        warn!("Record's key does not match with the value's address, ignoring PUT.");
        return Err(Error::RecordKeyMismatch);
    }

    let present_locally = self
        .network()
        .is_record_key_present_locally(&data_key)
        .await?;
    if present_locally {
        debug!(
            "Record with addr {:?} already exists, not overwriting",
            address
        );
    }
    Ok(present_locally)
}
/// Serializes `chunk` as a `RecordKind::Chunk` record and puts it into the local store.
///
/// After handing the record to the network layer, the put is recorded as a metric and a
/// `NodeEvent::ChunkStored` event is broadcast.
///
/// # Errors
/// Propagates a failure to serialize the chunk; nothing is stored in that case.
pub(crate) fn store_chunk(&self, chunk: &Chunk) -> Result<()> {
    let chunk_addr = *chunk.address();
    let chunk_name = *chunk.name();

    let key = NetworkAddress::from_chunk_address(chunk_addr).to_record_key();
    let pretty_key = PrettyPrintRecordKey::from(&key).into_owned();
    let value = try_serialize_record(&chunk, RecordKind::Chunk)?.to_vec();

    debug!("Storing chunk {chunk_name:?} as Record locally");
    self.network().put_local_record(Record {
        key,
        value,
        publisher: None,
        expires: None,
    });

    self.record_metrics(Marker::ValidChunkRecordPutFromNetwork(&pretty_key));
    self.events_channel()
        .broadcast(crate::NodeEvent::ChunkStored(chunk_addr));

    Ok(())
}
/// Validates a scratchpad and stores it locally as a `RecordKind::Scratchpad` record.
///
/// Checks are performed in this order:
/// 1. `record_key` must equal the key derived from the scratchpad address.
/// 2. If a scratchpad is already stored under that key, the incoming counter must be
///    strictly greater than the stored one.
/// 3. `scratchpad.is_valid()` must hold.
///
/// When `is_client_put` is set, the stored record is also handed to
/// `replicate_valid_fresh_record`.
///
/// # Errors
/// `Error::RecordKeyMismatch`, `Error::IgnoringOutdatedScratchpadPut` or
/// `Error::InvalidScratchpadSignature` for the checks above; lookup and
/// (de)serialization errors are propagated.
pub(crate) async fn validate_and_store_scratchpad_record(
    &self,
    scratchpad: Scratchpad,
    record_key: RecordKey,
    is_client_put: bool,
) -> Result<()> {
    let addr = scratchpad.address();
    let count = scratchpad.count();
    debug!("Validating and storing scratchpad {addr:?} with count {count}");

    let scratchpad_key = NetworkAddress::ScratchpadAddress(*addr).to_record_key();
    if record_key != scratchpad_key {
        warn!("Record's key does not match with the value's ScratchpadAddress, ignoring PUT.");
        return Err(Error::RecordKeyMismatch);
    }

    // Only a strictly newer counter may replace what is stored.
    let stored = self.network().get_local_record(&scratchpad_key).await?;
    if let Some(stored_record) = stored {
        let stored_pad: Scratchpad = try_deserialize_record(&stored_record)?;
        if scratchpad.count() <= stored_pad.count() {
            warn!("Rejecting Scratchpad PUT with counter less than or equal to the current counter");
            return Err(Error::IgnoringOutdatedScratchpadPut);
        }
    }

    if !scratchpad.is_valid() {
        warn!("Rejecting Scratchpad PUT with invalid signature");
        return Err(Error::InvalidScratchpadSignature);
    }

    info!(
        "Storing sratchpad {addr:?} with content of {:?} as Record locally",
        scratchpad.encrypted_data_hash()
    );

    let value = try_serialize_record(&scratchpad, RecordKind::Scratchpad)?.to_vec();
    self.network().put_local_record(Record {
        key: scratchpad_key.clone(),
        value,
        publisher: None,
        expires: None,
    });

    let pretty_key = PrettyPrintRecordKey::from(&scratchpad_key);
    self.record_metrics(Marker::ValidScratchpadRecordPutFromNetwork(&pretty_key));

    if is_client_put {
        self.replicate_valid_fresh_record(scratchpad_key, RecordType::Scratchpad);
    }

    Ok(())
}
/// Validates a signed register and stores the (possibly merged) result locally.
///
/// The register is passed to `register_validation` together with whether a record for
/// its key is already present locally. When that returns `None` nothing is written and
/// `Ok(())` is returned; otherwise the returned register is serialized as a
/// `RecordKind::Register` record and put into the local store.
///
/// When `is_client_put` is set, the stored record is also handed to
/// `replicate_valid_fresh_record`, tagged with the hash of the serialized content.
///
/// # Errors
/// Propagates errors from the local presence lookup, from `register_validation` and
/// from serialization.
pub(crate) async fn validate_and_store_register(
    &self,
    register: SignedRegister,
    is_client_put: bool,
) -> Result<()> {
    let reg_addr = register.address();
    debug!("Validating and storing register {reg_addr:?}");

    let key = NetworkAddress::from_register_address(*reg_addr).to_record_key();
    let present_locally = self.network().is_record_key_present_locally(&key).await?;
    let pretty_key = PrettyPrintRecordKey::from(&key);

    // `None` means the incoming register brings nothing new compared to the local copy.
    let updated_register = match self.register_validation(&register, present_locally).await? {
        Some(reg) => {
            debug!("Register {pretty_key:?} needed to be updated");
            reg
        }
        None => {
            debug!("No update needed for register");
            return Ok(());
        }
    };

    let record = Record {
        key: key.clone(),
        value: try_serialize_record(&updated_register, RecordKind::Register)?.to_vec(),
        publisher: None,
        expires: None,
    };
    let content_hash = XorName::from_content(&record.value);

    info!("Storing register {reg_addr:?} with content of {content_hash:?} as Record locally");
    self.network().put_local_record(record);

    self.record_metrics(Marker::ValidRegisterRecordPutFromNetwork(&pretty_key));

    if is_client_put {
        self.replicate_valid_fresh_record(key, RecordType::NonChunk(content_hash));
    }

    Ok(())
}
/// Validates the given spends for `record_key`, merges them with known spends and stores
/// the outcome locally as a `RecordKind::Spend` record.
///
/// Spends whose own record key differs from `record_key` are ignored. The remaining ones
/// are passed to `signed_spends_to_keep`, which yields one spend, or two in the case of a
/// double spend; those are what gets stored.
///
/// # Errors
/// `Error::InvalidRequest` when no spend matches `record_key`; errors from
/// `signed_spends_to_keep` and from serialization are propagated.
pub(crate) async fn validate_merge_and_store_spends(
    &self,
    signed_spends: Vec<SignedSpend>,
    record_key: &RecordKey,
) -> Result<()> {
    let pretty_key = PrettyPrintRecordKey::from(record_key);
    debug!("Validating spends before storage at {pretty_key:?}");

    // Keep only the spends that actually belong to this record key.
    let spends_for_key: Vec<SignedSpend> = signed_spends
        .into_iter()
        .filter(|s| {
            let spend_address = SpendAddress::from_unique_pubkey(s.unique_pubkey());
            let network_address = NetworkAddress::from_spend_address(spend_address);
            let spend_record_key = network_address.to_record_key();
            let spend_pretty = PrettyPrintRecordKey::from(&spend_record_key);
            if &spend_record_key != record_key {
                warn!("Ignoring spend for another record key {spend_pretty:?} when verifying: {pretty_key:?}");
                return false;
            }
            true
        })
        .collect();

    // Copy the key out of the first spend so the vector is no longer borrowed and can be
    // moved into the validation call instead of being cloned.
    let unique_pubkey = match spends_for_key.first() {
        Some(first) => *first.unique_pubkey(),
        None => {
            warn!("Found no valid spends to verify upon validation for {pretty_key:?}");
            return Err(Error::InvalidRequest(format!(
                "No spends to verify when validating {pretty_key:?}"
            )));
        }
    };

    debug!("Validating spends for {pretty_key:?} with unique key: {unique_pubkey:?}");
    let validated_spends = match self
        .signed_spends_to_keep(spends_for_key, unique_pubkey)
        .await
    {
        Ok((one, None)) => vec![one],
        Ok((one, Some(two))) => vec![one, two],
        Err(e) => {
            warn!("Failed to validate spends at {pretty_key:?} with unique key {unique_pubkey:?}: {e}");
            return Err(e);
        }
    };

    debug!(
        "Got {} validated spends with key: {unique_pubkey:?} at {pretty_key:?}",
        validated_spends.len()
    );

    let record = Record {
        key: record_key.clone(),
        value: try_serialize_record(&validated_spends, RecordKind::Spend)?.to_vec(),
        publisher: None,
        expires: None,
    };
    self.network().put_local_record(record);
    debug!(
        "Successfully stored validated spends with key: {unique_pubkey:?} at {pretty_key:?}"
    );

    // More than one kept spend for the same key means a double spend was recorded.
    if validated_spends.len() > 1 {
        warn!("Got double spend(s) of len {} for the Spend PUT with unique_pubkey {unique_pubkey}", validated_spends.len());
    }

    self.record_metrics(Marker::ValidSpendRecordPutFromNetwork(&pretty_key));
    Ok(())
}
async fn payment_for_us_exists_and_is_still_valid(
&self,
address: &NetworkAddress,
payment: ProofOfPayment,
) -> Result<()> {
let key = address.to_record_key();
let pretty_key = PrettyPrintRecordKey::from(&key).into_owned();
debug!("Validating record payment for {pretty_key}");
let storecost = payment.quote.cost;
let self_peer_id = self.network().peer_id();
if !payment.quote.check_is_signed_by_claimed_peer(self_peer_id) {
warn!("Payment quote signature is not valid for record {pretty_key}");
return Err(Error::InvalidRequest(format!(
"Payment quote signature is not valid for record {pretty_key}"
)));
}
debug!("Payment quote signature is valid for record {pretty_key}");
let quote_timestamp = payment.quote.timestamp;
let quote_expiration_time = quote_timestamp + Duration::from_secs(QUOTE_EXPIRATION_SECS);
let quote_expiration_time_in_secs = quote_expiration_time
.duration_since(UNIX_EPOCH)
.map_err(|e| {
Error::InvalidRequest(format!(
"Payment quote timestamp is invalid for record {pretty_key}: {e}"
))
})?
.as_secs();
debug!("Verifying payment for record {pretty_key}");
self.evm_network()
.verify_data_payment(
payment.tx_hash,
payment.quote.hash(),
*self.reward_address(),
storecost.as_atto(),
quote_expiration_time_in_secs,
)
.await
.map_err(|e| Error::EvmNetwork(format!("Failed to verify chunk payment: {e}")))?;
debug!("Payment is valid for record {pretty_key}");
self.network().notify_payment_received();
#[cfg(feature = "open-metrics")]
if let Some(metrics_recorder) = self.metrics_recorder() {
let current_value = metrics_recorder.current_reward_wallet_balance.get();
let new_value =
current_value.saturating_add(storecost.as_atto().try_into().unwrap_or(i64::MAX));
let _ = metrics_recorder
.current_reward_wallet_balance
.set(new_value);
}
self.events_channel()
.broadcast(crate::NodeEvent::RewardReceived(storecost, address.clone()));
info!("Total payment of {storecost:?} atto tokens accepted for record {pretty_key}");
#[cfg(feature = "loud")]
{
println!("🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟 RECEIVED REWARD 🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟");
println!("Total payment of {storecost:?} atto tokens accepted for record {pretty_key}");
println!("🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟🌟");
}
Ok(())
}
/// Verifies `register` and works out what, if anything, has to be stored for it.
///
/// Returns:
/// - `Ok(Some(register))` (a copy of the input) when no local version exists,
/// - `Ok(Some(merged))` when merging the input into the local version changed it,
/// - `Ok(None)` when the merge result equals the local version.
///
/// # Errors
/// Propagates verification, merge, lookup and deserialization errors, and returns
/// `Error::InvalidRequest` when `present_locally` is set but no local record is found.
async fn register_validation(
    &self,
    register: &SignedRegister,
    present_locally: bool,
) -> Result<Option<SignedRegister>> {
    let reg_addr = register.address();
    register.verify()?;

    if !present_locally {
        debug!("Register with addr {reg_addr:?} is valid and doesn't exist locally");
        return Ok(Some(register.clone()));
    }
    debug!("Register with addr {reg_addr:?} exists locally, comparing with local version");

    let key = NetworkAddress::from_register_address(*reg_addr).to_record_key();
    let Some(record) = self.network().get_local_record(&key).await? else {
        error!("Register with addr {reg_addr:?} already exists locally, but not found in local storage");
        return Err(Error::InvalidRequest(format!(
            "Register with addr {reg_addr:?} claimed to be existing locally was not found"
        )));
    };

    // Merge the incoming register into a copy of the local one and see if anything changed.
    let local_register: SignedRegister = try_deserialize_record(&record)?;
    let mut merged_register = local_register.clone();
    merged_register.verified_merge(register)?;

    if merged_register != local_register {
        debug!("Register with addr {reg_addr:?} is different from the local version");
        return Ok(Some(merged_register));
    }

    debug!("Register with addr {reg_addr:?} is the same as the local version");
    Ok(None)
}
/// Returns the spends stored locally for `addr`.
///
/// Yields an empty vector when no record is stored under the spend's record key.
///
/// # Errors
/// `NetworkError::RecordKindMismatch` (converted into the node error) when the stored
/// record is not of kind `Spend`; lookup, header and deserialization errors are propagated.
async fn get_local_spends(&self, addr: SpendAddress) -> Result<Vec<SignedSpend>> {
    let record_key = NetworkAddress::from_spend_address(addr).to_record_key();
    debug!("Checking for local spends with key: {record_key:?}");

    let Some(local_record) = self.network().get_local_record(&record_key).await? else {
        debug!("Spend is not present locally: {record_key:?}");
        return Ok(Vec::new());
    };

    let record_kind = RecordHeader::from_record(&local_record)?.kind;
    if matches!(record_kind, RecordKind::Spend) {
        let local_signed_spends: Vec<SignedSpend> = try_deserialize_record(&local_record)?;
        return Ok(local_signed_spends);
    }

    error!("Found a {record_kind} when expecting to find Spend at {addr:?}");
    Err(NetworkError::RecordKindMismatch(RecordKind::Spend).into())
}
async fn signed_spends_to_keep(
&self,
signed_spends: Vec<SignedSpend>,
unique_pubkey: UniquePubkey,
) -> Result<(SignedSpend, Option<SignedSpend>)> {
let spend_addr = SpendAddress::from_unique_pubkey(&unique_pubkey);
debug!(
"Validating before storing spend at {spend_addr:?} with unique key: {unique_pubkey}"
);
let local_spends = self.get_local_spends(spend_addr).await?;
let network_spends = match self.network().get_raw_spends(spend_addr).await {
Ok(spends) => spends,
Err(NetworkError::GetRecordError(GetRecordError::SplitRecord { result_map })) => {
warn!("Got a split record (double spend) for {unique_pubkey:?} from the network");
let mut spends = vec![];
for (record, _) in result_map.values() {
match get_raw_signed_spends_from_record(record) {
Ok(s) => spends.extend(s),
Err(e) => warn!("Ignoring invalid record received from the network for spend: {unique_pubkey:?}: {e}"),
}
}
spends
}
Err(NetworkError::GetRecordError(GetRecordError::NotEnoughCopies {
record,
got,
..
})) => {
info!(
"Retrieved {got} copies of the record for {unique_pubkey:?} from the network"
);
match get_raw_signed_spends_from_record(&record) {
Ok(spends) => spends,
Err(err) => {
warn!("Ignoring invalid record received from the network for spend: {unique_pubkey:?}: {err}");
vec![]
}
}
}
Err(e) => {
warn!("Continuing without network spends as failed to get spends from the network for {unique_pubkey:?}: {e}");
vec![]
}
};
debug!(
"For {unique_pubkey:?} got {} local spends, {} from network and {} provided",
local_spends.len(),
network_spends.len(),
signed_spends.len()
);
debug!("Local spends {local_spends:?}; from network {network_spends:?}; provided {signed_spends:?}");
let mut all_verified_spends = BTreeSet::from_iter(local_spends.into_iter());
let unverified_spends =
BTreeSet::from_iter(network_spends.into_iter().chain(signed_spends.into_iter()));
let known_spends = all_verified_spends.clone();
let new_unverified_spends: BTreeSet<_> =
unverified_spends.difference(&known_spends).collect();
let mut tasks = JoinSet::new();
for s in new_unverified_spends.into_iter() {
let self_clone = self.clone();
let spend_clone = s.clone();
let _ = tasks.spawn(async move {
let res = self_clone.network().verify_spend(&spend_clone).await;
(spend_clone, res)
});
}
let mut double_spent_parent = BTreeSet::new();
while let Some(res) = tasks.join_next().await {
match res {
Ok((spend, Ok(()))) => {
info!("Successfully verified {spend:?}");
let _inserted = all_verified_spends.insert(spend.to_owned().clone());
}
Ok((spend, Err(NetworkError::Transfer(TransferError::DoubleSpentParent)))) => {
warn!("Parent of {spend:?} was double spent, keeping aside in case we're a double spend as well");
let _ = double_spent_parent.insert(spend.clone());
}
Ok((spend, Err(e))) => {
warn!("Skipping spend {spend:?} as an error occurred during validation: {e:?}");
}
Err(e) => {
let s =
format!("Async thread error while verifying spend {unique_pubkey}: {e:?}");
error!("{}", s);
return Err(Error::JoinErrorInAsyncThread(s))?;
}
}
}
if !all_verified_spends.is_empty() && !double_spent_parent.is_empty() {
warn!("Parent of {unique_pubkey:?} was double spent, but it's also a double spend. So keeping track of this double spend attempt.");
all_verified_spends.extend(double_spent_parent.into_iter())
}
let all_verified_spends: Vec<_> = all_verified_spends.into_iter().collect();
match all_verified_spends.as_slice() {
[one_spend] => Ok((one_spend.clone(), None)),
[one, two] => Ok((one.clone(), Some(two.clone()))),
[] => {
warn!("Invalid request: none of the spends were valid for {unique_pubkey:?}");
Err(Error::InvalidRequest(format!(
"Found no valid spends while validating Spends for {unique_pubkey:?}"
)))
}
more => {
warn!("Got more than 2 verified spends, this might be a double spend spam attack, making sure to favour live branches (branches with spent descendants)");
let (one, two) = self.verified_spends_select_2_live(more).await?;
Ok((one, Some(two)))
}
}
}
async fn verified_spends_select_2_live(
&self,
many_spends: &[SignedSpend],
) -> Result<(SignedSpend, SignedSpend)> {
let mut tasks = JoinSet::new();
for spend in many_spends {
let descendants: BTreeSet<_> = spend
.spend
.descendants
.keys()
.map(SpendAddress::from_unique_pubkey)
.collect();
for d in descendants {
let self_clone = self.clone();
let spend_clone = spend.to_owned();
let _ = tasks.spawn(async move {
let res = self_clone.network().get_raw_spends(d).await;
(spend_clone, res)
});
}
}
let mut live_spends = BTreeSet::new();
while let Some(res) = tasks.join_next().await {
match res {
Ok((spend, Ok(_descendant))) => {
debug!("Spend {spend:?} has a live descendant");
let _inserted = live_spends.insert(spend);
}
Ok((spend, Err(NetworkError::GetRecordError(GetRecordError::RecordNotFound)))) => {
debug!("Spend {spend:?} descendant was not found, continuing...");
}
Ok((spend, Err(e))) => {
warn!(
"Error fetching spend descendant while checking if {spend:?} is live: {e}"
);
}
Err(e) => {
let s = format!("Async thread error while selecting live spends: {e}");
error!("{}", s);
return Err(Error::JoinErrorInAsyncThread(s))?;
}
}
}
let not_live_spends: BTreeSet<_> = many_spends
.iter()
.filter(|s| !live_spends.contains(s))
.collect();
debug!(
"Got {} live spends and {} not live ones, keeping only the favoured 2",
live_spends.len(),
not_live_spends.len()
);
let ordered_spends: Vec<_> = live_spends
.iter()
.chain(not_live_spends.into_iter())
.collect();
match ordered_spends.as_slice() {
[one, two, ..] => Ok((one.to_owned().clone(), two.to_owned().clone())),
_ => Err(Error::InvalidRequest(format!(
"Expected many spends but got {}",
many_spends.len()
))),
}
}
}