1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
use std::error::Error as StdError;
use std::fmt;
use std::sync::Arc;

use bytes::Bytes;
use futures::{future::BoxFuture, stream::BoxStream};
use serde::Serialize;

use nakadi_types::model::event_type::*;
use nakadi_types::model::partition::*;
use nakadi_types::model::publishing::*;
use nakadi_types::model::subscription::*;
use nakadi_types::FlowId;

use dispatch_http_request::RemoteCallError;

pub use self::client::ApiClient;
pub use self::error::*;

mod client;
pub mod dispatch_http_request;
mod error;

pub type ApiFuture<'a, T> = BoxFuture<'a, Result<T, NakadiApiError>>;
pub type BytesStream = BoxStream<'static, Result<Bytes, IoError>>;

pub trait MonitoringApi {
    /// Deletes an EventType identified by its name.
    ///
    /// See also [Nakadi Manual](https://nakadi.io/manual.html#/event-types/name/cursor-distances_post)
    fn get_cursor_distances<T: Into<FlowId>>(
        &self,
        name: &EventTypeName,
        query: &CursorDistanceQuery,
        flow_id: T,
    ) -> ApiFuture<CursorDistanceResult>;

    /// Deletes an EventType identified by its name.
    ///
    /// See also [Nakadi Manual](https://nakadi.io/manual.html#/event-types/name/cursors-lag_post)
    fn get_cursor_lag<T: Into<FlowId>>(
        &self,
        name: &EventTypeName,
        cursors: &[Cursor],
        flow_id: T,
    ) -> ApiFuture<Vec<Partition>>;

    fn get_event_type_partitions<T: Into<FlowId>>(
        &self,
        name: &EventTypeName,
        flow_id: T,
    ) -> ApiFuture<Vec<Partition>>;
}

pub trait SchemaRegistryApi {
    /// Returns a list of all registered EventTypes
    ///
    /// See also [Nakadi Manual](https://nakadi.io/manual.html#/event-types_get)
    fn list_event_types<T: Into<FlowId>>(&self, flow_id: FlowId) -> ApiFuture<Vec<EventType>>;

    /// Creates a new EventType.
    ///
    /// See also [Nakadi Manual](https://nakadi.io/manual.html#/event-types_post)
    fn create_event_type<T: Into<FlowId>>(
        &self,
        event_type: &EventType,
        flow_id: T,
    ) -> ApiFuture<()>;

    /// Returns the EventType identified by its name.
    ///
    /// See also [Nakadi Manual](https://nakadi.io/manual.html#/event-types/name_get)
    fn get_event_type<T: Into<FlowId>>(
        &self,
        name: &EventTypeName,
        flow_id: T,
    ) -> ApiFuture<EventType>;

    /// Updates the EventType identified by its name.
    ///
    /// See also [Nakadi Manual](https://nakadi.io/manual.html#/event-types/name_put)
    fn update_event_type<T: Into<FlowId>>(
        &self,
        name: &EventTypeName,
        event_type: &EventType,
        flow_id: T,
    ) -> ApiFuture<()>;

    /// Deletes an EventType identified by its name.
    ///
    /// See also [Nakadi Manual](https://nakadi.io/manual.html#/event-types/name_delete)
    fn delete_event_type<T: Into<FlowId>>(&self, name: &EventTypeName, flow_id: T)
        -> ApiFuture<()>;
}

/// Possible error variants returned from publishing events
#[derive(Debug)]
pub enum PublishFailure {
    /// The submitted events were unprocessable so none were published
    Unprocessable(BatchResponse),
    /// Only events failed.
    PartialFailure(BatchResponse),
    /// There was an error that was not `Unprocessable`
    Other(NakadiApiError),
}

impl StdError for PublishFailure {
    fn source(&self) -> Option<&(dyn StdError + 'static)> {
        match self {
            PublishFailure::Other(err) => err.source(),
            _ => None,
        }
    }
}

impl fmt::Display for PublishFailure {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PublishFailure::Other(err) => write!(f, "{}", err)?,
            PublishFailure::PartialFailure(batch) => write!(f, "{}", batch)?,
            PublishFailure::Unprocessable(batch) => write!(f, "{}", batch)?,
        }

        Ok(())
    }
}

impl From<NakadiApiError> for PublishFailure {
    fn from(api_error: NakadiApiError) -> Self {
        Self::Other(api_error)
    }
}

impl From<RemoteCallError> for PublishFailure {
    fn from(remote_call_error: RemoteCallError) -> Self {
        let api_error = NakadiApiError::from(remote_call_error);
        Self::Other(api_error)
    }
}

type PublishFuture<'a> = BoxFuture<'a, Result<(), PublishFailure>>;

/// Publishes a batch of Events.
///
/// All items must be of the EventType identified by name.
///
/// Reception of Events will always respect the configuration of its EventType with respect to
/// validation, enrichment and partition. The steps performed on reception of incoming message
/// are:
///
/// 1.  Every validation rule specified for the EventType will be checked in order against the
///     incoming Events. Validation rules are evaluated in the order they are defined and the Event
///     is rejected in the first case of failure. If the offending validation rule provides
///     information about the violation it will be included in the BatchItemResponse. If the
///     EventType defines schema validation it will be performed at this moment. The size of each
///     Event will also be validated. The maximum size per Event is configured by the administrator.
///     We use the batch input to measure the size of events, so unnecessary spaces, tabs, and
///     carriage returns will count towards the event size.
///
/// 2.  Once the validation succeeded, the content of the Event is updated according to the
///     enrichment rules in the order the rules are defined in the EventType. No preexisting
///     value might be changed (even if added by an enrichment rule). Violations on this will force
///     the immediate rejection of the Event. The invalid overwrite attempt will be included in
///     the item’s BatchItemResponse object.
///
/// 3.  The incoming Event’s relative ordering is evaluated according to the rule on the
///     EventType. Failure to evaluate the rule will reject the Event.
///
///     Given the batched nature of this operation, any violation on validation or failures on
///     enrichment or partitioning will cause the whole batch to be rejected, i.e. none of its
///     elements are pushed to the underlying broker.
///
///     Failures on writing of specific partitions to the broker might influence other
///     partitions. Failures at this stage will fail only the affected partitions.
///
/// See also [Nakadi Manual](https://nakadi.io/manual.html#/event-types/name/events_post)
pub trait PublishApi {
    /// Publishes a batch of Events of this EventType. All items must be of the EventType
    /// identified by name.
    ///
    /// See also [Nakadi Manual](https://nakadi.io/manual.html#/event-types/name/events_post)
    fn publish_events<E: Serialize, T: Into<FlowId>>(
        &self,
        event_type: &EventTypeName,
        events: &[E],
        flow_id: T,
    ) -> PublishFuture;
}

pub trait SubscriptionApi {
    /// This endpoint creates a subscription for EventTypes.
    ///
    /// See also [Nakadi Manual](https://nakadi.io/manual.html#/subscriptions_post)
    fn create_subscription<T: Into<FlowId>>(
        &self,
        input: &SubscriptionInput,
        flow_id: T,
    ) -> ApiFuture<Subscription>;

    /// Returns a subscription identified by id.
    ///
    /// See also [Nakadi Manual](https://nakadi.io/manual.html#/subscriptions/subscription_id_get)
    fn get_subscription<T: Into<FlowId>>(
        &self,
        id: SubscriptionId,
        flow_id: T,
    ) -> ApiFuture<Subscription>;

    /// This endpoint only allows to update the authorization section of a subscription.
    ///
    /// All other properties are immutable.
    /// This operation is restricted to subjects with administrative role.
    /// This call captures the timestamp of the update request.
    ///
    /// See also [Nakadi Manual](https://nakadi.io/manual.html#/subscriptions/subscription_id_put)
    fn update_auth<T: Into<FlowId>>(&self, input: &SubscriptionInput, flow_id: T) -> ApiFuture<()>;

    /// Deletes a subscription.
    ///
    /// See also [Nakadi Manual](https://nakadi.io/manual.html#/subscriptions/subscription_id_delete)
    fn delete_subscription<T: Into<FlowId>>(&self, id: SubscriptionId, flow_id: T)
        -> ApiFuture<()>;

    /// Exposes the currently committed offsets of a subscription.
    ///
    /// See also [Nakadi Manual](https://nakadi.io/manual.html#/subscriptions/subscription_id/cursors_get)
    fn get_committed_offsets<T: Into<FlowId>>(
        &self,
        id: SubscriptionId,
        flow_id: T,
    ) -> ApiFuture<Vec<SubscriptionCursor>>;

    /// Exposes statistics of specified subscription.
    ///
    /// See also [Nakadi Manual](https://nakadi.io/manual.html#/subscriptions/subscription_id/stats_get)
    fn get_subscription_stats<T: Into<FlowId>>(
        &self,
        id: SubscriptionId,
        show_time_lag: bool,
        flow_id: T,
    ) -> ApiFuture<Vec<SubscriptionEventTypeStats>>;

    /// Reset subscription offsets to specified values.
    ///
    /// See also [Nakadi Manual](https://nakadi.io/manual.html#/subscriptions/subscription_id/cursors_patch)
    fn reset_subscription_cursors<T: Into<FlowId>>(
        &self,
        id: SubscriptionId,
        cursors: &[SubscriptionCursor],
        flow_id: T,
    ) -> ApiFuture<()>;
}

pub trait SubscriptionCommitApi {
    /// Endpoint for committing offsets of the subscription.
    ///
    /// See also [Nakadi Manual](https://nakadi.io/manual.html#/subscriptions/subscription_id/cursors_post)
    fn commit_cursors<T: Into<FlowId>>(
        &self,
        id: SubscriptionId,
        stream: StreamId,
        cursors: &[SubscriptionCursor],
        flow_id: T,
    ) -> ApiFuture<CursorCommitResults>;
}

pub trait SubscriptionStreamApi {
    /// Starts a new stream for reading events from this subscription.
    ///
    /// Starts a new stream for reading events from this subscription. The minimal consumption unit is a partition, so
    /// it is possible to start as many streams as the total number of partitions in event-types of this subscription.
    /// The position of the consumption is managed by Nakadi. The client is required to commit the cursors he gets in
    /// a stream.
    ///
    /// If you create a stream without specifying the partitions to read from - Nakadi will automatically assign
    /// partitions to this new stream. By default Nakadi distributes partitions among clients trying to give an equal
    /// number of partitions to each client (the amount of data is not considered). This is default and the most common
    /// way to use streaming endpoint.
    ///
    /// It is also possible to directly request specific partitions to be delivered within the stream. If these
    /// partitions are already consumed by another stream of this subscription - Nakadi will trigger a rebalance that
    /// will assign these partitions to the new stream. The request will fail if user directly requests partitions that
    /// are already requested directly by another active stream of this subscription. The overall picture will be the
    /// following: streams which directly requested specific partitions will consume from them; streams that didn’t
    /// specify which partitions to consume will consume partitions that left - Nakadi will autobalance free partitions
    /// among these streams (balancing happens by number of partitions).
    ///
    /// Specifying partitions to consume is not a trivial way to consume as it will require additional coordination
    /// effort from the client application, that’s why it should only be used if such way of consumption should be
    /// implemented due to some specific requirements.
    ///
    /// Also, when using streams with directly assigned partitions, it is the user’s responsibility to detect, and react
    /// to, changes in the number of partitions in the subscription (following the re-partitioning of an event type).
    /// Using the GET /subscriptions/{subscription_id}/stats endpoint can be helpful.
    ///
    /// See also [Nakadi Manual](https://nakadi.io/manual.html#/subscriptions/subscription_id/events_post)
    fn request_stream<T: Into<FlowId>>(
        &self,
        subscription_id: SubscriptionId,
        parameters: &StreamParameters,
        flow_id: T,
    ) -> ApiFuture<SubscriptionStream>;
}

/// A stream of event type partitions from Nakadi
pub struct SubscriptionStream {
    pub stream_id: StreamId,
    pub stream: BytesStream,
}

impl SubscriptionStream {
    pub fn parts(self) -> (StreamId, BytesStream) {
        (self.stream_id, self.stream)
    }
}

pub trait NakadionEssentials:
    SubscriptionCommitApi + SubscriptionStreamApi + Send + Sync + 'static
{
}

impl<T> NakadionEssentials for T where
    T: SubscriptionCommitApi + SubscriptionStreamApi + Send + Sync + 'static
{
}