#![allow(clippy::mutable_key_type)]
use crate::target_arch::{spawn, Instant};
use crate::{cmd::SwarmCmd, event::NetworkEvent, send_swarm_cmd};
use aes_gcm_siv::{
aead::{Aead, KeyInit, OsRng},
Aes256GcmSiv, Nonce,
};
use libp2p::{
identity::PeerId,
kad::{
store::{Error, RecordStore, Result},
KBucketDistance as Distance, KBucketKey, ProviderRecord, Record, RecordKey as Key,
},
};
#[cfg(feature = "open-metrics")]
use prometheus_client::metrics::gauge::Gauge;
use rand::RngCore;
use sn_protocol::{
storage::{RecordHeader, RecordKind, RecordType},
NetworkAddress, PrettyPrintRecordKey,
};
use sn_transfers::NanoTokens;
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
fs,
path::{Path, PathBuf},
vec,
};
use tokio::sync::mpsc;
use xor_name::XorName;
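/// Max number of records a node can hold in this store.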
const MAX_RECORDS_COUNT: usize = 2048;
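/// A disk-backed `RecordStore`: record payloads are written to files while the
/// in-memory map only tracks each key's `NetworkAddress` and `RecordType`.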
pub struct NodeRecordStore {
local_key: KBucketKey<PeerId>,
config: NodeRecordStoreConfig,
records: HashMap<Key, (NetworkAddress, RecordType)>,
network_event_sender: mpsc::Sender<NetworkEvent>,
swarm_cmd_sender: mpsc::Sender<SwarmCmd>,
distance_range: Option<Distance>,
#[cfg(feature = "open-metrics")]
record_count_metric: Option<Gauge>,
received_payment_count: usize,
encryption_details: (Aes256GcmSiv, [u8; 4]),
}
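/// Configuration for the `NodeRecordStore`.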
#[derive(Debug, Clone)]
pub struct NodeRecordStoreConfig {
pub storage_dir: PathBuf,
pub max_records: usize,
pub max_value_bytes: usize,
}
impl Default for NodeRecordStoreConfig {
fn default() -> Self {
Self {
storage_dir: std::env::temp_dir(),
max_records: MAX_RECORDS_COUNT,
max_value_bytes: 65 * 1024,
}
}
}
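// Derives a deterministic nonce for a record by appending the record key to the store's
// random 4-byte nonce starter, then padding/truncating to the 12 bytes (96 bits) expected
// by AES-256-GCM-SIV.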
fn generate_nonce_for_record(nonce_starter: &[u8; 4], key: &Key) -> Nonce {
let mut nonce_bytes = nonce_starter.to_vec();
nonce_bytes.extend_from_slice(key.as_ref());
nonce_bytes.resize(12, 0);
Nonce::from_iter(nonce_bytes)
}
impl NodeRecordStore {
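/// Creates a new store with the given configuration, generating a fresh
/// AES-256-GCM-SIV key and nonce starter used to encrypt records at rest.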
pub fn with_config(
local_id: PeerId,
config: NodeRecordStoreConfig,
network_event_sender: mpsc::Sender<NetworkEvent>,
swarm_cmd_sender: mpsc::Sender<SwarmCmd>,
) -> Self {
let key = Aes256GcmSiv::generate_key(&mut OsRng);
let cipher = Aes256GcmSiv::new(&key);
let mut nonce_starter = [0u8; 4];
OsRng.fill_bytes(&mut nonce_starter);
NodeRecordStore {
local_key: KBucketKey::from(local_id),
config,
records: Default::default(),
network_event_sender,
swarm_cmd_sender,
distance_range: None,
#[cfg(feature = "open-metrics")]
record_count_metric: None,
received_payment_count: 0,
encryption_details: (cipher, nonce_starter),
}
}
#[cfg(feature = "open-metrics")]
pub fn set_record_count_metric(mut self, metric: Gauge) -> Self {
self.record_count_metric = Some(metric);
self
}
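// Hex-encodes a record key; the resulting string is used as the record's filename on disk.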
fn key_to_hex(key: &Key) -> String {
let key_bytes = key.as_ref();
let mut hex_string = String::with_capacity(key_bytes.len() * 2);
for byte in key_bytes {
hex_string.push_str(&format!("{byte:02x}"));
}
hex_string
}
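// Rebuilds a `Record` from bytes read off disk, decrypting the value first when the
// `encrypt-records` feature is enabled.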
fn get_record_from_bytes<'a>(
bytes: Vec<u8>,
key: &Key,
encryption_details: &(Aes256GcmSiv, [u8; 4]),
) -> Option<Cow<'a, Record>> {
let mut record = Record {
key: key.clone(),
value: bytes,
publisher: None,
expires: None,
};
if !cfg!(feature = "encrypt-records") {
return Some(Cow::Owned(record));
}
let (cipher, nonce_starter) = encryption_details;
let nonce = generate_nonce_for_record(nonce_starter, key);
match cipher.decrypt(&nonce, record.value.as_ref()) {
Ok(value) => {
record.value = value;
return Some(Cow::Owned(record));
}
Err(error) => {
error!("Error while decrypting record. key: {key:?}: {error:?}");
None
}
}
}
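// Reads a record's bytes from its hex-named file in the storage dir and turns them back
// into a `Record`. Returns `None` if the file is missing or decryption fails.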
fn read_from_disk<'a>(
encryption_details: &(Aes256GcmSiv, [u8; 4]),
key: &Key,
storage_dir: &Path,
) -> Option<Cow<'a, Record>> {
let start = Instant::now();
let filename = Self::key_to_hex(key);
let file_path = storage_dir.join(&filename);
match fs::read(file_path) {
Ok(bytes) => {
info!(
"Retrieved record from disk! filename: {filename} after {:?}",
start.elapsed()
);
Self::get_record_from_bytes(bytes, key, encryption_details)
}
Err(err) => {
error!("Error while reading file. filename: {filename}, error: {err:?}");
None
}
}
}
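// When the store is full, prunes the record furthest from our own key to make room for
// the incoming record, but only if the incoming record is closer to us than that furthest
// one; otherwise the put is rejected with `Error::MaxRecords`.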
fn prune_storage_if_needed_for_record(&mut self, r: &Key) -> Result<()> {
let num_records = self.records.len();
if num_records < self.config.max_records {
return Ok(());
}
let furthest = self
.records
.keys()
.max_by_key(|k| {
let kbucket_key = KBucketKey::from(k.to_vec());
self.local_key.distance(&kbucket_key)
})
.cloned();
if let Some(furthest_record) = furthest {
let furthest_record_key = KBucketKey::from(furthest_record.to_vec());
let incoming_record_key = KBucketKey::from(r.to_vec());
if incoming_record_key.distance(&self.local_key)
< furthest_record_key.distance(&self.local_key)
{
trace!(
"{:?} will be pruned to make space for new record: {:?}",
PrettyPrintRecordKey::from(&furthest_record),
PrettyPrintRecordKey::from(r)
);
self.remove(&furthest_record);
if let Some(distance_range) = self.distance_range {
if furthest_record_key.distance(&self.local_key) < distance_range {
warn!("Pruned record would also be within our distance range.");
}
}
} else {
warn!("Record not stored (key: {r:?}). Maximum number of records reached. Current num_records: {num_records}");
return Err(Error::MaxRecords);
}
}
Ok(())
}
}
impl NodeRecordStore {
pub(crate) fn contains(&self, key: &Key) -> bool {
self.records.contains_key(key)
}
pub(crate) fn record_addresses(&self) -> HashMap<NetworkAddress, RecordType> {
self.records
.iter()
.map(|(_record_key, (addr, record_type))| (addr.clone(), record_type.clone()))
.collect()
}
#[allow(clippy::mutable_key_type)]
pub(crate) fn record_addresses_ref(&self) -> &HashMap<Key, (NetworkAddress, RecordType)> {
&self.records
}
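// Marks a record as stored in the in-memory map, normally once the background disk write
// has completed and the swarm reports the record back as stored.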
pub(crate) fn mark_as_stored(&mut self, key: Key, record_type: RecordType) {
let _ = self.records.insert(
key.clone(),
(NetworkAddress::from_record_key(&key), record_type),
);
}
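// Returns the bytes to write to disk for a record, encrypting the value when the
// `encrypt-records` feature is enabled.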
fn prepare_record_bytes(
record: Record,
encryption_details: (Aes256GcmSiv, [u8; 4]),
) -> Option<Vec<u8>> {
if !cfg!(feature = "encrypt-records") {
return Some(record.value);
}
let (cipher, nonce_starter) = encryption_details;
let nonce = generate_nonce_for_record(&nonce_starter, &record.key);
match cipher.encrypt(&nonce, record.value.as_ref()) {
Ok(value) => Some(value),
Err(error) => {
warn!(
"Failed to encrypt record {:?} : {error:?}",
PrettyPrintRecordKey::from(&record.key),
);
None
}
}
}
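/// Stores a record that has already been validated by the node layer: prunes if needed,
/// then writes the (optionally encrypted) payload to disk on a background task and
/// notifies the swarm whether the write succeeded or failed.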
pub(crate) fn put_verified(&mut self, r: Record, record_type: RecordType) -> Result<()> {
let record_key = PrettyPrintRecordKey::from(&r.key).into_owned();
trace!("PUT a verified Record: {record_key:?}");
self.prune_storage_if_needed_for_record(&r.key)?;
let filename = Self::key_to_hex(&r.key);
let file_path = self.config.storage_dir.join(&filename);
#[cfg(feature = "open-metrics")]
if let Some(metric) = &self.record_count_metric {
let _ = metric.set(self.records.len() as i64);
}
let encryption_details = self.encryption_details.clone();
let cloned_cmd_sender = self.swarm_cmd_sender.clone();
spawn(async move {
let key = r.key.clone();
if let Some(bytes) = Self::prepare_record_bytes(r, encryption_details) {
let cmd = match fs::write(&file_path, bytes) {
Ok(_) => {
info!("Wrote record {record_key:?} to disk! filename: {filename}");
SwarmCmd::AddLocalRecordAsStored { key, record_type }
}
Err(err) => {
error!(
"Error writing record {record_key:?} filename: {filename}, error: {err:?}"
);
SwarmCmd::RemoveFailedLocalRecord { key }
}
};
send_swarm_cmd(cloned_cmd_sender, cmd);
}
});
Ok(())
}
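/// Calculates the current store cost from the number of stored records and the number
/// of payments received so far.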
#[allow(clippy::mutable_key_type)]
pub(crate) fn store_cost(&self) -> NanoTokens {
let stored_records = self.records.len();
let cost = calculate_cost_for_records(stored_records, self.received_payment_count);
info!("Cost is now {cost:?} for {stored_records:?} stored of {MAX_RECORDS_COUNT:?} max, paid {:?} times.",
self.received_payment_count);
NanoTokens::from(cost)
}
pub(crate) fn payment_received(&mut self) {
self.received_payment_count = self.received_payment_count.saturating_add(1);
}
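/// Counts how many of the given record keys lie within `distance_range` of our own key.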
#[allow(clippy::mutable_key_type)]
pub fn get_records_within_distance_range(
&self,
records: &HashSet<Key>,
distance_range: Distance,
) -> usize {
debug!(
"Total record count is {:?}. Distance is: {distance_range:?}",
self.records.len()
);
let relevant_records_len = records
.iter()
.filter(|key| {
let kbucket_key = KBucketKey::new(key.to_vec());
distance_range >= self.local_key.distance(&kbucket_key)
})
.count();
debug!("Relevant records len is {:?}", relevant_records_len);
relevant_records_len
}
pub(crate) fn set_distance_range(&mut self, distance_range: Distance) {
self.distance_range = Some(distance_range);
}
}
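// `RecordStore` implementation: `get` reads from disk, while `put` never stores directly
// and instead forwards the record to the node layer for validation.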
impl RecordStore for NodeRecordStore {
type RecordsIter<'a> = vec::IntoIter<Cow<'a, Record>>;
type ProvidedIter<'a> = vec::IntoIter<Cow<'a, ProviderRecord>>;
fn get(&self, k: &Key) -> Option<Cow<'_, Record>> {
let key = PrettyPrintRecordKey::from(k);
if !self.records.contains_key(k) {
trace!("Record not found locally: {key}");
return None;
}
debug!("GET request for Record key: {key}");
Self::read_from_disk(&self.encryption_details, k, &self.config.storage_dir)
}
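// After size and duplicate checks, the record is emitted as a
// `NetworkEvent::UnverifiedRecord` for the node to validate and store via `put_verified`;
// records whose header cannot be parsed are silently ignored.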
fn put(&mut self, record: Record) -> Result<()> {
if record.value.len() >= self.config.max_value_bytes {
warn!(
"Record not stored. Value too large: {} bytes",
record.value.len()
);
return Err(Error::ValueTooLarge);
}
let record_key = PrettyPrintRecordKey::from(&record.key);
match RecordHeader::from_record(&record) {
Ok(record_header) => {
match record_header.kind {
RecordKind::ChunkWithPayment | RecordKind::RegisterWithPayment => {
trace!("Record {record_key:?} with payment shall always be processed.");
}
_ => {
match self.records.get(&record.key) {
Some((_addr, RecordType::Chunk)) => {
trace!("Chunk {record_key:?} already exists.");
return Ok(());
}
Some((_addr, RecordType::NonChunk(existing_content_hash))) => {
let content_hash = XorName::from_content(&record.value);
if content_hash == *existing_content_hash {
trace!("A non-chunk record {record_key:?} with same content_hash {content_hash:?} already exists.");
return Ok(());
}
}
_ => {}
}
}
}
}
Err(err) => {
error!("For record {record_key:?}, failed to parse record_header {err:?}");
return Ok(());
}
}
trace!("Unverified Record {record_key:?} try to validate and store");
let event_sender = self.network_event_sender.clone();
let _handle = spawn(async move {
if let Err(error) = event_sender
.send(NetworkEvent::UnverifiedRecord(record))
.await
{
error!("SwarmDriver failed to send event: {}", error);
}
});
Ok(())
}
fn remove(&mut self, k: &Key) {
let _ = self.records.remove(k);
#[cfg(feature = "open-metrics")]
if let Some(metric) = &self.record_count_metric {
let _ = metric.set(self.records.len() as i64);
}
let filename = Self::key_to_hex(k);
let file_path = self.config.storage_dir.join(&filename);
let _handle = spawn(async move {
match fs::remove_file(file_path) {
Ok(_) => {
info!("Removed record from disk! filename: {filename}");
}
Err(err) => {
error!("Error while removing file. filename: {filename}, error: {err:?}");
}
}
});
}
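// The remaining trait methods are intentionally inert: records are not exposed through
// kad's iterators here, and provider records are not kept at all.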
fn records(&self) -> Self::RecordsIter<'_> {
vec![].into_iter()
}
fn add_provider(&mut self, _record: ProviderRecord) -> Result<()> {
Ok(())
}
fn providers(&self, _key: &Key) -> Vec<ProviderRecord> {
vec![]
}
fn provided(&self) -> Self::ProvidedIter<'_> {
vec![].into_iter()
}
fn remove_provider(&mut self, _key: &Key, _provider: &PeerId) {}
}
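/// A no-op `RecordStore` used on the client side, where nothing is persisted locally.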
#[derive(Default, Debug)]
pub struct ClientRecordStore {
empty_record_addresses: HashMap<Key, (NetworkAddress, RecordType)>,
}
impl ClientRecordStore {
pub(crate) fn contains(&self, _key: &Key) -> bool {
false
}
pub(crate) fn record_addresses(&self) -> HashMap<NetworkAddress, RecordType> {
HashMap::new()
}
#[allow(clippy::mutable_key_type)]
pub(crate) fn record_addresses_ref(&self) -> &HashMap<Key, (NetworkAddress, RecordType)> {
&self.empty_record_addresses
}
pub(crate) fn put_verified(&mut self, _r: Record, _record_type: RecordType) -> Result<()> {
Ok(())
}
pub(crate) fn mark_as_stored(&mut self, _r: Key, _t: RecordType) {}
pub(crate) fn set_distance_range(&mut self, _distance_range: Distance) {}
}
impl RecordStore for ClientRecordStore {
type RecordsIter<'a> = vec::IntoIter<Cow<'a, Record>>;
type ProvidedIter<'a> = vec::IntoIter<Cow<'a, ProviderRecord>>;
fn get(&self, _k: &Key) -> Option<Cow<'_, Record>> {
None
}
fn put(&mut self, _record: Record) -> Result<()> {
Ok(())
}
fn remove(&mut self, _k: &Key) {}
fn records(&self) -> Self::RecordsIter<'_> {
vec![].into_iter()
}
fn add_provider(&mut self, _record: ProviderRecord) -> Result<()> {
Ok(())
}
fn providers(&self, _key: &Key) -> Vec<ProviderRecord> {
vec![]
}
fn provided(&self) -> Self::ProvidedIter<'_> {
vec![].into_iter()
}
fn remove_provider(&mut self, _key: &Key, _provider: &PeerId) {}
}
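// Cost model: the base cost grows linearly with the number of stored records (10 per
// record) and is divided by the records-per-payment ratio, with a floor of 10.
// For example, 2048 records and 1024 payments give a divider of 2 and a cost of
// max(10, 20480 / 2) = 10240.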
fn calculate_cost_for_records(step: usize, received_payment_count: usize) -> u64 {
use std::cmp::max;
let ori_cost = (10 * step) as u64;
let divider = max(1, step / max(1, received_payment_count)) as u64;
max(10, ori_cost / divider)
}
#[allow(trivial_casts)]
#[cfg(test)]
mod tests {
use super::*;
use crate::{close_group_majority, sort_peers_by_key, REPLICATE_RANGE};
use bytes::Bytes;
use eyre::ContextCompat;
use libp2p::{core::multihash::Multihash, kad::RecordKey};
use quickcheck::*;
use sn_protocol::storage::{try_serialize_record, ChunkAddress};
use std::collections::BTreeMap;
use tokio::runtime::Runtime;
use tokio::time::{sleep, Duration};
const MULTIHASH_CODE: u64 = 0x12;
#[derive(Clone, Debug)]
struct ArbitraryKey(Key);
#[derive(Clone, Debug)]
struct ArbitraryRecord(Record);
impl Arbitrary for ArbitraryKey {
fn arbitrary(g: &mut Gen) -> ArbitraryKey {
let hash: [u8; 32] = core::array::from_fn(|_| u8::arbitrary(g));
ArbitraryKey(Key::from(
Multihash::<64>::wrap(MULTIHASH_CODE, &hash).expect("Failed to gen MultiHash"),
))
}
}
impl Arbitrary for ArbitraryRecord {
fn arbitrary(g: &mut Gen) -> ArbitraryRecord {
let value = match try_serialize_record(
&(0..50).map(|_| rand::random::<u8>()).collect::<Bytes>(),
RecordKind::Chunk,
) {
Ok(value) => value.to_vec(),
Err(err) => panic!("Cannot generate record value {err:?}"),
};
let record = Record {
key: ArbitraryKey::arbitrary(g).0,
value,
publisher: None,
expires: None,
};
ArbitraryRecord(record)
}
}
#[test]
fn put_get_remove_record() {
fn prop(r: ArbitraryRecord) {
let rt = if let Ok(rt) = Runtime::new() {
rt
} else {
panic!("Cannot create runtime");
};
rt.block_on(testing_thread(r));
}
quickcheck(prop as fn(_))
}
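// Drives a single property-test case: an unverified put must not store or reprice
// anything, the record must come back as a `NetworkEvent::UnverifiedRecord`, and only
// after `put_verified` + `mark_as_stored` should it be readable (and then removable).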
async fn testing_thread(r: ArbitraryRecord) {
let r = r.0;
let (network_event_sender, mut network_event_receiver) = mpsc::channel(1);
let (swarm_cmd_sender, _) = mpsc::channel(1);
let mut store = NodeRecordStore::with_config(
PeerId::random(),
Default::default(),
network_event_sender,
swarm_cmd_sender,
);
let store_cost_before = store.store_cost();
assert!(store.put(r.clone()).is_ok());
assert!(store.get(&r.key).is_none());
assert_eq!(
store.store_cost(),
store_cost_before,
"store cost should not change over unverified put"
);
let returned_record = if let Some(event) = network_event_receiver.recv().await {
if let NetworkEvent::UnverifiedRecord(record) = event {
record
} else {
panic!("Unexpected network event {event:?}");
}
} else {
panic!("Failed recevied the record for further verification");
};
let returned_record_key = returned_record.key.clone();
assert!(store
.put_verified(returned_record, RecordType::Chunk)
.is_ok());
store.mark_as_stored(returned_record_key, RecordType::Chunk);
let max_iterations = 10;
let mut iteration = 0;
while iteration < max_iterations {
if store
.get(&r.key)
.is_some_and(|record| Cow::Borrowed(&r) == record)
{
break;
}
sleep(Duration::from_millis(100)).await;
iteration += 1;
}
if iteration == max_iterations {
panic!("record_store test failed with stored record cann't be read back");
}
assert_eq!(
Some(Cow::Borrowed(&r)),
store.get(&r.key),
"record can be retrieved after put"
);
store.remove(&r.key);
assert!(store.get(&r.key).is_none());
}
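// Fills the store beyond `max_records` and checks that, for each new record, either the
// current furthest record is pruned from disk (when the new one is closer to us) or the
// new put is rejected.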
#[tokio::test]
async fn pruning_on_full() -> Result<()> {
let max_iterations = 10;
let max_records = 50;
let store_config = NodeRecordStoreConfig {
max_records,
..Default::default()
};
let self_id = PeerId::random();
let (network_event_sender, _) = mpsc::channel(1);
let (swarm_cmd_sender, _) = mpsc::channel(1);
let mut store = NodeRecordStore::with_config(
self_id,
store_config.clone(),
network_event_sender,
swarm_cmd_sender,
);
let mut stored_records: Vec<RecordKey> = vec![];
let self_address = NetworkAddress::from_peer(self_id);
for i in 0..100 {
let record_key = NetworkAddress::from_peer(PeerId::random()).to_record_key();
let value = match try_serialize_record(
&(0..50).map(|_| rand::random::<u8>()).collect::<Bytes>(),
RecordKind::Chunk,
) {
Ok(value) => value.to_vec(),
Err(err) => panic!("Cannot generate record value {err:?}"),
};
let record = Record {
key: record_key.clone(),
value,
publisher: None,
expires: None,
};
let retained_key = if i < max_records {
assert!(store.put_verified(record, RecordType::Chunk).is_ok());
store.mark_as_stored(record_key.clone(), RecordType::Chunk);
record_key
} else {
let furthest_key = stored_records.remove(stored_records.len() - 1);
let furthest_addr = NetworkAddress::from_record_key(&furthest_key);
let record_addr = NetworkAddress::from_record_key(&record_key);
let (retained_key, pruned_key) = if self_address.distance(&furthest_addr)
> self_address.distance(&record_addr)
{
assert!(store.put_verified(record, RecordType::Chunk).is_ok());
store.mark_as_stored(record_key.clone(), RecordType::Chunk);
(record_key, furthest_key)
} else {
assert!(store.put_verified(record, RecordType::Chunk).is_err());
(furthest_key, record_key)
};
let mut iteration = 0;
while iteration < max_iterations {
if NodeRecordStore::read_from_disk(
&store.encryption_details,
&pruned_key,
&store_config.storage_dir,
)
.is_none()
{
break;
}
sleep(Duration::from_millis(100)).await;
iteration += 1;
}
if iteration == max_iterations {
panic!("record_store prune test failed with pruned record still exists.");
}
retained_key
};
let mut iteration = 0;
while iteration < max_iterations {
if store.get(&retained_key).is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
iteration += 1;
}
if iteration == max_iterations {
panic!("record_store prune test failed with stored record cann't be read back");
}
stored_records.push(retained_key);
stored_records.sort_by(|a, b| {
let a = NetworkAddress::from_record_key(a);
let b = NetworkAddress::from_record_key(b);
self_address.distance(&a).cmp(&self_address.distance(&b))
});
}
Ok(())
}
#[tokio::test]
#[allow(clippy::mutable_key_type)]
async fn get_records_within_distance_range() -> eyre::Result<()> {
let max_records = 50;
let store_config = NodeRecordStoreConfig {
max_records,
..Default::default()
};
let self_id = PeerId::random();
let (network_event_sender, _) = mpsc::channel(1);
let (swarm_cmd_sender, _) = mpsc::channel(1);
let mut store = NodeRecordStore::with_config(
self_id,
store_config,
network_event_sender,
swarm_cmd_sender,
);
let mut stored_records: Vec<RecordKey> = vec![];
let self_address = NetworkAddress::from_peer(self_id);
for _ in 0..max_records - 1 {
let record_key = NetworkAddress::from_peer(PeerId::random()).to_record_key();
let value = match try_serialize_record(
&(0..50).map(|_| rand::random::<u8>()).collect::<Bytes>(),
RecordKind::Chunk,
) {
Ok(value) => value.to_vec(),
Err(err) => panic!("Cannot generate record value {err:?}"),
};
let record = Record {
key: record_key.clone(),
value,
publisher: None,
expires: None,
};
assert!(store.put_verified(record, RecordType::Chunk).is_ok());
store.mark_as_stored(record_key.clone(), RecordType::Chunk);
stored_records.push(record_key);
stored_records.sort_by(|a, b| {
let a = NetworkAddress::from_record_key(a);
let b = NetworkAddress::from_record_key(b);
self_address.distance(&a).cmp(&self_address.distance(&b))
});
}
let halfway_record_address = NetworkAddress::from_record_key(
stored_records
.get((stored_records.len() / 2) - 1)
.wrap_err("Could not parse record store key")?,
);
let distance = self_address.distance(&halfway_record_address);
store.set_distance_range(distance);
let record_keys: HashSet<_> = store.records.keys().cloned().collect();
assert_eq!(
store.get_records_within_distance_range(&record_keys, distance),
stored_records.len() / 2
);
Ok(())
}
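// Simulates many nodes storing and charging for random chunks, always paying the cheapest
// node in the close group, to sanity-check that payments and store costs stay reasonably
// balanced across the network.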
#[test]
fn address_distribution_sim() {
let mut peers: HashMap<PeerId, (usize, u64, usize)> = Default::default();
let mut peers_vec = vec![];
let num_of_peers = 2000;
let num_of_chunks_per_itr = 2000;
for _ in 0..num_of_peers {
let peer_id = PeerId::random();
let _ = peers.insert(peer_id, (0, 0, 0));
peers_vec.push(peer_id);
}
let mut iteration = 0;
let mut total_received_payment_count = 0;
loop {
for _ in 0..num_of_chunks_per_itr {
let name = xor_name::rand::random();
let address = NetworkAddress::from_chunk_address(ChunkAddress::new(name));
match sort_peers_by_key(&peers_vec, &address.as_kbucket_key(), REPLICATE_RANGE) {
Ok(peers_in_replicate_range) => {
let peers_in_replicate_range: Vec<PeerId> = peers_in_replicate_range
.iter()
.map(|peer_id| **peer_id)
.collect();
let peers_in_close: Vec<PeerId> = match sort_peers_by_key(
&peers_in_replicate_range,
&address.as_kbucket_key(),
close_group_majority(),
) {
Ok(peers_in_close) => {
peers_in_close.iter().map(|peer_id| **peer_id).collect()
}
Err(err) => {
panic!("Cann't find close range of {name:?} with error {err:?}")
}
};
let payee = pick_cheapest_payee(&peers_in_close, &peers);
for peer in peers_in_replicate_range.iter() {
let entry = peers.entry(*peer).or_insert((0, 0, 0));
if *peer == payee {
let cost = calculate_cost_for_records(entry.0, entry.2);
entry.1 += cost;
entry.2 += 1;
}
entry.0 += 1;
}
}
Err(err) => {
panic!("Cann't find replicate range of {name:?} with error {err:?}")
}
}
}
let mut received_payment_count = 0;
let mut empty_earned_nodes = 0;
let mut min_earned = u64::MAX;
let mut min_store_cost = u64::MAX;
let mut max_earned = 0;
let mut max_store_cost = 0;
for (_peer_id, stats) in peers.iter() {
let cost = calculate_cost_for_records(stats.0, stats.2);
received_payment_count += stats.2;
if stats.1 == 0 {
empty_earned_nodes += 1;
}
if stats.1 < min_earned {
min_earned = stats.1;
}
if stats.1 > max_earned {
max_earned = stats.1;
}
if cost < min_store_cost {
min_store_cost = cost;
}
if cost > max_store_cost {
max_store_cost = cost;
}
}
total_received_payment_count += num_of_chunks_per_itr;
assert_eq!(total_received_payment_count, received_payment_count);
println!("After the completion of {iteration} with {num_of_chunks_per_itr} chunks, there is still {empty_earned_nodes} nodes earned nothing");
println!("\t\t with storecost variation of (min {min_store_cost} - max {max_store_cost}), and earned variation of (min {min_earned} - max {max_earned})");
iteration += 1;
if iteration == 50 {
assert_eq!(0, empty_earned_nodes, "every node has earnt _something_");
assert!(
(max_store_cost / min_store_cost) < 100,
"store cost is balanced"
);
assert!(
(max_earned / min_earned) < 1000,
"earning distribution is well balanced"
);
break;
}
}
}
#[allow(dead_code)]
fn log_chunks_distribution(peers: &HashMap<PeerId, (usize, u64, usize)>) {
let mut distribution_map: BTreeMap<u8, (usize, usize)> = Default::default();
for (peer_id, stats) in peers.iter() {
let leading_byte = NetworkAddress::from_peer(*peer_id)
.as_kbucket_key()
.hashed_bytes()[0];
let entry = distribution_map.entry(leading_byte).or_insert((0, 0));
entry.0 += 1;
entry.1 += stats.2;
}
for (leading_byte, stats) in distribution_map.iter() {
println!("{leading_byte:08b}\t{}\t{} ", stats.0, stats.1)
}
}
fn pick_cheapest_payee(
peers_in_close: &Vec<PeerId>,
peers: &HashMap<PeerId, (usize, u64, usize)>,
) -> PeerId {
let mut payee = None;
let mut cheapest_cost = u64::MAX;
for peer in peers_in_close {
if let Some(stats) = peers.get(peer) {
let store_cost = calculate_cost_for_records(stats.0, stats.2);
if store_cost < cheapest_cost {
cheapest_cost = store_cost;
payee = Some(*peer);
}
} else {
panic!("Cannot find stats of {peer:?}");
}
}
if let Some(peer_id) = payee {
peer_id
} else {
panic!("Cannot find cheapest payee among {peers_in_close:?}");
}
}
}