bip_peer 0.5.0

Communication with bittorrent peers via peer wire protocol
Documentation
extern crate bip_peer;
extern crate bip_handshake;
extern crate bip_util;
extern crate futures;
extern crate tokio_core;
extern crate tokio_io;

use std::io;

use futures::{StartSend, Poll};
use futures::sink::{Sink};
use futures::stream::{Stream};
use futures::sync::mpsc::{self, Sender, Receiver};

mod peer_manager_send_backpressure;

pub struct ConnectedChannel<I, O> {
    send: Sender<I>,
    recv: Receiver<O>
}

impl<I, O> Sink for ConnectedChannel<I, O> {
    type SinkItem = I;
    type SinkError = io::Error;

    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
        self.send.start_send(item).map_err(|_| io::Error::new(io::ErrorKind::ConnectionAborted, "Sender Failed To Send"))
    }

    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
        self.send.poll_complete().map_err(|_| io::Error::new(io::ErrorKind::ConnectionAborted, "Sender Failed To Send"))
    }
}

impl<I, O> Stream for ConnectedChannel<I, O> {
    type Item = O;
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        self.recv.poll().map_err(|_| io::Error::new(io::ErrorKind::Other, "Receiver Failed To Receive"))
    }
}

pub fn connected_channel<I, O>(capacity: usize) -> (ConnectedChannel<I, O>, ConnectedChannel<O, I>) {
    let (send_one, recv_one) = mpsc::channel(capacity);
    let (send_two, recv_two) = mpsc::channel(capacity);

    (ConnectedChannel{ send: send_one, recv: recv_two}, ConnectedChannel{ send: send_two, recv: recv_one })
}