use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use crate::commands::{queue, Call, CommandResult};
use crate::commands::{CommandDictionary, ResultExt};
use crate::config::Config;
use crate::fmt::{format_duration, format_size, parse_duration, parse_size};
use crate::platform::Platform;
use num_derive::FromPrimitive;
use num_traits::FromPrimitive;
use crate::ig::docs::Element;
use crate::lru::LRUCache;
/// Enumerates the commands handled by the cache actor.
///
/// A variant is cast to `usize` when its command is registered in `install` and is
/// recovered via the derived `from_usize` when a call is dispatched in `actor`.
#[derive(FromPrimitive)]
enum Commands {
/// Registered as `LRU.PUT`.
Put,
/// Registered as `LRU.GET`.
Get,
/// Registered as `LRU.XGET`.
ExtendedGet,
/// Registered as `LRU.REMOVE`.
Remove,
/// Registered as `LRU.FLUSH`.
Flush,
/// Registered as `LRU.STATS`.
Stats,
}
/// Shorthand for the cache type managed by this module: an `LRUCache` parameterized with `String`.
type StringCache = LRUCache<String>;
/// Installs the LRU cache commands into the given platform.
///
/// Starts the cache actor and registers all `LRU.*` commands in the platform's
/// `CommandDictionary`, each of them pointing at the queue of that actor.
pub fn install(platform: Arc<Platform>) {
    let sender = actor(Arc::clone(&platform));
    let dictionary = platform.require::<CommandDictionary>();

    // Every command except the last one receives a clone of the queue; the final
    // registration takes ownership of it.
    let bindings = [
        ("LRU.PUT", Commands::Put as usize),
        ("LRU.GET", Commands::Get as usize),
        ("LRU.REMOVE", Commands::Remove as usize),
        ("LRU.XGET", Commands::ExtendedGet as usize),
        ("LRU.FLUSH", Commands::Flush as usize),
    ];
    for &(name, token) in &bindings {
        dictionary.register_command(name, sender.clone(), token);
    }
    dictionary.register_command("LRU.STATS", sender, Commands::Stats as usize);
}
fn actor(platform: Arc<Platform>) -> crate::commands::Queue {
let (queue, mut endpoint) = queue();
tokio::spawn(async move {
let config = platform.require::<Config>();
let mut config_changed = config.notifier();
let mut caches: HashMap<String, StringCache> = HashMap::new();
caches = update_config(caches, &config);
while platform.is_running() {
tokio::select! {
_ = config_changed.recv() => { caches = update_config(caches, &config) }
msg = endpoint.recv() => {
if let Some(mut call) = msg {
match Commands::from_usize(call.token) {
Some(Commands::Put) => put_command(&mut call, &mut caches).complete(call),
Some(Commands::Get) => get_command(&mut call, &mut caches).complete(call),
Some(Commands::ExtendedGet) => extended_get_command(&mut call, &mut caches).complete(call),
Some(Commands::Remove) => remove_command(&mut call, &mut caches).complete(call),
Some(Commands::Flush) => flush_command(&mut call, &mut caches).complete(call),
Some(Commands::Stats) => stats_command(&mut call, &mut caches).complete(call),
_ => ()
}
}
}
}
}
});
queue
}
/// Re-creates the set of caches from the `caches` object of the current config.
///
/// If the config does not contain a `caches` object, this is logged and the given
/// caches are returned unchanged.
fn update_config(
    caches: HashMap<String, StringCache>,
    config: &Arc<Config>,
) -> HashMap<String, StringCache> {
    // The handle has to stay alive for as long as elements queried from it are in use.
    let handle = config.current();
    let cache_settings = handle.config().root().query("caches");

    if !cache_settings.is_object() {
        log::info!("Config does not contain a 'caches' object. Skipping config update.");
        return caches;
    }

    parse_config(caches, cache_settings)
}
/// Builds the new set of caches from the entries of the given config object.
///
/// For each entry, a cache of the same name is taken out of `caches` (if present) and
/// handed to `create_or_update`. Whatever remains in `caches` afterwards is no longer
/// part of the config: these caches are logged as stale and are not part of the result.
fn parse_config(
    mut caches: HashMap<String, StringCache>,
    map: Element,
) -> HashMap<String, StringCache> {
    let mut updated = HashMap::new();

    for (name, settings) in map.entries() {
        let previous = caches.remove(name);
        match create_or_update(name, previous, settings) {
            Some(cache) => {
                updated.insert(name.to_owned(), cache);
            }
            None => {}
        }
    }

    caches
        .keys()
        .for_each(|stale| log::info!("Dropping stale cache {}...", stale));

    updated
}
fn create_or_update(
name: &str,
current_cache: Option<StringCache>,
config: Element,
) -> Option<StringCache> {
let size = match config.query("size").as_int().filter(|value| *value > 0) {
None => {
log::error!(
"Not going to create or update {} as no cache size was given.",
name
);
return current_cache;
}
Some(n) => n,
} as usize;
let max_memory = match parse_size(config.query("max_memory").to_str()) {
Err(error) => {
log::error!(
"Not going to create or update {}. Failed to parse 'max_memory': {}",
name,
error
);
return current_cache;
}
Ok(n) => n,
};
let soft_ttl = match parse_duration(config.query("soft_ttl").to_str()) {
Ok(duration) => duration,
Err(error) => {
log::error!(
"Not going to create or update {}. Failed to parse 'soft_ttl': {}",
name,
error
);
return current_cache;
}
};
let hard_ttl = match parse_duration(config.query("hard_ttl").to_str()) {
Ok(duration) => duration,
Err(error) => {
log::error!(
"Not going to create or update {}. Failed to parse 'hard_ttl': {}",
name,
error
);
return current_cache;
}
};
let refresh_interval = match parse_duration(config.query("refresh_interval").to_str()) {
Ok(duration) => duration,
Err(error) => {
log::error!(
"Not going to create or update {}. Failed to parse 'refresh_interval': {}",
name,
error
);
return current_cache;
}
};
match current_cache {
Some(mut cache) => {
update_cache(
name,
&mut cache,
size,
max_memory,
soft_ttl,
hard_ttl,
refresh_interval,
);
Some(cache)
}
None => {
log::info!("Creating new cache {}...", name);
Some(LRUCache::new(
size,
max_memory,
soft_ttl,
hard_ttl,
refresh_interval,
))
}
}
}
/// Applies the given settings to an existing cache.
///
/// Only settings which differ from the current ones are written, and each change is
/// logged. A change of `soft_ttl` or `hard_ttl` additionally flushes the cache.
fn update_cache(
    name: &str,
    cache: &mut StringCache,
    capacity: usize,
    max_memory: usize,
    soft_ttl: Duration,
    hard_ttl: Duration,
    refresh_interval: Duration,
) {
    let previous_capacity = cache.capacity();
    if previous_capacity != capacity {
        log::info!(
            "Updating the size of {} from {} to {}.",
            name,
            previous_capacity,
            capacity
        );
        cache.set_capacity(capacity);
    }

    let previous_max_memory = cache.max_memory();
    if previous_max_memory != max_memory {
        log::info!(
            "Updating max_memory of {} from {} to {}.",
            name,
            format_size(previous_max_memory),
            format_size(max_memory)
        );
        cache.set_max_memory(max_memory);
    }

    let previous_soft_ttl = cache.soft_ttl();
    if previous_soft_ttl != soft_ttl {
        log::info!(
            "Updating soft_ttl of {} from {} to {}.",
            name,
            format_duration(previous_soft_ttl),
            format_duration(soft_ttl)
        );
        cache.set_soft_ttl(soft_ttl);

        log::info!("Flushing {} due to changed TTL settings...", name);
        cache.flush();
    }

    // Note that the cache is flushed (and this is logged) a second time if both TTLs
    // have changed.
    let previous_hard_ttl = cache.hard_ttl();
    if previous_hard_ttl != hard_ttl {
        log::info!(
            "Updating hard_ttl of {} from {} to {}.",
            name,
            format_duration(previous_hard_ttl),
            format_duration(hard_ttl)
        );
        cache.set_hard_ttl(hard_ttl);

        log::info!("Flushing {} due to changed TTL settings...", name);
        cache.flush();
    }

    let previous_refresh_interval = cache.refresh_interval();
    if previous_refresh_interval != refresh_interval {
        log::info!(
            "Updating refresh_interval of {} from {} to {}.",
            name,
            format_duration(previous_refresh_interval),
            format_duration(refresh_interval)
        );
        cache.set_refresh_interval(refresh_interval);
    }
}
/// Looks up the cache with the given name.
///
/// # Errors
/// Fails with `Unknown cache: <name>` if no such cache exists.
fn get_cache<'a>(
    name: &str,
    caches: &'a mut HashMap<String, StringCache>,
) -> anyhow::Result<&'a mut StringCache> {
    caches
        .get_mut(name)
        .ok_or_else(|| anyhow::anyhow!("Unknown cache: {}", name))
}
/// Handles `LRU.PUT`: stores the value (parameter 2) for the key (parameter 1) in the
/// cache named by parameter 0 and responds with OK.
///
/// # Errors
/// Fails if a parameter is missing, the cache is unknown, the cache rejects the entry or
/// the response cannot be written.
fn put_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
    // The cache is resolved before key and value are read, so that an unknown cache is
    // reported ahead of a missing parameter.
    let cache = get_cache(call.request.str_parameter(0)?, caches)?;
    let key = call.request.str_parameter(1)?.to_owned();
    let value = call.request.str_parameter(2)?.to_owned();

    cache.put(key, value)?;
    call.response.ok()?;

    Ok(())
}
/// Handles `LRU.GET`: responds with the value stored for the key (parameter 1) in the
/// cache named by parameter 0, or with an empty string if the cache yields no value.
///
/// # Errors
/// Fails if a parameter is missing, the cache is unknown or the response cannot be
/// written.
fn get_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
    let cache = get_cache(call.request.str_parameter(0)?, caches)?;

    match cache.get(call.request.str_parameter(1)?) {
        Some(value) => {
            call.response.bulk(value)?;
        }
        None => {
            call.response.empty_string()?;
        }
    }

    Ok(())
}
/// Handles `LRU.XGET`: responds with an array of three elements - the `alive` flag, the
/// `refresh` flag and the value as reported by the cache's `extended_get`.
///
/// If the cache yields nothing for the key, both flags are `false` and the value is an
/// empty string.
///
/// # Errors
/// Fails if a parameter is missing, the cache is unknown or the response cannot be
/// written.
fn extended_get_command(
    call: &mut Call,
    caches: &mut HashMap<String, StringCache>,
) -> CommandResult {
    let cache = get_cache(call.request.str_parameter(0)?, caches)?;
    let lookup = cache.extended_get(call.request.str_parameter(1)?);

    // Both outcomes share the same array layout, only the contents differ.
    call.response.array(3)?;
    match lookup {
        Some((alive, refresh, value)) => {
            call.response.boolean(alive)?;
            call.response.boolean(refresh)?;
            call.response.bulk(value)?;
        }
        None => {
            call.response.boolean(false)?;
            call.response.boolean(false)?;
            call.response.empty_string()?;
        }
    }

    Ok(())
}
/// Handles `LRU.REMOVE`: removes the key (parameter 1) from the cache named by
/// parameter 0 and responds with OK.
///
/// # Errors
/// Fails if a parameter is missing, the cache is unknown or the response cannot be
/// written.
fn remove_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
    let cache = get_cache(call.request.str_parameter(0)?, caches)?;
    let key = call.request.str_parameter(1)?;

    cache.remove(key);
    call.response.ok()?;

    Ok(())
}
/// Handles `LRU.FLUSH`: flushes the cache named by parameter 0 and responds with OK.
///
/// # Errors
/// Fails if the parameter is missing, the cache is unknown or the response cannot be
/// written.
fn flush_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
    let cache_name = call.request.str_parameter(0)?;
    get_cache(cache_name, caches)?.flush();

    call.response.ok()?;

    Ok(())
}
/// Handles `LRU.STATS`: without parameters an overview of all caches is reported,
/// otherwise the detailed metrics of the cache named by parameter 0.
fn stats_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
    match call.request.parameter_count() {
        0 => all_stats_command(call, caches),
        _ => cache_stats_command(call, caches),
    }
}
/// Reports an overview of all caches as a bulk string.
///
/// The overview is a table with one row per cache, listing its name, its number of
/// entries and its allocated memory. Rows are sorted by cache name so that the output is
/// stable across invocations, as a `HashMap` yields its entries in arbitrary order.
///
/// # Errors
/// Fails if the response cannot be written.
fn all_stats_command(call: &mut Call, caches: &mut HashMap<String, StringCache>) -> CommandResult {
    // Mutable references are collected because `caches` is already borrowed mutably and
    // nothing is assumed about the receiver type of the metric getters.
    let mut entries: Vec<_> = caches.iter_mut().collect();
    entries.sort_by(|left, right| left.0.cmp(right.0));

    let mut result = String::new();
    result.push_str("Use 'LRU.STATS <cache>' for detailed metrics.\n\n");
    result.push_str(&format!(
        "{:<30} {:>12} {:>20}\n",
        "Name", "Num Entries", "Allocated Memory"
    ));
    result.push_str(crate::response::SEPARATOR);

    for (name, cache) in entries {
        result.push_str(&format!(
            "{:<30} {:>12} {:>20}\n",
            name,
            cache.len(),
            format_size(cache.allocated_memory())
        ));
    }

    result.push_str(crate::response::SEPARATOR);

    call.response.bulk(result)?;

    Ok(())
}
/// Reports the detailed metrics of the cache named by parameter 0 as a bulk string.
///
/// Each metric is rendered as one line consisting of a left aligned label and a right
/// aligned value.
///
/// # Errors
/// Fails if the parameter is missing, the cache is unknown or the response cannot be
/// written.
fn cache_stats_command(
    call: &mut Call,
    caches: &mut HashMap<String, StringCache>,
) -> CommandResult {
    let cache = get_cache(call.request.str_parameter(0)?, caches)?;

    // The lines are rendered in the order in which they are reported.
    let lines = [
        format!("{:<30} {:>20}\n", "Num Entries", cache.len()),
        format!("{:<30} {:>20}\n", "Max Entries", cache.capacity()),
        format!("{:<30} {:>18.2} %\n", "Utilization", cache.utilization()),
        format!(
            "{:<30} {:>20}\n",
            "Allocated Memory",
            format_size(cache.allocated_memory())
        ),
        format!(
            "{:<30} {:>20}\n",
            "Max Memory",
            format_size(cache.max_memory())
        ),
        format!(
            "{:<30} {:>18.2} %\n",
            "Memory Utilization",
            cache.memory_utilization()
        ),
        format!(
            "{:<30} {:>20}\n",
            "Total Memory",
            format_size(cache.total_allocated_memory())
        ),
        format!("{:<30} {:>20}\n", "Reads", cache.reads()),
        format!("{:<30} {:>20}\n", "Writes", cache.writes()),
        format!("{:<30} {:>18.2} %\n", "Hit Rate", cache.hit_rate()),
        format!(
            "{:<30} {:>18.2} %\n",
            "Write/Read Ratio",
            cache.write_read_ratio()
        ),
    ];

    call.response.bulk(lines.concat())?;

    Ok(())
}
// Integration style test which drives the LRU.* commands through the command dispatcher.
#[cfg(test)]
mod tests {
use crate::builder::Builder;
use crate::commands::CommandDictionary;
use crate::config::Config;
use crate::request::Request;
use mock_instant::MockClock;
use std::time::Duration;
// Runs through PUT, GET, REMOVE, XGET, FLUSH and STATS against a cache named `test`,
// advancing the mock clock in between to check the TTL and refresh handling.
#[test]
fn test_commands() {
crate::testing::test_async(async {
// Build a platform which provides the config and the command dictionary.
let platform = Builder::new()
.enable_config()
.enable_commands()
.build()
.await;
// Define the cache `test` before the actor is installed, so that it is created when
// the actor reads its config for the first time.
// NOTE(review): the literal below is presumably YAML and therefore whitespace
// sensitive - keep its line layout untouched when reformatting this file.
platform
.require::<Config>()
.load_from_string(
"caches:
test:
size: 10000
max_memory: 16m
soft_ttl: 15m
hard_ttl: 30m
refresh_interval: 10s
",
None,
)
.unwrap();
crate::lru::cache::install(platform.clone());
let mut dispatcher = platform.require::<CommandDictionary>().dispatcher();
// A value which has been put into the cache...
let result = dispatcher
.invoke(
Request::example(vec!["LRU.PUT", "test", "foo", "bar"]),
None,
)
.await
.unwrap();
assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+OK\r\n");
// ...can be read back...
let result = dispatcher
.invoke(Request::example(vec!["LRU.GET", "test", "foo"]), None)
.await
.unwrap();
assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "$3\r\nbar\r\n");
// ...and is gone once it has been removed.
let result = dispatcher
.invoke(Request::example(vec!["LRU.REMOVE", "test", "foo"]), None)
.await
.unwrap();
assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+OK\r\n");
let result = dispatcher
.invoke(Request::example(vec!["LRU.GET", "test", "foo"]), None)
.await
.unwrap();
assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+\r\n");
// Store the value again and move the clock 16 minutes ahead, which is beyond the
// configured soft_ttl (15m) but within the hard_ttl (30m).
let result = dispatcher
.invoke(
Request::example(vec!["LRU.PUT", "test", "foo", "bar"]),
None,
)
.await
.unwrap();
assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+OK\r\n");
MockClock::advance(Duration::from_secs(16 * 60));
// A plain GET no longer yields the value...
let result = dispatcher
.invoke(Request::example(vec!["LRU.GET", "test", "foo"]), None)
.await
.unwrap();
assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+\r\n");
// ...whereas XGET still delivers it, reported as not alive and to be refreshed.
let result = dispatcher
.invoke(Request::example(vec!["LRU.XGET", "test", "foo"]), None)
.await
.unwrap();
assert_eq!(
std::str::from_utf8(&result[..]).unwrap(),
"*3\r\n:0\r\n:1\r\n$3\r\nbar\r\n"
);
// An immediately following XGET does not request another refresh.
let result = dispatcher
.invoke(Request::example(vec!["LRU.XGET", "test", "foo"]), None)
.await
.unwrap();
assert_eq!(
std::str::from_utf8(&result[..]).unwrap(),
"*3\r\n:0\r\n:0\r\n$3\r\nbar\r\n"
);
// After 12 seconds (the configured refresh_interval is 10s) a refresh is requested
// again.
MockClock::advance(Duration::from_secs(12));
let result = dispatcher
.invoke(Request::example(vec!["LRU.XGET", "test", "foo"]), None)
.await
.unwrap();
assert_eq!(
std::str::from_utf8(&result[..]).unwrap(),
"*3\r\n:0\r\n:1\r\n$3\r\nbar\r\n"
);
// Another 16 minutes later (more than the hard_ttl of 30m in total) even XGET yields
// no value anymore.
MockClock::advance(Duration::from_secs(16 * 60));
let result = dispatcher
.invoke(Request::example(vec!["LRU.XGET", "test", "foo"]), None)
.await
.unwrap();
assert_eq!(
std::str::from_utf8(&result[..]).unwrap(),
"*3\r\n:0\r\n:0\r\n+\r\n"
);
// NOTE(review): the dispatcher created by the next statement is discarded right away
// and the existing `dispatcher` is used afterwards - this looks like a leftover,
// confirm and remove.
platform.require::<CommandDictionary>().dispatcher();
// A flush empties the cache.
dispatcher
.invoke(
Request::example(vec!["LRU.PUT", "test", "foo", "bar"]),
None,
)
.await
.unwrap();
let result = dispatcher
.invoke(Request::example(vec!["LRU.FLUSH", "test"]), None)
.await
.unwrap();
assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+OK\r\n");
let result = dispatcher
.invoke(Request::example(vec!["LRU.GET", "test", "foo"]), None)
.await
.unwrap();
assert_eq!(std::str::from_utf8(&result[..]).unwrap(), "+\r\n");
// Both flavours of STATS are only checked to respond with a bulk string (leading `$`).
let result = dispatcher
.invoke(Request::example(vec!["LRU.STATS"]), None)
.await
.unwrap();
assert_eq!(std::str::from_utf8(&result[0..1]).unwrap(), "$");
let result = dispatcher
.invoke(Request::example(vec!["LRU.STATS", "test"]), None)
.await
.unwrap();
assert_eq!(std::str::from_utf8(&result[0..1]).unwrap(), "$");
});
}
}