1use std::collections::HashMap;
2use std::hash::Hash;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::atomic::Ordering::Acquire;
6use std::time::Duration;
7
8use log::info;
9
10use crate::cache::command::acknowledgement::CommandAcknowledgement;
11use crate::cache::command::command_executor::{CommandExecutor, CommandSendResult, shutdown_result};
12use crate::cache::command::{CommandType, RejectionReason};
13use crate::cache::config::Config;
14use crate::cache::config::weight_calculation::Calculation;
15use crate::cache::errors::Errors;
16use crate::cache::expiration::TTLTicker;
17use crate::cache::key_description::KeyDescription;
18use crate::cache::policy::admission_policy::AdmissionPolicy;
19use crate::cache::pool::Pool;
20use crate::cache::put_or_update::PutOrUpdateRequest;
21use crate::cache::stats::{ConcurrentStatsCounter, StatsSummary};
22use crate::cache::store::{Store, TypeOfExpiryUpdate};
23use crate::cache::store::key_value_ref::KeyValueRef;
24use crate::cache::store::stored_value::StoredValue;
25use crate::cache::types::{KeyId, Weight};
26use crate::cache::unique_id::increasing_id_generator::IncreasingIdGenerator;
27
/// Cache façade that ties together the key/value store, the command executor,
/// the admission policy, the access pool and the TTL ticker.
///
/// All collaborators are created in `CacheD::new` from a single `Config`.
pub struct CacheD<Key, Value>
    where Key: Hash + Eq + Send + Sync + Clone + 'static,
          Value: Send + Sync + 'static {
    // Configuration supplied at construction; provides the weight-calculation and key-hash functions.
    config: Config<Key, Value>,
    // Shared key/value store; also handed to the command executor and the TTL eviction hook.
    store: Arc<Store<Key, Value>>,
    // Receives the put / delete / update-weight commands sent by the write methods.
    command_executor: CommandExecutor<Key, Value>,
    // Tracks key weights; shared with the pool, the command executor and the TTL eviction hook.
    admission_policy: Arc<AdmissionPolicy<Key>>,
    // Receives the hash of every key that was read successfully.
    pool: Pool<AdmissionPolicy<Key>>,
    // Holds expiry entries for keys that carry a time-to-live.
    ttl_ticker: Arc<TTLTicker>,
    // Source of the ids placed into each `KeyDescription`.
    id_generator: IncreasingIdGenerator,
    // Set to `true` by `shutdown`; checked by the public operations before doing any work.
    is_shutting_down: AtomicBool,
}
82
83impl<Key, Value> CacheD<Key, Value>
84 where Key: Hash + Eq + Send + Sync + Clone + 'static,
85 Value: Send + Sync + 'static {
86 pub fn new(config: Config<Key, Value>) -> Self {
88 assert!(config.counters > 0);
89
90 let stats_counter = Arc::new(ConcurrentStatsCounter::new());
91 let store = Store::new(config.clock.clone_box(), stats_counter.clone(), config.capacity, config.shards);
92 let admission_policy = Arc::new(AdmissionPolicy::new(config.counters, config.cache_weight_config(), stats_counter.clone()));
93 let pool = Pool::new(config.access_pool_size, config.access_buffer_size, admission_policy.clone());
94 let ttl_ticker = Self::ttl_ticker(&config, store.clone(), admission_policy.clone());
95 let command_buffer_size = config.command_buffer_size;
96
97 CacheD {
98 config,
99 store: store.clone(),
100 command_executor: CommandExecutor::new(store, admission_policy.clone(), stats_counter, ttl_ticker.clone(), command_buffer_size),
101 admission_policy,
102 pool,
103 ttl_ticker,
104 id_generator: IncreasingIdGenerator::new(),
105 is_shutting_down: AtomicBool::new(false),
106 }
107 }
108
109 pub fn put(&self, key: Key, value: Value) -> CommandSendResult {
132 let weight = (self.config.weight_calculation_fn)(&key, &value, false);
133 assert!(weight > 0, "{}", Errors::WeightCalculationGtZero);
134 self.put_with_weight(key, value, weight)
135 }
136
137 pub fn put_with_weight(&self, key: Key, value: Value, weight: Weight) -> CommandSendResult {
161 if self.is_shutting_down() { return shutdown_result(); }
162
163 assert!(weight > 0, "{}", Errors::KeyWeightGtZero("put_with_weight"));
164 if self.store.is_present(&key) {
165 return Ok(CommandAcknowledgement::rejected(RejectionReason::KeyAlreadyExists))
166 }
167 self.command_executor.send(CommandType::Put(
168 self.key_description(key, weight),
169 value,
170 ))
171 }
172
173 pub fn put_with_ttl(&self, key: Key, value: Value, time_to_live: Duration) -> CommandSendResult {
197 if self.is_shutting_down() { return shutdown_result(); }
198
199 let weight = (self.config.weight_calculation_fn)(&key, &value, true);
200 assert!(weight > 0, "{}", Errors::WeightCalculationGtZero);
201 if self.store.is_present(&key) {
202 return Ok(CommandAcknowledgement::rejected(RejectionReason::KeyAlreadyExists))
203 }
204 self.command_executor.send(CommandType::PutWithTTL(
205 self.key_description(key, weight), value, time_to_live)
206 )
207 }
208
209 pub fn put_with_weight_and_ttl(&self, key: Key, value: Value, weight: Weight, time_to_live: Duration) -> CommandSendResult {
234 if self.is_shutting_down() { return shutdown_result(); }
235
236 assert!(weight > 0, "{}", Errors::KeyWeightGtZero("put_with_weight_and_ttl"));
237 if self.store.is_present(&key) {
238 return Ok(CommandAcknowledgement::rejected(RejectionReason::KeyAlreadyExists))
239 }
240 self.command_executor.send(CommandType::PutWithTTL(
241 self.key_description(key, weight), value, time_to_live,
242 ))
243 }
244
245 pub fn put_or_update(&self, request: PutOrUpdateRequest<Key, Value>) -> CommandSendResult {
265 if self.is_shutting_down() { return shutdown_result(); }
266
267 let updated_weight = request.updated_weight(&self.config.weight_calculation_fn);
268 let (key, value, time_to_live)
269 = (request.key, request.value, request.time_to_live);
270
271 let update_response
272 = self.store.update(&key, value, time_to_live, request.remove_time_to_live);
273
274 if !update_response.did_update_happen() {
275 let value = update_response.value();
276 assert!(value.is_some(), "{}", Errors::PutOrUpdateValueMissing);
277 assert!(updated_weight.is_some());
278
279 let value = value.unwrap();
280 let weight = updated_weight.unwrap();
281 assert!(weight > 0, "{}", Errors::KeyWeightGtZero("PutOrUpdate"));
282
283 return if let Some(time_to_live) = time_to_live {
284 self.command_executor.send(CommandType::PutWithTTL(
285 self.key_description(key, weight), value, time_to_live,
286 ))
287 } else {
288 self.command_executor.send(CommandType::Put(
289 self.key_description(key, weight),
290 value,
291 ))
292 };
293 }
294
295 let key_id = update_response.key_id_or_panic();
296 let existing_weight = self.admission_policy.weight_of(&key_id).unwrap_or(0);
297
298 let updated_weight = match update_response.type_of_expiry_update() {
299 TypeOfExpiryUpdate::Added(key_id, expiry) => {
300 self.ttl_ticker.put(key_id, expiry);
301 updated_weight.or_else(|| Some(existing_weight + Calculation::ttl_ticker_entry_size() as i64))
302 }
303 TypeOfExpiryUpdate::Deleted(key_id, expiry) => {
304 self.ttl_ticker.delete(&key_id, &expiry);
305 updated_weight.or_else(|| Some(existing_weight - Calculation::ttl_ticker_entry_size() as i64))
306 }
307 TypeOfExpiryUpdate::Updated(key_id, old_expiry, new_expiry) => {
308 self.ttl_ticker.update(key_id, &old_expiry, new_expiry);
309 updated_weight
310 }
311 _ => updated_weight,
312 };
313
314 if let Some(weight) = updated_weight {
315 assert!(weight > 0, "{}", Errors::KeyWeightGtZero("PutOrUpdate"));
316 return self.command_executor.send(CommandType::UpdateWeight(key_id, weight));
317 }
318 Ok(CommandAcknowledgement::accepted())
319 }
320
321 pub fn delete(&self, key: Key) -> CommandSendResult {
344 if self.is_shutting_down() { return shutdown_result(); }
345
346 self.store.mark_deleted(&key);
347 self.command_executor.send(CommandType::Delete(key))
348 }
349
350 pub fn get_ref(&self, key: &Key) -> Option<KeyValueRef<'_, Key, StoredValue<Value>>> {
374 if self.is_shutting_down() { return None; }
375
376 if let Some(value_ref) = self.store.get_ref(key) {
377 self.mark_key_accessed(key);
378 return Some(value_ref);
379 }
380 None
381 }
382
383 pub fn map_get_ref<MapFn, MappedValue>(&self, key: &Key, map_fn: MapFn) -> Option<MappedValue>
402 where MapFn: Fn(&StoredValue<Value>) -> MappedValue {
403 if self.is_shutting_down() { return None; }
404
405 if let Some(value_ref) = self.get_ref(key) {
406 return Some(map_fn(value_ref.value()));
407 }
408 None
409 }
410
411 pub fn total_weight_used(&self) -> Weight {
413 self.admission_policy.weight_used()
414 }
415
416 pub fn stats_summary(&self) -> StatsSummary {
433 self.store.stats_counter().summary()
434 }
435
436 pub fn shutdown(&self) {
458 if self.is_shutting_down.compare_exchange(false, true, Ordering::Release, Ordering::Relaxed).is_ok() {
459 info!("Starting to shutdown cached");
460 let _ = self.command_executor.shutdown();
461 self.admission_policy.shutdown();
462 self.ttl_ticker.shutdown();
463
464 self.store.clear();
465 self.admission_policy.clear();
466 self.ttl_ticker.clear();
467 }
468 }
469
470 fn mark_key_accessed(&self, key: &Key) {
471 self.pool.add((self.config.key_hash_fn)(key));
472 }
473
474 fn key_description(&self, key: Key, weight: Weight) -> KeyDescription<Key> {
475 let hash = (self.config.key_hash_fn)(&key);
476 KeyDescription::new(key, self.id_generator.next(), hash, weight)
477 }
478
479 fn ttl_ticker(config: &Config<Key, Value>, store: Arc<Store<Key, Value>>, admission_policy: Arc<AdmissionPolicy<Key>>) -> Arc<TTLTicker> {
480 let store_evict_hook = move |key| {
481 store.delete(&key);
482 };
483 let cache_weight_evict_hook = move |key_id: &KeyId| {
484 admission_policy.delete_with_hook(key_id, &store_evict_hook);
485 };
486
487 TTLTicker::new(config.ttl_config(), cache_weight_evict_hook)
488 }
489
490 fn is_shutting_down(&self) -> bool {
491 self.is_shutting_down.load(Acquire)
492 }
493}
494
495impl<Key, Value> CacheD<Key, Value>
496 where Key: Hash + Eq + Send + Sync + Clone + 'static,
497 Value: Send + Sync + Clone + 'static {
498 pub fn get(&self, key: &Key) -> Option<Value> {
515 if self.is_shutting_down() { return None; }
516
517 if let Some(value) = self.store.get(key) {
518 self.mark_key_accessed(key);
519 return Some(value);
520 }
521 None
522 }
523
524 pub fn map_get<MapFn, MappedValue>(&self, key: &Key, map_fn: MapFn) -> Option<MappedValue>
545 where MapFn: Fn(Value) -> MappedValue {
546 if self.is_shutting_down() { return None; }
547
548 if let Some(value) = self.get(key) {
549 return Some(map_fn(value));
550 }
551 None
552 }
553
554 pub fn multi_get<'a>(&self, keys: Vec<&'a Key>) -> HashMap<&'a Key, Option<Value>> {
574 if self.is_shutting_down() { return HashMap::new(); }
575
576 keys.into_iter().map(|key| (key, self.get(key))).collect::<HashMap<_, _>>()
577 }
578
579 pub fn multi_get_iterator<'a>(&'a self, keys: Vec<&'a Key>) -> MultiGetIterator<'a, Key, Value> {
598 MultiGetIterator {
599 cache: self,
600 keys,
601 }
602 }
603
604 pub fn multi_get_map_iterator<'a, MapFn, MappedValue>(&'a self, keys: Vec<&'a Key>, map_fn: MapFn) -> MultiGetMapIterator<'a, Key, Value, MapFn, MappedValue>
623 where MapFn: Fn(Value) -> MappedValue {
624 MultiGetMapIterator {
625 iterator: MultiGetIterator {
626 cache: self,
627 keys,
628 },
629 map_fn,
630 }
631 }
632}
633
634pub struct MultiGetIterator<'a, Key, Value>
649 where Key: Hash + Eq + Send + Sync + Clone + 'static,
650 Value: Send + Sync + Clone + 'static {
651 cache: &'a CacheD<Key, Value>,
652 keys: Vec<&'a Key>,
653}
654
655impl<'a, Key, Value> Iterator for MultiGetIterator<'a, Key, Value>
656 where Key: Hash + Eq + Send + Sync + Clone + 'static,
657 Value: Send + Sync + Clone + 'static {
658 type Item = Option<Value>;
659
660 fn next(&mut self) -> Option<Self::Item> {
661 if self.keys.is_empty() || self.cache.is_shutting_down() {
662 return None;
663 }
664 let key = self.keys.get(0).unwrap();
665 let value = self.cache.get(key);
666
667 self.keys.remove(0);
668 Some(value)
669 }
670}
671
672pub struct MultiGetMapIterator<'a, Key, Value, MapFn, MappedValue>
687 where Key: Hash + Eq + Send + Sync + Clone + 'static,
688 Value: Send + Sync + Clone + 'static,
689 MapFn: Fn(Value) -> MappedValue, {
690 iterator: MultiGetIterator<'a, Key, Value>,
691 map_fn: MapFn,
692}
693
694impl<'a, Key, Value, MapFn, MappedValue> Iterator for MultiGetMapIterator<'a, Key, Value, MapFn, MappedValue>
695 where Key: Hash + Eq + Send + Sync + Clone + 'static,
696 Value: Send + Sync + Clone + 'static,
697 MapFn: Fn(Value) -> MappedValue, {
698 type Item = Option<MappedValue>;
699
700 fn next(&mut self) -> Option<Self::Item> {
701 self.iterator.next().map(|optional_value| {
702 match optional_value {
703 None => None,
704 Some(value) => Some((self.map_fn)(value))
705 }
706 })
707 }
708}
709
710
711#[cfg(test)]
712mod tests {
713 use std::sync::Arc;
714 use std::thread;
715 use std::time::Duration;
716
717 use crate::cache::cached::CacheD;
718 use crate::cache::command::{CommandStatus, RejectionReason};
719 use crate::cache::config::{ConfigBuilder, WeightCalculationFn};
720 use crate::cache::put_or_update::{PutOrUpdateRequest, PutOrUpdateRequestBuilder};
721 use crate::cache::stats::StatsType;
722
    /// Test value type that deliberately does not derive `Clone`; used by the tests that
    /// read non-cloneable values by reference or through an `Arc`.
    #[derive(Eq, PartialEq, Debug)]
    struct Name {
        first: String,
        last: String,
    }
728
    /// Test helpers.
    mod setup {
        use std::time::SystemTime;

        use crate::cache::clock::Clock;

        /// Clock whose `now` always returns `SystemTime::UNIX_EPOCH`.
        #[derive(Clone)]
        pub(crate) struct UnixEpochClock;

        impl Clock for UnixEpochClock {
            fn now(&self) -> SystemTime {
                SystemTime::UNIX_EPOCH
            }
        }
    }
743
    /// Returns the `ConfigBuilder` for `&str` keys and values shared by most tests.
    // NOTE(review): the meaning of the three numeric arguments is defined by
    // `ConfigBuilder::new`, which is not visible here; confirm before changing them.
    fn test_config_builder() -> ConfigBuilder<&'static str, &'static str> {
        ConfigBuilder::new(100, 10, 100)
    }
747
748 #[test]
749 #[should_panic]
750 fn shards_mut_be_power_of_2_and_greater_than_1() {
751 let _: CacheD<&str, &str> = CacheD::new(test_config_builder().shards(1).build());
752 }
753
754 #[test]
755 #[should_panic]
756 fn weight_must_be_greater_than_zero_1() {
757 let cached = CacheD::new(test_config_builder().build());
758 let _ =
759 cached.put_with_weight("topic", "microservices", 0).unwrap();
760 }
761
762 #[test]
763 #[should_panic]
764 fn weight_must_be_greater_than_zero_2() {
765 let cached = CacheD::new(test_config_builder().build());
766 let _ =
767 cached.put_with_weight_and_ttl("topic", "microservices", 0, Duration::from_secs(5)).unwrap();
768 }
769
770 #[test]
771 #[should_panic]
772 fn weight_calculation_fn_must_return_weight_greater_than_zero_1() {
773 let weight_calculation: Box<WeightCalculationFn<&str, &str>> = Box::new(|_key, _value, _is_time_to_live_specified| 0);
774 let cached = CacheD::new(test_config_builder().weight_calculation_fn(weight_calculation).build());
775 let _ =
776 cached.put("topic", "microservices").unwrap();
777 }
778
779 #[test]
780 #[should_panic]
781 fn weight_calculation_fn_must_return_weight_greater_than_zero_2() {
782 let weight_calculation: Box<WeightCalculationFn<&str, &str>> = Box::new(|_key, _value, _is_time_to_live_specified| 0);
783 let cached = CacheD::new(test_config_builder().weight_calculation_fn(weight_calculation).build());
784 let _ =
785 cached.put_with_ttl("topic", "microservices", Duration::from_secs(5)).unwrap();
786 }
787
788 #[test]
789 #[should_panic]
790 fn put_or_update_results_in_put_value_must_be_present() {
791 let cached = CacheD::new(test_config_builder().build());
792 let put_or_update: PutOrUpdateRequest<&str, &str> = PutOrUpdateRequestBuilder::new("store").build();
793 let _ = cached.put_or_update(put_or_update);
794 }
795
796 #[test]
797 #[should_panic]
798 fn put_or_update_results_in_put_with_weight_calculation_fn_must_return_weight_greater_than_zero() {
799 let weight_calculation: Box<WeightCalculationFn<&str, &str>> = Box::new(|_key, _value, _is_time_to_live_specified| 0);
800 let cached = CacheD::new(test_config_builder().weight_calculation_fn(weight_calculation).build());
801
802 let put_or_update = PutOrUpdateRequestBuilder::new("store").value("cached").build();
803 let _ = cached.put_or_update(put_or_update);
804 }
805
806 #[tokio::test]
807 #[should_panic]
808 async fn put_or_update_results_in_update_with_weight_calculation_fn_must_return_weight_greater_than_zero() {
809 let weight_calculation: Box<WeightCalculationFn<&str, &str>> = Box::new(|_key, _value, _is_time_to_live_specified| 0);
810 let cached = CacheD::new(test_config_builder().weight_calculation_fn(weight_calculation).build());
811 cached.put("topic", "microservices").unwrap().handle().await;
812
813 let put_or_update = PutOrUpdateRequestBuilder::new("topic").value("cached").build();
814 let _ = cached.put_or_update(put_or_update);
815 }
816
817
818 #[tokio::test]
819 #[should_panic]
820 async fn put_or_update_results_in_update_with_weight_must_be_greater_than_zero() {
821 let cached = CacheD::new(test_config_builder().build());
822 cached.put("topic", "microservices").unwrap().handle().await;
823
824 let put_or_update = PutOrUpdateRequestBuilder::new("topic").value("cached").weight(0).build();
825 let _ = cached.put_or_update(put_or_update);
826 }
827
828 #[tokio::test]
829 async fn put_a_key_value_without_weight_and_ttl() {
830 let cached = CacheD::new(ConfigBuilder::new(100, 10, 100).build());
831
832 let key: u64 = 100;
833 let value: u64 = 1000;
834
835 let acknowledgement =
836 cached.put(key, value).unwrap();
837 acknowledgement.handle().await;
838
839 let value = cached.get_ref(&100);
840 let value_ref = value.unwrap();
841 let stored_value = value_ref.value();
842 let key_id = stored_value.key_id();
843
844 assert_eq!(1000, stored_value.value());
845 assert_eq!(Some(40), cached.admission_policy.weight_of(&key_id));
846 }
847
848 #[tokio::test]
849 async fn put_a_key_value_without_weight_with_ttl() {
850 let cached = CacheD::new(ConfigBuilder::new(100, 10, 100).build());
851
852 let key: u64 = 100;
853 let value: u64 = 1000;
854
855 let acknowledgement =
856 cached.put_with_ttl(key, value, Duration::from_secs(300)).unwrap();
857 acknowledgement.handle().await;
858
859 let value = cached.get_ref(&100);
860 let value_ref = value.unwrap();
861 let stored_value = value_ref.value();
862 let key_id = stored_value.key_id();
863
864 assert_eq!(1000, stored_value.value());
865 assert_eq!(Some(64), cached.admission_policy.weight_of(&key_id));
866 assert!(stored_value.expire_after().is_some());
867 }
868
869 #[tokio::test]
870 async fn put_the_same_key_value_again() {
871 let cached = CacheD::new(ConfigBuilder::new(100, 10, 100).build());
872
873 let key: u64 = 100;
874 let value: u64 = 1000;
875
876 let acknowledgement = cached.put(key, value).unwrap();
877 acknowledgement.handle().await;
878
879 let acknowledgement = cached.put(key, value).unwrap();
880 let status = acknowledgement.handle().await;
881
882 assert_eq!(CommandStatus::Rejected(RejectionReason::KeyAlreadyExists), status);
883
884 let value = cached.get_ref(&100);
885 let value_ref = value.unwrap();
886 let stored_value = value_ref.value();
887
888 assert_eq!(1000, stored_value.value());
889 assert_eq!(40, cached.total_weight_used());
890 }
891
892 #[tokio::test]
893 async fn put_a_key_value_with_weight() {
894 let cached = CacheD::new(test_config_builder().build());
895
896 let acknowledgement =
897 cached.put_with_weight("topic", "microservices", 50).unwrap();
898 acknowledgement.handle().await;
899
900 let value = cached.get_ref(&"topic");
901 let value_ref = value.unwrap();
902 let stored_value = value_ref.value();
903 let key_id = stored_value.key_id();
904
905 assert_eq!("microservices", stored_value.value());
906 assert_eq!(Some(50), cached.admission_policy.weight_of(&key_id));
907 }
908
909 #[tokio::test]
910 async fn put_a_key_value_with_weight_again() {
911 let cached = CacheD::new(test_config_builder().build());
912
913 let acknowledgement =
914 cached.put_with_weight("topic", "microservices", 50).unwrap();
915 acknowledgement.handle().await;
916
917 let acknowledgement =
918 cached.put_with_weight("topic", "microservices", 50).unwrap();
919 let status = acknowledgement.handle().await;
920
921 assert_eq!(CommandStatus::Rejected(RejectionReason::KeyAlreadyExists), status);
922
923 let value = cached.get_ref(&"topic");
924 let value_ref = value.unwrap();
925 let stored_value = value_ref.value();
926 let key_id = stored_value.key_id();
927
928 assert_eq!("microservices", stored_value.value());
929 assert_eq!(Some(50), cached.admission_policy.weight_of(&key_id));
930 assert_eq!(50, cached.total_weight_used());
931 }
932
933 #[tokio::test]
934 async fn put_a_key_value_with_ttl() {
935 let cached = CacheD::new(test_config_builder().build());
936
937 let acknowledgement =
938 cached.put_with_ttl("topic", "microservices", Duration::from_secs(120)).unwrap();
939 acknowledgement.handle().await;
940
941 let value = cached.get(&"topic");
942 assert_eq!(Some("microservices"), value);
943 }
944
945 #[tokio::test]
946 async fn put_a_key_value_with_ttl_again() {
947 let cached = CacheD::new(test_config_builder().build());
948
949 let acknowledgement =
950 cached.put_with_ttl("topic", "microservices", Duration::from_secs(120)).unwrap();
951 acknowledgement.handle().await;
952
953 let acknowledgement =
954 cached.put_with_ttl("topic", "microservices", Duration::from_secs(120)).unwrap();
955 let status = acknowledgement.handle().await;
956
957 assert_eq!(CommandStatus::Rejected(RejectionReason::KeyAlreadyExists), status);
958
959 let value = cached.get(&"topic");
960 assert_eq!(Some("microservices"), value);
961 }
962
963 #[tokio::test]
964 async fn put_a_key_value_with_weight_and_ttl() {
965 let cached = CacheD::new(test_config_builder().build());
966
967 let acknowledgement =
968 cached.put_with_weight_and_ttl("topic", "microservices", 10, Duration::from_secs(120)).unwrap();
969 acknowledgement.handle().await;
970
971 let value = cached.get(&"topic");
972 assert_eq!(Some("microservices"), value);
973 }
974
975 #[tokio::test]
976 async fn put_a_key_value_with_weight_and_ttl_again() {
977 let cached = CacheD::new(test_config_builder().build());
978
979 let acknowledgement =
980 cached.put_with_weight_and_ttl("topic", "microservices", 10, Duration::from_secs(120)).unwrap();
981 acknowledgement.handle().await;
982
983 let acknowledgement =
984 cached.put_with_weight_and_ttl("topic", "microservices", 10, Duration::from_secs(120)).unwrap();
985 let status = acknowledgement.handle().await;
986 assert_eq!(CommandStatus::Rejected(RejectionReason::KeyAlreadyExists), status);
987
988 let value = cached.get(&"topic");
989 assert_eq!(Some("microservices"), value);
990 assert_eq!(10, cached.total_weight_used());
991 }
992
993 #[tokio::test]
994 async fn put_a_key_value_with_ttl_and_ttl_ticker_evicts_it() {
995 let cached = CacheD::new(test_config_builder().shards(2).ttl_tick_duration(Duration::from_millis(10)).build());
996
997 let acknowledgement =
998 cached.put_with_ttl("topic", "microservices", Duration::from_millis(20)).unwrap();
999 acknowledgement.handle().await;
1000
1001 let value = cached.get(&"topic");
1002 assert_eq!(Some("microservices"), value);
1003
1004 thread::sleep(Duration::from_millis(20));
1005 assert_eq!(None, cached.get(&"topic"));
1006 }
1007
1008 #[test]
1009 fn get_value_ref_for_a_non_existing_key() {
1010 let cached: CacheD<&str, &str> = CacheD::new(test_config_builder().build());
1011
1012 let value = cached.get_ref(&"non-existing");
1013 assert!(value.is_none());
1014 }
1015
1016 #[test]
1017 fn get_value_ref_for_a_non_existing_key_and_attempt_to_map_it() {
1018 let cached: CacheD<&str, &str> = CacheD::new(test_config_builder().build());
1019
1020 let value = cached.map_get_ref(&"non_existing", |stored_value| stored_value.value_ref().to_uppercase());
1021 assert!(value.is_none());
1022 }
1023
1024 #[tokio::test]
1025 async fn get_value_ref_for_an_existing_key() {
1026 let cached = CacheD::new(test_config_builder().build());
1027
1028 let acknowledgement =
1029 cached.put("topic", "microservices").unwrap();
1030 acknowledgement.handle().await;
1031
1032 let value = cached.get_ref(&"topic");
1033 assert_eq!(&"microservices", value.unwrap().value().value_ref());
1034 }
1035
1036 #[tokio::test]
1037 async fn get_value_ref_for_an_existing_key_and_map_it() {
1038 let cached = CacheD::new(test_config_builder().build());
1039
1040 let acknowledgement =
1041 cached.put("topic", "microservices").unwrap();
1042 acknowledgement.handle().await;
1043
1044 let value = cached.map_get_ref(&"topic", |stored_value| stored_value.value_ref().to_uppercase());
1045 assert_eq!("MICROSERVICES", value.unwrap());
1046 }
1047
1048 #[tokio::test]
1049 async fn get_value_for_an_existing_key() {
1050 let cached = CacheD::new(test_config_builder().build());
1051
1052 let acknowledgement =
1053 cached.put("topic", "microservices").unwrap();
1054 acknowledgement.handle().await;
1055
1056 let value = cached.get(&"topic");
1057 assert_eq!(Some("microservices"), value);
1058 }
1059
1060 #[tokio::test]
1061 async fn get_value_for_an_existing_key_and_map_it() {
1062 let cached = CacheD::new(test_config_builder().build());
1063
1064 let acknowledgement =
1065 cached.put("topic", "microservices").unwrap();
1066 acknowledgement.handle().await;
1067
1068 let value = cached.map_get(&"topic", |value| value.to_uppercase());
1069 assert_eq!("MICROSERVICES", value.unwrap());
1070 }
1071
1072 #[test]
1073 fn get_value_for_a_non_existing_key() {
1074 let cached: CacheD<&str, &str> = CacheD::new(test_config_builder().build());
1075
1076 let value = cached.get(&"non-existing");
1077 assert_eq!(None, value);
1078 }
1079
1080 #[test]
1081 fn get_value_for_a_non_existing_key_and_attempt_to_map_it() {
1082 let cached: CacheD<&str, &str> = CacheD::new(test_config_builder().build());
1083
1084 let value = cached.map_get(&"topic", |value| value.to_uppercase());
1085 assert_eq!(None, value);
1086 }
1087
1088 #[tokio::test]
1089 async fn get_value_ref_for_an_existing_key_if_value_is_not_cloneable() {
1090 let cached = CacheD::new(ConfigBuilder::new(100, 10, 1000).build());
1091
1092 let acknowledgement =
1093 cached.put("name", Name { first: "John".to_string(), last: "Mcnamara".to_string() }).unwrap();
1094 acknowledgement.handle().await;
1095
1096 let value = cached.get_ref(&"name");
1097 assert_eq!(&Name { first: "John".to_string(), last: "Mcnamara".to_string() }, value.unwrap().value().value_ref());
1098 }
1099
1100 #[tokio::test]
1101 async fn get_value_for_an_existing_key_if_value_is_not_cloneable_by_passing_an_arc() {
1102 let cached = CacheD::new(ConfigBuilder::new(100, 10, 1000).build());
1103
1104 let acknowledgement =
1105 cached.put("name", Arc::new(Name { first: "John".to_string(), last: "Mcnamara".to_string() })).unwrap();
1106 acknowledgement.handle().await;
1107
1108 let value = cached.get(&"name").unwrap();
1109 assert_eq!("John".to_string(), value.first);
1110 assert_eq!("Mcnamara".to_string(), value.last);
1111 }
1112
1113 #[tokio::test]
1114 async fn delete_a_key() {
1115 let cached = CacheD::new(test_config_builder().build());
1116
1117 let acknowledgement =
1118 cached.put("topic", "microservices").unwrap();
1119 acknowledgement.handle().await;
1120
1121 let key_id = {
1122 let key_value_ref = cached.get_ref(&"topic").unwrap();
1123 key_value_ref.value().key_id()
1124 };
1125
1126 let acknowledgement =
1127 cached.delete("topic").unwrap();
1128 acknowledgement.handle().await;
1129
1130 let value = cached.get(&"topic");
1131 assert_eq!(None, value);
1132 assert!(!cached.admission_policy.contains(&key_id));
1133 }
1134
1135 #[tokio::test]
1136 async fn get_access_frequency() {
1137 let cached = CacheD::new(ConfigBuilder::new(10, 10, 1000).access_pool_size(1).access_buffer_size(3).build());
1138
1139 let acknowledgement_topic =
1140 cached.put("topic", "microservices").unwrap();
1141 let acknowledgement_disk =
1142 cached.put("disk", "SSD").unwrap();
1143
1144 acknowledgement_topic.handle().await;
1145 acknowledgement_disk.handle().await;
1146
1147 cached.get(&"topic");
1148 cached.get(&"disk");
1149 cached.get(&"topic");
1150 cached.get(&"disk"); thread::sleep(Duration::from_secs(2));
1153
1154 let hasher = &(cached.config.key_hash_fn);
1155 let policy = cached.admission_policy;
1156
1157 assert_eq!(2, policy.estimate(hasher(&"topic")));
1158 assert_eq!(1, policy.estimate(hasher(&"disk")));
1159 }
1160
1161 #[tokio::test]
1162 async fn get_multiple_keys() {
1163 let cached = CacheD::new(ConfigBuilder::new(100, 10, 1000).build());
1164
1165 let acknowledgement =
1166 cached.put("topic", "microservices").unwrap();
1167 acknowledgement.handle().await;
1168
1169 let acknowledgement =
1170 cached.put("disk", "SSD").unwrap();
1171 acknowledgement.handle().await;
1172
1173 let acknowledgement =
1174 cached.put("cache", "in-memory").unwrap();
1175 acknowledgement.handle().await;
1176
1177 let values = cached.multi_get(vec![&"topic", &"non-existing", &"cache", &"disk"]);
1178
1179 assert_eq!(&Some("microservices"), values.get(&"topic").unwrap());
1180 assert_eq!(&None, values.get(&"non-existing").unwrap());
1181 assert_eq!(&Some("in-memory"), values.get(&"cache").unwrap());
1182 assert_eq!(&Some("SSD"), values.get(&"disk").unwrap());
1183 }
1184
1185 #[tokio::test]
1186 async fn get_multiple_keys_via_an_iterator() {
1187 let cached = CacheD::new(ConfigBuilder::new(100, 10, 1000).build());
1188
1189 let acknowledgement =
1190 cached.put("topic", "microservices").unwrap();
1191 acknowledgement.handle().await;
1192
1193 let acknowledgement =
1194 cached.put("disk", "SSD").unwrap();
1195 acknowledgement.handle().await;
1196
1197 let acknowledgement =
1198 cached.put("cache", "in-memory").unwrap();
1199 acknowledgement.handle().await;
1200
1201 let mut iterator = cached.multi_get_iterator(vec![&"topic", &"non-existing", &"cache", &"disk"]);
1202 assert_eq!(Some("microservices"), iterator.next().unwrap());
1203 assert_eq!(None, iterator.next().unwrap());
1204 assert_eq!(Some("in-memory"), iterator.next().unwrap());
1205 assert_eq!(Some("SSD"), iterator.next().unwrap());
1206 assert_eq!(None, iterator.next());
1207 }
1208
1209 #[tokio::test]
1210 async fn get_multiple_keys_via_an_iterator_given_value_is_not_cloneable() {
1211 let cached = CacheD::new(ConfigBuilder::new(100, 10, 1000).build());
1212
1213 let acknowledgement =
1214 cached.put("captain", Arc::new(Name { first: "John".to_string(), last: "Mcnamara".to_string() })).unwrap();
1215 acknowledgement.handle().await;
1216
1217 let acknowledgement =
1218 cached.put("vice-captain", Arc::new(Name { first: "Martin".to_string(), last: "Trolley".to_string() })).unwrap();
1219 acknowledgement.handle().await;
1220
1221 let mut iterator = cached.multi_get_iterator(vec![&"captain", &"vice-captain", &"disk"]);
1222 assert_eq!("John", iterator.next().unwrap().unwrap().first);
1223 assert_eq!("Martin", iterator.next().unwrap().unwrap().first);
1224 assert_eq!(None, iterator.next().unwrap());
1225 }
1226
1227 #[tokio::test]
1228 async fn map_multiple_keys_via_an_iterator() {
1229 let cached = CacheD::new(ConfigBuilder::new(100, 10, 1000).build());
1230
1231 let acknowledgement =
1232 cached.put("topic", "microservices").unwrap();
1233 acknowledgement.handle().await;
1234
1235 let acknowledgement =
1236 cached.put("disk", "ssd").unwrap();
1237 acknowledgement.handle().await;
1238
1239 let acknowledgement =
1240 cached.put("cache", "in-memory").unwrap();
1241 acknowledgement.handle().await;
1242
1243 let mut iterator = cached.multi_get_map_iterator(vec![&"topic", &"non-existing", &"cache", &"disk"], |value| value.to_uppercase());
1244 assert_eq!(Some("MICROSERVICES".to_string()), iterator.next().unwrap());
1245 assert_eq!(None, iterator.next().unwrap());
1246 assert_eq!(Some("IN-MEMORY".to_string()), iterator.next().unwrap());
1247 assert_eq!(Some("SSD".to_string()), iterator.next().unwrap());
1248 assert_eq!(None, iterator.next());
1249 }
1250
1251 #[tokio::test]
1252 async fn total_weight_used() {
1253 let cached = CacheD::new(test_config_builder().build());
1254
1255 let acknowledgement =
1256 cached.put_with_weight("topic", "microservices", 50).unwrap();
1257 acknowledgement.handle().await;
1258
1259 assert_eq!(50, cached.total_weight_used());
1260 }
1261
1262 #[tokio::test]
1263 async fn stats_summary() {
1264 let cached = CacheD::new(test_config_builder().build());
1265
1266 cached.put_with_weight("topic", "microservices", 50).unwrap().handle().await;
1267 cached.put_with_weight("cache", "cached", 10).unwrap().handle().await;
1268 cached.delete("cache").unwrap().handle().await;
1269
1270 let _ = cached.get(&"topic");
1271 let _ = cached.get(&"cache");
1272
1273 let summary = cached.stats_summary();
1274 assert_eq!(1, summary.get(&StatsType::CacheMisses).unwrap());
1275 assert_eq!(1, summary.get(&StatsType::CacheHits).unwrap());
1276 assert_eq!(60, summary.get(&StatsType::WeightAdded).unwrap());
1277 assert_eq!(10, summary.get(&StatsType::WeightRemoved).unwrap());
1278 assert_eq!(2, summary.get(&StatsType::KeysAdded).unwrap());
1279 assert_eq!(1, summary.get(&StatsType::KeysDeleted).unwrap());
1280
1281 assert_eq!(0, summary.get(&StatsType::KeysRejected).unwrap());
1282 assert_eq!(0, summary.get(&StatsType::AccessAdded).unwrap());
1283 assert_eq!(0, summary.get(&StatsType::AccessDropped).unwrap());
1284 }
1285}
1286
/// Tests describing how `CacheD` behaves once `shutdown` has been called: write operations
/// return an error, read operations return nothing, and in-flight puts do not hang.
#[cfg(test)]
mod shutdown_tests {
    use std::sync::Arc;
    use std::sync::atomic::Ordering;
    use std::thread;
    use std::time::Duration;

    use async_std::future::timeout;
    use tokio::time::sleep;

    use crate::cache::cached::CacheD;
    use crate::cache::config::ConfigBuilder;
    use crate::cache::put_or_update::PutOrUpdateRequestBuilder;

    /// Builds the configuration shared by the `&'static str` keyed tests in this module.
    fn test_config_builder() -> ConfigBuilder<&'static str, &'static str> {
        ConfigBuilder::new(100, 10, 100)
    }

    /// Shared workload of the `should_not_block_on_shutdown*` tests.
    ///
    /// Spawns 50 tasks that each put 10 distinct keys, waiting at most one second on the
    /// acknowledgement of every put that was accepted, while a separate task calls `shutdown`
    /// after 8ms.
    ///
    /// # Panics
    ///
    /// Panics if an acknowledgement does not complete within one second, or if any spawned
    /// task panics.
    async fn put_concurrently_while_shutting_down(cached: Arc<CacheD<i32, i32>>) {
        let task_handles = (1..=50).map(|index| {
            let cached_clone = cached.clone();
            tokio::spawn(
                async move {
                    // Every task writes its own, non-overlapping range of ten keys.
                    let start_index = index * 10;
                    let end_index = start_index + 10;

                    for count in start_index..end_index {
                        let put_result = cached_clone.put(count, count * 10);
                        // A put issued after shutdown is rejected with an error; only accepted
                        // puts have an acknowledgement to wait on.
                        if let Ok(result) = put_result {
                            timeout(Duration::from_secs(1), result.handle()).await.unwrap();
                        }
                        sleep(Duration::from_millis(2)).await;
                    }
                }
            )
        }).collect::<Vec<_>>();

        let cached_clone = cached.clone();
        let shutdown_handle = tokio::spawn(
            async move {
                // Delay the shutdown so that it happens while the writers are still running.
                sleep(Duration::from_millis(8)).await;
                cached_clone.shutdown();
            }
        );
        for handle in task_handles {
            handle.await.unwrap();
        }
        shutdown_handle.await.unwrap();
    }

    /// `put` returns an error after shutdown.
    #[test]
    fn put_after_shutdown() {
        let cached = CacheD::new(test_config_builder().build());
        cached.shutdown();

        let put_result = cached.put("storage", "cached");
        assert!(put_result.is_err());
    }

    /// `put_with_weight` returns an error after shutdown.
    #[test]
    fn put_with_weight_after_shutdown() {
        let cached = CacheD::new(test_config_builder().build());
        cached.shutdown();

        let put_result = cached.put_with_weight("storage", "cached", 10);
        assert!(put_result.is_err());
    }

    /// `put_with_ttl` returns an error after shutdown.
    #[test]
    fn put_with_ttl_after_shutdown() {
        let cached = CacheD::new(test_config_builder().build());
        cached.shutdown();

        let put_result = cached.put_with_ttl("storage", "cached", Duration::from_secs(5));
        assert!(put_result.is_err());
    }

    /// `put_with_weight_and_ttl` returns an error after shutdown.
    #[test]
    fn put_with_weight_and_ttl_after_shutdown() {
        let cached = CacheD::new(test_config_builder().build());
        cached.shutdown();

        let put_result = cached.put_with_weight_and_ttl("storage", "cached", 10, Duration::from_secs(5));
        assert!(put_result.is_err());
    }

    /// `delete` returns an error after shutdown.
    #[test]
    fn delete_after_shutdown() {
        let cached = CacheD::new(test_config_builder().build());
        cached.shutdown();

        let delete_result = cached.delete("storage");
        assert!(delete_result.is_err());
    }

    /// `put_or_update` returns an error after shutdown.
    #[test]
    fn put_or_update_after_shutdown() {
        let cached = CacheD::new(test_config_builder().build());
        cached.shutdown();

        let put_or_update_result = cached.put_or_update(PutOrUpdateRequestBuilder::new("storage").weight(10).build());
        assert!(put_or_update_result.is_err());
    }

    /// A key put before shutdown is no longer returned by `get` after shutdown.
    #[tokio::test]
    async fn get_after_shutdown() {
        let cached = CacheD::new(test_config_builder().build());
        cached.put("storage", "cached").unwrap().handle().await;
        cached.shutdown();

        let get_result = cached.get(&"storage");
        assert_eq!(None, get_result);
    }

    /// A key put before shutdown is no longer returned by `get_ref` after shutdown.
    #[tokio::test]
    async fn get_ref_after_shutdown() {
        let cached = CacheD::new(test_config_builder().build());
        cached.put("storage", "cached").unwrap().handle().await;
        cached.shutdown();

        let get_result = cached.get_ref(&"storage");
        assert!(get_result.is_none());
    }

    /// `map_get` returns `None` after shutdown, even for a key put before shutdown.
    #[tokio::test]
    async fn map_get_after_shutdown() {
        let cached = CacheD::new(test_config_builder().build());
        cached.put("storage", "cached").unwrap().handle().await;
        cached.shutdown();

        let get_result = cached.map_get(&"storage", |value| value.to_uppercase());
        assert!(get_result.is_none());
    }

    /// `map_get_ref` returns `None` after shutdown, even for a key put before shutdown.
    #[tokio::test]
    async fn map_get_ref_after_shutdown() {
        let cached = CacheD::new(test_config_builder().build());
        cached.put("storage", "cached").unwrap().handle().await;
        cached.shutdown();

        let get_result = cached.map_get_ref(&"storage", |stored_value| stored_value.value_ref().to_uppercase());
        assert!(get_result.is_none());
    }

    /// `multi_get` returns an empty result after shutdown.
    #[tokio::test]
    async fn multi_get_after_shutdown() {
        let cached = CacheD::new(test_config_builder().build());
        cached.put("storage", "cached").unwrap().handle().await;
        cached.put("topic", "microservices").unwrap().handle().await;

        cached.shutdown();

        let multi_get_result = cached.multi_get(vec![&"storage", &"topic"]);
        assert!(multi_get_result.is_empty());
    }

    /// The iterator returned by `multi_get_iterator` yields nothing after shutdown.
    #[tokio::test]
    async fn multi_get_iterator_after_shutdown() {
        let cached = CacheD::new(test_config_builder().build());
        cached.put("storage", "cached").unwrap().handle().await;
        cached.put("topic", "microservices").unwrap().handle().await;

        cached.shutdown();

        let mut iterator = cached.multi_get_iterator(vec![&"storage", &"topic"]);
        assert!(iterator.next().is_none());
    }

    /// The iterator returned by `multi_get_map_iterator` yields nothing after shutdown.
    #[tokio::test]
    async fn multi_get_map_iterator_after_shutdown() {
        let cached = CacheD::new(test_config_builder().build());
        cached.put("storage", "cached").unwrap().handle().await;
        cached.put("topic", "microservices").unwrap().handle().await;

        cached.shutdown();

        let mut iterator = cached.multi_get_map_iterator(vec![&"storage", &"topic"], |value| { value.to_uppercase() });
        assert!(iterator.next().is_none());
    }

    /// After shutdown the `is_shutting_down` flag is set, new puts are rejected, no weight is
    /// reported as used and previously stored keys are no longer readable.
    #[tokio::test]
    async fn shutdown() {
        let cached = CacheD::new(test_config_builder().build());

        cached.put_with_weight("topic", "microservices", 50).unwrap().handle().await;
        cached.put("cache", "cached").unwrap().handle().await;

        cached.shutdown();
        assert!(cached.is_shutting_down.load(Ordering::Acquire));

        let put_result = cached.put("storage", "cached");
        assert!(put_result.is_err());

        assert_eq!(0, cached.total_weight_used());
        assert_eq!(None, cached.get(&"topic"));
        assert_eq!(None, cached.get(&"cache"));
    }

    /// Calling `shutdown` from ten threads concurrently must neither panic nor leave the cache
    /// accepting puts.
    #[tokio::test]
    async fn concurrent_shutdown() {
        let cached = Arc::new(CacheD::new(test_config_builder().build()));
        cached.put_with_weight("topic", "microservices", 50).unwrap().handle().await;
        cached.put("cache", "cached").unwrap().handle().await;

        let thread_handles = (1..=10).map(|_| {
            thread::spawn({
                let cached = cached.clone();
                move || {
                    cached.shutdown();
                }
            })
        }).collect::<Vec<_>>();
        // `join` surfaces a panic raised inside any of the shutdown threads.
        for handle in thread_handles {
            handle.join().unwrap();
        }

        assert!(cached.is_shutting_down.load(Ordering::Acquire));

        let put_result = cached.put("storage", "cached");
        assert!(put_result.is_err());
    }

    /// Puts that race with a shutdown must have their acknowledgements complete within the
    /// timeout enforced by `put_concurrently_while_shutting_down`.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn should_not_block_on_shutdown() {
        let config_builder = ConfigBuilder::new(1000, 100, 1_000_000);
        let cached = Arc::new(CacheD::new(config_builder.build()));

        put_concurrently_while_shutting_down(cached).await;
    }

    /// Same scenario as `should_not_block_on_shutdown`, with a much smaller third
    /// `ConfigBuilder::new` argument (presumably the total cache weight, going by the test
    /// name — confirm against `ConfigBuilder`).
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn should_not_block_on_shutdown_with_limited_space() {
        let config_builder = ConfigBuilder::new(1000, 100, 1000);
        let cached = Arc::new(CacheD::new(config_builder.build()));

        put_concurrently_while_shutting_down(cached).await;
    }
}
1549
/// Tests for `CacheD::put_or_update`: inserting a key that does not exist yet, and updating
/// the value, weight and time to live of a key that already exists.
#[cfg(test)]
mod put_or_update_tests {
    use std::ops::Add;
    use std::time::Duration;

    use crate::cache::cached::CacheD;
    use crate::cache::cached::put_or_update_tests::setup::UnixEpochClock;
    use crate::cache::clock::ClockType;
    use crate::cache::config::ConfigBuilder;
    use crate::cache::put_or_update::PutOrUpdateRequestBuilder;
    use crate::cache::types::Weight;

    mod setup {
        use std::time::SystemTime;

        use crate::cache::clock::Clock;

        /// Clock whose `now` always returns the Unix epoch, so that the tests can compute the
        /// expected expiry of a key exactly.
        #[derive(Clone)]
        pub(crate) struct UnixEpochClock;

        impl Clock for UnixEpochClock {
            fn now(&self) -> SystemTime {
                SystemTime::UNIX_EPOCH
            }
        }
    }

    /// Builds the configuration shared by the tests in this module.
    fn test_config_builder() -> ConfigBuilder<&'static str, &'static str> {
        ConfigBuilder::new(100, 10, 100)
    }

    /// `put_or_update` on a missing key stores the supplied value.
    #[tokio::test]
    async fn put_or_update_a_non_existing_key_value() {
        let cached = CacheD::new(test_config_builder().build());

        let acknowledgement =
            cached.put_or_update(PutOrUpdateRequestBuilder::new("topic").value("microservices").build()).unwrap();
        acknowledgement.handle().await;

        let value = cached.get_ref(&"topic");
        let value_ref = value.unwrap();
        let stored_value = value_ref.value();

        assert_eq!("microservices", stored_value.value());
    }

    /// `put_or_update` on a missing key stores the value and records the requested weight in
    /// the admission policy.
    #[tokio::test]
    async fn put_or_update_a_non_existing_key_value_with_weight() {
        let cached = CacheD::new(test_config_builder().build());

        let acknowledgement =
            cached.put_or_update(PutOrUpdateRequestBuilder::new("topic").value("microservices").weight(33).build()).unwrap();
        acknowledgement.handle().await;

        let value = cached.get_ref(&"topic");
        let value_ref = value.unwrap();
        let stored_value = value_ref.value();
        let key_id = stored_value.key_id();

        assert_eq!("microservices", stored_value.value());
        assert_eq!(Some(33), cached.admission_policy.weight_of(&key_id));
    }

    /// `put_or_update` on a missing key stores the value, the weight and an expiry of
    /// `now + time_to_live`.
    #[tokio::test]
    async fn put_or_update_a_non_existing_key_value_with_time_to_live() {
        let clock: ClockType = Box::new(UnixEpochClock {});
        let cached = CacheD::new(test_config_builder().clock(clock.clone_box()).build());

        let acknowledgement =
            cached.put_or_update(PutOrUpdateRequestBuilder::new("topic").value("microservices").weight(10).time_to_live(Duration::from_secs(10)).build()).unwrap();
        acknowledgement.handle().await;

        let value = cached.get_ref(&"topic");
        let value_ref = value.unwrap();
        let stored_value = value_ref.value();
        let key_id = stored_value.key_id();

        assert_eq!(Some(clock.now().add(Duration::from_secs(10))), stored_value.expire_after());
        assert_eq!("microservices", stored_value.value());
        assert_eq!(Some(10), cached.admission_policy.weight_of(&key_id));
    }

    /// Updating only the value of an existing key replaces the stored value.
    #[tokio::test]
    async fn update_the_value_of_an_existing_key() {
        let cached = CacheD::new(test_config_builder().build());

        let acknowledgement =
            cached.put("topic", "microservices").unwrap();
        acknowledgement.handle().await;

        let acknowledgement =
            cached.put_or_update(PutOrUpdateRequestBuilder::new("topic").value("storage engine").build()).unwrap();
        acknowledgement.handle().await;

        let value = cached.get_ref(&"topic");
        let value_ref = value.unwrap();
        let stored_value = value_ref.value();

        assert_eq!("storage engine", stored_value.value());
    }

    /// Updating only the weight of an existing key keeps the value and records the new weight.
    #[tokio::test]
    async fn update_the_weight_of_an_existing_key() {
        let cached = CacheD::new(test_config_builder().build());

        let acknowledgement =
            cached.put("topic", "microservices").unwrap();
        acknowledgement.handle().await;

        let acknowledgement =
            cached.put_or_update(PutOrUpdateRequestBuilder::new("topic").weight(29).build()).unwrap();
        acknowledgement.handle().await;

        let value = cached.get_ref(&"topic");
        let value_ref = value.unwrap();
        let stored_value = value_ref.value();
        let key_id = stored_value.key_id();

        assert_eq!("microservices", stored_value.value());
        assert_eq!(Some(29), cached.admission_policy.weight_of(&key_id));
    }

    /// Setting a time to live on a key that was put without one keeps the value, changes the
    /// weight, sets the expiry to `now + 100s` and makes the TTL ticker return that expiry.
    #[tokio::test]
    async fn update_the_time_to_live_of_an_existing_key_with_original_key_not_having_time_to_live() {
        let clock: ClockType = Box::new(UnixEpochClock {});
        let cached = CacheD::new(test_config_builder().clock(clock.clone_box()).build());

        let acknowledgement =
            cached.put("topic", "microservices").unwrap();
        acknowledgement.handle().await;

        let original_weight = weight_of(&cached, "topic");

        let acknowledgement =
            cached.put_or_update(PutOrUpdateRequestBuilder::new("topic").time_to_live(Duration::from_secs(100)).build()).unwrap();
        acknowledgement.handle().await;

        let value = cached.get_ref(&"topic");
        let value_ref = value.unwrap();
        let stored_value = value_ref.value();
        let key_id = stored_value.key_id();

        assert_eq!("microservices", stored_value.value());
        assert_ne!(original_weight, cached.admission_policy.weight_of(&key_id));

        assert_eq!(Some(clock.now().add(Duration::from_secs(100))), stored_value.expire_after());
        assert_eq!(stored_value.expire_after(), cached.ttl_ticker.get(&key_id, &stored_value.expire_after().unwrap()));
    }

    /// Removing the time to live of a key that was put with one keeps the value, changes the
    /// weight and clears the expiry.
    #[tokio::test]
    async fn remove_the_time_to_live_of_an_existing_key() {
        let clock: ClockType = Box::new(UnixEpochClock {});
        let cached = CacheD::new(test_config_builder().clock(clock.clone_box()).build());

        let acknowledgement =
            cached.put_with_ttl("topic", "microservices", Duration::from_secs(100)).unwrap();
        acknowledgement.handle().await;

        let original_weight = weight_of(&cached, "topic");

        let acknowledgement =
            cached.put_or_update(PutOrUpdateRequestBuilder::new("topic").remove_time_to_live().build()).unwrap();
        acknowledgement.handle().await;

        let value = cached.get_ref(&"topic");
        let value_ref = value.unwrap();
        let stored_value = value_ref.value();
        let key_id = stored_value.key_id();

        assert_eq!("microservices", stored_value.value());
        assert_ne!(original_weight, cached.admission_policy.weight_of(&key_id));

        assert_eq!(None, stored_value.expire_after());
    }

    /// Adding a time to live of 120s to a key that was put without one keeps the value,
    /// changes the weight, sets the expiry to `now + 120s` and makes the TTL ticker return
    /// that expiry.
    #[tokio::test]
    async fn add_the_time_to_live_of_an_existing_key() {
        let clock: ClockType = Box::new(UnixEpochClock {});
        let cached = CacheD::new(test_config_builder().clock(clock.clone_box()).build());

        let acknowledgement =
            cached.put("topic", "microservices").unwrap();
        acknowledgement.handle().await;

        let original_weight = weight_of(&cached, "topic");

        let acknowledgement =
            cached.put_or_update(PutOrUpdateRequestBuilder::new("topic").time_to_live(Duration::from_secs(120)).build()).unwrap();
        acknowledgement.handle().await;

        let value = cached.get_ref(&"topic");
        let value_ref = value.unwrap();
        let stored_value = value_ref.value();
        let key_id = stored_value.key_id();

        assert_eq!("microservices", stored_value.value());
        assert_ne!(original_weight, cached.admission_policy.weight_of(&key_id));

        assert_eq!(Some(clock.now().add(Duration::from_secs(120))), stored_value.expire_after());
        assert_eq!(stored_value.expire_after(), cached.ttl_ticker.get(&key_id, &stored_value.expire_after().unwrap()));
    }

    /// Updating the value and adding a time to live in one request replaces the value, changes
    /// the weight, sets the expiry to `now + 100s` and makes the TTL ticker return that expiry.
    #[tokio::test]
    async fn update_the_value_and_time_to_live_of_an_existing_key() {
        let clock: ClockType = Box::new(UnixEpochClock {});
        let cached = CacheD::new(test_config_builder().clock(clock.clone_box()).build());

        let acknowledgement =
            cached.put("topic", "microservices").unwrap();
        acknowledgement.handle().await;

        let original_weight = weight_of(&cached, "topic");

        let acknowledgement =
            cached.put_or_update(PutOrUpdateRequestBuilder::new("topic").value("storage engine").time_to_live(Duration::from_secs(100)).build()).unwrap();
        acknowledgement.handle().await;

        let value = cached.get_ref(&"topic");
        let value_ref = value.unwrap();
        let stored_value = value_ref.value();
        let key_id = stored_value.key_id();

        assert_eq!("storage engine", stored_value.value());
        assert_ne!(original_weight, cached.admission_policy.weight_of(&key_id));

        assert_eq!(Some(clock.now().add(Duration::from_secs(100))), stored_value.expire_after());
        assert_eq!(stored_value.expire_after(), cached.ttl_ticker.get(&key_id, &stored_value.expire_after().unwrap()));
    }

    /// Updating the value and removing the time to live in one request replaces the value,
    /// lowers the weight and clears the expiry.
    #[tokio::test]
    async fn update_the_value_and_remove_time_to_live_of_an_existing_key() {
        let clock: ClockType = Box::new(UnixEpochClock {});
        let cached = CacheD::new(test_config_builder().clock(clock.clone_box()).build());

        let acknowledgement =
            cached.put_with_ttl("topic", "microservices", Duration::from_secs(100)).unwrap();
        acknowledgement.handle().await;

        let original_weight = weight_of(&cached, "topic");

        let acknowledgement =
            cached.put_or_update(PutOrUpdateRequestBuilder::new("topic").value("storage engine").remove_time_to_live().build()).unwrap();
        acknowledgement.handle().await;

        let value = cached.get_ref(&"topic");
        let value_ref = value.unwrap();
        let stored_value = value_ref.value();
        let key_id = stored_value.key_id();

        let new_weight = cached.admission_policy.weight_of(&key_id);
        assert_eq!("storage engine", stored_value.value());
        assert_ne!(original_weight, new_weight);
        // NOTE(review): both sides are `Option<Weight>` and `None` orders below every `Some`,
        // so this would also pass if the new weight were missing — consider comparing the
        // unwrapped weights once it is confirmed that `weight_of` returns `Some` here.
        assert!(new_weight < original_weight);

        assert_eq!(None, stored_value.expire_after());
    }

    /// Updating the value and weight and removing the time to live in one request replaces the
    /// value, records the explicit weight of 300 and clears the expiry.
    #[tokio::test]
    async fn update_the_value_weight_and_remove_time_to_live_of_an_existing_key() {
        let clock: ClockType = Box::new(UnixEpochClock {});
        let cached = CacheD::new(test_config_builder().clock(clock.clone_box()).build());

        let acknowledgement =
            cached.put_with_ttl("topic", "microservices", Duration::from_secs(100)).unwrap();
        acknowledgement.handle().await;

        let original_weight = weight_of(&cached, "topic");

        let acknowledgement =
            cached.put_or_update(PutOrUpdateRequestBuilder::new("topic").value("storage engine").weight(300).remove_time_to_live().build()).unwrap();
        acknowledgement.handle().await;

        let value = cached.get_ref(&"topic");
        let value_ref = value.unwrap();
        let stored_value = value_ref.value();
        let key_id = stored_value.key_id();

        let new_weight = cached.admission_policy.weight_of(&key_id);
        assert_eq!("storage engine", stored_value.value());
        assert_ne!(original_weight, new_weight);
        assert_eq!(Some(300), new_weight);

        assert_eq!(None, stored_value.expire_after());
    }

    /// Replacing an existing time to live (100s) with a new one (500s) keeps the value and the
    /// weight, and moves the expiry to `now + 500s`.
    #[tokio::test]
    async fn update_the_time_to_live_of_an_existing_key() {
        let clock: ClockType = Box::new(UnixEpochClock {});
        let cached = CacheD::new(test_config_builder().clock(clock.clone_box()).build());

        let acknowledgement =
            cached.put_with_ttl("topic", "microservices", Duration::from_secs(100)).unwrap();
        acknowledgement.handle().await;

        let original_weight = weight_of(&cached, "topic");

        let acknowledgement =
            cached.put_or_update(PutOrUpdateRequestBuilder::new("topic").time_to_live(Duration::from_secs(500)).build()).unwrap();
        acknowledgement.handle().await;

        let value = cached.get_ref(&"topic");
        let value_ref = value.unwrap();
        let stored_value = value_ref.value();
        let key_id = stored_value.key_id();

        let new_weight = cached.admission_policy.weight_of(&key_id);
        assert_eq!("microservices", stored_value.value());
        assert_eq!(original_weight, new_weight);

        // The behavior this test is named after: the expiry itself must reflect the new
        // time to live, not the original 100s.
        assert_eq!(Some(clock.now().add(Duration::from_secs(500))), stored_value.expire_after());
    }

    /// Returns the weight the admission policy currently holds for `key`.
    ///
    /// # Panics
    ///
    /// Panics if `key` is not present in the cache.
    fn weight_of(cached: &CacheD<&str, &str>, key: &'static str) -> Option<Weight> {
        let value = cached.get_ref(&key);
        let value_ref = value.unwrap();
        let stored_value = value_ref.value();
        let key_id = stored_value.key_id();

        cached.admission_policy.weight_of(&key_id)
    }
}