1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
#![warn(missing_docs)]
//! This module provides helper function for handle DHT related Actions
use async_recursion::async_recursion;
use crate::dht::PeerRingAction;
use crate::dht::PeerRingRemoteAction;
use crate::error::Result;
use crate::message::types::FindSuccessorSend;
use crate::message::types::Message;
use crate::message::types::QueryForTopoInfoSend;
use crate::message::FindSuccessorReportHandler;
use crate::message::FindSuccessorThen;
use crate::message::MessageHandlerEvent;
use crate::message::MessagePayload;
/// This accept a action instance, handler_func and error_msg string as parameter.
/// This macro is used for handling `PeerRingAction::MultiActions`.
///
/// It accepts three parameters:
/// * `$actions`: This parameter represents the actions that will be processed.
/// * `$handler_func`: This is the handler function that will be used to process each action. It is expected to be
/// an expression that evaluates to a closure. The closure should be an asynchronous function
/// which accepts a single action and returns a `Result<MessageHandlerEvent>`.
/// The function will be called for each action in `$actions`, and should handle the action appropriately.
///
/// * `$error_msg`: This is a string that will be used as the error message if the handler function returns an error.
/// The string should include one set of braces `{}` that will be filled with the `Debug` representation
/// of the error returned from the handler function.
///
/// The macro returns a `Result<Vec<MessageHandlerEvent>>`. If all actions are processed successfully, it returns
/// `Ok(Vec<MessageHandlerEvent>)`, where the vector includes all the successful results from the handler function.
/// If any action fails, an error message will be logged, but the error will not be returned from the macro; instead,
/// it will continue with the next action.
///
/// The macro is asynchronous, so it should be used within an `async` context.
#[macro_export]
macro_rules! handle_multi_actions {
($actions:expr, $handler_func:expr, $error_msg:expr) => {{
let ret: Vec<MessageHandlerEvent> =
futures::future::join_all($actions.iter().map($handler_func))
.await
.iter()
.map(|x| {
if x.is_err() {
tracing::error!($error_msg, x)
};
x
})
.filter_map(|x| x.as_ref().ok())
.flat_map(|xs| xs.iter())
.cloned()
.collect();
Ok(ret)
}};
}
/// Handler of join dht event from PeerRing DHT.
#[cfg_attr(feature = "wasm", async_recursion(?Send))]
#[cfg_attr(not(feature = "wasm"), async_recursion)]
pub async fn handle_dht_events(
act: &PeerRingAction,
ctx: &MessagePayload<Message>,
) -> Result<Vec<MessageHandlerEvent>> {
match act {
PeerRingAction::None => Ok(vec![]),
// Ask next hop to find successor for did,
// if there is only two nodes A, B, it may cause loop, for example
// A's successor is B, B ask A to find successor for B
// A may send message to it's successor, which is B
PeerRingAction::RemoteAction(next, PeerRingRemoteAction::FindSuccessorForConnect(did)) => {
if next != did {
Ok(vec![MessageHandlerEvent::SendDirectMessage(
Message::FindSuccessorSend(FindSuccessorSend {
did: *did,
strict: false,
then: FindSuccessorThen::Report(FindSuccessorReportHandler::Connect),
}),
*next,
)])
} else {
Ok(vec![])
}
}
// A new successor is set, request the new successor for it's successor list
PeerRingAction::RemoteAction(next, PeerRingRemoteAction::QueryForSuccessorList) => {
Ok(vec![MessageHandlerEvent::SendDirectMessage(
Message::QueryForTopoInfoSend(QueryForTopoInfoSend::new_for_sync(*next)),
*next,
)])
}
PeerRingAction::RemoteAction(did, PeerRingRemoteAction::TryConnect) => {
Ok(vec![MessageHandlerEvent::ConnectVia(
*did,
ctx.relay.origin_sender(),
)])
}
PeerRingAction::RemoteAction(did, PeerRingRemoteAction::Notify(target_id)) => {
if did == target_id {
tracing::warn!("Did is equal to target_id, may implement wrong.");
return Ok(vec![]);
}
Ok(vec![MessageHandlerEvent::Notify(*target_id)])
}
PeerRingAction::MultiActions(acts) => {
handle_multi_actions!(
acts,
|act| async { handle_dht_events(act, ctx).await },
"Failed on handle multi actions: {:#?}"
)
}
_ => unreachable!(),
}
}