use crate::redis::keys::{NAME_FIELD, PAYLOAD_FIELD};
use bytes::Bytes;
use fred::types::Value;
/// Lua script that promotes due members of a sorted set onto a stream.
///
/// * `KEYS[1]` - sorted set scanned with `ZRANGEBYSCORE -inf .. now`.
/// * `KEYS[2]` - stream that receives one `XADD` per due member.
/// * `ARGV[1]` - maximum number of members handled per call (`LIMIT 0 n`).
/// * `ARGV[2]` - value passed to `MAXLEN ~` on every `XADD`.
///
/// "Now" is derived from the server's `TIME` reply (seconds * 1000 plus
/// microseconds / 1000, floored). Each member is split into a name and a
/// payload using the length-prefixed layout described inside the script; the
/// payload is written under field `d` and the name, when non-empty, under
/// field `n`. The member is removed from the sorted set after its `XADD`.
///
/// Returns `{due_count, remaining ZCARD, lag_ms, payloads}`, where `lag_ms`
/// is `now - lowest remaining score`, or 0 when that difference is not
/// positive or the set is empty.
///
/// NOTE(review): a member shorter than 4 bytes would make `string.byte`
/// return fewer than four values and the length arithmetic raise; presumably
/// producers always write the 4-byte prefix - confirm.
pub(crate) const PROMOTE_SCRIPT: &str = r#"
local time = redis.call('TIME')
local now_ms = tonumber(time[1]) * 1000 + math.floor(tonumber(time[2]) / 1000)
local due = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', now_ms, 'LIMIT', 0, tonumber(ARGV[1]))
local payloads = {}
for i, member in ipairs(due) do
-- Parse the slice-3 length-prefixed member:
-- [u32_le name_len][name utf8][msgpack payload]
local b1, b2, b3, b4 = string.byte(member, 1, 4)
local name_len = b1 + (b2 * 256) + (b3 * 65536) + (b4 * 16777216)
local name = name_len > 0 and string.sub(member, 5, 4 + name_len) or ''
local payload = string.sub(member, 5 + name_len)
if name ~= '' then
redis.call('XADD', KEYS[2], 'MAXLEN', '~', tonumber(ARGV[2]), '*', 'd', payload, 'n', name)
else
redis.call('XADD', KEYS[2], 'MAXLEN', '~', tonumber(ARGV[2]), '*', 'd', payload)
end
redis.call('ZREM', KEYS[1], member)
payloads[i] = payload
end
local depth = redis.call('ZCARD', KEYS[1])
local lag_ms = 0
if depth > 0 then
local oldest = redis.call('ZRANGE', KEYS[1], 0, 0, 'WITHSCORES')
if oldest[2] then
local diff = now_ms - tonumber(oldest[2])
if diff > 0 then lag_ms = diff end
end
end
return {#due, depth, lag_ms, payloads}
"#;
/// Lua script that acknowledges-and-deletes one stream entry and, only if
/// that succeeded, queues a member on a sorted set.
///
/// * `KEYS[1]` - stream passed to `XACKDEL`.
/// * `KEYS[2]` - sorted set that receives the `ZADD`.
/// * `ARGV[1]` - consumer group, `ARGV[2]` - entry id.
/// * `ARGV[3]` - score for the `ZADD` (converted with `tonumber`).
/// * `ARGV[4]` - member for the `ZADD`.
///
/// The `XACKDEL` reply is normalised to a number (first element when the
/// reply is a table, the reply itself otherwise). Returns 1 after a `ZADD`
/// when that number is 1; returns 0 without touching `KEYS[2]` otherwise.
pub(crate) const RETRY_RESCHEDULE_SCRIPT: &str = r#"
local result = redis.call('XACKDEL', KEYS[1], ARGV[1], 'IDS', 1, ARGV[2])
local first
if type(result) == 'table' then
first = tonumber(result[1])
else
first = tonumber(result)
end
if first == 1 then
redis.call('ZADD', KEYS[2], tonumber(ARGV[3]), ARGV[4])
return 1
end
return 0
"#;
/// Lua script that moves entries from one stream (`KEYS[1]`, named `dlq` in
/// the script) to another (`KEYS[2]`, named `stream`).
///
/// * `ARGV[1]` - value passed to `MAXLEN ~` on every `XADD`.
/// * `ARGV[2..]` - consecutive triples `(dlq_id, payload, name)`.
///
/// For each triple the script `XDEL`s `dlq_id` from `KEYS[1]`; only when that
/// reports exactly one deletion does it `XADD` the payload to `KEYS[2]` under
/// field `d`, adding field `n` when `name` is present and non-empty.
///
/// Returns the number of entries that were deleted and re-added.
pub(crate) const REPLAY_DLQ_SCRIPT: &str = r#"
local dlq = KEYS[1]
local stream = KEYS[2]
local max_stream_len = ARGV[1]
local replayed = 0
local i = 2
while i <= #ARGV do
local dlq_id = ARGV[i]
local payload = ARGV[i + 1]
local name = ARGV[i + 2]
local deleted = redis.call('XDEL', dlq, dlq_id)
if deleted == 1 then
if name ~= nil and name ~= '' then
redis.call('XADD', stream, 'MAXLEN', '~', max_stream_len, '*', 'd', payload, 'n', name)
else
redis.call('XADD', stream, 'MAXLEN', '~', max_stream_len, '*', 'd', payload)
end
replayed = replayed + 1
end
i = i + 3
end
return replayed
"#;
/// Lua script (declared with the `allow-oom` flag) that acknowledges-and-
/// deletes one stream entry and then optionally stores a result value.
///
/// * `KEYS[1]` - stream passed to `XACKDEL`.
/// * `KEYS[2]` - key written by `SET`.
/// * `ARGV[1]` - consumer group, `ARGV[2]` - entry id.
/// * `ARGV[3]` - value to store; an empty string skips the `SET`.
/// * `ARGV[4]` - expiry in seconds for the `SET` (`EX`).
///
/// The `SET` is issued through `redis.pcall`, so an error from it does not
/// abort the script. Returns the normalised `XACKDEL` status (first element
/// when the reply is a table, the reply itself otherwise).
pub(crate) const JOB_OK_SCRIPT: &str = r#"#!lua flags=allow-oom
local result = redis.call('XACKDEL', KEYS[1], ARGV[1], 'IDS', 1, ARGV[2])
local first
if type(result) == 'table' then
first = tonumber(result[1])
else
first = tonumber(result)
end
-- XACKDEL returns 1 (acked + removed), -1 (id not found), or 0 (not in
-- group); only 1 means we own this delivery — both other values mean a
-- concurrent CLAIM/replay won the race and SET is correctly skipped.
--
-- redis.pcall lets the SET fail silently under maxmemory-policy noeviction:
-- if Redis is OOM and rejects the write, we still return success so the
-- ack commits and the entry doesn't stay pending. The result is lost,
-- which matches the documented `None == expired-or-never-written`
-- contract on `Producer::get_result`.
if first == 1 and #ARGV[3] > 0 then
redis.pcall('SET', KEYS[2], ARGV[3], 'EX', tonumber(ARGV[4]))
end
return first
"#;
/// Lua script that deletes `KEYS[1]` only when its current value equals
/// `ARGV[1]`.
///
/// Returns the reply of `DEL` when the values match, and 0 otherwise (the
/// key is left untouched in that case).
pub(crate) const RELEASE_LOCK_SCRIPT: &str = r#"
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
end
return 0
"#;
/// Lua script that adds a member to a sorted set at most once per marker key.
///
/// * `KEYS[1]` - marker key, set to `'1'` with `NX` and `EX ARGV[1]`.
/// * `KEYS[2]` - sorted set that receives `ZADD ARGV[2] ARGV[3]`.
/// * `KEYS[3]` - key that stores the member (`ARGV[3]`) with `EX ARGV[1]`.
/// * `ARGV[1]` - expiry in seconds, `ARGV[2]` - score, `ARGV[3]` - member.
///
/// Returns 0 without further writes when the marker already exists (the
/// `SET NX` reply is `false`); otherwise performs both writes and returns 1.
pub(crate) const SCHEDULE_DELAYED_IDEMPOTENT_SCRIPT: &str = r#"
local set_res = redis.call('SET', KEYS[1], '1', 'NX', 'EX', tonumber(ARGV[1]))
if set_res == false then
return 0
end
redis.call('ZADD', KEYS[2], tonumber(ARGV[2]), ARGV[3])
redis.call('SET', KEYS[3], ARGV[3], 'EX', tonumber(ARGV[1]))
return 1
"#;
/// Lua script that removes a member from a sorted set, looking the member up
/// through a separate key.
///
/// * `KEYS[1]` - sorted set the member is removed from (`ZREM`).
/// * `KEYS[2]` - key whose value is the member to remove (`GET`).
/// * `KEYS[3]` - additional key deleted on success.
///
/// Returns 0 when `KEYS[2]` does not exist or when `ZREM` removed nothing; in
/// both cases `KEYS[2]` and `KEYS[3]` are left as they are. Otherwise deletes
/// `KEYS[2]` and `KEYS[3]` and returns 1.
pub(crate) const CANCEL_DELAYED_SCRIPT: &str = r#"
local member = redis.call('GET', KEYS[2])
if not member then
return 0
end
local removed = redis.call('ZREM', KEYS[1], member)
if removed == 0 then
return 0
end
redis.call('DEL', KEYS[2])
redis.call('DEL', KEYS[3])
return 1
"#;
/// Lua script that writes a sorted-set entry and a hash field together.
///
/// * `KEYS[1]` - sorted set; receives `ZADD ARGV[1] ARGV[2]`.
/// * `KEYS[2]` - hash; field `spec` is set to `ARGV[3]`.
///
/// Always returns 1.
pub(crate) const UPSERT_REPEATABLE_SCRIPT: &str = r#"
redis.call('ZADD', KEYS[1], tonumber(ARGV[1]), ARGV[2])
redis.call('HSET', KEYS[2], 'spec', ARGV[3])
return 1
"#;
/// Lua script that removes member `ARGV[1]` from sorted set `KEYS[1]` and
/// deletes `KEYS[2]` unconditionally.
///
/// Returns the `ZREM` reply (number of members removed from `KEYS[1]`).
pub(crate) const REMOVE_REPEATABLE_SCRIPT: &str = r#"
local removed = redis.call('ZREM', KEYS[1], ARGV[1])
redis.call('DEL', KEYS[2])
return removed
"#;
/// Lua script that emits a batch of fires for one repeatable spec and then
/// either reschedules or removes the spec.
///
/// * `KEYS[1]` - stream that receives fires whose time is `<= now_ms`.
/// * `KEYS[2]` - sorted set that receives fires whose time is in the future.
/// * `KEYS[3]` - sorted set holding `spec_key` scored by its next fire time.
/// * `KEYS[4]` - hash whose `fired` field counts fires; deleted on removal.
/// * `ARGV[1]` - `now_ms`, `ARGV[2]` - `next_fire_ms`,
///   `ARGV[3]` - `MAXLEN ~` value, `ARGV[4]` - `spec_key`,
///   `ARGV[5]` - `limit` (checked only when `> 0`),
///   `ARGV[6]` - `end_before_ms` (checked only when `> 0`),
///   `ARGV[7]` - number of fires that follow.
/// * `ARGV[8 + 2i]`, `ARGV[9 + 2i]` - fire time in ms and member for fire
///   `i` (0-based).
///
/// Per fire: when `limit > 0` and the `fired` counter has reached it, the
/// loop stops. Otherwise a due fire is split into name/payload (layout
/// described inside the script) and `XADD`ed with field `d` and, for a
/// non-empty name, field `n`; a future fire is `ZADD`ed to `KEYS[2]`. The
/// `fired` counter is then incremented.
///
/// Afterwards the spec is removed (`ZREM KEYS[3]`, `DEL KEYS[4]`) when the
/// limit is reached, when `end_before_ms > 0` and `next_fire_ms` exceeds it,
/// or when `next_fire_ms <= 0`; otherwise `spec_key` is re-added to `KEYS[3]`
/// with score `next_fire_ms`.
///
/// Returns `{fired_now, removed}` with `removed` being 1 or 0.
///
/// NOTE(review): a member shorter than 4 bytes would make `string.byte`
/// return fewer than four values and the length arithmetic raise; presumably
/// callers always send the 4-byte prefix - confirm.
pub(crate) const SCHEDULE_REPEATABLE_SCRIPT: &str = r#"
local now_ms = tonumber(ARGV[1])
local next_fire_ms = tonumber(ARGV[2])
local max_stream_len = tonumber(ARGV[3])
local spec_key = ARGV[4]
local limit = tonumber(ARGV[5])
local end_before_ms = tonumber(ARGV[6])
local fire_count = tonumber(ARGV[7])
local fired_now = 0
local hit_limit = false
local i = 0
while i < fire_count do
local fire_at_ms = tonumber(ARGV[8 + i * 2])
local member = ARGV[9 + i * 2]
if limit > 0 then
local fired_so_far = tonumber(redis.call('HGET', KEYS[4], 'fired')) or 0
if fired_so_far >= limit then
hit_limit = true
break
end
end
if fire_at_ms <= now_ms then
-- Split the slice-3 length-prefixed member:
-- [u32_le name_len][name utf8][msgpack payload]
local b1, b2, b3, b4 = string.byte(member, 1, 4)
local name_len = b1 + (b2 * 256) + (b3 * 65536) + (b4 * 16777216)
local name = name_len > 0 and string.sub(member, 5, 4 + name_len) or ''
local payload = string.sub(member, 5 + name_len)
if name ~= '' then
redis.call('XADD', KEYS[1], 'MAXLEN', '~', max_stream_len, '*', 'd', payload, 'n', name)
else
redis.call('XADD', KEYS[1], 'MAXLEN', '~', max_stream_len, '*', 'd', payload)
end
else
redis.call('ZADD', KEYS[2], fire_at_ms, member)
end
redis.call('HINCRBY', KEYS[4], 'fired', 1)
fired_now = fired_now + 1
i = i + 1
end
local fired = tonumber(redis.call('HGET', KEYS[4], 'fired')) or 0
local removed = 0
local exhausted = (limit > 0 and fired >= limit) or hit_limit
local past_end = (end_before_ms > 0 and next_fire_ms > end_before_ms)
local no_next = (next_fire_ms <= 0)
if exhausted or past_end or no_next then
redis.call('ZREM', KEYS[3], spec_key)
redis.call('DEL', KEYS[4])
removed = 1
else
redis.call('ZADD', KEYS[3], next_fire_ms, spec_key)
end
return {fired_now, removed}
"#;
/// Lua script that takes or refreshes a lock stored as a plain string key.
///
/// * `KEYS[1]` - lock key.
/// * `ARGV[1]` - value identifying the caller.
/// * `ARGV[2]` - expiry in seconds.
///
/// When the key does not exist it is set to `ARGV[1]` with `EX ARGV[2]` and
/// the script returns 1. When it already holds `ARGV[1]` its expiry is reset
/// with `EXPIRE` and the script returns 1. When it holds any other value
/// nothing is changed and the script returns 0.
pub(crate) const ACQUIRE_LOCK_SCRIPT: &str = r#"
local cur = redis.call('GET', KEYS[1])
if cur == false then
redis.call('SET', KEYS[1], ARGV[1], 'EX', tonumber(ARGV[2]))
return 1
end
if cur == ARGV[1] then
redis.call('EXPIRE', KEYS[1], tonumber(ARGV[2]))
return 1
end
return 0
"#;
/// Builds the argument list for an `XADD` that carries an `IDMP` pair.
///
/// Layout: `<stream_key> IDMP <producer_id> <iid> MAXLEN ~ <max_stream_len> *
/// <PAYLOAD_FIELD> <bytes>`, followed by `<NAME_FIELD> <name>` only when
/// `name` is non-empty. `max_stream_len` is cast to `i64`.
pub(crate) fn xadd_args(
    stream_key: &str,
    producer_id: &str,
    iid: &str,
    max_stream_len: u64,
    bytes: Bytes,
    name: &str,
) -> Vec<Value> {
    let has_name = !name.is_empty();
    // Ten fixed slots, plus the name field/value pair when present.
    let mut out: Vec<Value> = Vec::with_capacity(if has_name { 12 } else { 10 });
    out.extend([
        Value::from(stream_key),
        Value::from("IDMP"),
        Value::from(producer_id),
        Value::from(iid),
        Value::from("MAXLEN"),
        Value::from("~"),
        Value::from(max_stream_len as i64),
        Value::from("*"),
        Value::from(PAYLOAD_FIELD),
        Value::Bytes(bytes),
    ]);
    if has_name {
        out.extend([Value::from(NAME_FIELD), Value::from(name)]);
    }
    out
}
pub(crate) fn xreadgroup_args(
group: &str,
consumer: &str,
batch: usize,
block_ms: u64,
claim_min_idle_ms: u64,
stream_key: &str,
) -> Vec<Value> {
vec![
Value::from("GROUP"),
Value::from(group),
Value::from(consumer),
Value::from("COUNT"),
Value::from(batch as i64),
Value::from("BLOCK"),
Value::from(block_ms as i64),
Value::from("CLAIM"),
Value::from(claim_min_idle_ms as i64),
Value::from("STREAMS"),
Value::from(stream_key),
Value::from(">"),
]
}
/// Builds `<stream_key> <group> IDS <n> <id>...` where `n` is `ids.len()`
/// and every id is appended in slice order.
pub(crate) fn xackdel_args(stream_key: &str, group: &str, ids: &[impl AsRef<str>]) -> Vec<Value> {
    let id_count = ids.len();
    let mut out: Vec<Value> = Vec::with_capacity(4 + id_count);
    out.extend([
        Value::from(stream_key),
        Value::from(group),
        Value::from("IDS"),
        Value::from(id_count as i64),
    ]);
    out.extend(ids.iter().map(|id| Value::from(id.as_ref())));
    out
}
/// Builds the argument list for an `XADD` onto `dlq_key` that carries an
/// `IDMP` pair keyed by `source_id`.
///
/// Layout: `<dlq_key> IDMP <producer_id> <source_id> MAXLEN ~
/// <max_stream_len> * <PAYLOAD_FIELD> <payload>`, then `<NAME_FIELD> <name>`
/// when `name` is non-empty, then `source_id <source_id> reason <reason>`,
/// then `detail <detail>` when `detail` is `Some`. `max_stream_len` is cast
/// to `i64`.
#[allow(clippy::too_many_arguments)]
pub(crate) fn xadd_dlq_args(
    dlq_key: &str,
    producer_id: &str,
    source_id: &str,
    payload: Bytes,
    reason: &str,
    detail: Option<&str>,
    max_stream_len: u64,
    name: &str,
) -> Vec<Value> {
    // Exactly 14 values are always pushed (10 for the XADD prefix and payload
    // pair, 4 for the `source_id` / `reason` pairs); each optional pair adds 2.
    let optional_pairs = usize::from(detail.is_some()) + usize::from(!name.is_empty());
    let mut args: Vec<Value> = Vec::with_capacity(14 + optional_pairs * 2);
    args.push(Value::from(dlq_key));
    args.push(Value::from("IDMP"));
    args.push(Value::from(producer_id));
    args.push(Value::from(source_id));
    args.push(Value::from("MAXLEN"));
    args.push(Value::from("~"));
    args.push(Value::from(max_stream_len as i64));
    args.push(Value::from("*"));
    args.push(Value::from(PAYLOAD_FIELD));
    args.push(Value::Bytes(payload));
    if !name.is_empty() {
        args.push(Value::from(NAME_FIELD));
        args.push(Value::from(name));
    }
    args.push(Value::from("source_id"));
    args.push(Value::from(source_id));
    args.push(Value::from("reason"));
    args.push(Value::from(reason));
    if let Some(d) = detail {
        args.push(Value::from("detail"));
        args.push(Value::from(d));
    }
    args
}
/// Builds `<delayed_key> <run_at_ms> <bytes>`: key, score and member, in
/// that order.
pub(crate) fn zadd_delayed_args(delayed_key: &str, run_at_ms: i64, bytes: Bytes) -> Vec<Value> {
    let key = Value::from(delayed_key);
    let score = Value::from(run_at_ms);
    let member = Value::Bytes(bytes);
    Vec::from([key, score, member])
}
/// Builds `<sha> 2 <delayed_key> <stream_key> <limit> <max_stream_len>`.
///
/// The two keys and two trailing values line up with the `KEYS[1..2]` and
/// `ARGV[1..2]` positions read by `PROMOTE_SCRIPT`. `limit` and
/// `max_stream_len` are cast to `i64`.
pub(crate) fn evalsha_promote_args(
    sha: &str,
    delayed_key: &str,
    stream_key: &str,
    limit: usize,
    max_stream_len: u64,
) -> Vec<Value> {
    let keys = [Value::from(delayed_key), Value::from(stream_key)];
    let argv = [
        Value::from(limit as i64),
        Value::from(max_stream_len as i64),
    ];

    let mut out: Vec<Value> = Vec::with_capacity(2 + keys.len() + argv.len());
    out.push(Value::from(sha));
    out.push(Value::from(2_i64));
    out.extend(keys);
    out.extend(argv);
    out
}
/// Builds `<script> 2 <delayed_key> <stream_key> <limit> <max_stream_len>`.
///
/// Same layout as the SHA-based variant, with the script text in the first
/// slot. `limit` and `max_stream_len` are cast to `i64`.
pub(crate) fn eval_promote_args(
    script: &str,
    delayed_key: &str,
    stream_key: &str,
    limit: usize,
    max_stream_len: u64,
) -> Vec<Value> {
    let key_count = Value::from(2_i64);
    let batch_limit = Value::from(limit as i64);
    let trim_len = Value::from(max_stream_len as i64);
    Vec::from([
        Value::from(script),
        key_count,
        Value::from(delayed_key),
        Value::from(stream_key),
        batch_limit,
        trim_len,
    ])
}
/// Builds `LOAD <script>`.
pub(crate) fn script_load_args(script: &str) -> Vec<Value> {
    let subcommand = Value::from("LOAD");
    let body = Value::from(script);
    Vec::from([subcommand, body])
}
pub(crate) fn xrange_args(stream_key: &str, limit: usize) -> Vec<Value> {
vec![
Value::from(stream_key),
Value::from("-"),
Value::from("+"),
Value::from("COUNT"),
Value::from(limit as i64),
]
}
/// Builds `<sha> 2 <dlq_key> <stream_key> <max_stream_len>` followed by
/// `<id> <payload> <name>` for every element of `triples`, in slice order.
///
/// This matches the `KEYS[1..2]` / `ARGV[1]` / triple layout consumed by
/// `REPLAY_DLQ_SCRIPT`. `max_stream_len` is cast to `i64`; payload bytes are
/// cloned.
pub(crate) fn evalsha_replay_args(
    sha: &str,
    dlq_key: &str,
    stream_key: &str,
    max_stream_len: u64,
    triples: &[(String, Bytes, String)],
) -> Vec<Value> {
    let mut out: Vec<Value> = Vec::with_capacity(5 + triples.len() * 3);
    out.extend([
        Value::from(sha),
        Value::from(2_i64),
        Value::from(dlq_key),
        Value::from(stream_key),
        Value::from(max_stream_len as i64),
    ]);
    out.extend(triples.iter().flat_map(|(id, payload, name)| {
        [
            Value::from(id.as_str()),
            Value::Bytes(payload.clone()),
            Value::from(name.as_str()),
        ]
    }));
    out
}
/// Builds `<script> 2 <dlq_key> <stream_key> <max_stream_len>` followed by
/// `<id> <payload> <name>` for every element of `triples`, in slice order.
///
/// `max_stream_len` is cast to `i64`; payload bytes are cloned.
pub(crate) fn eval_replay_args(
    script: &str,
    dlq_key: &str,
    stream_key: &str,
    max_stream_len: u64,
    triples: &[(String, Bytes, String)],
) -> Vec<Value> {
    let header = [
        Value::from(script),
        Value::from(2_i64),
        Value::from(dlq_key),
        Value::from(stream_key),
        Value::from(max_stream_len as i64),
    ];

    let mut out: Vec<Value> = Vec::with_capacity(header.len() + triples.len() * 3);
    out.extend(header);
    for triple in triples {
        let (entry_id, payload, job_name) = triple;
        out.push(entry_id.as_str().into());
        out.push(Value::Bytes(payload.clone()));
        out.push(job_name.as_str().into());
    }
    out
}
/// Builds `<sha> 2 <stream_key> <delayed_key> <group> <entry_id> <run_at_ms>
/// <bytes>`.
///
/// The keys and trailing values line up with the `KEYS[1..2]` and
/// `ARGV[1..4]` positions read by `RETRY_RESCHEDULE_SCRIPT`.
pub(crate) fn evalsha_retry_args(
    sha: &str,
    stream_key: &str,
    delayed_key: &str,
    group: &str,
    entry_id: &str,
    run_at_ms: i64,
    bytes: Bytes,
) -> Vec<Value> {
    let mut out: Vec<Value> = Vec::with_capacity(8);
    out.push(sha.into());
    out.push(2_i64.into());
    // KEYS
    out.push(stream_key.into());
    out.push(delayed_key.into());
    // ARGV
    out.push(group.into());
    out.push(entry_id.into());
    out.push(run_at_ms.into());
    out.push(Value::Bytes(bytes));
    out
}
/// Builds `<script> 2 <stream_key> <delayed_key> <group> <entry_id>
/// <run_at_ms> <bytes>`.
///
/// Same layout as the SHA-based variant, with the script text in the first
/// slot.
pub(crate) fn eval_retry_args(
    script: &str,
    stream_key: &str,
    delayed_key: &str,
    group: &str,
    entry_id: &str,
    run_at_ms: i64,
    bytes: Bytes,
) -> Vec<Value> {
    let keys = [Value::from(stream_key), Value::from(delayed_key)];
    let argv = [
        Value::from(group),
        Value::from(entry_id),
        Value::from(run_at_ms),
        Value::Bytes(bytes),
    ];

    let mut out: Vec<Value> = Vec::with_capacity(2 + keys.len() + argv.len());
    out.push(Value::from(script));
    out.push(Value::from(2_i64));
    out.extend(keys);
    out.extend(argv);
    out
}
/// Builds `<script> 1 <lock_key> <holder_id> <ttl_secs>`.
///
/// One key and two trailing values, matching the `KEYS[1]` / `ARGV[1..2]`
/// positions read by `ACQUIRE_LOCK_SCRIPT`. `ttl_secs` is cast to `i64`.
pub(crate) fn eval_acquire_lock_args(
    script: &str,
    lock_key: &str,
    holder_id: &str,
    ttl_secs: u64,
) -> Vec<Value> {
    let key_count = Value::from(1_i64);
    let ttl = Value::from(ttl_secs as i64);
    Vec::from([
        Value::from(script),
        key_count,
        Value::from(lock_key),
        Value::from(holder_id),
        ttl,
    ])
}
pub(crate) fn eval_release_lock_args(script: &str, lock_key: &str, holder_id: &str) -> Vec<Value> {
vec![
Value::from(script),
Value::from(1_i64),
Value::from(lock_key),
Value::from(holder_id),
]
}
/// Builds `<sha> 3 <marker_key> <delayed_key> <index_key> <marker_ttl_secs>
/// <run_at_ms> <bytes>`.
///
/// The keys and trailing values line up with the `KEYS[1..3]` and
/// `ARGV[1..3]` positions read by `SCHEDULE_DELAYED_IDEMPOTENT_SCRIPT`.
/// `marker_ttl_secs` is cast to `i64`.
pub(crate) fn evalsha_schedule_delayed_idempotent_args(
    sha: &str,
    marker_key: &str,
    delayed_key: &str,
    index_key: &str,
    marker_ttl_secs: u64,
    run_at_ms: i64,
    bytes: Bytes,
) -> Vec<Value> {
    let keys = [
        Value::from(marker_key),
        Value::from(delayed_key),
        Value::from(index_key),
    ];
    let argv = [
        Value::from(marker_ttl_secs as i64),
        Value::from(run_at_ms),
        Value::Bytes(bytes),
    ];

    let mut out: Vec<Value> = Vec::with_capacity(2 + keys.len() + argv.len());
    out.push(Value::from(sha));
    out.push(Value::from(3_i64));
    out.extend(keys);
    out.extend(argv);
    out
}
/// Builds `<script> 3 <marker_key> <delayed_key> <index_key>
/// <marker_ttl_secs> <run_at_ms> <bytes>`.
///
/// Same layout as the SHA-based variant, with the script text in the first
/// slot. `marker_ttl_secs` is cast to `i64`.
pub(crate) fn eval_schedule_delayed_idempotent_args(
    script: &str,
    marker_key: &str,
    delayed_key: &str,
    index_key: &str,
    marker_ttl_secs: u64,
    run_at_ms: i64,
    bytes: Bytes,
) -> Vec<Value> {
    let mut out: Vec<Value> = Vec::with_capacity(8);
    out.push(script.into());
    out.push(3_i64.into());
    // KEYS
    out.push(marker_key.into());
    out.push(delayed_key.into());
    out.push(index_key.into());
    // ARGV
    out.push((marker_ttl_secs as i64).into());
    out.push(run_at_ms.into());
    out.push(Value::Bytes(bytes));
    out
}
/// Builds `<sha> 3 <delayed_key> <index_key> <marker_key>`.
///
/// Three keys and no trailing values, in the `KEYS[1..3]` order read by
/// `CANCEL_DELAYED_SCRIPT`.
pub(crate) fn evalsha_cancel_delayed_args(
    sha: &str,
    delayed_key: &str,
    index_key: &str,
    marker_key: &str,
) -> Vec<Value> {
    let mut out: Vec<Value> = Vec::with_capacity(5);
    out.push(sha.into());
    out.push(3_i64.into());
    for key in [delayed_key, index_key, marker_key] {
        out.push(Value::from(key));
    }
    out
}
/// Builds `<script> 3 <delayed_key> <index_key> <marker_key>`.
///
/// Same layout as the SHA-based variant, with the script text in the first
/// slot.
pub(crate) fn eval_cancel_delayed_args(
    script: &str,
    delayed_key: &str,
    index_key: &str,
    marker_key: &str,
) -> Vec<Value> {
    let key_count = Value::from(3_i64);
    Vec::from([
        Value::from(script),
        key_count,
        Value::from(delayed_key),
        Value::from(index_key),
        Value::from(marker_key),
    ])
}
/// Builds `<sha> 2 <repeat_key> <spec_hash_key> <next_fire_ms> <spec_key>
/// <bytes>`.
///
/// The keys and trailing values line up with the `KEYS[1..2]` and
/// `ARGV[1..3]` positions read by `UPSERT_REPEATABLE_SCRIPT`.
pub(crate) fn evalsha_upsert_repeatable_args(
    sha: &str,
    repeat_key: &str,
    spec_hash_key: &str,
    next_fire_ms: i64,
    spec_key: &str,
    bytes: Bytes,
) -> Vec<Value> {
    let keys = [Value::from(repeat_key), Value::from(spec_hash_key)];
    let argv = [
        Value::from(next_fire_ms),
        Value::from(spec_key),
        Value::Bytes(bytes),
    ];

    let mut out: Vec<Value> = Vec::with_capacity(2 + keys.len() + argv.len());
    out.push(Value::from(sha));
    out.push(Value::from(2_i64));
    out.extend(keys);
    out.extend(argv);
    out
}
/// Builds `<script> 2 <repeat_key> <spec_hash_key> <next_fire_ms> <spec_key>
/// <bytes>`.
///
/// Same layout as the SHA-based variant, with the script text in the first
/// slot.
pub(crate) fn eval_upsert_repeatable_args(
    script: &str,
    repeat_key: &str,
    spec_hash_key: &str,
    next_fire_ms: i64,
    spec_key: &str,
    bytes: Bytes,
) -> Vec<Value> {
    let mut out: Vec<Value> = Vec::with_capacity(7);
    out.push(script.into());
    out.push(2_i64.into());
    // KEYS
    out.push(repeat_key.into());
    out.push(spec_hash_key.into());
    // ARGV
    out.push(next_fire_ms.into());
    out.push(spec_key.into());
    out.push(Value::Bytes(bytes));
    out
}
pub(crate) fn evalsha_remove_repeatable_args(
sha: &str,
repeat_key: &str,
spec_hash_key: &str,
spec_key: &str,
) -> Vec<Value> {
vec![
Value::from(sha),
Value::from(2_i64),
Value::from(repeat_key),
Value::from(spec_hash_key),
Value::from(spec_key),
]
}
/// Builds `<script> 2 <repeat_key> <spec_hash_key> <spec_key>`.
///
/// Same layout as the SHA-based variant, with the script text in the first
/// slot.
pub(crate) fn eval_remove_repeatable_args(
    script: &str,
    repeat_key: &str,
    spec_hash_key: &str,
    spec_key: &str,
) -> Vec<Value> {
    let key_count = Value::from(2_i64);
    let member = Value::from(spec_key);
    Vec::from([
        Value::from(script),
        key_count,
        Value::from(repeat_key),
        Value::from(spec_hash_key),
        member,
    ])
}
/// Builds `<sha> 4 <stream_key> <delayed_key> <repeat_key> <spec_hash_key>
/// <now_ms> <next_fire_ms> <max_stream_len> <spec_key> <limit>
/// <end_before_ms> <fires.len()>` followed by `<fire_at_ms> <bytes>` for each
/// element of `fires`, in slice order.
///
/// The keys and trailing values line up with the `KEYS[1..4]` and
/// `ARGV[1..7]` positions (plus the per-fire pairs starting at `ARGV[8]`)
/// read by `SCHEDULE_REPEATABLE_SCRIPT`. `max_stream_len`, `limit`,
/// `end_before_ms` and the fire count are cast to `i64`; fire payload bytes
/// are cloned.
#[allow(clippy::too_many_arguments)]
pub(crate) fn evalsha_schedule_repeatable_args(
    sha: &str,
    stream_key: &str,
    delayed_key: &str,
    repeat_key: &str,
    spec_hash_key: &str,
    now_ms: i64,
    next_fire_ms: i64,
    max_stream_len: u64,
    spec_key: &str,
    limit: u64,
    end_before_ms: u64,
    fires: &[(i64, Bytes)],
) -> Vec<Value> {
    // 13 fixed values: sha, key count, 4 keys and 7 scalar arguments. Sizing
    // for only 12 would force a reallocation on every call.
    const FIXED_ARGS: usize = 13;
    let mut args: Vec<Value> = Vec::with_capacity(FIXED_ARGS + fires.len() * 2);
    args.push(Value::from(sha));
    args.push(Value::from(4_i64));
    args.push(Value::from(stream_key));
    args.push(Value::from(delayed_key));
    args.push(Value::from(repeat_key));
    args.push(Value::from(spec_hash_key));
    args.push(Value::from(now_ms));
    args.push(Value::from(next_fire_ms));
    args.push(Value::from(max_stream_len as i64));
    args.push(Value::from(spec_key));
    args.push(Value::from(limit as i64));
    args.push(Value::from(end_before_ms as i64));
    args.push(Value::from(fires.len() as i64));
    for (fire_at_ms, bytes) in fires {
        args.push(Value::from(*fire_at_ms));
        args.push(Value::Bytes(bytes.clone()));
    }
    args
}
/// Builds `<script> 4 <stream_key> <delayed_key> <repeat_key>
/// <spec_hash_key> <now_ms> <next_fire_ms> <max_stream_len> <spec_key>
/// <limit> <end_before_ms> <fires.len()>` followed by `<fire_at_ms> <bytes>`
/// for each element of `fires`, in slice order.
///
/// The keys and trailing values line up with the `KEYS[1..4]` and
/// `ARGV[1..7]` positions (plus the per-fire pairs starting at `ARGV[8]`)
/// read by `SCHEDULE_REPEATABLE_SCRIPT`. `max_stream_len`, `limit`,
/// `end_before_ms` and the fire count are cast to `i64`; fire payload bytes
/// are cloned.
#[allow(clippy::too_many_arguments)]
pub(crate) fn eval_schedule_repeatable_args(
    script: &str,
    stream_key: &str,
    delayed_key: &str,
    repeat_key: &str,
    spec_hash_key: &str,
    now_ms: i64,
    next_fire_ms: i64,
    max_stream_len: u64,
    spec_key: &str,
    limit: u64,
    end_before_ms: u64,
    fires: &[(i64, Bytes)],
) -> Vec<Value> {
    // 13 fixed values: script, key count, 4 keys and 7 scalar arguments.
    // Sizing for only 12 would force a reallocation on every call.
    const FIXED_ARGS: usize = 13;
    let mut args: Vec<Value> = Vec::with_capacity(FIXED_ARGS + fires.len() * 2);
    args.push(Value::from(script));
    args.push(Value::from(4_i64));
    args.push(Value::from(stream_key));
    args.push(Value::from(delayed_key));
    args.push(Value::from(repeat_key));
    args.push(Value::from(spec_hash_key));
    args.push(Value::from(now_ms));
    args.push(Value::from(next_fire_ms));
    args.push(Value::from(max_stream_len as i64));
    args.push(Value::from(spec_key));
    args.push(Value::from(limit as i64));
    args.push(Value::from(end_before_ms as i64));
    args.push(Value::from(fires.len() as i64));
    for (fire_at_ms, bytes) in fires {
        args.push(Value::from(*fire_at_ms));
        args.push(Value::Bytes(bytes.clone()));
    }
    args
}
/// Builds `<sha> 2 <stream_key> <result_key> <group> <entry_id>
/// <result_bytes> <ttl_secs>`.
///
/// The keys and trailing values line up with the `KEYS[1..2]` and
/// `ARGV[1..4]` positions read by `JOB_OK_SCRIPT`. `ttl_secs` is cast to
/// `i64`.
pub(crate) fn evalsha_job_ok_args(
    sha: &str,
    stream_key: &str,
    result_key: &str,
    group: &str,
    entry_id: &str,
    result_bytes: Bytes,
    ttl_secs: u64,
) -> Vec<Value> {
    let keys = [Value::from(stream_key), Value::from(result_key)];
    let argv = [
        Value::from(group),
        Value::from(entry_id),
        Value::Bytes(result_bytes),
        Value::from(ttl_secs as i64),
    ];

    let mut out: Vec<Value> = Vec::with_capacity(2 + keys.len() + argv.len());
    out.push(Value::from(sha));
    out.push(Value::from(2_i64));
    out.extend(keys);
    out.extend(argv);
    out
}
/// Builds `<script> 2 <stream_key> <result_key> <group> <entry_id>
/// <result_bytes> <ttl_secs>`.
///
/// Same layout as the SHA-based variant, with the script text in the first
/// slot. `ttl_secs` is cast to `i64`.
pub(crate) fn eval_job_ok_args(
    script: &str,
    stream_key: &str,
    result_key: &str,
    group: &str,
    entry_id: &str,
    result_bytes: Bytes,
    ttl_secs: u64,
) -> Vec<Value> {
    let mut out: Vec<Value> = Vec::with_capacity(8);
    out.push(script.into());
    out.push(2_i64.into());
    // KEYS
    out.push(stream_key.into());
    out.push(result_key.into());
    // ARGV
    out.push(group.into());
    out.push(entry_id.into());
    out.push(Value::Bytes(result_bytes));
    out.push((ttl_secs as i64).into());
    out
}
pub(crate) fn evalsha_acquire_lock_args(
sha: &str,
lock_key: &str,
holder_id: &str,
ttl_secs: u64,
) -> Vec<Value> {
vec![
Value::from(sha),
Value::from(1_i64),
Value::from(lock_key),
Value::from(holder_id),
Value::from(ttl_secs as i64),
]
}