Skip to main content

sentinel_driver/connection/
query.rs

1use super::{
2    frontend, pipeline, BackendMessage, BytesMut, Connection, Duration, Error, Oid, PipelineBatch,
3    Result, Row, ToSql,
4};
5
6use crate::row::{self, SimpleQueryMessage, SimpleQueryRow};
7
8impl Connection {
9    /// Execute a query that returns rows.
10    ///
11    /// Parameters are encoded in binary format.
12    ///
13    /// ```rust,no_run
14    /// # async fn example(conn: &mut sentinel_driver::Connection) -> sentinel_driver::Result<()> {
15    /// let rows = conn.query("SELECT * FROM users WHERE id = $1", &[&42i32]).await?;
16    /// # Ok(())
17    /// # }
18    /// ```
19    pub async fn query(&mut self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>> {
20        if let Some(timeout) = self.query_timeout {
21            return self.query_with_timeout(sql, params, timeout).await;
22        }
23
24        let result = self.query_internal(sql, params).await?;
25        match result {
26            pipeline::QueryResult::Rows(rows) => Ok(rows),
27            pipeline::QueryResult::Command(_) => Ok(Vec::new()),
28        }
29    }
30
31    /// Execute a query that returns a single row.
32    ///
33    /// Returns an error if no rows are returned.
34    pub async fn query_one(&mut self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Row> {
35        let rows = self.query(sql, params).await?;
36        rows.into_iter()
37            .next()
38            .ok_or_else(|| Error::Protocol("query returned no rows".into()))
39    }
40
41    /// Execute a query that returns an optional single row.
42    pub async fn query_opt(
43        &mut self,
44        sql: &str,
45        params: &[&(dyn ToSql + Sync)],
46    ) -> Result<Option<Row>> {
47        let rows = self.query(sql, params).await?;
48        Ok(rows.into_iter().next())
49    }
50
51    /// Execute a non-SELECT query (INSERT, UPDATE, DELETE, etc.).
52    ///
53    /// Returns the number of rows affected.
54    pub async fn execute(&mut self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64> {
55        if let Some(timeout) = self.query_timeout {
56            return self.execute_with_timeout(sql, params, timeout).await;
57        }
58
59        let result = self.query_internal(sql, params).await?;
60        match result {
61            pipeline::QueryResult::Command(r) => Ok(r.rows_affected),
62            pipeline::QueryResult::Rows(_) => Ok(0),
63        }
64    }
65
66    /// Execute a query with a timeout.
67    ///
68    /// If the query does not complete within `timeout`, a cancel request
69    /// is sent to the server and the connection is marked as broken.
70    pub async fn query_with_timeout(
71        &mut self,
72        sql: &str,
73        params: &[&(dyn ToSql + Sync)],
74        timeout: Duration,
75    ) -> Result<Vec<Row>> {
76        let cancel_token = self.cancel_token();
77
78        match tokio::time::timeout(timeout, self.query_internal(sql, params)).await {
79            Ok(result) => {
80                let result = result?;
81                match result {
82                    pipeline::QueryResult::Rows(rows) => Ok(rows),
83                    pipeline::QueryResult::Command(_) => Ok(Vec::new()),
84                }
85            }
86            Err(_elapsed) => {
87                self.is_broken = true;
88                // Fire-and-forget cancel
89                tokio::spawn(async move {
90                    cancel_token.cancel().await.ok();
91                });
92                Err(Error::Timeout(format!(
93                    "query timeout after {}ms",
94                    timeout.as_millis()
95                )))
96            }
97        }
98    }
99
100    /// Execute a non-SELECT query with a timeout.
101    ///
102    /// If the query does not complete within `timeout`, a cancel request
103    /// is sent to the server and the connection is marked as broken.
104    pub async fn execute_with_timeout(
105        &mut self,
106        sql: &str,
107        params: &[&(dyn ToSql + Sync)],
108        timeout: Duration,
109    ) -> Result<u64> {
110        let cancel_token = self.cancel_token();
111
112        match tokio::time::timeout(timeout, self.query_internal(sql, params)).await {
113            Ok(result) => {
114                let result = result?;
115                match result {
116                    pipeline::QueryResult::Command(r) => Ok(r.rows_affected),
117                    pipeline::QueryResult::Rows(_) => Ok(0),
118                }
119            }
120            Err(_elapsed) => {
121                self.is_broken = true;
122                tokio::spawn(async move {
123                    cancel_token.cancel().await.ok();
124                });
125                Err(Error::Timeout(format!(
126                    "query timeout after {}ms",
127                    timeout.as_millis()
128                )))
129            }
130        }
131    }
132
133    /// Execute a simple query (no parameters, text protocol).
134    ///
135    /// Returns row data (in text format) and command completions. Useful
136    /// for DDL statements, multi-statement queries, and queries where you
137    /// don't need binary-decoded typed values.
138    ///
139    /// ```rust,no_run
140    /// # async fn example(conn: &mut sentinel_driver::Connection) -> sentinel_driver::Result<()> {
141    /// use sentinel_driver::SimpleQueryMessage;
142    ///
143    /// let messages = conn.simple_query("SELECT 1 AS n; SELECT 'hello' AS greeting").await?;
144    /// for msg in &messages {
145    ///     match msg {
146    ///         SimpleQueryMessage::Row(row) => {
147    ///             println!("value: {:?}", row.get(0));
148    ///         }
149    ///         SimpleQueryMessage::CommandComplete(n) => {
150    ///             println!("rows: {n}");
151    ///         }
152    ///     }
153    /// }
154    /// # Ok(())
155    /// # }
156    /// ```
157    pub async fn simple_query(&mut self, sql: &str) -> Result<Vec<SimpleQueryMessage>> {
158        frontend::query(self.conn.write_buf(), sql);
159        self.conn.send().await?;
160
161        let mut results = Vec::new();
162
163        loop {
164            match self.conn.recv().await? {
165                BackendMessage::DataRow { columns } => {
166                    // Extract text-format column values from DataRow
167                    let mut text_columns = Vec::with_capacity(columns.len());
168                    for i in 0..columns.len() {
169                        let value = columns
170                            .get(i)
171                            .map(|bytes| String::from_utf8_lossy(&bytes).into_owned());
172                        text_columns.push(value);
173                    }
174                    results.push(SimpleQueryMessage::Row(SimpleQueryRow::new(text_columns)));
175                }
176                BackendMessage::CommandComplete { tag } => {
177                    let parsed = row::parse_command_tag(&tag);
178                    results.push(SimpleQueryMessage::CommandComplete(parsed.rows_affected));
179                }
180                BackendMessage::ReadyForQuery { transaction_status } => {
181                    self.transaction_status = transaction_status;
182                    break;
183                }
184                BackendMessage::ErrorResponse { fields } => {
185                    self.drain_until_ready().await.ok();
186                    return Err(Error::server(
187                        fields.severity,
188                        fields.code,
189                        fields.message,
190                        fields.detail,
191                        fields.hint,
192                        fields.position,
193                    ));
194                }
195                _ => {}
196            }
197        }
198
199        Ok(results)
200    }
201
202    // ── query_typed ────────────────────────────────────
203
204    /// Execute a query with inline parameter types, skipping the prepare step.
205    ///
206    /// Instead of a separate Prepare round-trip, the parameter types are
207    /// specified directly in the Parse message. This saves one round-trip
208    /// compared to [`query()`](Self::query) at the cost of requiring the
209    /// caller to specify types explicitly.
210    ///
211    /// ```rust,no_run
212    /// # async fn example(conn: &mut sentinel_driver::Connection) -> sentinel_driver::Result<()> {
213    /// use sentinel_driver::Oid;
214    ///
215    /// let rows = conn.query_typed(
216    ///     "SELECT $1::int4 + $2::int4 AS sum",
217    ///     &[(&1i32, Oid::INT4), (&2i32, Oid::INT4)],
218    /// ).await?;
219    /// # Ok(())
220    /// # }
221    /// ```
222    pub async fn query_typed(
223        &mut self,
224        sql: &str,
225        params: &[(&(dyn ToSql + Sync), Oid)],
226    ) -> Result<Vec<Row>> {
227        let result = self.query_typed_internal(sql, params).await?;
228        match result {
229            pipeline::QueryResult::Rows(rows) => Ok(rows),
230            pipeline::QueryResult::Command(_) => Ok(Vec::new()),
231        }
232    }
233
234    /// Execute a typed query that returns a single row.
235    pub async fn query_typed_one(
236        &mut self,
237        sql: &str,
238        params: &[(&(dyn ToSql + Sync), Oid)],
239    ) -> Result<Row> {
240        let rows = self.query_typed(sql, params).await?;
241        rows.into_iter()
242            .next()
243            .ok_or_else(|| Error::Protocol("query returned no rows".into()))
244    }
245
246    /// Execute a typed query that returns an optional single row.
247    pub async fn query_typed_opt(
248        &mut self,
249        sql: &str,
250        params: &[(&(dyn ToSql + Sync), Oid)],
251    ) -> Result<Option<Row>> {
252        let rows = self.query_typed(sql, params).await?;
253        Ok(rows.into_iter().next())
254    }
255
256    /// Execute a typed non-SELECT query, returning rows affected.
257    pub async fn execute_typed(
258        &mut self,
259        sql: &str,
260        params: &[(&(dyn ToSql + Sync), Oid)],
261    ) -> Result<u64> {
262        let result = self.query_typed_internal(sql, params).await?;
263        match result {
264            pipeline::QueryResult::Command(r) => Ok(r.rows_affected),
265            pipeline::QueryResult::Rows(_) => Ok(0),
266        }
267    }
268
269    async fn query_typed_internal(
270        &mut self,
271        sql: &str,
272        params: &[(&(dyn ToSql + Sync), Oid)],
273    ) -> Result<pipeline::QueryResult> {
274        let param_types: Vec<u32> = params.iter().map(|(_, oid)| oid.0).collect();
275        let mut encoded_params: Vec<Option<Vec<u8>>> = Vec::with_capacity(params.len());
276
277        for (value, _) in params {
278            if value.is_null() {
279                encoded_params.push(None);
280            } else {
281                let mut buf = BytesMut::new();
282                value.to_sql(&mut buf)?;
283                encoded_params.push(Some(buf.to_vec()));
284            }
285        }
286
287        let mut batch = PipelineBatch::new();
288        batch.add(sql.to_string(), param_types, encoded_params);
289
290        let mut results = batch.execute(&mut self.conn).await?;
291        results
292            .pop()
293            .ok_or_else(|| Error::protocol("pipeline returned no results"))
294    }
295}