#![warn(missing_docs)]
use async_recursion::async_recursion;
use crate::dht::PeerRingAction;
use crate::dht::PeerRingRemoteAction;
use crate::error::Result;
use crate::message::types::FindSuccessorSend;
use crate::message::types::Message;
use crate::message::types::QueryForTopoInfoSend;
use crate::message::FindSuccessorReportHandler;
use crate::message::FindSuccessorThen;
use crate::message::MessageHandlerEvent;
use crate::message::MessagePayload;
#[macro_export]
macro_rules! handle_multi_actions {
($actions:expr, $handler_func:expr, $error_msg:expr) => {{
let ret: Vec<MessageHandlerEvent> =
futures::future::join_all($actions.iter().map($handler_func))
.await
.iter()
.map(|x| {
if x.is_err() {
tracing::error!($error_msg, x)
};
x
})
.filter_map(|x| x.as_ref().ok())
.flat_map(|xs| xs.iter())
.cloned()
.collect();
Ok(ret)
}};
}
#[cfg_attr(feature = "wasm", async_recursion(?Send))]
#[cfg_attr(not(feature = "wasm"), async_recursion)]
pub async fn handle_dht_events(
act: &PeerRingAction,
ctx: &MessagePayload<Message>,
) -> Result<Vec<MessageHandlerEvent>> {
match act {
PeerRingAction::None => Ok(vec![]),
PeerRingAction::RemoteAction(next, PeerRingRemoteAction::FindSuccessorForConnect(did)) => {
if next != did {
Ok(vec![MessageHandlerEvent::SendDirectMessage(
Message::FindSuccessorSend(FindSuccessorSend {
did: *did,
strict: false,
then: FindSuccessorThen::Report(FindSuccessorReportHandler::Connect),
}),
*next,
)])
} else {
Ok(vec![])
}
}
PeerRingAction::RemoteAction(next, PeerRingRemoteAction::QueryForSuccessorList) => {
Ok(vec![MessageHandlerEvent::SendDirectMessage(
Message::QueryForTopoInfoSend(QueryForTopoInfoSend::new_for_sync(*next)),
*next,
)])
}
PeerRingAction::RemoteAction(did, PeerRingRemoteAction::TryConnect) => {
Ok(vec![MessageHandlerEvent::ConnectVia(
*did,
ctx.relay.origin_sender(),
)])
}
PeerRingAction::RemoteAction(did, PeerRingRemoteAction::Notify(target_id)) => {
if did == target_id {
tracing::warn!("Did is equal to target_id, may implement wrong.");
return Ok(vec![]);
}
Ok(vec![MessageHandlerEvent::Notify(*target_id)])
}
PeerRingAction::MultiActions(acts) => {
handle_multi_actions!(
acts,
|act| async { handle_dht_events(act, ctx).await },
"Failed on handle multi actions: {:#?}"
)
}
_ => unreachable!(),
}
}