1use std::collections::HashMap;
70use std::sync::Arc;
71use std::time::Duration;
72
73use crate::commands::{queue, Call, CommandResult};
74use crate::commands::{CommandDictionary, ResultExt};
75use crate::config::Config;
76use crate::fmt::{format_duration, format_size, parse_duration, parse_size};
77use crate::platform::Platform;
78use crate::spawn;
79use num_derive::FromPrimitive;
80use num_traits::FromPrimitive;
81
82use crate::lru::LruCache;
83use yaml_rust::yaml::Hash;
84use yaml_rust::Yaml;
85
86#[derive(FromPrimitive)]
88enum Commands {
89 Put,
90 Puts,
91 Get,
92 ExtendedGet,
93 Remove,
94 Removes,
95 Flush,
96 Stats,
97 Keys,
98}
99
100type StringCache = LruCache<String>;
102
103pub fn install(platform: Arc<Platform>) {
109 let queue = actor(platform.clone());
110
111 let commands = platform.require::<CommandDictionary>();
112 commands.register_command("LRU.PUT", queue.clone(), Commands::Put as usize);
113 commands.register_command("LRU.PUTS", queue.clone(), Commands::Puts as usize);
114 commands.register_command("LRU.GET", queue.clone(), Commands::Get as usize);
115 commands.register_command("LRU.REMOVE", queue.clone(), Commands::Remove as usize);
116 commands.register_command("LRU.REMOVES", queue.clone(), Commands::Removes as usize);
117 commands.register_command("LRU.XGET", queue.clone(), Commands::ExtendedGet as usize);
118 commands.register_command("LRU.FLUSH", queue.clone(), Commands::Flush as usize);
119 commands.register_command("LRU.STATS", queue.clone(), Commands::Stats as usize);
120 commands.register_command("LRU.KEYS", queue, Commands::Keys as usize);
121}
122
123fn actor(platform: Arc<Platform>) -> crate::commands::Queue {
125 let (queue, mut endpoint) = queue();
126
127 spawn!(async move {
128 let config = platform.require::<Config>();
129 let mut config_changed = config.notifier();
130
131 let mut caches: HashMap<String, StringCache> = HashMap::new();
132 caches = update_config(caches, &config);
133
134 while platform.is_running() {
135 tokio::select! {
136 _ = config_changed.recv() => { caches = update_config(caches, &config) }
137 msg = endpoint.recv() => {
138 if let Some(mut call) = msg {
139 match Commands::from_usize(call.token) {
140 Some(Commands::Put) => put_command(&mut call, &mut caches).complete(call),
141 Some(Commands::Puts) => puts_command(&mut call, &mut caches).complete(call),
142 Some(Commands::Get) => get_command(&mut call, &mut caches).complete(call),
143 Some(Commands::ExtendedGet) => extended_get_command(&mut call, &mut caches).complete(call),
144 Some(Commands::Remove) => remove_command(&mut call, &mut caches).complete(call),
145 Some(Commands::Removes) => removes_command(&mut call, &mut caches).complete(call),
146 Some(Commands::Flush) => flush_command(&mut call, &mut caches).complete(call),
147 Some(Commands::Stats) => stats_command(&mut call, &mut caches).complete(call),
148 Some(Commands::Keys) => keys_command(&mut call, &mut caches).complete(call),
149 _ => call.handle_unknown_token(),
150 }
151 }
152 }
153 }
154 }
155 });
156
157 queue
158}
159
160fn update_config(
166 caches: HashMap<String, StringCache>,
167 config: &Arc<Config>,
168) -> HashMap<String, StringCache> {
169 let handle = config.current();
170 if let Yaml::Hash(ref map) = handle.config()["caches"] {
171 parse_config(caches, map)
172 } else {
173 log::info!("Config does not contain a 'caches' object. Skipping config update.");
174 caches
175 }
176}
177
178fn parse_config(
181 mut caches: HashMap<String, StringCache>,
182 map: &Hash,
183) -> HashMap<String, StringCache> {
184 let mut result = HashMap::new();
185 for (name, config) in map {
186 let name = name.as_str().unwrap_or("");
187 let current_cache = caches.remove(name);
188 if let Some(cache) = create_or_update(name, current_cache, config) {
189 let _ = result.insert(name.to_owned(), cache);
190 }
191 }
192
193 for name in caches.keys() {
194 log::info!("Dropping stale cache {}...", name);
195 }
196
197 result
198}
199
200fn create_or_update(
206 name: &str,
207 current_cache: Option<StringCache>,
208 config: &Yaml,
209) -> Option<StringCache> {
210 let size = match config["size"].as_i64().filter(|value| *value > 0) {
211 None => {
212 log::error!(
213 "Not going to create or update {} as no cache size was given.",
214 name
215 );
216 return current_cache;
217 }
218 Some(n) => n,
219 } as usize;
220
221 let max_memory = match parse_size(config["max_memory"].as_str().unwrap_or("")) {
222 Err(error) => {
223 log::error!(
224 "Not going to create or update {}. Failed to parse 'max_memory': {}",
225 name,
226 error
227 );
228 return current_cache;
229 }
230 Ok(n) => n,
231 };
232
233 let soft_ttl = match parse_duration(config["soft_ttl"].as_str().unwrap_or("")) {
234 Ok(duration) => duration,
235 Err(error) => {
236 log::error!(
237 "Not going to create or update {}. Failed to parse 'soft_ttl': {}",
238 name,
239 error
240 );
241 return current_cache;
242 }
243 };
244
245 let hard_ttl = match parse_duration(config["hard_ttl"].as_str().unwrap_or("")) {
246 Ok(duration) => duration,
247 Err(error) => {
248 log::error!(
249 "Not going to create or update {}. Failed to parse 'hard_ttl': {}",
250 name,
251 error
252 );
253 return current_cache;
254 }
255 };
256
257 let refresh_interval = match parse_duration(config["refresh_interval"].as_str().unwrap_or("")) {
258 Ok(duration) => duration,
259 Err(error) => {
260 log::error!(
261 "Not going to create or update {}. Failed to parse 'refresh_interval': {}",
262 name,
263 error
264 );
265 return current_cache;
266 }
267 };
268
269 match current_cache {
270 Some(mut cache) => {
271 update_cache(
272 name,
273 &mut cache,
274 size,
275 max_memory,
276 soft_ttl,
277 hard_ttl,
278 refresh_interval,
279 );
280
281 Some(cache)
282 }
283 None => {
284 log::info!("Creating new cache {}...", name);
285 Some(LruCache::new(
286 size,
287 max_memory,
288 soft_ttl,
289 hard_ttl,
290 refresh_interval,
291 ))
292 }
293 }
294}
295
296fn update_cache(
298 name: &str,
299 cache: &mut StringCache,
300 capacity: usize,
301 max_memory: usize,
302 soft_ttl: Duration,
303 hard_ttl: Duration,
304 refresh_interval: Duration,
305) {
306 if cache.capacity() != capacity {
307 log::info!(
308 "Updating the size of {} from {} to {}.",
309 name,
310 cache.capacity(),
311 capacity
312 );
313 cache.set_capacity(capacity);
314 }
315
316 if cache.max_memory() != max_memory {
317 log::info!(
318 "Updating max_memory of {} from {} to {}.",
319 name,
320 format_size(cache.max_memory()),
321 format_size(max_memory)
322 );
323 cache.set_max_memory(max_memory);
324 }
325
326 if cache.soft_ttl() != soft_ttl {
327 log::info!(
328 "Updating soft_ttl of {} from {} to {}.",
329 name,
330 format_duration(cache.soft_ttl()),
331 format_duration(soft_ttl)
332 );
333 cache.set_soft_ttl(soft_ttl);
334
335 log::info!("Flushing {} due to changed TTL settings...", name);
336 cache.flush();
337 }
338
339 if cache.hard_ttl() != hard_ttl {
340 log::info!(
341 "Updating hard_ttl of {} from {} to {}.",
342 name,
343 format_duration(cache.hard_ttl()),
344 format_duration(hard_ttl)
345 );
346 cache.set_hard_ttl(hard_ttl);
347
348 log::info!("Flushing {} due to changed TTL settings...", name);
349 cache.flush();
350 }
351
352 if cache.refresh_interval() != refresh_interval {
353 log::info!(
354 "Updating refresh_interval of {} from {} to {}.",
355 name,
356 format_duration(cache.refresh_interval()),
357 format_duration(refresh_interval)
358 );
359 cache.set_refresh_interval(refresh_interval);
360 }
361}
362
363fn get_cache<'a>(
365 name: &str,
366 caches: &'a mut HashMap<String, StringCache>,
367) -> anyhow::Result<&'a mut StringCache> {
368 match caches.get_mut(name) {
369 Some(cache) => Ok(cache),
370 None => Err(anyhow::anyhow!("Unknown cache: {}", name)),
371 }
372}
373
374fn put_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
376 let cache = get_cache(call.request.str_parameter(0)?, caches)?;
377
378 cache.put(
379 call.request.str_parameter(1)?.to_owned(),
380 call.request.str_parameter(2)?.to_owned(),
381 )?;
382
383 call.response.ok()?;
384 Ok(())
385}
386
387fn puts_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
389 let cache = get_cache(call.request.str_parameter(0)?, caches)?;
390
391 let mut secondary_keys = Vec::with_capacity(call.request.parameter_count() - 3);
392 for index in 3..call.request.parameter_count() {
393 secondary_keys.push(call.request.str_parameter(index)?.to_owned());
394 }
395
396 cache.put_with_secondaries(
397 call.request.str_parameter(1)?.to_owned(),
398 call.request.str_parameter(2)?.to_owned(),
399 Some(secondary_keys),
400 )?;
401
402 call.response.ok()?;
403 Ok(())
404}
405
406fn get_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
408 let cache = get_cache(call.request.str_parameter(0)?, caches)?;
409
410 if let Some(value) = cache.get(call.request.str_parameter(1)?) {
411 call.response.bulk(value)?;
412 } else {
413 call.response.empty_string()?;
414 }
415
416 Ok(())
417}
418
419fn extended_get_command(
421 call: &mut Call,
422 caches: &mut HashMap<String, StringCache>,
423) -> CommandResult {
424 let cache = get_cache(call.request.str_parameter(0)?, caches)?;
425
426 call.response.array(3)?;
427 if let Some((alive, refresh, value)) = cache.extended_get(call.request.str_parameter(1)?) {
428 call.response.boolean(alive)?;
429 call.response.boolean(refresh)?;
430 call.response.bulk(value)?;
431 } else {
432 call.response.boolean(false)?;
433 call.response.boolean(false)?;
434 call.response.empty_string()?;
435 }
436
437 Ok(())
438}
439
440fn remove_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
442 let cache = get_cache(call.request.str_parameter(0)?, caches)?;
443 cache.remove(call.request.str_parameter(1)?);
444 call.response.ok()?;
445
446 Ok(())
447}
448
449fn removes_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
451 let cache = get_cache(call.request.str_parameter(0)?, caches)?;
452 cache.remove_by_secondary(call.request.str_parameter(1)?);
453 call.response.ok()?;
454
455 Ok(())
456}
457
458fn flush_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
460 let cache = get_cache(call.request.str_parameter(0)?, caches)?;
461 cache.flush();
462 call.response.ok()?;
463
464 Ok(())
465}
466
467fn stats_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
469 if call.request.parameter_count() == 0 {
470 all_stats_command(call, caches)
471 } else {
472 cache_stats_command(call, caches)
473 }
474}
475
476fn all_stats_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
478 let mut result = String::new();
479
480 result += "Use 'LRU.STATS <cache>' for detailed metrics.\n\n";
481
482 result += format!(
483 "{:<30} {:>12} {:>20}\n",
484 "Name", "Num Entries", "Allocated Memory"
485 )
486 .as_str();
487 result += crate::response::SEPARATOR;
488
489 for (name, cache) in caches {
490 result += format!(
491 "{:<30} {:>12} {:>20}\n",
492 name,
493 cache.len(),
494 format_size(cache.allocated_memory())
495 )
496 .as_str();
497 }
498 result += crate::response::SEPARATOR;
499
500 call.response.bulk(result)?;
501
502 Ok(())
503}
504
505fn cache_stats_command(
507 call: &mut Call,
508 caches: &mut HashMap<String, StringCache>,
509) -> CommandResult {
510 let cache = get_cache(call.request.str_parameter(0)?, caches)?;
511
512 let mut result = String::new();
513
514 result += format!("{:<30} {:>20}\n", "Num Entries", cache.len()).as_str();
515 result += format!("{:<30} {:>20}\n", "Max Entries", cache.capacity()).as_str();
516 result += format!("{:<30} {:>18.2} %\n", "Utilization", cache.utilization()).as_str();
517 result += format!(
518 "{:<30} {:>20}\n",
519 "Allocated Memory",
520 format_size(cache.allocated_memory())
521 )
522 .as_str();
523 result += format!(
524 "{:<30} {:>20}\n",
525 "Max Memory",
526 format_size(cache.max_memory())
527 )
528 .as_str();
529 result += format!(
530 "{:<30} {:>18.2} %\n",
531 "Memory Utilization",
532 cache.memory_utilization()
533 )
534 .as_str();
535 result += format!(
536 "{:<30} {:>20}\n",
537 "Total Memory",
538 format_size(cache.total_allocated_memory())
539 )
540 .as_str();
541 result += format!("{:<30} {:>20}\n", "Reads", cache.reads()).as_str();
542 result += format!("{:<30} {:>20}\n", "Writes", cache.writes()).as_str();
543 result += format!("{:<30} {:>18.2} %\n", "Hit Rate", cache.hit_rate()).as_str();
544 result += format!(
545 "{:<30} {:>18.2} %\n",
546 "Write/Read Ratio",
547 cache.write_read_ratio()
548 )
549 .as_str();
550
551 call.response.bulk(result)?;
552
553 Ok(())
554}
555
556fn keys_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
558 let cache = get_cache(call.request.str_parameter(0)?, caches)?;
559 let keys: Vec<&String> = if call.request.parameter_count() > 1 {
560 let filter = call.request.str_parameter(1)?;
561 cache
562 .keys()
563 .filter(|key| key.contains(filter))
564 .take(100)
565 .collect()
566 } else {
567 cache.keys().take(100).collect()
568 };
569
570 call.response.array(keys.len() as i32)?;
571 for key in keys {
572 call.response.bulk(key)?;
573 }
574
575 Ok(())
576}
577
578#[cfg(test)]
579mod tests {
580 use crate::builder::Builder;
581 use crate::commands::{CommandDictionary, Dispatcher};
582 use crate::config::Config;
583 use crate::request::Request;
584 use mock_instant::thread_local::MockClock;
585 use std::time::Duration;
586
587 #[test]
591 fn test_commands() {
592 crate::testing::test_async(async {
593 let platform = Builder::new()
594 .enable_config()
595 .enable_commands()
596 .build()
597 .await;
598
599 platform
601 .require::<Config>()
602 .load_from_string(
603 "caches:
604 test:
605 size: 10000
606 max_memory: 16m
607 soft_ttl: 15m
608 hard_ttl: 30m
609 refresh_interval: 10s
610 ",
611 None,
612 )
613 .unwrap();
614
615 crate::lru::cache::install(platform.clone());
617
618 let mut dispatcher = platform.require::<CommandDictionary>().dispatcher();
620
621 perform_put_get_keys_remove(&mut dispatcher).await;
623
624 perform_puts_removes(&mut dispatcher).await;
626
627 perform_put_get_xget(&mut dispatcher).await;
629
630 perform_put_flush(&mut dispatcher).await;
632
633 perform_stats(dispatcher).await;
635 });
636 }
637
638 async fn perform_stats(mut dispatcher: Dispatcher) {
639 let result = dispatcher
642 .invoke(Request::example(vec!["LRU.STATS"]), None)
643 .await
644 .unwrap();
645 assert_eq!(std::str::from_utf8(&result[0..1]).unwrap(), "$");
646 let result = dispatcher
647 .invoke(Request::example(vec!["LRU.STATS", "test"]), None)
648 .await
649 .unwrap();
650 assert_eq!(std::str::from_utf8(&result[0..1]).unwrap(), "$");
651 }
652
653 async fn perform_put_flush(dispatcher: &mut Dispatcher) {
654 let _ = dispatcher
655 .invoke(
656 Request::example(vec!["LRU.PUT", "test", "foo", "bar"]),
657 None,
658 )
659 .await
660 .unwrap();
661
662 let result = dispatcher
664 .invoke(Request::example(vec!["LRU.FLUSH", "test"]), None)
665 .await
666 .unwrap();
667 assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+OK\r\n");
668
669 let result = dispatcher
671 .invoke(Request::example(vec!["LRU.GET", "test", "foo"]), None)
672 .await
673 .unwrap();
674 assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+\r\n");
675 }
676
677 async fn perform_put_get_xget(dispatcher: &mut Dispatcher) {
678 let result = dispatcher
679 .invoke(
680 Request::example(vec!["LRU.PUT", "test", "foo", "bar"]),
681 None,
682 )
683 .await
684 .unwrap();
685 assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+OK\r\n");
686
687 MockClock::advance(Duration::from_secs(16 * 60));
689
690 let result = dispatcher
692 .invoke(Request::example(vec!["LRU.GET", "test", "foo"]), None)
693 .await
694 .unwrap();
695 assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+\r\n");
696
697 let result = dispatcher
699 .invoke(Request::example(vec!["LRU.XGET", "test", "foo"]), None)
700 .await
701 .unwrap();
702 assert_eq!(
703 std::str::from_utf8(&result[..]).unwrap(),
704 "*3\r\n:0\r\n:1\r\n$3\r\nbar\r\n"
705 );
706
707 let result = dispatcher
709 .invoke(Request::example(vec!["LRU.XGET", "test", "foo"]), None)
710 .await
711 .unwrap();
712 assert_eq!(
713 std::str::from_utf8(&result[..]).unwrap(),
714 "*3\r\n:1\r\n:0\r\n$3\r\nbar\r\n"
715 );
716
717 MockClock::advance(Duration::from_secs(12));
720 let result = dispatcher
721 .invoke(Request::example(vec!["LRU.XGET", "test", "foo"]), None)
722 .await
723 .unwrap();
724 assert_eq!(
725 std::str::from_utf8(&result[..]).unwrap(),
726 "*3\r\n:0\r\n:1\r\n$3\r\nbar\r\n"
727 );
728
729 MockClock::advance(Duration::from_secs(16 * 60));
731 let result = dispatcher
732 .invoke(Request::example(vec!["LRU.XGET", "test", "foo"]), None)
733 .await
734 .unwrap();
735 assert_eq!(
736 std::str::from_utf8(&result[..]).unwrap(),
737 "*3\r\n:0\r\n:0\r\n+\r\n"
738 );
739 }
740
741 async fn perform_put_get_keys_remove(dispatcher: &mut Dispatcher) {
742 let result = dispatcher
743 .invoke(
744 Request::example(vec!["LRU.PUT", "test", "foo", "bar"]),
745 None,
746 )
747 .await
748 .unwrap();
749 assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+OK\r\n");
750
751 let result = dispatcher
753 .invoke(Request::example(vec!["LRU.GET", "test", "foo"]), None)
754 .await
755 .unwrap();
756 assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "$3\r\nbar\r\n");
757
758 let result = dispatcher
760 .invoke(Request::example(vec!["LRU.KEYS", "test"]), None)
761 .await
762 .unwrap();
763 assert_eq!(
764 std::str::from_utf8(&result[..]).unwrap(),
765 "*1\r\n$3\r\nfoo\r\n"
766 );
767 let result = dispatcher
769 .invoke(Request::example(vec!["LRU.KEYS", "test", "fo"]), None)
770 .await
771 .unwrap();
772 assert_eq!(
773 std::str::from_utf8(&result[..]).unwrap(),
774 "*1\r\n$3\r\nfoo\r\n"
775 );
776 let result = dispatcher
778 .invoke(Request::example(vec!["LRU.KEYS", "test", "xx"]), None)
779 .await
780 .unwrap();
781 assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "*0\r\n");
782
783 let result = dispatcher
785 .invoke(Request::example(vec!["LRU.REMOVE", "test", "foo"]), None)
786 .await
787 .unwrap();
788 assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+OK\r\n");
789
790 let result = dispatcher
792 .invoke(Request::example(vec!["LRU.GET", "test", "foo"]), None)
793 .await
794 .unwrap();
795 assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+\r\n");
796 }
797
798 async fn perform_puts_removes(dispatcher: &mut Dispatcher) {
799 let _ = dispatcher
800 .invoke(
801 Request::example(vec!["LRU.PUTS", "test", "foo", "bar", "A"]),
802 None,
803 )
804 .await
805 .unwrap();
806 let _ = dispatcher
807 .invoke(
808 Request::example(vec!["LRU.PUTS", "test", "foo1", "bar1", "A", "B"]),
809 None,
810 )
811 .await
812 .unwrap();
813 let _ = dispatcher
814 .invoke(
815 Request::example(vec!["LRU.PUTS", "test", "foo2", "bar2", "B"]),
816 None,
817 )
818 .await
819 .unwrap();
820
821 let _ = dispatcher
823 .invoke(Request::example(vec!["LRU.REMOVES", "test", "A"]), None)
824 .await
825 .unwrap();
826
827 let result = dispatcher
829 .invoke(Request::example(vec!["LRU.KEYS", "test"]), None)
830 .await
831 .unwrap();
832 assert_eq!(
833 std::str::from_utf8(&result[..]).unwrap(),
834 "*1\r\n$4\r\nfoo2\r\n"
835 );
836
837 let _ = dispatcher
839 .invoke(Request::example(vec!["LRU.REMOVES", "test", "B"]), None)
840 .await
841 .unwrap();
842
843 let result = dispatcher
845 .invoke(Request::example(vec!["LRU.GET", "test", "foo2"]), None)
846 .await
847 .unwrap();
848 assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+\r\n");
849 }
850}