1use byteorder::{ByteOrder, NetworkEndian};
2use memberlist_types::{bytes::Bytes, CheapClone, OneOrMore};
3use smol_str::SmolStr;
4use transformable::{BytesTransformError, StringTransformError, Transformable};
5
6use super::{LamportTime, LamportTimeTransformError};
7
8#[viewit::viewit(setters(prefix = "with"))]
10#[derive(Debug, Clone, Eq, PartialEq)]
11#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
12pub struct UserEvents {
13 #[viewit(
15 getter(const, attrs(doc = "Returns the lamport time for this message")),
16 setter(
17 const,
18 attrs(doc = "Sets the lamport time for this message (Builder pattern)")
19 )
20 )]
21 ltime: LamportTime,
22
23 #[viewit(
25 getter(const, style = "ref", attrs(doc = "Returns the user events")),
26 setter(attrs(doc = "Sets the user events (Builder pattern)"))
27 )]
28 events: OneOrMore<UserEvent>,
29}
30
31#[derive(Debug, thiserror::Error)]
33pub enum UserEventsTransformError {
34 #[error("encode buffer too small")]
36 BufferTooSmall,
37 #[error("not enough bytes to decode `UserEvents`")]
39 NotEnoughBytes,
40 #[error(transparent)]
42 Event(#[from] UserEventTransformError),
43 #[error(transparent)]
45 LamportTime(#[from] LamportTimeTransformError),
46}
47
48impl Transformable for UserEvents {
49 type Error = UserEventsTransformError;
50
51 fn encode(&self, dst: &mut [u8]) -> Result<usize, Self::Error> {
52 let encoded_len = self.encoded_len();
53
54 if dst.len() < encoded_len {
55 return Err(Self::Error::BufferTooSmall);
56 }
57
58 let mut offset = 0;
59 NetworkEndian::write_u32(&mut dst[offset..], encoded_len as u32);
60 offset += 4;
61
62 offset += self.ltime.encode(&mut dst[offset..])?;
63 NetworkEndian::write_u32(&mut dst[offset..], self.events.len() as u32);
64 offset += 4;
65
66 for event in self.events.iter() {
67 offset += event.encode(&mut dst[offset..])?;
68 }
69
70 debug_assert_eq!(
71 offset, encoded_len,
72 "expect write {} bytes, actual read {} bytes",
73 encoded_len, offset
74 );
75
76 Ok(offset)
77 }
78
79 fn encoded_len(&self) -> usize {
80 4 + self.ltime.encoded_len()
81 + 4
82 + self
83 .events
84 .iter()
85 .map(UserEvent::encoded_len)
86 .sum::<usize>()
87 }
88
89 fn decode(src: &[u8]) -> Result<(usize, Self), Self::Error>
90 where
91 Self: Sized,
92 {
93 let src_len = src.len();
94 if src_len < 4 {
95 return Err(Self::Error::NotEnoughBytes);
96 }
97
98 let len = NetworkEndian::read_u32(&src[0..4]) as usize;
99 if src_len < len {
100 return Err(Self::Error::NotEnoughBytes);
101 }
102
103 let mut offset = 4;
104 let (ltime_offset, ltime) = LamportTime::decode(&src[offset..])?;
105 offset += ltime_offset;
106
107 let event_len = NetworkEndian::read_u32(&src[offset..]) as usize;
108 offset += 4;
109
110 let mut events = OneOrMore::with_capacity(event_len);
111 for _ in 0..event_len {
112 let (event_offset, event) = UserEvent::decode(&src[offset..])?;
113 offset += event_offset;
114 events.push(event);
115 }
116
117 debug_assert_eq!(
118 offset, len,
119 "expect read {} bytes, actual read {} bytes",
120 len, offset
121 );
122
123 Ok((len, Self { ltime, events }))
124 }
125}
126
127#[viewit::viewit(getters(style = "ref"), setters(prefix = "with"))]
129#[derive(Debug, Clone, Eq, PartialEq)]
130#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
131pub struct UserEvent {
132 #[viewit(
134 getter(const, attrs(doc = "Returns the name of the event")),
135 setter(attrs(doc = "Sets the name of the event (Builder pattern)"))
136 )]
137 name: SmolStr,
138 #[viewit(
140 getter(const, attrs(doc = "Returns the payload of the event")),
141 setter(attrs(doc = "Sets the payload of the event (Builder pattern)"))
142 )]
143 payload: Bytes,
144}
145
146#[derive(Debug, thiserror::Error)]
148pub enum UserEventTransformError {
149 #[error("not enough bytes to decode `UserEvent`")]
151 NotEnoughBytes,
152 #[error("encode buffer too small")]
154 BufferTooSmall,
155
156 #[error(transparent)]
158 Name(#[from] StringTransformError),
159
160 #[error(transparent)]
162 Payload(#[from] BytesTransformError),
163}
164
165impl Transformable for UserEvent {
166 type Error = UserEventTransformError;
167
168 fn encode(&self, dst: &mut [u8]) -> Result<usize, Self::Error> {
169 let encoded_len = self.encoded_len();
170 if dst.len() < encoded_len {
171 return Err(Self::Error::BufferTooSmall);
172 }
173
174 let mut offset = 0;
175 NetworkEndian::write_u32(&mut dst[offset..], encoded_len as u32);
176 offset += 4;
177
178 offset += self.name.encode(&mut dst[offset..])?;
179 offset += self.payload.encode(&mut dst[offset..])?;
180
181 debug_assert_eq!(
182 offset, encoded_len,
183 "expect write {} bytes, actual read {} bytes",
184 encoded_len, offset
185 );
186
187 Ok(offset)
188 }
189
190 fn encoded_len(&self) -> usize {
191 4 + self.name.encoded_len() + self.payload.encoded_len()
192 }
193
194 fn decode(src: &[u8]) -> Result<(usize, Self), Self::Error>
195 where
196 Self: Sized,
197 {
198 let src_len = src.len();
199 if src_len < 4 {
200 return Err(Self::Error::NotEnoughBytes);
201 }
202
203 let len = NetworkEndian::read_u32(&src[0..4]) as usize;
204 if src_len < len {
205 return Err(Self::Error::NotEnoughBytes);
206 }
207
208 let mut offset = 4;
209 let (name_offset, name) = SmolStr::decode(&src[offset..])?;
210 offset += name_offset;
211 let (payload_offset, payload) = Bytes::decode(&src[offset..])?;
212 offset += payload_offset;
213
214 debug_assert_eq!(
215 offset, len,
216 "expect read {} bytes, actual read {} bytes",
217 len, offset
218 );
219
220 Ok((len, Self { name, payload }))
221 }
222}
223
224#[viewit::viewit(setters(prefix = "with"))]
226#[derive(Debug, Default, Clone, Eq, PartialEq)]
227#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
228pub struct UserEventMessage {
229 #[viewit(
231 getter(
232 const,
233 style = "move",
234 attrs(doc = "Returns the lamport time for this message")
235 ),
236 setter(
237 const,
238 attrs(doc = "Sets the lamport time for this message (Builder pattern)")
239 )
240 )]
241 ltime: LamportTime,
242 #[viewit(
244 getter(const, attrs(doc = "Returns the name of the event")),
245 setter(attrs(doc = "Sets the name of the event (Builder pattern)"))
246 )]
247 name: SmolStr,
248 #[viewit(
250 getter(const, attrs(doc = "Returns the payload of the event")),
251 setter(attrs(doc = "Sets the payload of the event (Builder pattern)"))
252 )]
253 payload: Bytes,
254 #[viewit(
256 getter(
257 const,
258 style = "move",
259 attrs(doc = "Returns if this message can be coalesced")
260 ),
261 setter(
262 const,
263 attrs(doc = "Sets if this message can be coalesced (Builder pattern)")
264 )
265 )]
266 cc: bool,
267}
268
269impl CheapClone for UserEventMessage {
270 fn cheap_clone(&self) -> Self {
271 Self {
272 ltime: self.ltime,
273 name: self.name.cheap_clone(),
274 payload: self.payload.clone(),
275 cc: self.cc,
276 }
277 }
278}
279
280#[derive(Debug, thiserror::Error)]
282pub enum UserEventMessageTransformError {
283 #[error("not enough bytes to decode `UserEventMessage`")]
285 NotEnoughBytes,
286 #[error("encode buffer too small")]
288 BufferTooSmall,
289
290 #[error(transparent)]
292 LamportTime(#[from] LamportTimeTransformError),
293
294 #[error(transparent)]
296 Name(#[from] StringTransformError),
297
298 #[error(transparent)]
300 Payload(#[from] BytesTransformError),
301}
302
303impl Transformable for UserEventMessage {
304 type Error = UserEventMessageTransformError;
305
306 fn encode(&self, dst: &mut [u8]) -> Result<usize, Self::Error> {
307 let encoded_len = self.encoded_len();
308 if dst.len() < encoded_len {
309 return Err(Self::Error::BufferTooSmall);
310 }
311
312 let mut offset = 0;
313 NetworkEndian::write_u32(&mut dst[offset..], encoded_len as u32);
314 offset += 4;
315 dst[offset] = self.cc as u8;
316 offset += 1;
317 offset += self.ltime.encode(&mut dst[offset..])?;
318 offset += self.name.encode(&mut dst[offset..])?;
319 offset += self.payload.encode(&mut dst[offset..])?;
320
321 debug_assert_eq!(
322 offset, encoded_len,
323 "expect write {} bytes, actual read {} bytes",
324 encoded_len, offset
325 );
326
327 Ok(offset)
328 }
329
330 fn encoded_len(&self) -> usize {
331 4 + self.ltime.encoded_len() + self.name.encoded_len() + self.payload.encoded_len() + 1
332 }
333
334 fn decode(src: &[u8]) -> Result<(usize, Self), Self::Error>
335 where
336 Self: Sized,
337 {
338 let src_len = src.len();
339 if src_len < 4 {
340 return Err(Self::Error::NotEnoughBytes);
341 }
342
343 let len = NetworkEndian::read_u32(&src[0..4]) as usize;
344 if src_len < len {
345 return Err(Self::Error::NotEnoughBytes);
346 }
347
348 let mut offset = 4;
349 let cc = src[offset] != 0;
350 offset += 1;
351 let (ltime_offset, ltime) = LamportTime::decode(&src[offset..])?;
352 offset += ltime_offset;
353 let (name_offset, name) = SmolStr::decode(&src[offset..])?;
354 offset += name_offset;
355 let (payload_offset, payload) = Bytes::decode(&src[offset..])?;
356 offset += payload_offset;
357
358 debug_assert_eq!(
359 offset, len,
360 "expect read {} bytes, actual read {} bytes",
361 len, offset
362 );
363
364 Ok((
365 len,
366 Self {
367 ltime,
368 name,
369 payload,
370 cc,
371 },
372 ))
373 }
374}
375
#[cfg(test)]
mod tests {
  use rand::{distributions::Alphanumeric, random, Rng};

  use super::*;

  /// Produces `len` random ASCII alphanumeric bytes.
  fn alphanumeric_bytes(len: usize) -> Vec<u8> {
    rand::thread_rng()
      .sample_iter(&Alphanumeric)
      .take(len)
      .collect::<Vec<u8>>()
  }

  /// Produces a random alphanumeric string of `len` characters.
  fn alphanumeric_string(len: usize) -> String {
    String::from_utf8(alphanumeric_bytes(len)).unwrap()
  }

  /// Encodes `$value` and checks that the slice, reader and async reader
  /// decoders all give back the same value and the same length.
  ///
  /// Must be expanded inside an `async` context because it awaits the async
  /// reader decoder.
  macro_rules! assert_round_trip {
    ($ty:ty, $value:expr) => {{
      let original: $ty = $value;
      let mut buf = vec![0; original.encoded_len()];
      let written = original.encode(&mut buf).unwrap();
      assert_eq!(written, original.encoded_len());

      let (consumed, decoded) = <$ty>::decode(&buf).unwrap();
      assert_eq!(consumed, written);
      assert_eq!(decoded, original);

      let (consumed, decoded) =
        <$ty>::decode_from_reader(&mut std::io::Cursor::new(&buf)).unwrap();
      assert_eq!(consumed, written);
      assert_eq!(decoded, original);

      let (consumed, decoded) =
        <$ty>::decode_from_async_reader(&mut futures::io::Cursor::new(&buf))
          .await
          .unwrap();
      assert_eq!(consumed, written);
      assert_eq!(decoded, original);
    }};
  }

  impl UserEvent {
    /// Builds an event whose name and payload are both `size` random
    /// alphanumeric bytes.
    fn random(size: usize) -> Self {
      let name = alphanumeric_string(size);
      let payload = alphanumeric_bytes(size);

      Self {
        name: name.into(),
        payload: payload.into(),
      }
    }
  }

  impl UserEvents {
    /// Builds `num_events` random events of the given `size` under a random
    /// lamport time.
    pub(crate) fn random(size: usize, num_events: usize) -> Self {
      let mut events = OneOrMore::with_capacity(num_events);
      (0..num_events).for_each(|_| events.push(UserEvent::random(size)));

      Self {
        ltime: LamportTime::random(),
        events,
      }
    }
  }

  impl UserEventMessage {
    /// Builds a message with a random lamport time, a random `cc` flag, and a
    /// name and payload of `size` random alphanumeric bytes each.
    fn random(size: usize) -> Self {
      let name = alphanumeric_string(size);
      let payload = alphanumeric_bytes(size);

      Self {
        ltime: LamportTime::random(),
        name: name.into(),
        payload: payload.into(),
        cc: random(),
      }
    }
  }

  #[test]
  fn test_user_event_transform() {
    futures::executor::block_on(async {
      for size in 0..100 {
        assert_round_trip!(UserEvent, UserEvent::random(size));
      }
    })
  }

  #[test]
  fn test_user_events_transform() {
    futures::executor::block_on(async {
      for size in 0..100 {
        assert_round_trip!(UserEvents, UserEvents::random(size, size % 10));
      }
    })
  }

  #[test]
  fn test_user_event_message_transform() {
    futures::executor::block_on(async {
      for size in 0..100 {
        assert_round_trip!(UserEventMessage, UserEventMessage::random(size));
      }
    })
  }
}