foxtive_worker/backends/contract.rs
1use async_trait::async_trait;
2use std::time::Duration;
3
4use crate::error::WorkerResult;
5use crate::message::ReceivedMessage;
6
7/// Result of a receive operation from a message backend.
8///
9/// This enum provides explicit semantics for different receive outcomes,
10/// allowing callers to distinguish between graceful shutdown, connection
11/// loss, transient errors, and successful message receipt.
12///
13/// # Examples
14/// ```no_run
15/// use foxtive_worker::backends::ReceiveResult;
16///
17/// async fn process_messages(backend: &dyn foxtive_worker::backends::MessageBackend) -> anyhow::Result<()> {
18/// loop {
19/// match backend.receive().await? {
20/// ReceiveResult::Message(msg) => {
21/// // Process the message
22/// println!("Received: {}", msg.message.id);
23/// }
24/// ReceiveResult::Shutdown => {
25/// // Backend is shutting down gracefully
26/// break;
27/// }
28/// ReceiveResult::ConnectionLost { reason } => {
29/// // Connection lost - trigger reconnection
30/// eprintln!("Connection lost: {}", reason);
31/// return Err(anyhow::anyhow!("Connection lost: {}", reason));
32/// }
33/// _ => {
34/// // Handle other cases
35/// }
36/// }
37/// }
38/// Ok(())
39/// }
40/// ```
41#[derive(Debug, thiserror::Error)]
42pub enum ReceiveResult<T> {
43 /// Message received successfully
44 #[error("Message received")]
45 Message(Box<ReceivedMessage<T>>),
46
47 /// Backend is shutting down gracefully (shutdown() was called)
48 #[error("Backend shutting down")]
49 Shutdown,
50
51 /// Connection lost - requires reconnection
52 #[error("Connection lost: {reason}")]
53 ConnectionLost {
54 /// Reason for connection loss
55 reason: String,
56 },
57
58 /// Operation timed out
59 #[error("Operation timed out")]
60 Timeout,
61
62 /// Consumer channel was closed unexpectedly
63 #[error("Consumer channel closed")]
64 ChannelClosed,
65
66 /// Consumer was cancelled by broker or client
67 #[error("Consumer cancelled")]
68 ConsumerCancelled,
69
70 /// Transient error that may resolve on retry
71 #[error("Retryable error: {reason}")]
72 Retryable {
73 /// Error description
74 reason: String,
75 /// Optional suggested wait time before retrying
76 retry_after: Option<Duration>,
77 },
78}
79
80impl<T> ReceiveResult<T> {
81 /// Check if a message was received
82 pub fn is_message(&self) -> bool {
83 matches!(self, ReceiveResult::Message(_))
84 }
85
86 /// Check if the backend is shutting down
87 pub fn is_shutdown(&self) -> bool {
88 matches!(self, ReceiveResult::Shutdown)
89 }
90
91 /// Check if reconnection is needed
92 pub fn needs_reconnection(&self) -> bool {
93 matches!(self, ReceiveResult::ConnectionLost { .. })
94 }
95
96 /// Check if the operation should be retried
97 pub fn is_retryable(&self) -> bool {
98 matches!(self, ReceiveResult::Retryable { .. })
99 }
100
101 /// Extract the message if present
102 pub fn into_message(self) -> Option<ReceivedMessage<T>> {
103 match self {
104 ReceiveResult::Message(msg) => Some(*msg),
105 _ => None,
106 }
107 }
108
109 /// Get a description of the result status
110 pub fn status_str(&self) -> &'static str {
111 match self {
112 ReceiveResult::Message(_) => "message",
113 ReceiveResult::Shutdown => "shutdown",
114 ReceiveResult::ConnectionLost { .. } => "connection_lost",
115 ReceiveResult::Timeout => "timeout",
116 ReceiveResult::ChannelClosed => "channel_closed",
117 ReceiveResult::ConsumerCancelled => "consumer_cancelled",
118 ReceiveResult::Retryable { .. } => "retryable",
119 }
120 }
121}
122
123/// Trait for message backend implementations.
124///
125/// Backends are responsible for receiving messages from external sources
126/// (RabbitMQ, Redis Streams, etc.) and providing acknowledgment handles.
127#[async_trait]
128pub trait MessageBackend: Send + Sync {
129 /// Receive the next message with detailed status information.
130 ///
131 /// This is the primary receive method that provides explicit semantics
132 /// for different receive outcomes (shutdown, connection lost, retryable errors, etc.).
133 ///
134 /// # Returns
135 /// * `Ok(ReceiveResult)` - Contains the receive outcome
136 /// * `Err(WorkerError)` - Fatal error occurred
137 async fn receive(&self) -> WorkerResult<ReceiveResult<serde_json::Value>>;
138
139 /// Acknowledge a message by ID.
140 ///
141 /// This is used for batch acknowledgments or when the AckHandle is not available.
142 /// For single message acks, prefer using the AckHandle on ReceivedMessage.
143 ///
144 /// # Arguments
145 /// * `message_id` - The ID of the message to acknowledge
146 async fn ack(&self, message_id: &str) -> WorkerResult<()>;
147
148 /// Negative acknowledge a message by ID.
149 ///
150 /// # Arguments
151 /// * `message_id` - The ID of the message to negative-acknowledge
152 /// * `requeue` - If true, requeue the message for redelivery
153 async fn nack(&self, message_id: &str, requeue: bool) -> WorkerResult<()>;
154
155 /// Perform a health check on the backend.
156 ///
157 /// # Returns
158 /// * `Ok(())` - Backend is healthy
159 /// * `Err(WorkerError)` - Backend is unhealthy
160 async fn health_check(&self) -> WorkerResult<()>;
161
162 /// Gracefully shutdown the backend.
163 ///
164 /// This should stop accepting new messages and clean up resources.
165 async fn shutdown(&self) -> WorkerResult<()>;
166}