Trait nakadion::handler::EventsHandler

pub trait EventsHandler {
    type Event: DeserializeOwned + Send + 'static;
    fn handle<'a>(
        &'a mut self,
        events: Vec<Self::Event>,
        meta: BatchMeta<'a>
    ) -> BoxFuture<'a, EventsPostAction>;

    fn handle_deserialization_errors<'a>(
        &'a mut self,
        results: Vec<EventDeserializationResult<Self::Event>>,
        _meta: BatchMeta<'a>
    ) -> EventsHandlerFuture<'a> { ... }
    fn deserialize_on(&mut self, _n_bytes: usize) -> SpawnTarget { ... }
    fn on_inactive(
        &mut self,
        _inactive_for: Duration,
        _last_activity: Instant
    ) -> InactivityAnswer { ... }
}

Essentially the same as a BatchHandler, except that already deserialized events are passed to the processing logic.

This is a convenience handler.

The events must implement serde's DeserializeOwned.

Hint

The handle method gets called on &mut self.

Example

use serde::Deserialize;
use futures::FutureExt;

use nakadion::handler::*;


// Use a struct to maintain state
struct MyHandler {
    pub count: i32,
}

#[derive(Deserialize)]
struct MyEvent(i32);

// Implement the processing logic by implementing `EventsHandler`
impl EventsHandler for MyHandler {
    type Event = MyEvent;

    fn handle(&mut self, events: Vec<MyEvent>, _meta: BatchMeta) -> EventsHandlerFuture {
        async move {
            for MyEvent(amount) in events {
                self.count += amount;
            }
            EventsPostAction::Commit
        }.boxed()
    }

    fn deserialize_on(&mut self, n_bytes: usize) -> SpawnTarget {
        // We expect costly deserialization...
        if n_bytes > 10_000 {
            SpawnTarget::Dedicated
        } else {
            SpawnTarget::Executor
        }
    }
}

Associated Types

type Event: DeserializeOwned + Send + 'static

Required methods

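fn handle<'a>(&'a mut self, events: Vec<Self::Event>, meta: BatchMeta<'a>) -> BoxFuture<'a, EventsPostAction>

Execute the processing logic with a deserialized batch of events.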

Provided methods

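fn handle_deserialization_errors<'a>(&'a mut self, results: Vec<EventDeserializationResult<Self::Event>>, _meta: BatchMeta<'a>) -> EventsHandlerFuture<'a>

A handler which is invoked if deserializing the whole batch of events at once failed.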

The default implementation will shut down the consumer.
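An override could apply a more tolerant policy than shutting down. The following is a minimal, std-only sketch of such a policy. `PostAction`, `DeserResult`, `decide_on_errors` and `max_failures` are hypothetical stand-ins introduced for illustration; they are not nakadion's types, and the actual shape of EventDeserializationResult and EventsPostAction should be checked against the crate before adapting this.

```rust
// Local stand-ins for illustration only; these are NOT nakadion's types.
#[derive(Debug, PartialEq)]
enum PostAction {
    Commit,
    ShutDown(String),
}

// Assumed shape: each event either deserialized or failed with a message.
type DeserResult<E> = Result<E, String>;

/// Hypothetical policy: tolerate up to `max_failures` broken events per
/// batch and process the good ones; otherwise request a shutdown without
/// processing anything.
fn decide_on_errors<E>(
    results: Vec<DeserResult<E>>,
    max_failures: usize,
    mut process: impl FnMut(E),
) -> PostAction {
    let mut good = Vec::new();
    let mut failures = Vec::new();

    // Split the per-event results into successes and failures.
    for result in results {
        match result {
            Ok(event) => good.push(event),
            Err(message) => failures.push(message),
        }
    }

    // Too many failures: behave like the default and shut down.
    if failures.len() > max_failures {
        return PostAction::ShutDown(format!(
            "{} event(s) failed to deserialize, first error: {}",
            failures.len(),
            failures[0]
        ));
    }

    // Few enough failures: process what could be deserialized.
    for event in good {
        process(event);
    }
    PostAction::Commit
}
```

In a real handler, the body of handle_deserialization_errors would run a decision like this inside the returned future. Whether skipping broken events is acceptable depends on the application.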

fn deserialize_on(&mut self, _n_bytes: usize) -> SpawnTarget

Decide on how to execute deserialization.

The number of bytes to be deserialized is passed so that the decision can depend on the batch size.

If not overridden, the default is SpawnTarget::Executor.

fn on_inactive(&mut self, _inactive_for: Duration, _last_activity: Instant) -> InactivityAnswer

Periodically called if there were no events for a given time.

This method will only be called if the parameter handler_inactivity_timeout_secs was set for the Consumer.
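As a rough sketch of what an on_inactive override might decide, the function below compares the reported idle time against a threshold. `Answer`, `answer_inactivity` and `max_idle` are hypothetical names introduced here; `Answer` only stands in for InactivityAnswer, whose actual variants and effect on the consumer are not described on this page and should be looked up in the crate.

```rust
use std::time::{Duration, Instant};

// Local stand-in for illustration only; NOT nakadion's `InactivityAnswer`.
// The variant names are assumptions.
#[derive(Debug, PartialEq)]
enum Answer {
    KeepAlive,
    Kill,
}

/// Hypothetical policy: give up a handler that has been idle for longer
/// than `max_idle`, keep it otherwise.
fn answer_inactivity(
    inactive_for: Duration,
    _last_activity: Instant, // unused here, mirrors the trait method's parameter
    max_idle: Duration,
) -> Answer {
    if inactive_for > max_idle {
        Answer::Kill
    } else {
        Answer::KeepAlive
    }
}
```

A handler holding expensive resources might choose a short `max_idle`, while a cheap, stateless one could always answer that it wants to stay alive.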
