#![doc(html_root_url="https://docs.rs/postgres/0.15.1")]
#![warn(missing_docs)]
#![allow(unknown_lints, needless_lifetimes, doc_markdown)]
extern crate bytes;
extern crate fallible_iterator;
#[cfg(not(feature = "no-logging"))]
#[macro_use]
extern crate log;
extern crate postgres_protocol;
extern crate postgres_shared;
use fallible_iterator::FallibleIterator;
use std::cell::{Cell, RefCell};
use std::collections::{VecDeque, HashMap};
use std::fmt;
use std::io;
use std::mem;
use std::result;
use std::sync::Arc;
use std::time::Duration;
use postgres_protocol::authentication;
use postgres_protocol::authentication::sasl::{self, ScramSha256};
use postgres_protocol::message::backend::{self, ErrorFields};
use postgres_protocol::message::frontend;
use postgres_shared::rows::RowData;
use error::{DbError, UNDEFINED_COLUMN, UNDEFINED_TABLE};
use tls::TlsHandshake;
use notification::{Notifications, Notification};
use params::{IntoConnectParams, User};
use priv_io::MessageStream;
use rows::Rows;
use stmt::{Statement, Column};
use transaction::{Transaction, IsolationLevel};
use types::{IsNull, Kind, Type, Oid, ToSql, FromSql, Field, OID, NAME, CHAR};
#[doc(inline)]
pub use postgres_shared::CancelData;
#[doc(inline)]
pub use postgres_shared::{error, types};
#[doc(inline)]
pub use error::Error;
#[macro_use]
mod macros;
mod feature_check;
mod priv_io;
pub mod tls;
pub mod notification;
pub mod params;
pub mod rows;
pub mod stmt;
pub mod transaction;
// Statement name under which the type-information lookup query is prepared.
const TYPEINFO_QUERY: &'static str = "__typeinfo";
// Statement name under which the enum-label lookup query is prepared.
const TYPEINFO_ENUM_QUERY: &'static str = "__typeinfo_enum";
// Statement name under which the composite-attribute lookup query is prepared.
const TYPEINFO_COMPOSITE_QUERY: &'static str = "__typeinfo_composite";
/// A `Result` whose error type is this crate's `Error`.
pub type Result<T> = result::Result<T, Error>;
/// A receiver for notices sent by the server.
///
/// Implementations must be `Send`.
pub trait HandleNotice: Send {
/// Handles a single notice, passed as a `DbError`.
fn handle_notice(&mut self, notice: DbError);
}
/// Lets any sendable closure that accepts a `DbError` act as a notice handler.
impl<F> HandleNotice for F
where
    F: Send + FnMut(DbError),
{
    fn handle_notice(&mut self, notice: DbError) {
        // Forward the notice straight to the closure.
        (*self)(notice)
    }
}
/// A notice handler that reports each notice through the `info!` logging macro.
///
/// This is the handler installed on newly created connections.
#[derive(Copy, Clone, Debug)]
pub struct LoggingNoticeHandler;
impl HandleNotice for LoggingNoticeHandler {
// Logs the notice formatted as "<severity>: <message>".
fn handle_notice(&mut self, notice: DbError) {
info!("{}: {}", notice.severity, notice.message);
}
}
/// Sends a cancellation request for the backend identified by `data`.
///
/// A new stream is opened using `params` and `tls`, a cancel-request message carrying
/// `data.process_id` and `data.secret_key` is written to it, and the stream is flushed.
/// No reply is read from the stream.
///
/// # Errors
///
/// Returns an error if `params` cannot be converted into connection parameters, if the
/// stream cannot be initialized, or if writing or flushing the request fails.
pub fn cancel_query<T>(params: T, tls: TlsMode, data: &CancelData) -> Result<()>
where
    T: IntoConnectParams,
{
    let params = params.into_connect_params().map_err(error::connect)?;
    // The stream is borrowed from the parsed parameters, not from the caller's raw input.
    let mut socket = priv_io::initialize_stream(&params, tls)?;

    let mut buf = vec![];
    frontend::cancel_request(data.process_id, data.secret_key, &mut buf);
    socket.write_all(&buf)?;
    socket.flush()?;

    Ok(())
}
/// Builds the I/O error used when the server sends a message that was not expected.
fn bad_response() -> io::Error {
    let kind = io::ErrorKind::InvalidInput;
    let message = "the server returned an unexpected response";
    io::Error::new(kind, message)
}
/// Builds the I/O error used once the connection has lost sync with the server.
fn desynchronized() -> io::Error {
    let kind = io::ErrorKind::Other;
    let message = "communication with the server has desynchronized due to an earlier IO error";
    io::Error::new(kind, message)
}
/// Specifies the TLS behavior to use when opening a stream to the server.
#[derive(Debug)]
pub enum TlsMode<'a> {
/// No TLS handshake implementation is supplied.
None,
/// Carries a handshake implementation.
// NOTE(review): presumably TLS is attempted and the connection falls back to plaintext
// if the server does not support it — confirm against `priv_io::initialize_stream`.
Prefer(&'a TlsHandshake),
/// Carries a handshake implementation.
// NOTE(review): presumably the connection fails if TLS cannot be negotiated — confirm
// against `priv_io::initialize_stream`.
Require(&'a TlsHandshake),
}
// Metadata describing a prepared statement; shared via `Arc` between statements and the
// connection's statement cache.
#[derive(Debug)]
struct StatementInfo {
// Name the statement was prepared under (empty for the unnamed statement).
name: String,
// Types of the statement's parameters, in order.
param_types: Vec<Type>,
// Columns of the statement's result rows; empty when the statement returns no data.
columns: Vec<Column>,
}
// Mutable state of a connection; `Connection` wraps this in a `RefCell`.
struct InnerConnection {
// Message-framed stream to the server.
stream: MessageStream,
// Receives notices parsed from `NoticeResponse` messages.
notice_handler: Box<HandleNotice>,
// Notifications received while reading other messages, in arrival order.
notifications: VecDeque<Notification>,
// Process id and secret key reported by the server in `BackendKeyData`.
cancel_data: CancelData,
// Cache of types looked up from the server, keyed by OID.
unknown_types: HashMap<Oid, Type>,
// Cache of prepared statements, keyed by query text.
cached_statements: HashMap<String, Arc<StatementInfo>>,
// Latest values reported by the server in `ParameterStatus` messages.
parameters: HashMap<String, String>,
// Counter used to generate unique statement names.
next_stmt_id: u32,
// Current transaction nesting depth; 0 when no transaction is open.
trans_depth: u32,
// Set once the protocol state can no longer be trusted.
desynchronized: bool,
// Set when the connection was explicitly finished, so `Drop` does not terminate it again.
finished: bool,
// Whether the `__typeinfo` statement has been prepared.
has_typeinfo_query: bool,
// Whether the `__typeinfo_enum` statement has been prepared.
has_typeinfo_enum_query: bool,
// Whether the `__typeinfo_composite` statement has been prepared.
has_typeinfo_composite_query: bool,
}
/// Terminates the session on drop unless it was already finished explicitly.
impl Drop for InnerConnection {
    fn drop(&mut self) {
        if self.finished {
            return;
        }
        // Best effort only: `drop` has no way to report a failure, so the result is discarded.
        let _ = self.finish_inner();
    }
}
impl InnerConnection {
/// Opens a stream to the server and performs the startup and authentication exchange.
///
/// The startup message carries the caller's options followed by `client_encoding=UTF8`,
/// `timezone=GMT`, the user name and, when one is configured, the database name. After
/// authentication, messages are consumed until `ReadyForQuery`; a `BackendKeyData`
/// message received along the way populates `cancel_data`.
///
/// # Errors
///
/// Fails if the parameters cannot be converted, if no user is configured, if the stream
/// cannot be initialized, if authentication fails, if the server sends an
/// `ErrorResponse`, or if an unexpected message arrives.
fn connect<T>(params: T, tls: TlsMode) -> Result<InnerConnection>
where
    T: IntoConnectParams,
{
    let params = params.into_connect_params().map_err(error::connect)?;
    let stream = priv_io::initialize_stream(&params, tls)?;

    let user = match params.user() {
        Some(user) => user,
        None => {
            return Err(error::connect(
                "user missing from connection parameters".into(),
            ));
        }
    };

    let mut conn = InnerConnection {
        stream: MessageStream::new(stream),
        next_stmt_id: 0,
        notice_handler: Box::new(LoggingNoticeHandler),
        notifications: VecDeque::new(),
        cancel_data: CancelData {
            process_id: 0,
            secret_key: 0,
        },
        unknown_types: HashMap::new(),
        cached_statements: HashMap::new(),
        parameters: HashMap::new(),
        desynchronized: false,
        finished: false,
        trans_depth: 0,
        has_typeinfo_query: false,
        has_typeinfo_enum_query: false,
        has_typeinfo_composite_query: false,
    };

    // Options fixed by the client are appended after the caller-supplied ones.
    let mut options = params.options().to_owned();
    options.push(("client_encoding".to_owned(), "UTF8".to_owned()));
    options.push(("timezone".to_owned(), "GMT".to_owned()));
    options.push(("user".to_owned(), user.name().to_owned()));
    if let Some(database) = params.database() {
        options.push(("database".to_owned(), database.to_owned()));
    }

    let options = options.iter().map(|&(ref a, ref b)| (&**a, &**b));
    conn.stream.write_message(
        |buf| frontend::startup_message(options, buf),
    )?;
    conn.stream.flush()?;

    conn.handle_auth(user)?;

    // Drain the post-authentication messages until the server is ready for queries.
    loop {
        match conn.read_message()? {
            backend::Message::BackendKeyData(body) => {
                conn.cancel_data.process_id = body.process_id();
                conn.cancel_data.secret_key = body.secret_key();
            }
            backend::Message::ReadyForQuery(_) => break,
            backend::Message::ErrorResponse(body) => {
                return Err(err(&mut body.fields()));
            }
            _ => return Err(bad_response().into()),
        }
    }

    Ok(conn)
}
/// Reads the next message from the stream, consuming notices and parameter updates.
///
/// `NoticeResponse` messages that parse as a `DbError` are handed to the notice handler,
/// and `ParameterStatus` messages are recorded in `parameters`. Any other message is
/// returned to the caller. A read failure marks the connection as desynchronized.
fn read_message_with_notification(&mut self) -> io::Result<backend::Message> {
    debug_assert!(!self.desynchronized);
    loop {
        let message = try_desync!(self, self.stream.read_message());
        match message {
            backend::Message::NoticeResponse(body) => {
                // Notices that cannot be parsed are skipped.
                if let Ok(notice) = DbError::new(&mut body.fields()) {
                    self.notice_handler.handle_notice(notice);
                }
            }
            backend::Message::ParameterStatus(body) => {
                let name = body.name()?.to_owned();
                let value = body.value()?.to_owned();
                self.parameters.insert(name, value);
            }
            other => return Ok(other),
        }
    }
}
fn read_message_with_notification_timeout(
&mut self,
timeout: Duration,
) -> io::Result<Option<backend::Message>> {
debug_assert!(!self.desynchronized);
loop {
match try_desync!(self, self.stream.read_message_timeout(timeout)) {
Some(backend::Message::NoticeResponse(body)) => {
if let Ok(err) = Err(err(&mut body.fields())) {
self.notice_handler.handle_notice(err);
}
}
Some(backend::Message::ParameterStatus(body)) => {
self.parameters.insert(
body.name()?.to_owned(),
body.value()?.to_owned(),
);
}
val => return Ok(val),
}
}
}
fn read_message_with_notification_nonblocking(
&mut self,
) -> io::Result<Option<backend::Message>> {
debug_assert!(!self.desynchronized);
loop {
match try_desync!(self, self.stream.read_message_nonblocking()) {
Some(backend::Message::NoticeResponse(body)) => {
if let Ok(err) = Err(err(&mut body.fields())) {
self.notice_handler.handle_notice(err);
}
}
Some(backend::Message::ParameterStatus(body)) => {
self.parameters.insert(
body.name()?.to_owned(),
body.value()?.to_owned(),
);
}
val => return Ok(val),
}
}
}
/// Reads the next message that is not a notice, parameter update, or notification.
///
/// `NotificationResponse` messages are appended to the `notifications` queue and reading
/// continues; every other message is returned.
fn read_message(&mut self) -> io::Result<backend::Message> {
    loop {
        let body = match self.read_message_with_notification()? {
            backend::Message::NotificationResponse(body) => body,
            other => return Ok(other),
        };
        let notification = Notification {
            process_id: body.process_id(),
            channel: body.channel()?.to_owned(),
            payload: body.message()?.to_owned(),
        };
        self.notifications.push_back(notification);
    }
}
// Drives the authentication exchange that follows the startup message.
//
// Handles the "already authenticated", cleartext password, MD5 password and SASL
// (SCRAM-SHA-256) requests. Kerberos V5, SCM credential, GSS and SSPI requests are
// rejected as unsupported. Fails with a connect error when a password is requested but
// `user` has none, with the server's error on `ErrorResponse`, and with a bad-response
// error on any other message.
fn handle_auth(&mut self, user: &User) -> Result<()> {
match self.read_message()? {
// No credentials required.
backend::Message::AuthenticationOk => return Ok(()),
backend::Message::AuthenticationCleartextPassword => {
let pass = user.password().ok_or_else(|| {
error::connect("a password was requested but not provided".into())
})?;
self.stream.write_message(
|buf| frontend::password_message(pass, buf),
)?;
self.stream.flush()?;
}
backend::Message::AuthenticationMd5Password(body) => {
let pass = user.password().ok_or_else(|| {
error::connect("a password was requested but not provided".into())
})?;
// The hash combines the user name, the password and the server-provided salt.
let output =
authentication::md5_hash(user.name().as_bytes(), pass.as_bytes(), body.salt());
self.stream.write_message(
|buf| frontend::password_message(&output, buf),
)?;
self.stream.flush()?;
}
backend::Message::AuthenticationSasl(body) => {
// Only SCRAM-SHA-256 is supported. `count()` walks every advertised mechanism, and
// `?` propagates any error hit while iterating them.
if body.mechanisms()
.filter(|m| *m == sasl::SCRAM_SHA_256)
.count()? == 0
{
return Err(
io::Error::new(io::ErrorKind::Other, "unsupported authentication")
.into(),
);
}
let pass = user.password().ok_or_else(|| {
error::connect("a password was requested but not provided".into())
})?;
let mut scram = ScramSha256::new(pass.as_bytes())?;
// Step 1: send the initial client message.
self.stream.write_message(|buf| {
frontend::sasl_initial_response(sasl::SCRAM_SHA_256, scram.message(), buf)
})?;
self.stream.flush()?;
let body = match self.read_message()? {
backend::Message::AuthenticationSaslContinue(body) => body,
backend::Message::ErrorResponse(body) => return Err(err(&mut body.fields())),
_ => return Err(bad_response().into()),
};
// Step 2: feed the server's reply into the SCRAM state and send the response.
scram.update(body.data())?;
self.stream.write_message(|buf| {
frontend::sasl_response(scram.message(), buf)
})?;
self.stream.flush()?;
let body = match self.read_message()? {
backend::Message::AuthenticationSaslFinal(body) => body,
backend::Message::ErrorResponse(body) => return Err(err(&mut body.fields())),
_ => return Err(bad_response().into()),
};
// Step 3: hand the server's final message to the SCRAM state; an error aborts.
scram.finish(body.data())?;
}
backend::Message::AuthenticationKerberosV5 |
backend::Message::AuthenticationScmCredential |
backend::Message::AuthenticationGss |
backend::Message::AuthenticationSspi => {
return Err(
io::Error::new(io::ErrorKind::Other, "unsupported authentication").into(),
)
}
backend::Message::ErrorResponse(body) => return Err(err(&mut body.fields())),
_ => return Err(bad_response().into()),
}
// Every credential-based path ends with the server confirming the authentication.
match self.read_message()? {
backend::Message::AuthenticationOk => Ok(()),
backend::Message::ErrorResponse(body) => Err(err(&mut body.fields())),
_ => Err(bad_response().into()),
}
}
/// Installs `handler` as the notice handler and returns the one it replaces.
fn set_notice_handler(&mut self, handler: Box<HandleNotice>) -> Box<HandleNotice> {
    let mut swapped = handler;
    // After the swap, `swapped` holds the previously installed handler.
    mem::swap(&mut self.notice_handler, &mut swapped);
    swapped
}
// Prepares `query` on the server under `stmt_name` and returns its parameter types and
// result columns.
//
// Parse, Describe (statement) and Sync are written as one batch before any reply is read.
// On an `ErrorResponse` to the parse, the trailing `ReadyForQuery` is consumed before the
// server's error is returned. Parameter and column OIDs are resolved through `get_type`.
fn raw_prepare(&mut self, stmt_name: &str, query: &str) -> Result<(Vec<Type>, Vec<Column>)> {
debug!("preparing query with name `{}`: {}", stmt_name, query);
self.stream.write_message(|buf| {
frontend::parse(stmt_name, query, None, buf)
})?;
self.stream.write_message(
|buf| frontend::describe(b'S', stmt_name, buf),
)?;
self.stream.write_message(
|buf| Ok::<(), io::Error>(frontend::sync(buf)),
)?;
self.stream.flush()?;
match self.read_message()? {
backend::Message::ParseComplete => {}
backend::Message::ErrorResponse(body) => {
self.wait_for_ready()?;
return Err(err(&mut body.fields()));
}
_ => bad_response!(self),
}
let raw_param_types = match self.read_message()? {
backend::Message::ParameterDescription(body) => body,
_ => bad_response!(self),
};
// `NoData` means the statement produces no result columns.
let raw_columns = match self.read_message()? {
backend::Message::RowDescription(body) => Some(body),
backend::Message::NoData => None,
_ => bad_response!(self),
};
// Consume `ReadyForQuery` before resolving types: `get_type` may itself issue queries
// on this connection.
self.wait_for_ready()?;
let param_types = raw_param_types
.parameters()
.map_err(Into::into)
.and_then(|oid| self.get_type(oid))
.collect()?;
let columns = match raw_columns {
Some(body) => {
body.fields()
.and_then(|field| {
Ok(Column::new(
field.name().to_owned(),
self.get_type(field.type_oid())?,
))
})
.collect()?
}
None => vec![],
};
Ok((param_types, columns))
}
/// Reads the rows produced by an executed portal, passing each to `consumer`.
///
/// Returns `Ok(true)` when the portal was suspended (more rows remain) and `Ok(false)`
/// when the command completed or the query was empty. In both cases the trailing
/// `ReadyForQuery` is consumed first. COPY responses are rejected: a COPY IN is answered
/// with a copy-fail message, and a COPY OUT is drained and reported as an error.
fn read_rows<F>(&mut self, mut consumer: F) -> Result<bool>
where
    F: FnMut(RowData),
{
    let more_rows = loop {
        match self.read_message()? {
            backend::Message::EmptyQueryResponse |
            backend::Message::CommandComplete(_) => break false,
            backend::Message::PortalSuspended => break true,
            backend::Message::DataRow(body) => {
                let row = RowData::new(body)?;
                consumer(row);
            }
            backend::Message::ErrorResponse(body) => {
                self.wait_for_ready()?;
                return Err(err(&mut body.fields()));
            }
            backend::Message::CopyInResponse(_) => {
                // Refuse the copy; the server's reply is handled by later iterations.
                self.stream.write_message(|buf| {
                    frontend::copy_fail("COPY queries cannot be directly executed", buf)
                })?;
                self.stream.write_message(|buf| {
                    frontend::sync(buf);
                    Ok::<(), io::Error>(())
                })?;
                self.stream.flush()?;
            }
            backend::Message::CopyOutResponse(_) => {
                // Discard everything up to and including `ReadyForQuery`.
                loop {
                    match self.read_message()? {
                        backend::Message::ReadyForQuery(_) => break,
                        _ => {}
                    }
                }
                let copy_error = io::Error::new(
                    io::ErrorKind::InvalidInput,
                    "COPY queries cannot be directly executed",
                );
                return Err(copy_error.into());
            }
            _ => {
                self.desynchronized = true;
                return Err(bad_response().into());
            }
        }
    };
    self.wait_for_ready()?;
    Ok(more_rows)
}
fn raw_execute(
&mut self,
stmt_name: &str,
portal_name: &str,
row_limit: i32,
param_types: &[Type],
params: &[&ToSql],
) -> Result<()> {
assert!(
param_types.len() == params.len(),
"expected {} parameters but got {}",
param_types.len(),
params.len()
);
debug!(
"executing statement {} with parameters: {:?}",
stmt_name,
params
);
{
let r = self.stream.write_message(|buf| {
frontend::bind(
portal_name,
stmt_name,
Some(1),
params.iter().zip(param_types),
|(param, ty), buf| match param.to_sql_checked(ty, buf) {
Ok(IsNull::Yes) => Ok(postgres_protocol::IsNull::Yes),
Ok(IsNull::No) => Ok(postgres_protocol::IsNull::No),
Err(e) => Err(e),
},
Some(1),
buf,
)
});
match r {
Ok(()) => {}
Err(frontend::BindError::Conversion(e)) => {
return Err(error::conversion(e));
}
Err(frontend::BindError::Serialization(e)) => return Err(e.into()),
}
}
self.stream.write_message(|buf| {
frontend::execute(portal_name, row_limit, buf)
})?;
self.stream.write_message(
|buf| Ok::<(), io::Error>(frontend::sync(buf)),
)?;
self.stream.flush()?;
match self.read_message()? {
backend::Message::BindComplete => Ok(()),
backend::Message::ErrorResponse(body) => {
self.wait_for_ready()?;
Err(err(&mut body.fields()))
}
_ => {
self.desynchronized = true;
Err(bad_response().into())
}
}
}
/// Returns a fresh statement name of the form `s<N>` and advances the counter.
fn make_stmt_name(&mut self) -> String {
    let id = self.next_stmt_id;
    let stmt_name = format!("s{}", id);
    self.next_stmt_id += 1;
    stmt_name
}
/// Prepares `query` under a freshly generated name and wraps it in a `Statement`.
///
/// The statement is not entered into the statement cache.
fn prepare<'a>(&mut self, query: &str, conn: &'a Connection) -> Result<Statement<'a>> {
    let name = self.make_stmt_name();
    let (param_types, columns) = self.raw_prepare(&name, query)?;
    let info = StatementInfo {
        name: name,
        param_types: param_types,
        columns: columns,
    };
    Ok(Statement::new(conn, Arc::new(info), Cell::new(0), false))
}
/// Returns a `Statement` for `query`, reusing the cached preparation when one exists.
///
/// On a cache miss the query is prepared under a fresh name and stored in the cache,
/// keyed by the query text.
fn prepare_cached<'a>(&mut self, query: &str, conn: &'a Connection) -> Result<Statement<'a>> {
    if let Some(cached) = self.cached_statements.get(query).cloned() {
        return Ok(Statement::new(conn, cached, Cell::new(0), true));
    }

    let name = self.make_stmt_name();
    let (param_types, columns) = self.raw_prepare(&name, query)?;
    let info = Arc::new(StatementInfo {
        name: name,
        param_types: param_types,
        columns: columns,
    });
    self.cached_statements.insert(query.to_owned(), info.clone());
    Ok(Statement::new(conn, info, Cell::new(0), true))
}
/// Asks the server to close the object `name` of kind `type_`.
///
/// The trailing `ReadyForQuery` is consumed before the outcome of the close is returned,
/// so a server error leaves the connection ready for the next command.
fn close_statement(&mut self, name: &str, type_: u8) -> Result<()> {
    self.stream.write_message(|buf| frontend::close(type_, name, buf))?;
    self.stream.write_message(|buf| {
        frontend::sync(buf);
        Ok::<(), io::Error>(())
    })?;
    self.stream.flush()?;

    let outcome = match self.read_message()? {
        backend::Message::CloseComplete => Ok(()),
        backend::Message::ErrorResponse(body) => Err(err(&mut body.fields())),
        _ => bad_response!(self),
    };
    self.wait_for_ready()?;
    outcome
}
/// Resolves `oid` to a `Type`.
///
/// Types known to `Type::from_oid` are returned directly. Otherwise the per-connection
/// cache is consulted, and on a miss the type is read from the server and cached.
fn get_type(&mut self, oid: Oid) -> Result<Type> {
    if let Some(known) = Type::from_oid(oid) {
        return Ok(known);
    }
    if let Some(cached) = self.unknown_types.get(&oid).cloned() {
        return Ok(cached);
    }

    let fetched = self.read_type(oid)?;
    self.unknown_types.insert(oid, fetched.clone());
    Ok(fetched)
}
/// Prepares the type-information lookup statement if that has not been done yet.
///
/// The first attempt joins against `pg_catalog.pg_range`. If the server reports an
/// undefined table, a variant without that join is prepared instead, selecting `NULL::OID`
/// in place of the range subtype.
fn setup_typeinfo_query(&mut self) -> Result<()> {
    if self.has_typeinfo_query {
        return Ok(());
    }

    let first_attempt = self.raw_prepare(
        TYPEINFO_QUERY,
        "SELECT t.typname, t.typtype, t.typelem, r.rngsubtype, \
         t.typbasetype, n.nspname, t.typrelid \
         FROM pg_catalog.pg_type t \
         LEFT OUTER JOIN pg_catalog.pg_range r ON \
         r.rngtypid = t.oid \
         INNER JOIN pg_catalog.pg_namespace n ON \
         t.typnamespace = n.oid \
         WHERE t.oid = $1",
    );
    if let Err(e) = first_attempt {
        // Only a missing table triggers the fallback; anything else is fatal.
        if e.code() != Some(&UNDEFINED_TABLE) {
            return Err(e);
        }
        self.raw_prepare(
            TYPEINFO_QUERY,
            "SELECT t.typname, t.typtype, t.typelem, NULL::OID, \
             t.typbasetype, n.nspname, t.typrelid \
             FROM pg_catalog.pg_type t \
             INNER JOIN pg_catalog.pg_namespace n \
             ON t.typnamespace = n.oid \
             WHERE t.oid = $1",
        )?;
    }

    self.has_typeinfo_query = true;
    Ok(())
}
// Looks up the type with the given OID on the server and builds its `Type` description.
//
// Runs the `__typeinfo` statement and reads the name, type category, element OID, range
// subtype, base type, schema and relation id from the single result row, then classifies
// the type. Nested types (domain base, array element, range subtype, composite fields)
// are resolved recursively through `get_type`.
#[allow(if_not_else)]
fn read_type(&mut self, oid: Oid) -> Result<Type> {
self.setup_typeinfo_query()?;
self.raw_execute(TYPEINFO_QUERY, "", 0, &[OID], &[&oid])?;
// Keeps the last row delivered; if no row arrives every column reads as absent.
let mut row = None;
self.read_rows(|r| row = Some(r))?;
let get_raw = |i: usize| row.as_ref().and_then(|r| r.get(i));
let (name, type_, elem_oid, rngsubtype, basetype, schema, relid) = {
let name = String::from_sql_nullable(&NAME, get_raw(0)).map_err(
error::conversion,
)?;
let type_ = i8::from_sql_nullable(&CHAR, get_raw(1)).map_err(
error::conversion,
)?;
let elem_oid = Oid::from_sql_nullable(&OID, get_raw(2)).map_err(
error::conversion,
)?;
// The range subtype is the only column decoded as optional.
let rngsubtype = Option::<Oid>::from_sql_nullable(&OID, get_raw(3)).map_err(
error::conversion,
)?;
let basetype = Oid::from_sql_nullable(&OID, get_raw(4)).map_err(
error::conversion,
)?;
let schema = String::from_sql_nullable(&NAME, get_raw(5)).map_err(
error::conversion,
)?;
let relid = Oid::from_sql_nullable(&OID, get_raw(6)).map_err(
error::conversion,
)?;
(name, type_, elem_oid, rngsubtype, basetype, schema, relid)
};
// The order of these checks decides the kind when several markers are set at once.
let kind = if type_ == b'e' as i8 {
Kind::Enum(self.read_enum_variants(oid)?)
} else if type_ == b'p' as i8 {
Kind::Pseudo
} else if basetype != 0 {
Kind::Domain(self.get_type(basetype)?)
} else if elem_oid != 0 {
Kind::Array(self.get_type(elem_oid)?)
} else if relid != 0 {
Kind::Composite(self.read_composite_fields(relid)?)
} else {
match rngsubtype {
Some(oid) => Kind::Range(self.get_type(oid)?),
None => Kind::Simple,
}
};
Ok(Type::_new(name, oid, kind, schema))
}
/// Prepares the enum-label lookup statement if that has not been done yet.
///
/// The first attempt orders labels by `enumsortorder`. If the server reports an undefined
/// column, a variant ordering by `oid` is prepared instead.
fn setup_typeinfo_enum_query(&mut self) -> Result<()> {
    if self.has_typeinfo_enum_query {
        return Ok(());
    }

    let first_attempt = self.raw_prepare(
        TYPEINFO_ENUM_QUERY,
        "SELECT enumlabel \
         FROM pg_catalog.pg_enum \
         WHERE enumtypid = $1 \
         ORDER BY enumsortorder",
    );
    if let Err(e) = first_attempt {
        // Only a missing column triggers the fallback; anything else is fatal.
        if e.code() != Some(&UNDEFINED_COLUMN) {
            return Err(e);
        }
        self.raw_prepare(
            TYPEINFO_ENUM_QUERY,
            "SELECT enumlabel \
             FROM pg_catalog.pg_enum \
             WHERE enumtypid = $1 \
             ORDER BY oid",
        )?;
    }

    self.has_typeinfo_enum_query = true;
    Ok(())
}
fn read_enum_variants(&mut self, oid: Oid) -> Result<Vec<String>> {
self.setup_typeinfo_enum_query()?;
self.raw_execute(
TYPEINFO_ENUM_QUERY,
"",
0,
&[OID],
&[&oid],
)?;
let mut rows = vec![];
self.read_rows(|row| rows.push(row))?;
let mut variants = vec![];
for row in rows {
variants.push(String::from_sql_nullable(&NAME, row.get(0)).map_err(
error::conversion,
)?);
}
Ok(variants)
}
/// Prepares the composite-attribute lookup statement if that has not been done yet.
fn setup_typeinfo_composite_query(&mut self) -> Result<()> {
    if !self.has_typeinfo_composite_query {
        self.raw_prepare(
            TYPEINFO_COMPOSITE_QUERY,
            "SELECT attname, atttypid \
             FROM pg_catalog.pg_attribute \
             WHERE attrelid = $1 \
             AND NOT attisdropped \
             AND attnum > 0 \
             ORDER BY attnum",
        )?;
        // Set only after a successful prepare so a failure is retried next time.
        self.has_typeinfo_composite_query = true;
    }
    Ok(())
}
fn read_composite_fields(&mut self, relid: Oid) -> Result<Vec<Field>> {
self.setup_typeinfo_composite_query()?;
self.raw_execute(
TYPEINFO_COMPOSITE_QUERY,
"",
0,
&[OID],
&[&relid],
)?;
let mut rows = vec![];
self.read_rows(|row| rows.push(row))?;
let mut fields = vec![];
for row in rows {
let (name, type_) = {
let name = String::from_sql_nullable(&NAME, row.get(0)).map_err(
error::conversion,
)?;
let type_ = Oid::from_sql_nullable(&OID, row.get(1)).map_err(
error::conversion,
)?;
(name, type_)
};
let type_ = self.get_type(type_)?;
fields.push(Field::new(name, type_));
}
Ok(fields)
}
// Reports whether the connection has been marked as desynchronized.
fn is_desynchronized(&self) -> bool {
self.desynchronized
}
// Reads the next message and requires it to be `ReadyForQuery`; any other message is
// handled by the `bad_response!` macro.
#[allow(needless_return)]
fn wait_for_ready(&mut self) -> Result<()> {
match self.read_message()? {
backend::Message::ReadyForQuery(_) => Ok(()),
_ => bad_response!(self),
}
}
// Sends `query` as a single query message and collects every returned row as text.
//
// Each row is a vector with one entry per column: `None` where the column has no value
// range, otherwise the bytes decoded lossily as UTF-8. Reading continues until
// `ReadyForQuery`. On an `ErrorResponse` the trailing `ReadyForQuery` is consumed and the
// server's error is returned.
fn quick_query(&mut self, query: &str) -> Result<Vec<Vec<Option<String>>>> {
check_desync!(self);
debug!("executing query: {}", query);
self.stream.write_message(|buf| frontend::query(query, buf))?;
self.stream.flush()?;
let mut result = vec![];
loop {
match self.read_message()? {
backend::Message::ReadyForQuery(_) => break,
backend::Message::DataRow(body) => {
let row = body.ranges()
.map(|r| {
r.map(|r| String::from_utf8_lossy(&body.buffer()[r]).into_owned())
})
.collect()?;
result.push(row);
}
backend::Message::CopyInResponse(_) => {
// COPY IN is refused; the server's reply is handled by later iterations.
self.stream.write_message(|buf| {
frontend::copy_fail("COPY queries cannot be directly executed", buf)
})?;
self.stream.write_message(
|buf| Ok::<(), io::Error>(frontend::sync(buf)),
)?;
self.stream.flush()?;
}
backend::Message::ErrorResponse(body) => {
self.wait_for_ready()?;
return Err(err(&mut body.fields()));
}
// All other messages are ignored.
_ => {}
}
}
Ok(result)
}
/// Writes a terminate message to the server and flushes the stream.
///
/// Does nothing but return an error when the connection is desynchronized.
fn finish_inner(&mut self) -> Result<()> {
    check_desync!(self);
    self.stream.write_message(|buf| {
        frontend::terminate(buf);
        Ok::<(), io::Error>(())
    })?;
    self.stream.flush()?;
    Ok(())
}
}
// Compile-time assertion that `Connection` implements `Send`: this function only
// type-checks if the bound on `_is_send` is satisfied.
fn _ensure_send() {
fn _is_send<T: Send>() {}
_is_send::<Connection>();
}
/// A connection to a Postgres database.
///
/// The connection state lives in a `RefCell`, so methods take `&self`.
pub struct Connection(RefCell<InnerConnection>);
/// Formats a summary of the connection: the underlying stream, the cancel data, the number
/// of queued notifications, the transaction depth, the desynchronization flag and the
/// number of cached statements.
impl fmt::Debug for Connection {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let inner = self.0.borrow();
        let mut out = fmt.debug_struct("Connection");
        out.field("stream", &inner.stream.get_ref());
        out.field("cancel_data", &inner.cancel_data);
        out.field("notifications", &inner.notifications.len());
        out.field("transaction_depth", &inner.trans_depth);
        out.field("desynchronized", &inner.desynchronized);
        out.field("cached_statements", &inner.cached_statements.len());
        out.finish()
    }
}
impl Connection {
pub fn connect<T>(params: T, tls: TlsMode) -> Result<Connection>
where
T: IntoConnectParams,
{
InnerConnection::connect(params, tls).map(|conn| Connection(RefCell::new(conn)))
}
pub fn execute(&self, query: &str, params: &[&ToSql]) -> Result<u64> {
let (param_types, columns) = self.0.borrow_mut().raw_prepare("", query)?;
let info = Arc::new(StatementInfo {
name: String::new(),
param_types: param_types,
columns: columns,
});
let stmt = Statement::new(self, info, Cell::new(0), true);
stmt.execute(params)
}
pub fn query(&self, query: &str, params: &[&ToSql]) -> Result<Rows> {
let (param_types, columns) = self.0.borrow_mut().raw_prepare("", query)?;
let info = Arc::new(StatementInfo {
name: String::new(),
param_types: param_types,
columns: columns,
});
let stmt = Statement::new(self, info, Cell::new(0), true);
stmt.into_query(params)
}
pub fn transaction<'a>(&'a self) -> Result<Transaction<'a>> {
self.transaction_with(&transaction::Config::new())
}
pub fn transaction_with<'a>(&'a self, config: &transaction::Config) -> Result<Transaction<'a>> {
let mut conn = self.0.borrow_mut();
check_desync!(conn);
assert!(
conn.trans_depth == 0,
"`transaction` must be called on the active transaction"
);
let mut query = "BEGIN".to_owned();
config.build_command(&mut query);
conn.quick_query(&query)?;
conn.trans_depth += 1;
Ok(Transaction::new(self, 1))
}
pub fn prepare<'a>(&'a self, query: &str) -> Result<Statement<'a>> {
self.0.borrow_mut().prepare(query, self)
}
pub fn prepare_cached<'a>(&'a self, query: &str) -> Result<Statement<'a>> {
self.0.borrow_mut().prepare_cached(query, self)
}
pub fn transaction_isolation(&self) -> Result<IsolationLevel> {
let mut conn = self.0.borrow_mut();
check_desync!(conn);
let result = conn.quick_query("SHOW TRANSACTION ISOLATION LEVEL")?;
IsolationLevel::new(result[0][0].as_ref().unwrap())
}
pub fn set_transaction_config(&self, config: &transaction::Config) -> Result<()> {
let mut command = "SET SESSION CHARACTERISTICS AS TRANSACTION".to_owned();
config.build_command(&mut command);
self.batch_execute(&command)
}
pub fn batch_execute(&self, query: &str) -> Result<()> {
self.0.borrow_mut().quick_query(query).map(|_| ())
}
pub fn notifications<'a>(&'a self) -> Notifications<'a> {
Notifications::new(self)
}
pub fn cancel_data(&self) -> CancelData {
self.0.borrow().cancel_data
}
pub fn parameter(&self, param: &str) -> Option<String> {
self.0.borrow().parameters.get(param).cloned()
}
pub fn set_notice_handler(&self, handler: Box<HandleNotice>) -> Box<HandleNotice> {
self.0.borrow_mut().set_notice_handler(handler)
}
pub fn is_desynchronized(&self) -> bool {
self.0.borrow().is_desynchronized()
}
pub fn is_active(&self) -> bool {
self.0.borrow().trans_depth == 0
}
pub fn finish(self) -> Result<()> {
let mut conn = self.0.borrow_mut();
conn.finished = true;
conn.finish_inner()
}
}
/// A trait allowing abstraction over connections and transactions.
pub trait GenericConnection {
/// Like `Connection::execute`.
fn execute(&self, query: &str, params: &[&ToSql]) -> Result<u64>;
/// Like `Connection::query`.
fn query<'a>(&'a self, query: &str, params: &[&ToSql]) -> Result<Rows>;
/// Like `Connection::prepare`.
fn prepare<'a>(&'a self, query: &str) -> Result<Statement<'a>>;
/// Like `Connection::prepare_cached`.
fn prepare_cached<'a>(&'a self, query: &str) -> Result<Statement<'a>>;
/// Like `Connection::transaction`.
fn transaction<'a>(&'a self) -> Result<Transaction<'a>>;
/// Like `Connection::batch_execute`.
fn batch_execute(&self, query: &str) -> Result<()>;
/// Like `Connection::is_active`.
fn is_active(&self) -> bool;
}
impl GenericConnection for Connection {
fn execute(&self, query: &str, params: &[&ToSql]) -> Result<u64> {
self.execute(query, params)
}
fn query<'a>(&'a self, query: &str, params: &[&ToSql]) -> Result<Rows> {
self.query(query, params)
}
fn prepare<'a>(&'a self, query: &str) -> Result<Statement<'a>> {
self.prepare(query)
}
fn prepare_cached<'a>(&'a self, query: &str) -> Result<Statement<'a>> {
self.prepare_cached(query)
}
fn transaction<'a>(&'a self) -> Result<Transaction<'a>> {
self.transaction()
}
fn batch_execute(&self, query: &str) -> Result<()> {
self.batch_execute(query)
}
fn is_active(&self) -> bool {
self.is_active()
}
}
impl<'a> GenericConnection for Transaction<'a> {
fn execute(&self, query: &str, params: &[&ToSql]) -> Result<u64> {
self.execute(query, params)
}
fn query<'b>(&'b self, query: &str, params: &[&ToSql]) -> Result<Rows> {
self.query(query, params)
}
fn prepare<'b>(&'b self, query: &str) -> Result<Statement<'b>> {
self.prepare(query)
}
fn prepare_cached<'b>(&'b self, query: &str) -> Result<Statement<'b>> {
self.prepare_cached(query)
}
fn transaction<'b>(&'b self) -> Result<Transaction<'b>> {
self.transaction()
}
fn batch_execute(&self, query: &str) -> Result<()> {
self.batch_execute(query)
}
fn is_active(&self) -> bool {
self.is_active()
}
}
fn err(fields: &mut ErrorFields) -> Error {
match DbError::new(fields) {
Ok(err) => error::db(err),
Err(err) => err.into(),
}
}