zero_postgres/sync/pipeline/
mod.rs1use std::collections::VecDeque;
33
34use crate::pipeline::Expectation;
35use crate::pipeline::Ticket;
36
37use crate::conversion::{FromRow, ToParams};
38use crate::error::{Error, Result};
39use crate::handler::ExtendedHandler;
40use crate::protocol::backend::{
41 BindComplete, CommandComplete, DataRow, EmptyQueryResponse, ErrorResponse, NoData,
42 ParseComplete, RawMessage, ReadyForQuery, RowDescription, msg_type,
43};
44use crate::protocol::frontend::{
45 write_bind, write_describe_portal, write_execute, write_flush, write_parse, write_sync,
46};
47use crate::state::extended::PreparedStatement;
48use crate::statement::IntoStatement;
49
50use super::conn::Conn;
51
52pub struct Pipeline<'a> {
56 conn: &'a mut Conn,
57 queue_seq: usize,
59 claim_seq: usize,
61 aborted: bool,
63 expectations: VecDeque<Expectation>,
65}
66
67impl<'a> Pipeline<'a> {
/// Starts a pipeline on `conn`.
///
/// Only compiled when the `lowlevel` feature is enabled; the actual setup is
/// done by `new_inner`.
#[cfg(feature = "lowlevel")]
pub fn new(conn: &'a mut Conn) -> Self {
    Pipeline::new_inner(conn)
}
76
77 pub(crate) fn new_inner(conn: &'a mut Conn) -> Self {
79 conn.buffer_set.write_buffer.clear();
80 Self {
81 conn,
82 queue_seq: 0,
83 claim_seq: 0,
84 aborted: false,
85 expectations: VecDeque::new(),
86 }
87 }
88
89 #[cfg(feature = "lowlevel")]
94 pub fn cleanup(&mut self) {
95 self.cleanup_inner();
96 }
97
98 #[cfg(not(feature = "lowlevel"))]
99 pub(crate) fn cleanup(&mut self) {
100 self.cleanup_inner();
101 }
102
103 fn cleanup_inner(&mut self) {
104 if self.queue_seq == 0 && self.expectations.is_empty() {
106 return;
107 }
108
109 if !self.conn.buffer_set.write_buffer.is_empty() {
111 let _ = self.sync();
112 } else if !self.expectations.iter().any(|e| *e == Expectation::Sync) {
113 let _ = self.sync();
115 }
116
117 if self.aborted {
119 while let Some(expectation) = self.expectations.pop_front() {
121 if expectation == Expectation::Sync {
122 let _ = self.consume_ready_for_query();
123 }
124 }
125 } else {
126 while let Some(expectation) = self.expectations.pop_front() {
128 let _ = self.drain_expectation(expectation);
129 }
130 }
131
132 self.queue_seq = 0;
134 self.claim_seq = 0;
135 self.aborted = false;
136 }
137
138 fn drain_expectation(&mut self, expectation: Expectation) {
140 let mut handler = crate::handler::DropHandler::new();
141 let _ = match expectation {
142 Expectation::ParseBindExecute => self.claim_parse_bind_exec_inner(&mut handler),
143 Expectation::BindExecute => self.claim_bind_exec_inner(&mut handler, None),
144 Expectation::Sync => self.consume_ready_for_query(),
145 };
146 }
147
148 pub fn exec<'s, P: ToParams>(
177 &mut self,
178 statement: &'s (impl IntoStatement + ?Sized),
179 params: P,
180 ) -> Result<Ticket<'s>> {
181 let seq = self.queue_seq;
182 self.queue_seq += 1;
183
184 if statement.needs_parse() {
185 self.exec_sql_inner(statement.as_sql().unwrap(), ¶ms)?;
186 Ok(Ticket { seq, stmt: None })
187 } else {
188 let stmt = statement.as_prepared().unwrap();
189 self.exec_prepared_inner(&stmt.wire_name(), &stmt.param_oids, ¶ms)?;
190 Ok(Ticket {
191 seq,
192 stmt: Some(stmt),
193 })
194 }
195 }
196
197 fn exec_sql_inner<P: ToParams>(&mut self, sql: &str, params: &P) -> Result<()> {
198 let param_oids = params.natural_oids();
199 let buf = &mut self.conn.buffer_set.write_buffer;
200 write_parse(buf, "", sql, ¶m_oids);
201 write_bind(buf, "", "", params, ¶m_oids)?;
202 write_describe_portal(buf, "");
203 write_execute(buf, "", 0);
204 self.expectations.push_back(Expectation::ParseBindExecute);
205 Ok(())
206 }
207
208 fn exec_prepared_inner<P: ToParams>(
209 &mut self,
210 stmt_name: &str,
211 param_oids: &[u32],
212 params: &P,
213 ) -> Result<()> {
214 let buf = &mut self.conn.buffer_set.write_buffer;
215 write_bind(buf, "", stmt_name, params, param_oids)?;
216 write_execute(buf, "", 0);
218 self.expectations.push_back(Expectation::BindExecute);
219 Ok(())
220 }
221
222 pub fn flush(&mut self) -> Result<()> {
227 if !self.conn.buffer_set.write_buffer.is_empty() {
228 write_flush(&mut self.conn.buffer_set.write_buffer);
229 self.conn
230 .stream
231 .write_all(&self.conn.buffer_set.write_buffer)?;
232 self.conn.stream.flush()?;
233 self.conn.buffer_set.write_buffer.clear();
234 }
235 Ok(())
236 }
237
238 pub fn sync(&mut self) -> Result<()> {
243 let result = self.sync_inner();
244 if let Err(e) = &result
245 && e.is_connection_broken()
246 {
247 self.conn.is_broken = true;
248 }
249 result
250 }
251
252 fn sync_inner(&mut self) -> Result<()> {
253 write_sync(&mut self.conn.buffer_set.write_buffer);
254 self.expectations.push_back(Expectation::Sync);
255 self.conn
256 .stream
257 .write_all(&self.conn.buffer_set.write_buffer)?;
258 self.conn.stream.flush()?;
259 self.conn.buffer_set.write_buffer.clear();
260 Ok(())
261 }
262
263 fn consume_ready_for_query(&mut self) -> Result<()> {
265 loop {
266 self.conn.stream.read_message(&mut self.conn.buffer_set)?;
267 let type_byte = self.conn.buffer_set.type_byte;
268
269 if RawMessage::is_async_type(type_byte) {
270 continue;
271 }
272
273 if type_byte == msg_type::ERROR_RESPONSE {
274 let error = ErrorResponse::parse(&self.conn.buffer_set.read_buffer)?;
275 return Err(error.into_error());
276 }
277
278 if type_byte == msg_type::READY_FOR_QUERY {
279 let ready = ReadyForQuery::parse(&self.conn.buffer_set.read_buffer)?;
280 self.conn.transaction_status = ready.transaction_status().unwrap_or_default();
281 return Ok(());
282 }
283 }
284 }
285
286 fn consume_pending_syncs(&mut self) -> Result<()> {
288 while self.expectations.front() == Some(&Expectation::Sync) {
289 self.expectations.pop_front();
290 self.consume_ready_for_query()?;
291 self.aborted = false;
293 }
294 Ok(())
295 }
296
/// Claims the result of `ticket`, feeding it to a caller-supplied `handler`.
///
/// Only compiled when the `lowlevel` feature is enabled; forwards to
/// `claim_with_handler`.
#[cfg(feature = "lowlevel")]
pub fn claim<H: ExtendedHandler>(&mut self, ticket: Ticket<'_>, handler: &mut H) -> Result<()> {
    Self::claim_with_handler(self, ticket, handler)
}
308
309 fn claim_with_handler<H: ExtendedHandler>(
310 &mut self,
311 ticket: Ticket<'_>,
312 handler: &mut H,
313 ) -> Result<()> {
314 self.check_sequence(ticket.seq)?;
315
316 if !self.conn.buffer_set.write_buffer.is_empty() {
318 self.sync()?;
319 }
320
321 if self.aborted {
322 self.claim_seq += 1;
323 self.expectations.pop_front();
325 self.consume_pending_syncs()?;
326 return Err(Error::Protocol(
327 "pipeline aborted due to earlier error".into(),
328 ));
329 }
330
331 let expectation = self.expectations.pop_front();
332
333 let result = match expectation {
334 Some(Expectation::ParseBindExecute) => self.claim_parse_bind_exec_inner(handler),
335 Some(Expectation::BindExecute) => self.claim_bind_exec_inner(handler, ticket.stmt),
336 Some(Expectation::Sync) => Err(Error::Protocol("unexpected Sync expectation".into())),
337 None => Err(Error::Protocol("no expectation in queue".into())),
338 };
339
340 if let Err(e) = &result {
341 if e.is_connection_broken() {
342 self.conn.is_broken = true;
343 }
344 self.aborted = true;
345 }
346 self.claim_seq += 1;
347 self.consume_pending_syncs()?;
348 result
349 }
350
351 pub fn claim_collect<T: for<'b> FromRow<'b>>(&mut self, ticket: Ticket<'_>) -> Result<Vec<T>> {
355 let mut handler = crate::handler::CollectHandler::<T>::new();
356 self.claim_with_handler(ticket, &mut handler)?;
357 Ok(handler.into_rows())
358 }
359
360 pub fn claim_one<T: for<'b> FromRow<'b>>(&mut self, ticket: Ticket<'_>) -> Result<Option<T>> {
364 let mut handler = crate::handler::FirstRowHandler::<T>::new();
365 self.claim_with_handler(ticket, &mut handler)?;
366 Ok(handler.into_row())
367 }
368
369 pub fn claim_drop(&mut self, ticket: Ticket<'_>) -> Result<()> {
373 let mut handler = crate::handler::DropHandler::new();
374 self.claim_with_handler(ticket, &mut handler)
375 }
376
377 fn check_sequence(&self, seq: usize) -> Result<()> {
379 if seq != self.claim_seq {
380 return Err(Error::InvalidUsage(format!(
381 "claim out of order: expected seq {}, got {}",
382 self.claim_seq, seq
383 )));
384 }
385 Ok(())
386 }
387
388 fn claim_parse_bind_exec_inner<H: ExtendedHandler>(&mut self, handler: &mut H) -> Result<()> {
390 self.read_next_message()?;
392 if self.conn.buffer_set.type_byte != msg_type::PARSE_COMPLETE {
393 return self.unexpected_message("ParseComplete");
394 }
395 ParseComplete::parse(&self.conn.buffer_set.read_buffer)?;
396
397 self.read_next_message()?;
399 if self.conn.buffer_set.type_byte != msg_type::BIND_COMPLETE {
400 return self.unexpected_message("BindComplete");
401 }
402 BindComplete::parse(&self.conn.buffer_set.read_buffer)?;
403
404 self.claim_rows_inner(handler)
406 }
407
408 fn claim_bind_exec_inner<H: ExtendedHandler>(
410 &mut self,
411 handler: &mut H,
412 stmt: Option<&PreparedStatement>,
413 ) -> Result<()> {
414 self.read_next_message()?;
416 if self.conn.buffer_set.type_byte != msg_type::BIND_COMPLETE {
417 return self.unexpected_message("BindComplete");
418 }
419 BindComplete::parse(&self.conn.buffer_set.read_buffer)?;
420
421 let row_desc = stmt.and_then(|s| s.row_desc_payload());
423
424 self.claim_rows_cached_inner(handler, row_desc)
426 }
427
428 fn claim_rows_inner<H: ExtendedHandler>(&mut self, handler: &mut H) -> Result<()> {
430 self.read_next_message()?;
432 let has_rows = match self.conn.buffer_set.type_byte {
433 msg_type::ROW_DESCRIPTION => {
434 self.conn.buffer_set.save_column_buffer();
435 true
436 }
437 msg_type::NO_DATA => {
438 NoData::parse(&self.conn.buffer_set.read_buffer)?;
439 false
441 }
442 _ => {
443 return Err(Error::Protocol(format!(
444 "expected RowDescription or NoData, got '{}'",
445 self.conn.buffer_set.type_byte as char
446 )));
447 }
448 };
449
450 loop {
452 self.read_next_message()?;
453 let type_byte = self.conn.buffer_set.type_byte;
454
455 match type_byte {
456 msg_type::DATA_ROW => {
457 if !has_rows {
458 return Err(Error::Protocol(
459 "received DataRow but no RowDescription".into(),
460 ));
461 }
462 let cols = RowDescription::parse(&self.conn.buffer_set.column_buffer)?;
463 let row = DataRow::parse(&self.conn.buffer_set.read_buffer)?;
464 handler.row(cols, row)?;
465 }
466 msg_type::COMMAND_COMPLETE => {
467 let cmd = CommandComplete::parse(&self.conn.buffer_set.read_buffer)?;
468 handler.result_end(cmd)?;
469 return Ok(());
470 }
471 msg_type::EMPTY_QUERY_RESPONSE => {
472 EmptyQueryResponse::parse(&self.conn.buffer_set.read_buffer)?;
473 return Ok(());
474 }
475 _ => {
476 return Err(Error::Protocol(format!(
477 "unexpected message type in pipeline claim: '{}'",
478 type_byte as char
479 )));
480 }
481 }
482 }
483 }
484
485 fn claim_rows_cached_inner<H: ExtendedHandler>(
487 &mut self,
488 handler: &mut H,
489 row_desc: Option<&[u8]>,
490 ) -> Result<()> {
491 loop {
493 self.read_next_message()?;
494 let type_byte = self.conn.buffer_set.type_byte;
495
496 match type_byte {
497 msg_type::DATA_ROW => {
498 let row_desc = row_desc.ok_or_else(|| {
499 Error::Protocol("received DataRow but no RowDescription cached".into())
500 })?;
501 let cols = RowDescription::parse(row_desc)?;
502 let row = DataRow::parse(&self.conn.buffer_set.read_buffer)?;
503 handler.row(cols, row)?;
504 }
505 msg_type::COMMAND_COMPLETE => {
506 let cmd = CommandComplete::parse(&self.conn.buffer_set.read_buffer)?;
507 handler.result_end(cmd)?;
508 return Ok(());
509 }
510 msg_type::EMPTY_QUERY_RESPONSE => {
511 EmptyQueryResponse::parse(&self.conn.buffer_set.read_buffer)?;
512 return Ok(());
513 }
514 _ => {
515 return Err(Error::Protocol(format!(
516 "unexpected message type in pipeline claim: '{}'",
517 type_byte as char
518 )));
519 }
520 }
521 }
522 }
523
524 fn read_next_message(&mut self) -> Result<()> {
526 loop {
527 self.conn.stream.read_message(&mut self.conn.buffer_set)?;
528 let type_byte = self.conn.buffer_set.type_byte;
529
530 if RawMessage::is_async_type(type_byte) {
532 continue;
533 }
534
535 if type_byte == msg_type::ERROR_RESPONSE {
537 let error = ErrorResponse::parse(&self.conn.buffer_set.read_buffer)?;
538 return Err(error.into_error());
539 }
540
541 return Ok(());
542 }
543 }
544
545 fn unexpected_message<T>(&self, expected: &str) -> Result<T> {
547 Err(Error::Protocol(format!(
548 "expected {}, got '{}'",
549 expected, self.conn.buffer_set.type_byte as char
550 )))
551 }
552
553 pub fn pending_count(&self) -> usize {
555 self.queue_seq - self.claim_seq
556 }
557
558 pub fn is_aborted(&self) -> bool {
560 self.aborted
561 }
562}