use chasquimq::config::{ConsumerConfig, ProducerConfig, RetryConfig};
use chasquimq::consumer::Consumer;
use chasquimq::producer::Producer;
use chasquimq::{Bytes, Job, JobId};
use fred::clients::Client;
use fred::interfaces::ClientLike;
use fred::prelude::Config;
use fred::types::{ClusterHash, CustomCommand, Value};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
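/// Connection string for the Redis instance under test.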
fn redis_url() -> String {
std::env::var("REDIS_URL").expect("REDIS_URL must be set to run integration tests")
}
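/// Extra opt-in gate: these tests mutate server-wide CONFIG (maxmemory and
/// friends), so they only run when CHASQUIMQ_RUN_MAXMEMORY_TEST=1.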
fn maxmemory_gate() -> bool {
std::env::var("CHASQUIMQ_RUN_MAXMEMORY_TEST")
.map(|v| v == "1")
.unwrap_or(false)
}
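/// Minimal serde payload used as the job type in both tests.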
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
struct Sample {
n: u32,
}
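/// Raw `fred` client for CONFIG/INFO and stream introspection, kept separate
/// from the producer/consumer connections.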
async fn admin() -> Client {
let cfg = Config::from_url(&redis_url()).expect("REDIS_URL");
let client = Client::new(cfg, None, None, None);
client.init().await.expect("connect admin");
client
}
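/// `CONFIG SET key value` issued as a custom command.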
async fn config_set(admin: &Client, key: &str, value: &str) {
let _: Value = admin
.custom(
CustomCommand::new_static("CONFIG", ClusterHash::FirstKey, false),
vec![Value::from("SET"), Value::from(key), Value::from(value)],
)
.await
.expect("CONFIG SET");
}
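/// `CONFIG GET key`; the RESP2 reply is a flat `[name, value]` array, so the
/// value sits at index 1. Returns an empty string on an unexpected shape.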
async fn config_get(admin: &Client, key: &str) -> String {
let v: Value = admin
.custom(
CustomCommand::new_static("CONFIG", ClusterHash::FirstKey, false),
vec![Value::from("GET"), Value::from(key)],
)
.await
.expect("CONFIG GET");
if let Value::Array(items) = v {
if let Some(Value::String(s)) = items.get(1) {
return s.to_string();
}
if let Some(Value::Bytes(b)) = items.get(1) {
return String::from_utf8_lossy(b).into_owned();
}
}
String::new()
}
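/// Current `used_memory` in bytes, parsed out of the `INFO memory` text.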
async fn used_memory(admin: &Client) -> u64 {
let v: Value = admin
.custom(
CustomCommand::new_static("INFO", ClusterHash::FirstKey, false),
vec![Value::from("memory")],
)
.await
.expect("INFO memory");
let s = match v {
Value::String(s) => s.to_string(),
Value::Bytes(b) => String::from_utf8_lossy(&b).into_owned(),
other => panic!("INFO returned unexpected: {other:?}"),
};
for line in s.lines() {
if let Some(rest) = line.strip_prefix("used_memory:") {
return rest.trim().parse::<u64>().unwrap_or(0);
}
}
0
}
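/// Delete every per-queue key. All keys share the `{chasqui:<queue>}` hash
/// tag (a single slot in cluster mode), and KEYS is acceptable here because
/// this is a test-only helper against a dedicated instance.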
async fn flush_queue(admin: &Client, queue: &str) {
for suffix in [
"stream",
"dlq",
"delayed",
"promoter:lock",
"events",
"scheduler:lock",
] {
let key = format!("{{chasqui:{queue}}}:{suffix}");
let _: Value = admin
.custom(
CustomCommand::new_static("DEL", ClusterHash::FirstKey, false),
vec![Value::from(key)],
)
.await
.expect("DEL");
}
let pattern = format!("{{chasqui:{queue}}}:result:*");
let scan: Value = admin
.custom(
CustomCommand::new_static("KEYS", ClusterHash::FirstKey, false),
vec![Value::from(pattern)],
)
.await
.expect("KEYS");
if let Value::Array(items) = scan
&& !items.is_empty()
{
let _: Value = admin
.custom(
CustomCommand::new_static("DEL", ClusterHash::FirstKey, false),
items,
)
.await
.expect("DEL");
}
}
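/// Stream length via XLEN; a missing key counts as 0.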
async fn xlen(admin: &Client, key: &str) -> i64 {
match admin
.custom::<Value, Value>(
CustomCommand::new_static("XLEN", ClusterHash::FirstKey, false),
vec![Value::from(key)],
)
.await
.expect("XLEN")
{
Value::Integer(n) => n,
Value::Null => 0,
other => panic!("XLEN unexpected: {other:?}"),
}
}
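/// Total pending entries for a group, read from the first element of the
/// XPENDING summary reply (`[count, min-id, max-id, consumers]`). Returns 0
/// if the stream or group does not exist.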
async fn xpending_count(admin: &Client, stream: &str, group: &str) -> i64 {
let v: Value = admin
.custom(
CustomCommand::new_static("XPENDING", ClusterHash::FirstKey, false),
vec![Value::from(stream), Value::from(group)],
)
.await
.unwrap_or(Value::Null);
    if let Value::Array(arr) = v
        && let Some(Value::Integer(n)) = arr.first()
    {
        return *n;
    }
0
}
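/// Producer config: small pool, generous stream cap.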
fn producer_cfg(queue: &str) -> ProducerConfig {
ProducerConfig {
queue_name: queue.to_string(),
pool_size: 2,
max_stream_len: 10_000,
..Default::default()
}
}
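/// Consumer config tuned for test speed: short XREAD blocks, fast delayed-job
/// polling, and tight retry backoff; the scheduler and event stream are off.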
fn consumer_cfg(queue: &str, store_results: bool, ttl_secs: u64) -> ConsumerConfig {
ConsumerConfig {
queue_name: queue.to_string(),
consumer_id: format!("c-{}", uuid::Uuid::new_v4()),
block_ms: 50,
delayed_enabled: true,
delayed_poll_interval_ms: 25,
run_scheduler: false,
events_enabled: false,
concurrency: 64,
max_attempts: 3,
retry: RetryConfig {
initial_backoff_ms: 20,
max_backoff_ms: 200,
multiplier: 2.0,
jitter_ms: 0,
},
store_results,
result_ttl_secs: ttl_secs,
..Default::default()
}
}
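/// Poll `check` every 20 ms until it returns true or `timeout` elapses.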
async fn wait_until<F, Fut>(timeout: Duration, mut check: F) -> bool
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = bool>,
{
let start = Instant::now();
loop {
if check().await {
return true;
}
if start.elapsed() > timeout {
return false;
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
}
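/// CONFIG values captured before the test so they can be restored afterwards.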
struct ConfigSnapshot {
maxmemory: String,
maxmemory_policy: String,
stop_writes: String,
}
async fn snapshot_config(admin: &Client) -> ConfigSnapshot {
ConfigSnapshot {
maxmemory: config_get(admin, "maxmemory").await,
maxmemory_policy: config_get(admin, "maxmemory-policy").await,
stop_writes: config_get(admin, "stop-writes-on-bgsave-error").await,
}
}
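/// Restore the values captured by `snapshot_config`, re-applying the original
/// stop-writes setting last.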
async fn restore_config(admin: &Client, snap: &ConfigSnapshot) {
config_set(admin, "stop-writes-on-bgsave-error", "no").await;
config_set(admin, "maxmemory-policy", &snap.maxmemory_policy).await;
config_set(admin, "maxmemory", &snap.maxmemory).await;
config_set(admin, "stop-writes-on-bgsave-error", &snap.stop_writes).await;
}
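/// Under `maxmemory-policy noeviction`: park all jobs in the consumer group's
/// PEL, clamp `maxmemory` below current usage, then release the handlers and
/// assert that the stream and PEL still drain, that handler invocations stay
/// bounded (no runaway claim loop), and that result writes lost to OOM are
/// tolerated rather than fatal.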
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[ignore = "requires REDIS_URL + CHASQUIMQ_RUN_MAXMEMORY_TEST=1; mutates Redis CONFIG"]
async fn maxmemory_noeviction_drains_without_loop() {
if !maxmemory_gate() {
eprintln!("skipping: set CHASQUIMQ_RUN_MAXMEMORY_TEST=1 to run");
return;
}
let admin = admin().await;
let queue = "maxmemory_noeviction";
let snap = snapshot_config(&admin).await;
config_set(&admin, "stop-writes-on-bgsave-error", "no").await;
flush_queue(&admin, queue).await;
let producer: Producer<Sample> = Producer::connect(&redis_url(), producer_cfg(queue))
.await
.expect("producer");
let result_payload = Bytes::from(vec![b'X'; 4096]);
let n_jobs = 32usize;
let mut ids: Vec<JobId> = Vec::with_capacity(n_jobs);
for i in 0..n_jobs {
let id = producer.add(Sample { n: i as u32 }).await.expect("add");
ids.push(id);
}
let stream_key = format!("{{chasqui:{queue}}}:stream");
let group = "default";
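    // Every handler parks on this gate at its first await, so all N jobs
    // pile up in the pending entries list before memory is tightened.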
let gate = Arc::new(tokio::sync::Notify::new());
let gate_h = gate.clone();
let result_for_handler = result_payload.clone();
let calls = Arc::new(AtomicUsize::new(0));
let calls_h = calls.clone();
let consumer: Consumer<Sample> = Consumer::new(redis_url(), consumer_cfg(queue, true, 60));
let shutdown = CancellationToken::new();
let shutdown_clone = shutdown.clone();
let join = tokio::spawn(async move {
consumer
.run(
move |_job: Job<Sample>| {
let calls = calls_h.clone();
let result = result_for_handler.clone();
let gate = gate_h.clone();
async move {
gate.notified().await;
calls.fetch_add(1, Ordering::SeqCst);
Ok(result)
}
},
shutdown_clone,
)
.await
});
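    // Wait for the consumer to read every job into the group's PEL.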
let pending_ready = {
let admin = admin.clone();
let stream_key = stream_key.clone();
wait_until(Duration::from_secs(15), move || {
let admin = admin.clone();
let stream_key = stream_key.clone();
async move { xpending_count(&admin, &stream_key, group).await >= n_jobs as i64 }
})
.await
};
assert!(pending_ready, "consumer must read all N jobs into pending");
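    // Clamp maxmemory just below current usage so further allocations OOM.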
let cur = used_memory(&admin).await;
let limit = cur.saturating_sub(4096);
config_set(&admin, "maxmemory-policy", "noeviction").await;
config_set(&admin, "maxmemory", &limit.to_string()).await;
let probe: std::result::Result<Value, fred::error::Error> = admin
.custom(
CustomCommand::new_static("SET", ClusterHash::FirstKey, false),
vec![
Value::from("{chasqui:maxmemory_noeviction}:probe"),
Value::Bytes(Bytes::from(vec![b'P'; 8192])),
],
)
.await;
let oom_active = matches!(probe, Err(e) if format!("{e}").contains("OOM"));
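    // Release the parked handlers. Notify 4x per job: notify_one() stores at
    // most one permit when nobody is waiting, and retried deliveries park on
    // the gate again, so the extra notifications are needed (and harmless).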
for _ in 0..(n_jobs * 4) {
gate.notify_one();
}
let drained = {
let admin = admin.clone();
let stream_key = stream_key.clone();
wait_until(Duration::from_secs(60), move || {
let admin = admin.clone();
let stream_key = stream_key.clone();
async move {
let len = xlen(&admin, &stream_key).await;
let pending = xpending_count(&admin, &stream_key, group).await;
len == 0 && pending == 0
}
})
.await
};
let final_xlen = xlen(&admin, &stream_key).await;
let final_pending = xpending_count(&admin, &stream_key, group).await;
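    // Restore CONFIG before asserting so a failed assertion does not leave
    // the server capped at the tightened maxmemory.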
restore_config(&admin, &snap).await;
assert!(
oom_active,
"smoke test: direct SET must OOM under tightened maxmemory; the test isn't exercising the OOM path otherwise"
);
assert!(
drained,
"stream + pending must drain under noeviction; XLEN={final_xlen} pending={final_pending}"
);
let total_calls = calls.load(Ordering::SeqCst);
assert!(
total_calls < n_jobs * 5,
"handler invocations ({total_calls}) ran away — looks like a CLAIM loop for {n_jobs} jobs"
);
let mut some_count = 0;
for id in &ids {
if matches!(producer.get_result(id).await, Ok(Some(_))) {
some_count += 1;
}
}
eprintln!("noeviction: {some_count}/{n_jobs} results stored (rest dropped under OOM)");
shutdown.cancel();
let _ = tokio::time::timeout(Duration::from_secs(5), join).await;
flush_queue(&admin, queue).await;
let _: () = admin.quit().await.unwrap();
}
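/// Same shape as the noeviction test, but under `allkeys-lru`: Redis evicts
/// keys instead of rejecting writes, so the queue must drain normally while
/// stored results may be evicted. Asserts bounded handler invocations and
/// reports how many results survived eviction.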
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[ignore = "requires REDIS_URL + CHASQUIMQ_RUN_MAXMEMORY_TEST=1; mutates Redis CONFIG"]
async fn maxmemory_allkeys_lru_evicts_results_no_loop() {
if !maxmemory_gate() {
eprintln!("skipping: set CHASQUIMQ_RUN_MAXMEMORY_TEST=1 to run");
return;
}
let admin = admin().await;
let queue = "maxmemory_lru";
let snap = snapshot_config(&admin).await;
config_set(&admin, "stop-writes-on-bgsave-error", "no").await;
flush_queue(&admin, queue).await;
let producer: Producer<Sample> = Producer::connect(&redis_url(), producer_cfg(queue))
.await
.expect("producer");
let result_payload = Bytes::from(vec![b'Y'; 4096]);
let n_jobs = 32usize;
let mut ids: Vec<JobId> = Vec::with_capacity(n_jobs);
for i in 0..n_jobs {
let id = producer.add(Sample { n: i as u32 }).await.expect("add");
ids.push(id);
}
let stream_key = format!("{{chasqui:{queue}}}:stream");
let group = "default";
let gate = Arc::new(tokio::sync::Notify::new());
let gate_h = gate.clone();
let result_for_handler = result_payload.clone();
let calls = Arc::new(AtomicUsize::new(0));
let calls_h = calls.clone();
let consumer: Consumer<Sample> = Consumer::new(redis_url(), consumer_cfg(queue, true, 60));
let shutdown = CancellationToken::new();
let shutdown_clone = shutdown.clone();
let join = tokio::spawn(async move {
consumer
.run(
move |_job: Job<Sample>| {
let calls = calls_h.clone();
let result = result_for_handler.clone();
let gate = gate_h.clone();
async move {
gate.notified().await;
calls.fetch_add(1, Ordering::SeqCst);
Ok(result)
}
},
shutdown_clone,
)
.await
});
let pending_ready = {
let admin = admin.clone();
let stream_key = stream_key.clone();
wait_until(Duration::from_secs(15), move || {
let admin = admin.clone();
let stream_key = stream_key.clone();
async move { xpending_count(&admin, &stream_key, group).await >= n_jobs as i64 }
})
.await
};
assert!(
pending_ready,
"consumer must read all N jobs into pending under allkeys-lru"
);
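    // Under allkeys-lru Redis frees memory by evicting keys rather than
    // rejecting writes, so no OOM probe is needed; the stored results are
    // the likeliest eviction victims.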
let cur = used_memory(&admin).await;
let limit = cur.saturating_sub(2048);
config_set(&admin, "maxmemory-policy", "allkeys-lru").await;
config_set(&admin, "maxmemory", &limit.to_string()).await;
for _ in 0..(n_jobs * 4) {
gate.notify_one();
}
let drained = {
let admin = admin.clone();
let stream_key = stream_key.clone();
wait_until(Duration::from_secs(60), move || {
let admin = admin.clone();
let stream_key = stream_key.clone();
async move {
let len = xlen(&admin, &stream_key).await;
let pending = xpending_count(&admin, &stream_key, group).await;
len == 0 && pending == 0
}
})
.await
};
let final_xlen = xlen(&admin, &stream_key).await;
let final_pending = xpending_count(&admin, &stream_key, group).await;
restore_config(&admin, &snap).await;
assert!(
drained,
"stream + pending must drain under allkeys-lru; XLEN={final_xlen} pending={final_pending}"
);
let total_calls = calls.load(Ordering::SeqCst);
assert!(
total_calls < n_jobs * 5,
"handler invocations ({total_calls}) ran away under allkeys-lru for {n_jobs} jobs"
);
let mut some_count = 0;
for id in &ids {
if matches!(producer.get_result(id).await, Ok(Some(_))) {
some_count += 1;
}
}
eprintln!("allkeys-lru: {some_count}/{n_jobs} results survived eviction");
shutdown.cancel();
let _ = tokio::time::timeout(Duration::from_secs(5), join).await;
flush_queue(&admin, queue).await;
let _: () = admin.quit().await.unwrap();
}