serf_core/
event.rs

1use std::{pin::Pin, sync::Arc, task::Poll, time::Duration};
2
3use crate::delegate::TransformDelegate;
4
5use self::error::Error;
6
7use super::{delegate::Delegate, types::Epoch, *};
8
9mod crate_event;
10
11use async_channel::Sender;
12pub use async_channel::{RecvError, TryRecvError};
13
14use async_lock::Mutex;
15pub(crate) use crate_event::*;
16use futures::Stream;
17use memberlist_core::{
18  bytes::{BufMut, Bytes, BytesMut},
19  transport::{AddressResolver, Transport},
20  types::TinyVec,
21  CheapClone,
22};
23use serf_types::{
24  LamportTime, Member, MessageType, Node, QueryFlag, QueryResponseMessage, UserEventMessage,
25};
26use smol_str::SmolStr;
27
/// Shared state needed to answer a single query.
///
/// A [`QueryEvent`] holds this behind an `Arc`, so every clone of the event
/// shares the same `span` and therefore the same "already responded" state.
pub(crate) struct QueryContext<T, D>
where
  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
  T: Transport,
{
  /// Maximum time, measured from `span`, during which a response is still accepted.
  pub(crate) query_timeout: Duration,
  /// Reference point for the timeout check; reset to `None` once a response has been sent.
  pub(crate) span: Mutex<Option<Epoch>>,
  /// Handle to the Serf instance used to send and relay the response.
  pub(crate) this: Serf<T, D>,
}
37
38impl<T, D> QueryContext<T, D>
39where
40  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
41  T: Transport,
42{
43  fn check_response_size(&self, resp: &[u8]) -> Result<(), Error<T, D>> {
44    let resp_len = resp.len();
45    if resp_len > self.this.inner.opts.query_response_size_limit {
46      Err(Error::query_response_too_large(
47        self.this.inner.opts.query_response_size_limit,
48        resp_len,
49      ))
50    } else {
51      Ok(())
52    }
53  }
54
55  async fn respond_with_message_and_response(
56    &self,
57    respond_to: &<T::Resolver as AddressResolver>::ResolvedAddress,
58    relay_factor: u8,
59    raw: Bytes,
60    resp: QueryResponseMessage<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
61  ) -> Result<(), Error<T, D>> {
62    self.check_response_size(raw.as_ref())?;
63
64    let mut mu = self.span.lock().await;
65
66    if let Some(span) = *mu {
67      // Ensure we aren't past our response deadline
68      if span.elapsed() > self.query_timeout {
69        return Err(Error::query_timeout());
70      }
71
72      // Send the response directly to the originator
73      self.this.inner.memberlist.send(respond_to, raw).await?;
74
75      // Relay the response through up to relayFactor other nodes
76      self
77        .this
78        .relay_response(relay_factor, resp.from.cheap_clone(), resp)
79        .await?;
80
81      // Clear the deadline, responses sent
82      *mu = None;
83      Ok(())
84    } else {
85      Err(Error::query_already_responsed())
86    }
87  }
88
89  async fn respond(
90    &self,
91    respond_to: &<T::Resolver as AddressResolver>::ResolvedAddress,
92    id: u32,
93    ltime: LamportTime,
94    relay_factor: u8,
95    msg: Bytes,
96  ) -> Result<(), Error<T, D>> {
97    let resp = QueryResponseMessage {
98      ltime,
99      id,
100      from: self.this.advertise_node(),
101      flags: QueryFlag::empty(),
102      payload: msg,
103    };
104    let expected_encoded_len = <D as TransformDelegate>::message_encoded_len(&resp);
105    let mut buf = BytesMut::with_capacity(expected_encoded_len + 1); // +1 for the message type byte
106    buf.put_u8(MessageType::QueryResponse as u8);
107    buf.resize(expected_encoded_len + 1, 0);
108    let len = <D as TransformDelegate>::encode_message(&resp, &mut buf[1..])
109      .map_err(Error::transform_delegate)?;
110    debug_assert_eq!(
111      len, expected_encoded_len,
112      "expected encoded len {expected_encoded_len} is not match the actual encoded len {len}"
113    );
114    self
115      .respond_with_message_and_response(respond_to, relay_factor, buf.freeze(), resp)
116      .await
117  }
118}
119
/// Query event
///
/// Carries the query data together with the shared context needed to
/// answer it (see [`QueryEvent::respond`]).
pub struct QueryEvent<T, D>
where
  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
  T: Transport,
{
  /// Lamport time of the query
  pub(crate) ltime: LamportTime,
  /// Name of the query
  pub(crate) name: SmolStr,
  /// Payload of the query
  pub(crate) payload: Bytes,

  /// Response state shared between all clones of this event
  pub(crate) ctx: Arc<QueryContext<T, D>>,
  /// Id of the query
  pub(crate) id: u32,
  /// source node
  pub(crate) from: Node<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
  /// Number of duplicate responses to relay back to sender
  pub(crate) relay_factor: u8,
}
137
/// Read-only accessors for the data carried by a query event.
impl<D, T> QueryEvent<T, D>
where
  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
  T: Transport,
{
  /// Returns the Lamport time of the query.
  #[inline]
  pub const fn lamport_time(&self) -> LamportTime {
    self.ltime
  }

  /// Returns the name of the query.
  #[inline]
  pub const fn name(&self) -> &SmolStr {
    &self.name
  }

  /// Returns the payload of the query.
  #[inline]
  pub const fn payload(&self) -> &Bytes {
    &self.payload
  }

  /// Returns the id of the query.
  #[inline]
  pub const fn id(&self) -> u32 {
    self.id
  }

  /// Returns the source node of the query, i.e. the node a response is sent to.
  #[inline]
  pub const fn from(&self) -> &Node<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress> {
    &self.from
  }
}
173
174impl<D, T> PartialEq for QueryEvent<T, D>
175where
176  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
177  T: Transport,
178{
179  fn eq(&self, other: &Self) -> bool {
180    self.id == other.id
181      && self.from == other.from
182      && self.relay_factor == other.relay_factor
183      && self.ltime == other.ltime
184      && self.name == other.name
185      && self.payload == other.payload
186  }
187}
188
189impl<D, T> AsRef<QueryEvent<T, D>> for QueryEvent<T, D>
190where
191  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
192  T: Transport,
193{
194  fn as_ref(&self) -> &QueryEvent<T, D> {
195    self
196  }
197}
198
199impl<D, T> Clone for QueryEvent<T, D>
200where
201  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
202  T: Transport,
203{
204  fn clone(&self) -> Self {
205    Self {
206      ltime: self.ltime,
207      name: self.name.clone(),
208      payload: self.payload.clone(),
209      ctx: self.ctx.clone(),
210      id: self.id,
211      from: self.from.clone(),
212      relay_factor: self.relay_factor,
213    }
214  }
215}
216
217impl<D, T> core::fmt::Display for QueryEvent<T, D>
218where
219  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
220  T: Transport,
221{
222  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
223    write!(f, "query")
224  }
225}
226
227impl<D, T> QueryEvent<T, D>
228where
229  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
230  T: Transport,
231{
232  #[cfg(feature = "encryption")]
233  pub(crate) fn create_response(
234    &self,
235    buf: Bytes,
236  ) -> QueryResponseMessage<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress> {
237    QueryResponseMessage {
238      ltime: self.ltime,
239      id: self.id,
240      from: self.ctx.this.inner.memberlist.advertise_node(),
241      flags: QueryFlag::empty(),
242      payload: buf,
243    }
244  }
245
246  #[cfg(feature = "encryption")]
247  pub(crate) fn check_response_size(&self, resp: &[u8]) -> Result<(), Error<T, D>> {
248    self.ctx.check_response_size(resp)
249  }
250
251  #[cfg(feature = "encryption")]
252  pub(crate) async fn respond_with_message_and_response(
253    &self,
254    raw: Bytes,
255    resp: QueryResponseMessage<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
256  ) -> Result<(), Error<T, D>> {
257    self
258      .ctx
259      .respond_with_message_and_response(self.from.address(), self.relay_factor, raw, resp)
260      .await
261  }
262
263  /// Used to send a response to the user query
264  pub async fn respond(&self, msg: Bytes) -> Result<(), Error<T, D>> {
265    self
266      .ctx
267      .respond(
268        self.from().address(),
269        self.id,
270        self.ltime,
271        self.relay_factor,
272        msg,
273      )
274      .await
275  }
276}
277
/// The event type for member event
///
/// With the `serde` feature enabled, each variant is (de)serialized as the
/// same kebab-case string that [`MemberEventType::as_str`] returns
/// (e.g. `"member-join"`).
// NOTE: this enum must not be marked `#[serde(untagged)]`. For unit variants
// an untagged representation serializes every variant as unit (`null` in
// JSON) and deserializes everything back to the first variant, which would
// make the per-variant `rename` attributes meaningless.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(rename_all = "kebab-case"))]
pub enum MemberEventType {
  /// Join event
  #[cfg_attr(feature = "serde", serde(rename = "member-join"))]
  Join,
  /// Leave event
  #[cfg_attr(feature = "serde", serde(rename = "member-leave"))]
  Leave,
  /// Failed event
  #[cfg_attr(feature = "serde", serde(rename = "member-failed"))]
  Failed,
  /// Update event
  #[cfg_attr(feature = "serde", serde(rename = "member-update"))]
  Update,
  /// Reap event
  #[cfg_attr(feature = "serde", serde(rename = "member-reap"))]
  Reap,
}
299
300impl MemberEventType {
301  /// Returns the string representation of the event type.
302  #[inline]
303  pub const fn as_str(&self) -> &'static str {
304    match self {
305      Self::Join => "member-join",
306      Self::Leave => "member-leave",
307      Self::Failed => "member-failed",
308      Self::Update => "member-update",
309      Self::Reap => "member-reap",
310    }
311  }
312}
313
314impl core::fmt::Display for MemberEventType {
315  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
316    match self {
317      Self::Join => write!(f, "member-join"),
318      Self::Leave => write!(f, "member-leave"),
319      Self::Failed => write!(f, "member-failed"),
320      Self::Update => write!(f, "member-update"),
321      Self::Reap => write!(f, "member-reap"),
322    }
323  }
324}
325
/// Member event whose member list is still owned directly (not yet behind an
/// `Arc`); converted into a shareable [`MemberEvent`] via `freeze`.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct MemberEventMut<I, A> {
  /// Kind of the member event
  pub(crate) ty: MemberEventType,
  /// Members this event applies to
  pub(crate) members: TinyVec<Member<I, A>>,
}
331
332impl<I, A> MemberEventMut<I, A> {
333  pub(crate) fn freeze(self) -> MemberEvent<I, A> {
334    MemberEvent {
335      ty: self.ty,
336      members: Arc::new(self.members),
337    }
338  }
339}
340
/// MemberEvent is the struct used for member related events.
/// Because Serf coalesces events, an event may contain multiple members.
///
/// The member list is stored behind an `Arc`, so cloning an event does not
/// copy the members.
#[derive(Debug, PartialEq)]
pub struct MemberEvent<I, A> {
  /// Kind of the member event
  pub(crate) ty: MemberEventType,
  /// Members this event applies to, shared between clones
  pub(crate) members: Arc<TinyVec<Member<I, A>>>,
}
348
349impl<I, A> Clone for MemberEvent<I, A> {
350  fn clone(&self) -> Self {
351    Self {
352      ty: self.ty,
353      members: self.members.clone(),
354    }
355  }
356}
357
// Marker impl: cloning a `MemberEvent` only copies `ty` and clones the `Arc`
// around the member list, never the members themselves.
impl<I, A> CheapClone for MemberEvent<I, A> {}
359
360impl<I, A> core::fmt::Display for MemberEvent<I, A> {
361  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
362    write!(f, "{}", self.ty)
363  }
364}
365
366impl<I, A> MemberEvent<I, A> {
367  /// Returns the event type of this member event
368  pub fn ty(&self) -> MemberEventType {
369    self.ty
370  }
371
372  /// Returns the members of this event
373  pub fn members(&self) -> &[Member<I, A>] {
374    &self.members
375  }
376}
377
378impl<I, A> From<MemberEvent<I, A>> for (MemberEventType, Arc<TinyVec<Member<I, A>>>) {
379  fn from(event: MemberEvent<I, A>) -> Self {
380    (event.ty, event.members)
381  }
382}
383
/// The event produced by the Serf instance.
///
/// This is the public view handed out by [`EventSubscriber`]; internal
/// query events are filtered out before an `Event` is created.
/// `derive_more::From` generates a `From` conversion for each variant's payload.
#[derive(derive_more::From)]
pub enum Event<T, D>
where
  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
  T: Transport,
{
  /// Member related events
  Member(MemberEvent<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>),
  /// User events
  User(UserEventMessage),
  /// Query events
  Query(QueryEvent<T, D>),
}
398
399impl<D, T> Clone for Event<T, D>
400where
401  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
402  T: Transport,
403{
404  fn clone(&self) -> Self {
405    match self {
406      Self::Member(e) => Self::Member(e.cheap_clone()),
407      Self::User(e) => Self::User(e.cheap_clone()),
408      Self::Query(e) => Self::Query(e.clone()),
409    }
410  }
411}
412
/// The producer of the Serf events.
///
/// Wraps the sending half of the channel whose receiving half is owned by
/// the matching [`EventSubscriber`].
#[derive(Debug)]
pub struct EventProducer<T, D>
where
  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
  T: Transport,
{
  /// Sending half of the event channel
  pub(crate) tx: Sender<CrateEvent<T, D>>,
}
422
423impl<T, D> EventProducer<T, D>
424where
425  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
426  T: Transport,
427{
428  /// Creates a bounded producer and subscriber.
429  ///
430  /// The created subscriber has space to hold at most cap events at a time.
431  /// Users must actively consume the events from the subscriber to prevent the producer from blocking.
432  pub fn bounded(size: usize) -> (Self, EventSubscriber<T, D>) {
433    let (tx, rx) = async_channel::bounded(size);
434    (Self { tx }, EventSubscriber { rx })
435  }
436
437  /// Creates an unbounded producer and subscriber.
438  ///
439  /// The created subscriber has no limit on the number of events it can hold.
440  pub fn unbounded() -> (Self, EventSubscriber<T, D>) {
441    let (tx, rx) = async_channel::unbounded();
442    (Self { tx }, EventSubscriber { rx })
443  }
444}
445
/// Subscribes to the events from the Serf instance.
///
/// Events can be consumed either through `recv` / `try_recv` or through the
/// `Stream` implementation.
#[pin_project::pin_project]
#[derive(Debug)]
pub struct EventSubscriber<T, D>
where
  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
  T: Transport,
{
  /// Receiving half of the event channel; pinned so it can be polled as a stream
  #[pin]
  pub(crate) rx: async_channel::Receiver<CrateEvent<T, D>>,
}
457
458impl<T, D> EventSubscriber<T, D>
459where
460  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
461  T: Transport,
462{
463  /// Receives a event from the subscriber.
464  ///
465  /// If the subscriber is empty, this method waits until there is a event.
466  ///
467  /// If the subscriber is closed, this method receives a event or returns an error if there are no more events
468  pub async fn recv(&self) -> Result<Event<T, D>, RecvError> {
469    loop {
470      match self.rx.recv().await {
471        Ok(CrateEvent::InternalQuery { .. }) => continue,
472        Ok(CrateEvent::Member(e)) => return Ok(Event::Member(e)),
473        Ok(CrateEvent::User(e)) => return Ok(Event::User(e)),
474        Ok(CrateEvent::Query(e)) => return Ok(Event::Query(e)),
475        Err(e) => return Err(e),
476      }
477    }
478  }
479
480  /// Tries to receive a event from the subscriber.
481  ///
482  /// If the subscriber is empty, this method returns an error.
483  /// If the subscriber is closed, this method receives a event or returns an error if there are no more events
484  pub fn try_recv(&self) -> Result<Event<T, D>, TryRecvError> {
485    loop {
486      match self.rx.try_recv() {
487        Ok(CrateEvent::InternalQuery { .. }) => continue,
488        Ok(CrateEvent::Member(e)) => return Ok(Event::Member(e)),
489        Ok(CrateEvent::User(e)) => return Ok(Event::User(e)),
490        Ok(CrateEvent::Query(e)) => return Ok(Event::Query(e)),
491        Err(e) => return Err(e),
492      }
493    }
494  }
495
496  /// Returns `true` if the subscriber is empty.
497  pub fn is_empty(&self) -> bool {
498    self.rx.is_empty()
499  }
500
501  /// Returns `true` if the channel is closed.
502  pub fn is_closed(&self) -> bool {
503    self.rx.is_closed()
504  }
505
506  /// Returns the number of events in the subscriber.
507  pub fn len(&self) -> usize {
508    self.rx.len()
509  }
510}
511
512impl<T, D> Stream for EventSubscriber<T, D>
513where
514  D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
515  T: Transport,
516{
517  type Item = Event<T, D>;
518
519  fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
520    match <async_channel::Receiver<CrateEvent<T, D>> as Stream>::poll_next(self.project().rx, cx) {
521      Poll::Ready(Some(event)) => match event {
522        CrateEvent::Member(e) => Poll::Ready(Some(Event::Member(e))),
523        CrateEvent::User(e) => Poll::Ready(Some(Event::User(e))),
524        CrateEvent::Query(e) => Poll::Ready(Some(Event::Query(e))),
525        CrateEvent::InternalQuery { .. } => Poll::Pending,
526      },
527      Poll::Ready(None) => Poll::Ready(None),
528      Poll::Pending => Poll::Pending,
529    }
530  }
531}