1#![allow(clippy::mutable_key_type)] use crate::cmd::LocalSwarmCmd;
11use crate::network_builder::MAX_PACKET_SIZE;
12use crate::send_local_swarm_cmd;
13use crate::time::{spawn, Instant};
14use crate::{event::NetworkEvent, log_markers::Marker};
15use aes_gcm_siv::{
16 aead::{Aead, KeyInit},
17 Aes256GcmSiv, Key as AesKey, Nonce,
18};
19use ant_evm::QuotingMetrics;
20use ant_protocol::{
21 storage::{DataTypes, RecordHeader, RecordKind, ValidationType},
22 NetworkAddress, PrettyPrintRecordKey,
23};
24use hkdf::Hkdf;
25use itertools::Itertools;
26use libp2p::{
27 identity::PeerId,
28 kad::{
29 store::{Error, RecordStore, Result},
30 KBucketDistance as Distance, ProviderRecord, Record, RecordKey as Key,
31 },
32};
33#[cfg(feature = "open-metrics")]
34use prometheus_client::metrics::gauge::Gauge;
35use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
36use serde::{Deserialize, Serialize};
37use sha2::Sha256;
38use std::{
39 borrow::Cow,
40 collections::{BTreeMap, HashMap},
41 fs,
42 path::{Path, PathBuf},
43 time::SystemTime,
44 vec,
45};
46use tokio::{sync::mpsc, time::Duration};
47use walkdir::{DirEntry, WalkDir};
48use xor_name::XorName;
49
50const MAX_RECORDS_COUNT: usize = 16 * 1024;
57
58const MAX_RECORDS_CACHE_SIZE: usize = 25;
60
61const HISTORICAL_QUOTING_METRICS_FILENAME: &str = "historic_quoting_metrics";
63
64const CACHE_TIMEOUT: Duration = Duration::from_secs(360);
67
68fn derive_aes256gcm_siv_from_seed(seed: &[u8; 16]) -> (Aes256GcmSiv, [u8; 4]) {
69 let salt = b"autonomi_record_store";
71
72 let hk = Hkdf::<Sha256>::new(Some(salt), seed);
73
74 let mut okm = [0u8; 32];
75 hk.expand(b"", &mut okm)
76 .expect("32 bytes is a valid length for HKDF output");
77
78 let seeded_key = AesKey::<Aes256GcmSiv>::from_slice(&okm);
79
80 let mut nonce_starter = [0u8; 4];
81 let bytes_to_copy = seed.len().min(nonce_starter.len());
82 nonce_starter[..bytes_to_copy].copy_from_slice(&seed[..bytes_to_copy]);
83
84 trace!("seeded_key is {seeded_key:?} nonce_starter is {nonce_starter:?}");
85
86 (Aes256GcmSiv::new(seeded_key), nonce_starter)
87}
88
89struct RecordCache {
91 records_cache: HashMap<Key, (Record, SystemTime)>,
92 cache_size: usize,
93 cache_timeout: Duration,
94}
95
96impl RecordCache {
97 fn new(cache_size: usize, cache_timeout: Duration) -> Self {
98 RecordCache {
99 records_cache: HashMap::new(),
100 cache_size,
101 cache_timeout,
102 }
103 }
104
105 fn remove(&mut self, key: &Key) -> Option<(Record, SystemTime)> {
106 self.records_cache.remove(key)
107 }
108
109 fn get(&self, key: &Key) -> Option<&(Record, SystemTime)> {
110 self.records_cache.get(key)
111 }
112
113 fn push_back(&mut self, key: Key, record: Record) {
114 self.free_up_space();
115
116 let _ = self
117 .records_cache
118 .insert(key, (record, SystemTime::now() + self.cache_timeout));
119 }
120
121 fn free_up_space(&mut self) {
122 let current = SystemTime::now();
123 self.records_cache
125 .retain(|_key, (_record, timestamp)| *timestamp > current);
126
127 while self.records_cache.len() >= self.cache_size {
128 self.remove_oldest_entry()
129 }
130 }
131
132 fn remove_oldest_entry(&mut self) {
133 let mut oldest_timestamp = SystemTime::now() + self.cache_timeout;
134 let mut key_to_remove = None;
135
136 for (key, (_record, timestamp)) in self.records_cache.iter() {
137 if *timestamp < oldest_timestamp {
138 oldest_timestamp = *timestamp;
139 key_to_remove = Some(key.clone());
140 }
141 }
142
143 if let Some(key) = key_to_remove {
144 let _ = self.records_cache.remove(&key);
145 }
146 }
147}
148
149pub struct NodeRecordStore {
151 local_address: NetworkAddress,
153 config: NodeRecordStoreConfig,
155 records: HashMap<Key, (NetworkAddress, ValidationType, DataTypes)>,
157 records_by_distance: BTreeMap<Distance, Key>,
159 records_cache: RecordCache,
161 network_event_sender: mpsc::Sender<NetworkEvent>,
163 local_swarm_cmd_sender: mpsc::Sender<LocalSwarmCmd>,
165 responsible_distance_range: Option<Distance>,
169 #[cfg(feature = "open-metrics")]
170 record_count_metric: Option<Gauge>,
172 received_payment_count: usize,
174 encryption_details: (Aes256GcmSiv, [u8; 4]),
177 timestamp: SystemTime,
179 farthest_record: Option<(Key, Distance)>,
181}
182
183#[derive(Debug, Clone)]
185pub struct NodeRecordStoreConfig {
186 pub storage_dir: PathBuf,
188 pub historic_quote_dir: PathBuf,
191 pub max_records: usize,
193 pub max_value_bytes: usize,
195 pub records_cache_size: usize,
197 pub encryption_seed: [u8; 16],
199}
200
201impl Default for NodeRecordStoreConfig {
202 fn default() -> Self {
203 let historic_quote_dir = std::env::temp_dir();
204 Self {
205 storage_dir: historic_quote_dir.clone(),
206 historic_quote_dir,
207 max_records: MAX_RECORDS_COUNT,
208 max_value_bytes: MAX_PACKET_SIZE,
209 records_cache_size: MAX_RECORDS_CACHE_SIZE,
210 encryption_seed: [0u8; 16],
211 }
212 }
213}
214
215fn generate_nonce_for_record(nonce_starter: &[u8; 4], key: &Key) -> Nonce {
217 let mut nonce_bytes = nonce_starter.to_vec();
218 nonce_bytes.extend_from_slice(key.as_ref());
219 nonce_bytes.resize(12, 0); Nonce::from_iter(nonce_bytes)
223}
224
225#[derive(Clone, Serialize, Deserialize)]
226struct HistoricQuotingMetrics {
227 received_payment_count: usize,
228 timestamp: SystemTime,
229}
230
231impl NodeRecordStore {
232 fn update_records_from_an_existing_store(
234 config: &NodeRecordStoreConfig,
235 encryption_details: &(Aes256GcmSiv, [u8; 4]),
236 ) -> HashMap<Key, (NetworkAddress, ValidationType, DataTypes)> {
237 let process_entry = |entry: &DirEntry| -> _ {
238 let path = entry.path();
239 if path.is_file() {
240 debug!("Existing record found: {path:?}");
241 let filename = match path.file_name().and_then(|n| n.to_str()) {
243 Some(file_name) => file_name,
244 None => {
245 warn!(
247 "Found a file in the storage dir that is not a valid record: {:?}",
248 path
249 );
250 if let Err(e) = fs::remove_file(path) {
251 warn!(
252 "Failed to remove invalid record file from storage dir: {:?}",
253 e
254 );
255 }
256 return None;
257 }
258 };
259 let key = Self::get_data_from_filename(filename)?;
261 let record = match fs::read(path) {
262 Ok(bytes) => {
263 if let Some(record) =
265 Self::get_record_from_bytes(bytes, &key, encryption_details)
266 {
267 record
268 } else {
269 info!("Failed to decrypt record from file {filename:?}, clean it up.");
272 if let Err(e) = fs::remove_file(path) {
273 warn!(
274 "Failed to remove outdated record file {filename:?} from storage dir: {:?}",
275 e
276 );
277 }
278 return None;
279 }
280 }
281 Err(err) => {
282 error!("Error while reading file. filename: {filename}, error: {err:?}");
283 return None;
284 }
285 };
286
287 match RecordHeader::get_data_type(&record) {
288 Ok(data_type) => {
289 let validate_type = match data_type {
290 DataTypes::Chunk => ValidationType::Chunk,
291 _ => {
292 let xorname_hash = XorName::from_content(&record.value);
293 ValidationType::NonChunk(xorname_hash)
294 }
295 };
296
297 let address = NetworkAddress::from(&key);
298 info!("Existing record {address:?} loaded from: {path:?}");
299 return Some((key, (address, validate_type, data_type)));
300 }
301 Err(error) => {
302 warn!(
303 "Failed to parse record type of record {filename:?}: {:?}",
304 error
305 );
306 if let Err(e) = fs::remove_file(path) {
309 warn!(
310 "Failed to remove invalid record file {filename:?} from storage dir: {:?}",
311 e
312 );
313 }
314 return None;
315 }
316 }
317 }
318 None
319 };
320
321 info!("Attempting to repopulate records from existing store...");
322 let records = WalkDir::new(&config.storage_dir)
323 .into_iter()
324 .filter_map(|e| e.ok())
325 .collect_vec()
326 .par_iter()
327 .filter_map(process_entry)
328 .collect();
329 records
330 }
331
332 fn restore_quoting_metrics(storage_dir: &Path) -> Option<HistoricQuotingMetrics> {
334 let file_path = storage_dir.join(HISTORICAL_QUOTING_METRICS_FILENAME);
335
336 if let Ok(file) = fs::File::open(file_path) {
337 if let Ok(quoting_metrics) = rmp_serde::from_read(&file) {
338 return Some(quoting_metrics);
339 }
340 }
341
342 None
343 }
344
345 fn flush_historic_quoting_metrics(&self) {
346 let file_path = self
347 .config
348 .historic_quote_dir
349 .join(HISTORICAL_QUOTING_METRICS_FILENAME);
350
351 let historic_quoting_metrics = HistoricQuotingMetrics {
352 received_payment_count: self.received_payment_count,
353 timestamp: self.timestamp,
354 };
355
356 spawn(async move {
357 if let Ok(mut file) = fs::File::create(file_path) {
358 let mut serialiser = rmp_serde::encode::Serializer::new(&mut file);
359 let _ = historic_quoting_metrics.serialize(&mut serialiser);
360 }
361 });
362 }
363
364 pub fn with_config(
366 local_id: PeerId,
367 config: NodeRecordStoreConfig,
368 network_event_sender: mpsc::Sender<NetworkEvent>,
369 swarm_cmd_sender: mpsc::Sender<LocalSwarmCmd>,
370 #[cfg(feature = "open-metrics")] record_count_metric: Option<Gauge>,
371 ) -> Self {
372 info!("Using encryption_seed of {:?}", config.encryption_seed);
373 let encryption_details = derive_aes256gcm_siv_from_seed(&config.encryption_seed);
374
375 let (received_payment_count, timestamp) = if let Some(historic_quoting_metrics) =
378 Self::restore_quoting_metrics(&config.historic_quote_dir)
379 {
380 (
381 historic_quoting_metrics.received_payment_count,
382 historic_quoting_metrics.timestamp,
383 )
384 } else {
385 (0, SystemTime::now())
386 };
387
388 let records = Self::update_records_from_an_existing_store(&config, &encryption_details);
389 let local_address = NetworkAddress::from(local_id);
390
391 let mut records_by_distance: BTreeMap<Distance, Key> = BTreeMap::new();
393 for (key, (addr, _record_type, _data_type)) in records.iter() {
394 let distance = &local_address.distance(addr);
395 let _ = records_by_distance.insert(*distance, key.clone());
396 }
397
398 let cache_size = config.records_cache_size;
399 let mut record_store = NodeRecordStore {
400 local_address,
401 config,
402 records,
403 records_by_distance,
404 records_cache: RecordCache::new(cache_size, CACHE_TIMEOUT),
405 network_event_sender,
406 local_swarm_cmd_sender: swarm_cmd_sender,
407 responsible_distance_range: None,
408 #[cfg(feature = "open-metrics")]
409 record_count_metric,
410 received_payment_count,
411 encryption_details,
412 timestamp,
413 farthest_record: None,
414 };
415
416 record_store.farthest_record = record_store.calculate_farthest();
417
418 record_store.flush_historic_quoting_metrics();
419
420 #[cfg(feature = "open-metrics")]
421 if let Some(metric) = &record_store.record_count_metric {
422 let _ = metric.set(record_store.records.len() as i64);
423 }
424
425 record_store
426 }
427
428 pub(crate) fn get_responsible_distance_range(&self) -> Option<Distance> {
430 self.responsible_distance_range
431 }
432
433 fn generate_filename(key: &Key) -> String {
435 hex::encode(key.as_ref())
436 }
437
438 fn get_data_from_filename(hex_str: &str) -> Option<Key> {
440 match hex::decode(hex_str) {
441 Ok(bytes) => Some(Key::from(bytes)),
442 Err(error) => {
443 error!("Error decoding hex string: {:?}", error);
444 None
445 }
446 }
447 }
448
449 fn get_record_from_bytes<'a>(
451 bytes: Vec<u8>,
452 key: &Key,
453 encryption_details: &(Aes256GcmSiv, [u8; 4]),
454 ) -> Option<Cow<'a, Record>> {
455 let (cipher, nonce_starter) = encryption_details;
456 let nonce = generate_nonce_for_record(nonce_starter, key);
457
458 match cipher.decrypt(&nonce, bytes.as_slice()) {
459 Ok(value) => {
460 let record = Record {
461 key: key.clone(),
462 value,
463 publisher: None,
464 expires: None,
465 };
466 Some(Cow::Owned(record))
467 }
468 Err(error) => {
469 error!("Error while decrypting record. key: {key:?}: {error:?}");
470 None
471 }
472 }
473 }
474
475 fn read_from_disk<'a>(
476 encryption_details: &(Aes256GcmSiv, [u8; 4]),
477 key: &Key,
478 storage_dir: &Path,
479 ) -> Option<Cow<'a, Record>> {
480 let start = Instant::now();
481 let filename = Self::generate_filename(key);
482
483 let file_path = storage_dir.join(&filename);
484
485 match fs::read(file_path) {
487 Ok(bytes) => {
488 info!(
490 "Retrieved record from disk! filename: {filename} after {:?}",
491 start.elapsed()
492 );
493
494 Self::get_record_from_bytes(bytes, key, encryption_details)
495 }
496 Err(err) => {
497 error!("Error while reading file. filename: {filename}, error: {err:?}");
498 None
499 }
500 }
501 }
502
503 pub fn get_farthest(&self) -> Option<Key> {
505 if let Some((ref key, _distance)) = self.farthest_record {
506 Some(key.clone())
507 } else {
508 None
509 }
510 }
511
512 fn calculate_farthest(&self) -> Option<(Key, Distance)> {
514 let mut sorted_records: Vec<_> = self.records.keys().collect();
516 sorted_records.sort_by_key(|key| {
517 let addr = NetworkAddress::from(*key);
518 self.local_address.distance(&addr)
519 });
520
521 if let Some(key) = sorted_records.last() {
522 let addr = NetworkAddress::from(*key);
523 Some(((*key).clone(), self.local_address.distance(&addr)))
524 } else {
525 None
526 }
527 }
528
529 fn prune_records_if_needed(&mut self, incoming_record_key: &Key) -> Result<()> {
536 if self.records.len() < self.config.max_records {
538 return Ok(());
539 }
540
541 if let Some((farthest_record, farthest_record_distance)) = self.farthest_record.clone() {
542 if farthest_record_distance
544 < self
545 .local_address
546 .distance(&NetworkAddress::from(incoming_record_key))
547 {
548 return Err(Error::MaxRecords);
549 }
550
551 info!(
552 "Record {:?} will be pruned to free up space for new records",
553 PrettyPrintRecordKey::from(&farthest_record)
554 );
555 self.remove(&farthest_record);
556 }
557
558 Ok(())
559 }
560
561 pub fn cleanup_irrelevant_records(&mut self) {
568 let accumulated_records = self.records.len();
569 if accumulated_records < MAX_RECORDS_COUNT / 10 {
570 return;
571 }
572
573 let responsible_distance = if let Some(distance) = self.responsible_distance_range {
574 distance
575 } else {
576 return;
577 };
578
579 let keys_to_remove: Vec<Key> = self
581 .records_by_distance
582 .range(responsible_distance..)
583 .map(|(_distance, key)| key.clone())
584 .collect();
585
586 let keys_to_remove_len = keys_to_remove.len();
587
588 for key in keys_to_remove {
590 self.remove(&key);
591 }
592
593 info!("Cleaned up {} unrelevant records, among the original {accumulated_records} accumulated_records",
594 keys_to_remove_len);
595 }
596}
597
598impl NodeRecordStore {
599 pub(crate) fn contains(&self, key: &Key) -> bool {
601 self.records.contains_key(key)
602 }
603
604 pub(crate) fn record_addresses(&self) -> HashMap<NetworkAddress, ValidationType> {
607 self.records
608 .iter()
609 .map(|(_record_key, (addr, record_type, _data_type))| {
610 (addr.clone(), record_type.clone())
611 })
612 .collect()
613 }
614
615 pub(crate) fn record_addresses_ref(
617 &self,
618 ) -> &HashMap<Key, (NetworkAddress, ValidationType, DataTypes)> {
619 &self.records
620 }
621
622 pub(crate) fn mark_as_stored(
626 &mut self,
627 key: Key,
628 validate_type: ValidationType,
629 data_type: DataTypes,
630 ) {
631 let addr = NetworkAddress::from(&key);
632 let distance = self.local_address.distance(&addr);
633
634 self.records
636 .insert(key.clone(), (addr.clone(), validate_type, data_type));
637
638 #[cfg(feature = "open-metrics")]
639 if let Some(metric) = &self.record_count_metric {
640 let _ = metric.set(self.records.len() as i64);
641 }
642
643 let _ = self.records_by_distance.insert(distance, key.clone());
645
646 if let Some((_farthest_record, farthest_record_distance)) = self.farthest_record.clone() {
648 if distance > farthest_record_distance {
649 self.farthest_record = Some((key, distance));
650 }
651 } else {
652 self.farthest_record = Some((key, distance));
653 }
654 }
655
656 fn prepare_record_bytes(
659 record: Record,
660 encryption_details: (Aes256GcmSiv, [u8; 4]),
661 ) -> Option<Vec<u8>> {
662 let (cipher, nonce_starter) = encryption_details;
663 let nonce = generate_nonce_for_record(&nonce_starter, &record.key);
664
665 match cipher.encrypt(&nonce, record.value.as_ref()) {
666 Ok(value) => Some(value),
667 Err(error) => {
668 warn!(
669 "Failed to encrypt record {:?} : {error:?}",
670 PrettyPrintRecordKey::from(&record.key),
671 );
672 None
673 }
674 }
675 }
676
677 pub(crate) fn put_verified(
683 &mut self,
684 r: Record,
685 record_type: ValidationType,
686 is_client_put: bool,
687 ) -> Result<()> {
688 let key = &r.key;
689 let record_key = PrettyPrintRecordKey::from(&r.key).into_owned();
690 debug!("PUTting a verified Record: {record_key:?}");
691
692 if let Some((existing_record, _timestamp)) = self.records_cache.remove(key) {
696 if existing_record.value == r.value {
697 self.records_cache.push_back(key.clone(), existing_record);
701 return Ok(());
703 }
704 }
705
706 if is_client_put {
708 self.records_cache.push_back(key.clone(), r.clone());
709 }
710
711 self.prune_records_if_needed(key)?;
712
713 let filename = Self::generate_filename(key);
714 let file_path = self.config.storage_dir.join(&filename);
715
716 let encryption_details = self.encryption_details.clone();
717 let cloned_cmd_sender = self.local_swarm_cmd_sender.clone();
718
719 let record_key2 = record_key.clone();
720 spawn(async move {
721 let key = r.key.clone();
722 let data_type = match RecordHeader::get_data_type(&r) {
723 Ok(data_type) => data_type,
724 Err(err) => {
725 error!(
726 "Error get data_type of record {record_key2:?} filename: {filename}, error: {err:?}"
727 );
728 return;
729 }
730 };
731 if let Some(bytes) = Self::prepare_record_bytes(r, encryption_details) {
732 let cmd = match fs::write(&file_path, bytes) {
733 Ok(_) => {
734 info!("Wrote record {record_key2:?} to disk! filename: {filename}");
736
737 LocalSwarmCmd::AddLocalRecordAsStored {
738 key,
739 record_type,
740 data_type,
741 }
742 }
743 Err(err) => {
744 error!(
745 "Error writing record {record_key2:?} filename: {filename}, error: {err:?}"
746 );
747 LocalSwarmCmd::RemoveFailedLocalRecord { key }
748 }
749 };
750
751 send_local_swarm_cmd(cloned_cmd_sender, cmd);
752 }
753 });
754
755 Ok(())
756 }
757
758 pub(crate) fn quoting_metrics(
761 &self,
762 key: &Key,
763 data_type: u32,
764 data_size: usize,
765 network_size: Option<u64>,
766 ) -> (QuotingMetrics, bool) {
767 let records_stored = self.records.len();
768 let records_per_type = self.records_per_type();
769
770 let live_time = if let Ok(elapsed) = self.timestamp.elapsed() {
771 elapsed.as_secs()
772 } else {
773 0
774 };
775
776 let mut quoting_metrics = QuotingMetrics {
777 data_type,
778 data_size,
779 close_records_stored: records_stored,
780 records_per_type,
781 max_records: self.config.max_records,
782 received_payment_count: self.received_payment_count,
783 live_time,
784 network_density: None,
785 network_size,
786 };
787
788 if let Some(distance_range) = self.responsible_distance_range {
789 let relevant_records = self.get_records_within_distance_range(distance_range);
790
791 quoting_metrics.network_density = Some(distance_range.0.to_big_endian());
793
794 quoting_metrics.close_records_stored = relevant_records;
795 } else {
796 info!("Basing cost of _total_ records stored.");
797 };
798
799 info!("Quoting_metrics {quoting_metrics:?}");
801
802 let is_stored = self.contains(key);
803 (quoting_metrics, is_stored)
804 }
805
806 pub(crate) fn payment_received(&mut self) {
808 self.received_payment_count = self.received_payment_count.saturating_add(1);
809
810 self.flush_historic_quoting_metrics();
811 }
812
813 pub fn get_records_within_distance_range(&self, range: Distance) -> usize {
815 let within_range = self
816 .records_by_distance
817 .range(..range)
818 .collect::<Vec<_>>()
819 .len();
820
821 Marker::CloseRecordsLen(within_range).log();
822
823 within_range
824 }
825
826 pub(crate) fn set_responsible_distance_range(&mut self, responsible_distance: Distance) {
828 self.responsible_distance_range = Some(responsible_distance);
829 }
830
831 fn records_per_type(&self) -> Vec<(u32, u32)> {
832 let mut map = BTreeMap::new();
833 for (_, _, data_type) in self.records.values() {
834 *map.entry(data_type.get_index()).or_insert(0) += 1;
835 }
836 map.into_iter().collect()
837 }
838}
839
840impl RecordStore for NodeRecordStore {
841 type RecordsIter<'a> = vec::IntoIter<Cow<'a, Record>>;
842 type ProvidedIter<'a> = vec::IntoIter<Cow<'a, ProviderRecord>>;
843
844 fn get(&self, k: &Key) -> Option<Cow<'_, Record>> {
845 let key = PrettyPrintRecordKey::from(k);
849
850 let cached_record = self.records_cache.get(k);
851 if let Some((record, _timestamp)) = cached_record {
853 return Some(Cow::Owned(record.clone()));
854 }
855
856 if !self.records.contains_key(k) {
857 debug!("Record not found locally: {key:?}");
858 return None;
859 }
860
861 debug!("GET request for Record key: {key}");
862
863 Self::read_from_disk(&self.encryption_details, k, &self.config.storage_dir)
864 }
865
866 fn put(&mut self, record: Record) -> Result<()> {
867 let record_key = PrettyPrintRecordKey::from(&record.key);
868
869 if record.value.len() >= self.config.max_value_bytes {
870 warn!(
871 "Record {record_key:?} not stored. Value too large: {} bytes",
872 record.value.len()
873 );
874 return Err(Error::ValueTooLarge);
875 }
876
877 match RecordHeader::from_record(&record) {
880 Ok(record_header) => {
881 match record_header.kind {
882 RecordKind::DataWithPayment(_) => {
883 debug!("Record {record_key:?} with payment shall always be processed.");
884 }
885 RecordKind::DataOnly(_) => {
887 match self.records.get(&record.key) {
892 Some((_addr, ValidationType::Chunk, _data_type)) => {
893 debug!("Chunk {record_key:?} already exists.");
894 return Ok(());
895 }
896 Some((
897 _addr,
898 ValidationType::NonChunk(existing_content_hash),
899 _data_type,
900 )) => {
901 let content_hash = XorName::from_content(&record.value);
902 if content_hash == *existing_content_hash {
903 debug!("A non-chunk record {record_key:?} with same content_hash {content_hash:?} already exists.");
904 return Ok(());
905 }
906 }
907 _ => {}
908 }
909 }
910 }
911 }
912 Err(err) => {
913 error!("For record {record_key:?}, failed to parse record_header {err:?}");
914 return Ok(());
915 }
916 }
917
918 debug!("Unverified Record {record_key:?} try to validate and store");
919 let event_sender = self.network_event_sender.clone();
920 let _handle = spawn(async move {
922 if let Err(error) = event_sender
923 .send(NetworkEvent::UnverifiedRecord(record))
924 .await
925 {
926 error!("SwarmDriver failed to send event: {}", error);
927 }
928 });
929
930 Ok(())
931 }
932
933 fn remove(&mut self, k: &Key) {
934 if let Some((addr, _, _)) = self.records.remove(k) {
936 let distance = self.local_address.distance(&addr);
937 let _ = self.records_by_distance.remove(&distance);
938 }
939
940 self.records_cache.remove(k);
941
942 #[cfg(feature = "open-metrics")]
943 if let Some(metric) = &self.record_count_metric {
944 let _ = metric.set(self.records.len() as i64);
945 }
946
947 if let Some((farthest_record, _)) = self.farthest_record.clone() {
948 if farthest_record == *k {
949 self.farthest_record = self.calculate_farthest();
950 }
951 }
952
953 let filename = Self::generate_filename(k);
954 let file_path = self.config.storage_dir.join(&filename);
955
956 let _handle = spawn(async move {
957 match fs::remove_file(file_path) {
958 Ok(_) => {
959 info!("Removed record from disk! filename: {filename}");
960 }
961 Err(err) => {
962 error!("Error while removing file. filename: {filename}, error: {err:?}");
963 }
964 }
965 });
966 }
967
968 fn records(&self) -> Self::RecordsIter<'_> {
969 vec![].into_iter()
971 }
972
973 fn add_provider(&mut self, _record: ProviderRecord) -> Result<()> {
974 Ok(())
976 }
977
978 fn providers(&self, _key: &Key) -> Vec<ProviderRecord> {
979 vec![]
981 }
982
983 fn provided(&self) -> Self::ProvidedIter<'_> {
984 vec![].into_iter()
986 }
987
988 fn remove_provider(&mut self, _key: &Key, _provider: &PeerId) {
989 }
991}
992
993#[expect(trivial_casts)]
994#[cfg(test)]
995mod tests {
996 use super::*;
997 use bls::SecretKey;
998 use xor_name::XorName;
999
1000 use ant_protocol::storage::{
1001 try_deserialize_record, try_serialize_record, Chunk, ChunkAddress, DataTypes, Scratchpad,
1002 };
1003 use assert_fs::{
1004 fixture::{PathChild, PathCreateDir},
1005 TempDir,
1006 };
1007 use bytes::Bytes;
1008 use eyre::ContextCompat;
1009 use libp2p::{core::multihash::Multihash, kad::RecordKey};
1010 use quickcheck::*;
1011 use tokio::runtime::Runtime;
1012 use tokio::time::{sleep, Duration};
1013
1014 const MULITHASH_CODE: u64 = 0x12;
1015
1016 #[derive(Clone, Debug)]
1017 struct ArbitraryKey(Key);
1018 #[derive(Clone, Debug)]
1019 struct ArbitraryRecord(Record);
1020
1021 impl Arbitrary for ArbitraryKey {
1022 fn arbitrary(g: &mut Gen) -> ArbitraryKey {
1023 let hash: [u8; 32] = core::array::from_fn(|_| u8::arbitrary(g));
1024 ArbitraryKey(Key::from(
1025 Multihash::<64>::wrap(MULITHASH_CODE, &hash).expect("Failed to gen MultiHash"),
1026 ))
1027 }
1028 }
1029
1030 impl Arbitrary for ArbitraryRecord {
1031 fn arbitrary(g: &mut Gen) -> ArbitraryRecord {
1032 let value = match try_serialize_record(
1033 &(0..50).map(|_| rand::random::<u8>()).collect::<Bytes>(),
1034 RecordKind::DataOnly(DataTypes::Chunk),
1035 ) {
1036 Ok(value) => value.to_vec(),
1037 Err(err) => panic!("Cannot generate record value {err:?}"),
1038 };
1039 let record = Record {
1040 key: ArbitraryKey::arbitrary(g).0,
1041 value,
1042 publisher: None,
1043 expires: None,
1044 };
1045 ArbitraryRecord(record)
1046 }
1047 }
1048
1049 #[test]
1050 fn put_get_remove_record() {
1051 fn prop(r: ArbitraryRecord) {
1052 let rt = if let Ok(rt) = Runtime::new() {
1053 rt
1054 } else {
1055 panic!("Cannot create runtime");
1056 };
1057 rt.block_on(testing_thread(r));
1058 }
1059 quickcheck(prop as fn(_))
1060 }
1061
1062 async fn testing_thread(r: ArbitraryRecord) {
1063 let r = r.0;
1064 let (network_event_sender, mut network_event_receiver) = mpsc::channel(1);
1065 let (swarm_cmd_sender, _) = mpsc::channel(1);
1066
1067 let mut store = NodeRecordStore::with_config(
1068 PeerId::random(),
1069 Default::default(),
1070 network_event_sender,
1071 swarm_cmd_sender,
1072 #[cfg(feature = "open-metrics")]
1073 None,
1074 );
1075
1076 assert!(store.put(r.clone()).is_ok());
1078 assert!(store.get(&r.key).is_none());
1079
1080 let returned_record = if let Some(event) = network_event_receiver.recv().await {
1081 if let NetworkEvent::UnverifiedRecord(record) = event {
1082 record
1083 } else {
1084 panic!("Unexpected network event {event:?}");
1085 }
1086 } else {
1087 panic!("Failed recevied the record for further verification");
1088 };
1089
1090 let returned_record_key = returned_record.key.clone();
1091
1092 assert!(store
1093 .put_verified(returned_record, ValidationType::Chunk, true)
1094 .is_ok());
1095
1096 store.mark_as_stored(returned_record_key, ValidationType::Chunk, DataTypes::Chunk);
1099
1100 let max_iterations = 10;
1102 let mut iteration = 0;
1103 while iteration < max_iterations {
1104 if store
1108 .get(&r.key)
1109 .is_some_and(|record| Cow::Borrowed(&r) == record)
1110 {
1111 break;
1112 }
1113 sleep(Duration::from_millis(100)).await;
1114 iteration += 1;
1115 }
1116 if iteration == max_iterations {
1117 panic!("record_store test failed with stored record cann't be read back");
1118 }
1119
1120 assert_eq!(
1121 Some(Cow::Borrowed(&r)),
1122 store.get(&r.key),
1123 "record can be retrieved after put"
1124 );
1125 store.remove(&r.key);
1126
1127 assert!(store.get(&r.key).is_none());
1128 }
1129
1130 #[tokio::test]
1131 async fn can_store_after_restart() -> eyre::Result<()> {
1132 let tmp_dir = TempDir::new()?;
1133 let current_test_dir = tmp_dir.child("can_store_after_restart");
1134 current_test_dir.create_dir_all()?;
1135
1136 let store_config = NodeRecordStoreConfig {
1137 storage_dir: current_test_dir.to_path_buf(),
1138 encryption_seed: [1u8; 16],
1139 ..Default::default()
1140 };
1141 let self_id = PeerId::random();
1142
1143 let (network_event_sender, _network_event_receiver) = mpsc::channel(1);
1145 let (swarm_cmd_sender, mut swarm_cmd_receiver) = mpsc::channel(1);
1146
1147 let mut store = NodeRecordStore::with_config(
1148 self_id,
1149 store_config.clone(),
1150 network_event_sender.clone(),
1151 swarm_cmd_sender.clone(),
1152 #[cfg(feature = "open-metrics")]
1153 None,
1154 );
1155
1156 let chunk_data = Bytes::from_static(b"Test chunk data");
1158 let chunk = Chunk::new(chunk_data);
1159 let chunk_address = *chunk.address();
1160
1161 let record = Record {
1163 key: NetworkAddress::ChunkAddress(chunk_address).to_record_key(),
1164 value: try_serialize_record(&chunk, RecordKind::DataOnly(DataTypes::Chunk))?.to_vec(),
1165 expires: None,
1166 publisher: None,
1167 };
1168
1169 assert!(store
1171 .put_verified(record.clone(), ValidationType::Chunk, true)
1172 .is_ok());
1173
1174 if let Some(cmd) = swarm_cmd_receiver.recv().await {
1176 match cmd {
1177 LocalSwarmCmd::AddLocalRecordAsStored {
1178 key,
1179 record_type,
1180 data_type,
1181 } => {
1182 store.mark_as_stored(key, record_type, data_type);
1183 }
1184 _ => panic!("Unexpected command received"),
1185 }
1186 }
1187
1188 let stored_record = store.get(&record.key);
1190 assert!(stored_record.is_some(), "Chunk should be stored initially");
1191
1192 sleep(Duration::from_secs(1)).await;
1194
1195 let (new_network_event_sender, _new_network_event_receiver) = mpsc::channel(1);
1197 let (new_swarm_cmd_sender, _new_swarm_cmd_receiver) = mpsc::channel(1);
1198
1199 drop(store);
1201 let store = NodeRecordStore::with_config(
1202 self_id,
1203 store_config,
1204 new_network_event_sender,
1205 new_swarm_cmd_sender,
1206 #[cfg(feature = "open-metrics")]
1207 None,
1208 );
1209
1210 let stored_record = store.get(&record.key);
1212 assert!(
1213 stored_record.is_some(),
1214 "Chunk should be stored after restart with same key"
1215 );
1216
1217 let (diff_network_event_sender, _diff_network_event_receiver) = mpsc::channel(1);
1219 let (diff_swarm_cmd_sender, _diff_swarm_cmd_receiver) = mpsc::channel(1);
1220
1221 let self_id_diff = PeerId::random();
1223 let store_config_diff = NodeRecordStoreConfig {
1224 storage_dir: current_test_dir.to_path_buf(),
1225 encryption_seed: [2u8; 16],
1226 ..Default::default()
1227 };
1228 let store_diff = NodeRecordStore::with_config(
1229 self_id_diff,
1230 store_config_diff,
1231 diff_network_event_sender,
1232 diff_swarm_cmd_sender,
1233 #[cfg(feature = "open-metrics")]
1234 None,
1235 );
1236
1237 assert!(
1240 store_diff.get(&record.key).is_none(),
1241 "Chunk should be gone with different encryption key"
1242 );
1243
1244 Ok(())
1245 }
1246
1247 #[tokio::test]
1248 async fn can_store_and_retrieve_chunk() {
1249 let temp_dir = std::env::temp_dir();
1250 let store_config = NodeRecordStoreConfig {
1251 storage_dir: temp_dir,
1252 ..Default::default()
1253 };
1254 let self_id = PeerId::random();
1255 let (network_event_sender, _) = mpsc::channel(1);
1256 let (swarm_cmd_sender, _) = mpsc::channel(1);
1257
1258 let mut store = NodeRecordStore::with_config(
1259 self_id,
1260 store_config,
1261 network_event_sender,
1262 swarm_cmd_sender,
1263 #[cfg(feature = "open-metrics")]
1264 None,
1265 );
1266
1267 let chunk_data = Bytes::from_static(b"Test chunk data");
1269 let chunk = Chunk::new(chunk_data.clone());
1270 let chunk_address = *chunk.address();
1271
1272 let record = Record {
1274 key: NetworkAddress::ChunkAddress(chunk_address).to_record_key(),
1275 value: chunk_data.to_vec(),
1276 expires: None,
1277 publisher: None,
1278 };
1279
1280 assert!(store
1282 .put_verified(record.clone(), ValidationType::Chunk, true)
1283 .is_ok());
1284
1285 store.mark_as_stored(record.key.clone(), ValidationType::Chunk, DataTypes::Chunk);
1287
1288 let stored_record = store.get(&record.key);
1290 assert!(stored_record.is_some(), "Chunk should be stored");
1291
1292 if let Some(stored) = stored_record {
1293 assert_eq!(
1294 stored.value, chunk_data,
1295 "Stored chunk data should match original"
1296 );
1297
1298 let stored_address = ChunkAddress::new(XorName::from_content(&stored.value));
1299 assert_eq!(
1300 stored_address, chunk_address,
1301 "Stored chunk address should match original"
1302 );
1303 }
1304
1305 store.remove(&record.key);
1307 assert!(
1308 store.get(&record.key).is_none(),
1309 "Chunk should be removed after cleanup"
1310 );
1311 }
1312
1313 #[tokio::test]
1314 async fn can_store_and_retrieve_scratchpad() -> eyre::Result<()> {
1315 let temp_dir = std::env::temp_dir();
1316 let store_config = NodeRecordStoreConfig {
1317 storage_dir: temp_dir,
1318 ..Default::default()
1319 };
1320 let self_id = PeerId::random();
1321 let (network_event_sender, _) = mpsc::channel(1);
1322 let (swarm_cmd_sender, _) = mpsc::channel(1);
1323
1324 let mut store = NodeRecordStore::with_config(
1325 self_id,
1326 store_config,
1327 network_event_sender,
1328 swarm_cmd_sender,
1329 #[cfg(feature = "open-metrics")]
1330 None,
1331 );
1332
1333 let unencrypted_scratchpad_data = Bytes::from_static(b"Test scratchpad data");
1335 let owner_sk = SecretKey::random();
1336
1337 let scratchpad = Scratchpad::new(&owner_sk, 0, &unencrypted_scratchpad_data, 0);
1338
1339 let scratchpad_address = *scratchpad.address();
1340
1341 let record = Record {
1343 key: NetworkAddress::ScratchpadAddress(scratchpad_address).to_record_key(),
1344 value: try_serialize_record(&scratchpad, RecordKind::DataOnly(DataTypes::Scratchpad))?
1345 .to_vec(),
1346 expires: None,
1347 publisher: None,
1348 };
1349
1350 assert!(store
1352 .put_verified(
1353 record.clone(),
1354 ValidationType::NonChunk(XorName::from_content(&record.value)),
1355 true,
1356 )
1357 .is_ok());
1358
1359 store.mark_as_stored(
1361 record.key.clone(),
1362 ValidationType::NonChunk(XorName::from_content(&record.value)),
1363 DataTypes::Scratchpad,
1364 );
1365
1366 let stored_record = store.get(&record.key);
1368 assert!(stored_record.is_some(), "Scratchpad should be stored");
1369
1370 if let Some(stored) = stored_record {
1371 let scratchpad = try_deserialize_record::<Scratchpad>(&stored)?;
1372
1373 let stored_address = scratchpad.address();
1374 assert_eq!(
1375 stored_address, &scratchpad_address,
1376 "Stored scratchpad address should match original"
1377 );
1378
1379 let decrypted_data = scratchpad.decrypt_data(&owner_sk)?;
1380
1381 assert_eq!(
1382 decrypted_data, unencrypted_scratchpad_data,
1383 "Stored scratchpad data should match original"
1384 );
1385 }
1386
1387 store.remove(&record.key);
1388 assert!(
1389 store.get(&record.key).is_none(),
1390 "Scratchpad should be removed after cleanup"
1391 );
1392
1393 Ok(())
1394 }
1395 #[tokio::test]
1396 async fn pruning_on_full() -> Result<()> {
1397 let max_iterations = 10;
1398 let max_records = 50;
1400
1401 let temp_dir = std::env::temp_dir();
1402 let unique_dir_name = uuid::Uuid::new_v4().to_string();
1403 let storage_dir = temp_dir.join(unique_dir_name);
1404 fs::create_dir_all(&storage_dir).expect("Failed to create directory");
1405
1406 let store_config = NodeRecordStoreConfig {
1410 max_records,
1411 storage_dir,
1412 ..Default::default()
1413 };
1414 let self_id = PeerId::random();
1415 let (network_event_sender, _) = mpsc::channel(1);
1416 let (swarm_cmd_sender, _) = mpsc::channel(1);
1417
1418 let mut store = NodeRecordStore::with_config(
1419 self_id,
1420 store_config.clone(),
1421 network_event_sender,
1422 swarm_cmd_sender,
1423 #[cfg(feature = "open-metrics")]
1424 None,
1425 );
1426 let mut stored_records_at_some_point: Vec<RecordKey> = vec![];
1428 let self_address = NetworkAddress::from(self_id);
1429
1430 let mut failed_records = vec![];
1432
1433 for _ in 0..max_records * 2 {
1435 let record_key = NetworkAddress::from(PeerId::random()).to_record_key();
1437 let value = match try_serialize_record(
1438 &(0..50).map(|_| rand::random::<u8>()).collect::<Bytes>(),
1439 RecordKind::DataOnly(DataTypes::Chunk),
1440 ) {
1441 Ok(value) => value.to_vec(),
1442 Err(err) => panic!("Cannot generate record value {err:?}"),
1443 };
1444 let record = Record {
1445 key: record_key.clone(),
1446 value,
1447 publisher: None,
1448 expires: None,
1449 };
1450
1451 let succeeded = store
1453 .put_verified(record, ValidationType::Chunk, true)
1454 .is_ok();
1455
1456 if !succeeded {
1457 failed_records.push(record_key.clone());
1458 println!("failed {:?}", PrettyPrintRecordKey::from(&record_key));
1459 } else {
1460 store.mark_as_stored(record_key.clone(), ValidationType::Chunk, DataTypes::Chunk);
1463
1464 println!("success sotred len: {:?} ", store.record_addresses().len());
1465 stored_records_at_some_point.push(record_key.clone());
1466 if stored_records_at_some_point.len() <= max_records {
1467 assert!(succeeded);
1468 }
1469 let mut iteration = 0;
1471 while iteration < max_iterations {
1472 if store.get(&record_key).is_some() {
1473 break;
1474 }
1475 sleep(Duration::from_millis(100)).await;
1476 iteration += 1;
1477 }
1478 if iteration == max_iterations {
1479 panic!("record_store prune test failed with stored record {record_key:?} can't be read back");
1480 }
1481 }
1482 }
1483
1484 let stored_data_at_end = store.record_addresses();
1485 assert!(
1486 stored_data_at_end.len() == max_records,
1487 "Stored records ({:?}) should be max_records, {max_records:?}",
1488 stored_data_at_end.len(),
1489 );
1490
1491 assert!(
1493 stored_records_at_some_point.len() >= max_records,
1494 "we should have stored ata least max over time"
1495 );
1496
1497 let mut sorted_stored_data = stored_data_at_end.iter().collect_vec();
1499
1500 sorted_stored_data
1501 .sort_by(|(a, _), (b, _)| self_address.distance(a).cmp(&self_address.distance(b)));
1502
1503 if let Some((most_distant_data, _)) = sorted_stored_data.last() {
1505 for failed_record in failed_records {
1506 let failed_data = NetworkAddress::from(&failed_record);
1507 assert!(
1508 self_address.distance(&failed_data) > self_address.distance(most_distant_data),
1509 "failed record {failed_data:?} should be farther than the farthest stored record {most_distant_data:?}"
1510 );
1511 }
1512
1513 for data in stored_records_at_some_point {
1515 let data_addr = NetworkAddress::from(&data);
1516 if !sorted_stored_data.contains(&(&data_addr, &ValidationType::Chunk)) {
1517 assert!(
1518 self_address.distance(&data_addr)
1519 > self_address.distance(most_distant_data),
1520 "stored record should be farther than the farthest stored record"
1521 );
1522 }
1523 }
1524 }
1525
1526 Ok(())
1527 }
1528
1529 #[tokio::test]
1530 async fn get_records_within_range() -> eyre::Result<()> {
1531 let max_records = 50;
1532
1533 let temp_dir = std::env::temp_dir();
1534 let unique_dir_name = uuid::Uuid::new_v4().to_string();
1535 let storage_dir = temp_dir.join(unique_dir_name);
1536
1537 let store_config = NodeRecordStoreConfig {
1539 max_records,
1540 storage_dir,
1541 ..Default::default()
1542 };
1543 let self_id = PeerId::random();
1544 let (network_event_sender, _) = mpsc::channel(1);
1545 let (swarm_cmd_sender, _) = mpsc::channel(1);
1546 let mut store = NodeRecordStore::with_config(
1547 self_id,
1548 store_config,
1549 network_event_sender,
1550 swarm_cmd_sender,
1551 #[cfg(feature = "open-metrics")]
1552 None,
1553 );
1554
1555 let mut stored_records: Vec<RecordKey> = vec![];
1556 let self_address = NetworkAddress::from(self_id);
1557
1558 for _ in 0..max_records - 1 {
1561 let record_key = NetworkAddress::from(PeerId::random()).to_record_key();
1562 let value = match try_serialize_record(
1563 &(0..max_records)
1564 .map(|_| rand::random::<u8>())
1565 .collect::<Bytes>(),
1566 RecordKind::DataOnly(DataTypes::Chunk),
1567 ) {
1568 Ok(value) => value.to_vec(),
1569 Err(err) => panic!("Cannot generate record value {err:?}"),
1570 };
1571 let record = Record {
1572 key: record_key.clone(),
1573 value,
1574 publisher: None,
1575 expires: None,
1576 };
1577 assert!(store
1578 .put_verified(record, ValidationType::Chunk, true)
1579 .is_ok());
1580 store.mark_as_stored(record_key.clone(), ValidationType::Chunk, DataTypes::Chunk);
1583
1584 stored_records.push(record_key.clone());
1585 stored_records.sort_by(|a, b| {
1586 let a = NetworkAddress::from(a);
1587 let b = NetworkAddress::from(b);
1588 self_address.distance(&a).cmp(&self_address.distance(&b))
1589 });
1590 }
1591
1592 let halfway_record_address = NetworkAddress::from(
1594 stored_records
1595 .get(max_records / 2)
1596 .wrap_err("Could not parse record store key")?,
1597 );
1598 let distance = &self_address.distance(&halfway_record_address);
1600
1601 store.set_responsible_distance_range(*distance);
1603
1604 let records_in_range = store.get_records_within_distance_range(*distance);
1605
1606 assert!(
1609 records_in_range >= max_records / 2,
1610 "Not enough records in range {records_in_range}/{}",
1611 max_records / 2
1612 );
1613
1614 Ok(())
1615 }
1616
1617 #[tokio::test]
1618 async fn historic_quoting_metrics() -> Result<()> {
1619 let temp_dir = std::env::temp_dir();
1620 let unique_dir_name = uuid::Uuid::new_v4().to_string();
1621 let storage_dir = temp_dir.join(unique_dir_name);
1622 fs::create_dir_all(&storage_dir).expect("Failed to create directory");
1623 let historic_quote_dir = storage_dir.clone();
1624
1625 let store_config = NodeRecordStoreConfig {
1626 storage_dir,
1627 historic_quote_dir,
1628 ..Default::default()
1629 };
1630 let self_id = PeerId::random();
1631 let (network_event_sender, _) = mpsc::channel(1);
1632 let (swarm_cmd_sender, _) = mpsc::channel(1);
1633
1634 let mut store = NodeRecordStore::with_config(
1635 self_id,
1636 store_config.clone(),
1637 network_event_sender.clone(),
1638 swarm_cmd_sender.clone(),
1639 #[cfg(feature = "open-metrics")]
1640 None,
1641 );
1642
1643 store.payment_received();
1644
1645 sleep(Duration::from_millis(5000)).await;
1647
1648 let new_store = NodeRecordStore::with_config(
1649 self_id,
1650 store_config,
1651 network_event_sender,
1652 swarm_cmd_sender,
1653 #[cfg(feature = "open-metrics")]
1654 None,
1655 );
1656
1657 assert_eq!(1, new_store.received_payment_count);
1658 assert_eq!(store.timestamp, new_store.timestamp);
1659
1660 Ok(())
1661 }
1662
1663 #[tokio::test]
1664 async fn test_cache_pruning_and_size_limit() {
1665 let cache_size = 3;
1667 let cache_timeout = Duration::from_millis(100);
1668 let mut cache = RecordCache::new(cache_size, cache_timeout);
1669
1670 let record1 = Record {
1672 key: RecordKey::new(b"key1"),
1673 value: b"value1".to_vec(),
1674 publisher: None,
1675 expires: None,
1676 };
1677 let record2 = Record {
1678 key: RecordKey::new(b"key2"),
1679 value: b"value2".to_vec(),
1680 publisher: None,
1681 expires: None,
1682 };
1683 let record3 = Record {
1684 key: RecordKey::new(b"key3"),
1685 value: b"value3".to_vec(),
1686 publisher: None,
1687 expires: None,
1688 };
1689 let record4 = Record {
1690 key: RecordKey::new(b"key4"),
1691 value: b"value4".to_vec(),
1692 publisher: None,
1693 expires: None,
1694 };
1695
1696 cache.push_back(record1.key.clone(), record1.clone());
1698 sleep(Duration::from_millis(1)).await;
1699 cache.push_back(record2.key.clone(), record2.clone());
1700 sleep(Duration::from_millis(1)).await;
1701 cache.push_back(record3.key.clone(), record3.clone());
1702 sleep(Duration::from_millis(1)).await;
1703
1704 assert!(cache.get(&record1.key).is_some());
1706 assert!(cache.get(&record2.key).is_some());
1707 assert!(cache.get(&record3.key).is_some());
1708
1709 cache.push_back(record4.key.clone(), record4.clone());
1711
1712 assert_eq!(cache.records_cache.len(), cache_size);
1714
1715 assert!(cache.get(&record1.key).is_none());
1717
1718 sleep(cache_timeout + Duration::from_millis(10)).await;
1720
1721 let record5 = Record {
1723 key: RecordKey::new(b"key5"),
1724 value: b"value5".to_vec(),
1725 publisher: None,
1726 expires: None,
1727 };
1728 cache.push_back(record5.key.clone(), record5.clone());
1729
1730 assert!(cache.get(&record2.key).is_none());
1732 assert!(cache.get(&record3.key).is_none());
1733 assert!(cache.get(&record4.key).is_none());
1734
1735 assert!(cache.get(&record5.key).is_some());
1737 }
1738}