[−][src]Trait nakadion::handler::EventsHandler
Basically the same a BatchHandler
with the difference that
deserialized events are passed to the processing logic.
This is basically a convenience handler.
The events must implement serde
s DeserializeOwned
.
Hint
The handle
method gets called on &mut self
.
Example
use serde::Deserialize; use futures::FutureExt; use nakadion::handler::*; // Use a struct to maintain state struct MyHandler { pub count: i32, } #[derive(Deserialize)] struct MyEvent(i32); // Implement the processing logic by implementing `BatchHandler` impl EventsHandler for MyHandler { type Event = MyEvent; fn handle(&mut self, events: Vec<MyEvent>, _meta: BatchMeta) -> EventsHandlerFuture { async move { for MyEvent(amount) in events { self.count += amount; } EventsPostAction::Commit }.boxed() } fn deserialize_on(&mut self, n_bytes: usize) -> SpawnTarget { // We expect costly deserialization... if n_bytes > 10_000 { SpawnTarget::Dedicated } else { SpawnTarget::Executor } } }
Associated Types
type Event: DeserializeOwned + Send + 'static
Required methods
fn handle<'a>(
&'a mut self,
events: Vec<Self::Event>,
meta: BatchMeta<'a>
) -> BoxFuture<'a, EventsPostAction>
&'a mut self,
events: Vec<Self::Event>,
meta: BatchMeta<'a>
) -> BoxFuture<'a, EventsPostAction>
Execute the processing logic with a deserialized batch of events.
Provided methods
fn handle_deserialization_errors<'a>(
&'a mut self,
results: Vec<EventDeserializationResult<Self::Event>>,
_meta: BatchMeta<'a>
) -> EventsHandlerFuture<'a>
&'a mut self,
results: Vec<EventDeserializationResult<Self::Event>>,
_meta: BatchMeta<'a>
) -> EventsHandlerFuture<'a>
A handler which is invoked if deserialization of the whole events batch at once failed.
The default implementation will shut down the consumer.
fn deserialize_on(&mut self, _n_bytes: usize) -> SpawnTarget
Decide on how to execute deserialization.
The number of bytes to be deserialized is passed to decide conditionally.
If not overwritten the default is SpawnTarget::Executor
.
fn on_inactive(
&mut self,
_inactive_for: Duration,
_last_activity: Instant
) -> InactivityAnswer
&mut self,
_inactive_for: Duration,
_last_activity: Instant
) -> InactivityAnswer
Periodically called if there were no events for a given time.
This method will only be called if the parameter handler_inactivity_timeout_secs
was set for the Consumer