use std::sync::{Arc, Mutex};
use crate::{
BoxFuture, BoxedError, Delivery, DeliveryControl, DeliveryHandle, DeliveryInspector,
DeliveryState, EventBusError, Message,
};
type SharedHandle = Arc<Mutex<Option<Box<dyn DeliveryHandle>>>>;
pub(super) struct AutoFinalizeTracker {
inner: SharedHandle,
}
impl AutoFinalizeTracker {
pub(super) async fn new(
boxed: Box<dyn DeliveryHandle>,
) -> Result<(Self, AutoFinalizeProxy), EventBusError> {
let msg_snapshot = Arc::new(boxed.message().clone());
let state_snapshot = boxed.state().await?;
let inner: SharedHandle = Arc::new(Mutex::new(Some(boxed)));
let tracker = Self {
inner: Arc::clone(&inner),
};
let proxy = AutoFinalizeProxy {
inner,
msg_snapshot,
state_snapshot,
};
Ok((tracker, proxy))
}
pub(super) fn take_remaining(&self) -> Option<Box<dyn DeliveryHandle>> {
self.inner.lock().expect("auto-finalize lock").take()
}
}
pub(super) struct AutoFinalizeProxy {
inner: SharedHandle,
msg_snapshot: Arc<Message>,
state_snapshot: DeliveryState,
}
impl AutoFinalizeProxy {
fn take(&self) -> Option<Box<dyn DeliveryHandle>> {
self.inner.lock().expect("auto-finalize lock").take()
}
}
impl Delivery for AutoFinalizeProxy {
fn message(&self) -> &Message {
&self.msg_snapshot
}
}
impl DeliveryInspector for AutoFinalizeProxy {
fn state(&self) -> BoxFuture<'_, Result<DeliveryState, EventBusError>> {
let snap = self.state_snapshot.clone();
Box::pin(async move { Ok(snap) })
}
}
impl DeliveryControl for AutoFinalizeProxy {
fn ack(self: Box<Self>) -> BoxFuture<'static, Result<(), EventBusError>> {
Box::pin(async move {
match self.take() {
Some(b) => b.ack().await,
None => Ok(()),
}
})
}
fn nack(self: Box<Self>, reason: BoxedError) -> BoxFuture<'static, Result<(), EventBusError>> {
Box::pin(async move {
match self.take() {
Some(b) => b.nack(reason).await,
None => Ok(()),
}
})
}
fn retry(self: Box<Self>, reason: BoxedError) -> BoxFuture<'static, Result<(), EventBusError>> {
Box::pin(async move {
match self.take() {
Some(b) => b.retry(reason).await,
None => Ok(()),
}
})
}
}