1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
//! Handler for handling events and implementing event processing logic
use serde::de::DeserializeOwned;
use serde_json;

use nakadi::model::{EventType, PartitionId};

/// This struct must be returned after processing a batch
/// to tell nakadion how to continue.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProcessingStatus {
    /// The cursor of the just processed batch
    /// can be committed to make progrss on the stream.
    ///
    /// Optionally the number of processed events can be provided
    /// to help with deciding on when to commit the cursor.
    ///
    /// The number of events should be the number of events that were in the batch.
    Processed(Option<usize>),
    /// Processing failed. Do not commit the cursor. This
    /// always ends in the streaming being aborted for the current
    /// stream.
    ///
    /// A reason must be given which will be logged.
    Failed { reason: String },
}

impl ProcessingStatus {
    /// Cursor can be committed and no information on
    /// how many events were processed is given.
    pub fn processed_no_hint() -> ProcessingStatus {
        ProcessingStatus::Processed(None)
    }

    /// Cursor can be committed and a hint on
    /// how many events were processed is given.
    pub fn processed(num_events_hint: usize) -> ProcessingStatus {
        ProcessingStatus::Processed(Some(num_events_hint))
    }

    /// Processing events failed with the given reason.
    pub fn failed<T: Into<String>>(reason: T) -> ProcessingStatus {
        ProcessingStatus::Failed {
            reason: reason.into(),
        }
    }
}

/// A handler that contains batch processing logic.
///
/// This trait will be called by Nakadion when a batch has to
/// be processed. The `BatchHandler` only receives an `EventType`
/// and a slice of bytes that contains the batch.
///
/// The `events` slice always contains a JSON encoded array of events.
///
/// # Hint
///
/// The `handle` method gets called on `&mut self`.
///
/// # Example
///
/// ```rust
/// use nakadion::{BatchHandler, EventType, ProcessingStatus};
///
/// // Use a struct to maintain state
/// struct MyHandler {
///     pub count: i32,
/// }
///
/// // Implement the processing logic by implementing `BatchHandler`
/// impl BatchHandler for MyHandler {
///     fn handle(&mut self, _event_type: EventType, _events: &[u8]) -> ProcessingStatus {
///         self.count += 1;
///         ProcessingStatus::processed_no_hint()
///     }
/// }
///
/// // Handler creation will be done by `HandlerFactory`
/// let mut handler = MyHandler { count: 0 };
///
/// // This will be done by Nakadion
/// let status = handler.handle(EventType::new("test_event"), &[]);
///
/// assert_eq!(handler.count, 1);
/// assert_eq!(status, ProcessingStatus::Processed(None));
/// ```
pub trait BatchHandler {
    /// Handle the events.
    ///
    /// Calling this method may never panic!
    fn handle(&mut self, event_type: EventType, events: &[u8]) -> ProcessingStatus;
}

/// An error that can happen when the `HandlerFactory` was not able to create
/// a new handler. This will abort the consumption of the current stream.
#[derive(Debug, Fail)]
#[fail(display = "{}", message)]
pub struct CreateHandlerError {
    pub message: String,
}

/// A factory that creates `BatchHandler`s.
///
/// # Usage
///
/// A `HandlerFactory` can be used in two ways:
///
/// * It does not contain any state it shares with the created `BatchHandler`s.
/// This is useful when incoming data is partitioned in a way that all `BatchHandler`s
/// act only on data that never appears on another partition.
///
/// * It contains state that is shared with the `BatchHandler`s. E.g. a cache that
/// conatins data that can appear on other partitions.
/// # Example
///
/// ```rust
/// use std::sync::{Arc, Mutex};
///
/// use nakadion::{BatchHandler, CreateHandlerError, EventType, HandlerFactory, PartitionId,
///                ProcessingStatus};
///
/// // Use a struct to maintain state
/// struct MyHandler(Arc<Mutex<i32>>);
///
/// // Implement the processing logic by implementing `BatchHandler`
/// impl BatchHandler for MyHandler {
///     fn handle(&mut self, _event_type: EventType, _events: &[u8]) -> ProcessingStatus {
///         *self.0.lock().unwrap() += 1;
///         ProcessingStatus::processed_no_hint()
///     }
/// }
///
/// // We keep shared state for all handlers in the `HandlerFactory`
/// struct MyHandlerFactory(Arc<Mutex<i32>>);
///
/// // Now we implement the trait `HandlerFactory` to control how
/// // our `BatchHandler`s are created
/// impl HandlerFactory for MyHandlerFactory {
///     type Handler = MyHandler;
///     fn create_handler(
///         &self,
///         _partition: &PartitionId,
///     ) -> Result<Self::Handler, CreateHandlerError> {
///         Ok(MyHandler(self.0.clone()))
///     }
/// }
///
/// let count = Arc::new(Mutex::new(0));
///
/// let factory = MyHandlerFactory(count.clone());
///
/// // Handler creation will be done by Nakadion
/// let mut handler1 = factory.create_handler(&PartitionId::new("1")).unwrap();
/// let mut handler2 = factory.create_handler(&PartitionId::new("2")).unwrap();
///
/// // This will be done by Nakadion
/// let status1 = handler1.handle(EventType::new("test_event"), &[]);
///
/// assert_eq!(*count.lock().unwrap(), 1);
/// assert_eq!(status1, ProcessingStatus::Processed(None));
///
/// // This will be done by Nakadion
/// let status2 = handler2.handle(EventType::new("test_event"), &[]);
///
/// assert_eq!(*count.lock().unwrap(), 2);
/// assert_eq!(status2, ProcessingStatus::Processed(None));
/// ```
pub trait HandlerFactory {
    type Handler: BatchHandler + Send + 'static;
    fn create_handler(&self, partition: &PartitionId) -> Result<Self::Handler, CreateHandlerError>;
}

/// This is basically the same as a `ProcessingStatus` but returned
/// from a `TypedBatchHandler`.
///
/// It is not necessary to report the number of processed events since
/// the `TypedBatchHandler` itself keeps track of them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TypedProcessingStatus {
    /// All events were processed and the cursor may be committed to
    /// make progress on the stream.
    Processed,
    /// Processing events failed and the stream should be aborted.
    Failed { reason: String },
}

/// Basically the same a `BatchHandler` with the difference that
/// deserialized events are passed to the processing logic.
///
/// This is basically a convinience handler.
///
/// The events must implement `serde`s `DeserializeOwned`.
///
/// # Hint
///
/// The `handle` method gets called on `&mut self`.
/// # Example
///
/// ```norun
/// /// use nakadion::{EventType, TypedBatchHandler, TypedProcessingStatus};
///
/// // Use a struct to maintain state
/// struct MyHandler {
///     pub count: i32,
/// }
///
/// #[derive(Deserialize)]
/// struct MyEvent(i32);
///
/// // Implement the processing logic by implementing `BatchHandler`
/// impl TypedBatchHandler for MyHandler {
///     type Event = MyEvent;
///
///     fn handle(&mut self, events: Vec<MyEvent>) -> TypedProcessingStatus {
///         for MyEvent(amount) in events {
///             self.count += amount;
///         }
///         TypedProcessingStatus::Processed
///     }
/// }
///
/// // Handler creation will be done by `HandlerFactory`
/// let mut handler = MyHandler { count: 0 };
///
/// // This will be done by Nakadion
/// handler.handle(vec![MyEvent(1), MyEvent(2)]);
///
/// assert_eq!(handler.count, 3);
/// ```
pub trait TypedBatchHandler {
    type Event: DeserializeOwned;
    /// Execute the processing logic with a deserialized batch of events.
    fn handle(&mut self, events: Vec<Self::Event>) -> TypedProcessingStatus;
}

impl<T, E> BatchHandler for T
where
    T: TypedBatchHandler<Event = E>,
    E: DeserializeOwned,
{
    fn handle(&mut self, event_type: EventType, events: &[u8]) -> ProcessingStatus {
        let events: Vec<E> = match serde_json::from_slice(events) {
            Ok(events) => events,
            Err(err) => {
                return ProcessingStatus::Failed {
                    reason: format!(
                        "Could not deserialize events(event type: {}): {}",
                        event_type.0, err
                    ),
                }
            }
        };

        let n = events.len();

        match TypedBatchHandler::handle(self, events) {
            TypedProcessingStatus::Processed => ProcessingStatus::processed(n),
            TypedProcessingStatus::Failed { reason } => ProcessingStatus::Failed { reason },
        }
    }
}