use std::pin::Pin;
use std::io::{Error as IoError};
use std::collections::VecDeque;
use std::fmt::{Debug, Error as FmtError, Formatter};
use futures::io::{AsyncRead, AsyncWrite};
use futures::task::{Context, Poll};
use futures::future::Future;
use futures::stream::Stream;
use futures::sink::Sink;
use super::message::{Message, ParseError};
/// Byte sequence that terminates every IRC message on the wire (CR LF).
pub const IRC_MESSAGE_SEP: &str = "\r\n";
/// Errors reported by [`MessageTransport`] and its helper futures.
pub enum MessageTransportError {
/// The underlying stream returned an I/O error while reading, writing,
/// flushing or closing.
Io(IoError),
/// A complete line was received but `Message::parse` rejected it.
Parse(ParseError),
/// The underlying stream accepted zero bytes on a write, or (in the
/// deprecated read future) reported end-of-stream before a full message
/// arrived.
Closed,
}
impl From<IoError> for MessageTransportError {
fn from(e: IoError) -> MessageTransportError {
MessageTransportError::Io(e)
}
}
impl Debug for MessageTransportError {
fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> {
use MessageTransportError::*;
match self {
Io(e) => write!(f, "io: {}", e),
Parse(e) => write!(f, "parse: {}", e),
Closed => f.write_str("stream closed"),
}
}
}
/// Frames an asynchronous byte stream into IRC messages.
///
/// Incoming bytes are collected until [`IRC_MESSAGE_SEP`] is seen and then
/// parsed; outgoing messages are serialized, terminated with the separator
/// and queued until flushed.
pub struct MessageTransport<R: AsyncRead + AsyncWrite + Unpin> {
    /// Serialized outgoing messages (separator included) awaiting a flush.
    write_buf: VecDeque<String>,
    /// Bytes of the incoming message received so far.
    read_buf: Vec<u8>,
    /// Offset into the front entry of `write_buf` of the next byte to write.
    cursor: usize,
    /// The wrapped stream; visible only inside this module.
    inner: R,
}
impl<R: AsyncRead + AsyncWrite + Unpin> MessageTransport<R> {
    /// Wraps `stream` in a transport with empty buffers.
    ///
    /// The read buffer is pre-sized to 512 bytes.
    pub fn new(stream: R) -> Self {
        Self {
            inner: stream,
            cursor: 0,
            read_buf: Vec::with_capacity(512),
            write_buf: VecDeque::new(),
        }
    }

    /// Shared access to the wrapped stream.
    pub fn stream(&self) -> &R {
        &self.inner
    }

    /// Exclusive access to the wrapped stream.
    pub fn stream_mut(&mut self) -> &mut R {
        &mut self.inner
    }

    /// Returns a future that reads one message directly from the wrapped
    /// stream.
    #[deprecated(since="0.2.0", note="use the `Stream`/`Sink` API instead")]
    pub fn read_message(&mut self) -> MessageTransportReadFuture<'_, R> {
        MessageTransportReadFuture::new(self)
    }

    /// Returns a future that writes `m` directly to the wrapped stream and
    /// flushes it.
    #[deprecated(since="0.2.0", note="use the `Stream`/`Sink` API instead")]
    pub fn write_message(&mut self, m: Message) -> MessageTransportWriteFuture<'_, R> {
        MessageTransportWriteFuture::new(self, m)
    }
}
/// Yields one parsed [`Message`] per separator-terminated line.
impl<R: AsyncRead + AsyncWrite + Unpin> Stream for MessageTransport<R> {
    type Item = Result<Message, MessageTransportError>;

    /// Reads one byte at a time into `read_buf` until it ends with
    /// [`IRC_MESSAGE_SEP`], then parses the line (separator stripped, invalid
    /// UTF-8 replaced lossily) and clears the buffer.
    ///
    /// Returns `None` when the wrapped stream reports end-of-stream; I/O and
    /// parse failures are yielded as `Some(Err(..))`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        // `Self` is `Unpin` (every field is), so a plain `&mut` is available
        // and fields can be borrowed independently.
        let this = self.get_mut();
        let sep = IRC_MESSAGE_SEP.as_bytes();

        loop {
            if this.read_buf.ends_with(sep) {
                break;
            }
            let mut slot = [0u8; 1];
            match Pin::new(&mut this.inner).poll_read(cx, &mut slot) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err.into()))),
                // Zero bytes read: the peer closed the stream.
                Poll::Ready(Ok(0)) => return Poll::Ready(None),
                Poll::Ready(Ok(_)) => this.read_buf.push(slot[0]),
            }
        }

        let payload_len = this.read_buf.len() - sep.len();
        let lossy = String::from_utf8_lossy(&this.read_buf[..payload_len]);
        let parsed = Message::parse(&lossy).map_err(MessageTransportError::Parse);
        // Reset for the next message regardless of the parse outcome.
        this.read_buf.clear();
        Poll::Ready(Some(parsed))
    }
}
impl<R> Sink<Message> for MessageTransport<R>
where R: AsyncRead + AsyncWrite + Unpin {
type Error = MessageTransportError;
fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
self.write_buf.push_back(item.to_string() + IRC_MESSAGE_SEP);
Ok(())
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let front_len = match self.write_buf.front() {
Some(front_ref) => front_ref.len(),
None => return Poll::Ready(Ok(())),
};
while self.cursor < front_len {
let byte: [u8; 1] = [self.write_buf.front().unwrap().as_bytes()[self.cursor]];
match Pin::new(&mut self.inner).poll_write(cx, &byte) {
Poll::Ready(res) => match res {
Ok(bytes) => {
if bytes == 0 {
return Poll::Ready(Err(MessageTransportError::Closed));
}
self.cursor += 1;
},
Err(err) => return Poll::Ready(Err(err.into())),
},
Poll::Pending => return Poll::Pending,
}
}
if let Poll::Ready(r) = Pin::new(&mut self.inner).poll_flush(cx) {
if let Err(err) = r {
Poll::Ready(Err(err.into()))
} else {
self.write_buf.pop_front();
self.cursor = 0;
Poll::Ready(Ok(()))
}
} else {
Poll::Pending
}
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
match Pin::new(&mut self.inner).poll_close(cx) {
Poll::Ready(res) => match res {
Ok(()) => Poll::Ready(Ok(())),
Err(err) => Poll::Ready(Err(err.into())),
},
Poll::Pending => Poll::Pending,
}
}
}
/// Future that reads a single message from a borrowed [`MessageTransport`].
///
/// Created by the deprecated `MessageTransport::read_message`.
pub struct MessageTransportReadFuture<'a, R: AsyncRead + AsyncWrite + Unpin> {
    /// Bytes received so far; owned by this future, not by the transport.
    buf: Vec<u8>,
    /// Transport whose wrapped stream is read.
    parent: &'a mut MessageTransport<R>,
}
impl<'a, R: AsyncRead + AsyncWrite + Unpin> MessageTransportReadFuture<'a, R> {
    /// Creates a read future over `t` with an empty buffer pre-sized to
    /// 512 bytes.
    pub fn new(t: &'a mut MessageTransport<R>) -> Self {
        Self {
            parent: t,
            buf: Vec::with_capacity(512),
        }
    }
}
/// Resolves to the next message read from the parent transport's stream.
impl<'a, R: AsyncRead + AsyncWrite + Unpin> Future for MessageTransportReadFuture<'a, R> {
    type Output = Result<Message, MessageTransportError>;

    /// Reads one byte at a time until the buffer ends with
    /// [`IRC_MESSAGE_SEP`], then parses the line with the separator removed
    /// (invalid UTF-8 is replaced lossily).
    ///
    /// Resolves to `Err(Closed)` on end-of-stream and `Err(Io)` on read
    /// failure. Bytes are accumulated in this future's own buffer, so a
    /// future dropped before completion discards whatever it had read.
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let this = self.get_mut();
        let sep = IRC_MESSAGE_SEP.as_bytes();

        while !this.buf.ends_with(sep) {
            let mut slot = [0u8; 1];
            match Pin::new(&mut this.parent.inner).poll_read(cx, &mut slot) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
                // Zero bytes read: the stream ended mid-message.
                Poll::Ready(Ok(0)) => return Poll::Ready(Err(MessageTransportError::Closed)),
                Poll::Ready(Ok(_)) => this.buf.push(slot[0]),
            }
        }

        let payload_len = this.buf.len() - sep.len();
        let lossy = String::from_utf8_lossy(&this.buf[..payload_len]);
        Poll::Ready(Message::parse(&lossy).map_err(MessageTransportError::Parse))
    }
}
/// Future that writes a single message to a borrowed [`MessageTransport`]
/// and flushes the wrapped stream.
///
/// Created by the deprecated `MessageTransport::write_message`.
pub struct MessageTransportWriteFuture<'a, R: AsyncRead + AsyncWrite + Unpin> {
    /// Serialized message including the trailing separator.
    message: String,
    /// Offset into `message` of the next byte to write.
    cursor: usize,
    /// Transport whose wrapped stream is written to.
    parent: &'a mut MessageTransport<R>,
}
impl<'a, R> MessageTransportWriteFuture<'a, R>
where R: AsyncRead + AsyncWrite + Unpin {
pub fn new(t: &'a mut MessageTransport<R>, m: Message) -> MessageTransportWriteFuture<'a, R> {
MessageTransportWriteFuture {
message: m.to_string() + "\r\n",
cursor: 0,
parent: t,
}
}
}
/// Resolves once the whole message has been written and the stream flushed.
impl<'a, R> Future for MessageTransportWriteFuture<'a, R>
where
    R: AsyncRead + AsyncWrite + Unpin,
{
    type Output = Result<(), MessageTransportError>;

    /// Writes the unwritten remainder of the message, then flushes the
    /// parent's stream. Progress is kept in `cursor`, so polling resumes
    /// where the previous poll stopped.
    ///
    /// Resolves to `Err(Closed)` if the stream accepts zero bytes and to
    /// `Err(Io)` if writing or flushing fails.
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // The future is `Unpin`, so a plain `&mut` lets the message and the
        // stream be borrowed at the same time.
        let this = self.get_mut();
        let bytes = this.message.as_bytes();

        while this.cursor < bytes.len() {
            // Offer everything that is left instead of a single byte; the
            // stream reports how much it actually accepted.
            match Pin::new(&mut this.parent.inner).poll_write(cx, &bytes[this.cursor..]) {
                Poll::Ready(Ok(0)) => return Poll::Ready(Err(MessageTransportError::Closed)),
                Poll::Ready(Ok(written)) => this.cursor += written,
                Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
                Poll::Pending => return Poll::Pending,
            }
        }

        match Pin::new(&mut this.parent.inner).poll_flush(cx) {
            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
            Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
            Poll::Pending => Poll::Pending,
        }
    }
}