camel_component_api/
consumer.rs1use async_trait::async_trait;
2use tokio::sync::{mpsc, oneshot};
3use tokio_util::sync::CancellationToken;
4
5use camel_api::{CamelError, Exchange};
6
7pub struct ExchangeEnvelope {
13 pub exchange: Exchange,
14 pub reply_tx: Option<oneshot::Sender<Result<Exchange, CamelError>>>,
15}
16
17#[derive(Clone)]
19pub struct ConsumerContext {
20 sender: mpsc::Sender<ExchangeEnvelope>,
21 cancel_token: CancellationToken,
22}
23
24impl ConsumerContext {
25 pub fn new(sender: mpsc::Sender<ExchangeEnvelope>, cancel_token: CancellationToken) -> Self {
27 Self {
28 sender,
29 cancel_token,
30 }
31 }
32
33 pub async fn cancelled(&self) {
36 self.cancel_token.cancelled().await
37 }
38
39 pub fn is_cancelled(&self) -> bool {
41 self.cancel_token.is_cancelled()
42 }
43
44 pub fn cancel_token(&self) -> CancellationToken {
49 self.cancel_token.clone()
50 }
51
52 pub fn sender(&self) -> mpsc::Sender<ExchangeEnvelope> {
58 self.sender.clone()
59 }
60
61 pub async fn send(&self, exchange: Exchange) -> Result<(), CamelError> {
63 self.sender
64 .send(ExchangeEnvelope {
65 exchange,
66 reply_tx: None,
67 })
68 .await
69 .map_err(|_| CamelError::ChannelClosed)
70 }
71
72 pub async fn send_and_wait(&self, exchange: Exchange) -> Result<Exchange, CamelError> {
77 let (reply_tx, reply_rx) = oneshot::channel();
78 self.sender
79 .send(ExchangeEnvelope {
80 exchange,
81 reply_tx: Some(reply_tx),
82 })
83 .await
84 .map_err(|_| CamelError::ChannelClosed)?;
85 reply_rx.await.map_err(|_| CamelError::ChannelClosed)?
86 }
87}
88
/// How a consumer declares its exchanges should be processed.
///
/// NOTE(review): the exact scheduling semantics are defined by whatever code
/// reads `Consumer::concurrency_model`; that code is not visible here.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConcurrencyModel {
    /// The default reported by `Consumer::concurrency_model`.
    Sequential,
    /// Concurrent processing with an optional limit.
    Concurrent {
        /// Presumably the upper bound on concurrently processed exchanges,
        /// with `None` meaning no bound — TODO confirm against the reader of
        /// this value.
        max: Option<usize>,
    },
}
100
101#[async_trait]
103pub trait Consumer: Send + Sync {
104 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError>;
106
107 async fn stop(&mut self) -> Result<(), CamelError>;
109
110 async fn suspend(&self) -> Result<(), CamelError> {
114 Ok(())
115 }
116
117 async fn resume(&self) -> Result<(), CamelError> {
121 Ok(())
122 }
123
124 fn concurrency_model(&self) -> ConcurrencyModel {
133 ConcurrencyModel::Sequential
134 }
135}
136
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal consumer that implements only the required methods, so every
    /// overridable method falls back to the trait default.
    struct DummyConsumer;

    #[async_trait::async_trait]
    impl Consumer for DummyConsumer {
        async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
            Ok(())
        }

        async fn stop(&mut self) -> Result<(), CamelError> {
            Ok(())
        }
    }

    /// Cancelling the shared token must be observable through the context.
    #[tokio::test]
    async fn test_consumer_context_cancelled() {
        let (tx, _rx) = mpsc::channel(16);
        let token = CancellationToken::new();
        let ctx = ConsumerContext::new(tx, token.clone());
        assert!(!ctx.is_cancelled());

        token.cancel();
        ctx.cancelled().await;

        assert!(ctx.is_cancelled());
    }

    /// A consumer that does not override `concurrency_model` is sequential.
    #[test]
    fn test_concurrency_model_default_is_sequential() {
        let consumer = DummyConsumer;
        assert_eq!(consumer.concurrency_model(), ConcurrencyModel::Sequential);
    }

    /// An overriding consumer reports exactly the model it returns.
    #[test]
    fn test_concurrency_model_concurrent_override() {
        struct ConcurrentConsumer;

        #[async_trait::async_trait]
        impl Consumer for ConcurrentConsumer {
            async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
                Ok(())
            }

            async fn stop(&mut self) -> Result<(), CamelError> {
                Ok(())
            }

            fn concurrency_model(&self) -> ConcurrencyModel {
                ConcurrencyModel::Concurrent { max: Some(16) }
            }
        }

        let consumer = ConcurrentConsumer;
        let expected = ConcurrencyModel::Concurrent { max: Some(16) };
        assert_eq!(consumer.concurrency_model(), expected);
    }

    /// The default `suspend` and `resume` implementations both succeed.
    #[tokio::test]
    async fn test_consumer_default_suspend_resume() {
        let consumer = DummyConsumer;
        assert!(consumer.suspend().await.is_ok());
        assert!(consumer.resume().await.is_ok());
    }
}