Trait nakadion::handler::BatchHandlerFactory

pub trait BatchHandlerFactory: Send + Sync + 'static {
    fn handler<'a>(
        &'a self,
        assignment: &'a HandlerAssignment
    ) -> BoxFuture<'a, Result<Box<dyn BatchHandler>, Error>>;
}

A factory that creates BatchHandlers.

Usage

A BatchHandlerFactory can be used in two ways:

  • It shares no state with the BatchHandlers it creates. This is useful when incoming data is partitioned so that each BatchHandler only acts on data that never appears on another partition.

  • It holds state that is shared with the BatchHandlers it creates, for example a cache containing data that can also appear on other partitions.
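
As a minimal sketch of the first case, the factory here holds no state and every handler keeps its own counter. To stay runnable with the standard library alone, `BoxFuture`, `Error`, `HandlerAssignment`, `BatchHandler` and `BatchHandlerFactory` are simplified stand-ins (assumptions) rather than Nakadion's definitions, `block_on` is a toy executor, and `CountingHandler`, `StatelessFactory` and `demo` are hypothetical names.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Stand-in for `futures::future::BoxFuture`, so the sketch needs only std.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// Simplified stand-ins (assumptions), not Nakadion's real definitions.
#[derive(Debug)]
pub struct Error(pub String);
pub struct HandlerAssignment;

pub trait BatchHandler: Send {
    // The real `handle` is asynchronous and takes `Bytes` and `BatchMeta`.
    fn handle(&mut self, events: &[u8]) -> usize;
}

pub type HandlerResult = Result<Box<dyn BatchHandler>, Error>;

pub trait BatchHandlerFactory: Send + Sync + 'static {
    fn handler<'a>(&'a self, assignment: &'a HandlerAssignment) -> BoxFuture<'a, HandlerResult>;
}

// Each handler owns its counter; nothing is shared with other handlers.
struct CountingHandler {
    batches_seen: usize,
}

impl BatchHandler for CountingHandler {
    fn handle(&mut self, _events: &[u8]) -> usize {
        self.batches_seen += 1;
        self.batches_seen
    }
}

// The factory holds no state, so a unit struct is enough.
pub struct StatelessFactory;

impl BatchHandlerFactory for StatelessFactory {
    fn handler<'a>(&'a self, _assignment: &'a HandlerAssignment) -> BoxFuture<'a, HandlerResult> {
        Box::pin(async move {
            let handler: HandlerResult = Ok(Box::new(CountingHandler { batches_seen: 0 }));
            handler
        })
    }
}

// Toy executor for this sketch only; real code would use an async runtime.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Creates two handlers from one factory and returns their final counts.
pub fn demo() -> (usize, usize) {
    let factory = StatelessFactory;
    let assignment = HandlerAssignment;
    let mut first = block_on(factory.handler(&assignment)).expect("handler");
    let mut second = block_on(factory.handler(&assignment)).expect("handler");
    first.handle(b"[]");
    first.handle(b"[]");
    (first.handle(b"[]"), second.handle(b"[]"))
}
```

Calling `demo()` returns `(3, 1)`: the first handler has seen three batches and the second only one, because no state is shared between them.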

Example

use std::sync::{Arc, Mutex};
use futures::{FutureExt, future::BoxFuture};

use nakadion::handler::*;

// Use a struct to maintain state
struct MyHandler(Arc<Mutex<i32>>);

// Implement the processing logic by implementing `BatchHandler`
impl BatchHandler for MyHandler {
    fn handle(&mut self, _events: Bytes, _meta: BatchMeta) -> BatchHandlerFuture {
        async move {
            *self.0.lock().unwrap() += 1;
            BatchPostAction::commit_no_stats()
        }.boxed()
    }
}

// We keep shared state for all handlers in the `BatchHandlerFactory`
struct MyBatchHandlerFactory(Arc<Mutex<i32>>);

// Now we implement the trait `BatchHandlerFactory` to control how
// our `BatchHandler`s are created
impl BatchHandlerFactory for MyBatchHandlerFactory {
    fn handler(
        &self,
        _assignment: &HandlerAssignment,
    ) -> BoxFuture<'_, Result<Box<dyn BatchHandler>, Error>> {
        async move {
            Ok(Box::new(MyHandler(self.0.clone())) as Box<_>)
        }.boxed()
    }
}

let count = Arc::new(Mutex::new(0));

let factory = MyBatchHandlerFactory(count.clone());

Required methods

fn handler<'a>(
    &'a self,
    assignment: &'a HandlerAssignment
) -> BoxFuture<'a, Result<Box<dyn BatchHandler>, Error>>

Called when a new BatchHandler is requested.

assignment defines the event types and partitions for which the returned BatchHandler will be used. Nakadion guarantees that this holds for the whole lifetime of the BatchHandler.

Returning an Error aborts the Consumer.

It is up to the BatchHandlerFactory whether it respects assignment.
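
One possible way to respect the assignment is sketched here: the factory picks a handler based on the assignment and returns an error for anything it does not know. All types are simplified stand-ins that only need the standard library. In particular, the variants of this `HandlerAssignment` enum, the event type name `orders`, and the names `OrderHandler`, `OrdersOnlyFactory`, `handler_for` and `handler_name` are assumptions made for illustration, not Nakadion's definitions.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Stand-in for `futures::future::BoxFuture`, so the sketch needs only std.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// Simplified stand-ins (assumptions), not Nakadion's real definitions.
#[derive(Debug, PartialEq)]
pub struct Error(pub String);

pub enum HandlerAssignment {
    Unspecified,
    EventType(String),
    EventTypePartition(String, u32),
}

pub trait BatchHandler: Send {
    fn name(&self) -> String;
}

pub type HandlerResult = Result<Box<dyn BatchHandler>, Error>;

pub trait BatchHandlerFactory: Send + Sync + 'static {
    fn handler<'a>(&'a self, assignment: &'a HandlerAssignment) -> BoxFuture<'a, HandlerResult>;
}

struct OrderHandler {
    partition: Option<u32>,
}

impl BatchHandler for OrderHandler {
    fn name(&self) -> String {
        match self.partition {
            Some(p) => format!("orders/{}", p),
            None => "orders/*".to_string(),
        }
    }
}

// Chooses a handler for the assignment, or fails for unknown assignments.
fn handler_for(assignment: &HandlerAssignment) -> HandlerResult {
    match assignment {
        HandlerAssignment::EventType(et) if et.as_str() == "orders" => {
            Ok(Box::new(OrderHandler { partition: None }))
        }
        HandlerAssignment::EventTypePartition(et, p) if et.as_str() == "orders" => {
            Ok(Box::new(OrderHandler { partition: Some(*p) }))
        }
        // With the real trait, returning an error aborts the consumer.
        _ => Err(Error("no handler for this assignment".to_string())),
    }
}

// A factory that only knows the hypothetical `orders` event type.
pub struct OrdersOnlyFactory;

impl BatchHandlerFactory for OrdersOnlyFactory {
    fn handler<'a>(&'a self, assignment: &'a HandlerAssignment) -> BoxFuture<'a, HandlerResult> {
        Box::pin(async move { handler_for(assignment) })
    }
}

// Toy executor for this sketch only; real code would use an async runtime.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Asks the factory for a handler and reports the handler's name.
pub fn handler_name(assignment: &HandlerAssignment) -> Result<String, Error> {
    let factory = OrdersOnlyFactory;
    let result = block_on(factory.handler(assignment));
    result.map(|handler| handler.name())
}
```

A factory that ignores the assignment would simply skip the `match` and always return the same kind of handler, which the trait also permits.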

Implementors

impl<T> BatchHandlerFactory for T where
    T: for<'a> Fn(&'a HandlerAssignment) -> BoxFuture<'a, Result<Box<dyn BatchHandler>, Error>> + Send + Sync + 'static, 
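
Because of this blanket implementation, a plain function with a matching signature can serve as a factory without a dedicated struct. The sketch reproduces the idea with standard-library stand-ins: the trait and the `where` clause mirror the signatures shown on this page, while the body of the blanket impl, the simplified `HandlerAssignment`, `BatchHandler` and `Error` types, and the names `NamedHandler`, `make_handler` and `describe_via` are assumptions for illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Stand-in for `futures::future::BoxFuture`, so the sketch needs only std.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// Simplified stand-ins (assumptions), not Nakadion's real definitions.
#[derive(Debug, PartialEq)]
pub struct Error(pub String);
pub struct HandlerAssignment(pub String);

pub trait BatchHandler: Send {
    fn describe(&self) -> String;
}

pub type HandlerResult = Result<Box<dyn BatchHandler>, Error>;

pub trait BatchHandlerFactory: Send + Sync + 'static {
    fn handler<'a>(&'a self, assignment: &'a HandlerAssignment) -> BoxFuture<'a, HandlerResult>;
}

// Blanket impl with the same bounds as listed under "Implementors";
// the body is an assumption that simply forwards to the function.
impl<T> BatchHandlerFactory for T
where
    T: for<'a> Fn(&'a HandlerAssignment) -> BoxFuture<'a, HandlerResult> + Send + Sync + 'static,
{
    fn handler<'a>(&'a self, assignment: &'a HandlerAssignment) -> BoxFuture<'a, HandlerResult> {
        self(assignment)
    }
}

struct NamedHandler {
    assigned_to: String,
}

impl BatchHandler for NamedHandler {
    fn describe(&self) -> String {
        format!("handler for {}", self.assigned_to)
    }
}

// A free function with the matching signature acts as a factory.
pub fn make_handler<'a>(assignment: &'a HandlerAssignment) -> BoxFuture<'a, HandlerResult> {
    Box::pin(async move {
        let handler: HandlerResult = Ok(Box::new(NamedHandler {
            assigned_to: assignment.0.clone(),
        }));
        handler
    })
}

// Toy executor for this sketch only; real code would use an async runtime.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Accepts anything that implements the factory trait, including functions.
pub fn describe_via<F: BatchHandlerFactory>(
    factory: &F,
    assignment: &HandlerAssignment,
) -> Result<String, Error> {
    let result = block_on(factory.handler(assignment));
    result.map(|handler| handler.describe())
}
```

Passing `&make_handler` to `describe_via` works because the function item satisfies the `Fn` bound. A function is used here rather than a closure since closures that return a future borrowing their argument often need extra type annotations to satisfy the `for<'a>` bound.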