mod member;
pub(crate) use member::*;
mod user;
pub(crate) use user::*;
use std::{future::Future, time::Duration};
use async_channel::{Receiver, Sender, bounded};
use futures::FutureExt;
use memberlist_core::{
agnostic_lite::RuntimeLite,
tracing,
transport::{AddressResolver, Transport},
};
use crate::delegate::Delegate;
use super::event::CrateEvent;
pub(crate) struct ClosedOutChannel;
pub(crate) trait Coalescer: Send + Sync + 'static {
type Delegate: Delegate<
Id = <Self::Transport as Transport>::Id,
Address = <<Self::Transport as Transport>::Resolver as AddressResolver>::ResolvedAddress,
>;
type Transport: Transport;
fn name(&self) -> &'static str;
fn handle(&self, event: &CrateEvent<Self::Transport, Self::Delegate>) -> bool;
fn coalesce(&mut self, event: CrateEvent<Self::Transport, Self::Delegate>);
fn flush(
&mut self,
out_tx: &Sender<CrateEvent<Self::Transport, Self::Delegate>>,
) -> impl Future<Output = Result<(), ClosedOutChannel>> + Send;
}
pub(crate) fn coalesced_event<C: Coalescer>(
out_tx: Sender<CrateEvent<C::Transport, C::Delegate>>,
shutdown_rx: Receiver<()>,
c_period: Duration,
q_period: Duration,
c: C,
) -> Sender<CrateEvent<C::Transport, C::Delegate>> {
let (in_tx, in_rx) = bounded(1024);
<<C::Transport as Transport>::Runtime as RuntimeLite>::spawn_detach(coalesce_loop::<C>(
in_rx,
out_tx,
shutdown_rx,
c_period,
q_period,
c,
));
in_tx
}
async fn coalesce_loop<C: Coalescer>(
in_rx: Receiver<CrateEvent<C::Transport, C::Delegate>>,
out_tx: Sender<CrateEvent<C::Transport, C::Delegate>>,
shutdown_rx: Receiver<()>,
coalesce_peirod: Duration,
quiescent_period: Duration,
mut c: C,
) {
let mut quiescent = None;
let mut quantum = None;
let mut shutdown = false;
loop {
futures::select! {
ev = in_rx.recv().fuse() => {
let Ok(ev) = ev else {
return;
};
if !c.handle(&ev) {
if let Err(e) = out_tx.send(ev).await {
tracing::error!(err=%e, "serf: fail send event to out channel in {} coalesce thread", c.name());
return;
}
continue;
}
if quantum.is_none() {
quantum = Some(<<C::Transport as Transport>::Runtime as RuntimeLite>::sleep(coalesce_peirod));
}
quiescent = Some(<<C::Transport as Transport>::Runtime as RuntimeLite>::sleep(quiescent_period));
c.coalesce(ev);
}
_ = async {
if let Some(quantum) = quantum.take() {
quantum.await;
} else {
std::future::pending::<()>().await;
}
}.fuse() => {
if c.flush(&out_tx).await.is_err() {
tracing::error!(err="closed channel", "serf: fail send event to out channel in {} coalesce thread", c.name());
return;
}
if !shutdown {
quiescent = None;
quantum = None;
continue;
}
return;
}
_ = async {
if let Some(quiescent) = quiescent.take() {
quiescent.await;
} else {
std::future::pending::<()>().await;
}
}.fuse() => {
if c.flush(&out_tx).await.is_err() {
tracing::error!(err="closed channel", "serf: fail send event to out channel in {} coalesce thread", c.name());
return;
}
if !shutdown {
quantum = None;
quiescent = None;
continue;
}
return;
}
_ = shutdown_rx.recv().fuse() => {
shutdown = true;
}
}
}
}