use async_trait::async_trait;
use std::time::Duration;
use crate::error::WorkerResult;
use crate::message::ReceivedMessage;
/// Outcome of a receive attempt, as returned (wrapped in a `WorkerResult`) by
/// `MessageBackend::receive`.
///
/// The type derives `thiserror::Error`, so every variant — including the
/// successful `Message` one — has a `Display` text taken from its `#[error]`
/// attribute.
#[derive(Debug, thiserror::Error)]
pub enum ReceiveResult<T> {
/// A message was obtained. The payload is stored behind a `Box`
/// (presumably to keep the enum itself small — confirm).
#[error("Message received")]
Message(Box<ReceivedMessage<T>>),
/// The backend reports that it is shutting down.
#[error("Backend shutting down")]
Shutdown,
/// The connection was lost; `reason` is interpolated into the display text.
/// This is the only variant for which `needs_reconnection` returns `true`.
#[error("Connection lost: {reason}")]
ConnectionLost {
/// Human-readable description of why the connection was lost.
reason: String,
},
/// The operation timed out without producing a message.
#[error("Operation timed out")]
Timeout,
/// The consumer channel was closed.
#[error("Consumer channel closed")]
ChannelClosed,
/// The consumer was cancelled.
/// NOTE(review): who cancels (broker vs. local code) is not visible here.
#[error("Consumer cancelled")]
ConsumerCancelled,
/// A failure the caller may retry; `reason` is interpolated into the
/// display text.
#[error("Retryable error: {reason}")]
Retryable {
/// Human-readable description of the failure.
reason: String,
/// Optional duration attached to the failure; presumably a suggested
/// delay before the next attempt — TODO confirm with implementors.
retry_after: Option<Duration>,
},
}
impl<T> ReceiveResult<T> {
pub fn is_message(&self) -> bool {
matches!(self, ReceiveResult::Message(_))
}
pub fn is_shutdown(&self) -> bool {
matches!(self, ReceiveResult::Shutdown)
}
pub fn needs_reconnection(&self) -> bool {
matches!(self, ReceiveResult::ConnectionLost { .. })
}
pub fn is_retryable(&self) -> bool {
matches!(self, ReceiveResult::Retryable { .. })
}
pub fn into_message(self) -> Option<ReceivedMessage<T>> {
match self {
ReceiveResult::Message(msg) => Some(*msg),
_ => None,
}
}
pub fn status_str(&self) -> &'static str {
match self {
ReceiveResult::Message(_) => "message",
ReceiveResult::Shutdown => "shutdown",
ReceiveResult::ConnectionLost { .. } => "connection_lost",
ReceiveResult::Timeout => "timeout",
ReceiveResult::ChannelClosed => "channel_closed",
ReceiveResult::ConsumerCancelled => "consumer_cancelled",
ReceiveResult::Retryable { .. } => "retryable",
}
}
}
/// Asynchronous interface to a message backend.
///
/// Implementors must be `Send + Sync`. The `#[async_trait]` attribute is what
/// allows the `async fn` signatures below. Every method reports failure
/// through `WorkerResult`.
#[async_trait]
pub trait MessageBackend: Send + Sync {
/// Attempts to receive one item from the backend.
///
/// On `Ok`, the `ReceiveResult` variant tells the caller what happened
/// (message, shutdown, timeout, ...); message payloads are
/// `serde_json::Value`.
/// NOTE(review): whether this call waits for a message or returns promptly
/// is implementation-defined — confirm per backend.
async fn receive(&self) -> WorkerResult<ReceiveResult<serde_json::Value>>;
/// Acknowledges the message identified by `message_id`.
/// NOTE(review): the format of `message_id` is not visible here — confirm
/// against the backend implementation.
async fn ack(&self, message_id: &str) -> WorkerResult<()>;
/// Negatively acknowledges the message identified by `message_id`.
///
/// `requeue` presumably asks the backend to make the message available for
/// delivery again rather than discarding it — TODO confirm.
async fn nack(&self, message_id: &str, requeue: bool) -> WorkerResult<()>;
/// Checks backend health; `Ok(())` presumably means healthy, an error
/// means the check failed — confirm with implementors.
async fn health_check(&self) -> WorkerResult<()>;
/// Requests that the backend shut down.
async fn shutdown(&self) -> WorkerResult<()>;
}