1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#![warn(missing_docs)]
use std::str::FromStr;
use async_trait::async_trait;
use super::storage::TChordStorage;
use crate::dht::subring::SubRing;
use crate::dht::vnode::VirtualNode;
use crate::dht::Did;
use crate::dht::PeerRingAction;
use crate::dht::PeerRingRemoteAction as RemoteAction;
use crate::dht::SubRingManager;
use crate::ecc::HashStr;
use crate::err::Error;
use crate::err::Result;
use crate::message::types::JoinSubRing;
use crate::message::types::Message;
use crate::message::HandleMsg;
use crate::message::MessageHandler;
use crate::message::MessagePayload;
use crate::message::PayloadSender;
use crate::swarm::Swarm;
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
pub trait SubRingOperator {
async fn subring_create(&self, name: &str) -> Result<()>;
async fn subring_join(&self, name: &str) -> Result<()>;
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl SubRingOperator for Swarm {
async fn subring_create(&self, name: &str) -> Result<()> {
let subring: SubRing = SubRing::new(name, self.dht.did)?;
let vnode: VirtualNode = subring.clone().try_into()?;
self.dht.store_subring(&subring.clone()).await?;
self.storage_store(vnode).await
}
async fn subring_join(&self, name: &str) -> Result<()> {
let address: HashStr = name.to_owned().into();
let did = Did::from_str(&address.inner())?;
match self.dht.join_subring(self.dht.did, did).await {
Ok(PeerRingAction::RemoteAction(next, RemoteAction::FindAndJoinSubRing(rid))) => {
self.send_direct_message(Message::JoinSubRing(JoinSubRing { rid }), next)
.await?;
Ok(())
}
Ok(PeerRingAction::None) => Ok(()),
Ok(act) => Err(Error::PeerRingUnexpectedAction(act)),
Err(e) => Err(e),
}
}
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<JoinSubRing> for MessageHandler {
async fn handle(&self, ctx: &MessagePayload<Message>, msg: &JoinSubRing) -> Result<()> {
let mut relay = ctx.relay.clone();
let origin = relay.origin();
match self.dht.join_subring(origin, msg.rid).await {
Ok(PeerRingAction::RemoteAction(next, RemoteAction::FindAndJoinSubRing(_))) => {
relay.relay(self.dht.did, Some(next))?;
relay.reset_destination(next)?;
self.forward_payload(ctx, relay).await
}
Ok(PeerRingAction::None) => Ok(()),
Ok(act) => Err(Error::PeerRingUnexpectedAction(act)),
Err(e) => Err(e),
}
}
}