use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::{Arc, Mutex};
use std::time::Duration;
extern crate async_std;
use async_std::task;
extern crate tracing_subscriber;
use tracing_subscriber::{filter::LevelFilter, FmtSubscriber};
extern crate tempdir;
use tempdir::TempDir;
use kad::prelude::*;
use kad::store::Datastore;
use dsf_core::net::{Request, RequestKind, Response, ResponseKind};
use dsf_core::prelude::*;
use dsf_core::service::{Publisher, ServiceBuilder};
use dsf_core::types::Flags;
use dsf_rpc::{self as rpc};
use super::{Dsf, Options};
use crate::core::peers::PeerState;
use crate::io::mock::{MockConnector, MockTransaction};
use crate::store::Store;
/// Drives a single `Dsf` instance through its main request/response paths
/// using a `MockConnector` in place of a real network.
///
/// Scenarios, in order: ping handling, connect, find-node handling, store
/// handling, find-value handling, storing the local primary page, creating a
/// service, registering it, publishing data and handling a subscribe request.
///
/// Later scenarios build on earlier ones (for example the find-node response
/// is expected to contain the peers introduced during connect), so the order
/// of the steps is significant.
///
/// Request/response ids are generated with `rand::random()` on both sides of
/// each `assert_eq!`.
/// NOTE(review): this presumably relies on `Request`/`Response` equality
/// ignoring the id field — confirm against `dsf_core::net`.
#[test]
fn test_manager() {
    // Scratch buffer used when encoding pages.
    let mut buff = vec![0u8; 1024];

    // `try_init` result is ignored so a subscriber installed by another test
    // does not cause a failure here.
    let _ = FmtSubscriber::builder()
        .with_max_level(LevelFilter::INFO)
        .try_init();

    // `TempDir::new` takes a directory-name *prefix* (the directory itself is
    // created under the platform temp dir), so pass a plain name rather than
    // a path.
    let d = TempDir::new("dsf-test").unwrap();

    let config = Options::default();
    let db_file = format!("{}/dsf-test.db", d.path().to_str().unwrap());
    let store = Arc::new(Mutex::new(Store::new(&db_file).unwrap()));

    // The connector handle is cloned into `Dsf`; the test keeps `mux` to
    // queue expectations and finalise them.
    let mut mux = MockConnector::new();

    let service = Service::default();
    let mut dsf = Dsf::new(config, service, store, mux.clone()).unwrap();
    let id1 = dsf.id().clone();

    // Addresses and services standing in for remote peers.
    let _addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 0, 0, 1)), 8111);
    let (a2, s2) = (
        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 0, 0, 3)), 8112),
        ServiceBuilder::default().build().unwrap(),
    );
    let (a3, s3) = (
        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 0, 0, 3)), 8113),
        ServiceBuilder::default().build().unwrap(),
    );
    // `s4` is mutable because it later publishes its own primary page.
    let (a4, mut s4) = (
        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 0, 0, 3)), 8114),
        ServiceBuilder::default().build().unwrap(),
    );

    task::block_on(async {
        info!("Responds to pings");

        assert_eq!(
            dsf.handle(
                a2,
                Request::new(
                    s2.id(),
                    rand::random(),
                    RequestKind::Ping,
                    Flags::ADDRESS_REQUEST
                )
            )
            .await
            .unwrap(),
            Response::new(
                id1.clone(),
                rand::random(),
                ResponseKind::NoResult,
                Flags::default()
            ),
        );

        info!("Connect function");

        // Expected traffic: a FindNode to the bootstrap peer (a2), which
        // answers with s3, followed by a FindNode to s3 which returns no
        // further nodes.
        mux.expect(vec![
            MockTransaction::request(
                a2.into(),
                Request::new(
                    id1.clone(),
                    rand::random(),
                    RequestKind::FindNode(id1.clone()),
                    Flags::ADDRESS_REQUEST | Flags::PUB_KEY_REQUEST,
                )
                .with_public_key(dsf.pub_key()),
                Ok(Response::new(
                    s2.id(),
                    rand::random(),
                    ResponseKind::NodesFound(
                        id1.clone(),
                        vec![(s3.id(), a3.into(), s3.public_key())],
                    ),
                    Flags::default(),
                )
                .with_public_key(s2.public_key())),
            ),
            MockTransaction::request(
                a3.into(),
                Request::new(
                    id1.clone(),
                    rand::random(),
                    RequestKind::FindNode(id1.clone()),
                    Flags::ADDRESS_REQUEST | Flags::PUB_KEY_REQUEST,
                )
                .with_public_key(dsf.pub_key()),
                Ok(Response::new(
                    s3.id(),
                    rand::random(),
                    ResponseKind::NodesFound(id1.clone(), vec![]),
                    Flags::default(),
                )),
            ),
        ]);

        dsf.connect(rpc::ConnectOptions {
            address: a2,
            id: None,
            timeout: Duration::from_secs(10).into(),
        })
        .await
        .unwrap();

        mux.finalise();

        // Both the bootstrap peer and the peer it reported must now be known.
        let peer = dsf.peers().find(&s2.id()).unwrap();
        assert_eq!(peer.state(), PeerState::Known(s2.public_key()));
        assert!(peer.seen().is_some());

        let peer = dsf.peers().find(&s3.id()).unwrap();
        assert_eq!(peer.state(), PeerState::Known(s3.public_key()));

        info!("Responds to find_nodes");

        // The expected node list is ordered by XOR distance to the target id.
        let mut nodes = vec![
            (s2.id(), a2.into(), s2.public_key()),
            (s3.id(), a3.into(), s3.public_key()),
        ];
        nodes.sort_by_key(|(id, _, _)| DhtDatabaseId::xor(&s4.id(), &id));

        assert_eq!(
            dsf.handle(
                a2,
                Request::new(
                    s2.id().clone(),
                    rand::random(),
                    RequestKind::FindNode(s4.id().clone()),
                    Flags::default()
                )
            )
            .await
            .unwrap(),
            Response::new(
                id1.clone(),
                rand::random(),
                ResponseKind::NodesFound(s4.id().clone(), nodes),
                Flags::default()
            ),
        );

        info!("Handles store requests");

        let (_n, p4) = s4.publish_primary(&mut buff).unwrap();

        assert_eq!(
            dsf.handle(
                a4,
                Request::new(
                    s4.id().clone(),
                    rand::random(),
                    RequestKind::Store(s4.id().clone(), vec![p4.clone()]),
                    Flags::default()
                )
            )
            .await
            .unwrap(),
            Response::new(
                id1.clone(),
                rand::random(),
                ResponseKind::ValuesFound(s4.id().clone(), vec![p4.clone()]),
                Flags::default()
            ),
        );

        info!("Responds to page requests");

        // The page stored in the previous step must be returned on lookup.
        assert_eq!(
            dsf.handle(
                a4,
                Request::new(
                    s4.id().clone(),
                    rand::random(),
                    RequestKind::FindValue(s4.id().clone()),
                    Flags::default()
                )
            )
            .await
            .unwrap(),
            Response::new(
                id1.clone(),
                rand::random(),
                ResponseKind::ValuesFound(s4.id().clone(), vec![p4.clone()]),
                Flags::default()
            ),
        );

        info!("Register function");

        let (_n, p1) = dsf.primary(&mut buff).unwrap();

        // Expected peers, ordered by XOR distance to the id being stored.
        let mut peers = vec![
            (a2, s2.id(), s2.public_key()),
            (a3, s3.id(), s3.public_key()),
            (a4, s4.id().clone(), s4.public_key()),
        ];
        peers.sort_by_key(|(_addr, id, _pk)| DhtDatabaseId::xor(&id1, &id));

        // One FindNode per peer, each answered with no result.
        let mut searches: Vec<_> = peers
            .iter()
            .map(|(addr, id, _pk)| {
                MockTransaction::request(
                    addr.clone().into(),
                    Request::new(
                        id1.clone(),
                        rand::random(),
                        RequestKind::FindNode(id1.clone()),
                        Flags::PUB_KEY_REQUEST,
                    )
                    .with_public_key(dsf.pub_key()),
                    Ok(Response::new(
                        id.clone(),
                        rand::random(),
                        ResponseKind::NoResult,
                        Flags::default(),
                    )),
                )
            })
            .collect();

        // One Store per peer, each echoing the stored page back.
        let mut stores: Vec<_> = peers
            .iter()
            .map(|(addr, id, pk)| {
                MockTransaction::request(
                    addr.clone().into(),
                    Request::new(
                        id1.clone(),
                        rand::random(),
                        RequestKind::Store(id1.clone(), vec![p1.clone()]),
                        Flags::PUB_KEY_REQUEST,
                    )
                    .with_public_key(dsf.pub_key()),
                    Ok(Response::new(
                        id.clone(),
                        rand::random(),
                        ResponseKind::ValuesFound(id1.clone(), vec![p1.clone()]),
                        Flags::default(),
                    )
                    .with_public_key(pk.clone())),
                )
            })
            .collect();

        // Searches are expected before stores.
        let mut transactions = vec![];
        transactions.append(&mut searches);
        transactions.append(&mut stores);

        // First store pass.
        mux.expect(transactions.clone());
        dsf.store(&id1, vec![p1.clone()]).await.unwrap();
        mux.finalise();

        // Repeating the store must produce the same traffic.
        mux.expect(transactions);
        dsf.store(&id1, vec![p1.clone()]).await.unwrap();
        mux.finalise();

        info!("Generates services");

        // Creating a service is expected to cause no network traffic.
        mux.expect(vec![]);
        let info = dsf
            .create(rpc::CreateOptions::default())
            .await
            .expect("error creating service");
        mux.finalise();

        let pages = dsf
            .datastore()
            .find(&info.id)
            .expect("no internal store entry found");
        let page = &pages[0];

        info!("Registers services");

        // Re-order peers by distance to the new service id.
        peers.sort_by_key(|(_addr, id, _pk)| DhtDatabaseId::xor(&info.id, &id));

        let mut searches: Vec<_> = peers
            .iter()
            .map(|(addr, id, _pk)| {
                MockTransaction::request(
                    addr.clone().into(),
                    Request::new(
                        id1.clone(),
                        rand::random(),
                        RequestKind::FindNode(info.id.clone()),
                        Flags::PUB_KEY_REQUEST,
                    )
                    .with_public_key(dsf.pub_key()),
                    Ok(Response::new(
                        id.clone(),
                        rand::random(),
                        ResponseKind::NoResult,
                        Flags::default(),
                    )),
                )
            })
            .collect();

        let mut stores: Vec<_> = peers
            .iter()
            .map(|(addr, id, pk)| {
                MockTransaction::request(
                    addr.clone().into(),
                    Request::new(
                        id1.clone(),
                        rand::random(),
                        RequestKind::Store(info.id.clone(), vec![page.clone()]),
                        Flags::PUB_KEY_REQUEST,
                    )
                    .with_public_key(dsf.pub_key()),
                    Ok(Response::new(
                        id.clone(),
                        rand::random(),
                        ResponseKind::ValuesFound(id1.clone(), vec![page.clone()]),
                        Flags::default(),
                    )
                    .with_public_key(pk.clone())),
                )
            })
            .collect();

        let mut transactions = vec![];
        transactions.append(&mut searches);
        transactions.append(&mut stores);

        mux.expect(transactions);
        dsf.register(rpc::RegisterOptions {
            service: rpc::ServiceIdentifier::id(info.id.clone()),
            no_replica: true,
        })
        .await
        .expect("Registration error");
        mux.finalise();

        info!("Publishes data");

        // With no subscribers yet, publishing is expected to send nothing.
        mux.expect(vec![]);
        dsf.publish(rpc::PublishOptions::new(info.id.clone()))
            .await
            .expect("publishing error");
        mux.finalise();

        info!("Responds to subscribe requests");

        assert_eq!(
            dsf.handle(
                a4,
                Request::new(
                    s4.id().clone(),
                    rand::random(),
                    RequestKind::Subscribe(info.id.clone()),
                    Flags::default()
                )
            )
            .await
            .unwrap(),
            Response::new(
                id1.clone(),
                rand::random(),
                ResponseKind::ValuesFound(info.id.clone(), vec![page.clone()]),
                Flags::default()
            ),
        );

        // The subscribe request must have been recorded against the service.
        let subscribers = dsf.subscribers().find(&info.id).unwrap();
        assert_eq!(subscribers.len(), 1, "No subscribers found");
        let _subscriber = subscribers
            .iter()
            .find(|s| s.info.service_id == info.id.clone())
            .expect("subscriber entry not found for service");

        info!("Publishes data to subscribers");

        // NOTE(review): these expectations are queued, but nothing after this
        // point triggers a request or calls `mux.finalise()`, so this scenario
        // is not actually verified. TODO: issue the publish and finalise, or
        // remove the expectation.
        mux.expect(vec![MockTransaction::request(
            a4.clone().into(),
            Request::new(
                s4.id().clone(),
                rand::random(),
                RequestKind::PushData(info.id.clone(), vec![page.clone()]),
                Flags::PUB_KEY_REQUEST,
            )
            .with_public_key(dsf.pub_key()),
            Ok(Response::new(
                id1.clone(),
                rand::random(),
                ResponseKind::ValuesFound(id1.clone(), vec![page.clone()]),
                Flags::default(),
            )),
        )]);
    });
}