mq_bridge/
command_handler.rs1use crate::traits::{send_batch_helper, Handler, MessagePublisher};
7use crate::traits::{Handled, HandlerError};
8use crate::CanonicalMessage;
9use async_trait::async_trait;
10use std::any::Any;
11use std::future::Future;
12use std::sync::Arc;
13
14use crate::traits::{PublisherError, Sent, SentBatch};
15#[async_trait]
16impl<F, Fut> Handler for F
17where
18 F: Fn(CanonicalMessage) -> Fut + Send + Sync + 'static,
19 Fut: Future<Output = Result<Handled, HandlerError>> + Send,
20{
21 async fn handle(&self, msg: CanonicalMessage) -> Result<Handled, HandlerError> {
22 self(msg).await
23 }
24}
25
26pub struct CommandPublisher {
29 inner: Box<dyn MessagePublisher>,
30 handler: Arc<dyn Handler>,
31}
32
33impl CommandPublisher {
34 pub fn new(inner: impl MessagePublisher, handler: impl Handler + 'static) -> Self {
35 Self {
36 inner: Box::new(inner),
37 handler: Arc::new(handler),
38 }
39 }
40}
41
42#[async_trait]
43impl MessagePublisher for CommandPublisher {
44 async fn send(&self, message: CanonicalMessage) -> Result<Sent, PublisherError> {
45 match self.handler.handle(message).await {
46 Ok(Handled::Publish(response_msg)) => self.inner.send(response_msg).await, Ok(Handled::Ack) => Ok(Sent::Ack),
48 Err(e) => Err(e), }
50 }
51
52 async fn send_batch(
53 &self,
54 messages: Vec<CanonicalMessage>,
55 ) -> Result<SentBatch, PublisherError> {
56 send_batch_helper(self, messages, |publisher, message| {
57 Box::pin(publisher.send(message))
58 })
59 .await
60 }
61
62 async fn flush(&self) -> anyhow::Result<()> {
63 self.inner.flush().await
64 }
65
66 fn as_any(&self) -> &dyn Any {
67 self
68 }
69}
70
71#[cfg(test)]
72mod tests {
73 use std::sync::atomic::{AtomicBool, Ordering};
74
75 use super::*;
76 use crate::endpoints::memory::MemoryPublisher;
77
78 #[tokio::test]
79 async fn test_command_handler_produces_response() {
80 let memory_publisher = MemoryPublisher::new_local("test_command_out_resp", 10);
81 let channel = memory_publisher.channel();
82
83 let handler = |msg: CanonicalMessage| async move {
84 let response_payload = format!("response_to_{}", String::from_utf8_lossy(&msg.payload));
85 Ok(Handled::Publish(response_payload.into()))
86 };
87
88 let publisher = CommandPublisher::new(memory_publisher, handler);
89
90 publisher.send("command1".into()).await.unwrap();
91
92 let received = channel.drain_messages();
93 assert_eq!(received.len(), 1);
94 assert_eq!(received[0].payload, "response_to_command1".as_bytes());
95 }
96
97 #[tokio::test]
98 async fn test_command_handler_acks() {
99 let memory_publisher = MemoryPublisher::new_local("test_command_out_ack", 10);
100 let channel = memory_publisher.channel();
101
102 let handler = |_msg: CanonicalMessage| async move { Ok(Handled::Ack) };
103
104 let publisher = CommandPublisher::new(memory_publisher, handler);
105
106 let result = publisher.send("command1".into()).await.unwrap();
107
108 assert!(matches!(result, Sent::Ack));
109 let received = channel.drain_messages();
110 assert_eq!(received.len(), 0);
111 }
112
113 #[tokio::test]
114 async fn test_command_handler_retryable_error() {
115 let memory_publisher = MemoryPublisher::new_local("test_command_out_err", 10);
116
117 let handler = |_msg: CanonicalMessage| async move {
118 Err(HandlerError::Retryable(anyhow::anyhow!("db is down")))
119 };
120
121 let publisher = CommandPublisher::new(memory_publisher, handler);
122 let result = publisher.send("command1".into()).await;
123
124 assert!(result.is_err());
125 let err = result.unwrap_err();
126 assert!(matches!(err, PublisherError::Retryable(_)));
128 }
129
130 #[tokio::test]
131 async fn test_command_handler_integration_with_memory_consumer() {
132 use crate::endpoints::memory::MemoryConsumer;
133 use crate::traits::MessageConsumer;
134
135 let mut consumer = MemoryConsumer::new_local("cmd_input", 10);
137 let input_channel = consumer.channel();
138
139 let memory_publisher = MemoryPublisher::new_local("cmd_output", 10);
141 let output_channel = memory_publisher.channel();
142
143 let publisher =
145 CommandPublisher::new(memory_publisher, |msg: CanonicalMessage| async move {
146 let payload = String::from_utf8_lossy(&msg.payload);
147 let response = format!("processed_{}", payload);
148 Ok(Handled::Publish(response.into()))
149 });
150
151 input_channel
153 .send_message("test_data".into())
154 .await
155 .unwrap();
156
157 let received = consumer.receive().await.unwrap();
159 let result = publisher.send(received.message).await.unwrap();
160
161 assert!(matches!(result, Sent::Ack));
163
164 let output_msgs = output_channel.drain_messages();
165 assert_eq!(output_msgs.len(), 1);
166 assert_eq!(output_msgs[0].payload.to_vec(), b"processed_test_data");
167
168 let _ = (received.commit)(crate::traits::MessageDisposition::Ack).await;
169 }
170
171 #[tokio::test(flavor = "multi_thread")]
172 async fn test_command_handler_with_route_config() {
173 use crate::models::{Endpoint, Route};
174
175 let success = Arc::new(AtomicBool::new(false));
176 let success_clone = success.clone();
177
178 let handler = move |mut msg: CanonicalMessage| {
180 success_clone.store(true, Ordering::SeqCst);
181 msg.set_payload_str(format!("modified {}", msg.get_payload_str()));
182 async move { Ok(Handled::Publish(msg)) }
183 };
184 let route = Route::new(
186 Endpoint::new_memory("route_in", 100),
187 Endpoint::new_memory("route_out", 100),
188 )
189 .with_handler(handler);
190
191 route.deploy("command_handler_test_route").await.unwrap();
193
194 let input_channel = route.input.channel().unwrap();
196 input_channel.send_message("hello".into()).await.unwrap();
197
198 let mut verifier = route.connect_to_output("verifier").await.unwrap();
200 let received = verifier.receive().await.unwrap();
201 assert_eq!(received.message.get_payload_str(), "modified hello");
202 assert!(success.load(Ordering::SeqCst));
203 Route::stop("command_handler_test_route").await;
204 }
205
206 #[tokio::test]
207 async fn test_command_handler_inner_publisher_failure() {
208 use crate::traits::MessagePublisher;
209
210 struct FailPublisher;
211 #[async_trait]
212 impl MessagePublisher for FailPublisher {
213 async fn send(&self, _msg: CanonicalMessage) -> Result<Sent, PublisherError> {
214 Err(PublisherError::NonRetryable(anyhow::anyhow!("inner fail")))
215 }
216 async fn send_batch(
217 &self,
218 _msgs: Vec<CanonicalMessage>,
219 ) -> Result<SentBatch, PublisherError> {
220 Ok(SentBatch::Ack)
221 }
222 fn as_any(&self) -> &dyn std::any::Any {
223 self
224 }
225 }
226
227 let handler = |msg: CanonicalMessage| async move { Ok(Handled::Publish(msg)) };
228 let publisher = CommandPublisher::new(FailPublisher, handler);
229 let result = publisher.send("test".into()).await;
230 assert!(result.is_err());
231 assert!(result.unwrap_err().to_string().contains("inner fail"));
232 }
233}