1use std::{str::Utf8Error, sync::Arc};
2
3use crate::{SeqNo, ShardId, StreamKey, Timestamp};
4
5#[derive(Debug, Clone, PartialEq, Eq, Hash)]
6pub struct OwnedMessage {
7 header: MessageHeader,
8 payload: Vec<u8>,
9}
10
11#[derive(Debug, Clone, PartialEq, Eq, Hash)]
12pub struct SharedMessage {
14 header: MessageHeader,
15 bytes: Arc<Vec<u8>>,
16 offset: u32,
17 length: u32,
18}
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
21pub struct Payload<'a> {
23 data: BytesOrStr<'a>,
24}
25
26#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
27pub enum BytesOrStr<'a> {
29 Bytes(&'a [u8]),
30 Str(&'a str),
31}
32
33pub trait IntoBytesOrStr<'a>
35where
36 Self: 'a,
37{
38 fn into_bytes_or_str(self) -> BytesOrStr<'a>;
39}
40
41#[derive(Debug, Clone, PartialEq, Eq, Hash)]
42pub struct MessageHeader {
44 stream_key: StreamKey,
45 shard_id: ShardId,
46 sequence: SeqNo,
47 timestamp: Timestamp,
48}
49
50pub trait Buffer {
52 fn size(&self) -> usize;
53
54 fn into_bytes(self) -> Vec<u8>;
55
56 fn as_bytes(&self) -> &[u8];
57
58 fn as_str(&self) -> Result<&str, Utf8Error>;
59}
60
61pub trait Message: Send {
63 fn stream_key(&self) -> StreamKey;
64
65 fn shard_id(&self) -> ShardId;
66
67 fn sequence(&self) -> SeqNo;
68
69 fn timestamp(&self) -> Timestamp;
70
71 fn message(&self) -> Payload;
72
73 fn to_owned(&self) -> SharedMessage {
74 SharedMessage::new(
75 MessageHeader::new(
76 self.stream_key(),
77 self.shard_id(),
78 self.sequence(),
79 self.timestamp(),
80 ),
81 self.message().into_bytes(),
82 0,
83 self.message().size(),
84 )
85 }
86
87 fn identifier(&self) -> (StreamKey, ShardId, SeqNo) {
89 (self.stream_key(), self.shard_id(), self.sequence())
90 }
91}
92
93impl OwnedMessage {
94 pub fn new(header: MessageHeader, payload: Vec<u8>) -> Self {
95 Self { header, payload }
96 }
97
98 pub fn header(&self) -> &MessageHeader {
99 &self.header
100 }
101
102 pub fn take(self) -> (MessageHeader, Vec<u8>) {
103 let Self { header, payload } = self;
104 (header, payload)
105 }
106
107 pub fn to_shared(self) -> SharedMessage {
108 let (header, payload) = self.take();
109 let size = payload.len();
110 SharedMessage::new(header, payload, 0, size)
111 }
112}
113
114impl SharedMessage {
115 pub fn new(header: MessageHeader, bytes: Vec<u8>, offset: usize, length: usize) -> Self {
116 assert!(offset <= bytes.len());
117 Self {
118 header,
119 bytes: Arc::new(bytes),
120 offset: offset as u32,
121 length: length as u32,
122 }
123 }
124
125 pub fn touch(&mut self) {
127 self.header.timestamp = Timestamp::now_utc();
128 }
129
130 pub fn header(&self) -> &MessageHeader {
131 &self.header
132 }
133
134 pub fn take_header(self) -> MessageHeader {
135 self.header
136 }
137
138 pub fn to_owned_message(self) -> OwnedMessage {
141 let payload = if self.offset == 0 && self.length as usize == self.bytes.len() {
142 Arc::try_unwrap(self.bytes).unwrap_or_else(|arc| (*arc).clone())
143 } else {
144 self.message().into_bytes()
145 };
146 OwnedMessage {
147 header: self.header,
148 payload,
149 }
150 }
151}
152
153impl Message for OwnedMessage {
154 fn stream_key(&self) -> StreamKey {
155 self.header.stream_key().clone()
156 }
157
158 fn shard_id(&self) -> ShardId {
159 *self.header.shard_id()
160 }
161
162 fn sequence(&self) -> SeqNo {
163 *self.header.sequence()
164 }
165
166 fn timestamp(&self) -> Timestamp {
167 *self.header.timestamp()
168 }
169
170 fn message(&self) -> Payload {
171 Payload {
172 data: BytesOrStr::Bytes(&self.payload),
173 }
174 }
175}
176
177impl Message for SharedMessage {
178 fn stream_key(&self) -> StreamKey {
179 self.header.stream_key().clone()
180 }
181
182 fn shard_id(&self) -> ShardId {
183 *self.header.shard_id()
184 }
185
186 fn sequence(&self) -> SeqNo {
187 *self.header.sequence()
188 }
189
190 fn timestamp(&self) -> Timestamp {
191 *self.header.timestamp()
192 }
193
194 fn message(&self) -> Payload {
195 Payload {
196 data: BytesOrStr::Bytes(
197 &self.bytes[self.offset as usize..(self.offset + self.length) as usize],
198 ),
199 }
200 }
201}
202
203impl MessageHeader {
204 pub fn new(
205 stream_key: StreamKey,
206 shard_id: ShardId,
207 sequence: SeqNo,
208 timestamp: Timestamp,
209 ) -> Self {
210 Self {
211 stream_key,
212 shard_id,
213 sequence,
214 timestamp,
215 }
216 }
217
218 pub fn stream_key(&self) -> &StreamKey {
219 &self.stream_key
220 }
221
222 pub fn shard_id(&self) -> &ShardId {
223 &self.shard_id
224 }
225
226 pub fn sequence(&self) -> &SeqNo {
227 &self.sequence
228 }
229
230 pub fn timestamp(&self) -> &Timestamp {
231 &self.timestamp
232 }
233}
234
235impl<'a> Buffer for Payload<'a> {
236 fn size(&self) -> usize {
237 self.data.len()
238 }
239
240 fn into_bytes(self) -> Vec<u8> {
241 match self.data {
242 BytesOrStr::Bytes(bytes) => bytes.into_bytes(),
243 BytesOrStr::Str(str) => str.into_bytes(),
244 }
245 }
246
247 fn as_bytes(&self) -> &[u8] {
248 match self.data {
249 BytesOrStr::Bytes(bytes) => bytes,
250 BytesOrStr::Str(str) => str.as_bytes(),
251 }
252 }
253
254 fn as_str(&self) -> Result<&str, Utf8Error> {
255 match &self.data {
256 BytesOrStr::Bytes(bytes) => bytes.as_str(),
257 BytesOrStr::Str(str) => Ok(str),
258 }
259 }
260}
261
262impl<'a> Buffer for &'a [u8] {
263 fn size(&self) -> usize {
264 self.len()
265 }
266
267 fn into_bytes(self) -> Vec<u8> {
268 self.to_owned()
269 }
270
271 fn as_bytes(&self) -> &[u8] {
272 self
273 }
274
275 fn as_str(&self) -> Result<&str, Utf8Error> {
276 std::str::from_utf8(self)
277 }
278}
279
280impl<'a> Buffer for &'a str {
281 fn size(&self) -> usize {
282 self.len()
283 }
284
285 fn into_bytes(self) -> Vec<u8> {
286 self.as_bytes().to_owned()
287 }
288
289 fn as_bytes(&self) -> &[u8] {
290 str::as_bytes(self)
291 }
292
293 fn as_str(&self) -> Result<&str, Utf8Error> {
294 Ok(self)
295 }
296}
297
298impl Buffer for String {
299 fn size(&self) -> usize {
300 self.len()
301 }
302
303 fn into_bytes(self) -> Vec<u8> {
304 String::into_bytes(self)
305 }
306
307 fn as_bytes(&self) -> &[u8] {
308 String::as_bytes(self)
309 }
310
311 fn as_str(&self) -> Result<&str, Utf8Error> {
312 Ok(self.as_str())
313 }
314}
315
316impl<'a> Payload<'a> {
317 pub fn new<D: IntoBytesOrStr<'a>>(data: D) -> Self {
318 Self {
319 data: data.into_bytes_or_str(),
320 }
321 }
322
323 #[cfg(feature = "json")]
324 #[cfg_attr(docsrs, doc(cfg(feature = "json")))]
325 pub fn deserialize_json<D: serde::de::DeserializeOwned>(&self) -> Result<D, crate::JsonErr> {
326 Ok(serde_json::from_str(self.as_str()?)?)
327 }
328}
329
330impl<'a> Default for Payload<'a> {
331 fn default() -> Self {
332 Self::new("")
333 }
334}
335
336impl<'a> BytesOrStr<'a> {
337 pub fn len(&self) -> usize {
338 match self {
339 BytesOrStr::Bytes(bytes) => bytes.len(),
340 BytesOrStr::Str(str) => str.len(),
341 }
342 }
343
344 pub fn is_empty(&self) -> bool {
345 self.len() == 0
346 }
347}
348
349impl<'a> IntoBytesOrStr<'a> for &'a str {
350 fn into_bytes_or_str(self) -> BytesOrStr<'a> {
351 BytesOrStr::Str(self)
352 }
353}
354
355impl<'a> IntoBytesOrStr<'a> for &'a [u8] {
356 fn into_bytes_or_str(self) -> BytesOrStr<'a> {
357 BytesOrStr::Bytes(self)
358 }
359}
360
361#[cfg(feature = "serde")]
362impl serde::Serialize for MessageHeader {
363 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
364 where
365 S: serde::Serializer,
366 {
367 #[derive(serde::Serialize)]
368 struct HeaderJson<'a> {
369 stream_key: &'a str,
370 shard_id: u64,
371 sequence: SeqNo,
372 timestamp: String,
373 }
374
375 HeaderJson {
376 timestamp: self
377 .timestamp
378 .format(crate::TIMESTAMP_FORMAT)
379 .expect("Timestamp format error"),
380 stream_key: self.stream_key.name(),
381 sequence: self.sequence,
382 shard_id: self.shard_id.id(),
383 }
384 .serialize(serializer)
385 }
386}