Skip to main content

celers_kombu/
mock.rs

1//! Mock broker implementation for testing.
2
3use async_trait::async_trait;
4use celers_protocol::Message;
5use std::collections::HashMap;
6use std::time::Duration;
7use uuid::Uuid;
8
9use crate::{
10    BatchConsumer, BatchProducer, BatchPublishResult, Broker, BrokerError, BrokerMetrics, Consumer,
11    Envelope, HealthCheck, HealthCheckResponse, MetricsProvider, MiddlewareConsumer,
12    MiddlewareProducer, Producer, QueueMode, Result, Transport,
13};
14
15// Mock Broker Implementation (for testing)
16// =============================================================================
17
18/// Mock broker for testing
19#[derive(Debug)]
20pub struct MockBroker {
21    connected: bool,
22    queues: HashMap<String, Vec<Envelope>>,
23    pending_acks: HashMap<String, Message>,
24    metrics: BrokerMetrics,
25}
26
27impl Default for MockBroker {
28    fn default() -> Self {
29        Self::new()
30    }
31}
32
33impl MockBroker {
34    /// Create a new mock broker
35    pub fn new() -> Self {
36        Self {
37            connected: false,
38            queues: HashMap::new(),
39            pending_acks: HashMap::new(),
40            metrics: BrokerMetrics::new(),
41        }
42    }
43
44    /// Get number of messages in a queue
45    pub fn queue_len(&self, queue: &str) -> usize {
46        self.queues.get(queue).map(|q| q.len()).unwrap_or(0)
47    }
48
49    /// Get all queue names
50    pub fn queue_names(&self) -> Vec<String> {
51        self.queues.keys().cloned().collect()
52    }
53}
54
55#[async_trait]
56impl Transport for MockBroker {
57    async fn connect(&mut self) -> Result<()> {
58        self.metrics.inc_connection_attempt();
59        self.connected = true;
60        Ok(())
61    }
62
63    async fn disconnect(&mut self) -> Result<()> {
64        self.connected = false;
65        Ok(())
66    }
67
68    fn is_connected(&self) -> bool {
69        self.connected
70    }
71
72    fn name(&self) -> &str {
73        "mock"
74    }
75}
76
77#[async_trait]
78impl Producer for MockBroker {
79    async fn publish(&mut self, queue: &str, message: Message) -> Result<()> {
80        if !self.connected {
81            self.metrics.inc_publish_error();
82            return Err(BrokerError::Connection("Not connected".to_string()));
83        }
84
85        let tag = Uuid::new_v4().to_string();
86        let envelope = Envelope::new(message, tag);
87
88        self.queues
89            .entry(queue.to_string())
90            .or_default()
91            .push(envelope);
92
93        self.metrics.inc_published();
94        Ok(())
95    }
96
97    async fn publish_with_routing(
98        &mut self,
99        _exchange: &str,
100        routing_key: &str,
101        message: Message,
102    ) -> Result<()> {
103        // For mock, just use routing_key as queue name
104        self.publish(routing_key, message).await
105    }
106}
107
108#[async_trait]
109impl Consumer for MockBroker {
110    async fn consume(&mut self, queue: &str, _timeout: Duration) -> Result<Option<Envelope>> {
111        if !self.connected {
112            self.metrics.inc_consume_error();
113            return Err(BrokerError::Connection("Not connected".to_string()));
114        }
115
116        let envelope = self.queues.get_mut(queue).and_then(|q| {
117            if q.is_empty() {
118                None
119            } else {
120                Some(q.remove(0))
121            }
122        });
123
124        if let Some(ref env) = envelope {
125            self.pending_acks
126                .insert(env.delivery_tag.clone(), env.message.clone());
127            self.metrics.inc_consumed();
128        }
129
130        Ok(envelope)
131    }
132
133    async fn ack(&mut self, delivery_tag: &str) -> Result<()> {
134        if self.pending_acks.remove(delivery_tag).is_some() {
135            self.metrics.inc_acknowledged();
136            Ok(())
137        } else {
138            Err(BrokerError::MessageNotFound(Uuid::nil()))
139        }
140    }
141
142    async fn reject(&mut self, delivery_tag: &str, requeue: bool) -> Result<()> {
143        if let Some(message) = self.pending_acks.remove(delivery_tag) {
144            self.metrics.inc_rejected();
145            if requeue {
146                // Requeue to the default queue (we don't track which queue it came from)
147                let tag = Uuid::new_v4().to_string();
148                let mut envelope = Envelope::new(message, tag);
149                envelope.redelivered = true;
150                self.queues
151                    .entry("celery".to_string())
152                    .or_default()
153                    .push(envelope);
154            }
155            Ok(())
156        } else {
157            Err(BrokerError::MessageNotFound(Uuid::nil()))
158        }
159    }
160
161    async fn queue_size(&mut self, queue: &str) -> Result<usize> {
162        Ok(self.queue_len(queue))
163    }
164}
165
166#[async_trait]
167impl Broker for MockBroker {
168    async fn purge(&mut self, queue: &str) -> Result<usize> {
169        let count = self.queue_len(queue);
170        if let Some(q) = self.queues.get_mut(queue) {
171            q.clear();
172        }
173        Ok(count)
174    }
175
176    async fn create_queue(&mut self, queue: &str, _mode: QueueMode) -> Result<()> {
177        self.queues.entry(queue.to_string()).or_default();
178        Ok(())
179    }
180
181    async fn delete_queue(&mut self, queue: &str) -> Result<()> {
182        self.queues.remove(queue);
183        Ok(())
184    }
185
186    async fn list_queues(&mut self) -> Result<Vec<String>> {
187        Ok(self.queue_names())
188    }
189}
190
191#[async_trait]
192impl BatchProducer for MockBroker {
193    async fn publish_batch(
194        &mut self,
195        queue: &str,
196        messages: Vec<Message>,
197    ) -> Result<BatchPublishResult> {
198        let count = messages.len();
199        for message in messages {
200            self.publish(queue, message).await?;
201        }
202        Ok(BatchPublishResult::success(count))
203    }
204
205    async fn publish_batch_with_routing(
206        &mut self,
207        exchange: &str,
208        routing_key: &str,
209        messages: Vec<Message>,
210    ) -> Result<BatchPublishResult> {
211        let count = messages.len();
212        for message in messages {
213            self.publish_with_routing(exchange, routing_key, message)
214                .await?;
215        }
216        Ok(BatchPublishResult::success(count))
217    }
218}
219
220#[async_trait]
221impl BatchConsumer for MockBroker {
222    async fn consume_batch(
223        &mut self,
224        queue: &str,
225        max_messages: usize,
226        timeout: Duration,
227    ) -> Result<Vec<Envelope>> {
228        let mut results = Vec::new();
229        for _ in 0..max_messages {
230            if let Some(envelope) = self.consume(queue, timeout).await? {
231                results.push(envelope);
232            } else {
233                break;
234            }
235        }
236        Ok(results)
237    }
238
239    async fn ack_batch(&mut self, delivery_tags: &[String]) -> Result<()> {
240        for tag in delivery_tags {
241            self.ack(tag).await?;
242        }
243        Ok(())
244    }
245
246    async fn reject_batch(&mut self, delivery_tags: &[String], requeue: bool) -> Result<()> {
247        for tag in delivery_tags {
248            self.reject(tag, requeue).await?;
249        }
250        Ok(())
251    }
252}
253
254#[async_trait]
255impl HealthCheck for MockBroker {
256    async fn health_check(&self) -> HealthCheckResponse {
257        if self.connected {
258            HealthCheckResponse::healthy("mock", "mock://localhost")
259        } else {
260            HealthCheckResponse::unhealthy("mock", "mock://localhost", "Not connected")
261        }
262    }
263
264    async fn ping(&self) -> bool {
265        self.connected
266    }
267}
268
269#[async_trait]
270impl MetricsProvider for MockBroker {
271    async fn get_metrics(&self) -> BrokerMetrics {
272        self.metrics.clone()
273    }
274
275    async fn reset_metrics(&mut self) {
276        self.metrics = BrokerMetrics::new();
277    }
278}
279
280// Blanket implementations for middleware traits
281impl<T: Producer> MiddlewareProducer for T {}
282impl<T: Consumer> MiddlewareConsumer for T {}