1use std::net::TcpStream;
4use std::os::unix::net::UnixStream;
5
6use crate::buffer_pool::PooledBufferSet;
7use crate::conversion::ToParams;
8use crate::error::{Error, Result};
9use crate::handler::{
10 AsyncMessageHandler, BinaryHandler, DropHandler, FirstRowHandler, TextHandler,
11};
12use crate::opts::Opts;
13use crate::protocol::backend::BackendKeyData;
14use crate::protocol::frontend::write_terminate;
15use crate::protocol::types::TransactionStatus;
16use crate::state::StateMachine;
17use crate::state::action::Action;
18use crate::state::connection::ConnectionStateMachine;
19use crate::state::extended::{BindStateMachine, ExtendedQueryStateMachine, PreparedStatement};
20use crate::state::simple_query::SimpleQueryStateMachine;
21use crate::statement::IntoStatement;
22
23use super::stream::Stream;
24use super::unnamed_portal::UnnamedPortal;
25
26pub struct Conn {
28 pub(crate) stream: Stream,
29 pub(crate) buffer_set: PooledBufferSet,
30 backend_key: Option<BackendKeyData>,
31 server_params: Vec<(String, String)>,
32 pub(crate) transaction_status: TransactionStatus,
33 pub(crate) is_broken: bool,
34 name_counter: u64,
35 async_message_handler: Option<Box<dyn AsyncMessageHandler>>,
36}
37
38impl Conn {
39 pub fn new<O: TryInto<Opts>>(opts: O) -> Result<Self>
41 where
42 Error: From<O::Error>,
43 {
44 let opts = opts.try_into()?;
45
46 let stream = if let Some(socket_path) = &opts.socket {
47 Stream::unix(UnixStream::connect(socket_path)?)
48 } else {
49 if opts.host.is_empty() {
50 return Err(Error::InvalidUsage("host is empty".into()));
51 }
52 let addr = format!("{}:{}", opts.host, opts.port);
53 let tcp = TcpStream::connect(&addr)?;
54 tcp.set_nodelay(true)?;
55 Stream::tcp(tcp)
56 };
57
58 Self::new_with_stream(stream, opts)
59 }
60
61 #[allow(unused_mut)]
63 pub fn new_with_stream(mut stream: Stream, options: Opts) -> Result<Self> {
64 let mut buffer_set = options.buffer_pool.get_buffer_set();
65 let mut state_machine = ConnectionStateMachine::new(options.clone());
66
67 loop {
69 match state_machine.step(&mut buffer_set)? {
70 Action::WriteAndReadByte => {
71 stream.write_all(&buffer_set.write_buffer)?;
72 stream.flush()?;
73 let byte = stream.read_u8()?;
74 state_machine.set_ssl_response(byte);
75 }
76 Action::ReadMessage => {
77 stream.read_message(&mut buffer_set)?;
78 }
79 Action::Write => {
80 stream.write_all(&buffer_set.write_buffer)?;
81 stream.flush()?;
82 }
83 Action::WriteAndReadMessage => {
84 stream.write_all(&buffer_set.write_buffer)?;
85 stream.flush()?;
86 stream.read_message(&mut buffer_set)?;
87 }
88 Action::TlsHandshake => {
89 #[cfg(feature = "sync-tls")]
90 {
91 stream = stream.upgrade_to_tls(&options.host)?;
92 }
93 #[cfg(not(feature = "sync-tls"))]
94 {
95 return Err(Error::Unsupported(
96 "TLS requested but sync-tls feature not enabled".into(),
97 ));
98 }
99 }
100 Action::HandleAsyncMessageAndReadMessage(_) => {
101 stream.read_message(&mut buffer_set)?;
103 }
104 Action::Finished => break,
105 }
106 }
107
108 let conn = Self {
109 stream,
110 buffer_set,
111 backend_key: state_machine.backend_key().cloned(),
112 server_params: state_machine.take_server_params(),
113 transaction_status: state_machine.transaction_status(),
114 is_broken: false,
115 name_counter: 0,
116 async_message_handler: None,
117 };
118
119 let conn = if options.prefer_unix_socket && conn.stream.is_tcp_loopback() {
121 conn.try_upgrade_to_unix_socket(&options)
122 } else {
123 conn
124 };
125
126 Ok(conn)
127 }
128
129 fn try_upgrade_to_unix_socket(mut self, opts: &Opts) -> Self {
132 let mut handler = FirstRowHandler::<(String,)>::new();
134 if self
135 .query("SHOW unix_socket_directories", &mut handler)
136 .is_err()
137 {
138 return self;
139 }
140
141 let socket_dir = match handler.into_row() {
142 Some((dirs,)) => {
143 match dirs.split(',').next() {
145 Some(d) if !d.trim().is_empty() => d.trim().to_string(),
146 _ => return self,
147 }
148 }
149 None => return self,
150 };
151
152 let socket_path = format!("{}/.s.PGSQL.{}", socket_dir, opts.port);
154
155 let unix_stream = match UnixStream::connect(&socket_path) {
157 Ok(s) => s,
158 Err(_) => return self,
159 };
160
161 let mut opts_unix = opts.clone();
163 opts_unix.prefer_unix_socket = false;
164
165 match Self::new_with_stream(Stream::unix(unix_stream), opts_unix) {
166 Ok(new_conn) => new_conn,
167 Err(_) => self,
168 }
169 }
170
171 pub fn backend_key(&self) -> Option<&BackendKeyData> {
173 self.backend_key.as_ref()
174 }
175
176 pub fn connection_id(&self) -> u32 {
180 self.backend_key.as_ref().map_or(0, |k| k.process_id())
181 }
182
183 pub fn server_params(&self) -> &[(String, String)] {
185 &self.server_params
186 }
187
188 pub fn transaction_status(&self) -> TransactionStatus {
190 self.transaction_status
191 }
192
193 pub fn in_transaction(&self) -> bool {
195 self.transaction_status.in_transaction()
196 }
197
198 pub fn is_broken(&self) -> bool {
200 self.is_broken
201 }
202
203 pub(crate) fn next_portal_name(&mut self) -> String {
205 self.name_counter += 1;
206 format!("_zero_p_{}", self.name_counter)
207 }
208
209 pub(crate) fn create_named_portal<S: IntoStatement, P: ToParams>(
213 &mut self,
214 portal_name: &str,
215 statement: &S,
216 params: &P,
217 ) -> Result<()> {
218 let mut state_machine = if let Some(sql) = statement.as_sql() {
220 BindStateMachine::bind_sql(&mut self.buffer_set, portal_name, sql, params)?
221 } else {
222 let stmt = statement.as_prepared().unwrap();
223 BindStateMachine::bind_prepared(
224 &mut self.buffer_set,
225 portal_name,
226 &stmt.wire_name(),
227 &stmt.param_oids,
228 params,
229 )?
230 };
231
232 loop {
234 match state_machine.step(&mut self.buffer_set)? {
235 Action::ReadMessage => {
236 self.stream.read_message(&mut self.buffer_set)?;
237 }
238 Action::Write => {
239 self.stream.write_all(&self.buffer_set.write_buffer)?;
240 self.stream.flush()?;
241 }
242 Action::WriteAndReadMessage => {
243 self.stream.write_all(&self.buffer_set.write_buffer)?;
244 self.stream.flush()?;
245 self.stream.read_message(&mut self.buffer_set)?;
246 }
247 Action::Finished => break,
248 _ => return Err(Error::Protocol("Unexpected action in bind".into())),
249 }
250 }
251
252 Ok(())
253 }
254
255 pub fn set_async_message_handler<H: AsyncMessageHandler + 'static>(&mut self, handler: H) {
262 self.async_message_handler = Some(Box::new(handler));
263 }
264
265 pub fn clear_async_message_handler(&mut self) {
267 self.async_message_handler = None;
268 }
269
270 pub fn run_pipeline<T, F>(&mut self, f: F) -> Result<T>
291 where
292 F: FnOnce(&mut super::pipeline::Pipeline<'_>) -> Result<T>,
293 {
294 let mut pipeline = super::pipeline::Pipeline::new_inner(self);
295 let result = f(&mut pipeline);
296 pipeline.cleanup();
297 result
298 }
299
300 pub fn ping(&mut self) -> Result<()> {
302 self.query_drop("")?;
303 Ok(())
304 }
305
306 fn drive<S: StateMachine>(&mut self, state_machine: &mut S) -> Result<()> {
308 loop {
309 match state_machine.step(&mut self.buffer_set)? {
310 Action::WriteAndReadByte => {
311 return Err(Error::Protocol(
312 "Unexpected WriteAndReadByte in query state machine".into(),
313 ));
314 }
315 Action::ReadMessage => {
316 self.stream.read_message(&mut self.buffer_set)?;
317 }
318 Action::Write => {
319 self.stream.write_all(&self.buffer_set.write_buffer)?;
320 self.stream.flush()?;
321 }
322 Action::WriteAndReadMessage => {
323 self.stream.write_all(&self.buffer_set.write_buffer)?;
324 self.stream.flush()?;
325 self.stream.read_message(&mut self.buffer_set)?;
326 }
327 Action::TlsHandshake => {
328 return Err(Error::Protocol(
329 "Unexpected TlsHandshake in query state machine".into(),
330 ));
331 }
332 Action::HandleAsyncMessageAndReadMessage(ref async_msg) => {
333 if let Some(ref mut h) = self.async_message_handler {
334 h.handle(async_msg);
335 }
336 self.stream.read_message(&mut self.buffer_set)?;
338 }
339 Action::Finished => {
340 self.transaction_status = state_machine.transaction_status();
341 break;
342 }
343 }
344 }
345 Ok(())
346 }
347
348 pub fn query<H: TextHandler>(&mut self, sql: &str, handler: &mut H) -> Result<()> {
350 let result = self.query_inner(sql, handler);
351 if let Err(e) = &result
352 && e.is_connection_broken()
353 {
354 self.is_broken = true;
355 }
356 result
357 }
358
359 fn query_inner<H: TextHandler>(&mut self, sql: &str, handler: &mut H) -> Result<()> {
360 let mut state_machine = SimpleQueryStateMachine::new(handler, sql);
361 self.drive(&mut state_machine)
362 }
363
364 pub fn query_drop(&mut self, sql: &str) -> Result<Option<u64>> {
366 let mut handler = DropHandler::new();
367 self.query(sql, &mut handler)?;
368 Ok(handler.rows_affected())
369 }
370
371 pub fn query_collect<T: for<'a> crate::conversion::FromRow<'a>>(
382 &mut self,
383 sql: &str,
384 ) -> Result<Vec<T>> {
385 let mut handler = crate::handler::CollectHandler::<T>::new();
386 self.query(sql, &mut handler)?;
387 Ok(handler.into_rows())
388 }
389
390 pub fn query_first<T: for<'a> crate::conversion::FromRow<'a>>(
392 &mut self,
393 sql: &str,
394 ) -> Result<Option<T>> {
395 let mut handler = crate::handler::FirstRowHandler::<T>::new();
396 self.query(sql, &mut handler)?;
397 Ok(handler.into_row())
398 }
399
400 pub fn close(mut self) -> Result<()> {
402 self.buffer_set.write_buffer.clear();
403 write_terminate(&mut self.buffer_set.write_buffer);
404 self.stream.write_all(&self.buffer_set.write_buffer)?;
405 self.stream.flush()?;
406 Ok(())
407 }
408
409 pub fn prepare(&mut self, query: &str) -> Result<PreparedStatement> {
413 self.prepare_typed(query, &[])
414 }
415
416 pub fn prepare_batch(&mut self, queries: &[&str]) -> Result<Vec<PreparedStatement>> {
433 if queries.is_empty() {
434 return Ok(Vec::new());
435 }
436
437 let start_idx = self.name_counter + 1;
438 self.name_counter += queries.len() as u64;
439
440 let result = self.prepare_batch_inner(queries, start_idx);
441 if let Err(e) = &result
442 && e.is_connection_broken()
443 {
444 self.is_broken = true;
445 }
446 result
447 }
448
449 fn prepare_batch_inner(
450 &mut self,
451 queries: &[&str],
452 start_idx: u64,
453 ) -> Result<Vec<PreparedStatement>> {
454 use crate::state::batch_prepare::BatchPrepareStateMachine;
455
456 let mut state_machine =
457 BatchPrepareStateMachine::new(&mut self.buffer_set, queries, start_idx);
458
459 loop {
460 match state_machine.step(&mut self.buffer_set)? {
461 Action::ReadMessage => {
462 self.stream.read_message(&mut self.buffer_set)?;
463 }
464 Action::WriteAndReadMessage => {
465 self.stream.write_all(&self.buffer_set.write_buffer)?;
466 self.stream.flush()?;
467 self.stream.read_message(&mut self.buffer_set)?;
468 }
469 Action::Finished => {
470 self.transaction_status = state_machine.transaction_status();
471 break;
472 }
473 _ => return Err(Error::Protocol("Unexpected action in batch prepare".into())),
474 }
475 }
476
477 Ok(state_machine.take_statements())
478 }
479
480 pub fn prepare_typed(&mut self, query: &str, param_oids: &[u32]) -> Result<PreparedStatement> {
482 self.name_counter += 1;
483 let idx = self.name_counter;
484 let result = self.prepare_inner(idx, query, param_oids);
485 if let Err(e) = &result
486 && e.is_connection_broken()
487 {
488 self.is_broken = true;
489 }
490 result
491 }
492
493 fn prepare_inner(
494 &mut self,
495 idx: u64,
496 query: &str,
497 param_oids: &[u32],
498 ) -> Result<PreparedStatement> {
499 let mut handler = DropHandler::new();
500 let mut state_machine = ExtendedQueryStateMachine::prepare(
501 &mut handler,
502 &mut self.buffer_set,
503 idx,
504 query,
505 param_oids,
506 );
507 self.drive(&mut state_machine)?;
508 state_machine
509 .take_prepared_statement()
510 .ok_or_else(|| Error::Protocol("No prepared statement".into()))
511 }
512
513 pub fn exec<S: IntoStatement, P: ToParams, H: BinaryHandler>(
530 &mut self,
531 statement: S,
532 params: P,
533 handler: &mut H,
534 ) -> Result<()> {
535 let result = self.exec_inner(&statement, ¶ms, handler);
536 if let Err(e) = &result
537 && e.is_connection_broken()
538 {
539 self.is_broken = true;
540 }
541 result
542 }
543
544 fn exec_inner<S: IntoStatement, P: ToParams, H: BinaryHandler>(
545 &mut self,
546 statement: &S,
547 params: &P,
548 handler: &mut H,
549 ) -> Result<()> {
550 let mut state_machine = if statement.needs_parse() {
551 ExtendedQueryStateMachine::execute_sql(
552 handler,
553 &mut self.buffer_set,
554 statement.as_sql().unwrap(),
555 params,
556 )?
557 } else {
558 let stmt = statement.as_prepared().unwrap();
559 ExtendedQueryStateMachine::execute(
560 handler,
561 &mut self.buffer_set,
562 &stmt.wire_name(),
563 &stmt.param_oids,
564 params,
565 )?
566 };
567
568 self.drive(&mut state_machine)
569 }
570
571 pub fn exec_drop<S: IntoStatement, P: ToParams>(
575 &mut self,
576 statement: S,
577 params: P,
578 ) -> Result<Option<u64>> {
579 let mut handler = DropHandler::new();
580 self.exec(statement, params, &mut handler)?;
581 Ok(handler.rows_affected())
582 }
583
584 pub fn exec_collect<
598 T: for<'a> crate::conversion::FromRow<'a>,
599 S: IntoStatement,
600 P: ToParams,
601 >(
602 &mut self,
603 statement: S,
604 params: P,
605 ) -> Result<Vec<T>> {
606 let mut handler = crate::handler::CollectHandler::<T>::new();
607 self.exec(statement, params, &mut handler)?;
608 Ok(handler.into_rows())
609 }
610
611 pub fn exec_first<T: for<'a> crate::conversion::FromRow<'a>, S: IntoStatement, P: ToParams>(
625 &mut self,
626 statement: S,
627 params: P,
628 ) -> Result<Option<T>> {
629 let mut handler = crate::handler::FirstRowHandler::<T>::new();
630 self.exec(statement, params, &mut handler)?;
631 Ok(handler.into_row())
632 }
633
634 pub fn exec_batch<S: IntoStatement, P: ToParams>(
665 &mut self,
666 statement: S,
667 params_list: &[P],
668 ) -> Result<()> {
669 self.exec_batch_chunked(statement, params_list, 1000)
670 }
671
672 pub fn exec_batch_chunked<S: IntoStatement, P: ToParams>(
676 &mut self,
677 statement: S,
678 params_list: &[P],
679 chunk_size: usize,
680 ) -> Result<()> {
681 let result = self.exec_batch_inner(&statement, params_list, chunk_size);
682 if let Err(e) = &result
683 && e.is_connection_broken()
684 {
685 self.is_broken = true;
686 }
687 result
688 }
689
690 fn exec_batch_inner<S: IntoStatement, P: ToParams>(
691 &mut self,
692 statement: &S,
693 params_list: &[P],
694 chunk_size: usize,
695 ) -> Result<()> {
696 use crate::protocol::frontend::{write_bind, write_execute, write_parse, write_sync};
697 use crate::state::extended::BatchStateMachine;
698
699 if params_list.is_empty() {
700 return Ok(());
701 }
702
703 let chunk_size = chunk_size.max(1);
704 let needs_parse = statement.needs_parse();
705 let sql = statement.as_sql();
706 let prepared = statement.as_prepared();
707
708 let param_oids: Vec<u32> = if let Some(stmt) = prepared {
710 stmt.param_oids.clone()
711 } else {
712 params_list[0].natural_oids()
713 };
714
715 let stmt_name = prepared.map(|s| s.wire_name()).unwrap_or_default();
717
718 for chunk in params_list.chunks(chunk_size) {
719 self.buffer_set.write_buffer.clear();
720
721 let parse_in_chunk = needs_parse;
723 if parse_in_chunk {
724 write_parse(
725 &mut self.buffer_set.write_buffer,
726 "",
727 sql.unwrap(),
728 ¶m_oids,
729 );
730 }
731
732 for params in chunk {
734 let effective_stmt_name = if needs_parse { "" } else { &stmt_name };
735 write_bind(
736 &mut self.buffer_set.write_buffer,
737 "",
738 effective_stmt_name,
739 params,
740 ¶m_oids,
741 )?;
742 write_execute(&mut self.buffer_set.write_buffer, "", 0);
743 }
744
745 write_sync(&mut self.buffer_set.write_buffer);
747
748 let mut state_machine = BatchStateMachine::new(parse_in_chunk);
750 self.drive_batch(&mut state_machine)?;
751 self.transaction_status = state_machine.transaction_status();
752 }
753
754 Ok(())
755 }
756
757 fn drive_batch(
759 &mut self,
760 state_machine: &mut crate::state::extended::BatchStateMachine,
761 ) -> Result<()> {
762 use crate::protocol::backend::{ReadyForQuery, msg_type};
763 use crate::state::action::Action;
764
765 loop {
766 let step_result = state_machine.step(&mut self.buffer_set);
767 match step_result {
768 Ok(Action::ReadMessage) => {
769 self.stream.read_message(&mut self.buffer_set)?;
770 }
771 Ok(Action::WriteAndReadMessage) => {
772 self.stream.write_all(&self.buffer_set.write_buffer)?;
773 self.stream.flush()?;
774 self.stream.read_message(&mut self.buffer_set)?;
775 }
776 Ok(Action::Finished) => {
777 break;
778 }
779 Ok(_) => return Err(Error::Protocol("Unexpected action in batch".into())),
780 Err(e) => {
781 loop {
783 self.stream.read_message(&mut self.buffer_set)?;
784 if self.buffer_set.type_byte == msg_type::READY_FOR_QUERY {
785 let ready = ReadyForQuery::parse(&self.buffer_set.read_buffer)?;
786 self.transaction_status =
787 ready.transaction_status().unwrap_or_default();
788 break;
789 }
790 }
791 return Err(e);
792 }
793 }
794 }
795 Ok(())
796 }
797
798 pub fn close_statement(&mut self, stmt: &PreparedStatement) -> Result<()> {
800 let result = self.close_statement_inner(&stmt.wire_name());
801 if let Err(e) = &result
802 && e.is_connection_broken()
803 {
804 self.is_broken = true;
805 }
806 result
807 }
808
809 fn close_statement_inner(&mut self, name: &str) -> Result<()> {
810 let mut handler = DropHandler::new();
811 let mut state_machine =
812 ExtendedQueryStateMachine::close_statement(&mut handler, &mut self.buffer_set, name);
813 self.drive(&mut state_machine)
814 }
815
816 pub fn tx<F, R>(&mut self, f: F) -> Result<R>
826 where
827 F: FnOnce(&mut Conn, super::transaction::Transaction) -> Result<R>,
828 {
829 if self.in_transaction() {
830 return Err(Error::InvalidUsage(
831 "nested transactions are not supported".into(),
832 ));
833 }
834
835 self.query_drop("BEGIN")?;
836
837 let tx = super::transaction::Transaction::new(self.connection_id());
838 let result = f(self, tx);
839
840 if self.in_transaction() {
842 let rollback_result = self.query_drop("ROLLBACK");
843
844 if let Err(e) = result {
846 return Err(e);
847 }
848 rollback_result?;
849 }
850
851 result
852 }
853}
854
855impl Conn {
858 pub fn lowlevel_bind<P: ToParams>(
868 &mut self,
869 portal: &str,
870 statement_name: &str,
871 params: P,
872 ) -> Result<()> {
873 let result = self.lowlevel_bind_inner(portal, statement_name, ¶ms);
874 if let Err(e) = &result
875 && e.is_connection_broken()
876 {
877 self.is_broken = true;
878 }
879 result
880 }
881
882 fn lowlevel_bind_inner<P: ToParams>(
883 &mut self,
884 portal: &str,
885 statement_name: &str,
886 params: &P,
887 ) -> Result<()> {
888 use crate::protocol::backend::{BindComplete, ErrorResponse, RawMessage, msg_type};
889 use crate::protocol::frontend::{write_bind, write_flush};
890
891 let param_oids = params.natural_oids();
892 self.buffer_set.write_buffer.clear();
893 write_bind(
894 &mut self.buffer_set.write_buffer,
895 portal,
896 statement_name,
897 params,
898 ¶m_oids,
899 )?;
900 write_flush(&mut self.buffer_set.write_buffer);
901
902 self.stream.write_all(&self.buffer_set.write_buffer)?;
903 self.stream.flush()?;
904
905 loop {
906 self.stream.read_message(&mut self.buffer_set)?;
907 let type_byte = self.buffer_set.type_byte;
908
909 if RawMessage::is_async_type(type_byte) {
910 continue;
911 }
912
913 match type_byte {
914 msg_type::BIND_COMPLETE => {
915 BindComplete::parse(&self.buffer_set.read_buffer)?;
916 return Ok(());
917 }
918 msg_type::ERROR_RESPONSE => {
919 let error = ErrorResponse::parse(&self.buffer_set.read_buffer)?;
920 return Err(error.into_error());
921 }
922 _ => {
923 return Err(Error::Protocol(format!(
924 "Expected BindComplete or ErrorResponse, got '{}'",
925 type_byte as char
926 )));
927 }
928 }
929 }
930 }
931
932 pub fn lowlevel_execute<H: BinaryHandler>(
945 &mut self,
946 portal: &str,
947 max_rows: u32,
948 handler: &mut H,
949 ) -> Result<bool> {
950 let result = self.lowlevel_execute_inner(portal, max_rows, handler);
951 if let Err(e) = &result
952 && e.is_connection_broken()
953 {
954 self.is_broken = true;
955 }
956 result
957 }
958
959 fn lowlevel_execute_inner<H: BinaryHandler>(
960 &mut self,
961 portal: &str,
962 max_rows: u32,
963 handler: &mut H,
964 ) -> Result<bool> {
965 use crate::protocol::backend::{
966 CommandComplete, DataRow, ErrorResponse, NoData, PortalSuspended, RawMessage,
967 RowDescription, msg_type,
968 };
969 use crate::protocol::frontend::{write_describe_portal, write_execute, write_flush};
970
971 self.buffer_set.write_buffer.clear();
972 write_describe_portal(&mut self.buffer_set.write_buffer, portal);
973 write_execute(&mut self.buffer_set.write_buffer, portal, max_rows);
974 write_flush(&mut self.buffer_set.write_buffer);
975
976 self.stream.write_all(&self.buffer_set.write_buffer)?;
977 self.stream.flush()?;
978
979 let mut column_buffer: Vec<u8> = Vec::new();
980
981 loop {
982 self.stream.read_message(&mut self.buffer_set)?;
983 let type_byte = self.buffer_set.type_byte;
984
985 if RawMessage::is_async_type(type_byte) {
986 continue;
987 }
988
989 match type_byte {
990 msg_type::ROW_DESCRIPTION => {
991 column_buffer.clear();
992 column_buffer.extend_from_slice(&self.buffer_set.read_buffer);
993 let cols = RowDescription::parse(&column_buffer)?;
994 handler.result_start(cols)?;
995 }
996 msg_type::NO_DATA => {
997 NoData::parse(&self.buffer_set.read_buffer)?;
998 }
999 msg_type::DATA_ROW => {
1000 let cols = RowDescription::parse(&column_buffer)?;
1001 let row = DataRow::parse(&self.buffer_set.read_buffer)?;
1002 handler.row(cols, row)?;
1003 }
1004 msg_type::COMMAND_COMPLETE => {
1005 let complete = CommandComplete::parse(&self.buffer_set.read_buffer)?;
1006 handler.result_end(complete)?;
1007 return Ok(false); }
1009 msg_type::PORTAL_SUSPENDED => {
1010 PortalSuspended::parse(&self.buffer_set.read_buffer)?;
1011 return Ok(true); }
1013 msg_type::ERROR_RESPONSE => {
1014 let error = ErrorResponse::parse(&self.buffer_set.read_buffer)?;
1015 return Err(error.into_error());
1016 }
1017 _ => {
1018 return Err(Error::Protocol(format!(
1019 "Unexpected message in execute: '{}'",
1020 type_byte as char
1021 )));
1022 }
1023 }
1024 }
1025 }
1026
1027 pub fn lowlevel_sync(&mut self) -> Result<()> {
1034 let result = self.lowlevel_sync_inner();
1035 if let Err(e) = &result
1036 && e.is_connection_broken()
1037 {
1038 self.is_broken = true;
1039 }
1040 result
1041 }
1042
1043 fn lowlevel_sync_inner(&mut self) -> Result<()> {
1044 use crate::protocol::backend::{ErrorResponse, RawMessage, ReadyForQuery, msg_type};
1045 use crate::protocol::frontend::write_sync;
1046
1047 self.buffer_set.write_buffer.clear();
1048 write_sync(&mut self.buffer_set.write_buffer);
1049
1050 self.stream.write_all(&self.buffer_set.write_buffer)?;
1051 self.stream.flush()?;
1052
1053 let mut pending_error: Option<Error> = None;
1054
1055 loop {
1056 self.stream.read_message(&mut self.buffer_set)?;
1057 let type_byte = self.buffer_set.type_byte;
1058
1059 if RawMessage::is_async_type(type_byte) {
1060 continue;
1061 }
1062
1063 match type_byte {
1064 msg_type::READY_FOR_QUERY => {
1065 let ready = ReadyForQuery::parse(&self.buffer_set.read_buffer)?;
1066 self.transaction_status = ready.transaction_status().unwrap_or_default();
1067 if let Some(e) = pending_error {
1068 return Err(e);
1069 }
1070 return Ok(());
1071 }
1072 msg_type::ERROR_RESPONSE => {
1073 let error = ErrorResponse::parse(&self.buffer_set.read_buffer)?;
1074 pending_error = Some(error.into_error());
1075 }
1076 _ => {
1077 }
1079 }
1080 }
1081 }
1082
1083 pub fn lowlevel_flush(&mut self) -> Result<()> {
1090 use crate::protocol::frontend::write_flush;
1091
1092 self.buffer_set.write_buffer.clear();
1093 write_flush(&mut self.buffer_set.write_buffer);
1094
1095 self.stream.write_all(&self.buffer_set.write_buffer)?;
1096 self.stream.flush()?;
1097 Ok(())
1098 }
1099
1100 pub fn exec_iter<S: IntoStatement, P, F, T>(
1130 &mut self,
1131 statement: S,
1132 params: P,
1133 f: F,
1134 ) -> Result<T>
1135 where
1136 P: ToParams,
1137 F: FnOnce(&mut UnnamedPortal<'_>) -> Result<T>,
1138 {
1139 let result = self.exec_iter_inner(&statement, ¶ms, f);
1140 if let Err(e) = &result
1141 && e.is_connection_broken()
1142 {
1143 self.is_broken = true;
1144 }
1145 result
1146 }
1147
1148 fn exec_iter_inner<S: IntoStatement, P, F, T>(
1149 &mut self,
1150 statement: &S,
1151 params: &P,
1152 f: F,
1153 ) -> Result<T>
1154 where
1155 P: ToParams,
1156 F: FnOnce(&mut UnnamedPortal<'_>) -> Result<T>,
1157 {
1158 let mut state_machine = if let Some(sql) = statement.as_sql() {
1160 BindStateMachine::bind_sql(&mut self.buffer_set, "", sql, params)?
1161 } else {
1162 let stmt = statement.as_prepared().unwrap();
1163 BindStateMachine::bind_prepared(
1164 &mut self.buffer_set,
1165 "",
1166 &stmt.wire_name(),
1167 &stmt.param_oids,
1168 params,
1169 )?
1170 };
1171
1172 loop {
1174 match state_machine.step(&mut self.buffer_set)? {
1175 Action::ReadMessage => {
1176 self.stream.read_message(&mut self.buffer_set)?;
1177 }
1178 Action::Write => {
1179 self.stream.write_all(&self.buffer_set.write_buffer)?;
1180 self.stream.flush()?;
1181 }
1182 Action::WriteAndReadMessage => {
1183 self.stream.write_all(&self.buffer_set.write_buffer)?;
1184 self.stream.flush()?;
1185 self.stream.read_message(&mut self.buffer_set)?;
1186 }
1187 Action::Finished => break,
1188 _ => return Err(Error::Protocol("Unexpected action in bind".into())),
1189 }
1190 }
1191
1192 let mut portal = UnnamedPortal { conn: self };
1194 let result = f(&mut portal);
1195
1196 let sync_result = portal.conn.lowlevel_sync();
1198
1199 match (result, sync_result) {
1201 (Ok(v), Ok(())) => Ok(v),
1202 (Err(e), _) => Err(e),
1203 (Ok(_), Err(e)) => Err(e),
1204 }
1205 }
1206
1207 pub fn lowlevel_close_portal(&mut self, portal: &str) -> Result<()> {
1209 let result = self.lowlevel_close_portal_inner(portal);
1210 if let Err(e) = &result
1211 && e.is_connection_broken()
1212 {
1213 self.is_broken = true;
1214 }
1215 result
1216 }
1217
1218 fn lowlevel_close_portal_inner(&mut self, portal: &str) -> Result<()> {
1219 use crate::protocol::backend::{CloseComplete, ErrorResponse, RawMessage, msg_type};
1220 use crate::protocol::frontend::{write_close_portal, write_flush};
1221
1222 self.buffer_set.write_buffer.clear();
1223 write_close_portal(&mut self.buffer_set.write_buffer, portal);
1224 write_flush(&mut self.buffer_set.write_buffer);
1225
1226 self.stream.write_all(&self.buffer_set.write_buffer)?;
1227 self.stream.flush()?;
1228
1229 loop {
1230 self.stream.read_message(&mut self.buffer_set)?;
1231 let type_byte = self.buffer_set.type_byte;
1232
1233 if RawMessage::is_async_type(type_byte) {
1234 continue;
1235 }
1236
1237 match type_byte {
1238 msg_type::CLOSE_COMPLETE => {
1239 CloseComplete::parse(&self.buffer_set.read_buffer)?;
1240 return Ok(());
1241 }
1242 msg_type::ERROR_RESPONSE => {
1243 let error = ErrorResponse::parse(&self.buffer_set.read_buffer)?;
1244 return Err(error.into_error());
1245 }
1246 _ => {
1247 return Err(Error::Protocol(format!(
1248 "Expected CloseComplete or ErrorResponse, got '{}'",
1249 type_byte as char
1250 )));
1251 }
1252 }
1253 }
1254 }
1255}
1256
1257impl Drop for Conn {
1258 fn drop(&mut self) {
1259 self.buffer_set.write_buffer.clear();
1261 write_terminate(&mut self.buffer_set.write_buffer);
1262 let _ = self.stream.write_all(&self.buffer_set.write_buffer);
1263 let _ = self.stream.flush();
1264 }
1265}