ant_networking/
record_store.rs

1// Copyright 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8#![allow(clippy::mutable_key_type)] // for the Bytes in NetworkAddress
9
10use crate::cmd::LocalSwarmCmd;
11use crate::network_builder::MAX_PACKET_SIZE;
12use crate::send_local_swarm_cmd;
13use crate::time::{spawn, Instant};
14use crate::{event::NetworkEvent, log_markers::Marker};
15use aes_gcm_siv::{
16    aead::{Aead, KeyInit},
17    Aes256GcmSiv, Key as AesKey, Nonce,
18};
19use ant_evm::QuotingMetrics;
20use ant_protocol::{
21    storage::{DataTypes, RecordHeader, RecordKind, ValidationType},
22    NetworkAddress, PrettyPrintRecordKey,
23};
24use hkdf::Hkdf;
25use itertools::Itertools;
26use libp2p::{
27    identity::PeerId,
28    kad::{
29        store::{Error, RecordStore, Result},
30        KBucketDistance as Distance, ProviderRecord, Record, RecordKey as Key,
31    },
32};
33#[cfg(feature = "open-metrics")]
34use prometheus_client::metrics::gauge::Gauge;
35use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
36use serde::{Deserialize, Serialize};
37use sha2::Sha256;
38use std::{
39    borrow::Cow,
40    collections::{BTreeMap, HashMap},
41    fs,
42    path::{Path, PathBuf},
43    time::SystemTime,
44    vec,
45};
46use tokio::{sync::mpsc, time::Duration};
47use walkdir::{DirEntry, WalkDir};
48use xor_name::XorName;
49
50// A GraphEntry record is roughly 4KB in size,
51// while a chunk record is capped at 4MB.
52// During the Beta phase there is almost one GraphEntry per chunk,
53// which makes the average record size around 2MB.
54// Given we are targeting a node size of 32GB,
55// this allows around 16K records (32GB / 2MB = 16 * 1024).
56const MAX_RECORDS_COUNT: usize = 16 * 1024;
57
58/// The maximum number of records to cache in memory.
59const MAX_RECORDS_CACHE_SIZE: usize = 25;
60
61/// File name of the recorded historical quoting metrics.
62const HISTORICAL_QUOTING_METRICS_FILENAME: &str = "historic_quoting_metrics";
63
64/// Defines when the entries inside the cache shall be pruned to free up space.
65/// Shall be twice the PERIODIC_REPLICATION_INTERVAL_MAX_S.
66const CACHE_TIMEOUT: Duration = Duration::from_secs(360);
67
68fn derive_aes256gcm_siv_from_seed(seed: &[u8; 16]) -> (Aes256GcmSiv, [u8; 4]) {
69    // The salt shall be unique to this purpose.
70    let salt = b"autonomi_record_store";
71
72    let hk = Hkdf::<Sha256>::new(Some(salt), seed);
73
74    let mut okm = [0u8; 32];
75    hk.expand(b"", &mut okm)
76        .expect("32 bytes is a valid length for HKDF output");
77
78    let seeded_key = AesKey::<Aes256GcmSiv>::from_slice(&okm);
79
80    let mut nonce_starter = [0u8; 4];
81    let bytes_to_copy = seed.len().min(nonce_starter.len());
82    nonce_starter[..bytes_to_copy].copy_from_slice(&seed[..bytes_to_copy]);
83
84    trace!("seeded_key is {seeded_key:?}  nonce_starter is {nonce_starter:?}");
85
86    (Aes256GcmSiv::new(seeded_key), nonce_starter)
87}
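
// A minimal, illustrative sketch (hypothetical test module, not part of the node's
// own test suite): derivation from the same 16-byte seed is deterministic, which is
// what lets a restarted node with the same `encryption_seed` decrypt records it
// wrote before.
#[cfg(test)]
mod derive_key_sketch {
    use super::*;

    #[test]
    fn same_seed_gives_same_nonce_starter() {
        let seed = [7u8; 16];
        let (_cipher_a, nonce_a) = derive_aes256gcm_siv_from_seed(&seed);
        let (_cipher_b, nonce_b) = derive_aes256gcm_siv_from_seed(&seed);
        // The nonce starter is copied straight from the seed prefix,
        // so it must match across derivations.
        assert_eq!(nonce_a, nonce_b);
        assert_eq!(&nonce_a[..], &seed[..4]);
    }
}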
88
89/// Simple FIFO cache of records to reduce read times
90struct RecordCache {
91    records_cache: HashMap<Key, (Record, SystemTime)>,
92    cache_size: usize,
93    cache_timeout: Duration,
94}
95
96impl RecordCache {
97    fn new(cache_size: usize, cache_timeout: Duration) -> Self {
98        RecordCache {
99            records_cache: HashMap::new(),
100            cache_size,
101            cache_timeout,
102        }
103    }
104
105    fn remove(&mut self, key: &Key) -> Option<(Record, SystemTime)> {
106        self.records_cache.remove(key)
107    }
108
109    fn get(&self, key: &Key) -> Option<&(Record, SystemTime)> {
110        self.records_cache.get(key)
111    }
112
113    fn push_back(&mut self, key: Key, record: Record) {
114        self.free_up_space();
115
116        let _ = self
117            .records_cache
118            .insert(key, (record, SystemTime::now() + self.cache_timeout));
119    }
120
121    fn free_up_space(&mut self) {
122        let current = SystemTime::now();
123        // Remove outdated entries first
124        self.records_cache
125            .retain(|_key, (_record, timestamp)| *timestamp > current);
126
127        while self.records_cache.len() >= self.cache_size {
128            self.remove_oldest_entry()
129        }
130    }
131
132    fn remove_oldest_entry(&mut self) {
133        let mut oldest_timestamp = SystemTime::now() + self.cache_timeout;
134        let mut key_to_remove = None;
135
136        for (key, (_record, timestamp)) in self.records_cache.iter() {
137            if *timestamp < oldest_timestamp {
138                oldest_timestamp = *timestamp;
139                key_to_remove = Some(key.clone());
140            }
141        }
142
143        if let Some(key) = key_to_remove {
144            let _ = self.records_cache.remove(&key);
145        }
146    }
147}
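
// A minimal, illustrative sketch (hypothetical test module): with a capacity of two,
// pushing a third record first evicts the entry with the oldest expiry, so the cache
// never grows beyond its configured size.
#[cfg(test)]
mod record_cache_sketch {
    use super::*;

    fn dummy_record(byte: u8) -> (Key, Record) {
        let key = Key::from(vec![byte; 32]);
        let record = Record {
            key: key.clone(),
            value: vec![byte],
            publisher: None,
            expires: None,
        };
        (key, record)
    }

    #[test]
    fn capacity_is_enforced() {
        let mut cache = RecordCache::new(2, Duration::from_secs(60));
        for byte in 1..=3u8 {
            let (key, record) = dummy_record(byte);
            cache.push_back(key, record);
        }
        let (key3, _) = dummy_record(3);
        // The most recently pushed record is retained...
        assert!(cache.get(&key3).is_some());
        // ...and the cache stays within its configured size.
        assert!(cache.records_cache.len() <= 2);
    }
}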
148
149/// A `RecordStore` that stores records on disk.
150pub struct NodeRecordStore {
151    /// The address of the peer owning the store
152    local_address: NetworkAddress,
153    /// The configuration of the store.
154    config: NodeRecordStoreConfig,
155    /// Main records store remains unchanged for compatibility
156    records: HashMap<Key, (NetworkAddress, ValidationType, DataTypes)>,
157    /// Additional index organizing records by distance
158    records_by_distance: BTreeMap<Distance, Key>,
159    /// Simple FIFO cache of records to reduce read times
160    records_cache: RecordCache,
161    /// Send network events to the node layer.
162    network_event_sender: mpsc::Sender<NetworkEvent>,
163    /// Send cmds to the network layer. Used to interact with self in an async fashion.
164    local_swarm_cmd_sender: mpsc::Sender<LocalSwarmCmd>,
165    /// ilog2 distance range of responsible records
166    /// AKA: how many buckets of data do we consider "close"
167    /// None means accept all records.
168    responsible_distance_range: Option<Distance>,
169    #[cfg(feature = "open-metrics")]
170    /// Used to report the number of records held by the store to the metrics server.
171    record_count_metric: Option<Gauge>,
172    /// Counts how many times we got paid
173    received_payment_count: usize,
174    /// Encryption cipher for the records, randomly generated at node startup
175    /// Plus a 4 byte nonce starter
176    encryption_details: (Aes256GcmSiv, [u8; 4]),
177    /// Time that this record_store got started
178    timestamp: SystemTime,
179    /// Farthest record to self
180    farthest_record: Option<(Key, Distance)>,
181}
182
183/// Configuration for a `NodeRecordStore`.
184#[derive(Debug, Clone)]
185pub struct NodeRecordStoreConfig {
186    /// The directory where the records are stored.
187    pub storage_dir: PathBuf,
188    /// The directory where the historic quote is to be stored
189    /// (normally the parent dir of the storage_dir)
190    pub historic_quote_dir: PathBuf,
191    /// The maximum number of records.
192    pub max_records: usize,
193    /// The maximum size of record values, in bytes.
194    pub max_value_bytes: usize,
195    /// The maximum number of records to cache in memory.
196    pub records_cache_size: usize,
197    /// The seed to generate record_store encryption_details
198    pub encryption_seed: [u8; 16],
199}
200
201impl Default for NodeRecordStoreConfig {
202    fn default() -> Self {
203        let historic_quote_dir = std::env::temp_dir();
204        Self {
205            storage_dir: historic_quote_dir.clone(),
206            historic_quote_dir,
207            max_records: MAX_RECORDS_COUNT,
208            max_value_bytes: MAX_PACKET_SIZE,
209            records_cache_size: MAX_RECORDS_CACHE_SIZE,
210            encryption_seed: [0u8; 16],
211        }
212    }
213}
214
215/// Generate an encryption nonce for a given record key and nonce_starter bytes.
216fn generate_nonce_for_record(nonce_starter: &[u8; 4], key: &Key) -> Nonce {
217    let mut nonce_bytes = nonce_starter.to_vec();
218    nonce_bytes.extend_from_slice(key.as_ref());
219    // Ensure the final nonce is exactly 96 bits long by padding or truncating as necessary
220    // https://crypto.stackexchange.com/questions/26790/how-bad-it-is-using-the-same-iv-twice-with-aes-gcm
221    nonce_bytes.resize(12, 0); // 12 (u8) * 8 = 96 bits
222    Nonce::from_iter(nonce_bytes)
223}
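
// A minimal, illustrative sketch (hypothetical test module): the derived nonce is the
// 4-byte starter followed by the first 8 bytes of the record key, truncated to the
// 96 bits expected by AES-GCM-SIV.
#[cfg(test)]
mod nonce_sketch {
    use super::*;

    #[test]
    fn nonce_is_starter_plus_key_prefix() {
        let starter = [1u8, 2, 3, 4];
        let key = Key::from(vec![9u8; 32]);
        let nonce = generate_nonce_for_record(&starter, &key);
        // 12 bytes = 96 bits in total.
        assert_eq!(nonce.len(), 12);
        assert_eq!(&nonce[..4], &starter[..]);
        assert_eq!(&nonce[4..], &key.as_ref()[..8]);
    }
}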
224
225#[derive(Clone, Serialize, Deserialize)]
226struct HistoricQuotingMetrics {
227    received_payment_count: usize,
228    timestamp: SystemTime,
229}
230
231impl NodeRecordStore {
232    /// If a directory for our node already exists, repopulate the records from the files in the dir
233    fn update_records_from_an_existing_store(
234        config: &NodeRecordStoreConfig,
235        encryption_details: &(Aes256GcmSiv, [u8; 4]),
236    ) -> HashMap<Key, (NetworkAddress, ValidationType, DataTypes)> {
237        let process_entry = |entry: &DirEntry| -> _ {
238            let path = entry.path();
239            if path.is_file() {
240                debug!("Existing record found: {path:?}");
241                // if we've got a file, let's try and read it
242                let filename = match path.file_name().and_then(|n| n.to_str()) {
243                    Some(file_name) => file_name,
244                    None => {
245                        // warn and remove this file as it's not a valid record
246                        warn!(
247                            "Found a file in the storage dir that is not a valid record: {:?}",
248                            path
249                        );
250                        if let Err(e) = fs::remove_file(path) {
251                            warn!(
252                                "Failed to remove invalid record file from storage dir: {:?}",
253                                e
254                            );
255                        }
256                        return None;
257                    }
258                };
259                // get the record key from the filename
260                let key = Self::get_data_from_filename(filename)?;
261                let record = match fs::read(path) {
262                    Ok(bytes) => {
263                        // and the stored record
264                        if let Some(record) =
265                            Self::get_record_from_bytes(bytes, &key, encryption_details)
266                        {
267                            record
268                        } else {
269                            // This will be due to a node restart resulting in different encryption details.
270                            // Hence the old copy needs to be cleaned up.
271                            info!("Failed to decrypt record from file {filename:?}, clean it up.");
272                            if let Err(e) = fs::remove_file(path) {
273                                warn!(
274                                    "Failed to remove outdated record file {filename:?} from storage dir: {:?}",
275                                    e
276                                );
277                            }
278                            return None;
279                        }
280                    }
281                    Err(err) => {
282                        error!("Error while reading file. filename: {filename}, error: {err:?}");
283                        return None;
284                    }
285                };
286
287                match RecordHeader::get_data_type(&record) {
288                    Ok(data_type) => {
289                        let validate_type = match data_type {
290                            DataTypes::Chunk => ValidationType::Chunk,
291                            _ => {
292                                let xorname_hash = XorName::from_content(&record.value);
293                                ValidationType::NonChunk(xorname_hash)
294                            }
295                        };
296
297                        let address = NetworkAddress::from(&key);
298                        info!("Existing record {address:?} loaded from: {path:?}");
299                        return Some((key, (address, validate_type, data_type)));
300                    }
301                    Err(error) => {
302                        warn!(
303                            "Failed to parse record type of record {filename:?}: {:?}",
304                            error
305                        );
306                        // Incorrect decryption using a different key could result in this.
307                        // In that case, a cleanup shall be carried out.
308                        if let Err(e) = fs::remove_file(path) {
309                            warn!(
310                                "Failed to remove invalid record file {filename:?} from storage dir: {:?}",
311                                e
312                            );
313                        }
314                        return None;
315                    }
316                }
317            }
318            None
319        };
320
321        info!("Attempting to repopulate records from existing store...");
322        let records = WalkDir::new(&config.storage_dir)
323            .into_iter()
324            .filter_map(|e| e.ok())
325            .collect_vec()
326            .par_iter()
327            .filter_map(process_entry)
328            .collect();
329        records
330    }
331
332    /// If the quoting_metrics file already exists, use the existing parameters.
333    fn restore_quoting_metrics(storage_dir: &Path) -> Option<HistoricQuotingMetrics> {
334        let file_path = storage_dir.join(HISTORICAL_QUOTING_METRICS_FILENAME);
335
336        if let Ok(file) = fs::File::open(file_path) {
337            if let Ok(quoting_metrics) = rmp_serde::from_read(&file) {
338                return Some(quoting_metrics);
339            }
340        }
341
342        None
343    }
344
345    fn flush_historic_quoting_metrics(&self) {
346        let file_path = self
347            .config
348            .historic_quote_dir
349            .join(HISTORICAL_QUOTING_METRICS_FILENAME);
350
351        let historic_quoting_metrics = HistoricQuotingMetrics {
352            received_payment_count: self.received_payment_count,
353            timestamp: self.timestamp,
354        };
355
356        spawn(async move {
357            if let Ok(mut file) = fs::File::create(file_path) {
358                let mut serialiser = rmp_serde::encode::Serializer::new(&mut file);
359                let _ = historic_quoting_metrics.serialize(&mut serialiser);
360            }
361        });
362    }
363
364    /// Creates a new `NodeRecordStore` with the given configuration.
365    pub fn with_config(
366        local_id: PeerId,
367        config: NodeRecordStoreConfig,
368        network_event_sender: mpsc::Sender<NetworkEvent>,
369        swarm_cmd_sender: mpsc::Sender<LocalSwarmCmd>,
370        #[cfg(feature = "open-metrics")] record_count_metric: Option<Gauge>,
371    ) -> Self {
372        info!("Using encryption_seed of {:?}", config.encryption_seed);
373        let encryption_details = derive_aes256gcm_siv_from_seed(&config.encryption_seed);
374
375        // Recover the quoting_metrics first, as the historical file will be cleaned up by
376        // the subsequent update_records_from_an_existing_store function
377        let (received_payment_count, timestamp) = if let Some(historic_quoting_metrics) =
378            Self::restore_quoting_metrics(&config.historic_quote_dir)
379        {
380            (
381                historic_quoting_metrics.received_payment_count,
382                historic_quoting_metrics.timestamp,
383            )
384        } else {
385            (0, SystemTime::now())
386        };
387
388        let records = Self::update_records_from_an_existing_store(&config, &encryption_details);
389        let local_address = NetworkAddress::from(local_id);
390
391        // Initialize records_by_distance
392        let mut records_by_distance: BTreeMap<Distance, Key> = BTreeMap::new();
393        for (key, (addr, _record_type, _data_type)) in records.iter() {
394            let distance = &local_address.distance(addr);
395            let _ = records_by_distance.insert(*distance, key.clone());
396        }
397
398        let cache_size = config.records_cache_size;
399        let mut record_store = NodeRecordStore {
400            local_address,
401            config,
402            records,
403            records_by_distance,
404            records_cache: RecordCache::new(cache_size, CACHE_TIMEOUT),
405            network_event_sender,
406            local_swarm_cmd_sender: swarm_cmd_sender,
407            responsible_distance_range: None,
408            #[cfg(feature = "open-metrics")]
409            record_count_metric,
410            received_payment_count,
411            encryption_details,
412            timestamp,
413            farthest_record: None,
414        };
415
416        record_store.farthest_record = record_store.calculate_farthest();
417
418        record_store.flush_historic_quoting_metrics();
419
420        #[cfg(feature = "open-metrics")]
421        if let Some(metric) = &record_store.record_count_metric {
422            let _ = metric.set(record_store.records.len() as i64);
423        }
424
425        record_store
426    }
427
428    /// Returns the current responsible distance range.
429    pub(crate) fn get_responsible_distance_range(&self) -> Option<Distance> {
430        self.responsible_distance_range
431    }
432
433    // Converts a Key into a Hex string.
434    fn generate_filename(key: &Key) -> String {
435        hex::encode(key.as_ref())
436    }
437
438    // Converts a Hex string back into a Key.
439    fn get_data_from_filename(hex_str: &str) -> Option<Key> {
440        match hex::decode(hex_str) {
441            Ok(bytes) => Some(Key::from(bytes)),
442            Err(error) => {
443                error!("Error decoding hex string: {:?}", error);
444                None
445            }
446        }
447    }
448
449    /// Upon read perform any data transformations required to return a `Record`.
450    fn get_record_from_bytes<'a>(
451        bytes: Vec<u8>,
452        key: &Key,
453        encryption_details: &(Aes256GcmSiv, [u8; 4]),
454    ) -> Option<Cow<'a, Record>> {
455        let (cipher, nonce_starter) = encryption_details;
456        let nonce = generate_nonce_for_record(nonce_starter, key);
457
458        match cipher.decrypt(&nonce, bytes.as_slice()) {
459            Ok(value) => {
460                let record = Record {
461                    key: key.clone(),
462                    value,
463                    publisher: None,
464                    expires: None,
465                };
466                Some(Cow::Owned(record))
467            }
468            Err(error) => {
469                error!("Error while decrypting record. key: {key:?}: {error:?}");
470                None
471            }
472        }
473    }
474
475    fn read_from_disk<'a>(
476        encryption_details: &(Aes256GcmSiv, [u8; 4]),
477        key: &Key,
478        storage_dir: &Path,
479    ) -> Option<Cow<'a, Record>> {
480        let start = Instant::now();
481        let filename = Self::generate_filename(key);
482
483        let file_path = storage_dir.join(&filename);
484
485        // we should only be reading if we know the record is written to disk properly
486        match fs::read(file_path) {
487            Ok(bytes) => {
488                // vdash metric (if modified please notify at https://github.com/happybeing/vdash/issues):
489                info!(
490                    "Retrieved record from disk! filename: {filename} after {:?}",
491                    start.elapsed()
492                );
493
494                Self::get_record_from_bytes(bytes, key, encryption_details)
495            }
496            Err(err) => {
497                error!("Error while reading file. filename: {filename}, error: {err:?}");
498                None
499            }
500        }
501    }
502
503    // Returns the farthest record_key to self.
504    pub fn get_farthest(&self) -> Option<Key> {
505        if let Some((ref key, _distance)) = self.farthest_record {
506            Some(key.clone())
507        } else {
508            None
509        }
510    }
511
512    // Calculates the farthest record_key to self.
513    fn calculate_farthest(&self) -> Option<(Key, Distance)> {
514        // sort records by distance to our local key
515        let mut sorted_records: Vec<_> = self.records.keys().collect();
516        sorted_records.sort_by_key(|key| {
517            let addr = NetworkAddress::from(*key);
518            self.local_address.distance(&addr)
519        });
520
521        if let Some(key) = sorted_records.last() {
522            let addr = NetworkAddress::from(*key);
523            Some(((*key).clone(), self.local_address.distance(&addr)))
524        } else {
525            None
526        }
527    }
528
529    /// Prune the records in the store to ensure that we free up space
530    /// for the incoming record.
531    /// Returns Ok if the record can be stored because it is closer to the local peer
532    /// or we are not full.
533    ///
534    /// Err MaxRecords if we cannot store as it's farther than the farthest data we have
535    fn prune_records_if_needed(&mut self, incoming_record_key: &Key) -> Result<()> {
536        // we're not full, so we don't need to prune
537        if self.records.len() < self.config.max_records {
538            return Ok(());
539        }
540
541        if let Some((farthest_record, farthest_record_distance)) = self.farthest_record.clone() {
542            // if the incoming record is farther than the farthest record, we can't store it
543            if farthest_record_distance
544                < self
545                    .local_address
546                    .distance(&NetworkAddress::from(incoming_record_key))
547            {
548                return Err(Error::MaxRecords);
549            }
550
551            info!(
552                "Record {:?} will be pruned to free up space for new records",
553                PrettyPrintRecordKey::from(&farthest_record)
554            );
555            self.remove(&farthest_record);
556        }
557
558        Ok(())
559    }
560
561    // When the accumulated record copies exceed the `exponential pricing point` (max_records * 0.1),
562    // those `out of range` records shall be cleaned up.
563    // This is to avoid:
564    //   * holding too many irrelevant records, which occupies disk space
565    //   * `over-quoting` during restart, when the RT is not fully populated,
566    //     resulting in mis-calculation of relevant records.
567    pub fn cleanup_irrelevant_records(&mut self) {
568        let accumulated_records = self.records.len();
569        if accumulated_records < MAX_RECORDS_COUNT / 10 {
570            return;
571        }
572
573        let responsible_distance = if let Some(distance) = self.responsible_distance_range {
574            distance
575        } else {
576            return;
577        };
578
579        // Collect keys to remove from buckets beyond our range
580        let keys_to_remove: Vec<Key> = self
581            .records_by_distance
582            .range(responsible_distance..)
583            .map(|(_distance, key)| key.clone())
584            .collect();
585
586        let keys_to_remove_len = keys_to_remove.len();
587
588        // Remove collected keys
589        for key in keys_to_remove {
590            self.remove(&key);
591        }
592
593        info!("Cleaned up {} irrelevant records, among the original {accumulated_records} accumulated_records",
594        keys_to_remove_len);
595    }
596}
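
// A minimal, illustrative sketch (hypothetical test module): records are written to
// disk under the hex-encoded form of their key, and decoding that filename yields the
// original key back, as used by `generate_filename` / `get_data_from_filename` above.
#[cfg(test)]
mod filename_roundtrip_sketch {
    use super::*;

    #[test]
    fn filename_roundtrips_to_key() {
        let key = Key::from(vec![0xABu8; 32]);
        let filename = NodeRecordStore::generate_filename(&key);
        let decoded = NodeRecordStore::get_data_from_filename(&filename)
            .expect("hex filename should decode back to a key");
        assert_eq!(decoded, key);
    }
}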
597
598impl NodeRecordStore {
599    /// Returns `true` if the `Key` is present locally
600    pub(crate) fn contains(&self, key: &Key) -> bool {
601        self.records.contains_key(key)
602    }
603
604    /// Returns the set of `NetworkAddress::RecordKey` held by the store
605    /// Use `record_addresses_ref` to get a borrowed type
606    pub(crate) fn record_addresses(&self) -> HashMap<NetworkAddress, ValidationType> {
607        self.records
608            .iter()
609            .map(|(_record_key, (addr, record_type, _data_type))| {
610                (addr.clone(), record_type.clone())
611            })
612            .collect()
613    }
614
615    /// Returns the reference to the set of `NetworkAddress::RecordKey` held by the store
616    pub(crate) fn record_addresses_ref(
617        &self,
618    ) -> &HashMap<Key, (NetworkAddress, ValidationType, DataTypes)> {
619        &self.records
620    }
621
622    /// The follow up to `put_verified`, this only registers the RecordKey
623    /// in the RecordStore records set. After this it should be safe
624    /// to return the record as stored.
625    pub(crate) fn mark_as_stored(
626        &mut self,
627        key: Key,
628        validate_type: ValidationType,
629        data_type: DataTypes,
630    ) {
631        let addr = NetworkAddress::from(&key);
632        let distance = self.local_address.distance(&addr);
633
634        // Update main records store
635        self.records
636            .insert(key.clone(), (addr.clone(), validate_type, data_type));
637
638        #[cfg(feature = "open-metrics")]
639        if let Some(metric) = &self.record_count_metric {
640            let _ = metric.set(self.records.len() as i64);
641        }
642
643        // Update bucket index
644        let _ = self.records_by_distance.insert(distance, key.clone());
645
646        // Update farthest record if needed (unchanged)
647        if let Some((_farthest_record, farthest_record_distance)) = self.farthest_record.clone() {
648            if distance > farthest_record_distance {
649                self.farthest_record = Some((key, distance));
650            }
651        } else {
652            self.farthest_record = Some((key, distance));
653        }
654    }
655
656    /// Prepare record bytes for storage
657    /// This will encrypt the record for storage
658    fn prepare_record_bytes(
659        record: Record,
660        encryption_details: (Aes256GcmSiv, [u8; 4]),
661    ) -> Option<Vec<u8>> {
662        let (cipher, nonce_starter) = encryption_details;
663        let nonce = generate_nonce_for_record(&nonce_starter, &record.key);
664
665        match cipher.encrypt(&nonce, record.value.as_ref()) {
666            Ok(value) => Some(value),
667            Err(error) => {
668                warn!(
669                    "Failed to encrypt record {:?} : {error:?}",
670                    PrettyPrintRecordKey::from(&record.key),
671                );
672                None
673            }
674        }
675    }
676
677    /// Warning: Writes a `Record` to disk without validation
678    /// Should be used in context where the `Record` is trusted
679    ///
680    /// The record is marked as written to disk once `mark_as_stored` is called,
681    /// this avoids us returning half-written data or registering it as stored before it is.
682    pub(crate) fn put_verified(
683        &mut self,
684        r: Record,
685        record_type: ValidationType,
686        is_client_put: bool,
687    ) -> Result<()> {
688        let key = &r.key;
689        let record_key = PrettyPrintRecordKey::from(&r.key).into_owned();
690        debug!("PUTting a verified Record: {record_key:?}");
691
692        // if the cache already has the record:
693        //   * if it has the same content, do nothing and return early
694        //   * if it has different content, remove the existing one
695        if let Some((existing_record, _timestamp)) = self.records_cache.remove(key) {
696            if existing_record.value == r.value {
697                // we actually just want to keep what we have, and can assume it's been stored properly.
698
699                // so we put it back in the cache
700                self.records_cache.push_back(key.clone(), existing_record);
701                // and exit early.
702                return Ok(());
703            }
704        }
705
706        // Only cache records put by the client, for a quick response to the ChunkProof check.
707        if is_client_put {
708            self.records_cache.push_back(key.clone(), r.clone());
709        }
710
711        self.prune_records_if_needed(key)?;
712
713        let filename = Self::generate_filename(key);
714        let file_path = self.config.storage_dir.join(&filename);
715
716        let encryption_details = self.encryption_details.clone();
717        let cloned_cmd_sender = self.local_swarm_cmd_sender.clone();
718
719        let record_key2 = record_key.clone();
720        spawn(async move {
721            let key = r.key.clone();
722            let data_type = match RecordHeader::get_data_type(&r) {
723                Ok(data_type) => data_type,
724                Err(err) => {
725                    error!(
726                        "Error get data_type of record {record_key2:?} filename: {filename}, error: {err:?}"
727                    );
728                    return;
729                }
730            };
731            if let Some(bytes) = Self::prepare_record_bytes(r, encryption_details) {
732                let cmd = match fs::write(&file_path, bytes) {
733                    Ok(_) => {
734                        // vdash metric (if modified please notify at https://github.com/happybeing/vdash/issues):
735                        info!("Wrote record {record_key2:?} to disk! filename: {filename}");
736
737                        LocalSwarmCmd::AddLocalRecordAsStored {
738                            key,
739                            record_type,
740                            data_type,
741                        }
742                    }
743                    Err(err) => {
744                        error!(
745                        "Error writing record {record_key2:?} filename: {filename}, error: {err:?}"
746                    );
747                        LocalSwarmCmd::RemoveFailedLocalRecord { key }
748                    }
749                };
750
751                send_local_swarm_cmd(cloned_cmd_sender, cmd);
752            }
753        });
754
755        Ok(())
756    }
757
758    /// Return the quoting metrics used to calculate the cost of storing a record
759    /// and whether the record is already stored locally
760    pub(crate) fn quoting_metrics(
761        &self,
762        key: &Key,
763        data_type: u32,
764        data_size: usize,
765        network_size: Option<u64>,
766    ) -> (QuotingMetrics, bool) {
767        let records_stored = self.records.len();
768        let records_per_type = self.records_per_type();
769
770        let live_time = if let Ok(elapsed) = self.timestamp.elapsed() {
771            elapsed.as_secs()
772        } else {
773            0
774        };
775
776        let mut quoting_metrics = QuotingMetrics {
777            data_type,
778            data_size,
779            close_records_stored: records_stored,
780            records_per_type,
781            max_records: self.config.max_records,
782            received_payment_count: self.received_payment_count,
783            live_time,
784            network_density: None,
785            network_size,
786        };
787
788        if let Some(distance_range) = self.responsible_distance_range {
789            let relevant_records = self.get_records_within_distance_range(distance_range);
790
791            // The `responsible_range` is the network density
792            quoting_metrics.network_density = Some(distance_range.0.to_big_endian());
793
794            quoting_metrics.close_records_stored = relevant_records;
795        } else {
796            info!("Basing cost on _total_ records stored.");
797        };
798
799        // vdash metric (if modified please notify at https://github.com/happybeing/vdash/issues):
800        info!("Quoting_metrics {quoting_metrics:?}");
801
802        let is_stored = self.contains(key);
803        (quoting_metrics, is_stored)
804    }
805
806    /// Notify the node received a payment.
807    pub(crate) fn payment_received(&mut self) {
808        self.received_payment_count = self.received_payment_count.saturating_add(1);
809
810        self.flush_historic_quoting_metrics();
811    }
812
813    /// Calculate how many records are stored within a distance range
814    pub fn get_records_within_distance_range(&self, range: Distance) -> usize {
815        let within_range = self
816            .records_by_distance
817            .range(..range)
818            .collect::<Vec<_>>()
819            .len();
820
821        Marker::CloseRecordsLen(within_range).log();
822
823        within_range
824    }
825
826    /// Setup the distance range.
827    pub(crate) fn set_responsible_distance_range(&mut self, responsible_distance: Distance) {
828        self.responsible_distance_range = Some(responsible_distance);
829    }
830
831    fn records_per_type(&self) -> Vec<(u32, u32)> {
832        let mut map = BTreeMap::new();
833        for (_, _, data_type) in self.records.values() {
834            *map.entry(data_type.get_index()).or_insert(0) += 1;
835        }
836        map.into_iter().collect()
837    }
838}
839
840impl RecordStore for NodeRecordStore {
841    type RecordsIter<'a> = vec::IntoIter<Cow<'a, Record>>;
842    type ProvidedIter<'a> = vec::IntoIter<Cow<'a, ProviderRecord>>;
843
844    fn get(&self, k: &Key) -> Option<Cow<'_, Record>> {
845        // When a client calls GET, the request is forwarded to the nodes until one node returns
846        // with the record. Thus a node can be bombarded with GET reqs for random keys. These can be safely
847        // ignored if we don't have the record locally.
848        let key = PrettyPrintRecordKey::from(k);
849
850        let cached_record = self.records_cache.get(k);
851        // first return from the FIFO cache if the record exists there
852        if let Some((record, _timestamp)) = cached_record {
853            return Some(Cow::Owned(record.clone()));
854        }
855
856        if !self.records.contains_key(k) {
857            debug!("Record not found locally: {key:?}");
858            return None;
859        }
860
861        debug!("GET request for Record key: {key}");
862
863        Self::read_from_disk(&self.encryption_details, k, &self.config.storage_dir)
864    }
865
866    fn put(&mut self, record: Record) -> Result<()> {
867        let record_key = PrettyPrintRecordKey::from(&record.key);
868
869        if record.value.len() >= self.config.max_value_bytes {
870            warn!(
871                "Record {record_key:?} not stored. Value too large: {} bytes",
872                record.value.len()
873            );
874            return Err(Error::ValueTooLarge);
875        }
876
877        // A record with payment shall always get passed further
878        // to allow the payment to be taken and credited into our own wallet.
879        match RecordHeader::from_record(&record) {
880            Ok(record_header) => {
881                match record_header.kind {
882                    RecordKind::DataWithPayment(_) => {
883                        debug!("Record {record_key:?} with payment shall always be processed.");
884                    }
885                    // Shall not use wildcard, to avoid mis-match during enum update.
886                    RecordKind::DataOnly(_) => {
887                        // A chunk with an existing key does not need to be stored again.
888                        // Others with the same content_hash do not need to be stored again either;
889                        // otherwise they shall be passed further to allow a different version of a
890                        // non-chunk record to be detected or updated.
891                        match self.records.get(&record.key) {
892                            Some((_addr, ValidationType::Chunk, _data_type)) => {
893                                debug!("Chunk {record_key:?} already exists.");
894                                return Ok(());
895                            }
896                            Some((
897                                _addr,
898                                ValidationType::NonChunk(existing_content_hash),
899                                _data_type,
900                            )) => {
901                                let content_hash = XorName::from_content(&record.value);
902                                if content_hash == *existing_content_hash {
903                                    debug!("A non-chunk record {record_key:?} with same content_hash {content_hash:?} already exists.");
904                                    return Ok(());
905                                }
906                            }
907                            _ => {}
908                        }
909                    }
910                }
911            }
912            Err(err) => {
913                error!("For record {record_key:?}, failed to parse record_header {err:?}");
914                return Ok(());
915            }
916        }
917
918        debug!("Unverified Record {record_key:?} try to validate and store");
919        let event_sender = self.network_event_sender.clone();
920        // push the event off thread so as to be non-blocking
921        let _handle = spawn(async move {
922            if let Err(error) = event_sender
923                .send(NetworkEvent::UnverifiedRecord(record))
924                .await
925            {
926                error!("SwarmDriver failed to send event: {}", error);
927            }
928        });
929
930        Ok(())
931    }
932
933    fn remove(&mut self, k: &Key) {
934        // Remove from main store
935        if let Some((addr, _, _)) = self.records.remove(k) {
936            let distance = self.local_address.distance(&addr);
937            let _ = self.records_by_distance.remove(&distance);
938        }
939
940        self.records_cache.remove(k);
941
942        #[cfg(feature = "open-metrics")]
943        if let Some(metric) = &self.record_count_metric {
944            let _ = metric.set(self.records.len() as i64);
945        }
946
947        if let Some((farthest_record, _)) = self.farthest_record.clone() {
948            if farthest_record == *k {
949                self.farthest_record = self.calculate_farthest();
950            }
951        }
952
953        let filename = Self::generate_filename(k);
954        let file_path = self.config.storage_dir.join(&filename);
955
956        let _handle = spawn(async move {
957            match fs::remove_file(file_path) {
958                Ok(_) => {
959                    info!("Removed record from disk! filename: {filename}");
960                }
961                Err(err) => {
962                    error!("Error while removing file. filename: {filename}, error: {err:?}");
963                }
964            }
965        });
966    }
967
968    fn records(&self) -> Self::RecordsIter<'_> {
969        // the records iter is used only during kad replication which is turned off
970        vec![].into_iter()
971    }
972
973    fn add_provider(&mut self, _record: ProviderRecord) -> Result<()> {
974        // ProviderRecords are not used currently
975        Ok(())
976    }
977
978    fn providers(&self, _key: &Key) -> Vec<ProviderRecord> {
979        // ProviderRecords are not used currently
980        vec![]
981    }
982
983    fn provided(&self) -> Self::ProvidedIter<'_> {
984        // ProviderRecords are not used currently
985        vec![].into_iter()
986    }
987
988    fn remove_provider(&mut self, _key: &Key, _provider: &PeerId) {
989        // ProviderRecords are not used currently
990    }
991}
992
993#[expect(trivial_casts)]
994#[cfg(test)]
995mod tests {
996    use super::*;
997    use bls::SecretKey;
998    use xor_name::XorName;
999
1000    use ant_protocol::storage::{
1001        try_deserialize_record, try_serialize_record, Chunk, ChunkAddress, DataTypes, Scratchpad,
1002    };
1003    use assert_fs::{
1004        fixture::{PathChild, PathCreateDir},
1005        TempDir,
1006    };
1007    use bytes::Bytes;
1008    use eyre::ContextCompat;
1009    use libp2p::{core::multihash::Multihash, kad::RecordKey};
1010    use quickcheck::*;
1011    use tokio::runtime::Runtime;
1012    use tokio::time::{sleep, Duration};
1013
1014    const MULTIHASH_CODE: u64 = 0x12;
1015
1016    #[derive(Clone, Debug)]
1017    struct ArbitraryKey(Key);
1018    #[derive(Clone, Debug)]
1019    struct ArbitraryRecord(Record);
1020
1021    impl Arbitrary for ArbitraryKey {
1022        fn arbitrary(g: &mut Gen) -> ArbitraryKey {
1023            let hash: [u8; 32] = core::array::from_fn(|_| u8::arbitrary(g));
1024            ArbitraryKey(Key::from(
1025                Multihash::<64>::wrap(MULTIHASH_CODE, &hash).expect("Failed to gen MultiHash"),
1026            ))
1027        }
1028    }
1029
1030    impl Arbitrary for ArbitraryRecord {
1031        fn arbitrary(g: &mut Gen) -> ArbitraryRecord {
1032            let value = match try_serialize_record(
1033                &(0..50).map(|_| rand::random::<u8>()).collect::<Bytes>(),
1034                RecordKind::DataOnly(DataTypes::Chunk),
1035            ) {
1036                Ok(value) => value.to_vec(),
1037                Err(err) => panic!("Cannot generate record value {err:?}"),
1038            };
1039            let record = Record {
1040                key: ArbitraryKey::arbitrary(g).0,
1041                value,
1042                publisher: None,
1043                expires: None,
1044            };
1045            ArbitraryRecord(record)
1046        }
1047    }
1048
1049    #[test]
1050    fn put_get_remove_record() {
1051        fn prop(r: ArbitraryRecord) {
1052            let rt = if let Ok(rt) = Runtime::new() {
1053                rt
1054            } else {
1055                panic!("Cannot create runtime");
1056            };
1057            rt.block_on(testing_thread(r));
1058        }
1059        quickcheck(prop as fn(_))
1060    }
1061
1062    async fn testing_thread(r: ArbitraryRecord) {
1063        let r = r.0;
1064        let (network_event_sender, mut network_event_receiver) = mpsc::channel(1);
1065        let (swarm_cmd_sender, _) = mpsc::channel(1);
1066
1067        let mut store = NodeRecordStore::with_config(
1068            PeerId::random(),
1069            Default::default(),
1070            network_event_sender,
1071            swarm_cmd_sender,
1072            #[cfg(feature = "open-metrics")]
1073            None,
1074        );
1075
1076        // An initial unverified put should not write to disk
1077        assert!(store.put(r.clone()).is_ok());
1078        assert!(store.get(&r.key).is_none());
1079
1080        let returned_record = if let Some(event) = network_event_receiver.recv().await {
1081            if let NetworkEvent::UnverifiedRecord(record) = event {
1082                record
1083            } else {
1084                panic!("Unexpected network event {event:?}");
1085            }
1086        } else {
1087            panic!("Failed to receive the record for further verification");
1088        };
1089
1090        let returned_record_key = returned_record.key.clone();
1091
1092        assert!(store
1093            .put_verified(returned_record, ValidationType::Chunk, true)
1094            .is_ok());
1095
1096        // We must also mark the record as stored (which would be triggered after the async write in nodes
1097        // via NetworkEvent::CompletedWrite)
1098        store.mark_as_stored(returned_record_key, ValidationType::Chunk, DataTypes::Chunk);
1099
1100        // loop over store.get max_iterations times to ensure async disk write had time to complete.
1101        let max_iterations = 10;
1102        let mut iteration = 0;
1103        while iteration < max_iterations {
1104            // try to check if it is equal to the actual record. This is needed because the file
1105            // might not be fully written to the fs and would cause intermittent failures.
1106            // If there is actually a problem with the PUT, the assert statement below would catch it.
1107            if store
1108                .get(&r.key)
1109                .is_some_and(|record| Cow::Borrowed(&r) == record)
1110            {
1111                break;
1112            }
1113            sleep(Duration::from_millis(100)).await;
1114            iteration += 1;
1115        }
1116        if iteration == max_iterations {
1117            panic!("record_store test failed: stored record can't be read back");
1118        }
1119
1120        assert_eq!(
1121            Some(Cow::Borrowed(&r)),
1122            store.get(&r.key),
1123            "record can be retrieved after put"
1124        );
1125        store.remove(&r.key);
1126
1127        assert!(store.get(&r.key).is_none());
1128    }
1129
1130    #[tokio::test]
1131    async fn can_store_after_restart() -> eyre::Result<()> {
1132        let tmp_dir = TempDir::new()?;
1133        let current_test_dir = tmp_dir.child("can_store_after_restart");
1134        current_test_dir.create_dir_all()?;
1135
1136        let store_config = NodeRecordStoreConfig {
1137            storage_dir: current_test_dir.to_path_buf(),
1138            encryption_seed: [1u8; 16],
1139            ..Default::default()
1140        };
1141        let self_id = PeerId::random();
1142
1143        // Create channels with proper receivers
1144        let (network_event_sender, _network_event_receiver) = mpsc::channel(1);
1145        let (swarm_cmd_sender, mut swarm_cmd_receiver) = mpsc::channel(1);
1146
1147        let mut store = NodeRecordStore::with_config(
1148            self_id,
1149            store_config.clone(),
1150            network_event_sender.clone(),
1151            swarm_cmd_sender.clone(),
1152            #[cfg(feature = "open-metrics")]
1153            None,
1154        );
1155
1156        // Create a chunk
1157        let chunk_data = Bytes::from_static(b"Test chunk data");
1158        let chunk = Chunk::new(chunk_data);
1159        let chunk_address = *chunk.address();
1160
1161        // Create a record from the chunk
1162        let record = Record {
1163            key: NetworkAddress::ChunkAddress(chunk_address).to_record_key(),
1164            value: try_serialize_record(&chunk, RecordKind::DataOnly(DataTypes::Chunk))?.to_vec(),
1165            expires: None,
1166            publisher: None,
1167        };
1168
1169        // Store the chunk using put_verified
1170        assert!(store
1171            .put_verified(record.clone(), ValidationType::Chunk, true)
1172            .is_ok());
1173
1174        // Wait for the async write operation to complete
1175        if let Some(cmd) = swarm_cmd_receiver.recv().await {
1176            match cmd {
1177                LocalSwarmCmd::AddLocalRecordAsStored {
1178                    key,
1179                    record_type,
1180                    data_type,
1181                } => {
1182                    store.mark_as_stored(key, record_type, data_type);
1183                }
1184                _ => panic!("Unexpected command received"),
1185            }
1186        }
1187
1188        // Verify the chunk is stored
1189        let stored_record = store.get(&record.key);
1190        assert!(stored_record.is_some(), "Chunk should be stored initially");
1191
1192        // Sleep a while to let the OS complete the flush to disk
1193        sleep(Duration::from_secs(1)).await;
1194
1195        // Create new channels for the restarted store
1196        let (new_network_event_sender, _new_network_event_receiver) = mpsc::channel(1);
1197        let (new_swarm_cmd_sender, _new_swarm_cmd_receiver) = mpsc::channel(1);
1198
1199        // Restart the store with same encrypt_seed but new channels
1200        drop(store);
1201        let store = NodeRecordStore::with_config(
1202            self_id,
1203            store_config,
1204            new_network_event_sender,
1205            new_swarm_cmd_sender,
1206            #[cfg(feature = "open-metrics")]
1207            None,
1208        );
1209
1210        // Verify the record still exists
1211        let stored_record = store.get(&record.key);
1212        assert!(
1213            stored_record.is_some(),
1214            "Chunk should be stored after restart with same key"
1215        );
1216
1217        // Create new channels for the different seed test
1218        let (diff_network_event_sender, _diff_network_event_receiver) = mpsc::channel(1);
1219        let (diff_swarm_cmd_sender, _diff_swarm_cmd_receiver) = mpsc::channel(1);
1220
1221        // Restart the store with different encrypt_seed
1222        let self_id_diff = PeerId::random();
1223        let store_config_diff = NodeRecordStoreConfig {
1224            storage_dir: current_test_dir.to_path_buf(),
1225            encryption_seed: [2u8; 16],
1226            ..Default::default()
1227        };
1228        let store_diff = NodeRecordStore::with_config(
1229            self_id_diff,
1230            store_config_diff,
1231            diff_network_event_sender,
1232            diff_swarm_cmd_sender,
1233            #[cfg(feature = "open-metrics")]
1234            None,
1235        );
1236
1237        // When encryption is enabled, the record should be gone because it can't be decrypted
1238        // with the different encryption seed
1239        assert!(
1240            store_diff.get(&record.key).is_none(),
1241            "Chunk should be gone with different encryption key"
1242        );
1243
1244        Ok(())
1245    }
1246
1247    #[tokio::test]
1248    async fn can_store_and_retrieve_chunk() {
1249        let temp_dir = std::env::temp_dir();
1250        let store_config = NodeRecordStoreConfig {
1251            storage_dir: temp_dir,
1252            ..Default::default()
1253        };
1254        let self_id = PeerId::random();
1255        let (network_event_sender, _) = mpsc::channel(1);
1256        let (swarm_cmd_sender, _) = mpsc::channel(1);
1257
1258        let mut store = NodeRecordStore::with_config(
1259            self_id,
1260            store_config,
1261            network_event_sender,
1262            swarm_cmd_sender,
1263            #[cfg(feature = "open-metrics")]
1264            None,
1265        );
1266
1267        // Create a chunk
1268        let chunk_data = Bytes::from_static(b"Test chunk data");
1269        let chunk = Chunk::new(chunk_data.clone());
1270        let chunk_address = *chunk.address();
1271
1272        // Create a record from the chunk
1273        let record = Record {
1274            key: NetworkAddress::ChunkAddress(chunk_address).to_record_key(),
1275            value: chunk_data.to_vec(),
1276            expires: None,
1277            publisher: None,
1278        };
1279
1280        // Store the chunk using put_verified
1281        assert!(store
1282            .put_verified(record.clone(), ValidationType::Chunk, true)
1283            .is_ok());
1284
1285        // Mark as stored (simulating the CompletedWrite event)
1286        store.mark_as_stored(record.key.clone(), ValidationType::Chunk, DataTypes::Chunk);
1287
1288        // Verify the chunk is stored
1289        let stored_record = store.get(&record.key);
1290        assert!(stored_record.is_some(), "Chunk should be stored");
1291
1292        if let Some(stored) = stored_record {
1293            assert_eq!(
1294                stored.value, chunk_data,
1295                "Stored chunk data should match original"
1296            );
1297
1298            let stored_address = ChunkAddress::new(XorName::from_content(&stored.value));
1299            assert_eq!(
1300                stored_address, chunk_address,
1301                "Stored chunk address should match original"
1302            );
1303        }
1304
1305        // Clean up
1306        store.remove(&record.key);
1307        assert!(
1308            store.get(&record.key).is_none(),
1309            "Chunk should be removed after cleanup"
1310        );
1311    }
1312
1313    #[tokio::test]
1314    async fn can_store_and_retrieve_scratchpad() -> eyre::Result<()> {
1315        let temp_dir = std::env::temp_dir();
1316        let store_config = NodeRecordStoreConfig {
1317            storage_dir: temp_dir,
1318            ..Default::default()
1319        };
1320        let self_id = PeerId::random();
1321        let (network_event_sender, _) = mpsc::channel(1);
1322        let (swarm_cmd_sender, _) = mpsc::channel(1);
1323
1324        let mut store = NodeRecordStore::with_config(
1325            self_id,
1326            store_config,
1327            network_event_sender,
1328            swarm_cmd_sender,
1329            #[cfg(feature = "open-metrics")]
1330            None,
1331        );
1332
1333        // Create a scratchpad
1334        let unencrypted_scratchpad_data = Bytes::from_static(b"Test scratchpad data");
1335        let owner_sk = SecretKey::random();
1336
1337        let scratchpad = Scratchpad::new(&owner_sk, 0, &unencrypted_scratchpad_data, 0);
1338
1339        let scratchpad_address = *scratchpad.address();
1340
1341        // Create a record from the scratchpad
1342        let record = Record {
1343            key: NetworkAddress::ScratchpadAddress(scratchpad_address).to_record_key(),
1344            value: try_serialize_record(&scratchpad, RecordKind::DataOnly(DataTypes::Scratchpad))?
1345                .to_vec(),
1346            expires: None,
1347            publisher: None,
1348        };
1349
1350        // Store the scratchpad using put_verified
1351        assert!(store
1352            .put_verified(
1353                record.clone(),
1354                ValidationType::NonChunk(XorName::from_content(&record.value)),
1355                true,
1356            )
1357            .is_ok());
1358
1359        // Mark as stored (simulating the CompletedWrite event)
1360        store.mark_as_stored(
1361            record.key.clone(),
1362            ValidationType::NonChunk(XorName::from_content(&record.value)),
1363            DataTypes::Scratchpad,
1364        );
1365
1366        // Verify the scratchpad is stored
1367        let stored_record = store.get(&record.key);
1368        assert!(stored_record.is_some(), "Scratchpad should be stored");
1369
1370        if let Some(stored) = stored_record {
1371            let scratchpad = try_deserialize_record::<Scratchpad>(&stored)?;
1372
1373            let stored_address = scratchpad.address();
1374            assert_eq!(
1375                stored_address, &scratchpad_address,
1376                "Stored scratchpad address should match original"
1377            );
1378
1379            let decrypted_data = scratchpad.decrypt_data(&owner_sk)?;
1380
1381            assert_eq!(
1382                decrypted_data, unencrypted_scratchpad_data,
1383                "Stored scratchpad data should match original"
1384            );
1385        }
1386
1387        store.remove(&record.key);
1388        assert!(
1389            store.get(&record.key).is_none(),
1390            "Scratchpad should be removed after cleanup"
1391        );
1392
1393        Ok(())
1394    }
1395    #[tokio::test]
1396    async fn pruning_on_full() -> Result<()> {
1397        let max_iterations = 10;
1398        // lower max records for faster testing
1399        let max_records = 50;
1400
1401        let temp_dir = std::env::temp_dir();
1402        let unique_dir_name = uuid::Uuid::new_v4().to_string();
1403        let storage_dir = temp_dir.join(unique_dir_name);
1404        fs::create_dir_all(&storage_dir).expect("Failed to create directory");
1405
1406        // Set config::max_records to 50, then generate 100 records.
1407        // On storing the 51st to 100th record,
1408        // check that the expected pruning behaviour is carried out.
1409        let store_config = NodeRecordStoreConfig {
1410            max_records,
1411            storage_dir,
1412            ..Default::default()
1413        };
1414        let self_id = PeerId::random();
1415        let (network_event_sender, _) = mpsc::channel(1);
1416        let (swarm_cmd_sender, _) = mpsc::channel(1);
1417
1418        let mut store = NodeRecordStore::with_config(
1419            self_id,
1420            store_config.clone(),
1421            network_event_sender,
1422            swarm_cmd_sender,
1423            #[cfg(feature = "open-metrics")]
1424            None,
1425        );
1426        // keep track of everything ever stored, to check records missing at the end are further away
1427        let mut stored_records_at_some_point: Vec<RecordKey> = vec![];
1428        let self_address = NetworkAddress::from(self_id);
1429
1430        // keep track of failed puts, to assert they end up farther away than the stored records
1431        let mut failed_records = vec![];
1432
1433        // try to put an excess of records
1434        for _ in 0..max_records * 2 {
1436            let record_key = NetworkAddress::from(PeerId::random()).to_record_key();
1437            let value = match try_serialize_record(
1438                &(0..50).map(|_| rand::random::<u8>()).collect::<Bytes>(),
1439                RecordKind::DataOnly(DataTypes::Chunk),
1440            ) {
1441                Ok(value) => value.to_vec(),
1442                Err(err) => panic!("Cannot generate record value {err:?}"),
1443            };
1444            let record = Record {
1445                key: record_key.clone(),
1446                value,
1447                publisher: None,
1448                expires: None,
1449            };
1450
1451            // Attempt the put; once the store is full this may fail if the record is farther away than existing ones.
1452            let succeeded = store
1453                .put_verified(record, ValidationType::Chunk, true)
1454                .is_ok();
1455
1456            if !succeeded {
1457                failed_records.push(record_key.clone());
1458                println!("failed {:?}", PrettyPrintRecordKey::from(&record_key));
1459            } else {
1460                // We must also mark the record as stored (which would be triggered
1461                // after the async write in nodes via NetworkEvent::CompletedWrite)
1462                store.mark_as_stored(record_key.clone(), ValidationType::Chunk, DataTypes::Chunk);
1463
1464                println!("success sotred len: {:?} ", store.record_addresses().len());
1465                stored_records_at_some_point.push(record_key.clone());
1466                if stored_records_at_some_point.len() <= max_records {
1467                    assert!(succeeded);
1468                }
1469                // poll up to max_iterations times to give the async disk write time to complete.
1470                let mut iteration = 0;
1471                while iteration < max_iterations {
1472                    if store.get(&record_key).is_some() {
1473                        break;
1474                    }
1475                    sleep(Duration::from_millis(100)).await;
1476                    iteration += 1;
1477                }
1478                if iteration == max_iterations {
1479                    panic!("record_store prune test failed: stored record {record_key:?} can't be read back");
1480                }
1481            }
1482        }
1483
1484        let stored_data_at_end = store.record_addresses();
1485        assert_eq!(
1486            stored_data_at_end.len(),
1487            max_records,
1488            "Stored records should be capped at max_records"
1489        );
1490
1491        // now assert that we've stored at _least_ max records (likely many more over the lifetime of the store)
1492        assert!(
1493            stored_records_at_some_point.len() >= max_records,
1494            "we should have stored ata least max over time"
1495        );
1496
1497        // now all failed records should be farther than the farthest stored record
1498        let mut sorted_stored_data = stored_data_at_end.iter().collect_vec();
1499
1500        sorted_stored_data
1501            .sort_by(|(a, _), (b, _)| self_address.distance(a).cmp(&self_address.distance(b)));
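        // sorted_stored_data is now ordered from the closest to the farthest record
        // relative to our own address, so `.last()` yields the most distant stored record.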
1502
1503        // next assert that every failed record is farther away than `most_distant_data`, the farthest record still stored
1504        if let Some((most_distant_data, _)) = sorted_stored_data.last() {
1505            for failed_record in failed_records {
1506                let failed_data = NetworkAddress::from(&failed_record);
1507                assert!(
1508                    self_address.distance(&failed_data) > self_address.distance(most_distant_data),
1509                    "failed record {failed_data:?} should be farther than the farthest stored record {most_distant_data:?}"
1510                );
1511            }
1512
1513            // now, anything stored at some point should either still be stored OR be farther away than `most_distant_data`
1514            for data in stored_records_at_some_point {
1515                let data_addr = NetworkAddress::from(&data);
1516                if !sorted_stored_data.contains(&(&data_addr, &ValidationType::Chunk)) {
1517                    assert!(
1518                        self_address.distance(&data_addr)
1519                            > self_address.distance(most_distant_data),
1520                        "stored record should be farther than the farthest stored record"
1521                    );
1522                }
1523            }
1524        }
1525
1526        Ok(())
1527    }
1528
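    /// Stores just under `max_records` records, sets the responsible distance range
    /// at the record halfway through the distance-sorted list, and checks that
    /// `get_records_within_distance_range` reports at least half of the stored records.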
1529    #[tokio::test]
1530    async fn get_records_within_range() -> eyre::Result<()> {
1531        let max_records = 50;
1532
1533        let temp_dir = std::env::temp_dir();
1534        let unique_dir_name = uuid::Uuid::new_v4().to_string();
1535        let storage_dir = temp_dir.join(unique_dir_name);
1536
1537        // setup the store
1538        let store_config = NodeRecordStoreConfig {
1539            max_records,
1540            storage_dir,
1541            ..Default::default()
1542        };
1543        let self_id = PeerId::random();
1544        let (network_event_sender, _) = mpsc::channel(1);
1545        let (swarm_cmd_sender, _) = mpsc::channel(1);
1546        let mut store = NodeRecordStore::with_config(
1547            self_id,
1548            store_config,
1549            network_event_sender,
1550            swarm_cmd_sender,
1551            #[cfg(feature = "open-metrics")]
1552            None,
1553        );
1554
1555        let mut stored_records: Vec<RecordKey> = vec![];
1556        let self_address = NetworkAddress::from(self_id);
1557
1558        // add records...
1559        // minus one here: if we hit the max, puts would start to fail
1560        for _ in 0..max_records - 1 {
1561            let record_key = NetworkAddress::from(PeerId::random()).to_record_key();
1562            let value = match try_serialize_record(
1563                &(0..max_records)
1564                    .map(|_| rand::random::<u8>())
1565                    .collect::<Bytes>(),
1566                RecordKind::DataOnly(DataTypes::Chunk),
1567            ) {
1568                Ok(value) => value.to_vec(),
1569                Err(err) => panic!("Cannot generate record value {err:?}"),
1570            };
1571            let record = Record {
1572                key: record_key.clone(),
1573                value,
1574                publisher: None,
1575                expires: None,
1576            };
1577            assert!(store
1578                .put_verified(record, ValidationType::Chunk, true)
1579                .is_ok());
1580            // We must also mark the record as stored (which would be triggered after the async write in nodes
1581            // via NetworkEvent::CompletedWrite)
1582            store.mark_as_stored(record_key.clone(), ValidationType::Chunk, DataTypes::Chunk);
1583
1584            stored_records.push(record_key.clone());
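            // keep stored_records ordered by XOR distance from our own address,
            // so the halfway record picked below is a meaningful distance boundary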
1585            stored_records.sort_by(|a, b| {
1586                let a = NetworkAddress::from(a);
1587                let b = NetworkAddress::from(b);
1588                self_address.distance(&a).cmp(&self_address.distance(&b))
1589            });
1590        }
1591
1592        // get a record halfway through the list
1593        let halfway_record_address = NetworkAddress::from(
1594            stored_records
1595                .get(max_records / 2)
1596                .wrap_err("Could not get the halfway record key")?,
1597        );
1598        // get the distance to this record from our local key
1599        let distance = &self_address.distance(&halfway_record_address);
1600
1601        // set our responsible distance range so it covers at least up to the halfway record
1602        store.set_responsible_distance_range(*distance);
1603
1604        let records_in_range = store.get_records_within_distance_range(*distance);
1605
1606        // check that the number of records returned is at least half our records
1607        // (ie, that we cover _at least_ all the records within our distance range)
1608        assert!(
1609            records_in_range >= max_records / 2,
1610            "Not enough records in range {records_in_range}/{}",
1611            max_records / 2
1612        );
1613
1614        Ok(())
1615    }
1616
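    /// Records a payment, waits for the historic quoting metrics file to hit disk,
    /// then rebuilds the store from the same directory and checks that the
    /// received-payment count and timestamp survive the restart.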
1617    #[tokio::test]
1618    async fn historic_quoting_metrics() -> Result<()> {
1619        let temp_dir = std::env::temp_dir();
1620        let unique_dir_name = uuid::Uuid::new_v4().to_string();
1621        let storage_dir = temp_dir.join(unique_dir_name);
1622        fs::create_dir_all(&storage_dir).expect("Failed to create directory");
1623        let historic_quote_dir = storage_dir.clone();
1624
1625        let store_config = NodeRecordStoreConfig {
1626            storage_dir,
1627            historic_quote_dir,
1628            ..Default::default()
1629        };
1630        let self_id = PeerId::random();
1631        let (network_event_sender, _) = mpsc::channel(1);
1632        let (swarm_cmd_sender, _) = mpsc::channel(1);
1633
1634        let mut store = NodeRecordStore::with_config(
1635            self_id,
1636            store_config.clone(),
1637            network_event_sender.clone(),
1638            swarm_cmd_sender.clone(),
1639            #[cfg(feature = "open-metrics")]
1640            None,
1641        );
1642
1643        store.payment_received();
1644
1645        // Wait for a while to allow the file to be written to disk.
1646        sleep(Duration::from_millis(5000)).await;
1647
1648        let new_store = NodeRecordStore::with_config(
1649            self_id,
1650            store_config,
1651            network_event_sender,
1652            swarm_cmd_sender,
1653            #[cfg(feature = "open-metrics")]
1654            None,
1655        );
1656
1657        assert_eq!(1, new_store.received_payment_count);
1658        assert_eq!(store.timestamp, new_store.timestamp);
1659
1660        Ok(())
1661    }
1662
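    /// Exercises the FIFO record cache: pushing past `cache_size` evicts the oldest
    /// entry, and pushing after `cache_timeout` has elapsed evicts all timed-out entries.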
1663    #[tokio::test]
1664    async fn test_cache_pruning_and_size_limit() {
1665        // Create cache with small size and short timeout for testing
1666        let cache_size = 3;
1667        let cache_timeout = Duration::from_millis(100);
1668        let mut cache = RecordCache::new(cache_size, cache_timeout);
1669
1670        // Create test records
1671        let record1 = Record {
1672            key: RecordKey::new(b"key1"),
1673            value: b"value1".to_vec(),
1674            publisher: None,
1675            expires: None,
1676        };
1677        let record2 = Record {
1678            key: RecordKey::new(b"key2"),
1679            value: b"value2".to_vec(),
1680            publisher: None,
1681            expires: None,
1682        };
1683        let record3 = Record {
1684            key: RecordKey::new(b"key3"),
1685            value: b"value3".to_vec(),
1686            publisher: None,
1687            expires: None,
1688        };
1689        let record4 = Record {
1690            key: RecordKey::new(b"key4"),
1691            value: b"value4".to_vec(),
1692            publisher: None,
1693            expires: None,
1694        };
1695
1696        // Add records up to cache size
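        // (the short sleeps between pushes give each entry a strictly increasing
        // timestamp, so the oldest entry is unambiguous when size-based pruning runs)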
1697        cache.push_back(record1.key.clone(), record1.clone());
1698        sleep(Duration::from_millis(1)).await;
1699        cache.push_back(record2.key.clone(), record2.clone());
1700        sleep(Duration::from_millis(1)).await;
1701        cache.push_back(record3.key.clone(), record3.clone());
1702        sleep(Duration::from_millis(1)).await;
1703
1704        // Verify all records are present
1705        assert!(cache.get(&record1.key).is_some());
1706        assert!(cache.get(&record2.key).is_some());
1707        assert!(cache.get(&record3.key).is_some());
1708
1709        // Add one more record to trigger size-based pruning
1710        cache.push_back(record4.key.clone(), record4.clone());
1711
1712        // Verify cache size is maintained
1713        assert_eq!(cache.records_cache.len(), cache_size);
1714
1715        // Verify oldest record was removed
1716        assert!(cache.get(&record1.key).is_none());
1717
1718        // Wait for timeout to expire
1719        sleep(cache_timeout + Duration::from_millis(10)).await;
1720
1721        // Add another record to trigger time-based pruning
1722        let record5 = Record {
1723            key: RecordKey::new(b"key5"),
1724            value: b"value5".to_vec(),
1725            publisher: None,
1726            expires: None,
1727        };
1728        cache.push_back(record5.key.clone(), record5.clone());
1729
1730        // Verify all timed-out records were removed
1731        assert!(cache.get(&record2.key).is_none());
1732        assert!(cache.get(&record3.key).is_none());
1733        assert!(cache.get(&record4.key).is_none());
1734
1735        // Verify new record is present
1736        assert!(cache.get(&record5.key).is_some());
1737    }
1738}