1use cdbc::describe::Describe;
2use cdbc::error::Error;
3use cdbc::executor::{Execute, Executor};
4use crate::message::{
5 self, Bind, Close, CommandComplete, DataRow, MessageFormat, ParameterDescription, Parse, Query,
6 RowDescription,
7};
8use crate::statement::PgStatementMetadata;
9use crate::type_info::PgType;
10use crate::{
11 statement::PgStatement, PgArguments, PgConnection, PgQueryResult, PgRow, PgTypeInfo,
12 PgValueFormat, Postgres,
13};
14use either::Either;
15use std::{borrow::Cow, sync::Arc};
16use cdbc::database::{Database, HasStatement};
17use cdbc::io::chan_stream::{ChanStream, TryStream};
18fn prepare(
19 conn: &mut PgConnection,
20 sql: &str,
21 parameters: &[PgTypeInfo],
22 metadata: Option<Arc<PgStatementMetadata>>,
23) -> Result<(u32, Arc<PgStatementMetadata>), Error> {
24 let id = conn.next_statement_id;
25 conn.next_statement_id = conn.next_statement_id.wrapping_add(1);
26
27 let mut param_types = Vec::with_capacity(parameters.len());
32
33 for ty in parameters {
34 param_types.push(if let PgType::DeclareWithName(name) = &ty.0 {
35 conn.fetch_type_id_by_name(name)?
36 } else {
37 ty.0.oid()
38 });
39 }
40
41 conn.wait_until_ready()?;
43
44 conn.stream.write(Parse {
46 param_types: &*param_types,
47 query: sql,
48 statement: id,
49 });
50
51 if metadata.is_none() {
52 conn.stream.write(message::Describe::Statement(id));
54 }
55
56 conn.write_sync();
58 conn.stream.flush()?;
59
60 let _ = conn
62 .stream
63 .recv_expect(MessageFormat::ParseComplete)
64 ?;
65
66 let metadata = if let Some(metadata) = metadata {
67 conn.recv_ready_for_query()?;
69
70 metadata
72 } else {
73 let parameters = recv_desc_params(conn)?;
74
75 let rows = recv_desc_rows(conn)?;
76
77 conn.recv_ready_for_query()?;
79
80 let parameters = conn.handle_parameter_description(parameters)?;
81
82 let (columns, column_names) = conn.handle_row_description(rows, true)?;
83
84 conn.wait_until_ready()?;
87
88 Arc::new(PgStatementMetadata {
89 parameters,
90 columns,
91 column_names,
92 })
93 };
94
95 Ok((id, metadata))
96}
97
98fn recv_desc_params(conn: &mut PgConnection) -> Result<ParameterDescription, Error> {
99 conn.stream
100 .recv_expect(MessageFormat::ParameterDescription)
101
102}
103
104fn recv_desc_rows(conn: &mut PgConnection) -> Result<Option<RowDescription>, Error> {
105 let rows: Option<RowDescription> = match conn.stream.recv()? {
106 message if message.format == MessageFormat::RowDescription => Some(message.decode()?),
108
109 message if message.format == MessageFormat::NoData => None,
111
112 message => {
113 return Err(err_protocol!(
114 "expecting RowDescription or NoData but received {:?}",
115 message.format
116 ));
117 }
118 };
119
120 Ok(rows)
121}
122
123impl PgConnection {
124 pub(super) fn wait_for_close_complete(&mut self, mut count: usize) -> Result<(), Error> {
126 while count > 0 {
128 match self.stream.recv()? {
129 message if message.format == MessageFormat::PortalSuspended => {
130 }
134
135 message if message.format == MessageFormat::CloseComplete => {
136 count -= 1;
138 }
139
140 message => {
141 return Err(err_protocol!(
142 "expecting PortalSuspended or CloseComplete but received {:?}",
143 message.format
144 ));
145 }
146 }
147 }
148
149 Ok(())
150 }
151
152 pub(crate) fn write_sync(&mut self) {
153 self.stream.write(message::Sync);
154
155 self.pending_ready_for_query_count += 1;
157 }
158
159 fn get_or_prepare<'a>(
160 &mut self,
161 sql: &str,
162 parameters: &[PgTypeInfo],
163 store_to_cache: bool,
165 metadata: Option<Arc<PgStatementMetadata>>,
168 ) -> Result<(u32, Arc<PgStatementMetadata>), Error> {
169 if let Some(statement) = self.cache_statement.get_mut(sql) {
170 return Ok((*statement).clone());
171 }
172
173 let statement = prepare(self, sql, parameters, metadata)?;
174
175 if store_to_cache && self.cache_statement.is_enabled() {
176 if let Some((id, _)) = self.cache_statement.insert(sql, statement.clone()) {
177 self.stream.write(Close::Statement(id));
178 self.write_sync();
179
180 self.stream.flush()?;
181
182 self.wait_for_close_complete(1)?;
183 self.recv_ready_for_query()?;
184 }
185 }
186
187 Ok(statement)
188 }
189
190 fn run<'e, 'c: 'e, 'q: 'e>(
191 &'c mut self,
192 query: &'q str,
193 arguments: Option<PgArguments>,
194 limit: u8,
195 persistent: bool,
196 metadata_opt: Option<Arc<PgStatementMetadata>>,
197 ) -> Result<ChanStream<Either<PgQueryResult, PgRow>>, Error> {
198
199 self.wait_until_ready()?;
201
202 let mut metadata: Arc<PgStatementMetadata>;
203
204 let format = if let Some(mut arguments) = arguments {
205 let (statement, metadata_) = self
208 .get_or_prepare(query, &arguments.types, persistent, metadata_opt)
209 ?;
210
211 metadata = metadata_;
212
213 arguments.apply_patches(self, &metadata.parameters)?;
215
216 self.wait_until_ready()?;
219
220 self.stream.write(Bind {
222 portal: None,
223 statement,
224 formats: &[PgValueFormat::Binary],
225 num_params: arguments.types.len() as i16,
226 params: &*arguments.buffer,
227 result_formats: &[PgValueFormat::Binary],
228 });
229
230 self.stream.write(message::Execute {
233 portal: None,
234 limit: limit.into(),
235 });
236
237 self.write_sync();
243
244 PgValueFormat::Binary
246 } else {
247 self.stream.write(Query(query));
249 self.pending_ready_for_query_count += 1;
250
251 metadata = Arc::new(PgStatementMetadata::default());
253
254 PgValueFormat::Text
256 };
257
258 self.stream.flush()?;
259
260 Ok(chan_stream!{
261 loop {
262 let message = self.stream.recv()?;
263
264 match message.format {
265 MessageFormat::BindComplete
266 | MessageFormat::ParseComplete
267 | MessageFormat::ParameterDescription
268 | MessageFormat::NoData => {
269 }
271
272 MessageFormat::CommandComplete => {
273 let cc: CommandComplete = message.decode()?;
275
276 r#yield!(Either::Left(PgQueryResult {
277 rows_affected: cc.rows_affected(),
278 }));
279 }
280
281 MessageFormat::EmptyQueryResponse => {
282 }
284
285 MessageFormat::RowDescription => {
286 let (columns, column_names) = self
288 .handle_row_description(Some(message.decode()?), false)
289 ?;
290
291 metadata = Arc::new(PgStatementMetadata {
292 column_names,
293 columns,
294 parameters: Vec::default(),
295 });
296 }
297
298 MessageFormat::DataRow => {
299 let data: DataRow = message.decode()?;
301 let row = PgRow {
302 data,
303 format,
304 metadata: Arc::clone(&metadata),
305 };
306
307 r#yield!(Either::Right(row));
308 }
309
310 MessageFormat::ReadyForQuery => {
311 self.handle_ready_for_query(message)?;
313 break;
314 }
315
316 _ => {
317 return Err(err_protocol!(
318 "execute: unexpected message: {:?}",
319 message.format
320 ));
321 }
322 }
323 }
324
325 Ok(())
326 })
327 }
328}
329
330impl Executor for PgConnection {
331 type Database = Postgres;
332
333 fn fetch_many< 'q, E: 'q>(
334 &mut self,
335 mut query: E,
336 ) -> ChanStream<Either<PgQueryResult, PgRow>>
337 where
338 E: Execute<'q, Self::Database>,
339 {
340 let metadata = query.statement().map(|s| Arc::clone(&s.metadata));
341 let arguments = query.take_arguments();
342 let persistent = query.persistent();
343
344 chan_stream!({
345 let mut s = self.run(query.sql(), arguments, 0, persistent, metadata)?;
346 while let Some(v) = s.try_next()? {
347 r#yield!(v);
348 }
349 Ok(())
350 })
351 }
352
353 fn fetch_optional<'q, E: 'q>(
354 &mut self,
355 mut query: E,
356 ) -> Result<Option<PgRow>, Error>
357 where E: Execute<'q, Self::Database>,
358 {
359 let metadata = query.statement().map(|s| Arc::clone(&s.metadata));
360 let arguments = query.take_arguments();
361 let persistent = query.persistent();
362 let mut s = self.run(query.sql(), arguments, 1, persistent, metadata)?;
363 while let Some(s) = s.try_next()? {
364 if let Either::Right(r) = s {
365 return Ok(Some(r));
366 }
367 }
368
369 Ok(None)
370 }
371
372 fn prepare_with< 'q>(
373 &mut self,
374 sql: &'q str,
375 parameters: &'q [PgTypeInfo],
376 ) -> Result<PgStatement, Error>
377 where
378 {
379 self.wait_until_ready()?;
380
381 let (_, metadata) = self.get_or_prepare(sql, parameters, true, None)?;
382
383 Ok(PgStatement {
384 sql: sql.to_string(),
385 metadata,
386 })
387 }
388
389 fn describe< 'q>(
390 &mut self,
391 sql: &'q str,
392 ) -> Result<Describe<Self::Database>, Error>
393 where
394 {
395 self.wait_until_ready()?;
396
397 let (stmt_id, metadata) = self.get_or_prepare(sql, &[], true, None)?;
398
399 let nullable = self.get_nullable_for_columns(stmt_id, &metadata)?;
400
401 Ok(Describe {
402 columns: metadata.columns.clone(),
403 nullable,
404 parameters: Some(Either::Left(metadata.parameters.clone())),
405 })
406 }
407}
408
409impl Executor for &mut PgConnection{
410 type Database = Postgres;
411
412 fn fetch_many<'q, E: 'q>(&mut self, query: E) -> ChanStream<Either<<Self::Database as Database>::QueryResult, <Self::Database as Database>::Row>> where E: Execute<'q, Self::Database> {
413 PgConnection::fetch_many(self,query)
414 }
415
416 fn fetch_optional<'q, E: 'q>(&mut self, query: E) -> Result<Option<<Self::Database as Database>::Row>, Error> where E: Execute<'q, Self::Database> {
417 PgConnection::fetch_optional(self,query)
418 }
419
420 fn prepare_with<'q>(&mut self, sql: &'q str, parameters: &'q [<Self::Database as Database>::TypeInfo]) -> Result<<Self::Database as HasStatement>::Statement, Error> {
421 PgConnection::prepare_with(self,sql,parameters)
422 }
423
424 fn describe(&mut self, sql: &str) -> Result<Describe<Self::Database>, Error> {
425 PgConnection::describe(self,sql)
426 }
427}