use std::{future::Future, pin::Pin, sync::Arc, time::Duration};
use super::context::Context;
/// Heap-allocated, pinned, type-erased future that yields `()`.
///
/// The `Send + 'static` bounds are part of the alias; [`Settle`] stores an
/// optional value of this type as its continuation.
type AfterFut = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
/// Outcome a handler reports for a message.
///
/// The enum is `Copy` and `#[non_exhaustive]`, so matches outside this crate
/// need a wildcard arm. Values can be written out directly or produced by the
/// constructors `ack`, `retry`, `retry_after` and `drop`.
///
/// NOTE(review): what the consumer does with each variant (broker calls,
/// redelivery timing) is not visible in this file — confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum HandlerResult {
/// Positive acknowledgement.
Ack,
/// Negative acknowledgement.
Nack {
/// Requeue flag: `true` is what `HandlerResult::retry()` produces,
/// `false` is what `HandlerResult::drop()` produces.
requeue: bool,
},
/// Negative acknowledgement that carries a delay.
NackAfter {
/// Delay supplied to `HandlerResult::retry_after` — presumably how long
/// to wait before the message is retried; confirm against the consumer.
delay: Duration,
},
}
impl HandlerResult {
#[must_use]
pub const fn ack() -> Self {
Self::Ack
}
pub fn and_after<F>(self, fut: F) -> Settle
where
F: Future<Output = ()> + Send + 'static,
{
Settle {
outcome: self,
after: Some(Box::pin(fut)),
}
}
#[must_use]
pub const fn retry() -> Self {
Self::Nack { requeue: true }
}
#[must_use]
pub const fn retry_after(delay: Duration) -> Self {
Self::NackAfter { delay }
}
#[must_use]
pub const fn drop() -> Self {
Self::Nack { requeue: false }
}
}
/// A handler's final answer: an outcome plus an optional continuation future.
///
/// Built from a bare [`HandlerResult`] via `From` (no continuation) or via
/// `HandlerResult::and_after` (with one). Marked `#[must_use]`, so discarding
/// a value of this type triggers a warning.
#[must_use]
pub struct Settle {
// Outcome reported for the message; exposed through `outcome()`.
outcome: HandlerResult,
// Continuation attached with `and_after`, if any; removed by `take_after()`.
after: Option<AfterFut>,
}
impl Settle {
    /// Returns a copy of the outcome carried by this settlement.
    #[must_use]
    pub const fn outcome(&self) -> HandlerResult {
        let Self { outcome, .. } = self;
        *outcome
    }

    /// Moves the continuation out, leaving `None` in its place.
    ///
    /// A second call therefore returns `None`.
    pub(crate) fn take_after(&mut self) -> Option<AfterFut> {
        std::mem::take(&mut self.after)
    }
}
impl std::fmt::Debug for Settle {
    /// Formats as `Settle { outcome: .., after: <bool> }`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The continuation is a type-erased future with no `Debug` impl, so
        // only whether one is attached gets printed.
        let has_after = self.after.is_some();

        let mut builder = f.debug_struct("Settle");
        builder.field("outcome", &self.outcome);
        builder.field("after", &has_after);
        builder.finish()
    }
}
impl From<HandlerResult> for Settle {
fn from(outcome: HandlerResult) -> Self {
Self {
outcome,
after: None,
}
}
}
/// Conversion from a handler's return value into a [`Settle`].
///
/// This file implements it for `Settle`, `HandlerResult`, `()`, and for
/// `Result<T, E>` where `T` is any of those three. In the `Result` impls an
/// `Err` of any type becomes `HandlerResult::drop()` with no continuation.
pub trait IntoSettle {
/// Consumes `self` and produces the settlement it stands for.
fn into_settle(self) -> Settle;
}
impl IntoSettle for Settle {
    /// Identity conversion: a `Settle` is returned unchanged, continuation included.
    fn into_settle(self) -> Self {
        self
    }
}
impl IntoSettle for HandlerResult {
fn into_settle(self) -> Settle {
self.into()
}
}
impl IntoSettle for () {
fn into_settle(self) -> Settle {
HandlerResult::Ack.into()
}
}
impl<E> IntoSettle for Result<(), E> {
fn into_settle(self) -> Settle {
match self {
Ok(()) => HandlerResult::Ack,
Err(_) => HandlerResult::drop(),
}
.into()
}
}
impl<E> IntoSettle for Result<HandlerResult, E> {
fn into_settle(self) -> Settle {
self.unwrap_or_else(|_| HandlerResult::drop()).into()
}
}
impl<E> IntoSettle for Result<Settle, E> {
fn into_settle(self) -> Settle {
self.unwrap_or_else(|_| HandlerResult::drop().into())
}
}
/// Asynchronous handler for messages of type `M`.
///
/// Implementors must be `Send + Sync`, and the future returned by `handle`
/// must be `Send`. This file provides implementations for
/// `Fn(&M, &mut Context) -> Fut` callables whose future output implements
/// `IntoSettle`, and for `Arc<H>` where `H: Handler<M>`.
pub trait Handler<M>: Send + Sync {
/// Handles `msg` with mutable access to `ctx`; the returned future
/// resolves to the [`Settle`] for this message.
fn handle(&self, msg: &M, ctx: &mut Context) -> impl Future<Output = Settle> + Send;
}
impl<M, F, Fut> Handler<M> for F
where
F: Fn(&M, &mut Context) -> Fut + Send + Sync,
Fut: Future + Send,
Fut::Output: IntoSettle,
{
fn handle(&self, msg: &M, ctx: &mut Context) -> impl Future<Output = Settle> + Send {
let fut = (self)(msg, ctx);
async move { fut.await.into_settle() }
}
}
impl<M, H> Handler<M> for Arc<H>
where
    H: Handler<M>,
{
    /// Forwards to the handler stored inside the `Arc`.
    fn handle(&self, msg: &M, ctx: &mut Context) -> impl Future<Output = Settle> + Send {
        // Deref coercion: `&Arc<H>` becomes `&H`.
        let inner: &H = self;
        inner.handle(msg, ctx)
    }
}
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::{HandlerResult, IntoSettle, Settle};

    /// Checks both halves of a settlement: the outcome it carries and whether
    /// a continuation is attached.
    fn assert_outcome(mut settle: Settle, outcome: HandlerResult, has_after: bool) {
        let carried = settle.outcome();
        assert_eq!(carried, outcome);

        let continuation = settle.take_after();
        assert_eq!(continuation.is_some(), has_after);
    }

    #[test]
    fn convenience_constructors_map_to_variants() {
        let two_secs = Duration::from_secs(2);

        assert_eq!(HandlerResult::ack(), HandlerResult::Ack);
        assert_eq!(
            HandlerResult::retry(),
            HandlerResult::Nack { requeue: true }
        );
        assert_eq!(
            HandlerResult::drop(),
            HandlerResult::Nack { requeue: false }
        );
        assert_eq!(
            HandlerResult::retry_after(two_secs),
            HandlerResult::NackAfter { delay: two_secs }
        );
    }

    #[test]
    fn into_settle_covers_every_return_shape() {
        // Bare outcome and unit.
        assert_outcome(HandlerResult::Ack.into_settle(), HandlerResult::Ack, false);
        assert_outcome(().into_settle(), HandlerResult::Ack, false);

        // Result<(), E>
        let unit_ok: Result<(), ()> = Ok(());
        assert_outcome(unit_ok.into_settle(), HandlerResult::Ack, false);
        let unit_err: Result<(), ()> = Err(());
        assert_outcome(unit_err.into_settle(), HandlerResult::drop(), false);

        // Result<HandlerResult, E>
        let outcome_ok: Result<HandlerResult, ()> = Ok(HandlerResult::retry());
        assert_outcome(outcome_ok.into_settle(), HandlerResult::retry(), false);
        let outcome_err: Result<HandlerResult, ()> = Err(());
        assert_outcome(outcome_err.into_settle(), HandlerResult::drop(), false);

        // Settle that carries a continuation.
        let acked_with_after = HandlerResult::ack().and_after(async {});
        assert_outcome(acked_with_after.into_settle(), HandlerResult::Ack, true);

        // Result<Settle, E>
        let settle_ok: Result<Settle, ()> = Ok(HandlerResult::drop().and_after(async {}));
        assert_outcome(settle_ok.into_settle(), HandlerResult::drop(), true);
        let settle_err: Result<Settle, ()> = Err(());
        assert_outcome(settle_err.into_settle(), HandlerResult::drop(), false);
    }

    #[test]
    fn and_after_carries_the_outcome_and_continuation() {
        let with_continuation = HandlerResult::ack().and_after(async {});
        assert_eq!(with_continuation.outcome(), HandlerResult::Ack);
        let rendered = format!("{with_continuation:?}");
        assert!(rendered.contains("after: true"));

        let without_continuation = Settle::from(HandlerResult::retry());
        assert_eq!(without_continuation.outcome(), HandlerResult::retry());
        let rendered = format!("{without_continuation:?}");
        assert!(rendered.contains("after: false"));
    }
}