1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
//! Publishable events
pub use crate::{event_type::EventTypeName, partition::PartitionId, FlowId};

use chrono::{DateTime, Utc};
use serde::Serialize;

pub use super::{DataOp, DataType, EventId};

/// A `DataChangeEvent` template for publishing of events
///
/// Serializes to an object with the keys `data`, `data_type`, `data_op` and
/// `metadata`.
///
/// See also [Nakadi Manual](https://nakadi.io/manual.html#definition_DataChangeEvent)
#[derive(Debug, Clone, Serialize)]
pub struct DataChangeEventPub<T> {
    /// The payload of the event, serialized under the `data` key.
    pub data: T,
    /// The `DataType` sent along with the payload.
    pub data_type: DataType,
    /// The `DataOp` sent along with the payload.
    pub data_op: DataOp,
    /// The metadata sent with this event.
    pub metadata: EventMetaDataPub,
}

impl<T> From<super::DataChangeEvent<T>> for DataChangeEventPub<T> {
    fn from(e: super::DataChangeEvent<T>) -> Self {
        Self {
            data: e.data,
            data_type: e.data_type,
            data_op: e.data_op,
            metadata: e.metadata.into(),
        }
    }
}

/// A `BusinessEvent` template for publishing of events
///
/// The fields of `data` are flattened into the serialized event, i.e. they
/// appear on the same level as the `metadata` key.
///
/// See also [Nakadi Manual](https://nakadi.io/manual.html#definition_BusinessEvent)
#[derive(Debug, Clone, Serialize)]
pub struct BusinessEventPub<T> {
    /// The payload of the event. Its fields are inlined into the event
    /// object instead of being nested under a `data` key.
    #[serde(flatten)]
    pub data: T,
    /// The metadata sent with this event.
    pub metadata: EventMetaDataPub,
}

impl<T> From<super::BusinessEvent<T>> for BusinessEventPub<T> {
    fn from(e: super::BusinessEvent<T>) -> Self {
        Self {
            data: e.data,
            metadata: e.metadata.into(),
        }
    }
}

/// Metadata of an event as it is sent when publishing
///
/// Optional fields which are `None` are left out of the serialized output.
///
/// See also [Nakadi Manual](https://nakadi.io/manual.html#definition_EventMetadata)
#[derive(Debug, Clone, Serialize)]
pub struct EventMetaDataPub {
    /// Identifier of this Event.
    pub eid: EventId,
    /// The EventType of this Event. This is enriched by Nakadi on reception of the Event
    /// based on the endpoint where the Producer sent the Event to.
    ///
    /// If provided MUST match the endpoint. Failure to do so will cause rejection of the
    /// Event.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub event_type: Option<EventTypeName>,
    /// Timestamp of creation of the Event generated by the producer.
    pub occurred_at: DateTime<Utc>,
    /// Event identifiers of the Events that caused the generation of this Event.
    /// Set by the producer.
    // NOTE(review): `default` only affects deserialization and this type only
    // derives `Serialize`, so an empty list is still written out as `[]`.
    // Confirm whether `skip_serializing_if = "Vec::is_empty"` was intended.
    #[serde(default)]
    pub parent_eids: Vec<EventId>,
    /// Indicates the partition assigned to this Event.
    ///
    /// Required to be set by the client if partition strategy of the EventType is
    /// `user_defined`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub partition: Option<PartitionId>,
    /// The optional `FlowId` sent with this Event.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub flow_id: Option<FlowId>,
}

impl EventMetaDataPub {
    pub fn new<T: Into<EventId>>(eid: T) -> Self {
        Self {
            eid: eid.into(),
            event_type: None,
            occurred_at: Utc::now(),
            parent_eids: Vec::new(),
            partition: None,
            flow_id: None,
        }
    }

    pub fn random_eid() -> Self {
        Self::new(EventId::random())
    }

    pub fn event_type<T: Into<EventTypeName>>(mut self, v: T) -> Self {
        self.event_type = Some(v.into());
        self
    }

    pub fn occurred_at<T: Into<DateTime<Utc>>>(mut self, v: T) -> Self {
        self.occurred_at = v.into();
        self
    }

    pub fn parent_eid<T: Into<EventId>>(mut self, v: T) -> Self {
        self.parent_eids.push(v.into());
        self
    }

    pub fn partition<T: Into<PartitionId>>(mut self, v: T) -> Self {
        self.partition = Some(v.into());
        self
    }

    pub fn flow_id<T: Into<FlowId>>(mut self, v: T) -> Self {
        self.flow_id = Some(v.into());
        self
    }
}

impl From<super::EventMetaData> for EventMetaDataPub {
    fn from(m: super::EventMetaData) -> Self {
        Self {
            eid: m.eid,
            event_type: Some(m.event_type),
            occurred_at: m.occurred_at,
            parent_eids: m.parent_eids,
            partition: None,
            flow_id: Some(m.flow_id),
        }
    }
}