use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit};
use crate::{
BoxedError, Delivery, DeliveryControl, DeliveryInspector, DeliveryState, EventBusError,
Message, SubscriptionConfig, HEADER_DEAD_LETTER_REASON, HEADER_RETRY_ATTEMPT,
HEADER_RETRY_REASON,
};
use super::ack_flusher::AckRequest;
use super::backend::{SharedBackend, StreamBackend};
/// A single message handed to a subscriber from a stream backend, together
/// with the handles needed to acknowledge, retry, or dead-letter it.
///
/// Field order is significant only for drop order: `_permit` is declared last
/// and is therefore dropped after every other field.
pub(super) struct StreamDelivery<B>
where
    B: StreamBackend,
{
    /// Backend used to republish the message (retry) or publish it to the
    /// dead-letter topic.
    backend: SharedBackend<B>,
    /// Sending half of the channel on which acknowledgement requests are
    /// submitted.
    ack_tx: mpsc::Sender<AckRequest>,
    /// Identifier placed in the `AckRequest` when this delivery is acked.
    message_id: String,
    /// The delivered message; cloned whenever a modified copy is republished.
    message: Arc<Message>,
    /// Delivery state exposed through `DeliveryInspector::state`; its
    /// `attempt` / `max_attempt` values drive the retry decision.
    state: DeliveryState,
    /// Subscription settings supplying the source topic and the optional
    /// dead-letter topic.
    config: Arc<SubscriptionConfig>,
    /// Never read; held only so the semaphore permit is returned when this
    /// delivery is dropped.
    /// NOTE(review): presumably bounds the number of in-flight deliveries —
    /// confirm against the code that acquires the permit.
    _permit: OwnedSemaphorePermit,
}
impl<B: StreamBackend> StreamDelivery<B> {
    /// Builds a delivery from its parts.
    ///
    /// Every argument is stored as given. `permit` is kept for as long as the
    /// delivery exists and is released when the delivery is dropped.
    pub(super) fn new(
        backend: SharedBackend<B>,
        ack_tx: mpsc::Sender<AckRequest>,
        message_id: String,
        message: Arc<Message>,
        state: DeliveryState,
        config: Arc<SubscriptionConfig>,
        permit: OwnedSemaphorePermit,
    ) -> Self {
        Self {
            backend,
            ack_tx,
            message_id,
            message,
            state,
            config,
            _permit: permit,
        }
    }

    /// Submits an acknowledgement request for this message and waits for the
    /// outcome reported on the accompanying oneshot channel.
    ///
    /// # Errors
    ///
    /// * `EventBusError::Internal` when the request cannot be sent because the
    ///   receiving side of `ack_tx` is gone.
    /// * `EventBusError::Internal` when the oneshot sender is dropped without
    ///   a reply.
    /// * Otherwise, whatever result the receiver of the request reports back.
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self), fields(message_id = %self.message_id)))]
    async fn mark_acked(&self) -> Result<(), EventBusError> {
        let (done_tx, done_rx) = oneshot::channel();
        self.ack_tx
            .send(AckRequest {
                id: self.message_id.clone(),
                done: done_tx,
            })
            .await
            .map_err(|_| EventBusError::Internal("ack flusher shut down".into()))?;
        // The outer `Result` covers a dropped sender; the inner one is the
        // acknowledgement outcome itself and is returned unchanged.
        done_rx
            .await
            .map_err(|_| EventBusError::Internal("ack flusher dropped response".into()))?
    }

    /// Publishes a copy of the message to the configured dead-letter topic,
    /// tagging it with `reason` under `HEADER_DEAD_LETTER_REASON`.
    ///
    /// Does nothing and returns `Ok(())` when the subscription has no
    /// dead-letter topic. The delivery itself is not acknowledged here.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the backend's `publish`.
    // `reason` is given an explicit value: a value-less `fields(reason)` entry
    // declares an empty span field that also suppresses automatic recording of
    // the argument with the same name, so the reason would never be captured.
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self), fields(message_id = %self.message_id, reason = %reason)))]
    async fn publish_dead_letter(&self, reason: &str) -> Result<(), EventBusError> {
        // Borrow the topic from the config; the only clone needed is the one
        // stored on the outgoing message.
        let topic = match self.config.dead_letter_topic() {
            Some(topic) => topic,
            None => return Ok(()),
        };

        let mut dead_letter = Message::clone(&self.message);
        dead_letter.topic = topic.clone();
        dead_letter
            .headers
            .insert(HEADER_DEAD_LETTER_REASON.to_string(), reason.to_string());

        self.backend.publish(topic.as_str(), dead_letter).await?;
        Ok(())
    }

    /// Acknowledges the message without consuming the delivery.
    ///
    /// Delegates to `mark_acked`; see that method for the error cases.
    pub(super) async fn pre_ack(&self) -> Result<(), EventBusError> {
        self.mark_acked().await
    }
}
impl<B: StreamBackend> DeliveryInspector for StreamDelivery<B> {
fn state(&self) -> crate::BoxFuture<'_, Result<DeliveryState, EventBusError>> {
Box::pin(async move { Ok(self.state.clone()) })
}
}
impl<B: StreamBackend> Delivery for StreamDelivery<B> {
    /// Borrows the message carried by this delivery.
    fn message(&self) -> &Message {
        // Explicitly look through the `Arc` to the shared `Message`.
        &*self.message
    }
}
impl<B: StreamBackend> DeliveryControl for StreamDelivery<B> {
    /// Consumes the delivery and acknowledges its message.
    fn ack(self: Box<Self>) -> crate::BoxFuture<'static, Result<(), EventBusError>> {
        Box::pin(async move {
            let outcome = self.mark_acked().await;
            outcome
        })
    }

    /// Consumes the delivery, rejecting its message.
    ///
    /// The message is first published to the dead-letter topic (a no-op when
    /// none is configured) with `reason` rendered as text, then acknowledged.
    /// If the dead-letter publish fails the error is returned and the message
    /// is not acknowledged.
    fn nack(
        self: Box<Self>,
        reason: BoxedError,
    ) -> crate::BoxFuture<'static, Result<(), EventBusError>> {
        Box::pin(async move {
            let cause = reason.to_string();
            self.publish_dead_letter(&cause).await?;
            self.mark_acked().await
        })
    }

    /// Consumes the delivery and schedules another attempt, or dead-letters
    /// the message once attempts are used up.
    ///
    /// * While `attempt < max_attempt`: a copy of the message, tagged with
    ///   `HEADER_RETRY_ATTEMPT` (the current `attempt` value) and
    ///   `HEADER_RETRY_REASON`, is published to the subscription's own topic
    ///   and the original is acknowledged.
    /// * Once `attempt >= max_attempt`: the message is published to the
    ///   dead-letter topic and acknowledged.
    ///
    /// # Errors
    ///
    /// * `EventBusError::Validation` when attempts are exhausted and no
    ///   dead-letter topic is configured; the message is not acknowledged.
    /// * Any error from the backend publish or from the acknowledgement.
    fn retry(
        self: Box<Self>,
        reason: BoxedError,
    ) -> crate::BoxFuture<'static, Result<(), EventBusError>> {
        Box::pin(async move {
            let cause = reason.to_string();
            let exhausted = self.state.attempt >= self.state.max_attempt;

            if !exhausted {
                // Attempts remain: put an annotated copy back on the source
                // topic, then ack the original.
                let mut redelivery = Message::clone(&self.message);
                redelivery.headers.insert(
                    HEADER_RETRY_ATTEMPT.to_string(),
                    self.state.attempt.to_string(),
                );
                redelivery
                    .headers
                    .insert(HEADER_RETRY_REASON.to_string(), cause);
                self.backend
                    .publish(self.config.topic().as_str(), redelivery)
                    .await?;
                return self.mark_acked().await;
            }

            // Out of attempts: refuse to silently drop the message when there
            // is nowhere to dead-letter it.
            if self.config.dead_letter_topic().is_none() {
                return Err(EventBusError::Validation(format!(
                    "retry exhausted ({}/{}) but no dead_letter_topic configured for topic {}",
                    self.state.attempt,
                    self.state.max_attempt,
                    self.config.topic().as_str(),
                )));
            }

            self.publish_dead_letter(&cause).await?;
            self.mark_acked().await
        })
    }
}