cdbc_pg/connection/
executor.rs

1use cdbc::describe::Describe;
2use cdbc::error::Error;
3use cdbc::executor::{Execute, Executor};
4use crate::message::{
5    self, Bind, Close, CommandComplete, DataRow, MessageFormat, ParameterDescription, Parse, Query,
6    RowDescription,
7};
8use crate::statement::PgStatementMetadata;
9use crate::type_info::PgType;
10use crate::{
11    statement::PgStatement, PgArguments, PgConnection, PgQueryResult, PgRow, PgTypeInfo,
12    PgValueFormat, Postgres,
13};
14use either::Either;
15use std::{borrow::Cow, sync::Arc};
16use cdbc::database::{Database, HasStatement};
17use cdbc::io::chan_stream::{ChanStream, TryStream};
18fn prepare(
19    conn: &mut PgConnection,
20    sql: &str,
21    parameters: &[PgTypeInfo],
22    metadata: Option<Arc<PgStatementMetadata>>,
23) -> Result<(u32, Arc<PgStatementMetadata>), Error> {
24    let id = conn.next_statement_id;
25    conn.next_statement_id = conn.next_statement_id.wrapping_add(1);
26
27    // build a list of type OIDs to send to the database in the PARSE command
28    // we have not yet started the query sequence, so we are *safe* to cleanly make
29    // additional queries here to get any missing OIDs
30
31    let mut param_types = Vec::with_capacity(parameters.len());
32
33    for ty in parameters {
34        param_types.push(if let PgType::DeclareWithName(name) = &ty.0 {
35            conn.fetch_type_id_by_name(name)?
36        } else {
37            ty.0.oid()
38        });
39    }
40
41    // flush and wait until we are re-ready
42    conn.wait_until_ready()?;
43
44    // next we send the PARSE command to the server
45    conn.stream.write(Parse {
46        param_types: &*param_types,
47        query: sql,
48        statement: id,
49    });
50
51    if metadata.is_none() {
52        // get the statement columns and parameters
53        conn.stream.write(message::Describe::Statement(id));
54    }
55
56    // we ask for the server to immediately send us the result of the PARSE command
57    conn.write_sync();
58    conn.stream.flush()?;
59
60    // indicates that the SQL query string is now successfully parsed and has semantic validity
61    let _ = conn
62        .stream
63        .recv_expect(MessageFormat::ParseComplete)
64        ?;
65
66    let metadata = if let Some(metadata) = metadata {
67        // each SYNC produces one READY FOR QUERY
68        conn.recv_ready_for_query()?;
69
70        // we already have metadata
71        metadata
72    } else {
73        let parameters = recv_desc_params(conn)?;
74
75        let rows = recv_desc_rows(conn)?;
76
77        // each SYNC produces one READY FOR QUERY
78        conn.recv_ready_for_query()?;
79
80        let parameters = conn.handle_parameter_description(parameters)?;
81
82        let (columns, column_names) = conn.handle_row_description(rows, true)?;
83
84        // ensure that if we did fetch custom data, we wait until we are fully ready before
85        // continuing
86        conn.wait_until_ready()?;
87
88        Arc::new(PgStatementMetadata {
89            parameters,
90            columns,
91            column_names,
92        })
93    };
94
95    Ok((id, metadata))
96}
97
98fn recv_desc_params(conn: &mut PgConnection) -> Result<ParameterDescription, Error> {
99    conn.stream
100        .recv_expect(MessageFormat::ParameterDescription)
101
102}
103
104fn recv_desc_rows(conn: &mut PgConnection) -> Result<Option<RowDescription>, Error> {
105    let rows: Option<RowDescription> = match conn.stream.recv()? {
106        // describes the rows that will be returned when the statement is eventually executed
107        message if message.format == MessageFormat::RowDescription => Some(message.decode()?),
108
109        // no data would be returned if this statement was executed
110        message if message.format == MessageFormat::NoData => None,
111
112        message => {
113            return Err(err_protocol!(
114                "expecting RowDescription or NoData but received {:?}",
115                message.format
116            ));
117        }
118    };
119
120    Ok(rows)
121}
122
123impl PgConnection {
124    // wait for CloseComplete to indicate a statement was closed
125    pub(super) fn wait_for_close_complete(&mut self, mut count: usize) -> Result<(), Error> {
126        // we need to wait for the [CloseComplete] to be returned from the server
127        while count > 0 {
128            match self.stream.recv()? {
129                message if message.format == MessageFormat::PortalSuspended => {
130                    // there was an open portal
131                    // this can happen if the last time a statement was used it was not fully executed
132                    // such as in [fetch_one]
133                }
134
135                message if message.format == MessageFormat::CloseComplete => {
136                    // successfully closed the statement (and freed up the server resources)
137                    count -= 1;
138                }
139
140                message => {
141                    return Err(err_protocol!(
142                        "expecting PortalSuspended or CloseComplete but received {:?}",
143                        message.format
144                    ));
145                }
146            }
147        }
148
149        Ok(())
150    }
151
152    pub(crate) fn write_sync(&mut self) {
153        self.stream.write(message::Sync);
154
155        // all SYNC messages will return a ReadyForQuery
156        self.pending_ready_for_query_count += 1;
157    }
158
159    fn get_or_prepare<'a>(
160        &mut self,
161        sql: &str,
162        parameters: &[PgTypeInfo],
163        // should we store the result of this prepare to the cache
164        store_to_cache: bool,
165        // optional metadata that was provided by the user, this means they are reusing
166        // a statement object
167        metadata: Option<Arc<PgStatementMetadata>>,
168    ) -> Result<(u32, Arc<PgStatementMetadata>), Error> {
169        if let Some(statement) = self.cache_statement.get_mut(sql) {
170            return Ok((*statement).clone());
171        }
172
173        let statement = prepare(self, sql, parameters, metadata)?;
174
175        if store_to_cache && self.cache_statement.is_enabled() {
176            if let Some((id, _)) = self.cache_statement.insert(sql, statement.clone()) {
177                self.stream.write(Close::Statement(id));
178                self.write_sync();
179
180                self.stream.flush()?;
181
182                self.wait_for_close_complete(1)?;
183                self.recv_ready_for_query()?;
184            }
185        }
186
187        Ok(statement)
188    }
189
190    fn run<'e, 'c: 'e, 'q: 'e>(
191        &'c mut self,
192        query: &'q str,
193        arguments: Option<PgArguments>,
194        limit: u8,
195        persistent: bool,
196        metadata_opt: Option<Arc<PgStatementMetadata>>,
197    ) -> Result<ChanStream<Either<PgQueryResult, PgRow>>, Error> {
198
199        // before we continue, wait until we are "ready" to accept more queries
200        self.wait_until_ready()?;
201
202        let mut metadata: Arc<PgStatementMetadata>;
203
204        let format = if let Some(mut arguments) = arguments {
205            // prepare the statement if this our first time executing it
206            // always return the statement ID here
207            let (statement, metadata_) = self
208                .get_or_prepare(query, &arguments.types, persistent, metadata_opt)
209                ?;
210
211            metadata = metadata_;
212
213            // patch holes created during encoding
214            arguments.apply_patches(self, &metadata.parameters)?;
215
216            // apply patches use fetch_optional thaht may produce `PortalSuspended` message,
217            // consume messages til `ReadyForQuery` before bind and execute
218            self.wait_until_ready()?;
219
220            // bind to attach the arguments to the statement and create a portal
221            self.stream.write(Bind {
222                portal: None,
223                statement,
224                formats: &[PgValueFormat::Binary],
225                num_params: arguments.types.len() as i16,
226                params: &*arguments.buffer,
227                result_formats: &[PgValueFormat::Binary],
228            });
229
230            // executes the portal up to the passed limit
231            // the protocol-level limit acts nearly identically to the `LIMIT` in SQL
232            self.stream.write(message::Execute {
233                portal: None,
234                limit: limit.into(),
235            });
236
237            // finally, [Sync] asks postgres to process the messages that we sent and respond with
238            // a [ReadyForQuery] message when it's completely done. Theoretically, we could send
239            // dozens of queries before a [Sync] and postgres can handle that. Execution on the server
240            // is still serial but it would reduce round-trips. Some kind of builder pattern that is
241            // termed batching might suit this.
242            self.write_sync();
243
244            // prepared statements are binary
245            PgValueFormat::Binary
246        } else {
247            // Query will trigger a ReadyForQuery
248            self.stream.write(Query(query));
249            self.pending_ready_for_query_count += 1;
250
251            // metadata starts out as "nothing"
252            metadata = Arc::new(PgStatementMetadata::default());
253
254            // and unprepared statements are text
255            PgValueFormat::Text
256        };
257
258        self.stream.flush()?;
259
260        Ok(chan_stream!{
261            loop {
262                let message = self.stream.recv()?;
263
264                match message.format {
265                    MessageFormat::BindComplete
266                    | MessageFormat::ParseComplete
267                    | MessageFormat::ParameterDescription
268                    | MessageFormat::NoData => {
269                        // harmless messages to ignore
270                    }
271
272                    MessageFormat::CommandComplete => {
273                        // a SQL command completed normally
274                        let cc: CommandComplete = message.decode()?;
275
276                        r#yield!(Either::Left(PgQueryResult {
277                            rows_affected: cc.rows_affected(),
278                        }));
279                    }
280
281                    MessageFormat::EmptyQueryResponse => {
282                        // empty query string passed to an unprepared execute
283                    }
284
285                    MessageFormat::RowDescription => {
286                        // indicates that a *new* set of rows are about to be returned
287                        let (columns, column_names) = self
288                            .handle_row_description(Some(message.decode()?), false)
289                            ?;
290
291                        metadata = Arc::new(PgStatementMetadata {
292                            column_names,
293                            columns,
294                            parameters: Vec::default(),
295                        });
296                    }
297
298                    MessageFormat::DataRow => {
299                        // one of the set of rows returned by a SELECT, FETCH, etc query
300                        let data: DataRow = message.decode()?;
301                        let row = PgRow {
302                            data,
303                            format,
304                            metadata: Arc::clone(&metadata),
305                        };
306
307                        r#yield!(Either::Right(row));
308                    }
309
310                    MessageFormat::ReadyForQuery => {
311                        // processing of the query string is complete
312                        self.handle_ready_for_query(message)?;
313                        break;
314                    }
315
316                    _ => {
317                        return Err(err_protocol!(
318                            "execute: unexpected message: {:?}",
319                            message.format
320                        ));
321                    }
322                }
323            }
324
325            Ok(())
326        })
327    }
328}
329
330impl Executor for PgConnection {
331    type Database = Postgres;
332
333    fn fetch_many< 'q, E: 'q>(
334        &mut self,
335        mut query: E,
336    ) -> ChanStream<Either<PgQueryResult, PgRow>>
337    where
338        E: Execute<'q, Self::Database>,
339    {
340        let metadata = query.statement().map(|s| Arc::clone(&s.metadata));
341        let arguments = query.take_arguments();
342        let persistent = query.persistent();
343
344        chan_stream!({
345            let mut s = self.run(query.sql(), arguments, 0, persistent, metadata)?;
346            while let Some(v) = s.try_next()? {
347                r#yield!(v);
348            }
349            Ok(())
350        })
351    }
352
353    fn fetch_optional<'q, E: 'q>(
354        &mut self,
355        mut query: E,
356    ) ->  Result<Option<PgRow>, Error>
357    where E: Execute<'q, Self::Database>,
358    {
359        let metadata = query.statement().map(|s| Arc::clone(&s.metadata));
360        let arguments = query.take_arguments();
361        let persistent = query.persistent();
362            let mut s = self.run(query.sql(), arguments, 1, persistent, metadata)?;
363            while let Some(s) = s.try_next()? {
364                if let Either::Right(r) = s {
365                    return Ok(Some(r));
366                }
367            }
368
369            Ok(None)
370    }
371
372    fn prepare_with< 'q>(
373        &mut self,
374        sql: &'q str,
375        parameters: &'q [PgTypeInfo],
376    ) ->  Result<PgStatement, Error>
377    where
378    {
379            self.wait_until_ready()?;
380
381            let (_, metadata) = self.get_or_prepare(sql, parameters, true, None)?;
382
383            Ok(PgStatement {
384                sql: sql.to_string(),
385                metadata,
386            })
387    }
388
389    fn describe< 'q>(
390        &mut self,
391        sql: &'q str,
392    ) ->  Result<Describe<Self::Database>, Error>
393    where
394    {
395            self.wait_until_ready()?;
396
397            let (stmt_id, metadata) = self.get_or_prepare(sql, &[], true, None)?;
398
399            let nullable = self.get_nullable_for_columns(stmt_id, &metadata)?;
400
401            Ok(Describe {
402                columns: metadata.columns.clone(),
403                nullable,
404                parameters: Some(Either::Left(metadata.parameters.clone())),
405            })
406    }
407}
408
409impl Executor for &mut PgConnection{
410    type Database = Postgres;
411
412    fn fetch_many<'q, E: 'q>(&mut self, query: E) -> ChanStream<Either<<Self::Database as Database>::QueryResult, <Self::Database as Database>::Row>> where E: Execute<'q, Self::Database> {
413        PgConnection::fetch_many(self,query)
414    }
415
416    fn fetch_optional<'q, E: 'q>(&mut self, query: E) -> Result<Option<<Self::Database as Database>::Row>, Error> where E: Execute<'q, Self::Database> {
417        PgConnection::fetch_optional(self,query)
418    }
419
420    fn prepare_with<'q>(&mut self, sql: &'q str, parameters: &'q [<Self::Database as Database>::TypeInfo]) -> Result<<Self::Database as HasStatement>::Statement, Error> {
421        PgConnection::prepare_with(self,sql,parameters)
422    }
423
424    fn describe(&mut self, sql: &str) -> Result<Describe<Self::Database>, Error> {
425        PgConnection::describe(self,sql)
426    }
427}