1use super::{PgConnection, PgError, PgResult};
6use crate::protocol::{BackendMessage, PgEncoder};
7use bytes::BytesMut;
8use tokio::io::AsyncWriteExt;
9
10impl PgConnection {
11 pub(crate) async fn query(
17 &mut self,
18 sql: &str,
19 params: &[Option<Vec<u8>>],
20 ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
21 let bytes = PgEncoder::encode_extended_query(sql, params)
22 .map_err(|e| PgError::Encode(e.to_string()))?;
23 self.stream.write_all(&bytes).await?;
24
25 let mut rows = Vec::new();
26
27 let mut error: Option<PgError> = None;
28
29 loop {
30 let msg = self.recv().await?;
31 match msg {
32 BackendMessage::ParseComplete => {}
33 BackendMessage::BindComplete => {}
34 BackendMessage::RowDescription(_) => {}
35 BackendMessage::DataRow(data) => {
36 if error.is_none() {
38 rows.push(data);
39 }
40 }
41 BackendMessage::CommandComplete(_) => {}
42 BackendMessage::NoData => {}
43 BackendMessage::ReadyForQuery(_) => {
44 if let Some(err) = error {
45 return Err(err);
46 }
47 return Ok(rows);
48 }
49 BackendMessage::ErrorResponse(err) => {
50 if error.is_none() {
51 error = Some(PgError::Query(err.message));
52 }
53 }
54 _ => {}
55 }
56 }
57 }
58
59 pub async fn query_cached(
64 &mut self,
65 sql: &str,
66 params: &[Option<Vec<u8>>],
67 ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
68 let stmt_name = Self::sql_to_stmt_name(sql);
69 let is_new = !self.prepared_statements.contains_key(&stmt_name);
70
71 let params_size: usize = params
73 .iter()
74 .map(|p| 4 + p.as_ref().map_or(0, |v| v.len()))
75 .sum();
76
77 let estimated_size = if is_new {
78 50 + sql.len() + stmt_name.len() * 2 + params_size
79 } else {
80 30 + stmt_name.len() + params_size
81 };
82
83 let mut buf = BytesMut::with_capacity(estimated_size);
84
85 if is_new {
86 buf.extend(PgEncoder::encode_parse(&stmt_name, sql, &[]));
87 self.prepared_statements.insert(stmt_name.clone(), sql.to_string());
89 }
90
91 PgEncoder::encode_bind_to(&mut buf, &stmt_name, params)
93 .map_err(|e| PgError::Encode(e.to_string()))?;
94 PgEncoder::encode_execute_to(&mut buf);
95 PgEncoder::encode_sync_to(&mut buf);
96
97 self.stream.write_all(&buf).await?;
98
99 let mut rows = Vec::new();
100
101 let mut error: Option<PgError> = None;
102
103 loop {
104 let msg = self.recv().await?;
105 match msg {
106 BackendMessage::ParseComplete => {
107 }
109 BackendMessage::BindComplete => {}
110 BackendMessage::RowDescription(_) => {}
111 BackendMessage::DataRow(data) => {
112 if error.is_none() {
113 rows.push(data);
114 }
115 }
116 BackendMessage::CommandComplete(_) => {}
117 BackendMessage::NoData => {}
118 BackendMessage::ReadyForQuery(_) => {
119 if let Some(err) = error {
120 return Err(err);
121 }
122 return Ok(rows);
123 }
124 BackendMessage::ErrorResponse(err) => {
125 if error.is_none() {
126 error = Some(PgError::Query(err.message));
127 self.prepared_statements.remove(&stmt_name);
130 }
131 }
132 _ => {}
133 }
134 }
135 }
136
/// Derives a prepared-statement name from the SQL text.
///
/// The name is the letter `s` followed by the 64-bit `DefaultHasher` digest of `sql`,
/// rendered as 16 zero-padded lowercase hex digits (17 characters in total). The same
/// SQL text always maps to the same name within one build of the program.
pub(crate) fn sql_to_stmt_name(sql: &str) -> String {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    // Hash through the `Hash` impl of `str` so the digest matches what other
    // callers hashing the same text this way would get.
    let digest = {
        let mut state = DefaultHasher::new();
        sql.hash(&mut state);
        state.finish()
    };

    format!("s{:016x}", digest)
}
147
148 pub async fn execute_simple(&mut self, sql: &str) -> PgResult<()> {
150 let bytes = PgEncoder::encode_query_string(sql);
151 self.stream.write_all(&bytes).await?;
152
153 let mut error: Option<PgError> = None;
154
155 loop {
156 let msg = self.recv().await?;
157 match msg {
158 BackendMessage::CommandComplete(_) => {}
159 BackendMessage::ReadyForQuery(_) => {
160 if let Some(err) = error {
161 return Err(err);
162 }
163 return Ok(());
164 }
165 BackendMessage::ErrorResponse(err) => {
166 if error.is_none() {
167 error = Some(PgError::Query(err.message));
168 }
169 }
170 _ => {}
171 }
172 }
173 }
174
175 pub async fn simple_query(&mut self, sql: &str) -> PgResult<Vec<super::PgRow>> {
180 use std::sync::Arc;
181 let bytes = PgEncoder::encode_query_string(sql);
182 self.stream.write_all(&bytes).await?;
183
184 let mut rows: Vec<super::PgRow> = Vec::new();
185 let mut column_info: Option<Arc<super::ColumnInfo>> = None;
186 let mut error: Option<PgError> = None;
187
188 loop {
189 let msg = self.recv().await?;
190 match msg {
191 BackendMessage::RowDescription(fields) => {
192 column_info = Some(Arc::new(super::ColumnInfo::from_fields(&fields)));
193 }
194 BackendMessage::DataRow(data) => {
195 if error.is_none() {
196 rows.push(super::PgRow {
197 columns: data,
198 column_info: column_info.clone(),
199 });
200 }
201 }
202 BackendMessage::CommandComplete(_) => {}
203 BackendMessage::ReadyForQuery(_) => {
204 if let Some(err) = error {
205 return Err(err);
206 }
207 return Ok(rows);
208 }
209 BackendMessage::ErrorResponse(err) => {
210 if error.is_none() {
211 error = Some(PgError::Query(err.message));
212 }
213 }
214 _ => {}
215 }
216 }
217 }
218
219 #[inline]
232 pub async fn query_prepared_single(
233 &mut self,
234 stmt: &super::PreparedStatement,
235 params: &[Option<Vec<u8>>],
236 ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
237 let params_size: usize = params
239 .iter()
240 .map(|p| 4 + p.as_ref().map_or(0, |v| v.len()))
241 .sum();
242
243 let mut buf = BytesMut::with_capacity(30 + stmt.name.len() + params_size);
245
246 PgEncoder::encode_bind_to(&mut buf, &stmt.name, params)
248 .map_err(|e| PgError::Encode(e.to_string()))?;
249 PgEncoder::encode_execute_to(&mut buf);
250 PgEncoder::encode_sync_to(&mut buf);
251
252 self.stream.write_all(&buf).await?;
253
254 let mut rows = Vec::new();
255
256 let mut error: Option<PgError> = None;
257
258 loop {
259 let msg = self.recv().await?;
260 match msg {
261 BackendMessage::BindComplete => {}
262 BackendMessage::RowDescription(_) => {}
263 BackendMessage::DataRow(data) => {
264 if error.is_none() {
265 rows.push(data);
266 }
267 }
268 BackendMessage::CommandComplete(_) => {}
269 BackendMessage::NoData => {}
270 BackendMessage::ReadyForQuery(_) => {
271 if let Some(err) = error {
272 return Err(err);
273 }
274 return Ok(rows);
275 }
276 BackendMessage::ErrorResponse(err) => {
277 if error.is_none() {
278 error = Some(PgError::Query(err.message));
279 }
280 }
281 _ => {}
282 }
283 }
284 }
285}