sea_streamer_types/
message.rs

1use std::{str::Utf8Error, sync::Arc};
2
3use crate::{SeqNo, ShardId, StreamKey, Timestamp};
4
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
/// A message that owns its payload as a plain `Vec<u8>`, alongside its [`MessageHeader`].
pub struct OwnedMessage {
    /// Metadata of the message.
    header: MessageHeader,
    /// The payload bytes, owned by this message.
    payload: Vec<u8>,
}
10
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
/// A message whose payload is a window into a reference-counted byte buffer.
///
/// It uses an `Arc` to hold the bytes, so is cheap to clone.
pub struct SharedMessage {
    /// Metadata of the message.
    header: MessageHeader,
    /// The backing buffer; the payload is the `offset..offset + length` range of it.
    bytes: Arc<Vec<u8>>,
    /// Index into `bytes` at which the payload starts.
    offset: u32,
    /// Number of payload bytes, counted from `offset`.
    length: u32,
}
19
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
/// The payload of a message.
///
/// This is a borrowed view (it is `Copy`); the data itself lives in whatever produced it.
pub struct Payload<'a> {
    /// The borrowed data, tagged as either raw bytes or a string slice.
    data: BytesOrStr<'a>,
}
25
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
/// Bytes or Str. Being an `str` means the data is UTF-8 valid.
pub enum BytesOrStr<'a> {
    /// Arbitrary bytes; nothing is known about whether they are valid UTF-8.
    Bytes(&'a [u8]),
    /// A string slice, and therefore valid UTF-8.
    Str(&'a str),
}
32
/// Types that can be converted into [`BytesOrStr`].
pub trait IntoBytesOrStr<'a>
where
    Self: 'a,
{
    /// Consume `self` and wrap it in a [`BytesOrStr`] borrowing for `'a`.
    fn into_bytes_or_str(self) -> BytesOrStr<'a>;
}
40
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
/// Metadata associated with a message.
///
/// `stream_key`, `shard_id` and `sequence` together form the tuple returned by
/// [`Message::identifier`].
pub struct MessageHeader {
    /// Key of the stream the message belongs to.
    stream_key: StreamKey,
    /// Shard the message belongs to.
    shard_id: ShardId,
    /// Sequence number of the message.
    sequence: SeqNo,
    /// Timestamp of the message.
    timestamp: Timestamp,
}
49
/// Common interface of byte containers.
pub trait Buffer {
    /// Size of the content, in bytes.
    fn size(&self) -> usize;

    /// Consume the container and return its content as an owned `Vec<u8>`.
    fn into_bytes(self) -> Vec<u8>;

    /// Borrow the content as a byte slice.
    fn as_bytes(&self) -> &[u8];

    /// Borrow the content as a string slice.
    ///
    /// # Errors
    ///
    /// Returns a [`Utf8Error`] when the content is not valid UTF-8.
    fn as_str(&self) -> Result<&str, Utf8Error>;
}
60
61/// Common interface of messages, to be implemented by all backends.
62pub trait Message: Send {
63    fn stream_key(&self) -> StreamKey;
64
65    fn shard_id(&self) -> ShardId;
66
67    fn sequence(&self) -> SeqNo;
68
69    fn timestamp(&self) -> Timestamp;
70
71    fn message(&self) -> Payload;
72
73    fn to_owned(&self) -> SharedMessage {
74        SharedMessage::new(
75            MessageHeader::new(
76                self.stream_key(),
77                self.shard_id(),
78                self.sequence(),
79                self.timestamp(),
80            ),
81            self.message().into_bytes(),
82            0,
83            self.message().size(),
84        )
85    }
86
87    /// tuple to uniquely identify a message
88    fn identifier(&self) -> (StreamKey, ShardId, SeqNo) {
89        (self.stream_key(), self.shard_id(), self.sequence())
90    }
91}
92
93impl OwnedMessage {
94    pub fn new(header: MessageHeader, payload: Vec<u8>) -> Self {
95        Self { header, payload }
96    }
97
98    pub fn header(&self) -> &MessageHeader {
99        &self.header
100    }
101
102    pub fn take(self) -> (MessageHeader, Vec<u8>) {
103        let Self { header, payload } = self;
104        (header, payload)
105    }
106
107    pub fn to_shared(self) -> SharedMessage {
108        let (header, payload) = self.take();
109        let size = payload.len();
110        SharedMessage::new(header, payload, 0, size)
111    }
112}
113
114impl SharedMessage {
115    pub fn new(header: MessageHeader, bytes: Vec<u8>, offset: usize, length: usize) -> Self {
116        assert!(offset <= bytes.len());
117        Self {
118            header,
119            bytes: Arc::new(bytes),
120            offset: offset as u32,
121            length: length as u32,
122        }
123    }
124
125    /// Touch the timestamp to now
126    pub fn touch(&mut self) {
127        self.header.timestamp = Timestamp::now_utc();
128    }
129
130    pub fn header(&self) -> &MessageHeader {
131        &self.header
132    }
133
134    pub fn take_header(self) -> MessageHeader {
135        self.header
136    }
137
138    /// This will attempt to convert self into an OwnedMessage *without* copying,
139    /// if the bytes are not shared with any other.
140    pub fn to_owned_message(self) -> OwnedMessage {
141        let payload = if self.offset == 0 && self.length as usize == self.bytes.len() {
142            Arc::try_unwrap(self.bytes).unwrap_or_else(|arc| (*arc).clone())
143        } else {
144            self.message().into_bytes()
145        };
146        OwnedMessage {
147            header: self.header,
148            payload,
149        }
150    }
151}
152
153impl Message for OwnedMessage {
154    fn stream_key(&self) -> StreamKey {
155        self.header.stream_key().clone()
156    }
157
158    fn shard_id(&self) -> ShardId {
159        *self.header.shard_id()
160    }
161
162    fn sequence(&self) -> SeqNo {
163        *self.header.sequence()
164    }
165
166    fn timestamp(&self) -> Timestamp {
167        *self.header.timestamp()
168    }
169
170    fn message(&self) -> Payload {
171        Payload {
172            data: BytesOrStr::Bytes(&self.payload),
173        }
174    }
175}
176
177impl Message for SharedMessage {
178    fn stream_key(&self) -> StreamKey {
179        self.header.stream_key().clone()
180    }
181
182    fn shard_id(&self) -> ShardId {
183        *self.header.shard_id()
184    }
185
186    fn sequence(&self) -> SeqNo {
187        *self.header.sequence()
188    }
189
190    fn timestamp(&self) -> Timestamp {
191        *self.header.timestamp()
192    }
193
194    fn message(&self) -> Payload {
195        Payload {
196            data: BytesOrStr::Bytes(
197                &self.bytes[self.offset as usize..(self.offset + self.length) as usize],
198            ),
199        }
200    }
201}
202
203impl MessageHeader {
204    pub fn new(
205        stream_key: StreamKey,
206        shard_id: ShardId,
207        sequence: SeqNo,
208        timestamp: Timestamp,
209    ) -> Self {
210        Self {
211            stream_key,
212            shard_id,
213            sequence,
214            timestamp,
215        }
216    }
217
218    pub fn stream_key(&self) -> &StreamKey {
219        &self.stream_key
220    }
221
222    pub fn shard_id(&self) -> &ShardId {
223        &self.shard_id
224    }
225
226    pub fn sequence(&self) -> &SeqNo {
227        &self.sequence
228    }
229
230    pub fn timestamp(&self) -> &Timestamp {
231        &self.timestamp
232    }
233}
234
235impl<'a> Buffer for Payload<'a> {
236    fn size(&self) -> usize {
237        self.data.len()
238    }
239
240    fn into_bytes(self) -> Vec<u8> {
241        match self.data {
242            BytesOrStr::Bytes(bytes) => bytes.into_bytes(),
243            BytesOrStr::Str(str) => str.into_bytes(),
244        }
245    }
246
247    fn as_bytes(&self) -> &[u8] {
248        match self.data {
249            BytesOrStr::Bytes(bytes) => bytes,
250            BytesOrStr::Str(str) => str.as_bytes(),
251        }
252    }
253
254    fn as_str(&self) -> Result<&str, Utf8Error> {
255        match &self.data {
256            BytesOrStr::Bytes(bytes) => bytes.as_str(),
257            BytesOrStr::Str(str) => Ok(str),
258        }
259    }
260}
261
262impl<'a> Buffer for &'a [u8] {
263    fn size(&self) -> usize {
264        self.len()
265    }
266
267    fn into_bytes(self) -> Vec<u8> {
268        self.to_owned()
269    }
270
271    fn as_bytes(&self) -> &[u8] {
272        self
273    }
274
275    fn as_str(&self) -> Result<&str, Utf8Error> {
276        std::str::from_utf8(self)
277    }
278}
279
280impl<'a> Buffer for &'a str {
281    fn size(&self) -> usize {
282        self.len()
283    }
284
285    fn into_bytes(self) -> Vec<u8> {
286        self.as_bytes().to_owned()
287    }
288
289    fn as_bytes(&self) -> &[u8] {
290        str::as_bytes(self)
291    }
292
293    fn as_str(&self) -> Result<&str, Utf8Error> {
294        Ok(self)
295    }
296}
297
298impl Buffer for String {
299    fn size(&self) -> usize {
300        self.len()
301    }
302
303    fn into_bytes(self) -> Vec<u8> {
304        String::into_bytes(self)
305    }
306
307    fn as_bytes(&self) -> &[u8] {
308        String::as_bytes(self)
309    }
310
311    fn as_str(&self) -> Result<&str, Utf8Error> {
312        Ok(self.as_str())
313    }
314}
315
316impl<'a> Payload<'a> {
317    pub fn new<D: IntoBytesOrStr<'a>>(data: D) -> Self {
318        Self {
319            data: data.into_bytes_or_str(),
320        }
321    }
322
323    #[cfg(feature = "json")]
324    #[cfg_attr(docsrs, doc(cfg(feature = "json")))]
325    pub fn deserialize_json<D: serde::de::DeserializeOwned>(&self) -> Result<D, crate::JsonErr> {
326        Ok(serde_json::from_str(self.as_str()?)?)
327    }
328}
329
330impl<'a> Default for Payload<'a> {
331    fn default() -> Self {
332        Self::new("")
333    }
334}
335
336impl<'a> BytesOrStr<'a> {
337    pub fn len(&self) -> usize {
338        match self {
339            BytesOrStr::Bytes(bytes) => bytes.len(),
340            BytesOrStr::Str(str) => str.len(),
341        }
342    }
343
344    pub fn is_empty(&self) -> bool {
345        self.len() == 0
346    }
347}
348
349impl<'a> IntoBytesOrStr<'a> for &'a str {
350    fn into_bytes_or_str(self) -> BytesOrStr<'a> {
351        BytesOrStr::Str(self)
352    }
353}
354
355impl<'a> IntoBytesOrStr<'a> for &'a [u8] {
356    fn into_bytes_or_str(self) -> BytesOrStr<'a> {
357        BytesOrStr::Bytes(self)
358    }
359}
360
#[cfg(feature = "serde")]
/// Serializes the header as a flat record with the fields `stream_key`, `shard_id`,
/// `sequence` and `timestamp`, the timestamp being rendered as a string using
/// `crate::TIMESTAMP_FORMAT`.
impl serde::Serialize for MessageHeader {
    /// # Errors
    ///
    /// Besides errors raised by the serializer itself, returns a custom serializer error
    /// if the timestamp cannot be formatted (rather than panicking).
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // Serialized shape of the header: plain values only, timestamp pre-formatted.
        #[derive(serde::Serialize)]
        struct HeaderJson<'a> {
            stream_key: &'a str,
            shard_id: u64,
            sequence: SeqNo,
            timestamp: String,
        }

        // Surface a formatting failure through the serializer's own error type so the
        // caller can handle it; a serializer should not bring the process down.
        let timestamp = self
            .timestamp
            .format(crate::TIMESTAMP_FORMAT)
            .map_err(|err| {
                <S::Error as serde::ser::Error>::custom(format!(
                    "Timestamp format error: {:?}",
                    err
                ))
            })?;

        let record = HeaderJson {
            timestamp,
            stream_key: self.stream_key.name(),
            sequence: self.sequence,
            shard_id: self.shard_id.id(),
        };
        serde::Serialize::serialize(&record, serializer)
    }
}