use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::query::extract_row_affected;
use crate::{Error, SimpleQueryMessage, SimpleQueryRow};
use bytes::Bytes;
use fallible_iterator::FallibleIterator;
use futures_util::{ready, Stream};
use log::debug;
use pin_project_lite::pin_project;
use postgres_protocol::message::backend::Message;
use postgres_protocol::message::frontend;
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
#[derive(Debug)]
pub struct SimpleColumn {
name: String,
}
impl SimpleColumn {
pub(crate) fn new(name: String) -> SimpleColumn {
SimpleColumn { name }
}
pub fn name(&self) -> &str {
&self.name
}
}
pub fn simple_query_direct(client: &InnerClient, query: &str) -> Result<SimpleQueryStream, Error> {
debug!("executing simple query: {}", query);
let buf = encode(client, query)?;
let responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
Ok(SimpleQueryStream {
responses,
columns: None,
_p: PhantomPinned,
})
}
pub async fn simple_query(client: &InnerClient, query: &str) -> Result<SimpleQueryStream, Error> {
debug!("executing simple query: {}", query);
let buf = encode(client, query)?;
let responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
Ok(SimpleQueryStream {
responses,
columns: None,
_p: PhantomPinned,
})
}
pub async fn batch_execute(client: &InnerClient, query: &str) -> Result<(), Error> {
debug!("executing statement batch: {}", query);
let buf = encode(client, query)?;
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
loop {
match responses.next().await? {
Message::ReadyForQuery(_) => return Ok(()),
Message::CommandComplete(_)
| Message::EmptyQueryResponse
| Message::RowDescription(_)
| Message::DataRow(_) => {}
_ => return Err(Error::unexpected_message()),
}
}
}
fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error> {
client.with_buf(|buf| {
frontend::query(query, buf).map_err(Error::encode)?;
Ok(buf.split().freeze())
})
}
pin_project! {
pub struct SimpleQueryStream {
responses: Responses,
columns: Option<Arc<[SimpleColumn]>>,
#[pin]
_p: PhantomPinned,
}
}
pub struct SimpleQueryStreamCollect {
pub(crate) output: Vec<SimpleQueryMessage>,
pub(crate) inner: Result<SimpleQueryStream, Option<Error>>,
}
impl Future for SimpleQueryStreamCollect {
type Output = Result<Vec<SimpleQueryMessage>, Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = unsafe { self.get_unchecked_mut() };
match &mut this.inner {
Ok(inner) => loop {
let inner = unsafe { Pin::new_unchecked(&mut *inner) };
match ready!(inner.poll_next(cx)) {
Some(Ok(row)) => this.output.push(row),
Some(Err(err)) => return Poll::Ready(Err(err)),
None => return Poll::Ready(Ok(std::mem::take(&mut this.output))),
}
},
Err(err) => {
return Poll::Ready(Err(err.take().unwrap()));
}
}
}
}
impl Stream for SimpleQueryStream {
type Item = Result<SimpleQueryMessage, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
loop {
match ready!(this.responses.poll_next(cx)?) {
Message::CommandComplete(body) => {
let rows = extract_row_affected(&body)?;
return Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(rows))));
}
Message::EmptyQueryResponse => {
return Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(0))));
}
Message::RowDescription(body) => {
let columns = body
.fields()
.map(|f| Ok(SimpleColumn::new(f.name().to_string())))
.collect::<Vec<_>>()
.map_err(Error::parse)?
.into();
*this.columns = Some(columns);
}
Message::DataRow(body) => {
let row = match &this.columns {
Some(columns) => SimpleQueryRow::new(columns.clone(), body)?,
None => return Poll::Ready(Some(Err(Error::unexpected_message()))),
};
return Poll::Ready(Some(Ok(SimpleQueryMessage::Row(row))));
}
Message::ReadyForQuery(_) => return Poll::Ready(None),
_ => return Poll::Ready(Some(Err(Error::unexpected_message()))),
}
}
}
}