use crate::base::constants::TIME_LAYOUT_YMD;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::str::FromStr;
pub const QUEUE_START: &str = "{";
pub const QUEUE_END: &str = "}";
pub const ALL_SERVERS: &str = "asynq:servers";
pub const ALL_WORKERS: &str = "asynq:workers";
pub const ALL_SCHEDULERS: &str = "asynq:schedulers";
pub const ALL_QUEUES: &str = "asynq:queues";
pub const CANCEL_CHANNEL: &str = "asynq:cancel";
pub const SCHEDULER_EVENTS: &str = "asynq:scheduler:events";
pub const QUEUE_PREFIX: &str = "asynq:";
pub const SCHEDULED_PREFIX: &str = "asynq:scheduled:";
pub const AGGREGATING_PREFIX: &str = "asynq:aggregating:";
pub const SERVERS_PREFIX: &str = "asynq:servers:";
pub const WORKERS_PREFIX: &str = "asynq:workers:";
pub const TASK_RESULT_PREFIX: &str = "asynq:result:";
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum TaskState {
Active,
Pending,
Scheduled,
Retry,
Archived,
Completed,
Aggregating,
}
impl TaskState {
pub fn as_str(&self) -> &'static str {
match self {
Self::Active => "active",
Self::Pending => "pending",
Self::Scheduled => "scheduled",
Self::Retry => "retry",
Self::Archived => "archived",
Self::Completed => "completed",
Self::Aggregating => "aggregating",
}
}
pub fn queue_key(&self, qname: &str, gname: Option<&str>) -> String {
match self {
Self::Active => active_key(qname),
Self::Pending => pending_key(qname),
Self::Scheduled => scheduled_key(qname),
Self::Retry => retry_key(qname),
Self::Archived => archived_key(qname),
Self::Completed => completed_key(qname),
Self::Aggregating => aggregating_key(qname, gname.unwrap_or("")), }
}
}
impl FromStr for TaskState {
type Err = ();
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"active" => Ok(Self::Active),
"pending" => Ok(Self::Pending),
"scheduled" => Ok(Self::Scheduled),
"retry" => Ok(Self::Retry),
"archived" => Ok(Self::Archived),
"completed" => Ok(Self::Completed),
"aggregating" => Ok(Self::Aggregating),
_ => Err(()),
}
}
}
impl std::fmt::Display for TaskState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.as_str())
}
}
pub fn queue_key_prefix(qname: &str) -> String {
format!("asynq:{{{qname}}}:")
}
pub fn task_key_prefix(qname: &str) -> String {
format!("{}t:", queue_key_prefix(qname))
}
pub fn task_key(qname: &str, id: &str) -> String {
format!("{}{}", task_key_prefix(qname), id)
}
pub fn pending_key(qname: &str) -> String {
format!("{}{}", queue_key_prefix(qname), TaskState::Pending)
}
pub fn active_key(qname: &str) -> String {
format!("{}{}", queue_key_prefix(qname), TaskState::Active)
}
pub fn scheduled_key(qname: &str) -> String {
format!("{}{}", queue_key_prefix(qname), TaskState::Scheduled)
}
pub fn retry_key(qname: &str) -> String {
format!("{}{}", queue_key_prefix(qname), TaskState::Retry)
}
pub fn archived_key(qname: &str) -> String {
format!("{}{}", queue_key_prefix(qname), TaskState::Archived)
}
pub fn completed_key(qname: &str) -> String {
format!("{}{}", queue_key_prefix(qname), TaskState::Completed)
}
pub fn paused_key(qname: &str) -> String {
format!("{}paused", queue_key_prefix(qname))
}
pub fn lease_key(qname: &str) -> String {
format!("{}lease", queue_key_prefix(qname))
}
pub fn aggregating_key(queue: &str, group: &str) -> String {
group_key(queue, group)
}
pub fn unique_key(qname: &str, task_type: &str, payload: &[u8]) -> String {
if payload.is_empty() {
return format!("{}unique:{}:", queue_key_prefix(qname), task_type);
}
let digest = md5::compute(payload);
let checksum = format!("{digest:x}");
format!(
"{}unique:{}:{}",
queue_key_prefix(qname),
task_type,
checksum
)
}
pub fn group_key_prefix(qname: &str) -> String {
format!("{}g:", queue_key_prefix(qname))
}
pub fn group_key(qname: &str, group_key: &str) -> String {
format!("{}{}", group_key_prefix(qname), group_key)
}
pub fn aggregation_set_key(qname: &str, group_name: &str, set_id: &str) -> String {
format!("{}:{}", group_key(qname, group_name), set_id)
}
pub fn all_groups(qname: &str) -> String {
format!("{}groups", queue_key_prefix(qname))
}
pub fn groups_key(qname: &str) -> String {
all_groups(qname)
}
pub fn all_aggregation_sets(qname: &str) -> String {
format!("{}aggregation_sets", queue_key_prefix(qname))
}
pub fn server_info_key(hostname: &str, pid: i32, server_id: &str) -> String {
format!("{SERVERS_PREFIX}{{{hostname}:{pid}:{server_id}}}")
}
pub fn server_info_key_with_tenant(
tenant: &str,
hostname: &str,
pid: i32,
server_id: &str,
) -> String {
format!("{SERVERS_PREFIX}{{{tenant}:{hostname}:{pid}:{server_id}}}")
}
pub fn workers_key(hostname: &str, pid: i32, server_id: &str) -> String {
format!("{WORKERS_PREFIX}{{{hostname}:{pid}:{server_id}}}")
}
pub fn workers_key_with_tenant(tenant: &str, hostname: &str, pid: i32, server_id: &str) -> String {
format!("{WORKERS_PREFIX}{{{tenant}:{hostname}:{pid}:{server_id}}}")
}
pub fn server_and_workers_keys(
tenant: Option<&str>,
hostname: &str,
pid: i32,
server_id: &str,
) -> (String, String) {
if let Some(t) = tenant {
(
server_info_key_with_tenant(t, hostname, pid, server_id),
workers_key_with_tenant(t, hostname, pid, server_id),
)
} else {
(
server_info_key(hostname, pid, server_id),
workers_key(hostname, pid, server_id),
)
}
}
pub fn scheduler_entries_key(scheduler_id: &str) -> String {
format!("{ALL_SCHEDULERS}:{{{scheduler_id}}}")
}
pub fn scheduler_entries_key_with_tenant(tenant: &str, scheduler_id: &str) -> String {
format!("{ALL_SCHEDULERS}:{{{tenant}:{scheduler_id}}}")
}
pub fn scheduler_history_key(entry_id: &str) -> String {
format!("asynq:scheduler_history:{entry_id}")
}
pub fn processed_total_key(qname: &str) -> String {
format!("{}processed", queue_key_prefix(qname))
}
pub fn failed_total_key(qname: &str) -> String {
format!("{}failed", queue_key_prefix(qname))
}
pub fn processed_key(qname: &str, date: &DateTime<Utc>) -> String {
format!(
"{}processed:{}",
queue_key_prefix(qname),
date.format(TIME_LAYOUT_YMD)
)
}
pub fn failed_key(qname: &str, date: &DateTime<Utc>) -> String {
format!(
"{}failed:{}",
queue_key_prefix(qname),
date.format(TIME_LAYOUT_YMD)
)
}
pub fn server_info_key_legacy(server_id: &str) -> String {
format!("{SERVERS_PREFIX}{server_id}")
}
pub fn server_info_key_full(hostname: &str, pid: i32, server_id: &str) -> String {
format!("{SERVERS_PREFIX}{{{hostname}:{pid}:{server_id}}}")
}
#[inline]
pub fn server_info_key_full_with_tenant(
tenant: &str,
hostname: &str,
pid: i32,
server_id: &str,
) -> String {
server_info_key_with_tenant(tenant, hostname, pid, server_id)
}
pub fn workers_key_full(hostname: &str, pid: i32, server_id: &str) -> String {
format!("{WORKERS_PREFIX}{{{hostname}:{pid}:{server_id}}}")
}
#[inline]
pub fn workers_key_full_with_tenant(
tenant: &str,
hostname: &str,
pid: i32,
server_id: &str,
) -> String {
workers_key_with_tenant(tenant, hostname, pid, server_id)
}
#[cfg(test)]
mod tests {
use crate::base::keys;
#[test]
fn test_keys_generation() {
assert_eq!(keys::pending_key("default"), "asynq:{default}:pending");
assert_eq!(keys::pending_key("default"), "asynq:{default}:pending"); assert_eq!(keys::active_key("default"), "asynq:{default}:active");
assert_eq!(keys::scheduled_key("default"), "asynq:{default}:scheduled");
assert_eq!(keys::retry_key("default"), "asynq:{default}:retry");
assert_eq!(keys::archived_key("default"), "asynq:{default}:archived");
assert_eq!(keys::completed_key("default"), "asynq:{default}:completed");
assert_eq!(keys::ALL_SERVERS, "asynq:servers");
assert_eq!(keys::ALL_WORKERS, "asynq:workers");
assert_eq!(keys::ALL_QUEUES, "asynq:queues");
assert_eq!(
keys::server_info_key("localhost", 12345, "server1"),
"asynq:servers:{localhost:12345:server1}"
);
assert_eq!(
keys::workers_key("localhost", 12345, "server1"),
"asynq:workers:{localhost:12345:server1}"
);
assert_eq!(
keys::task_key("default", "task1"),
"asynq:{default}:t:task1"
);
assert_eq!(
keys::group_key("default", "group1"),
"asynq:{default}:g:group1"
);
let unique_key = keys::unique_key("default", "email:send", b"test payload");
assert!(unique_key.starts_with("asynq:{default}:unique:email:send:"));
let empty_unique_key = keys::unique_key("default", "email:send", b"");
assert_eq!(empty_unique_key, "asynq:{default}:unique:email:send:");
assert_eq!(
keys::task_key("default", "task1"),
"asynq:{default}:t:task1"
);
}
#[test]
fn test_tenant_keys_generation() {
assert_eq!(
keys::server_info_key_with_tenant(
"tenant1",
"Arch",
6492,
"10b398de-d250-4bdf-b513-4b5f52247352"
),
"asynq:servers:{tenant1:Arch:6492:10b398de-d250-4bdf-b513-4b5f52247352}"
);
assert_eq!(
keys::workers_key_with_tenant(
"tenant1",
"Arch",
6492,
"10b398de-d250-4bdf-b513-4b5f52247352"
),
"asynq:workers:{tenant1:Arch:6492:10b398de-d250-4bdf-b513-4b5f52247352}"
);
assert_eq!(
keys::server_info_key_full_with_tenant("tenant1", "localhost", 12345, "server1"),
"asynq:servers:{tenant1:localhost:12345:server1}"
);
assert_eq!(
keys::workers_key_full_with_tenant("tenant1", "localhost", 12345, "server1"),
"asynq:workers:{tenant1:localhost:12345:server1}"
);
let tenant1_server_key = keys::server_info_key_with_tenant("tenant1", "host", 1234, "server1");
let tenant2_server_key = keys::server_info_key_with_tenant("tenant2", "host", 1234, "server1");
assert_ne!(tenant1_server_key, tenant2_server_key);
assert!(tenant1_server_key.contains("tenant1"));
assert!(tenant2_server_key.contains("tenant2"));
let (server_key, workers_key) =
keys::server_and_workers_keys(Some("tenant1"), "host", 1234, "server1");
assert_eq!(server_key, "asynq:servers:{tenant1:host:1234:server1}");
assert_eq!(workers_key, "asynq:workers:{tenant1:host:1234:server1}");
let (server_key_no_tenant, workers_key_no_tenant) =
keys::server_and_workers_keys(None, "host", 1234, "server1");
assert_eq!(server_key_no_tenant, "asynq:servers:{host:1234:server1}");
assert_eq!(workers_key_no_tenant, "asynq:workers:{host:1234:server1}");
}
#[test]
fn test_scheduler_entries_key() {
assert_eq!(
keys::scheduler_entries_key("arch:51547:b7ae5325-3c4a-491a-bd29-25ce9fcc00e9"),
"asynq:schedulers:{arch:51547:b7ae5325-3c4a-491a-bd29-25ce9fcc00e9}"
);
assert_eq!(
keys::scheduler_entries_key_with_tenant(
"tenant1",
"arch:51547:b7ae5325-3c4a-491a-bd29-25ce9fcc00e9"
),
"asynq:schedulers:{tenant1:arch:51547:b7ae5325-3c4a-491a-bd29-25ce9fcc00e9}"
);
let tenant1_key = keys::scheduler_entries_key_with_tenant("tenant1", "host:1234:uuid1");
let tenant2_key = keys::scheduler_entries_key_with_tenant("tenant2", "host:1234:uuid1");
assert_ne!(tenant1_key, tenant2_key);
assert!(tenant1_key.contains("tenant1"));
assert!(tenant2_key.contains("tenant2"));
let no_tenant_key = keys::scheduler_entries_key("host:1234:uuid1");
assert_ne!(no_tenant_key, tenant1_key);
}
}