[][src]Trait nakadion::HandlerFactory

pub trait HandlerFactory {
type Handler: BatchHandler + Send + 'static;
    fn create_handler(
        &self,
        partition: &PartitionId
    ) -> Result<Self::Handler, CreateHandlerError>; }

A factory that creates BatchHandlers.

Usage

A HandlerFactory can be used in two ways:

  • It does not contain any state it shares with the created BatchHandlers. This is useful when incoming data is partitioned in a way that all BatchHandlers act only on data that never appears on another partition.

  • It contains state that is shared with the BatchHandlers. E.g. a cache that conatins data that can appear on other partitions.

Example

use std::sync::{Arc, Mutex};

use nakadion::{
    BatchHandler, CreateHandlerError, EventType, HandlerFactory, PartitionId, ProcessingStatus,
};

// Use a struct to maintain state
struct MyHandler(Arc<Mutex<i32>>);

// Implement the processing logic by implementing `BatchHandler`
impl BatchHandler for MyHandler {
    fn handle(&mut self, _event_type: EventType, _events: &[u8]) -> ProcessingStatus {
        *self.0.lock().unwrap() += 1;
        ProcessingStatus::processed_no_hint()
    }
}

// We keep shared state for all handlers in the `HandlerFactory`
struct MyHandlerFactory(Arc<Mutex<i32>>);

// Now we implement the trait `HandlerFactory` to control how
// our `BatchHandler`s are created
impl HandlerFactory for MyHandlerFactory {
    type Handler = MyHandler;
    fn create_handler(
        &self,
        _partition: &PartitionId,
    ) -> Result<Self::Handler, CreateHandlerError> {
        Ok(MyHandler(self.0.clone()))
    }
}

let count = Arc::new(Mutex::new(0));

let factory = MyHandlerFactory(count.clone());

// Handler creation will be done by Nakadion
let mut handler1 = factory.create_handler(&PartitionId::new("1")).unwrap();
let mut handler2 = factory.create_handler(&PartitionId::new("2")).unwrap();

// This will be done by Nakadion
let status1 = handler1.handle(EventType::new("test_event"), &[]);

assert_eq!(*count.lock().unwrap(), 1);
assert_eq!(status1, ProcessingStatus::Processed(None));

// This will be done by Nakadion
let status2 = handler2.handle(EventType::new("test_event"), &[]);

assert_eq!(*count.lock().unwrap(), 2);
assert_eq!(status2, ProcessingStatus::Processed(None));

Associated Types

type Handler: BatchHandler + Send + 'static

Loading content...

Required methods

fn create_handler(
    &self,
    partition: &PartitionId
) -> Result<Self::Handler, CreateHandlerError>

Loading content...

Implementors

Loading content...