sentinel_driver/stream.rs
1use std::sync::Arc;
2
3use crate::connection::stream::PgConnection;
4use crate::error::{Error, Result};
5use crate::protocol::backend::BackendMessage;
6use crate::row::{Row, RowDescription};
7
8/// A streaming row-by-row iterator over query results.
9///
10/// Created by [`Connection::query_stream()`]. Yields rows one at a time
11/// via [`next()`](RowStream::next), avoiding full materialization of
12/// large result sets in memory.
13///
14/// The stream holds an exclusive borrow of the connection — no other
15/// queries can run until the stream is dropped or fully consumed.
16///
17/// # Example
18///
19/// ```rust,no_run
20/// # async fn example(conn: &mut sentinel_driver::Connection) -> sentinel_driver::Result<()> {
21/// let mut stream = conn.query_stream("SELECT * FROM users", &[]).await?;
22/// while let Some(row) = stream.next().await? {
23/// let name: String = row.get(0);
24/// }
25/// # Ok(())
26/// # }
27/// ```
28pub struct RowStream<'a> {
29 conn: &'a mut PgConnection,
30 description: Arc<RowDescription>,
31 done: bool,
32}
33
34impl<'a> RowStream<'a> {
35 pub(crate) fn new(conn: &'a mut PgConnection, description: Arc<RowDescription>) -> Self {
36 Self {
37 conn,
38 description,
39 done: false,
40 }
41 }
42
43 /// Fetch the next row from the stream.
44 ///
45 /// Returns `Ok(Some(row))` for each row, `Ok(None)` when the query
46 /// is complete, or `Err` on server/protocol error.
47 pub async fn next(&mut self) -> Result<Option<Row>> {
48 if self.done {
49 return Ok(None);
50 }
51
52 match self.conn.recv().await? {
53 BackendMessage::DataRow { columns } => {
54 Ok(Some(Row::new(columns, Arc::clone(&self.description))))
55 }
56 BackendMessage::CommandComplete { .. } => {
57 self.done = true;
58 // Read ReadyForQuery to leave connection in clean state
59 drain_until_ready(self.conn).await?;
60 Ok(None)
61 }
62 BackendMessage::EmptyQueryResponse => {
63 self.done = true;
64 drain_until_ready(self.conn).await?;
65 Ok(None)
66 }
67 BackendMessage::ErrorResponse { fields } => {
68 self.done = true;
69 drain_until_ready(self.conn).await.ok();
70 Err(Error::server(
71 fields.severity,
72 fields.code,
73 fields.message,
74 fields.detail,
75 fields.hint,
76 fields.position,
77 ))
78 }
79 other => {
80 self.done = true;
81 Err(Error::protocol(format!(
82 "unexpected message in row stream: {other:?}"
83 )))
84 }
85 }
86 }
87
88 /// Close the stream early, draining any remaining server messages.
89 ///
90 /// Call this instead of dropping when you want to reuse the connection
91 /// for subsequent queries after only partially consuming the stream.
92 pub async fn close(mut self) -> Result<()> {
93 if self.done {
94 return Ok(());
95 }
96
97 // Drain remaining DataRows, CommandComplete, and ReadyForQuery
98 loop {
99 match self.conn.recv().await? {
100 BackendMessage::CommandComplete { .. } => {
101 self.done = true;
102 drain_until_ready(self.conn).await?;
103 return Ok(());
104 }
105 BackendMessage::ErrorResponse { fields } => {
106 self.done = true;
107 drain_until_ready(self.conn).await.ok();
108 return Err(Error::server(
109 fields.severity,
110 fields.code,
111 fields.message,
112 fields.detail,
113 fields.hint,
114 fields.position,
115 ));
116 }
117 _ => {}
118 }
119 }
120 }
121
122 /// The row description for this stream's columns.
123 pub fn description(&self) -> &RowDescription {
124 &self.description
125 }
126
127 /// Returns `true` if the stream has been fully consumed or closed.
128 pub fn is_done(&self) -> bool {
129 self.done
130 }
131}
132
133/// Drain messages until ReadyForQuery.
134async fn drain_until_ready(conn: &mut PgConnection) -> Result<()> {
135 loop {
136 if let BackendMessage::ReadyForQuery { .. } = conn.recv().await? {
137 return Ok(());
138 }
139 }
140}