sentinel_driver/connection/
query.rs1use super::{
2 frontend, pipeline, BackendMessage, BytesMut, Connection, Duration, Error, Oid, PipelineBatch,
3 Result, Row, ToSql,
4};
5
6use crate::row::{self, SimpleQueryMessage, SimpleQueryRow};
7
8impl Connection {
9 pub async fn query(&mut self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>> {
20 if let Some(timeout) = self.query_timeout {
21 return self.query_with_timeout(sql, params, timeout).await;
22 }
23
24 let result = self.query_internal(sql, params).await?;
25 match result {
26 pipeline::QueryResult::Rows(rows) => Ok(rows),
27 pipeline::QueryResult::Command(_) => Ok(Vec::new()),
28 }
29 }
30
31 pub async fn query_one(&mut self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Row> {
35 let rows = self.query(sql, params).await?;
36 rows.into_iter()
37 .next()
38 .ok_or_else(|| Error::Protocol("query returned no rows".into()))
39 }
40
41 pub async fn query_opt(
43 &mut self,
44 sql: &str,
45 params: &[&(dyn ToSql + Sync)],
46 ) -> Result<Option<Row>> {
47 let rows = self.query(sql, params).await?;
48 Ok(rows.into_iter().next())
49 }
50
51 pub async fn execute(&mut self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64> {
55 if let Some(timeout) = self.query_timeout {
56 return self.execute_with_timeout(sql, params, timeout).await;
57 }
58
59 let result = self.query_internal(sql, params).await?;
60 match result {
61 pipeline::QueryResult::Command(r) => Ok(r.rows_affected),
62 pipeline::QueryResult::Rows(_) => Ok(0),
63 }
64 }
65
66 pub async fn query_with_timeout(
71 &mut self,
72 sql: &str,
73 params: &[&(dyn ToSql + Sync)],
74 timeout: Duration,
75 ) -> Result<Vec<Row>> {
76 let cancel_token = self.cancel_token();
77
78 match tokio::time::timeout(timeout, self.query_internal(sql, params)).await {
79 Ok(result) => {
80 let result = result?;
81 match result {
82 pipeline::QueryResult::Rows(rows) => Ok(rows),
83 pipeline::QueryResult::Command(_) => Ok(Vec::new()),
84 }
85 }
86 Err(_elapsed) => {
87 self.is_broken = true;
88 tokio::spawn(async move {
90 cancel_token.cancel().await.ok();
91 });
92 Err(Error::Timeout(format!(
93 "query timeout after {}ms",
94 timeout.as_millis()
95 )))
96 }
97 }
98 }
99
100 pub async fn execute_with_timeout(
105 &mut self,
106 sql: &str,
107 params: &[&(dyn ToSql + Sync)],
108 timeout: Duration,
109 ) -> Result<u64> {
110 let cancel_token = self.cancel_token();
111
112 match tokio::time::timeout(timeout, self.query_internal(sql, params)).await {
113 Ok(result) => {
114 let result = result?;
115 match result {
116 pipeline::QueryResult::Command(r) => Ok(r.rows_affected),
117 pipeline::QueryResult::Rows(_) => Ok(0),
118 }
119 }
120 Err(_elapsed) => {
121 self.is_broken = true;
122 tokio::spawn(async move {
123 cancel_token.cancel().await.ok();
124 });
125 Err(Error::Timeout(format!(
126 "query timeout after {}ms",
127 timeout.as_millis()
128 )))
129 }
130 }
131 }
132
133 pub async fn simple_query(&mut self, sql: &str) -> Result<Vec<SimpleQueryMessage>> {
158 frontend::query(self.conn.write_buf(), sql);
159 self.conn.send().await?;
160
161 let mut results = Vec::new();
162
163 loop {
164 match self.conn.recv().await? {
165 BackendMessage::DataRow { columns } => {
166 let mut text_columns = Vec::with_capacity(columns.len());
168 for i in 0..columns.len() {
169 let value = columns
170 .get(i)
171 .map(|bytes| String::from_utf8_lossy(&bytes).into_owned());
172 text_columns.push(value);
173 }
174 results.push(SimpleQueryMessage::Row(SimpleQueryRow::new(text_columns)));
175 }
176 BackendMessage::CommandComplete { tag } => {
177 let parsed = row::parse_command_tag(&tag);
178 results.push(SimpleQueryMessage::CommandComplete(parsed.rows_affected));
179 }
180 BackendMessage::ReadyForQuery { transaction_status } => {
181 self.transaction_status = transaction_status;
182 break;
183 }
184 BackendMessage::ErrorResponse { fields } => {
185 self.drain_until_ready().await.ok();
186 return Err(Error::server(
187 fields.severity,
188 fields.code,
189 fields.message,
190 fields.detail,
191 fields.hint,
192 fields.position,
193 ));
194 }
195 _ => {}
196 }
197 }
198
199 Ok(results)
200 }
201
202 pub async fn query_typed(
223 &mut self,
224 sql: &str,
225 params: &[(&(dyn ToSql + Sync), Oid)],
226 ) -> Result<Vec<Row>> {
227 let result = self.query_typed_internal(sql, params).await?;
228 match result {
229 pipeline::QueryResult::Rows(rows) => Ok(rows),
230 pipeline::QueryResult::Command(_) => Ok(Vec::new()),
231 }
232 }
233
234 pub async fn query_typed_one(
236 &mut self,
237 sql: &str,
238 params: &[(&(dyn ToSql + Sync), Oid)],
239 ) -> Result<Row> {
240 let rows = self.query_typed(sql, params).await?;
241 rows.into_iter()
242 .next()
243 .ok_or_else(|| Error::Protocol("query returned no rows".into()))
244 }
245
246 pub async fn query_typed_opt(
248 &mut self,
249 sql: &str,
250 params: &[(&(dyn ToSql + Sync), Oid)],
251 ) -> Result<Option<Row>> {
252 let rows = self.query_typed(sql, params).await?;
253 Ok(rows.into_iter().next())
254 }
255
256 pub async fn execute_typed(
258 &mut self,
259 sql: &str,
260 params: &[(&(dyn ToSql + Sync), Oid)],
261 ) -> Result<u64> {
262 let result = self.query_typed_internal(sql, params).await?;
263 match result {
264 pipeline::QueryResult::Command(r) => Ok(r.rows_affected),
265 pipeline::QueryResult::Rows(_) => Ok(0),
266 }
267 }
268
269 async fn query_typed_internal(
270 &mut self,
271 sql: &str,
272 params: &[(&(dyn ToSql + Sync), Oid)],
273 ) -> Result<pipeline::QueryResult> {
274 let param_types: Vec<u32> = params.iter().map(|(_, oid)| oid.0).collect();
275 let mut encoded_params: Vec<Option<Vec<u8>>> = Vec::with_capacity(params.len());
276
277 for (value, _) in params {
278 if value.is_null() {
279 encoded_params.push(None);
280 } else {
281 let mut buf = BytesMut::new();
282 value.to_sql(&mut buf)?;
283 encoded_params.push(Some(buf.to_vec()));
284 }
285 }
286
287 let mut batch = PipelineBatch::new();
288 batch.add(sql.to_string(), param_types, encoded_params);
289
290 let mut results = batch.execute(&mut self.conn).await?;
291 results
292 .pop()
293 .ok_or_else(|| Error::protocol("pipeline returned no results"))
294 }
295}