Skip to main content

Consumer

Trait Consumer 

Source
pub trait Consumer: Send + Sync {
    // Required methods
    fn start<'life0, 'async_trait>(
        &'life0 mut self,
        context: ConsumerContext,
    ) -> Pin<Box<dyn Future<Output = Result<(), CamelError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;
    fn stop<'life0, 'async_trait>(
        &'life0 mut self,
    ) -> Pin<Box<dyn Future<Output = Result<(), CamelError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;

    // Provided methods
    fn suspend<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = Result<(), CamelError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait { ... }
    fn resume<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = Result<(), CamelError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait { ... }
    fn concurrency_model(&self) -> ConcurrencyModel { ... }
    fn background_task_handle(
        &mut self,
    ) -> Option<JoinHandle<Result<(), CamelError>>> { ... }
    fn set_security_context(&mut self, _ctx: SecurityContext) { ... }
}
Expand description

A Consumer receives data from an external system and submits Exchanges to the Route’s Pipeline via the ConsumerContext.

§Shutdown Contract

The Runtime guarantees the following lifecycle:

  1. start() is called once. The Runtime spawns a task that owns the Consumer.
  2. On route stop, the Runtime cancels the ConsumerContext cancel token.
  3. The spawned task calls stop() on ALL exit paths after start() succeeds (clean exit, crash, cancellation, natural completion).
  4. background_task_handle() is a supervision hook for crash propagation (ADR-0007), NOT the shutdown API.

Component authors MUST ensure:

  • stop() cancels all component-owned inner tasks and cleans up registrations/resources.
  • If inner tasks use a private CancellationToken, stop() MUST cancel it.
  • Best practice: inner tasks should use the ConsumerContext cancel token (or a child) so they respond to runtime shutdown without waiting for stop().
  • If using a private token, stop() must cancel it to ensure prompt cleanup.
  • background_task_handle() returns the JoinHandle of the primary background task, if any. The Runtime monitors this handle for unexpected exits (crash propagation).

Required Methods§

Source

fn start<'life0, 'async_trait>( &'life0 mut self, context: ConsumerContext, ) -> Pin<Box<dyn Future<Output = Result<(), CamelError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Start consuming messages, sending them through the provided context.

Source

fn stop<'life0, 'async_trait>( &'life0 mut self, ) -> Pin<Box<dyn Future<Output = Result<(), CamelError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Stop consuming messages and clean up all resources.

Called by the Runtime on every exit path after start() succeeds. See the Shutdown Contract above.

Provided Methods§

Source

fn suspend<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<(), CamelError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Temporarily suspend consuming messages without fully stopping.

Default: no-op, returns Ok(()).

Source

fn resume<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<(), CamelError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Resume consuming after a previous suspension.

Default: no-op, returns Ok(()).

Source

fn concurrency_model(&self) -> ConcurrencyModel

Declares this consumer’s natural concurrency model.

The runtime uses this to decide whether to process exchanges sequentially or spawn per-exchange. Consumers that accept inbound connections (HTTP, WebSocket, Kafka) should override this to return ConcurrencyModel::Concurrent.

Default: Sequential.

Source

fn background_task_handle( &mut self, ) -> Option<JoinHandle<Result<(), CamelError>>>

Return a handle to the consumer’s long-running background task so the runtime can monitor it for unexpected exits after start() returns Ok.

Default: None — consumer’s work completes entirely within start(). Override: return Some(handle) if start() spawns a detached task.

Contract: the task must observe ConsumerContext::cancelled() so runtime shutdown is distinguishable from crash exits.

This method is called at most once; implementations should use .take() to transfer ownership of the handle.

Source

fn set_security_context(&mut self, _ctx: SecurityContext)

Set the security context for this consumer.

Called by the route controller before start() so the consumer can register auth state (e.g. WebSocket auth in WsAppState).

Default: no-op, returns Ok(()).

Dyn Compatibility§

This trait is dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety".

Implementors§