1use async_trait::async_trait;
4use celers_protocol::Message;
5use std::collections::HashMap;
6use std::time::Duration;
7use uuid::Uuid;
8
9use crate::{
10 BatchConsumer, BatchProducer, BatchPublishResult, Broker, BrokerError, BrokerMetrics, Consumer,
11 Envelope, HealthCheck, HealthCheckResponse, MetricsProvider, MiddlewareConsumer,
12 MiddlewareProducer, Producer, QueueMode, Result, Transport,
13};
14
15#[derive(Debug)]
20pub struct MockBroker {
21 connected: bool,
22 queues: HashMap<String, Vec<Envelope>>,
23 pending_acks: HashMap<String, Message>,
24 metrics: BrokerMetrics,
25}
26
27impl Default for MockBroker {
28 fn default() -> Self {
29 Self::new()
30 }
31}
32
33impl MockBroker {
34 pub fn new() -> Self {
36 Self {
37 connected: false,
38 queues: HashMap::new(),
39 pending_acks: HashMap::new(),
40 metrics: BrokerMetrics::new(),
41 }
42 }
43
44 pub fn queue_len(&self, queue: &str) -> usize {
46 self.queues.get(queue).map(|q| q.len()).unwrap_or(0)
47 }
48
49 pub fn queue_names(&self) -> Vec<String> {
51 self.queues.keys().cloned().collect()
52 }
53}
54
55#[async_trait]
56impl Transport for MockBroker {
57 async fn connect(&mut self) -> Result<()> {
58 self.metrics.inc_connection_attempt();
59 self.connected = true;
60 Ok(())
61 }
62
63 async fn disconnect(&mut self) -> Result<()> {
64 self.connected = false;
65 Ok(())
66 }
67
68 fn is_connected(&self) -> bool {
69 self.connected
70 }
71
72 fn name(&self) -> &str {
73 "mock"
74 }
75}
76
77#[async_trait]
78impl Producer for MockBroker {
79 async fn publish(&mut self, queue: &str, message: Message) -> Result<()> {
80 if !self.connected {
81 self.metrics.inc_publish_error();
82 return Err(BrokerError::Connection("Not connected".to_string()));
83 }
84
85 let tag = Uuid::new_v4().to_string();
86 let envelope = Envelope::new(message, tag);
87
88 self.queues
89 .entry(queue.to_string())
90 .or_default()
91 .push(envelope);
92
93 self.metrics.inc_published();
94 Ok(())
95 }
96
97 async fn publish_with_routing(
98 &mut self,
99 _exchange: &str,
100 routing_key: &str,
101 message: Message,
102 ) -> Result<()> {
103 self.publish(routing_key, message).await
105 }
106}
107
108#[async_trait]
109impl Consumer for MockBroker {
110 async fn consume(&mut self, queue: &str, _timeout: Duration) -> Result<Option<Envelope>> {
111 if !self.connected {
112 self.metrics.inc_consume_error();
113 return Err(BrokerError::Connection("Not connected".to_string()));
114 }
115
116 let envelope = self.queues.get_mut(queue).and_then(|q| {
117 if q.is_empty() {
118 None
119 } else {
120 Some(q.remove(0))
121 }
122 });
123
124 if let Some(ref env) = envelope {
125 self.pending_acks
126 .insert(env.delivery_tag.clone(), env.message.clone());
127 self.metrics.inc_consumed();
128 }
129
130 Ok(envelope)
131 }
132
133 async fn ack(&mut self, delivery_tag: &str) -> Result<()> {
134 if self.pending_acks.remove(delivery_tag).is_some() {
135 self.metrics.inc_acknowledged();
136 Ok(())
137 } else {
138 Err(BrokerError::MessageNotFound(Uuid::nil()))
139 }
140 }
141
142 async fn reject(&mut self, delivery_tag: &str, requeue: bool) -> Result<()> {
143 if let Some(message) = self.pending_acks.remove(delivery_tag) {
144 self.metrics.inc_rejected();
145 if requeue {
146 let tag = Uuid::new_v4().to_string();
148 let mut envelope = Envelope::new(message, tag);
149 envelope.redelivered = true;
150 self.queues
151 .entry("celery".to_string())
152 .or_default()
153 .push(envelope);
154 }
155 Ok(())
156 } else {
157 Err(BrokerError::MessageNotFound(Uuid::nil()))
158 }
159 }
160
161 async fn queue_size(&mut self, queue: &str) -> Result<usize> {
162 Ok(self.queue_len(queue))
163 }
164}
165
166#[async_trait]
167impl Broker for MockBroker {
168 async fn purge(&mut self, queue: &str) -> Result<usize> {
169 let count = self.queue_len(queue);
170 if let Some(q) = self.queues.get_mut(queue) {
171 q.clear();
172 }
173 Ok(count)
174 }
175
176 async fn create_queue(&mut self, queue: &str, _mode: QueueMode) -> Result<()> {
177 self.queues.entry(queue.to_string()).or_default();
178 Ok(())
179 }
180
181 async fn delete_queue(&mut self, queue: &str) -> Result<()> {
182 self.queues.remove(queue);
183 Ok(())
184 }
185
186 async fn list_queues(&mut self) -> Result<Vec<String>> {
187 Ok(self.queue_names())
188 }
189}
190
191#[async_trait]
192impl BatchProducer for MockBroker {
193 async fn publish_batch(
194 &mut self,
195 queue: &str,
196 messages: Vec<Message>,
197 ) -> Result<BatchPublishResult> {
198 let count = messages.len();
199 for message in messages {
200 self.publish(queue, message).await?;
201 }
202 Ok(BatchPublishResult::success(count))
203 }
204
205 async fn publish_batch_with_routing(
206 &mut self,
207 exchange: &str,
208 routing_key: &str,
209 messages: Vec<Message>,
210 ) -> Result<BatchPublishResult> {
211 let count = messages.len();
212 for message in messages {
213 self.publish_with_routing(exchange, routing_key, message)
214 .await?;
215 }
216 Ok(BatchPublishResult::success(count))
217 }
218}
219
220#[async_trait]
221impl BatchConsumer for MockBroker {
222 async fn consume_batch(
223 &mut self,
224 queue: &str,
225 max_messages: usize,
226 timeout: Duration,
227 ) -> Result<Vec<Envelope>> {
228 let mut results = Vec::new();
229 for _ in 0..max_messages {
230 if let Some(envelope) = self.consume(queue, timeout).await? {
231 results.push(envelope);
232 } else {
233 break;
234 }
235 }
236 Ok(results)
237 }
238
239 async fn ack_batch(&mut self, delivery_tags: &[String]) -> Result<()> {
240 for tag in delivery_tags {
241 self.ack(tag).await?;
242 }
243 Ok(())
244 }
245
246 async fn reject_batch(&mut self, delivery_tags: &[String], requeue: bool) -> Result<()> {
247 for tag in delivery_tags {
248 self.reject(tag, requeue).await?;
249 }
250 Ok(())
251 }
252}
253
254#[async_trait]
255impl HealthCheck for MockBroker {
256 async fn health_check(&self) -> HealthCheckResponse {
257 if self.connected {
258 HealthCheckResponse::healthy("mock", "mock://localhost")
259 } else {
260 HealthCheckResponse::unhealthy("mock", "mock://localhost", "Not connected")
261 }
262 }
263
264 async fn ping(&self) -> bool {
265 self.connected
266 }
267}
268
269#[async_trait]
270impl MetricsProvider for MockBroker {
271 async fn get_metrics(&self) -> BrokerMetrics {
272 self.metrics.clone()
273 }
274
275 async fn reset_metrics(&mut self) {
276 self.metrics = BrokerMetrics::new();
277 }
278}
279
280impl<T: Producer> MiddlewareProducer for T {}
282impl<T: Consumer> MiddlewareConsumer for T {}