use byteorder::{BigEndian, ByteOrder};
use futures::{Async, Future, Poll};
use msg::*;
use std::fmt;
use std::str::from_utf8;
use tokio_io::io::flush;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Delay;
use {error, read, read_length, Connection, Error, Parameters, ReadState, WriteState};
#[derive(Clone)]
pub struct Row<'a> {
len: u16,
buf: &'a [u8],
n: usize,
pos: usize,
}
impl<'a> fmt::Debug for Row<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let v: Vec<_> = self.clone().map(|x| from_utf8(x)).collect();
v.fmt(f)
}
}
impl<'a> Row<'a> {
pub fn len(&self) -> usize {
self.len as usize
}
}
impl<'a> Iterator for Row<'a> {
type Item = &'a [u8];
fn next(&mut self) -> Option<Self::Item> {
if self.pos >= self.buf.len() {
None
} else {
let pos = self.pos;
let len = BigEndian::read_i32(&self.buf[pos..]);
if len >= 0 {
let len = len as usize;
let result = &self.buf[pos + 4..pos + 4 + len];
self.pos += 4 + len;
self.n += 1;
Some(result)
} else {
self.pos += 4;
Some(&self.buf[pos..pos])
}
}
}
}
pub struct Rows<W: AsyncRead + AsyncWrite, F: HandleRow, P: AsRef<Parameters>> {
#[doc(hidden)]
pub connection: Option<(Connection<W, P>, F)>,
#[doc(hidden)]
pub timeout: Option<Delay>,
#[doc(hidden)]
pub last_row_was_halted: bool,
}
#[derive(Debug)]
pub enum RowResult<'a> {
Row(Row<'a>),
Complete(u32),
NoMoreRows,
Err(&'a str),
}
#[allow(unused_variables)]
pub trait HandleRow {
fn row(&mut self, row: Row) -> bool {
true
}
fn complete<'a>(&mut self, modified: u32) {
debug!("complete, modified {:?}", modified);
}
fn ready_for_query<'a>(&mut self) {
debug!("ready_for_query");
}
fn err(&mut self, err: &str) {
debug!("err: {:?}", err);
}
}
impl<F: Fn(Row) -> bool> HandleRow for F {
fn row(&mut self, row: Row) -> bool {
(self)(row)
}
}
#[derive(PartialEq, Eq)]
enum RowPoll {
Halted,
Ready,
NotReady,
}
impl<W: AsyncWrite + AsyncRead, F: HandleRow, P: AsRef<Parameters>> Future for Rows<W, F, P> {
type Item = ((Connection<W, P>, F), Option<Error>);
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
debug!("rows poll");
let mut was_err = None;
let mut async_not_ready = false;
let ready = if let Some((ref mut c, ref mut f)) = self.connection {
match c.poll_rows(self.last_row_was_halted, f) {
Ok(Async::Ready(RowPoll::Halted)) => {
self.last_row_was_halted = true;
false
}
Ok(Async::Ready(RowPoll::Ready)) => true,
Ok(Async::Ready(RowPoll::NotReady)) => false,
Ok(Async::NotReady) => {
async_not_ready = true;
false
}
Err(e) => {
error!("{:?}", e);
was_err = Some(e);
true
}
}
} else {
unreachable!()
};
debug!("rows: {:?} {:?} {:?}", ready, was_err, async_not_ready);
if ready {
return Ok(Async::Ready((self.connection.take().unwrap(), was_err)));
}
if async_not_ready {
if let Some(ref mut timeout) = self.timeout {
if let Ok(Async::Ready(())) = timeout.poll() {
debug!("Rows timeout fired (file {}, line {})", file!(), line!());
return Ok(Async::Ready((
self.connection.take().unwrap(),
Some(Error::Timeout),
)));
}
}
return Ok(Async::NotReady);
} else {
self.timeout = None
}
}
}
}
impl<W: AsyncRead + AsyncWrite, P: AsRef<Parameters>> Connection<W, P> {
fn poll_rows<R: HandleRow>(
&mut self,
last_row_was_halted: bool,
f: &mut R,
) -> Poll<RowPoll, Error> {
loop {
match self.write.take() {
Some(WriteState::Write { mut w }) => match try!(w.poll()) {
Async::Ready((stream, buf)) => {
self.write = Some(WriteState::Flush {
flush: flush(stream),
buf,
});
}
Async::NotReady => {
self.write = Some(WriteState::Write { w });
return Ok(Async::NotReady);
}
},
Some(WriteState::Flush { mut flush, buf }) => match try!(flush.poll()) {
Async::Ready(w) => {
self.write = Some(WriteState::Idle { w, buf });
break;
}
Async::NotReady => {
self.write = Some(WriteState::Flush { flush, buf });
return Ok(Async::NotReady);
}
},
Some(state) => {
self.write = Some(state);
break;
}
None => {}
}
}
loop {
match self.read.take() {
Some(ReadState::ReadLength(mut r)) => match try!(r.poll()) {
Async::Ready((w, buf)) => self.read = Some(read(w, buf)),
Async::NotReady => {
self.read = Some(ReadState::ReadLength(r));
return Ok(Async::NotReady);
}
},
Some(ReadState::Read { msg, mut body }) => {
let (stream, buf) = match try!(body.poll()) {
Async::Ready((w, buf)) => (w, buf),
Async::NotReady => {
self.read = Some(ReadState::Read {
msg: msg,
body: body,
});
return Ok(Async::NotReady);
}
};
match msg {
MSG_DATA_ROW if !last_row_was_halted => {
debug!("data row");
let query_finished = {
let len = BigEndian::read_i16(&buf[..]);
let row = Row {
len: len as u16,
buf: &buf[..],
n: 0,
pos: 2,
};
if f.row(row) {
RowPoll::NotReady
} else {
RowPoll::Halted
}
};
self.read = Some(read_length(stream, buf));
return Ok(Async::Ready(query_finished));
}
MSG_COMMAND_COMPLETE => {
debug!("command_complete, buf = {:?}", buf);
{
if let Some(i) = (&buf[..]).iter().position(|&x| x == 0) {
let buf = &buf[..i];
let mut it = buf.split(|x| *x == b' ');
match (it.next(), it.next(), it.next()) {
(Some(b"TABLE"), _, _) => {
f.complete(0);
}
(Some(b"INSERT"), _, Some(rows))
| (Some(_), Some(rows), _) => {
debug!("rows: {:?}", from_utf8(rows));
let rows: Result<u32, Error> = from_utf8(rows)
.map_err(From::from)
.and_then(|x| x.parse::<u32>().map_err(From::from));
if let Ok(rows) = rows {
f.complete(rows);
}
}
_ => {
debug!("command complete: {:?}", buf);
}
}
} else {
return Err(Error::Protocol);
}
}
self.read = Some(read_length(stream, buf));
return Ok(Async::Ready(RowPoll::NotReady));
}
MSG_READY_FOR_QUERY => {
f.ready_for_query();
self.ready_for_query = true;
self.read = Some(read_length(stream, buf));
return Ok(Async::Ready(RowPoll::Ready));
}
MSG_ERROR => {
let code = error(&buf)?;
self.read = Some(read_length(stream, buf));
return Err(Error::Postgres(code));
}
msg => {
debug!("unknown message {:?}", msg as char);
self.read = Some(read_length(stream, buf));
return Ok(Async::Ready(RowPoll::NotReady));
}
}
}
None => unreachable!(),
}
}
}
}