use crate::backend::wsdb::message::{
AddToGroupRequest, AddToGroupUniqueRequest, AggregationCheckRequest, ArchiveRequest,
ClearServerStateRequest, ClientMessage, DeleteAggregationSetRequest, DequeueRequest,
EnqueueUniqueRequest, ExtendLeaseRequest, ListGroupsRequest, ListLeaseExpiredRequest,
ReadAggregationSetRequest, RetryRequest, ScheduleRequest, ScheduleUniqueRequest, ServerMessage,
WorkerInfoData, WriteResultRequest, WriteServerStateRequest,
};
use crate::backend::wsdb::{ws_broker::CLOSE_FRAME_TIMEOUT_MS, WebSocketBroker};
use crate::base::Broker;
use crate::error::{Error, Result};
use crate::proto::{ServerInfo, TaskMessage, WorkerInfo};
use crate::task::{Task, TaskInfo};
use async_trait::async_trait;
use base64::prelude::*;
use chrono::{DateTime, Utc};
use std::time::Duration;
#[async_trait]
impl Broker for WebSocketBroker {
/// Sends a `Ping` message and waits for the server's reply.
///
/// # Errors
/// Returns a broker error when the server replies with `Error`, and an
/// invalid-message error for any reply other than `Pong`. Failures from
/// `send_and_receive` are propagated unchanged.
async fn ping(&self) -> Result<()> {
    match self.send_and_receive(ClientMessage::Ping).await? {
        ServerMessage::Error { message } => Err(Error::broker(message)),
        ServerMessage::Pong => Ok(()),
        _ => Err(Error::invalid_message("Unexpected response")),
    }
}
/// Shuts down the current WebSocket connection, if there is one.
///
/// Takes the connection out of `self.connection`, fires its shutdown signal
/// when one is still present, then waits `CLOSE_FRAME_TIMEOUT_MS` milliseconds
/// before returning. Always returns `Ok(())`, including when no connection
/// was stored.
async fn close(&self) -> Result<()> {
// The write guard lives until the function returns, i.e. across the sleep below.
let mut conn = self.connection.write().await;
if let Some(mut ws_conn) = conn.take() {
if let Some(shutdown_tx) = ws_conn.shutdown_tx.take() {
// The send result is deliberately discarded.
// NOTE(review): presumably it can only fail when the receiving side is already gone — confirm.
let _ = shutdown_tx.send(());
}
// `ws_conn` stays alive until the end of this block, so it is dropped only after the wait.
// NOTE(review): the constant's name suggests this gives the close frame time to go out — confirm.
tokio::time::sleep(tokio::time::Duration::from_millis(CLOSE_FRAME_TIMEOUT_MS)).await;
}
Ok(())
}
/// Submits `task` to the server with an `Enqueue` message.
///
/// The task is converted by `task_to_enqueue_request`, and the server's
/// reply is interpreted by `handle_task_info_response`.
async fn enqueue(&self, task: &Task) -> Result<TaskInfo> {
    let message = ClientMessage::Enqueue(self.task_to_enqueue_request(task));
    let reply = self.send_and_receive(message).await?;
    self.handle_task_info_response(reply)
}
/// Submits `task` with an `EnqueueUnique` message that carries a TTL.
///
/// `ttl` is transmitted as whole seconds (`Duration::as_secs`), so any
/// sub-second part is dropped. The reply is interpreted by
/// `handle_task_info_response`.
async fn enqueue_unique(&self, task: &Task, ttl: Duration) -> Result<TaskInfo> {
    let enqueue = self.task_to_enqueue_request(task);
    let ttl_seconds = ttl.as_secs();
    let message = ClientMessage::EnqueueUnique(EnqueueUniqueRequest {
        enqueue,
        ttl_seconds,
    });
    let reply = self.send_and_receive(message).await?;
    self.handle_task_info_response(reply)
}
/// Asks the server to dequeue a task from `queues`.
///
/// Returns `Ok(None)` when the server answers with an empty
/// `DequeueResult`; otherwise the returned task is converted with
/// `response_to_task_message`.
///
/// # Errors
/// Returns a broker error when the server replies with `Error`, and an
/// invalid-message error for any other unexpected reply. Send/receive and
/// conversion failures are propagated.
async fn dequeue(&self, queues: &[String]) -> Result<Option<TaskMessage>> {
    let request = DequeueRequest {
        queues: queues.to_vec(),
    };
    match self.send_and_receive(ClientMessage::Dequeue(request)).await? {
        ServerMessage::DequeueResult(None) => Ok(None),
        ServerMessage::DequeueResult(Some(raw)) => {
            let decoded = self.response_to_task_message(&raw)?;
            Ok(Some(decoded))
        }
        ServerMessage::Error { message } => Err(Error::broker(message)),
        _ => Err(Error::invalid_message("Unexpected response")),
    }
}
/// Reports `msg` to the server with a `Done` message.
///
/// # Errors
/// Returns a broker error when the server replies with `Error`, and an
/// invalid-message error for any reply other than `Success`.
async fn done(&self, msg: &TaskMessage) -> Result<()> {
    let message = ClientMessage::Done(self.task_message_to_done_request(msg));
    match self.send_and_receive(message).await? {
        ServerMessage::Error { message } => Err(Error::broker(message)),
        ServerMessage::Success => Ok(()),
        _ => Err(Error::invalid_message("Unexpected response")),
    }
}
/// Reports `msg` to the server with a `MarkComplete` message.
///
/// The payload is built by `task_message_to_done_request`, the same helper
/// used for `Done`.
///
/// # Errors
/// Returns a broker error when the server replies with `Error`, and an
/// invalid-message error for any reply other than `Success`.
async fn mark_as_complete(&self, msg: &TaskMessage) -> Result<()> {
    let payload = self.task_message_to_done_request(msg);
    let reply = self
        .send_and_receive(ClientMessage::MarkComplete(payload))
        .await?;
    match reply {
        ServerMessage::Error { message } => Err(Error::broker(message)),
        ServerMessage::Success => Ok(()),
        _ => Err(Error::invalid_message("Unexpected response")),
    }
}
/// Requeues `msg` for `process_at` by delegating to `retry`.
///
/// The attempt is flagged as a failure exactly when `error_msg` is
/// non-empty.
async fn requeue(
    &self,
    msg: &TaskMessage,
    process_at: DateTime<Utc>,
    error_msg: &str,
) -> Result<()> {
    let is_failure = !error_msg.is_empty();
    self.retry(msg, process_at, error_msg, is_failure).await
}
/// Submits `task` with a `Schedule` message for processing at `process_at`.
///
/// `process_at` is sent as a Unix timestamp in whole seconds
/// (`DateTime::timestamp`). The reply is interpreted by
/// `handle_task_info_response`.
async fn schedule(&self, task: &Task, process_at: DateTime<Utc>) -> Result<TaskInfo> {
    let enqueue = self.task_to_enqueue_request(task);
    let message = ClientMessage::Schedule(ScheduleRequest {
        enqueue,
        process_at: process_at.timestamp(),
    });
    let reply = self.send_and_receive(message).await?;
    self.handle_task_info_response(reply)
}
/// Submits `task` with a `ScheduleUnique` message: a schedule request plus
/// a TTL.
///
/// `process_at` is sent as a Unix timestamp in whole seconds and `ttl` as
/// whole seconds; sub-second parts are dropped. The reply is interpreted by
/// `handle_task_info_response`.
async fn schedule_unique(
    &self,
    task: &Task,
    process_at: DateTime<Utc>,
    ttl: Duration,
) -> Result<TaskInfo> {
    let schedule = ScheduleRequest {
        enqueue: self.task_to_enqueue_request(task),
        process_at: process_at.timestamp(),
    };
    let request = ScheduleUniqueRequest {
        schedule,
        ttl_seconds: ttl.as_secs(),
    };
    let reply = self
        .send_and_receive(ClientMessage::ScheduleUnique(request))
        .await?;
    self.handle_task_info_response(reply)
}
/// Sends a `Retry` message for `msg`.
///
/// The request carries the task payload built by
/// `task_message_to_done_request`, `process_at` as a Unix timestamp in
/// whole seconds, a copy of `error_msg`, and the `is_failure` flag.
///
/// # Errors
/// Returns a broker error when the server replies with `Error`, and an
/// invalid-message error for any reply other than `Success`.
async fn retry(
    &self,
    msg: &TaskMessage,
    process_at: DateTime<Utc>,
    error_msg: &str,
    is_failure: bool,
) -> Result<()> {
    let task = self.task_message_to_done_request(msg);
    let request = RetryRequest {
        task,
        process_at: process_at.timestamp(),
        error_msg: error_msg.to_string(),
        is_failure,
    };
    match self.send_and_receive(ClientMessage::Retry(request)).await? {
        ServerMessage::Error { message } => Err(Error::broker(message)),
        ServerMessage::Success => Ok(()),
        _ => Err(Error::invalid_message("Unexpected response")),
    }
}
/// Sends an `Archive` message for `msg` together with `error_msg`.
///
/// # Errors
/// Returns a broker error when the server replies with `Error`, and an
/// invalid-message error for any reply other than `Success`.
async fn archive(&self, msg: &TaskMessage, error_msg: &str) -> Result<()> {
    let task = self.task_message_to_done_request(msg);
    let message = ClientMessage::Archive(ArchiveRequest {
        task,
        error_msg: error_msg.to_string(),
    });
    match self.send_and_receive(message).await? {
        ServerMessage::Error { message } => Err(Error::broker(message)),
        ServerMessage::Success => Ok(()),
        _ => Err(Error::invalid_message("Unexpected response")),
    }
}
/// Stub for this backend: `_queues` is ignored, nothing is sent to the
/// server, and a count of `0` is always returned.
async fn forward_if_ready(&self, _queues: &[String]) -> Result<i64> {
    Ok(0_i64)
}
/// Submits `task` with an `AddToGroup` message naming `group`.
///
/// The reply is interpreted by `handle_task_info_response`.
async fn add_to_group(&self, task: &Task, group: &str) -> Result<TaskInfo> {
    let enqueue = self.task_to_enqueue_request(task);
    let message = ClientMessage::AddToGroup(AddToGroupRequest {
        enqueue,
        group: group.to_string(),
    });
    let reply = self.send_and_receive(message).await?;
    self.handle_task_info_response(reply)
}
/// Submits `task` with an `AddToGroupUnique` message naming `group` and
/// carrying a TTL.
///
/// `ttl` is transmitted as whole seconds (`Duration::as_secs`). The reply
/// is interpreted by `handle_task_info_response`.
async fn add_to_group_unique(
    &self,
    task: &Task,
    group: &str,
    ttl: Duration,
) -> Result<TaskInfo> {
    let enqueue = self.task_to_enqueue_request(task);
    let message = ClientMessage::AddToGroupUnique(AddToGroupUniqueRequest {
        enqueue,
        group: group.to_string(),
        ttl_seconds: ttl.as_secs(),
    });
    let reply = self.send_and_receive(message).await?;
    self.handle_task_info_response(reply)
}
/// Requests the group names of `queue` with a `ListGroups` message.
///
/// # Errors
/// Returns a broker error when the server replies with `Error`, and an
/// invalid-message error for any reply other than `GroupsList`.
async fn list_groups(&self, queue: &str) -> Result<Vec<String>> {
    let message = ClientMessage::ListGroups(ListGroupsRequest {
        queue: queue.to_string(),
    });
    match self.send_and_receive(message).await? {
        ServerMessage::GroupsList(names) => Ok(names),
        ServerMessage::Error { message } => Err(Error::broker(message)),
        _ => Err(Error::invalid_message("Unexpected response")),
    }
}
/// Sends an `AggregationCheck` message for `group` in `queue`.
///
/// Both durations are transmitted as whole seconds (`Duration::as_secs`);
/// `max_size` is passed through unchanged. The optional set id from the
/// server's `AggregationSetId` reply is returned as-is.
///
/// # Errors
/// Returns a broker error when the server replies with `Error`, and an
/// invalid-message error for any other unexpected reply.
async fn aggregation_check(
    &self,
    queue: &str,
    group: &str,
    aggregation_delay: Duration,
    max_delay: Duration,
    max_size: usize,
) -> Result<Option<String>> {
    let request = AggregationCheckRequest {
        queue: queue.to_string(),
        group: group.to_string(),
        aggregation_delay_seconds: aggregation_delay.as_secs(),
        max_delay_seconds: max_delay.as_secs(),
        max_size,
    };
    match self
        .send_and_receive(ClientMessage::AggregationCheck(request))
        .await?
    {
        ServerMessage::AggregationSetId(maybe_id) => Ok(maybe_id),
        ServerMessage::Error { message } => Err(Error::broker(message)),
        _ => Err(Error::invalid_message("Unexpected response")),
    }
}
/// Fetches the tasks of aggregation set `set_id` for `group` in `queue`.
///
/// Each entry of the server's `AggregationSet` reply is converted with
/// `response_to_task_message`; the first conversion failure aborts the
/// call and is returned.
///
/// # Errors
/// Returns a broker error when the server replies with `Error`, and an
/// invalid-message error for any other unexpected reply.
async fn read_aggregation_set(
    &self,
    queue: &str,
    group: &str,
    set_id: &str,
) -> Result<Vec<TaskMessage>> {
    let message = ClientMessage::ReadAggregationSet(ReadAggregationSetRequest {
        queue: queue.to_string(),
        group: group.to_string(),
        set_id: set_id.to_string(),
    });
    match self.send_and_receive(message).await? {
        ServerMessage::AggregationSet(entries) => {
            let mut decoded = Vec::new();
            for raw in entries {
                let task = self.response_to_task_message(&raw)?;
                decoded.push(task);
            }
            Ok(decoded)
        }
        ServerMessage::Error { message } => Err(Error::broker(message)),
        _ => Err(Error::invalid_message("Unexpected response")),
    }
}
/// Sends a `DeleteAggregationSet` message for set `set_id` of `group` in
/// `queue`.
///
/// # Errors
/// Returns a broker error when the server replies with `Error`, and an
/// invalid-message error for any reply other than `Success`.
async fn delete_aggregation_set(&self, queue: &str, group: &str, set_id: &str) -> Result<()> {
    let message = ClientMessage::DeleteAggregationSet(DeleteAggregationSetRequest {
        queue: queue.to_string(),
        group: group.to_string(),
        set_id: set_id.to_string(),
    });
    match self.send_and_receive(message).await? {
        ServerMessage::Error { message } => Err(Error::broker(message)),
        ServerMessage::Success => Ok(()),
        _ => Err(Error::invalid_message("Unexpected response")),
    }
}
/// Stub for this backend: `_queue` is ignored, nothing is sent to the
/// server, and the call always succeeds.
async fn reclaim_stale_aggregation_sets(&self, _queue: &str) -> Result<()> {
    let nothing_to_do = ();
    Ok(nothing_to_do)
}
/// Stub for this backend: `_queue` is ignored, nothing is sent to the
/// server, and a count of `0` is always returned.
async fn delete_expired_completed_tasks(&self, _queue: &str) -> Result<i64> {
    Ok(0_i64)
}
/// Requests the tasks in `queues` that the server reports as lease-expired
/// relative to `cutoff`.
///
/// `cutoff` is sent as a Unix timestamp in whole seconds
/// (`DateTime::timestamp`). Each entry of the `LeaseExpiredTasks` reply is
/// converted with `response_to_task_message`; the first conversion failure
/// aborts the call and is returned.
///
/// # Errors
/// Returns a broker error when the server replies with `Error`, and an
/// invalid-message error for any other unexpected reply.
async fn list_lease_expired(
    &self,
    cutoff: DateTime<Utc>,
    queues: &[String],
) -> Result<Vec<TaskMessage>> {
    let message = ClientMessage::ListLeaseExpired(ListLeaseExpiredRequest {
        cutoff: cutoff.timestamp(),
        queues: queues.to_vec(),
    });
    match self.send_and_receive(message).await? {
        ServerMessage::LeaseExpiredTasks(entries) => {
            let mut decoded = Vec::new();
            for raw in entries {
                let task = self.response_to_task_message(&raw)?;
                decoded.push(task);
            }
            Ok(decoded)
        }
        ServerMessage::Error { message } => Err(Error::broker(message)),
        _ => Err(Error::invalid_message("Unexpected response")),
    }
}
/// Sends an `ExtendLease` message for task `task_id` in `queue`.
///
/// `lease_duration` is transmitted as whole seconds (`Duration::as_secs`),
/// so any sub-second part is dropped.
///
/// # Errors
/// Returns a broker error when the server replies with `Error`, and an
/// invalid-message error for any reply other than `Success`.
async fn extend_lease(&self, queue: &str, task_id: &str, lease_duration: Duration) -> Result<()> {
    let message = ClientMessage::ExtendLease(ExtendLeaseRequest {
        queue: queue.to_string(),
        task_id: task_id.to_string(),
        lease_duration_seconds: lease_duration.as_secs(),
    });
    match self.send_and_receive(message).await? {
        ServerMessage::Error { message } => Err(Error::broker(message)),
        ServerMessage::Success => Ok(()),
        _ => Err(Error::invalid_message("Unexpected response")),
    }
}
/// Publishes this server's state and its active workers with a
/// `WriteServerState` message.
///
/// Each worker's `task_payload` is base64-encoded (standard alphabet) for
/// transport. `ttl` is transmitted as whole seconds (`Duration::as_secs`).
/// `_tenant` is not used by this backend.
///
/// # Errors
/// Returns a broker error when the server replies with `Error`, and an
/// invalid-message error for any reply other than `Success`.
async fn write_server_state(
    &self,
    server_info: &ServerInfo,
    workers: Vec<WorkerInfo>,
    ttl: Duration,
    _tenant: Option<&str>,
) -> Result<()> {
    // `workers` is owned, so each worker's fields are moved into the wire
    // struct rather than cloned; only the payload is borrowed for encoding.
    let workers: Vec<WorkerInfoData> = workers
        .into_iter()
        .map(|w| WorkerInfoData {
            host: w.host,
            pid: w.pid,
            server_id: w.server_id,
            task_id: w.task_id,
            task_type: w.task_type,
            task_payload: BASE64_STANDARD.encode(&w.task_payload),
            queue: w.queue,
        })
        .collect();
    // `server_info` is only borrowed, so its owned fields still need cloning.
    let req = WriteServerStateRequest {
        host: server_info.host.clone(),
        pid: server_info.pid,
        server_id: server_info.server_id.clone(),
        concurrency: server_info.concurrency,
        queues: server_info.queues.clone(),
        strict_priority: server_info.strict_priority,
        status: server_info.status.clone(),
        active_worker_count: server_info.active_worker_count,
        ttl_seconds: ttl.as_secs(),
        workers,
    };
    let resp = self
        .send_and_receive(ClientMessage::WriteServerState(req))
        .await?;
    match resp {
        ServerMessage::Success => Ok(()),
        ServerMessage::Error { message } => Err(Error::broker(message)),
        _ => Err(Error::invalid_message("Unexpected response")),
    }
}
/// Sends a `ClearServerState` message identifying the server by `host`,
/// `pid` and `server_id`.
///
/// `_tenant` is not used by this backend.
///
/// # Errors
/// Returns a broker error when the server replies with `Error`, and an
/// invalid-message error for any reply other than `Success`.
async fn clear_server_state(
    &self,
    host: &str,
    pid: i32,
    server_id: &str,
    _tenant: Option<&str>,
) -> Result<()> {
    let message = ClientMessage::ClearServerState(ClearServerStateRequest {
        host: host.to_string(),
        pid,
        server_id: server_id.to_string(),
    });
    match self.send_and_receive(message).await? {
        ServerMessage::Error { message } => Err(Error::broker(message)),
        ServerMessage::Success => Ok(()),
        _ => Err(Error::invalid_message("Unexpected response")),
    }
}
async fn cancellation_pub_sub(
&self,
) -> Result<Box<dyn futures::Stream<Item = Result<String>> + Unpin + Send>> {
use futures::stream::unfold;
let receiver = self.cancel_tx.subscribe();
let _ = self
.send_and_receive(ClientMessage::SubscribeCancellation)
.await;
let stream = unfold(receiver, |mut rx| async move {
loop {
match rx.recv().await {
Ok(task_id) => return Some((Ok(task_id), rx)),
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
Err(tokio::sync::broadcast::error::RecvError::Closed) => return None,
}
}
});
Ok(Box::new(Box::pin(stream)))
}
/// Sends a `PublishCancellation` message for `task_id`.
///
/// # Errors
/// Returns a broker error when the server replies with `Error`, and an
/// invalid-message error for any reply other than `Success`.
async fn publish_cancellation(&self, task_id: &str) -> Result<()> {
    let message = ClientMessage::PublishCancellation {
        task_id: task_id.to_string(),
    };
    match self.send_and_receive(message).await? {
        ServerMessage::Error { message } => Err(Error::broker(message)),
        ServerMessage::Success => Ok(()),
        _ => Err(Error::invalid_message("Unexpected response")),
    }
}
/// Sends a `WriteResult` message storing `result` for task `task_id` in
/// `queue`.
///
/// The result bytes are base64-encoded (standard alphabet) for transport.
///
/// # Errors
/// Returns a broker error when the server replies with `Error`, and an
/// invalid-message error for any reply other than `Success`.
async fn write_result(&self, queue: &str, task_id: &str, result: &[u8]) -> Result<()> {
    let message = ClientMessage::WriteResult(WriteResultRequest {
        queue: queue.to_string(),
        task_id: task_id.to_string(),
        result: BASE64_STANDARD.encode(result),
    });
    match self.send_and_receive(message).await? {
        ServerMessage::Error { message } => Err(Error::broker(message)),
        ServerMessage::Success => Ok(()),
        _ => Err(Error::invalid_message("Unexpected response")),
    }
}
}
// Scheduler bookkeeping for the WebSocket backend. Every method below is a
// stub: arguments are ignored, nothing is sent to the server, and an empty
// or successful value is returned.
#[async_trait]
impl crate::base::SchedulerBroker for WebSocketBroker {
    /// Stub: ignores all arguments and reports success.
    async fn write_scheduler_entries(
        &self,
        _entries: &[crate::proto::SchedulerEntry],
        _scheduler_id: &str,
        _ttl_secs: u64,
        _tenant: Option<&str>,
    ) -> Result<()> {
        let nothing_to_do = ();
        Ok(nothing_to_do)
    }

    /// Stub: ignores the event and entry id and reports success.
    async fn record_scheduler_enqueue_event(
        &self,
        _event: &crate::proto::SchedulerEnqueueEvent,
        _entry_id: &str,
    ) -> Result<()> {
        let nothing_to_do = ();
        Ok(nothing_to_do)
    }

    /// Stub: always returns an empty map, whatever `_scheduler_id` is.
    async fn scheduler_entries_script(
        &self,
        _scheduler_id: &str,
    ) -> Result<std::collections::HashMap<String, Vec<u8>>> {
        Ok(std::collections::HashMap::default())
    }

    /// Stub: always returns an empty list, whatever `_count` is.
    async fn scheduler_events_script(&self, _count: usize) -> Result<Vec<Vec<u8>>> {
        Ok(vec![])
    }

    /// Stub: ignores all arguments and reports success.
    async fn clear_scheduler_entries(
        &self,
        _scheduler_id: &str,
        _tenant: Option<&str>,
    ) -> Result<()> {
        let nothing_to_do = ();
        Ok(nothing_to_do)
    }
}