irc3 0.3.0

An IRC3 client and server toolchain
Documentation
use std::pin::Pin;
use std::io::{Error as IoError};
use std::collections::VecDeque;
use std::fmt::{Debug, Error as FmtError, Formatter};

use futures::io::{AsyncRead, AsyncWrite};
use futures::task::{Context, Poll};
use futures::future::Future;
use futures::stream::Stream;
use futures::sink::Sink;

use super::message::{Message, ParseError};

/// Line terminator that delimits IRC messages on the wire: carriage return
/// followed by line feed.
pub const IRC_MESSAGE_SEP: &str = "\r\n";

/// Errors produced while reading or writing messages through a
/// `MessageTransport`.
pub enum MessageTransportError {
	/// An I/O error reported by the underlying stream.
	Io(IoError),
	/// A received line could not be parsed into a `Message`.
	Parse(ParseError),
	/// The underlying stream transferred zero bytes, which this module
	/// treats as the stream having been closed.
	Closed,
}

impl From<IoError> for MessageTransportError {
	fn from(e: IoError) -> MessageTransportError {
		MessageTransportError::Io(e)
	}
}

impl Debug for MessageTransportError {
	fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> {
		use MessageTransportError::*;
		match self {
			Io(e) => write!(f, "io: {}", e),
			Parse(e) => write!(f, "parse: {}", e),
			Closed => f.write_str("stream closed"),
		}
	}
}

/// A bidirectional feed of IRC `Message`s over an asynchronous byte stream.
///
/// `MessageTransport` implements both `Stream` (incoming messages) and
/// `Sink` (outgoing messages) to interact with the server/client.
/// Messages are delimited by `IRC_MESSAGE_SEP`, and incoming bytes are
/// decoded lossily as UTF-8 before being parsed.
pub struct MessageTransport<R>
where R: AsyncRead + AsyncWrite + Unpin {
	/// Serialized outgoing messages (separator included) waiting to be written.
	write_buf: VecDeque<String>,
	/// Bytes of the incoming message currently being assembled.
	read_buf: Vec<u8>,
	/// Number of bytes of the front entry of `write_buf` already written.
	cursor: usize,
	/// The wrapped stream that bytes are read from and written to.
	pub(self) inner: R,
}

impl<R> MessageTransport<R>
where R: AsyncRead + AsyncWrite + Unpin {
	/// Wraps `stream` in a transport whose read and write buffers start empty.
	pub fn new(stream: R) -> Self {
		let read_buf = Vec::with_capacity(512);
		let write_buf = VecDeque::new();

		Self {
			write_buf,
			read_buf,
			cursor: 0,
			inner: stream,
		}
	}

	/// Shared access to the wrapped stream.
	pub fn stream(&self) -> &R {
		let Self { inner, .. } = self;
		inner
	}

	/// Exclusive access to the wrapped stream.
	pub fn stream_mut(&mut self) -> &mut R {
		let Self { inner, .. } = self;
		inner
	}

	/// Returns a future that resolves to the next message read from the stream.
	#[deprecated(since="0.2.0", note="use the `Stream`/`Sink` API instead")]
	pub fn read_message(&mut self) -> MessageTransportReadFuture<'_, R> {
		MessageTransportReadFuture::new(self)
	}

	/// Returns a future that writes `m` to the stream and then flushes it.
	#[deprecated(since="0.2.0", note="use the `Stream`/`Sink` API instead")]
	pub fn write_message(&mut self, m: Message) -> MessageTransportWriteFuture<'_, R> {
		MessageTransportWriteFuture::new(self, m)
	}
}

impl<R> Stream for MessageTransport<R>
where R: AsyncRead + AsyncWrite + Unpin {
	type Item = Result<Message, MessageTransportError>;

	/// Reads from the wrapped stream one byte at a time until the buffered
	/// bytes end with `IRC_MESSAGE_SEP`, then parses the line (separator
	/// removed, decoded lossily as UTF-8) into a `Message`.
	///
	/// Yields `None` when the stream reports a zero-byte read, and
	/// `Some(Err(..))` on an I/O or parse failure.
	fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
		let this = self.get_mut();
		let sep = IRC_MESSAGE_SEP.as_bytes();

		loop {
			if this.read_buf.ends_with(sep) {
				break;
			}

			// one-byte scratch buffer, so nothing past the separator is consumed
			let mut slot = [0u8; 1];

			let count = match Pin::new(&mut this.inner).poll_read(cx, &mut slot) {
				Poll::Pending => return Poll::Pending,
				Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err.into()))),
				Poll::Ready(Ok(count)) => count,
			};

			// a zero-byte read ends the stream
			if count == 0 {
				return Poll::Ready(None);
			}

			this.read_buf.push(slot[0]);
		}

		// the buffer ends with the separator, so this cannot underflow
		let body_len = this.read_buf.len() - sep.len();

		let outcome = {
			let lossy = String::from_utf8_lossy(&this.read_buf[..body_len]);
			Message::parse(&lossy).map_err(MessageTransportError::Parse)
		};

		// start the next message from an empty buffer
		this.read_buf.clear();
		Poll::Ready(Some(outcome))
	}
}

impl<R> Sink<Message> for MessageTransport<R>
where R: AsyncRead + AsyncWrite + Unpin {
	type Error = MessageTransportError;

	/// Always ready: outgoing messages are queued in an unbounded buffer.
	fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
		Poll::Ready(Ok(()))
	}

	/// Serializes `item`, appends `IRC_MESSAGE_SEP` and queues the result.
	/// Nothing is written to the stream until the sink is flushed.
	fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
		self.write_buf.push_back(item.to_string() + IRC_MESSAGE_SEP);
		Ok(())
	}

	/// Writes every queued message to the wrapped stream, then flushes it.
	///
	/// `Ready(Ok(()))` is only returned once the whole queue has been
	/// written, as the `Sink` contract requires; returning after the first
	/// message would leave later messages stuck in the queue.
	///
	/// Fails with `MessageTransportError::Closed` if the stream accepts zero
	/// bytes, or `MessageTransportError::Io` on an I/O error.
	fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
		let this = self.get_mut();

		while let Some(front) = this.write_buf.front() {
			let bytes = front.as_bytes();

			// `cursor` survives across polls, so a write that returned
			// `Pending` resumes where it left off.
			while this.cursor < bytes.len() {
				match Pin::new(&mut this.inner).poll_write(cx, &bytes[this.cursor..]) {
					// zero bytes accepted means the stream is closed
					Poll::Ready(Ok(0)) => return Poll::Ready(Err(MessageTransportError::Closed)),
					Poll::Ready(Ok(written)) => this.cursor += written,
					Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
					Poll::Pending => return Poll::Pending,
				}
			}

			// message fully written: drop it and start the next one at offset 0
			this.write_buf.pop_front();
			this.cursor = 0;
		}

		Pin::new(&mut this.inner)
			.poll_flush(cx)
			.map(|res| res.map_err(MessageTransportError::from))
	}

	/// Flushes any queued messages, then closes the wrapped stream.
	fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
		// closing without flushing would silently drop queued messages
		match <Self as Sink<Message>>::poll_flush(self.as_mut(), cx) {
			Poll::Ready(Ok(())) => {},
			unfinished => return unfinished,
		}

		Pin::new(&mut self.inner)
			.poll_close(cx)
			.map(|res| res.map_err(MessageTransportError::from))
	}
}

// DEPRECATED THINGS
/// Future returned by the deprecated `MessageTransport::read_message`;
/// resolves to the next message read from the parent transport's stream.
pub struct MessageTransportReadFuture<'a, R>
where R: AsyncRead + AsyncWrite + Unpin {
	/// Bytes read so far for the message being assembled. This buffer is
	/// owned by the future, separate from the parent's `read_buf`.
	buf: Vec<u8>,
	/// Transport whose wrapped stream is read from.
	parent: &'a mut MessageTransport<R>,
}

impl<'a, R> MessageTransportReadFuture<'a, R>
where R: AsyncRead + AsyncWrite + Unpin {
	/// Creates a read future over `t` with an empty line buffer.
	pub fn new(t: &'a mut MessageTransport<R>) -> Self {
		let buf = Vec::with_capacity(512);

		Self { buf, parent: t }
	}
}

impl<'a, R> Future for MessageTransportReadFuture<'a, R>
where R: AsyncRead + AsyncWrite + Unpin {
	type Output = Result<Message, MessageTransportError>;

	/// Reads one byte at a time from the parent's stream until the buffered
	/// bytes end with `IRC_MESSAGE_SEP`, then parses the line (separator
	/// removed, decoded lossily as UTF-8) into a `Message`.
	///
	/// Resolves to `MessageTransportError::Closed` on a zero-byte read.
	fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
		let this = self.get_mut();
		let sep = IRC_MESSAGE_SEP.as_bytes();

		loop {
			if this.buf.ends_with(sep) {
				break;
			}

			// one-byte scratch buffer, so nothing past the separator is consumed
			let mut slot = [0u8; 1];

			let count = match Pin::new(&mut this.parent.inner).poll_read(cx, &mut slot) {
				Poll::Pending => return Poll::Pending,
				Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
				Poll::Ready(Ok(count)) => count,
			};

			// a zero-byte read means the stream is closed
			if count == 0 {
				return Poll::Ready(Err(MessageTransportError::Closed));
			}

			this.buf.push(slot[0]);
		}

		// the buffer ends with the separator, so this cannot underflow
		let body_len = this.buf.len() - sep.len();
		let lossy = String::from_utf8_lossy(&this.buf[..body_len]);

		Poll::Ready(Message::parse(&lossy).map_err(MessageTransportError::Parse))
	}
}

/// Future returned by the deprecated `MessageTransport::write_message`;
/// writes one serialized message to the parent transport's stream and
/// flushes it.
pub struct MessageTransportWriteFuture<'a, R>
where R: AsyncRead + AsyncWrite + Unpin {
	/// Serialized message, including the trailing line separator.
	message: String,
	/// Number of bytes of `message` already written.
	cursor: usize,
	/// Transport whose wrapped stream is written to.
	parent: &'a mut MessageTransport<R>,
}

impl<'a, R> MessageTransportWriteFuture<'a, R>
where R: AsyncRead + AsyncWrite + Unpin {
	pub fn new(t: &'a mut MessageTransport<R>, m: Message) -> MessageTransportWriteFuture<'a, R> {
		MessageTransportWriteFuture {
			message: m.to_string() + "\r\n",
			cursor: 0,
			parent: t,
		}
	}
}

impl<'a, R> Future for MessageTransportWriteFuture<'a, R>
where R: AsyncRead + AsyncWrite + Unpin {
	type Output = Result<(), MessageTransportError>;

	/// Writes the remaining bytes of the serialized message to the parent's
	/// stream, then flushes the stream.
	///
	/// Each `poll_write` call is handed the whole unwritten remainder rather
	/// than a single byte, and the cursor advances by however many bytes the
	/// stream accepted.
	///
	/// Resolves to `MessageTransportError::Closed` if the stream accepts
	/// zero bytes, or `MessageTransportError::Io` on an I/O error.
	fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
		let this = self.get_mut();
		let bytes = this.message.as_bytes();

		// `cursor` survives across polls, so a write that returned `Pending`
		// resumes where it left off.
		while this.cursor < bytes.len() {
			match Pin::new(&mut this.parent.inner).poll_write(cx, &bytes[this.cursor..]) {
				// zero bytes accepted means the stream is closed
				Poll::Ready(Ok(0)) => return Poll::Ready(Err(MessageTransportError::Closed)),
				Poll::Ready(Ok(written)) => this.cursor += written,
				Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
				Poll::Pending => return Poll::Pending,
			}
		}

		// everything is written; push it through the stream
		Pin::new(&mut this.parent.inner)
			.poll_flush(cx)
			.map(|res| res.map_err(MessageTransportError::from))
	}
}