use std::io;
use std::io::{Read, Write};
use std::io::ErrorKind::WouldBlock;
use std::io::Cursor;
use std::mem;
use queen_io::tcp::TcpStream;
use queen_io::Poll;
use queen_io::Token;
use queen_io::Ready;
use queen_io::PollOpt;
use queen_io::Evented;
use queen_io::channel::Sender;
use byteorder::{LittleEndian, ReadBytesExt};
use wire_protocol::message::Message;
use wire_protocol::header::Header;
use super::bufstream::Stream;
use super::service::ServiceEvent;
/// Per-peer connection state: the socket, its poll registration data and
/// the byte buffers that sit between the socket and the message layer.
pub struct Connection {
    /// TCP socket that `reader` reads from and `writer` writes to.
    socket: TcpStream,
    /// Token used when (re-)registering the socket with the poll; it is also
    /// attached to every `ServiceEvent::Message` emitted for this connection.
    token: Token,
    /// Readiness set requested on the next (re-)registration.
    interest: Ready,
    /// Buffered inbound (`stream.reader`) and outbound (`stream.writer`) bytes.
    stream: Stream,
}
impl Connection {
pub fn new(socket: TcpStream, token: Token) -> io::Result<Connection> {
let conn = Connection {
socket: socket,
token: token,
interest: Ready::readable() | Ready::hup(),
stream: Stream::new(),
};
Ok(conn)
}
pub fn reader(&mut self, poll: &Poll, tx_out: &Sender<ServiceEvent>) -> io::Result<()> {
self.interest.remove(Ready::readable());
loop {
let mut buf = [0; 8 * 1024];
match self.socket.read(&mut buf) {
Ok(size) => {
if size == 0 {
return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "ConnectionAborted"))
} else {
self.stream.reader.extend_from_slice(&buf[0..size]);
}
}
Err(err) => {
if let WouldBlock = err.kind() {
break;
} else {
return Err(err)
}
}
}
if self.stream.reader.len() > 32 * 1024 * 1024 {
break;
}
}
self.send_message(tx_out)?;
if self.stream.reader.len() < 16 * 1024 * 1024 {
self.interest.insert(Ready::readable());
}
if self.stream.writer.len() > 0 {
self.interest.insert(Ready::writable());
}
self.reregister_insterest(poll);
Ok(())
}
pub fn writer(&mut self, poll: &Poll) -> io::Result<()>{
self.interest.remove(Ready::writable());
loop {
match self.socket.write(&self.stream.writer) {
Ok(size) => {
if size == 0 {
return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "ConnectionAborted"))
} else {
if size < self.stream.writer.len() {
self.stream.writer = self.stream.writer.split_off(size);
} else {
self.stream.writer.clear();
break;
}
}
}
Err(err) => {
if let WouldBlock = err.kind() {
break;
} else {
return Err(err)
}
}
}
}
if self.stream.reader.len() < 16 * 1024 * 1024 {
self.interest.insert(Ready::readable());
}
if self.stream.writer.len() > 0 {
self.interest.insert(Ready::writable());
}
self.reregister_insterest(poll);
Ok(())
}
pub fn register_insterest(&self, poll: &Poll) {
self.register(
poll,
self.token,
self.interest,
PollOpt::edge() | PollOpt::oneshot()
).unwrap();
}
pub fn reregister_insterest(&mut self, poll: &Poll) {
self.reregister(
poll,
self.token,
self.interest,
PollOpt::edge() | PollOpt::oneshot()
).unwrap();
}
pub fn send_message(&mut self, tx_out: &Sender<ServiceEvent>) -> io::Result<()> {
loop {
if self.stream.reader.len() < mem::size_of::<Header>() {
return Ok(())
}
let message_length = {
Cursor::new(&self.stream.reader).read_i32::<LittleEndian>()? as usize
};
if self.stream.reader.len() < message_length {
return Ok(())
}
let (message, position) = {
let mut cursor = Cursor::new(&self.stream.reader);
let message = match Message::read(&mut cursor)? {
Some(message) => message,
None => return Ok(()),
};
(message, cursor.position())
};
self.stream.reader = self.stream.reader.split_off(position as usize);
let _ = tx_out.send(ServiceEvent::Message(self.token.into(), message));
}
}
pub fn recv_message(&mut self, poll: &Poll, message: Message) -> io::Result<()> {
message.write(&mut self.stream.writer)?;
if self.stream.reader.len() < 16 * 1024 * 1024 {
self.interest.insert(Ready::readable());
}
if self.stream.writer.len() > 0 {
self.interest.insert(Ready::writable());
}
self.reregister_insterest(poll);
Ok(())
}
}
/// `Evented` for a connection simply forwards every call to the wrapped
/// socket, so the connection itself can be handed to the poll APIs.
impl Evented for Connection {
    /// Registers the underlying socket with `poll`.
    fn register(
        &self,
        poll: &Poll,
        token: Token,
        interest: Ready,
        opts: PollOpt,
    ) -> io::Result<()> {
        self.socket.register(poll, token, interest, opts)
    }

    /// Re-registers the underlying socket with `poll`.
    fn reregister(
        &self,
        poll: &Poll,
        token: Token,
        interest: Ready,
        opts: PollOpt,
    ) -> io::Result<()> {
        self.socket.reregister(poll, token, interest, opts)
    }

    /// Removes the underlying socket from `poll`.
    fn deregister(&self, poll: &Poll) -> io::Result<()> {
        self.socket.deregister(poll)
    }
}