#![warn(missing_docs)]
use std::sync::Arc;
use async_recursion::async_recursion;
use async_trait::async_trait;
use super::MessagePayload;
use crate::dht::Chord;
use crate::dht::CorrectChord;
use crate::dht::Did;
use crate::dht::PeerRing;
use crate::dht::PeerRingAction;
use crate::dht::PeerRingRemoteAction;
use crate::error::Error;
use crate::error::Result;
use crate::message::types::FindSuccessorSend;
use crate::message::types::Message;
use crate::message::types::QueryForTopoInfoSend;
use crate::message::FindSuccessorReportHandler;
use crate::message::FindSuccessorThen;
use crate::message::NotifyPredecessorSend;
use crate::message::PayloadSender;
use crate::swarm::callback::InnerSwarmCallback;
use crate::swarm::callback::SharedSwarmCallback;
use crate::swarm::transport::SwarmTransport;
pub mod connection;
pub mod custom;
pub mod stabilization;
pub mod storage;
pub mod subring;
#[derive(Clone)]
pub struct MessageHandler {
transport: Arc<SwarmTransport>,
dht: Arc<PeerRing>,
swarm_callback: SharedSwarmCallback,
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
pub trait HandleMsg<T> {
async fn handle(&self, ctx: &MessagePayload, msg: &T) -> Result<()>;
}
impl MessageHandler {
pub fn new(transport: Arc<SwarmTransport>, swarm_callback: SharedSwarmCallback) -> Self {
let dht = transport.dht.clone();
Self {
transport,
dht,
swarm_callback,
}
}
fn inner_callback(&self) -> InnerSwarmCallback {
InnerSwarmCallback::new(self.transport.clone(), self.swarm_callback.clone())
}
pub(crate) async fn join_dht(&self, peer: Did) -> Result<()> {
if cfg!(feature = "experimental") {
let conn = self
.transport
.get_connection(peer)
.ok_or(Error::SwarmMissDidInTable(peer))?;
let dht_ev = self.dht.join_then_sync(conn).await?;
self.handle_dht_events(&dht_ev).await
} else {
let dht_ev = self.dht.join(peer)?;
self.handle_dht_events(&dht_ev).await.unwrap();
Ok(())
}
}
pub(crate) async fn leave_dht(&self, peer: Did) -> Result<()> {
if self
.transport
.get_and_check_connection(peer)
.await
.is_none()
{
self.dht.remove(peer)?
};
Ok(())
}
#[cfg_attr(feature = "wasm", async_recursion(?Send))]
#[cfg_attr(not(feature = "wasm"), async_recursion)]
pub(crate) async fn handle_dht_events(&self, act: &PeerRingAction) -> Result<()> {
match act {
PeerRingAction::None => Ok(()),
PeerRingAction::RemoteAction(
next,
PeerRingRemoteAction::FindSuccessorForConnect(did),
) => {
if next != did {
self.transport
.send_direct_message(
Message::FindSuccessorSend(FindSuccessorSend {
did: *did,
strict: false,
then: FindSuccessorThen::Report(
FindSuccessorReportHandler::Connect,
),
}),
*next,
)
.await?;
Ok(())
} else {
Ok(())
}
}
PeerRingAction::RemoteAction(next, PeerRingRemoteAction::QueryForSuccessorList) => {
self.transport
.send_direct_message(
Message::QueryForTopoInfoSend(QueryForTopoInfoSend::new_for_sync(*next)),
*next,
)
.await?;
Ok(())
}
PeerRingAction::RemoteAction(did, PeerRingRemoteAction::TryConnect) => {
self.transport.connect(*did, self.inner_callback()).await?;
Ok(())
}
PeerRingAction::RemoteAction(did, PeerRingRemoteAction::Notify(target_id)) => {
if did == target_id {
tracing::warn!("Did is equal to target_id, may implement wrong.");
return Ok(());
}
let msg =
Message::NotifyPredecessorSend(NotifyPredecessorSend { did: self.dht.did });
self.transport.send_message(msg, *target_id).await?;
Ok(())
}
PeerRingAction::MultiActions(acts) => {
let jobs = acts
.iter()
.map(|act| async move { self.handle_dht_events(act).await });
for res in futures::future::join_all(jobs).await {
if res.is_err() {
tracing::error!("Failed on handle multi actions: {:#?}", res)
}
}
Ok(())
}
_ => unreachable!(),
}
}
}