// camel_component_api/consumer.rs

1use async_trait::async_trait;
2use tokio::sync::{mpsc, oneshot};
3use tokio_util::sync::CancellationToken;
4
5use camel_api::{CamelError, Exchange};
6
7/// A message sent from a consumer to the route pipeline.
8///
9/// Fire-and-forget exchanges use `reply_tx = None`.
10/// Request-reply exchanges (e.g. `direct:`) provide a `reply_tx` so the
11/// pipeline result can be sent back to the consumer.
12pub struct ExchangeEnvelope {
13    pub exchange: Exchange,
14    pub reply_tx: Option<oneshot::Sender<Result<Exchange, CamelError>>>,
15}
16
17/// Context provided to a Consumer, allowing it to send exchanges into the route.
18#[derive(Clone)]
19pub struct ConsumerContext {
20    sender: mpsc::Sender<ExchangeEnvelope>,
21    cancel_token: CancellationToken,
22}
23
24impl ConsumerContext {
25    /// Create a new consumer context wrapping the given channel sender.
26    pub fn new(sender: mpsc::Sender<ExchangeEnvelope>, cancel_token: CancellationToken) -> Self {
27        Self {
28            sender,
29            cancel_token,
30        }
31    }
32
33    /// Returns a future that resolves when shutdown is requested.
34    /// Use in `tokio::select!` inside consumer loops.
35    pub async fn cancelled(&self) {
36        self.cancel_token.cancelled().await
37    }
38
39    /// Returns true if shutdown has been requested.
40    pub fn is_cancelled(&self) -> bool {
41        self.cancel_token.is_cancelled()
42    }
43
44    /// Returns a clone of the `CancellationToken`.
45    ///
46    /// Useful for consumers that spawn per-request tasks and need to propagate
47    /// shutdown to each task. See `HttpConsumer` for an example.
48    pub fn cancel_token(&self) -> CancellationToken {
49        self.cancel_token.clone()
50    }
51
52    /// Returns a clone of the channel sender for manual exchange submission.
53    ///
54    /// Useful for consumers that spawn per-request tasks (e.g., `HttpConsumer`)
55    /// where each task independently sends exchanges into the pipeline.
56    /// For simple consumers, prefer `send()` or `send_and_wait()` instead.
57    pub fn sender(&self) -> mpsc::Sender<ExchangeEnvelope> {
58        self.sender.clone()
59    }
60
61    /// Send an exchange into the route pipeline (fire-and-forget).
62    pub async fn send(&self, exchange: Exchange) -> Result<(), CamelError> {
63        self.sender
64            .send(ExchangeEnvelope {
65                exchange,
66                reply_tx: None,
67            })
68            .await
69            .map_err(|_| CamelError::ChannelClosed)
70    }
71
72    /// Send an exchange and wait for the pipeline result (request-reply).
73    ///
74    /// Returns `Ok(exchange)` on success or `Err(e)` if the pipeline failed
75    /// without an error handler absorbing the error.
76    pub async fn send_and_wait(&self, exchange: Exchange) -> Result<Exchange, CamelError> {
77        let (reply_tx, reply_rx) = oneshot::channel();
78        self.sender
79            .send(ExchangeEnvelope {
80                exchange,
81                reply_tx: Some(reply_tx),
82            })
83            .await
84            .map_err(|_| CamelError::ChannelClosed)?;
85        reply_rx.await.map_err(|_| CamelError::ChannelClosed)?
86    }
87}
88
/// How a consumer's exchanges should be processed by the pipeline.
///
/// The [`Default`] value is [`ConcurrencyModel::Sequential`], the same model
/// that `Consumer::concurrency_model` returns unless a consumer overrides it.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum ConcurrencyModel {
    /// Exchanges are processed one at a time, in order. Default for polling
    /// consumers (timer, file) and synchronous consumers (direct).
    #[default]
    Sequential,
    /// Exchanges are processed concurrently via `tokio::spawn`.
    ///
    /// `max` is an optional semaphore limit. `None` means unbounded, in which
    /// case the channel buffer is the only backpressure.
    Concurrent { max: Option<usize> },
}
100
101/// A Consumer receives messages from an external source and feeds them into the route.
102#[async_trait]
103pub trait Consumer: Send + Sync {
104    /// Start consuming messages, sending them through the provided context.
105    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError>;
106
107    /// Stop consuming messages.
108    async fn stop(&mut self) -> Result<(), CamelError>;
109
110    /// Temporarily suspend consuming messages without fully stopping.
111    ///
112    /// Default: no-op, returns `Ok(())`.
113    async fn suspend(&self) -> Result<(), CamelError> {
114        Ok(())
115    }
116
117    /// Resume consuming after a previous suspension.
118    ///
119    /// Default: no-op, returns `Ok(())`.
120    async fn resume(&self) -> Result<(), CamelError> {
121        Ok(())
122    }
123
124    /// Declares this consumer's natural concurrency model.
125    ///
126    /// The runtime uses this to decide whether to process exchanges
127    /// sequentially or spawn per-exchange. Consumers that accept inbound
128    /// connections (HTTP, WebSocket, Kafka) should override this to return
129    /// `ConcurrencyModel::Concurrent`.
130    ///
131    /// Default: `Sequential`.
132    fn concurrency_model(&self) -> ConcurrencyModel {
133        ConcurrencyModel::Sequential
134    }
135}
136
#[cfg(test)]
mod tests {
    use super::*;

    /// Consumer that implements only the required methods, so every provided
    /// method of `Consumer` keeps its default behaviour.
    struct DummyConsumer;

    #[async_trait]
    impl Consumer for DummyConsumer {
        async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
            Ok(())
        }

        async fn stop(&mut self) -> Result<(), CamelError> {
            Ok(())
        }
    }

    /// Consumer that overrides `concurrency_model` with a bounded concurrent model.
    struct ConcurrentConsumer;

    #[async_trait]
    impl Consumer for ConcurrentConsumer {
        async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
            Ok(())
        }

        async fn stop(&mut self) -> Result<(), CamelError> {
            Ok(())
        }

        fn concurrency_model(&self) -> ConcurrencyModel {
            ConcurrencyModel::Concurrent { max: Some(16) }
        }
    }

    // Cancelling the shared token must be visible through the context.
    #[tokio::test]
    async fn test_consumer_context_cancelled() {
        let shutdown = CancellationToken::new();
        let (sender, _receiver) = mpsc::channel::<ExchangeEnvelope>(16);
        let ctx = ConsumerContext::new(sender, shutdown.clone());

        assert!(!ctx.is_cancelled());

        shutdown.cancel();
        // Must return promptly now that the token has been cancelled.
        ctx.cancelled().await;

        assert!(ctx.is_cancelled());
    }

    #[test]
    fn test_concurrency_model_default_is_sequential() {
        let model = DummyConsumer.concurrency_model();
        assert_eq!(model, ConcurrencyModel::Sequential);
    }

    #[test]
    fn test_concurrency_model_concurrent_override() {
        let expected = ConcurrencyModel::Concurrent { max: Some(16) };
        assert_eq!(ConcurrentConsumer.concurrency_model(), expected);
    }

    // The provided suspend/resume implementations succeed without doing anything.
    #[tokio::test]
    async fn test_consumer_default_suspend_resume() {
        let consumer = DummyConsumer;

        assert!(matches!(consumer.suspend().await, Ok(())));
        assert!(matches!(consumer.resume().await, Ok(())));
    }
}