1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343
use futures::{Future, Async, Poll};
use {ReadState, WriteState, read_length, read, Error, Connection, error, Parameters};
use std::str::from_utf8;
use byteorder::{BigEndian, ByteOrder};
use std::fmt;
use msg::*;
use tokio_io::io::flush;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Delay;
/// A result row, implementing an iterator on fields. Each field is a
/// slice of bytes.
///
/// Iterating yields the fields in the order they appear in the message
/// body. A field whose length prefix is negative (the way the PostgreSQL
/// wire protocol marks SQL NULL) is yielded as an empty slice.
#[derive(Clone)]
pub struct Row<'a> {
// Field count announced in the first two bytes of the message body.
len: u16,
// Body of the data-row message, including the leading 2-byte field count.
buf: &'a [u8],
// Number of fields with a non-negative length yielded so far.
n: usize,
// Byte offset in `buf` of the next field's 4-byte length prefix.
pos: usize,
}
impl<'a> fmt::Debug for Row<'a> {
    /// Prints the fields that have not been consumed yet as a list, each
    /// one shown as the result of decoding it as UTF-8 (`Ok(text)` or
    /// `Err(..)` for fields that are not valid UTF-8).
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Iterate over a clone so that formatting never advances the
        // cursor of the row being printed.
        let decoded_fields = self.clone().map(from_utf8);
        f.debug_list().entries(decoded_fields).finish()
    }
}
impl<'a> Row<'a> {
    /// Number of fields in this row.
    ///
    /// This is the count stored when the row was created; it does not
    /// decrease as the row is iterated.
    pub fn len(&self) -> usize {
        self.len as usize
    }

    /// Returns `true` if this row has no fields at all.
    ///
    /// Provided alongside `len` so callers can test for emptiness without
    /// comparing against zero themselves.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }
}
impl<'a> Iterator for Row<'a> {
    type Item = &'a [u8];

    /// Returns the next field of the row, or `None` once the buffer is
    /// exhausted.
    ///
    /// Each field is encoded as a 4-byte big-endian signed length followed
    /// by that many bytes. A negative length carries no payload and is
    /// yielded as an empty slice.
    ///
    /// Malformed input (fewer than four bytes left for the length prefix,
    /// or a declared length that runs past the end of the buffer) ends the
    /// iteration by returning `None` instead of panicking, since the bytes
    /// come straight from the network.
    fn next(&mut self) -> Option<Self::Item> {
        // Copy the reference out so the returned slices borrow for `'a`
        // rather than for the duration of this `&mut self` call.
        let buf: &'a [u8] = self.buf;
        let start = self.pos;

        // The length prefix must fit entirely inside the buffer.
        let header_end = match start.checked_add(4) {
            Some(end) if end <= buf.len() => end,
            _ => return None,
        };
        let declared = BigEndian::read_i32(&buf[start..header_end]);

        if declared < 0 {
            // Negative length: no payload follows, only skip the prefix.
            // `n` is intentionally left untouched, as it was before.
            self.pos = header_end;
            return Some(&buf[start..start]);
        }

        // `checked_add` guards against overflow on 32-bit targets, and the
        // bound check guards against a length that lies about the payload.
        let field_end = match header_end.checked_add(declared as usize) {
            Some(end) if end <= buf.len() => end,
            _ => return None,
        };
        self.pos = field_end;
        self.n += 1;
        Some(&buf[header_end..field_end])
    }
}
/// Result rows, returned by the `query` method of
/// `Connection`.
///
/// This is a future: polling it drives the connection, feeds each
/// incoming row to the handler `F`, and finally resolves to the
/// connection and handler together with an optional error.
pub struct Rows<W: AsyncRead + AsyncWrite, F: HandleRow, P: AsRef<Parameters>> {
/// The connection being driven and the row handler. Taken out (leaving
/// `None`) when the future resolves.
#[doc(hidden)]
pub connection: Option<(Connection<W, P>, F)>,
/// Optional deadline. It is only polled while the connection has nothing
/// ready, and it is dropped as soon as the connection makes progress.
#[doc(hidden)]
pub timeout: Option<Delay>,
/// Set once the handler's `row` method has returned `false`; from then on
/// remaining data rows are no longer passed to the handler.
#[doc(hidden)]
pub last_row_was_halted: bool,
}
/// Result from a request.
///
/// NOTE(review): nothing in the visible part of this file constructs or
/// consumes this type (the only mention is in commented-out code); confirm
/// whether it is still part of the public API.
#[derive(Debug)]
pub enum RowResult<'a> {
/// An actual row.
Row(Row<'a>),
/// A completion signal, with the number of rows modified.
Complete(u32),
/// A termination signal, sent when there are no more rows.
NoMoreRows,
/// An error, carrying its message text (presumably the message reported
/// by the server; confirm against the code that builds this variant).
Err(&'a str),
}
/// Requests that can handle a resulting row.
///
/// Every method has a default implementation, so an implementor only
/// overrides the callbacks it cares about.
// The default method bodies ignore their arguments, hence the `allow`.
#[allow(unused_variables)]
pub trait HandleRow {
/// Called on each row.
///
/// Return `true` to keep receiving rows, or `false` to halt: once a
/// handler has returned `false`, later data rows of the same request are
/// discarded without being passed to it. The default accepts every row.
fn row(&mut self, row: Row) -> bool { true }
/// Called at the end of the request, with the number of modified rows.
///
/// The count is parsed from the command-completion tag; when the tag
/// carries no parsable count this callback is not invoked. The default
/// only logs the count at debug level.
fn complete<'a>(&mut self, modified: u32) {
debug!("complete, modified {:?}", modified);
}
/// Called when the server is ready for the next request.
///
/// The default only logs at debug level.
fn ready_for_query<'a>(&mut self) {
debug!("ready_for_query");
}
/// This function gets called when the server returns an error.
///
/// NOTE(review): `poll_rows` in this file reports server errors through
/// its `Err` return value and does not call this method; confirm whether
/// another caller invokes it. The default only logs at debug level.
fn err(&mut self, err: &str) {
debug!("err: {:?}", err);
}
}
/// Any `Fn(Row) -> bool` can be used directly as a row handler: it is
/// called for each row and its return value decides whether to continue.
/// The other `HandleRow` callbacks keep their default implementations.
impl<F> HandleRow for F
where
    F: Fn(Row) -> bool,
{
    fn row(&mut self, row: Row) -> bool {
        // A shared borrow is enough to call an `Fn`.
        let callback: &F = self;
        callback(row)
    }
}
/// Outcome of handling one message while polling for rows.
#[derive(PartialEq, Eq)]
enum RowPoll {
// The last row was halted: the handler's `row` callback returned `false`.
Halted,
// All rows have been read (and possibly discarded after a halt): the
// server reported that it is ready for the next query.
Ready,
// There are still rows to be read. Also reported after a command
// completion and after any message that is not handled specially.
NotReady,
}
impl<W: AsyncWrite + AsyncRead, F: HandleRow, P: AsRef<Parameters>> Future for Rows<W, F, P> {
type Item = ((Connection<W, P>, F), Option<Error>);
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
debug!("rows poll");
let mut was_err = None;
let mut async_not_ready = false;
let ready = if let Some((ref mut c, ref mut f)) = self.connection {
match c.poll_rows(self.last_row_was_halted, f) {
Ok(Async::Ready(RowPoll::Halted)) => {
self.last_row_was_halted = true;
false
}
Ok(Async::Ready(RowPoll::Ready)) => true,
Ok(Async::Ready(RowPoll::NotReady)) => false,
Ok(Async::NotReady) => { async_not_ready = true; false },
Err(e) => {
error!("{:?}", e);
was_err = Some(e);
true
}
}
} else {
unreachable!()
};
debug!("rows: {:?} {:?} {:?}", ready, was_err, async_not_ready);
if ready {
return Ok(Async::Ready((self.connection.take().unwrap(), was_err)));
}
if async_not_ready {
if let Some(ref mut timeout) = self.timeout {
if let Ok(Async::Ready(())) = timeout.poll() {
debug!("Rows timeout fired (file {}, line {})", file!(), line!());
return Ok(Async::Ready((self.connection.take().unwrap(), Some(Error::Timeout))))
}
}
return Ok(Async::NotReady)
} else {
// We've received something, the timeout can be cancelled.
self.timeout = None
}
}
}
}
impl<W: AsyncRead + AsyncWrite, P: AsRef<Parameters>> Connection<W, P> {
fn poll_rows<R: HandleRow>(&mut self,
last_row_was_halted: bool,
f: &mut R)
-> Poll<RowPoll, Error> {
loop {
match self.write.take() {
Some(WriteState::Write { mut w }) => {
match try!(w.poll()) {
Async::Ready((stream, buf)) => {
self.write = Some(WriteState::Flush {
flush: flush(stream),
buf
});
},
Async::NotReady => {
self.write = Some(WriteState::Write { w });
return Ok(Async::NotReady);
}
}
}
Some(WriteState::Flush { mut flush, buf }) => {
match try!(flush.poll()) {
Async::Ready(w) => {
self.write = Some(WriteState::Idle { w, buf });
break
},
Async::NotReady => {
self.write = Some(WriteState::Flush { flush, buf });
return Ok(Async::NotReady);
}
}
}
Some(state) => {
self.write = Some(state);
break
},
None => {}
}
}
// If we're done writing, start reading.
loop {
match self.read.take() {
Some(ReadState::ReadLength(mut r)) => {
match try!(r.poll()) {
Async::Ready((w, buf)) => {
self.read = Some(read(w, buf))
}
Async::NotReady => {
self.read = Some(ReadState::ReadLength(r));
return Ok(Async::NotReady)
}
}
}
Some(ReadState::Read { msg, mut body }) => {
let (stream, buf) = match try!(body.poll()) {
Async::Ready((w, buf)) => (w, buf),
Async::NotReady => {
self.read = Some(ReadState::Read {
msg: msg,
body: body,
});
return Ok(Async::NotReady);
}
};
match msg {
/*MSG_ROW_DESCRIPTION => {
// This message type only happens when we send a "quick query".
{
let mut reader = reader(&buf[..]);
let n_fields = try!(reader.read_i16());
debug!("n_fields: {:?}", n_fields);
self.columns.clear();
for _ in 0..n_fields {
let name = try!(reader.read_string());
let col = Column {
table_id: try!(reader.read_i32()),
column_num: try!(reader.read_i16()),
data_type: try!(reader.read_i32()),
data_size: try!(reader.read_i16()),
type_modifier: try!(reader.read_i32()),
format: try!(reader.read_i16())
};
debug!("name {:?} {:?}", name, col);
self.columns.push(col);
}
}
self.state = Some(read_length(stream, buf));
Ok(Async::Ready(false))
}*/
MSG_DATA_ROW if !last_row_was_halted => {
debug!("data row");
let query_finished = {
let len = BigEndian::read_i16(&buf[..]);
let row = Row {
len: len as u16,
buf: &buf[..],
n: 0,
pos: 2,
};
if f.row(row) {
RowPoll::NotReady
} else {
RowPoll::Halted
}
};
self.read = Some(read_length(stream, buf));
return Ok(Async::Ready(query_finished))
}
MSG_COMMAND_COMPLETE => {
debug!("command_complete, buf = {:?}", buf);
// self.columns.clear();
// f.handle_row(RowResult::NoMoreRows);
{
if let Some(i) = (&buf[..]).iter().position(|&x| x == 0) {
let buf = &buf[..i];
let mut it = buf.split(|x| *x == b' ');
match (it.next(), it.next(), it.next()) {
(Some(b"TABLE"), _, _) => {
f.complete(0);
}
(Some(b"INSERT"), _, Some(rows)) |
(Some(_), Some(rows), _) => {
debug!("rows: {:?}", from_utf8(rows));
let rows:Result<u32, Error> =
from_utf8(rows).map_err(From::from)
.and_then(|x| x.parse::<u32>().map_err(From::from));
if let Ok(rows) = rows {
// debug!("{:?} {:?}", rows, parsed);
f.complete(rows);
}
}
_ => {
debug!("command complete: {:?}", buf);
}
}
} else {
return Err(Error::Protocol);
}
}
self.read = Some(read_length(stream, buf));
return Ok(Async::Ready(RowPoll::NotReady))
}
MSG_READY_FOR_QUERY => {
// self.columns.clear();
f.ready_for_query();
self.read = Some(read_length(stream, buf));
return Ok(Async::Ready(RowPoll::Ready))
}
MSG_ERROR => {
let mut into = String::new();
try!(error(&buf, &mut into));
self.read = Some(read_length(stream, buf));
return Err(Error::Postgres { message: into })
}
msg => {
debug!("unknown message {:?}", msg as char);
self.read = Some(read_length(stream, buf));
return Ok(Async::Ready(RowPoll::NotReady))
}
}
}
None => unreachable!()
}
}
}
}