tinylfu_cached/cache/
cached.rs

1use std::collections::HashMap;
2use std::hash::Hash;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::atomic::Ordering::Acquire;
6use std::time::Duration;
7
8use log::info;
9
10use crate::cache::command::acknowledgement::CommandAcknowledgement;
11use crate::cache::command::command_executor::{CommandExecutor, CommandSendResult, shutdown_result};
12use crate::cache::command::{CommandType, RejectionReason};
13use crate::cache::config::Config;
14use crate::cache::config::weight_calculation::Calculation;
15use crate::cache::errors::Errors;
16use crate::cache::expiration::TTLTicker;
17use crate::cache::key_description::KeyDescription;
18use crate::cache::policy::admission_policy::AdmissionPolicy;
19use crate::cache::pool::Pool;
20use crate::cache::put_or_update::PutOrUpdateRequest;
21use crate::cache::stats::{ConcurrentStatsCounter, StatsSummary};
22use crate::cache::store::{Store, TypeOfExpiryUpdate};
23use crate::cache::store::key_value_ref::KeyValueRef;
24use crate::cache::store::stored_value::StoredValue;
25use crate::cache::types::{KeyId, Weight};
26use crate::cache::unique_id::increasing_id_generator::IncreasingIdGenerator;
27
28/// `CacheD` is a high performance, LFU based in-memory cache. Cached provides various behaviors including:
29/// `put`, `put_with_weight`, `put_with_ttl`, `get`, `get_ref`, `map_get_ref`, `multi_get`, `delete`, `put_or_update`.
30///
31/// The core abstractions that `CacheD` interacts with include:
32/// - `crate::cache::store::Store`: `Store` holds the key/value mapping.
33/// - `crate::cache::command::command_executor::CommandExecutor`: `CommandExecutor` executes various commands of type `crate::cache::command::CommandType`. Each write operation results in a command to `CommandExecutor`.
34/// - `crate::cache::policy::admission_policy::AdmissionPolicy`: `AdmissionPolicy` maintains the weight of each key in the cache and takes a decision on whether a key should be admitted.
35/// - `crate::cache::expiration::TTLTicker`: `TTLTicker` removes the expired keys.
36///
37/// Core design ideas behind `CacheD`:
38/// 1) LFU (least frequently used):
39///
40///  `CacheD` is an LFU based cache which makes it essential to store the access frequency of each key.
41///   Storing the access frequency in a `HashMap` like data structure would mean that the space used to store the frequency is directly proportional to the number of keys in the cache.
42///   So, the tradeoff is to use a probabilistic data structure like `count-min sketch`.
43///   `Cached` uses `count-min sketch` inside `crate::cache::lfu::frequency_counter::FrequencyCounter` to store the frequency for each key.
44///
45/// 2) Memory bound:
46///
47///  `CacheD` is a memory bound cache. It uses `Weight` as the terminology to denote the space.
48///   Every key/value pair has a weight, either the clients can provide weight while putting a key/value pair or the weight is auto-calculated.
49///   In order to create a new instance of `CacheD`, clients provide the total weight of the cache, which signifies the total space reserved for the cache.
50///  `CacheD` ensure that it never crosses the maximum weight of the cache.
51///
52/// 3) Admission/Rejection of incoming keys:
53///
54///   After the space allocated to the instance of `CacheD` is full, put of a new key/value pair will result in `AdmissionPolicy`.
55///   deciding whether the incoming key/value pair should be accepted. This decision is based on estimating the access frequency of the incoming key
56///   and comparing it against the estimated access frequencies of a sample of keys.
57///
58/// 4) Fine-grained locks:
59///
60///   `CacheD` makes an attempt to used fine grained locks over coarse grained locks wherever possible.
61///
62/// 5) Expressive APIs:
63///
64///   `Cached` provides expressive APIs to the clients.
65///   For example, `put` is not an immediate operation, it happens at a later point in time. The return type of `put` operation is an instance of
66///   [`crate::cache::command::command_executor::CommandSendResult`] and clients can use it to `await` until the status of the `put` operation is returned.
67///
68///   Similarly, `put_or_update` operation takes an instance of [`crate::cache::put_or_update::PutOrUpdateRequest`], thereby allowing the clients to
69///   be very explicit in the type of change they want to perform.
70pub struct CacheD<Key, Value>
71    where Key: Hash + Eq + Send + Sync + Clone + 'static,
72          Value: Send + Sync + 'static {
73    config: Config<Key, Value>,
74    store: Arc<Store<Key, Value>>,
75    command_executor: CommandExecutor<Key, Value>,
76    admission_policy: Arc<AdmissionPolicy<Key>>,
77    pool: Pool<AdmissionPolicy<Key>>,
78    ttl_ticker: Arc<TTLTicker>,
79    id_generator: IncreasingIdGenerator,
80    is_shutting_down: AtomicBool,
81}
82
83impl<Key, Value> CacheD<Key, Value>
84    where Key: Hash + Eq + Send + Sync + Clone + 'static,
85          Value: Send + Sync + 'static {
86    /// Creates a new instance of `Cached` with the provided [`crate::cache::config::Config`]
87    pub fn new(config: Config<Key, Value>) -> Self {
88        assert!(config.counters > 0);
89
90        let stats_counter = Arc::new(ConcurrentStatsCounter::new());
91        let store = Store::new(config.clock.clone_box(), stats_counter.clone(), config.capacity, config.shards);
92        let admission_policy = Arc::new(AdmissionPolicy::new(config.counters, config.cache_weight_config(), stats_counter.clone()));
93        let pool = Pool::new(config.access_pool_size, config.access_buffer_size, admission_policy.clone());
94        let ttl_ticker = Self::ttl_ticker(&config, store.clone(), admission_policy.clone());
95        let command_buffer_size = config.command_buffer_size;
96
97        CacheD {
98            config,
99            store: store.clone(),
100            command_executor: CommandExecutor::new(store, admission_policy.clone(), stats_counter, ttl_ticker.clone(), command_buffer_size),
101            admission_policy,
102            pool,
103            ttl_ticker,
104            id_generator: IncreasingIdGenerator::new(),
105            is_shutting_down: AtomicBool::new(false),
106        }
107    }
108
109    /// Puts the key/value pair in the cacheD instance and returns an instance of [` crate::cache::command::command_executor::CommandSendResult`] to the clients.
110    ///
111    /// Weight is calculated by the weight calculation function provided as a part of `Config`.
112    ///
113    ///  [`crate::cache::command::CommandStatus::Rejected`] is returned to the clients if the key already exists, since v0.0.3.
114    ///
115    /// `put` is not an immediate operation. Every invocation of `put` results in `crate::cache::command::CommandType::Put` to the `CommandExecutor`.
116    /// `CommandExecutor` in turn delegates to the `AdmissionPolicy` to perform the put operation.
117    /// `AdmissionPolicy` may accept or reject the key/value pair depending on the available cache weight.
118    ///
119    /// Since, `put` is not an immediate operation, clients can `await` on the response to get the [`crate::cache::command::CommandStatus`]
120    /// ```
121    /// use tinylfu_cached::cache::cached::CacheD;
122    /// use tinylfu_cached::cache::command::CommandStatus;
123    /// use tinylfu_cached::cache::config::ConfigBuilder;
124    /// #[tokio::main]
125    ///  async fn main() {
126    ///     let cached = CacheD::new(ConfigBuilder::new(100, 10, 100).build());
127    ///     let status = cached.put("topic", "microservices").unwrap().handle().await;
128    ///     assert_eq!(CommandStatus::Accepted, status);
129    /// }
130    /// ```
131    pub fn put(&self, key: Key, value: Value) -> CommandSendResult {
132        let weight = (self.config.weight_calculation_fn)(&key, &value, false);
133        assert!(weight > 0, "{}", Errors::WeightCalculationGtZero);
134        self.put_with_weight(key, value, weight)
135    }
136
137    /// Puts the key/value pair in the cacheD instance and returns an instance of [` crate::cache::command::command_executor::CommandSendResult`] to the clients.
138    ///
139    /// Weight is provided by the clients.
140    ///
141    ///  [`crate::cache::command::CommandStatus::Rejected`] is returned to the clients if the key already exists, since v0.0.3.
142    ///
143    /// `put_with_weight` is not an immediate operation. Every invocation of `put_with_weight` results in `crate::cache::command::CommandType::Put` to the `CommandExecutor`.
144    /// `CommandExecutor` in turn delegates to the `AdmissionPolicy` to perform the put operation.
145    /// `AdmissionPolicy` may accept or reject the key/value pair depending on the available cache weight.
146    ///
147    /// Since, `put_with_weight` is not an immediate operation, clients can `await` on the response to get the [`crate::cache::command::CommandStatus`]
148    /// ```
149    /// use tinylfu_cached::cache::cached::CacheD;
150    /// use tinylfu_cached::cache::command::CommandStatus;
151    /// use tinylfu_cached::cache::config::ConfigBuilder;
152    /// #[tokio::main]
153    ///  async fn main() {
154    ///     let cached = CacheD::new(ConfigBuilder::new(100, 10, 100).build());
155    ///     let status = cached.put_with_weight("topic", "microservices", 50).unwrap().handle().await;
156    ///     assert_eq!(CommandStatus::Accepted, status);
157    ///     assert_eq!(50, cached.total_weight_used());
158    /// }
159    /// ```
160    pub fn put_with_weight(&self, key: Key, value: Value, weight: Weight) -> CommandSendResult {
161        if self.is_shutting_down() { return shutdown_result(); }
162
163        assert!(weight > 0, "{}", Errors::KeyWeightGtZero("put_with_weight"));
164        if self.store.is_present(&key) {
165            return Ok(CommandAcknowledgement::rejected(RejectionReason::KeyAlreadyExists))
166        }
167        self.command_executor.send(CommandType::Put(
168            self.key_description(key, weight),
169            value,
170        ))
171    }
172
173    /// Puts the key/value pair with `time_to_live` in the cacheD instance and returns an instance of [` crate::cache::command::command_executor::CommandSendResult`] to the clients.
174    ///
175    /// Weight is calculated by the weight calculation function provided as a part of `Config`.
176    ///
177    /// [`crate::cache::command::CommandStatus::Rejected`] is returned to the clients if the key already exists, since v0.0.3.
178    ///
179    /// `put_with_ttl` is not an immediate operation. Every invocation of `put_with_ttl` results in `crate::cache::command::CommandType::PutWithTTL` to the `CommandExecutor`.
180    /// `CommandExecutor` in turn delegates to the `AdmissionPolicy` to perform the put operation.
181    /// `AdmissionPolicy` may accept or reject the key/value pair depending on the available cache weight.
182    ///
183    /// Since, `put_with_ttl` is not an immediate operation, clients can `await` on the response to get the [`crate::cache::command::CommandStatus`]
184    /// ```
185    /// use tinylfu_cached::cache::cached::CacheD;
186    /// use tinylfu_cached::cache::command::CommandStatus;
187    /// use tinylfu_cached::cache::config::ConfigBuilder;
188    /// use std::time::Duration;
189    /// #[tokio::main]
190    ///  async fn main() {
191    ///     let cached = CacheD::new(ConfigBuilder::new(100, 10, 100).build());
192    ///     let status = cached.put_with_ttl("topic", "microservices", Duration::from_secs(120)).unwrap().handle().await;
193    ///     assert_eq!(CommandStatus::Accepted, status);
194    /// }
195    /// ```
196    pub fn put_with_ttl(&self, key: Key, value: Value, time_to_live: Duration) -> CommandSendResult {
197        if self.is_shutting_down() { return shutdown_result(); }
198
199        let weight = (self.config.weight_calculation_fn)(&key, &value, true);
200        assert!(weight > 0, "{}", Errors::WeightCalculationGtZero);
201        if self.store.is_present(&key) {
202            return Ok(CommandAcknowledgement::rejected(RejectionReason::KeyAlreadyExists))
203        }
204        self.command_executor.send(CommandType::PutWithTTL(
205            self.key_description(key, weight), value, time_to_live)
206        )
207    }
208
209    /// Puts the key/value pair with `time_to_live` in the cacheD instance and returns an instance of [` crate::cache::command::command_executor::CommandSendResult`] to the clients.
210    ///
211    /// Weight is provided by the clients.
212    ///
213    /// [`crate::cache::command::CommandStatus::Rejected`] is returned to the clients if the key already exists, since v0.0.3.
214    ///
215    /// `put_with_weight_and_ttl` is not an immediate operation. Every invocation of `put_with_weight_and_ttl` results in `crate::cache::command::CommandType::PutWithTTL` to the `CommandExecutor`.
216    /// `CommandExecutor` in turn delegates to the `AdmissionPolicy` to perform the put operation.
217    /// `AdmissionPolicy` may accept or reject the key/value pair depending on the available cache weight.
218    ///
219    /// Since, `put_with_weight_and_ttl` is not an immediate operation, clients can `await` on the response to get the [`crate::cache::command::CommandStatus`]
220    /// ```
221    /// use tinylfu_cached::cache::cached::CacheD;
222    /// use tinylfu_cached::cache::command::CommandStatus;
223    /// use tinylfu_cached::cache::config::ConfigBuilder;
224    /// use std::time::Duration;
225    /// #[tokio::main]
226    ///  async fn main() {
227    ///     let cached = CacheD::new(ConfigBuilder::new(100, 10, 100).build());
228    ///     let status = cached.put_with_weight_and_ttl("topic", "microservices", 50, Duration::from_secs(120)).unwrap().handle().await;
229    ///     assert_eq!(50, cached.total_weight_used());
230    ///     assert_eq!(CommandStatus::Accepted, status);
231    /// }
232    /// ```
233    pub fn put_with_weight_and_ttl(&self, key: Key, value: Value, weight: Weight, time_to_live: Duration) -> CommandSendResult {
234        if self.is_shutting_down() { return shutdown_result(); }
235
236        assert!(weight > 0, "{}", Errors::KeyWeightGtZero("put_with_weight_and_ttl"));
237        if self.store.is_present(&key) {
238            return Ok(CommandAcknowledgement::rejected(RejectionReason::KeyAlreadyExists))
239        }
240        self.command_executor.send(CommandType::PutWithTTL(
241            self.key_description(key, weight), value, time_to_live,
242        ))
243    }
244
245    /// Performs a `put` if the key does not exist or an `update` operation, if the key exists. [`PutOrUpdateRequest`] is a convenient way to perform put or update operation.
246    /// `put_or_update` attempts to perform the update operation on `crate::cache::store::Store` first.
247    /// If the update operation is successful then the changes are made to `TTLTicker` and `AdmissionPolicy`, if applicable.
248    /// If the update is not successful then a `put` operation is performed.
249    /// ```
250    /// use tinylfu_cached::cache::cached::CacheD;
251    /// use tinylfu_cached::cache::command::CommandStatus;
252    /// use tinylfu_cached::cache::config::ConfigBuilder;
253    /// use tinylfu_cached::cache::put_or_update::PutOrUpdateRequestBuilder;
254    /// #[tokio::main]
255    ///  async fn main() {
256    ///     let cached = CacheD::new(ConfigBuilder::new(100, 10, 100).build());
257    ///     let status = cached.put("topic", "microservices").unwrap().handle().await;
258    ///     assert_eq!(CommandStatus::Accepted, status);
259    ///     let _ = cached.put_or_update(PutOrUpdateRequestBuilder::new("topic").value("Cached").build()).unwrap().handle().await;
260    ///     let value = cached.get(&"topic");
261    ///     assert_eq!(Some("Cached"), value);
262    /// }
263    /// ```
264    pub fn put_or_update(&self, request: PutOrUpdateRequest<Key, Value>) -> CommandSendResult {
265        if self.is_shutting_down() { return shutdown_result(); }
266
267        let updated_weight = request.updated_weight(&self.config.weight_calculation_fn);
268        let (key, value, time_to_live)
269            = (request.key, request.value, request.time_to_live);
270
271        let update_response
272            = self.store.update(&key, value, time_to_live, request.remove_time_to_live);
273
274        if !update_response.did_update_happen() {
275            let value = update_response.value();
276            assert!(value.is_some(), "{}", Errors::PutOrUpdateValueMissing);
277            assert!(updated_weight.is_some());
278
279            let value = value.unwrap();
280            let weight = updated_weight.unwrap();
281            assert!(weight > 0, "{}", Errors::KeyWeightGtZero("PutOrUpdate"));
282
283            return if let Some(time_to_live) = time_to_live {
284                self.command_executor.send(CommandType::PutWithTTL(
285                    self.key_description(key, weight), value, time_to_live,
286                ))
287            } else {
288                self.command_executor.send(CommandType::Put(
289                    self.key_description(key, weight),
290                    value,
291                ))
292            };
293        }
294
295        let key_id = update_response.key_id_or_panic();
296        let existing_weight = self.admission_policy.weight_of(&key_id).unwrap_or(0);
297
298        let updated_weight = match update_response.type_of_expiry_update() {
299            TypeOfExpiryUpdate::Added(key_id, expiry) => {
300                self.ttl_ticker.put(key_id, expiry);
301                updated_weight.or_else(|| Some(existing_weight + Calculation::ttl_ticker_entry_size() as i64))
302            }
303            TypeOfExpiryUpdate::Deleted(key_id, expiry) => {
304                self.ttl_ticker.delete(&key_id, &expiry);
305                updated_weight.or_else(|| Some(existing_weight - Calculation::ttl_ticker_entry_size() as i64))
306            }
307            TypeOfExpiryUpdate::Updated(key_id, old_expiry, new_expiry) => {
308                self.ttl_ticker.update(key_id, &old_expiry, new_expiry);
309                updated_weight
310            }
311            _ => updated_weight,
312        };
313
314        if let Some(weight) = updated_weight {
315            assert!(weight > 0, "{}", Errors::KeyWeightGtZero("PutOrUpdate"));
316            return self.command_executor.send(CommandType::UpdateWeight(key_id, weight));
317        }
318        Ok(CommandAcknowledgement::accepted())
319    }
320
321    /// Deletes the key/value pair from the instance of `CacheD`. Delete is a 2 step process:
322    ///
323    /// 1) Marks the key as deleted in the `crate::cache::store::Store`. So, any `get` operations on the key would return None.
324    ///    This step is immediate.
325    ///
326    /// 2) Sends a `crate::cache::command::CommandType::Delete` to the `CommandExecutor` which causes the key weight to be removed from `AdmissionPolicy`.
327    ///    This step may happen at a later point in time.
328    ///
329    /// Since, `delete` is not an immediate operation, clients can `await` on the response to get the [`crate::cache::command::CommandStatus`]
330    /// ```
331    /// use tinylfu_cached::cache::cached::CacheD;
332    /// use tinylfu_cached::cache::command::CommandStatus;
333    /// use tinylfu_cached::cache::config::ConfigBuilder;
334    /// #[tokio::main]
335    ///  async fn main() {
336    ///     let cached = CacheD::new(ConfigBuilder::new(100, 10, 100).build());
337    ///     let status = cached.put("topic", "microservices").unwrap().handle().await;
338    ///     assert_eq!(CommandStatus::Accepted, status);
339    ///     let _ = cached.delete(&"topic").unwrap().handle().await;
340    ///     assert_eq!(None, cached.get(&"topic"));
341    /// }
342    /// ```
343    pub fn delete(&self, key: Key) -> CommandSendResult {
344        if self.is_shutting_down() { return shutdown_result(); }
345
346        self.store.mark_deleted(&key);
347        self.command_executor.send(CommandType::Delete(key))
348    }
349
350    /// Returns an optional reference to the key/value present in the instance of `Cached`.
351    ///
352    /// The reference is wrapped in [`crate::cache::store::key_value_ref::KeyValueRef`].
353    /// KeyValueRef contains DashMap's Ref [`dashmap::mapref::one::Ref`] which internally holds a `RwLockReadGuard` for the shard.
354    /// Any time `get_ref` method is invoked, the `Store` returns `Option<KeyValueRef<'_, Key, StoredValue<Value>>>`.
355    /// If the key is present in the `Store`, `get_ref` will return `Some<KeyValueRef<'_, Key, StoredValue<Value>>>`.
356    ///
357    /// Hence, the invocation of `get_ref` will hold a lock against the shard that contains the key (within the scope of its usage).
358    /// ```
359    /// use tinylfu_cached::cache::cached::CacheD;
360    /// use tinylfu_cached::cache::command::CommandStatus;
361    /// use tinylfu_cached::cache::config::ConfigBuilder;
362    /// #[tokio::main]
363    ///  async fn main() {
364    ///     let cached = CacheD::new(ConfigBuilder::new(100, 10, 100).build());
365    ///     let status = cached.put("topic", "microservices").unwrap().handle().await;
366    ///     assert_eq!(CommandStatus::Accepted, status);
367    ///     let value = cached.get_ref(&"topic");
368    ///     let value_ref = value.unwrap();
369    ///     let stored_value = value_ref.value();
370    ///     assert_eq!("microservices", stored_value.value());
371    /// }
372    /// ```
373    pub fn get_ref(&self, key: &Key) -> Option<KeyValueRef<'_, Key, StoredValue<Value>>> {
374        if self.is_shutting_down() { return None; }
375
376        if let Some(value_ref) = self.store.get_ref(key) {
377            self.mark_key_accessed(key);
378            return Some(value_ref);
379        }
380        None
381    }
382
383    /// Returns an optional MappedValue for key present in the instance of `Cached`.
384    ///
385    /// The parameter `map_fn` is an instance of `Fn` that takes a reference to [`crate::cache::store::stored_value::StoredValue`] and returns any MappedValue.
386    /// This is an extension to `get_ref` method.
387    /// If the key is present in `Cached`, it returns `Some(MappedValue)`, else returns `None`.
388    /// ```
389    /// use tinylfu_cached::cache::cached::CacheD;
390    /// use tinylfu_cached::cache::command::CommandStatus;
391    /// use tinylfu_cached::cache::config::ConfigBuilder;
392    /// #[tokio::main]
393    ///  async fn main() {
394    ///     let cached = CacheD::new(ConfigBuilder::new(100, 10, 100).build());
395    ///     let status = cached.put("topic", "microservices").unwrap().handle().await;
396    ///     assert_eq!(CommandStatus::Accepted, status);
397    ///     let value = cached.map_get_ref(&"topic", |stored_value| stored_value.value_ref().to_uppercase());
398    ///     assert_eq!("MICROSERVICES", value.unwrap());
399    /// }
400    /// ```
401    pub fn map_get_ref<MapFn, MappedValue>(&self, key: &Key, map_fn: MapFn) -> Option<MappedValue>
402        where MapFn: Fn(&StoredValue<Value>) -> MappedValue {
403        if self.is_shutting_down() { return None; }
404
405        if let Some(value_ref) = self.get_ref(key) {
406            return Some(map_fn(value_ref.value()));
407        }
408        None
409    }
410
411    /// Returns the total weight used in the cache.
412    pub fn total_weight_used(&self) -> Weight {
413        self.admission_policy.weight_used()
414    }
415
416    /// Returns an instance of [`crate::cache::stats::StatsSummary`].
417    /// ```
418    /// use tinylfu_cached::cache::cached::CacheD;
419    /// use tinylfu_cached::cache::config::ConfigBuilder;
420    /// use tinylfu_cached::cache::stats::StatsType;
421    /// #[tokio::main]
422    ///  async fn main() {
423    ///     let cached = CacheD::new(ConfigBuilder::new(100, 10, 200).build());
424    ///     let _ = cached.put("topic", "microservices").unwrap().handle().await;
425    ///     let _ = cached.put("cache", "cached").unwrap().handle().await;
426    ///     let _ = cached.get(&"topic");
427    ///     let _ = cached.get(&"cache");
428    ///     let stats_summary = cached.stats_summary();
429    ///     assert_eq!(2, stats_summary.get(&StatsType::CacheHits).unwrap());
430    /// }
431    /// ```
432    pub fn stats_summary(&self) -> StatsSummary {
433        self.store.stats_counter().summary()
434    }
435
436    /// Shuts down the cache.
437    ///
438    /// Shutdown involves the following:
439    /// 1) Marking `is_shutting_down` to true
440    /// 2) Sending a `crate::cache::command::CommandType::Shutdown` to the `crate::cache::command::command_executor::CommandExecutor`
441    /// 3) Shutting down `crate::cache::expiration::TTLTicker`
442    /// 4) Clearing the data inside `crate::cache::store::Store`
443    /// 5) Clearing the data inside `crate::cache::policy::admission_policy::AdmissionPolicy`
444    /// 6) Clearing the data inside `crate::cache::expiration::TTLTicker`
445    ///
446    /// Any attempt to perform an operation after the `CacheD` instance is shutdown, will result in an error.
447    ///
448    /// However, there is race condition sort of a scenario here.
449    /// Consider that `shutdown()` and `put()` on an instance of `Cached` are invoked at the same time.
450    /// Both these operations result in sending different commands to the `CommandExecutor`.
451    /// Somehow, the `Shutdown` command goes in before the `put` command.
452    /// This also means that the client could have performed `await` operation on response from `put`.
453    /// It becomes important to finish the future of the `put` command that has come in at the same time `shutdown` was invoked.
454    ///
455    /// This is how `shutdown` in `CommandExecutor` is handled, it finishes all the futures in the pipeline that are placed after the `Shutdown` command.
456    /// All such futures ultimately get [`crate::cache::command::CommandStatus::ShuttingDown`].
457    pub fn shutdown(&self) {
458        if self.is_shutting_down.compare_exchange(false, true, Ordering::Release, Ordering::Relaxed).is_ok() {
459            info!("Starting to shutdown cached");
460            let _ = self.command_executor.shutdown();
461            self.admission_policy.shutdown();
462            self.ttl_ticker.shutdown();
463
464            self.store.clear();
465            self.admission_policy.clear();
466            self.ttl_ticker.clear();
467        }
468    }
469
470    fn mark_key_accessed(&self, key: &Key) {
471        self.pool.add((self.config.key_hash_fn)(key));
472    }
473
474    fn key_description(&self, key: Key, weight: Weight) -> KeyDescription<Key> {
475        let hash = (self.config.key_hash_fn)(&key);
476        KeyDescription::new(key, self.id_generator.next(), hash, weight)
477    }
478
479    fn ttl_ticker(config: &Config<Key, Value>, store: Arc<Store<Key, Value>>, admission_policy: Arc<AdmissionPolicy<Key>>) -> Arc<TTLTicker> {
480        let store_evict_hook = move |key| {
481            store.delete(&key);
482        };
483        let cache_weight_evict_hook = move |key_id: &KeyId| {
484            admission_policy.delete_with_hook(key_id, &store_evict_hook);
485        };
486
487        TTLTicker::new(config.ttl_config(), cache_weight_evict_hook)
488    }
489
490    fn is_shutting_down(&self) -> bool {
491        self.is_shutting_down.load(Acquire)
492    }
493}
494
495impl<Key, Value> CacheD<Key, Value>
496    where Key: Hash + Eq + Send + Sync + Clone + 'static,
497          Value: Send + Sync + Clone + 'static {
498    /// Returns an optional reference to the Value in the instance of `Cached`.
499    ///
500    /// This method is only available if the Value type is Cloneable. This method clones the value and returns it to the client.
501    /// ```
502    /// use tinylfu_cached::cache::cached::CacheD;
503    /// use tinylfu_cached::cache::command::CommandStatus;
504    /// use tinylfu_cached::cache::config::ConfigBuilder;
505    /// #[tokio::main]
506    ///  async fn main() {
507    ///     let cached = CacheD::new(ConfigBuilder::new(100, 10, 100).build());
508    ///     let status = cached.put("topic", "microservices").unwrap().handle().await;
509    ///     assert_eq!(CommandStatus::Accepted, status);
510    ///     let value = cached.get(&"topic");
511    ///     assert_eq!(Some("microservices"), value);
512    /// }
513    /// ```
514    pub fn get(&self, key: &Key) -> Option<Value> {
515        if self.is_shutting_down() { return None; }
516
517        if let Some(value) = self.store.get(key) {
518            self.mark_key_accessed(key);
519            return Some(value);
520        }
521        None
522    }
523
524    /// Returns an optional MappedValue for key present in the instance of `Cached`.
525    ///
526    /// The parameter `map_fn` is an instance of `Fn` that takes the cloned Value and returns any MappedValue
527    /// This is an extension to the `get` method.
528    ///
529    /// This method is only available if the Value type is Cloneable.
530    /// If the key is present in `Cached`, it returns `Some(MappedValue)`, else returns `None`.
531    /// ```
532    /// use tinylfu_cached::cache::cached::CacheD;
533    /// use tinylfu_cached::cache::command::CommandStatus;
534    /// use tinylfu_cached::cache::config::ConfigBuilder;
535    /// #[tokio::main]
536    ///  async fn main() {
537    ///     let cached = CacheD::new(ConfigBuilder::new(100, 10, 100).build());
538    ///     let status = cached.put("topic", "microservices").unwrap().handle().await;
539    ///     assert_eq!(CommandStatus::Accepted, status);
540    ///     let value = cached.map_get(&"topic", |value| value.to_uppercase());
541    ///     assert_eq!("MICROSERVICES", value.unwrap());
542    /// }
543    /// ```
544    pub fn map_get<MapFn, MappedValue>(&self, key: &Key, map_fn: MapFn) -> Option<MappedValue>
545        where MapFn: Fn(Value) -> MappedValue {
546        if self.is_shutting_down() { return None; }
547
548        if let Some(value) = self.get(key) {
549            return Some(map_fn(value));
550        }
551        None
552    }
553
554    /// Returns values corresponding to multiple keys.
555    ///
556    /// It takes a vector of reference of keys and returns a `HashMap` containing the key reference and the optional Value.
557    /// If the value is present for a key, the returned `HashMap` will contain the key reference and `Some(Value)`.
558    /// If the value is not present for a key, the returned `HashMap` will contain the key reference and `None` as the value.
559    ///
560    /// This method is only available if the Value type is Cloneable.
561    /// ```
562    /// use tinylfu_cached::cache::cached::CacheD;
563    /// use tinylfu_cached::cache::config::ConfigBuilder;
564    /// #[tokio::main]
565    ///  async fn main() {
566    ///     let cached = CacheD::new(ConfigBuilder::new(100, 10, 100).build());
567    ///     let status = cached.put("topic", "microservices").unwrap().handle().await;
568    ///     let values = cached.multi_get(vec![&"topic", &"non-existing"]);
569    ///     assert_eq!(&Some("microservices"), values.get(&"topic").unwrap());
570    ///     assert_eq!(&None, values.get(&"non-existing").unwrap());
571    /// }
572    /// ```
573    pub fn multi_get<'a>(&self, keys: Vec<&'a Key>) -> HashMap<&'a Key, Option<Value>> {
574        if self.is_shutting_down() { return HashMap::new(); }
575
576        keys.into_iter().map(|key| (key, self.get(key))).collect::<HashMap<_, _>>()
577    }
578
579    /// Returns an instance of [`MultiGetIterator`] that allows iterating over multiple keys and getting the value corresponding to each key.
580    ///
581    /// It takes a vector of reference of keys and an instance of `MultiGetIterator`
582    ///
583    /// This method is only available if the Value type is Cloneable.
584    /// ```
585    /// use tinylfu_cached::cache::cached::CacheD;
586    /// use tinylfu_cached::cache::config::ConfigBuilder;
587    /// #[tokio::main]
588    ///  async fn main() {
589    ///     let cached = CacheD::new(ConfigBuilder::new(100, 10, 100).build());
590    ///     let status = cached.put("topic", "microservices").unwrap().handle().await;
591    ///     let mut iterator = cached.multi_get_iterator(vec![&"topic", &"non-existing"]);
592    ///     assert_eq!(Some("microservices"), iterator.next().unwrap());
593    ///     assert_eq!(None, iterator.next().unwrap());
594    ///     assert_eq!(None, iterator.next());
595    /// }
596    /// ```
597    pub fn multi_get_iterator<'a>(&'a self, keys: Vec<&'a Key>) -> MultiGetIterator<'a, Key, Value> {
598        MultiGetIterator {
599            cache: self,
600            keys,
601        }
602    }
603
604    /// Returns an instance of [`MultiGetMapIterator`] that allows iterating over multiple keys, performing a map operation over each key and then getting the value corresponding to each key.
605    ///
606    /// It takes a vector of reference of keys and an instance of `MultiGetIterator`.
607    ///
608    /// This method is only available if the Value type is Cloneable.
609    /// ```
610    /// use tinylfu_cached::cache::cached::CacheD;
611    /// use tinylfu_cached::cache::config::ConfigBuilder;
612    /// #[tokio::main]
613    ///  async fn main() {
614    ///     let cached = CacheD::new(ConfigBuilder::new(100, 10, 100).build());
615    ///     let status = cached.put("topic", "microservices").unwrap().handle().await;
616    ///     let mut iterator = cached.multi_get_map_iterator(vec![&"topic", &"non-existing"], |value| value.to_uppercase());
617    ///     assert_eq!(Some("MICROSERVICES".to_string()), iterator.next().unwrap());
618    ///     assert_eq!(None, iterator.next().unwrap());
619    ///     assert_eq!(None, iterator.next());
620    /// }
621    /// ```
622    pub fn multi_get_map_iterator<'a, MapFn, MappedValue>(&'a self, keys: Vec<&'a Key>, map_fn: MapFn) -> MultiGetMapIterator<'a, Key, Value, MapFn, MappedValue>
623        where MapFn: Fn(Value) -> MappedValue {
624        MultiGetMapIterator {
625            iterator: MultiGetIterator {
626                cache: self,
627                keys,
628            },
629            map_fn,
630        }
631    }
632}
633
634/// `MultiGetIterator` allows iterating over multiple keys and getting the value corresponding to each key.
635/// ```
636/// use tinylfu_cached::cache::cached::CacheD;
637/// use tinylfu_cached::cache::config::ConfigBuilder;
638/// #[tokio::main]
639///  async fn main() {
640///     let cached = CacheD::new(ConfigBuilder::new(100, 10, 100).build());
641///     let status = cached.put("topic", "microservices").unwrap().handle().await;
642///     let mut iterator = cached.multi_get_iterator(vec![&"topic", &"non-existing"]);
643///     assert_eq!(Some("microservices"), iterator.next().unwrap());
644///     assert_eq!(None, iterator.next().unwrap());
645///     assert_eq!(None, iterator.next());
646/// }
647/// ```
648pub struct MultiGetIterator<'a, Key, Value>
649    where Key: Hash + Eq + Send + Sync + Clone + 'static,
650          Value: Send + Sync + Clone + 'static {
651    cache: &'a CacheD<Key, Value>,
652    keys: Vec<&'a Key>,
653}
654
655impl<'a, Key, Value> Iterator for MultiGetIterator<'a, Key, Value>
656    where Key: Hash + Eq + Send + Sync + Clone + 'static,
657          Value: Send + Sync + Clone + 'static {
658    type Item = Option<Value>;
659
660    fn next(&mut self) -> Option<Self::Item> {
661        if self.keys.is_empty() || self.cache.is_shutting_down() {
662            return None;
663        }
664        let key = self.keys.get(0).unwrap();
665        let value = self.cache.get(key);
666
667        self.keys.remove(0);
668        Some(value)
669    }
670}
671
672/// `MultiGetMapIterator` allows iterating over multiple keys, performing a map operation over each key and then getting the value corresponding to each key.
673/// ```
674/// use tinylfu_cached::cache::cached::CacheD;
675/// use tinylfu_cached::cache::config::ConfigBuilder;
676/// #[tokio::main]
677///  async fn main() {
678///     let cached = CacheD::new(ConfigBuilder::new(100, 10, 100).build());
679///     let status = cached.put("topic", "microservices").unwrap().handle().await;
680///     let mut iterator = cached.multi_get_map_iterator(vec![&"topic", &"non-existing"], |value| value.to_uppercase());
681///     assert_eq!(Some("MICROSERVICES".to_string()), iterator.next().unwrap());
682///     assert_eq!(None, iterator.next().unwrap());
683///     assert_eq!(None, iterator.next());
684/// }
685/// ```
686pub struct MultiGetMapIterator<'a, Key, Value, MapFn, MappedValue>
687    where Key: Hash + Eq + Send + Sync + Clone + 'static,
688          Value: Send + Sync + Clone + 'static,
689          MapFn: Fn(Value) -> MappedValue, {
690    iterator: MultiGetIterator<'a, Key, Value>,
691    map_fn: MapFn,
692}
693
694impl<'a, Key, Value, MapFn, MappedValue> Iterator for MultiGetMapIterator<'a, Key, Value, MapFn, MappedValue>
695    where Key: Hash + Eq + Send + Sync + Clone + 'static,
696          Value: Send + Sync + Clone + 'static,
697          MapFn: Fn(Value) -> MappedValue, {
698    type Item = Option<MappedValue>;
699
700    fn next(&mut self) -> Option<Self::Item> {
701        self.iterator.next().map(|optional_value| {
702            match optional_value {
703                None => None,
704                Some(value) => Some((self.map_fn)(value))
705            }
706        })
707    }
708}
709
710
711#[cfg(test)]
712mod tests {
713    use std::sync::Arc;
714    use std::thread;
715    use std::time::Duration;
716
717    use crate::cache::cached::CacheD;
718    use crate::cache::command::{CommandStatus, RejectionReason};
719    use crate::cache::config::{ConfigBuilder, WeightCalculationFn};
720    use crate::cache::put_or_update::{PutOrUpdateRequest, PutOrUpdateRequestBuilder};
721    use crate::cache::stats::StatsType;
722
723    #[derive(Eq, PartialEq, Debug)]
724    struct Name {
725        first: String,
726        last: String,
727    }
728
729    mod setup {
730        use std::time::SystemTime;
731
732        use crate::cache::clock::Clock;
733
734        #[derive(Clone)]
735        pub(crate) struct UnixEpochClock;
736
737        impl Clock for UnixEpochClock {
738            fn now(&self) -> SystemTime {
739                SystemTime::UNIX_EPOCH
740            }
741        }
742    }
743
744    fn test_config_builder() -> ConfigBuilder<&'static str, &'static str> {
745        ConfigBuilder::new(100, 10, 100)
746    }
747
748    #[test]
749    #[should_panic]
750    fn shards_mut_be_power_of_2_and_greater_than_1() {
751        let _: CacheD<&str, &str> = CacheD::new(test_config_builder().shards(1).build());
752    }
753
754    #[test]
755    #[should_panic]
756    fn weight_must_be_greater_than_zero_1() {
757        let cached = CacheD::new(test_config_builder().build());
758        let _ =
759            cached.put_with_weight("topic", "microservices", 0).unwrap();
760    }
761
762    #[test]
763    #[should_panic]
764    fn weight_must_be_greater_than_zero_2() {
765        let cached = CacheD::new(test_config_builder().build());
766        let _ =
767            cached.put_with_weight_and_ttl("topic", "microservices", 0, Duration::from_secs(5)).unwrap();
768    }
769
770    #[test]
771    #[should_panic]
772    fn weight_calculation_fn_must_return_weight_greater_than_zero_1() {
773        let weight_calculation: Box<WeightCalculationFn<&str, &str>> = Box::new(|_key, _value, _is_time_to_live_specified| 0);
774        let cached = CacheD::new(test_config_builder().weight_calculation_fn(weight_calculation).build());
775        let _ =
776            cached.put("topic", "microservices").unwrap();
777    }
778
779    #[test]
780    #[should_panic]
781    fn weight_calculation_fn_must_return_weight_greater_than_zero_2() {
782        let weight_calculation: Box<WeightCalculationFn<&str, &str>> = Box::new(|_key, _value, _is_time_to_live_specified| 0);
783        let cached = CacheD::new(test_config_builder().weight_calculation_fn(weight_calculation).build());
784        let _ =
785            cached.put_with_ttl("topic", "microservices", Duration::from_secs(5)).unwrap();
786    }
787
788    #[test]
789    #[should_panic]
790    fn put_or_update_results_in_put_value_must_be_present() {
791        let cached = CacheD::new(test_config_builder().build());
792        let put_or_update: PutOrUpdateRequest<&str, &str> = PutOrUpdateRequestBuilder::new("store").build();
793        let _ = cached.put_or_update(put_or_update);
794    }
795
796    #[test]
797    #[should_panic]
798    fn put_or_update_results_in_put_with_weight_calculation_fn_must_return_weight_greater_than_zero() {
799        let weight_calculation: Box<WeightCalculationFn<&str, &str>> = Box::new(|_key, _value, _is_time_to_live_specified| 0);
800        let cached = CacheD::new(test_config_builder().weight_calculation_fn(weight_calculation).build());
801
802        let put_or_update = PutOrUpdateRequestBuilder::new("store").value("cached").build();
803        let _ = cached.put_or_update(put_or_update);
804    }
805
806    #[tokio::test]
807    #[should_panic]
808    async fn put_or_update_results_in_update_with_weight_calculation_fn_must_return_weight_greater_than_zero() {
809        let weight_calculation: Box<WeightCalculationFn<&str, &str>> = Box::new(|_key, _value, _is_time_to_live_specified| 0);
810        let cached = CacheD::new(test_config_builder().weight_calculation_fn(weight_calculation).build());
811        cached.put("topic", "microservices").unwrap().handle().await;
812
813        let put_or_update = PutOrUpdateRequestBuilder::new("topic").value("cached").build();
814        let _ = cached.put_or_update(put_or_update);
815    }
816
817
818    #[tokio::test]
819    #[should_panic]
820    async fn put_or_update_results_in_update_with_weight_must_be_greater_than_zero() {
821        let cached = CacheD::new(test_config_builder().build());
822        cached.put("topic", "microservices").unwrap().handle().await;
823
824        let put_or_update = PutOrUpdateRequestBuilder::new("topic").value("cached").weight(0).build();
825        let _ = cached.put_or_update(put_or_update);
826    }
827
828    #[tokio::test]
829    async fn put_a_key_value_without_weight_and_ttl() {
830        let cached = CacheD::new(ConfigBuilder::new(100, 10, 100).build());
831
832        let key: u64 = 100;
833        let value: u64 = 1000;
834
835        let acknowledgement =
836            cached.put(key, value).unwrap();
837        acknowledgement.handle().await;
838
839        let value = cached.get_ref(&100);
840        let value_ref = value.unwrap();
841        let stored_value = value_ref.value();
842        let key_id = stored_value.key_id();
843
844        assert_eq!(1000, stored_value.value());
845        assert_eq!(Some(40), cached.admission_policy.weight_of(&key_id));
846    }
847
848    #[tokio::test]
849    async fn put_a_key_value_without_weight_with_ttl() {
850        let cached = CacheD::new(ConfigBuilder::new(100, 10, 100).build());
851
852        let key: u64 = 100;
853        let value: u64 = 1000;
854
855        let acknowledgement =
856            cached.put_with_ttl(key, value, Duration::from_secs(300)).unwrap();
857        acknowledgement.handle().await;
858
859        let value = cached.get_ref(&100);
860        let value_ref = value.unwrap();
861        let stored_value = value_ref.value();
862        let key_id = stored_value.key_id();
863
864        assert_eq!(1000, stored_value.value());
865        assert_eq!(Some(64), cached.admission_policy.weight_of(&key_id));
866        assert!(stored_value.expire_after().is_some());
867    }
868
869    #[tokio::test]
870    async fn put_the_same_key_value_again() {
871        let cached = CacheD::new(ConfigBuilder::new(100, 10, 100).build());
872
873        let key: u64 = 100;
874        let value: u64 = 1000;
875
876        let acknowledgement = cached.put(key, value).unwrap();
877        acknowledgement.handle().await;
878
879        let acknowledgement = cached.put(key, value).unwrap();
880        let status = acknowledgement.handle().await;
881
882        assert_eq!(CommandStatus::Rejected(RejectionReason::KeyAlreadyExists), status);
883
884        let value = cached.get_ref(&100);
885        let value_ref = value.unwrap();
886        let stored_value = value_ref.value();
887
888        assert_eq!(1000, stored_value.value());
889        assert_eq!(40, cached.total_weight_used());
890    }
891
892    #[tokio::test]
893    async fn put_a_key_value_with_weight() {
894        let cached = CacheD::new(test_config_builder().build());
895
896        let acknowledgement =
897            cached.put_with_weight("topic", "microservices", 50).unwrap();
898        acknowledgement.handle().await;
899
900        let value = cached.get_ref(&"topic");
901        let value_ref = value.unwrap();
902        let stored_value = value_ref.value();
903        let key_id = stored_value.key_id();
904
905        assert_eq!("microservices", stored_value.value());
906        assert_eq!(Some(50), cached.admission_policy.weight_of(&key_id));
907    }
908
909    #[tokio::test]
910    async fn put_a_key_value_with_weight_again() {
911        let cached = CacheD::new(test_config_builder().build());
912
913        let acknowledgement =
914            cached.put_with_weight("topic", "microservices", 50).unwrap();
915        acknowledgement.handle().await;
916
917        let acknowledgement =
918            cached.put_with_weight("topic", "microservices", 50).unwrap();
919        let status = acknowledgement.handle().await;
920
921        assert_eq!(CommandStatus::Rejected(RejectionReason::KeyAlreadyExists), status);
922
923        let value = cached.get_ref(&"topic");
924        let value_ref = value.unwrap();
925        let stored_value = value_ref.value();
926        let key_id = stored_value.key_id();
927
928        assert_eq!("microservices", stored_value.value());
929        assert_eq!(Some(50), cached.admission_policy.weight_of(&key_id));
930        assert_eq!(50, cached.total_weight_used());
931    }
932
933    #[tokio::test]
934    async fn put_a_key_value_with_ttl() {
935        let cached = CacheD::new(test_config_builder().build());
936
937        let acknowledgement =
938            cached.put_with_ttl("topic", "microservices", Duration::from_secs(120)).unwrap();
939        acknowledgement.handle().await;
940
941        let value = cached.get(&"topic");
942        assert_eq!(Some("microservices"), value);
943    }
944
945    #[tokio::test]
946    async fn put_a_key_value_with_ttl_again() {
947        let cached = CacheD::new(test_config_builder().build());
948
949        let acknowledgement =
950            cached.put_with_ttl("topic", "microservices", Duration::from_secs(120)).unwrap();
951        acknowledgement.handle().await;
952
953        let acknowledgement =
954            cached.put_with_ttl("topic", "microservices", Duration::from_secs(120)).unwrap();
955        let status = acknowledgement.handle().await;
956
957        assert_eq!(CommandStatus::Rejected(RejectionReason::KeyAlreadyExists), status);
958
959        let value = cached.get(&"topic");
960        assert_eq!(Some("microservices"), value);
961    }
962
963    #[tokio::test]
964    async fn put_a_key_value_with_weight_and_ttl() {
965        let cached = CacheD::new(test_config_builder().build());
966
967        let acknowledgement =
968            cached.put_with_weight_and_ttl("topic", "microservices", 10, Duration::from_secs(120)).unwrap();
969        acknowledgement.handle().await;
970
971        let value = cached.get(&"topic");
972        assert_eq!(Some("microservices"), value);
973    }
974
975    #[tokio::test]
976    async fn put_a_key_value_with_weight_and_ttl_again() {
977        let cached = CacheD::new(test_config_builder().build());
978
979        let acknowledgement =
980            cached.put_with_weight_and_ttl("topic", "microservices", 10, Duration::from_secs(120)).unwrap();
981        acknowledgement.handle().await;
982
983        let acknowledgement =
984            cached.put_with_weight_and_ttl("topic", "microservices", 10, Duration::from_secs(120)).unwrap();
985        let status = acknowledgement.handle().await;
986        assert_eq!(CommandStatus::Rejected(RejectionReason::KeyAlreadyExists), status);
987
988        let value = cached.get(&"topic");
989        assert_eq!(Some("microservices"), value);
990        assert_eq!(10, cached.total_weight_used());
991    }
992
993    #[tokio::test]
994    async fn put_a_key_value_with_ttl_and_ttl_ticker_evicts_it() {
995        let cached = CacheD::new(test_config_builder().shards(2).ttl_tick_duration(Duration::from_millis(10)).build());
996
997        let acknowledgement =
998            cached.put_with_ttl("topic", "microservices", Duration::from_millis(20)).unwrap();
999        acknowledgement.handle().await;
1000
1001        let value = cached.get(&"topic");
1002        assert_eq!(Some("microservices"), value);
1003
1004        thread::sleep(Duration::from_millis(20));
1005        assert_eq!(None, cached.get(&"topic"));
1006    }
1007
1008    #[test]
1009    fn get_value_ref_for_a_non_existing_key() {
1010        let cached: CacheD<&str, &str> = CacheD::new(test_config_builder().build());
1011
1012        let value = cached.get_ref(&"non-existing");
1013        assert!(value.is_none());
1014    }
1015
1016    #[test]
1017    fn get_value_ref_for_a_non_existing_key_and_attempt_to_map_it() {
1018        let cached: CacheD<&str, &str> = CacheD::new(test_config_builder().build());
1019
1020        let value = cached.map_get_ref(&"non_existing", |stored_value| stored_value.value_ref().to_uppercase());
1021        assert!(value.is_none());
1022    }
1023
1024    #[tokio::test]
1025    async fn get_value_ref_for_an_existing_key() {
1026        let cached = CacheD::new(test_config_builder().build());
1027
1028        let acknowledgement =
1029            cached.put("topic", "microservices").unwrap();
1030        acknowledgement.handle().await;
1031
1032        let value = cached.get_ref(&"topic");
1033        assert_eq!(&"microservices", value.unwrap().value().value_ref());
1034    }
1035
1036    #[tokio::test]
1037    async fn get_value_ref_for_an_existing_key_and_map_it() {
1038        let cached = CacheD::new(test_config_builder().build());
1039
1040        let acknowledgement =
1041            cached.put("topic", "microservices").unwrap();
1042        acknowledgement.handle().await;
1043
1044        let value = cached.map_get_ref(&"topic", |stored_value| stored_value.value_ref().to_uppercase());
1045        assert_eq!("MICROSERVICES", value.unwrap());
1046    }
1047
1048    #[tokio::test]
1049    async fn get_value_for_an_existing_key() {
1050        let cached = CacheD::new(test_config_builder().build());
1051
1052        let acknowledgement =
1053            cached.put("topic", "microservices").unwrap();
1054        acknowledgement.handle().await;
1055
1056        let value = cached.get(&"topic");
1057        assert_eq!(Some("microservices"), value);
1058    }
1059
1060    #[tokio::test]
1061    async fn get_value_for_an_existing_key_and_map_it() {
1062        let cached = CacheD::new(test_config_builder().build());
1063
1064        let acknowledgement =
1065            cached.put("topic", "microservices").unwrap();
1066        acknowledgement.handle().await;
1067
1068        let value = cached.map_get(&"topic", |value| value.to_uppercase());
1069        assert_eq!("MICROSERVICES", value.unwrap());
1070    }
1071
1072    #[test]
1073    fn get_value_for_a_non_existing_key() {
1074        let cached: CacheD<&str, &str> = CacheD::new(test_config_builder().build());
1075
1076        let value = cached.get(&"non-existing");
1077        assert_eq!(None, value);
1078    }
1079
1080    #[test]
1081    fn get_value_for_a_non_existing_key_and_attempt_to_map_it() {
1082        let cached: CacheD<&str, &str> = CacheD::new(test_config_builder().build());
1083
1084        let value = cached.map_get(&"topic", |value| value.to_uppercase());
1085        assert_eq!(None, value);
1086    }
1087
1088    #[tokio::test]
1089    async fn get_value_ref_for_an_existing_key_if_value_is_not_cloneable() {
1090        let cached = CacheD::new(ConfigBuilder::new(100, 10, 1000).build());
1091
1092        let acknowledgement =
1093            cached.put("name", Name { first: "John".to_string(), last: "Mcnamara".to_string() }).unwrap();
1094        acknowledgement.handle().await;
1095
1096        let value = cached.get_ref(&"name");
1097        assert_eq!(&Name { first: "John".to_string(), last: "Mcnamara".to_string() }, value.unwrap().value().value_ref());
1098    }
1099
1100    #[tokio::test]
1101    async fn get_value_for_an_existing_key_if_value_is_not_cloneable_by_passing_an_arc() {
1102        let cached = CacheD::new(ConfigBuilder::new(100, 10, 1000).build());
1103
1104        let acknowledgement =
1105            cached.put("name", Arc::new(Name { first: "John".to_string(), last: "Mcnamara".to_string() })).unwrap();
1106        acknowledgement.handle().await;
1107
1108        let value = cached.get(&"name").unwrap();
1109        assert_eq!("John".to_string(), value.first);
1110        assert_eq!("Mcnamara".to_string(), value.last);
1111    }
1112
1113    #[tokio::test]
1114    async fn delete_a_key() {
1115        let cached = CacheD::new(test_config_builder().build());
1116
1117        let acknowledgement =
1118            cached.put("topic", "microservices").unwrap();
1119        acknowledgement.handle().await;
1120
1121        let key_id = {
1122            let key_value_ref = cached.get_ref(&"topic").unwrap();
1123            key_value_ref.value().key_id()
1124        };
1125
1126        let acknowledgement =
1127            cached.delete("topic").unwrap();
1128        acknowledgement.handle().await;
1129
1130        let value = cached.get(&"topic");
1131        assert_eq!(None, value);
1132        assert!(!cached.admission_policy.contains(&key_id));
1133    }
1134
1135    #[tokio::test]
1136    async fn get_access_frequency() {
1137        let cached = CacheD::new(ConfigBuilder::new(10, 10, 1000).access_pool_size(1).access_buffer_size(3).build());
1138
1139        let acknowledgement_topic =
1140            cached.put("topic", "microservices").unwrap();
1141        let acknowledgement_disk =
1142            cached.put("disk", "SSD").unwrap();
1143
1144        acknowledgement_topic.handle().await;
1145        acknowledgement_disk.handle().await;
1146
1147        cached.get(&"topic");
1148        cached.get(&"disk");
1149        cached.get(&"topic");
1150        cached.get(&"disk"); //will cause the drain of the buffer which will have 2 accesses of topic and one for disk
1151
1152        thread::sleep(Duration::from_secs(2));
1153
1154        let hasher = &(cached.config.key_hash_fn);
1155        let policy = cached.admission_policy;
1156
1157        assert_eq!(2, policy.estimate(hasher(&"topic")));
1158        assert_eq!(1, policy.estimate(hasher(&"disk")));
1159    }
1160
1161    #[tokio::test]
1162    async fn get_multiple_keys() {
1163        let cached = CacheD::new(ConfigBuilder::new(100, 10, 1000).build());
1164
1165        let acknowledgement =
1166            cached.put("topic", "microservices").unwrap();
1167        acknowledgement.handle().await;
1168
1169        let acknowledgement =
1170            cached.put("disk", "SSD").unwrap();
1171        acknowledgement.handle().await;
1172
1173        let acknowledgement =
1174            cached.put("cache", "in-memory").unwrap();
1175        acknowledgement.handle().await;
1176
1177        let values = cached.multi_get(vec![&"topic", &"non-existing", &"cache", &"disk"]);
1178
1179        assert_eq!(&Some("microservices"), values.get(&"topic").unwrap());
1180        assert_eq!(&None, values.get(&"non-existing").unwrap());
1181        assert_eq!(&Some("in-memory"), values.get(&"cache").unwrap());
1182        assert_eq!(&Some("SSD"), values.get(&"disk").unwrap());
1183    }
1184
1185    #[tokio::test]
1186    async fn get_multiple_keys_via_an_iterator() {
1187        let cached = CacheD::new(ConfigBuilder::new(100, 10, 1000).build());
1188
1189        let acknowledgement =
1190            cached.put("topic", "microservices").unwrap();
1191        acknowledgement.handle().await;
1192
1193        let acknowledgement =
1194            cached.put("disk", "SSD").unwrap();
1195        acknowledgement.handle().await;
1196
1197        let acknowledgement =
1198            cached.put("cache", "in-memory").unwrap();
1199        acknowledgement.handle().await;
1200
1201        let mut iterator = cached.multi_get_iterator(vec![&"topic", &"non-existing", &"cache", &"disk"]);
1202        assert_eq!(Some("microservices"), iterator.next().unwrap());
1203        assert_eq!(None, iterator.next().unwrap());
1204        assert_eq!(Some("in-memory"), iterator.next().unwrap());
1205        assert_eq!(Some("SSD"), iterator.next().unwrap());
1206        assert_eq!(None, iterator.next());
1207    }
1208
1209    #[tokio::test]
1210    async fn get_multiple_keys_via_an_iterator_given_value_is_not_cloneable() {
1211        let cached = CacheD::new(ConfigBuilder::new(100, 10, 1000).build());
1212
1213        let acknowledgement =
1214            cached.put("captain", Arc::new(Name { first: "John".to_string(), last: "Mcnamara".to_string() })).unwrap();
1215        acknowledgement.handle().await;
1216
1217        let acknowledgement =
1218            cached.put("vice-captain", Arc::new(Name { first: "Martin".to_string(), last: "Trolley".to_string() })).unwrap();
1219        acknowledgement.handle().await;
1220
1221        let mut iterator = cached.multi_get_iterator(vec![&"captain", &"vice-captain", &"disk"]);
1222        assert_eq!("John", iterator.next().unwrap().unwrap().first);
1223        assert_eq!("Martin", iterator.next().unwrap().unwrap().first);
1224        assert_eq!(None, iterator.next().unwrap());
1225    }
1226
1227    #[tokio::test]
1228    async fn map_multiple_keys_via_an_iterator() {
1229        let cached = CacheD::new(ConfigBuilder::new(100, 10, 1000).build());
1230
1231        let acknowledgement =
1232            cached.put("topic", "microservices").unwrap();
1233        acknowledgement.handle().await;
1234
1235        let acknowledgement =
1236            cached.put("disk", "ssd").unwrap();
1237        acknowledgement.handle().await;
1238
1239        let acknowledgement =
1240            cached.put("cache", "in-memory").unwrap();
1241        acknowledgement.handle().await;
1242
1243        let mut iterator = cached.multi_get_map_iterator(vec![&"topic", &"non-existing", &"cache", &"disk"], |value| value.to_uppercase());
1244        assert_eq!(Some("MICROSERVICES".to_string()), iterator.next().unwrap());
1245        assert_eq!(None, iterator.next().unwrap());
1246        assert_eq!(Some("IN-MEMORY".to_string()), iterator.next().unwrap());
1247        assert_eq!(Some("SSD".to_string()), iterator.next().unwrap());
1248        assert_eq!(None, iterator.next());
1249    }
1250
1251    #[tokio::test]
1252    async fn total_weight_used() {
1253        let cached = CacheD::new(test_config_builder().build());
1254
1255        let acknowledgement =
1256            cached.put_with_weight("topic", "microservices", 50).unwrap();
1257        acknowledgement.handle().await;
1258
1259        assert_eq!(50, cached.total_weight_used());
1260    }
1261
1262    #[tokio::test]
1263    async fn stats_summary() {
1264        let cached = CacheD::new(test_config_builder().build());
1265
1266        cached.put_with_weight("topic", "microservices", 50).unwrap().handle().await;
1267        cached.put_with_weight("cache", "cached", 10).unwrap().handle().await;
1268        cached.delete("cache").unwrap().handle().await;
1269
1270        let _ = cached.get(&"topic");
1271        let _ = cached.get(&"cache");
1272
1273        let summary = cached.stats_summary();
1274        assert_eq!(1, summary.get(&StatsType::CacheMisses).unwrap());
1275        assert_eq!(1, summary.get(&StatsType::CacheHits).unwrap());
1276        assert_eq!(60, summary.get(&StatsType::WeightAdded).unwrap());
1277        assert_eq!(10, summary.get(&StatsType::WeightRemoved).unwrap());
1278        assert_eq!(2, summary.get(&StatsType::KeysAdded).unwrap());
1279        assert_eq!(1, summary.get(&StatsType::KeysDeleted).unwrap());
1280
1281        assert_eq!(0, summary.get(&StatsType::KeysRejected).unwrap());
1282        assert_eq!(0, summary.get(&StatsType::AccessAdded).unwrap());
1283        assert_eq!(0, summary.get(&StatsType::AccessDropped).unwrap());
1284    }
1285}
1286
1287#[cfg(test)]
1288mod shutdown_tests {
1289    use std::sync::Arc;
1290    use std::sync::atomic::Ordering;
1291    use std::thread;
1292    use std::time::Duration;
1293
1294    use async_std::future::timeout;
1295    use tokio::time::sleep;
1296
1297    use crate::cache::cached::CacheD;
1298    use crate::cache::config::ConfigBuilder;
1299    use crate::cache::put_or_update::PutOrUpdateRequestBuilder;
1300
1301    fn test_config_builder() -> ConfigBuilder<&'static str, &'static str> {
1302        ConfigBuilder::new(100, 10, 100)
1303    }
1304
1305    #[test]
1306    fn put_after_shutdown() {
1307        let cached = CacheD::new(test_config_builder().build());
1308        cached.shutdown();
1309
1310        let put_result = cached.put("storage", "cached");
1311        assert!(put_result.is_err());
1312    }
1313
1314    #[test]
1315    fn put_with_weight_after_shutdown() {
1316        let cached = CacheD::new(test_config_builder().build());
1317        cached.shutdown();
1318
1319        let put_result = cached.put_with_weight("storage", "cached", 10);
1320        assert!(put_result.is_err());
1321    }
1322
1323    #[test]
1324    fn put_with_ttl_after_shutdown() {
1325        let cached = CacheD::new(test_config_builder().build());
1326        cached.shutdown();
1327
1328        let put_result = cached.put_with_ttl("storage", "cached", Duration::from_secs(5));
1329        assert!(put_result.is_err());
1330    }
1331
1332    #[test]
1333    fn put_with_weight_and_ttl_after_shutdown() {
1334        let cached = CacheD::new(test_config_builder().build());
1335        cached.shutdown();
1336
1337        let put_result = cached.put_with_weight_and_ttl("storage", "cached", 10, Duration::from_secs(5));
1338        assert!(put_result.is_err());
1339    }
1340
1341    #[test]
1342    fn delete_after_shutdown() {
1343        let cached = CacheD::new(test_config_builder().build());
1344        cached.shutdown();
1345
1346        let delete_result = cached.delete("storage");
1347        assert!(delete_result.is_err());
1348    }
1349
1350    #[test]
1351    fn put_or_update_after_shutdown() {
1352        let cached = CacheD::new(test_config_builder().build());
1353        cached.shutdown();
1354
1355        let put_or_update_result = cached.put_or_update(PutOrUpdateRequestBuilder::new("storage").weight(10).build());
1356        assert!(put_or_update_result.is_err());
1357    }
1358
1359    #[tokio::test]
1360    async fn get_after_shutdown() {
1361        let cached = CacheD::new(test_config_builder().build());
1362        cached.put("storage", "cached").unwrap().handle().await;
1363        cached.shutdown();
1364
1365        let get_result = cached.get(&"storage");
1366        assert_eq!(None, get_result);
1367    }
1368
1369    #[tokio::test]
1370    async fn get_ref_after_shutdown() {
1371        let cached = CacheD::new(test_config_builder().build());
1372        cached.put("storage", "cached").unwrap().handle().await;
1373        cached.shutdown();
1374
1375        let get_result = cached.get_ref(&"storage");
1376        assert!(get_result.is_none());
1377    }
1378
1379    #[tokio::test]
1380    async fn map_get_after_shutdown() {
1381        let cached = CacheD::new(test_config_builder().build());
1382        cached.put("storage", "cached").unwrap().handle().await;
1383        cached.shutdown();
1384
1385        let get_result = cached.map_get(&"storage", |value| value.to_uppercase());
1386        assert!(get_result.is_none());
1387    }
1388
1389    #[tokio::test]
1390    async fn map_get_ref_after_shutdown() {
1391        let cached = CacheD::new(test_config_builder().build());
1392        cached.put("storage", "cached").unwrap().handle().await;
1393        cached.shutdown();
1394
1395        let get_result = cached.map_get_ref(&"storage", |stored_value| stored_value.value_ref().to_uppercase());
1396        assert!(get_result.is_none());
1397    }
1398
1399    #[tokio::test]
1400    async fn multi_get_after_shutdown() {
1401        let cached = CacheD::new(test_config_builder().build());
1402        cached.put("storage", "cached").unwrap().handle().await;
1403        cached.put("topic", "microservices").unwrap().handle().await;
1404
1405        cached.shutdown();
1406
1407        let multi_get_result = cached.multi_get(vec![&"storage", &"topic"]);
1408        assert!(multi_get_result.is_empty());
1409    }
1410
1411    #[tokio::test]
1412    async fn multi_get_iterator_after_shutdown() {
1413        let cached = CacheD::new(test_config_builder().build());
1414        cached.put("storage", "cached").unwrap().handle().await;
1415        cached.put("topic", "microservices").unwrap().handle().await;
1416
1417        cached.shutdown();
1418
1419        let mut iterator = cached.multi_get_iterator(vec![&"storage", &"topic"]);
1420        assert!(iterator.next().is_none());
1421    }
1422
1423    #[tokio::test]
1424    async fn multi_get_map_iterator_after_shutdown() {
1425        let cached = CacheD::new(test_config_builder().build());
1426        cached.put("storage", "cached").unwrap().handle().await;
1427        cached.put("topic", "microservices").unwrap().handle().await;
1428
1429        cached.shutdown();
1430
1431        let mut iterator = cached.multi_get_map_iterator(vec![&"storage", &"topic"], |value| { value.to_uppercase() });
1432        assert!(iterator.next().is_none());
1433    }
1434
1435    #[tokio::test]
1436    async fn shutdown() {
1437        let cached = CacheD::new(test_config_builder().build());
1438
1439        cached.put_with_weight("topic", "microservices", 50).unwrap().handle().await;
1440        cached.put("cache", "cached").unwrap().handle().await;
1441
1442        cached.shutdown();
1443        assert!(cached.is_shutting_down.load(Ordering::Acquire));
1444
1445        let put_result = cached.put("storage", "cached");
1446        assert!(put_result.is_err());
1447
1448        assert_eq!(0, cached.total_weight_used());
1449        assert_eq!(None, cached.get(&"topic"));
1450        assert_eq!(None, cached.get(&"cache"));
1451    }
1452
1453    #[tokio::test]
1454    async fn concurrent_shutdown() {
1455        let cached = Arc::new(CacheD::new(test_config_builder().build()));
1456        cached.put_with_weight("topic", "microservices", 50).unwrap().handle().await;
1457        cached.put("cache", "cached").unwrap().handle().await;
1458
1459        let thread_handles = (1..=10).map(|_| {
1460            thread::spawn({
1461                let cached = cached.clone();
1462                move || {
1463                    cached.shutdown();
1464                }
1465            })
1466        }).collect::<Vec<_>>();
1467        for handle in thread_handles {
1468            handle.join().unwrap();
1469        }
1470
1471        assert!(cached.is_shutting_down.load(Ordering::Acquire));
1472
1473        let put_result = cached.put("storage", "cached");
1474        assert!(put_result.is_err());
1475    }
1476
1477    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1478    async fn should_not_block_on_shutdown() {
1479        let config_builder = ConfigBuilder::new(1000, 100, 1_000_000);
1480        let cached = Arc::new(CacheD::new(config_builder.build()));
1481
1482        let task_handles = (1..=50).map(|index| {
1483            let cached_clone = cached.clone();
1484            tokio::spawn(
1485                async move {
1486                    let start_index = index * 10;
1487                    let end_index = start_index + 10;
1488
1489                    for count in start_index..end_index {
1490                        let put_result = cached_clone.put(count, count * 10);
1491                        if let Ok(result) = put_result {
1492                            timeout(Duration::from_secs(1), result.handle()).await.unwrap();
1493                        }
1494                        sleep(Duration::from_millis(2)).await;
1495                    }
1496                }
1497            )
1498        }).collect::<Vec<_>>();
1499
1500        let cached_clone = cached.clone();
1501        let shutdown_handle = tokio::spawn(
1502            async move {
1503                sleep(Duration::from_millis(8)).await;
1504                cached_clone.shutdown();
1505            }
1506        );
1507        for handle in task_handles {
1508            handle.await.unwrap()
1509        }
1510        shutdown_handle.await.unwrap();
1511    }
1512
1513    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1514    async fn should_not_block_on_shutdown_with_limited_space() {
1515        let config_builder = ConfigBuilder::new(1000, 100, 1000);
1516        let cached = Arc::new(CacheD::new(config_builder.build()));
1517
1518        let task_handles = (1..=50).map(|index| {
1519            let cached_clone = cached.clone();
1520            tokio::spawn(
1521                async move {
1522                    let start_index = index * 10;
1523                    let end_index = start_index + 10;
1524
1525                    for count in start_index..end_index {
1526                        let put_result = cached_clone.put(count, count * 10);
1527                        if let Ok(result) = put_result {
1528                            timeout(Duration::from_secs(1), result.handle()).await.unwrap();
1529                        }
1530                        sleep(Duration::from_millis(2)).await;
1531                    }
1532                }
1533            )
1534        }).collect::<Vec<_>>();
1535
1536        let cached_clone = cached.clone();
1537        let shutdown_handle = tokio::spawn(
1538            async move {
1539                sleep(Duration::from_millis(8)).await;
1540                cached_clone.shutdown();
1541            }
1542        );
1543        for handle in task_handles {
1544            handle.await.unwrap()
1545        }
1546        shutdown_handle.await.unwrap();
1547    }
1548}
1549
1550#[cfg(test)]
1551mod put_or_update_tests {
1552    use std::ops::Add;
1553    use std::time::Duration;
1554
1555    use crate::cache::cached::CacheD;
1556    use crate::cache::cached::put_or_update_tests::setup::UnixEpochClock;
1557    use crate::cache::clock::ClockType;
1558    use crate::cache::config::ConfigBuilder;
1559    use crate::cache::put_or_update::PutOrUpdateRequestBuilder;
1560    use crate::cache::types::Weight;
1561
1562    mod setup {
1563        use std::time::SystemTime;
1564
1565        use crate::cache::clock::Clock;
1566
1567        #[derive(Clone)]
1568        pub(crate) struct UnixEpochClock;
1569
1570        impl Clock for UnixEpochClock {
1571            fn now(&self) -> SystemTime {
1572                SystemTime::UNIX_EPOCH
1573            }
1574        }
1575    }
1576
1577    fn test_config_builder() -> ConfigBuilder<&'static str, &'static str> {
1578        ConfigBuilder::new(100, 10, 100)
1579    }
1580
1581    #[tokio::test]
1582    async fn put_or_update_a_non_existing_key_value() {
1583        let cached = CacheD::new(test_config_builder().build());
1584
1585        let acknowledgement =
1586            cached.put_or_update(PutOrUpdateRequestBuilder::new("topic").value("microservices").build()).unwrap();
1587        acknowledgement.handle().await;
1588
1589        let value = cached.get_ref(&"topic");
1590        let value_ref = value.unwrap();
1591        let stored_value = value_ref.value();
1592
1593        assert_eq!("microservices", stored_value.value());
1594    }
1595
1596    #[tokio::test]
1597    async fn put_or_update_a_non_existing_key_value_with_weight() {
1598        let cached = CacheD::new(test_config_builder().build());
1599
1600        let acknowledgement =
1601            cached.put_or_update(PutOrUpdateRequestBuilder::new("topic").value("microservices").weight(33).build()).unwrap();
1602        acknowledgement.handle().await;
1603
1604        let value = cached.get_ref(&"topic");
1605        let value_ref = value.unwrap();
1606        let stored_value = value_ref.value();
1607        let key_id = stored_value.key_id();
1608
1609        assert_eq!("microservices", stored_value.value());
1610        assert_eq!(Some(33), cached.admission_policy.weight_of(&key_id));
1611    }
1612
1613    #[tokio::test]
1614    async fn put_or_update_a_non_existing_key_value_with_time_to_live() {
1615        let clock: ClockType = Box::new(UnixEpochClock {});
1616        let cached = CacheD::new(test_config_builder().clock(clock.clone_box()).build());
1617
1618        let acknowledgement =
1619            cached.put_or_update(PutOrUpdateRequestBuilder::new("topic").value("microservices").weight(10).time_to_live(Duration::from_secs(10)).build()).unwrap();
1620        acknowledgement.handle().await;
1621
1622        let value = cached.get_ref(&"topic");
1623        let value_ref = value.unwrap();
1624        let stored_value = value_ref.value();
1625        let key_id = stored_value.key_id();
1626
1627        assert_eq!(Some(clock.now().add(Duration::from_secs(10))), stored_value.expire_after());
1628        assert_eq!("microservices", stored_value.value());
1629        assert_eq!(Some(10), cached.admission_policy.weight_of(&key_id));
1630    }
1631
1632    #[tokio::test]
1633    async fn update_the_value_of_an_existing_key() {
1634        let cached = CacheD::new(test_config_builder().build());
1635
1636        let acknowledgement =
1637            cached.put("topic", "microservices").unwrap();
1638        acknowledgement.handle().await;
1639
1640        let acknowledgement =
1641            cached.put_or_update(PutOrUpdateRequestBuilder::new("topic").value("storage engine").build()).unwrap();
1642        acknowledgement.handle().await;
1643
1644        let value = cached.get_ref(&"topic");
1645        let value_ref = value.unwrap();
1646        let stored_value = value_ref.value();
1647
1648        assert_eq!("storage engine", stored_value.value());
1649    }
1650
1651    #[tokio::test]
1652    async fn update_the_weight_of_an_existing_key() {
1653        let cached = CacheD::new(test_config_builder().build());
1654
1655        let acknowledgement =
1656            cached.put("topic", "microservices").unwrap();
1657        acknowledgement.handle().await;
1658
1659        let acknowledgement =
1660            cached.put_or_update(PutOrUpdateRequestBuilder::new("topic").weight(29).build()).unwrap();
1661        acknowledgement.handle().await;
1662
1663        let value = cached.get_ref(&"topic");
1664        let value_ref = value.unwrap();
1665        let stored_value = value_ref.value();
1666        let key_id = stored_value.key_id();
1667
1668        assert_eq!("microservices", stored_value.value());
1669        assert_eq!(Some(29), cached.admission_policy.weight_of(&key_id));
1670    }
1671
1672    #[tokio::test]
1673    async fn update_the_time_to_live_of_an_existing_key_with_original_key_not_having_time_to_live() {
1674        let clock: ClockType = Box::new(UnixEpochClock {});
1675        let cached = CacheD::new(test_config_builder().clock(clock.clone_box()).build());
1676
1677        let acknowledgement =
1678            cached.put("topic", "microservices").unwrap();
1679        acknowledgement.handle().await;
1680
1681        let original_weight = weight_of(&cached, "topic");
1682
1683        let acknowledgement =
1684            cached.put_or_update(PutOrUpdateRequestBuilder::new("topic").time_to_live(Duration::from_secs(100)).build()).unwrap();
1685        acknowledgement.handle().await;
1686
1687        let value = cached.get_ref(&"topic");
1688        let value_ref = value.unwrap();
1689        let stored_value = value_ref.value();
1690        let key_id = stored_value.key_id();
1691
1692        assert_eq!("microservices", stored_value.value());
1693        assert_ne!(original_weight, cached.admission_policy.weight_of(&key_id));
1694
1695        assert_eq!(Some(clock.now().add(Duration::from_secs(100))), stored_value.expire_after());
1696        assert_eq!(stored_value.expire_after(), cached.ttl_ticker.get(&key_id, &stored_value.expire_after().unwrap()));
1697    }
1698
1699    #[tokio::test]
1700    async fn remove_the_time_to_live_of_an_existing_key() {
1701        let clock: ClockType = Box::new(UnixEpochClock {});
1702        let cached = CacheD::new(test_config_builder().clock(clock.clone_box()).build());
1703
1704        let acknowledgement =
1705            cached.put_with_ttl("topic", "microservices", Duration::from_secs(100)).unwrap();
1706        acknowledgement.handle().await;
1707
1708        let original_weight = weight_of(&cached, "topic");
1709
1710        let acknowledgement =
1711            cached.put_or_update(PutOrUpdateRequestBuilder::new("topic").remove_time_to_live().build()).unwrap();
1712        acknowledgement.handle().await;
1713
1714        let value = cached.get_ref(&"topic");
1715        let value_ref = value.unwrap();
1716        let stored_value = value_ref.value();
1717        let key_id = stored_value.key_id();
1718
1719        assert_eq!("microservices", stored_value.value());
1720        assert_ne!(original_weight, cached.admission_policy.weight_of(&key_id));
1721
1722        assert_eq!(None, stored_value.expire_after());
1723    }
1724
1725    #[tokio::test]
1726    async fn add_the_time_to_live_of_an_existing_key() {
1727        let clock: ClockType = Box::new(UnixEpochClock {});
1728        let cached = CacheD::new(test_config_builder().clock(clock.clone_box()).build());
1729
1730        let acknowledgement =
1731            cached.put("topic", "microservices").unwrap();
1732        acknowledgement.handle().await;
1733
1734        let original_weight = weight_of(&cached, "topic");
1735
1736        let acknowledgement =
1737            cached.put_or_update(PutOrUpdateRequestBuilder::new("topic").time_to_live(Duration::from_secs(120)).build()).unwrap();
1738        acknowledgement.handle().await;
1739
1740        let value = cached.get_ref(&"topic");
1741        let value_ref = value.unwrap();
1742        let stored_value = value_ref.value();
1743        let key_id = stored_value.key_id();
1744
1745        assert_eq!("microservices", stored_value.value());
1746        assert_ne!(original_weight, cached.admission_policy.weight_of(&key_id));
1747
1748        assert_eq!(Some(clock.now().add(Duration::from_secs(120))), stored_value.expire_after());
1749        assert_eq!(stored_value.expire_after(), cached.ttl_ticker.get(&key_id, &stored_value.expire_after().unwrap()));
1750    }
1751
1752    #[tokio::test]
1753    async fn update_the_value_and_time_to_live_of_an_existing_key() {
1754        let clock: ClockType = Box::new(UnixEpochClock {});
1755        let cached = CacheD::new(test_config_builder().clock(clock.clone_box()).build());
1756
1757        let acknowledgement =
1758            cached.put("topic", "microservices").unwrap();
1759        acknowledgement.handle().await;
1760
1761        let original_weight = weight_of(&cached, "topic");
1762
1763        let acknowledgement =
1764            cached.put_or_update(PutOrUpdateRequestBuilder::new("topic").value("storage engine").time_to_live(Duration::from_secs(100)).build()).unwrap();
1765        acknowledgement.handle().await;
1766
1767        let value = cached.get_ref(&"topic");
1768        let value_ref = value.unwrap();
1769        let stored_value = value_ref.value();
1770        let key_id = stored_value.key_id();
1771
1772        assert_eq!("storage engine", stored_value.value());
1773        assert_ne!(original_weight, cached.admission_policy.weight_of(&key_id));
1774
1775        assert_eq!(Some(clock.now().add(Duration::from_secs(100))), stored_value.expire_after());
1776        assert_eq!(stored_value.expire_after(), cached.ttl_ticker.get(&key_id, &stored_value.expire_after().unwrap()));
1777    }
1778
1779    #[tokio::test]
1780    async fn update_the_value_and_remove_time_to_live_of_an_existing_key() {
1781        let clock: ClockType = Box::new(UnixEpochClock {});
1782        let cached = CacheD::new(test_config_builder().clock(clock.clone_box()).build());
1783
1784        let acknowledgement =
1785            cached.put_with_ttl("topic", "microservices", Duration::from_secs(100)).unwrap();
1786        acknowledgement.handle().await;
1787
1788        let original_weight = weight_of(&cached, "topic");
1789
1790        let acknowledgement =
1791            cached.put_or_update(PutOrUpdateRequestBuilder::new("topic").value("storage engine").remove_time_to_live().build()).unwrap();
1792        acknowledgement.handle().await;
1793
1794        let value = cached.get_ref(&"topic");
1795        let value_ref = value.unwrap();
1796        let stored_value = value_ref.value();
1797        let key_id = stored_value.key_id();
1798
1799        let new_weight = cached.admission_policy.weight_of(&key_id);
1800        assert_eq!("storage engine", stored_value.value());
1801        assert_ne!(original_weight, new_weight);
1802        assert!(new_weight < original_weight);
1803
1804        assert_eq!(None, stored_value.expire_after());
1805    }
1806
1807    #[tokio::test]
1808    async fn update_the_value_weight_and_remove_time_to_live_of_an_existing_key() {
1809        let clock: ClockType = Box::new(UnixEpochClock {});
1810        let cached = CacheD::new(test_config_builder().clock(clock.clone_box()).build());
1811
1812        let acknowledgement =
1813            cached.put_with_ttl("topic", "microservices", Duration::from_secs(100)).unwrap();
1814        acknowledgement.handle().await;
1815
1816        let original_weight = weight_of(&cached, "topic");
1817
1818        let acknowledgement =
1819            cached.put_or_update(PutOrUpdateRequestBuilder::new("topic").value("storage engine").weight(300).remove_time_to_live().build()).unwrap();
1820        acknowledgement.handle().await;
1821
1822        let value = cached.get_ref(&"topic");
1823        let value_ref = value.unwrap();
1824        let stored_value = value_ref.value();
1825        let key_id = stored_value.key_id();
1826
1827        let new_weight = cached.admission_policy.weight_of(&key_id);
1828        assert_eq!("storage engine", stored_value.value());
1829        assert_ne!(original_weight, new_weight);
1830        assert_eq!(Some(300), new_weight);
1831
1832        assert_eq!(None, stored_value.expire_after());
1833    }
1834
1835    #[tokio::test]
1836    async fn update_the_time_to_live_of_an_existing_key() {
1837        let clock: ClockType = Box::new(UnixEpochClock {});
1838        let cached = CacheD::new(test_config_builder().clock(clock.clone_box()).build());
1839
1840        let acknowledgement =
1841            cached.put_with_ttl("topic", "microservices", Duration::from_secs(100)).unwrap();
1842        acknowledgement.handle().await;
1843
1844        let original_weight = weight_of(&cached, "topic");
1845
1846        let acknowledgement =
1847            cached.put_or_update(PutOrUpdateRequestBuilder::new("topic").time_to_live(Duration::from_secs(500)).build()).unwrap();
1848        acknowledgement.handle().await;
1849
1850        let value = cached.get_ref(&"topic");
1851        let value_ref = value.unwrap();
1852        let stored_value = value_ref.value();
1853        let key_id = stored_value.key_id();
1854
1855        let new_weight = cached.admission_policy.weight_of(&key_id);
1856        assert_eq!("microservices", stored_value.value());
1857        assert_eq!(original_weight, new_weight);
1858    }
1859
1860    fn weight_of(cached: &CacheD<&str, &str>, key: &'static str) -> Option<Weight> {
1861        let value = cached.get_ref(&key);
1862        let value_ref = value.unwrap();
1863        let stored_value = value_ref.value();
1864        let key_id = stored_value.key_id();
1865
1866        cached.admission_policy.weight_of(&key_id)
1867    }
1868}