1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
//! Some common types
use std::fmt;

use uuid::Uuid;

/// A `SubscriptionId` is used to guarantee a continuous flow of events for
/// clients.
///
/// If an event type is streamed over multiple
/// partitions, multiple clients can consume
/// the event type.
///
/// This is a newtype around a `String`; the wrapped value is public.
///
/// For more information on event types and subscriptions
/// see [subscriptions](http://nakadi.io/manual.html#using_consuming-events-hila)
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SubscriptionId(pub String);

impl fmt::Display for SubscriptionId {
    /// Writes the wrapped id string.
    ///
    /// Formatting is delegated to the inner `String` with the caller's
    /// formatter, so width, fill, alignment and precision (e.g. `{:>12}`)
    /// are applied. Going through `write!(f, "{}", ..)` would format the
    /// value with a fresh default spec and silently drop those flags.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}

impl SubscriptionId {
    /// Create a new `SubscriptionId` from anything that converts
    /// into a `String`.
    pub fn new<T>(id: T) -> Self
    where
        T: Into<String>,
    {
        let raw: String = id.into();
        SubscriptionId(raw)
    }
}

/// A partition id that comes with a cursor retrieved from a batch.
///
/// A `PartitionId` is passed to a `HandlerFactory` when
/// creating a new `BatchHandler`.
///
/// This is a newtype around a `String`; the wrapped value is public.
/// Two `PartitionId`s are equal when their wrapped strings are equal.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PartitionId(pub String);

impl PartitionId {
    /// Build a `PartitionId` from anything that converts into a `String`.
    pub fn new<T>(id: T) -> Self
    where
        T: Into<String>,
    {
        let raw: String = id.into();
        PartitionId(raw)
    }
}

impl fmt::Display for PartitionId {
    /// Writes the wrapped partition id string.
    ///
    /// Formatting is delegated to the inner `String` with the caller's
    /// formatter, so width, fill, alignment and precision (e.g. `{:>4}`)
    /// are applied. Going through `write!(f, "{}", ..)` would format the
    /// value with a fresh default spec and silently drop those flags.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}

/// A `StreamId` identifies a connection to a subscription.
///
/// It must be provided for committing
/// a `Cursor`.
///
/// This is a newtype around a `String`; the wrapped value is public.
///
/// For more information on event types and subscriptions
/// see [subscriptions](http://nakadi.io/manual.html#using_consuming-events-hila)
#[derive(Clone, Debug)]
pub struct StreamId(pub String);

impl StreamId {
    /// Create a new `StreamId` from anything that converts into a `String`.
    pub fn new<T>(id: T) -> Self
    where
        T: Into<String>,
    {
        let raw: String = id.into();
        StreamId(raw)
    }
}

impl fmt::Display for StreamId {
    /// Writes the wrapped stream id string.
    ///
    /// Formatting is delegated to the inner `String` with the caller's
    /// formatter, so width, fill, alignment and precision (e.g. `{:<40}`)
    /// are applied. Going through `write!(f, "{}", ..)` would format the
    /// value with a fresh default spec and silently drop those flags.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}

/// A `FlowId` helps when finding problems and tracing
/// workflows. It is usually submitted when interacting
/// with the Nakadi REST API but is also contained
/// in received event metadata.
///
/// This is a newtype around a `String`; the wrapped value is public.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FlowId(pub String);

impl FlowId {
    /// Create a new `FlowId` from anything that converts into a `String`.
    pub fn new<T>(id: T) -> Self
    where
        T: Into<String>,
    {
        let raw: String = id.into();
        FlowId(raw)
    }
}

impl fmt::Display for FlowId {
    /// Writes the wrapped flow id string.
    ///
    /// Formatting is delegated to the inner `String` with the caller's
    /// formatter, so width, fill, alignment and precision (e.g. `{:>40}`)
    /// are applied. Going through `write!(f, "{}", ..)` would format the
    /// value with a fresh default spec and silently drop those flags.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}

impl Default for FlowId {
    /// Builds a `FlowId` from the string form of a freshly generated
    /// UUID (`Uuid::new_v4`), so each call yields a new id.
    fn default() -> Self {
        let generated = Uuid::new_v4();
        FlowId(generated.to_string())
    }
}

/// Information on a current batch. This might be
/// useful for a `Handler` that wants to do checkpointing on its own.
///
/// The struct borrows the cursor bytes for the lifetime `'a` and owns
/// the `StreamId`.
#[derive(Clone, Debug)]
pub struct BatchCommitData<'a> {
    /// Id of the stream; a `StreamId` must be provided when committing
    /// a cursor.
    pub stream_id: StreamId,
    /// The cursor as borrowed raw bytes.
    /// NOTE(review): presumably the cursor exactly as received with the
    /// batch - confirm against the code that constructs this struct.
    pub cursor: &'a [u8],
}

/// The [`Nakadi Event Type`](https://github.com/zalando/nakadi#creating-event-types).
/// Similar to a topic.
///
/// This is a newtype around a borrowed `&str`; the wrapped value is
/// public and the struct does not own the name.
#[derive(Clone, Debug)]
pub struct EventType<'a>(pub &'a str);

impl<'a> EventType<'a> {
    /// Wraps the given name in an
    /// [`EventType`](https://github.com/zalando/nakadi#creating-event-types).
    ///
    /// The result borrows `value` for `'a`; the string is not copied.
    pub fn new(value: &'a str) -> EventType<'a> {
        let name: &'a str = value;
        EventType(name)
    }
}