use ferriskey::{
cmd, ConnectionAddr, ConnectionInfo, Value, ValkeyConnectionInfo,
};
use ferriskey::connection::ConnectionLike;
use ferriskey::cluster::compat::ClusterClientBuilder;
use ferriskey::cluster::ClusterConnection;
use ferriskey::pipeline::Pipeline;
fn connection_info() -> Option<ConnectionInfo> {
let host = std::env::var("VALKEY_HOST").ok()?;
let port: u16 = std::env::var("VALKEY_PORT").ok()?.parse().ok()?;
let tls = std::env::var("VALKEY_TLS").unwrap_or_default() == "true";
Some(ConnectionInfo {
addr: if tls {
ConnectionAddr::TcpTls {
host: host.clone(),
port,
insecure: true,
tls_params: None,
}
} else {
ConnectionAddr::Tcp(host, port)
},
valkey: ValkeyConnectionInfo::default(),
})
}
async fn get_conn() -> Option<ClusterConnection> {
let info = connection_info()?;
let mut builder = ClusterClientBuilder::new(vec![info]);
if std::env::var("VALKEY_TLS").unwrap_or_default() == "true" {
builder = builder.tls(ferriskey::cluster::compat::TlsMode::Insecure);
}
let client = builder.build().ok()?;
client.get_async_connection(None, None, None).await.ok()
}
macro_rules! require_cluster {
() => {
match get_conn().await {
Some(c) => c,
None => {
eprintln!("Skipping: VALKEY_HOST not set");
return;
}
}
};
}
#[tokio::test]
#[ignore] async fn test_basic_get_set() {
let mut conn = require_cluster!();
let _ = conn.req_packed_command(cmd("SET").arg("itest:basic").arg("hello")).await.unwrap();
let val = conn.req_packed_command(cmd("GET").arg("itest:basic")).await.unwrap();
assert_eq!(val, Value::BulkString(bytes::Bytes::from_static(b"hello")));
let _ = conn.req_packed_command(cmd("DEL").arg("itest:basic")).await;
}
#[tokio::test]
#[ignore]
async fn test_list_operations_rpop_lpush() {
let mut conn = require_cluster!();
let key = "itest:queue";
let _ = conn.req_packed_command(cmd("DEL").arg(key)).await;
let _ = conn.req_packed_command(cmd("LPUSH").arg(key).arg("task1")).await.unwrap();
let _ = conn.req_packed_command(cmd("LPUSH").arg(key).arg("task2")).await.unwrap();
let _ = conn.req_packed_command(cmd("LPUSH").arg(key).arg("task3")).await.unwrap();
let val = conn.req_packed_command(cmd("RPOP").arg(key)).await.unwrap();
assert_eq!(val, Value::BulkString(bytes::Bytes::from_static(b"task1")));
let val = conn.req_packed_command(cmd("RPOP").arg(key)).await.unwrap();
assert_eq!(val, Value::BulkString(bytes::Bytes::from_static(b"task2")));
let _ = conn.req_packed_command(cmd("DEL").arg(key)).await;
}
#[tokio::test]
#[ignore]
async fn test_hash_operations_hset_hdel() {
let mut conn = require_cluster!();
let key = "itest:results";
let _ = conn.req_packed_command(cmd("DEL").arg(key)).await;
let _ = conn.req_packed_command(cmd("HSET").arg(key).arg("task-123").arg("SUCCESS")).await.unwrap();
let _ = conn.req_packed_command(cmd("HSET").arg(key).arg("task-456").arg("FAILURE")).await.unwrap();
let val = conn.req_packed_command(cmd("HGET").arg(key).arg("task-123")).await.unwrap();
assert_eq!(val, Value::BulkString(bytes::Bytes::from_static(b"SUCCESS")));
let _ = conn.req_packed_command(cmd("HDEL").arg(key).arg("task-123")).await.unwrap();
let val = conn.req_packed_command(cmd("HGET").arg(key).arg("task-123")).await.unwrap();
assert_eq!(val, Value::Nil);
let _ = conn.req_packed_command(cmd("DEL").arg(key)).await;
}
#[tokio::test]
#[ignore]
async fn test_mget_multi_key() {
let mut conn = require_cluster!();
let _ = conn.req_packed_command(cmd("SET").arg("{itest}:k1").arg("v1")).await.unwrap();
let _ = conn.req_packed_command(cmd("SET").arg("{itest}:k2").arg("v2")).await.unwrap();
let _ = conn.req_packed_command(cmd("SET").arg("{itest}:k3").arg("v3")).await.unwrap();
let val = conn.req_packed_command(
cmd("MGET").arg("{itest}:k1").arg("{itest}:k2").arg("{itest}:k3")
).await.unwrap();
if let Value::Array(items) = val {
assert_eq!(items.len(), 3);
assert_eq!(items[0], Ok(Value::BulkString(bytes::Bytes::from_static(b"v1"))));
assert_eq!(items[1], Ok(Value::BulkString(bytes::Bytes::from_static(b"v2"))));
assert_eq!(items[2], Ok(Value::BulkString(bytes::Bytes::from_static(b"v3"))));
} else {
panic!("Expected Array, got {:?}", val);
}
let _ = conn.req_packed_command(cmd("DEL").arg("{itest}:k1").arg("{itest}:k2").arg("{itest}:k3")).await;
}
#[tokio::test]
#[ignore]
async fn test_pipeline_atomic() {
let mut conn = require_cluster!();
let key = "{itest:pipe}:counter";
let _ = conn.req_packed_command(cmd("SET").arg(key).arg("0")).await;
let mut pipe = Pipeline::new();
pipe.atomic();
let mut c1 = cmd("INCR"); c1.arg(key); pipe.add_command(c1);
let mut c2 = cmd("INCR"); c2.arg(key); pipe.add_command(c2);
let mut c3 = cmd("INCR"); c3.arg(key); pipe.add_command(c3);
let results: Vec<Value> = pipe.query_async(&mut conn).await.unwrap();
assert_eq!(results.len(), 3);
assert_eq!(results[0], Value::Int(1));
assert_eq!(results[1], Value::Int(2));
assert_eq!(results[2], Value::Int(3));
let _ = conn.req_packed_command(cmd("DEL").arg(key)).await;
}
#[tokio::test]
#[ignore]
async fn test_ping() {
let mut conn = require_cluster!();
let val = conn.req_packed_command(&cmd("PING")).await.unwrap();
match val {
Value::SimpleString(s) => assert_eq!(s, "PONG"),
Value::BulkString(b) => assert_eq!(&b[..], b"PONG"),
Value::Okay => {} other => panic!("Unexpected PING response: {:?}", other),
}
}
#[tokio::test]
#[ignore]
async fn test_expiry_and_ttl() {
let mut conn = require_cluster!();
let key = "itest:expiry";
let _ = conn.req_packed_command(cmd("SET").arg(key).arg("temp").arg("EX").arg("10")).await.unwrap();
let ttl = conn.req_packed_command(cmd("TTL").arg(key)).await.unwrap();
if let Value::Int(t) = ttl {
assert!(t > 0 && t <= 10, "TTL should be between 1-10, got {t}");
} else {
panic!("Expected Int, got {:?}", ttl);
}
let _ = conn.req_packed_command(cmd("DEL").arg(key)).await;
}
#[tokio::test]
#[ignore]
async fn test_cross_slot_pipeline_non_atomic() {
let mut conn = require_cluster!();
let _ = conn.req_packed_command(cmd("SET").arg("itest:xslot:a").arg("1")).await.unwrap();
let _ = conn.req_packed_command(cmd("SET").arg("itest:xslot:b").arg("2")).await.unwrap();
let mut pipe = Pipeline::new();
let mut c1 = cmd("GET"); c1.arg("itest:xslot:a"); pipe.add_command(c1);
let mut c2 = cmd("GET"); c2.arg("itest:xslot:b"); pipe.add_command(c2);
let results: Vec<Value> = pipe.query_async(&mut conn).await.unwrap();
assert_eq!(results.len(), 2);
assert_eq!(results[0], Value::BulkString(bytes::Bytes::from_static(b"1")));
assert_eq!(results[1], Value::BulkString(bytes::Bytes::from_static(b"2")));
let _ = conn.req_packed_command(cmd("DEL").arg("itest:xslot:a")).await;
let _ = conn.req_packed_command(cmd("DEL").arg("itest:xslot:b")).await;
}