1use std::{pin::Pin, sync::Arc, task::Poll, time::Duration};
2
3use self::error::Error;
4
5use super::{delegate::Delegate, types::Epoch, *};
6
7mod crate_event;
8
9use async_channel::Sender;
10pub use async_channel::{RecvError, TryRecvError};
11
12use crate::types::{LamportTime, Member, Node, QueryFlag, QueryResponseMessage, UserEventMessage};
13use async_lock::Mutex;
14pub(crate) use crate_event::*;
15use futures::Stream;
16use memberlist_core::{CheapClone, bytes::Bytes, proto::TinyVec, transport::Transport};
17use smol_str::SmolStr;
18
19pub(crate) struct QueryContext<T, D>
20where
21 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
22 T: Transport,
23{
24 pub(crate) query_timeout: Duration,
25 pub(crate) span: Mutex<Option<Epoch>>,
26 pub(crate) this: Serf<T, D>,
27}
28
29impl<T, D> QueryContext<T, D>
30where
31 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
32 T: Transport,
33{
34 fn check_response_size(&self, size: usize) -> Result<(), Error<T, D>> {
35 if size > self.this.inner.opts.query_response_size_limit {
36 Err(Error::query_response_too_large(
37 self.this.inner.opts.query_response_size_limit,
38 size,
39 ))
40 } else {
41 Ok(())
42 }
43 }
44
45 async fn respond_with_message_and_response(
46 &self,
47 respond_to: &T::ResolvedAddress,
48 relay_factor: u8,
49 raw: Bytes,
50 resp: QueryResponseMessage<T::Id, T::ResolvedAddress>,
51 ) -> Result<(), Error<T, D>> {
52 self.check_response_size(raw.len())?;
53
54 let mut mu = self.span.lock().await;
55
56 if let Some(span) = *mu {
57 if span.elapsed() > self.query_timeout {
59 return Err(Error::query_timeout());
60 }
61
62 self.this.inner.memberlist.send(respond_to, raw).await?;
64
65 self
67 .this
68 .relay_response(relay_factor, resp.from.cheap_clone(), resp)
69 .await?;
70
71 *mu = None;
73 Ok(())
74 } else {
75 Err(Error::query_already_responsed())
76 }
77 }
78
79 async fn respond(
80 &self,
81 respond_to: &T::ResolvedAddress,
82 id: u32,
83 ltime: LamportTime,
84 relay_factor: u8,
85 msg: Bytes,
86 ) -> Result<(), Error<T, D>> {
87 let resp = QueryResponseMessage {
88 ltime,
89 id,
90 from: self.this.advertise_node(),
91 flags: QueryFlag::empty(),
92 payload: msg,
93 };
94 let buf = crate::types::encode_message_to_bytes(&resp)?;
95 self
96 .respond_with_message_and_response(respond_to, relay_factor, buf, resp)
97 .await
98 }
99}
100
101pub struct QueryEvent<T, D>
103where
104 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
105 T: Transport,
106{
107 pub(crate) ltime: LamportTime,
108 pub(crate) name: SmolStr,
109 pub(crate) payload: Bytes,
110
111 pub(crate) ctx: Arc<QueryContext<T, D>>,
112 pub(crate) id: u32,
113 pub(crate) from: Node<T::Id, T::ResolvedAddress>,
115 pub(crate) relay_factor: u8,
117}
118
119impl<D, T> QueryEvent<T, D>
120where
121 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
122 T: Transport,
123{
124 #[inline]
126 pub const fn lamport_time(&self) -> LamportTime {
127 self.ltime
128 }
129
130 #[inline]
132 pub const fn name(&self) -> &SmolStr {
133 &self.name
134 }
135
136 #[inline]
138 pub const fn payload(&self) -> &Bytes {
139 &self.payload
140 }
141
142 #[inline]
144 pub const fn id(&self) -> u32 {
145 self.id
146 }
147
148 #[inline]
150 pub const fn from(&self) -> &Node<T::Id, T::ResolvedAddress> {
151 &self.from
152 }
153}
154
155impl<D, T> PartialEq for QueryEvent<T, D>
156where
157 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
158 T: Transport,
159{
160 fn eq(&self, other: &Self) -> bool {
161 self.id == other.id
162 && self.from == other.from
163 && self.relay_factor == other.relay_factor
164 && self.ltime == other.ltime
165 && self.name == other.name
166 && self.payload == other.payload
167 }
168}
169
170impl<D, T> AsRef<QueryEvent<T, D>> for QueryEvent<T, D>
171where
172 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
173 T: Transport,
174{
175 fn as_ref(&self) -> &QueryEvent<T, D> {
176 self
177 }
178}
179
180impl<D, T> Clone for QueryEvent<T, D>
181where
182 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
183 T: Transport,
184{
185 fn clone(&self) -> Self {
186 Self {
187 ltime: self.ltime,
188 name: self.name.clone(),
189 payload: self.payload.clone(),
190 ctx: self.ctx.clone(),
191 id: self.id,
192 from: self.from.clone(),
193 relay_factor: self.relay_factor,
194 }
195 }
196}
197
198impl<D, T> core::fmt::Display for QueryEvent<T, D>
199where
200 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
201 T: Transport,
202{
203 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
204 write!(f, "query")
205 }
206}
207
208impl<D, T> QueryEvent<T, D>
209where
210 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
211 T: Transport,
212{
213 #[cfg(feature = "encryption")]
214 pub(crate) fn create_response(
215 &self,
216 buf: Bytes,
217 ) -> QueryResponseMessage<T::Id, T::ResolvedAddress> {
218 QueryResponseMessage {
219 ltime: self.ltime,
220 id: self.id,
221 from: self.ctx.this.inner.memberlist.advertise_node(),
222 flags: QueryFlag::empty(),
223 payload: buf,
224 }
225 }
226
227 #[cfg(feature = "encryption")]
228 pub(crate) fn check_response_size(&self, size: usize) -> Result<(), Error<T, D>> {
229 self.ctx.check_response_size(size)
230 }
231
232 #[cfg(feature = "encryption")]
233 pub(crate) async fn respond_with_message_and_response(
234 &self,
235 raw: Bytes,
236 resp: QueryResponseMessage<T::Id, T::ResolvedAddress>,
237 ) -> Result<(), Error<T, D>> {
238 self
239 .ctx
240 .respond_with_message_and_response(self.from.address(), self.relay_factor, raw, resp)
241 .await
242 }
243
244 pub async fn respond(&self, msg: Bytes) -> Result<(), Error<T, D>> {
246 self
247 .ctx
248 .respond(
249 self.from().address(),
250 self.id,
251 self.ltime,
252 self.relay_factor,
253 msg,
254 )
255 .await
256 }
257}
258
/// The kind of membership change a [`MemberEvent`] reports.
///
/// With the `serde` feature enabled each variant is (de)serialized under the
/// name given by its `rename` attribute (`member-join`, `member-leave`, ...),
/// which matches [`MemberEventType::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
// Deliberately NOT `untagged`: for unit variants an untagged enum serializes
// every variant as a bare unit and deserializes any unit back to the first
// variant, which loses the variant and makes the `rename`s below dead.
#[cfg_attr(feature = "serde", serde(rename_all = "kebab-case"))]
pub enum MemberEventType {
  /// Serialized / displayed as `member-join`.
  #[cfg_attr(feature = "serde", serde(rename = "member-join"))]
  Join,
  /// Serialized / displayed as `member-leave`.
  #[cfg_attr(feature = "serde", serde(rename = "member-leave"))]
  Leave,
  /// Serialized / displayed as `member-failed`.
  #[cfg_attr(feature = "serde", serde(rename = "member-failed"))]
  Failed,
  /// Serialized / displayed as `member-update`.
  #[cfg_attr(feature = "serde", serde(rename = "member-update"))]
  Update,
  /// Serialized / displayed as `member-reap`.
  #[cfg_attr(feature = "serde", serde(rename = "member-reap"))]
  Reap,
}
280
281impl MemberEventType {
282 #[inline]
284 pub const fn as_str(&self) -> &'static str {
285 match self {
286 Self::Join => "member-join",
287 Self::Leave => "member-leave",
288 Self::Failed => "member-failed",
289 Self::Update => "member-update",
290 Self::Reap => "member-reap",
291 }
292 }
293}
294
295impl core::fmt::Display for MemberEventType {
296 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
297 match self {
298 Self::Join => write!(f, "member-join"),
299 Self::Leave => write!(f, "member-leave"),
300 Self::Failed => write!(f, "member-failed"),
301 Self::Update => write!(f, "member-update"),
302 Self::Reap => write!(f, "member-reap"),
303 }
304 }
305}
306
307#[derive(Debug, Clone, PartialEq)]
308pub(crate) struct MemberEventMut<I, A> {
309 pub(crate) ty: MemberEventType,
310 pub(crate) members: TinyVec<Member<I, A>>,
311}
312
313impl<I, A> MemberEventMut<I, A> {
314 pub(crate) fn freeze(self) -> MemberEvent<I, A> {
315 MemberEvent {
316 ty: self.ty,
317 members: Arc::new(self.members),
318 }
319 }
320}
321
322#[derive(Debug, PartialEq)]
325pub struct MemberEvent<I, A> {
326 pub(crate) ty: MemberEventType,
327 pub(crate) members: Arc<TinyVec<Member<I, A>>>,
328}
329
330impl<I, A> Clone for MemberEvent<I, A> {
331 fn clone(&self) -> Self {
332 Self {
333 ty: self.ty,
334 members: self.members.clone(),
335 }
336 }
337}
338
339impl<I, A> CheapClone for MemberEvent<I, A> {}
340
341impl<I, A> core::fmt::Display for MemberEvent<I, A> {
342 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
343 write!(f, "{}", self.ty)
344 }
345}
346
347impl<I, A> MemberEvent<I, A> {
348 pub fn ty(&self) -> MemberEventType {
350 self.ty
351 }
352
353 pub fn members(&self) -> &[Member<I, A>] {
355 &self.members
356 }
357}
358
359impl<I, A> From<MemberEvent<I, A>> for (MemberEventType, Arc<TinyVec<Member<I, A>>>) {
360 fn from(event: MemberEvent<I, A>) -> Self {
361 (event.ty, event.members)
362 }
363}
364
365#[derive(derive_more::From)]
367pub enum Event<T, D>
368where
369 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
370 T: Transport,
371{
372 Member(MemberEvent<T::Id, T::ResolvedAddress>),
374 User(UserEventMessage),
376 Query(QueryEvent<T, D>),
378}
379
380impl<D, T> Clone for Event<T, D>
381where
382 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
383 T: Transport,
384{
385 fn clone(&self) -> Self {
386 match self {
387 Self::Member(e) => Self::Member(e.cheap_clone()),
388 Self::User(e) => Self::User(e.cheap_clone()),
389 Self::Query(e) => Self::Query(e.clone()),
390 }
391 }
392}
393
394#[derive(Debug)]
396pub struct EventProducer<T, D>
397where
398 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
399 T: Transport,
400{
401 pub(crate) tx: Sender<CrateEvent<T, D>>,
402}
403
404impl<T, D> EventProducer<T, D>
405where
406 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
407 T: Transport,
408{
409 pub fn bounded(size: usize) -> (Self, EventSubscriber<T, D>) {
414 let (tx, rx) = async_channel::bounded(size);
415 (Self { tx }, EventSubscriber { rx })
416 }
417
418 pub fn unbounded() -> (Self, EventSubscriber<T, D>) {
422 let (tx, rx) = async_channel::unbounded();
423 (Self { tx }, EventSubscriber { rx })
424 }
425}
426
427#[pin_project::pin_project]
429#[derive(Debug)]
430pub struct EventSubscriber<T, D>
431where
432 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
433 T: Transport,
434{
435 #[pin]
436 pub(crate) rx: async_channel::Receiver<CrateEvent<T, D>>,
437}
438
439impl<T, D> EventSubscriber<T, D>
440where
441 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
442 T: Transport,
443{
444 pub async fn recv(&self) -> Result<Event<T, D>, RecvError> {
450 loop {
451 match self.rx.recv().await {
452 Ok(CrateEvent::InternalQuery { .. }) => continue,
453 Ok(CrateEvent::Member(e)) => return Ok(Event::Member(e)),
454 Ok(CrateEvent::User(e)) => return Ok(Event::User(e)),
455 Ok(CrateEvent::Query(e)) => return Ok(Event::Query(e)),
456 Err(e) => return Err(e),
457 }
458 }
459 }
460
461 pub fn try_recv(&self) -> Result<Event<T, D>, TryRecvError> {
466 loop {
467 match self.rx.try_recv() {
468 Ok(CrateEvent::InternalQuery { .. }) => continue,
469 Ok(CrateEvent::Member(e)) => return Ok(Event::Member(e)),
470 Ok(CrateEvent::User(e)) => return Ok(Event::User(e)),
471 Ok(CrateEvent::Query(e)) => return Ok(Event::Query(e)),
472 Err(e) => return Err(e),
473 }
474 }
475 }
476
477 pub fn is_empty(&self) -> bool {
479 self.rx.is_empty()
480 }
481
482 pub fn is_closed(&self) -> bool {
484 self.rx.is_closed()
485 }
486
487 pub fn len(&self) -> usize {
489 self.rx.len()
490 }
491}
492
493impl<T, D> Stream for EventSubscriber<T, D>
494where
495 D: Delegate<Id = T::Id, Address = T::ResolvedAddress>,
496 T: Transport,
497{
498 type Item = Event<T, D>;
499
500 fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
501 match <async_channel::Receiver<CrateEvent<T, D>> as Stream>::poll_next(self.project().rx, cx) {
502 Poll::Ready(Some(event)) => match event {
503 CrateEvent::Member(e) => Poll::Ready(Some(Event::Member(e))),
504 CrateEvent::User(e) => Poll::Ready(Some(Event::User(e))),
505 CrateEvent::Query(e) => Poll::Ready(Some(Event::Query(e))),
506 CrateEvent::InternalQuery { .. } => Poll::Pending,
507 },
508 Poll::Ready(None) => Poll::Ready(None),
509 Poll::Pending => Poll::Pending,
510 }
511 }
512}