jupiter/lru/
cache.rs

1//! Provides an actor which manages a set of LRU caches for string keys and values.
2//!
3//! To use this facility, [install](fn.install) has to be invoked. The configuration is fetched from
4//! the system config and will be automatically re-loaded once the file changes.
5//!
6//! # Configuration
//! In the system config, an object named **caches** has to be present which specifies the settings
8//! for each cache:
9//!
10//! ```yaml
11//! caches:
12//!     my_cache:
13//!         # Specifies the maximal number of entries to store
14//!         size: 1024
15//!         # Specifies the maximal amount of memory to use (in bytes).
16//!         # Supports common suffixes like: k, m, g, t
17//!         max_memory: 1g
18//!         # Specifies the soft time to live. After this period, an entry is considered stale
19//!         # and will not be delivered by LRU.GET. However, LRU.XGET will deliver this entry
20//!         # but mark it as stale. Supports common suffixes like: s, m, h, d
21//!         soft_ttl: 15m
22//!         # Specifies the hard time to live. After this period, neither LRU.GET nor LRU.XGET
23//!         # will deliver this entry.
24//!         hard_ttl: 1d
25//!         # Specifies the refresh interval for LRU.XGET. If this command delivers a stale entry
//!         # (as defined by soft_ttl), it indicates that the entry is stale and should be
//!         # refreshed. However, once this has been signalled to a client, it will no longer
//!         # request a refresh from other clients until either the entry has been refreshed or
//!         # this refresh interval has elapsed.
30//!         refresh_interval: 30s
31//! ```
32//!
33//! # Commands
34//!
35//! The actor defines the following commands:
36//! * **LRU.PUT**: `LRU.PUT cache key value` will store the given value for the given key in the
37//!   given cache.
38//! * **LRU.PUTS**: `LRU.PUTS cache key value secondary_key1 .. secondary_keyN` will store the
39//!   given value for the given key in the given cache. Note that the value can only be queried
//!   using the given key, but it can be purged from the cache using any of the given secondary
//!   keys via `LRU.REMOVES`.
42//! * **LRU.GET**: `LRU.GET cache key` will perform a lookup for the given key in the given cache
43//!   and return the value being stored or an empty string if no value is present.
44//! * **LRU.XGET**: `LRU.XGET cache key` will behave just like **LRU.GET**. However, its output is
45//!   a bit more elaborate. It will always respond with three values: ACTIVE, REFRESH, VALUE. If
46//!   no value was found for the given key, ACTIVE and REFRESH will be 0 and VALUE will be an empty
//!   string. If a non-stale entry was found, ACTIVE is 1, REFRESH is 0 and VALUE will be the value
48//!   associated with the key. Now the interesting part: If a stale entry (older than *soft_ttl* but
49//!   younger than *hard_ttl*) was found, ACTIVE will be 0. For the first client to request this
50//!   entry, REFRESH will be 1 and the VALUE will be the stale value associated with the key. For
51//!   all subsequent invocations of this command, REFRESH will be 0 until either the entry was
52//!   updated (by calling **LRU.PUT**) or if the *refresh_interval* has elapsed since the first
53//!   invocation. Using this approach one can build "lazy" caches, which refresh on demand, without
54//!   slowing the requesting client down (stale content can be delivered quickly, if the application
55//!   accepts doing so) and also without overloading the system, as only one client will typically
56//!   try to obtain a fresh value instead of all clients at once.
57//! * **LRU.REMOVE**: `LRU.REMOVE cache key` will remove the value associated with the given key.
58//!   Note that the value will be immediately gone without respecting any TTL.
59//! * **LRU.REMOVES**: `LRU.REMOVES cache secondary_key` will remove all values which were
60//!    associated with the given secondary key using `LRU.PUTS`.
61//!   Note that the value will be immediately gone without respecting any TTL.
62//! * **LRU.FLUSH**: `LRU.FLUSH cache` will wipe all contents of the given cache.
63//! * **LRU.STATS**: `LRU.STATS` will provide an overview of all active caches. `LRU.STATS cache`
64//!   will provide detailed metrics about the given cache.
65//! * **LRU.KEYS**: `LRU.KEYS cache filter` can be used to retrieve all keys which contain the given
66//!   filter (in their key). Note that the filter can also be omitted. However, only the first
67//!   100 matches will be returned in either case.
68//!
69use std::collections::HashMap;
70use std::sync::Arc;
71use std::time::Duration;
72
73use crate::commands::{queue, Call, CommandResult};
74use crate::commands::{CommandDictionary, ResultExt};
75use crate::config::Config;
76use crate::fmt::{format_duration, format_size, parse_duration, parse_size};
77use crate::platform::Platform;
78use crate::spawn;
79use num_derive::FromPrimitive;
80use num_traits::FromPrimitive;
81
82use crate::lru::LruCache;
83use yaml_rust::yaml::Hash;
84use yaml_rust::Yaml;
85
86/// Enumerates the commands supported by this actor.
87#[derive(FromPrimitive)]
88enum Commands {
89    Put,
90    Puts,
91    Get,
92    ExtendedGet,
93    Remove,
94    Removes,
95    Flush,
96    Stats,
97    Keys,
98}
99
/// The cache type managed by this actor: an [`LruCache`] which stores plain `String` values.
type StringCache = LruCache<String>;
102
103/// Installs the cache actor into the given platform.
104///
105/// This will automatically load the config from the **Config** in this platform (and also
106/// update the caches on change). Also this will register the commands defined above in the
107/// **CommandDictionary** of this platform.
108pub fn install(platform: Arc<Platform>) {
109    let queue = actor(platform.clone());
110
111    let commands = platform.require::<CommandDictionary>();
112    commands.register_command("LRU.PUT", queue.clone(), Commands::Put as usize);
113    commands.register_command("LRU.PUTS", queue.clone(), Commands::Puts as usize);
114    commands.register_command("LRU.GET", queue.clone(), Commands::Get as usize);
115    commands.register_command("LRU.REMOVE", queue.clone(), Commands::Remove as usize);
116    commands.register_command("LRU.REMOVES", queue.clone(), Commands::Removes as usize);
117    commands.register_command("LRU.XGET", queue.clone(), Commands::ExtendedGet as usize);
118    commands.register_command("LRU.FLUSH", queue.clone(), Commands::Flush as usize);
119    commands.register_command("LRU.STATS", queue.clone(), Commands::Stats as usize);
120    commands.register_command("LRU.KEYS", queue, Commands::Keys as usize);
121}
122
123/// Spawns the actual actor which handles all commands or processes config changes.
124fn actor(platform: Arc<Platform>) -> crate::commands::Queue {
125    let (queue, mut endpoint) = queue();
126
127    spawn!(async move {
128        let config = platform.require::<Config>();
129        let mut config_changed = config.notifier();
130
131        let mut caches: HashMap<String, StringCache> = HashMap::new();
132        caches = update_config(caches, &config);
133
134        while platform.is_running() {
135            tokio::select! {
136                _ = config_changed.recv() => { caches = update_config(caches, &config) }
137                msg = endpoint.recv() => {
138                    if let Some(mut call) = msg {
139                         match Commands::from_usize(call.token) {
140                            Some(Commands::Put) => put_command(&mut call, &mut caches).complete(call),
141                            Some(Commands::Puts) => puts_command(&mut call, &mut caches).complete(call),
142                            Some(Commands::Get) => get_command(&mut call, &mut caches).complete(call),
143                            Some(Commands::ExtendedGet) => extended_get_command(&mut call, &mut caches).complete(call),
144                            Some(Commands::Remove) => remove_command(&mut call, &mut caches).complete(call),
145                            Some(Commands::Removes) => removes_command(&mut call, &mut caches).complete(call),
146                            Some(Commands::Flush) => flush_command(&mut call, &mut caches).complete(call),
147                            Some(Commands::Stats) => stats_command(&mut call, &mut caches).complete(call),
148                            Some(Commands::Keys) => keys_command(&mut call, &mut caches).complete(call),
149                            _ => call.handle_unknown_token(),
150                        }
151                    }
152                }
153            }
154        }
155    });
156
157    queue
158}
159
160/// Updates the currently active caches based on the settings in the given config.
161///
162/// Note that this provides a safety mechanism. If no config object at all is present,
163/// we leave the current caches untouched. This prevents the system from wiping all caches
164/// in the case of an accidental change or an invalid config.
165fn update_config(
166    caches: HashMap<String, StringCache>,
167    config: &Arc<Config>,
168) -> HashMap<String, StringCache> {
169    let handle = config.current();
170    if let Yaml::Hash(ref map) = handle.config()["caches"] {
171        parse_config(caches, map)
172    } else {
173        log::info!("Config does not contain a 'caches' object. Skipping config update.");
174        caches
175    }
176}
177
178/// Actually loads the configuration for the caches now that we've verified that a config is
179/// present.
180fn parse_config(
181    mut caches: HashMap<String, StringCache>,
182    map: &Hash,
183) -> HashMap<String, StringCache> {
184    let mut result = HashMap::new();
185    for (name, config) in map {
186        let name = name.as_str().unwrap_or("");
187        let current_cache = caches.remove(name);
188        if let Some(cache) = create_or_update(name, current_cache, config) {
189            let _ = result.insert(name.to_owned(), cache);
190        }
191    }
192
193    for name in caches.keys() {
194        log::info!("Dropping stale cache {}...", name);
195    }
196
197    result
198}
199
200/// Creates or updates the cache with the given name based on the given config element.
201///
202/// In case of an invalid config, it leaves the current cache untouched. Therefore this will not
203/// create a cache with an invalid or partial config. But it will also not damage or wipe an
204/// active cache due to an accident or config problem.
205fn create_or_update(
206    name: &str,
207    current_cache: Option<StringCache>,
208    config: &Yaml,
209) -> Option<StringCache> {
210    let size = match config["size"].as_i64().filter(|value| *value > 0) {
211        None => {
212            log::error!(
213                "Not going to create or update {} as no cache size was given.",
214                name
215            );
216            return current_cache;
217        }
218        Some(n) => n,
219    } as usize;
220
221    let max_memory = match parse_size(config["max_memory"].as_str().unwrap_or("")) {
222        Err(error) => {
223            log::error!(
224                "Not going to create or update {}. Failed to parse 'max_memory': {}",
225                name,
226                error
227            );
228            return current_cache;
229        }
230        Ok(n) => n,
231    };
232
233    let soft_ttl = match parse_duration(config["soft_ttl"].as_str().unwrap_or("")) {
234        Ok(duration) => duration,
235        Err(error) => {
236            log::error!(
237                "Not going to create or update {}. Failed to parse 'soft_ttl': {}",
238                name,
239                error
240            );
241            return current_cache;
242        }
243    };
244
245    let hard_ttl = match parse_duration(config["hard_ttl"].as_str().unwrap_or("")) {
246        Ok(duration) => duration,
247        Err(error) => {
248            log::error!(
249                "Not going to create or update {}. Failed to parse 'hard_ttl': {}",
250                name,
251                error
252            );
253            return current_cache;
254        }
255    };
256
257    let refresh_interval = match parse_duration(config["refresh_interval"].as_str().unwrap_or("")) {
258        Ok(duration) => duration,
259        Err(error) => {
260            log::error!(
261                "Not going to create or update {}. Failed to parse 'refresh_interval': {}",
262                name,
263                error
264            );
265            return current_cache;
266        }
267    };
268
269    match current_cache {
270        Some(mut cache) => {
271            update_cache(
272                name,
273                &mut cache,
274                size,
275                max_memory,
276                soft_ttl,
277                hard_ttl,
278                refresh_interval,
279            );
280
281            Some(cache)
282        }
283        None => {
284            log::info!("Creating new cache {}...", name);
285            Some(LruCache::new(
286                size,
287                max_memory,
288                soft_ttl,
289                hard_ttl,
290                refresh_interval,
291            ))
292        }
293    }
294}
295
296/// Applies the new config values on an existing cache.
297fn update_cache(
298    name: &str,
299    cache: &mut StringCache,
300    capacity: usize,
301    max_memory: usize,
302    soft_ttl: Duration,
303    hard_ttl: Duration,
304    refresh_interval: Duration,
305) {
306    if cache.capacity() != capacity {
307        log::info!(
308            "Updating the size of {} from {} to {}.",
309            name,
310            cache.capacity(),
311            capacity
312        );
313        cache.set_capacity(capacity);
314    }
315
316    if cache.max_memory() != max_memory {
317        log::info!(
318            "Updating max_memory of {} from {} to {}.",
319            name,
320            format_size(cache.max_memory()),
321            format_size(max_memory)
322        );
323        cache.set_max_memory(max_memory);
324    }
325
326    if cache.soft_ttl() != soft_ttl {
327        log::info!(
328            "Updating soft_ttl of {} from {} to {}.",
329            name,
330            format_duration(cache.soft_ttl()),
331            format_duration(soft_ttl)
332        );
333        cache.set_soft_ttl(soft_ttl);
334
335        log::info!("Flushing {} due to changed TTL settings...", name);
336        cache.flush();
337    }
338
339    if cache.hard_ttl() != hard_ttl {
340        log::info!(
341            "Updating hard_ttl of {} from {} to {}.",
342            name,
343            format_duration(cache.hard_ttl()),
344            format_duration(hard_ttl)
345        );
346        cache.set_hard_ttl(hard_ttl);
347
348        log::info!("Flushing {} due to changed TTL settings...", name);
349        cache.flush();
350    }
351
352    if cache.refresh_interval() != refresh_interval {
353        log::info!(
354            "Updating refresh_interval of {} from {} to {}.",
355            name,
356            format_duration(cache.refresh_interval()),
357            format_duration(refresh_interval)
358        );
359        cache.set_refresh_interval(refresh_interval);
360    }
361}
362
363/// Obtains the cache with the given name or yields an appropriate error message.
364fn get_cache<'a>(
365    name: &str,
366    caches: &'a mut HashMap<String, StringCache>,
367) -> anyhow::Result<&'a mut StringCache> {
368    match caches.get_mut(name) {
369        Some(cache) => Ok(cache),
370        None => Err(anyhow::anyhow!("Unknown cache: {}", name)),
371    }
372}
373
374/// Implements the LRU.PUT command.
375fn put_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
376    let cache = get_cache(call.request.str_parameter(0)?, caches)?;
377
378    cache.put(
379        call.request.str_parameter(1)?.to_owned(),
380        call.request.str_parameter(2)?.to_owned(),
381    )?;
382
383    call.response.ok()?;
384    Ok(())
385}
386
387/// Implements the LRU.PUTS command.
388fn puts_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
389    let cache = get_cache(call.request.str_parameter(0)?, caches)?;
390
391    let mut secondary_keys = Vec::with_capacity(call.request.parameter_count() - 3);
392    for index in 3..call.request.parameter_count() {
393        secondary_keys.push(call.request.str_parameter(index)?.to_owned());
394    }
395
396    cache.put_with_secondaries(
397        call.request.str_parameter(1)?.to_owned(),
398        call.request.str_parameter(2)?.to_owned(),
399        Some(secondary_keys),
400    )?;
401
402    call.response.ok()?;
403    Ok(())
404}
405
406/// Implements the LRU.GET command.
407fn get_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
408    let cache = get_cache(call.request.str_parameter(0)?, caches)?;
409
410    if let Some(value) = cache.get(call.request.str_parameter(1)?) {
411        call.response.bulk(value)?;
412    } else {
413        call.response.empty_string()?;
414    }
415
416    Ok(())
417}
418
419/// Implements the LRU.XGET command.
420fn extended_get_command(
421    call: &mut Call,
422    caches: &mut HashMap<String, StringCache>,
423) -> CommandResult {
424    let cache = get_cache(call.request.str_parameter(0)?, caches)?;
425
426    call.response.array(3)?;
427    if let Some((alive, refresh, value)) = cache.extended_get(call.request.str_parameter(1)?) {
428        call.response.boolean(alive)?;
429        call.response.boolean(refresh)?;
430        call.response.bulk(value)?;
431    } else {
432        call.response.boolean(false)?;
433        call.response.boolean(false)?;
434        call.response.empty_string()?;
435    }
436
437    Ok(())
438}
439
440/// Implements the LRU.REMOVE command.
441fn remove_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
442    let cache = get_cache(call.request.str_parameter(0)?, caches)?;
443    cache.remove(call.request.str_parameter(1)?);
444    call.response.ok()?;
445
446    Ok(())
447}
448
449/// Implements the LRU.REMOVES command.
450fn removes_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
451    let cache = get_cache(call.request.str_parameter(0)?, caches)?;
452    cache.remove_by_secondary(call.request.str_parameter(1)?);
453    call.response.ok()?;
454
455    Ok(())
456}
457
458/// Implements the LRU.FLUSH command.
459fn flush_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
460    let cache = get_cache(call.request.str_parameter(0)?, caches)?;
461    cache.flush();
462    call.response.ok()?;
463
464    Ok(())
465}
466
467/// Delegates the LRU.STATS command to the proper implementation based on its arguments.
468fn stats_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
469    if call.request.parameter_count() == 0 {
470        all_stats_command(call, caches)
471    } else {
472        cache_stats_command(call, caches)
473    }
474}
475
476/// Implements `LRU.STATS` command.
477fn all_stats_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
478    let mut result = String::new();
479
480    result += "Use 'LRU.STATS <cache>' for detailed metrics.\n\n";
481
482    result += format!(
483        "{:<30} {:>12} {:>20}\n",
484        "Name", "Num Entries", "Allocated Memory"
485    )
486    .as_str();
487    result += crate::response::SEPARATOR;
488
489    for (name, cache) in caches {
490        result += format!(
491            "{:<30} {:>12} {:>20}\n",
492            name,
493            cache.len(),
494            format_size(cache.allocated_memory())
495        )
496        .as_str();
497    }
498    result += crate::response::SEPARATOR;
499
500    call.response.bulk(result)?;
501
502    Ok(())
503}
504
505/// Implements the `LRU.STATS cache` command.
506fn cache_stats_command(
507    call: &mut Call,
508    caches: &mut HashMap<String, StringCache>,
509) -> CommandResult {
510    let cache = get_cache(call.request.str_parameter(0)?, caches)?;
511
512    let mut result = String::new();
513
514    result += format!("{:<30} {:>20}\n", "Num Entries", cache.len()).as_str();
515    result += format!("{:<30} {:>20}\n", "Max Entries", cache.capacity()).as_str();
516    result += format!("{:<30} {:>18.2} %\n", "Utilization", cache.utilization()).as_str();
517    result += format!(
518        "{:<30} {:>20}\n",
519        "Allocated Memory",
520        format_size(cache.allocated_memory())
521    )
522    .as_str();
523    result += format!(
524        "{:<30} {:>20}\n",
525        "Max Memory",
526        format_size(cache.max_memory())
527    )
528    .as_str();
529    result += format!(
530        "{:<30} {:>18.2} %\n",
531        "Memory Utilization",
532        cache.memory_utilization()
533    )
534    .as_str();
535    result += format!(
536        "{:<30} {:>20}\n",
537        "Total Memory",
538        format_size(cache.total_allocated_memory())
539    )
540    .as_str();
541    result += format!("{:<30} {:>20}\n", "Reads", cache.reads()).as_str();
542    result += format!("{:<30} {:>20}\n", "Writes", cache.writes()).as_str();
543    result += format!("{:<30} {:>18.2} %\n", "Hit Rate", cache.hit_rate()).as_str();
544    result += format!(
545        "{:<30} {:>18.2} %\n",
546        "Write/Read Ratio",
547        cache.write_read_ratio()
548    )
549    .as_str();
550
551    call.response.bulk(result)?;
552
553    Ok(())
554}
555
556/// Handles LRU.KEYS.
557fn keys_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
558    let cache = get_cache(call.request.str_parameter(0)?, caches)?;
559    let keys: Vec<&String> = if call.request.parameter_count() > 1 {
560        let filter = call.request.str_parameter(1)?;
561        cache
562            .keys()
563            .filter(|key| key.contains(filter))
564            .take(100)
565            .collect()
566    } else {
567        cache.keys().take(100).collect()
568    };
569
570    call.response.array(keys.len() as i32)?;
571    for key in keys {
572        call.response.bulk(key)?;
573    }
574
575    Ok(())
576}
577
#[cfg(test)]
mod tests {
    use crate::builder::Builder;
    use crate::commands::{CommandDictionary, Dispatcher};
    use crate::config::Config;
    use crate::request::Request;
    use mock_instant::thread_local::MockClock;
    use std::time::Duration;

    /// Tests if commands yield the expected responses.
    ///
    /// Especially this ensures that XGET behaves as expected.
    ///
    /// All scenarios run in sequence against the same cache named `test`, so each scenario
    /// removes or expires the entries it has created.
    #[test]
    fn test_commands() {
        crate::testing::test_async(async {
            let platform = Builder::new()
                .enable_config()
                .enable_commands()
                .build()
                .await;

            // Define a test cache with known TTLs
            platform
                .require::<Config>()
                .load_from_string(
                    "caches:
                              test:
                                 size: 10000
                                 max_memory: 16m
                                 soft_ttl: 15m
                                 hard_ttl: 30m
                                 refresh_interval: 10s
                          ",
                    None,
                )
                .unwrap();

            // Install a cache actor...
            crate::lru::cache::install(platform.clone());

            // Obtain a dispatcher which is used to invoke the commands of the actor...
            let mut dispatcher = platform.require::<CommandDictionary>().dispatcher();

            // Test PUT, GET, KEYS and REMOVE...
            perform_put_get_keys_remove(&mut dispatcher).await;

            // Test PUTS, and REMOVES...
            perform_puts_removes(&mut dispatcher).await;

            // Test XGET...
            perform_put_get_xget(&mut dispatcher).await;

            // Test Flush
            perform_put_flush(&mut dispatcher).await;

            // Test STATS
            perform_stats(dispatcher).await;
        });
    }

    /// Invokes `LRU.STATS` with and without a cache name and expects a bulk string each time.
    async fn perform_stats(mut dispatcher: Dispatcher) {
        // Now let's invoke LRU.STATS - being a diagnostic command, we do not test
        // the actual result, but at least ensure a positive response...
        let result = dispatcher
            .invoke(Request::example(vec!["LRU.STATS"]), None)
            .await
            .unwrap();
        assert_eq!(std::str::from_utf8(&result[0..1]).unwrap(), "$");
        let result = dispatcher
            .invoke(Request::example(vec!["LRU.STATS", "test"]), None)
            .await
            .unwrap();
        assert_eq!(std::str::from_utf8(&result[0..1]).unwrap(), "$");
    }

    /// Stores a value, flushes the cache and ensures that the value is gone afterwards.
    async fn perform_put_flush(dispatcher: &mut Dispatcher) {
        let _ = dispatcher
            .invoke(
                Request::example(vec!["LRU.PUT", "test", "foo", "bar"]),
                None,
            )
            .await
            .unwrap();

        // FLUSH it...
        let result = dispatcher
            .invoke(Request::example(vec!["LRU.FLUSH", "test"]), None)
            .await
            .unwrap();
        assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+OK\r\n");

        // ...and ensure it's gone.
        let result = dispatcher
            .invoke(Request::example(vec!["LRU.GET", "test", "foo"]), None)
            .await
            .unwrap();
        assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+\r\n");
    }

    /// Stores a value and uses the mock clock to move past soft_ttl, refresh_interval and
    /// hard_ttl in order to verify the responses of GET and XGET at each stage.
    async fn perform_put_get_xget(dispatcher: &mut Dispatcher) {
        let result = dispatcher
            .invoke(
                Request::example(vec!["LRU.PUT", "test", "foo", "bar"]),
                None,
            )
            .await
            .unwrap();
        assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+OK\r\n");

        // Await longer than soft_ttl...
        MockClock::advance(Duration::from_secs(16 * 60));

        // ...therefore ensure that GET will no longer return the value.
        let result = dispatcher
            .invoke(Request::example(vec!["LRU.GET", "test", "foo"]), None)
            .await
            .unwrap();
        assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+\r\n");

        // but XGET will and also ask for a refresh...
        let result = dispatcher
            .invoke(Request::example(vec!["LRU.XGET", "test", "foo"]), None)
            .await
            .unwrap();
        assert_eq!(
            std::str::from_utf8(&result[..]).unwrap(),
            "*3\r\n:0\r\n:1\r\n$3\r\nbar\r\n"
        );

        // after that, XGET will still return the value but no longer ask for a refresh
        // (note that the first flag, ACTIVE, is expected to be 1 in this response)...
        let result = dispatcher
            .invoke(Request::example(vec!["LRU.XGET", "test", "foo"]), None)
            .await
            .unwrap();
        assert_eq!(
            std::str::from_utf8(&result[..]).unwrap(),
            "*3\r\n:1\r\n:0\r\n$3\r\nbar\r\n"
        );

        // once the refresh period has passed, XGET will once again ask us to refresh the stale
        // value...
        MockClock::advance(Duration::from_secs(12));
        let result = dispatcher
            .invoke(Request::example(vec!["LRU.XGET", "test", "foo"]), None)
            .await
            .unwrap();
        assert_eq!(
            std::str::from_utf8(&result[..]).unwrap(),
            "*3\r\n:0\r\n:1\r\n$3\r\nbar\r\n"
        );

        // After waiting for hard_ttl to be elapsed, even XGET will no longer return the value..
        MockClock::advance(Duration::from_secs(16 * 60));
        let result = dispatcher
            .invoke(Request::example(vec!["LRU.XGET", "test", "foo"]), None)
            .await
            .unwrap();
        assert_eq!(
            std::str::from_utf8(&result[..]).unwrap(),
            "*3\r\n:0\r\n:0\r\n+\r\n"
        );
    }

    /// Stores a value, reads it back via GET and KEYS (with and without a filter) and finally
    /// ensures that REMOVE deletes it.
    async fn perform_put_get_keys_remove(dispatcher: &mut Dispatcher) {
        // PUT a value into the cache...
        let result = dispatcher
            .invoke(
                Request::example(vec!["LRU.PUT", "test", "foo", "bar"]),
                None,
            )
            .await
            .unwrap();
        assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+OK\r\n");

        // ...and ensure we can read it back
        let result = dispatcher
            .invoke(Request::example(vec!["LRU.GET", "test", "foo"]), None)
            .await
            .unwrap();
        assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "$3\r\nbar\r\n");

        // ...and ensure we see the key without filtering...
        let result = dispatcher
            .invoke(Request::example(vec!["LRU.KEYS", "test"]), None)
            .await
            .unwrap();
        assert_eq!(
            std::str::from_utf8(&result[..]).unwrap(),
            "*1\r\n$3\r\nfoo\r\n"
        );
        // ...and with filtering...
        let result = dispatcher
            .invoke(Request::example(vec!["LRU.KEYS", "test", "fo"]), None)
            .await
            .unwrap();
        assert_eq!(
            std::str::from_utf8(&result[..]).unwrap(),
            "*1\r\n$3\r\nfoo\r\n"
        );
        // ..and ensure that an "invalid" filter won't match our key.
        let result = dispatcher
            .invoke(Request::example(vec!["LRU.KEYS", "test", "xx"]), None)
            .await
            .unwrap();
        assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "*0\r\n");

        // REMOVE the value...
        let result = dispatcher
            .invoke(Request::example(vec!["LRU.REMOVE", "test", "foo"]), None)
            .await
            .unwrap();
        assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+OK\r\n");

        // ...and ensure it's gone.
        let result = dispatcher
            .invoke(Request::example(vec!["LRU.GET", "test", "foo"]), None)
            .await
            .unwrap();
        assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+\r\n");
    }

    /// Stores three values with overlapping secondary keys ("A" and "B") and ensures that
    /// REMOVES purges exactly the entries associated with the given secondary key.
    async fn perform_puts_removes(dispatcher: &mut Dispatcher) {
        let _ = dispatcher
            .invoke(
                Request::example(vec!["LRU.PUTS", "test", "foo", "bar", "A"]),
                None,
            )
            .await
            .unwrap();
        let _ = dispatcher
            .invoke(
                Request::example(vec!["LRU.PUTS", "test", "foo1", "bar1", "A", "B"]),
                None,
            )
            .await
            .unwrap();
        let _ = dispatcher
            .invoke(
                Request::example(vec!["LRU.PUTS", "test", "foo2", "bar2", "B"]),
                None,
            )
            .await
            .unwrap();

        // Flush the first two entries by removing secondary key "A"...
        let _ = dispatcher
            .invoke(Request::example(vec!["LRU.REMOVES", "test", "A"]), None)
            .await
            .unwrap();

        // Ensure that the proper keys are gone and only foo2 survives...
        let result = dispatcher
            .invoke(Request::example(vec!["LRU.KEYS", "test"]), None)
            .await
            .unwrap();
        assert_eq!(
            std::str::from_utf8(&result[..]).unwrap(),
            "*1\r\n$4\r\nfoo2\r\n"
        );

        // Flush by secondary key B...
        let _ = dispatcher
            .invoke(Request::example(vec!["LRU.REMOVES", "test", "B"]), None)
            .await
            .unwrap();

        // ...and ensure the last entry is gone.
        let result = dispatcher
            .invoke(Request::example(vec!["LRU.GET", "test", "foo2"]), None)
            .await
            .unwrap();
        assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+\r\n");
    }
}