use std::time::Duration;
use bonsaidb::core::{
document::{CollectionDocument, Emit},
schema::{
view::{map::Mappings, ViewUpdatePolicy},
Collection, CollectionMapReduce, ReduceResult, Schema, View, ViewMapResult,
ViewMappedValue, ViewSchema,
},
};
use serde::{Deserialize, Serialize};
pub type Id = u128;
pub type Timestamp = i128;
pub(crate) const MQ_NOTIFY: &str = "message_queue_notify";
#[derive(Debug, Schema)]
#[schema(name = "message_queue", collections = [Message, MessagePayload])]
pub struct MessageQueueSchema;
#[derive(Debug, Clone, Serialize, Deserialize, Collection)]
#[collection(
name = "messages",
primary_key = Id,
views = [DueMessages, LatestMessage]
)]
pub struct Message {
#[natural_id]
pub id: Id,
pub name: String,
pub created_at: Timestamp,
pub attempt_at: Timestamp,
pub executions: u32,
pub max_executions: Option<u32>,
pub retry_timing: RetryTiming,
pub ordered: bool,
pub execute_after: Option<Id>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RetryTiming {
Fixed(Duration),
Backoff {
initial: Duration,
maximum: Option<Duration>,
},
}
#[derive(Debug, Clone, Serialize, Deserialize, Collection)]
#[collection(
name = "message_payloads",
primary_key = Id,
)]
pub struct MessagePayload {
#[natural_id]
pub message_id: Id,
pub payload_json: Option<serde_json::Value>,
pub payload_bytes: Option<Vec<u8>>,
}
#[derive(Debug, Clone, View)]
#[view(collection = Message, key = Timestamp, value = Option<Timestamp>, name = "due_messages")]
pub struct DueMessages;
impl ViewSchema for DueMessages {
type View = Self;
type MappedKey<'doc> = <Self as View>::Key;
fn update_policy(&self) -> ViewUpdatePolicy {
ViewUpdatePolicy::default()
}
fn version(&self) -> u64 {
0
}
}
impl CollectionMapReduce for DueMessages {
fn map<'doc>(&self, document: CollectionDocument<Message>) -> ViewMapResult<'doc, Self::View> {
document
.header
.emit_key_and_value(document.contents.attempt_at, Some(document.contents.attempt_at))
}
fn reduce(
&self,
mappings: &[ViewMappedValue<Self::View>],
_rereduce: bool,
) -> ReduceResult<Self::View> {
Ok(mappings.iter().filter_map(|view| view.value).min())
}
}
#[derive(Debug, Clone, View)]
#[view(collection = Message, key = Timestamp, value = Option<Id>, name = "latest_message")]
pub struct LatestMessage;
impl ViewSchema for LatestMessage {
type View = Self;
type MappedKey<'doc> = <Self as View>::Key;
fn update_policy(&self) -> ViewUpdatePolicy {
ViewUpdatePolicy::Unique
}
fn version(&self) -> u64 {
0
}
}
impl CollectionMapReduce for LatestMessage {
fn map<'doc>(&self, document: CollectionDocument<Message>) -> ViewMapResult<'doc, Self::View> {
if document.contents.ordered {
document
.header
.emit_key_and_value(document.contents.created_at, Some(document.header.id))
} else {
Ok(Mappings::Simple(None))
}
}
fn reduce(
&self,
mappings: &[ViewMappedValue<Self::View>],
_rereduce: bool,
) -> ReduceResult<Self::View> {
let max_val = mappings.iter().max_by_key(|view| view.key);
Ok(max_val.and_then(|view| view.value))
}
}
impl RetryTiming {
#[must_use]
pub fn next_duration(&self, executions: u32) -> Duration {
match *self {
RetryTiming::Fixed(fixed) => fixed,
RetryTiming::Backoff { initial, maximum } => {
let duration =
initial.saturating_mul(2_u32.saturating_pow(executions.saturating_sub(1)));
if let Some(max) = maximum {
duration.min(max)
} else {
duration
}
}
}
}
}
pub(crate) fn generate_id() -> Result<Id, getrandom::Error> {
let mut buf = [0_u8; std::mem::size_of::<Id>()];
getrandom::getrandom(&mut buf)?;
let id = unsafe { std::mem::transmute(buf) };
Ok(id)
}