#![allow(uncommon_codepoints)]
#[cfg(test)] #[macro_use] extern crate lazy_static;
#[cfg(test)] #[macro_use] extern crate serde_derive;
#[macro_use] extern crate gstuff;
extern crate atomic;
extern crate chrono;
extern crate futures;
extern crate inlinable_string;
extern crate itertools;
extern crate libc;
extern crate nix;
extern crate serde_json;
extern crate serde;
use atomic::Atomic;
use futures::{Future, Poll, Async};
use futures::task::{current, Task};
use gstuff::now_float;
use inlinable_string::InlinableString;
use libc::{c_char, c_int, c_void, size_t};
use nix::fcntl::OFlag;
use nix::poll::{self, PollFlags, PollFd};
use serde_json::{self as json, Value as Json};
use serde::de::DeserializeOwned;
use std::cmp::max;
use std::collections::VecDeque;
use std::convert::From;
use std::error::Error;
use std::ffi::{CString, CStr};
use std::fmt;
use std::io::{self, Write};
use std::mem::MaybeUninit;
use std::os::fd::{BorrowedFd, IntoRawFd};
use std::ptr::null_mut;
use std::slice::from_raw_parts;
use std::str::{from_utf8_unchecked, from_utf8, Utf8Error};
use std::sync::{Arc, Mutex, PoisonError};
use std::sync::atomic::{Ordering};
use std::sync::mpsc::{channel, Sender, Receiver, SendError, TryRecvError};
use std::thread::JoinHandle;
#[cfg(test)] mod tests;
#[derive(Debug)]
pub enum PgQueryPiece {
Static (&'static str),
Plain (String),
Literal (String),
InlLiteral (InlinableString),
Bytea (Vec<u8>)}
#[derive (Debug)]
pub enum PgSchedulingMode {
AnythingGoes,
PinToConnection (u8)}
impl Default for PgSchedulingMode {
fn default() -> PgSchedulingMode {
PgSchedulingMode::AnythingGoes}}
#[derive (Default)]
pub struct PgOperation {
pub query_pieces: Vec<PgQueryPiece>,
pub statements: u32,
pub scheduling: PgSchedulingMode,
pub on_escape: Option<Box<dyn Fn(&str) + Send + Sync + 'static>>,
pub timeouts_at: f64}
impl fmt::Debug for PgOperation {
fn fmt (&self, fm: &mut fmt::Formatter) -> fmt::Result {
write! (fm, "PgOperation {{query_pieces: {:?}, statements: {}, scheduling: {:?}, on_escape: {}}}",
self.query_pieces, self.statements, self.scheduling, if self.on_escape.is_some() {"Some"} else {"None"})}}
pub trait IntoQueryPieces {
fn into_query_pieces (self) -> PgOperation;}
impl IntoQueryPieces for String {
fn into_query_pieces (self) -> PgOperation {
PgOperation {statements: 1, query_pieces: vec! [PgQueryPiece::Plain (self)], ..Default::default()}}}
impl IntoQueryPieces for &'static str {
fn into_query_pieces (self) -> PgOperation {
PgOperation {statements: 1, query_pieces: vec! [PgQueryPiece::Static (self)], ..Default::default()}}}
impl IntoQueryPieces for Vec<PgQueryPiece> {
fn into_query_pieces (self) -> PgOperation {
PgOperation {statements: 1, query_pieces: self, ..Default::default()}}}
impl IntoQueryPieces for (u32, &'static str) {
fn into_query_pieces (self) -> PgOperation {
PgOperation {statements: self.0, query_pieces: vec! [PgQueryPiece::Static (self.1)], ..Default::default()}}}
impl IntoQueryPieces for (u32, String) {
fn into_query_pieces (self) -> PgOperation {
PgOperation {statements: self.0, query_pieces: vec! [PgQueryPiece::Plain (self.1)], ..Default::default()}}}
impl IntoQueryPieces for (u32, Vec<PgQueryPiece>) {
fn into_query_pieces (self) -> PgOperation {
PgOperation {statements: self.0, query_pieces: self.1, ..Default::default()}}}
impl IntoQueryPieces for (u32, Vec<PgQueryPiece>, f32) {
fn into_query_pieces (self) -> PgOperation {
PgOperation {statements: self.0, query_pieces: self.1, timeouts_at: now_float() + self.2 as f64, ..Default::default()}}}
impl IntoQueryPieces for PgOperation {
fn into_query_pieces (self) -> PgOperation {self}}
pub struct PgRow<'a> (&'a PgResult, u32);
impl<'a> PgRow<'a> {
pub fn is_null (&self, column: u32) -> bool {
if column > self.0.columns {panic! ("Column index {} is out of range (0..{})", column, self.0.columns)}
1 == unsafe {pq::PQgetisnull ((self.0).res, self.1 as c_int, column as c_int)}}
pub fn ftype (&self, column: u32) -> pq::Oid {
if column > self.0.columns {panic! ("Column index {} is out of range (0..{})", column, self.0.columns)}
unsafe {pq::PQftype ((self.0).res, column as c_int)}}
pub fn fname (&'a self, column: u32) -> Result<&'a str, Utf8Error> {
self.0.fname (column)}
pub fn num (&self) -> u32 {self.1}
pub fn len (&self) -> u32 {self.0.columns}
pub fn col (&self, column: u32) -> &'a [u8] {
if column > self.0.columns {panic! ("Column index {} is out of range (0..{})", column, self.0.columns)}
let len = unsafe {pq::PQgetlength ((self.0).res, self.1 as c_int, column as c_int)};
let val = unsafe {pq::PQgetvalue ((self.0).res, self.1 as c_int, column as c_int)};
unsafe {from_raw_parts (val as *const u8, len as usize)}}
pub fn col_str (&self, column: u32) -> Result<&'a str, std::str::Utf8Error> {
if column > self.0.columns {panic! ("Column index {} is out of range (0..{})", column, self.0.columns)}
unsafe {CStr::from_ptr (pq::PQgetvalue ((self.0).res, self.1 as c_int, column as c_int))} .to_str()}
pub fn bytea (&self, column: u32) -> Vec<u8> {
if column > self.0.columns {panic! ("Column index {} is out of range (0..{})", column, self.0.columns)}
let mut len: size_t = 0;
let mut vec = Vec::new();
unsafe {
let value = pq::PQgetvalue ((self.0).res, self.1 as c_int, column as c_int);
let bytes = pq::PQunescapeBytea (value as *const u8, &mut len);
if bytes != null_mut() {
vec.reserve_exact (len);
vec.extend_from_slice (from_raw_parts (bytes, len));
pq::PQfreemem (bytes as *mut c_void);}}
vec}
pub fn col_json (&self, column: u32, name: &str) -> Result<Json, PgFutureErr> {
Ok (if self.is_null (column) {
Json::Null
} else {
match self.ftype (column) { 16 => Json::Bool (self.col (column) == b"t"), 18 => { let slice = self.col (column);
Json::Number ((if slice.is_empty() {0} else {slice[0]}) .into())},
20 | 21 | 23 => Json::Number ((self.col_str (column) ?.parse::<i64>()?).into()), 25 | 1042 | 1043 | 3614 => Json::String (from_utf8 (self.col (column)) ?.into()), 114 | 3802 => json::from_slice (self.col (column)) ?, 700 | 701 => { let f: f64 = self.col_str (column) ?.parse()?;
Json::Number (json::Number::from_f64 (f) .ok_or ("The float is not a JSON number") ?)},
705 => Json::String (from_utf8 (self.col (column)) ?.into()), 1184 => { let ts = unsafe {from_utf8_unchecked (self.col (column))};
#[allow(invalid_value)] let mut buf: [u8; 128] = unsafe {MaybeUninit::uninit().assume_init()};
let buf = gstring! (buf, {
write! (buf, "{}", ts) .expect ("!write");
if ts.chars().rev().take_while (|ch| ch.is_digit (10)) .count() == 2 {write! (buf, "00") .expect ("!write")}});
let dt = match chrono::DateTime::parse_from_str (buf, "%Y-%m-%d %H:%M:%S%.f%z") {
Ok (dt) => dt,
Err (err) => panic! ("!parse_from_str ({}): {}", buf, err)};
let f = dt.timestamp() as f64 + (dt.timestamp_subsec_millis() as f64 / 1000.0);
Json::Number (json::Number::from_f64 (f) .ok_or ("Timestamp is not a JSON number") ?)},
oid if oid > 16000 => {
let bytes = self.col (column);
if bytes.is_empty() {return Err (PgFutureErr::UnknownType (String::from (name), oid))}
for &ch in bytes.iter() {
if !is_alphabetic (ch) && !is_digit (ch) && ch != b'_' && ch != b'-' && ch != b'.' {
return Err (PgFutureErr::UnknownType (String::from (name), oid))}}
Json::String (unsafe {from_utf8_unchecked (bytes)} .into())},
oid => return Err (PgFutureErr::UnknownType (String::from (name), oid))}})}
pub fn col_deserialize<T: DeserializeOwned> (&self, column: u32, name: &str) -> Result<T, PgFutureErr> {
if !self.is_null (column) {
match self.ftype (column) {
114 | 3802 => return Ok (json::from_slice (self.col (column)) ?), _ => ()}}
Ok (json::from_value (self.col_json (column, name) ?) ?)}
pub fn to_json (&self) -> Result<Json, PgFutureErr> {
let mut jrow: json::Map<String, Json> = json::Map::new();
for column in 0 .. self.0.columns {
let name = self.fname (column) ?;
let jval = self.col_json (column, name) ?;
jrow.insert (String::from (name), jval);}
Ok (Json::Object (jrow))}}
fn is_alphabetic (ch:u8) -> bool {
(ch >= 0x41 && ch <= 0x5A) || (ch >= 0x61 && ch <= 0x7A)}
fn is_digit (ch: u8) -> bool {
ch >= 0x30 && ch <= 0x39}
pub struct PgResult {
pub res: *mut pq::PGresult,
pub rows: u32,
pub columns: u32}
unsafe impl Sync for PgResult {}
unsafe impl Send for PgResult {}
impl PgResult {
pub fn is_empty (&self) -> bool {self.rows == 0}
pub fn len (&self) -> u32 {self.rows}
pub fn ftype (&self, column: u32) -> pq::Oid {
if column > self.columns {panic! ("Column index {} is out of range (0..{})", column, self.columns)}
unsafe {pq::PQftype (self.res, column as c_int)}}
pub fn fname<'a> (&'a self, column: u32) -> Result<&'a str, Utf8Error> {
let name = unsafe {pq::PQfname (self.res, column as c_int)};
if name == null_mut() {panic! ("Column index {} is out of range (0..{})", column, self.columns)}
unsafe {CStr::from_ptr (name)} .to_str()}
pub fn row (&self, row: u32) -> PgRow<'_> {
if row >= self.rows {panic! ("Row index {} is out of range (0..{})", row, self.rows)}
PgRow (self, row)}
pub fn iter<'a> (&'a self) -> PgResultIt<'a> {
PgResultIt {pr: self, row: 0}}
pub fn to_json (&self) -> Result<Json, PgFutureErr> {
let mut jrows: Vec<Json> = Vec::with_capacity (self.len() as usize);
for row in self.iter() {jrows.push (row.to_json()?)}
Ok (Json::Array (jrows))}
pub fn deserialize<T: DeserializeOwned> (&self) -> Result<Vec<T>, PgFutureErr> {
let mut jrows: Vec<T> = Vec::with_capacity (self.len() as usize);
for row in self.iter() {jrows.push (json::from_value (row.to_json()?) ?)}
Ok (jrows)}}
impl Drop for PgResult {
fn drop (&mut self) {
assert! (self.res != null_mut());
unsafe {pq::PQclear (self.res)}}}
impl fmt::Debug for PgResult {
fn fmt (&self, ft: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write! (ft, "PgResult")}}
pub struct PgResultIt<'a> {pr: &'a PgResult, row: u32}
impl<'a> Iterator for PgResultIt<'a> {
type Item = PgRow<'a>;
fn next (&mut self) -> Option<PgRow<'a>> {
if self.row < self.pr.rows {
let row = PgRow (self.pr, self.row);
self.row += 1;
Some (row)
} else {None}}}
struct PgFutureSync {
results: Vec<PgResult>,
task: Option<Task>}
struct PgFutureImpl {
id: u64,
op: PgOperation,
sync: Mutex<PgFutureSync>,
miscarried: Option<Box<PgFutureErr>>}
#[derive(Clone)]
pub struct PgSqlErr {
imp: Arc<PgFutureImpl>,
pub num: u32,
pub status: &'static str,
pub sqlstate: String,
pub message: String}
#[derive(Clone)]
pub enum PgFutureErr {
PoisonError,
SendError (String),
NixError (nix::Error),
Utf8Error (Utf8Error),
Sql (PgSqlErr),
Json (Arc<serde_json::Error>),
Int (std::num::ParseIntError),
Float (std::num::ParseFloatError),
UnknownType (String, pq::Oid),
Str (&'static str)}
impl PgFutureErr {
pub fn pg_timeout (&self) -> bool {
if let &PgFutureErr::Sql (ref pg_sql_err) = self {
if pg_sql_err.sqlstate == "57014" {
return true}}
false}}
impl fmt::Debug for PgFutureErr {
fn fmt (&self, ft: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match self {
&PgFutureErr::PoisonError => ft.write_str ("PgFutureErr::PoisonError"),
&PgFutureErr::SendError (ref err) => write! (ft, "PgFutureErr::SendError ({})", err),
&PgFutureErr::NixError (ref err) => write! (ft, "PgFutureErr::NixError ({:?})", err),
&PgFutureErr::Utf8Error (ref err) => write! (ft, "PgFutureErr::Utf8Error ({:?})", err),
&PgFutureErr::Sql (ref se) => write! (ft, "PgFutureErr ({:?}, {}, {})", se.imp.op.query_pieces, se.sqlstate, se.message),
&PgFutureErr::Json (ref err) => write! (ft, "PgFutureErr::Json ({:?})", err),
&PgFutureErr::Int (ref err) => write! (ft, "PgFutureErr::Int ({:?})", err),
&PgFutureErr::Float (ref err) => write! (ft, "PgFutureErr::Float ({:?})", err),
&PgFutureErr::UnknownType (ref fname, oid) => write! (ft, "PgFutureErr::UnknownType (fname '{}', oid {})", fname, oid),
&PgFutureErr::Str (s) => ft.write_str (s)}}}
impl fmt::Display for PgFutureErr {
fn fmt (&self, ft: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match self {
&PgFutureErr::PoisonError => ft.write_str ("PgFutureErr::PoisonError"),
&PgFutureErr::SendError (ref err) => write! (ft, "PgFutureErr::SendError ({})", err),
&PgFutureErr::NixError (ref err) => write! (ft, "PgFutureErr::NixError ({})", err),
&PgFutureErr::Utf8Error (ref err) => write! (ft, "PgFutureErr::Utf8Error ({})", err),
&PgFutureErr::Sql (ref se) => write! (ft, "{}; {}", se.sqlstate, se.message),
&PgFutureErr::Json (ref err) => write! (ft, "PgFutureErr::Json ({})", err),
&PgFutureErr::Int (ref err) => write! (ft, "PgFutureErr::Int ({})", err),
&PgFutureErr::Float (ref err) => write! (ft, "PgFutureErr::Float ({})", err),
&PgFutureErr::UnknownType (ref fname, oid) => write! (ft, "Column '{}' has unfamiliar OID {}", fname, oid),
&PgFutureErr::Str (s) => ft.write_str (s)}}}
impl Error for PgFutureErr {
fn description (&self) -> &str {
match self {
&PgFutureErr::PoisonError => "PgFutureErr::PoisonError",
&PgFutureErr::SendError (_) => "PgFutureErr::SendError",
&PgFutureErr::NixError (_) => "PgFutureErr::NixError",
&PgFutureErr::Utf8Error (_) => "PgFutureErr::Utf8Error",
&PgFutureErr::Sql (ref se) => &se.message[..],
&PgFutureErr::Json (_) => "PgFutureErr::Json",
&PgFutureErr::Int (_) => "PgFutureErr::Int",
&PgFutureErr::Float (_) => "PgFutureErr::Float",
&PgFutureErr::UnknownType (..) => "PgFutureErr::UnknownType",
&PgFutureErr::Str (s) => s}}}
impl<T> From<PoisonError<T>> for PgFutureErr {
fn from (_err: PoisonError<T>) -> PgFutureErr {
PgFutureErr::PoisonError}}
impl From<Utf8Error> for PgFutureErr {
fn from (err: Utf8Error) -> PgFutureErr {
PgFutureErr::Utf8Error (err)}}
impl From<serde_json::Error> for PgFutureErr {
fn from (err: serde_json::Error) -> PgFutureErr {
PgFutureErr::Json (Arc::new (err))}}
impl From<std::num::ParseIntError> for PgFutureErr {
fn from (err: std::num::ParseIntError) -> PgFutureErr {
PgFutureErr::Int (err)}}
impl From<std::num::ParseFloatError> for PgFutureErr {
fn from (err: std::num::ParseFloatError) -> PgFutureErr {
PgFutureErr::Float (err)}}
impl<T> From<SendError<T>> for PgFutureErr {
fn from (err: SendError<T>) -> PgFutureErr {
PgFutureErr::SendError (format! ("{}", err))}}
impl From<nix::Error> for PgFutureErr {
fn from (err: nix::Error) -> PgFutureErr {
PgFutureErr::NixError (err)}}
impl From<&'static str> for PgFutureErr {
fn from (err: &'static str) -> PgFutureErr {
PgFutureErr::Str (err)}}
#[derive(Clone)]
pub struct PgFuture (Arc<PgFutureImpl>);
impl PgFuture {
fn new (id: u64, op: PgOperation) -> PgFuture {
PgFuture (Arc::new (PgFutureImpl {
id: id,
op: op,
sync: Mutex::new (PgFutureSync {
results: Vec::new(),
task: None}),
miscarried: None}))}
fn error (err: PgFutureErr) -> PgFuture {
PgFuture (Arc::new (PgFutureImpl {
id: 0,
op: PgOperation::default(),
sync: Mutex::new (PgFutureSync {
results: Vec::new(),
task: None}),
miscarried: Some (Box::new (err))}))}}
impl fmt::Debug for PgFuture {
fn fmt (&self, ft: &mut fmt::Formatter) -> Result<(), fmt::Error> {
if let Some (ref err) = self.0.miscarried {
write! (ft, "PgFuture ({:?})", err)
} else {
write! (ft, "PgFuture ({}, {:?})", self.0.id, self.0.op)}}}
impl Future for PgFuture {
type Item = Vec<PgResult>;
type Error = PgFutureErr;
fn poll (&mut self) -> Poll<Vec<PgResult>, PgFutureErr> {
if let Some (ref err) = self.0.miscarried {return Err (*err.clone())}
let mut sync = self.0.sync.lock()?;
if sync.results.is_empty() {
sync.task = Some (current());
return Ok (Async::NotReady)}
for (pr, num) in sync.results.iter().zip (0..) {
if let Some (status) = error_in_result (pr.res) {
let status = unsafe {CStr::from_ptr (pq::PQresStatus (status))} .to_str()?;
let sqlstate = unsafe {pq::PQresultErrorField (pr.res, pq::PG_DIAG_SQLSTATE)}; let sqlstate = if sqlstate == null_mut() {""} else {unsafe {CStr::from_ptr (sqlstate)} .to_str()?};
let err = unsafe {CStr::from_ptr (pq::PQresultErrorMessage (pr.res))} .to_str()?;
return Err (PgFutureErr::Sql (PgSqlErr {
imp: self.0.clone(),
num: num,
status: status,
sqlstate: sqlstate.into(),
message: format! ("Error in statement {}: {}; {}", num, status, err)}))}}
let mut res = Vec::with_capacity (sync.results.len());
res.append (&mut sync.results);
Ok (Async::Ready (res))}}
#[derive(Debug)]
enum Message {
Connect (String, u8),
Execute (PgFuture),
EmulateErrorAt (u8, String),
Drop}
#[derive(Debug)]
struct Connection {
#[allow(dead_code)] dsn: String,
handle: *mut pq::PGconn,
in_flight: VecDeque<PgFuture>,
pinned_pending_futures: VecDeque<PgFuture>,
connection_pending: bool,
in_flight_init: bool,
statement_timeout: bool}
impl Connection {
fn free (&self) -> bool {
self.in_flight.is_empty() && !self.in_flight_init}}
fn error_in_result (res: *const pq::PGresult) -> Option<pq::ExecStatusType> {
let status = unsafe {pq::PQresultStatus (res)};
if status != pq::PGRES_COMMAND_OK && status != pq::PGRES_TUPLES_OK {Some (status)} else {None}}
const PIPELINE_LIM_COMMANDS: usize = 128;
const PIPELINE_LIM_BYTES: usize = 16384;
fn reconnect_heuristic (err: &str) -> bool {
err.starts_with ("server closed the connection unexpectedly") ||
err.starts_with ("SSL SYSCALL error")}
fn event_loop (rx: Receiver<Message>, read_end: c_int) {
use std::fmt::Write;
let mut connections: Vec<Connection> = Vec::new();
let mut pending_futures: VecDeque<PgFuture> = VecDeque::new();
let mut sql = String::with_capacity (PIPELINE_LIM_BYTES + 1024);
let mut sql_futures = Vec::with_capacity (PIPELINE_LIM_COMMANDS);
let mut error_at: u8 = 0; let mut error_at_message = String::new();
macro_rules! schedule {($pg_future: ident) => {{
match $pg_future.0.op.scheduling {
PgSchedulingMode::AnythingGoes => pending_futures.push_back ($pg_future),
PgSchedulingMode::PinToConnection (n) => connections[n as usize].pinned_pending_futures.push_back ($pg_future)}}}}
macro_rules! reschedule {($conn: ident, $pg_future: ident) => {{
match $pg_future.0.op.scheduling {
PgSchedulingMode::AnythingGoes => pending_futures.push_back ($pg_future),
PgSchedulingMode::PinToConnection (_) => $conn.pinned_pending_futures.push_back ($pg_future)}}}}
let mut fds = Vec::new();
'event_loop: loop {
{ #[allow(invalid_value)] let mut tmp: [u8; 256] = unsafe {MaybeUninit::uninit().assume_init()};
let _ = nix::unistd::read (unsafe {BorrowedFd::borrow_raw (read_end)}, &mut tmp); }
loop {match rx.try_recv() {
Err (TryRecvError::Disconnected) => break 'event_loop, Err (TryRecvError::Empty) => break,
Ok (message) => match message {
Message::Connect (dsn, mul) => {
let dsn_c = CString::new (&dsn[..]) .expect ("!dsn");
for _ in 0 .. mul {
let conn = unsafe {pq::PQconnectStart (dsn_c.as_ptr())};
if conn == null_mut() {panic! ("!PQconnectStart")}
connections.push (Connection {
dsn: dsn.clone(),
handle: conn,
in_flight: VecDeque::new(),
pinned_pending_futures: VecDeque::new(),
connection_pending: true,
in_flight_init: false,
statement_timeout: false});}},
Message::Execute (pg_future) => schedule! (pg_future),
Message::EmulateErrorAt (at, message) => {error_at = at; error_at_message = message},
Message::Drop => break 'event_loop}}}
fds.clear();
for conn in connections.iter_mut().filter (|conn| conn.connection_pending) {
let status = unsafe {pq::PQstatus (conn.handle)};
if status == pq::CONNECTION_BAD {
unsafe {pq::PQresetStart (conn.handle)}; } else if status == pq::CONNECTION_OK {
let rc = unsafe {pq::PQsetnonblocking (conn.handle, 1)};
if rc != 0 {panic! ("!PQsetnonblocking: {}", error_message (conn.handle))}
let rc = unsafe {pq::PQsendQuery (conn.handle, "SET synchronous_commit = off\0".as_ptr() as *const c_char)};
if rc == 0 {panic! ("!PQsendQuery: {}", error_message (conn.handle))}
conn.connection_pending = false;
conn.in_flight_init = true;
} else {
let sock = unsafe {pq::PQsocket (conn.handle)};
let sock = unsafe {BorrowedFd::borrow_raw (sock)};
match unsafe {pq::PQconnectPoll (conn.handle)} {
pq::PGRES_POLLING_READING => fds.push (PollFd::new (sock, PollFlags::POLLIN)),
pq::PGRES_POLLING_WRITING => fds.push (PollFd::new (sock, PollFlags::POLLOUT)),
_ => ()};}}
for conn in connections.iter_mut().filter (|conn| !conn.connection_pending && conn.free()) {
if pending_futures.is_empty() && conn.pinned_pending_futures.is_empty() {continue}
sql.clear();
sql_futures.clear();
let mut first = true;
let mut statement_timeout_ms: u32 = 0;
loop {
if sql.len() >= PIPELINE_LIM_BYTES {break}
if sql_futures.len() + 1 >= PIPELINE_LIM_COMMANDS {break}
if statement_timeout_ms != 0 && !first {break}
let pending = match pending_futures.pop_front() {
Some (f) => f,
None => match conn.pinned_pending_futures.pop_front() {
Some (f) => f,
None => break}};
if pending.0.op.timeouts_at > 0.0 {
if first {
let now = now_float();
let remains = pending.0.op.timeouts_at - now;
statement_timeout_ms = max ((remains * 1000.0) as i32 + 1, 20) as u32;
conn.statement_timeout = true;
} else {
pending_futures.push_front (pending);
break}}
if first {
first = false;
if statement_timeout_ms != 0 {
write! (&mut sql, "SET statement_timeout = {}; ", statement_timeout_ms) .expect ("!write");
} else if conn.statement_timeout {
sql.push_str ("SET statement_timeout = 0; ")}
} else {
sql.push_str ("; ")}
write! (&mut sql, "BEGIN; SELECT {} AS future_id; ", pending.0.id) .expect ("!write");
let escape_start_len = sql.len();
for piece in pending.0.op.query_pieces.iter() {
match piece {
&PgQueryPiece::Static (ref ss) => sql.push_str (ss),
&PgQueryPiece::Plain (ref plain) => sql.push_str (&plain),
&PgQueryPiece::Literal (ref literal) => {
let esc = unsafe {pq::PQescapeLiteral (conn.handle, literal.as_ptr() as *const c_char, literal.len())};
if esc == null_mut() {panic! ("!PQescapeLiteral: {}", error_message (conn.handle))}
sql.push_str (unsafe {CStr::from_ptr (esc)} .to_str().expect ("!esc"));
unsafe {pq::PQfreemem (esc as *mut c_void)};},
&PgQueryPiece::InlLiteral (ref literal) => {
let esc = unsafe {pq::PQescapeLiteral (conn.handle, literal.as_ptr() as *const c_char, literal.len())};
if esc == null_mut() {panic! ("!PQescapeLiteral: {}", error_message (conn.handle))}
sql.push_str (unsafe {CStr::from_ptr (esc)} .to_str().expect ("!esc"));
unsafe {pq::PQfreemem (esc as *mut c_void)};},
&PgQueryPiece::Bytea (ref bytes) => {
let mut escaped_size: size_t = 0;
let esc = unsafe {pq::PQescapeByteaConn (conn.handle, bytes.as_ptr(), bytes.len(), &mut escaped_size)};
if esc == null_mut() {panic! ("!PQescapeByteaConn: {}", error_message (conn.handle))}
assert! (escaped_size > 0); let esc_slice = unsafe {from_raw_parts (esc, escaped_size - 1)};
sql.push_str (unsafe {from_utf8_unchecked (esc_slice)});
unsafe {pq::PQfreemem (esc as *mut c_void)};}}}
if let Some (ref on_escape) = pending.0.op.on_escape {
on_escape (&sql[escape_start_len..])}
sql.push_str ("; COMMIT");
sql_futures.push (pending)}
let res = unsafe {pq::PQgetResult (conn.handle)};
if res != null_mut() {panic! ("Stray result detected before PQsendQuery! {:?}", res);}
let sql = CString::new (&sql[..]) .expect ("sql !CString");
let rc = unsafe {pq::PQsendQuery (conn.handle, sql.as_ptr())};
if rc == 0 {
let err = error_message (conn.handle);
if reconnect_heuristic (&err) {
unsafe {pq::PQresetStart (conn.handle)};
conn.connection_pending = true;
for future in sql_futures.drain (..) {reschedule! (conn, future)}
continue}
panic! ("!PQsendQuery: {}", error_message (conn.handle))}
conn.in_flight.extend (sql_futures.drain (..));}
'connections_loop: for conn in connections.iter_mut().filter (|conn| !conn.connection_pending) {
if !conn.free() {
let sock = unsafe {pq::PQsocket (conn.handle)};
let rc = unsafe {pq::PQconsumeInput (conn.handle)};
if rc != 1 {
let err = error_message (conn.handle);
if reconnect_heuristic (&err) {
unsafe {pq::PQreset (conn.handle)};
conn.connection_pending = true;
for future in conn.in_flight.drain (..) {reschedule! (conn, future)}
continue 'connections_loop}
panic! ("!PQconsumeInput: {}", err)}
let rc = unsafe {pq::PQflush (conn.handle)};
let sock = unsafe {BorrowedFd::borrow_raw (sock)};
if rc == 1 {fds.push (PollFd::new (sock, PollFlags::POLLOUT))}
loop {
if unsafe {pq::PQisBusy (conn.handle)} == 1 {break}
let mut error = false;
let mut after_future = false; if conn.in_flight_init {
conn.in_flight_init = false;
let res = unsafe {pq::PQgetResult (conn.handle)};
if res == null_mut() {panic! ("Got no result for in_flight_init statement")}
error = error_in_result (res) .is_some()
} else {
let pg_future = match conn.in_flight.pop_front() {Some (f) => f, None => break};
after_future = true;
{ let first_res = unsafe {pq::PQgetResult (conn.handle)};
if first_res == null_mut() {panic! ("No first result for {:?}", pg_future)}
if let Some (error_status) = error_in_result (first_res) {
let sqlstate = unsafe {CStr::from_ptr (pq::PQresultErrorField (first_res, pq::PG_DIAG_SQLSTATE))} .to_str() .expect ("!to_str");
if sqlstate == "57P01" || sqlstate == "57014" {
unsafe {pq::PQresetStart (conn.handle)};
conn.connection_pending = true;
reschedule! (conn, pg_future);
for future in conn.in_flight.drain (..) {reschedule! (conn, future)}
continue 'connections_loop}
let status = unsafe {CStr::from_ptr (pq::PQresStatus (error_status))} .to_str() .expect ("!to_str");
let err = unsafe {CStr::from_ptr (pq::PQresultErrorMessage (first_res))} .to_str() .expect ("!to_str");
panic! ("First statement failed. Probably a syntax error in one of the pipelined SQL statements. {}, {},\n{}", status, sqlstate, err);}
unsafe {pq::PQclear (first_res)} }
if pg_future.0.op.timeouts_at > 0.0 || conn.statement_timeout {
if !(pg_future.0.op.timeouts_at > 0.0) { conn.statement_timeout = false}
let begin_res = unsafe {pq::PQgetResult (conn.handle)};
if begin_res == null_mut() {panic! ("Got no BEGIN result for {:?}", pg_future)}
if let Some (error_status) = error_in_result (begin_res) {
let sqlstate = unsafe {CStr::from_ptr (pq::PQresultErrorField (begin_res, pq::PG_DIAG_SQLSTATE))} .to_str() .expect ("!to_str");
let status = unsafe {CStr::from_ptr (pq::PQresStatus (error_status))} .to_str() .expect ("!to_str");
let err = unsafe {CStr::from_ptr (pq::PQresultErrorMessage (begin_res))} .to_str() .expect ("!to_str");
panic! ("BEGIN failed. {}, {},\n{}", status, sqlstate, err);}}
let mut sync = pg_future.0.sync.lock().expect ("!lock");
if !error { let id_res = unsafe {pq::PQgetResult (conn.handle)};
if id_res == null_mut() {panic! ("Got no ID result for {:?}", pg_future)}
if error_in_result (id_res) .is_some() {panic! ("Error in ID")}
assert_eq! (unsafe {pq::PQntuples (id_res)}, 1);
assert_eq! (unsafe {pq::PQnfields (id_res)}, 1);
let id = unsafe {CStr::from_ptr (pq::PQgetvalue (id_res, 0, 0))} .to_str() .expect ("!to_str");
let id: u64 = id.parse().expect ("!id");
assert_eq! (id, pg_future.0.id); unsafe {pq::PQclear (id_res)} }
let expect_results = pg_future.0.op.statements as usize + 1;
sync.results.reserve_exact (expect_results);
for num in 0..expect_results {
if error {break}
let statement_res = unsafe {pq::PQgetResult (conn.handle)};
if statement_res == null_mut() {panic! ("Got no statement {} result for {:?}", num, pg_future)}
error = error_in_result (statement_res) .is_some();
sync.results.push (PgResult { res: statement_res,
rows: unsafe {pq::PQntuples (statement_res)} as u32,
columns: unsafe {pq::PQnfields (statement_res)} as u32});}
if let Some (ref task) = sync.task {task.notify()}}
if error || (cfg! (test) && error_at == 1) {
if after_future {
let res = unsafe {pq::PQgetResult (conn.handle)};
if res != null_mut() || (cfg! (test) && error_at == 1) {
let err = if cfg! (test) && error_at == 1 {error_at_message.clone()} else {error_message (conn.handle)};
if reconnect_heuristic (&err) {
unsafe {pq::PQresetStart (conn.handle)};
conn.connection_pending = true;
for future in sql_futures.drain (..) {reschedule! (conn, future)}
continue 'connections_loop}
panic! ("Unexpected result after an error. error_message: {}", err)}
let rc = unsafe {pq::PQsendQuery (conn.handle, "ROLLBACK\0".as_ptr() as *const c_char)};
if rc == 0 {panic! ("!PQsendQuery: {}", error_message (conn.handle))}
conn.in_flight_init = true}
for future in conn.in_flight.drain (..) {reschedule! (conn, future)}}}
fds.push (PollFd::new (sock, PollFlags::POLLIN))}}
let read_end = unsafe {BorrowedFd::borrow_raw (read_end)};
fds.push (PollFd::new (read_end, PollFlags::POLLIN));
let rc = poll::poll (&mut fds, 100u16) .expect ("!poll");
if rc == -1 {panic! ("!poll: {}", io::Error::last_os_error())}}
for conn in connections {unsafe {pq::PQfinish (conn.handle)}}}
fn wake_up (write_end: c_int, payload: u8) -> Result<(), String> {
let write_endʹ = unsafe {BorrowedFd::borrow_raw (write_end)};
if let Err (err) = nix::unistd::write (write_endʹ, &[payload]) {
if matches! (err, nix::errno::Errno::EAGAIN) {} else {return ERR! ("!write on write_end {}: {}", write_end, err)}}
Ok(())}
pub struct Cluster {
thread: Option<JoinHandle<()>>,
tx: Mutex<Sender<Message>>,
write_end: c_int,
command_num: Atomic<u64>}
impl Cluster {
pub fn new() -> Result<Cluster, String> {
let (tx, rx) = channel();
let (read_end, write_end) = try_s! (nix::unistd::pipe2 (OFlag::O_NONBLOCK | OFlag::O_CLOEXEC));
let thread = try_s! (std::thread::Builder::new().name ("pg_async".into()) .spawn (move || event_loop (rx, read_end.into_raw_fd())));
Ok (Cluster {
thread: Some (thread),
tx: Mutex::new (tx),
write_end: write_end.into_raw_fd(),
command_num: Atomic::new (0u64)})}
pub fn connect (&self, dsn: String, mul: u8) -> Result<(), String> {
try_s! (try_s! (self.tx.lock()) .send (Message::Connect (dsn, mul)));
try_s! (wake_up (self.write_end, 1));
Ok(())}
pub fn execute<I: IntoQueryPieces> (&self, sql: I) -> PgFuture {
let _ = self.command_num.compare_exchange (u64::max_value(), 0, Ordering::Relaxed, Ordering::Relaxed); let id = self.command_num.fetch_add (1, Ordering::Relaxed) + 1;
let pg_future = PgFuture::new (id, sql.into_query_pieces());
{ let tx = match self.tx.lock() {Ok (k) => k, Err (err) => return PgFuture::error (err.into())};
if let Err (err) = tx.send (Message::Execute (pg_future.clone())) {return PgFuture::error (err.into())}; }
if let Err (err) = wake_up (self.write_end, 2) {panic! ("{}", err)}
pg_future}
#[doc(hidden)]
pub fn emulate_error_at (&self, at: u8, message: String) {
if cfg! (test) {
{ let tx = self.tx.lock().expect ("!lock");
tx.send (Message::EmulateErrorAt (at, message)) .expect ("!send"); }
if let Err (err) = wake_up (self.write_end, 2) {panic! ("{}", err)}}}}
impl Drop for Cluster {
fn drop (&mut self) {
if let Ok (tx) = self.tx.lock() {
let _ = tx.send (Message::Drop);}
let _ = wake_up (self.write_end, 3);
let mut thread = None;
std::mem::swap (&mut thread, &mut self.thread);
if let Some (thread) = thread {let _ = thread.join();}}}
fn error_message (conn: *const pq::PGconn) -> String {
let err = unsafe {pq::PQerrorMessage (conn)};
let err = unsafe {CStr::from_ptr (err)};
err.to_string_lossy().into()}
pub mod pq { use libc::{c_char, c_int, c_uint, c_void, size_t};
pub enum PGconn {}
pub type ConnStatusType = c_uint;
pub const CONNECTION_OK: c_uint = 0;
pub const CONNECTION_BAD: c_uint = 1;
pub const CONNECTION_STARTED: c_uint = 2;
pub const CONNECTION_MADE: c_uint = 3;
pub const CONNECTION_AWAITING_RESPONSE: c_uint = 4;
pub const CONNECTION_AUTH_OK: c_uint = 5;
pub const CONNECTION_SETENV: c_uint = 6;
pub const CONNECTION_SSL_STARTUP: c_uint = 7;
pub const CONNECTION_NEEDED: c_uint = 8;
pub type PostgresPollingStatusType = c_uint;
pub const PGRES_POLLING_FAILED: c_uint = 0;
pub const PGRES_POLLING_READING: c_uint = 1;
pub const PGRES_POLLING_WRITING: c_uint = 2;
pub const PGRES_POLLING_OK: c_uint = 3;
pub const PGRES_POLLING_ACTIVE: c_uint = 4;
pub enum PGresult {}
pub type ExecStatusType = c_uint;
pub const PGRES_EMPTY_QUERY: c_uint = 0;
pub const PGRES_COMMAND_OK: c_uint = 1;
pub const PGRES_TUPLES_OK: c_uint = 2;
pub const PGRES_COPY_OUT: c_uint = 3;
pub const PGRES_COPY_IN: c_uint = 4;
pub const PGRES_BAD_RESPONSE: c_uint = 5;
pub const PGRES_NONFATAL_ERROR: c_uint = 6;
pub const PGRES_FATAL_ERROR: c_uint = 7;
pub const PGRES_COPY_BOTH: c_uint = 8;
pub const PGRES_SINGLE_TUPLE: c_uint = 9;
pub type Oid = c_uint;
pub const PG_DIAG_SQLSTATE: c_int = 'C' as c_int;
#[link(name = "pq")] extern "C" {
pub fn PQisthreadsafe() -> c_int;
pub fn PQconnectdb (conninfo: *const c_char) -> *mut PGconn;
pub fn PQconnectStart (conninfo: *const c_char) -> *mut PGconn;
pub fn PQstatus (conn: *const PGconn) -> ConnStatusType;
pub fn PQreset (conn: *mut PGconn);
pub fn PQresetStart (conn: *mut PGconn) -> c_int;
pub fn PQconnectPoll (conn: *mut PGconn) -> PostgresPollingStatusType;
pub fn PQsocket (conn: *const PGconn) -> c_int;
pub fn PQsetnonblocking (conn: *mut PGconn, arg: c_int) -> c_int;
pub fn PQfinish (conn: *mut PGconn);
pub fn PQescapeLiteral (conn: *const PGconn, str: *const c_char, len: size_t) -> *mut c_char;
pub fn PQescapeByteaConn (conn: *const PGconn, from: *const u8, from_length: size_t, to_length: *mut size_t) -> *mut u8;
pub fn PQunescapeBytea (from: *const u8, to_length: *mut size_t)-> *mut u8;
pub fn PQfreemem (ptr: *mut c_void);
pub fn PQsendQuery (conn: *mut PGconn, command: *const c_char) -> c_int;
pub fn PQflush (conn: *mut PGconn) -> c_int;
pub fn PQconsumeInput (conn: *mut PGconn) -> c_int;
pub fn PQisBusy (conn: *mut PGconn) -> c_int;
pub fn PQgetResult (conn: *mut PGconn) -> *mut PGresult;
pub fn PQresultStatus (res: *const PGresult) -> ExecStatusType;
pub fn PQresStatus (status: ExecStatusType) -> *mut c_char;
pub fn PQresultErrorMessage (res: *const PGresult) -> *mut c_char;
pub fn PQresultErrorField (res: *const PGresult, fieldcode: c_int) -> *mut c_char;
pub fn PQntuples (res: *const PGresult) -> c_int;
pub fn PQnfields (res: *const PGresult) -> c_int;
pub fn PQfname (res: *const PGresult, field_num: c_int) -> *mut c_char;
pub fn PQftype (res: *const PGresult, field_num: c_int) -> Oid;
pub fn PQgetisnull (res: *const PGresult, tup_num: c_int, field_num: c_int) -> c_int;
pub fn PQgetvalue (res: *const PGresult, tup_num: c_int, field_num: c_int) -> *mut c_char;
pub fn PQgetlength (res: *const PGresult, tup_num: c_int, field_num: c_int) -> c_int;
pub fn PQcmdStatus (res: *mut PGresult) -> *mut c_char;
pub fn PQcmdTuples (res: *mut PGresult) -> *mut c_char;
pub fn PQoidValue (res: *const PGresult) -> Oid;
pub fn PQclear (result: *mut PGresult);
pub fn PQerrorMessage (conn: *const PGconn) -> *const c_char;}}