Skip to main content

sentinel_driver/
stream.rs

1use std::sync::Arc;
2
3use crate::connection::stream::PgConnection;
4use crate::error::{Error, Result};
5use crate::protocol::backend::BackendMessage;
6use crate::row::{Row, RowDescription};
7
/// A streaming row-by-row iterator over query results.
///
/// Created by [`Connection::query_stream()`]. Yields rows one at a time
/// via [`next()`](RowStream::next), avoiding full materialization of
/// large result sets in memory.
///
/// The stream holds an exclusive (`&mut`) borrow of the connection, so no
/// other query can be issued on it while the stream is alive.
///
/// To stop early and keep using the connection, call
/// [`close()`](RowStream::close) rather than just dropping the stream:
/// `close()` reads the remaining messages of the current query, whereas
/// no `Drop` implementation that does so is visible in this module.
///
/// # Example
///
/// ```rust,no_run
/// # async fn example(conn: &mut sentinel_driver::Connection) -> sentinel_driver::Result<()> {
/// let mut stream = conn.query_stream("SELECT * FROM users", &[]).await?;
/// while let Some(row) = stream.next().await? {
///     let name: String = row.get(0);
/// }
/// # Ok(())
/// # }
/// ```
pub struct RowStream<'a> {
    /// Connection the backend messages are read from.
    conn: &'a mut PgConnection,
    /// Column metadata, shared (via `Arc`) with every `Row` the stream yields.
    description: Arc<RowDescription>,
    /// Set once a terminal or unexpected message has been seen; afterwards
    /// `next()` returns `Ok(None)` without touching the connection.
    done: bool,
}
33
34impl<'a> RowStream<'a> {
35    pub(crate) fn new(conn: &'a mut PgConnection, description: Arc<RowDescription>) -> Self {
36        Self {
37            conn,
38            description,
39            done: false,
40        }
41    }
42
43    /// Fetch the next row from the stream.
44    ///
45    /// Returns `Ok(Some(row))` for each row, `Ok(None)` when the query
46    /// is complete, or `Err` on server/protocol error.
47    pub async fn next(&mut self) -> Result<Option<Row>> {
48        if self.done {
49            return Ok(None);
50        }
51
52        match self.conn.recv().await? {
53            BackendMessage::DataRow { columns } => {
54                Ok(Some(Row::new(columns, Arc::clone(&self.description))))
55            }
56            BackendMessage::CommandComplete { .. } => {
57                self.done = true;
58                // Read ReadyForQuery to leave connection in clean state
59                drain_until_ready(self.conn).await?;
60                Ok(None)
61            }
62            BackendMessage::EmptyQueryResponse => {
63                self.done = true;
64                drain_until_ready(self.conn).await?;
65                Ok(None)
66            }
67            BackendMessage::ErrorResponse { fields } => {
68                self.done = true;
69                drain_until_ready(self.conn).await.ok();
70                Err(Error::server(
71                    fields.severity,
72                    fields.code,
73                    fields.message,
74                    fields.detail,
75                    fields.hint,
76                    fields.position,
77                ))
78            }
79            other => {
80                self.done = true;
81                Err(Error::protocol(format!(
82                    "unexpected message in row stream: {other:?}"
83                )))
84            }
85        }
86    }
87
88    /// Close the stream early, draining any remaining server messages.
89    ///
90    /// Call this instead of dropping when you want to reuse the connection
91    /// for subsequent queries after only partially consuming the stream.
92    pub async fn close(mut self) -> Result<()> {
93        if self.done {
94            return Ok(());
95        }
96
97        // Drain remaining DataRows, CommandComplete, and ReadyForQuery
98        loop {
99            match self.conn.recv().await? {
100                BackendMessage::CommandComplete { .. } => {
101                    self.done = true;
102                    drain_until_ready(self.conn).await?;
103                    return Ok(());
104                }
105                BackendMessage::ErrorResponse { fields } => {
106                    self.done = true;
107                    drain_until_ready(self.conn).await.ok();
108                    return Err(Error::server(
109                        fields.severity,
110                        fields.code,
111                        fields.message,
112                        fields.detail,
113                        fields.hint,
114                        fields.position,
115                    ));
116                }
117                _ => {}
118            }
119        }
120    }
121
122    /// The row description for this stream's columns.
123    pub fn description(&self) -> &RowDescription {
124        &self.description
125    }
126
127    /// Returns `true` if the stream has been fully consumed or closed.
128    pub fn is_done(&self) -> bool {
129        self.done
130    }
131}
132
133/// Drain messages until ReadyForQuery.
134async fn drain_until_ready(conn: &mut PgConnection) -> Result<()> {
135    loop {
136        if let BackendMessage::ReadyForQuery { .. } = conn.recv().await? {
137            return Ok(());
138        }
139    }
140}