use crate::backend::rdb::redis_scripts::RedisArg;
use crate::backend::rdb::RedisBroker;
use crate::base::constants::{DEFAULT_ARCHIVED_EXPIRATION_IN_DAYS, DEFAULT_MAX_ARCHIVE_SIZE};
use crate::base::keys;
use crate::base::keys::TaskState;
use crate::base::Broker;
use crate::error::{Error, Result};
use crate::proto::{ServerInfo, TaskMessage, WorkerInfo};
use crate::task::{generate_task_id, Task, TaskInfo};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use prost::Message;
use redis::AsyncCommands;
use std::time::Duration;
const STATS_TTL: i64 = 90 * 24 * 60 * 60; const LEASE_DURATION: i64 = 3600;
#[async_trait]
impl Broker for RedisBroker {
async fn ping(&self) -> Result<()> {
let mut conn = self.get_async_connection().await?;
let _: String = conn.ping().await?;
Ok(())
}
async fn close(&self) -> Result<()> {
Ok(())
}
async fn enqueue(&self, task: &Task) -> Result<TaskInfo> {
let mut conn = self.get_async_connection().await?;
let msg = self.task_to_message(task);
let encoded = self.encode_task_message(&msg)?;
let _: () = conn.sadd(keys::ALL_QUEUES, &msg.queue).await?;
let task_key = keys::task_key(&msg.queue, &msg.id);
let pending_key = keys::pending_key(&msg.queue);
let now_nanos = Utc::now().timestamp_nanos_opt().unwrap_or_default();
let result: i64 = {
let keys = vec![task_key, pending_key];
let args = vec![
RedisArg::Bytes(encoded),
RedisArg::Str(msg.id.clone()),
RedisArg::Int(now_nanos),
];
self
.script_manager
.eval_script(&mut conn, "enqueue", &keys, &args)
.await?
};
if result == 0 {
return Err(Error::TaskIdConflict);
}
Ok(TaskInfo::from_proto(&msg, TaskState::Pending, None, None))
}
async fn enqueue_unique(&self, task: &Task, ttl: Duration) -> Result<TaskInfo> {
let mut conn = self.get_async_connection().await?;
let mut msg = self.task_to_message(task);
let unique_key = crate::task::generate_unique_key(&msg.queue, &task.task_type, &task.payload);
msg.unique_key = unique_key.clone();
let encoded = self.encode_task_message(&msg)?;
let _: () = conn.sadd(keys::ALL_QUEUES, &msg.queue).await?;
let task_key = keys::task_key(&msg.queue, &msg.id);
let pending_key = keys::pending_key(&msg.queue);
let now_nanos = Utc::now().timestamp_nanos_opt().unwrap_or_default();
let result: i64 = {
let keys = vec![unique_key, task_key, pending_key];
let args = vec![
RedisArg::Str(msg.id.clone()),
RedisArg::Int(ttl.as_secs() as i64),
RedisArg::Bytes(encoded),
RedisArg::Int(now_nanos),
];
self
.script_manager
.eval_script(&mut conn, "enqueue_unique", &keys, &args)
.await?
};
match result {
-1 => Err(Error::TaskDuplicate),
0 => Err(Error::TaskIdConflict),
1 => Ok(TaskInfo::from_proto(&msg, TaskState::Pending, None, None)),
_ => Err(Error::other("Unexpected script result")),
}
}
async fn dequeue(&self, queues: &[String]) -> Result<Option<TaskMessage>> {
let mut conn = self.get_async_connection().await?;
if queues.is_empty() {
return Ok(None);
}
for queue in queues {
let script_keys = vec![
keys::pending_key(queue), keys::paused_key(queue), keys::active_key(queue), keys::lease_key(queue), ];
let lease_expiry = Utc::now() + chrono::Duration::seconds(LEASE_DURATION); let task_key_prefix = keys::task_key_prefix(queue);
let args = vec![
RedisArg::Int(lease_expiry.timestamp()),
RedisArg::Str(task_key_prefix),
];
let result: Option<Vec<u8>> = self
.script_manager
.eval_script(&mut conn, "dequeue", &script_keys, &args)
.await?;
if let Some(encoded_msg) = result {
let msg = self.decode_task_message(&encoded_msg)?;
return Ok(Some(msg));
}
}
Ok(None)
}
async fn done(&self, msg: &TaskMessage) -> Result<()> {
let mut conn = self.get_async_connection().await?;
let now = Utc::now();
let tomorrow = now + chrono::Duration::days(1);
let end_of_tomorrow = match tomorrow.date_naive().and_hms_opt(23, 59, 59) {
Some(dt) => dt.and_utc(),
None => return Err(Error::other("Invalid time for end_of_tomorrow")),
};
let stats_expiration = end_of_tomorrow.timestamp();
let mut keys = vec![
keys::active_key(&msg.queue),
keys::lease_key(&msg.queue),
keys::task_key(&msg.queue, &msg.id),
keys::processed_key(&msg.queue, &now),
keys::processed_total_key(&msg.queue),
];
let args = vec![
RedisArg::Str(msg.id.clone()),
RedisArg::Int(stats_expiration),
RedisArg::Int(i64::MAX),
];
if !msg.unique_key.is_empty() {
keys.push(msg.unique_key.clone());
}
let _result: String = self
.script_manager
.eval_script(&mut conn, "done", &keys, &args)
.await?;
Ok(())
}
async fn mark_as_complete(&self, msg: &TaskMessage) -> Result<()> {
let mut conn = self.get_async_connection().await?;
let mut msg = msg.clone();
let queue = &msg.queue;
let now = Utc::now();
msg.completed_at = now.timestamp();
let completed_key = keys::completed_key(queue);
let active_key = keys::active_key(queue);
let lease_key = keys::lease_key(queue);
let task_key = keys::task_key(queue, &msg.id);
let processed_key = keys::processed_key(queue, &now);
let processed_total_key = keys::processed_total_key(queue);
let stats_expiration = now + chrono::Duration::seconds(STATS_TTL);
let completed_at = now;
let retention = msg.retention;
let encoded = self.encode_task_message(&msg)?;
let mut keys = vec![
active_key,
lease_key,
completed_key,
task_key,
processed_key,
processed_total_key,
];
if !msg.unique_key.is_empty() {
keys.push(msg.unique_key.clone())
};
let args = vec![
RedisArg::Str(msg.id.clone()),
RedisArg::Int(stats_expiration.timestamp()),
RedisArg::Int(completed_at.timestamp() + retention),
RedisArg::Bytes(encoded),
RedisArg::Int(i64::MAX),
];
let script = if msg.unique_key.is_empty() {
"mark_as_complete"
} else {
"mark_as_complete_unique"
};
self
.script_manager
.eval_script::<()>(&mut conn, script, &keys, &args)
.await?;
Ok(())
}
async fn requeue(
&self,
msg: &TaskMessage,
process_at: DateTime<Utc>,
error_msg: &str,
) -> Result<()> {
self
.retry(msg, process_at, error_msg, !error_msg.is_empty())
.await
}
async fn schedule(&self, task: &Task, process_at: DateTime<Utc>) -> Result<TaskInfo> {
let mut conn = self.get_async_connection().await?;
let msg = self.task_to_message(task);
let encoded = self.encode_task_message(&msg)?;
let _: () = conn.sadd(keys::ALL_QUEUES, &msg.queue).await?;
let task_key = keys::task_key(&msg.queue, &msg.id);
let scheduled_key = keys::scheduled_key(&msg.queue);
let process_timestamp = process_at.timestamp();
let result: i64 = {
let keys = vec![task_key, scheduled_key];
let args = vec![
RedisArg::Bytes(encoded),
RedisArg::Int(process_timestamp),
RedisArg::Str(msg.id.clone()),
];
self
.script_manager
.eval_script(&mut conn, "schedule", &keys, &args)
.await?
};
if result == 0 {
return Err(Error::TaskIdConflict);
}
Ok(TaskInfo::from_proto(&msg, TaskState::Scheduled, None, None))
}
async fn schedule_unique(
&self,
task: &Task,
process_at: DateTime<Utc>,
ttl: Duration,
) -> Result<TaskInfo> {
let mut conn = self.get_async_connection().await?;
let mut msg = self.task_to_message(task);
let unique_key = crate::task::generate_unique_key(&msg.queue, &task.task_type, &task.payload);
msg.unique_key = unique_key.clone();
let encoded = self.encode_task_message(&msg)?;
let _: () = conn.sadd(keys::ALL_QUEUES, &msg.queue).await?;
let task_key = keys::task_key(&msg.queue, &msg.id);
let scheduled_key = keys::scheduled_key(&msg.queue);
let process_timestamp = process_at.timestamp();
let result: i64 = {
let keys = vec![unique_key.clone(), task_key, scheduled_key];
let args = vec![
RedisArg::Str(msg.id.clone()),
RedisArg::Int(ttl.as_secs() as i64),
RedisArg::Int(process_timestamp),
RedisArg::Bytes(encoded),
];
self
.script_manager
.eval_script(&mut conn, "schedule_unique", &keys, &args)
.await?
};
match result {
1 => Ok(TaskInfo::from_proto(&msg, TaskState::Scheduled, None, None)),
0 => Err(Error::TaskIdConflict),
-1 => Err(Error::TaskDuplicate),
_ => Err(Error::other("Unexpected script result")),
}
}
async fn retry(
&self,
msg: &TaskMessage,
process_at: DateTime<Utc>,
error_msg: &str,
is_failure: bool,
) -> Result<()> {
let mut msg = msg.clone();
if is_failure {
msg.retried += 1;
}
msg.error_msg = error_msg.to_string();
msg.last_failed_at = Utc::now().timestamp();
let mut conn = self.get_async_connection().await?;
let queue = &msg.queue;
let now = Utc::now();
let retry_key = keys::retry_key(queue);
let active_key = keys::active_key(queue);
let lease_key = keys::lease_key(queue);
let task_key = keys::task_key(queue, &msg.id);
let processed_key = keys::processed_key(queue, &now);
let failed_key = keys::failed_key(queue, &now);
let process_total_key = keys::processed_total_key(queue);
let failed_total_key = keys::failed_total_key(queue);
let expire_at = Utc::now().timestamp() + STATS_TTL;
let keys = vec![
task_key,
active_key,
lease_key,
retry_key,
processed_key,
failed_key,
process_total_key,
failed_total_key,
];
let args = vec![
RedisArg::Str(msg.id.clone()),
RedisArg::Bytes(msg.encode_to_vec()),
RedisArg::Int(process_at.timestamp()),
RedisArg::Int(expire_at),
RedisArg::Bool(is_failure),
RedisArg::Int(i64::MAX),
];
self
.script_manager
.eval_script::<()>(&mut conn, "retry", &keys, &args)
.await?;
Ok(())
}
async fn archive(&self, msg: &TaskMessage, error_msg: &str) -> Result<()> {
let mut conn = self.get_async_connection().await?;
let now = Utc::now();
let cutoff = now - chrono::Duration::days(DEFAULT_ARCHIVED_EXPIRATION_IN_DAYS);
let expire_at = now.timestamp() + STATS_TTL;
let mut archived_msg = msg.clone();
archived_msg.error_msg = error_msg.to_string();
archived_msg.last_failed_at = now.timestamp();
let archived_encoded = self.encode_task_message(&archived_msg)?;
let keys = vec![
keys::task_key(&msg.queue, &msg.id),
keys::active_key(&msg.queue),
keys::lease_key(&msg.queue),
keys::archived_key(&msg.queue),
keys::processed_key(&msg.queue, &now),
keys::failed_key(&msg.queue, &now),
keys::processed_total_key(&msg.queue),
keys::failed_total_key(&msg.queue),
keys::task_key_prefix(&msg.queue),
];
let args = vec![
RedisArg::Str(msg.id.clone()), RedisArg::Bytes(archived_encoded),
RedisArg::Int(now.timestamp()),
RedisArg::Int(cutoff.timestamp()),
RedisArg::Int(DEFAULT_MAX_ARCHIVE_SIZE),
RedisArg::Int(expire_at),
RedisArg::Int(i64::MAX),
];
let _: () = self
.script_manager
.eval_script(&mut conn, "archive", &keys, &args)
.await?;
Ok(())
}
async fn forward_if_ready(&self, queues: &[String]) -> Result<i64> {
let mut conn = self.get_async_connection().await?;
let now = Utc::now().timestamp();
let now_nanos = Utc::now().timestamp_nanos_opt().unwrap_or_default();
let mut forwarded = 0i64;
for queue in queues {
let scheduled_key = keys::scheduled_key(queue);
let pending_key = keys::pending_key(queue);
let task_key_prefix = keys::task_key_prefix(queue);
let group_key_prefix = keys::group_key_prefix(queue);
let keys = vec![scheduled_key, pending_key.clone()];
let args = vec![
RedisArg::Int(now),
RedisArg::Str(task_key_prefix.clone()),
RedisArg::Int(now_nanos),
RedisArg::Str(group_key_prefix.clone()),
];
let count: i64 = self
.script_manager
.eval_script(&mut conn, "forward", &keys, &args)
.await?;
forwarded += count;
let retry_key = keys::retry_key(queue);
let keys = vec![retry_key, pending_key];
let args = vec![
RedisArg::Int(now),
RedisArg::Str(task_key_prefix),
RedisArg::Int(now_nanos),
RedisArg::Str(group_key_prefix),
];
let count: i64 = self
.script_manager
.eval_script(&mut conn, "forward", &keys, &args)
.await?;
forwarded += count;
}
Ok(forwarded)
}
async fn add_to_group(&self, task: &Task, group: &str) -> Result<TaskInfo> {
let mut conn = self.get_async_connection().await?;
let mut msg = self.task_to_message(task);
msg.group_key = group.to_string();
let encoded = self.encode_task_message(&msg)?;
let _: () = conn.sadd(keys::ALL_QUEUES, &msg.queue).await?;
let task_key = keys::task_key(&msg.queue, &msg.id);
let group_key = keys::group_key(&msg.queue, group);
let groups_key = keys::groups_key(&msg.queue);
let now = Utc::now().timestamp();
let result: i64 = {
let keys = vec![task_key, group_key, groups_key];
let args = vec![
RedisArg::Bytes(encoded),
RedisArg::Str(msg.id.clone()),
RedisArg::Int(now),
RedisArg::Str(group.to_string()),
];
self
.script_manager
.eval_script(&mut conn, "add_to_group", &keys, &args)
.await?
};
if result == 0 {
return Err(Error::TaskIdConflict);
}
Ok(TaskInfo::from_proto(
&msg,
TaskState::Aggregating,
None,
None,
))
}
async fn add_to_group_unique(&self, task: &Task, group: &str, ttl: Duration) -> Result<TaskInfo> {
let mut conn = self.get_async_connection().await?;
let mut msg = self.task_to_message(task);
msg.group_key = group.to_string();
let unique_key = crate::task::generate_unique_key(&msg.queue, &task.task_type, &task.payload);
msg.unique_key = unique_key.clone();
let encoded = self.encode_task_message(&msg)?;
let _: () = conn.sadd(keys::ALL_QUEUES, &msg.queue).await?;
let task_key = keys::task_key(&msg.queue, &msg.id);
let group_key = keys::group_key(&msg.queue, group);
let groups_key = keys::groups_key(&msg.queue);
let now = Utc::now().timestamp();
let result: i64 = {
let keys = vec![unique_key.clone(), task_key, group_key, groups_key];
let args = vec![
RedisArg::Str(msg.id.clone()),
RedisArg::Int(ttl.as_secs() as i64),
RedisArg::Bytes(encoded),
RedisArg::Int(now),
RedisArg::Str(group.to_string()),
];
self
.script_manager
.eval_script(&mut conn, "add_to_group_unique", &keys, &args)
.await?
};
match result {
1 => Ok(TaskInfo::from_proto(
&msg,
TaskState::Aggregating,
None,
None,
)),
0 => Err(Error::TaskIdConflict),
-1 => Err(Error::TaskDuplicate),
_ => Err(Error::other("Unexpected script result")),
}
}
async fn list_groups(&self, queue: &str) -> Result<Vec<String>> {
let mut conn = self.get_async_connection().await?;
let groups: Vec<String> = conn.smembers(keys::all_groups(queue)).await?;
Ok(groups)
}
async fn aggregation_check(
&self,
queue: &str,
group: &str,
aggregation_delay: Duration,
max_delay: Duration,
max_size: usize,
) -> Result<Option<String>> {
let mut conn = self.get_async_connection().await?;
let aggregation_set_id = generate_task_id();
let keys = vec![
keys::group_key(queue, group),
keys::aggregation_set_key(queue, group, &aggregation_set_id),
keys::all_aggregation_sets(queue),
keys::all_groups(queue),
];
let expire_at = Utc::now().timestamp() + STATS_TTL;
let args = vec![
RedisArg::Int(max_size as i64),
RedisArg::Int(max_delay.as_secs() as i64),
RedisArg::Int(aggregation_delay.as_secs() as i64),
RedisArg::Float(expire_at as f64),
RedisArg::Float(Utc::now().timestamp() as f64),
RedisArg::Str(group.to_string()),
];
let result: Option<String> = self
.script_manager
.eval_script(&mut conn, "aggregation_check", &keys, &args)
.await?;
match result {
Some(ref s) => match s.as_str() {
"1" => Ok(Some(aggregation_set_id)),
_ => Ok(None),
},
None => Ok(None),
}
}
async fn read_aggregation_set(
&self,
queue: &str,
group: &str,
set_id: &str,
) -> Result<Vec<TaskMessage>> {
let mut conn = self.get_async_connection().await?;
let aggregation_key = keys::aggregation_set_key(queue, group, set_id);
let keys = vec![aggregation_key.clone()];
let args = vec![RedisArg::Str(keys::task_key_prefix(queue))];
let result: Vec<Vec<u8>> = self
.script_manager
.eval_script(&mut conn, "read_aggregation_set", &keys, &args)
.await?;
let mut messages = Vec::new();
for data in result {
if let Ok(msg) = self.decode_task_message(&data) {
messages.push(msg);
}
}
let _deadline_unix: f64 = conn
.zscore(keys::all_aggregation_sets(queue), aggregation_key)
.await?;
Ok(messages)
}
async fn delete_aggregation_set(&self, queue: &str, group: &str, set_id: &str) -> Result<()> {
let mut conn = self.get_async_connection().await?;
let keys = vec![
keys::aggregation_set_key(queue, group, set_id),
keys::all_aggregation_sets(queue),
];
let args = vec![RedisArg::Str(keys::task_key_prefix(queue))];
let _: () = self
.script_manager
.eval_script(&mut conn, "delete_aggregation_set", &keys, &args)
.await?;
Ok(())
}
async fn reclaim_stale_aggregation_sets(&self, queue: &str) -> Result<()> {
let mut conn = self.get_async_connection().await?;
let all_sets_key = keys::all_aggregation_sets(queue);
let keys = vec![all_sets_key];
let args = vec![RedisArg::Int(Utc::now().timestamp())];
let _: () = self
.script_manager
.eval_script(&mut conn, "reclaim_stale_aggregation_sets", &keys, &args)
.await?;
Ok(())
}
async fn delete_expired_completed_tasks(&self, queue: &str) -> Result<i64> {
let mut conn = self.get_async_connection().await?;
let completed_key = keys::completed_key(queue);
let task_key_prefix = keys::task_key_prefix(queue);
let cutoff_time = (Utc::now() - chrono::Duration::days(7)).timestamp();
let batch_size = 100;
let keys = vec![completed_key];
let args = vec![
RedisArg::Int(cutoff_time),
RedisArg::Str(task_key_prefix),
RedisArg::Int(batch_size),
];
let deleted_count: i64 = self
.script_manager
.eval_script(&mut conn, "delete_expired_completed_tasks", &keys, &args)
.await?;
Ok(deleted_count)
}
async fn list_lease_expired(
&self,
cutoff: DateTime<Utc>,
queues: &[String],
) -> Result<Vec<TaskMessage>> {
let mut conn = self.get_async_connection().await?;
let mut expired_tasks = Vec::new();
let cutoff_timestamp = cutoff.timestamp();
for queue in queues {
let lease_key = keys::lease_key(queue);
let task_key_prefix = keys::task_key_prefix(queue);
let keys = vec![lease_key];
let args = vec![
RedisArg::Int(cutoff_timestamp),
RedisArg::Str(task_key_prefix),
];
let task_data_list: Vec<Vec<u8>> = self
.script_manager
.eval_script(&mut conn, "list_lease_expired", &keys, &args)
.await?;
for task_data in task_data_list {
if let Ok(msg) = self.decode_task_message(&task_data) {
expired_tasks.push(msg);
}
}
}
Ok(expired_tasks)
}
async fn extend_lease(&self, queue: &str, task_id: &str, lease_duration: Duration) -> Result<()> {
let mut conn = self.get_async_connection().await?;
let lease_key = keys::lease_key(queue);
let keys = vec![lease_key];
let args = vec![
RedisArg::Str(task_id.to_string()),
RedisArg::Int(lease_duration.as_secs() as i64),
];
self
.script_manager
.eval_script::<()>(&mut conn, "extend_lease", &keys, &args)
.await?;
Ok(())
}
async fn write_server_state(
&self,
server_info: &ServerInfo,
workers: Vec<WorkerInfo>,
ttl: Duration,
tenant: Option<&str>,
) -> Result<()> {
let mut conn = self.get_async_connection().await?;
let (server_key, workers_key) = keys::server_and_workers_keys(
tenant,
&server_info.host,
server_info.pid,
&server_info.server_id,
);
let exp_timestamp = (Utc::now()
+ chrono::Duration::from_std(ttl).map_err(|e| Error::other(format!("invalid ttl: {e}")))?)
.timestamp();
let _: () = conn
.zadd(keys::ALL_SERVERS, &server_key, exp_timestamp as f64)
.await?;
let _: () = conn
.zadd(keys::ALL_WORKERS, &workers_key, exp_timestamp as f64)
.await?;
let server_info_bytes = server_info.encode_to_vec();
let keys = vec![server_key, workers_key];
let mut args = vec![
RedisArg::Int(ttl.as_secs() as i64),
RedisArg::Bytes(server_info_bytes),
];
for worker in workers {
let worker_info = worker.encode_to_vec();
args.push(RedisArg::Str(worker.task_id));
args.push(RedisArg::Bytes(worker_info));
}
let _: String = self
.script_manager
.eval_script(&mut conn, "write_server_state", &keys, &args)
.await?;
Ok(())
}
async fn clear_server_state(
&self,
host: &str,
pid: i32,
server_id: &str,
tenant: Option<&str>,
) -> Result<()> {
let mut conn = self.get_async_connection().await?;
let (server_key, workers_key) = keys::server_and_workers_keys(tenant, host, pid, server_id);
let _: () = conn.zrem(keys::ALL_SERVERS, &server_key).await?;
let _: () = conn.zrem(keys::ALL_WORKERS, &workers_key).await?;
let keys = vec![server_key, workers_key];
let _: String = self
.script_manager
.eval_script(&mut conn, "clear_server_state", &keys, &[])
.await?;
Ok(())
}
async fn cancellation_pub_sub(
&self,
) -> Result<Box<dyn futures::Stream<Item = Result<String>> + Unpin + Send>> {
use futures::StreamExt;
let mut pubsub = self.get_pubsub().await?;
pubsub.subscribe(keys::CANCEL_CHANNEL).await?;
let message_stream = pubsub.into_on_message();
let stream = message_stream.filter_map(|msg| async move {
match msg.get_payload::<String>() {
Ok(task_id) => Some(Ok(task_id)),
Err(e) => {
tracing::warn!("Failed to parse cancellation message: {}", e);
Some(Err(Error::other(format!("Failed to parse message: {e}"))))
}
}
});
Ok(Box::new(Box::pin(stream)))
}
async fn publish_cancellation(&self, task_id: &str) -> Result<()> {
let mut conn = self.get_async_connection().await?;
let _: i32 = conn.publish(keys::CANCEL_CHANNEL, task_id).await?;
Ok(())
}
async fn write_result(&self, queue: &str, task_id: &str, result: &[u8]) -> Result<()> {
let mut conn = self.get_async_connection().await?;
let result_key = keys::task_key(queue, task_id);
let _: () = conn.hset(&result_key, "result", result).await?;
Ok(())
}
}
#[async_trait]
impl crate::base::SchedulerBroker for RedisBroker {
async fn write_scheduler_entries(
&self,
entries: &[crate::proto::SchedulerEntry],
scheduler_id: &str,
ttl_secs: u64,
tenant: Option<&str>,
) -> Result<()> {
RedisBroker::write_scheduler_entries(self, entries, scheduler_id, ttl_secs, tenant).await
}
async fn record_scheduler_enqueue_event(
&self,
event: &crate::proto::SchedulerEnqueueEvent,
entry_id: &str,
) -> Result<()> {
RedisBroker::record_scheduler_enqueue_event(self, event, entry_id).await
}
async fn scheduler_entries_script(
&self,
scheduler_id: &str,
) -> Result<std::collections::HashMap<String, Vec<u8>>> {
RedisBroker::scheduler_entries_script(self, scheduler_id).await
}
async fn scheduler_events_script(&self, count: usize) -> Result<Vec<Vec<u8>>> {
RedisBroker::scheduler_events_script(self, count).await
}
async fn clear_scheduler_entries(&self, scheduler_id: &str, tenant: Option<&str>) -> Result<()> {
RedisBroker::clear_scheduler_entries(self, scheduler_id, tenant).await
}
}