use std::net::{SocketAddr, Ipv4Addr, SocketAddrV4};
use std::io;
use std::time::Duration;
use std::cmp;
use discovery::DiscoveryInfo;
use message::initiate::InitiateMessage;
use message::complete::CompleteMessage;
use message::extensions::Extensions;
use handshake::handler::handshaker;
use handshake::handler::initiator;
use handshake::handler::listener::ListenerHandler;
use handshake::handler;
use transport::Transport;
use local_addr::LocalAddr;
use filter::filters::Filters;
use filter::{HandshakeFilter, HandshakeFilters};
use handshake::config::HandshakerConfig;
use handshake::handler::timer::HandshakeTimer;
use bip_util::bt::PeerId;
use bip_util::convert;
use futures::{StartSend, Poll};
use futures::sync::mpsc::{self, Sender, Receiver, SendError};
use futures::sink::Sink;
use futures::stream::Stream;
use tokio_core::reactor::Handle;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::{self};
use rand::{self, Rng};
/// Collects the settings used to construct a `Handshaker`.
#[derive(Clone, Copy)]
pub struct HandshakerBuilder {
    /// Address handed to the transport when it starts listening.
    bind: SocketAddr,
    /// Port to report as open; `0` means "use the port the listener actually bound to".
    port: u16,
    /// Peer id passed to the handshake handler and reported by the sink.
    pid: PeerId,
    /// Extensions passed to the handshake handler.
    ext: Extensions,
    /// Source of the channel buffer sizes and the handshake/connect timeouts.
    config: HandshakerConfig,
}
impl HandshakerBuilder {
    /// Create a builder populated with default settings.
    ///
    /// The defaults are a bind address of `0.0.0.0:0`, an open port of `0`,
    /// a peer id generated from a random 32-bit seed, `Extensions::new()`,
    /// and `HandshakerConfig::default()`.
    pub fn new() -> HandshakerBuilder {
        let unspecified_port = 0;
        let wildcard_v4 = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), unspecified_port);

        // The peer id is built from the bytes of a freshly drawn random `u32`.
        let random_seed = rand::thread_rng().next_u32();
        let seed_bytes = convert::four_bytes_to_array(random_seed);

        HandshakerBuilder {
            bind: SocketAddr::V4(wildcard_v4),
            port: unspecified_port,
            pid: PeerId::from_bytes(&seed_bytes),
            ext: Extensions::new(),
            config: HandshakerConfig::default(),
        }
    }

    /// Set the address that the transport will be asked to listen on.
    pub fn with_bind_addr(&mut self, addr: SocketAddr) -> &mut Self {
        self.bind = addr;

        self
    }

    /// Set the port to report as open.
    ///
    /// When left at `0`, the port of the listener's local address is used instead.
    pub fn with_open_port(&mut self, port: u16) -> &mut Self {
        self.port = port;

        self
    }

    /// Set the peer id used by the built `Handshaker`.
    pub fn with_peer_id(&mut self, peer_id: PeerId) -> &mut Self {
        self.pid = peer_id;

        self
    }

    /// Set the extensions used by the built `Handshaker`.
    pub fn with_extensions(&mut self, ext: Extensions) -> &mut Self {
        self.ext = ext;

        self
    }

    /// Set the configuration (buffer sizes and timeouts) used by the built `Handshaker`.
    pub fn with_config(&mut self, config: HandshakerConfig) -> &mut Self {
        self.config = config;

        self
    }

    /// Build a `Handshaker` over the given transport, using the given reactor handle.
    ///
    /// Returns an `io::Error` if `Handshaker::with_builder` fails.
    pub fn build<T>(&self, transport: T, handle: Handle) -> io::Result<Handshaker<T::Socket>>
        where T: Transport + 'static
    {
        Handshaker::with_builder(self, transport, handle)
    }
}
/// Combined sink of `InitiateMessage`s and stream of `CompleteMessage`s.
///
/// The two halves can be separated with `into_parts`.
pub struct Handshaker<S> {
    /// Half that accepts `InitiateMessage`s and manages filters.
    sink: HandshakerSink,
    /// Half that yields `CompleteMessage`s.
    stream: HandshakerStream<S>,
}
impl<S> Handshaker<S> {
pub fn into_parts(self) -> (HandshakerSink, HandshakerStream<S>) {
(self.sink, self.stream)
}
}
impl<S> DiscoveryInfo for Handshaker<S> {
fn port(&self) -> u16 {
self.sink.port()
}
fn peer_id(&self) -> PeerId {
self.sink.peer_id()
}
}
impl<S> Handshaker<S> where S: AsyncRead + AsyncWrite + 'static {
    /// Construct a `Handshaker` from the builder's settings.
    ///
    /// Starts listening on the builder's bind address, creates the three
    /// channels sized from the builder's config, and registers three handler
    /// loops through `handler::loop_handler`:
    ///
    /// 1. initiate messages from the sink channel, handled by `initiator::initiator_handler`;
    /// 2. the transport listener, handled by `ListenerHandler::new`;
    /// 3. the output of the first two, handled by `handshaker::execute_handshake`,
    ///    whose output feeds the returned stream.
    ///
    /// Returns an `io::Error` if listening fails or the listener's local
    /// address cannot be read.
    fn with_builder<T>(builder: &HandshakerBuilder, transport: T, handle: Handle) -> io::Result<Handshaker<T::Socket>>
        where T: Transport<Socket=S> + 'static
    {
        let listener = transport.listen(&builder.bind, &handle)?;

        // A port of zero means the caller did not pick one, so report the port actually bound.
        let open_port = match builder.port {
            0 => listener.local_addr()?.port(),
            chosen => chosen,
        };

        let config = builder.config;
        let (initiate_tx, initiate_rx) = mpsc::channel(config.sink_buffer_size());
        let (pending_tx, pending_rx) = mpsc::channel(config.wait_buffer_size());
        let (complete_tx, complete_rx) = mpsc::channel(config.done_buffer_size());

        let filters = Filters::new();
        let (handshake_timer, initiate_timer) =
            configured_handshake_timers(config.handshake_timeout(), config.connect_timeout());

        // Loop 1: initiate messages coming from the sink half.
        let initiator_context = (transport, filters.clone(), handle.clone(), initiate_timer);
        handler::loop_handler(initiate_rx,
                              initiator::initiator_handler,
                              pending_tx.clone(),
                              initiator_context,
                              &handle);

        // Loop 2: items produced by the transport listener.
        handler::loop_handler(listener, ListenerHandler::new, pending_tx, filters.clone(), &handle);

        // Loop 3: everything forwarded by the two loops above.
        let handshake_context = (builder.ext, builder.pid, filters.clone(), handshake_timer);
        handler::loop_handler(pending_rx,
                              handshaker::execute_handshake,
                              complete_tx,
                              handshake_context,
                              &handle);

        Ok(Handshaker {
            sink: HandshakerSink::new(initiate_tx, open_port, builder.pid, filters),
            stream: HandshakerStream::new(complete_rx),
        })
    }
}
/// Create two `HandshakeTimer`s that share a single timer wheel.
///
/// The wheel is built with 64 slots and a maximum timeout equal to the larger
/// of the two durations. The first returned timer uses `duration_one`, the
/// second uses `duration_two`.
fn configured_handshake_timers(duration_one: Duration, duration_two: Duration) -> (HandshakeTimer, HandshakeTimer) {
    let longest_timeout = cmp::max(duration_one, duration_two);

    let shared_wheel = tokio_timer::wheel()
        .num_slots(64)
        .max_timeout(longest_timeout)
        .build();

    let first_timer = HandshakeTimer::new(shared_wheel.clone(), duration_one);
    let second_timer = HandshakeTimer::new(shared_wheel, duration_two);

    (first_timer, second_timer)
}
impl<S> Sink for Handshaker<S> {
type SinkItem = InitiateMessage;
type SinkError = SendError<InitiateMessage>;
fn start_send(&mut self, item: InitiateMessage) -> StartSend<InitiateMessage, SendError<InitiateMessage>> {
self.sink.start_send(item)
}
fn poll_complete(&mut self) -> Poll<(), SendError<InitiateMessage>> {
self.sink.poll_complete()
}
}
impl<S> Stream for Handshaker<S> {
type Item = CompleteMessage<S>;
type Error = ();
fn poll(&mut self) -> Poll<Option<CompleteMessage<S>>, ()> {
self.stream.poll()
}
}
impl<S> HandshakeFilters for Handshaker<S> {
fn add_filter<F>(&self, filter: F)
where F: HandshakeFilter + PartialEq + Eq + Send + Sync + 'static {
self.sink.add_filter(filter);
}
fn remove_filter<F>(&self, filter: F)
where F: HandshakeFilter + PartialEq + Eq + Send + Sync + 'static {
self.sink.remove_filter(filter);
}
fn clear_filters(&self) {
self.sink.clear_filters();
}
}
/// Sink half of a `Handshaker`: accepts `InitiateMessage`s, reports discovery
/// information, and manages filters.
#[derive(Clone)]
pub struct HandshakerSink {
    /// Channel sender that `InitiateMessage`s are pushed into.
    send: Sender<InitiateMessage>,
    /// Port returned from `DiscoveryInfo::port`.
    port: u16,
    /// Peer id returned from `DiscoveryInfo::peer_id`.
    pid: PeerId,
    /// Filters that the `HandshakeFilters` methods operate on.
    filters: Filters,
}
impl HandshakerSink {
    /// Assemble a `HandshakerSink` from its channel sender, port, peer id, and filters.
    fn new(send: Sender<InitiateMessage>, port: u16, pid: PeerId, filters: Filters) -> HandshakerSink {
        HandshakerSink {
            send: send,
            port: port,
            pid: pid,
            filters: filters,
        }
    }
}
impl DiscoveryInfo for HandshakerSink {
fn port(&self) -> u16 {
self.port
}
fn peer_id(&self) -> PeerId {
self.pid
}
}
/// Sending is forwarded to the internal channel sender.
impl Sink for HandshakerSink {
    type SinkItem = InitiateMessage;
    type SinkError = SendError<InitiateMessage>;

    /// Begin sending `item` into the internal channel.
    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
        self.send.start_send(item)
    }

    /// Poll the internal channel sender for completion.
    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
        self.send.poll_complete()
    }
}
/// Filter management is forwarded to the stored `Filters`.
impl HandshakeFilters for HandshakerSink {
    /// Add `filter` to the stored `Filters`.
    fn add_filter<F>(&self, filter: F)
        where F: HandshakeFilter + PartialEq + Eq + Send + Sync + 'static
    {
        self.filters.add_filter(filter);
    }

    /// Remove `filter` from the stored `Filters`.
    fn remove_filter<F>(&self, filter: F)
        where F: HandshakeFilter + PartialEq + Eq + Send + Sync + 'static
    {
        self.filters.remove_filter(filter);
    }

    /// Clear the stored `Filters`.
    fn clear_filters(&self) {
        self.filters.clear_filters();
    }
}
/// Stream half of a `Handshaker`: yields `CompleteMessage`s.
pub struct HandshakerStream<S> {
    /// Channel receiver that `CompleteMessage`s are read from.
    recv: Receiver<CompleteMessage<S>>,
}
impl<S> HandshakerStream<S> {
    /// Wrap the given channel receiver in a `HandshakerStream`.
    fn new(recv: Receiver<CompleteMessage<S>>) -> HandshakerStream<S> {
        HandshakerStream {
            recv: recv,
        }
    }
}
impl<S> Stream for HandshakerStream<S> {
type Item = CompleteMessage<S>;
type Error = ();
fn poll(&mut self) -> Poll<Option<CompleteMessage<S>>, ()> {
self.recv.poll()
}
}