Skip to main content

foxtive_worker/backends/
contract.rs

1use async_trait::async_trait;
2use std::time::Duration;
3
4use crate::error::WorkerResult;
5use crate::message::ReceivedMessage;
6
7/// Result of a receive operation from a message backend.
8///
9/// This enum provides explicit semantics for different receive outcomes,
10/// allowing callers to distinguish between graceful shutdown, connection
11/// loss, transient errors, and successful message receipt.
12///
13/// # Examples
14/// ```no_run
15/// use foxtive_worker::backends::ReceiveResult;
16///
17/// async fn process_messages(backend: &dyn foxtive_worker::backends::MessageBackend) -> anyhow::Result<()> {
18///     loop {
19///         match backend.receive().await? {
20///             ReceiveResult::Message(msg) => {
21///                 // Process the message
22///                 println!("Received: {}", msg.message.id);
23///             }
24///             ReceiveResult::Shutdown => {
25///                 // Backend is shutting down gracefully
26///                 break;
27///             }
28///             ReceiveResult::ConnectionLost { reason } => {
29///                 // Connection lost - trigger reconnection
30///                 eprintln!("Connection lost: {}", reason);
31///                 return Err(anyhow::anyhow!("Connection lost: {}", reason));
32///             }
33///             _ => {
34///                 // Handle other cases
35///             }
36///         }
37///     }
38///     Ok(())
39/// }
40/// ```
41#[derive(Debug, thiserror::Error)]
42pub enum ReceiveResult<T> {
43    /// Message received successfully
44    #[error("Message received")]
45    Message(ReceivedMessage<T>),
46    
47    /// Backend is shutting down gracefully (shutdown() was called)
48    #[error("Backend shutting down")]
49    Shutdown,
50    
51    /// Connection lost - requires reconnection
52    #[error("Connection lost: {reason}")]
53    ConnectionLost { 
54        /// Reason for connection loss
55        reason: String 
56    },
57    
58    /// Operation timed out
59    #[error("Operation timed out")]
60    Timeout,
61    
62    /// Consumer channel was closed unexpectedly
63    #[error("Consumer channel closed")]
64    ChannelClosed,
65    
66    /// Consumer was cancelled by broker or client
67    #[error("Consumer cancelled")]
68    ConsumerCancelled,
69    
70    /// Transient error that may resolve on retry
71    #[error("Retryable error: {reason}")]
72    Retryable { 
73        /// Error description
74        reason: String, 
75        /// Optional suggested wait time before retrying
76        retry_after: Option<Duration> 
77    },
78}
79
80impl<T> ReceiveResult<T> {
81    /// Check if a message was received
82    pub fn is_message(&self) -> bool {
83        matches!(self, ReceiveResult::Message(_))
84    }
85
86    /// Check if the backend is shutting down
87    pub fn is_shutdown(&self) -> bool {
88        matches!(self, ReceiveResult::Shutdown)
89    }
90
91    /// Check if reconnection is needed
92    pub fn needs_reconnection(&self) -> bool {
93        matches!(self, ReceiveResult::ConnectionLost { .. })
94    }
95
96    /// Check if the operation should be retried
97    pub fn is_retryable(&self) -> bool {
98        matches!(self, ReceiveResult::Retryable { .. })
99    }
100
101    /// Extract the message if present
102    pub fn into_message(self) -> Option<ReceivedMessage<T>> {
103        match self {
104            ReceiveResult::Message(msg) => Some(msg),
105            _ => None,
106        }
107    }
108
109    /// Get a description of the result status
110    pub fn status_str(&self) -> &'static str {
111        match self {
112            ReceiveResult::Message(_) => "message",
113            ReceiveResult::Shutdown => "shutdown",
114            ReceiveResult::ConnectionLost { .. } => "connection_lost",
115            ReceiveResult::Timeout => "timeout",
116            ReceiveResult::ChannelClosed => "channel_closed",
117            ReceiveResult::ConsumerCancelled => "consumer_cancelled",
118            ReceiveResult::Retryable { .. } => "retryable",
119        }
120    }
121}
122
123/// Trait for message backend implementations.
124///
125/// Backends are responsible for receiving messages from external sources
126/// (RabbitMQ, Redis Streams, etc.) and providing acknowledgment handles.
127#[async_trait]
128pub trait MessageBackend: Send + Sync {
129    /// Receive the next message with detailed status information.
130    ///
131    /// This is the primary receive method that provides explicit semantics
132    /// for different receive outcomes (shutdown, connection lost, retryable errors, etc.).
133    ///
134    /// # Returns
135    /// * `Ok(ReceiveResult)` - Contains the receive outcome
136    /// * `Err(WorkerError)` - Fatal error occurred
137    async fn receive(&self) -> WorkerResult<ReceiveResult<serde_json::Value>>;
138
139    /// Acknowledge a message by ID.
140    ///
141    /// This is used for batch acknowledgments or when the AckHandle is not available.
142    /// For single message acks, prefer using the AckHandle on ReceivedMessage.
143    ///
144    /// # Arguments
145    /// * `message_id` - The ID of the message to acknowledge
146    async fn ack(&self, message_id: &str) -> WorkerResult<()>;
147
148    /// Negative acknowledge a message by ID.
149    ///
150    /// # Arguments
151    /// * `message_id` - The ID of the message to negative-acknowledge
152    /// * `requeue` - If true, requeue the message for redelivery
153    async fn nack(&self, message_id: &str, requeue: bool) -> WorkerResult<()>;
154
155    /// Perform a health check on the backend.
156    ///
157    /// # Returns
158    /// * `Ok(())` - Backend is healthy
159    /// * `Err(WorkerError)` - Backend is unhealthy
160    async fn health_check(&self) -> WorkerResult<()>;
161
162    /// Gracefully shutdown the backend.
163    ///
164    /// This should stop accepting new messages and clean up resources.
165    async fn shutdown(&self) -> WorkerResult<()>;
166}