Skip to main content

sentinel_driver/connection/
stream_impl.rs

1use std::sync::Arc;
2
3use super::{frontend, BackendMessage, BytesMut, Connection, Error, Result, RowDescription, ToSql};
4use crate::stream::RowStream;
5
6impl Connection {
7    /// Execute a streaming query that returns rows one at a time.
8    ///
9    /// Unlike [`query()`](Connection::query) which materializes all rows
10    /// in memory, this returns a [`RowStream`] that yields rows lazily.
11    /// Ideal for large result sets.
12    ///
13    /// The stream holds an exclusive borrow of the connection — no other
14    /// queries can run until the stream is dropped or fully consumed.
15    ///
16    /// ```rust,no_run
17    /// # async fn example(conn: &mut sentinel_driver::Connection) -> sentinel_driver::Result<()> {
18    /// let mut stream = conn.query_stream("SELECT * FROM users", &[]).await?;
19    /// while let Some(row) = stream.next().await? {
20    ///     let name: String = row.get(0);
21    /// }
22    /// # Ok(())
23    /// # }
24    /// ```
25    pub async fn query_stream(
26        &mut self,
27        sql: &str,
28        params: &[&(dyn ToSql + Sync)],
29    ) -> Result<RowStream<'_>> {
30        // Encode parameters
31        let param_types: Vec<u32> = params.iter().map(|p| p.oid().0).collect();
32        let mut encoded_params: Vec<Option<&[u8]>> = Vec::with_capacity(params.len());
33        let mut param_bufs: Vec<BytesMut> = Vec::with_capacity(params.len());
34
35        for param in params {
36            let mut buf = BytesMut::new();
37            param.to_sql(&mut buf)?;
38            param_bufs.push(buf);
39        }
40        for buf in &param_bufs {
41            encoded_params.push(Some(buf.as_ref()));
42        }
43
44        // Send Parse + Bind + Describe + Execute + Sync
45        frontend::parse(self.conn.write_buf(), "", sql, &param_types);
46        frontend::bind(self.conn.write_buf(), "", "", &encoded_params, &[]);
47        frontend::describe_portal(self.conn.write_buf(), "");
48        frontend::execute(self.conn.write_buf(), "", 0);
49        frontend::sync(self.conn.write_buf());
50        self.conn.send().await?;
51
52        // Read ParseComplete
53        match self.conn.recv().await? {
54            BackendMessage::ParseComplete => {}
55            BackendMessage::ErrorResponse { fields } => {
56                self.drain_until_ready().await.ok();
57                return Err(Error::server(
58                    fields.severity,
59                    fields.code,
60                    fields.message,
61                    fields.detail,
62                    fields.hint,
63                    fields.position,
64                ));
65            }
66            other => {
67                return Err(Error::protocol(format!(
68                    "expected ParseComplete, got {other:?}"
69                )));
70            }
71        }
72
73        // Read BindComplete
74        match self.conn.recv().await? {
75            BackendMessage::BindComplete => {}
76            BackendMessage::ErrorResponse { fields } => {
77                self.drain_until_ready().await.ok();
78                return Err(Error::server(
79                    fields.severity,
80                    fields.code,
81                    fields.message,
82                    fields.detail,
83                    fields.hint,
84                    fields.position,
85                ));
86            }
87            other => {
88                return Err(Error::protocol(format!(
89                    "expected BindComplete, got {other:?}"
90                )));
91            }
92        }
93
94        // Read RowDescription (required for streaming — NoData means no rows to stream)
95        let description = match self.conn.recv().await? {
96            BackendMessage::RowDescription { fields } => Arc::new(RowDescription::new(fields)),
97            BackendMessage::NoData => {
98                // Non-SELECT query — drain remaining and return error
99                self.drain_until_ready().await.ok();
100                return Err(Error::protocol(
101                    "query_stream requires a query that returns rows".to_string(),
102                ));
103            }
104            BackendMessage::ErrorResponse { fields } => {
105                self.drain_until_ready().await.ok();
106                return Err(Error::server(
107                    fields.severity,
108                    fields.code,
109                    fields.message,
110                    fields.detail,
111                    fields.hint,
112                    fields.position,
113                ));
114            }
115            other => {
116                return Err(Error::protocol(format!(
117                    "expected RowDescription, got {other:?}"
118                )));
119            }
120        };
121
122        Ok(RowStream::new(&mut self.conn, description))
123    }
124}