use std::time::Duration;
use bonsaidb::core::{
connection::AsyncConnection,
pubsub::AsyncPubSub,
transaction::{Operation, Transaction},
};
use serde::Serialize;
use time::OffsetDateTime;
use crate::{
queue::{
generate_id, Id, LatestMessage, Message, MessagePayload, RetryTiming, Timestamp, MQ_NOTIFY,
},
Error,
};
#[derive(Debug, Clone)]
pub struct JobBuilder {
name: &'static str,
id: Option<Id>,
ordered: bool,
delay: Option<Duration>,
max_executions: Option<u32>,
retry_timing: RetryTiming,
payload_json: Option<serde_json::Value>,
payload_bytes: Option<Vec<u8>>,
}
impl JobBuilder {
#[must_use]
pub fn new(name: &'static str) -> Self {
Self {
name,
id: None,
ordered: false,
delay: None,
max_executions: None,
retry_timing: RetryTiming::Backoff {
initial: Duration::from_secs(1),
maximum: Some(Duration::from_secs(60 * 60)),
},
payload_json: None,
payload_bytes: None,
}
}
#[must_use]
#[inline]
pub fn id(mut self, id: Id) -> Self {
self.id = Some(id);
self
}
#[must_use]
#[inline]
pub fn ordered(mut self, ordered: bool) -> Self {
self.ordered = ordered;
self
}
#[must_use]
#[inline]
pub fn delay(mut self, delay: impl Into<Option<Duration>>) -> Self {
self.delay = delay.into();
self
}
#[must_use]
#[inline]
pub fn max_executions(mut self, max_executions: impl Into<Option<u32>>) -> Self {
self.max_executions = max_executions.into();
self
}
#[must_use]
#[inline]
pub fn retry_timing(mut self, timing: RetryTiming) -> Self {
self.retry_timing = timing;
self
}
#[inline]
pub fn payload_json<S: Serialize>(mut self, payload: S) -> Result<Self, serde_json::Error> {
let value = serde_json::to_value(payload)?;
self.payload_json = Some(value);
Ok(self)
}
#[must_use]
#[inline]
pub fn payload_bytes(mut self, payload: Vec<u8>) -> Self {
self.payload_bytes = Some(payload);
self
}
async fn prepare_db_entries<DB>(self, db: &DB) -> Result<(Message, MessagePayload), Error>
where
DB: AsyncConnection,
{
let execute_after =
if self.ordered { db.view::<LatestMessage>().reduce().await? } else { None };
let id = self.id.map_or_else(generate_id, Ok)?;
let now = OffsetDateTime::now_utc().unix_timestamp_nanos();
let attempt_at = self
.delay
.map(|delay| Timestamp::try_from(delay.as_nanos()))
.transpose()?
.map_or(now, |delay| now + delay);
let message = Message {
id,
name: self.name.to_owned(),
created_at: now,
attempt_at,
executions: 0,
max_executions: self.max_executions,
retry_timing: self.retry_timing,
ordered: self.ordered,
execute_after,
};
let payload = MessagePayload {
message_id: id,
payload_json: self.payload_json,
payload_bytes: self.payload_bytes,
};
Ok((message, payload))
}
#[tracing::instrument(level = "debug", skip_all)]
pub async fn spawn<DB>(self, db: &DB) -> Result<Id, Error>
where
DB: AsyncConnection + AsyncPubSub,
{
let (message, payload) = self.prepare_db_entries(db).await?;
Transaction::new()
.with(Operation::push_serialized::<Message>(&message)?)
.with(Operation::push_serialized::<MessagePayload>(&payload)?)
.apply_async(db)
.await?;
db.publish(&MQ_NOTIFY, &()).await?;
Ok(message.id)
}
}