1use std::hash::Hash;
2use std::sync::Arc;
3use std::thread;
4use std::time::Duration;
5
6use crossbeam_channel::Receiver;
7use log::{error, info};
8
9use crate::cache::command::{CommandStatus, CommandType};
10use crate::cache::command::acknowledgement::CommandAcknowledgement;
11use crate::cache::command::error::CommandSendError;
12use crate::cache::command::RejectionReason::KeyDoesNotExist;
13use crate::cache::expiration::TTLTicker;
14use crate::cache::key_description::KeyDescription;
15use crate::cache::policy::admission_policy::AdmissionPolicy;
16use crate::cache::stats::ConcurrentStatsCounter;
17use crate::cache::store::Store;
18
/// Outcome of handing a command to the executor: the shared
/// [`CommandAcknowledgement`] on success, or a [`CommandSendError`] otherwise.
pub type CommandSendResult = Result<Arc<CommandAcknowledgement>, CommandSendError>;
22
23pub(crate) fn shutdown_result() -> CommandSendResult {
24 Err(CommandSendError::shutdown())
25}
26
/// Front end of the command pipeline: it owns the sending half of the
/// crossbeam channel over which [`CommandAcknowledgementPair`]s travel to the
/// worker thread started by `spin`.
pub(crate) struct CommandExecutor<Key, Value>
    where Key: Hash + Eq + Send + Sync + Clone + 'static,
          Value: Send + Sync + 'static {
    /// Sending half of the command channel created in `new`.
    sender: crossbeam_channel::Sender<CommandAcknowledgementPair<Key, Value>>,
}
37
/// The unit sent over the command channel: a command together with the
/// acknowledgement that the worker completes (via `done`) once the command
/// has been handled.
struct CommandAcknowledgementPair<Key, Value>
    where Key: Hash + Eq + Clone {
    /// The command to execute.
    command: CommandType<Key, Value>,
    /// Shared acknowledgement; `send` returns a clone of it to the caller.
    acknowledgement: Arc<CommandAcknowledgement>,
}
43
/// Argument bundle for `CommandExecutor::put`: borrowed collaborators plus
/// the owned value that is to be stored.
struct PutParameter<'a, Key, Value, DeleteHook>
    where Key: Hash + Eq + Send + Sync + Clone + 'static,
          Value: Send + Sync + 'static,
          DeleteHook: Fn(Key) {
    /// Store that receives the key/value when the admission policy accepts it.
    store: &'a Arc<Store<Key, Value>>,
    /// Description of the key; passed to `maybe_add`, and its key and id are
    /// passed to the store.
    key_description: &'a KeyDescription<Key>,
    /// Callback handed to `AdmissionPolicy::maybe_add`; the one built in
    /// `spin` deletes the given key from the store.
    delete_hook: &'a DeleteHook,
    /// The value to store.
    value: Value,
    /// Policy consulted (via `maybe_add`) before anything is stored.
    admission_policy: &'a Arc<AdmissionPolicy<Key>>,
    /// Counter on which `reject_key` is called when the put is not accepted.
    stats_counter: &'a Arc<ConcurrentStatsCounter>,
}
55
/// Argument bundle for `CommandExecutor::put_with_ttl`: everything a plain
/// put needs, plus the time-to-live and the ticker that records the expiry.
struct PutWithTTLParameter<'a, Key, Value, DeleteHook>
    where Key: Hash + Eq + Send + Sync + Clone + 'static,
          Value: Send + Sync + 'static,
          DeleteHook: Fn(Key) {
    /// The parameters shared with a plain put.
    put_parameter: PutParameter<'a, Key, Value, DeleteHook>,
    /// Time-to-live passed to `Store::put_with_ttl`.
    ttl: Duration,
    /// Ticker that receives the key id and the expiry returned by the store.
    ttl_ticker: &'a Arc<TTLTicker>,
}
64
/// Argument bundle for `CommandExecutor::delete`.
struct DeleteParameter<'a, Key, Value>
    where Key: Hash + Eq + Send + Sync + Clone + 'static {
    /// Store from which the key is deleted.
    store: &'a Arc<Store<Key, Value>>,
    /// The key to delete.
    key: &'a Key,
    /// Policy from which the key id is removed when the store held the key.
    admission_policy: &'a Arc<AdmissionPolicy<Key>>,
    /// Ticker from which the key id is removed when the deleted entry carried
    /// an expiry.
    ttl_ticker: &'a Arc<TTLTicker>,
}
72
73impl<Key, Value> CommandExecutor<Key, Value>
74 where Key: Hash + Eq + Send + Sync + Clone + 'static,
75 Value: Send + Sync + 'static {
76 pub(crate) fn new(
77 store: Arc<Store<Key, Value>>,
78 admission_policy: Arc<AdmissionPolicy<Key>>,
79 stats_counter: Arc<ConcurrentStatsCounter>,
80 ttl_ticker: Arc<TTLTicker>,
81 command_channel_size: usize) -> Self {
82 let (sender, receiver) = crossbeam_channel::bounded(command_channel_size);
83 let command_executor = CommandExecutor { sender };
84
85 command_executor.spin(receiver, store, admission_policy, stats_counter, ttl_ticker);
86 command_executor
87 }
88
89 fn spin(&self,
103 receiver: Receiver<CommandAcknowledgementPair<Key, Value>>,
104 store: Arc<Store<Key, Value>>,
105 admission_policy: Arc<AdmissionPolicy<Key>>,
106 stats_counter: Arc<ConcurrentStatsCounter>,
107 ttl_ticker: Arc<TTLTicker>) {
108 let store_clone = store.clone();
109 let delete_hook = move |key| { store_clone.delete(&key); };
110
111 thread::spawn(move || {
112 while let Ok(pair) = receiver.recv() {
113 let command = pair.command;
114 let status = match command {
115 CommandType::Put(key_description, value) =>
116 Self::put(PutParameter {
117 store: &store,
118 key_description: &key_description,
119 delete_hook: &delete_hook,
120 value,
121 admission_policy: &admission_policy,
122 stats_counter: &stats_counter,
123 }),
124 CommandType::PutWithTTL(key_description, value, ttl) =>
125 Self::put_with_ttl(PutWithTTLParameter {
126 put_parameter: PutParameter {
127 store: &store,
128 key_description: &key_description,
129 delete_hook: &delete_hook,
130 value,
131 admission_policy: &admission_policy,
132 stats_counter: &stats_counter,
133 },
134 ttl,
135 ttl_ticker: &ttl_ticker,
136 }),
137 CommandType::UpdateWeight(key_id, weight) => {
138 admission_policy.update(&key_id, weight);
139 CommandStatus::Accepted
140 }
141 CommandType::Delete(key) =>
142 Self::delete(DeleteParameter {
143 store: &store,
144 key: &key,
145 admission_policy: &admission_policy,
146 ttl_ticker: &ttl_ticker,
147 }),
148 CommandType::Shutdown => {
149 info!("Received Shutdown command");
150 pair.acknowledgement.done(CommandStatus::Accepted);
151 for command_acknowledgement_pair in receiver.iter() {
152 command_acknowledgement_pair.acknowledgement.done(CommandStatus::ShuttingDown);
153 }
154 drop(receiver);
155 break;
156 }
157 };
158 pair.acknowledgement.done(status);
159 }
160 });
161 }
162
163 pub(crate) fn send(&self, command: CommandType<Key, Value>) -> CommandSendResult {
168 let acknowledgement = CommandAcknowledgement::new();
169 let send_result = self.sender.send(CommandAcknowledgementPair {
170 command,
171 acknowledgement: acknowledgement.clone(),
172 });
173
174 match send_result {
175 Ok(_) => Ok(acknowledgement),
176 Err(err) => {
177 error!("received a SendError while sending command type {}", err.0.command.description());
178 Err(CommandSendError::new(err.0.command.description()))
179 }
180 }
181 }
182
183 pub(crate) fn shutdown(&self) -> CommandSendResult {
185 self.send(CommandType::Shutdown)
186 }
187
188 fn put<DeleteHook>(put_parameters: PutParameter<Key, Value, DeleteHook>) -> CommandStatus where DeleteHook: Fn(Key) {
189 let status = put_parameters.admission_policy.maybe_add(
190 put_parameters.key_description,
191 put_parameters.delete_hook,
192 );
193 if let CommandStatus::Accepted = status {
194 put_parameters.store.put(
195 put_parameters.key_description.clone_key(),
196 put_parameters.value,
197 put_parameters.key_description.id,
198 );
199 } else {
200 put_parameters.stats_counter.reject_key();
201 }
202 status
203 }
204
205 fn put_with_ttl<DeleteHook>(put_with_ttl_parameter: PutWithTTLParameter<Key, Value, DeleteHook>) -> CommandStatus where DeleteHook: Fn(Key) {
206 let status = put_with_ttl_parameter.put_parameter.admission_policy.maybe_add(
207 put_with_ttl_parameter.put_parameter.key_description,
208 put_with_ttl_parameter.put_parameter.delete_hook,
209 );
210 if let CommandStatus::Accepted = status {
211 let expiry = put_with_ttl_parameter.put_parameter.store.put_with_ttl(
212 put_with_ttl_parameter.put_parameter.key_description.clone_key(),
213 put_with_ttl_parameter.put_parameter.value,
214 put_with_ttl_parameter.put_parameter.key_description.id,
215 put_with_ttl_parameter.ttl,
216 );
217 put_with_ttl_parameter.ttl_ticker.put(
218 put_with_ttl_parameter.put_parameter.key_description.id,
219 expiry,
220 );
221 } else {
222 put_with_ttl_parameter.put_parameter.stats_counter.reject_key();
223 }
224 status
225 }
226
227 fn delete(delete_parameter: DeleteParameter<Key, Value>) -> CommandStatus {
228 let may_be_key_id_expiry = delete_parameter.store.delete(delete_parameter.key);
229 if let Some(key_id_expiry) = may_be_key_id_expiry {
230 delete_parameter.admission_policy.delete(&key_id_expiry.0);
231 if let Some(expiry) = key_id_expiry.1 {
232 delete_parameter.ttl_ticker.delete(&key_id_expiry.0, &expiry);
233 }
234 return CommandStatus::Accepted;
235 }
236 CommandStatus::Rejected(KeyDoesNotExist)
237 }
238}
239
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    use crate::cache::clock::{ClockType, SystemClock};
    use crate::cache::command::{CommandStatus, CommandType};
    use crate::cache::command::command_executor::{CommandExecutor, shutdown_result};
    use crate::cache::command::RejectionReason::{KeyDoesNotExist, KeyWeightIsGreaterThanCacheWeight};
    use crate::cache::expiration::config::TTLConfig;
    use crate::cache::expiration::TTLTicker;
    use crate::cache::key_description::KeyDescription;
    use crate::cache::policy::admission_policy::AdmissionPolicy;
    use crate::cache::policy::config::CacheWeightConfig;
    use crate::cache::stats::ConcurrentStatsCounter;
    use crate::cache::store::Store;

    type TestStore = Arc<Store<&'static str, &'static str>>;
    type TestAdmissionPolicy = Arc<AdmissionPolicy<&'static str>>;

    // TTL ticker whose callback does nothing.
    fn no_action_ttl_ticker() -> Arc<TTLTicker> {
        let ttl_config = TTLConfig::new(4, Duration::from_secs(300), SystemClock::boxed());
        TTLTicker::new(ttl_config, |_key_id| {})
    }

    fn test_store(clock: ClockType, stats_counter: Arc<ConcurrentStatsCounter>) -> TestStore {
        Store::new(clock, stats_counter, 16, 4)
    }

    fn test_cache_weight_config() -> CacheWeightConfig {
        CacheWeightConfig::new(100, 4, 100)
    }

    // Stats counter, store and admission policy shared by every test below.
    fn fixture() -> (Arc<ConcurrentStatsCounter>, TestStore, TestAdmissionPolicy) {
        let stats_counter = Arc::new(ConcurrentStatsCounter::new());
        let store = test_store(SystemClock::boxed(), stats_counter.clone());
        let admission_policy = Arc::new(AdmissionPolicy::new(10, test_cache_weight_config(), stats_counter.clone()));
        (stats_counter, store, admission_policy)
    }

    // Executor over a clone of `store`, with a command channel of size 10.
    fn executor_over(
        store: &TestStore,
        admission_policy: TestAdmissionPolicy,
        stats_counter: Arc<ConcurrentStatsCounter>,
        ttl_ticker: Arc<TTLTicker>,
    ) -> CommandExecutor<&'static str, &'static str> {
        CommandExecutor::new(store.clone(), admission_policy, stats_counter, ttl_ticker, 10)
    }

    mod setup {
        use std::time::SystemTime;

        use crate::cache::clock::Clock;

        // Clock that always reports the Unix epoch.
        #[derive(Clone)]
        pub(crate) struct UnixEpochClock;

        impl Clock for UnixEpochClock {
            fn now(&self) -> SystemTime {
                SystemTime::UNIX_EPOCH
            }
        }
    }

    #[test]
    fn result_on_shutdown() {
        assert!(shutdown_result().is_err());
    }

    #[tokio::test]
    async fn puts_a_key_value_after_shutdown_with_delay() {
        let (stats_counter, store, admission_policy) = fixture();
        let executor = executor_over(&store, admission_policy, stats_counter, no_action_ttl_ticker());

        executor.shutdown().unwrap().handle().await;

        thread::sleep(Duration::from_secs(1));

        let send_result = executor.send(CommandType::Put(
            KeyDescription::new("topic", 1, 1029, 10),
            "microservices",
        ));

        assert!(send_result.is_err() || send_result.unwrap().handle().await == CommandStatus::ShuttingDown);
    }

    #[tokio::test]
    async fn puts_a_key_value_after_shutdown() {
        let (stats_counter, store, admission_policy) = fixture();
        let executor = executor_over(&store, admission_policy, stats_counter, no_action_ttl_ticker());

        executor.shutdown().unwrap().handle().await;

        let send_result = executor.send(CommandType::Put(
            KeyDescription::new("topic", 1, 1029, 10),
            "microservices",
        ));
        assert!(send_result.is_err() || send_result.unwrap().handle().await == CommandStatus::ShuttingDown);
    }

    #[tokio::test]
    async fn puts_a_key_value() {
        let (stats_counter, store, admission_policy) = fixture();
        let executor = executor_over(&store, admission_policy, stats_counter, no_action_ttl_ticker());

        executor.send(CommandType::Put(
            KeyDescription::new("topic", 1, 1029, 10),
            "microservices",
        )).unwrap().handle().await;

        executor.shutdown().unwrap().handle().await;
        assert_eq!(Some("microservices"), store.get(&"topic"));
    }

    #[tokio::test]
    async fn key_value_gets_rejected_given_its_weight_is_more_than_the_cache_weight() {
        let (stats_counter, store, admission_policy) = fixture();
        let executor = executor_over(&store, admission_policy, stats_counter, no_action_ttl_ticker());

        let status = executor.send(CommandType::Put(
            KeyDescription::new("topic", 1, 1029, 200),
            "microservices",
        )).unwrap().handle().await;

        executor.shutdown().unwrap().handle().await;
        assert_eq!(None, store.get(&"topic"));
        assert_eq!(CommandStatus::Rejected(KeyWeightIsGreaterThanCacheWeight), status);
    }

    #[tokio::test]
    async fn rejects_a_key_value_and_increase_stats() {
        let (stats_counter, store, admission_policy) = fixture();
        let executor = executor_over(&store, admission_policy, stats_counter.clone(), no_action_ttl_ticker());

        let status = executor.send(CommandType::Put(
            KeyDescription::new("topic", 1, 1029, 200),
            "microservices",
        )).unwrap().handle().await;

        executor.shutdown().unwrap().handle().await;
        assert_eq!(CommandStatus::Rejected(KeyWeightIsGreaterThanCacheWeight), status);
        assert_eq!(1, stats_counter.keys_rejected());
    }

    #[tokio::test]
    async fn puts_a_couple_of_key_values() {
        let (stats_counter, store, admission_policy) = fixture();
        let executor = executor_over(&store, admission_policy, stats_counter, no_action_ttl_ticker());

        // Both commands are sent before either acknowledgement is awaited.
        let topic_acknowledgement = executor.send(CommandType::Put(
            KeyDescription::new("topic", 1, 1029, 10),
            "microservices",
        )).unwrap();
        let disk_acknowledgement = executor.send(CommandType::Put(
            KeyDescription::new("disk", 2, 2076, 3),
            "SSD",
        )).unwrap();
        topic_acknowledgement.handle().await;
        disk_acknowledgement.handle().await;

        executor.shutdown().unwrap().handle().await;
        assert_eq!(Some("microservices"), store.get(&"topic"));
        assert_eq!(Some("SSD"), store.get(&"disk"));
    }

    #[tokio::test]
    async fn puts_a_key_value_with_ttl() {
        let (stats_counter, store, admission_policy) = fixture();
        let ttl_ticker = no_action_ttl_ticker();
        let executor = executor_over(&store, admission_policy, stats_counter, ttl_ticker.clone());

        executor.send(CommandType::PutWithTTL(
            KeyDescription::new("topic", 1, 1029, 10),
            "microservices",
            Duration::from_secs(10),
        )).unwrap().handle().await;

        executor.shutdown().unwrap().handle().await;
        assert_eq!(Some("microservices"), store.get(&"topic"));

        let expiry = store.get_ref(&"topic").unwrap().value().expire_after().unwrap();
        let expiry_in_ttl_ticker = ttl_ticker.get(&1, &expiry).unwrap();

        assert_eq!(expiry, expiry_in_ttl_ticker);
    }

    #[tokio::test]
    async fn rejects_a_key_value_with_ttl_and_increase_stats() {
        let (stats_counter, store, admission_policy) = fixture();
        let executor = executor_over(&store, admission_policy, stats_counter.clone(), no_action_ttl_ticker());

        executor.send(CommandType::PutWithTTL(
            KeyDescription::new("topic", 1, 1029, 4000),
            "microservices",
            Duration::from_secs(10),
        )).unwrap().handle().await;

        executor.shutdown().unwrap().handle().await;
        assert_eq!(1, stats_counter.keys_rejected());
    }

    #[tokio::test]
    async fn deletes_a_key() {
        let (stats_counter, store, admission_policy) = fixture();
        let ttl_ticker = no_action_ttl_ticker();
        let executor = executor_over(&store, admission_policy, stats_counter, ttl_ticker.clone());

        executor.send(CommandType::PutWithTTL(
            KeyDescription::new("topic", 10, 1029, 10),
            "microservices",
            Duration::from_secs(10),
        )).unwrap().handle().await;

        let expiry = store.get_ref(&"topic").unwrap().value().expire_after().unwrap();
        let expiry_in_ttl_ticker = ttl_ticker.get(&10, &expiry).unwrap();

        assert_eq!(Some("microservices"), store.get(&"topic"));
        assert_eq!(expiry, expiry_in_ttl_ticker);

        executor.send(CommandType::Delete("topic")).unwrap().handle().await;

        executor.shutdown().unwrap().handle().await;
        assert_eq!(None, store.get(&"topic"));
        assert_eq!(None, ttl_ticker.get(&10, &expiry));
    }

    #[tokio::test]
    async fn deletion_of_a_non_existing_key_value_gets_rejected() {
        let (stats_counter, store, admission_policy) = fixture();
        let executor = executor_over(&store, admission_policy, stats_counter, no_action_ttl_ticker());

        let status = executor.send(CommandType::Delete("non-existing")).unwrap().handle().await;

        executor.shutdown().unwrap().handle().await;
        assert_eq!(CommandStatus::Rejected(KeyDoesNotExist), status);
    }
}
557
#[cfg(test)]
mod sociable_tests {
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    use crate::cache::buffer_event::{BufferConsumer, BufferEvent};
    use crate::cache::clock::{ClockType, SystemClock};
    use crate::cache::command::{CommandStatus, CommandType};
    use crate::cache::command::command_executor::CommandExecutor;
    use crate::cache::command::command_executor::Store;
    use crate::cache::expiration::config::TTLConfig;
    use crate::cache::expiration::TTLTicker;
    use crate::cache::key_description::KeyDescription;
    use crate::cache::policy::admission_policy::AdmissionPolicy;
    use crate::cache::policy::config::CacheWeightConfig;
    use crate::cache::stats::ConcurrentStatsCounter;

    type TestStore = Arc<Store<&'static str, &'static str>>;
    type TestAdmissionPolicy = Arc<AdmissionPolicy<&'static str>>;

    // TTL ticker whose callback does nothing.
    fn no_action_ttl_ticker() -> Arc<TTLTicker> {
        let ttl_config = TTLConfig::new(4, Duration::from_secs(300), SystemClock::boxed());
        TTLTicker::new(ttl_config, |_key_id| {})
    }

    fn test_store(clock: ClockType, stats_counter: Arc<ConcurrentStatsCounter>) -> TestStore {
        Store::new(clock, stats_counter, 16, 4)
    }

    fn test_cache_weight_config() -> CacheWeightConfig {
        CacheWeightConfig::new(100, 4, 100)
    }

    // Stats counter, store and an admission policy built from `cache_weight_config`.
    fn fixture_with(cache_weight_config: CacheWeightConfig) -> (Arc<ConcurrentStatsCounter>, TestStore, TestAdmissionPolicy) {
        let stats_counter = Arc::new(ConcurrentStatsCounter::new());
        let store = test_store(SystemClock::boxed(), stats_counter.clone());
        let admission_policy = Arc::new(AdmissionPolicy::new(10, cache_weight_config, stats_counter.clone()));
        (stats_counter, store, admission_policy)
    }

    // Executor over clones of `store` and `admission_policy`, channel size 10.
    fn executor_over(
        store: &TestStore,
        admission_policy: &TestAdmissionPolicy,
        stats_counter: Arc<ConcurrentStatsCounter>,
    ) -> CommandExecutor<&'static str, &'static str> {
        CommandExecutor::new(
            store.clone(),
            admission_policy.clone(),
            stats_counter,
            no_action_ttl_ticker(),
            10,
        )
    }

    #[tokio::test]
    async fn puts_a_key_value() {
        let (stats_counter, store, admission_policy) = fixture_with(test_cache_weight_config());
        let executor = executor_over(&store, &admission_policy, stats_counter);

        let key_description = KeyDescription::new("topic", 1, 1029, 10);
        let key_id = key_description.id;
        executor.send(CommandType::Put(
            key_description,
            "microservices",
        )).unwrap().handle().await;

        executor.shutdown().unwrap().handle().await;
        assert_eq!(Some("microservices"), store.get(&"topic"));
        assert!(admission_policy.contains(&key_id));
    }

    #[tokio::test]
    async fn puts_a_key_value_by_eliminating_victims() {
        let (stats_counter, store, admission_policy) = fixture_with(CacheWeightConfig::new(100, 4, 10));

        // Feed the key hashes to the policy, then sleep for a second before
        // any command is issued.
        let key_hashes = vec![10, 14, 116];
        admission_policy.accept(BufferEvent::Full(key_hashes));
        thread::sleep(Duration::from_secs(1));

        let executor = executor_over(&store, &admission_policy, stats_counter);

        let topic_status = executor.send(CommandType::Put(
            KeyDescription::new("topic", 1, 10, 5),
            "microservices",
        )).unwrap().handle().await;
        assert_eq!(CommandStatus::Accepted, topic_status);

        let disk_status = executor.send(CommandType::Put(
            KeyDescription::new("disk", 2, 14, 6),
            "SSD",
        )).unwrap().handle().await;
        assert_eq!(CommandStatus::Accepted, disk_status);

        executor.shutdown().unwrap().handle().await;

        assert!(admission_policy.contains(&2));
        assert_eq!(Some("SSD"), store.get(&"disk"));

        assert!(!admission_policy.contains(&1));
        assert_eq!(None, store.get(&"topic"));
    }

    #[tokio::test]
    async fn deletes_a_key() {
        let (stats_counter, store, admission_policy) = fixture_with(test_cache_weight_config());
        let executor = executor_over(&store, &admission_policy, stats_counter);

        executor.send(CommandType::Put(
            KeyDescription::new("topic", 1, 1029, 10),
            "microservices",
        )).unwrap().handle().await;

        executor.send(CommandType::Delete("topic")).unwrap().handle().await;

        executor.shutdown().unwrap().handle().await;
        assert_eq!(None, store.get(&"topic"));
        assert!(!admission_policy.contains(&1));
    }

    #[tokio::test]
    async fn updates_the_weight_of_the_key() {
        let (stats_counter, store, admission_policy) = fixture_with(test_cache_weight_config());
        let executor = executor_over(&store, &admission_policy, stats_counter);

        let key_description = KeyDescription::new("topic", 1, 1029, 10);
        let key_id = key_description.id;
        executor.send(CommandType::Put(
            key_description,
            "microservices",
        )).unwrap().handle().await;

        executor.send(CommandType::UpdateWeight(1, 20)).unwrap().handle().await;

        executor.shutdown().unwrap().handle().await;
        assert_eq!(Some("microservices"), store.get(&"topic"));
        assert_eq!(Some(20), admission_policy.weight_of(&key_id));
    }
}