use crate::error::Result;
use crate::proto::{ServerInfo, TaskMessage, WorkerInfo};
use crate::task::{Task, TaskInfo};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use std::str::FromStr;
use std::time::Duration;
pub mod constants;
pub mod keys;
#[async_trait]
pub trait Broker: Send + Sync {
async fn ping(&self) -> Result<()>;
async fn close(&self) -> Result<()>;
async fn enqueue(&self, task: &Task) -> Result<TaskInfo>;
async fn enqueue_unique(&self, task: &Task, ttl: Duration) -> Result<TaskInfo>;
async fn dequeue(&self, queues: &[String]) -> Result<Option<TaskMessage>>;
async fn done(&self, msg: &TaskMessage) -> Result<()>;
async fn mark_as_complete(&self, msg: &TaskMessage) -> Result<()>;
async fn requeue(
&self,
msg: &TaskMessage,
process_at: DateTime<Utc>,
error_msg: &str,
) -> Result<()>;
async fn schedule(&self, task: &Task, process_at: DateTime<Utc>) -> Result<TaskInfo>;
async fn schedule_unique(
&self,
task: &Task,
process_at: DateTime<Utc>,
ttl: Duration,
) -> Result<TaskInfo>;
async fn retry(
&self,
msg: &TaskMessage,
process_at: DateTime<Utc>,
error_msg: &str,
is_failure: bool,
) -> Result<()>;
async fn archive(&self, msg: &TaskMessage, error_msg: &str) -> Result<()>;
async fn forward_if_ready(&self, queues: &[String]) -> Result<i64>;
async fn add_to_group(&self, task: &Task, group: &str) -> Result<TaskInfo>;
async fn add_to_group_unique(&self, task: &Task, group: &str, ttl: Duration) -> Result<TaskInfo>;
async fn list_groups(&self, queue: &str) -> Result<Vec<String>>;
async fn aggregation_check(
&self,
queue: &str,
group: &str,
aggregation_delay: Duration,
max_delay: Duration,
max_size: usize,
) -> Result<Option<String>>;
async fn read_aggregation_set(
&self,
queue: &str,
group: &str,
set_id: &str,
) -> Result<Vec<TaskMessage>>;
async fn delete_aggregation_set(&self, queue: &str, group: &str, set_id: &str) -> Result<()>;
async fn reclaim_stale_aggregation_sets(&self, queue: &str) -> Result<()>;
async fn delete_expired_completed_tasks(&self, queue: &str) -> Result<i64>;
async fn list_lease_expired(
&self,
cutoff: DateTime<Utc>,
queues: &[String],
) -> Result<Vec<TaskMessage>>;
async fn extend_lease(&self, queue: &str, task_id: &str, lease_duration: Duration) -> Result<()>;
async fn write_server_state(
&self,
server_info: &ServerInfo,
workers: Vec<WorkerInfo>,
ttl: Duration,
tenant: Option<&str>,
) -> Result<()>;
async fn clear_server_state(
&self,
host: &str,
pid: i32,
server_id: &str,
tenant: Option<&str>,
) -> Result<()>;
async fn cancellation_pub_sub(
&self,
) -> Result<Box<dyn futures::Stream<Item = Result<String>> + Unpin + Send>>;
async fn publish_cancellation(&self, task_id: &str) -> Result<()>;
async fn write_result(&self, queue: &str, task_id: &str, result: &[u8]) -> Result<()>;
}
#[async_trait]
pub trait SchedulerBroker: Send + Sync {
async fn write_scheduler_entries(
&self,
entries: &[crate::proto::SchedulerEntry],
scheduler_id: &str,
ttl_secs: u64,
tenant: Option<&str>,
) -> Result<()>;
async fn record_scheduler_enqueue_event(
&self,
event: &crate::proto::SchedulerEnqueueEvent,
entry_id: &str,
) -> Result<()>;
async fn scheduler_entries_script(
&self,
scheduler_id: &str,
) -> Result<std::collections::HashMap<String, Vec<u8>>>;
async fn scheduler_events_script(&self, count: usize) -> Result<Vec<Vec<u8>>>;
async fn clear_scheduler_entries(&self, scheduler_id: &str, tenant: Option<&str>) -> Result<()>;
}
#[derive(Debug, Clone, PartialEq)]
pub enum ServerState {
New,
Active,
Stopped,
Closed,
}
impl ServerState {
pub fn as_str(&self) -> &'static str {
match self {
Self::New => "new",
Self::Active => "active",
Self::Stopped => "stopped",
Self::Closed => "closed",
}
}
}
impl FromStr for ServerState {
type Err = ();
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s {
"new" => Ok(Self::New),
"active" => Ok(Self::Active),
"stopped" => Ok(Self::Stopped),
"closed" => Ok(Self::Closed),
_ => Err(()),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_server_state_conversion() {
assert_eq!(ServerState::Active.as_str(), "active");
assert_eq!("active".parse::<ServerState>(), Ok(ServerState::Active));
assert!("invalid".parse::<ServerState>().is_err());
}
}