1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276
//! Kit for creating a a handler for batches of events //! //! Start here if you want to implement a handler for processing of events use std::fmt; use std::time::{Duration, Instant}; pub use bytes::Bytes; use futures::future::BoxFuture; pub type BatchHandlerFuture<'a> = BoxFuture<'a, BatchPostAction>; use crate::nakadi_types::model::{ event_type::EventTypeName, partition::PartitionId, subscription::{EventTypePartition, EventTypePartitionLike as _, StreamId, SubscriptionCursor}, }; pub use crate::nakadi_types::Error; mod typed; pub use typed::*; /// Information on the current batch passed to a `BatchHandler`. /// /// The `frame_id` is monotonically increasing for each `BatchHandler` /// within a stream(same `StreamId`) /// as long a s a dispatch strategy which keeps the ordering of /// events is chosen. There may be gaps between the ids. pub struct BatchMeta<'a> { pub stream_id: StreamId, pub cursor: &'a SubscriptionCursor, pub received_at: Instant, pub frame_id: usize, } /// Returned by a `BatchHandler` and tell `Nakadion` /// how to continue. 
#[derive(Debug, Clone)] pub enum BatchPostAction { /// Commit the batch Commit(BatchStats), /// Do not commit the batch and continue /// /// Use if committed "manually" within the handler DoNotCommit(BatchStats), /// Abort the current stream and reconnect AbortStream(String), /// Abort the consumption and shut down ShutDown(String), } impl BatchPostAction { pub fn commit_no_stats() -> Self { BatchPostAction::Commit(BatchStats::default()) } pub fn commit(n_events: usize, t_deserialize: Duration) -> Self { BatchPostAction::Commit(BatchStats { n_events: Some(n_events), t_deserialize: Some(t_deserialize), }) } pub fn do_not_commit_no_stats() -> Self { BatchPostAction::DoNotCommit(BatchStats::default()) } pub fn do_not_commit(n_events: usize, t_deserialize: Duration) -> Self { BatchPostAction::DoNotCommit(BatchStats { n_events: Some(n_events), t_deserialize: Some(t_deserialize), }) } } /// Statistics on the processed batch #[derive(Default, Debug, Clone, PartialEq, Eq)] pub struct BatchStats { /// The number of events handled pub n_events: Option<usize>, /// The time it took to deserialize the batch pub t_deserialize: Option<Duration>, } /// Returned by a `BatchHandler` when queried /// on inactivity. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum InactivityAnswer { KeepMeAlive, KillMe, } impl InactivityAnswer { /// Returns `true` if the `BatchHandler` should be killed. pub fn should_kill(self) -> bool { self == InactivityAnswer::KillMe } /// Returns `true` if the `BatchHandler` should stay alive. pub fn should_stay_alive(self) -> bool { self == InactivityAnswer::KeepMeAlive } } /// A handler that implements batch processing logic. /// /// This trait will be called by Nakadion when a batch has to /// be processed. The `BatchHandler` only receives an `EventType` /// and a slice of bytes that contains the batch. /// /// The `events` slice always contains a JSON encoded array of events. /// /// # Hint /// /// The `handle` method gets called on `&mut self`. 
/// /// # Example /// /// ```rust /// use futures::FutureExt; /// /// use nakadion::handler::{BatchHandler, BatchPostAction, BatchMeta, Bytes, BatchHandlerFuture}; /// use nakadion::nakadi_types::model::subscription::EventTypeName; /// /// // Use a struct to maintain state /// struct MyHandler { /// pub count: i32, /// } /// /// // Implement the processing logic by implementing `BatchHandler` /// impl BatchHandler for MyHandler { /// fn handle(&mut self, _events: Bytes, _meta: BatchMeta) -> BatchHandlerFuture { /// async move { /// self.count += 1; /// BatchPostAction::commit_no_stats() /// }.boxed() /// } /// } /// ``` pub trait BatchHandler: Send + 'static { fn handle<'a>(&'a mut self, events: Bytes, meta: BatchMeta<'a>) -> BatchHandlerFuture<'a>; fn on_inactivity_detected( &mut self, _inactive_for: Duration, _last_activity: Instant, ) -> InactivityAnswer { InactivityAnswer::KeepMeAlive } } /// Defines what a `BatchHandler` will receive. /// /// This value should the same for the whole lifetime of the /// `BatchHandler`. "Should" because in the end it is the /// `BatchHandlerFactory` which returns `BatchHandler`s. But it /// is guaranteed that `Nakadion` will only pass events to a `BatchHandler` /// as defined by the `DispatchStrategy`. #[derive(Debug, Clone, Eq, PartialEq)] pub enum HandlerAssignment { /// Everything can be passed to the `BatchHandler`. Unspecified, /// The `BatchHandler` will only receive events /// of the given event type but from any partition. EventType(EventTypeName), /// The `BatchHandler` will only receive events /// of the given event type on the given partition. EventTypePartition(EventTypePartition), } impl fmt::Display for HandlerAssignment { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { HandlerAssignment::Unspecified => write!(f, "[unspecified]")?, HandlerAssignment::EventType(ref event_type) => { write!(f, "[event_type={}]", event_type)? 
} HandlerAssignment::EventTypePartition(ref event_type_partition) => write!( f, "[event_type={}, partition={}]", event_type_partition.event_type(), event_type_partition.partition() )?, } Ok(()) } } impl HandlerAssignment { pub fn event_type(&self) -> Option<&EventTypeName> { self.event_type_and_partition().0 } pub fn partition(&self) -> Option<&PartitionId> { self.event_type_and_partition().1 } pub fn event_type_and_partition(&self) -> (Option<&EventTypeName>, Option<&PartitionId>) { match self { HandlerAssignment::Unspecified => (None, None), HandlerAssignment::EventType(event_type) => (Some(&event_type), None), HandlerAssignment::EventTypePartition(ref etp) => { (Some(etp.event_type()), Some(etp.partition())) } } } } /// A factory that creates `BatchHandler`s. /// /// # Usage /// /// A `BatchHandlerFactory` can be used in two ways: /// /// * It does not contain any state it shares with the created `BatchHandler`s. /// This is useful when incoming data is partitioned in a way that all /// `BatchHandler`s act only on data that never appears on another partition. /// /// * It contains state that is shared with the `BatchHandler`s. E.g. a cache /// that contains data that can appear on other partitions. 
///
/// # Example
///
/// ```rust
/// use std::sync::{Arc, Mutex};
/// use futures::{FutureExt, future::BoxFuture};
///
/// use nakadion::handler::*;
///
/// // Use a struct to maintain state
/// struct MyHandler(Arc<Mutex<i32>>);
///
/// // Implement the processing logic by implementing `BatchHandler`
/// impl BatchHandler for MyHandler {
///     fn handle(&mut self, _events: Bytes, _meta: BatchMeta) -> BatchHandlerFuture {
///         async move {
///             *self.0.lock().unwrap() += 1;
///             BatchPostAction::commit_no_stats()
///         }.boxed()
///     }
/// }
///
/// // We keep shared state for all handlers in the `BatchHandlerFactory`
/// struct MyBatchHandlerFactory(Arc<Mutex<i32>>);
///
/// // Now we implement the trait `BatchHandlerFactory` to control how
/// // our `BatchHandler`s are created
/// impl BatchHandlerFactory for MyBatchHandlerFactory {
///     type Handler = MyHandler;
///     fn handler(
///         &self,
///         _assignment: &HandlerAssignment,
///     ) -> BoxFuture<Result<Self::Handler, Error>> {
///         async move {
///             Ok(MyHandler(self.0.clone()))
///         }.boxed()
///     }
/// }
///
/// let count = Arc::new(Mutex::new(0));
///
/// let factory = MyBatchHandlerFactory(count.clone());
/// ```
pub trait BatchHandlerFactory: Send + Sync + 'static {
    /// The concrete `BatchHandler` type created by this factory.
    type Handler: BatchHandler;

    /// New `BatchHandler` was requested.
    ///
    /// `assignment` defines for what event types and partitions the returned
    /// `BatchHandler` will be used. `Nakadion` guarantees that this will stay true
    /// over the whole lifetime of the `BatchHandler`.
    ///
    /// Returning an `Error` aborts the `Consumer`.
    ///
    /// It is up to the `BatchHandlerFactory` on whether it respects `assignment`.
    fn handler(&self, assignment: &HandlerAssignment) -> BoxFuture<Result<Self::Handler, Error>>;
}