#![cfg(feature = "acl")]
use redis::TypedCommands;
use redis::acl::{AclInfo, Rule};
use std::collections::HashSet;
mod support;
use crate::support::*;
/// A freshly opened connection must report `default` as its ACL user.
#[test]
fn test_acl_whoami() {
    let ctx = TestContext::new();
    let mut con = ctx.connection();

    let current_user = con.acl_whoami();
    assert_eq!(current_user, Ok(String::from("default")));
}
/// `acl_help` must succeed and return a non-empty reply.
#[test]
fn test_acl_help() {
    let ctx = TestContext::new();
    let mut con = ctx.connection();

    let help_reply = con.acl_help().expect("Got help manual");
    assert!(!help_reply.is_empty());
}
/// Walks through the ACL user life cycle: create users, attach rules, inspect them,
/// list them and finally delete them again.
///
/// The test carries `#[ignore]`, so it is skipped unless ignored tests are requested.
#[test]
#[ignore]
fn test_acl_getsetdel_users() {
    // Hex digest registered as bob's pre-hashed password.
    const BOB_HASHED_PASS: &str =
        "c3ab8ff13720e8ad9047dd39466b3c8974e592c2fa383d4a3960714caef0c4f2";

    let ctx = TestContext::new();
    let mut con = ctx.connection();

    // Before anything is created only the `default` user is expected.
    let default_entry = "user default on nopass ~* +@all".to_owned();
    assert_eq!(con.acl_list(), Ok(vec![default_entry.clone()]));
    assert_eq!(con.acl_users(), Ok(vec!["default".to_owned()]));

    // Create bob without any rules.
    assert_eq!(con.acl_setuser("bob"), Ok(()));
    let users_with_bob = vec!["bob".to_owned(), "default".to_owned()];
    assert_eq!(con.acl_users(), Ok(users_with_bob));

    // Enable bob, give him a password, a key pattern and one command.
    let bob_rules = [
        Rule::On,
        Rule::AddHashedPass(BOB_HASHED_PASS.to_owned()),
        Rule::Pattern("redis:*".to_owned()),
        Rule::AddCommand("set".to_owned()),
    ];
    assert_eq!(con.acl_setuser_rules("bob", &bob_rules), Ok(()));

    // The server must echo the rules back through `acl_getuser`.
    let expected_bob = AclInfo {
        flags: vec![Rule::On],
        passwords: vec![Rule::AddHashedPass(BOB_HASHED_PASS.to_owned())],
        commands: vec![
            Rule::RemoveCategory("all".to_owned()),
            Rule::AddCommand("set".to_owned()),
        ],
        keys: vec![Rule::Pattern("redis:*".to_owned())],
        channels: vec![],
        selectors: vec![],
    };
    let bob_info = con.acl_getuser("bob").expect("Got user").unwrap();
    assert_eq!(bob_info, expected_bob);

    let bob_entry =
        "user bob on #c3ab8ff13720e8ad9047dd39466b3c8974e592c2fa383d4a3960714caef0c4f2 ~redis:* -@all +set"
            .to_owned();
    assert_eq!(con.acl_list(), Ok(vec![bob_entry, default_entry]));

    // Add a second user and make sure the listing contains all three names.
    assert_eq!(con.acl_setuser("eve"), Ok(()));
    let all_users = vec!["bob".to_owned(), "default".to_owned(), "eve".to_owned()];
    assert_eq!(con.acl_users(), Ok(all_users));

    // Deleting both extra users leaves only `default` behind.
    assert_eq!(con.acl_deluser(&["bob", "eve"]), Ok(2));
    assert_eq!(con.acl_users(), Ok(vec!["default".to_owned()]));
}
/// Checks that the category listing contains a set of expected categories and that the
/// `hyperloglog` category contains the expected commands.
#[test]
fn test_acl_cat() {
    let ctx = TestContext::new();
    let mut con = ctx.connection();

    let categories: HashSet<String> = con.acl_cat().expect("Got categories");
    let expected_categories = [
        "keyspace",
        "read",
        "write",
        "set",
        "sortedset",
        "list",
        "hash",
        "string",
        "bitmap",
        "hyperloglog",
        "geo",
        "stream",
        "pubsub",
        "admin",
        "fast",
        "slow",
        "blocking",
        "dangerous",
        "connection",
        "transaction",
        "scripting",
    ];
    for cat in expected_categories {
        assert!(categories.contains(cat), "Category `{cat}` does not exist");
    }

    let hyperloglog_commands = con
        .acl_cat_categoryname("hyperloglog")
        .expect("Got commands of a category");
    for cmd in ["pfmerge", "pfcount", "pfselftest", "pfadd"] {
        assert!(
            hyperloglog_commands.contains(cmd),
            "Command `{cmd}` does not exist"
        );
    }
}
/// Generated passwords are expected to be 64 characters long by default and
/// 256 characters long when 1024 bits are requested.
#[test]
fn test_acl_genpass() {
    let ctx = TestContext::new();
    let mut con = ctx.connection();

    let default_pass: String = con.acl_genpass().expect("Got password");
    assert_eq!(default_pass.len(), 64);

    let long_pass: String = con.acl_genpass_bits(1024).expect("Got password");
    assert_eq!(long_pass.len(), 256);
}
/// On a fresh server the ACL log is expected to be empty, and resetting it must succeed.
#[test]
fn test_acl_log() {
    let ctx = TestContext::new();
    let mut con = ctx.connection();

    let entries: Vec<String> = con.acl_log(1).expect("Got logs");
    assert_eq!(entries.len(), 0);

    let reset_outcome = con.acl_log_reset();
    assert_eq!(reset_outcome, Ok(()));
}
/// Creates a user that may only run `SET` and verifies the dry-run verdicts for an
/// allowed (`SET`) and a forbidden (`GET`) command.
#[test]
fn test_acl_dryrun() {
    let ctx = run_test_if_version_supported!(&REDIS_VERSION_CE_7_2);
    let mut con = ctx.connection();

    // VIRGINIA may run SET on every key, nothing else.
    let mut setuser_cmd = redis::cmd("ACL");
    setuser_cmd
        .arg("SETUSER")
        .arg("VIRGINIA")
        .arg("+SET")
        .arg("~*");
    setuser_cmd.exec(&mut con).unwrap();

    let allowed_verdict = con
        .acl_dryrun(b"VIRGINIA", String::from("SET"), &["foo", "bar"])
        .unwrap();
    assert_eq!(allowed_verdict, "OK");

    let denied_verdict: String = con
        .acl_dryrun(b"VIRGINIA", String::from("GET"), "foo")
        .unwrap();
    assert_eq!(
        denied_verdict,
        "User VIRGINIA has no permissions to run the 'get' command"
    );
}
/// Registers a user with a larger rule set and checks every section of the
/// `AclInfo` returned by `acl_getuser` against the expected values.
#[test]
fn test_acl_info() {
    const DEFAULT_QUEUE_NAME: &str = "default";

    let ctx = run_test_if_version_supported!(&REDIS_VERSION_CE_7_2);
    let mut conn = ctx.connection();
    let username = "tenant";
    let password = "securepassword123";

    let rules = vec![
        Rule::On,
        Rule::ResetChannels,
        Rule::AllCommands,
        Rule::RemoveCategory("dangerous".to_string()),
        Rule::AddCommand("keys".to_string()),
        Rule::RemoveCommand("info".to_string()),
        Rule::RemoveCommand("select".to_string()),
        Rule::AddPass(password.to_string()),
        Rule::Pattern(format!("asynq:{{{DEFAULT_QUEUE_NAME}}}:*")),
        Rule::Pattern(format!("asynq:{{{username}:*")),
        Rule::Pattern("asynq:queues".to_string()),
        Rule::Pattern("asynq:servers:*".to_string()),
        Rule::Pattern("asynq:servers".to_string()),
        Rule::Pattern("asynq:workers".to_string()),
        Rule::Pattern("asynq:workers:*".to_string()),
        Rule::Pattern("asynq:schedulers".to_string()),
        Rule::Pattern("asynq:schedulers:*".to_string()),
        Rule::Channel("asynq:cancel".to_string()),
    ];
    assert_eq!(conn.acl_setuser_rules(username, &rules), Ok(()));

    let maybe_info = conn.acl_getuser(username).expect("Got user");
    assert!(maybe_info.is_some());
    let info = maybe_info.expect("Got asynq");

    // Flags.
    let expected_flags = vec![Rule::On, Rule::Other("sanitize-payload".to_string())];
    assert_eq!(info.flags, expected_flags);

    // The plain-text password is expected to come back as a hashed password rule.
    let expected_passwords = vec![Rule::AddHashedPass(
        "dda69783f28fdf6f1c5a83e8400f2472e9300887d1dffffe12a07b92a3d0aa25".to_string(),
    )];
    assert_eq!(info.passwords, expected_passwords);

    // Commands.
    let expected_commands = vec![
        Rule::AddCategory("all".to_string()),
        Rule::RemoveCategory("dangerous".to_string()),
        Rule::AddCommand("keys".to_string()),
        Rule::RemoveCommand("info".to_string()),
        Rule::RemoveCommand("select".to_string()),
    ];
    assert_eq!(info.commands, expected_commands);

    // Key patterns, in the order they were registered.
    let expected_keys: Vec<Rule> = [
        "asynq:{default}:*",
        "asynq:{tenant:*",
        "asynq:queues",
        "asynq:servers:*",
        "asynq:servers",
        "asynq:workers",
        "asynq:workers:*",
        "asynq:schedulers",
        "asynq:schedulers:*",
    ]
    .into_iter()
    .map(|pattern| Rule::Pattern(pattern.to_string()))
    .collect();
    assert_eq!(info.keys, expected_keys);

    // Channels and selectors.
    assert_eq!(
        info.channels,
        vec![Rule::Channel("asynq:cancel".to_string())]
    );
    assert_eq!(info.selectors, vec![]);
}
/// Registers a password-less user that carries a selector and checks every section of
/// the `AclInfo` reported for it.
#[test]
fn test_acl_sample_info() {
    let ctx = run_test_if_version_supported!(&REDIS_VERSION_CE_7_2);
    let mut conn = ctx.connection();

    // Root permissions plus one selector granting SET on `key2`.
    let selector = Rule::Selector(vec![
        Rule::AddCommand("SET".to_string()),
        Rule::Pattern("key2".to_string()),
    ]);
    let sample_rule = vec![
        Rule::On,
        Rule::NoPass,
        Rule::AddCommand("GET".to_string()),
        Rule::AllKeys,
        Rule::Channel("*".to_string()),
        selector,
    ];
    conn.acl_setuser_rules("sample", &sample_rule)
        .expect("Set sample user");

    let sample_user = conn
        .acl_getuser("sample")
        .expect("Got user")
        .expect("Got sample user");

    let expected_flags = vec![
        Rule::On,
        Rule::NoPass,
        Rule::Other("sanitize-payload".to_string()),
    ];
    assert_eq!(sample_user.flags, expected_flags);
    assert_eq!(sample_user.passwords, vec![]);

    let expected_commands = vec![
        Rule::RemoveCategory("all".to_string()),
        Rule::AddCommand("get".to_string()),
    ];
    assert_eq!(sample_user.commands, expected_commands);

    assert_eq!(sample_user.keys, vec![Rule::AllKeys]);
    assert_eq!(sample_user.channels, vec![Rule::Channel("*".to_string())]);

    let expected_selectors = vec![
        Rule::RemoveCategory("all".to_string()),
        Rule::AddCommand("set".to_string()),
        Rule::Pattern("key2".to_string()),
    ];
    assert_eq!(sample_user.selectors, expected_selectors);
}
#[cfg(all(feature = "acl", feature = "token-based-authentication"))]
mod token_based_authentication_acl_tests {
use crate::support::*;
use futures_channel::oneshot;
use futures_time::task::sleep;
use futures_util::{Stream, StreamExt};
use redis::{
AsyncTypedCommands, ErrorKind, RedisResult,
aio::ConnectionLike,
auth::{BasicAuth, StreamingCredentialsProvider},
};
use std::{
pin::Pin,
sync::{Arc, Mutex, Once, RwLock},
time::Duration,
};
use test_macros::async_test;
use tokio::sync::mpsc::Sender;
/// Guard making sure the logger is installed at most once, no matter how many tests call
/// `init_logger`.
static INIT_LOGGER: Once = Once::new();

/// Installs `env_logger` in test mode. When `RUST_LOG` cannot be read from the
/// environment the level filter is set to `Debug`.
fn init_logger() {
    INIT_LOGGER.call_once(|| {
        let rust_log_missing = std::env::var("RUST_LOG").is_err();

        let mut logger = env_logger::builder();
        logger.is_test(true);
        if rust_log_missing {
            logger.filter_level(log::LevelFilter::Debug);
        }
        logger.init();
    });
}
/// Middle segment of the mocked token.
/// NOTE(review): presumably the base64url-encoded JSON holding the `oid` claim below; confirm.
const TOKEN_PAYLOAD: &str = "eyJvaWQiOiIxMjM0NTY3OC05YWJjLWRlZi0xMjM0LTU2Nzg5YWJjZGVmMCJ9";
/// Username used together with `MOCKED_TOKEN` by the default provider configuration.
const OID_CLAIM_VALUE: &str = "12345678-9abc-def-1234-56789abcdef0";
/// Last segment of the mocked token.
const TOKEN_SIGNATURE: &str = "signature";
/// Mocked token assembled as `mock_jwt_token.<payload>.<signature>`.
static MOCKED_TOKEN: std::sync::LazyLock<String> =
    std::sync::LazyLock::new(|| format!("mock_jwt_token.{TOKEN_PAYLOAD}.{TOKEN_SIGNATURE}"));

/// Name of the built-in user expected in `ACL USERS` replies.
const DEFAULT_USER: &str = "default";
/// Name of the extra user created by the authentication test.
const TEST_USER: &str = "test";

// Username / token pairs for the three rotating identities.
const ALICE_OID_CLAIM: &str = "a11ce000-7a1c-4a1c-9e11-ace000000001";
const ALICE_TOKEN: &str = "alice_mock_jwt_token.eyJvaWQiOiJhMTFjZTAwMC03YTFjLTRhMWMtOWUxMS1hY2UwMDAwMDAwMDEifQ.signature";
const BOB_OID_CLAIM: &str = "b0b00000-0b01-4b0b-9b0b-0b0000000002";
const BOB_TOKEN: &str = "bob_mock_jwt_token.eyJvaWQiOiJiMGIwMDAwMC0wYjAxLTRiMGItOWIwYi0wYjAwMDAwMDAwMDIifQ.signature";
const CHARLIE_OID_CLAIM: &str = "c0a11e00-7c1a-4a1e-9c11-0ca11e000003";
const CHARLIE_TOKEN: &str = "charlie_mock_jwt_token.eyJvaWQiOiJjMGExMWUwMC03YzFhLTRhMWUtOWMxMS0wY2ExMWUwMDAwMDAzIn0.signature";

/// All rotating identities as `(username, token)` pairs, in rotation order.
const CREDENTIALS: [(&str, &str); 3] = [
    (ALICE_OID_CLAIM, ALICE_TOKEN),
    (BOB_OID_CLAIM, BOB_TOKEN),
    (CHARLIE_OID_CLAIM, CHARLIE_TOKEN),
];

/// Username / token pair that is never registered on the server by these tests.
const INVALID_USER: &str = "nonexistent_user";
const INVALID_TOKEN: &str = "invalid_token";
/// Settings that drive `MockStreamingCredentialsProvider`.
#[derive(Debug, Clone)]
pub struct MockProviderConfig {
/// Credentials handed out one after another; the provider wraps around to the first
/// entry after the last one.
pub credentials_sequence: Vec<BasicAuth>,
/// Pause between two consecutive refreshes.
pub refresh_interval: Duration,
/// Positions in `credentials_sequence` at which the provider emits an
/// authentication error instead of the credentials.
pub error_positions: Vec<usize>,
}
impl Default for MockProviderConfig {
    /// A single credential (`OID_CLAIM_VALUE` / `MOCKED_TOKEN`) refreshed every 100 ms,
    /// with no injected errors.
    fn default() -> Self {
        let only_credential = BasicAuth::new(OID_CLAIM_VALUE.to_string(), MOCKED_TOKEN.clone());
        Self {
            credentials_sequence: vec![only_credential],
            refresh_interval: Duration::from_millis(100),
            error_positions: Vec::new(),
        }
    }
}
impl MockProviderConfig {
    /// Rotates through every entry of `CREDENTIALS` with a 500 ms refresh interval.
    pub fn multiple_tokens() -> Self {
        let credentials_sequence = CREDENTIALS
            .iter()
            .map(|(username, token_payload)| {
                BasicAuth::new(username.to_string(), token_payload.to_string())
            })
            .collect();
        Self {
            credentials_sequence,
            refresh_interval: Duration::from_millis(500),
            error_positions: Vec::new(),
        }
    }

    /// Same as [`Self::multiple_tokens`], but emits an error at each of `error_positions`.
    pub fn multiple_tokens_with_errors(error_positions: Vec<usize>) -> Self {
        Self {
            error_positions,
            ..Self::multiple_tokens()
        }
    }

    /// Yields Alice's credentials first and the `INVALID_USER` / `INVALID_TOKEN` pair
    /// second, with a 500 ms refresh interval.
    pub fn valid_then_invalid_credentials() -> Self {
        let valid = BasicAuth::new(ALICE_OID_CLAIM.to_string(), ALICE_TOKEN.to_string());
        let invalid = BasicAuth::new(INVALID_USER.to_string(), INVALID_TOKEN.to_string());
        Self {
            credentials_sequence: vec![valid, invalid],
            refresh_interval: Duration::from_millis(500),
            error_positions: Vec::new(),
        }
    }
}
/// Channel senders used to push credential updates, one per subscriber.
type Subscriptions = Vec<Sender<RedisResult<BasicAuth>>>;
/// `Subscriptions` behind a mutex so they can be shared with the notifier task.
type SharedSubscriptions = Arc<Mutex<Subscriptions>>;
/// Test double that periodically publishes credentials from a fixed sequence.
///
/// All mutable state lives behind `Arc`s, so clones of a provider share the same
/// abort handle, subscriber list, current credentials and position.
#[derive(Debug, Clone)]
pub struct MockStreamingCredentialsProvider {
/// Sequence, refresh interval and error positions to use.
config: MockProviderConfig,
/// Present while the notifier task is running; sending on it aborts the task.
abort_handle: Arc<Mutex<Option<oneshot::Sender<()>>>>,
/// Senders of all registered subscribers.
subscribers: SharedSubscriptions,
/// Most recently published credentials, if any; replayed to new subscribers.
current_credentials: Arc<RwLock<Option<BasicAuth>>>,
/// Index into `config.credentials_sequence` of the next entry to publish.
current_position: Arc<Mutex<usize>>,
}
impl MockStreamingCredentialsProvider {
    /// Creates a provider that uses `MockProviderConfig::default()`.
    pub fn new() -> Self {
        Self::with_config(MockProviderConfig::default())
    }

    /// Creates a stopped provider with the given configuration and empty shared state.
    pub fn with_config(config: MockProviderConfig) -> Self {
        Self {
            config,
            abort_handle: Default::default(),
            subscribers: Default::default(),
            current_credentials: Default::default(),
            current_position: Default::default(),
        }
    }

    /// Creates a provider that uses `MockProviderConfig::multiple_tokens()`.
    pub fn multiple_tokens() -> Self {
        Self::with_config(MockProviderConfig::multiple_tokens())
    }

    /// Creates a provider that uses `MockProviderConfig::multiple_tokens_with_errors`.
    pub fn multiple_tokens_with_errors(error_positions: Vec<usize>) -> Self {
        Self::with_config(MockProviderConfig::multiple_tokens_with_errors(
            error_positions,
        ))
    }

    /// Spawns the background task that publishes one entry of the credential sequence
    /// per refresh interval. Calling `start` on an already running provider is a no-op.
    ///
    /// # Panics
    ///
    /// Panics when the configured credential sequence is empty. Failing here gives a clear
    /// message at the call site instead of a modulo-by-zero panic inside the detached task.
    pub fn start(&mut self) {
        assert!(
            !self.config.credentials_sequence.is_empty(),
            "MockStreamingCredentialsProvider needs at least one credential to rotate through"
        );

        // Check and install the abort handle under a single lock acquisition, so that two
        // clones calling `start` concurrently cannot both spawn a notifier task.
        let abort_receiver = {
            let mut abort_handle = self.abort_handle.lock().unwrap();
            if abort_handle.is_some() {
                return;
            }
            let (abort_sender, abort_receiver) = oneshot::channel();
            *abort_handle = Some(abort_sender);
            abort_receiver
        };

        let config = self.config.clone();
        let subscribers_arc = Arc::clone(&self.subscribers);
        let current_credentials_arc = Arc::clone(&self.current_credentials);
        let current_position_arc = Arc::clone(&self.current_position);

        let notifier_future = async move {
            let mut attempt = 0;
            loop {
                // Take the current position and advance it, wrapping around at the end.
                // The guard is released before any `.await` below.
                let position = {
                    let mut pos = current_position_arc
                        .lock()
                        .expect("could not acquire lock for current_position");
                    let current_pos = *pos;
                    *pos = (current_pos + 1) % config.credentials_sequence.len();
                    current_pos
                };
                println!("Mock provider: Refreshing credentials. Attempt {attempt}");

                let result = if config.error_positions.contains(&position) {
                    // Injected failure: `current_credentials` keeps its previous value.
                    Err(redis::RedisError::from((
                        redis::ErrorKind::AuthenticationFailed,
                        "Mock authentication failed",
                    )))
                } else {
                    let credentials = config.credentials_sequence[position].clone();
                    {
                        let mut current = current_credentials_arc.write().unwrap();
                        *current = Some(credentials.clone());
                    }
                    println!("Mock provider: Providing credentials: {:?}", credentials);
                    Ok(credentials)
                };

                // `result` is not needed afterwards, so it is moved rather than cloned.
                Self::notify_subscribers(&subscribers_arc, result).await;
                attempt += 1;
                sleep(config.refresh_interval.into()).await;
            }
        };

        // The task ends as soon as the abort channel resolves (signal sent or sender dropped).
        spawn(async move {
            futures::future::select(abort_receiver, Box::pin(notifier_future)).await
        });
    }

    /// Signals the background task to stop, if one is running. The send result is ignored.
    pub fn stop(&mut self) {
        if let Some(handle) = self.abort_handle.lock().unwrap().take() {
            _ = handle.send(());
        }
    }

    /// Drops subscribers whose channel is closed and sends `result` to all remaining ones.
    /// Individual send results are ignored.
    async fn notify_subscribers(
        subscribers_arc: &SharedSubscriptions,
        result: RedisResult<BasicAuth>,
    ) {
        // Snapshot the senders so the mutex is not held while awaiting the sends.
        let subscribers_list = {
            let mut guard = subscribers_arc
                .lock()
                .expect("could not acquire lock for subscribers");
            guard.retain(|sender| !sender.is_closed());
            guard.clone()
        };
        futures_util::future::join_all(
            subscribers_list
                .iter()
                .map(|sender| sender.send(result.clone())),
        )
        .await;
    }
}
impl StreamingCredentialsProvider for MockStreamingCredentialsProvider {
    /// Registers a new subscriber and returns its stream of credential updates.
    ///
    /// When credentials have already been published, the stream starts with the most
    /// recent ones before yielding further updates.
    fn subscribe(
        &self,
    ) -> Pin<Box<dyn Stream<Item = RedisResult<BasicAuth>> + Send + 'static>> {
        let (tx, rx) = tokio::sync::mpsc::channel::<RedisResult<BasicAuth>>(1);
        self.subscribers
            .lock()
            .expect("could not acquire lock for subscribers")
            .push(tx);

        // Turn the receiver into a stream that ends once the channel yields `None`.
        let updates = futures_util::stream::unfold(rx, |mut rx| async move {
            let next = rx.recv().await;
            next.map(|item| (item, rx))
        });

        let latest = self.current_credentials.read().unwrap().clone();
        match latest {
            Some(credentials) => futures_util::stream::once(async move { Ok(credentials) })
                .chain(updates)
                .boxed(),
            None => updates.boxed(),
        }
    }
}
impl Drop for MockStreamingCredentialsProvider {
fn drop(&mut self) {
self.stop();
}
}
// Registers a user whose password is the mocked token, connects through the mock
// credentials provider and checks that the connection is authenticated as that user
// and may run ACL admin commands.
#[async_test]
async fn authentication_with_mock_streaming_credentials_provider() {
    init_logger();
    let ctx = TestContext::new();
    let mut admin_con = ctx.async_connection().await.unwrap();
    let expected_username = OID_CLAIM_VALUE;

    let mut users_cmd = redis::cmd("ACL");
    users_cmd.arg("USERS");
    // True when `name` occurs in the given user list.
    let has_user = |list: &[String], name: &str| list.iter().any(|user| user == name);

    println!("Setting up Redis user with JWT token authentication...");
    let mut setuser_cmd = redis::cmd("ACL");
    setuser_cmd
        .arg("SETUSER")
        .arg(expected_username)
        .arg("on")
        .arg(format!(">{}", MOCKED_TOKEN.as_str()))
        .arg("~*")
        .arg("+@all");
    let setuser_reply = admin_con.req_packed_command(&setuser_cmd).await;
    assert_eq!(setuser_reply, Ok(redis::Value::Okay));

    println!("Setting up mock streaming credentials provider with default token...");
    let mut mock_provider = MockStreamingCredentialsProvider::new();
    mock_provider.start();
    let config = redis::AsyncConnectionConfig::new().set_credentials_provider(mock_provider);

    println!("Establishing multiplexed connection with JWT authentication...");
    let mut con = ctx
        .client
        .get_multiplexed_async_connection_with_config(&config)
        .await
        .unwrap();

    let mut whoami_cmd = redis::cmd("ACL");
    whoami_cmd.arg("WHOAMI");
    let current_user: String = whoami_cmd.query_async(&mut con).await.unwrap();
    assert_eq!(current_user, expected_username);
    println!("Authenticated as user: {current_user}.");

    let users: Vec<String> = users_cmd.query_async(&mut con).await.unwrap();
    assert!(has_user(&users, DEFAULT_USER));
    assert!(has_user(&users, expected_username));

    println!("Testing ACL admin operations...");
    let mut create_test_user = redis::cmd("ACL");
    create_test_user.arg("SETUSER").arg(TEST_USER);
    let _: () = create_test_user.query_async(&mut con).await.unwrap();

    let updated_users: Vec<String> = users_cmd.query_async(&mut con).await.unwrap();
    assert!(has_user(&updated_users, DEFAULT_USER));
    assert!(has_user(&updated_users, expected_username));
    assert!(has_user(&updated_users, TEST_USER));
    println!("JWT authentication and ACL operations completed successfully!");
}
/// Registers every `(username, token)` pair of `CREDENTIALS` as an enabled user with
/// access to all keys and all commands, asserting that each `ACL SETUSER` replies OK.
async fn add_users_with_jwt_tokens(ctx: &TestContext) {
    let mut admin_con = ctx.async_connection().await.unwrap();
    for (username, token_payload) in CREDENTIALS {
        let mut setuser_cmd = redis::cmd("ACL");
        setuser_cmd
            .arg("SETUSER")
            .arg(username)
            .arg("on")
            .arg(format!(">{token_payload}"))
            .arg("~*")
            .arg("+@all");
        let reply = admin_con.req_packed_command(&setuser_cmd).await;
        assert_eq!(reply, Ok(redis::Value::Okay));
    }
}
// Connects through a provider that rotates Alice -> Bob -> Charlie and checks that the
// authenticated user of one connection follows each rotation.
#[async_test]
async fn token_rotation_with_mock_streaming_credentials_provider() {
    init_logger();
    let ctx = TestContext::new();
    // Slightly longer than the 500 ms refresh interval of `multiple_tokens`.
    let rotation_wait = Duration::from_millis(600);

    let mut users_cmd = redis::cmd("ACL");
    users_cmd.arg("USERS");
    let mut whoami_cmd = redis::cmd("ACL");
    whoami_cmd.arg("WHOAMI");

    println!("Setting up Redis users for token rotation test...");
    add_users_with_jwt_tokens(&ctx).await;

    println!("Setting up mock provider with multiple tokens...");
    let mut mock_provider = MockStreamingCredentialsProvider::multiple_tokens();
    mock_provider.start();
    let config = redis::AsyncConnectionConfig::new().set_credentials_provider(mock_provider);

    println!("Establishing multiplexed connection with JWT authentication...");
    let mut con = ctx
        .client
        .get_multiplexed_async_connection_with_config(&config)
        .await
        .unwrap();

    let initial_user: String = whoami_cmd.query_async(&mut con).await.unwrap();
    assert_eq!(initial_user, ALICE_OID_CLAIM);
    println!("Authenticated as user: {initial_user}.");

    println!("Waiting for token rotation...");
    sleep(rotation_wait.into()).await;
    let user_after_first: String = whoami_cmd.query_async(&mut con).await.unwrap();
    println!("First rotation completed. Authenticated as user: {user_after_first}.");
    assert_eq!(user_after_first, BOB_OID_CLAIM);
    let users: Vec<String> = users_cmd.query_async(&mut con).await.unwrap();
    println!("Users after first rotation: {users:?}");

    println!("Waiting for second token rotation...");
    sleep(rotation_wait.into()).await;
    let user_after_second: String = whoami_cmd.query_async(&mut con).await.unwrap();
    println!("Second rotation completed. Authenticated as user: {user_after_second}.");
    assert_eq!(user_after_second, CHARLIE_OID_CLAIM);
    let users: Vec<String> = users_cmd.query_async(&mut con).await.unwrap();
    println!("Users after second rotation: {users:?}");

    println!("Token rotation test completed successfully!");
}
// Uses a provider that emits an error instead of Bob's credentials and checks that the
// connection stays authenticated as Alice across the failed rotation, then moves on to
// Charlie and finally cycles back to Alice.
#[async_test]
async fn authentication_error_handling_with_mock_streaming_credentials_provider() {
    init_logger();
    let ctx = TestContext::new();
    let rotation_wait = Duration::from_millis(600);
    let mut whoami_cmd = redis::cmd("ACL");
    whoami_cmd.arg("WHOAMI");

    println!("Setting up Redis users for authentication error test...");
    add_users_with_jwt_tokens(&ctx).await;

    println!("Setting up mock provider with authentication error at position 1...");
    let failing_positions = vec![1];
    let mut mock_provider =
        MockStreamingCredentialsProvider::multiple_tokens_with_errors(failing_positions);
    mock_provider.start();
    let config = redis::AsyncConnectionConfig::new().set_credentials_provider(mock_provider);

    println!("Establishing multiplexed connection with JWT authentication...");
    let mut con = ctx
        .client
        .get_multiplexed_async_connection_with_config(&config)
        .await
        .unwrap();

    let initial_user: String = whoami_cmd.query_async(&mut con).await.unwrap();
    assert_eq!(initial_user, ALICE_OID_CLAIM);
    println!("Initial authentication successful as user: {initial_user}.");

    println!("Waiting for first rotation attempt (should fail)...");
    sleep(rotation_wait.into()).await;
    let current_user_after_error: String = whoami_cmd.query_async(&mut con).await.unwrap();
    println!("Current user after error: {current_user_after_error}");
    assert_eq!(current_user_after_error, ALICE_OID_CLAIM);

    println!("Waiting for second rotation attempt (should succeed)...");
    sleep(rotation_wait.into()).await;
    let rotated_user: String = whoami_cmd.query_async(&mut con).await.unwrap();
    println!("User after successful rotation: {rotated_user}");
    assert_eq!(rotated_user, CHARLIE_OID_CLAIM);

    println!("Waiting for third rotation attempt (back to Alice)...");
    sleep(rotation_wait.into()).await;
    let cycled_user: String = whoami_cmd.query_async(&mut con).await.unwrap();
    println!("User after cycling back: {cycled_user}");
    assert_eq!(cycled_user, ALICE_OID_CLAIM);

    println!("Authentication error handling test completed successfully!");
}
// Opens three connections from one client that all use the same provider and checks
// that each of them is re-authenticated from Alice to Bob while keeping access to the
// keys written before the rotation.
#[async_test]
async fn multiple_connections_from_one_client_sharing_a_single_credentials_provider() {
    init_logger();
    let ctx = TestContext::new();
    let mut whoami_cmd = redis::cmd("ACL");
    whoami_cmd.arg("WHOAMI");

    println!(
        "Setting up Redis users for token rotation test in which a single client establishes multiple connections that share a single credentials provider..."
    );
    add_users_with_jwt_tokens(&ctx).await;

    println!("Setting up mock provider with multiple tokens...");
    let mut mock_provider = MockStreamingCredentialsProvider::multiple_tokens();
    mock_provider.start();
    let config = redis::AsyncConnectionConfig::new().set_credentials_provider(mock_provider);

    println!("Establishing multiplexed connections with JWT authentication...");
    let client = &ctx.client;
    let mut con1 = client
        .get_multiplexed_async_connection_with_config(&config)
        .await
        .unwrap();
    let mut con2 = client
        .get_multiplexed_async_connection_with_config(&config)
        .await
        .unwrap();
    let mut con3 = client
        .get_multiplexed_async_connection_with_config(&config)
        .await
        .unwrap();

    // Connections are numbered from 1; each writes its own key.
    for (i, con) in (1usize..).zip([&mut con1, &mut con2, &mut con3]) {
        let current_user: String = whoami_cmd.query_async(con).await.unwrap();
        assert_eq!(current_user, ALICE_OID_CLAIM);
        let key = format!("test_key_{i}");
        assert_eq!(con.set(key, i).await, Ok(()));
    }

    println!("Waiting for token rotation...");
    sleep(Duration::from_millis(600).into()).await;

    for (i, con) in (1usize..).zip([&mut con1, &mut con2, &mut con3]) {
        let current_user: String = whoami_cmd.query_async(con).await.unwrap();
        assert_eq!(current_user, BOB_OID_CLAIM);
        let key = format!("test_key_{i}");
        assert_eq!(con.get(key).await, Ok(Some(i.to_string())));
    }

    println!(
        "Multiple connections sharing a single credentials provider test completed successfully!"
    );
}
// Opens one connection from each of two clients that use the same provider and checks
// that both are re-authenticated from Alice to Bob while keeping access to the keys
// written before the rotation.
#[async_test]
async fn multiple_clients_sharing_a_single_credentials_provider() {
    init_logger();
    let ctx1 = TestContext::new();
    let mut whoami_cmd = redis::cmd("ACL");
    whoami_cmd.arg("WHOAMI");

    println!(
        "Setting up Redis users for token rotation test with multiple clients that share a single credentials provider..."
    );
    add_users_with_jwt_tokens(&ctx1).await;

    println!("Setting up mock provider with multiple tokens...");
    let mut mock_provider = MockStreamingCredentialsProvider::multiple_tokens();
    mock_provider.start();
    let config = redis::AsyncConnectionConfig::new().set_credentials_provider(mock_provider);

    // A second, independent client talking to the same server.
    let client2 = redis::Client::open(ctx1.server.connection_info()).unwrap();

    println!("Establishing multiplexed connections with JWT authentication...");
    let mut con1 = ctx1
        .client
        .get_multiplexed_async_connection_with_config(&config)
        .await
        .unwrap();
    let mut con2 = client2
        .get_multiplexed_async_connection_with_config(&config)
        .await
        .unwrap();

    // Connections are numbered from 1; each writes its own key.
    for (i, con) in (1usize..).zip([&mut con1, &mut con2]) {
        let current_user: String = whoami_cmd.query_async(con).await.unwrap();
        assert_eq!(current_user, ALICE_OID_CLAIM);
        let key = format!("test_key_{i}");
        assert_eq!(con.set(key, i).await, Ok(()));
    }

    println!("Waiting for token rotation...");
    sleep(Duration::from_millis(600).into()).await;

    for (i, con) in (1usize..).zip([&mut con1, &mut con2]) {
        let current_user: String = whoami_cmd.query_async(con).await.unwrap();
        assert_eq!(current_user, BOB_OID_CLAIM);
        let key = format!("test_key_{i}");
        assert_eq!(con.get(key).await, Ok(Some(i.to_string())));
    }

    println!(
        "Multiple clients sharing a single credentials provider test completed successfully!"
    );
}
// Uses a provider that yields valid credentials first and unregistered ones second,
// and checks that commands on the connection then fail with `AuthenticationFailed`
// and a message mentioning the re-authentication failure.
#[async_test]
async fn connection_rendered_unusable_when_reauthentication_fails() {
    init_logger();
    let ctx = TestContext::new();

    println!("Setting up Redis users for re-authentication failure test...");
    add_users_with_jwt_tokens(&ctx).await;

    println!("Setting up mock provider that yields valid then invalid credentials...");
    let provider_config = MockProviderConfig::valid_then_invalid_credentials();
    let mut mock_provider = MockStreamingCredentialsProvider::with_config(provider_config);
    mock_provider.start();
    let config = redis::AsyncConnectionConfig::new().set_credentials_provider(mock_provider);

    println!("Establishing multiplexed connection with JWT authentication...");
    let mut con = ctx
        .client
        .get_multiplexed_async_connection_with_config(&config)
        .await
        .unwrap();

    let mut whoami_cmd = redis::cmd("ACL");
    whoami_cmd.arg("WHOAMI");
    let initial_user: String = whoami_cmd.query_async(&mut con).await.unwrap();
    assert_eq!(initial_user, ALICE_OID_CLAIM);
    println!("Initial authentication successful as user: {initial_user}.");

    println!("Waiting for token rotation to yield invalid credentials...");
    sleep(Duration::from_millis(600).into()).await;

    println!("Attempting to execute a command on an unusable connection...");
    let outcome: redis::RedisResult<String> = whoami_cmd.query_async(&mut con).await;
    assert!(outcome.is_err());
    let error = outcome.unwrap_err();
    assert_eq!(error.kind(), ErrorKind::AuthenticationFailed);
    let mentions_reauth = error.to_string().contains("re-authentication failure");
    assert!(
        mentions_reauth,
        "Error message should mention re-authentication failure: {error}"
    );

    println!("Command correctly failed with AuthenticationFailed: {error}");
    println!("Connection rendered unusable test completed successfully!");
}
#[cfg(feature = "cluster-async")]
mod cluster {
use super::*;
use redis::cluster::ClusterClientBuilder;
/// Connects to every server of the cluster directly and registers `username` there as
/// an enabled user with password `token` and access to all keys and commands.
async fn add_user_on_all_nodes(cluster: &TestClusterContext, username: &str, token: &str) {
    for server in cluster.cluster.servers.iter() {
        let node_client = redis::Client::open(server.connection_info()).unwrap();
        let mut node_con = node_client
            .get_multiplexed_async_connection()
            .await
            .unwrap();

        let mut setuser_cmd = redis::cmd("ACL");
        setuser_cmd
            .arg("SETUSER")
            .arg(username)
            .arg("on")
            .arg(format!(">{token}"))
            .arg("~*")
            .arg("+@all");
        setuser_cmd
            .exec_async(&mut node_con)
            .await
            .expect("ACL SETUSER should succeed");
    }
}
/// Registers every `(username, token)` pair of `CREDENTIALS` on every cluster node.
async fn add_users_with_jwt_tokens_on_all_nodes(cluster: &TestClusterContext) {
    for (username, token_payload) in CREDENTIALS {
        add_user_on_all_nodes(cluster, username, token_payload).await;
    }
}
// Builds a cluster client that uses the default mock provider and checks that the
// connection is authenticated as `OID_CLAIM_VALUE` and can write and read a key.
#[async_test]
async fn cluster_authentication_with_mock_streaming_credentials_provider() {
    init_logger();
    let cluster =
        TestClusterContext::new_with_cluster_client_builder(|builder: ClusterClientBuilder| {
            let mut provider = MockStreamingCredentialsProvider::new();
            provider.start();
            builder.set_credentials_provider(provider)
        });
    add_user_on_all_nodes(&cluster, OID_CLAIM_VALUE, &MOCKED_TOKEN).await;

    let mut connection = cluster.async_connection().await;

    let mut whoami_cmd = redis::cmd("ACL");
    whoami_cmd.arg("WHOAMI");
    let current_user: String = whoami_cmd.query_async(&mut connection).await.unwrap();
    assert_eq!(current_user, OID_CLAIM_VALUE);

    let mut set_cmd = redis::cmd("SET");
    set_cmd.arg("test_key").arg("test_value");
    set_cmd
        .exec_async(&mut connection)
        .await
        .expect("SET should succeed with credentials provider");

    let mut get_cmd = redis::cmd("GET");
    get_cmd.arg("test_key");
    let stored: String = get_cmd
        .query_async(&mut connection)
        .await
        .expect("GET should succeed with credentials provider");
    assert_eq!(stored, "test_value");
}
// Cluster variant of the rotation test: the authenticated user is expected to move
// from Alice to Bob to Charlie, one step per wait.
#[async_test]
async fn cluster_token_rotation_with_mock_streaming_credentials_provider() {
    init_logger();
    let cluster =
        TestClusterContext::new_with_cluster_client_builder(|builder: ClusterClientBuilder| {
            let mut provider = MockStreamingCredentialsProvider::multiple_tokens();
            provider.start();
            builder.set_credentials_provider(provider)
        });
    add_users_with_jwt_tokens_on_all_nodes(&cluster).await;

    let rotation_wait = Duration::from_millis(600);
    let mut whoami_cmd = redis::cmd("ACL");
    whoami_cmd.arg("WHOAMI");
    let mut con = cluster.async_connection().await;

    let initial_user: String = whoami_cmd.query_async(&mut con).await.unwrap();
    assert_eq!(initial_user, ALICE_OID_CLAIM);

    sleep(rotation_wait.into()).await;
    let user_after_first: String = whoami_cmd.query_async(&mut con).await.unwrap();
    assert_eq!(user_after_first, BOB_OID_CLAIM);

    sleep(rotation_wait.into()).await;
    let user_after_second: String = whoami_cmd.query_async(&mut con).await.unwrap();
    assert_eq!(user_after_second, CHARLIE_OID_CLAIM);
}
// Cluster variant of the error-handling test: with an error injected at position 1
// the user is expected to stay Alice, then become Charlie, then cycle back to Alice.
#[async_test]
async fn cluster_authentication_error_handling_with_mock_streaming_credentials_provider() {
    init_logger();
    let cluster =
        TestClusterContext::new_with_cluster_client_builder(|builder: ClusterClientBuilder| {
            let failing_positions = vec![1];
            let mut provider =
                MockStreamingCredentialsProvider::multiple_tokens_with_errors(failing_positions);
            provider.start();
            builder.set_credentials_provider(provider)
        });
    add_users_with_jwt_tokens_on_all_nodes(&cluster).await;

    let rotation_wait = Duration::from_millis(600);
    let mut whoami_cmd = redis::cmd("ACL");
    whoami_cmd.arg("WHOAMI");
    let mut con = cluster.async_connection().await;

    let initial_user: String = whoami_cmd.query_async(&mut con).await.unwrap();
    assert_eq!(initial_user, ALICE_OID_CLAIM);

    // The rotation to Bob fails, so Alice stays authenticated.
    sleep(rotation_wait.into()).await;
    let user_after_error: String = whoami_cmd.query_async(&mut con).await.unwrap();
    assert_eq!(user_after_error, ALICE_OID_CLAIM);

    sleep(rotation_wait.into()).await;
    let rotated_user: String = whoami_cmd.query_async(&mut con).await.unwrap();
    assert_eq!(rotated_user, CHARLIE_OID_CLAIM);

    sleep(rotation_wait.into()).await;
    let cycled_user: String = whoami_cmd.query_async(&mut con).await.unwrap();
    assert_eq!(cycled_user, ALICE_OID_CLAIM);
}
// Opens two connections from one cluster client and checks that both are
// re-authenticated from Alice to Bob while keeping access to the keys written before.
#[async_test]
async fn cluster_multiple_connections_sharing_a_single_credentials_provider() {
    init_logger();
    let cluster =
        TestClusterContext::new_with_cluster_client_builder(|builder: ClusterClientBuilder| {
            let mut provider = MockStreamingCredentialsProvider::multiple_tokens();
            provider.start();
            builder.set_credentials_provider(provider)
        });
    add_users_with_jwt_tokens_on_all_nodes(&cluster).await;

    let mut whoami_cmd = redis::cmd("ACL");
    whoami_cmd.arg("WHOAMI");
    let mut con1 = cluster.client.get_async_connection().await.unwrap();
    let mut con2 = cluster.client.get_async_connection().await.unwrap();

    for (idx, con) in [&mut con1, &mut con2].into_iter().enumerate() {
        let current_user: String = whoami_cmd.query_async(con).await.unwrap();
        assert_eq!(current_user, ALICE_OID_CLAIM);

        let mut set_cmd = redis::cmd("SET");
        set_cmd.arg(format!("test_key_{idx}")).arg(idx.to_string());
        set_cmd.exec_async(con).await.unwrap();
    }

    sleep(Duration::from_millis(600).into()).await;

    for (idx, con) in [&mut con1, &mut con2].into_iter().enumerate() {
        let current_user: String = whoami_cmd.query_async(con).await.unwrap();
        assert_eq!(current_user, BOB_OID_CLAIM);

        let mut get_cmd = redis::cmd("GET");
        get_cmd.arg(format!("test_key_{idx}"));
        let stored: String = get_cmd.query_async(con).await.unwrap();
        assert_eq!(stored, idx.to_string());
    }
}
// Builds two cluster clients from clones of one provider and checks that a connection
// of each is re-authenticated from Alice to Bob while keeping access to the keys
// written before.
#[async_test]
async fn cluster_multiple_clients_sharing_a_single_credentials_provider() {
    init_logger();
    let cluster = TestClusterContext::new();
    add_users_with_jwt_tokens_on_all_nodes(&cluster).await;

    let mut mock_provider = MockStreamingCredentialsProvider::multiple_tokens();
    mock_provider.start();

    // The first client gets a clone of the provider, the second one the provider itself.
    let builder1 = ClusterClientBuilder::new(cluster.nodes.clone())
        .set_credentials_provider(mock_provider.clone());
    let client1 = builder1.build().unwrap();
    let builder2 =
        ClusterClientBuilder::new(cluster.nodes.clone()).set_credentials_provider(mock_provider);
    let client2 = builder2.build().unwrap();

    let mut whoami_cmd = redis::cmd("ACL");
    whoami_cmd.arg("WHOAMI");
    let mut con1 = client1.get_async_connection().await.unwrap();
    let mut con2 = client2.get_async_connection().await.unwrap();

    for (idx, con) in [&mut con1, &mut con2].into_iter().enumerate() {
        let current_user: String = whoami_cmd.query_async(con).await.unwrap();
        assert_eq!(current_user, ALICE_OID_CLAIM);

        let mut set_cmd = redis::cmd("SET");
        set_cmd.arg(format!("test_key_{idx}")).arg(idx.to_string());
        set_cmd.exec_async(con).await.unwrap();
    }

    sleep(Duration::from_millis(600).into()).await;

    for (idx, con) in [&mut con1, &mut con2].into_iter().enumerate() {
        let current_user: String = whoami_cmd.query_async(con).await.unwrap();
        assert_eq!(current_user, BOB_OID_CLAIM);

        let mut get_cmd = redis::cmd("GET");
        get_cmd.arg(format!("test_key_{idx}"));
        let stored: String = get_cmd.query_async(con).await.unwrap();
        assert_eq!(stored, idx.to_string());
    }
}
// Cluster variant of the re-authentication failure test: once the provider has
// rotated to unregistered credentials, commands are expected to fail with
// `ClusterConnectionNotFound`.
#[async_test]
async fn cluster_connection_rendered_unusable_when_reauthentication_fails() {
    init_logger();
    let cluster =
        TestClusterContext::new_with_cluster_client_builder(|builder: ClusterClientBuilder| {
            let provider_config = MockProviderConfig::valid_then_invalid_credentials();
            let mut provider = MockStreamingCredentialsProvider::with_config(provider_config);
            provider.start();
            builder.set_credentials_provider(provider)
        });
    add_users_with_jwt_tokens_on_all_nodes(&cluster).await;

    let mut whoami_cmd = redis::cmd("ACL");
    whoami_cmd.arg("WHOAMI");
    let mut con = cluster.async_connection().await;

    let initial_user: String = whoami_cmd.query_async(&mut con).await.unwrap();
    assert_eq!(initial_user, ALICE_OID_CLAIM);

    sleep(Duration::from_millis(600).into()).await;

    let outcome: redis::RedisResult<String> = whoami_cmd.query_async(&mut con).await;
    assert!(
        outcome.is_err(),
        "Commands should fail after re-authentication with invalid credentials."
    );
    let error = outcome.unwrap_err();
    assert_eq!(error.kind(), ErrorKind::ClusterConnectionNotFound);
}
}
}