serf_core/
event.rs

1use std::{pin::Pin, sync::Arc, task::Poll, time::Duration};
2
3use self::error::Error;
4
5use super::{delegate::Delegate, types::Epoch, *};
6
7mod crate_event;
8
9use async_channel::Sender;
10pub use async_channel::{RecvError, TryRecvError};
11
12use crate::types::{LamportTime, Member, Node, QueryFlag, QueryResponseMessage, UserEventMessage};
13use async_lock::Mutex;
14pub(crate) use crate_event::*;
15use futures::Stream;
16use memberlist_core::{CheapClone, bytes::Bytes, proto::TinyVec, transport::Transport};
17use smol_str::SmolStr;
18
/// State shared by every clone of a [`QueryEvent`] and used to answer the query.
///
/// The context is what guarantees that a query is answered at most once and
/// only before its deadline.
pub(crate) struct QueryContext<T, D>
where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
  /// Maximum time, measured from `span`, during which a response may still be sent.
  pub(crate) query_timeout: Duration,
  /// Reference point for the response deadline; reset to `None` once a
  /// response has been sent so later attempts are rejected.
  pub(crate) span: Mutex<Option<Epoch>>,
  /// The Serf instance used to send and relay the response.
  pub(crate) this: Serf<T, D>,
}
28
29impl<T, D> QueryContext<T, D>
30where
31  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
32  T: Transport,
33{
34  fn check_response_size(&self, size: usize) -> Result<(), Error<T, D>> {
35    if size > self.this.inner.opts.query_response_size_limit {
36      Err(Error::query_response_too_large(
37        self.this.inner.opts.query_response_size_limit,
38        size,
39      ))
40    } else {
41      Ok(())
42    }
43  }
44
45  async fn respond_with_message_and_response(
46    &self,
47    respond_to: &T::ResolvedAddress,
48    relay_factor: u8,
49    raw: Bytes,
50    resp: QueryResponseMessage<T::Id, T::ResolvedAddress>,
51  ) -> Result<(), Error<T, D>> {
52    self.check_response_size(raw.len())?;
53
54    let mut mu = self.span.lock().await;
55
56    if let Some(span) = *mu {
57      // Ensure we aren't past our response deadline
58      if span.elapsed() > self.query_timeout {
59        return Err(Error::query_timeout());
60      }
61
62      // Send the response directly to the originator
63      self.this.inner.memberlist.send(respond_to, raw).await?;
64
65      // Relay the response through up to relayFactor other nodes
66      self
67        .this
68        .relay_response(relay_factor, resp.from.cheap_clone(), resp)
69        .await?;
70
71      // Clear the deadline, responses sent
72      *mu = None;
73      Ok(())
74    } else {
75      Err(Error::query_already_responsed())
76    }
77  }
78
79  async fn respond(
80    &self,
81    respond_to: &T::ResolvedAddress,
82    id: u32,
83    ltime: LamportTime,
84    relay_factor: u8,
85    msg: Bytes,
86  ) -> Result<(), Error<T, D>> {
87    let resp = QueryResponseMessage {
88      ltime,
89      id,
90      from: self.this.advertise_node(),
91      flags: QueryFlag::empty(),
92      payload: msg,
93    };
94    let buf = crate::types::encode_message_to_bytes(&resp)?;
95    self
96      .respond_with_message_and_response(respond_to, relay_factor, buf, resp)
97      .await
98  }
99}
100
/// Query event
///
/// Describes a query and carries the shared context needed to send a
/// response back to the node it came from.
pub struct QueryEvent<T, D>
where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
  /// Lamport time of the query
  pub(crate) ltime: LamportTime,
  /// Name of the query
  pub(crate) name: SmolStr,
  /// Payload of the query
  pub(crate) payload: Bytes,

  /// Response state shared between all clones of this event
  pub(crate) ctx: Arc<QueryContext<T, D>>,
  /// Id of the query; echoed back in the response message
  pub(crate) id: u32,
  /// source node
  pub(crate) from: Node<T::Id, T::ResolvedAddress>,
  /// Number of duplicate responses to relay back to sender
  pub(crate) relay_factor: u8,
}
118
// Read-only accessors for the query's metadata.
impl<D, T> QueryEvent<T, D>
where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
  /// Returns the lamport time of the query.
  #[inline]
  pub const fn lamport_time(&self) -> LamportTime {
    self.ltime
  }

  /// Returns the name of the query.
  #[inline]
  pub const fn name(&self) -> &SmolStr {
    &self.name
  }

  /// Returns the payload of the query.
  #[inline]
  pub const fn payload(&self) -> &Bytes {
    &self.payload
  }

  /// Returns the id of the query.
  #[inline]
  pub const fn id(&self) -> u32 {
    self.id
  }

  /// Returns the source node of the query, i.e. the node a response is sent to.
  #[inline]
  pub const fn from(&self) -> &Node<T::Id, T::ResolvedAddress> {
    &self.from
  }
}
154
155impl<D, T> PartialEq for QueryEvent<T, D>
156where
157  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
158  T: Transport,
159{
160  fn eq(&self, other: &Self) -> bool {
161    self.id == other.id
162      && self.from == other.from
163      && self.relay_factor == other.relay_factor
164      && self.ltime == other.ltime
165      && self.name == other.name
166      && self.payload == other.payload
167  }
168}
169
/// Identity conversion, so APIs bounded by `AsRef<QueryEvent<T, D>>` can be
/// handed a `QueryEvent` directly.
impl<D, T> AsRef<QueryEvent<T, D>> for QueryEvent<T, D>
where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
  fn as_ref(&self) -> &QueryEvent<T, D> {
    self
  }
}
179
180impl<D, T> Clone for QueryEvent<T, D>
181where
182  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
183  T: Transport,
184{
185  fn clone(&self) -> Self {
186    Self {
187      ltime: self.ltime,
188      name: self.name.clone(),
189      payload: self.payload.clone(),
190      ctx: self.ctx.clone(),
191      id: self.id,
192      from: self.from.clone(),
193      relay_factor: self.relay_factor,
194    }
195  }
196}
197
198impl<D, T> core::fmt::Display for QueryEvent<T, D>
199where
200  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
201  T: Transport,
202{
203  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
204    write!(f, "query")
205  }
206}
207
208impl<D, T> QueryEvent<T, D>
209where
210  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
211  T: Transport,
212{
213  #[cfg(feature = "encryption")]
214  pub(crate) fn create_response(
215    &self,
216    buf: Bytes,
217  ) -> QueryResponseMessage<T::Id, T::ResolvedAddress> {
218    QueryResponseMessage {
219      ltime: self.ltime,
220      id: self.id,
221      from: self.ctx.this.inner.memberlist.advertise_node(),
222      flags: QueryFlag::empty(),
223      payload: buf,
224    }
225  }
226
227  #[cfg(feature = "encryption")]
228  pub(crate) fn check_response_size(&self, size: usize) -> Result<(), Error<T, D>> {
229    self.ctx.check_response_size(size)
230  }
231
232  #[cfg(feature = "encryption")]
233  pub(crate) async fn respond_with_message_and_response(
234    &self,
235    raw: Bytes,
236    resp: QueryResponseMessage<T::Id, T::ResolvedAddress>,
237  ) -> Result<(), Error<T, D>> {
238    self
239      .ctx
240      .respond_with_message_and_response(self.from.address(), self.relay_factor, raw, resp)
241      .await
242  }
243
244  /// Used to send a response to the user query
245  pub async fn respond(&self, msg: Bytes) -> Result<(), Error<T, D>> {
246    self
247      .ctx
248      .respond(
249        self.from().address(),
250        self.id,
251        self.ltime,
252        self.relay_factor,
253        msg,
254      )
255      .await
256  }
257}
258
/// The event type for member event
///
/// With the `serde` feature enabled each variant is (de)serialized as its
/// `member-*` name, the same string returned by `as_str` and `Display`.
// NOTE: the container must NOT be `untagged`. For an enum made only of unit
// variants, `untagged` serializes every variant as a bare unit value and
// deserializes any unit value as the first variant, which ignores the
// per-variant names below and loses the event type on a round-trip.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(rename_all = "kebab-case"))]
pub enum MemberEventType {
  /// Join event
  #[cfg_attr(feature = "serde", serde(rename = "member-join"))]
  Join,
  /// Leave event
  #[cfg_attr(feature = "serde", serde(rename = "member-leave"))]
  Leave,
  /// Failed event
  #[cfg_attr(feature = "serde", serde(rename = "member-failed"))]
  Failed,
  /// Update event
  #[cfg_attr(feature = "serde", serde(rename = "member-update"))]
  Update,
  /// Reap event
  #[cfg_attr(feature = "serde", serde(rename = "member-reap"))]
  Reap,
}
280
281impl MemberEventType {
282  /// Returns the string representation of the event type.
283  #[inline]
284  pub const fn as_str(&self) -> &'static str {
285    match self {
286      Self::Join => "member-join",
287      Self::Leave => "member-leave",
288      Self::Failed => "member-failed",
289      Self::Update => "member-update",
290      Self::Reap => "member-reap",
291    }
292  }
293}
294
295impl core::fmt::Display for MemberEventType {
296  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
297    match self {
298      Self::Join => write!(f, "member-join"),
299      Self::Leave => write!(f, "member-leave"),
300      Self::Failed => write!(f, "member-failed"),
301      Self::Update => write!(f, "member-update"),
302      Self::Reap => write!(f, "member-reap"),
303    }
304  }
305}
306
/// Mutable, uniquely owned form of a member event.
///
/// Converted into the shareable [`MemberEvent`] with `freeze`.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct MemberEventMut<I, A> {
  /// Kind of member event
  pub(crate) ty: MemberEventType,
  /// Members the event applies to
  pub(crate) members: TinyVec<Member<I, A>>,
}
312
313impl<I, A> MemberEventMut<I, A> {
314  pub(crate) fn freeze(self) -> MemberEvent<I, A> {
315    MemberEvent {
316      ty: self.ty,
317      members: Arc::new(self.members),
318    }
319  }
320}
321
/// MemberEvent is the struct used for member related events
/// Because Serf coalesces events, an event may contain multiple members.
///
/// The member list is stored behind an `Arc`, so clones share it.
#[derive(Debug, PartialEq)]
pub struct MemberEvent<I, A> {
  /// Kind of member event
  pub(crate) ty: MemberEventType,
  /// Members the event applies to, shared between clones
  pub(crate) members: Arc<TinyVec<Member<I, A>>>,
}
329
330impl<I, A> Clone for MemberEvent<I, A> {
331  fn clone(&self) -> Self {
332    Self {
333      ty: self.ty,
334      members: self.members.clone(),
335    }
336  }
337}
338
// Cloning copies `ty` and clones the `Arc` around the member list (see the
// `Clone` impl), so the event qualifies as cheap to clone.
impl<I, A> CheapClone for MemberEvent<I, A> {}
340
341impl<I, A> core::fmt::Display for MemberEvent<I, A> {
342  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
343    write!(f, "{}", self.ty)
344  }
345}
346
// Read-only accessors.
impl<I, A> MemberEvent<I, A> {
  /// Returns the event type of this member event.
  pub fn ty(&self) -> MemberEventType {
    self.ty
  }

  /// Returns the members of this event as a slice borrowed from the shared list.
  pub fn members(&self) -> &[Member<I, A>] {
    &self.members
  }
}
358
359impl<I, A> From<MemberEvent<I, A>> for (MemberEventType, Arc<TinyVec<Member<I, A>>>) {
360  fn from(event: MemberEvent<I, A>) -> Self {
361    (event.ty, event.members)
362  }
363}
364
/// The event produced by the Serf instance.
///
/// This is the public view of an event, as handed out by [`EventSubscriber`].
/// `derive_more::From` is derived for the enum.
#[derive(derive_more::From)]
pub enum Event<T, D>
where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
  /// Member related events
  Member(MemberEvent<T::Id, T::ResolvedAddress>),
  /// User events
  User(UserEventMessage),
  /// Query events
  Query(QueryEvent<T, D>),
}
379
380impl<D, T> Clone for Event<T, D>
381where
382  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
383  T: Transport,
384{
385  fn clone(&self) -> Self {
386    match self {
387      Self::Member(e) => Self::Member(e.cheap_clone()),
388      Self::User(e) => Self::User(e.cheap_clone()),
389      Self::Query(e) => Self::Query(e.clone()),
390    }
391  }
392}
393
/// The producer of the Serf events.
///
/// Created together with its [`EventSubscriber`] by `EventProducer::bounded`
/// or `EventProducer::unbounded`.
#[derive(Debug)]
pub struct EventProducer<T, D>
where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
  /// Sending half of the event channel.
  pub(crate) tx: Sender<CrateEvent<T, D>>,
}
403
404impl<T, D> EventProducer<T, D>
405where
406  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
407  T: Transport,
408{
409  /// Creates a bounded producer and subscriber.
410  ///
411  /// The created subscriber has space to hold at most cap events at a time.
412  /// Users must actively consume the events from the subscriber to prevent the producer from blocking.
413  pub fn bounded(size: usize) -> (Self, EventSubscriber<T, D>) {
414    let (tx, rx) = async_channel::bounded(size);
415    (Self { tx }, EventSubscriber { rx })
416  }
417
418  /// Creates an unbounded producer and subscriber.
419  ///
420  /// The created subscriber has no limit on the number of events it can hold.
421  pub fn unbounded() -> (Self, EventSubscriber<T, D>) {
422    let (tx, rx) = async_channel::unbounded();
423    (Self { tx }, EventSubscriber { rx })
424  }
425}
426
/// Subscribe the events from the Serf instance.
///
/// Events can be pulled with `recv`/`try_recv` or by using the subscriber as
/// a `Stream`.
#[pin_project::pin_project]
#[derive(Debug)]
pub struct EventSubscriber<T, D>
where
  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
  T: Transport,
{
  /// Receiving half of the event channel; pinned so it can be polled as a `Stream`.
  #[pin]
  pub(crate) rx: async_channel::Receiver<CrateEvent<T, D>>,
}
438
439impl<T, D> EventSubscriber<T, D>
440where
441  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
442  T: Transport,
443{
444  /// Receives a event from the subscriber.
445  ///
446  /// If the subscriber is empty, this method waits until there is a event.
447  ///
448  /// If the subscriber is closed, this method receives a event or returns an error if there are no more events
449  pub async fn recv(&self) -> Result<Event<T, D>, RecvError> {
450    loop {
451      match self.rx.recv().await {
452        Ok(CrateEvent::InternalQuery { .. }) => continue,
453        Ok(CrateEvent::Member(e)) => return Ok(Event::Member(e)),
454        Ok(CrateEvent::User(e)) => return Ok(Event::User(e)),
455        Ok(CrateEvent::Query(e)) => return Ok(Event::Query(e)),
456        Err(e) => return Err(e),
457      }
458    }
459  }
460
461  /// Tries to receive a event from the subscriber.
462  ///
463  /// If the subscriber is empty, this method returns an error.
464  /// If the subscriber is closed, this method receives a event or returns an error if there are no more events
465  pub fn try_recv(&self) -> Result<Event<T, D>, TryRecvError> {
466    loop {
467      match self.rx.try_recv() {
468        Ok(CrateEvent::InternalQuery { .. }) => continue,
469        Ok(CrateEvent::Member(e)) => return Ok(Event::Member(e)),
470        Ok(CrateEvent::User(e)) => return Ok(Event::User(e)),
471        Ok(CrateEvent::Query(e)) => return Ok(Event::Query(e)),
472        Err(e) => return Err(e),
473      }
474    }
475  }
476
477  /// Returns `true` if the subscriber is empty.
478  pub fn is_empty(&self) -> bool {
479    self.rx.is_empty()
480  }
481
482  /// Returns `true` if the channel is closed.
483  pub fn is_closed(&self) -> bool {
484    self.rx.is_closed()
485  }
486
487  /// Returns the number of events in the subscriber.
488  pub fn len(&self) -> usize {
489    self.rx.len()
490  }
491}
492
493impl<T, D> Stream for EventSubscriber<T, D>
494where
495  D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
496  T: Transport,
497{
498  type Item = Event<T, D>;
499
500  fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
501    match <async_channel::Receiver<CrateEvent<T, D>> as Stream>::poll_next(self.project().rx, cx) {
502      Poll::Ready(Some(event)) => match event {
503        CrateEvent::Member(e) => Poll::Ready(Some(Event::Member(e))),
504        CrateEvent::User(e) => Poll::Ready(Some(Event::User(e))),
505        CrateEvent::Query(e) => Poll::Ready(Some(Event::Query(e))),
506        CrateEvent::InternalQuery { .. } => Poll::Pending,
507      },
508      Poll::Ready(None) => Poll::Ready(None),
509      Poll::Pending => Poll::Pending,
510    }
511  }
512}