use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use crate::commands::{queue, Call, CommandResult};
use crate::commands::{CommandDictionary, ResultExt};
use crate::config::Config;
use crate::fmt::{format_duration, format_size, parse_duration, parse_size};
use crate::platform::Platform;
use crate::spawn;
use num_derive::FromPrimitive;
use num_traits::FromPrimitive;
use crate::lru::LruCache;
use yaml_rust::yaml::Hash;
use yaml_rust::Yaml;
/// Enumerates the commands understood by the LRU cache actor.
///
/// The numeric value of each variant is used as the token when a command is registered
/// in `install` and is converted back via `from_usize` when the actor receives a call.
#[derive(FromPrimitive)]
enum Commands {
/// Registered as `LRU.PUT`.
Put,
/// Registered as `LRU.PUTS`.
Puts,
/// Registered as `LRU.GET`.
Get,
/// Registered as `LRU.XGET`.
ExtendedGet,
/// Registered as `LRU.REMOVE`.
Remove,
/// Registered as `LRU.REMOVES`.
Removes,
/// Registered as `LRU.FLUSH`.
Flush,
/// Registered as `LRU.STATS`.
Stats,
/// Registered as `LRU.KEYS`.
Keys,
}
/// The `LruCache` instantiation used for every cache which is managed by the actor in this file.
type StringCache = LruCache<String>;
/// Installs the LRU cache actor into the given platform.
///
/// Spawns the actor via `actor` and registers every `LRU.*` command in the
/// `CommandDictionary` of the platform. Each command is bound to the queue of the actor and
/// uses the numeric value of the matching `Commands` variant as its token.
pub fn install(platform: Arc<Platform>) {
    let queue = actor(platform.clone());
    let commands = platform.require::<CommandDictionary>();

    // Command name and token, listed in the order in which they are registered.
    let registrations = [
        ("LRU.PUT", Commands::Put as usize),
        ("LRU.PUTS", Commands::Puts as usize),
        ("LRU.GET", Commands::Get as usize),
        ("LRU.REMOVE", Commands::Remove as usize),
        ("LRU.REMOVES", Commands::Removes as usize),
        ("LRU.XGET", Commands::ExtendedGet as usize),
        ("LRU.FLUSH", Commands::Flush as usize),
        ("LRU.STATS", Commands::Stats as usize),
        ("LRU.KEYS", Commands::Keys as usize),
    ];

    for &(name, token) in registrations.iter() {
        commands.register_command(name, queue.clone(), token);
    }
}
fn actor(platform: Arc<Platform>) -> crate::commands::Queue {
let (queue, mut endpoint) = queue();
spawn!(async move {
let config = platform.require::<Config>();
let mut config_changed = config.notifier();
let mut caches: HashMap<String, StringCache> = HashMap::new();
caches = update_config(caches, &config);
while platform.is_running() {
tokio::select! {
_ = config_changed.recv() => { caches = update_config(caches, &config) }
msg = endpoint.recv() => {
if let Some(mut call) = msg {
match Commands::from_usize(call.token) {
Some(Commands::Put) => put_command(&mut call, &mut caches).complete(call),
Some(Commands::Puts) => puts_command(&mut call, &mut caches).complete(call),
Some(Commands::Get) => get_command(&mut call, &mut caches).complete(call),
Some(Commands::ExtendedGet) => extended_get_command(&mut call, &mut caches).complete(call),
Some(Commands::Remove) => remove_command(&mut call, &mut caches).complete(call),
Some(Commands::Removes) => removes_command(&mut call, &mut caches).complete(call),
Some(Commands::Flush) => flush_command(&mut call, &mut caches).complete(call),
Some(Commands::Stats) => stats_command(&mut call, &mut caches).complete(call),
Some(Commands::Keys) => keys_command(&mut call, &mut caches).complete(call),
_ => call.handle_unknown_token(),
}
}
}
}
}
});
queue
}
/// Applies the `caches` section of the current config to the given set of caches.
///
/// If the section is a hash, the caches are rebuilt via `parse_config`. Otherwise a message
/// is logged and the given caches are returned unchanged.
fn update_config(
    caches: HashMap<String, StringCache>,
    config: &Arc<Config>,
) -> HashMap<String, StringCache> {
    let handle = config.current();

    match &handle.config()["caches"] {
        Yaml::Hash(map) => parse_config(caches, map),
        _ => {
            log::info!("Config does not contain a 'caches' object. Skipping config update.");
            caches
        }
    }
}
/// Builds the new set of caches from the given config hash.
///
/// Each entry of the hash is passed to `create_or_update` together with the existing cache
/// of the same name (if any). Caches which are left over afterwards have no config entry
/// anymore; their names are logged and they are dropped with the old map.
fn parse_config(
    mut caches: HashMap<String, StringCache>,
    map: &Hash,
) -> HashMap<String, StringCache> {
    let result: HashMap<String, StringCache> = map
        .iter()
        .filter_map(|(key, settings)| {
            // A key which is not a string is treated as the empty name.
            let name = key.as_str().unwrap_or("");
            let existing = caches.remove(name);
            create_or_update(name, existing, settings).map(|cache| (name.to_owned(), cache))
        })
        .collect();

    // Everything still present in the old map was not mentioned in the config.
    for stale in caches.keys() {
        log::info!("Dropping stale cache {}...", stale);
    }

    result
}
fn create_or_update(
name: &str,
current_cache: Option<StringCache>,
config: &Yaml,
) -> Option<StringCache> {
let size = match config["size"].as_i64().filter(|value| *value > 0) {
None => {
log::error!(
"Not going to create or update {} as no cache size was given.",
name
);
return current_cache;
}
Some(n) => n,
} as usize;
let max_memory = match parse_size(config["max_memory"].as_str().unwrap_or("")) {
Err(error) => {
log::error!(
"Not going to create or update {}. Failed to parse 'max_memory': {}",
name,
error
);
return current_cache;
}
Ok(n) => n,
};
let soft_ttl = match parse_duration(config["soft_ttl"].as_str().unwrap_or("")) {
Ok(duration) => duration,
Err(error) => {
log::error!(
"Not going to create or update {}. Failed to parse 'soft_ttl': {}",
name,
error
);
return current_cache;
}
};
let hard_ttl = match parse_duration(config["hard_ttl"].as_str().unwrap_or("")) {
Ok(duration) => duration,
Err(error) => {
log::error!(
"Not going to create or update {}. Failed to parse 'hard_ttl': {}",
name,
error
);
return current_cache;
}
};
let refresh_interval = match parse_duration(config["refresh_interval"].as_str().unwrap_or("")) {
Ok(duration) => duration,
Err(error) => {
log::error!(
"Not going to create or update {}. Failed to parse 'refresh_interval': {}",
name,
error
);
return current_cache;
}
};
match current_cache {
Some(mut cache) => {
update_cache(
name,
&mut cache,
size,
max_memory,
soft_ttl,
hard_ttl,
refresh_interval,
);
Some(cache)
}
None => {
log::info!("Creating new cache {}...", name);
Some(LruCache::new(
size,
max_memory,
soft_ttl,
hard_ttl,
refresh_interval,
))
}
}
}
/// Applies the given settings to an existing cache.
///
/// Only settings which differ from the current ones are written (and logged). If
/// `soft_ttl` or `hard_ttl` changed, the cache is flushed exactly once after both values
/// have been applied.
fn update_cache(
    name: &str,
    cache: &mut StringCache,
    capacity: usize,
    max_memory: usize,
    soft_ttl: Duration,
    hard_ttl: Duration,
    refresh_interval: Duration,
) {
    if cache.capacity() != capacity {
        log::info!(
            "Updating the size of {} from {} to {}.",
            name,
            cache.capacity(),
            capacity
        );
        cache.set_capacity(capacity);
    }

    if cache.max_memory() != max_memory {
        log::info!(
            "Updating max_memory of {} from {} to {}.",
            name,
            format_size(cache.max_memory()),
            format_size(max_memory)
        );
        cache.set_max_memory(max_memory);
    }

    // Remember whether any TTL changed, so that a simultaneous change of both TTLs neither
    // flushes the cache twice nor logs the flush twice.
    let mut ttl_changed = false;

    if cache.soft_ttl() != soft_ttl {
        log::info!(
            "Updating soft_ttl of {} from {} to {}.",
            name,
            format_duration(cache.soft_ttl()),
            format_duration(soft_ttl)
        );
        cache.set_soft_ttl(soft_ttl);
        ttl_changed = true;
    }

    if cache.hard_ttl() != hard_ttl {
        log::info!(
            "Updating hard_ttl of {} from {} to {}.",
            name,
            format_duration(cache.hard_ttl()),
            format_duration(hard_ttl)
        );
        cache.set_hard_ttl(hard_ttl);
        ttl_changed = true;
    }

    if ttl_changed {
        log::info!("Flushing {} due to changed TTL settings...", name);
        cache.flush();
    }

    if cache.refresh_interval() != refresh_interval {
        log::info!(
            "Updating refresh_interval of {} from {} to {}.",
            name,
            format_duration(cache.refresh_interval()),
            format_duration(refresh_interval)
        );
        cache.set_refresh_interval(refresh_interval);
    }
}
/// Looks up the cache with the given name.
///
/// # Errors
/// Fails with `Unknown cache: <name>` if no cache of that name exists.
fn get_cache<'a>(
    name: &str,
    caches: &'a mut HashMap<String, StringCache>,
) -> anyhow::Result<&'a mut StringCache> {
    caches
        .get_mut(name)
        .ok_or_else(|| anyhow::anyhow!("Unknown cache: {}", name))
}
/// Handles `LRU.PUT`: stores the value (parameter 2) for the key (parameter 1) in the
/// cache named by parameter 0 and responds with OK.
fn put_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
    let cache = get_cache(call.request.str_parameter(0)?, caches)?;

    let key = call.request.str_parameter(1)?.to_owned();
    let value = call.request.str_parameter(2)?.to_owned();
    cache.put(key, value)?;

    call.response.ok()?;
    Ok(())
}
/// Handles `LRU.PUTS`: stores the value (parameter 2) for the key (parameter 1) in the
/// cache named by parameter 0, along with all remaining parameters as secondary keys, and
/// responds with OK.
///
/// # Errors
/// Fails if the cache is unknown, if a required parameter is missing or if the cache
/// rejects the value.
fn puts_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
    let cache = get_cache(call.request.str_parameter(0)?, caches)?;

    // With fewer than three parameters a plain subtraction would underflow (panic in debug
    // builds, absurd capacity in release builds). Saturating keeps the capacity at zero so
    // that the missing parameter is reported as a regular error below.
    let parameter_count = call.request.parameter_count();
    let mut secondary_keys = Vec::with_capacity(parameter_count.saturating_sub(3));
    for index in 3..parameter_count {
        secondary_keys.push(call.request.str_parameter(index)?.to_owned());
    }

    cache.put_with_secondaries(
        call.request.str_parameter(1)?.to_owned(),
        call.request.str_parameter(2)?.to_owned(),
        Some(secondary_keys),
    )?;

    call.response.ok()?;
    Ok(())
}
/// Handles `LRU.GET`: responds with the value stored for the key (parameter 1) in the cache
/// named by parameter 0, or with an empty string if the cache yields no value.
fn get_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
    let cache = get_cache(call.request.str_parameter(0)?, caches)?;

    match cache.get(call.request.str_parameter(1)?) {
        Some(value) => {
            call.response.bulk(value)?;
        }
        None => {
            call.response.empty_string()?;
        }
    }

    Ok(())
}
/// Handles `LRU.XGET`: responds with an array of three elements for the key (parameter 1)
/// in the cache named by parameter 0.
///
/// The elements are the two flags and the value reported by `extended_get`. If there is no
/// entry, both flags are `false` and the value is an empty string.
fn extended_get_command(
    call: &mut Call,
    caches: &mut HashMap<String, StringCache>,
) -> CommandResult {
    let cache = get_cache(call.request.str_parameter(0)?, caches)?;

    // The array header is written before the lookup takes place.
    call.response.array(3)?;

    match cache.extended_get(call.request.str_parameter(1)?) {
        Some((alive, refresh, value)) => {
            call.response.boolean(alive)?;
            call.response.boolean(refresh)?;
            call.response.bulk(value)?;
        }
        None => {
            call.response.boolean(false)?;
            call.response.boolean(false)?;
            call.response.empty_string()?;
        }
    }

    Ok(())
}
/// Handles `LRU.REMOVE`: removes the key (parameter 1) from the cache named by parameter 0
/// and responds with OK.
fn remove_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
    let cache_name = call.request.str_parameter(0)?;
    let cache = get_cache(cache_name, caches)?;

    let key = call.request.str_parameter(1)?;
    cache.remove(key);

    call.response.ok()?;
    Ok(())
}
/// Handles `LRU.REMOVES`: removes by the secondary key (parameter 1) from the cache named
/// by parameter 0 and responds with OK.
fn removes_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
    let cache_name = call.request.str_parameter(0)?;
    let cache = get_cache(cache_name, caches)?;

    let secondary_key = call.request.str_parameter(1)?;
    cache.remove_by_secondary(secondary_key);

    call.response.ok()?;
    Ok(())
}
/// Handles `LRU.FLUSH`: flushes the cache named by parameter 0 and responds with OK.
fn flush_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
    let cache_name = call.request.str_parameter(0)?;
    get_cache(cache_name, caches)?.flush();

    call.response.ok()?;
    Ok(())
}
/// Handles `LRU.STATS`: without parameters an overview of all caches is reported,
/// otherwise the detailed metrics of the cache named by parameter 0.
fn stats_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
    match call.request.parameter_count() {
        0 => all_stats_command(call, caches),
        _ => cache_stats_command(call, caches),
    }
}
/// Responds with a table which lists the name, the number of entries and the allocated
/// memory of every cache.
fn all_stats_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
    let mut report = String::from("Use 'LRU.STATS <cache>' for detailed metrics.\n\n");

    // Table header.
    report.push_str(&format!(
        "{:<30} {:>12} {:>20}\n",
        "Name", "Num Entries", "Allocated Memory"
    ));
    report.push_str(crate::response::SEPARATOR);

    // One row per cache, in the iteration order of the map.
    for (name, cache) in caches.iter_mut() {
        let num_entries = cache.len();
        let allocated = format_size(cache.allocated_memory());
        report.push_str(&format!(
            "{:<30} {:>12} {:>20}\n",
            name, num_entries, allocated
        ));
    }
    report.push_str(crate::response::SEPARATOR);

    call.response.bulk(report)?;
    Ok(())
}
/// Responds with the detailed metrics of the cache named by parameter 0, one labelled row
/// per metric.
fn cache_stats_command(
    call: &mut Call,
    caches: &mut HashMap<String, StringCache>,
) -> CommandResult {
    /// Formats a row with a left-aligned label and a right-aligned value.
    fn value_row(label: &str, value: &dyn std::fmt::Display) -> String {
        format!("{:<30} {:>20}\n", label, value)
    }

    /// Formats a row with a left-aligned label and a percentage with two decimals.
    fn percent_row(label: &str, value: &dyn std::fmt::Display) -> String {
        format!("{:<30} {:>18.2} %\n", label, value)
    }

    let cache = get_cache(call.request.str_parameter(0)?, caches)?;

    let mut report = String::new();
    report.push_str(&value_row("Num Entries", &cache.len()));
    report.push_str(&value_row("Max Entries", &cache.capacity()));
    report.push_str(&percent_row("Utilization", &cache.utilization()));
    report.push_str(&value_row(
        "Allocated Memory",
        &format_size(cache.allocated_memory()),
    ));
    report.push_str(&value_row("Max Memory", &format_size(cache.max_memory())));
    report.push_str(&percent_row(
        "Memory Utilization",
        &cache.memory_utilization(),
    ));
    report.push_str(&value_row(
        "Total Memory",
        &format_size(cache.total_allocated_memory()),
    ));
    report.push_str(&value_row("Reads", &cache.reads()));
    report.push_str(&value_row("Writes", &cache.writes()));
    report.push_str(&percent_row("Hit Rate", &cache.hit_rate()));
    report.push_str(&percent_row("Write/Read Ratio", &cache.write_read_ratio()));

    call.response.bulk(report)?;
    Ok(())
}
/// Handles `LRU.KEYS`: responds with up to 100 keys of the cache named by parameter 0.
///
/// If a second parameter is given, only keys which contain it are reported.
fn keys_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
    let cache = get_cache(call.request.str_parameter(0)?, caches)?;

    // The filter is optional; without it every key matches.
    let filter = if call.request.parameter_count() > 1 {
        Some(call.request.str_parameter(1)?)
    } else {
        None
    };

    let keys: Vec<&String> = cache
        .keys()
        .filter(|key| filter.map_or(true, |pattern| key.contains(pattern)))
        .take(100)
        .collect();

    call.response.array(keys.len() as i32)?;
    for key in keys {
        call.response.bulk(key)?;
    }

    Ok(())
}
#[cfg(test)]
mod tests {
use crate::builder::Builder;
use crate::commands::{CommandDictionary, Dispatcher};
use crate::config::Config;
use crate::request::Request;
use mock_instant::thread_local::MockClock;
use std::time::Duration;
/// Builds a platform with config and commands enabled, defines a cache named `test` and
/// runs all command scenarios against it.
///
/// The scenarios share one cache and one dispatcher and are executed in sequence.
#[test]
fn test_commands() {
crate::testing::test_async(async {
let platform = Builder::new()
.enable_config()
.enable_commands()
.build()
.await;
// Define the cache "test" which is used by all scenarios below.
platform
.require::<Config>()
.load_from_string(
"caches:
test:
size: 10000
max_memory: 16m
soft_ttl: 15m
hard_ttl: 30m
refresh_interval: 10s
",
None,
)
.unwrap();
// The actor is installed after the config has been loaded.
crate::lru::cache::install(platform.clone());
let mut dispatcher = platform.require::<CommandDictionary>().dispatcher();
perform_put_get_keys_remove(&mut dispatcher).await;
perform_puts_removes(&mut dispatcher).await;
perform_put_get_xget(&mut dispatcher).await;
perform_put_flush(&mut dispatcher).await;
// The last scenario takes ownership of the dispatcher.
perform_stats(dispatcher).await;
});
}
/// Invokes `LRU.STATS` without and with a cache name and checks that each response
/// starts with `$`.
async fn perform_stats(mut dispatcher: Dispatcher) {
// Overview of all caches.
let result = dispatcher
.invoke(Request::example(vec!["LRU.STATS"]), None)
.await
.unwrap();
// Only the first byte is checked, the report itself is not inspected.
assert_eq!(std::str::from_utf8(&result[0..1]).unwrap(), "$");
// Detailed metrics of the cache "test".
let result = dispatcher
.invoke(Request::example(vec!["LRU.STATS", "test"]), None)
.await
.unwrap();
assert_eq!(std::str::from_utf8(&result[0..1]).unwrap(), "$");
}
/// Stores a value, flushes the cache and checks that the value can no longer be read.
async fn perform_put_flush(dispatcher: &mut Dispatcher) {
// The response of the PUT itself is not checked here.
let _ = dispatcher
.invoke(
Request::example(vec!["LRU.PUT", "test", "foo", "bar"]),
None,
)
.await
.unwrap();
let result = dispatcher
.invoke(Request::example(vec!["LRU.FLUSH", "test"]), None)
.await
.unwrap();
assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+OK\r\n");
// After the flush, GET is expected to yield the empty string response.
let result = dispatcher
.invoke(Request::example(vec!["LRU.GET", "test", "foo"]), None)
.await
.unwrap();
assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+\r\n");
}
/// Stores a value and checks the responses of `LRU.GET` and `LRU.XGET` while the mock
/// clock is advanced.
///
/// The two integers of each `LRU.XGET` response are the flags which the command handler
/// names `alive` and `refresh`, in that order.
async fn perform_put_get_xget(dispatcher: &mut Dispatcher) {
let result = dispatcher
.invoke(
Request::example(vec!["LRU.PUT", "test", "foo", "bar"]),
None,
)
.await
.unwrap();
assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+OK\r\n");
// Advance by 16 minutes - presumably chosen to exceed the configured soft_ttl of 15m
// while staying below the hard_ttl of 30m.
MockClock::advance(Duration::from_secs(16 * 60));
// A plain GET is expected to yield the empty string response now.
let result = dispatcher
.invoke(Request::example(vec!["LRU.GET", "test", "foo"]), None)
.await
.unwrap();
assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+\r\n");
// The first XGET still delivers the value, with alive = 0 and refresh = 1.
let result = dispatcher
.invoke(Request::example(vec!["LRU.XGET", "test", "foo"]), None)
.await
.unwrap();
assert_eq!(
std::str::from_utf8(&result[..]).unwrap(),
"*3\r\n:0\r\n:1\r\n$3\r\nbar\r\n"
);
// An immediately repeated XGET reports alive = 1 and refresh = 0.
let result = dispatcher
.invoke(Request::example(vec!["LRU.XGET", "test", "foo"]), None)
.await
.unwrap();
assert_eq!(
std::str::from_utf8(&result[..]).unwrap(),
"*3\r\n:1\r\n:0\r\n$3\r\nbar\r\n"
);
// Advance by 12 seconds - presumably chosen to exceed the refresh_interval of 10s.
// Afterwards alive = 0 and refresh = 1 are reported again.
MockClock::advance(Duration::from_secs(12));
let result = dispatcher
.invoke(Request::example(vec!["LRU.XGET", "test", "foo"]), None)
.await
.unwrap();
assert_eq!(
std::str::from_utf8(&result[..]).unwrap(),
"*3\r\n:0\r\n:1\r\n$3\r\nbar\r\n"
);
// Advance by another 16 minutes - in total more than the hard_ttl of 30m. XGET now
// reports both flags as 0 along with an empty string.
MockClock::advance(Duration::from_secs(16 * 60));
let result = dispatcher
.invoke(Request::example(vec!["LRU.XGET", "test", "foo"]), None)
.await
.unwrap();
assert_eq!(
std::str::from_utf8(&result[..]).unwrap(),
"*3\r\n:0\r\n:0\r\n+\r\n"
);
}
/// Stores a value, reads it back, lists the keys with and without a filter and finally
/// removes the value again.
async fn perform_put_get_keys_remove(dispatcher: &mut Dispatcher) {
let result = dispatcher
.invoke(
Request::example(vec!["LRU.PUT", "test", "foo", "bar"]),
None,
)
.await
.unwrap();
assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+OK\r\n");
let result = dispatcher
.invoke(Request::example(vec!["LRU.GET", "test", "foo"]), None)
.await
.unwrap();
assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "$3\r\nbar\r\n");
// Without a filter the single key is listed.
let result = dispatcher
.invoke(Request::example(vec!["LRU.KEYS", "test"]), None)
.await
.unwrap();
assert_eq!(
std::str::from_utf8(&result[..]).unwrap(),
"*1\r\n$3\r\nfoo\r\n"
);
// The filter "fo" is contained in "foo", so the key is still listed.
let result = dispatcher
.invoke(Request::example(vec!["LRU.KEYS", "test", "fo"]), None)
.await
.unwrap();
assert_eq!(
std::str::from_utf8(&result[..]).unwrap(),
"*1\r\n$3\r\nfoo\r\n"
);
// The filter "xx" matches no key, so an empty array is expected.
let result = dispatcher
.invoke(Request::example(vec!["LRU.KEYS", "test", "xx"]), None)
.await
.unwrap();
assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "*0\r\n");
let result = dispatcher
.invoke(Request::example(vec!["LRU.REMOVE", "test", "foo"]), None)
.await
.unwrap();
assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+OK\r\n");
// After the removal, GET is expected to yield the empty string response.
let result = dispatcher
.invoke(Request::example(vec!["LRU.GET", "test", "foo"]), None)
.await
.unwrap();
assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+\r\n");
}
/// Stores three values with secondary keys and checks that `LRU.REMOVES` removes exactly
/// the values which carry the given secondary key.
async fn perform_puts_removes(dispatcher: &mut Dispatcher) {
// "foo" carries the secondary key A.
let _ = dispatcher
.invoke(
Request::example(vec!["LRU.PUTS", "test", "foo", "bar", "A"]),
None,
)
.await
.unwrap();
// "foo1" carries the secondary keys A and B.
let _ = dispatcher
.invoke(
Request::example(vec!["LRU.PUTS", "test", "foo1", "bar1", "A", "B"]),
None,
)
.await
.unwrap();
// "foo2" carries the secondary key B.
let _ = dispatcher
.invoke(
Request::example(vec!["LRU.PUTS", "test", "foo2", "bar2", "B"]),
None,
)
.await
.unwrap();
// Removing by A is expected to leave only "foo2" behind.
let _ = dispatcher
.invoke(Request::example(vec!["LRU.REMOVES", "test", "A"]), None)
.await
.unwrap();
let result = dispatcher
.invoke(Request::example(vec!["LRU.KEYS", "test"]), None)
.await
.unwrap();
assert_eq!(
std::str::from_utf8(&result[..]).unwrap(),
"*1\r\n$4\r\nfoo2\r\n"
);
// Removing by B is expected to remove "foo2" as well.
let _ = dispatcher
.invoke(Request::example(vec!["LRU.REMOVES", "test", "B"]), None)
.await
.unwrap();
let result = dispatcher
.invoke(Request::example(vec!["LRU.GET", "test", "foo2"]), None)
.await
.unwrap();
assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+\r\n");
}
}