Skip to main content

celers_kombu/
types.rs

1//! Core broker types and traits.
2
3use async_trait::async_trait;
4use celers_protocol::Message;
5use std::collections::HashMap;
6use std::time::Duration;
7
8use crate::{QueueMode, Result};
9
10/// Message envelope (message + metadata)
11///
12/// # Examples
13///
14/// ```
15/// use celers_kombu::Envelope;
16/// use celers_protocol::Message;
17/// use uuid::Uuid;
18///
19/// let task_id = Uuid::new_v4();
20/// let message = Message::new("my_task".to_string(), task_id, vec![1, 2, 3]);
21/// let envelope = Envelope::new(message, "delivery-tag-123".to_string());
22///
23/// assert_eq!(envelope.delivery_tag, "delivery-tag-123");
24/// assert!(!envelope.is_redelivered());
25/// assert_eq!(envelope.task_name(), "my_task");
26/// assert_eq!(envelope.task_id(), task_id);
27/// ```
28#[derive(Debug, Clone)]
29pub struct Envelope {
30    /// The actual message
31    pub message: Message,
32
33    /// Delivery tag (for acknowledgment)
34    pub delivery_tag: String,
35
36    /// Redelivery flag
37    pub redelivered: bool,
38}
39
40impl Envelope {
41    /// Create a new envelope
42    pub fn new(message: Message, delivery_tag: String) -> Self {
43        Self {
44            message,
45            delivery_tag,
46            redelivered: false,
47        }
48    }
49
50    /// Check if this message was redelivered
51    pub fn is_redelivered(&self) -> bool {
52        self.redelivered
53    }
54
55    /// Get the task ID from the message
56    pub fn task_id(&self) -> uuid::Uuid {
57        self.message.task_id()
58    }
59
60    /// Get the task name from the message
61    pub fn task_name(&self) -> &str {
62        self.message.task_name()
63    }
64}
65
66impl std::fmt::Display for Envelope {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        write!(
69            f,
70            "Envelope[tag={}] task={} id={}{}",
71            self.delivery_tag,
72            self.task_name(),
73            &self.task_id().to_string()[..8],
74            if self.redelivered {
75                " (redelivered)"
76            } else {
77                ""
78            }
79        )
80    }
81}
82
83/// Transport trait (low-level broker connection)
84#[async_trait]
85pub trait Transport: Send + Sync {
86    /// Connect to the broker
87    async fn connect(&mut self) -> Result<()>;
88
89    /// Disconnect from the broker
90    async fn disconnect(&mut self) -> Result<()>;
91
92    /// Check if connected
93    fn is_connected(&self) -> bool;
94
95    /// Get transport name
96    fn name(&self) -> &str;
97}
98
99/// Producer trait (message publishing)
100#[async_trait]
101pub trait Producer: Transport {
102    /// Publish a message to a queue
103    async fn publish(&mut self, queue: &str, message: Message) -> Result<()>;
104
105    /// Publish a message with routing key
106    async fn publish_with_routing(
107        &mut self,
108        exchange: &str,
109        routing_key: &str,
110        message: Message,
111    ) -> Result<()>;
112}
113
114/// Consumer trait (message consuming)
115#[async_trait]
116pub trait Consumer: Transport {
117    /// Consume a message from a queue (blocking with timeout)
118    async fn consume(&mut self, queue: &str, timeout: Duration) -> Result<Option<Envelope>>;
119
120    /// Acknowledge a message
121    async fn ack(&mut self, delivery_tag: &str) -> Result<()>;
122
123    /// Reject a message (requeue or send to DLQ)
124    async fn reject(&mut self, delivery_tag: &str, requeue: bool) -> Result<()>;
125
126    /// Get queue size
127    async fn queue_size(&mut self, queue: &str) -> Result<usize>;
128}
129
130/// Full broker trait (combines producer and consumer)
131#[async_trait]
132pub trait Broker: Producer + Consumer + Transport {
133    /// Purge a queue (remove all messages)
134    async fn purge(&mut self, queue: &str) -> Result<usize>;
135
136    /// Create a queue
137    async fn create_queue(&mut self, queue: &str, mode: QueueMode) -> Result<()>;
138
139    /// Delete a queue
140    async fn delete_queue(&mut self, queue: &str) -> Result<()>;
141
142    /// List all queues
143    async fn list_queues(&mut self) -> Result<Vec<String>>;
144}
145
146/// Queue configuration
147///
148/// # Examples
149///
150/// ```
151/// use celers_kombu::{QueueConfig, QueueMode};
152/// use std::time::Duration;
153///
154/// let config = QueueConfig::new("my_queue".to_string())
155///     .with_mode(QueueMode::Priority)
156///     .with_ttl(Duration::from_secs(3600))
157///     .with_durable(true)
158///     .with_max_message_size(1024 * 1024);
159///
160/// assert_eq!(config.name, "my_queue");
161/// assert_eq!(config.mode, QueueMode::Priority);
162/// assert_eq!(config.message_ttl, Some(Duration::from_secs(3600)));
163/// assert!(config.durable);
164/// assert_eq!(config.max_message_size, Some(1024 * 1024));
165/// ```
166#[derive(Debug, Clone)]
167pub struct QueueConfig {
168    /// Queue name
169    pub name: String,
170
171    /// Queue mode
172    pub mode: QueueMode,
173
174    /// Durable (survive broker restart)
175    pub durable: bool,
176
177    /// Auto-delete (delete when no consumers)
178    pub auto_delete: bool,
179
180    /// Maximum message size
181    pub max_message_size: Option<usize>,
182
183    /// Message TTL (time-to-live)
184    pub message_ttl: Option<Duration>,
185}
186
187impl QueueConfig {
188    pub fn new(name: String) -> Self {
189        Self {
190            name,
191            mode: QueueMode::Fifo,
192            durable: true,
193            auto_delete: false,
194            max_message_size: None,
195            message_ttl: None,
196        }
197    }
198
199    pub fn with_mode(mut self, mode: QueueMode) -> Self {
200        self.mode = mode;
201        self
202    }
203
204    pub fn with_ttl(mut self, ttl: Duration) -> Self {
205        self.message_ttl = Some(ttl);
206        self
207    }
208
209    /// Set durability
210    pub fn with_durable(mut self, durable: bool) -> Self {
211        self.durable = durable;
212        self
213    }
214
215    /// Set auto-delete
216    pub fn with_auto_delete(mut self, auto_delete: bool) -> Self {
217        self.auto_delete = auto_delete;
218        self
219    }
220
221    /// Set max message size
222    pub fn with_max_message_size(mut self, size: usize) -> Self {
223        self.max_message_size = Some(size);
224        self
225    }
226}
227
228// =============================================================================
229// Batch Operations
230// =============================================================================
231
232/// Result of a batch publish operation
233///
234/// # Examples
235///
236/// ```
237/// use celers_kombu::BatchPublishResult;
238/// use std::collections::HashMap;
239///
240/// // Successful batch publish
241/// let result = BatchPublishResult::success(10);
242/// assert_eq!(result.succeeded, 10);
243/// assert_eq!(result.failed, 0);
244/// assert!(result.is_complete_success());
245/// assert_eq!(result.total(), 10);
246///
247/// // Partial failure
248/// let mut errors = HashMap::new();
249/// errors.insert(2, "network error".to_string());
250/// let result = BatchPublishResult {
251///     succeeded: 9,
252///     failed: 1,
253///     errors,
254/// };
255/// assert!(!result.is_complete_success());
256/// assert_eq!(result.total(), 10);
257/// ```
258#[derive(Debug, Clone)]
259pub struct BatchPublishResult {
260    /// Number of successfully published messages
261    pub succeeded: usize,
262    /// Number of failed messages
263    pub failed: usize,
264    /// Error details for failed messages (index -> error message)
265    pub errors: HashMap<usize, String>,
266}
267
268impl BatchPublishResult {
269    /// Create a successful result
270    pub fn success(count: usize) -> Self {
271        Self {
272            succeeded: count,
273            failed: 0,
274            errors: HashMap::new(),
275        }
276    }
277
278    /// Check if all messages were published successfully
279    pub fn is_complete_success(&self) -> bool {
280        self.failed == 0
281    }
282
283    /// Get total number of messages attempted
284    pub fn total(&self) -> usize {
285        self.succeeded + self.failed
286    }
287}
288
289/// Batch producer trait (batch message publishing)
290#[async_trait]
291pub trait BatchProducer: Producer {
292    /// Publish multiple messages to a queue in batch
293    async fn publish_batch(
294        &mut self,
295        queue: &str,
296        messages: Vec<Message>,
297    ) -> Result<BatchPublishResult>;
298
299    /// Publish multiple messages with routing in batch
300    async fn publish_batch_with_routing(
301        &mut self,
302        exchange: &str,
303        routing_key: &str,
304        messages: Vec<Message>,
305    ) -> Result<BatchPublishResult>;
306}
307
308/// Batch consumer trait (batch message consuming)
309#[async_trait]
310pub trait BatchConsumer: Consumer {
311    /// Consume multiple messages from a queue (up to max_messages)
312    async fn consume_batch(
313        &mut self,
314        queue: &str,
315        max_messages: usize,
316        timeout: Duration,
317    ) -> Result<Vec<Envelope>>;
318
319    /// Acknowledge multiple messages
320    async fn ack_batch(&mut self, delivery_tags: &[String]) -> Result<()>;
321
322    /// Reject multiple messages
323    async fn reject_batch(&mut self, delivery_tags: &[String], requeue: bool) -> Result<()>;
324}