Skip to main content

mq_bridge/
command_handler.rs

1//  mq-bridge
2//  © Copyright 2025, by Marco Mengelkoch
3//  Licensed under MIT License, see License file for more details
4//  git clone https://github.com/marcomq/mq-bridge
5
6use crate::traits::{send_batch_helper, Handler, MessagePublisher};
7use crate::traits::{Handled, HandlerError};
8use crate::CanonicalMessage;
9use async_trait::async_trait;
10use std::any::Any;
11use std::future::Future;
12use std::sync::Arc;
13
14use crate::traits::{PublisherError, Sent, SentBatch};
15#[async_trait]
16impl<F, Fut> Handler for F
17where
18    F: Fn(CanonicalMessage) -> Fut + Send + Sync + 'static,
19    Fut: Future<Output = Result<Handled, HandlerError>> + Send,
20{
21    async fn handle(&self, msg: CanonicalMessage) -> Result<Handled, HandlerError> {
22        self(msg).await
23    }
24}
25
26/// A publisher middleware that intercepts messages and passes them to a `Handler`.
27/// If the handler returns a new message, it is passed to the inner publisher.
28pub struct CommandPublisher {
29    inner: Box<dyn MessagePublisher>,
30    handler: Arc<dyn Handler>,
31}
32
33impl CommandPublisher {
34    pub fn new(inner: impl MessagePublisher, handler: impl Handler + 'static) -> Self {
35        Self {
36            inner: Box::new(inner),
37            handler: Arc::new(handler),
38        }
39    }
40}
41
42#[async_trait]
43impl MessagePublisher for CommandPublisher {
44    async fn send(&self, message: CanonicalMessage) -> Result<Sent, PublisherError> {
45        match self.handler.handle(message).await {
46            Ok(Handled::Publish(response_msg)) => self.inner.send(response_msg).await, // Propagate result
47            Ok(Handled::Ack) => Ok(Sent::Ack),
48            Err(e) => Err(e), // Converts HandlerError to PublisherError
49        }
50    }
51
52    async fn send_batch(
53        &self,
54        messages: Vec<CanonicalMessage>,
55    ) -> Result<SentBatch, PublisherError> {
56        send_batch_helper(self, messages, |publisher, message| {
57            Box::pin(publisher.send(message))
58        })
59        .await
60    }
61
62    async fn flush(&self) -> anyhow::Result<()> {
63        self.inner.flush().await
64    }
65
66    fn as_any(&self) -> &dyn Any {
67        self
68    }
69}
70
71#[cfg(test)]
72mod tests {
73    use std::sync::atomic::{AtomicBool, Ordering};
74
75    use super::*;
76    use crate::endpoints::memory::MemoryPublisher;
77
78    #[tokio::test]
79    async fn test_command_handler_produces_response() {
80        let memory_publisher = MemoryPublisher::new_local("test_command_out_resp", 10);
81        let channel = memory_publisher.channel();
82
83        let handler = |msg: CanonicalMessage| async move {
84            let response_payload = format!("response_to_{}", String::from_utf8_lossy(&msg.payload));
85            Ok(Handled::Publish(response_payload.into()))
86        };
87
88        let publisher = CommandPublisher::new(memory_publisher, handler);
89
90        publisher.send("command1".into()).await.unwrap();
91
92        let received = channel.drain_messages();
93        assert_eq!(received.len(), 1);
94        assert_eq!(received[0].payload, "response_to_command1".as_bytes());
95    }
96
97    #[tokio::test]
98    async fn test_command_handler_acks() {
99        let memory_publisher = MemoryPublisher::new_local("test_command_out_ack", 10);
100        let channel = memory_publisher.channel();
101
102        let handler = |_msg: CanonicalMessage| async move { Ok(Handled::Ack) };
103
104        let publisher = CommandPublisher::new(memory_publisher, handler);
105
106        let result = publisher.send("command1".into()).await.unwrap();
107
108        assert!(matches!(result, Sent::Ack));
109        let received = channel.drain_messages();
110        assert_eq!(received.len(), 0);
111    }
112
113    #[tokio::test]
114    async fn test_command_handler_retryable_error() {
115        let memory_publisher = MemoryPublisher::new_local("test_command_out_err", 10);
116
117        let handler = |_msg: CanonicalMessage| async move {
118            Err(HandlerError::Retryable(anyhow::anyhow!("db is down")))
119        };
120
121        let publisher = CommandPublisher::new(memory_publisher, handler);
122        let result = publisher.send("command1".into()).await;
123
124        assert!(result.is_err());
125        let err = result.unwrap_err();
126        // The HandlerError is converted into a PublisherError
127        assert!(matches!(err, PublisherError::Retryable(_)));
128    }
129
130    #[tokio::test]
131    async fn test_command_handler_integration_with_memory_consumer() {
132        use crate::endpoints::memory::MemoryConsumer;
133        use crate::traits::MessageConsumer;
134
135        // 1. Setup Input (MemoryConsumer)
136        let mut consumer = MemoryConsumer::new_local("cmd_input", 10);
137        let input_channel = consumer.channel();
138
139        // 2. Setup Output (MemoryPublisher wrapped by CommandPublisher)
140        let memory_publisher = MemoryPublisher::new_local("cmd_output", 10);
141        let output_channel = memory_publisher.channel();
142
143        // 3. Create Publisher Middleware with inline handler
144        let publisher =
145            CommandPublisher::new(memory_publisher, |msg: CanonicalMessage| async move {
146                let payload = String::from_utf8_lossy(&msg.payload);
147                let response = format!("processed_{}", payload);
148                Ok(Handled::Publish(response.into()))
149            });
150
151        // 4. Inject message into input
152        input_channel
153            .send_message("test_data".into())
154            .await
155            .unwrap();
156
157        // 5. Simulate Bridge Loop (Consume -> Publish)
158        let received = consumer.receive().await.unwrap();
159        let result = publisher.send(received.message).await.unwrap();
160
161        // 6. Verify
162        assert!(matches!(result, Sent::Ack));
163
164        let output_msgs = output_channel.drain_messages();
165        assert_eq!(output_msgs.len(), 1);
166        assert_eq!(output_msgs[0].payload.to_vec(), b"processed_test_data");
167
168        let _ = (received.commit)(crate::traits::MessageDisposition::Ack).await;
169    }
170
171    #[tokio::test(flavor = "multi_thread")]
172    async fn test_command_handler_with_route_config() {
173        use crate::models::{Endpoint, Route};
174
175        let success = Arc::new(AtomicBool::new(false));
176        let success_clone = success.clone();
177
178        // 1. Define Handler
179        let handler = move |mut msg: CanonicalMessage| {
180            success_clone.store(true, Ordering::SeqCst);
181            msg.set_payload_str(format!("modified {}", msg.get_payload_str()));
182            async move { Ok(Handled::Publish(msg)) }
183        };
184        // 2. Define Route
185        let route = Route::new(
186            Endpoint::new_memory("route_in", 100),
187            Endpoint::new_memory("route_out", 100),
188        )
189        .with_handler(handler);
190
191        // 3. Deploy Route
192        route.deploy("command_handler_test_route").await.unwrap();
193
194        // 4. Inject Data
195        let input_channel = route.input.channel().unwrap();
196        input_channel.send_message("hello".into()).await.unwrap();
197
198        // 5. Verify
199        let mut verifier = route.connect_to_output("verifier").await.unwrap();
200        let received = verifier.receive().await.unwrap();
201        assert_eq!(received.message.get_payload_str(), "modified hello");
202        assert!(success.load(Ordering::SeqCst));
203        Route::stop("command_handler_test_route").await;
204    }
205
206    #[tokio::test]
207    async fn test_command_handler_inner_publisher_failure() {
208        use crate::traits::MessagePublisher;
209
210        struct FailPublisher;
211        #[async_trait]
212        impl MessagePublisher for FailPublisher {
213            async fn send(&self, _msg: CanonicalMessage) -> Result<Sent, PublisherError> {
214                Err(PublisherError::NonRetryable(anyhow::anyhow!("inner fail")))
215            }
216            async fn send_batch(
217                &self,
218                _msgs: Vec<CanonicalMessage>,
219            ) -> Result<SentBatch, PublisherError> {
220                Ok(SentBatch::Ack)
221            }
222            fn as_any(&self) -> &dyn std::any::Any {
223                self
224            }
225        }
226
227        let handler = |msg: CanonicalMessage| async move { Ok(Handled::Publish(msg)) };
228        let publisher = CommandPublisher::new(FailPublisher, handler);
229        let result = publisher.send("test".into()).await;
230        assert!(result.is_err());
231        assert!(result.unwrap_err().to_string().contains("inner fail"));
232    }
233}