tinylfu_cached/cache/command/
command_executor.rs

1use std::hash::Hash;
2use std::sync::Arc;
3use std::thread;
4use std::time::Duration;
5
6use crossbeam_channel::Receiver;
7use log::{error, info};
8
9use crate::cache::command::{CommandStatus, CommandType};
10use crate::cache::command::acknowledgement::CommandAcknowledgement;
11use crate::cache::command::error::CommandSendError;
12use crate::cache::command::RejectionReason::KeyDoesNotExist;
13use crate::cache::expiration::TTLTicker;
14use crate::cache::key_description::KeyDescription;
15use crate::cache::policy::admission_policy::AdmissionPolicy;
16use crate::cache::stats::ConcurrentStatsCounter;
17use crate::cache::store::Store;
18
19/// Every write operation like `put`, `put_or_update` and `delete` is returned a [`crate::cache::command::command_executor::CommandSendResult`] that
20/// wraps an instance of [`crate::cache::command::acknowledgement::CommandAcknowledgement`] and a [`crate::cache::command::error::CommandSendError`]
21pub type CommandSendResult = Result<Arc<CommandAcknowledgement>, CommandSendError>;
22
23pub(crate) fn shutdown_result() -> CommandSendResult {
24    Err(CommandSendError::shutdown())
25}
26
27/// CommandExecutor executes various commands of type `crate::cache::command::CommandType`.
28/// CommandExecutor spins a thread when it is instantiated and starts receiving commands from the `crossbeam_channel::Receiver`.
29/// The command is wrapped in an abstraction `CommandAcknowledgementPair` that combines `CommandType` and `CommandAcknowledgement`
30/// Execution of a command typically involves interacting with `crate::cache::policy::admission_policy::AdmissionPolicy`,
31/// `crate::cache::store::Store` and `crate::cache::expiration::TTLTicker`
32pub(crate) struct CommandExecutor<Key, Value>
33    where Key: Hash + Eq + Send + Sync + Clone + 'static,
34          Value: Send + Sync + 'static {
35    sender: crossbeam_channel::Sender<CommandAcknowledgementPair<Key, Value>>,
36}
37
38struct CommandAcknowledgementPair<Key, Value>
39    where Key: Hash + Eq + Clone {
40    command: CommandType<Key, Value>,
41    acknowledgement: Arc<CommandAcknowledgement>,
42}
43
44struct PutParameter<'a, Key, Value, DeleteHook>
45    where Key: Hash + Eq + Send + Sync + Clone + 'static,
46          Value: Send + Sync + 'static,
47          DeleteHook: Fn(Key) {
48    store: &'a Arc<Store<Key, Value>>,
49    key_description: &'a KeyDescription<Key>,
50    delete_hook: &'a DeleteHook,
51    value: Value,
52    admission_policy: &'a Arc<AdmissionPolicy<Key>>,
53    stats_counter: &'a Arc<ConcurrentStatsCounter>,
54}
55
56struct PutWithTTLParameter<'a, Key, Value, DeleteHook>
57    where Key: Hash + Eq + Send + Sync + Clone + 'static,
58          Value: Send + Sync + 'static,
59          DeleteHook: Fn(Key) {
60    put_parameter: PutParameter<'a, Key, Value, DeleteHook>,
61    ttl: Duration,
62    ttl_ticker: &'a Arc<TTLTicker>,
63}
64
65struct DeleteParameter<'a, Key, Value>
66    where Key: Hash + Eq + Send + Sync + Clone + 'static {
67    store: &'a Arc<Store<Key, Value>>,
68    key: &'a Key,
69    admission_policy: &'a Arc<AdmissionPolicy<Key>>,
70    ttl_ticker: &'a Arc<TTLTicker>,
71}
72
73impl<Key, Value> CommandExecutor<Key, Value>
74    where Key: Hash + Eq + Send + Sync + Clone + 'static,
75          Value: Send + Sync + 'static {
76    pub(crate) fn new(
77        store: Arc<Store<Key, Value>>,
78        admission_policy: Arc<AdmissionPolicy<Key>>,
79        stats_counter: Arc<ConcurrentStatsCounter>,
80        ttl_ticker: Arc<TTLTicker>,
81        command_channel_size: usize) -> Self {
82        let (sender, receiver) = crossbeam_channel::bounded(command_channel_size);
83        let command_executor = CommandExecutor { sender };
84
85        command_executor.spin(receiver, store, admission_policy, stats_counter, ttl_ticker);
86        command_executor
87    }
88
89    /// Spins a thread when `CommandExecutor` is instantiated.
90    /// The thread receives a command wrapped in `CommandAcknowledgementPair` from the [`crossbeam_channel::Receiver<T>`].
91    /// It identifies the command and performs an appropriate action.
92    /// Execution of a command typically involves interacting with `crate::cache::policy::admission_policy::AdmissionPolicy`,
93    /// `crate::cache::store::Store` and `crate::cache::expiration::TTLTicker`.
94    /// Handling `Shutdown` command is a little tricky. There exists a race condition kind of a case:
95    /// Consider that `shutdown()` and `put()` on an instance of `Cached` are invoked at the same time.
96    /// Both these operations result in sending different commands to `CommandExecutor`.
97    /// Somehow, the `Shutdown` command goes in before the `put` command.
98    /// This also means that the client could have performed `await` operation on the `CommandAcknowledgement` of the `put` command.
99    /// It is essential to complete the future that the client is awaiting on. That is what the `Shutdown` command does.
100    /// It drains the `receiver` and marks the status of the CommandAcknowledgement as `CommandStatus::ShuttingDown`.
101    /// The client(s) awaiting on the future will receive `CommandStatus::ShuttingDown`.
102    fn spin(&self,
103            receiver: Receiver<CommandAcknowledgementPair<Key, Value>>,
104            store: Arc<Store<Key, Value>>,
105            admission_policy: Arc<AdmissionPolicy<Key>>,
106            stats_counter: Arc<ConcurrentStatsCounter>,
107            ttl_ticker: Arc<TTLTicker>) {
108        let store_clone = store.clone();
109        let delete_hook = move |key| { store_clone.delete(&key); };
110
111        thread::spawn(move || {
112            while let Ok(pair) = receiver.recv() {
113                let command = pair.command;
114                let status = match command {
115                    CommandType::Put(key_description, value) =>
116                        Self::put(PutParameter {
117                            store: &store,
118                            key_description: &key_description,
119                            delete_hook: &delete_hook,
120                            value,
121                            admission_policy: &admission_policy,
122                            stats_counter: &stats_counter,
123                        }),
124                    CommandType::PutWithTTL(key_description, value, ttl) =>
125                        Self::put_with_ttl(PutWithTTLParameter {
126                            put_parameter: PutParameter {
127                                store: &store,
128                                key_description: &key_description,
129                                delete_hook: &delete_hook,
130                                value,
131                                admission_policy: &admission_policy,
132                                stats_counter: &stats_counter,
133                            },
134                            ttl,
135                            ttl_ticker: &ttl_ticker,
136                        }),
137                    CommandType::UpdateWeight(key_id, weight) => {
138                        admission_policy.update(&key_id, weight);
139                        CommandStatus::Accepted
140                    }
141                    CommandType::Delete(key) =>
142                        Self::delete(DeleteParameter {
143                            store: &store,
144                            key: &key,
145                            admission_policy: &admission_policy,
146                            ttl_ticker: &ttl_ticker,
147                        }),
148                    CommandType::Shutdown => {
149                        info!("Received Shutdown command");
150                        pair.acknowledgement.done(CommandStatus::Accepted);
151                        for command_acknowledgement_pair in receiver.iter() {
152                            command_acknowledgement_pair.acknowledgement.done(CommandStatus::ShuttingDown);
153                        }
154                        drop(receiver);
155                        break;
156                    }
157                };
158                pair.acknowledgement.done(status);
159            }
160        });
161    }
162
163    /// Sends a command to the `CommandExecutor`. Every Command is wrapped in a `CommandAcknowledgementPair`
164    /// that allows 2 things:
165    /// 1) It allows returning an instance of `CommandAcknowledgement` to the clients, so that they can perform `await`
166    /// 2) It allows `CommandExecutor` to change the status of the command inside `CommandAcknowledgement`. This would then finish the `await` at the client's end.
167    pub(crate) fn send(&self, command: CommandType<Key, Value>) -> CommandSendResult {
168        let acknowledgement = CommandAcknowledgement::new();
169        let send_result = self.sender.send(CommandAcknowledgementPair {
170            command,
171            acknowledgement: acknowledgement.clone(),
172        });
173
174        match send_result {
175            Ok(_) => Ok(acknowledgement),
176            Err(err) => {
177                error!("received a SendError while sending command type {}", err.0.command.description());
178                Err(CommandSendError::new(err.0.command.description()))
179            }
180        }
181    }
182
183    /// Sends a Shutdown command to the `CommandExecutor`.
184    pub(crate) fn shutdown(&self) -> CommandSendResult {
185        self.send(CommandType::Shutdown)
186    }
187
188    fn put<DeleteHook>(put_parameters: PutParameter<Key, Value, DeleteHook>) -> CommandStatus where DeleteHook: Fn(Key) {
189        let status = put_parameters.admission_policy.maybe_add(
190            put_parameters.key_description,
191            put_parameters.delete_hook,
192        );
193        if let CommandStatus::Accepted = status {
194            put_parameters.store.put(
195                put_parameters.key_description.clone_key(),
196                put_parameters.value,
197                put_parameters.key_description.id,
198            );
199        } else {
200            put_parameters.stats_counter.reject_key();
201        }
202        status
203    }
204
205    fn put_with_ttl<DeleteHook>(put_with_ttl_parameter: PutWithTTLParameter<Key, Value, DeleteHook>) -> CommandStatus where DeleteHook: Fn(Key) {
206        let status = put_with_ttl_parameter.put_parameter.admission_policy.maybe_add(
207            put_with_ttl_parameter.put_parameter.key_description,
208            put_with_ttl_parameter.put_parameter.delete_hook,
209        );
210        if let CommandStatus::Accepted = status {
211            let expiry = put_with_ttl_parameter.put_parameter.store.put_with_ttl(
212                put_with_ttl_parameter.put_parameter.key_description.clone_key(),
213                put_with_ttl_parameter.put_parameter.value,
214                put_with_ttl_parameter.put_parameter.key_description.id,
215                put_with_ttl_parameter.ttl,
216            );
217            put_with_ttl_parameter.ttl_ticker.put(
218                put_with_ttl_parameter.put_parameter.key_description.id,
219                expiry,
220            );
221        } else {
222            put_with_ttl_parameter.put_parameter.stats_counter.reject_key();
223        }
224        status
225    }
226
227    fn delete(delete_parameter: DeleteParameter<Key, Value>) -> CommandStatus {
228        let may_be_key_id_expiry = delete_parameter.store.delete(delete_parameter.key);
229        if let Some(key_id_expiry) = may_be_key_id_expiry {
230            delete_parameter.admission_policy.delete(&key_id_expiry.0);
231            if let Some(expiry) = key_id_expiry.1 {
232                delete_parameter.ttl_ticker.delete(&key_id_expiry.0, &expiry);
233            }
234            return CommandStatus::Accepted;
235        }
236        CommandStatus::Rejected(KeyDoesNotExist)
237    }
238}
239
240#[cfg(test)]
241mod tests {
242    use std::sync::Arc;
243    use std::thread;
244    use std::time::Duration;
245
246    use crate::cache::clock::{ClockType, SystemClock};
247    use crate::cache::command::{CommandStatus, CommandType};
248    use crate::cache::command::command_executor::{CommandExecutor, shutdown_result};
249    use crate::cache::command::RejectionReason::{KeyDoesNotExist, KeyWeightIsGreaterThanCacheWeight};
250    use crate::cache::expiration::config::TTLConfig;
251    use crate::cache::expiration::TTLTicker;
252    use crate::cache::key_description::KeyDescription;
253    use crate::cache::policy::admission_policy::AdmissionPolicy;
254    use crate::cache::policy::config::CacheWeightConfig;
255    use crate::cache::stats::ConcurrentStatsCounter;
256    use crate::cache::store::Store;
257
258    fn no_action_ttl_ticker() -> Arc<TTLTicker> {
259        TTLTicker::new(TTLConfig::new(4, Duration::from_secs(300), SystemClock::boxed()), |_key_id| {})
260    }
261
262    fn test_store(clock: ClockType, stats_counter: Arc<ConcurrentStatsCounter>) -> Arc<Store<&'static str, &'static str>> {
263        Store::new(clock, stats_counter, 16, 4)
264    }
265
266    fn test_cache_weight_config() -> CacheWeightConfig {
267        CacheWeightConfig::new(100, 4, 100)
268    }
269
270    mod setup {
271        use std::time::SystemTime;
272
273        use crate::cache::clock::Clock;
274
275        #[derive(Clone)]
276        pub(crate) struct UnixEpochClock;
277
278        impl Clock for UnixEpochClock {
279            fn now(&self) -> SystemTime {
280                SystemTime::UNIX_EPOCH
281            }
282        }
283    }
284
285    #[test]
286    fn result_on_shutdown() {
287        let result = shutdown_result();
288        assert!(result.is_err());
289    }
290
291    #[tokio::test]
292    async fn puts_a_key_value_after_shutdown_with_delay() {
293        let stats_counter = Arc::new(ConcurrentStatsCounter::new());
294        let store = test_store(SystemClock::boxed(), stats_counter.clone());
295        let admission_policy = Arc::new(AdmissionPolicy::new(10, test_cache_weight_config(), stats_counter.clone()));
296
297        let command_executor = CommandExecutor::new(
298            store.clone(),
299            admission_policy,
300            stats_counter,
301            no_action_ttl_ticker(),
302            10,
303        );
304        command_executor.shutdown().unwrap().handle().await;
305
306        thread::sleep(Duration::from_secs(1));
307
308        let send_result = command_executor.send(CommandType::Put(
309            KeyDescription::new("topic", 1, 1029, 10),
310            "microservices",
311        ));
312
313        assert!(send_result.is_err() || send_result.unwrap().handle().await == CommandStatus::ShuttingDown);
314    }
315
316    #[tokio::test]
317    async fn puts_a_key_value_after_shutdown() {
318        let stats_counter = Arc::new(ConcurrentStatsCounter::new());
319        let store = test_store(SystemClock::boxed(), stats_counter.clone());
320        let admission_policy = Arc::new(AdmissionPolicy::new(10, test_cache_weight_config(), stats_counter.clone()));
321
322        let command_executor = CommandExecutor::new(
323            store.clone(),
324            admission_policy,
325            stats_counter,
326            no_action_ttl_ticker(),
327            10,
328        );
329        command_executor.shutdown().unwrap().handle().await;
330
331        let send_result = command_executor.send(CommandType::Put(
332            KeyDescription::new("topic", 1, 1029, 10),
333            "microservices",
334        ));
335        assert!(send_result.is_err() || send_result.unwrap().handle().await == CommandStatus::ShuttingDown);
336    }
337
338    #[tokio::test]
339    async fn puts_a_key_value() {
340        let stats_counter = Arc::new(ConcurrentStatsCounter::new());
341        let store = test_store(SystemClock::boxed(), stats_counter.clone());
342        let admission_policy = Arc::new(AdmissionPolicy::new(10, test_cache_weight_config(), stats_counter.clone()));
343
344        let command_executor = CommandExecutor::new(
345            store.clone(),
346            admission_policy,
347            stats_counter,
348            no_action_ttl_ticker(),
349            10,
350        );
351
352        let command_acknowledgement = command_executor.send(CommandType::Put(
353            KeyDescription::new("topic", 1, 1029, 10),
354            "microservices",
355        )).unwrap();
356        command_acknowledgement.handle().await;
357
358        command_executor.shutdown().unwrap().handle().await;
359        assert_eq!(Some("microservices"), store.get(&"topic"));
360    }
361
362    #[tokio::test]
363    async fn key_value_gets_rejected_given_its_weight_is_more_than_the_cache_weight() {
364        let stats_counter = Arc::new(ConcurrentStatsCounter::new());
365        let store = test_store(SystemClock::boxed(), stats_counter.clone());
366        let admission_policy = Arc::new(AdmissionPolicy::new(10, test_cache_weight_config(), stats_counter.clone()));
367
368        let command_executor = CommandExecutor::new(
369            store.clone(),
370            admission_policy,
371            stats_counter.clone(),
372            no_action_ttl_ticker(),
373            10,
374        );
375
376        let command_acknowledgement = command_executor.send(CommandType::Put(
377            KeyDescription::new("topic", 1, 1029, 200),
378            "microservices",
379        )).unwrap();
380        let status = command_acknowledgement.handle().await;
381
382        command_executor.shutdown().unwrap().handle().await;
383        assert_eq!(None, store.get(&"topic"));
384        assert_eq!(CommandStatus::Rejected(KeyWeightIsGreaterThanCacheWeight), status);
385    }
386
387    #[tokio::test]
388    async fn rejects_a_key_value_and_increase_stats() {
389        let stats_counter = Arc::new(ConcurrentStatsCounter::new());
390        let store = test_store(SystemClock::boxed(), stats_counter.clone());
391        let admission_policy = Arc::new(AdmissionPolicy::new(10, test_cache_weight_config(), stats_counter.clone()));
392
393        let command_executor = CommandExecutor::new(
394            store.clone(),
395            admission_policy,
396            stats_counter.clone(),
397            no_action_ttl_ticker(),
398            10,
399        );
400
401        let command_acknowledgement = command_executor.send(CommandType::Put(
402            KeyDescription::new("topic", 1, 1029, 200),
403            "microservices",
404        )).unwrap();
405        let status = command_acknowledgement.handle().await;
406
407        command_executor.shutdown().unwrap().handle().await;
408        assert_eq!(CommandStatus::Rejected(KeyWeightIsGreaterThanCacheWeight), status);
409        assert_eq!(1, stats_counter.keys_rejected());
410    }
411
412    #[tokio::test]
413    async fn puts_a_couple_of_key_values() {
414        let stats_counter = Arc::new(ConcurrentStatsCounter::new());
415        let store = test_store(SystemClock::boxed(), stats_counter.clone());
416        let admission_policy = Arc::new(AdmissionPolicy::new(10, test_cache_weight_config(), stats_counter.clone()));
417
418        let command_executor = CommandExecutor::new(
419            store.clone(),
420            admission_policy,
421            stats_counter,
422            no_action_ttl_ticker(),
423            10,
424        );
425
426        let acknowledgement = command_executor.send(CommandType::Put(
427            KeyDescription::new("topic", 1, 1029, 10),
428            "microservices",
429        )).unwrap();
430        let other_acknowledgment = command_executor.send(CommandType::Put(
431            KeyDescription::new("disk", 2, 2076, 3),
432            "SSD",
433        )).unwrap();
434        acknowledgement.handle().await;
435        other_acknowledgment.handle().await;
436
437        command_executor.shutdown().unwrap().handle().await;
438        assert_eq!(Some("microservices"), store.get(&"topic"));
439        assert_eq!(Some("SSD"), store.get(&"disk"));
440    }
441
442    #[tokio::test]
443    async fn puts_a_key_value_with_ttl() {
444        let stats_counter = Arc::new(ConcurrentStatsCounter::new());
445        let store = test_store(SystemClock::boxed(), stats_counter.clone());
446        let admission_policy = Arc::new(AdmissionPolicy::new(10, test_cache_weight_config(), stats_counter.clone()));
447
448        let ttl_ticker = no_action_ttl_ticker();
449        let command_executor = CommandExecutor::new(
450            store.clone(),
451            admission_policy,
452            stats_counter,
453            ttl_ticker.clone(),
454            10,
455        );
456
457        let acknowledgement = command_executor.send(CommandType::PutWithTTL(
458            KeyDescription::new("topic", 1, 1029, 10),
459            "microservices",
460            Duration::from_secs(10),
461        )).unwrap();
462        acknowledgement.handle().await;
463
464        command_executor.shutdown().unwrap().handle().await;
465        assert_eq!(Some("microservices"), store.get(&"topic"));
466
467        let expiry = store.get_ref(&"topic").unwrap().value().expire_after().unwrap();
468        let expiry_in_ttl_ticker = ttl_ticker.get(&1, &expiry).unwrap();
469
470        assert_eq!(expiry, expiry_in_ttl_ticker);
471    }
472
473    #[tokio::test]
474    async fn rejects_a_key_value_with_ttl_and_increase_stats() {
475        let stats_counter = Arc::new(ConcurrentStatsCounter::new());
476        let store = test_store(SystemClock::boxed(), stats_counter.clone());
477        let admission_policy = Arc::new(AdmissionPolicy::new(10, test_cache_weight_config(), stats_counter.clone()));
478
479        let command_executor = CommandExecutor::new(
480            store.clone(),
481            admission_policy,
482            stats_counter.clone(),
483            no_action_ttl_ticker(),
484            10,
485        );
486
487        let acknowledgement = command_executor.send(CommandType::PutWithTTL(
488            KeyDescription::new("topic", 1, 1029, 4000),
489            "microservices",
490            Duration::from_secs(10),
491        )).unwrap();
492        acknowledgement.handle().await;
493
494        command_executor.shutdown().unwrap().handle().await;
495        assert_eq!(1, stats_counter.keys_rejected());
496    }
497
498    #[tokio::test]
499    async fn deletes_a_key() {
500        let stats_counter = Arc::new(ConcurrentStatsCounter::new());
501        let store = test_store(SystemClock::boxed(), stats_counter.clone());
502        let admission_policy = Arc::new(AdmissionPolicy::new(10, test_cache_weight_config(), stats_counter.clone()));
503        let ttl_ticker = no_action_ttl_ticker();
504
505        let command_executor = CommandExecutor::new(
506            store.clone(),
507            admission_policy,
508            stats_counter,
509            ttl_ticker.clone(),
510            10,
511        );
512
513        let acknowledgement = command_executor.send(CommandType::PutWithTTL(
514            KeyDescription::new("topic", 10, 1029, 10),
515            "microservices",
516            Duration::from_secs(10),
517        )).unwrap();
518        acknowledgement.handle().await;
519
520        let expiry = store.get_ref(&"topic").unwrap().value().expire_after().unwrap();
521        let expiry_in_ttl_ticker = ttl_ticker.get(&10, &expiry).unwrap();
522
523        assert_eq!(Some("microservices"), store.get(&"topic"));
524        assert_eq!(expiry, expiry_in_ttl_ticker);
525
526        let acknowledgement =
527            command_executor.send(CommandType::Delete("topic")).unwrap();
528        acknowledgement.handle().await;
529
530        command_executor.shutdown().unwrap().handle().await;
531        assert_eq!(None, store.get(&"topic"));
532        assert_eq!(None, ttl_ticker.get(&10, &expiry));
533    }
534
535    #[tokio::test]
536    async fn deletion_of_a_non_existing_key_value_gets_rejected() {
537        let stats_counter = Arc::new(ConcurrentStatsCounter::new());
538        let store= test_store(SystemClock::boxed(), stats_counter.clone());
539        let admission_policy = Arc::new(AdmissionPolicy::new(10, test_cache_weight_config(), stats_counter.clone()));
540
541        let command_executor = CommandExecutor::new(
542            store.clone(),
543            admission_policy,
544            stats_counter,
545            no_action_ttl_ticker(),
546            10,
547        );
548
549        let acknowledgement =
550            command_executor.send(CommandType::Delete("non-existing")).unwrap();
551        let status = acknowledgement.handle().await;
552
553        command_executor.shutdown().unwrap().handle().await;
554        assert_eq!(CommandStatus::Rejected(KeyDoesNotExist), status);
555    }
556}
557
558#[cfg(test)]
559mod sociable_tests {
560    use std::sync::Arc;
561    use std::thread;
562    use std::time::Duration;
563
564    use crate::cache::buffer_event::{BufferConsumer, BufferEvent};
565    use crate::cache::clock::{ClockType, SystemClock};
566    use crate::cache::command::{CommandStatus, CommandType};
567    use crate::cache::command::command_executor::CommandExecutor;
568    use crate::cache::command::command_executor::Store;
569    use crate::cache::expiration::config::TTLConfig;
570    use crate::cache::expiration::TTLTicker;
571    use crate::cache::key_description::KeyDescription;
572    use crate::cache::policy::admission_policy::AdmissionPolicy;
573    use crate::cache::policy::config::CacheWeightConfig;
574    use crate::cache::stats::ConcurrentStatsCounter;
575
576    fn no_action_ttl_ticker() -> Arc<TTLTicker> {
577        TTLTicker::new(TTLConfig::new(4, Duration::from_secs(300), SystemClock::boxed()), |_key_id| {})
578    }
579
580    fn test_store(clock: ClockType, stats_counter: Arc<ConcurrentStatsCounter>) -> Arc<Store<&'static str, &'static str>> {
581        Store::new(clock, stats_counter, 16, 4)
582    }
583
584    fn test_cache_weight_config() -> CacheWeightConfig {
585        CacheWeightConfig::new(100, 4, 100)
586    }
587
588    #[tokio::test]
589    async fn puts_a_key_value() {
590        let stats_counter = Arc::new(ConcurrentStatsCounter::new());
591        let store = test_store(SystemClock::boxed(), stats_counter.clone());
592        let admission_policy = Arc::new(AdmissionPolicy::new(10, test_cache_weight_config(), stats_counter.clone()));
593
594        let command_executor = CommandExecutor::new(
595            store.clone(),
596            admission_policy.clone(),
597            stats_counter,
598            no_action_ttl_ticker(),
599            10,
600        );
601
602        let key_description = KeyDescription::new("topic", 1, 1029, 10);
603        let key_id = key_description.id;
604        let command_acknowledgement = command_executor.send(CommandType::Put(
605            key_description,
606            "microservices",
607        )).unwrap();
608        command_acknowledgement.handle().await;
609
610        command_executor.shutdown().unwrap().handle().await;
611        assert_eq!(Some("microservices"), store.get(&"topic"));
612        assert!(admission_policy.contains(&key_id));
613    }
614
615    #[tokio::test]
616    async fn puts_a_key_value_by_eliminating_victims() {
617        let stats_counter = Arc::new(ConcurrentStatsCounter::new());
618        let store = test_store(SystemClock::boxed(), stats_counter.clone());
619        let cache_weight_config = CacheWeightConfig::new(100, 4, 10);
620        let admission_policy = Arc::new(AdmissionPolicy::new(10, cache_weight_config, stats_counter.clone()));
621
622        let key_hashes = vec![10, 14, 116];
623        admission_policy.accept(BufferEvent::Full(key_hashes));
624        thread::sleep(Duration::from_secs(1));
625
626        let command_executor = CommandExecutor::new(
627            store.clone(),
628            admission_policy.clone(),
629            stats_counter,
630            no_action_ttl_ticker(),
631            10,
632        );
633
634        let command_acknowledgement = command_executor.send(CommandType::Put(
635            KeyDescription::new("topic", 1, 10, 5),
636            "microservices",
637        )).unwrap();
638        let status = command_acknowledgement.handle().await;
639        assert_eq!(CommandStatus::Accepted, status);
640
641        let command_acknowledgement = command_executor.send(CommandType::Put(
642            KeyDescription::new("disk", 2, 14, 6),
643            "SSD",
644        )).unwrap();
645        let status = command_acknowledgement.handle().await;
646        assert_eq!(CommandStatus::Accepted, status);
647
648        command_executor.shutdown().unwrap().handle().await;
649
650        assert!(admission_policy.contains(&2));
651        assert_eq!(Some("SSD"), store.get(&"disk"));
652
653        assert!(!admission_policy.contains(&1));
654        assert_eq!(None, store.get(&"topic"));
655    }
656
657    #[tokio::test]
658    async fn deletes_a_key() {
659        let stats_counter = Arc::new(ConcurrentStatsCounter::new());
660        let store = test_store(SystemClock::boxed(), stats_counter.clone());
661        let admission_policy = Arc::new(AdmissionPolicy::new(10, test_cache_weight_config(), stats_counter.clone()));
662        let command_executor = CommandExecutor::new(
663            store.clone(),
664            admission_policy.clone(),
665            stats_counter,
666            no_action_ttl_ticker(),
667            10,
668        );
669
670        let acknowledgement = command_executor.send(CommandType::Put(
671            KeyDescription::new("topic", 1, 1029, 10),
672            "microservices",
673        )).unwrap();
674        acknowledgement.handle().await;
675
676        let acknowledgement =
677            command_executor.send(CommandType::Delete("topic")).unwrap();
678        acknowledgement.handle().await;
679
680        command_executor.shutdown().unwrap().handle().await;
681        assert_eq!(None, store.get(&"topic"));
682        assert!(!admission_policy.contains(&1));
683    }
684
685    #[tokio::test]
686    async fn updates_the_weight_of_the_key() {
687        let stats_counter = Arc::new(ConcurrentStatsCounter::new());
688        let store = test_store(SystemClock::boxed(), stats_counter.clone());
689        let admission_policy = Arc::new(AdmissionPolicy::new(10, test_cache_weight_config(), stats_counter.clone()));
690
691        let command_executor = CommandExecutor::new(
692            store.clone(),
693            admission_policy.clone(),
694            stats_counter,
695            no_action_ttl_ticker(),
696            10,
697        );
698
699        let key_description = KeyDescription::new("topic", 1, 1029, 10);
700        let key_id = key_description.id;
701        let command_acknowledgement = command_executor.send(CommandType::Put(
702            key_description,
703            "microservices",
704        )).unwrap();
705        command_acknowledgement.handle().await;
706
707        let command_acknowledgement = command_executor.send(CommandType::UpdateWeight(
708            1, 20)).unwrap();
709        command_acknowledgement.handle().await;
710
711        command_executor.shutdown().unwrap().handle().await;
712        assert_eq!(Some("microservices"), store.get(&"topic"));
713        assert_eq!(Some(20), admission_policy.weight_of(&key_id));
714    }
715}