Trait nakadion::HandlerFactory

/// A factory that creates `BatchHandler`s.
pub trait HandlerFactory {
    /// The type of handler this factory produces. It must implement
    /// `BatchHandler` and be `Send + 'static`.
    type Handler: BatchHandler + Send + 'static;
    /// Creates a handler for the given `partition`.
    ///
    /// Returns the new handler on success or a `CreateHandlerError`
    /// if the handler could not be created.
    fn create_handler(
        &self,
        partition: &PartitionId
    ) -> Result<Self::Handler, CreateHandlerError>; }

A factory that creates BatchHandlers.

Usage

A HandlerFactory can be used in two ways:

  • It does not contain any state it shares with the created BatchHandlers. This is useful when incoming data is partitioned in a way that all BatchHandlers act only on data that never appears on another partition.

  • It contains state that is shared with the BatchHandlers, e.g. a cache that contains data that can appear on other partitions.

Example

use std::sync::{Arc, Mutex};

use nakadion::{
    BatchHandler, CreateHandlerError, SubscriptionCursor, HandlerFactory, PartitionId, ProcessingStatus,
};

// Use a struct to maintain state: the handler wraps a
// reference-counted, mutex-protected `i32` counter
struct MyHandler(Arc<Mutex<i32>>);

// The processing logic lives in the `BatchHandler` implementation:
// every handled batch bumps the counter by one
impl BatchHandler for MyHandler {
    fn handle(&mut self, _cursor: &SubscriptionCursor, _events: &[u8]) -> ProcessingStatus {
        {
            // Keep the lock only for as long as the increment takes
            let mut counter = self.0.lock().unwrap();
            *counter += 1;
        }
        ProcessingStatus::processed_no_hint()
    }
}

// We keep shared state for all handlers in the `HandlerFactory`:
// the factory holds the counter that it passes on to each handler
struct MyHandlerFactory(Arc<Mutex<i32>>);

// Implementing `HandlerFactory` lets us decide how each
// `BatchHandler` is built; here every handler receives a
// clone of the factory's `Arc`, so all of them share one counter
impl HandlerFactory for MyHandlerFactory {
    type Handler = MyHandler;
    fn create_handler(
        &self,
        _partition: &PartitionId,
    ) -> Result<Self::Handler, CreateHandlerError> {
        let shared_counter = Arc::clone(&self.0);
        Ok(MyHandler(shared_counter))
    }
}

// The counter that the factory hands to every handler it creates
let count = Arc::new(Mutex::new(0));
let factory = MyHandlerFactory(Arc::clone(&count));

// Handler creation will be invoked by Nakadion
let mut handler1 = factory.create_handler(&PartitionId::new("1")).unwrap();
let mut handler2 = factory.create_handler(&PartitionId::new("2")).unwrap();

// This will be done by Nakadion: deliver a batch for partition "1"
let first_cursor = SubscriptionCursor {
    partition: PartitionId::new("1"),
    offset: "53".to_string(),
    event_type: "test_event".to_string(),
};
let first_status = handler1.handle(&first_cursor, &[]);

assert_eq!(first_status, ProcessingStatus::Processed(None));
assert_eq!(*count.lock().unwrap(), 1);

// This will be done by Nakadion: deliver a batch for partition "2"
let second_cursor = SubscriptionCursor {
    partition: PartitionId::new("2"),
    offset: "54".to_string(),
    event_type: "test_event".to_string(),
};
let second_status = handler2.handle(&second_cursor, &[]);

// Both handlers incremented the same shared counter
assert_eq!(second_status, ProcessingStatus::Processed(None));
assert_eq!(*count.lock().unwrap(), 2);

Associated Types

type Handler: BatchHandler + Send + 'static

Loading content...

Required methods

fn create_handler(
    &self,
    partition: &PartitionId
) -> Result<Self::Handler, CreateHandlerError>

Loading content...

Implementors

Loading content...