[−][src]Trait nakadion::HandlerFactory
A factory that creates `BatchHandler`s.
Usage
A `HandlerFactory` can be used in two ways:
- It does not contain any state it shares with the created `BatchHandler`s. This is useful when incoming data is partitioned in such a way that all `BatchHandler`s act only on data that never appears on another partition.
- It contains state that is shared with the `BatchHandler`s, e.g. a cache that contains data that can appear on other partitions.
Example
use std::sync::{Arc, Mutex}; use nakadion::{ BatchHandler, CreateHandlerError, EventType, HandlerFactory, PartitionId, ProcessingStatus, }; // Use a struct to maintain state struct MyHandler(Arc<Mutex<i32>>); // Implement the processing logic by implementing `BatchHandler` impl BatchHandler for MyHandler { fn handle(&mut self, _event_type: EventType, _events: &[u8]) -> ProcessingStatus { *self.0.lock().unwrap() += 1; ProcessingStatus::processed_no_hint() } } // We keep shared state for all handlers in the `HandlerFactory` struct MyHandlerFactory(Arc<Mutex<i32>>); // Now we implement the trait `HandlerFactory` to control how // our `BatchHandler`s are created impl HandlerFactory for MyHandlerFactory { type Handler = MyHandler; fn create_handler( &self, _partition: &PartitionId, ) -> Result<Self::Handler, CreateHandlerError> { Ok(MyHandler(self.0.clone())) } } let count = Arc::new(Mutex::new(0)); let factory = MyHandlerFactory(count.clone()); // Handler creation will be done by Nakadion let mut handler1 = factory.create_handler(&PartitionId::new("1")).unwrap(); let mut handler2 = factory.create_handler(&PartitionId::new("2")).unwrap(); // This will be done by Nakadion let status1 = handler1.handle(EventType::new("test_event"), &[]); assert_eq!(*count.lock().unwrap(), 1); assert_eq!(status1, ProcessingStatus::Processed(None)); // This will be done by Nakadion let status2 = handler2.handle(EventType::new("test_event"), &[]); assert_eq!(*count.lock().unwrap(), 2); assert_eq!(status2, ProcessingStatus::Processed(None));
Associated Types
type Handler: BatchHandler + Send + 'static
Required methods
fn create_handler(
    &self,
    partition: &PartitionId
) -> Result<Self::Handler, CreateHandlerError>