zero_postgres/sync/pipeline/
mod.rs1use std::collections::VecDeque;
33
34use crate::pipeline::Expectation;
35use crate::pipeline::Ticket;
36
37use crate::conversion::{FromRow, ToParams};
38use crate::error::{Error, Result};
39use crate::handler::BinaryHandler;
40use crate::protocol::backend::{
41 BindComplete, CommandComplete, DataRow, EmptyQueryResponse, ErrorResponse, NoData,
42 ParseComplete, RawMessage, ReadyForQuery, RowDescription, msg_type,
43};
44use crate::protocol::frontend::{
45 write_bind, write_describe_portal, write_execute, write_flush, write_parse, write_sync,
46};
47use crate::state::extended::PreparedStatement;
48use crate::statement::IntoStatement;
49
50use super::conn::Conn;
51
52pub struct Pipeline<'a> {
56 conn: &'a mut Conn,
57 queue_seq: usize,
59 claim_seq: usize,
61 aborted: bool,
63 column_buffer: Vec<u8>,
65 expectations: VecDeque<Expectation>,
67}
68
69impl<'a> Pipeline<'a> {
70 #[cfg(feature = "lowlevel")]
75 pub fn new(conn: &'a mut Conn) -> Self {
76 Self::new_inner(conn)
77 }
78
79 pub(crate) fn new_inner(conn: &'a mut Conn) -> Self {
81 conn.buffer_set.write_buffer.clear();
82 Self {
83 conn,
84 queue_seq: 0,
85 claim_seq: 0,
86 aborted: false,
87 column_buffer: Vec::new(),
88 expectations: VecDeque::new(),
89 }
90 }
91
92 #[cfg(feature = "lowlevel")]
97 pub fn cleanup(&mut self) {
98 self.cleanup_inner();
99 }
100
101 #[cfg(not(feature = "lowlevel"))]
102 pub(crate) fn cleanup(&mut self) {
103 self.cleanup_inner();
104 }
105
106 fn cleanup_inner(&mut self) {
107 if self.queue_seq == 0 && self.expectations.is_empty() {
109 return;
110 }
111
112 if !self.conn.buffer_set.write_buffer.is_empty() {
114 let _ = self.sync();
115 } else if !self.expectations.iter().any(|e| *e == Expectation::Sync) {
116 let _ = self.sync();
118 }
119
120 if self.aborted {
122 while let Some(expectation) = self.expectations.pop_front() {
124 if expectation == Expectation::Sync {
125 let _ = self.consume_ready_for_query();
126 }
127 }
128 } else {
129 while let Some(expectation) = self.expectations.pop_front() {
131 let _ = self.drain_expectation(expectation);
132 }
133 }
134
135 self.queue_seq = 0;
137 self.claim_seq = 0;
138 self.aborted = false;
139 }
140
141 fn drain_expectation(&mut self, expectation: Expectation) {
143 let mut handler = crate::handler::DropHandler::new();
144 let _ = match expectation {
145 Expectation::ParseBindExecute => self.claim_parse_bind_exec_inner(&mut handler),
146 Expectation::BindExecute => self.claim_bind_exec_inner(&mut handler, None),
147 Expectation::Sync => self.consume_ready_for_query(),
148 };
149 }
150
151 pub fn exec<'s, P: ToParams>(
180 &mut self,
181 statement: &'s (impl IntoStatement + ?Sized),
182 params: P,
183 ) -> Result<Ticket<'s>> {
184 let seq = self.queue_seq;
185 self.queue_seq += 1;
186
187 if statement.needs_parse() {
188 self.exec_sql_inner(statement.as_sql().unwrap(), ¶ms)?;
189 Ok(Ticket { seq, stmt: None })
190 } else {
191 let stmt = statement.as_prepared().unwrap();
192 self.exec_prepared_inner(&stmt.wire_name(), &stmt.param_oids, ¶ms)?;
193 Ok(Ticket {
194 seq,
195 stmt: Some(stmt),
196 })
197 }
198 }
199
200 fn exec_sql_inner<P: ToParams>(&mut self, sql: &str, params: &P) -> Result<()> {
201 let param_oids = params.natural_oids();
202 let buf = &mut self.conn.buffer_set.write_buffer;
203 write_parse(buf, "", sql, ¶m_oids);
204 write_bind(buf, "", "", params, ¶m_oids)?;
205 write_describe_portal(buf, "");
206 write_execute(buf, "", 0);
207 self.expectations.push_back(Expectation::ParseBindExecute);
208 Ok(())
209 }
210
211 fn exec_prepared_inner<P: ToParams>(
212 &mut self,
213 stmt_name: &str,
214 param_oids: &[u32],
215 params: &P,
216 ) -> Result<()> {
217 let buf = &mut self.conn.buffer_set.write_buffer;
218 write_bind(buf, "", stmt_name, params, param_oids)?;
219 write_execute(buf, "", 0);
221 self.expectations.push_back(Expectation::BindExecute);
222 Ok(())
223 }
224
225 pub fn flush(&mut self) -> Result<()> {
230 if !self.conn.buffer_set.write_buffer.is_empty() {
231 write_flush(&mut self.conn.buffer_set.write_buffer);
232 self.conn
233 .stream
234 .write_all(&self.conn.buffer_set.write_buffer)?;
235 self.conn.stream.flush()?;
236 self.conn.buffer_set.write_buffer.clear();
237 }
238 Ok(())
239 }
240
241 pub fn sync(&mut self) -> Result<()> {
246 let result = self.sync_inner();
247 if let Err(e) = &result
248 && e.is_connection_broken()
249 {
250 self.conn.is_broken = true;
251 }
252 result
253 }
254
255 fn sync_inner(&mut self) -> Result<()> {
256 write_sync(&mut self.conn.buffer_set.write_buffer);
257 self.expectations.push_back(Expectation::Sync);
258 self.conn
259 .stream
260 .write_all(&self.conn.buffer_set.write_buffer)?;
261 self.conn.stream.flush()?;
262 self.conn.buffer_set.write_buffer.clear();
263 Ok(())
264 }
265
266 fn consume_ready_for_query(&mut self) -> Result<()> {
268 loop {
269 self.conn.stream.read_message(&mut self.conn.buffer_set)?;
270 let type_byte = self.conn.buffer_set.type_byte;
271
272 if RawMessage::is_async_type(type_byte) {
273 continue;
274 }
275
276 if type_byte == msg_type::ERROR_RESPONSE {
277 let error = ErrorResponse::parse(&self.conn.buffer_set.read_buffer)?;
278 return Err(error.into_error());
279 }
280
281 if type_byte == msg_type::READY_FOR_QUERY {
282 let ready = ReadyForQuery::parse(&self.conn.buffer_set.read_buffer)?;
283 self.conn.transaction_status = ready.transaction_status().unwrap_or_default();
284 return Ok(());
285 }
286 }
287 }
288
289 fn consume_pending_syncs(&mut self) -> Result<()> {
291 while self.expectations.front() == Some(&Expectation::Sync) {
292 self.expectations.pop_front();
293 self.consume_ready_for_query()?;
294 self.aborted = false;
296 }
297 Ok(())
298 }
299
300 #[cfg(feature = "lowlevel")]
308 pub fn claim<H: BinaryHandler>(&mut self, ticket: Ticket<'_>, handler: &mut H) -> Result<()> {
309 self.claim_with_handler(ticket, handler)
310 }
311
312 fn claim_with_handler<H: BinaryHandler>(
313 &mut self,
314 ticket: Ticket<'_>,
315 handler: &mut H,
316 ) -> Result<()> {
317 self.check_sequence(ticket.seq)?;
318
319 if !self.conn.buffer_set.write_buffer.is_empty() {
321 self.sync()?;
322 }
323
324 if self.aborted {
325 self.claim_seq += 1;
326 self.expectations.pop_front();
328 self.consume_pending_syncs()?;
329 return Err(Error::Protocol(
330 "pipeline aborted due to earlier error".into(),
331 ));
332 }
333
334 let expectation = self.expectations.pop_front();
335
336 let result = match expectation {
337 Some(Expectation::ParseBindExecute) => self.claim_parse_bind_exec_inner(handler),
338 Some(Expectation::BindExecute) => self.claim_bind_exec_inner(handler, ticket.stmt),
339 Some(Expectation::Sync) => Err(Error::Protocol("unexpected Sync expectation".into())),
340 None => Err(Error::Protocol("no expectation in queue".into())),
341 };
342
343 if let Err(e) = &result {
344 if e.is_connection_broken() {
345 self.conn.is_broken = true;
346 }
347 self.aborted = true;
348 }
349 self.claim_seq += 1;
350 self.consume_pending_syncs()?;
351 result
352 }
353
354 pub fn claim_collect<T: for<'b> FromRow<'b>>(&mut self, ticket: Ticket<'_>) -> Result<Vec<T>> {
358 let mut handler = crate::handler::CollectHandler::<T>::new();
359 self.claim_with_handler(ticket, &mut handler)?;
360 Ok(handler.into_rows())
361 }
362
363 pub fn claim_one<T: for<'b> FromRow<'b>>(&mut self, ticket: Ticket<'_>) -> Result<Option<T>> {
367 let mut handler = crate::handler::FirstRowHandler::<T>::new();
368 self.claim_with_handler(ticket, &mut handler)?;
369 Ok(handler.into_row())
370 }
371
372 pub fn claim_drop(&mut self, ticket: Ticket<'_>) -> Result<()> {
376 let mut handler = crate::handler::DropHandler::new();
377 self.claim_with_handler(ticket, &mut handler)
378 }
379
380 fn check_sequence(&self, seq: usize) -> Result<()> {
382 if seq != self.claim_seq {
383 return Err(Error::InvalidUsage(format!(
384 "claim out of order: expected seq {}, got {}",
385 self.claim_seq, seq
386 )));
387 }
388 Ok(())
389 }
390
391 fn claim_parse_bind_exec_inner<H: BinaryHandler>(&mut self, handler: &mut H) -> Result<()> {
393 self.read_next_message()?;
395 if self.conn.buffer_set.type_byte != msg_type::PARSE_COMPLETE {
396 return self.unexpected_message("ParseComplete");
397 }
398 ParseComplete::parse(&self.conn.buffer_set.read_buffer)?;
399
400 self.read_next_message()?;
402 if self.conn.buffer_set.type_byte != msg_type::BIND_COMPLETE {
403 return self.unexpected_message("BindComplete");
404 }
405 BindComplete::parse(&self.conn.buffer_set.read_buffer)?;
406
407 self.claim_rows_inner(handler)
409 }
410
411 fn claim_bind_exec_inner<H: BinaryHandler>(
413 &mut self,
414 handler: &mut H,
415 stmt: Option<&PreparedStatement>,
416 ) -> Result<()> {
417 self.read_next_message()?;
419 if self.conn.buffer_set.type_byte != msg_type::BIND_COMPLETE {
420 return self.unexpected_message("BindComplete");
421 }
422 BindComplete::parse(&self.conn.buffer_set.read_buffer)?;
423
424 let row_desc = stmt.and_then(|s| s.row_desc_payload());
426
427 self.claim_rows_cached_inner(handler, row_desc)
429 }
430
431 fn claim_rows_inner<H: BinaryHandler>(&mut self, handler: &mut H) -> Result<()> {
433 self.read_next_message()?;
435 let has_rows = match self.conn.buffer_set.type_byte {
436 msg_type::ROW_DESCRIPTION => {
437 self.column_buffer.clear();
438 self.column_buffer
439 .extend_from_slice(&self.conn.buffer_set.read_buffer);
440 true
441 }
442 msg_type::NO_DATA => {
443 NoData::parse(&self.conn.buffer_set.read_buffer)?;
444 false
446 }
447 _ => {
448 return Err(Error::Protocol(format!(
449 "expected RowDescription or NoData, got '{}'",
450 self.conn.buffer_set.type_byte as char
451 )));
452 }
453 };
454
455 loop {
457 self.read_next_message()?;
458 let type_byte = self.conn.buffer_set.type_byte;
459
460 match type_byte {
461 msg_type::DATA_ROW => {
462 if !has_rows {
463 return Err(Error::Protocol(
464 "received DataRow but no RowDescription".into(),
465 ));
466 }
467 let cols = RowDescription::parse(&self.column_buffer)?;
468 let row = DataRow::parse(&self.conn.buffer_set.read_buffer)?;
469 handler.row(cols, row)?;
470 }
471 msg_type::COMMAND_COMPLETE => {
472 let cmd = CommandComplete::parse(&self.conn.buffer_set.read_buffer)?;
473 handler.result_end(cmd)?;
474 return Ok(());
475 }
476 msg_type::EMPTY_QUERY_RESPONSE => {
477 EmptyQueryResponse::parse(&self.conn.buffer_set.read_buffer)?;
478 return Ok(());
479 }
480 _ => {
481 return Err(Error::Protocol(format!(
482 "unexpected message type in pipeline claim: '{}'",
483 type_byte as char
484 )));
485 }
486 }
487 }
488 }
489
490 fn claim_rows_cached_inner<H: BinaryHandler>(
492 &mut self,
493 handler: &mut H,
494 row_desc: Option<&[u8]>,
495 ) -> Result<()> {
496 loop {
498 self.read_next_message()?;
499 let type_byte = self.conn.buffer_set.type_byte;
500
501 match type_byte {
502 msg_type::DATA_ROW => {
503 let row_desc = row_desc.ok_or_else(|| {
504 Error::Protocol("received DataRow but no RowDescription cached".into())
505 })?;
506 let cols = RowDescription::parse(row_desc)?;
507 let row = DataRow::parse(&self.conn.buffer_set.read_buffer)?;
508 handler.row(cols, row)?;
509 }
510 msg_type::COMMAND_COMPLETE => {
511 let cmd = CommandComplete::parse(&self.conn.buffer_set.read_buffer)?;
512 handler.result_end(cmd)?;
513 return Ok(());
514 }
515 msg_type::EMPTY_QUERY_RESPONSE => {
516 EmptyQueryResponse::parse(&self.conn.buffer_set.read_buffer)?;
517 return Ok(());
518 }
519 _ => {
520 return Err(Error::Protocol(format!(
521 "unexpected message type in pipeline claim: '{}'",
522 type_byte as char
523 )));
524 }
525 }
526 }
527 }
528
529 fn read_next_message(&mut self) -> Result<()> {
531 loop {
532 self.conn.stream.read_message(&mut self.conn.buffer_set)?;
533 let type_byte = self.conn.buffer_set.type_byte;
534
535 if RawMessage::is_async_type(type_byte) {
537 continue;
538 }
539
540 if type_byte == msg_type::ERROR_RESPONSE {
542 let error = ErrorResponse::parse(&self.conn.buffer_set.read_buffer)?;
543 return Err(error.into_error());
544 }
545
546 return Ok(());
547 }
548 }
549
550 fn unexpected_message<T>(&self, expected: &str) -> Result<T> {
552 Err(Error::Protocol(format!(
553 "expected {}, got '{}'",
554 expected, self.conn.buffer_set.type_byte as char
555 )))
556 }
557
558 pub fn pending_count(&self) -> usize {
560 self.queue_seq - self.claim_seq
561 }
562
563 pub fn is_aborted(&self) -> bool {
565 self.aborted
566 }
567}