Skip to main content

camel_component_api/
consumer.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use tokio::sync::{mpsc, oneshot};
5use tokio::task::JoinHandle;
6use tokio_util::sync::CancellationToken;
7
8use camel_api::security_policy::SecurityPolicy;
9use camel_api::{CamelError, Exchange};
10use camel_auth::{CredentialSource, TokenAuthenticator};
11
12/// A message sent from a consumer to the route pipeline.
13///
14/// Fire-and-forget exchanges use `reply_tx = None`.
15/// Request-reply exchanges (e.g. `direct:`) provide a `reply_tx` so the
16/// pipeline result can be sent back to the consumer.
17pub struct ExchangeEnvelope {
18    pub exchange: Exchange,
19    pub reply_tx: Option<oneshot::Sender<Result<Exchange, CamelError>>>,
20}
21
22/// Context provided to a Consumer, allowing it to send exchanges into the route.
23#[derive(Clone)]
24pub struct ConsumerContext {
25    sender: mpsc::Sender<ExchangeEnvelope>,
26    cancel_token: CancellationToken,
27    route_id: String,
28}
29
30impl ConsumerContext {
31    /// Create a new consumer context wrapping the given channel sender.
32    ///
33    /// The `route_id` identifies the route this consumer is bound to, enabling
34    /// ADR-0012 per-route metrics and health observations.
35    pub fn new(
36        sender: mpsc::Sender<ExchangeEnvelope>,
37        cancel_token: CancellationToken,
38        route_id: String,
39    ) -> Self {
40        Self {
41            sender,
42            cancel_token,
43            route_id,
44        }
45    }
46
47    /// Returns a future that resolves when shutdown is requested.
48    /// Use in `tokio::select!` inside consumer loops.
49    pub async fn cancelled(&self) {
50        self.cancel_token.cancelled().await
51    }
52
53    /// Returns true if shutdown has been requested.
54    pub fn is_cancelled(&self) -> bool {
55        self.cancel_token.is_cancelled()
56    }
57
58    /// Returns the route_id this consumer is bound to.
59    ///
60    /// Available for ADR-0012 metrics/health calls that require a route_id
61    /// (categories (b′), (e), (g)). Set at construction time by the route
62    /// controller when spawning the consumer task.
63    pub fn route_id(&self) -> &str {
64        &self.route_id
65    }
66
67    /// Returns a clone of the `CancellationToken`.
68    ///
69    /// Useful for consumers that spawn per-request tasks and need to propagate
70    /// shutdown to each task. See `HttpConsumer` for an example.
71    pub fn cancel_token(&self) -> CancellationToken {
72        self.cancel_token.clone()
73    }
74
75    /// Returns a clone of the channel sender for manual exchange submission.
76    ///
77    /// Useful for consumers that spawn per-request tasks (e.g., `HttpConsumer`)
78    /// where each task independently sends exchanges into the pipeline.
79    /// For simple consumers, prefer `send()` or `send_and_wait()` instead.
80    pub fn sender(&self) -> mpsc::Sender<ExchangeEnvelope> {
81        self.sender.clone()
82    }
83
84    /// Send an exchange into the route pipeline (fire-and-forget).
85    pub async fn send(&self, exchange: Exchange) -> Result<(), CamelError> {
86        self.sender
87            .send(ExchangeEnvelope {
88                exchange,
89                reply_tx: None,
90            })
91            .await
92            .map_err(|_| CamelError::ChannelClosed)
93    }
94
95    /// Send an exchange and wait for the pipeline result (request-reply).
96    ///
97    /// Returns `Ok(exchange)` on success or `Err(e)` if the pipeline failed
98    /// without an error handler absorbing the error.
99    pub async fn send_and_wait(&self, exchange: Exchange) -> Result<Exchange, CamelError> {
100        let (reply_tx, reply_rx) = oneshot::channel();
101        self.sender
102            .send(ExchangeEnvelope {
103                exchange,
104                reply_tx: Some(reply_tx),
105            })
106            .await
107            .map_err(|_| CamelError::ChannelClosed)?;
108        reply_rx.await.map_err(|_| CamelError::ChannelClosed)?
109    }
110}
111
112/// Security context passed to a consumer before `start()`.
113///
114/// Carries the `SecurityPolicy` and `TokenAuthenticator` from the route
115/// controller so consumers (e.g. WebSocket) can register auth state
116/// before accepting connections.
117pub struct SecurityContext {
118    pub policy: Arc<dyn SecurityPolicy>,
119    pub authenticator: Arc<dyn TokenAuthenticator>,
120    pub credential_sources: Vec<CredentialSource>,
121}
122
123impl SecurityContext {
124    pub fn new(
125        policy: impl SecurityPolicy + 'static,
126        authenticator: Arc<dyn TokenAuthenticator>,
127    ) -> Self {
128        Self {
129            policy: Arc::new(policy),
130            authenticator,
131            credential_sources: vec![CredentialSource::AuthorizationHeader],
132        }
133    }
134
135    pub fn from_arc(
136        policy: Arc<dyn SecurityPolicy>,
137        authenticator: Arc<dyn TokenAuthenticator>,
138    ) -> Self {
139        Self {
140            policy,
141            authenticator,
142            credential_sources: vec![CredentialSource::AuthorizationHeader],
143        }
144    }
145
146    pub fn with_credential_sources(mut self, sources: Vec<CredentialSource>) -> Self {
147        self.credential_sources = sources;
148        self
149    }
150}
151
152impl Clone for SecurityContext {
153    fn clone(&self) -> Self {
154        Self {
155            policy: Arc::clone(&self.policy),
156            authenticator: Arc::clone(&self.authenticator),
157            credential_sources: self.credential_sources.clone(),
158        }
159    }
160}
161
162impl std::fmt::Debug for SecurityContext {
163    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164        f.debug_struct("SecurityContext")
165            .field("policy", &"<SecurityPolicy>")
166            .field("authenticator", &"<TokenAuthenticator>")
167            .field("credential_sources", &self.credential_sources)
168            .finish()
169    }
170}
171
172/// How a consumer's exchanges should be processed by the pipeline.
173#[derive(Debug, Clone, PartialEq, Eq)]
174pub enum ConcurrencyModel {
175    /// Exchanges are processed one at a time, in order. Default for polling
176    /// consumers (timer, file) and synchronous consumers (direct).
177    Sequential,
178    /// Exchanges are processed concurrently via `tokio::spawn`. Optional
179    /// semaphore limit (`max`). `None` means unbounded (channel buffer is
180    /// the only backpressure).
181    Concurrent { max: Option<usize> },
182}
183
184/// A Consumer receives data from an external system and submits Exchanges
185/// to the Route's Pipeline via the [`ConsumerContext`].
186///
187/// # Shutdown Contract
188///
189/// The Runtime guarantees the following lifecycle:
190///
191/// 1. `start()` is called once. The Runtime spawns a task that owns the Consumer.
192/// 2. On route stop, the Runtime cancels the [`ConsumerContext`] cancel token.
193/// 3. The spawned task calls `stop()` on ALL exit paths after `start()` succeeds
194///    (clean exit, crash, cancellation, natural completion).
195/// 4. `background_task_handle()` is a supervision hook for crash propagation (ADR-0007),
196///    NOT the shutdown API.
197///
198/// Component authors MUST ensure:
199///
200/// - `stop()` cancels all component-owned inner tasks and cleans up registrations/resources.
201/// - If inner tasks use a private `CancellationToken`, `stop()` MUST cancel it.
202/// - Best practice: inner tasks should use the [`ConsumerContext`] cancel token (or a child)
203///   so they respond to runtime shutdown without waiting for `stop()`.
204/// - If using a private token, `stop()` must cancel it to ensure prompt cleanup.
205/// - `background_task_handle()` returns the `JoinHandle` of the primary background task,
206///   if any. The Runtime monitors this handle for unexpected exits (crash propagation).
207#[async_trait]
208pub trait Consumer: Send + Sync {
209    /// Start consuming messages, sending them through the provided context.
210    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError>;
211
212    /// Stop consuming messages and clean up all resources.
213    ///
214    /// Called by the Runtime on every exit path after `start()` succeeds.
215    /// See the [Shutdown Contract](#shutdown-contract) above.
216    async fn stop(&mut self) -> Result<(), CamelError>;
217
218    /// Temporarily suspend consuming messages without fully stopping.
219    ///
220    /// Default: no-op, returns `Ok(())`.
221    async fn suspend(&self) -> Result<(), CamelError> {
222        Ok(())
223    }
224
225    /// Resume consuming after a previous suspension.
226    ///
227    /// Default: no-op, returns `Ok(())`.
228    async fn resume(&self) -> Result<(), CamelError> {
229        Ok(())
230    }
231
232    /// Declares this consumer's natural concurrency model.
233    ///
234    /// The runtime uses this to decide whether to process exchanges
235    /// sequentially or spawn per-exchange. Consumers that accept inbound
236    /// connections (HTTP, WebSocket, Kafka) should override this to return
237    /// `ConcurrencyModel::Concurrent`.
238    ///
239    /// Default: `Sequential`.
240    fn concurrency_model(&self) -> ConcurrencyModel {
241        ConcurrencyModel::Sequential
242    }
243
244    /// Return a handle to the consumer's long-running background task so the
245    /// runtime can monitor it for unexpected exits after `start()` returns `Ok`.
246    ///
247    /// Default: `None` — consumer's work completes entirely within `start()`.
248    /// Override: return `Some(handle)` if `start()` spawns a detached task.
249    ///
250    /// **Contract:** the task must observe `ConsumerContext::cancelled()` so
251    /// runtime shutdown is distinguishable from crash exits.
252    ///
253    /// This method is called at most once; implementations should use `.take()`
254    /// to transfer ownership of the handle.
255    fn background_task_handle(&mut self) -> Option<JoinHandle<Result<(), CamelError>>> {
256        None
257    }
258
259    /// Set the security context for this consumer.
260    ///
261    /// Called by the route controller before `start()` so the consumer
262    /// can register auth state (e.g. WebSocket auth in `WsAppState`).
263    ///
264    /// Default: no-op, returns `Ok(())`.
265    fn set_security_context(&mut self, _ctx: SecurityContext) {}
266}
267
268#[cfg(test)]
269mod tests {
270    use super::*;
271
272    #[test]
273    fn consumer_context_exposes_route_id() {
274        let (tx, _rx) = mpsc::channel(1);
275        let ctx = ConsumerContext::new(tx, CancellationToken::new(), "test-route".to_string());
276        assert_eq!(ctx.route_id(), "test-route");
277    }
278
279    #[tokio::test]
280    async fn test_consumer_context_cancelled() {
281        let (tx, _rx) = mpsc::channel(16);
282        let token = CancellationToken::new();
283        let ctx = ConsumerContext::new(tx, token.clone(), "test-route".to_string());
284
285        assert!(!ctx.is_cancelled());
286        token.cancel();
287        ctx.cancelled().await;
288        assert!(ctx.is_cancelled());
289    }
290
291    #[test]
292    fn test_concurrency_model_default_is_sequential() {
293        use super::ConcurrencyModel;
294
295        struct DummyConsumer;
296
297        #[async_trait::async_trait]
298        impl super::Consumer for DummyConsumer {
299            async fn start(&mut self, _ctx: super::ConsumerContext) -> Result<(), CamelError> {
300                Ok(())
301            }
302            async fn stop(&mut self) -> Result<(), CamelError> {
303                Ok(())
304            }
305        }
306
307        let consumer = DummyConsumer;
308        assert_eq!(consumer.concurrency_model(), ConcurrencyModel::Sequential);
309    }
310
311    #[test]
312    fn test_concurrency_model_concurrent_override() {
313        use super::ConcurrencyModel;
314
315        struct ConcurrentConsumer;
316
317        #[async_trait::async_trait]
318        impl super::Consumer for ConcurrentConsumer {
319            async fn start(&mut self, _ctx: super::ConsumerContext) -> Result<(), CamelError> {
320                Ok(())
321            }
322            async fn stop(&mut self) -> Result<(), CamelError> {
323                Ok(())
324            }
325            fn concurrency_model(&self) -> ConcurrencyModel {
326                ConcurrencyModel::Concurrent { max: Some(16) }
327            }
328        }
329
330        let consumer = ConcurrentConsumer;
331        assert_eq!(
332            consumer.concurrency_model(),
333            ConcurrencyModel::Concurrent { max: Some(16) }
334        );
335    }
336
337    #[tokio::test]
338    async fn test_consumer_default_suspend_resume() {
339        struct DummyConsumer;
340
341        #[async_trait::async_trait]
342        impl super::Consumer for DummyConsumer {
343            async fn start(&mut self, _ctx: super::ConsumerContext) -> Result<(), CamelError> {
344                Ok(())
345            }
346            async fn stop(&mut self) -> Result<(), CamelError> {
347                Ok(())
348            }
349        }
350
351        let consumer = DummyConsumer;
352        assert!(consumer.suspend().await.is_ok());
353        assert!(consumer.resume().await.is_ok());
354    }
355
356    // --- SecurityContext tests ---
357
358    struct StubPolicy;
359
360    #[async_trait::async_trait]
361    impl SecurityPolicy for StubPolicy {
362        async fn evaluate(
363            &self,
364            _exchange: &mut Exchange,
365        ) -> Result<camel_api::security_policy::AuthorizationDecision, CamelError> {
366            Ok(camel_api::security_policy::AuthorizationDecision::Granted {
367                principal: camel_api::security_policy::Principal {
368                    subject: "stub".into(),
369                    issuer: "stub".into(),
370                    audience: vec![],
371                    scopes: vec![],
372                    roles: vec![],
373                    claims: serde_json::json!({}),
374                },
375            })
376        }
377    }
378
379    struct StubAuthenticator;
380
381    #[async_trait::async_trait]
382    impl camel_auth::TokenAuthenticator for StubAuthenticator {
383        async fn authenticate_bearer(
384            &self,
385            _token: &str,
386        ) -> Result<camel_api::security_policy::Principal, CamelError> {
387            Ok(camel_api::security_policy::Principal {
388                subject: "stub".into(),
389                issuer: "stub".into(),
390                audience: vec![],
391                scopes: vec![],
392                roles: vec![],
393                claims: serde_json::json!({}),
394            })
395        }
396    }
397
398    #[test]
399    fn test_security_context_new() {
400        let ctx = SecurityContext::new(StubPolicy, Arc::new(StubAuthenticator));
401        assert!(Arc::strong_count(&ctx.policy) == 1);
402        assert!(Arc::strong_count(&ctx.authenticator) == 1);
403        assert_eq!(
404            ctx.credential_sources,
405            vec![camel_auth::CredentialSource::AuthorizationHeader]
406        );
407    }
408
409    #[test]
410    fn test_security_context_from_arc() {
411        let policy: Arc<dyn SecurityPolicy> = Arc::new(StubPolicy);
412        let authenticator: Arc<dyn camel_auth::TokenAuthenticator> = Arc::new(StubAuthenticator);
413        let ctx = SecurityContext::from_arc(Arc::clone(&policy), Arc::clone(&authenticator));
414        assert!(Arc::ptr_eq(&ctx.policy, &policy));
415        assert!(Arc::ptr_eq(&ctx.authenticator, &authenticator));
416        assert_eq!(
417            ctx.credential_sources,
418            vec![camel_auth::CredentialSource::AuthorizationHeader]
419        );
420    }
421
422    #[test]
423    fn test_security_context_clone_independent() {
424        let ctx = SecurityContext::new(StubPolicy, Arc::new(StubAuthenticator));
425        let cloned = ctx.clone();
426        assert!(Arc::ptr_eq(&ctx.policy, &cloned.policy));
427        assert!(Arc::ptr_eq(&ctx.authenticator, &cloned.authenticator));
428        assert_eq!(ctx.credential_sources, cloned.credential_sources);
429    }
430
431    #[test]
432    fn test_security_context_debug_redacts_traits() {
433        let ctx = SecurityContext::new(StubPolicy, Arc::new(StubAuthenticator));
434        let debug_str = format!("{ctx:?}");
435        assert!(debug_str.contains("<SecurityPolicy>"));
436        assert!(debug_str.contains("<TokenAuthenticator>"));
437        assert!(debug_str.contains("credential_sources"));
438    }
439
440    #[test]
441    fn test_security_context_with_credential_sources() {
442        let ctx = SecurityContext::new(StubPolicy, Arc::new(StubAuthenticator))
443            .with_credential_sources(vec![
444                camel_auth::CredentialSource::Cookie {
445                    name: "session".into(),
446                },
447                camel_auth::CredentialSource::AuthorizationHeader,
448            ]);
449        assert_eq!(ctx.credential_sources.len(), 2);
450        assert!(matches!(
451            &ctx.credential_sources[0],
452            camel_auth::CredentialSource::Cookie { .. }
453        ));
454    }
455}