use std::sync::Arc;
use futures_core::future::BoxFuture;
use crate::connection::ConnectionSource;
use crate::cursor::Cursor;
use crate::executor::Execute;
use crate::pool::Pool;
use crate::postgres::protocol::{DataRow, Message, ReadyForQuery, RowDescription};
use crate::postgres::row::Statement;
use crate::postgres::{PgArguments, PgConnection, PgRow, Postgres};
/// Cursor that yields the rows of a single query executed against Postgres.
///
/// `'c` is the lifetime of a borrowed connection (when one is used) and `'q`
/// is the lifetime of the borrowed query text.
pub struct PgCursor<'c, 'q> {
// Where the connection comes from: either a cloned pool handle
// (`ConnectionSource::Pool`) or a borrowed connection
// (`ConnectionSource::ConnectionRef`). It is resolved on every call to `next`.
source: ConnectionSource<'c, PgConnection>,
// The query text and its optional arguments. This is `Some` until the first
// call to `next`, which takes it out and leaves `None` behind.
query: Option<(&'q str, Option<PgArguments>)>,
// Statement metadata shared (via `Arc`) with every `PgRow` this cursor returns.
// Starts as the default value and is replaced either from the connection's
// statement cache or from a `RowDescription` message.
statement: Arc<Statement>,
}
// Marks `PgCursor` with the crate-private `Sealed` trait.
// NOTE(review): presumably `Cursor` requires `Sealed` so that it cannot be
// implemented outside this crate — confirm against the trait definition.
impl crate::cursor::private::Sealed for PgCursor<'_, '_> {}
impl<'c, 'q> Cursor<'c, 'q> for PgCursor<'c, 'q> {
    type Database = Postgres;

    /// Creates a cursor whose connection source is a clone of `pool`.
    ///
    /// The query is only decomposed into its parts here; nothing is sent
    /// until the cursor is first advanced.
    #[doc(hidden)]
    fn from_pool<E>(pool: &Pool<PgConnection>, query: E) -> Self
    where
        Self: Sized,
        E: Execute<'q, Postgres>,
    {
        let source = ConnectionSource::Pool(pool.clone());
        let statement = Arc::default();
        let pending = query.into_parts();

        Self {
            source,
            query: Some(pending),
            statement,
        }
    }

    /// Creates a cursor that borrows `conn` mutably for the lifetime `'c`.
    ///
    /// The query is only decomposed into its parts here; nothing is sent
    /// until the cursor is first advanced.
    #[doc(hidden)]
    fn from_connection<E>(conn: &'c mut PgConnection, query: E) -> Self
    where
        Self: Sized,
        E: Execute<'q, Postgres>,
    {
        let source = ConnectionSource::ConnectionRef(conn);
        let statement = Arc::default();
        let pending = query.into_parts();

        Self {
            source,
            query: Some(pending),
            statement,
        }
    }

    /// Returns a boxed future that resolves to the next row, `None` when the
    /// query has no more rows, or an error.
    fn next(&mut self) -> BoxFuture<crate::Result<Option<PgRow<'_>>>> {
        // The bare `next` here is the module-level async function, not this method.
        let fetch = next(self);
        Box::pin(fetch)
    }
}
/// Advances `cursor` and returns its next row.
///
/// On the first call the pending query is taken out of the cursor and run on
/// the resolved connection. Messages are then read from the connection stream
/// until one of the following happens:
///
/// * a `DataRow` arrives — it is returned as `Ok(Some(row))`;
/// * `ReadyForQuery` arrives — the connection is flagged ready and `Ok(None)`
///   is returned;
/// * any other unhandled message arrives — a protocol error is returned.
///
/// # Errors
///
/// Propagates failures from resolving the connection, running the query,
/// receiving a message, and decoding message payloads.
///
/// NOTE(review): nothing in this function records that `ReadyForQuery` was
/// already seen, so a further call after `Ok(None)` goes straight back to
/// `receive()` — confirm that callers stop polling after `None`.
async fn next<'a, 'c: 'a, 'q: 'a>(
    cursor: &'a mut PgCursor<'c, 'q>,
) -> crate::Result<Option<PgRow<'a>>> {
    let mut conn = cursor.source.resolve().await?;

    // `take` leaves `None` behind, so the query is run on the first call only.
    if let Some((sql, args)) = cursor.query.take() {
        let prepared = conn.run(sql, args).await?;

        // When `run` reports a statement key, reuse the cached statement metadata.
        if let Some(id) = prepared {
            cursor.statement = Arc::clone(&conn.cache_statement[&id]);
        }
    }

    loop {
        match conn.stream.receive().await? {
            // Nothing to do for these; keep reading.
            Message::ParseComplete | Message::BindComplete | Message::CommandComplete => {}

            Message::ReadyForQuery => {
                // Decoded for validation only; the value itself is not used.
                let _status = ReadyForQuery::read(conn.stream.buffer())?;
                conn.is_ready = true;
                return Ok(None);
            }

            Message::RowDescription => {
                // Build fresh statement metadata from the description just received.
                let description = RowDescription::read(conn.stream.buffer())?;
                let parsed = conn
                    .parse_row_description(description, Default::default(), None, false)
                    .await?;
                cursor.statement = Arc::new(parsed);
            }

            Message::DataRow => {
                let data = DataRow::read(conn.stream.buffer(), &mut conn.current_row_values)?;
                let statement = Arc::clone(&cursor.statement);
                return Ok(Some(PgRow { statement, data }));
            }

            other => {
                return Err(protocol_err!("next: unexpected message: {:?}", other).into());
            }
        }
    }
}