Skip to main content

camel_component_api/
consumer.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use tokio::sync::{mpsc, oneshot};
5use tokio::task::JoinHandle;
6use tokio_util::sync::CancellationToken;
7
8use camel_api::security_policy::SecurityPolicy;
9use camel_api::{CamelError, Exchange};
10use camel_auth::{CredentialSource, TokenAuthenticator};
11
12/// A message sent from a consumer to the route pipeline.
13///
14/// Fire-and-forget exchanges use `reply_tx = None`.
15/// Request-reply exchanges (e.g. `direct:`) provide a `reply_tx` so the
16/// pipeline result can be sent back to the consumer.
17pub struct ExchangeEnvelope {
18    pub exchange: Exchange,
19    pub reply_tx: Option<oneshot::Sender<Result<Exchange, CamelError>>>,
20}
21
22/// Context provided to a Consumer, allowing it to send exchanges into the route.
23#[derive(Clone)]
24pub struct ConsumerContext {
25    sender: mpsc::Sender<ExchangeEnvelope>,
26    cancel_token: CancellationToken,
27}
28
29impl ConsumerContext {
30    /// Create a new consumer context wrapping the given channel sender.
31    pub fn new(sender: mpsc::Sender<ExchangeEnvelope>, cancel_token: CancellationToken) -> Self {
32        Self {
33            sender,
34            cancel_token,
35        }
36    }
37
38    /// Returns a future that resolves when shutdown is requested.
39    /// Use in `tokio::select!` inside consumer loops.
40    pub async fn cancelled(&self) {
41        self.cancel_token.cancelled().await
42    }
43
44    /// Returns true if shutdown has been requested.
45    pub fn is_cancelled(&self) -> bool {
46        self.cancel_token.is_cancelled()
47    }
48
49    /// Returns a clone of the `CancellationToken`.
50    ///
51    /// Useful for consumers that spawn per-request tasks and need to propagate
52    /// shutdown to each task. See `HttpConsumer` for an example.
53    pub fn cancel_token(&self) -> CancellationToken {
54        self.cancel_token.clone()
55    }
56
57    /// Returns a clone of the channel sender for manual exchange submission.
58    ///
59    /// Useful for consumers that spawn per-request tasks (e.g., `HttpConsumer`)
60    /// where each task independently sends exchanges into the pipeline.
61    /// For simple consumers, prefer `send()` or `send_and_wait()` instead.
62    pub fn sender(&self) -> mpsc::Sender<ExchangeEnvelope> {
63        self.sender.clone()
64    }
65
66    /// Send an exchange into the route pipeline (fire-and-forget).
67    pub async fn send(&self, exchange: Exchange) -> Result<(), CamelError> {
68        self.sender
69            .send(ExchangeEnvelope {
70                exchange,
71                reply_tx: None,
72            })
73            .await
74            .map_err(|_| CamelError::ChannelClosed)
75    }
76
77    /// Send an exchange and wait for the pipeline result (request-reply).
78    ///
79    /// Returns `Ok(exchange)` on success or `Err(e)` if the pipeline failed
80    /// without an error handler absorbing the error.
81    pub async fn send_and_wait(&self, exchange: Exchange) -> Result<Exchange, CamelError> {
82        let (reply_tx, reply_rx) = oneshot::channel();
83        self.sender
84            .send(ExchangeEnvelope {
85                exchange,
86                reply_tx: Some(reply_tx),
87            })
88            .await
89            .map_err(|_| CamelError::ChannelClosed)?;
90        reply_rx.await.map_err(|_| CamelError::ChannelClosed)?
91    }
92}
93
94/// Security context passed to a consumer before `start()`.
95///
96/// Carries the `SecurityPolicy` and `TokenAuthenticator` from the route
97/// controller so consumers (e.g. WebSocket) can register auth state
98/// before accepting connections.
99pub struct SecurityContext {
100    pub policy: Arc<dyn SecurityPolicy>,
101    pub authenticator: Arc<dyn TokenAuthenticator>,
102    pub credential_sources: Vec<CredentialSource>,
103}
104
105impl SecurityContext {
106    pub fn new(
107        policy: impl SecurityPolicy + 'static,
108        authenticator: Arc<dyn TokenAuthenticator>,
109    ) -> Self {
110        Self {
111            policy: Arc::new(policy),
112            authenticator,
113            credential_sources: vec![CredentialSource::AuthorizationHeader],
114        }
115    }
116
117    pub fn from_arc(
118        policy: Arc<dyn SecurityPolicy>,
119        authenticator: Arc<dyn TokenAuthenticator>,
120    ) -> Self {
121        Self {
122            policy,
123            authenticator,
124            credential_sources: vec![CredentialSource::AuthorizationHeader],
125        }
126    }
127
128    pub fn with_credential_sources(mut self, sources: Vec<CredentialSource>) -> Self {
129        self.credential_sources = sources;
130        self
131    }
132}
133
134impl Clone for SecurityContext {
135    fn clone(&self) -> Self {
136        Self {
137            policy: Arc::clone(&self.policy),
138            authenticator: Arc::clone(&self.authenticator),
139            credential_sources: self.credential_sources.clone(),
140        }
141    }
142}
143
144impl std::fmt::Debug for SecurityContext {
145    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146        f.debug_struct("SecurityContext")
147            .field("policy", &"<SecurityPolicy>")
148            .field("authenticator", &"<TokenAuthenticator>")
149            .field("credential_sources", &self.credential_sources)
150            .finish()
151    }
152}
153
/// Strategy the pipeline uses to process the exchanges a consumer submits.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConcurrencyModel {
    /// One exchange at a time, in order.
    ///
    /// Default for polling consumers (timer, file) and synchronous
    /// consumers (direct).
    Sequential,
    /// Exchanges are processed concurrently via `tokio::spawn`.
    Concurrent {
        /// Optional semaphore limit. `None` means unbounded, leaving the
        /// channel buffer as the only backpressure.
        max: Option<usize>,
    },
}
165
166/// A Consumer receives data from an external system and submits Exchanges
167/// to the Route's Pipeline via the [`ConsumerContext`].
168///
169/// # Shutdown Contract
170///
171/// The Runtime guarantees the following lifecycle:
172///
173/// 1. `start()` is called once. The Runtime spawns a task that owns the Consumer.
174/// 2. On route stop, the Runtime cancels the [`ConsumerContext`] cancel token.
175/// 3. The spawned task calls `stop()` on ALL exit paths after `start()` succeeds
176///    (clean exit, crash, cancellation, natural completion).
177/// 4. `background_task_handle()` is a supervision hook for crash propagation (ADR-0007),
178///    NOT the shutdown API.
179///
180/// Component authors MUST ensure:
181///
182/// - `stop()` cancels all component-owned inner tasks and cleans up registrations/resources.
183/// - If inner tasks use a private `CancellationToken`, `stop()` MUST cancel it.
184/// - Best practice: inner tasks should use the [`ConsumerContext`] cancel token (or a child)
185///   so they respond to runtime shutdown without waiting for `stop()`.
186/// - If using a private token, `stop()` must cancel it to ensure prompt cleanup.
187/// - `background_task_handle()` returns the `JoinHandle` of the primary background task,
188///   if any. The Runtime monitors this handle for unexpected exits (crash propagation).
189#[async_trait]
190pub trait Consumer: Send + Sync {
191    /// Start consuming messages, sending them through the provided context.
192    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError>;
193
194    /// Stop consuming messages and clean up all resources.
195    ///
196    /// Called by the Runtime on every exit path after `start()` succeeds.
197    /// See the [Shutdown Contract](#shutdown-contract) above.
198    async fn stop(&mut self) -> Result<(), CamelError>;
199
200    /// Temporarily suspend consuming messages without fully stopping.
201    ///
202    /// Default: no-op, returns `Ok(())`.
203    async fn suspend(&self) -> Result<(), CamelError> {
204        Ok(())
205    }
206
207    /// Resume consuming after a previous suspension.
208    ///
209    /// Default: no-op, returns `Ok(())`.
210    async fn resume(&self) -> Result<(), CamelError> {
211        Ok(())
212    }
213
214    /// Declares this consumer's natural concurrency model.
215    ///
216    /// The runtime uses this to decide whether to process exchanges
217    /// sequentially or spawn per-exchange. Consumers that accept inbound
218    /// connections (HTTP, WebSocket, Kafka) should override this to return
219    /// `ConcurrencyModel::Concurrent`.
220    ///
221    /// Default: `Sequential`.
222    fn concurrency_model(&self) -> ConcurrencyModel {
223        ConcurrencyModel::Sequential
224    }
225
226    /// Return a handle to the consumer's long-running background task so the
227    /// runtime can monitor it for unexpected exits after `start()` returns `Ok`.
228    ///
229    /// Default: `None` — consumer's work completes entirely within `start()`.
230    /// Override: return `Some(handle)` if `start()` spawns a detached task.
231    ///
232    /// **Contract:** the task must observe `ConsumerContext::cancelled()` so
233    /// runtime shutdown is distinguishable from crash exits.
234    ///
235    /// This method is called at most once; implementations should use `.take()`
236    /// to transfer ownership of the handle.
237    fn background_task_handle(&mut self) -> Option<JoinHandle<Result<(), CamelError>>> {
238        None
239    }
240
241    /// Set the security context for this consumer.
242    ///
243    /// Called by the route controller before `start()` so the consumer
244    /// can register auth state (e.g. WebSocket auth in `WsAppState`).
245    ///
246    /// Default: no-op, returns `Ok(())`.
247    fn set_security_context(&mut self, _ctx: SecurityContext) {}
248}
249
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::security_policy::{AuthorizationDecision, Principal};

    // --- Shared fixtures ---

    /// Consumer implementing only the required methods, so every optional
    /// method resolves to the trait default.
    struct DummyConsumer;

    #[async_trait::async_trait]
    impl Consumer for DummyConsumer {
        async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
            Ok(())
        }
        async fn stop(&mut self) -> Result<(), CamelError> {
            Ok(())
        }
    }

    /// Principal returned by both stubs below.
    fn stub_principal() -> Principal {
        Principal {
            subject: "stub".into(),
            issuer: "stub".into(),
            audience: vec![],
            scopes: vec![],
            roles: vec![],
            claims: serde_json::json!({}),
        }
    }

    /// Policy that grants every exchange to the stub principal.
    struct StubPolicy;

    #[async_trait::async_trait]
    impl SecurityPolicy for StubPolicy {
        async fn evaluate(
            &self,
            _exchange: &mut Exchange,
        ) -> Result<AuthorizationDecision, CamelError> {
            Ok(AuthorizationDecision::Granted {
                principal: stub_principal(),
            })
        }
    }

    /// Authenticator that accepts any bearer token as the stub principal.
    struct StubAuthenticator;

    #[async_trait::async_trait]
    impl TokenAuthenticator for StubAuthenticator {
        async fn authenticate_bearer(&self, _token: &str) -> Result<Principal, CamelError> {
            Ok(stub_principal())
        }
    }

    /// Context built through `SecurityContext::new` from fresh stubs.
    fn stub_security_context() -> SecurityContext {
        SecurityContext::new(StubPolicy, Arc::new(StubAuthenticator))
    }

    // --- ConsumerContext tests ---

    #[tokio::test]
    async fn test_consumer_context_cancelled() {
        let (tx, _rx) = mpsc::channel(16);
        let token = CancellationToken::new();
        let ctx = ConsumerContext::new(tx, token.clone());

        assert!(!ctx.is_cancelled());
        token.cancel();
        ctx.cancelled().await;
        assert!(ctx.is_cancelled());
    }

    // --- Consumer trait default tests ---

    #[test]
    fn test_concurrency_model_default_is_sequential() {
        let consumer = DummyConsumer;
        assert_eq!(consumer.concurrency_model(), ConcurrencyModel::Sequential);
    }

    #[test]
    fn test_concurrency_model_concurrent_override() {
        struct ConcurrentConsumer;

        #[async_trait::async_trait]
        impl Consumer for ConcurrentConsumer {
            async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
                Ok(())
            }
            async fn stop(&mut self) -> Result<(), CamelError> {
                Ok(())
            }
            fn concurrency_model(&self) -> ConcurrencyModel {
                ConcurrencyModel::Concurrent { max: Some(16) }
            }
        }

        let consumer = ConcurrentConsumer;
        assert_eq!(
            consumer.concurrency_model(),
            ConcurrencyModel::Concurrent { max: Some(16) }
        );
    }

    #[tokio::test]
    async fn test_consumer_default_suspend_resume() {
        let consumer = DummyConsumer;
        assert!(consumer.suspend().await.is_ok());
        assert!(consumer.resume().await.is_ok());
    }

    // --- SecurityContext tests ---

    #[test]
    fn test_security_context_new() {
        let ctx = stub_security_context();
        assert_eq!(Arc::strong_count(&ctx.policy), 1);
        assert_eq!(Arc::strong_count(&ctx.authenticator), 1);
        assert_eq!(
            ctx.credential_sources,
            vec![CredentialSource::AuthorizationHeader]
        );
    }

    #[test]
    fn test_security_context_from_arc() {
        let policy: Arc<dyn SecurityPolicy> = Arc::new(StubPolicy);
        let authenticator: Arc<dyn TokenAuthenticator> = Arc::new(StubAuthenticator);
        let ctx = SecurityContext::from_arc(Arc::clone(&policy), Arc::clone(&authenticator));
        assert!(Arc::ptr_eq(&ctx.policy, &policy));
        assert!(Arc::ptr_eq(&ctx.authenticator, &authenticator));
        assert_eq!(
            ctx.credential_sources,
            vec![CredentialSource::AuthorizationHeader]
        );
    }

    /// A clone shares the policy and authenticator allocations with the
    /// original rather than duplicating them.
    #[test]
    fn test_security_context_clone_shares_arcs() {
        let ctx = stub_security_context();
        let cloned = ctx.clone();
        assert!(Arc::ptr_eq(&ctx.policy, &cloned.policy));
        assert!(Arc::ptr_eq(&ctx.authenticator, &cloned.authenticator));
        assert_eq!(Arc::strong_count(&ctx.policy), 2);
        assert_eq!(Arc::strong_count(&ctx.authenticator), 2);
        assert_eq!(ctx.credential_sources, cloned.credential_sources);
    }

    #[test]
    fn test_security_context_debug_redacts_traits() {
        let ctx = stub_security_context();
        let debug_str = format!("{ctx:?}");
        assert!(debug_str.contains("<SecurityPolicy>"));
        assert!(debug_str.contains("<TokenAuthenticator>"));
        assert!(debug_str.contains("credential_sources"));
    }

    #[test]
    fn test_security_context_with_credential_sources() {
        let ctx = stub_security_context().with_credential_sources(vec![
            CredentialSource::Cookie {
                name: "session".into(),
            },
            CredentialSource::AuthorizationHeader,
        ]);
        assert_eq!(ctx.credential_sources.len(), 2);
        assert!(matches!(
            &ctx.credential_sources[0],
            CredentialSource::Cookie { .. }
        ));
    }
}