celers_kombu/types.rs
1//! Core broker types and traits.
2
3use async_trait::async_trait;
4use celers_protocol::Message;
5use std::collections::HashMap;
6use std::time::Duration;
7
8use crate::{QueueMode, Result};
9
10/// Message envelope (message + metadata)
11///
12/// # Examples
13///
14/// ```
15/// use celers_kombu::Envelope;
16/// use celers_protocol::Message;
17/// use uuid::Uuid;
18///
19/// let task_id = Uuid::new_v4();
20/// let message = Message::new("my_task".to_string(), task_id, vec![1, 2, 3]);
21/// let envelope = Envelope::new(message, "delivery-tag-123".to_string());
22///
23/// assert_eq!(envelope.delivery_tag, "delivery-tag-123");
24/// assert!(!envelope.is_redelivered());
25/// assert_eq!(envelope.task_name(), "my_task");
26/// assert_eq!(envelope.task_id(), task_id);
27/// ```
28#[derive(Debug, Clone)]
29pub struct Envelope {
30 /// The actual message
31 pub message: Message,
32
33 /// Delivery tag (for acknowledgment)
34 pub delivery_tag: String,
35
36 /// Redelivery flag
37 pub redelivered: bool,
38}
39
40impl Envelope {
41 /// Create a new envelope
42 pub fn new(message: Message, delivery_tag: String) -> Self {
43 Self {
44 message,
45 delivery_tag,
46 redelivered: false,
47 }
48 }
49
50 /// Check if this message was redelivered
51 pub fn is_redelivered(&self) -> bool {
52 self.redelivered
53 }
54
55 /// Get the task ID from the message
56 pub fn task_id(&self) -> uuid::Uuid {
57 self.message.task_id()
58 }
59
60 /// Get the task name from the message
61 pub fn task_name(&self) -> &str {
62 self.message.task_name()
63 }
64}
65
66impl std::fmt::Display for Envelope {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 write!(
69 f,
70 "Envelope[tag={}] task={} id={}{}",
71 self.delivery_tag,
72 self.task_name(),
73 &self.task_id().to_string()[..8],
74 if self.redelivered {
75 " (redelivered)"
76 } else {
77 ""
78 }
79 )
80 }
81}
82
83/// Transport trait (low-level broker connection)
84#[async_trait]
85pub trait Transport: Send + Sync {
86 /// Connect to the broker
87 async fn connect(&mut self) -> Result<()>;
88
89 /// Disconnect from the broker
90 async fn disconnect(&mut self) -> Result<()>;
91
92 /// Check if connected
93 fn is_connected(&self) -> bool;
94
95 /// Get transport name
96 fn name(&self) -> &str;
97}
98
99/// Producer trait (message publishing)
100#[async_trait]
101pub trait Producer: Transport {
102 /// Publish a message to a queue
103 async fn publish(&mut self, queue: &str, message: Message) -> Result<()>;
104
105 /// Publish a message with routing key
106 async fn publish_with_routing(
107 &mut self,
108 exchange: &str,
109 routing_key: &str,
110 message: Message,
111 ) -> Result<()>;
112}
113
114/// Consumer trait (message consuming)
115#[async_trait]
116pub trait Consumer: Transport {
117 /// Consume a message from a queue (blocking with timeout)
118 async fn consume(&mut self, queue: &str, timeout: Duration) -> Result<Option<Envelope>>;
119
120 /// Acknowledge a message
121 async fn ack(&mut self, delivery_tag: &str) -> Result<()>;
122
123 /// Reject a message (requeue or send to DLQ)
124 async fn reject(&mut self, delivery_tag: &str, requeue: bool) -> Result<()>;
125
126 /// Get queue size
127 async fn queue_size(&mut self, queue: &str) -> Result<usize>;
128}
129
130/// Full broker trait (combines producer and consumer)
131#[async_trait]
132pub trait Broker: Producer + Consumer + Transport {
133 /// Purge a queue (remove all messages)
134 async fn purge(&mut self, queue: &str) -> Result<usize>;
135
136 /// Create a queue
137 async fn create_queue(&mut self, queue: &str, mode: QueueMode) -> Result<()>;
138
139 /// Delete a queue
140 async fn delete_queue(&mut self, queue: &str) -> Result<()>;
141
142 /// List all queues
143 async fn list_queues(&mut self) -> Result<Vec<String>>;
144}
145
146/// Queue configuration
147///
148/// # Examples
149///
150/// ```
151/// use celers_kombu::{QueueConfig, QueueMode};
152/// use std::time::Duration;
153///
154/// let config = QueueConfig::new("my_queue".to_string())
155/// .with_mode(QueueMode::Priority)
156/// .with_ttl(Duration::from_secs(3600))
157/// .with_durable(true)
158/// .with_max_message_size(1024 * 1024);
159///
160/// assert_eq!(config.name, "my_queue");
161/// assert_eq!(config.mode, QueueMode::Priority);
162/// assert_eq!(config.message_ttl, Some(Duration::from_secs(3600)));
163/// assert!(config.durable);
164/// assert_eq!(config.max_message_size, Some(1024 * 1024));
165/// ```
166#[derive(Debug, Clone)]
167pub struct QueueConfig {
168 /// Queue name
169 pub name: String,
170
171 /// Queue mode
172 pub mode: QueueMode,
173
174 /// Durable (survive broker restart)
175 pub durable: bool,
176
177 /// Auto-delete (delete when no consumers)
178 pub auto_delete: bool,
179
180 /// Maximum message size
181 pub max_message_size: Option<usize>,
182
183 /// Message TTL (time-to-live)
184 pub message_ttl: Option<Duration>,
185}
186
187impl QueueConfig {
188 pub fn new(name: String) -> Self {
189 Self {
190 name,
191 mode: QueueMode::Fifo,
192 durable: true,
193 auto_delete: false,
194 max_message_size: None,
195 message_ttl: None,
196 }
197 }
198
199 pub fn with_mode(mut self, mode: QueueMode) -> Self {
200 self.mode = mode;
201 self
202 }
203
204 pub fn with_ttl(mut self, ttl: Duration) -> Self {
205 self.message_ttl = Some(ttl);
206 self
207 }
208
209 /// Set durability
210 pub fn with_durable(mut self, durable: bool) -> Self {
211 self.durable = durable;
212 self
213 }
214
215 /// Set auto-delete
216 pub fn with_auto_delete(mut self, auto_delete: bool) -> Self {
217 self.auto_delete = auto_delete;
218 self
219 }
220
221 /// Set max message size
222 pub fn with_max_message_size(mut self, size: usize) -> Self {
223 self.max_message_size = Some(size);
224 self
225 }
226}
227
// =============================================================================
// Batch Operations
// =============================================================================
231
/// Outcome of a batch publish operation.
///
/// # Examples
///
/// ```
/// use celers_kombu::BatchPublishResult;
/// use std::collections::HashMap;
///
/// // Successful batch publish
/// let result = BatchPublishResult::success(10);
/// assert_eq!(result.succeeded, 10);
/// assert_eq!(result.failed, 0);
/// assert!(result.is_complete_success());
/// assert_eq!(result.total(), 10);
///
/// // Partial failure
/// let mut errors = HashMap::new();
/// errors.insert(2, "network error".to_string());
/// let result = BatchPublishResult {
///     succeeded: 9,
///     failed: 1,
///     errors,
/// };
/// assert!(!result.is_complete_success());
/// assert_eq!(result.total(), 10);
/// ```
#[derive(Debug, Clone)]
pub struct BatchPublishResult {
    /// How many messages were published successfully.
    pub succeeded: usize,
    /// How many messages failed to publish.
    pub failed: usize,
    /// Error message for each failed message, keyed by the message's index.
    pub errors: HashMap<usize, String>,
}

impl BatchPublishResult {
    /// Build a result in which all `count` messages succeeded.
    ///
    /// `failed` is zero and `errors` is empty.
    pub fn success(count: usize) -> Self {
        let errors = HashMap::new();
        BatchPublishResult {
            succeeded: count,
            failed: 0,
            errors,
        }
    }

    /// Return `true` when `failed` is zero.
    ///
    /// Only the `failed` counter is inspected; the `errors` map is not consulted.
    pub fn is_complete_success(&self) -> bool {
        matches!(self.failed, 0)
    }

    /// Return the number of messages attempted (`succeeded` plus `failed`).
    pub fn total(&self) -> usize {
        let Self {
            succeeded, failed, ..
        } = self;
        *succeeded + *failed
    }
}
288
289/// Batch producer trait (batch message publishing)
290#[async_trait]
291pub trait BatchProducer: Producer {
292 /// Publish multiple messages to a queue in batch
293 async fn publish_batch(
294 &mut self,
295 queue: &str,
296 messages: Vec<Message>,
297 ) -> Result<BatchPublishResult>;
298
299 /// Publish multiple messages with routing in batch
300 async fn publish_batch_with_routing(
301 &mut self,
302 exchange: &str,
303 routing_key: &str,
304 messages: Vec<Message>,
305 ) -> Result<BatchPublishResult>;
306}
307
308/// Batch consumer trait (batch message consuming)
309#[async_trait]
310pub trait BatchConsumer: Consumer {
311 /// Consume multiple messages from a queue (up to max_messages)
312 async fn consume_batch(
313 &mut self,
314 queue: &str,
315 max_messages: usize,
316 timeout: Duration,
317 ) -> Result<Vec<Envelope>>;
318
319 /// Acknowledge multiple messages
320 async fn ack_batch(&mut self, delivery_tags: &[String]) -> Result<()>;
321
322 /// Reject multiple messages
323 async fn reject_batch(&mut self, delivery_tags: &[String], requeue: bool) -> Result<()>;
324}