//! Handler for handling events and implementing event processing logic use serde::de::DeserializeOwned; use serde_json; use nakadi::model::PartitionId; #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] pub struct SubscriptionCursor { pub partition: PartitionId, pub offset: String, pub event_type: String, } /// This struct must be returned after processing a batch /// to tell nakadion how to continue. #[derive(Debug, Clone, PartialEq, Eq)] pub enum ProcessingStatus { /// The cursor of the just processed batch /// can be committed to make progrss on the stream. /// /// Optionally the number of processed events can be provided /// to help with deciding on when to commit the cursor. /// /// The number of events should be the number of events that were in the /// batch. Processed(Option<usize>), /// Processing failed. Do not commit the cursor. This /// always ends in the streaming being aborted for the current /// stream. /// /// A reason must be given which will be logged. Failed { reason: String }, } impl ProcessingStatus { /// Cursor can be committed and no information on /// how many events were processed is given. pub fn processed_no_hint() -> ProcessingStatus { ProcessingStatus::Processed(None) } /// Cursor can be committed and a hint on /// how many events were processed is given. pub fn processed(num_events_hint: usize) -> ProcessingStatus { ProcessingStatus::Processed(Some(num_events_hint)) } /// Processing events failed with the given reason. pub fn failed<T: Into<String>>(reason: T) -> ProcessingStatus { ProcessingStatus::Failed { reason: reason.into(), } } } /// A handler that contains batch processing logic. /// /// This trait will be called by Nakadion when a batch has to /// be processed. The `BatchHandler` only receives an `EventType` /// and a slice of bytes that contains the batch. /// /// The `events` slice always contains a JSON encoded array of events. /// /// # Hint /// /// The `handle` method gets called on `&mut self`. 
///
/// # Example
///
/// ```rust
/// use nakadion::{BatchHandler, SubscriptionCursor, ProcessingStatus, PartitionId};
///
/// // Use a struct to maintain state
/// struct MyHandler {
///     pub count: i32,
/// }
///
/// // Implement the processing logic by implementing `BatchHandler`
/// impl BatchHandler for MyHandler {
///     fn handle(&mut self, _cursor: &SubscriptionCursor, _events: &[u8]) -> ProcessingStatus {
///         self.count += 1;
///         ProcessingStatus::processed_no_hint()
///     }
/// }
///
/// // Handler creation will be done by `HandlerFactory`
/// let mut handler = MyHandler { count: 0 };
///
/// // This will be done by Nakadion
/// let cursor = SubscriptionCursor {
///     partition: PartitionId::new("1"),
///     offset: "53".to_string(),
///     event_type: "test_event".to_string(),
/// };
/// let status = handler.handle(&cursor, &[]);
///
/// assert_eq!(handler.count, 1);
/// assert_eq!(status, ProcessingStatus::Processed(None));
/// ```
pub trait BatchHandler {
    /// Handle the events.
    ///
    /// Calling this method may never panic!
    fn handle(&mut self, cursor: &SubscriptionCursor, events: &[u8]) -> ProcessingStatus;
}

/// An error that can happen when the `HandlerFactory` was not able to create
/// a new handler. This will abort the consumption of the current stream.
#[derive(Debug, Fail)]
#[fail(display = "{}", message)]
pub struct CreateHandlerError {
    // A message describing why the handler could not be created.
    pub message: String,
}

/// A factory that creates `BatchHandler`s.
///
/// # Usage
///
/// A `HandlerFactory` can be used in two ways:
///
/// * It does not contain any state it shares with the created `BatchHandler`s.
/// This is useful when incoming data is partitioned in a way that all
/// `BatchHandler`s act only on data that never appears on another partition.
///
/// * It contains state that is shared with the `BatchHandler`s. E.g. a cache
/// that contains data that can appear on other partitions.
///
/// # Example
///
/// ```rust
/// use std::sync::{Arc, Mutex};
///
/// use nakadion::{
///     BatchHandler, CreateHandlerError, SubscriptionCursor, HandlerFactory, PartitionId, ProcessingStatus,
/// };
///
/// // Use a struct to maintain state
/// struct MyHandler(Arc<Mutex<i32>>);
///
/// // Implement the processing logic by implementing `BatchHandler`
/// impl BatchHandler for MyHandler {
///     fn handle(&mut self, _cursor: &SubscriptionCursor, _events: &[u8]) -> ProcessingStatus {
///         *self.0.lock().unwrap() += 1;
///         ProcessingStatus::processed_no_hint()
///     }
/// }
///
/// // We keep shared state for all handlers in the `HandlerFactory`
/// struct MyHandlerFactory(Arc<Mutex<i32>>);
///
/// // Now we implement the trait `HandlerFactory` to control how
/// // our `BatchHandler`s are created
/// impl HandlerFactory for MyHandlerFactory {
///     type Handler = MyHandler;
///     fn create_handler(
///         &self,
///         _partition: &PartitionId,
///     ) -> Result<Self::Handler, CreateHandlerError> {
///         Ok(MyHandler(self.0.clone()))
///     }
/// }
///
/// let count = Arc::new(Mutex::new(0));
///
/// let factory = MyHandlerFactory(count.clone());
///
/// // Handler creation will be invoked by Nakadion
/// let mut handler1 = factory.create_handler(&PartitionId::new("1")).unwrap();
/// let mut handler2 = factory.create_handler(&PartitionId::new("2")).unwrap();
///
/// // This will be done by Nakadion
/// let cursor = SubscriptionCursor {
///     partition: PartitionId::new("1"),
///     offset: "53".to_string(),
///     event_type: "test_event".to_string(),
/// };
/// let status1 = handler1.handle(&cursor, &[]);
///
/// assert_eq!(*count.lock().unwrap(), 1);
/// assert_eq!(status1, ProcessingStatus::Processed(None));
///
/// // This will be done by Nakadion
/// let cursor = SubscriptionCursor {
///     partition: PartitionId::new("2"),
///     offset: "54".to_string(),
///     event_type: "test_event".to_string(),
/// };
/// let status2 = handler2.handle(&cursor, &[]);
///
/// assert_eq!(*count.lock().unwrap(), 2);
/// assert_eq!(status2, ProcessingStatus::Processed(None));
/// ```
pub trait HandlerFactory {
    /// The type of the handlers this factory creates.
    type Handler: BatchHandler + Send + 'static;
    /// Create a new handler for processing batches of the given partition.
    fn create_handler(&self, partition: &PartitionId) -> Result<Self::Handler, CreateHandlerError>;
}

/// This is basically the same as a `ProcessingStatus` but returned
/// from a `TypedBatchHandler`.
///
/// It is not necessary to report the number of processed events since
/// the `TypedBatchHandler` itself keeps track of them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TypedProcessingStatus {
    /// All events were processed and the cursor may be committed to
    /// make progress on the stream.
    Processed,
    /// Processing events failed and the stream should be aborted.
    Failed { reason: String },
}

/// Basically the same as a `BatchHandler` with the difference that
/// deserialized events are passed to the processing logic.
///
/// This is basically a convenience handler.
///
/// The events must implement `serde`s `DeserializeOwned`.
///
/// # Hint
///
/// The `handle` method gets called on `&mut self`.
///
/// # Example
///
/// ```no_run
///
/// use nakadion::{EventType, TypedBatchHandler, TypedProcessingStatus};
///
/// // Use a struct to maintain state
/// struct MyHandler {
///     pub count: i32,
/// }
///
/// #[derive(Deserialize)]
/// struct MyEvent(i32);
///
/// // Implement the processing logic by implementing `BatchHandler`
/// impl TypedBatchHandler for MyHandler {
///     type Event = MyEvent;
///
///     fn handle(&mut self, events: Vec<MyEvent>) -> TypedProcessingStatus {
///         for MyEvent(amount) in events {
///             self.count += amount;
///         }
///         TypedProcessingStatus::Processed
///     }
/// }
///
/// // Handler creation will be done by `HandlerFactory`
/// let mut handler = MyHandler { count: 0 };
///
/// // This will be done by Nakadion
/// handler.handle(vec![MyEvent(1), MyEvent(2)]);
///
/// assert_eq!(handler.count, 3);
/// ```
pub trait TypedBatchHandler {
    /// The type each event of a batch is deserialized into.
    type Event: DeserializeOwned;

    /// Execute the processing logic with a deserialized batch of events.
    fn handle(
        &mut self,
        cursor: &SubscriptionCursor,
        events: Vec<Self::Event>,
    ) -> TypedProcessingStatus;

    // A handler which is invoked if deserialization of the
    // whole events batch at once failed.
fn handle_deserialization_errors( &mut self, results: Vec<EventDeserializationResult<Self::Event>>, ) -> TypedProcessingStatus { let num_events = results.len(); let num_failed = results.iter().filter(|r| r.is_err()).count(); TypedProcessingStatus::Failed { reason: format!( "Failed to deserialize {} out of {} events", num_failed, num_events ), } } } pub type EventDeserializationResult<T> = Result<T, (serde_json::Value, serde_json::Error)>; impl<T, E> BatchHandler for T where T: TypedBatchHandler<Event = E>, E: DeserializeOwned, { fn handle(&mut self, cursor: &SubscriptionCursor, events: &[u8]) -> ProcessingStatus { match serde_json::from_slice::<Vec<E>>(events) { Ok(events) => { let n = events.len(); match TypedBatchHandler::handle(self, cursor, events) { TypedProcessingStatus::Processed => ProcessingStatus::processed(n), TypedProcessingStatus::Failed { reason } => ProcessingStatus::Failed { reason }, } } Err(_) => match try_deserialize_individually::<E>(events) { Ok(results) => { let n = results.len(); match self.handle_deserialization_errors(results) { TypedProcessingStatus::Processed => ProcessingStatus::processed(n), TypedProcessingStatus::Failed { reason } => { ProcessingStatus::Failed { reason } } } } Err(err) => ProcessingStatus::Failed { reason: err.to_string(), }, }, } } } // This function clones the ast before deserializing... but we are in an // exceptional case anyways... 
fn try_deserialize_individually<T: DeserializeOwned>( events: &[u8], ) -> Result<Vec<EventDeserializationResult<T>>, serde_json::Error> { let deserialized_json_asts: Vec<serde_json::Value> = serde_json::from_slice(events)?; let mut results = Vec::with_capacity(deserialized_json_asts.len()); for ast in deserialized_json_asts { let ast2 = ast.clone(); match serde_json::from_value(ast) { Ok(event) => results.push(Ok(event)), Err(err) => results.push(Err((ast2, err))), } } Ok(results) } #[test] fn parse_cursor() { use serde_json; let cursor_sample = r#"{"partition":"6","offset":"543","#.to_owned() + r#""event_type":"order.ORDER_RECEIVED","cursor_token":"# + r#""b75c3102-98a4-4385-a5fd-b96f1d7872f2"}"#; let parsed: SubscriptionCursor = serde_json::from_str(&cursor_sample).unwrap(); let expected = SubscriptionCursor { partition: PartitionId::new("6"), offset: "543".to_string(), event_type: "order.ORDER_RECEIVED".to_string(), }; assert_eq!(parsed, expected); }