1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261
//! Handler for handling events and implementing event processing logic use serde::de::DeserializeOwned; use serde_json; use nakadi::model::{EventType, PartitionId}; /// This struct must be returned after processing a batch /// to tell nakadion how to continue. #[derive(Debug, Clone, PartialEq, Eq)] pub enum ProcessingStatus { /// The cursor of the just processed batch /// can be committed to make progrss on the stream. /// /// Optionally the number of processed events can be provided /// to help with deciding on when to commit the cursor. /// /// The number of events should be the number of events that were in the batch. Processed(Option<usize>), /// Processing failed. Do not commit the cursor. This /// always ends in the streaming being aborted for the current /// stream. /// /// A reason must be given which will be logged. Failed { reason: String }, } impl ProcessingStatus { /// Cursor can be committed and no information on /// how many events were processed is given. pub fn processed_no_hint() -> ProcessingStatus { ProcessingStatus::Processed(None) } /// Cursor can be committed and a hint on /// how many events were processed is given. pub fn processed(num_events_hint: usize) -> ProcessingStatus { ProcessingStatus::Processed(Some(num_events_hint)) } /// Processing events failed with the given reason. pub fn failed<T: Into<String>>(reason: T) -> ProcessingStatus { ProcessingStatus::Failed { reason: reason.into(), } } } /// A handler that contains batch processing logic. /// /// This trait will be called by Nakadion when a batch has to /// be processed. The `BatchHandler` only receives an `EventType` /// and a slice of bytes that contains the batch. /// /// The `events` slice always contains a JSON encoded array of events. /// /// # Hint /// /// The `handle` method gets called on `&mut self`. /// /// # Example /// /// ```rust /// use nakadion::{BatchHandler, EventType, ProcessingStatus}; /// /// // Use a struct to maintain state /// struct MyHandler { /// pub count: i32, /// } /// /// // Implement the processing logic by implementing `BatchHandler` /// impl BatchHandler for MyHandler { /// fn handle(&mut self, _event_type: EventType, _events: &[u8]) -> ProcessingStatus { /// self.count += 1; /// ProcessingStatus::processed_no_hint() /// } /// } /// /// // Handler creation will be done by `HandlerFactory` /// let mut handler = MyHandler { count: 0 }; /// /// // This will be done by Nakadion /// let status = handler.handle(EventType::new("test_event"), &[]); /// /// assert_eq!(handler.count, 1); /// assert_eq!(status, ProcessingStatus::Processed(None)); /// ``` pub trait BatchHandler { /// Handle the events. /// /// Calling this method may never panic! fn handle(&mut self, event_type: EventType, events: &[u8]) -> ProcessingStatus; } /// An error that can happen when the `HandlerFactory` was not able to create /// a new handler. This will abort the consumption of the current stream. #[derive(Debug, Fail)] #[fail(display = "{}", message)] pub struct CreateHandlerError { pub message: String, } /// A factory that creates `BatchHandler`s. /// /// # Usage /// /// A `HandlerFactory` can be used in two ways: /// /// * It does not contain any state it shares with the created `BatchHandler`s. /// This is useful when incoming data is partitioned in a way that all `BatchHandler`s /// act only on data that never appears on another partition. /// /// * It contains state that is shared with the `BatchHandler`s. E.g. a cache that /// conatins data that can appear on other partitions. /// # Example /// /// ```rust /// use std::sync::{Arc, Mutex}; /// /// use nakadion::{BatchHandler, CreateHandlerError, EventType, HandlerFactory, PartitionId, /// ProcessingStatus}; /// /// // Use a struct to maintain state /// struct MyHandler(Arc<Mutex<i32>>); /// /// // Implement the processing logic by implementing `BatchHandler` /// impl BatchHandler for MyHandler { /// fn handle(&mut self, _event_type: EventType, _events: &[u8]) -> ProcessingStatus { /// *self.0.lock().unwrap() += 1; /// ProcessingStatus::processed_no_hint() /// } /// } /// /// // We keep shared state for all handlers in the `HandlerFactory` /// struct MyHandlerFactory(Arc<Mutex<i32>>); /// /// // Now we implement the trait `HandlerFactory` to control how /// // our `BatchHandler`s are created /// impl HandlerFactory for MyHandlerFactory { /// type Handler = MyHandler; /// fn create_handler( /// &self, /// _partition: &PartitionId, /// ) -> Result<Self::Handler, CreateHandlerError> { /// Ok(MyHandler(self.0.clone())) /// } /// } /// /// let count = Arc::new(Mutex::new(0)); /// /// let factory = MyHandlerFactory(count.clone()); /// /// // Handler creation will be done by Nakadion /// let mut handler1 = factory.create_handler(&PartitionId::new("1")).unwrap(); /// let mut handler2 = factory.create_handler(&PartitionId::new("2")).unwrap(); /// /// // This will be done by Nakadion /// let status1 = handler1.handle(EventType::new("test_event"), &[]); /// /// assert_eq!(*count.lock().unwrap(), 1); /// assert_eq!(status1, ProcessingStatus::Processed(None)); /// /// // This will be done by Nakadion /// let status2 = handler2.handle(EventType::new("test_event"), &[]); /// /// assert_eq!(*count.lock().unwrap(), 2); /// assert_eq!(status2, ProcessingStatus::Processed(None)); /// ``` pub trait HandlerFactory { type Handler: BatchHandler + Send + 'static; fn create_handler(&self, partition: &PartitionId) -> Result<Self::Handler, CreateHandlerError>; } /// This is basically the same as a `ProcessingStatus` but returned /// from a `TypedBatchHandler`. /// /// It is not necessary to report the number of processed events since /// the `TypedBatchHandler` itself keeps track of them. #[derive(Debug, Clone, PartialEq, Eq)] pub enum TypedProcessingStatus { /// All events were processed and the cursor may be committed to /// make progress on the stream. Processed, /// Processing events failed and the stream should be aborted. Failed { reason: String }, } /// Basically the same a `BatchHandler` with the difference that /// deserialized events are passed to the processing logic. /// /// This is basically a convinience handler. /// /// The events must implement `serde`s `DeserializeOwned`. /// /// # Hint /// /// The `handle` method gets called on `&mut self`. /// # Example /// /// ```norun /// /// use nakadion::{EventType, TypedBatchHandler, TypedProcessingStatus}; /// /// // Use a struct to maintain state /// struct MyHandler { /// pub count: i32, /// } /// /// #[derive(Deserialize)] /// struct MyEvent(i32); /// /// // Implement the processing logic by implementing `BatchHandler` /// impl TypedBatchHandler for MyHandler { /// type Event = MyEvent; /// /// fn handle(&mut self, events: Vec<MyEvent>) -> TypedProcessingStatus { /// for MyEvent(amount) in events { /// self.count += amount; /// } /// TypedProcessingStatus::Processed /// } /// } /// /// // Handler creation will be done by `HandlerFactory` /// let mut handler = MyHandler { count: 0 }; /// /// // This will be done by Nakadion /// handler.handle(vec![MyEvent(1), MyEvent(2)]); /// /// assert_eq!(handler.count, 3); /// ``` pub trait TypedBatchHandler { type Event: DeserializeOwned; /// Execute the processing logic with a deserialized batch of events. fn handle(&mut self, events: Vec<Self::Event>) -> TypedProcessingStatus; } impl<T, E> BatchHandler for T where T: TypedBatchHandler<Event = E>, E: DeserializeOwned, { fn handle(&mut self, event_type: EventType, events: &[u8]) -> ProcessingStatus { let events: Vec<E> = match serde_json::from_slice(events) { Ok(events) => events, Err(err) => { return ProcessingStatus::Failed { reason: format!( "Could not deserialize events(event type: {}): {}", event_type.0, err ), } } }; let n = events.len(); match TypedBatchHandler::handle(self, events) { TypedProcessingStatus::Processed => ProcessingStatus::processed(n), TypedProcessingStatus::Failed { reason } => ProcessingStatus::Failed { reason }, } } }