use postgres::types::ToSql;
use std::io::{Error, ErrorKind};
use err::HecateError;
use std::mem;
/// A `std::io::Read` adapter over a PostgreSQL cursor.
///
/// Rows are pulled from the named cursor in batches, the first column of
/// every row is emitted followed by a newline (0x0A), and a single EOT byte
/// (0x04) is emitted once the cursor yields no more rows.
///
/// NOTE: the order of the `trans` and `conn` fields is load-bearing. Rust
/// drops struct fields in declaration order, and `trans` (whose lifetime has
/// been transmuted to `'static` in `PGStream::new`) must be dropped before
/// the connection it was created from.
pub struct PGStream {
// `eot`: set to true once the EOT marker (0x04) has been produced.
// `cursor`: name of the cursor; interpolated into the `FETCH` statement.
eot: bool, cursor: String,
// Bytes that were fetched but did not fit in the caller's buffer on the
// previous `read` call; they are handed out first on the next call.
pending: Option<Vec<u8>>,
// Transaction the cursor is read through. Declared before `conn` so that
// it is dropped first.
trans: postgres::transaction::Transaction<'static>,
// Never read directly; held (boxed) alongside the transaction that was
// created from it.
#[allow(dead_code)]
conn: Box<r2d2::PooledConnection<r2d2_postgres::PostgresConnectionManager>>
}
impl std::io::Read for PGStream {
    /// Fills `buf` with newline-delimited rows fetched from the cursor.
    ///
    /// Each loop iteration obtains one chunk of bytes, in this order of
    /// preference:
    ///
    /// 1. bytes left over from the previous call (`self.pending`),
    /// 2. the next batch of up to 1000 rows from the cursor, with the first
    ///    column of each row followed by `0x0A`,
    /// 3. a single EOT byte (`0x04`) the first time the cursor comes back
    ///    empty.
    ///
    /// Whatever does not fit in `buf` is stored in `self.pending` for the
    /// next call.
    ///
    /// # Returns
    ///
    /// The number of bytes written to `buf`. `Ok(0)` is returned for an empty
    /// `buf` and on every call after the EOT marker has been delivered.
    ///
    /// # Errors
    ///
    /// A failing `FETCH` is reported as `ErrorKind::Other` carrying the
    /// `Debug` rendering of the database error.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let mut current = 0;

        while current < buf.len() {
            // Move the carried-over bytes out instead of cloning them.
            let carried = self.pending.take();

            let mut write: Vec<u8> = match carried {
                Some(bytes) => bytes,
                // The cursor is already exhausted and the EOT marker has been
                // sent: there is nothing left to produce, so skip the
                // database round trip entirely.
                None if self.eot => break,
                None => {
                    // NOTE(review): the cursor name is interpolated into the
                    // statement (identifiers cannot be bound as parameters),
                    // so it must come from a trusted source.
                    let rows = self
                        .trans
                        .query(&*format!("FETCH 1000 FROM {};", &self.cursor), &[])
                        .map_err(|err| Error::new(ErrorKind::Other, format!("{:?}", err)))?;

                    let mut chunk = Vec::new();
                    for row_it in 0..rows.len() {
                        let feat: String = rows.get(row_it).get(0);
                        chunk.extend_from_slice(feat.as_bytes());
                        chunk.push(0x0A);
                    }
                    chunk
                }
            };

            if write.is_empty() {
                // The cursor returned no rows: emit the end-of-transmission
                // marker exactly once.
                write.push(0x04);
                self.eot = true;
            }

            let room = buf.len() - current;
            if write.len() > room {
                // Only part of the chunk fits: fill the buffer and keep the
                // tail for the next call.
                let tail = write.split_off(room);
                buf[current..].copy_from_slice(&write);
                self.pending = Some(tail);
                current = buf.len();
                break;
            }

            buf[current..current + write.len()].copy_from_slice(&write);
            current += write.len();
        }

        Ok(current)
    }
}
impl PGStream {
    /// Opens a transaction on `conn`, executes `query` with `params` inside
    /// it and returns a stream that reads from the cursor named `cursor`.
    ///
    /// `query` is presumably the `DECLARE ... CURSOR` statement that creates
    /// `cursor` (confirm against callers); this function only executes it and
    /// remembers the cursor name for later `FETCH` statements.
    ///
    /// # Errors
    ///
    /// Returns `HecateError::from_db(..)` if the transaction cannot be
    /// started or if `query` fails to execute.
    pub fn new(conn: r2d2::PooledConnection<r2d2_postgres::PostgresConnectionManager>, cursor: String, query: String, params: &[&ToSql]) -> Result<Self, HecateError> {
        // Boxed so the connection keeps a stable heap address when the
        // `PGStream` that owns it is moved.
        let pg_conn = Box::new(conn);

        let trans: postgres::transaction::Transaction<'static> = match pg_conn.transaction() {
            // SAFETY: the transaction borrows the connection behind `pg_conn`.
            // That connection lives on the heap and is stored in the same
            // `PGStream` as the transaction, in a field declared *after*
            // `trans`, so the transaction is always dropped first and never
            // outlives the connection. On the error path below the locals are
            // dropped in reverse declaration order (`trans`, then `pg_conn`),
            // which preserves the same guarantee.
            Ok(trans) => unsafe { mem::transmute(trans) },
            // Surface the failure to the caller instead of panicking.
            Err(err) => return Err(HecateError::from_db(err)),
        };

        match trans.execute(&*query, params) {
            Ok(_) => Ok(PGStream {
                eot: false,
                cursor,
                pending: None,
                trans,
                conn: pg_conn,
            }),
            Err(err) => Err(HecateError::from_db(err)),
        }
    }
}