#![cfg(all(feature = "aio", feature = "cache-aio"))]
use crate::support::*;
use futures_time::task::sleep;
use redis::CommandCacheConfig;
#[cfg(feature = "cluster-async")]
use redis::cluster_routing::*;
use redis::{AsyncCommands, RedisError, caching::CacheConfig};
#[cfg(feature = "json")]
use redis_test::server::Module;
#[cfg(feature = "json")]
use serde_json::json;
use std::collections::HashMap;
use std::time::Duration;
use test_macros::async_test;
mod support;
macro_rules! assert_hit {
($con:expr, $val:expr) => {
assert_eq!($con.get_cache_statistics().unwrap().hit, $val);
};
}
macro_rules! assert_miss {
($con:expr, $val:expr) => {
assert_eq!($con.get_cache_statistics().unwrap().miss, $val);
};
}
macro_rules! assert_invalidate {
($con:expr, $val:expr) => {
assert_eq!($con.get_cache_statistics().unwrap().invalidate, $val);
};
}
#[async_test]
async fn test_cache_basic(test_with_optin: bool) {
let ctx = TestContext::new();
if !ctx.protocol.supports_resp3() {
return;
}
let cache_config = if test_with_optin {
CacheConfig::new().set_mode(redis::caching::CacheMode::OptIn)
} else {
CacheConfig::default()
};
let mut con = ctx
.async_connection_with_cache_config(cache_config)
.await
.unwrap();
let val: Option<String> = get_cmd("GET", test_with_optin)
.arg("key_1")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(val, None);
assert_hit!(&con, 0);
assert_miss!(&con, 1);
let val: Option<String> = get_cmd("GET", test_with_optin)
.arg("key_1")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(val, None);
assert_hit!(&con, 1);
assert_miss!(&con, 1);
let _: () = get_cmd("SET", test_with_optin)
.arg("key_1")
.arg("1")
.query_async(&mut con)
.await
.unwrap();
sleep(Duration::from_millis(50).into()).await; assert_hit!(&con, 1);
assert_miss!(&con, 1);
assert_invalidate!(&con, 1);
let val: String = get_cmd("GET", test_with_optin)
.arg("key_1")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(val, "1");
assert_hit!(&con, 1);
assert_miss!(&con, 2);
assert_invalidate!(&con, 1);
}
#[async_test]
async fn cache_mget() {
let ctx = TestContext::new();
if !ctx.protocol.supports_resp3() {
return;
}
let mut con = ctx.async_connection_with_cache().await.unwrap();
let _: () = get_pipe(true)
.cmd("SET")
.arg("key_1")
.arg(41)
.ignore()
.cmd("SET")
.arg("key_3")
.arg(43)
.ignore()
.query_async(&mut con)
.await
.unwrap();
let res1: Vec<Option<String>> = redis::cmd("MGET")
.arg("key_1")
.arg("key_2")
.query_async(&mut con)
.await
.unwrap();
assert_hit!(&con, 0);
assert_miss!(&con, 2);
assert_eq!(res1, vec![Some("41".to_string()), None]);
let res2: Vec<Option<String>> = redis::cmd("MGET")
.arg("key_1")
.arg("key_3")
.arg("key_2")
.query_async(&mut con)
.await
.unwrap();
assert_hit!(&con, 2);
assert_miss!(&con, 3);
assert_eq!(
res2,
vec![Some("41".to_string()), Some("43".to_string()), None]
);
let _: Option<String> = redis::cmd("GET")
.arg("key_1")
.query_async(&mut con)
.await
.unwrap();
assert_hit!(&con, 3);
assert_miss!(&con, 3);
}
#[cfg(feature = "json")]
#[async_test]
async fn module_cache_json_get_mget() {
let ctx = TestContext::with_modules(&[Module::Json]);
if !ctx.protocol.supports_resp3() {
return;
}
let cache_config = CacheConfig::new().set_mode(redis::caching::CacheMode::OptIn);
let mut con = ctx
.async_connection_with_cache_config(cache_config)
.await
.unwrap();
let value_1 = serde_json::to_string(&json!({"value":41i64})).unwrap();
let value_3 = serde_json::to_string(&json!({"value":43i64})).unwrap();
let _: () = get_pipe(true)
.cmd("JSON.SET")
.arg("key_1")
.arg("$")
.arg(value_1.clone())
.ignore()
.cmd("JSON.SET")
.arg("key_3")
.arg("$")
.arg(value_3.clone())
.ignore()
.query_async(&mut con)
.await
.unwrap();
let res1: Vec<Option<String>> = get_cmd("JSON.MGET", true)
.arg(&["key_1", "key_2"])
.arg(".")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(res1.len(), 2);
assert_eq!(res1, vec![Some(value_1.clone()), None]);
assert_hit!(&con, 0);
assert_miss!(&con, 2);
let res2: Vec<Option<String>> = get_cmd("JSON.MGET", true)
.arg("key_1")
.arg("key_3")
.arg("key_2")
.arg(".")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(res2.len(), 3);
assert_eq!(res2, vec![Some(value_1.clone()), Some(value_3), None]);
assert_hit!(&con, 2);
assert_miss!(&con, 3);
let res3: Option<String> = get_cmd("JSON.GET", true)
.arg("key_1")
.arg(".")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(res3, Some(value_1.clone()));
assert_hit!(&con, 3);
assert_miss!(&con, 3);
}
#[cfg(feature = "json")]
#[async_test]
async fn module_cache_json_get_mget_different_paths() {
let ctx = TestContext::with_modules(&[Module::Json]);
if !ctx.protocol.supports_resp3() {
return;
}
let cache_config = CacheConfig::new().set_mode(redis::caching::CacheMode::OptIn);
let mut con = ctx
.async_connection_with_cache_config(cache_config)
.await
.unwrap();
let value_1 = serde_json::to_string(&json!({"value":41i64, "id":1i8})).unwrap();
let value_3 = serde_json::to_string(&json!({"value":43i64, "id":3i8})).unwrap();
let _: () = get_pipe(true)
.cmd("JSON.SET")
.arg("key_1")
.arg("$")
.arg(value_1.clone())
.ignore()
.cmd("JSON.SET")
.arg("key_3")
.arg("$")
.arg(value_3.clone())
.ignore()
.query_async(&mut con)
.await
.unwrap();
let res1: Vec<Option<i64>> = get_cmd("JSON.MGET", true)
.arg(&["key_1", "key_2"])
.arg(".value")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(res1.len(), 2);
assert_eq!(res1, vec![Some(41), None]);
assert_hit!(&con, 0);
assert_miss!(&con, 2);
let res2: Vec<Option<i64>> = get_cmd("JSON.MGET", true)
.arg(&["key_1", "key_2"])
.arg(".value")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(res2.len(), 2);
assert_eq!(res2, vec![Some(41), None]);
assert_hit!(&con, 2);
assert_miss!(&con, 2);
let res3: Vec<Option<i64>> = get_cmd("JSON.MGET", true)
.arg(&["key_1", "key_3"])
.arg(".id")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(res3.len(), 2);
assert_eq!(res3, vec![Some(1), Some(3)]);
assert_hit!(&con, 2);
assert_miss!(&con, 4);
let res4: Vec<Option<i64>> = get_cmd("JSON.MGET", true)
.arg(&["key_2", "key_3"])
.arg(".id")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(res4.len(), 2);
assert_eq!(res4, vec![None, Some(3)]);
assert_hit!(&con, 3);
assert_miss!(&con, 5);
let res5: Option<i64> = get_cmd("JSON.GET", true)
.arg("key_1")
.arg(".value")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(res5, Some(41));
assert_hit!(&con, 4);
assert_miss!(&con, 5);
let res6: Option<i64> = get_cmd("JSON.GET", true)
.arg("key_1")
.arg(".value")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(res6, Some(41));
assert_hit!(&con, 5);
assert_miss!(&con, 5);
let res7: Option<i8> = get_cmd("JSON.GET", true)
.arg("key_1")
.arg(".id")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(res7, Some(1));
assert_hit!(&con, 6);
assert_miss!(&con, 5);
}
#[async_test]
async fn cache_is_not_target_type_dependent() {
let ctx = TestContext::new();
if !ctx.protocol.supports_resp3() {
return;
}
let mut con = ctx.async_connection_with_cache().await.unwrap();
let _: () = con.set("KEY", "77").await.unwrap();
let x: u32 = con.get("KEY").await.unwrap();
assert_eq!(x, 77);
let x: String = con.get("KEY").await.unwrap();
assert_eq!(x, "77");
let x: u8 = con.get("KEY").await.unwrap();
assert_eq!(x, 77);
assert_hit!(&con, 2);
assert_miss!(&con, 1);
}
#[async_test]
async fn test_cache_with_pipeline(atomic: bool) {
let ctx = TestContext::new();
if !ctx.protocol.supports_resp3() {
return;
}
let mut con = ctx.async_connection_with_cache().await.unwrap();
let (mget_k1_k2,): ((i32, i32),) = get_pipe(atomic)
.cmd("SET")
.arg("key_1")
.arg(41)
.ignore()
.cmd("SET")
.arg("key_2")
.arg(42)
.ignore()
.cmd("MGET")
.arg(&["key_1", "key_2"])
.query_async(&mut con)
.await
.unwrap();
assert_eq!(mget_k1_k2, (41, 42));
assert_hit!(&con, 0);
assert_miss!(&con, 2);
let (k1, mget_k1_k2, k_unknown): (i32, (i32, i32), Option<i32>) = get_pipe(atomic)
.cmd("GET")
.arg("key_1")
.cmd("MGET")
.arg(&["key_1", "key_2"])
.cmd("GET")
.arg("key_doesnt_exists")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(k1, 41);
assert_eq!(mget_k1_k2, (41, 42));
assert_eq!(k_unknown, Option::None);
assert_hit!(&con, 3);
assert_miss!(&con, 3);
}
#[async_test]
async fn cache_basic_partial_opt_in() {
let ctx = TestContext::new();
if !ctx.protocol.supports_resp3() {
return;
}
let cache_config = CacheConfig::new().set_mode(redis::caching::CacheMode::OptIn);
let mut con = ctx
.async_connection_with_cache_config(cache_config)
.await
.unwrap();
let val: Option<String> = redis::cmd("GET")
.arg("key_1")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(val, None);
assert_hit!(&con, 0);
assert_miss!(&con, 0);
let _: () = redis::cmd("SET")
.arg("key_1")
.arg("1")
.query_async(&mut con)
.await
.unwrap();
assert_hit!(&con, 0);
assert_miss!(&con, 0);
assert_invalidate!(&con, 0);
let val: String = redis::cmd("GET")
.arg("key_1")
.set_cache_config(CommandCacheConfig::new().set_enable_cache(true))
.query_async(&mut con)
.await
.unwrap();
assert_eq!(val, "1");
assert_hit!(&con, 0);
assert_miss!(&con, 1);
let val: String = redis::cmd("GET")
.arg("key_1")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(val, "1");
assert_hit!(&con, 0);
assert_miss!(&con, 1);
let val: String = redis::cmd("GET")
.arg("key_1")
.set_cache_config(CommandCacheConfig::new().set_enable_cache(true))
.query_async(&mut con)
.await
.unwrap();
assert_eq!(val, "1");
assert_hit!(&con, 1);
assert_miss!(&con, 1);
assert_invalidate!(&con, 0);
}
#[async_test]
async fn test_cache_pipeline_partial_opt_in(atomic: bool) {
let ctx = TestContext::new();
if !ctx.protocol.supports_resp3() {
return;
}
let cache_config = CacheConfig::new().set_mode(redis::caching::CacheMode::OptIn);
let mut con = ctx
.async_connection_with_cache_config(cache_config)
.await
.unwrap();
let (mget_k1_k2,): ((i32, i32),) = get_pipe(atomic)
.cmd("SET")
.arg("key_1")
.arg(42)
.ignore()
.cmd("SET")
.arg("key_2")
.arg(43)
.ignore()
.cmd("MGET")
.arg(&["key_1", "key_2"])
.query_async(&mut con)
.await
.unwrap();
assert_eq!(mget_k1_k2, (42, 43));
assert_hit!(&con, 0);
assert_miss!(&con, 0);
for _ in 0..2 {
let (mget_k1_k2, k1, k_unknown): ((i32, i32), i32, Option<i32>) = get_pipe(atomic)
.cmd("MGET")
.set_cache_config(CommandCacheConfig::new().set_enable_cache(true))
.arg(&["key_1", "key_2"])
.cmd("GET")
.arg("key_1")
.cmd("GET")
.arg("key_doesnt_exists")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(mget_k1_k2, (42, 43));
assert_eq!(k1, 42);
assert_eq!(k_unknown, Option::None);
}
assert_hit!(&con, 2);
assert_miss!(&con, 2);
}
#[async_test]
async fn test_cache_different_commands(test_with_opt_in: bool) {
let ctx = TestContext::new();
if !ctx.protocol.supports_resp3() {
return;
}
let cache_config = if test_with_opt_in {
CacheConfig::new().set_mode(redis::caching::CacheMode::OptIn)
} else {
CacheConfig::default()
};
let mut con = ctx
.async_connection_with_cache_config(cache_config)
.await
.unwrap();
let _: () = get_cmd("HSET", test_with_opt_in)
.arg("user")
.arg("health")
.arg("100")
.query_async(&mut con)
.await
.unwrap();
let val: usize = get_cmd("HGET", test_with_opt_in)
.arg("user")
.arg("health")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(val, 100);
assert_hit!(&con, 0);
assert_miss!(&con, 1);
let val: Option<usize> = get_cmd("HGET", test_with_opt_in)
.arg("user")
.arg("non_existent_key")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(val, None);
assert_hit!(&con, 0);
assert_miss!(&con, 2);
let val: HashMap<String, usize> = get_cmd("HGETALL", test_with_opt_in)
.arg("user")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(val.get("health"), Some(100).as_ref());
assert_hit!(&con, 0);
assert_miss!(&con, 3);
let val: HashMap<String, usize> = get_cmd("HGETALL", test_with_opt_in)
.arg("user")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(val.get("health"), Some(100).as_ref());
assert_hit!(&con, 1);
assert_miss!(&con, 3);
}
#[cfg(feature = "connection-manager")]
#[async_test]
async fn test_connection_manager_maintains_statistics_after_crashes(test_with_optin: bool) {
let ctx = TestContext::new();
if !ctx.protocol.supports_resp3() {
return;
}
let cache_config = if test_with_optin {
CacheConfig::new().set_mode(redis::caching::CacheMode::OptIn)
} else {
CacheConfig::default()
};
let config = redis::aio::ConnectionManagerConfig::new()
.set_max_delay(Duration::from_millis(1))
.set_cache_config(cache_config);
let mut get = get_cmd("GET", test_with_optin);
get.arg("key_1");
let mut manager = ctx
.client
.get_connection_manager_with_config(config)
.await
.unwrap();
let val: Option<String> = get.query_async(&mut manager).await.unwrap();
assert_eq!(val, None);
assert_hit!(&manager, 0);
assert_miss!(&manager, 1);
let val: Option<String> = get.query_async(&mut manager).await.unwrap();
assert_eq!(val, None);
assert_hit!(&manager, 1);
assert_miss!(&manager, 1);
let addr = ctx.server.client_addr().clone();
drop(ctx);
let result: Result<redis::Value, RedisError> =
manager.send_packed_command(&redis::cmd("PING")).await;
assert!(result.unwrap_err().is_unrecoverable_error());
let _server = redis_test::server::RedisServer::new_with_addr_and_modules(addr, &[], false);
loop {
if manager.send_packed_command(&get).await.is_ok() {
break;
}
}
assert_eq!(val, None);
assert_hit!(&manager, 1);
assert_miss!(&manager, 2);
}
#[cfg(feature = "cluster-async")]
#[async_test]
async fn cache_async_cluster_reconnect_all_nodes() {
let ctx = TestClusterContext::new_with_cluster_client_builder(|builder| {
builder.cache_config(CacheConfig::default())
});
if !ctx.protocol.supports_resp3() {
return;
}
let mut con = ctx.async_connection().await;
let val: Option<String> = redis::cmd("GET")
.arg("key_1")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(val, None);
assert_hit!(&con, 0);
assert_miss!(&con, 1);
let val: Option<String> = redis::cmd("GET")
.arg("key_1")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(val, None);
assert_hit!(&con, 1);
assert_miss!(&con, 1);
let _ = con
.route_command(
redis::cmd("QUIT"),
redis::cluster_routing::RoutingInfo::MultiNode((
redis::cluster_routing::MultipleNodeRoutingInfo::AllNodes,
None,
)),
)
.await
.unwrap();
let _ = con
.route_command(
redis::cmd("PING"),
redis::cluster_routing::RoutingInfo::MultiNode((
redis::cluster_routing::MultipleNodeRoutingInfo::AllNodes,
None,
)),
)
.await;
let val: Option<String> = redis::cmd("GET")
.arg("key_1")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(val, None);
assert_hit!(&con, 1);
assert_miss!(&con, 2);
assert_invalidate!(&con, 1);
}
#[cfg(feature = "cluster-async")]
#[async_test]
async fn cache_async_cluster_mget() {
let ctx = TestClusterContext::new_with_cluster_client_builder(|builder| {
builder.cache_config(CacheConfig::default())
});
if !ctx.protocol.supports_resp3() {
return;
}
let mut con = ctx.async_connection().await;
let _: redis::Value = con.set("key_1", 41).await.unwrap();
let _: redis::Value = con.set("key_3", 43).await.unwrap();
let res1: Vec<Option<String>> = redis::cmd("MGET")
.arg("key_1")
.arg("key_2")
.query_async(&mut con)
.await
.unwrap();
assert_hit!(&con, 0);
assert_miss!(&con, 2);
assert_eq!(res1, vec![Some("41".to_string()), None]);
let res2: Vec<Option<String>> = redis::cmd("MGET")
.arg("key_1")
.arg("key_3")
.arg("key_2")
.query_async(&mut con)
.await
.unwrap();
assert_hit!(&con, 2);
assert_miss!(&con, 3);
assert_eq!(
res2,
vec![Some("41".to_string()), Some("43".to_string()), None]
);
let _: Option<String> = redis::cmd("GET")
.arg("key_1")
.query_async(&mut con)
.await
.unwrap();
assert_hit!(&con, 3);
assert_miss!(&con, 3);
}
fn get_pipe(atomic: bool) -> redis::Pipeline {
if atomic {
let mut pipe = redis::pipe();
pipe.atomic();
pipe
} else {
redis::pipe()
}
}
#[cfg(feature = "cluster-async")]
#[async_test]
async fn test_cache_async_cluster_slot_change(migrate: bool) {
let ctx = TestClusterContext::new_with_cluster_client_builder(|builder| {
builder.cache_config(CacheConfig::default())
});
if !ctx.protocol.supports_resp3() {
return;
}
if !migrate {
run_test_if_version_supported!(&REDIS_VERSION_CE_7_2);
}
struct NodeData {
id: String,
host: String,
port: u16,
range: std::ops::Range<u16>,
}
impl NodeData {
fn get_singe_route(&self) -> RoutingInfo {
RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress {
host: self.host.clone(),
port: self.port,
})
}
}
let key_slot = 11998;
let mut con = ctx.async_connection().await;
let _: redis::Value = redis::cmd("SET")
.arg("key_1")
.arg(77)
.query_async(&mut con)
.await
.unwrap();
let val: usize = redis::cmd("GET")
.arg("key_1")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(val, 77);
assert_hit!(&con, 0);
assert_miss!(&con, 1);
assert_invalidate!(&con, 0);
let val: usize = redis::cmd("GET")
.arg("key_1")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(val, 77);
assert_hit!(&con, 1);
assert_miss!(&con, 1);
let nodes_str: String = redis::cmd("CLUSTER")
.arg("NODES")
.query_async(&mut con)
.await
.unwrap();
let node_slot_data: Vec<_> = nodes_str
.split("\n")
.filter(|node_str| !node_str.is_empty())
.map(|node_str| {
let node_sep: Vec<&str> = node_str.split(" ").collect();
let node_id = node_sep[0];
let (node_addr, _cport) = node_sep[1].split_once("@").unwrap();
let (node_host, node_port) = node_addr.split_once(":").unwrap();
let (slot_start, slot_end) = node_sep[8].split_once("-").unwrap();
let range = std::ops::Range {
start: slot_start.parse().unwrap(),
end: slot_end.parse::<u16>().unwrap() + 1,
};
NodeData {
id: node_id.to_string(),
host: node_host.to_string(),
port: node_port.parse::<u16>().unwrap(),
range,
}
})
.collect();
let old_slot_owner = node_slot_data
.iter()
.find(|node_data| node_data.range.contains(&key_slot))
.unwrap();
let new_slot_owner = node_slot_data
.iter()
.find(|node_data| !node_data.range.contains(&key_slot))
.unwrap();
con.route_command(
redis::cmd("CLUSTER")
.arg("SETSLOT")
.arg(key_slot)
.arg("IMPORTING")
.arg(&old_slot_owner.id)
.to_owned(),
new_slot_owner.get_singe_route(),
)
.await
.unwrap();
con.route_command(
redis::cmd("CLUSTER")
.arg("SETSLOT")
.arg(key_slot)
.arg("MIGRATING")
.arg(&new_slot_owner.id)
.to_owned(),
old_slot_owner.get_singe_route(),
)
.await
.unwrap();
if migrate {
con.route_command(
redis::cmd("MIGRATE")
.arg(&new_slot_owner.host)
.arg(new_slot_owner.port)
.arg("key_1")
.arg(0)
.arg(5000)
.to_owned(),
old_slot_owner.get_singe_route(),
)
.await
.unwrap();
sleep(Duration::from_millis(50).into()).await;
assert_invalidate!(&con, 1);
} else {
assert_invalidate!(&con, 0);
}
con.route_command(
redis::cmd("CLUSTER")
.arg("SETSLOT")
.arg(key_slot)
.arg("NODE")
.arg(&new_slot_owner.id)
.take(),
new_slot_owner.get_singe_route(),
)
.await
.unwrap();
con.route_command(
redis::cmd("CLUSTER")
.arg("SETSLOT")
.arg(key_slot)
.arg("NODE")
.arg(&new_slot_owner.id)
.take(),
old_slot_owner.get_singe_route(),
)
.await
.unwrap();
assert_miss!(&con, 1);
let val: Option<usize> = redis::cmd("GET")
.arg("key_1")
.query_async(&mut con)
.await
.unwrap();
if migrate {
assert_eq!(val, Some(77))
} else {
assert_eq!(val, None);
};
assert_hit!(&con, 1);
assert_miss!(&con, 3);
assert_invalidate!(&con, 1);
}
fn get_cmd(name: &str, enable_opt_in: bool) -> redis::Cmd {
let mut cmd = redis::cmd(name);
if enable_opt_in {
cmd.set_cache_config(CommandCacheConfig::new().set_enable_cache(true));
}
cmd
}
#[async_test]
async fn test_readonly_commands_with_patterns_are_not_cached() {
let ctx = TestContext::new();
if !ctx.protocol.supports_resp3() {
return;
}
let mut con = ctx
.async_connection_with_cache_config(CacheConfig::new())
.await
.unwrap();
let cmd = redis::cmd("KEYS").arg("foo*").take();
let _: () = cmd.query_async(&mut con).await.unwrap();
let _: () = cmd.query_async(&mut con).await.unwrap();
assert_hit!(&con, 0);
assert_miss!(&con, 0);
}
#[async_test]
async fn test_bitcount_is_handled_correctly() {
let ctx = TestContext::new();
if !ctx.protocol.supports_resp3() {
return;
}
let mut con = ctx
.async_connection_with_cache_config(CacheConfig::new())
.await
.unwrap();
() = redis::cmd("set")
.arg("foo")
.arg("bar")
.query_async(&mut con)
.await
.unwrap();
let val: u32 = redis::cmd("BITCOUNT")
.arg("foo")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(val, 10);
let val: u32 = redis::cmd("BITCOUNT")
.arg("foo")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(val, 10);
assert_hit!(&con, 1);
assert_miss!(&con, 1);
assert_invalidate!(&con, 0);
() = redis::cmd("set")
.arg("foo")
.arg("brrrrR")
.query_async(&mut con)
.await
.unwrap();
sleep(Duration::from_millis(10).into()).await;
let val: u32 = redis::cmd("BITCOUNT")
.arg("foo")
.query_async(&mut con)
.await
.unwrap();
assert_eq!(val, 22);
assert_hit!(&con, 1);
assert_miss!(&con, 2);
assert_invalidate!(&con, 1);
}
#[async_test]
async fn test_that_a_pipeline_with_all_commands_cached_does_not_hang() {
let ctx = TestContext::new();
if !ctx.protocol.supports_resp3() {
return;
}
let mut con = ctx
.async_connection_with_cache_config(CacheConfig::new())
.await
.unwrap();
let mut pipeline = redis::pipe();
pipeline.get("foobar");
let _: Vec<Option<String>> = pipeline.query_async(&mut con).await.unwrap();
let _: Vec<Option<String>> = pipeline.query_async(&mut con).await.unwrap();
assert_hit!(&con, 1);
assert_miss!(&con, 1);
}