use std::collections::{HashSet, HashMap};
use std::net::SocketAddr;
use std::sync::mpsc::SyncSender;
use bip_handshake::Handshaker;
use bip_util::bt::{self, NodeId};
use mio::{Timeout, EventLoop};
use message::find_node::FindNodeRequest;
use routing::bucket::Bucket;
use routing::node::{Node, NodeStatus};
use routing::table::{self, RoutingTable, BucketContents};
use transaction::{MIDGenerator, TransactionID};
use worker::ScheduledTask;
use worker::handler::DhtHandler;
// Delay handed to `timeout_ms` for the initial request that `start_bootstrap`
// sends to every starting router and node (presumably milliseconds, given the
// name of the `timeout_ms` call).
const BOOTSTRAP_INITIAL_TIMEOUT: u64 = 2500;
// Delay handed to `timeout_ms` for each individual request sent while walking
// the buckets in `send_bootstrap_requests` (same unit as above).
const BOOTSTRAP_NODE_TIMEOUT: u64 = 500;
// Upper bound on the number of nodes contacted in a single bucket round.
const BOOTSTRAP_PINGS_PER_BUCKET: usize = 8;
/// Status reported back by every `TableBootstrap` operation.
#[derive(Debug, PartialEq, Eq)]
pub enum BootstrapStatus {
/// No requests are outstanding, or the bucket counter has already reached
/// `table::MAX_BUCKETS`.
Idle,
/// At least one request is still outstanding and more buckets remain.
Bootstrapping,
/// Returned by the round that advances the bucket counter to
/// `table::MAX_BUCKETS`.
Completed,
/// A timeout could not be registered with the event loop, or a message
/// could not be pushed through the outgoing channel.
Failed,
}
/// State for bootstrapping a routing table: an initial request to the starting
/// addresses followed by one round of requests per bucket index.
pub struct TableBootstrap {
/// Id passed to every `FindNodeRequest`, and the id whose bits are flipped
/// to derive each round's target.
table_id: NodeId,
/// Source of transaction ids for outgoing requests.
id_generator: MIDGenerator,
/// Plain node addresses contacted by `start_bootstrap`.
starting_nodes: Vec<SocketAddr>,
/// Outstanding transactions mapped to the event loop timeout guarding them.
active_messages: HashMap<TransactionID, Timeout>,
/// Router addresses contacted by `start_bootstrap` and matched by `is_router`.
starting_routers: HashSet<SocketAddr>,
/// Index of the next bucket round; 0 means the initial request is still pending.
curr_bootstrap_bucket: usize,
}
impl TableBootstrap {
pub fn new<I>(table_id: NodeId,
id_generator: MIDGenerator,
nodes: Vec<SocketAddr>,
routers: I)
-> TableBootstrap
where I: Iterator<Item = SocketAddr>
{
let router_filter: HashSet<SocketAddr> = routers.collect();
TableBootstrap {
table_id: table_id,
id_generator: id_generator,
starting_nodes: nodes,
starting_routers: router_filter,
active_messages: HashMap::new(),
curr_bootstrap_bucket: 0,
}
}
pub fn start_bootstrap<H>(&mut self,
out: &SyncSender<(Vec<u8>, SocketAddr)>,
event_loop: &mut EventLoop<DhtHandler<H>>)
-> BootstrapStatus
where H: Handshaker
{
self.active_messages.clear();
self.curr_bootstrap_bucket = 0;
let trans_id = self.id_generator.generate();
let res_timeout = event_loop.timeout_ms((BOOTSTRAP_INITIAL_TIMEOUT,
ScheduledTask::CheckBootstrapTimeout(trans_id)),
BOOTSTRAP_INITIAL_TIMEOUT);
let timeout = if let Ok(t) = res_timeout {
t
} else {
error!("bip_dht: Failed to set a timeout for the start of a table bootstrap...");
return BootstrapStatus::Failed;
};
self.active_messages.insert(trans_id, timeout);
let find_node_msg = FindNodeRequest::new(trans_id.as_ref(), self.table_id, self.table_id)
.encode();
for addr in self.starting_routers.iter().chain(self.starting_nodes.iter()) {
if out.send((find_node_msg.clone(), *addr)).is_err() {
error!("bip_dht: Failed to send bootstrap message to router through channel...");
return BootstrapStatus::Failed;
}
}
self.current_bootstrap_status()
}
pub fn is_router(&self, addr: &SocketAddr) -> bool {
self.starting_routers.contains(&addr)
}
pub fn recv_response<'a, H>(&mut self,
trans_id: &TransactionID,
table: &RoutingTable,
out: &SyncSender<(Vec<u8>, SocketAddr)>,
event_loop: &mut EventLoop<DhtHandler<H>>)
-> BootstrapStatus
where H: Handshaker
{
let timeout = if let Some(t) = self.active_messages.get(trans_id) {
*t
} else {
warn!("bip_dht: Received expired/unsolicited node response for an active table \
bootstrap...");
return self.current_bootstrap_status();
};
if self.curr_bootstrap_bucket != 0 {
event_loop.clear_timeout(timeout);
self.active_messages.remove(trans_id);
}
if self.active_messages.is_empty() {
return self.bootstrap_next_bucket(table, out, event_loop);
}
self.current_bootstrap_status()
}
pub fn recv_timeout<H>(&mut self,
trans_id: &TransactionID,
table: &RoutingTable,
out: &SyncSender<(Vec<u8>, SocketAddr)>,
event_loop: &mut EventLoop<DhtHandler<H>>)
-> BootstrapStatus
where H: Handshaker
{
if self.active_messages.remove(trans_id).is_none() {
warn!("bip_dht: Received expired/unsolicited node timeout for an active table \
bootstrap...");
return self.current_bootstrap_status();
}
if self.active_messages.is_empty() {
return self.bootstrap_next_bucket(table, out, event_loop);
}
self.current_bootstrap_status()
}
fn bootstrap_next_bucket<H>(&mut self,
table: &RoutingTable,
out: &SyncSender<(Vec<u8>, SocketAddr)>,
event_loop: &mut EventLoop<DhtHandler<H>>)
-> BootstrapStatus
where H: Handshaker
{
let target_id = flip_id_bit_at_index(self.table_id, self.curr_bootstrap_bucket);
if self.curr_bootstrap_bucket == 0 || self.curr_bootstrap_bucket == 1 {
let iter = table.closest_nodes(target_id)
.filter(|n| n.status() == NodeStatus::Questionable);
self.send_bootstrap_requests(iter, target_id, table, out, event_loop)
} else {
let mut buckets = table.buckets().skip(self.curr_bootstrap_bucket - 2);
let dummy_bucket = Bucket::new();
let percent_25_bucket = if let Some(bucket) = buckets.next() {
match bucket {
BucketContents::Empty => dummy_bucket.iter(),
BucketContents::Sorted(b) => b.iter(),
BucketContents::Assorted(b) => b.iter(),
}
} else {
dummy_bucket.iter()
};
let percent_50_bucket = if let Some(bucket) = buckets.next() {
match bucket {
BucketContents::Empty => dummy_bucket.iter(),
BucketContents::Sorted(b) => b.iter(),
BucketContents::Assorted(b) => b.iter(),
}
} else {
dummy_bucket.iter()
};
let percent_100_bucket = if let Some(bucket) = buckets.next() {
match bucket {
BucketContents::Empty => dummy_bucket.iter(),
BucketContents::Sorted(b) => b.iter(),
BucketContents::Assorted(b) => b.iter(),
}
} else {
dummy_bucket.iter()
};
let iter = percent_25_bucket.chain(percent_50_bucket)
.chain(percent_100_bucket)
.filter(|n| n.status() == NodeStatus::Questionable);
self.send_bootstrap_requests(iter, target_id, table, out, event_loop)
}
}
fn send_bootstrap_requests<'a, H, I>(&mut self,
nodes: I,
target_id: NodeId,
table: &RoutingTable,
out: &SyncSender<(Vec<u8>, SocketAddr)>,
event_loop: &mut EventLoop<DhtHandler<H>>)
-> BootstrapStatus
where I: Iterator<Item = &'a Node>,
H: Handshaker
{
info!("bip_dht: bootstrap::send_bootstrap_requests {}",
self.curr_bootstrap_bucket);
let mut messages_sent = 0;
for node in nodes.take(BOOTSTRAP_PINGS_PER_BUCKET) {
let trans_id = self.id_generator.generate();
let find_node_msg = FindNodeRequest::new(trans_id.as_ref(), self.table_id, target_id)
.encode();
let res_timeout =
event_loop.timeout_ms((BOOTSTRAP_NODE_TIMEOUT,
ScheduledTask::CheckBootstrapTimeout(trans_id)),
BOOTSTRAP_NODE_TIMEOUT);
let timeout = if let Ok(t) = res_timeout {
t
} else {
error!("bip_dht: Failed to set a timeout for the start of a table bootstrap...");
return BootstrapStatus::Failed;
};
if out.send((find_node_msg, node.addr())).is_err() {
error!("bip_dht: Could not send a bootstrap message through the channel...");
return BootstrapStatus::Failed;
}
node.local_request();
self.active_messages.insert(trans_id, timeout);
messages_sent += 1;
}
self.curr_bootstrap_bucket += 1;
if self.curr_bootstrap_bucket == table::MAX_BUCKETS {
return BootstrapStatus::Completed;
} else if messages_sent == 0 {
self.bootstrap_next_bucket(table, out, event_loop)
} else {
return BootstrapStatus::Bootstrapping;
}
}
fn current_bootstrap_status(&self) -> BootstrapStatus {
if self.curr_bootstrap_bucket == table::MAX_BUCKETS || self.active_messages.is_empty() {
BootstrapStatus::Idle
} else {
BootstrapStatus::Bootstrapping
}
}
}
/// Returns `node_id` with the bit at position `index` inverted.
///
/// Bits are counted from the most significant bit of the first byte, so index
/// 0 is the top bit of byte 0 and index 8 is the top bit of byte 1.
///
/// # Panics
///
/// Panics if `index / 8` is not a valid position in the id's byte array.
fn flip_id_bit_at_index(node_id: NodeId, index: usize) -> NodeId {
    let mut raw: [u8; bt::NODE_ID_LEN] = node_id.into();

    // Slide the top bit right to the requested position within the byte.
    let mask = 0x80u8 >> (index % 8);
    raw[index / 8] ^= mask;

    raw.into()
}