qail_pg/driver/
query.rs

1//! Query execution methods for PostgreSQL connection.
2//!
//! This module provides `query`, `query_cached`, `execute_simple`, and `query_prepared_single`.
4
5use super::{PgConnection, PgError, PgResult};
6use crate::protocol::{BackendMessage, PgEncoder};
7use bytes::BytesMut;
8use tokio::io::AsyncWriteExt;
9
10impl PgConnection {
11    /// Execute a query with binary parameters (crate-internal).
12    /// This uses the Extended Query Protocol (Parse/Bind/Execute/Sync):
13    /// - Parameters are sent as binary bytes, skipping the string layer
14    /// - No SQL injection possible - parameters are never interpolated
15    /// - Better performance via prepared statement reuse
16    pub(crate) async fn query(
17        &mut self,
18        sql: &str,
19        params: &[Option<Vec<u8>>],
20    ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
21        let bytes = PgEncoder::encode_extended_query(sql, params)
22            .map_err(|e| PgError::Encode(e.to_string()))?;
23        self.stream.write_all(&bytes).await?;
24
25        let mut rows = Vec::new();
26
27        let mut error: Option<PgError> = None;
28
29        loop {
30            let msg = self.recv().await?;
31            match msg {
32                BackendMessage::ParseComplete => {}
33                BackendMessage::BindComplete => {}
34                BackendMessage::RowDescription(_) => {}
35                BackendMessage::DataRow(data) => {
36                    // Only collect rows if no error occurred
37                    if error.is_none() {
38                        rows.push(data);
39                    }
40                }
41                BackendMessage::CommandComplete(_) => {}
42                BackendMessage::NoData => {}
43                BackendMessage::ReadyForQuery(_) => {
44                    if let Some(err) = error {
45                        return Err(err);
46                    }
47                    return Ok(rows);
48                }
49                BackendMessage::ErrorResponse(err) => {
50                    if error.is_none() {
51                        error = Some(PgError::Query(err.message));
52                    }
53                }
54                _ => {}
55            }
56        }
57    }
58
59    /// Execute a query with cached prepared statement.
60    /// Like `query()`, but reuses prepared statements across calls.
61    /// The statement name is derived from a hash of the SQL text.
62    /// OPTIMIZED: Pre-allocated buffer + ultra-fast encoders.
63    pub async fn query_cached(
64        &mut self,
65        sql: &str,
66        params: &[Option<Vec<u8>>],
67    ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
68        let stmt_name = Self::sql_to_stmt_name(sql);
69        let is_new = !self.prepared_statements.contains_key(&stmt_name);
70
71        // Pre-calculate buffer size for single allocation
72        let params_size: usize = params
73            .iter()
74            .map(|p| 4 + p.as_ref().map_or(0, |v| v.len()))
75            .sum();
76        
77        let estimated_size = if is_new {
78            50 + sql.len() + stmt_name.len() * 2 + params_size
79        } else {
80            30 + stmt_name.len() + params_size
81        };
82        
83        let mut buf = BytesMut::with_capacity(estimated_size);
84
85        if is_new {
86            buf.extend(PgEncoder::encode_parse(&stmt_name, sql, &[]));
87            // Cache the SQL for debugging
88            self.prepared_statements.insert(stmt_name.clone(), sql.to_string());
89        }
90
91        // Use ULTRA-OPTIMIZED encoders - write directly to buffer
92        PgEncoder::encode_bind_to(&mut buf, &stmt_name, params)
93            .map_err(|e| PgError::Encode(e.to_string()))?;
94        PgEncoder::encode_execute_to(&mut buf);
95        PgEncoder::encode_sync_to(&mut buf);
96
97        self.stream.write_all(&buf).await?;
98
99        let mut rows = Vec::new();
100
101        let mut error: Option<PgError> = None;
102
103        loop {
104            let msg = self.recv().await?;
105            match msg {
106                BackendMessage::ParseComplete => {
107                    // Already cached in is_new block above
108                }
109                BackendMessage::BindComplete => {}
110                BackendMessage::RowDescription(_) => {}
111                BackendMessage::DataRow(data) => {
112                    if error.is_none() {
113                        rows.push(data);
114                    }
115                }
116                BackendMessage::CommandComplete(_) => {}
117                BackendMessage::NoData => {}
118                BackendMessage::ReadyForQuery(_) => {
119                    if let Some(err) = error {
120                        return Err(err);
121                    }
122                    return Ok(rows);
123                }
124                BackendMessage::ErrorResponse(err) => {
125                    if error.is_none() {
126                        error = Some(PgError::Query(err.message));
127                        // Invalidate cache to prevent "prepared statement does not exist"
128                        // on next retry.
129                        self.prepared_statements.remove(&stmt_name);
130                    }
131                }
132                _ => {}
133            }
134        }
135    }
136
137    /// Generate a statement name from SQL hash.
138    /// Uses a simple hash to create a unique name like "stmt_12345abc".
139    pub(crate) fn sql_to_stmt_name(sql: &str) -> String {
140        use std::collections::hash_map::DefaultHasher;
141        use std::hash::{Hash, Hasher};
142
143        let mut hasher = DefaultHasher::new();
144        sql.hash(&mut hasher);
145        format!("s{:016x}", hasher.finish())
146    }
147
148    /// Execute a simple SQL statement (no parameters).
149    pub(crate) async fn execute_simple(&mut self, sql: &str) -> PgResult<()> {
150        let bytes = PgEncoder::encode_query_string(sql);
151        self.stream.write_all(&bytes).await?;
152
153        let mut error: Option<PgError> = None;
154
155        loop {
156            let msg = self.recv().await?;
157            match msg {
158                BackendMessage::CommandComplete(_) => {}
159                BackendMessage::ReadyForQuery(_) => {
160                    if let Some(err) = error {
161                        return Err(err);
162                    }
163                    return Ok(());
164                }
165                BackendMessage::ErrorResponse(err) => {
166                    if error.is_none() {
167                        error = Some(PgError::Query(err.message));
168                    }
169                }
170                _ => {}
171            }
172        }
173    }
174
175    /// ZERO-HASH sequential query using pre-computed PreparedStatement.
176    /// This is the FASTEST sequential path because it skips:
177    /// - SQL generation from AST (done once outside loop)
178    /// - Hash computation for statement name (pre-computed in PreparedStatement)
179    /// - HashMap lookup for is_new check (statement already prepared)
180    /// # Example
181    /// ```ignore
182    /// let stmt = conn.prepare("SELECT * FROM users WHERE id = $1").await?;
183    /// for id in 1..10000 {
184    ///     let rows = conn.query_prepared_single(&stmt, &[Some(id.to_string().into_bytes())]).await?;
185    /// }
186    /// ```
187    #[inline]
188    pub async fn query_prepared_single(
189        &mut self,
190        stmt: &super::PreparedStatement,
191        params: &[Option<Vec<u8>>],
192    ) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
193        // Pre-calculate buffer size for single allocation
194        let params_size: usize = params
195            .iter()
196            .map(|p| 4 + p.as_ref().map_or(0, |v| v.len()))
197            .sum();
198        
199        // Bind: ~15 + stmt.name.len() + params_size, Execute: 10, Sync: 5
200        let mut buf = BytesMut::with_capacity(30 + stmt.name.len() + params_size);
201
202        // ZERO HASH, ZERO LOOKUP - just encode and send!
203        PgEncoder::encode_bind_to(&mut buf, &stmt.name, params)
204            .map_err(|e| PgError::Encode(e.to_string()))?;
205        PgEncoder::encode_execute_to(&mut buf);
206        PgEncoder::encode_sync_to(&mut buf);
207
208        self.stream.write_all(&buf).await?;
209
210        let mut rows = Vec::new();
211
212        let mut error: Option<PgError> = None;
213
214        loop {
215            let msg = self.recv().await?;
216            match msg {
217                BackendMessage::BindComplete => {}
218                BackendMessage::RowDescription(_) => {}
219                BackendMessage::DataRow(data) => {
220                    if error.is_none() {
221                        rows.push(data);
222                    }
223                }
224                BackendMessage::CommandComplete(_) => {}
225                BackendMessage::NoData => {}
226                BackendMessage::ReadyForQuery(_) => {
227                    if let Some(err) = error {
228                        return Err(err);
229                    }
230                    return Ok(rows);
231                }
232                BackendMessage::ErrorResponse(err) => {
233                    if error.is_none() {
234                        error = Some(PgError::Query(err.message));
235                    }
236                }
237                _ => {}
238            }
239        }
240    }
241}