intrepid_model/events/
event_record.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
use bytes::Bytes;
use intrepid_core::{Frame, MessageFrame};
use uuid::Uuid;

use super::{Event, EventKind, IntoEvent};

/// A record of an event. This is designed to represent events in a log, and so
/// it has more fields than a `CacheRecord`.
///
/// - The `stream_name` field is the name of the stream that the event is in.
/// - The `id` field is a unique identifier for the event.
/// - The `kind` field is the kind of event.
/// - The `data` field is the main data of the event.
/// - The `meta` field is for additional information that may be useful, like
///   timestamps or other bits that might not make sense to store in the `data`
///   field.
/// - The `position` field is the position of the event in the stream, and is
///   used to order events in the log.
///
/// Event records are stored, not ephemeral, but they're designed to be easily
/// turned into frames and back again. This means that in a high-traffic
/// system it might be better to use frames directly, and convert them to
/// records only at the point where they absolutely need to be stored or
/// distributed.
#[derive(Clone, Debug, Default)]
pub struct EventRecord {
    /// The name of the stream that the event is in.
    pub stream_name: String,
    /// The unique identifier of the event.
    pub id: Uuid,
    /// The kind of event.
    pub kind: EventKind,
    /// The main data of the event.
    pub data: Bytes,
    /// Any additional metadata that does not belong in `data`.
    pub meta: Bytes,
    // TODO: i64 sucks for consumers. Can it be usize without losing any guarantees/memory advantages?
    /// The position of the event in the stream, used to order events.
    pub position: i64,
}
impl EventRecord {
    /// Create a new event record.
    ///
    /// A fresh v4 UUID is generated for the record's `id` on every call.
    ///
    /// - `stream_name` is copied into an owned `String`.
    /// - `kind` is stored as given.
    /// - `data` and `meta` are converted into [`Bytes`].
    /// - `position` is converted into an `i64`.
    pub fn new(
        stream_name: impl AsRef<str>,
        kind: EventKind,
        data: impl Into<Bytes>,
        meta: impl Into<Bytes>,
        position: impl Into<i64>,
    ) -> Self {
        Self {
            stream_name: stream_name.as_ref().to_owned(),
            id: Uuid::new_v4(),
            kind,
            data: data.into(),
            meta: meta.into(),
            position: position.into(),
        }
    }

    /// Create a new entry event record.
    ///
    /// Shorthand for [`EventRecord::new`] with [`EventKind::Entry`].
    pub fn entry(
        stream_name: impl AsRef<str>,
        data: impl Into<Bytes>,
        meta: impl Into<Bytes>,
        position: impl Into<i64>,
    ) -> Self {
        Self::new(stream_name, EventKind::Entry, data, meta, position)
    }

    /// Create a new marker event record.
    ///
    /// Shorthand for [`EventRecord::new`] with [`EventKind::Marker`].
    pub fn marker(
        stream_name: impl AsRef<str>,
        data: impl Into<Bytes>,
        meta: impl Into<Bytes>,
        position: impl Into<i64>,
    ) -> Self {
        Self::new(stream_name, EventKind::Marker, data, meta, position)
    }

    /// Set the position of the event in the stream.
    ///
    /// Despite the `set_` name this does not mutate in place: it consumes the
    /// record and returns it with `position` replaced, so the return value
    /// has to be kept. All other fields, including `id`, are left untouched.
    #[must_use = "`set_position` consumes the record and returns the updated one"]
    pub fn set_position(mut self, position: impl Into<i64>) -> Self {
        self.position = position.into();
        self
    }
}

impl IntoEvent for EventRecord {
    /// Turn this record into an [`Event`].
    ///
    /// Only the stream name, kind and data are carried over; the record's
    /// `id`, `meta` and `position` are discarded.
    fn into_event(self) -> Event {
        let Self {
            stream_name,
            kind,
            data,
            ..
        } = self;

        Event::new(stream_name, kind, data)
    }
}

impl From<EventRecord> for Frame {
    fn from(record: EventRecord) -> Self {
        Self::Message(MessageFrame {
            uri: record.stream_name,
            data: record.data,
            meta: record.meta,
        })
    }
}

impl From<Event> for EventRecord {
    fn from(event: Event) -> Self {
        Self::new(event.stream_name, event.kind, event.data, Bytes::new(), 0)
    }
}

#[test]
fn creating_event_records() {
    // Every constructor below is fed the same stream name, data, meta and
    // position, so the only thing that varies between records is the kind.
    fn assert_record(record: &EventRecord, expected_kind: EventKind) {
        assert_ne!(record.id, Uuid::nil());
        assert_eq!(record.stream_name, "stream-name");
        assert_eq!(record.kind, expected_kind);
        assert_eq!(record.data, Bytes::from("arbitrary data"));
        assert_eq!(record.meta, Bytes::new());
        assert_eq!(record.position, 1);
    }

    let via_entry = EventRecord::entry("stream-name", "arbitrary data", "", 1);
    assert_record(&via_entry, EventKind::Entry);

    let via_marker = EventRecord::marker("stream-name", "arbitrary data", "", 1);
    assert_record(&via_marker, EventKind::Marker);

    let via_new = EventRecord::new("stream-name", EventKind::Entry, "arbitrary data", "", 1);
    assert_record(&via_new, EventKind::Entry);
}

#[test]
fn event_record_into_event() {
    // Built field by field so the test does not depend on the constructors.
    let record = EventRecord {
        stream_name: String::from("stream-name"),
        id: Uuid::new_v4(),
        kind: EventKind::Entry,
        data: "arbitrary data".into(),
        meta: "arbitrary meta".into(),
        position: 1,
    };

    // Convert a clone so the original stays available for comparison.
    let event = IntoEvent::into_event(record.clone());

    assert_eq!(event.stream_name, record.stream_name);
    assert_eq!(event.kind, record.kind);
    assert_eq!(event.data, record.data);
}