sentinel_driver/connection/
stream_impl.rs1use std::sync::Arc;
2
3use super::{frontend, BackendMessage, BytesMut, Connection, Error, Result, RowDescription, ToSql};
4use crate::stream::RowStream;
5
6impl Connection {
7 pub async fn query_stream(
26 &mut self,
27 sql: &str,
28 params: &[&(dyn ToSql + Sync)],
29 ) -> Result<RowStream<'_>> {
30 let param_types: Vec<u32> = params.iter().map(|p| p.oid().0).collect();
32 let mut encoded_params: Vec<Option<&[u8]>> = Vec::with_capacity(params.len());
33 let mut param_bufs: Vec<BytesMut> = Vec::with_capacity(params.len());
34
35 for param in params {
36 let mut buf = BytesMut::new();
37 param.to_sql(&mut buf)?;
38 param_bufs.push(buf);
39 }
40 for buf in ¶m_bufs {
41 encoded_params.push(Some(buf.as_ref()));
42 }
43
44 frontend::parse(self.conn.write_buf(), "", sql, ¶m_types);
46 frontend::bind(self.conn.write_buf(), "", "", &encoded_params, &[]);
47 frontend::describe_portal(self.conn.write_buf(), "");
48 frontend::execute(self.conn.write_buf(), "", 0);
49 frontend::sync(self.conn.write_buf());
50 self.conn.send().await?;
51
52 match self.conn.recv().await? {
54 BackendMessage::ParseComplete => {}
55 BackendMessage::ErrorResponse { fields } => {
56 self.drain_until_ready().await.ok();
57 return Err(Error::server(
58 fields.severity,
59 fields.code,
60 fields.message,
61 fields.detail,
62 fields.hint,
63 fields.position,
64 ));
65 }
66 other => {
67 return Err(Error::protocol(format!(
68 "expected ParseComplete, got {other:?}"
69 )));
70 }
71 }
72
73 match self.conn.recv().await? {
75 BackendMessage::BindComplete => {}
76 BackendMessage::ErrorResponse { fields } => {
77 self.drain_until_ready().await.ok();
78 return Err(Error::server(
79 fields.severity,
80 fields.code,
81 fields.message,
82 fields.detail,
83 fields.hint,
84 fields.position,
85 ));
86 }
87 other => {
88 return Err(Error::protocol(format!(
89 "expected BindComplete, got {other:?}"
90 )));
91 }
92 }
93
94 let description = match self.conn.recv().await? {
96 BackendMessage::RowDescription { fields } => Arc::new(RowDescription::new(fields)),
97 BackendMessage::NoData => {
98 self.drain_until_ready().await.ok();
100 return Err(Error::protocol(
101 "query_stream requires a query that returns rows".to_string(),
102 ));
103 }
104 BackendMessage::ErrorResponse { fields } => {
105 self.drain_until_ready().await.ok();
106 return Err(Error::server(
107 fields.severity,
108 fields.code,
109 fields.message,
110 fields.detail,
111 fields.hint,
112 fields.position,
113 ));
114 }
115 other => {
116 return Err(Error::protocol(format!(
117 "expected RowDescription, got {other:?}"
118 )));
119 }
120 };
121
122 Ok(RowStream::new(&mut self.conn, description))
123 }
124}