serf_types/
message.rs

1use std::sync::Arc;
2
3use crate::{
4  JoinMessageTransformError, LeaveMessageTransformError, MemberTransformError,
5  PushPullMessageTransformError, QueryMessageTransformError, QueryResponseMessageTransformError,
6  UserEventMessageTransformError,
7};
8
9use super::{
10  Encodable, JoinMessage, LeaveMessage, Member, PushPullMessage, PushPullMessageRef, QueryMessage,
11  QueryResponseMessage, Transformable, UserEventMessage,
12};
13
14#[cfg(feature = "encryption")]
15use super::{KeyRequestMessage, KeyResponseMessage};
16
// Tag bytes identifying each message kind. These values double as the
// `#[repr(u8)]` discriminants of `MessageType` and are the patterns matched
// when converting a raw byte via `TryFrom<u8> for MessageType`.
const LEAVE_MESSAGE_TAG: u8 = 0;
const JOIN_MESSAGE_TAG: u8 = 1;
const PUSH_PULL_MESSAGE_TAG: u8 = 2;
const USER_EVENT_MESSAGE_TAG: u8 = 3;
const QUERY_MESSAGE_TAG: u8 = 4;
const QUERY_RESPONSE_MESSAGE_TAG: u8 = 5;
const CONFLICT_RESPONSE_MESSAGE_TAG: u8 = 6;
const RELAY_MESSAGE_TAG: u8 = 7;
// The key request/response tags exist only when the `encryption` feature is enabled.
#[cfg(feature = "encryption")]
const KEY_REQUEST_MESSAGE_TAG: u8 = 253;
#[cfg(feature = "encryption")]
const KEY_RESPONSE_MESSAGE_TAG: u8 = 254;
29
/// Unknown message type error.
///
/// Produced by `TryFrom<u8> for MessageType` when the byte does not match any
/// known message tag. The wrapped `u8` is the offending byte; it is also
/// included in the error's display text.
#[derive(Debug, thiserror::Error)]
#[error("unknown message type byte: {0}")]
pub struct UnknownMessageType(u8);
34
35impl TryFrom<u8> for MessageType {
36  type Error = UnknownMessageType;
37
38  fn try_from(value: u8) -> Result<Self, Self::Error> {
39    Ok(match value {
40      LEAVE_MESSAGE_TAG => Self::Leave,
41      JOIN_MESSAGE_TAG => Self::Join,
42      PUSH_PULL_MESSAGE_TAG => Self::PushPull,
43      USER_EVENT_MESSAGE_TAG => Self::UserEvent,
44      QUERY_MESSAGE_TAG => Self::Query,
45      QUERY_RESPONSE_MESSAGE_TAG => Self::QueryResponse,
46      CONFLICT_RESPONSE_MESSAGE_TAG => Self::ConflictResponse,
47      RELAY_MESSAGE_TAG => Self::Relay,
48      #[cfg(feature = "encryption")]
49      KEY_REQUEST_MESSAGE_TAG => Self::KeyRequest,
50      #[cfg(feature = "encryption")]
51      KEY_RESPONSE_MESSAGE_TAG => Self::KeyResponse,
52      _ => return Err(UnknownMessageType(value)),
53    })
54  }
55}
56
57impl From<MessageType> for u8 {
58  fn from(value: MessageType) -> Self {
59    value as u8
60  }
61}
62
/// The types of gossip messages Serf will send along
/// memberlist.
///
/// Each variant's discriminant is the corresponding `*_MESSAGE_TAG` byte, so a
/// value converts to `u8` with a plain cast and back through `TryFrom<u8>`.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
#[repr(u8)]
#[non_exhaustive]
pub enum MessageType {
  /// Leave message
  Leave = LEAVE_MESSAGE_TAG,
  /// Join message
  Join = JOIN_MESSAGE_TAG,
  /// PushPull message
  PushPull = PUSH_PULL_MESSAGE_TAG,
  /// UserEvent message
  UserEvent = USER_EVENT_MESSAGE_TAG,
  /// Query message
  Query = QUERY_MESSAGE_TAG,
  /// QueryResponse message
  QueryResponse = QUERY_RESPONSE_MESSAGE_TAG,
  /// ConflictResponse message
  ConflictResponse = CONFLICT_RESPONSE_MESSAGE_TAG,
  /// Relay message
  Relay = RELAY_MESSAGE_TAG,
  /// KeyRequest message (only with the `encryption` feature)
  #[cfg(feature = "encryption")]
  KeyRequest = KEY_REQUEST_MESSAGE_TAG,
  /// KeyResponse message (only with the `encryption` feature)
  #[cfg(feature = "encryption")]
  KeyResponse = KEY_RESPONSE_MESSAGE_TAG,
}
92
93impl MessageType {
94  /// Get the string representation of the message type
95  #[inline]
96  pub const fn as_str(&self) -> &'static str {
97    match self {
98      Self::Leave => "leave",
99      Self::Join => "join",
100      Self::PushPull => "push pull",
101      Self::UserEvent => "user event",
102      Self::Query => "query",
103      Self::QueryResponse => "query response",
104      Self::ConflictResponse => "conflict response",
105      Self::Relay => "relay",
106      #[cfg(feature = "encryption")]
107      Self::KeyRequest => "key request",
108      #[cfg(feature = "encryption")]
109      Self::KeyResponse => "key response",
110    }
111  }
112}
113
/// Used to do a cheap reference to message reference conversion.
///
/// Implemented for the individual message types (and references to them) so
/// that any of them can be viewed uniformly as a [`SerfMessageRef`].
pub trait AsMessageRef<I, A> {
  /// Returns a [`SerfMessageRef`] that borrows from `self`.
  fn as_message_ref(&self) -> SerfMessageRef<'_, I, A>;
}
119
/// The reference type of [`SerfMessage`].
///
/// Each variant borrows the corresponding message; the push/pull variant holds
/// a [`PushPullMessageRef`] view instead of a plain reference.
#[derive(Debug)]
pub enum SerfMessageRef<'a, I, A> {
  /// Leave message reference
  Leave(&'a LeaveMessage<I>),
  /// Join message reference
  Join(&'a JoinMessage<I>),
  /// PushPull message reference
  PushPull(PushPullMessageRef<'a, I>),
  /// UserEvent message reference
  UserEvent(&'a UserEventMessage),
  /// Query message reference
  Query(&'a QueryMessage<I, A>),
  /// QueryResponse message reference
  QueryResponse(&'a QueryResponseMessage<I, A>),
  /// ConflictResponse message reference
  ConflictResponse(&'a Member<I, A>),
  /// KeyRequest message reference (only with the `encryption` feature)
  #[cfg(feature = "encryption")]
  KeyRequest(&'a KeyRequestMessage),
  /// KeyResponse message reference (only with the `encryption` feature)
  #[cfg(feature = "encryption")]
  KeyResponse(&'a KeyResponseMessage),
}
144
// `Clone` and `Copy` are written by hand instead of derived: a derive would
// require `I: Clone`/`A: Clone` (resp. `Copy`), whereas every variant only
// holds a reference or the copyable `PushPullMessageRef` view.
impl<I, A> Clone for SerfMessageRef<'_, I, A> {
  /// Returns a bitwise copy of this reference enum.
  fn clone(&self) -> Self {
    *self
  }
}

impl<I, A> Copy for SerfMessageRef<'_, I, A> {}
152
153impl<I, A> AsMessageRef<I, A> for SerfMessageRef<'_, I, A> {
154  fn as_message_ref(&self) -> SerfMessageRef<I, A> {
155    *self
156  }
157}
158
/// An owned Serf gossip message, with one variant per kind of message Serf
/// will send along memberlist.
#[derive(Debug, Clone)]
pub enum SerfMessage<I, A> {
  /// Leave message
  Leave(LeaveMessage<I>),
  /// Join message
  Join(JoinMessage<I>),
  /// PushPull message
  PushPull(PushPullMessage<I>),
  /// UserEvent message
  UserEvent(UserEventMessage),
  /// Query message
  Query(QueryMessage<I, A>),
  /// QueryResponse message
  QueryResponse(QueryResponseMessage<I, A>),
  /// ConflictResponse message
  ConflictResponse(Member<I, A>),
  /// KeyRequest message (only with the `encryption` feature)
  #[cfg(feature = "encryption")]
  KeyRequest(KeyRequestMessage),
  /// KeyResponse message (only with the `encryption` feature)
  #[cfg(feature = "encryption")]
  KeyResponse(KeyResponseMessage),
}
184
185impl<'a, I, A> From<&'a SerfMessage<I, A>> for MessageType {
186  fn from(msg: &'a SerfMessage<I, A>) -> Self {
187    match msg {
188      SerfMessage::Leave(_) => MessageType::Leave,
189      SerfMessage::Join(_) => MessageType::Join,
190      SerfMessage::PushPull(_) => MessageType::PushPull,
191      SerfMessage::UserEvent(_) => MessageType::UserEvent,
192      SerfMessage::Query(_) => MessageType::Query,
193      SerfMessage::QueryResponse(_) => MessageType::QueryResponse,
194      SerfMessage::ConflictResponse(_) => MessageType::ConflictResponse,
195      #[cfg(feature = "encryption")]
196      SerfMessage::KeyRequest(_) => MessageType::KeyRequest,
197      #[cfg(feature = "encryption")]
198      SerfMessage::KeyResponse(_) => MessageType::KeyResponse,
199    }
200  }
201}
202
203impl<I, A> AsMessageRef<I, A> for QueryMessage<I, A> {
204  fn as_message_ref(&self) -> SerfMessageRef<I, A> {
205    SerfMessageRef::Query(self)
206  }
207}
208
209impl<I, A> AsMessageRef<I, A> for QueryResponseMessage<I, A> {
210  fn as_message_ref(&self) -> SerfMessageRef<I, A> {
211    SerfMessageRef::QueryResponse(self)
212  }
213}
214
215impl<I, A> AsMessageRef<I, A> for JoinMessage<I> {
216  fn as_message_ref(&self) -> SerfMessageRef<I, A> {
217    SerfMessageRef::Join(self)
218  }
219}
220
221impl<I, A> AsMessageRef<I, A> for UserEventMessage {
222  fn as_message_ref(&self) -> SerfMessageRef<I, A> {
223    SerfMessageRef::UserEvent(self)
224  }
225}
226
227impl<I, A> AsMessageRef<I, A> for &QueryMessage<I, A> {
228  fn as_message_ref(&self) -> SerfMessageRef<I, A> {
229    SerfMessageRef::Query(self)
230  }
231}
232
233impl<I, A> AsMessageRef<I, A> for &QueryResponseMessage<I, A> {
234  fn as_message_ref(&self) -> SerfMessageRef<I, A> {
235    SerfMessageRef::QueryResponse(self)
236  }
237}
238
239impl<I, A> AsMessageRef<I, A> for &JoinMessage<I> {
240  fn as_message_ref(&self) -> SerfMessageRef<I, A> {
241    SerfMessageRef::Join(self)
242  }
243}
244
245impl<I, A> AsMessageRef<I, A> for PushPullMessageRef<'_, I> {
246  fn as_message_ref(&self) -> SerfMessageRef<I, A> {
247    SerfMessageRef::PushPull(*self)
248  }
249}
250
251impl<I, A> AsMessageRef<I, A> for &PushPullMessage<I> {
252  fn as_message_ref(&self) -> SerfMessageRef<I, A> {
253    SerfMessageRef::PushPull(PushPullMessageRef {
254      ltime: self.ltime,
255      status_ltimes: &self.status_ltimes,
256      left_members: &self.left_members,
257      event_ltime: self.event_ltime,
258      events: &self.events,
259      query_ltime: self.query_ltime,
260    })
261  }
262}
263
264impl<I, A> AsMessageRef<I, A> for &UserEventMessage {
265  fn as_message_ref(&self) -> SerfMessageRef<I, A> {
266    SerfMessageRef::UserEvent(self)
267  }
268}
269
270impl<I, A> AsMessageRef<I, A> for &LeaveMessage<I> {
271  fn as_message_ref(&self) -> SerfMessageRef<I, A> {
272    SerfMessageRef::Leave(self)
273  }
274}
275
276impl<I, A> AsMessageRef<I, A> for &Member<I, A> {
277  fn as_message_ref(&self) -> SerfMessageRef<I, A> {
278    SerfMessageRef::ConflictResponse(self)
279  }
280}
281
282impl<I, A> AsMessageRef<I, A> for &Arc<Member<I, A>> {
283  fn as_message_ref(&self) -> SerfMessageRef<I, A> {
284    SerfMessageRef::ConflictResponse(self)
285  }
286}
287
#[cfg(feature = "encryption")]
impl<I, A> AsMessageRef<I, A> for &KeyRequestMessage {
  /// Wraps the referenced key request as a [`SerfMessageRef::KeyRequest`].
  fn as_message_ref(&self) -> SerfMessageRef<'_, I, A> {
    // `self` is a reference to a reference; peel one layer explicitly.
    SerfMessageRef::KeyRequest(*self)
  }
}
294
#[cfg(feature = "encryption")]
impl<I, A> AsMessageRef<I, A> for &KeyResponseMessage {
  /// Wraps the referenced key response as a [`SerfMessageRef::KeyResponse`].
  fn as_message_ref(&self) -> SerfMessageRef<'_, I, A> {
    // `self` is a reference to a reference; peel one layer explicitly.
    SerfMessageRef::KeyResponse(*self)
  }
}
301
302impl<I, A> AsMessageRef<I, A> for SerfMessage<I, A> {
303  fn as_message_ref(&self) -> SerfMessageRef<I, A> {
304    match self {
305      Self::Leave(l) => SerfMessageRef::Leave(l),
306      Self::Join(j) => SerfMessageRef::Join(j),
307      Self::PushPull(pp) => SerfMessageRef::PushPull(PushPullMessageRef {
308        ltime: pp.ltime,
309        status_ltimes: &pp.status_ltimes,
310        left_members: &pp.left_members,
311        event_ltime: pp.event_ltime,
312        events: &pp.events,
313        query_ltime: pp.query_ltime,
314      }),
315      Self::UserEvent(u) => SerfMessageRef::UserEvent(u),
316      Self::Query(q) => SerfMessageRef::Query(q),
317      Self::QueryResponse(q) => SerfMessageRef::QueryResponse(q),
318      Self::ConflictResponse(m) => SerfMessageRef::ConflictResponse(m),
319      #[cfg(feature = "encryption")]
320      Self::KeyRequest(kr) => SerfMessageRef::KeyRequest(kr),
321      #[cfg(feature = "encryption")]
322      Self::KeyResponse(kr) => SerfMessageRef::KeyResponse(kr),
323    }
324  }
325}
326
327impl<I, A> AsMessageRef<I, A> for &SerfMessage<I, A> {
328  fn as_message_ref(&self) -> SerfMessageRef<I, A> {
329    match self {
330      SerfMessage::Leave(l) => SerfMessageRef::Leave(l),
331      SerfMessage::Join(j) => SerfMessageRef::Join(j),
332      SerfMessage::PushPull(pp) => SerfMessageRef::PushPull(PushPullMessageRef {
333        ltime: pp.ltime,
334        status_ltimes: &pp.status_ltimes,
335        left_members: &pp.left_members,
336        event_ltime: pp.event_ltime,
337        events: &pp.events,
338        query_ltime: pp.query_ltime,
339      }),
340      SerfMessage::UserEvent(u) => SerfMessageRef::UserEvent(u),
341      SerfMessage::Query(q) => SerfMessageRef::Query(q),
342      SerfMessage::QueryResponse(q) => SerfMessageRef::QueryResponse(q),
343      SerfMessage::ConflictResponse(m) => SerfMessageRef::ConflictResponse(m),
344      #[cfg(feature = "encryption")]
345      SerfMessage::KeyRequest(kr) => SerfMessageRef::KeyRequest(kr),
346      #[cfg(feature = "encryption")]
347      SerfMessage::KeyResponse(kr) => SerfMessageRef::KeyResponse(kr),
348    }
349  }
350}
351
352impl<I, A> core::fmt::Display for SerfMessage<I, A> {
353  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
354    write!(f, "{}", self.ty().as_str())
355  }
356}
357
358impl<I, A> SerfMessage<I, A> {
359  /// Returns the message type of this message
360  #[inline]
361  pub const fn ty(&self) -> MessageType {
362    match self {
363      Self::Leave(_) => MessageType::Leave,
364      Self::Join(_) => MessageType::Join,
365      Self::PushPull(_) => MessageType::PushPull,
366      Self::UserEvent(_) => MessageType::UserEvent,
367      Self::Query(_) => MessageType::Query,
368      Self::QueryResponse(_) => MessageType::QueryResponse,
369      Self::ConflictResponse(_) => MessageType::ConflictResponse,
370      #[cfg(feature = "encryption")]
371      Self::KeyRequest(_) => MessageType::KeyRequest,
372      #[cfg(feature = "encryption")]
373      Self::KeyResponse(_) => MessageType::KeyResponse,
374    }
375  }
376}
377
/// Error that can occur when transforming a [`SerfMessage`] or [`SerfMessageRef`]
///
/// Each variant transparently wraps the transform error of one message kind,
/// and a `From` conversion is generated for every wrapped error type.
#[derive(thiserror::Error)]
pub enum SerfMessageTransformError<I, A>
where
  I: Transformable + core::hash::Hash + Eq,
  A: Transformable + core::hash::Hash + Eq,
{
  /// [`LeaveMessage`] transformation error
  #[error(transparent)]
  Leave(#[from] LeaveMessageTransformError<I>),
  /// [`JoinMessage`] transformation error
  #[error(transparent)]
  Join(#[from] JoinMessageTransformError<I>),
  /// [`PushPullMessage`] transformation error
  #[error(transparent)]
  PushPull(#[from] PushPullMessageTransformError<I>),
  /// [`UserEventMessage`] transformation error
  #[error(transparent)]
  UserEvent(#[from] UserEventMessageTransformError),
  /// [`QueryMessage`] transformation error
  #[error(transparent)]
  Query(#[from] QueryMessageTransformError<I, A>),
  /// [`QueryResponseMessage`] transformation error
  #[error(transparent)]
  QueryResponse(#[from] QueryResponseMessageTransformError<I, A>),
  /// [`Member`] transformation error
  #[error(transparent)]
  ConflictResponse(#[from] MemberTransformError<I, A>),
  /// [`KeyRequestMessage`] transformation error (only with the `encryption` feature)
  #[cfg(feature = "encryption")]
  #[error(transparent)]
  KeyRequest(#[from] crate::key::OptionSecretKeyTransformError),
  /// [`KeyResponseMessage`] transformation error (only with the `encryption` feature)
  #[cfg(feature = "encryption")]
  #[error(transparent)]
  KeyResponse(#[from] crate::key::KeyResponseMessageTransformError),
}
415
416impl<I, A> core::fmt::Debug for SerfMessageTransformError<I, A>
417where
418  I: Transformable + core::hash::Hash + Eq,
419  A: Transformable + core::hash::Hash + Eq,
420{
421  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
422    write!(f, "{}", self)
423  }
424}
425
426impl<I, A> Encodable for SerfMessageRef<'_, I, A>
427where
428  I: Transformable + core::hash::Hash + Eq,
429  A: Transformable + core::hash::Hash + Eq,
430{
431  type Error = SerfMessageTransformError<I, A>;
432
433  #[inline]
434  fn encoded_len(&self) -> usize {
435    match *self {
436      Self::Leave(msg) => Transformable::encoded_len(msg),
437      Self::Join(msg) => Transformable::encoded_len(msg),
438      Self::PushPull(msg) => Encodable::encoded_len(&msg),
439      Self::UserEvent(msg) => Transformable::encoded_len(msg),
440      Self::Query(msg) => Transformable::encoded_len(msg),
441      Self::QueryResponse(msg) => Transformable::encoded_len(msg),
442      Self::ConflictResponse(msg) => Transformable::encoded_len(msg),
443      #[cfg(feature = "encryption")]
444      Self::KeyRequest(msg) => Transformable::encoded_len(msg),
445      #[cfg(feature = "encryption")]
446      Self::KeyResponse(msg) => Transformable::encoded_len(msg),
447    }
448  }
449
450  #[inline]
451  fn encode(&self, dst: &mut [u8]) -> Result<usize, Self::Error> {
452    match *self {
453      Self::Leave(msg) => Transformable::encode(msg, dst).map_err(Into::into),
454      Self::Join(msg) => Transformable::encode(msg, dst).map_err(Into::into),
455      Self::PushPull(msg) => Encodable::encode(&msg, dst).map_err(Into::into),
456      Self::UserEvent(msg) => Transformable::encode(msg, dst).map_err(Into::into),
457      Self::Query(msg) => Transformable::encode(msg, dst).map_err(Into::into),
458      Self::QueryResponse(msg) => Transformable::encode(msg, dst).map_err(Into::into),
459      Self::ConflictResponse(msg) => Transformable::encode(msg, dst).map_err(Into::into),
460      #[cfg(feature = "encryption")]
461      Self::KeyRequest(msg) => Transformable::encode(msg, dst).map_err(Into::into),
462      #[cfg(feature = "encryption")]
463      Self::KeyResponse(msg) => Transformable::encode(msg, dst).map_err(Into::into),
464    }
465  }
466}