#![allow(dead_code)]
#![allow(unused_variables)]
#![allow(unused_imports)]
use super::constants::*;
use crate::{
core::{
gossipsub_cfg::GossipsubConfig,
replication::{ConsensusModel, ConsistencyModel, ReplNetworkConfig},
sharding::{ShardStorage, Sharding},
tests::replication::REPL_NETWORK_ID,
ByteVector, Core, CoreBuilder, NetworkEvent, RpcConfig,
},
setup::BootstrapConfig,
MultiaddrString, PeerIdString, Port,
};
use libp2p::{gossipsub::MessageId, PeerId};
use libp2p_identity::Keypair;
use std::{
collections::{btree_map::Range, BTreeMap, HashMap, VecDeque},
sync::Arc,
time::Duration,
};
use tokio::sync::Mutex;
pub const NETWORK_SHARDING_ID: &'static str = "sharding_xx";
fn rpc_incoming_message_handler(data: Vec<Vec<u8>>) -> Vec<Vec<u8>> {
data
}
fn gossipsub_filter_fn(
propagation_source: PeerId,
message_id: MessageId,
source: Option<PeerId>,
topic: String,
data: Vec<String>,
) -> bool {
true
}
#[derive(Debug)]
struct LocalStorage {
buffer: VecDeque<String>,
}
impl ShardStorage for LocalStorage {
fn fetch_data(&mut self, key: ByteVector) -> ByteVector {
let key_str = String::from_utf8_lossy(&key[0]);
for data in self.buffer.iter() {
if data.starts_with(key_str.as_ref()) {
let data = data.split("-->").collect::<Vec<&str>>();
return vec![data[1].as_bytes().to_vec()];
}
}
Default::default()
}
}
pub struct RangeSharding<T>
where
T: ToString + Send + Sync,
{
ranges: BTreeMap<u64, T>,
}
impl<T> RangeSharding<T>
where
T: ToString + Send + Sync,
{
pub fn new(ranges: BTreeMap<u64, T>) -> Self {
Self { ranges }
}
}
impl<T> Sharding for RangeSharding<T>
where
T: ToString + Send + Sync + Clone,
{
type Key = u64;
type ShardId = T;
fn locate_shard(&self, key: &Self::Key) -> Option<Self::ShardId> {
self.ranges
.iter()
.find(|(&upper_bound, _)| key <= &upper_bound)
.map(|(_, shard_id)| shard_id.clone())
}
}
async fn setup_node(
ports: (Port, Port),
deterministic_protobuf: &[u8],
boot_nodes: HashMap<PeerIdString, MultiaddrString>,
shard_storage: Arc<Mutex<LocalStorage>>,
) -> Core {
let mut protobuf = &mut deterministic_protobuf.to_owned()[..];
let config = BootstrapConfig::default()
.generate_keypair_from_protobuf("ed25519", &mut protobuf)
.with_tcp(ports.0)
.with_udp(ports.1)
.with_bootnodes(boot_nodes);
let mut builder = CoreBuilder::with_config(config);
builder = builder.with_rpc(RpcConfig::Default, rpc_incoming_message_handler);
let filter_fn = gossipsub_filter_fn;
let builder = builder.with_gossipsub(GossipsubConfig::Default, filter_fn);
let repl_config = ReplNetworkConfig::Custom {
queue_length: 150,
expiry_time: Some(10),
sync_wait_time: 5,
consistency_model: ConsistencyModel::Eventual,
data_aging_period: 2,
};
builder
.with_replication(repl_config)
.with_sharding(NETWORK_SHARDING_ID.into(), shard_storage)
.build()
.await
.unwrap()
}
#[tokio::test]
async fn join_and_exit_shard_network() {
let shard_id_1 = 1;
let shard_id_2 = 2;
let shard_id_3 = 3;
let mut ranges = BTreeMap::new();
ranges.insert(100, shard_id_1);
ranges.insert(200, shard_id_2);
ranges.insert(300, shard_id_3);
let shard_exec = Arc::new(Mutex::new(RangeSharding::new(ranges)));
let local_storage_buffer = Arc::new(Mutex::new(LocalStorage {
buffer: Default::default(),
}));
let peer_id_1 = Keypair::from_protobuf_encoding(&NODE_1_KEYPAIR)
.unwrap()
.public()
.to_peer_id();
let peer_id_2 = Keypair::from_protobuf_encoding(&NODE_2_KEYPAIR)
.unwrap()
.public()
.to_peer_id();
let peer_id_3 = Keypair::from_protobuf_encoding(&NODE_3_KEYPAIR)
.unwrap()
.public()
.to_peer_id();
let ports_1: (Port, Port) = (48152, 54193);
let ports_2: (Port, Port) = (32153, 32101);
let ports_3: (Port, Port) = (48154, 54102);
let sharding_executor = shard_exec.clone();
let local_storage = local_storage_buffer.clone();
let task_1 = tokio::task::spawn(async move {
let mut bootnodes = HashMap::new();
bootnodes.insert(
peer_id_2.to_base58(),
format!("/ip4/127.0.0.1/tcp/{}", ports_2.0),
);
bootnodes.insert(
peer_id_3.to_base58(),
format!("/ip4/127.0.0.1/tcp/{}", ports_3.0),
);
let node = setup_node(ports_1, &NODE_1_KEYPAIR[..], bootnodes, local_storage).await;
let _ = sharding_executor
.lock()
.await
.join_network(node.clone(), &shard_id_1)
.await;
tokio::time::sleep(Duration::from_secs(3)).await;
let _ = sharding_executor
.lock()
.await
.exit_network(node.clone(), &shard_id_1)
.await;
});
let sharding_executor = shard_exec.clone();
let local_storage = local_storage_buffer.clone();
let task_2 = tokio::task::spawn(async move {
let mut bootnodes = HashMap::new();
bootnodes.insert(
peer_id_1.to_base58(),
format!("/ip4/127.0.0.1/tcp/{}", ports_1.0),
);
bootnodes.insert(
peer_id_3.to_base58(),
format!("/ip4/127.0.0.1/tcp/{}", ports_3.0),
);
let node = setup_node(ports_2, &NODE_2_KEYPAIR[..], bootnodes, local_storage).await;
let _ = sharding_executor
.lock()
.await
.join_network(node.clone(), &shard_id_2)
.await;
tokio::time::sleep(Duration::from_secs(3)).await;
let _ = sharding_executor
.lock()
.await
.exit_network(node.clone(), &shard_id_2)
.await;
});
let sharding_executor = shard_exec.clone();
let local_storage = local_storage_buffer.clone();
let task_3 = tokio::task::spawn(async move {
let mut bootnodes = HashMap::new();
bootnodes.insert(
peer_id_1.to_base58(),
format!("/ip4/127.0.0.1/tcp/{}", ports_1.0),
);
bootnodes.insert(
peer_id_2.to_base58(),
format!("/ip4/127.0.0.1/tcp/{}", ports_2.0),
);
let node = setup_node(ports_3, &NODE_3_KEYPAIR[..], bootnodes, local_storage).await;
let _ = sharding_executor
.lock()
.await
.join_network(node.clone(), &shard_id_3)
.await;
let shard_network_state =
<RangeSharding<String> as Sharding>::network_state(node.clone()).await;
assert_eq!(shard_network_state.len(), 3);
for shard in &shard_network_state {
assert_eq!(shard.1.len(), 1);
}
tokio::time::sleep(Duration::from_secs(12)).await;
let shard_network_state =
<RangeSharding<String> as Sharding>::network_state(node.clone()).await;
assert_eq!(shard_network_state.len(), 1);
assert_eq!(shard_network_state.iter().nth(0).unwrap().1.len(), 1);
});
for task in vec![task_1, task_2, task_3] {
task.await.unwrap();
}
}
#[tokio::test]
async fn shard_data_forwarding() {
let shard_id_1 = 1;
let shard_id_2 = 2;
let mut ranges = BTreeMap::new();
ranges.insert(100, shard_id_1);
ranges.insert(200, shard_id_2);
let shard_exec = Arc::new(Mutex::new(RangeSharding::new(ranges)));
let local_storage_buffer = Arc::new(Mutex::new(LocalStorage {
buffer: Default::default(),
}));
let peer_id_1 = Keypair::from_protobuf_encoding(&NODE_1_KEYPAIR)
.unwrap()
.public()
.to_peer_id();
let peer_id_2 = Keypair::from_protobuf_encoding(&NODE_2_KEYPAIR)
.unwrap()
.public()
.to_peer_id();
let ports_1: (Port, Port) = (40105, 54201);
let ports_2: (Port, Port) = (40103, 54109);
let sharding_executor = shard_exec.clone();
let local_storage = local_storage_buffer.clone();
let task_1 = tokio::task::spawn(async move {
let mut bootnodes = HashMap::new();
bootnodes.insert(
peer_id_2.to_base58(),
format!("/ip4/127.0.0.1/tcp/{}", ports_2.0),
);
let node = setup_node(ports_1, &NODE_1_KEYPAIR[..], bootnodes, local_storage).await;
let _ = sharding_executor
.lock()
.await
.join_network(node.clone(), &shard_id_1)
.await;
tokio::time::sleep(Duration::from_secs(2)).await;
let shard_key = 150;
if let Ok(response) = sharding_executor
.lock()
.await
.shard(node.clone(), &shard_key, vec!["sharding works".into()])
.await
{
assert_eq!(response, None);
}
tokio::time::sleep(Duration::from_secs(10)).await;
});
let sharding_executor = shard_exec.clone();
let local_storage = local_storage_buffer.clone();
let task_2 = tokio::task::spawn(async move {
let mut bootnodes = HashMap::new();
bootnodes.insert(
peer_id_1.to_base58(),
format!("/ip4/127.0.0.1/tcp/{}", ports_1.0),
);
let mut node = setup_node(ports_2, &NODE_2_KEYPAIR[..], bootnodes, local_storage).await;
let _ = sharding_executor
.lock()
.await
.join_network(node.clone(), &shard_id_2)
.await;
tokio::time::sleep(Duration::from_secs(5)).await;
while let Some(event) = node.next_event().await {
match event {
NetworkEvent::IncomingForwardedData { data, source } => {
println!(
"recieved forwarded data: {:?} from peer: {}",
data,
source.to_base58()
);
assert_eq!(data, vec!["sharding works".to_string()]);
},
_ => {},
}
}
});
for task in vec![task_1, task_2] {
task.await.unwrap();
}
}
#[tokio::test]
async fn shard_local_storage_and_replication() {
let shard_id_1 = 1;
let mut ranges = BTreeMap::new();
ranges.insert(100, shard_id_1);
let shard_exec = Arc::new(Mutex::new(RangeSharding::new(ranges)));
let local_storage_buffer = Arc::new(Mutex::new(LocalStorage {
buffer: Default::default(),
}));
let peer_id_1 = Keypair::from_protobuf_encoding(&NODE_1_KEYPAIR)
.unwrap()
.public()
.to_peer_id();
let peer_id_2 = Keypair::from_protobuf_encoding(&NODE_2_KEYPAIR)
.unwrap()
.public()
.to_peer_id();
let ports_1: (Port, Port) = (40155, 54200);
let ports_2: (Port, Port) = (40153, 54100);
let sharding_executor = shard_exec.clone();
let local_storage = local_storage_buffer.clone();
let task_1 = tokio::task::spawn(async move {
let mut bootnodes = HashMap::new();
bootnodes.insert(
peer_id_2.to_base58(),
format!("/ip4/127.0.0.1/tcp/{}", ports_2.0),
);
let node = setup_node(ports_1, &NODE_1_KEYPAIR[..], bootnodes, local_storage).await;
let _ = sharding_executor
.lock()
.await
.join_network(node.clone(), &shard_id_1)
.await;
tokio::time::sleep(Duration::from_secs(2)).await;
let shard_key = 28;
if let Ok(response) = sharding_executor
.lock()
.await
.shard(node.clone(), &shard_key, vec!["sharding works".into()])
.await
{
assert_eq!(response, Some(vec!["sharding works".into()]));
}
tokio::time::sleep(Duration::from_secs(10)).await;
});
let sharding_executor = shard_exec.clone();
let local_storage = local_storage_buffer.clone();
let task_2 = tokio::task::spawn(async move {
let mut bootnodes = HashMap::new();
bootnodes.insert(
peer_id_1.to_base58(),
format!("/ip4/127.0.0.1/tcp/{}", ports_1.0),
);
let mut node = setup_node(ports_2, &NODE_2_KEYPAIR[..], bootnodes, local_storage).await;
let _ = sharding_executor
.lock()
.await
.join_network(node.clone(), &shard_id_1)
.await;
tokio::time::sleep(Duration::from_secs(5)).await;
while let Some(event) = node.next_event().await {
match event {
NetworkEvent::ReplicaDataIncoming {
data,
network,
source,
..
} => {
println!(
"recieved replica data: {:?} from shard peer: {}",
data,
source.to_base58()
);
if let Some(repl_data) = node.consume_repl_data(&network).await {
assert_eq!(repl_data.data, vec!["sharding works".to_string()]);
}
},
_ => {},
}
}
});
for task in vec![task_1, task_2] {
task.await.unwrap();
}
}
#[tokio::test]
async fn data_forwarding_replication() {
let shard_id_1 = 1;
let shard_id_2 = 2;
let mut ranges = BTreeMap::new();
ranges.insert(100, shard_id_1);
ranges.insert(200, shard_id_2);
let shard_exec = Arc::new(Mutex::new(RangeSharding::new(ranges)));
let local_storage_buffer = Arc::new(Mutex::new(LocalStorage {
buffer: Default::default(),
}));
let peer_id_1 = Keypair::from_protobuf_encoding(&NODE_1_KEYPAIR)
.unwrap()
.public()
.to_peer_id();
let peer_id_2 = Keypair::from_protobuf_encoding(&NODE_2_KEYPAIR)
.unwrap()
.public()
.to_peer_id();
let peer_id_3 = Keypair::from_protobuf_encoding(&NODE_3_KEYPAIR)
.unwrap()
.public()
.to_peer_id();
let ports_1: (Port, Port) = (48135, 54303);
let ports_2: (Port, Port) = (48133, 54301);
let ports_3: (Port, Port) = (48134, 54302);
let sharding_executor = shard_exec.clone();
let local_storage = local_storage_buffer.clone();
let task_1 = tokio::task::spawn(async move {
let mut bootnodes = HashMap::new();
bootnodes.insert(
peer_id_2.to_base58(),
format!("/ip4/127.0.0.1/tcp/{}", ports_2.0),
);
bootnodes.insert(
peer_id_3.to_base58(),
format!("/ip4/127.0.0.1/tcp/{}", ports_3.0),
);
let node = setup_node(ports_1, &NODE_1_KEYPAIR[..], bootnodes, local_storage).await;
let _ = sharding_executor
.lock()
.await
.join_network(node.clone(), &shard_id_1)
.await;
tokio::time::sleep(Duration::from_secs(2)).await;
let shard_key = 150;
if let Ok(response) = sharding_executor
.lock()
.await
.shard(node.clone(), &shard_key, vec!["sharding works".into()])
.await
{
assert_eq!(response, None);
}
tokio::time::sleep(Duration::from_secs(15)).await;
});
let sharding_executor = shard_exec.clone();
let local_storage = local_storage_buffer.clone();
let task_2 = tokio::task::spawn(async move {
let mut bootnodes = HashMap::new();
bootnodes.insert(
peer_id_1.to_base58(),
format!("/ip4/127.0.0.1/tcp/{}", ports_1.0),
);
let mut node = setup_node(ports_2, &NODE_2_KEYPAIR[..], bootnodes, local_storage).await;
let _ = sharding_executor
.lock()
.await
.join_network(node.clone(), &shard_id_2)
.await;
tokio::time::sleep(Duration::from_secs(15)).await;
while let Some(event) = node.next_event().await {
match event {
NetworkEvent::IncomingForwardedData { data, source } => {
println!(
"recieved forwarded data: {:?} from peer: {}",
data,
source.to_base58()
);
assert_eq!(data, vec!["sharding works".to_string()]);
},
NetworkEvent::ReplicaDataIncoming {
data,
network,
source,
..
} => {
println!(
"recieved replica data: {:?} from shard peer: {}",
data,
source.to_base58()
);
if let Some(repl_data) = node.consume_repl_data(&network).await {
assert_eq!(repl_data.data, vec!["sharding works".to_string()]);
}
},
_ => {},
}
}
});
let sharding_executor = shard_exec.clone();
let local_storage = local_storage_buffer.clone();
let task_3 = tokio::task::spawn(async move {
let mut bootnodes = HashMap::new();
bootnodes.insert(
peer_id_1.to_base58(),
format!("/ip4/127.0.0.1/tcp/{}", ports_1.0),
);
bootnodes.insert(
peer_id_2.to_base58(),
format!("/ip4/127.0.0.1/tcp/{}", ports_2.0),
);
let mut node = setup_node(ports_3, &NODE_3_KEYPAIR[..], bootnodes, local_storage).await;
let _ = sharding_executor
.lock()
.await
.join_network(node.clone(), &shard_id_2)
.await;
tokio::time::sleep(Duration::from_secs(15)).await;
while let Some(event) = node.next_event().await {
match event {
NetworkEvent::IncomingForwardedData { data, source } => {
println!(
"recieved forwarded data: {:?} from peer: {}",
data,
source.to_base58()
);
assert_eq!(data, vec!["sharding works".to_string()]);
},
NetworkEvent::ReplicaDataIncoming {
data,
network,
source,
..
} => {
println!(
"recieved replica data: {:?} from shard peer: {}",
data,
source.to_base58()
);
if let Some(repl_data) = node.consume_repl_data(&network).await {
assert_eq!(repl_data.data, vec!["sharding works".to_string()]);
}
},
_ => {},
}
}
});
for task in vec![task_1, task_2, task_3] {
task.await.unwrap();
}
}
#[tokio::test]
async fn fetching_sharded_data() {
let shard_id_1 = 1;
let shard_id_2 = 2;
let shard_key = 15;
let mut ranges = BTreeMap::new();
ranges.insert(100, shard_id_1);
ranges.insert(200, shard_id_2);
let shard_exec = Arc::new(Mutex::new(RangeSharding::new(ranges)));
let local_storage_buffer = Arc::new(Mutex::new(LocalStorage {
buffer: Default::default(),
}));
let peer_id_1 = Keypair::from_protobuf_encoding(&NODE_1_KEYPAIR[..])
.unwrap()
.public()
.to_peer_id();
let peer_id_2 = Keypair::from_protobuf_encoding(&NODE_2_KEYPAIR[..])
.unwrap()
.public()
.to_peer_id();
let ports_1: (Port, Port) = (48155, 54103);
let ports_2: (Port, Port) = (48153, 64101);
let sharding_executor = shard_exec.clone();
let local_storage = local_storage_buffer.clone();
let task_1 = tokio::task::spawn(async move {
let mut bootnodes = HashMap::new();
bootnodes.insert(
peer_id_2.to_base58(),
format!("/ip4/127.0.0.1/tcp/{}", ports_2.0),
);
let node = setup_node(
ports_1,
&NODE_1_KEYPAIR[..],
bootnodes,
local_storage.clone(),
)
.await;
let _ = sharding_executor
.lock()
.await
.join_network(node.clone(), &shard_id_1)
.await;
tokio::time::sleep(Duration::from_secs(2)).await;
if let Ok(response) = sharding_executor
.lock()
.await
.shard(node.clone(), &shard_key, vec!["name-->Jesus".into()])
.await
{
assert_eq!(response, Some(vec!["name-->Jesus".into()]));
let res_data = response.unwrap()[0].clone();
let data = String::from_utf8_lossy(&res_data);
local_storage.lock().await.buffer.push_back(data.into());
}
});
let sharding_executor = shard_exec.clone();
let local_storage = local_storage_buffer.clone();
let task_2 = tokio::task::spawn(async move {
let mut bootnodes = HashMap::new();
bootnodes.insert(
peer_id_1.to_base58(),
format!("/ip4/127.0.0.1/tcp/{}", ports_1.0),
);
let node = setup_node(
ports_2,
&NODE_2_KEYPAIR[..],
bootnodes,
local_storage.clone(),
)
.await;
let _ = sharding_executor
.lock()
.await
.join_network(node.clone(), &shard_id_2)
.await;
tokio::time::sleep(Duration::from_secs(5)).await;
match sharding_executor
.lock()
.await
.fetch(node.clone(), &shard_key, vec!["name".into()])
.await
{
Ok(response) => match response {
Some(data) => {
let name = String::from_utf8_lossy(&data[0]);
if !data[0].is_empty() {
println!("The response data is '{}'", name);
assert_eq!(name.as_ref(), "Jesus");
} else {
println!("The remote node does not have the data stored.");
}
println!("Successfully pulled data from the network.");
},
None => println!("Data exists locally on node."),
},
Err(e) => println!("Fetching failed: {}", e.to_string()),
}
});
for task in vec![task_1, task_2] {
task.await.unwrap();
}
}