use bytes::BytesMut;
use futures::{Future, Stream};
use tokio_io::codec::length_delimited::{Builder, Framed};
use tokio_io::AsyncRead;
use tokio_io::AsyncWrite;
use rain_core::errors::{Error, Result};
/// Owned byte buffer used for outgoing data: it is the argument type of
/// `Sender::send` and the buffer type parameter of the framed stream.
pub type SendType = Vec<u8>;
/// Handle used to queue outgoing data for a `Connection`.
///
/// Instances are produced by `Connection::sender`; each one wraps a clone of
/// the connection's unbounded `unsync` channel sender.
#[derive(Debug)]
pub struct Sender {
// Sending half of the channel whose receiving half is held by the connection.
channel_sender: ::futures::unsync::mpsc::UnboundedSender<SendType>,
}
impl Sender {
    /// Queues `data` on the connection's outgoing channel.
    ///
    /// The channel is unbounded, so this call never waits for capacity.
    ///
    /// # Panics
    ///
    /// Panics if the channel rejects the message (the result of
    /// `unbounded_send` is unwrapped).
    pub fn send(&self, data: SendType) {
        let outcome = self.channel_sender.unbounded_send(data);
        outcome.unwrap()
    }
}
/// Wraps a raw byte stream in the length-delimited framing used by the protocol.
///
/// The framing is configured with a little-endian length field and rejects
/// frames larger than 128 MiB.
pub fn create_protocol_stream<S>(stream: S) -> Framed<S, SendType>
where
    S: AsyncRead + AsyncWrite + 'static,
{
    // Upper bound on the size of a single frame: 128 MiB.
    const MAX_FRAME_LENGTH: usize = 128 * 1024 * 1024;

    let mut builder = Builder::new();
    builder
        .little_endian()
        .max_frame_length(MAX_FRAME_LENGTH)
        .new_framed(stream)
}
/// A framed stream bundled with an unbounded channel for outgoing data.
///
/// `Sender` handles obtained through `sender` feed the channel; `start_future`
/// consumes the connection and wires the channel and the stream together.
pub struct Connection<S>
where
S: AsyncRead + AsyncWrite + 'static,
{
// Length-delimited framed transport (see `create_protocol_stream`).
stream: Framed<S, SendType>,
// Receiving half of the outgoing-data channel; forwarded into `stream`
// by `start_future`.
channel_receiver: ::futures::unsync::mpsc::UnboundedReceiver<SendType>,
// Sending half of the same channel; cloned for every `Sender` handed out.
channel_sender: ::futures::unsync::mpsc::UnboundedSender<SendType>,
}
impl<S> Connection<S>
where
    S: AsyncRead + AsyncWrite + 'static,
{
    /// Builds a connection around an already framed stream, creating a fresh
    /// unbounded channel for outgoing data.
    pub fn from(stream: Framed<S, SendType>) -> Self {
        let (tx, rx) = ::futures::unsync::mpsc::unbounded();
        Connection {
            stream: stream,
            channel_receiver: rx,
            channel_sender: tx,
        }
    }

    /// Returns a new `Sender` handle that queues data on this connection's
    /// outgoing channel.
    pub fn sender(&self) -> Sender {
        let channel_sender = self.channel_sender.clone();
        Sender { channel_sender }
    }

    /// Consumes the connection and returns a future that drives both
    /// directions of the stream.
    ///
    /// * Outgoing: everything received from the internal channel is forwarded
    ///   into the write half of the framed stream.
    /// * Incoming: every frame read from the stream is passed to `on_message`;
    ///   read errors are converted into `Error` via `into()`.
    ///
    /// The two halves are combined with `select`, so the returned future
    /// resolves as soon as either half finishes, and fails with the error of
    /// whichever half failed first.
    ///
    /// NOTE(review): the connection's own channel sender is not kept alive
    /// past this call, so the outgoing half presumably ends (and with it the
    /// whole future) once every `Sender` handle has been dropped. Confirm
    /// that this is intended.
    ///
    /// # Panics
    ///
    /// The returned future panics with "Send channel failed!" if the channel
    /// receiver ever yields an error.
    pub fn start_future<OnMessage>(
        self,
        on_message: OnMessage,
    ) -> Box<Future<Item = (), Error = Error>>
    where
        OnMessage: Fn(BytesMut) -> Result<()> + 'static,
    {
        // Take only what is needed; the remaining sender field stays in
        // `self` and is dropped when this function returns.
        let Connection {
            stream: framed,
            channel_receiver: outgoing,
            ..
        } = self;

        let (sink, source) = framed.split();

        // Pump queued outgoing data into the write half.
        let send_future = outgoing
            .map_err(|_| panic!("Send channel failed!"))
            .forward(sink)
            .map(|_| ());

        // Hand every incoming frame to the callback.
        let read_future = source.map_err(|err| err.into()).for_each(on_message);

        // Whichever half completes (or fails) first decides the outcome.
        let combined = read_future
            .select(send_future)
            .map(|_| ())
            .map_err(|(err, _unfinished)| err);

        Box::new(combined)
    }
}