use std::collections::{HashSet, VecDeque};
use std::sync::Arc;
use std::time::Duration;
use crate::{
light_sync::LightSync,
tests::helpers::{TestNet, Peer as PeerLike, TestPacket}
};
use vapcore::test_helpers::TestBlockChainClient;
use vapcore_io::IoChannel;
use light::{
cache::Cache,
client::fetch::{self, Unavailable},
net::{LightProtocol, IoContext, Capabilities, Params as LightParams},
provider::LightProvider
};
use network::{NodeId, PeerId};
use parking_lot::{Mutex, RwLock};
/// Network identifier placed in the `LightParams` of every peer constructed in this file.
const NETWORK_ID: u64 = 0xcafe_babe;
/// Light client type used throughout this test network: `light::client::Client`
/// parameterised with the `Unavailable` fetcher type from `light::client::fetch`.
pub type LightClient = light::client::Client<Unavailable>;
/// `IoContext` implementation used by the test peers: outgoing packets are appended to a
/// borrowed queue and disconnect requests are recorded in a set instead of being acted on.
struct TestIoContext<'a> {
/// Queue that `send` appends outgoing packets to.
queue: &'a RwLock<VecDeque<TestPacket>>,
/// Peer that `respond` replies to; `respond` does nothing when this is `None`.
sender: Option<PeerId>,
/// Peers passed to `disconnect_peer` (and to `disable_peer`, which forwards to it).
to_disconnect: RwLock<HashSet<PeerId>>,
}
impl IoContext for TestIoContext<'_> {
    /// Appends a packet addressed to `peer` to the back of the borrowed packet queue.
    fn send(&self, peer: PeerId, packet_id: u8, packet_body: Vec<u8>) {
        let outgoing = TestPacket {
            data: packet_body,
            packet_id,
            recipient: peer,
        };
        self.queue.write().push_back(outgoing);
    }

    /// Sends the packet back to the recorded sender; does nothing if no sender is set.
    fn respond(&self, packet_id: u8, packet_body: Vec<u8>) {
        let recipient = match self.sender {
            Some(id) => id,
            None => return,
        };
        self.send(recipient, packet_id, packet_body);
    }

    /// Records `peer` in the set of peers to disconnect; nothing is disconnected here.
    fn disconnect_peer(&self, peer: PeerId) {
        let mut pending = self.to_disconnect.write();
        pending.insert(peer);
    }

    /// Handled exactly like `disconnect_peer`.
    fn disable_peer(&self, peer: PeerId) {
        self.disconnect_peer(peer);
    }

    /// Reports `light::net::MAX_PROTOCOL_VERSION` for every peer.
    fn protocol_version(&self, _peer: PeerId) -> Option<u8> {
        let version = light::net::MAX_PROTOCOL_VERSION;
        Some(version)
    }

    /// Not supported by this test context.
    ///
    /// # Panics
    ///
    /// Always panics via `unimplemented!()`.
    fn persistent_peer_id(&self, _peer: PeerId) -> Option<NodeId> {
        unimplemented!()
    }

    /// No peer is treated as reserved.
    fn is_reserved_peer(&self, _peer: PeerId) -> bool {
        false
    }
}
/// Per-peer backing data, distinguishing light peers from full peers.
enum PeerData {
/// Light peer: the sync handler registered on its protocol and the light client it wraps.
Light(Arc<LightSync<LightClient>>, Arc<LightClient>),
/// Full peer: the test blockchain client handed to its `LightProtocol`.
Full(Arc<TestBlockChainClient>)
}
/// A peer in the light-sync test network; light or full depending on `data`.
pub struct Peer {
/// Protocol handler that connection events and incoming packets are forwarded to.
proto: LightProtocol,
/// Outgoing packets produced through this peer's `TestIoContext`; drained by `pending_message`.
queue: RwLock<VecDeque<TestPacket>>,
/// Light- or full-peer specific state.
data: PeerData,
}
impl Peer {
pub fn new_full(chain: Arc<TestBlockChainClient>) -> Self {
let params = LightParams {
network_id: NETWORK_ID,
config: Default::default(),
capabilities: Capabilities {
serve_headers: true,
serve_chain_since: None,
serve_state_since: None,
tx_relay: true,
},
sample_store: None,
};
let proto = LightProtocol::new(chain.clone(), params);
Peer {
proto: proto,
queue: RwLock::new(VecDeque::new()),
data: PeerData::Full(chain),
}
}
pub fn new_light(chain: Arc<LightClient>) -> Self {
let sync = Arc::new(LightSync::new(chain.clone()).unwrap());
let params = LightParams {
network_id: NETWORK_ID,
config: Default::default(),
capabilities: Capabilities {
serve_headers: false,
serve_chain_since: None,
serve_state_since: None,
tx_relay: false,
},
sample_store: None,
};
let provider = LightProvider::new(chain.clone(), Arc::new(RwLock::new(Default::default())));
let mut proto = LightProtocol::new(Arc::new(provider), params);
proto.add_handler(sync.clone());
Peer {
proto: proto,
queue: RwLock::new(VecDeque::new()),
data: PeerData::Light(sync, chain),
}
}
pub fn chain(&self) -> &TestBlockChainClient {
match self.data {
PeerData::Full(ref chain) => &*chain,
_ => panic!("Attempted to access full chain on light peer."),
}
}
pub fn light_chain(&self) -> &LightClient {
match self.data {
PeerData::Light(_, ref chain) => &*chain,
_ => panic!("Attempted to access light chain on full peer."),
}
}
fn io(&self, sender: Option<PeerId>) -> TestIoContext {
TestIoContext {
queue: &self.queue,
sender: sender,
to_disconnect: RwLock::new(HashSet::new()),
}
}
}
impl PeerLike for Peer {
type Message = TestPacket;
fn on_connect(&self, other: PeerId) {
let io = self.io(Some(other));
self.proto.on_connect(other, &io);
}
fn on_disconnect(&self, other: PeerId){
let io = self.io(Some(other));
self.proto.on_disconnect(other, &io);
}
fn receive_message(&self, from: PeerId, msg: TestPacket) -> HashSet<PeerId> {
let io = self.io(Some(from));
self.proto.handle_packet(&io, from, msg.packet_id, &msg.data);
io.to_disconnect.into_inner()
}
fn pending_message(&self) -> Option<TestPacket> {
self.queue.write().pop_front()
}
fn is_done(&self) -> bool {
self.queue.read().is_empty() && match self.data {
PeerData::Light(_, ref client) => {
client.import_verified();
client.queue_info().is_empty()
}
_ => true,
}
}
fn sync_step(&self) {
if let PeerData::Light(_, ref client) = self.data {
client.flush_queue();
while !client.queue_info().is_empty() {
client.import_verified()
}
}
}
fn restart_sync(&self) { }
fn process_all_io_messages(&self) { }
fn process_all_new_block_messages(&self) { }
}
impl TestNet<Peer> {
    /// Creates a not-yet-started network holding `n_light` light peers followed by
    /// `n_full` full peers.
    ///
    /// Each light peer gets its own in-memory database, its own cache and a light
    /// client built from the test spec with `verify_full` disabled. Each full peer
    /// wraps a fresh `TestBlockChainClient`.
    ///
    /// # Panics
    ///
    /// Panics if constructing a light client fails.
    pub fn light(n_light: usize, n_full: usize) -> Self {
        let light_peers = (0..n_light).map(|_| {
            let mut config = light::client::Config::default();
            config.verify_full = false;

            let six_hours = Duration::from_secs(6 * 3600);
            let cache = Arc::new(Mutex::new(Cache::new(Default::default(), six_hours)));
            let db = tetsy_kvdb_memorydb::create(1);

            let client = LightClient::new(
                config,
                Arc::new(db),
                0,
                &spec::new_test(),
                fetch::unavailable(),
                IoChannel::disconnected(),
                cache,
            )
            .expect("New DB creation infallible; qed");

            Arc::new(Peer::new_light(Arc::new(client)))
        });

        let full_peers = (0..n_full)
            .map(|_| Arc::new(Peer::new_full(Arc::new(TestBlockChainClient::new()))));

        // Light peers are built and stored first, then the full peers.
        let mut peers = Vec::with_capacity(n_light + n_full);
        peers.extend(light_peers);
        peers.extend(full_peers);

        Self {
            peers,
            started: false,
            disconnect_events: Vec::new(),
        }
    }
}