use crate::base::MessageHandoff;
use agnostic_lite::AsyncSpawner;
use super::*;
impl<D, T> Memberlist<T, D>
where
D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
T: Transport,
{
pub(crate) fn packet_handler(
&self,
shutdown_rx: async_channel::Receiver<()>,
) -> <<T::Runtime as RuntimeLite>::Spawner as AsyncSpawner>::JoinHandle<()> {
let this = self.clone();
let handoff_rx = this.inner.handoff_rx.clone();
<T::Runtime as RuntimeLite>::spawn(async move {
loop {
futures::select! {
_ = shutdown_rx.recv().fuse() => {
return;
}
_ = handoff_rx.recv().fuse() => while let Some(msg) = this.get_next_message().await {
match msg.msg {
Message::Suspect(m) => this.handle_suspect(msg.from, m).await,
Message::Alive(m) => this.handle_alive(msg.from, m).await,
Message::Dead(m) => this.handle_dead(msg.from, m).await,
Message::UserData(m) => this.handle_user(msg.from, m).await,
m => {
tracing::error!(target = "memberlist.packet", "message type ({}) not supported {} (packet handler)", m.kind(), msg.from);
}
}
}
}
}
})
}
async fn get_next_message(
&self,
) -> Option<MessageHandoff<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>> {
let mut queue = self.inner.queue.lock().await;
queue.high.pop_back().or_else(|| queue.low.pop_back())
}
async fn handle_suspect(
&self,
from: <T::Resolver as AddressResolver>::ResolvedAddress,
suspect: Suspect<T::Id>,
) {
if let Err(e) = self.suspect_node(suspect).await {
tracing::error!(err=%e, remote_addr = %from, "memberlist.packet: failed to suspect node");
}
}
async fn handle_alive(
&self,
from: <T::Resolver as AddressResolver>::ResolvedAddress,
alive: Alive<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
) {
if let Err(e) = self.inner.transport.blocked_address(&from) {
tracing::error!(err=%e, remote_addr = %from, "memberlist.packet: blocked alive message");
return;
}
if let Err(e) = self.inner.transport.blocked_address(alive.node().address()) {
tracing::error!(err=%e, remote_addr = %from, "memberlist.packet: blocked alive address {} message from {}", alive.node().address(), from);
return;
}
self.alive_node(alive, None, false).await
}
async fn handle_dead(
&self,
from: <T::Resolver as AddressResolver>::ResolvedAddress,
dead: Dead<T::Id>,
) {
let mut memberlist = self.inner.nodes.write().await;
if let Err(e) = self.dead_node(&mut memberlist, dead).await {
tracing::error!(err=%e, remote_addr = %from, "memberlist.packet: failed to mark node as dead");
}
}
async fn handle_user(
&self,
from: <T::Resolver as AddressResolver>::ResolvedAddress,
data: Bytes,
) {
if let Some(d) = self.delegate.as_ref() {
tracing::trace!(remote_addr = %from, data=?data.as_ref(), "memberlist.packet: handle user data");
d.notify_message(data).await
}
}
}