use super::internal::{
DEFAULT_MAX_PAYLOAD_BYTES, ValkeyStoreError, registry_key, revocation_session_channel,
revocation_user_channel, session_key,
};
use super::registry::ValkeySessionRegistry;
use super::store::ValkeySessionStore;
use crate::session::crypto::SessionCrypto;
use crate::session::data::SessionData;
use crate::session::id::SessionId;
use crate::session::storage::session_codec::SessionCodec;
use crate::session::store::{SessionRegistry, SessionStore};
use axess_rng::SystemRng;
use fred::prelude::*;
use std::time::Duration;
/// Round-trips a default `SessionData` through a `SessionCodec` built with
/// `SessionCodec::encrypted` and checks that the JSON rendering of the
/// decoded value equals the JSON rendering of the input.
#[test]
fn store_encode_decode_encrypted() {
    let codec = SessionCodec::encrypted(SessionCrypto::new([99u8; 32]));
    let original = SessionData::default();

    let encoded = codec.encode_bytes(&original).expect("encode_bytes");
    let roundtripped = codec.decode_bytes(&encoded).expect("decode_bytes");

    // Compare via JSON so the check does not depend on a `PartialEq` impl.
    let expected_json = serde_json::to_string(&original).unwrap();
    let actual_json = serde_json::to_string(&roundtripped).unwrap();
    assert_eq!(expected_json, actual_json);
}
/// With the payload cap set to a single byte, `encode` on a default
/// `SessionData` must fail with `ValkeyStoreError::PayloadTooLarge`.
#[test]
fn encode_rejects_oversized_payload() {
    // The client is only constructed here; this test never calls `init` on it.
    let client = Client::new(
        Config::from_url("redis://127.0.0.1:6379").expect("parse url"),
        None,
        None,
        None,
    );
    let one_byte_store = ValkeySessionStore::plaintext(client).with_max_payload(1);
    let session = SessionData::default();

    let result = one_byte_store.encode(&session);
    assert!(
        matches!(result, Err(ValkeyStoreError::PayloadTooLarge { .. })),
        "expected PayloadTooLarge, got: {result:?}"
    );
}
/// A store built with no explicit `with_max_payload` override must encode a
/// default `SessionData` successfully.
#[test]
fn encode_accepts_payload_within_limit() {
    // The client is only constructed here; this test never calls `init` on it.
    let store = ValkeySessionStore::plaintext(Client::new(
        Config::from_url("redis://127.0.0.1:6379").expect("parse url"),
        None,
        None,
        None,
    ));
    let data = SessionData::default();

    assert!(store.encode(&data).is_ok());
}
/// Builds a `Client` for `redis://127.0.0.1:6379` with all optional
/// configuration left as `None`. This helper does not call `init` on it.
fn dummy_client() -> Client {
    Client::new(
        Config::from_url("redis://127.0.0.1:6379").expect("parse url"),
        None,
        None,
        None,
    )
}
/// Builds a `Client` aimed at `127.0.0.1:1` with tight limits: one command
/// attempt, a 50 ms internal command timeout, a 200 ms default command
/// timeout, and `fail_fast` enabled.
///
/// NOTE(review): presumably nothing listens on port 1, so commands issued
/// through this client are expected to fail rather than hang — confirm.
fn unreachable_client() -> Client {
    let perf_cfg = fred::types::config::PerformanceConfig {
        default_command_timeout: Duration::from_millis(200),
        ..Default::default()
    };
    let conn_cfg = fred::types::config::ConnectionConfig {
        max_command_attempts: 1,
        internal_command_timeout: Duration::from_millis(50),
        ..Default::default()
    };

    let mut cfg = Config::from_url("redis://127.0.0.1:1/").expect("parse url");
    cfg.fail_fast = true;

    Client::new(cfg, Some(perf_cfg), Some(conn_cfg), None)
}
/// Pins the exact string layout produced by the key and channel helpers,
/// including that `session_key` honours a non-default prefix.
#[test]
fn key_helpers_produce_expected_strings() {
    let id = SessionId::new(&SystemRng);
    let id_s = id.to_string();

    let axess_session = session_key("axess", &id);
    assert_eq!(axess_session, format!("axess:sess:{id_s}"));

    let registry = registry_key("axess", "u-1");
    assert_eq!(registry, "axess:reg:u-1");

    let session_channel = revocation_session_channel("axess", "s-1");
    assert_eq!(session_channel, "axess:revoked-session:s-1");

    let user_channel = revocation_user_channel("axess", "u-1");
    assert_eq!(user_channel, "axess:revoked-user:u-1");

    // A different prefix must show up in the key rather than a fixed one.
    let other_session = session_key("other", &id);
    assert_eq!(
        other_session,
        format!("other:sess:{id_s}"),
        "session_key must interpolate the prefix argument"
    );
}
/// Pins `DEFAULT_MAX_PAYLOAD_BYTES` to 64 KiB, checked against both the
/// `64 * 1024` spelling and the literal `65_536`.
#[test]
fn default_max_payload_bytes_is_64_kib() {
    for expected in [64 * 1024, 65_536] {
        assert_eq!(DEFAULT_MAX_PAYLOAD_BYTES, expected);
    }
}
/// Pins the payload-size comparison: a cap equal to the encoded length is
/// accepted, and a cap one byte smaller is rejected with `PayloadTooLarge`.
#[test]
fn encode_size_boundary_is_strict_greater_than() {
    let store = ValkeySessionStore::plaintext(dummy_client());
    let data = SessionData::default();

    // Encode the same payload under a given cap, on a fresh clone each time.
    let encode_with_cap = |cap: usize| store.clone().with_max_payload(cap).encode(&data);

    // With the cap at `usize::MAX`, this yields the encoded length to probe around.
    let encoded_len = encode_with_cap(usize::MAX)
        .expect("baseline encode")
        .len();

    let ok_at_cap = encode_with_cap(encoded_len);
    assert!(
        ok_at_cap.is_ok(),
        "encode at exactly the cap must succeed (> not >= or ==): {ok_at_cap:?}"
    );

    let under_cap = encode_with_cap(encoded_len - 1);
    assert!(
        matches!(under_cap, Err(ValkeyStoreError::PayloadTooLarge { .. })),
        "encode one byte under cap must reject (discriminates < mutant): {under_cap:?}"
    );
}
/// Encodes and decodes a non-default `SessionData` through a plaintext store
/// and pins three things: the payload is non-trivial in size, the decoded
/// value matches the input (compared as JSON), and the input really differs
/// from `SessionData::default()`.
#[test]
fn store_encode_decode_roundtrip_pins_bodies() {
    let store = ValkeySessionStore::plaintext(dummy_client());
    let payload = SessionData {
        custom: serde_json::json!({"k": "v"}),
        ..SessionData::default()
    };

    let encoded = store.encode(&payload).expect("encode");
    assert!(
        !encoded.is_empty(),
        "encode must produce a non-empty payload: kills Ok(vec![])"
    );
    assert!(
        encoded.len() > 1,
        "encode must produce > 1 byte for non-trivial payload: kills Ok(vec![0/1])"
    );

    let decoded = store.decode(&encoded).expect("decode");
    let payload_json = serde_json::to_string(&payload).unwrap();
    let decoded_json = serde_json::to_string(&decoded).unwrap();
    assert_eq!(
        payload_json, decoded_json,
        "encode/decode must round-trip: kills Ok(Default::default()) on decode"
    );

    // Guard the guard: the round-trip check above only discriminates a
    // "decode returns default" mutant if the payload is not the default.
    let default_json = serde_json::to_string(&SessionData::default()).unwrap();
    assert_ne!(
        default_json, payload_json,
        "test payload must differ from default for the decode mutation test to be meaningful"
    );
}
/// Checks that the `From` conversions into `ValkeyStoreError` keep the
/// category of the source error: a fred error becomes `Connection`, and a
/// MessagePack decode error routed through `SqlStoreError` becomes `Codec`.
#[test]
fn from_impls_preserve_source_variant() {
    let fred_err: ValkeyStoreError =
        fred::error::Error::new(fred::error::ErrorKind::IO, "x").into();
    assert!(matches!(fred_err, ValkeyStoreError::Connection(_)));

    // These bytes are expected not to decode as `SessionData`. Require the
    // failure explicitly: a silent `if let Err(..)` would let the test pass
    // without ever exercising the `Codec` conversion below.
    let decode_err: Result<SessionData, _> = rmp_serde::from_slice(&[0xFF, 0xFF, 0xFF]);
    let e = match decode_err {
        Err(e) => e,
        Ok(_) => panic!("decoding [0xFF, 0xFF, 0xFF] as SessionData must fail"),
    };

    let sql_err: crate::session::storage::session_codec::SqlStoreError = e.into();
    let wrapped: ValkeyStoreError = sql_err.into();
    assert!(matches!(wrapped, ValkeyStoreError::Codec(_)));
}
/// `load` on a store backed by `unreachable_client()` must resolve to an
/// inner `Err` before the 3-second outer timeout fires.
#[tokio::test]
async fn load_propagates_error_against_unreachable_valkey() {
    let store = ValkeySessionStore::plaintext(unreachable_client());
    let id = SessionId::new(&SystemRng);
    let deadline = Duration::from_secs(3);

    // `Ok(Err(_))` means the timeout did not elapse and `load` itself failed.
    let result = tokio::time::timeout(deadline, store.load(&id)).await;
    assert!(
        matches!(result, Ok(Err(_))),
        "load must return Err quickly against unreachable Valkey: {result:?}"
    );
}
/// `save` on a store backed by `unreachable_client()` must resolve to an
/// inner `Err` before the 3-second outer timeout fires.
#[tokio::test]
async fn save_propagates_error_against_unreachable_valkey() {
    let store = ValkeySessionStore::plaintext(unreachable_client());
    let id = SessionId::new(&SystemRng);
    let data = SessionData::default();
    let ttl = Duration::from_secs(60);
    let deadline = Duration::from_secs(3);

    // `Ok(Err(_))` means the timeout did not elapse and `save` itself failed.
    let result = tokio::time::timeout(deadline, store.save(&id, &data, ttl)).await;
    assert!(
        matches!(result, Ok(Err(_))),
        "save must return Err: {result:?}"
    );
}
/// `delete` on a store backed by `unreachable_client()` must resolve to an
/// inner `Err` before the 3-second outer timeout fires.
#[tokio::test]
async fn delete_propagates_error_against_unreachable_valkey() {
    let store = ValkeySessionStore::plaintext(unreachable_client());
    let id = SessionId::new(&SystemRng);
    let deadline = Duration::from_secs(3);

    // `Ok(Err(_))` means the timeout did not elapse and `delete` itself failed.
    let result = tokio::time::timeout(deadline, store.delete(&id)).await;
    assert!(
        matches!(result, Ok(Err(_))),
        "delete must return Err: {result:?}"
    );
}
/// `cycle` on a store backed by `unreachable_client()` must resolve to an
/// inner `Err` before the 3-second outer timeout fires.
#[tokio::test]
async fn cycle_propagates_error_against_unreachable_valkey() {
    let store = ValkeySessionStore::plaintext(unreachable_client());
    let old = SessionId::new(&SystemRng);
    let new = SessionId::new(&SystemRng);
    let data = SessionData::default();
    let ttl = Duration::from_secs(60);
    let deadline = Duration::from_secs(3);

    // `Ok(Err(_))` means the timeout did not elapse and `cycle` itself failed.
    let result = tokio::time::timeout(deadline, store.cycle(&old, &new, &data, ttl)).await;
    assert!(
        matches!(result, Ok(Err(_))),
        "cycle must return Err: {result:?}"
    );
}
/// Builds a `ValkeySessionRegistry` on top of `unreachable_client()`.
fn unreachable_registry() -> ValkeySessionRegistry {
    let client = unreachable_client();
    ValkeySessionRegistry::new(client)
}
/// `register` on the unreachable registry must resolve to an inner `Err`
/// before the 3-second outer timeout fires.
#[tokio::test]
async fn register_propagates_error_against_unreachable_valkey() {
    let registry = unreachable_registry();
    let user = axess_identity::testing::user("u");
    let sid = SessionId::new(&SystemRng);
    let deadline = Duration::from_secs(3);

    // `Ok(Err(_))` means the timeout did not elapse and `register` itself failed.
    let result = tokio::time::timeout(deadline, registry.register(&user, &sid)).await;
    assert!(
        matches!(result, Ok(Err(_))),
        "register must return Err: {result:?}"
    );
}
/// `is_valid` on the unreachable registry must resolve to an inner `Err`
/// (not `Ok(true)` or `Ok(false)`) before the 3-second outer timeout fires.
#[tokio::test]
async fn is_valid_propagates_error_against_unreachable_valkey() {
    let registry = unreachable_registry();
    let user = axess_identity::testing::user("u");
    let sid = SessionId::new(&SystemRng);
    let deadline = Duration::from_secs(3);

    // `Ok(Err(_))` means the timeout did not elapse and `is_valid` itself failed.
    let result = tokio::time::timeout(deadline, registry.is_valid(&user, &sid)).await;
    assert!(
        matches!(result, Ok(Err(_))),
        "is_valid must return Err: kills Ok(true)/Ok(false) mutants: {result:?}"
    );
}
/// `invalidate_user` on the unreachable registry must resolve to an inner
/// `Err` before the 3-second outer timeout fires.
#[tokio::test]
async fn invalidate_user_propagates_error_against_unreachable_valkey() {
    let registry = unreachable_registry();
    let user = axess_identity::testing::user("u");
    let deadline = Duration::from_secs(3);

    // `Ok(Err(_))` means the timeout did not elapse and the call itself failed.
    let result = tokio::time::timeout(deadline, registry.invalidate_user(&user)).await;
    assert!(
        matches!(result, Ok(Err(_))),
        "invalidate_user must return Err: {result:?}"
    );
}
/// `invalidate_session` on the unreachable registry must resolve to an inner
/// `Err` before the 3-second outer timeout fires.
#[tokio::test]
async fn invalidate_session_propagates_error_against_unreachable_valkey() {
    let registry = unreachable_registry();
    let user = axess_identity::testing::user("u");
    let sid = SessionId::new(&SystemRng);
    let deadline = Duration::from_secs(3);

    // `Ok(Err(_))` means the timeout did not elapse and the call itself failed.
    let result = tokio::time::timeout(deadline, registry.invalidate_session(&user, &sid)).await;
    assert!(
        matches!(result, Ok(Err(_))),
        "invalidate_session must return Err: {result:?}"
    );
}
/// `active_sessions` on the unreachable registry must resolve to an inner
/// `Err` before the 3-second outer timeout fires.
#[tokio::test]
async fn active_sessions_propagates_error_against_unreachable_valkey() {
    let registry = unreachable_registry();
    let user = axess_identity::testing::user("u");
    let deadline = Duration::from_secs(3);

    // `Ok(Err(_))` means the timeout did not elapse and the call itself failed.
    let result = tokio::time::timeout(deadline, registry.active_sessions(&user)).await;
    assert!(
        matches!(result, Ok(Err(_))),
        "active_sessions must return Err: kills Ok(vec![]) AND the \
         `delete -` mutation on `-1i64` (both flip the awaited result \
         from Err to Ok): {result:?}"
    );
}
/// `prune_expired` on a store built from `dummy_client()` must return
/// `Ok(0)`.
#[tokio::test]
async fn prune_expired_returns_zero_per_native_expiry_contract() {
    let pruned = ValkeySessionStore::plaintext(dummy_client())
        .prune_expired()
        .await
        .expect("Ok per contract");

    assert_eq!(
        pruned, 0,
        "Valkey relies on native key expiry: prune_expired contract is Ok(0)"
    );
}
/// End-to-end exercise of the store and registry against a live server.
///
/// Ignored by default (see the `#[ignore]` reason). Covers, in order:
/// save/load, cycle, delete on the store; register, is_valid,
/// invalidate_session, invalidate_user on the registry; and loading a
/// session written under an old key through a rotation-enabled store.
#[tokio::test]
#[ignore = "requires a running Valkey instance at 127.0.0.1:6379"]
async fn valkey_integration() {
    let config = Config::from_url("redis://127.0.0.1:6379").expect("parse redis URL");
    let client = Client::new(config, None, None, None);
    client.init().await.expect("connect to Valkey");

    let encryption_key = [77u8; 32];
    let store =
        ValkeySessionStore::encrypted(client.clone(), encryption_key).with_prefix("axess_test");
    let registry =
        ValkeySessionRegistry::with_options(client.clone(), "axess_test", Duration::from_secs(60));
    let rng = SystemRng;

    // --- Store: save then load ---
    let sid = SessionId::new(&rng);
    let data = SessionData::default();
    let ttl = Duration::from_secs(30);
    store.save(&sid, &data, ttl).await.expect("save");
    let loaded = store.load(&sid).await.expect("load");
    assert!(loaded.is_some(), "session should exist after save");

    // --- Store: cycle removes the old id and makes the new one loadable ---
    let new_sid = SessionId::new(&rng);
    store
        .cycle(&sid, &new_sid, &data, ttl)
        .await
        .expect("cycle");
    assert!(store.load(&sid).await.expect("load old").is_none());
    assert!(store.load(&new_sid).await.expect("load new").is_some());

    // --- Store: delete ---
    store.delete(&new_sid).await.expect("delete");
    assert!(store.load(&new_sid).await.expect("load deleted").is_none());

    // --- Registry: register, validate, invalidate a single session ---
    let user_id = axess_identity::testing::user("test-user-integration");
    let user_id = &user_id;
    let reg_sid = SessionId::new(&rng);
    registry
        .register(user_id, &reg_sid)
        .await
        .expect("register");
    assert!(
        registry
            .is_valid(user_id, &reg_sid)
            .await
            .expect("is_valid"),
        "session should be valid after register"
    );
    registry
        .invalidate_session(user_id, &reg_sid)
        .await
        .expect("invalidate_session");
    assert!(
        !registry
            .is_valid(user_id, &reg_sid)
            .await
            .expect("is_valid after invalidate"),
    );

    // --- Registry: invalidating the user invalidates every session ---
    let sid_a = SessionId::new(&rng);
    let sid_b = SessionId::new(&rng);
    registry.register(user_id, &sid_a).await.expect("reg a");
    registry.register(user_id, &sid_b).await.expect("reg b");
    registry
        .invalidate_user(user_id)
        .await
        .expect("invalidate_user");
    assert!(!registry.is_valid(user_id, &sid_a).await.expect("a"));
    assert!(!registry.is_valid(user_id, &sid_b).await.expect("b"));

    // --- Key rotation: data written under the old key must still load
    // through a store configured with (new_key, old_key) ---
    let old_key = [77u8; 32];
    let new_key = [88u8; 32];
    let old_store =
        ValkeySessionStore::encrypted(client.clone(), old_key).with_prefix("axess_test");
    let rotation_store =
        ValkeySessionStore::encrypted_with_rotation(client.clone(), new_key, old_key)
            .with_prefix("axess_test");
    let rot_sid = SessionId::new(&rng);
    old_store
        .save(&rot_sid, &data, ttl)
        .await
        .expect("save with old key");
    let loaded = rotation_store
        .load(&rot_sid)
        .await
        .expect("load with rotation");
    assert!(
        loaded.is_some(),
        "should decrypt with previous key fallback"
    );

    // Remove the rotation fixture and close the connection.
    old_store.delete(&rot_sid).await.expect("cleanup");
    client.quit().await.expect("disconnect");
}