Skip to main content

qail_pg/driver/
query.rs

1//! Query execution methods for PostgreSQL connection.
2//!
3//! This module provides query, query_cached, and execute_simple.
4
5use super::{PgConnection, PgError, PgResult};
6use crate::protocol::{BackendMessage, PgEncoder};
7use bytes::BytesMut;
8use tokio::io::AsyncWriteExt;
9
10impl PgConnection {
11    /// Execute a query with binary parameters (crate-internal).
12    /// This uses the Extended Query Protocol (Parse/Bind/Execute/Sync):
13    /// - Parameters are sent as binary bytes, skipping the string layer
14    /// - No SQL injection possible - parameters are never interpolated
15    /// - Better performance via prepared statement reuse
16    pub(crate) async fn query(
17        &mut self,
18        sql: &str,
19        params: &[Option<Vec<u8>>],
20    ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
21        let bytes = PgEncoder::encode_extended_query(sql, params)
22            .map_err(|e| PgError::Encode(e.to_string()))?;
23        self.stream.write_all(&bytes).await?;
24
25        let mut rows = Vec::new();
26
27        let mut error: Option<PgError> = None;
28
29        loop {
30            let msg = self.recv().await?;
31            match msg {
32                BackendMessage::ParseComplete => {}
33                BackendMessage::BindComplete => {}
34                BackendMessage::RowDescription(_) => {}
35                BackendMessage::DataRow(data) => {
36                    // Only collect rows if no error occurred
37                    if error.is_none() {
38                        rows.push(data);
39                    }
40                }
41                BackendMessage::CommandComplete(_) => {}
42                BackendMessage::NoData => {}
43                BackendMessage::ReadyForQuery(_) => {
44                    if let Some(err) = error {
45                        return Err(err);
46                    }
47                    return Ok(rows);
48                }
49                BackendMessage::ErrorResponse(err) => {
50                    if error.is_none() {
51                        error = Some(PgError::Query(err.message));
52                    }
53                }
54                _ => {}
55            }
56        }
57    }
58
59    /// Execute a query with cached prepared statement.
60    /// Like `query()`, but reuses prepared statements across calls.
61    /// The statement name is derived from a hash of the SQL text.
62    /// OPTIMIZED: Pre-allocated buffer + ultra-fast encoders.
63    pub async fn query_cached(
64        &mut self,
65        sql: &str,
66        params: &[Option<Vec<u8>>],
67    ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
68        let stmt_name = Self::sql_to_stmt_name(sql);
69        let is_new = !self.prepared_statements.contains_key(&stmt_name);
70
71        // Pre-calculate buffer size for single allocation
72        let params_size: usize = params
73            .iter()
74            .map(|p| 4 + p.as_ref().map_or(0, |v| v.len()))
75            .sum();
76        
77        let estimated_size = if is_new {
78            50 + sql.len() + stmt_name.len() * 2 + params_size
79        } else {
80            30 + stmt_name.len() + params_size
81        };
82        
83        let mut buf = BytesMut::with_capacity(estimated_size);
84
85        if is_new {
86            buf.extend(PgEncoder::encode_parse(&stmt_name, sql, &[]));
87            // Cache the SQL for debugging
88            self.prepared_statements.insert(stmt_name.clone(), sql.to_string());
89        }
90
91        // Use ULTRA-OPTIMIZED encoders - write directly to buffer
92        PgEncoder::encode_bind_to(&mut buf, &stmt_name, params)
93            .map_err(|e| PgError::Encode(e.to_string()))?;
94        PgEncoder::encode_execute_to(&mut buf);
95        PgEncoder::encode_sync_to(&mut buf);
96
97        self.stream.write_all(&buf).await?;
98
99        let mut rows = Vec::new();
100
101        let mut error: Option<PgError> = None;
102
103        loop {
104            let msg = self.recv().await?;
105            match msg {
106                BackendMessage::ParseComplete => {
107                    // Already cached in is_new block above
108                }
109                BackendMessage::BindComplete => {}
110                BackendMessage::RowDescription(_) => {}
111                BackendMessage::DataRow(data) => {
112                    if error.is_none() {
113                        rows.push(data);
114                    }
115                }
116                BackendMessage::CommandComplete(_) => {}
117                BackendMessage::NoData => {}
118                BackendMessage::ReadyForQuery(_) => {
119                    if let Some(err) = error {
120                        return Err(err);
121                    }
122                    return Ok(rows);
123                }
124                BackendMessage::ErrorResponse(err) => {
125                    if error.is_none() {
126                        error = Some(PgError::Query(err.message));
127                        // Invalidate cache to prevent "prepared statement does not exist"
128                        // on next retry.
129                        self.prepared_statements.remove(&stmt_name);
130                    }
131                }
132                _ => {}
133            }
134        }
135    }
136
137    /// Generate a statement name from SQL hash.
138    /// Uses a simple hash to create a unique name like "stmt_12345abc".
139    pub(crate) fn sql_to_stmt_name(sql: &str) -> String {
140        use std::collections::hash_map::DefaultHasher;
141        use std::hash::{Hash, Hasher};
142
143        let mut hasher = DefaultHasher::new();
144        sql.hash(&mut hasher);
145        format!("s{:016x}", hasher.finish())
146    }
147
148    /// Execute a simple SQL statement (no parameters).
149    pub async fn execute_simple(&mut self, sql: &str) -> PgResult<()> {
150        let bytes = PgEncoder::encode_query_string(sql);
151        self.stream.write_all(&bytes).await?;
152
153        let mut error: Option<PgError> = None;
154
155        loop {
156            let msg = self.recv().await?;
157            match msg {
158                BackendMessage::CommandComplete(_) => {}
159                BackendMessage::ReadyForQuery(_) => {
160                    if let Some(err) = error {
161                        return Err(err);
162                    }
163                    return Ok(());
164                }
165                BackendMessage::ErrorResponse(err) => {
166                    if error.is_none() {
167                        error = Some(PgError::Query(err.message));
168                    }
169                }
170                _ => {}
171            }
172        }
173    }
174
175    /// Execute a simple SQL query and return rows (Simple Query Protocol).
176    ///
177    /// Unlike `execute_simple`, this collects and returns data rows.
178    /// Used for branch management and other administrative queries.
179    pub async fn simple_query(&mut self, sql: &str) -> PgResult<Vec<super::PgRow>> {
180        use std::sync::Arc;
181        let bytes = PgEncoder::encode_query_string(sql);
182        self.stream.write_all(&bytes).await?;
183
184        let mut rows: Vec<super::PgRow> = Vec::new();
185        let mut column_info: Option<Arc<super::ColumnInfo>> = None;
186        let mut error: Option<PgError> = None;
187
188        loop {
189            let msg = self.recv().await?;
190            match msg {
191                BackendMessage::RowDescription(fields) => {
192                    column_info = Some(Arc::new(super::ColumnInfo::from_fields(&fields)));
193                }
194                BackendMessage::DataRow(data) => {
195                    if error.is_none() {
196                        rows.push(super::PgRow {
197                            columns: data,
198                            column_info: column_info.clone(),
199                        });
200                    }
201                }
202                BackendMessage::CommandComplete(_) => {}
203                BackendMessage::ReadyForQuery(_) => {
204                    if let Some(err) = error {
205                        return Err(err);
206                    }
207                    return Ok(rows);
208                }
209                BackendMessage::ErrorResponse(err) => {
210                    if error.is_none() {
211                        error = Some(PgError::Query(err.message));
212                    }
213                }
214                _ => {}
215            }
216        }
217    }
218
219    /// ZERO-HASH sequential query using pre-computed PreparedStatement.
220    /// This is the FASTEST sequential path because it skips:
221    /// - SQL generation from AST (done once outside loop)
222    /// - Hash computation for statement name (pre-computed in PreparedStatement)
223    /// - HashMap lookup for is_new check (statement already prepared)
224    /// # Example
225    /// ```ignore
226    /// let stmt = conn.prepare("SELECT * FROM users WHERE id = $1").await?;
227    /// for id in 1..10000 {
228    ///     let rows = conn.query_prepared_single(&stmt, &[Some(id.to_string().into_bytes())]).await?;
229    /// }
230    /// ```
231    #[inline]
232    pub async fn query_prepared_single(
233        &mut self,
234        stmt: &super::PreparedStatement,
235        params: &[Option<Vec<u8>>],
236    ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
237        // Pre-calculate buffer size for single allocation
238        let params_size: usize = params
239            .iter()
240            .map(|p| 4 + p.as_ref().map_or(0, |v| v.len()))
241            .sum();
242        
243        // Bind: ~15 + stmt.name.len() + params_size, Execute: 10, Sync: 5
244        let mut buf = BytesMut::with_capacity(30 + stmt.name.len() + params_size);
245
246        // ZERO HASH, ZERO LOOKUP - just encode and send!
247        PgEncoder::encode_bind_to(&mut buf, &stmt.name, params)
248            .map_err(|e| PgError::Encode(e.to_string()))?;
249        PgEncoder::encode_execute_to(&mut buf);
250        PgEncoder::encode_sync_to(&mut buf);
251
252        self.stream.write_all(&buf).await?;
253
254        let mut rows = Vec::new();
255
256        let mut error: Option<PgError> = None;
257
258        loop {
259            let msg = self.recv().await?;
260            match msg {
261                BackendMessage::BindComplete => {}
262                BackendMessage::RowDescription(_) => {}
263                BackendMessage::DataRow(data) => {
264                    if error.is_none() {
265                        rows.push(data);
266                    }
267                }
268                BackendMessage::CommandComplete(_) => {}
269                BackendMessage::NoData => {}
270                BackendMessage::ReadyForQuery(_) => {
271                    if let Some(err) = error {
272                        return Err(err);
273                    }
274                    return Ok(rows);
275                }
276                BackendMessage::ErrorResponse(err) => {
277                    if error.is_none() {
278                        error = Some(PgError::Query(err.message));
279                    }
280                }
281                _ => {}
282            }
283        }
284    }
285}