#![warn(missing_docs)]
use async_trait::async_trait;
use crate::dht::vnode::VirtualNode;
use crate::dht::ChordStorage;
use crate::dht::Did;
use crate::dht::PeerRingAction;
use crate::dht::PeerRingRemoteAction;
use crate::err::Error;
use crate::err::Result;
use crate::message::types::FoundVNode;
use crate::message::types::Message;
use crate::message::types::SearchVNode;
use crate::message::types::SyncVNodeWithSuccessor;
use crate::message::Encoded;
use crate::message::HandleMsg;
use crate::message::MessageHandler;
use crate::message::MessagePayload;
use crate::message::PayloadSender;
use crate::prelude::vnode::VNodeOperation;
use crate::swarm::Swarm;
/// Storage operations on virtual nodes (`VirtualNode`) kept in the DHT.
///
/// Every method is async. With the `wasm` feature the generated futures are
/// not required to be `Send` (`async_trait(?Send)`); otherwise the default
/// `async_trait` expansion is used.
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
pub trait ChordStorageInterface {
/// Returns the virtual node stored under `vid` in the local cache, if any.
async fn storage_check_cache(&self, vid: Did) -> Option<VirtualNode>;
/// Starts a lookup of the virtual node `vid`.
///
/// The return value carries no data: in the `Swarm` implementation a node
/// found locally is placed in the local cache, and otherwise a search
/// message is sent to another peer. Read the result back with
/// [`ChordStorageInterface::storage_check_cache`].
async fn storage_fetch(&self, vid: Did) -> Result<()>;
/// Stores `vnode`; the `Swarm` implementation issues a
/// `VNodeOperation::Overwrite` for it.
async fn storage_store(&self, vnode: VirtualNode) -> Result<()>;
/// Appends `data` under `topic`; the `Swarm` implementation builds a virtual
/// node from `(topic, data)` and issues a `VNodeOperation::Extend` for it.
async fn storage_append_data(&self, topic: &str, data: Encoded) -> Result<()>;
/// Touches `data` under `topic`; the `Swarm` implementation builds a virtual
/// node from `(topic, data)` and issues a `VNodeOperation::Touch` for it.
// NOTE(review): the exact semantics of `Touch` are defined by `VNodeOperation`,
// which is not visible in this file — confirm before relying on them.
async fn storage_touch_data(&self, topic: &str, data: Encoded) -> Result<()>;
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl ChordStorageInterface for Swarm {
async fn storage_check_cache(&self, vid: Did) -> Option<VirtualNode> {
self.dht.local_cache_get(vid)
}
async fn storage_fetch(&self, vid: Did) -> Result<()> {
match self.dht.vnode_lookup(vid).await? {
PeerRingAction::None => Ok(()),
PeerRingAction::SomeVNode(v) => {
self.dht.local_cache_set(v);
Ok(())
}
PeerRingAction::RemoteAction(next, _) => {
self.send_direct_message(Message::SearchVNode(SearchVNode { vid }), next)
.await?;
Ok(())
}
act => Err(Error::PeerRingUnexpectedAction(act)),
}
}
async fn storage_store(&self, vnode: VirtualNode) -> Result<()> {
let op = VNodeOperation::Overwrite(vnode);
match self.dht.vnode_operate(op).await? {
PeerRingAction::None => Ok(()),
PeerRingAction::RemoteAction(target, PeerRingRemoteAction::FindVNodeForOperate(op)) => {
self.send_direct_message(Message::OperateVNode(op), target)
.await?;
Ok(())
}
act => Err(Error::PeerRingUnexpectedAction(act)),
}
}
async fn storage_append_data(&self, topic: &str, data: Encoded) -> Result<()> {
let vnode = (topic.to_string(), data).try_into()?;
let op = VNodeOperation::Extend(vnode);
match self.dht.vnode_operate(op).await? {
PeerRingAction::None => Ok(()),
PeerRingAction::RemoteAction(target, PeerRingRemoteAction::FindVNodeForOperate(op)) => {
self.send_direct_message(Message::OperateVNode(op), target)
.await?;
Ok(())
}
act => Err(Error::PeerRingUnexpectedAction(act)),
}
}
async fn storage_touch_data(&self, topic: &str, data: Encoded) -> Result<()> {
let vnode = (topic.to_string(), data).try_into()?;
let op = VNodeOperation::Touch(vnode);
match self.dht.vnode_operate(op).await? {
PeerRingAction::None => Ok(()),
PeerRingAction::RemoteAction(target, PeerRingRemoteAction::FindVNodeForOperate(op)) => {
self.send_direct_message(Message::OperateVNode(op), target)
.await?;
Ok(())
}
act => Err(Error::PeerRingUnexpectedAction(act)),
}
}
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<SearchVNode> for MessageHandler {
    /// Handles a `SearchVNode` request.
    ///
    /// The requested id is looked up through the local DHT:
    ///
    /// * `PeerRingAction::None`: nothing is sent.
    /// * `PeerRingAction::SomeVNode`: a `FoundVNode` report carrying the node
    ///   is sent back using the request's transaction id.
    /// * `PeerRingAction::RemoteAction`: the original payload is forwarded to
    ///   the peer named by the action.
    ///
    /// # Errors
    ///
    /// Propagates lookup, relay and send failures, and returns
    /// `Error::PeerRingUnexpectedAction` for any other action.
    async fn handle(&self, ctx: &MessagePayload<Message>, msg: &SearchVNode) -> Result<()> {
        let action = self.dht.vnode_lookup(msg.vid).await?;
        match action {
            PeerRingAction::None => Ok(()),
            PeerRingAction::SomeVNode(v) => {
                // Clone the relay only on the paths that actually send something.
                let mut relay = ctx.relay.clone();
                relay.relay(self.dht.did, None)?;
                self.send_report_message(
                    Message::FoundVNode(FoundVNode { data: vec![v] }),
                    ctx.tx_id,
                    relay,
                )
                .await
            }
            PeerRingAction::RemoteAction(next, _) => {
                let mut relay = ctx.relay.clone();
                relay.relay(self.dht.did, Some(next))?;
                self.forward_payload(ctx, relay).await
            }
            act => Err(Error::PeerRingUnexpectedAction(act)),
        }
    }
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<FoundVNode> for MessageHandler {
    /// Handles a `FoundVNode` report.
    ///
    /// The relay is advanced first. If it has no further hop, every virtual
    /// node in the report is written to the local cache; otherwise the payload
    /// is forwarded along the relay.
    async fn handle(&self, ctx: &MessagePayload<Message>, msg: &FoundVNode) -> Result<()> {
        let mut next_relay = ctx.relay.clone();
        next_relay.relay(self.dht.did, None)?;

        if next_relay.next_hop.is_none() {
            // No further hop: keep the reported nodes in the local cache.
            for vnode in &msg.data {
                self.dht.local_cache_set(vnode.clone());
            }
            return Ok(());
        }

        self.forward_payload(ctx, next_relay).await
    }
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<VNodeOperation> for MessageHandler {
    /// Handles a `VNodeOperation` received from another peer.
    ///
    /// The operation is handed to the local DHT:
    ///
    /// * `PeerRingAction::None`: nothing further to do.
    /// * `PeerRingAction::RemoteAction`: the relay's destination is reset to
    ///   the peer named by the action and the payload is forwarded there.
    ///
    /// # Errors
    ///
    /// Propagates DHT, relay and forwarding failures, and returns
    /// `Error::PeerRingUnexpectedAction` for any other action.
    async fn handle(&self, ctx: &MessagePayload<Message>, msg: &VNodeOperation) -> Result<()> {
        let action = self.dht.vnode_operate(msg.clone()).await?;
        match action {
            PeerRingAction::None => Ok(()),
            PeerRingAction::RemoteAction(next, _) => {
                let mut relay = ctx.relay.clone();
                relay.reset_destination(next)?;
                relay.relay(self.dht.did, Some(next))?;
                self.forward_payload(ctx, relay).await
            }
            act => Err(Error::PeerRingUnexpectedAction(act)),
        }
    }
}
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<SyncVNodeWithSuccessor> for MessageHandler {
    /// Handles a `SyncVNodeWithSuccessor` message by storing each virtual node
    /// it carries through the swarm, in order, stopping at the first failure.
    async fn handle(
        &self,
        _ctx: &MessagePayload<Message>,
        msg: &SyncVNodeWithSuccessor,
    ) -> Result<()> {
        for vnode in msg.data.iter() {
            self.swarm.storage_store(vnode.clone()).await?;
        }
        Ok(())
    }
}
// Integration tests for the storage handlers; compiled only for non-wasm test builds.
#[cfg(not(feature = "wasm"))]
#[cfg(test)]
mod test {
use super::*;
use crate::ecc::tests::gen_ordered_keys;
use crate::message::handlers::connection::tests::test_only_two_nodes_establish_connection;
use crate::message::Encoder;
use crate::prelude::vnode::VNodeType;
use crate::storage::PersistenceStorageOperation;
use crate::tests::default::prepare_node;
// Stores a vnode from one node, checks it lands in the other node's storage,
// then fetches it back and checks it shows up in the requester's cache.
#[tokio::test]
async fn test_store_vnode() -> Result<()> {
let keys = gen_ordered_keys(2);
let (key1, key2) = (keys[0], keys[1]);
let (did1, dht1, swarm1, node1, _path1) = prepare_node(key1).await;
let (did2, dht2, swarm2, node2, _path2) = prepare_node(key2).await;
test_only_two_nodes_establish_connection(&node1, &node2).await?;
let data = "Across the Great Wall we can reach every corner in the world.".to_string();
let vnode: VirtualNode = data.clone().try_into().unwrap();
let vid = vnode.did;
// Rebind the two nodes so that the `*2` bindings refer to the node expected
// to hold `vid` (see the "vid is on node2" message below); the pairs are
// swapped when the `in_range` check fails.
let ((_did1, dht1, swarm1, node1), (did2, dht2, swarm2, node2)) =
if vid.in_range(did2, did2, did1) {
((did1, dht1, swarm1, node1), (did2, dht2, swarm2, node2))
} else {
((did2, dht2, swarm2, node2), (did1, dht1, swarm1, node1))
};
// Precondition: neither node has anything cached.
assert!(dht1.cache.is_empty());
assert!(dht2.cache.is_empty());
assert!(swarm1.storage_check_cache(vid).await.is_none());
assert!(swarm2.storage_check_cache(vid).await.is_none());
// Store from node1; node2 must receive the corresponding Overwrite operation.
swarm1.storage_store(vnode.clone()).await.unwrap();
let ev = node2.listen_once().await.unwrap();
assert!(matches!(
ev.data,
Message::OperateVNode(VNodeOperation::Overwrite(x)) if x.did == vid
));
// Storing does not populate either cache; only node2's storage is non-empty.
assert!(swarm1.storage_check_cache(vid).await.is_none());
assert!(swarm2.storage_check_cache(vid).await.is_none());
assert!(dht1.storage.count().await.unwrap() == 0);
assert!(dht2.storage.count().await.unwrap() != 0);
println!("vid is on node2 {:?}", &did2);
// Fetch from node1: node2 sees the search request, node1 sees the reply.
swarm1.storage_fetch(vid).await.unwrap();
let ev = node2.listen_once().await.unwrap();
assert!(matches!(
ev.data,
Message::SearchVNode(x) if x.vid == vid
));
let ev = node1.listen_once().await.unwrap();
assert!(matches!(
ev.data,
Message::FoundVNode(x) if x.data[0].did == vid
));
// After the reply has been handled the vnode is available in node1's cache.
assert_eq!(
swarm1.storage_check_cache(vid).await,
Some(VirtualNode {
did: vid,
data: vec![data.encode()?],
kind: VNodeType::Data
})
);
// Best-effort cleanup; a failure to remove the directory is ignored.
tokio::fs::remove_dir_all("./tmp").await.ok();
Ok(())
}
// Appends several values under one topic and checks that a later fetch
// returns them accumulated in the order they were appended.
#[tokio::test]
async fn test_extend_data() -> Result<()> {
let keys = gen_ordered_keys(2);
let (key1, key2) = (keys[0], keys[1]);
let (did1, dht1, swarm1, node1, _path1) = prepare_node(key1).await;
let (did2, dht2, swarm2, node2, _path2) = prepare_node(key2).await;
test_only_two_nodes_establish_connection(&node1, &node2).await?;
let topic = "Across the Great Wall we can reach every corner in the world.".to_string();
let vnode: VirtualNode = topic.clone().try_into().unwrap();
let vid = vnode.did;
// Same role assignment as in `test_store_vnode`: the `*2` bindings refer to
// the node expected to hold `vid`.
let ((_did1, dht1, swarm1, node1), (did2, dht2, swarm2, node2)) =
if vid.in_range(did2, did2, did1) {
((did1, dht1, swarm1, node1), (did2, dht2, swarm2, node2))
} else {
((did2, dht2, swarm2, node2), (did1, dht1, swarm1, node1))
};
// Precondition: neither node has anything cached.
assert!(dht1.cache.is_empty());
assert!(dht2.cache.is_empty());
assert!(swarm1.storage_check_cache(vid).await.is_none());
assert!(swarm2.storage_check_cache(vid).await.is_none());
// First append: node2 receives an Extend carrying only "111".
swarm1
.storage_append_data(&topic, "111".to_string().encode()?)
.await
.unwrap();
let ev = node2.listen_once().await.unwrap();
assert!(matches!(
ev.data,
Message::OperateVNode(VNodeOperation::Extend(VirtualNode { did, data, kind: VNodeType::Data }))
if did == vid && data == vec!["111".to_string().encode()?]
));
// Second append: node2 receives an Extend carrying only "222".
swarm1
.storage_append_data(&topic, "222".to_string().encode()?)
.await
.unwrap();
let ev = node2.listen_once().await.unwrap();
assert!(matches!(
ev.data,
Message::OperateVNode(VNodeOperation::Extend(VirtualNode { did, data, kind: VNodeType::Data }))
if did == vid && data == vec!["222".to_string().encode()?]
));
// Appending does not populate either cache; only node2's storage is non-empty.
assert!(swarm1.storage_check_cache(vid).await.is_none());
assert!(swarm2.storage_check_cache(vid).await.is_none());
assert!(dht1.storage.count().await.unwrap() == 0);
assert!(dht2.storage.count().await.unwrap() != 0);
println!("vid is on node2 {:?}", &did2);
// Fetch from node1: node2 sees the search request, node1 sees the reply.
swarm1.storage_fetch(vid).await.unwrap();
let ev = node2.listen_once().await.unwrap();
assert!(matches!(
ev.data,
Message::SearchVNode(x) if x.vid == vid
));
let ev = node1.listen_once().await.unwrap();
assert!(matches!(
ev.data,
Message::FoundVNode(x) if x.data[0].did == vid
));
// The fetched vnode holds both appended values, in append order.
assert_eq!(
swarm1.storage_check_cache(vid).await,
Some(VirtualNode {
did: vid,
data: vec!["111".to_string().encode()?, "222".to_string().encode()?],
kind: VNodeType::Data
})
);
// Third append after the cache has been populated.
swarm1
.storage_append_data(&topic, "333".to_string().encode()?)
.await
.unwrap();
let ev = node2.listen_once().await.unwrap();
assert!(matches!(
ev.data,
Message::OperateVNode(VNodeOperation::Extend(VirtualNode { did, data, kind: VNodeType::Data }))
if did == vid && data == vec!["333".to_string().encode()?]
));
println!("vid is on node2 {:?}", &did2);
// Fetch again so node1's cached copy is replaced by the extended vnode.
swarm1.storage_fetch(vid).await.unwrap();
let ev = node2.listen_once().await.unwrap();
assert!(matches!(
ev.data,
Message::SearchVNode(x) if x.vid == vid
));
let ev = node1.listen_once().await.unwrap();
assert!(matches!(
ev.data,
Message::FoundVNode(x) if x.data[0].did == vid
));
// The cache now reflects all three appended values.
assert_eq!(
swarm1.storage_check_cache(vid).await,
Some(VirtualNode {
did: vid,
data: vec![
"111".to_string().encode()?,
"222".to_string().encode()?,
"333".to_string().encode()?
],
kind: VNodeType::Data
})
);
// Best-effort cleanup; a failure to remove the directory is ignored.
tokio::fs::remove_dir_all("./tmp").await.ok();
Ok(())
}
}