1use super::{PgConnection, PgError, PgResult};
6use crate::protocol::{BackendMessage, PgEncoder};
7use bytes::BytesMut;
8use tokio::io::AsyncWriteExt;
9
10impl PgConnection {
11 pub(crate) async fn query(
17 &mut self,
18 sql: &str,
19 params: &[Option<Vec<u8>>],
20 ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
21 let bytes = PgEncoder::encode_extended_query(sql, params)
22 .map_err(|e| PgError::Encode(e.to_string()))?;
23 self.stream.write_all(&bytes).await?;
24
25 let mut rows = Vec::new();
26
27 let mut error: Option<PgError> = None;
28
29 loop {
30 let msg = self.recv().await?;
31 match msg {
32 BackendMessage::ParseComplete => {}
33 BackendMessage::BindComplete => {}
34 BackendMessage::RowDescription(_) => {}
35 BackendMessage::DataRow(data) => {
36 if error.is_none() {
38 rows.push(data);
39 }
40 }
41 BackendMessage::CommandComplete(_) => {}
42 BackendMessage::NoData => {}
43 BackendMessage::ReadyForQuery(_) => {
44 if let Some(err) = error {
45 return Err(err);
46 }
47 return Ok(rows);
48 }
49 BackendMessage::ErrorResponse(err) => {
50 if error.is_none() {
51 error = Some(PgError::Query(err.message));
52 }
53 }
54 _ => {}
55 }
56 }
57 }
58
59 pub async fn query_cached(
64 &mut self,
65 sql: &str,
66 params: &[Option<Vec<u8>>],
67 ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
68 let stmt_name = Self::sql_to_stmt_name(sql);
69 let is_new = !self.prepared_statements.contains_key(&stmt_name);
70
71 let params_size: usize = params
73 .iter()
74 .map(|p| 4 + p.as_ref().map_or(0, |v| v.len()))
75 .sum();
76
77 let estimated_size = if is_new {
78 50 + sql.len() + stmt_name.len() * 2 + params_size
79 } else {
80 30 + stmt_name.len() + params_size
81 };
82
83 let mut buf = BytesMut::with_capacity(estimated_size);
84
85 if is_new {
86 buf.extend(PgEncoder::encode_parse(&stmt_name, sql, &[]));
87 self.prepared_statements.insert(stmt_name.clone(), sql.to_string());
89 }
90
91 PgEncoder::encode_bind_to(&mut buf, &stmt_name, params)
93 .map_err(|e| PgError::Encode(e.to_string()))?;
94 PgEncoder::encode_execute_to(&mut buf);
95 PgEncoder::encode_sync_to(&mut buf);
96
97 self.stream.write_all(&buf).await?;
98
99 let mut rows = Vec::new();
100
101 let mut error: Option<PgError> = None;
102
103 loop {
104 let msg = self.recv().await?;
105 match msg {
106 BackendMessage::ParseComplete => {
107 }
109 BackendMessage::BindComplete => {}
110 BackendMessage::RowDescription(_) => {}
111 BackendMessage::DataRow(data) => {
112 if error.is_none() {
113 rows.push(data);
114 }
115 }
116 BackendMessage::CommandComplete(_) => {}
117 BackendMessage::NoData => {}
118 BackendMessage::ReadyForQuery(_) => {
119 if let Some(err) = error {
120 return Err(err);
121 }
122 return Ok(rows);
123 }
124 BackendMessage::ErrorResponse(err) => {
125 if error.is_none() {
126 error = Some(PgError::Query(err.message));
127 self.prepared_statements.remove(&stmt_name);
130 }
131 }
132 _ => {}
133 }
134 }
135 }
136
137 pub(crate) fn sql_to_stmt_name(sql: &str) -> String {
140 use std::collections::hash_map::DefaultHasher;
141 use std::hash::{Hash, Hasher};
142
143 let mut hasher = DefaultHasher::new();
144 sql.hash(&mut hasher);
145 format!("s{:016x}", hasher.finish())
146 }
147
148 pub(crate) async fn execute_simple(&mut self, sql: &str) -> PgResult<()> {
150 let bytes = PgEncoder::encode_query_string(sql);
151 self.stream.write_all(&bytes).await?;
152
153 let mut error: Option<PgError> = None;
154
155 loop {
156 let msg = self.recv().await?;
157 match msg {
158 BackendMessage::CommandComplete(_) => {}
159 BackendMessage::ReadyForQuery(_) => {
160 if let Some(err) = error {
161 return Err(err);
162 }
163 return Ok(());
164 }
165 BackendMessage::ErrorResponse(err) => {
166 if error.is_none() {
167 error = Some(PgError::Query(err.message));
168 }
169 }
170 _ => {}
171 }
172 }
173 }
174
175 #[inline]
188 pub async fn query_prepared_single(
189 &mut self,
190 stmt: &super::PreparedStatement,
191 params: &[Option<Vec<u8>>],
192 ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
193 let params_size: usize = params
195 .iter()
196 .map(|p| 4 + p.as_ref().map_or(0, |v| v.len()))
197 .sum();
198
199 let mut buf = BytesMut::with_capacity(30 + stmt.name.len() + params_size);
201
202 PgEncoder::encode_bind_to(&mut buf, &stmt.name, params)
204 .map_err(|e| PgError::Encode(e.to_string()))?;
205 PgEncoder::encode_execute_to(&mut buf);
206 PgEncoder::encode_sync_to(&mut buf);
207
208 self.stream.write_all(&buf).await?;
209
210 let mut rows = Vec::new();
211
212 let mut error: Option<PgError> = None;
213
214 loop {
215 let msg = self.recv().await?;
216 match msg {
217 BackendMessage::BindComplete => {}
218 BackendMessage::RowDescription(_) => {}
219 BackendMessage::DataRow(data) => {
220 if error.is_none() {
221 rows.push(data);
222 }
223 }
224 BackendMessage::CommandComplete(_) => {}
225 BackendMessage::NoData => {}
226 BackendMessage::ReadyForQuery(_) => {
227 if let Some(err) = error {
228 return Err(err);
229 }
230 return Ok(rows);
231 }
232 BackendMessage::ErrorResponse(err) => {
233 if error.is_none() {
234 error = Some(PgError::Query(err.message));
235 }
236 }
237 _ => {}
238 }
239 }
240 }
241}