serf_types/
user_event.rs

1use byteorder::{ByteOrder, NetworkEndian};
2use memberlist_types::{bytes::Bytes, CheapClone, OneOrMore};
3use smol_str::SmolStr;
4use transformable::{BytesTransformError, StringTransformError, Transformable};
5
6use super::{LamportTime, LamportTimeTransformError};
7
8/// Used to buffer events to prevent re-delivery
9#[viewit::viewit(setters(prefix = "with"))]
10#[derive(Debug, Clone, Eq, PartialEq)]
11#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
12pub struct UserEvents {
13  /// The lamport time
14  #[viewit(
15    getter(const, attrs(doc = "Returns the lamport time for this message")),
16    setter(
17      const,
18      attrs(doc = "Sets the lamport time for this message (Builder pattern)")
19    )
20  )]
21  ltime: LamportTime,
22
23  /// The user events
24  #[viewit(
25    getter(const, style = "ref", attrs(doc = "Returns the user events")),
26    setter(attrs(doc = "Sets the user events (Builder pattern)"))
27  )]
28  events: OneOrMore<UserEvent>,
29}
30
31/// Error that can occur when transforming a [`UserEvents`]
32#[derive(Debug, thiserror::Error)]
33pub enum UserEventsTransformError {
34  /// Encode buffer too small
35  #[error("encode buffer too small")]
36  BufferTooSmall,
37  /// Not enough bytes to decode [`UserEvents`]
38  #[error("not enough bytes to decode `UserEvents`")]
39  NotEnoughBytes,
40  /// Error transforming [`UserEvent`]
41  #[error(transparent)]
42  Event(#[from] UserEventTransformError),
43  /// Error transforming [`LamportTime`]
44  #[error(transparent)]
45  LamportTime(#[from] LamportTimeTransformError),
46}
47
48impl Transformable for UserEvents {
49  type Error = UserEventsTransformError;
50
51  fn encode(&self, dst: &mut [u8]) -> Result<usize, Self::Error> {
52    let encoded_len = self.encoded_len();
53
54    if dst.len() < encoded_len {
55      return Err(Self::Error::BufferTooSmall);
56    }
57
58    let mut offset = 0;
59    NetworkEndian::write_u32(&mut dst[offset..], encoded_len as u32);
60    offset += 4;
61
62    offset += self.ltime.encode(&mut dst[offset..])?;
63    NetworkEndian::write_u32(&mut dst[offset..], self.events.len() as u32);
64    offset += 4;
65
66    for event in self.events.iter() {
67      offset += event.encode(&mut dst[offset..])?;
68    }
69
70    debug_assert_eq!(
71      offset, encoded_len,
72      "expect write {} bytes, actual read {} bytes",
73      encoded_len, offset
74    );
75
76    Ok(offset)
77  }
78
79  fn encoded_len(&self) -> usize {
80    4 + self.ltime.encoded_len()
81      + 4
82      + self
83        .events
84        .iter()
85        .map(UserEvent::encoded_len)
86        .sum::<usize>()
87  }
88
89  fn decode(src: &[u8]) -> Result<(usize, Self), Self::Error>
90  where
91    Self: Sized,
92  {
93    let src_len = src.len();
94    if src_len < 4 {
95      return Err(Self::Error::NotEnoughBytes);
96    }
97
98    let len = NetworkEndian::read_u32(&src[0..4]) as usize;
99    if src_len < len {
100      return Err(Self::Error::NotEnoughBytes);
101    }
102
103    let mut offset = 4;
104    let (ltime_offset, ltime) = LamportTime::decode(&src[offset..])?;
105    offset += ltime_offset;
106
107    let event_len = NetworkEndian::read_u32(&src[offset..]) as usize;
108    offset += 4;
109
110    let mut events = OneOrMore::with_capacity(event_len);
111    for _ in 0..event_len {
112      let (event_offset, event) = UserEvent::decode(&src[offset..])?;
113      offset += event_offset;
114      events.push(event);
115    }
116
117    debug_assert_eq!(
118      offset, len,
119      "expect read {} bytes, actual read {} bytes",
120      len, offset
121    );
122
123    Ok((len, Self { ltime, events }))
124  }
125}
126
127/// Stores all the user events at a specific time
128#[viewit::viewit(getters(style = "ref"), setters(prefix = "with"))]
129#[derive(Debug, Clone, Eq, PartialEq)]
130#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
131pub struct UserEvent {
132  /// The name of the event
133  #[viewit(
134    getter(const, attrs(doc = "Returns the name of the event")),
135    setter(attrs(doc = "Sets the name of the event (Builder pattern)"))
136  )]
137  name: SmolStr,
138  /// The payload of the event
139  #[viewit(
140    getter(const, attrs(doc = "Returns the payload of the event")),
141    setter(attrs(doc = "Sets the payload of the event (Builder pattern)"))
142  )]
143  payload: Bytes,
144}
145
146/// Error that can occur when transforming a [`UserEvent`]
147#[derive(Debug, thiserror::Error)]
148pub enum UserEventTransformError {
149  /// Not enough bytes to decode UserEvent
150  #[error("not enough bytes to decode `UserEvent`")]
151  NotEnoughBytes,
152  /// Encode buffer too small
153  #[error("encode buffer too small")]
154  BufferTooSmall,
155
156  /// Error transforming SmolStr
157  #[error(transparent)]
158  Name(#[from] StringTransformError),
159
160  /// Error transforming Bytes
161  #[error(transparent)]
162  Payload(#[from] BytesTransformError),
163}
164
165impl Transformable for UserEvent {
166  type Error = UserEventTransformError;
167
168  fn encode(&self, dst: &mut [u8]) -> Result<usize, Self::Error> {
169    let encoded_len = self.encoded_len();
170    if dst.len() < encoded_len {
171      return Err(Self::Error::BufferTooSmall);
172    }
173
174    let mut offset = 0;
175    NetworkEndian::write_u32(&mut dst[offset..], encoded_len as u32);
176    offset += 4;
177
178    offset += self.name.encode(&mut dst[offset..])?;
179    offset += self.payload.encode(&mut dst[offset..])?;
180
181    debug_assert_eq!(
182      offset, encoded_len,
183      "expect write {} bytes, actual read {} bytes",
184      encoded_len, offset
185    );
186
187    Ok(offset)
188  }
189
190  fn encoded_len(&self) -> usize {
191    4 + self.name.encoded_len() + self.payload.encoded_len()
192  }
193
194  fn decode(src: &[u8]) -> Result<(usize, Self), Self::Error>
195  where
196    Self: Sized,
197  {
198    let src_len = src.len();
199    if src_len < 4 {
200      return Err(Self::Error::NotEnoughBytes);
201    }
202
203    let len = NetworkEndian::read_u32(&src[0..4]) as usize;
204    if src_len < len {
205      return Err(Self::Error::NotEnoughBytes);
206    }
207
208    let mut offset = 4;
209    let (name_offset, name) = SmolStr::decode(&src[offset..])?;
210    offset += name_offset;
211    let (payload_offset, payload) = Bytes::decode(&src[offset..])?;
212    offset += payload_offset;
213
214    debug_assert_eq!(
215      offset, len,
216      "expect read {} bytes, actual read {} bytes",
217      len, offset
218    );
219
220    Ok((len, Self { name, payload }))
221  }
222}
223
224/// Used for user-generated events
225#[viewit::viewit(setters(prefix = "with"))]
226#[derive(Debug, Default, Clone, Eq, PartialEq)]
227#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
228pub struct UserEventMessage {
229  /// The lamport time
230  #[viewit(
231    getter(
232      const,
233      style = "move",
234      attrs(doc = "Returns the lamport time for this message")
235    ),
236    setter(
237      const,
238      attrs(doc = "Sets the lamport time for this message (Builder pattern)")
239    )
240  )]
241  ltime: LamportTime,
242  /// The name of the event
243  #[viewit(
244    getter(const, attrs(doc = "Returns the name of the event")),
245    setter(attrs(doc = "Sets the name of the event (Builder pattern)"))
246  )]
247  name: SmolStr,
248  /// The payload of the event
249  #[viewit(
250    getter(const, attrs(doc = "Returns the payload of the event")),
251    setter(attrs(doc = "Sets the payload of the event (Builder pattern)"))
252  )]
253  payload: Bytes,
254  /// "Can Coalesce".
255  #[viewit(
256    getter(
257      const,
258      style = "move",
259      attrs(doc = "Returns if this message can be coalesced")
260    ),
261    setter(
262      const,
263      attrs(doc = "Sets if this message can be coalesced (Builder pattern)")
264    )
265  )]
266  cc: bool,
267}
268
269impl CheapClone for UserEventMessage {
270  fn cheap_clone(&self) -> Self {
271    Self {
272      ltime: self.ltime,
273      name: self.name.cheap_clone(),
274      payload: self.payload.clone(),
275      cc: self.cc,
276    }
277  }
278}
279
280/// Error that can occur when transforming a [`UserEventMessage`]
281#[derive(Debug, thiserror::Error)]
282pub enum UserEventMessageTransformError {
283  /// Not enough bytes to decode UserEventMessage
284  #[error("not enough bytes to decode `UserEventMessage`")]
285  NotEnoughBytes,
286  /// Encode buffer too small
287  #[error("encode buffer too small")]
288  BufferTooSmall,
289
290  /// Error transforming LamportTime
291  #[error(transparent)]
292  LamportTime(#[from] LamportTimeTransformError),
293
294  /// Error transforming SmolStr
295  #[error(transparent)]
296  Name(#[from] StringTransformError),
297
298  /// Error transforming Bytes
299  #[error(transparent)]
300  Payload(#[from] BytesTransformError),
301}
302
303impl Transformable for UserEventMessage {
304  type Error = UserEventMessageTransformError;
305
306  fn encode(&self, dst: &mut [u8]) -> Result<usize, Self::Error> {
307    let encoded_len = self.encoded_len();
308    if dst.len() < encoded_len {
309      return Err(Self::Error::BufferTooSmall);
310    }
311
312    let mut offset = 0;
313    NetworkEndian::write_u32(&mut dst[offset..], encoded_len as u32);
314    offset += 4;
315    dst[offset] = self.cc as u8;
316    offset += 1;
317    offset += self.ltime.encode(&mut dst[offset..])?;
318    offset += self.name.encode(&mut dst[offset..])?;
319    offset += self.payload.encode(&mut dst[offset..])?;
320
321    debug_assert_eq!(
322      offset, encoded_len,
323      "expect write {} bytes, actual read {} bytes",
324      encoded_len, offset
325    );
326
327    Ok(offset)
328  }
329
330  fn encoded_len(&self) -> usize {
331    4 + self.ltime.encoded_len() + self.name.encoded_len() + self.payload.encoded_len() + 1
332  }
333
334  fn decode(src: &[u8]) -> Result<(usize, Self), Self::Error>
335  where
336    Self: Sized,
337  {
338    let src_len = src.len();
339    if src_len < 4 {
340      return Err(Self::Error::NotEnoughBytes);
341    }
342
343    let len = NetworkEndian::read_u32(&src[0..4]) as usize;
344    if src_len < len {
345      return Err(Self::Error::NotEnoughBytes);
346    }
347
348    let mut offset = 4;
349    let cc = src[offset] != 0;
350    offset += 1;
351    let (ltime_offset, ltime) = LamportTime::decode(&src[offset..])?;
352    offset += ltime_offset;
353    let (name_offset, name) = SmolStr::decode(&src[offset..])?;
354    offset += name_offset;
355    let (payload_offset, payload) = Bytes::decode(&src[offset..])?;
356    offset += payload_offset;
357
358    debug_assert_eq!(
359      offset, len,
360      "expect read {} bytes, actual read {} bytes",
361      len, offset
362    );
363
364    Ok((
365      len,
366      Self {
367        ltime,
368        name,
369        payload,
370        cc,
371      },
372    ))
373  }
374}
375
#[cfg(test)]
mod tests {
  use rand::{distributions::Alphanumeric, random, Rng};

  use super::*;

  /// Produces `size` random ASCII alphanumeric bytes.
  fn alphanumeric_bytes(size: usize) -> Vec<u8> {
    rand::thread_rng()
      .sample_iter(&Alphanumeric)
      .take(size)
      .collect::<Vec<u8>>()
  }

  /// Produces a random alphanumeric string of `size` characters.
  fn alphanumeric_string(size: usize) -> String {
    String::from_utf8(alphanumeric_bytes(size)).unwrap()
  }

  impl UserEvent {
    /// Random event whose name and payload are both `size` bytes long.
    fn random(size: usize) -> Self {
      let name = alphanumeric_string(size);
      let payload = alphanumeric_bytes(size);

      Self {
        name: name.into(),
        payload: payload.into(),
      }
    }
  }

  impl UserEvents {
    /// Random batch of `num_events` events, each built with `UserEvent::random(size)`.
    pub(crate) fn random(size: usize, num_events: usize) -> Self {
      let mut events = OneOrMore::with_capacity(num_events);
      (0..num_events).for_each(|_| events.push(UserEvent::random(size)));

      Self {
        ltime: LamportTime::random(),
        events,
      }
    }
  }

  impl UserEventMessage {
    /// Random message whose name and payload are both `size` bytes long.
    fn random(size: usize) -> Self {
      let name = alphanumeric_string(size);
      let payload = alphanumeric_bytes(size);

      Self {
        ltime: LamportTime::random(),
        name: name.into(),
        payload: payload.into(),
        cc: random(),
      }
    }
  }

  /// Round-trips `UserEvent` through the slice, sync-reader and async-reader decoders.
  #[test]
  fn test_user_event_transform() {
    futures::executor::block_on(async {
      for size in 0..100 {
        let original = UserEvent::random(size);
        let mut buf = vec![0; original.encoded_len()];
        let written = original.encode(&mut buf).unwrap();
        assert_eq!(written, original.encoded_len());

        let (read, decoded) = UserEvent::decode(&buf).unwrap();
        assert_eq!(read, written);
        assert_eq!(decoded, original);

        let mut reader = std::io::Cursor::new(&buf);
        let (read, decoded) = UserEvent::decode_from_reader(&mut reader).unwrap();
        assert_eq!(read, written);
        assert_eq!(decoded, original);

        let mut async_reader = futures::io::Cursor::new(&buf);
        let (read, decoded) = UserEvent::decode_from_async_reader(&mut async_reader)
          .await
          .unwrap();
        assert_eq!(read, written);
        assert_eq!(decoded, original);
      }
    })
  }

  /// Round-trips `UserEvents` through the slice, sync-reader and async-reader decoders.
  #[test]
  fn test_user_events_transform() {
    futures::executor::block_on(async {
      for size in 0..100 {
        let original = UserEvents::random(size, size % 10);
        let mut buf = vec![0; original.encoded_len()];
        let written = original.encode(&mut buf).unwrap();
        assert_eq!(written, original.encoded_len());

        let (read, decoded) = UserEvents::decode(&buf).unwrap();
        assert_eq!(read, written);
        assert_eq!(decoded, original);

        let mut reader = std::io::Cursor::new(&buf);
        let (read, decoded) = UserEvents::decode_from_reader(&mut reader).unwrap();
        assert_eq!(read, written);
        assert_eq!(decoded, original);

        let mut async_reader = futures::io::Cursor::new(&buf);
        let (read, decoded) = UserEvents::decode_from_async_reader(&mut async_reader)
          .await
          .unwrap();
        assert_eq!(read, written);
        assert_eq!(decoded, original);
      }
    })
  }

  /// Round-trips `UserEventMessage` through the slice, sync-reader and async-reader decoders.
  #[test]
  fn test_user_event_message_transform() {
    futures::executor::block_on(async {
      for size in 0..100 {
        let original = UserEventMessage::random(size);
        let mut buf = vec![0; original.encoded_len()];
        let written = original.encode(&mut buf).unwrap();
        assert_eq!(written, original.encoded_len());

        let (read, decoded) = UserEventMessage::decode(&buf).unwrap();
        assert_eq!(read, written);
        assert_eq!(decoded, original);

        let mut reader = std::io::Cursor::new(&buf);
        let (read, decoded) = UserEventMessage::decode_from_reader(&mut reader).unwrap();
        assert_eq!(read, written);
        assert_eq!(decoded, original);

        let mut async_reader = futures::io::Cursor::new(&buf);
        let (read, decoded) = UserEventMessage::decode_from_async_reader(&mut async_reader)
          .await
          .unwrap();
        assert_eq!(read, written);
        assert_eq!(decoded, original);
      }
    })
  }
}