use std::sync::Arc;
use fallible_iterator::FallibleIterator;
use crate::{
BoxedFuture,
client::ClientBorrow,
column::Column,
error::Error,
protocol::message::backend,
query::{RowSimpleStream, RowStream, RowStreamGuarded, RowStreamOwned},
statement::Statement,
};
use super::{Response, sealed};
/// Conversion of a value into the typed result that is built from a raw `Response`.
///
/// The `sealed::Sealed` supertrait bound means a type has to implement that
/// trait as well before it can implement this one.
pub trait IntoResponse: sealed::Sealed + Sized {
/// Type returned by [`IntoResponse::into_response`].
type Response;
/// Consumes `self` together with the raw response `res` and returns the
/// implementor's associated `Response` type.
fn into_response(self, res: Response) -> Self::Response;
}
// `IntoResponse` requires `sealed::Sealed`, so the slice type opts in here.
impl sealed::Sealed for &[Column] {}

/// A borrowed column slice converts a raw response into a [`RowStream`] that
/// carries the same lifetime `'c` as the slice.
impl<'c> IntoResponse for &'c [Column] {
    type Response = RowStream<'c>;

    /// Hands `res` and the borrowed columns to `RowStream::new`.
    #[inline]
    fn into_response(self, res: Response) -> RowStream<'c> {
        let columns = self;
        RowStream::new(res, columns)
    }
}
// `IntoResponse` requires `sealed::Sealed`, so the shared slice type opts in here.
impl sealed::Sealed for Arc<[Column]> {}

/// A reference-counted column slice converts a raw response into a
/// [`RowStreamOwned`], which takes the `Arc` by value.
impl IntoResponse for Arc<[Column]> {
    type Response = RowStreamOwned;

    /// Hands `res` and the owned `Arc<[Column]>` to `RowStreamOwned::new`.
    #[inline]
    fn into_response(self, res: Response) -> RowStreamOwned {
        let columns = self;
        RowStreamOwned::new(res, columns)
    }
}
// `IntoResponse` requires `sealed::Sealed`, so the vector type opts in here.
impl sealed::Sealed for Vec<Column> {}

/// A column vector converts a raw response into a [`RowSimpleStream`], which
/// takes the vector by value.
impl IntoResponse for Vec<Column> {
    type Response = RowSimpleStream;

    /// Hands `res` and the owned `Vec<Column>` to `RowSimpleStream::new`.
    #[inline]
    fn into_response(self, res: Response) -> RowSimpleStream {
        let columns = self;
        RowSimpleStream::new(res, columns)
    }
}
pub struct IntoRowStreamGuard<'a, C>(pub &'a C);
impl<C> sealed::Sealed for IntoRowStreamGuard<'_, C> {}
impl<'c, C> IntoResponse for IntoRowStreamGuard<'c, C>
where
C: ClientBorrow,
{
type Response = RowStreamGuarded<'c, C>;
#[inline]
fn into_response(self, res: Response) -> Self::Response {
RowStreamGuarded::new(res, self.0)
}
}
/// Unit type that implements [`IntoResponse`] but can never actually yield a
/// row stream: calling `into_response` on it always panics.
pub struct NoOpIntoRowStream;

// `IntoResponse` requires `sealed::Sealed`.
impl sealed::Sealed for NoOpIntoRowStream {}

impl IntoResponse for NoOpIntoRowStream {
    type Response = RowStream<'static>;

    /// Never returns a value.
    ///
    /// # Panics
    ///
    /// Always panics via `unreachable!`; the response argument is ignored.
    fn into_response(self, _res: Response) -> RowStream<'static> {
        unreachable!("no row stream can be generated from no op row stream constructor")
    }
}
pub struct StatementCreateResponse<'a, C> {
pub(super) name: String,
pub(super) cli: &'a C,
}
impl<C> sealed::Sealed for StatementCreateResponse<'_, C> {}
impl<'s, C> IntoResponse for StatementCreateResponse<'s, C>
where
C: ClientBorrow + Sync,
{
type Response = BoxedFuture<'s, Result<Statement, Error>>;
fn into_response(self, mut res: Response) -> Self::Response {
Box::pin(async move {
let Self { name, cli } = self;
match res.recv().await? {
backend::Message::ParseComplete => {}
_ => return Err(Error::unexpected()),
}
let parameter_description = match res.recv().await? {
backend::Message::ParameterDescription(body) => body,
_ => return Err(Error::unexpected()),
};
let row_description = match res.recv().await? {
backend::Message::RowDescription(body) => Some(body),
backend::Message::NoData => None,
_ => return Err(Error::unexpected()),
};
let mut it = parameter_description.parameters();
let mut params = Vec::with_capacity(it.size_hint().0);
while let Some(oid) = it.next()? {
let ty = cli.borrow_cli_ref()._get_type(oid).await?;
params.push(ty);
}
let mut columns = Vec::new();
if let Some(row_description) = row_description {
let mut it = row_description.fields();
columns.reserve(it.size_hint().0);
while let Some(field) = it.next()? {
let type_ = cli.borrow_cli_ref()._get_type(field.type_oid()).await?;
let column = Column::new(field.name(), type_);
columns.push(column);
}
}
Ok(Statement::new(name, params, columns))
})
}
}
pub struct StatementCreateResponseBlocking<'a, C> {
pub(super) name: String,
pub(super) cli: &'a C,
}
impl<C> sealed::Sealed for StatementCreateResponseBlocking<'_, C> {}
impl<C> IntoResponse for StatementCreateResponseBlocking<'_, C>
where
C: ClientBorrow,
{
type Response = Result<Statement, Error>;
fn into_response(self, mut res: Response) -> Self::Response {
let Self { name, cli } = self;
match res.blocking_recv()? {
backend::Message::ParseComplete => {}
_ => return Err(Error::unexpected()),
}
let parameter_description = match res.blocking_recv()? {
backend::Message::ParameterDescription(body) => body,
_ => return Err(Error::unexpected()),
};
let row_description = match res.blocking_recv()? {
backend::Message::RowDescription(body) => Some(body),
backend::Message::NoData => None,
_ => return Err(Error::unexpected()),
};
let mut it = parameter_description.parameters();
let mut params = Vec::with_capacity(it.size_hint().0);
while let Some(oid) = it.next()? {
let ty = cli.borrow_cli_ref()._get_type_blocking(oid)?;
params.push(ty);
}
let mut columns = Vec::new();
if let Some(row_description) = row_description {
let mut it = row_description.fields();
columns.reserve(it.size_hint().0);
while let Some(field) = it.next()? {
let type_ = cli.borrow_cli_ref()._get_type_blocking(field.type_oid())?;
let column = Column::new(field.name(), type_);
columns.push(column);
}
}
Ok(Statement::new(name, params, columns))
}
}