1use std::{pin::Pin, sync::Arc, task::Poll, time::Duration};
2
3use crate::delegate::TransformDelegate;
4
5use self::error::Error;
6
7use super::{delegate::Delegate, types::Epoch, *};
8
9mod crate_event;
10
11use async_channel::Sender;
12pub use async_channel::{RecvError, TryRecvError};
13
14use async_lock::Mutex;
15pub(crate) use crate_event::*;
16use futures::Stream;
17use memberlist_core::{
18 bytes::{BufMut, Bytes, BytesMut},
19 transport::{AddressResolver, Transport},
20 types::TinyVec,
21 CheapClone,
22};
23use serf_types::{
24 LamportTime, Member, MessageType, Node, QueryFlag, QueryResponseMessage, UserEventMessage,
25};
26use smol_str::SmolStr;
27
28pub(crate) struct QueryContext<T, D>
29where
30 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
31 T: Transport,
32{
33 pub(crate) query_timeout: Duration,
34 pub(crate) span: Mutex<Option<Epoch>>,
35 pub(crate) this: Serf<T, D>,
36}
37
38impl<T, D> QueryContext<T, D>
39where
40 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
41 T: Transport,
42{
43 fn check_response_size(&self, resp: &[u8]) -> Result<(), Error<T, D>> {
44 let resp_len = resp.len();
45 if resp_len > self.this.inner.opts.query_response_size_limit {
46 Err(Error::query_response_too_large(
47 self.this.inner.opts.query_response_size_limit,
48 resp_len,
49 ))
50 } else {
51 Ok(())
52 }
53 }
54
55 async fn respond_with_message_and_response(
56 &self,
57 respond_to: &<T::Resolver as AddressResolver>::ResolvedAddress,
58 relay_factor: u8,
59 raw: Bytes,
60 resp: QueryResponseMessage<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
61 ) -> Result<(), Error<T, D>> {
62 self.check_response_size(raw.as_ref())?;
63
64 let mut mu = self.span.lock().await;
65
66 if let Some(span) = *mu {
67 if span.elapsed() > self.query_timeout {
69 return Err(Error::query_timeout());
70 }
71
72 self.this.inner.memberlist.send(respond_to, raw).await?;
74
75 self
77 .this
78 .relay_response(relay_factor, resp.from.cheap_clone(), resp)
79 .await?;
80
81 *mu = None;
83 Ok(())
84 } else {
85 Err(Error::query_already_responsed())
86 }
87 }
88
89 async fn respond(
90 &self,
91 respond_to: &<T::Resolver as AddressResolver>::ResolvedAddress,
92 id: u32,
93 ltime: LamportTime,
94 relay_factor: u8,
95 msg: Bytes,
96 ) -> Result<(), Error<T, D>> {
97 let resp = QueryResponseMessage {
98 ltime,
99 id,
100 from: self.this.advertise_node(),
101 flags: QueryFlag::empty(),
102 payload: msg,
103 };
104 let expected_encoded_len = <D as TransformDelegate>::message_encoded_len(&resp);
105 let mut buf = BytesMut::with_capacity(expected_encoded_len + 1); buf.put_u8(MessageType::QueryResponse as u8);
107 buf.resize(expected_encoded_len + 1, 0);
108 let len = <D as TransformDelegate>::encode_message(&resp, &mut buf[1..])
109 .map_err(Error::transform_delegate)?;
110 debug_assert_eq!(
111 len, expected_encoded_len,
112 "expected encoded len {expected_encoded_len} is not match the actual encoded len {len}"
113 );
114 self
115 .respond_with_message_and_response(respond_to, relay_factor, buf.freeze(), resp)
116 .await
117 }
118}
119
120pub struct QueryEvent<T, D>
122where
123 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
124 T: Transport,
125{
126 pub(crate) ltime: LamportTime,
127 pub(crate) name: SmolStr,
128 pub(crate) payload: Bytes,
129
130 pub(crate) ctx: Arc<QueryContext<T, D>>,
131 pub(crate) id: u32,
132 pub(crate) from: Node<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
134 pub(crate) relay_factor: u8,
136}
137
138impl<D, T> QueryEvent<T, D>
139where
140 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
141 T: Transport,
142{
143 #[inline]
145 pub const fn lamport_time(&self) -> LamportTime {
146 self.ltime
147 }
148
149 #[inline]
151 pub const fn name(&self) -> &SmolStr {
152 &self.name
153 }
154
155 #[inline]
157 pub const fn payload(&self) -> &Bytes {
158 &self.payload
159 }
160
161 #[inline]
163 pub const fn id(&self) -> u32 {
164 self.id
165 }
166
167 #[inline]
169 pub const fn from(&self) -> &Node<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress> {
170 &self.from
171 }
172}
173
174impl<D, T> PartialEq for QueryEvent<T, D>
175where
176 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
177 T: Transport,
178{
179 fn eq(&self, other: &Self) -> bool {
180 self.id == other.id
181 && self.from == other.from
182 && self.relay_factor == other.relay_factor
183 && self.ltime == other.ltime
184 && self.name == other.name
185 && self.payload == other.payload
186 }
187}
188
189impl<D, T> AsRef<QueryEvent<T, D>> for QueryEvent<T, D>
190where
191 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
192 T: Transport,
193{
194 fn as_ref(&self) -> &QueryEvent<T, D> {
195 self
196 }
197}
198
199impl<D, T> Clone for QueryEvent<T, D>
200where
201 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
202 T: Transport,
203{
204 fn clone(&self) -> Self {
205 Self {
206 ltime: self.ltime,
207 name: self.name.clone(),
208 payload: self.payload.clone(),
209 ctx: self.ctx.clone(),
210 id: self.id,
211 from: self.from.clone(),
212 relay_factor: self.relay_factor,
213 }
214 }
215}
216
217impl<D, T> core::fmt::Display for QueryEvent<T, D>
218where
219 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
220 T: Transport,
221{
222 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
223 write!(f, "query")
224 }
225}
226
227impl<D, T> QueryEvent<T, D>
228where
229 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
230 T: Transport,
231{
232 #[cfg(feature = "encryption")]
233 pub(crate) fn create_response(
234 &self,
235 buf: Bytes,
236 ) -> QueryResponseMessage<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress> {
237 QueryResponseMessage {
238 ltime: self.ltime,
239 id: self.id,
240 from: self.ctx.this.inner.memberlist.advertise_node(),
241 flags: QueryFlag::empty(),
242 payload: buf,
243 }
244 }
245
246 #[cfg(feature = "encryption")]
247 pub(crate) fn check_response_size(&self, resp: &[u8]) -> Result<(), Error<T, D>> {
248 self.ctx.check_response_size(resp)
249 }
250
251 #[cfg(feature = "encryption")]
252 pub(crate) async fn respond_with_message_and_response(
253 &self,
254 raw: Bytes,
255 resp: QueryResponseMessage<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
256 ) -> Result<(), Error<T, D>> {
257 self
258 .ctx
259 .respond_with_message_and_response(self.from.address(), self.relay_factor, raw, resp)
260 .await
261 }
262
263 pub async fn respond(&self, msg: Bytes) -> Result<(), Error<T, D>> {
265 self
266 .ctx
267 .respond(
268 self.from().address(),
269 self.id,
270 self.ltime,
271 self.relay_factor,
272 msg,
273 )
274 .await
275 }
276}
277
/// The kind of membership change carried by a member event.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
// NOTE(review): `untagged` on an enum made only of unit variants probably makes
// the per-variant `rename`s below irrelevant for serde — confirm the intended
// wire format before relying on it.
#[cfg_attr(feature = "serde", serde(rename_all = "kebab-case", untagged))]
pub enum MemberEventType {
  /// A member joined.
  #[cfg_attr(feature = "serde", serde(rename = "member-join"))]
  Join,
  /// A member left.
  #[cfg_attr(feature = "serde", serde(rename = "member-leave"))]
  Leave,
  /// A member failed.
  #[cfg_attr(feature = "serde", serde(rename = "member-failed"))]
  Failed,
  /// A member was updated.
  #[cfg_attr(feature = "serde", serde(rename = "member-update"))]
  Update,
  /// A member was reaped.
  #[cfg_attr(feature = "serde", serde(rename = "member-reap"))]
  Reap,
}

impl MemberEventType {
  /// Returns the canonical string name of the event type
  /// (for example `"member-join"`).
  ///
  /// This is the single source of truth for the textual form; `Display`
  /// delegates to it.
  #[inline]
  pub const fn as_str(&self) -> &'static str {
    match self {
      Self::Join => "member-join",
      Self::Leave => "member-leave",
      Self::Failed => "member-failed",
      Self::Update => "member-update",
      Self::Reap => "member-reap",
    }
  }
}

impl core::fmt::Display for MemberEventType {
  /// Writes the same text as [`MemberEventType::as_str`].
  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
    // `write_str` (not `pad`) keeps formatter width/alignment flags ignored,
    // matching a plain `write!(f, "<literal>")`.
    f.write_str(self.as_str())
  }
}
325
326#[derive(Debug, Clone, PartialEq)]
327pub(crate) struct MemberEventMut<I, A> {
328 pub(crate) ty: MemberEventType,
329 pub(crate) members: TinyVec<Member<I, A>>,
330}
331
332impl<I, A> MemberEventMut<I, A> {
333 pub(crate) fn freeze(self) -> MemberEvent<I, A> {
334 MemberEvent {
335 ty: self.ty,
336 members: Arc::new(self.members),
337 }
338 }
339}
340
341#[derive(Debug, PartialEq)]
344pub struct MemberEvent<I, A> {
345 pub(crate) ty: MemberEventType,
346 pub(crate) members: Arc<TinyVec<Member<I, A>>>,
347}
348
349impl<I, A> Clone for MemberEvent<I, A> {
350 fn clone(&self) -> Self {
351 Self {
352 ty: self.ty,
353 members: self.members.clone(),
354 }
355 }
356}
357
358impl<I, A> CheapClone for MemberEvent<I, A> {}
359
360impl<I, A> core::fmt::Display for MemberEvent<I, A> {
361 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
362 write!(f, "{}", self.ty)
363 }
364}
365
366impl<I, A> MemberEvent<I, A> {
367 pub fn ty(&self) -> MemberEventType {
369 self.ty
370 }
371
372 pub fn members(&self) -> &[Member<I, A>] {
374 &self.members
375 }
376}
377
378impl<I, A> From<MemberEvent<I, A>> for (MemberEventType, Arc<TinyVec<Member<I, A>>>) {
379 fn from(event: MemberEvent<I, A>) -> Self {
380 (event.ty, event.members)
381 }
382}
383
384#[derive(derive_more::From)]
386pub enum Event<T, D>
387where
388 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
389 T: Transport,
390{
391 Member(MemberEvent<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>),
393 User(UserEventMessage),
395 Query(QueryEvent<T, D>),
397}
398
399impl<D, T> Clone for Event<T, D>
400where
401 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
402 T: Transport,
403{
404 fn clone(&self) -> Self {
405 match self {
406 Self::Member(e) => Self::Member(e.cheap_clone()),
407 Self::User(e) => Self::User(e.cheap_clone()),
408 Self::Query(e) => Self::Query(e.clone()),
409 }
410 }
411}
412
413#[derive(Debug)]
415pub struct EventProducer<T, D>
416where
417 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
418 T: Transport,
419{
420 pub(crate) tx: Sender<CrateEvent<T, D>>,
421}
422
423impl<T, D> EventProducer<T, D>
424where
425 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
426 T: Transport,
427{
428 pub fn bounded(size: usize) -> (Self, EventSubscriber<T, D>) {
433 let (tx, rx) = async_channel::bounded(size);
434 (Self { tx }, EventSubscriber { rx })
435 }
436
437 pub fn unbounded() -> (Self, EventSubscriber<T, D>) {
441 let (tx, rx) = async_channel::unbounded();
442 (Self { tx }, EventSubscriber { rx })
443 }
444}
445
446#[pin_project::pin_project]
448#[derive(Debug)]
449pub struct EventSubscriber<T, D>
450where
451 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
452 T: Transport,
453{
454 #[pin]
455 pub(crate) rx: async_channel::Receiver<CrateEvent<T, D>>,
456}
457
458impl<T, D> EventSubscriber<T, D>
459where
460 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
461 T: Transport,
462{
463 pub async fn recv(&self) -> Result<Event<T, D>, RecvError> {
469 loop {
470 match self.rx.recv().await {
471 Ok(CrateEvent::InternalQuery { .. }) => continue,
472 Ok(CrateEvent::Member(e)) => return Ok(Event::Member(e)),
473 Ok(CrateEvent::User(e)) => return Ok(Event::User(e)),
474 Ok(CrateEvent::Query(e)) => return Ok(Event::Query(e)),
475 Err(e) => return Err(e),
476 }
477 }
478 }
479
480 pub fn try_recv(&self) -> Result<Event<T, D>, TryRecvError> {
485 loop {
486 match self.rx.try_recv() {
487 Ok(CrateEvent::InternalQuery { .. }) => continue,
488 Ok(CrateEvent::Member(e)) => return Ok(Event::Member(e)),
489 Ok(CrateEvent::User(e)) => return Ok(Event::User(e)),
490 Ok(CrateEvent::Query(e)) => return Ok(Event::Query(e)),
491 Err(e) => return Err(e),
492 }
493 }
494 }
495
496 pub fn is_empty(&self) -> bool {
498 self.rx.is_empty()
499 }
500
501 pub fn is_closed(&self) -> bool {
503 self.rx.is_closed()
504 }
505
506 pub fn len(&self) -> usize {
508 self.rx.len()
509 }
510}
511
512impl<T, D> Stream for EventSubscriber<T, D>
513where
514 D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
515 T: Transport,
516{
517 type Item = Event<T, D>;
518
519 fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
520 match <async_channel::Receiver<CrateEvent<T, D>> as Stream>::poll_next(self.project().rx, cx) {
521 Poll::Ready(Some(event)) => match event {
522 CrateEvent::Member(e) => Poll::Ready(Some(Event::Member(e))),
523 CrateEvent::User(e) => Poll::Ready(Some(Event::User(e))),
524 CrateEvent::Query(e) => Poll::Ready(Some(Event::Query(e))),
525 CrateEvent::InternalQuery { .. } => Poll::Pending,
526 },
527 Poll::Ready(None) => Poll::Ready(None),
528 Poll::Pending => Poll::Pending,
529 }
530 }
531}