use crate::{error::Result, node::Node};
use libp2p::{
kad::{Quorum, Record, RecordKey},
PeerId,
};
use sn_networking::{sort_peers_by_address, GetRecordCfg, Network, REPLICATION_PEERS_COUNT};
use sn_protocol::{
messages::{Cmd, Query, QueryResponse, Request, Response},
storage::RecordType,
NetworkAddress, PrettyPrintRecordKey,
};
use tokio::task::spawn;
impl Node {
pub(crate) fn try_interval_replication(network: Network) {
network.trigger_interval_replication()
}
pub(crate) fn trigger_irrelevant_record_cleanup(network: Network) {
network.trigger_irrelevant_record_cleanup()
}
pub(crate) fn fetch_replication_keys_without_wait(
&self,
keys_to_fetch: Vec<(PeerId, RecordKey)>,
) -> Result<()> {
for (holder, key) in keys_to_fetch {
let node = self.clone();
let requester = NetworkAddress::from_peer(self.network().peer_id());
let _handle = spawn(async move {
let pretty_key = PrettyPrintRecordKey::from(&key).into_owned();
debug!("Fetching record {pretty_key:?} from node {holder:?}");
let req = Request::Query(Query::GetReplicatedRecord {
requester,
key: NetworkAddress::from_record_key(&key),
});
let record_opt = if let Ok(resp) = node.network().send_request(req, holder).await {
match resp {
Response::Query(QueryResponse::GetReplicatedRecord(result)) => match result
{
Ok((_holder, record_content)) => Some(record_content),
Err(err) => {
debug!("Failed fetch record {pretty_key:?} from node {holder:?}, with error {err:?}");
None
}
},
other => {
debug!("Cannot fetch record {pretty_key:?} from node {holder:?}, with response {other:?}");
None
}
}
} else {
None
};
let record = if let Some(record_content) = record_opt {
Record::new(key, record_content.to_vec())
} else {
debug!(
"Can not fetch record {pretty_key:?} from node {holder:?}, fetching from the network"
);
let get_cfg = GetRecordCfg {
get_quorum: Quorum::One,
retry_strategy: None,
target_record: None,
expected_holders: Default::default(),
is_register: false,
};
match node.network().get_record_from_network(key, &get_cfg).await {
Ok(record) => record,
Err(err) => {
error!("During replication fetch of {pretty_key:?}, failed in re-attempt of get from network {err:?}");
return;
}
}
};
debug!(
"Got Replication Record {pretty_key:?} from network, validating and storing it"
);
if let Err(err) = node.store_replicated_in_record(record).await {
error!("During store replication fetched {pretty_key:?}, got error {err:?}");
} else {
debug!("Completed storing Replication Record {pretty_key:?} from network.");
}
});
}
Ok(())
}
pub(crate) fn replicate_valid_fresh_record(
&self,
paid_key: RecordKey,
record_type: RecordType,
) {
let network = self.network().clone();
let _handle = spawn(async move {
let start = std::time::Instant::now();
let pretty_key = PrettyPrintRecordKey::from(&paid_key);
let mut retry_count = 0;
debug!("Checking we have successfully stored the fresh record {pretty_key:?} in the store before replicating");
loop {
let record = match network.get_local_record(&paid_key).await {
Ok(record) => record,
Err(err) => {
error!(
"Replicating fresh record {pretty_key:?} get_record_from_store errored: {err:?}"
);
None
}
};
if record.is_some() {
break;
}
if retry_count > 10 {
error!(
"Could not get record from store for replication: {pretty_key:?} after 10 retries"
);
return;
}
retry_count += 1;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
debug!("Start replication of fresh record {pretty_key:?} from store");
let mut closest_k_peers = match network.get_closest_k_value_local_peers().await {
Ok(peers) => peers,
Err(err) => {
error!("Replicating fresh record {pretty_key:?} get_closest_local_peers errored: {err:?}");
return;
}
};
closest_k_peers.retain(|peer_id| peer_id != &network.peer_id());
let data_addr = NetworkAddress::from_record_key(&paid_key);
let sorted_based_on_addr = match sort_peers_by_address(
&closest_k_peers,
&data_addr,
REPLICATION_PEERS_COUNT,
) {
Ok(result) => result,
Err(err) => {
error!(
"When replicating fresh record {pretty_key:?}, having error when sort {err:?}"
);
return;
}
};
let our_peer_id = network.peer_id();
let our_address = NetworkAddress::from_peer(our_peer_id);
let keys = vec![(data_addr.clone(), record_type.clone())];
for peer_id in sorted_based_on_addr {
debug!("Replicating fresh record {pretty_key:?} to {peer_id:?}");
let request = Request::Cmd(Cmd::Replicate {
holder: our_address.clone(),
keys: keys.clone(),
});
network.send_req_ignore_reply(request, *peer_id);
}
debug!(
"Completed replicate fresh record {pretty_key:?} on store, in {:?}",
start.elapsed()
);
});
}
}