use std::sync::Arc;
use crate::connection::stream::PgConnection;
use crate::error::{Error, Result};
use crate::protocol::backend::BackendMessage;
use crate::row::{Row, RowDescription};
/// Incremental reader over the rows of a query result.
///
/// The stream holds the connection by mutable borrow for its whole lifetime
/// and reads one backend message per call to `next`.
pub struct RowStream<'a> {
/// Connection the result messages are received from.
conn: &'a mut PgConnection,
/// Column metadata; a clone of this `Arc` is handed to every `Row` produced.
description: Arc<RowDescription>,
/// Set once a terminal message (completion, server error, or an unexpected
/// message) has been seen; `next` and `close` return immediately afterwards.
done: bool,
}
impl<'a> RowStream<'a> {
    /// Creates a stream that reads rows from `conn`, attaching `description`
    /// to every row it yields. The stream starts in the "not done" state.
    pub(crate) fn new(conn: &'a mut PgConnection, description: Arc<RowDescription>) -> Self {
        Self {
            conn,
            description,
            done: false,
        }
    }

    /// Receives the next backend message and returns the row it carries, if any.
    ///
    /// Returns `Ok(Some(row))` for a `DataRow`, and `Ok(None)` once the result
    /// has ended (`CommandComplete` or `EmptyQueryResponse`). After the stream
    /// is marked done, every further call returns `Ok(None)` without touching
    /// the connection.
    ///
    /// # Errors
    ///
    /// * A server error if an `ErrorResponse` is received. The stream is marked
    ///   done and the connection is drained up to `ReadyForQuery` on a
    ///   best-effort basis first.
    /// * A protocol error if any other message is received. The stream is
    ///   marked done and the connection is *not* drained.
    /// * Whatever `recv` or the post-completion drain returns on failure. A
    ///   failed `recv` does not mark the stream done.
    pub async fn next(&mut self) -> Result<Option<Row>> {
        if self.done {
            return Ok(None);
        }
        match self.conn.recv().await? {
            BackendMessage::DataRow { columns } => {
                Ok(Some(Row::new(columns, Arc::clone(&self.description))))
            }
            // Both messages terminate the result; only the trailing
            // ReadyForQuery remains to be consumed.
            BackendMessage::CommandComplete { .. } | BackendMessage::EmptyQueryResponse => {
                self.done = true;
                drain_until_ready(self.conn).await?;
                Ok(None)
            }
            BackendMessage::ErrorResponse { fields } => {
                self.done = true;
                // Best effort: the server error is more useful to the caller
                // than a secondary failure while draining.
                drain_until_ready(self.conn).await.ok();
                Err(Error::server(
                    fields.severity,
                    fields.code,
                    fields.message,
                    fields.detail,
                    fields.hint,
                    fields.position,
                ))
            }
            other => {
                self.done = true;
                Err(Error::protocol(format!(
                    "unexpected message in row stream: {other:?}"
                )))
            }
        }
    }

    /// Discards any remaining messages of the result so the connection can be
    /// reused, consuming the stream.
    ///
    /// Returns immediately if the stream is already done. Otherwise messages
    /// are read and dropped until the result terminates:
    ///
    /// * `CommandComplete` or `EmptyQueryResponse`: drain up to
    ///   `ReadyForQuery`, then return `Ok(())`.
    /// * `ReadyForQuery`: the query cycle is already over, so return `Ok(())`
    ///   without reading further.
    /// * `ErrorResponse`: drain on a best-effort basis and return the server
    ///   error.
    ///
    /// # Errors
    ///
    /// Returns the server error described above, or whatever `recv` or the
    /// post-completion drain returns on failure.
    pub async fn close(mut self) -> Result<()> {
        if self.done {
            return Ok(());
        }
        loop {
            match self.conn.recv().await? {
                // EmptyQueryResponse stands in for CommandComplete when the
                // query string was empty; skipping it would make this loop
                // also swallow the ReadyForQuery that follows and then wait
                // on the connection indefinitely.
                BackendMessage::CommandComplete { .. } | BackendMessage::EmptyQueryResponse => {
                    self.done = true;
                    drain_until_ready(self.conn).await?;
                    return Ok(());
                }
                // Nothing more belongs to this query; reading on would block
                // or consume messages of a later query.
                BackendMessage::ReadyForQuery { .. } => {
                    self.done = true;
                    return Ok(());
                }
                BackendMessage::ErrorResponse { fields } => {
                    self.done = true;
                    // Best effort, as in `next`: prefer reporting the server error.
                    drain_until_ready(self.conn).await.ok();
                    return Err(Error::server(
                        fields.severity,
                        fields.code,
                        fields.message,
                        fields.detail,
                        fields.hint,
                        fields.position,
                    ));
                }
                // Remaining rows and any other in-flight messages are discarded.
                _ => {}
            }
        }
    }

    /// Returns the column description attached to rows from this stream.
    pub fn description(&self) -> &RowDescription {
        &self.description
    }

    /// Returns `true` once the stream has seen a terminal message and will
    /// read nothing further from the connection.
    pub fn is_done(&self) -> bool {
        self.done
    }
}
/// Reads and discards backend messages from `conn` until a `ReadyForQuery`
/// message has been consumed.
///
/// # Errors
///
/// Propagates the first error returned by `recv`; draining stops at that point.
async fn drain_until_ready(conn: &mut PgConnection) -> Result<()> {
    while !matches!(conn.recv().await?, BackendMessage::ReadyForQuery { .. }) {
        // Every message ahead of ReadyForQuery is intentionally dropped.
    }
    Ok(())
}