1use std::net::TcpStream;
4#[cfg(unix)]
5use std::os::unix::net::UnixStream;
6
7use crate::buffer_pool::PooledBufferSet;
8use crate::conversion::ToParams;
9use crate::error::{Error, Result};
10use crate::handler::{
11 AsyncMessageHandler, BinaryHandler, DropHandler, FirstRowHandler, TextHandler,
12};
13use crate::opts::Opts;
14use crate::protocol::backend::BackendKeyData;
15use crate::protocol::frontend::write_terminate;
16use crate::protocol::types::TransactionStatus;
17use crate::state::StateMachine;
18use crate::state::action::Action;
19use crate::state::connection::ConnectionStateMachine;
20use crate::state::extended::{BindStateMachine, ExtendedQueryStateMachine, PreparedStatement};
21use crate::state::simple_query::SimpleQueryStateMachine;
22use crate::statement::IntoStatement;
23
24use super::stream::Stream;
25use super::unnamed_portal::UnnamedPortal;
26
/// A synchronous connection to a PostgreSQL-protocol server.
///
/// Owns the transport stream and a pooled buffer set, and caches the session
/// state captured while the connection was being established.
pub struct Conn {
    /// Underlying transport (built from a TCP or Unix stream, possibly upgraded to TLS).
    pub(crate) stream: Stream,
    /// Read/write buffers obtained from the options' buffer pool.
    pub(crate) buffer_set: PooledBufferSet,
    /// Backend key data reported by the connection state machine, if any.
    backend_key: Option<BackendKeyData>,
    /// Server parameters collected by the connection state machine.
    server_params: Vec<(String, String)>,
    /// Transaction status as last reported by a finished state machine or `ReadyForQuery`.
    pub(crate) transaction_status: TransactionStatus,
    /// Set once an operation fails with an error whose `is_connection_broken()` is true.
    pub(crate) is_broken: bool,
    /// Counter used to derive unique portal and prepared-statement indices.
    name_counter: u64,
    /// Optional callback invoked for asynchronous server messages while driving a query.
    async_message_handler: Option<Box<dyn AsyncMessageHandler>>,
}
38
39impl Conn {
40 pub fn new<O: TryInto<Opts>>(opts: O) -> Result<Self>
42 where
43 Error: From<O::Error>,
44 {
45 let opts = opts.try_into()?;
46
47 let stream = if let Some(socket_path) = &opts.socket {
48 #[cfg(unix)]
49 {
50 Stream::unix(UnixStream::connect(socket_path)?)
51 }
52 #[cfg(not(unix))]
53 {
54 let _ = socket_path;
55 return Err(Error::Unsupported(
56 "Unix sockets are not supported on this platform".into(),
57 ));
58 }
59 } else {
60 if opts.host.is_empty() {
61 return Err(Error::InvalidUsage("host is empty".into()));
62 }
63 let addr = format!("{}:{}", opts.host, opts.port);
64 let tcp = TcpStream::connect(&addr)?;
65 tcp.set_nodelay(true)?;
66 Stream::tcp(tcp)
67 };
68
69 Self::new_with_stream(stream, opts)
70 }
71
72 #[allow(unused_mut)]
74 pub fn new_with_stream(mut stream: Stream, options: Opts) -> Result<Self> {
75 let mut buffer_set = options.buffer_pool.get_buffer_set();
76 let mut state_machine = ConnectionStateMachine::new(options.clone());
77
78 loop {
80 match state_machine.step(&mut buffer_set)? {
81 Action::WriteAndReadByte => {
82 stream.write_all(&buffer_set.write_buffer)?;
83 stream.flush()?;
84 let byte = stream.read_u8()?;
85 state_machine.set_ssl_response(byte);
86 }
87 Action::ReadMessage => {
88 stream.read_message(&mut buffer_set)?;
89 }
90 Action::Write => {
91 stream.write_all(&buffer_set.write_buffer)?;
92 stream.flush()?;
93 }
94 Action::WriteAndReadMessage => {
95 stream.write_all(&buffer_set.write_buffer)?;
96 stream.flush()?;
97 stream.read_message(&mut buffer_set)?;
98 }
99 Action::TlsHandshake => {
100 #[cfg(feature = "sync-tls")]
101 {
102 stream = stream.upgrade_to_tls(&options.host)?;
103 }
104 #[cfg(not(feature = "sync-tls"))]
105 {
106 return Err(Error::Unsupported(
107 "TLS requested but sync-tls feature not enabled".into(),
108 ));
109 }
110 }
111 Action::HandleAsyncMessageAndReadMessage(_) => {
112 stream.read_message(&mut buffer_set)?;
114 }
115 Action::Finished => break,
116 }
117 }
118
119 let conn = Self {
120 stream,
121 buffer_set,
122 backend_key: state_machine.backend_key().cloned(),
123 server_params: state_machine.take_server_params(),
124 transaction_status: state_machine.transaction_status(),
125 is_broken: false,
126 name_counter: 0,
127 async_message_handler: None,
128 };
129
130 #[cfg(unix)]
132 let conn = if options.upgrade_to_unix_socket && conn.stream.is_tcp_loopback() {
133 conn.try_upgrade_to_unix_socket(&options)
134 } else {
135 conn
136 };
137
138 Ok(conn)
139 }
140
141 #[cfg(unix)]
144 fn try_upgrade_to_unix_socket(mut self, opts: &Opts) -> Self {
145 let mut handler = FirstRowHandler::<(String,)>::new();
147 if self
148 .query("SHOW unix_socket_directories", &mut handler)
149 .is_err()
150 {
151 return self;
152 }
153
154 let socket_dir = match handler.into_row() {
155 Some((dirs,)) => {
156 match dirs.split(',').next() {
158 Some(d) if !d.trim().is_empty() => d.trim().to_string(),
159 _ => return self,
160 }
161 }
162 None => return self,
163 };
164
165 let socket_path = format!("{}/.s.PGSQL.{}", socket_dir, opts.port);
167
168 let unix_stream = match UnixStream::connect(&socket_path) {
170 Ok(s) => s,
171 Err(_) => return self,
172 };
173
174 let mut opts_unix = opts.clone();
176 opts_unix.upgrade_to_unix_socket = false;
177
178 match Self::new_with_stream(Stream::unix(unix_stream), opts_unix) {
179 Ok(new_conn) => new_conn,
180 Err(_) => self,
181 }
182 }
183
184 pub fn backend_key(&self) -> Option<&BackendKeyData> {
186 self.backend_key.as_ref()
187 }
188
189 pub fn connection_id(&self) -> u32 {
193 self.backend_key.as_ref().map_or(0, |k| k.process_id())
194 }
195
196 pub fn server_params(&self) -> &[(String, String)] {
198 &self.server_params
199 }
200
201 pub fn transaction_status(&self) -> TransactionStatus {
203 self.transaction_status
204 }
205
206 pub fn in_transaction(&self) -> bool {
208 self.transaction_status.in_transaction()
209 }
210
211 pub fn is_broken(&self) -> bool {
213 self.is_broken
214 }
215
216 pub(crate) fn next_portal_name(&mut self) -> String {
218 self.name_counter += 1;
219 format!("_zero_p_{}", self.name_counter)
220 }
221
222 pub(crate) fn create_named_portal<S: IntoStatement, P: ToParams>(
226 &mut self,
227 portal_name: &str,
228 statement: &S,
229 params: &P,
230 ) -> Result<()> {
231 let mut state_machine = if let Some(sql) = statement.as_sql() {
233 BindStateMachine::bind_sql(&mut self.buffer_set, portal_name, sql, params)?
234 } else {
235 let stmt = statement.as_prepared().unwrap();
236 BindStateMachine::bind_prepared(
237 &mut self.buffer_set,
238 portal_name,
239 &stmt.wire_name(),
240 &stmt.param_oids,
241 params,
242 )?
243 };
244
245 loop {
247 match state_machine.step(&mut self.buffer_set)? {
248 Action::ReadMessage => {
249 self.stream.read_message(&mut self.buffer_set)?;
250 }
251 Action::Write => {
252 self.stream.write_all(&self.buffer_set.write_buffer)?;
253 self.stream.flush()?;
254 }
255 Action::WriteAndReadMessage => {
256 self.stream.write_all(&self.buffer_set.write_buffer)?;
257 self.stream.flush()?;
258 self.stream.read_message(&mut self.buffer_set)?;
259 }
260 Action::Finished => break,
261 _ => return Err(Error::Protocol("Unexpected action in bind".into())),
262 }
263 }
264
265 Ok(())
266 }
267
268 pub fn set_async_message_handler<H: AsyncMessageHandler + 'static>(&mut self, handler: H) {
275 self.async_message_handler = Some(Box::new(handler));
276 }
277
278 pub fn clear_async_message_handler(&mut self) {
280 self.async_message_handler = None;
281 }
282
283 pub fn run_pipeline<T, F>(&mut self, f: F) -> Result<T>
304 where
305 F: FnOnce(&mut super::pipeline::Pipeline<'_>) -> Result<T>,
306 {
307 let mut pipeline = super::pipeline::Pipeline::new_inner(self);
308 let result = f(&mut pipeline);
309 pipeline.cleanup();
310 result
311 }
312
313 pub fn ping(&mut self) -> Result<()> {
315 self.query_drop("")?;
316 Ok(())
317 }
318
319 fn drive<S: StateMachine>(&mut self, state_machine: &mut S) -> Result<()> {
321 loop {
322 match state_machine.step(&mut self.buffer_set)? {
323 Action::WriteAndReadByte => {
324 return Err(Error::Protocol(
325 "Unexpected WriteAndReadByte in query state machine".into(),
326 ));
327 }
328 Action::ReadMessage => {
329 self.stream.read_message(&mut self.buffer_set)?;
330 }
331 Action::Write => {
332 self.stream.write_all(&self.buffer_set.write_buffer)?;
333 self.stream.flush()?;
334 }
335 Action::WriteAndReadMessage => {
336 self.stream.write_all(&self.buffer_set.write_buffer)?;
337 self.stream.flush()?;
338 self.stream.read_message(&mut self.buffer_set)?;
339 }
340 Action::TlsHandshake => {
341 return Err(Error::Protocol(
342 "Unexpected TlsHandshake in query state machine".into(),
343 ));
344 }
345 Action::HandleAsyncMessageAndReadMessage(ref async_msg) => {
346 if let Some(ref mut h) = self.async_message_handler {
347 h.handle(async_msg);
348 }
349 self.stream.read_message(&mut self.buffer_set)?;
351 }
352 Action::Finished => {
353 self.transaction_status = state_machine.transaction_status();
354 break;
355 }
356 }
357 }
358 Ok(())
359 }
360
361 pub fn query<H: TextHandler>(&mut self, sql: &str, handler: &mut H) -> Result<()> {
363 let result = self.query_inner(sql, handler);
364 if let Err(e) = &result
365 && e.is_connection_broken()
366 {
367 self.is_broken = true;
368 }
369 result
370 }
371
372 fn query_inner<H: TextHandler>(&mut self, sql: &str, handler: &mut H) -> Result<()> {
373 let mut state_machine = SimpleQueryStateMachine::new(handler, sql);
374 self.drive(&mut state_machine)
375 }
376
377 pub fn query_drop(&mut self, sql: &str) -> Result<Option<u64>> {
379 let mut handler = DropHandler::new();
380 self.query(sql, &mut handler)?;
381 Ok(handler.rows_affected())
382 }
383
384 pub fn query_collect<T: for<'a> crate::conversion::FromRow<'a>>(
395 &mut self,
396 sql: &str,
397 ) -> Result<Vec<T>> {
398 let mut handler = crate::handler::CollectHandler::<T>::new();
399 self.query(sql, &mut handler)?;
400 Ok(handler.into_rows())
401 }
402
403 pub fn query_first<T: for<'a> crate::conversion::FromRow<'a>>(
405 &mut self,
406 sql: &str,
407 ) -> Result<Option<T>> {
408 let mut handler = crate::handler::FirstRowHandler::<T>::new();
409 self.query(sql, &mut handler)?;
410 Ok(handler.into_row())
411 }
412
413 pub fn close(mut self) -> Result<()> {
415 self.buffer_set.write_buffer.clear();
416 write_terminate(&mut self.buffer_set.write_buffer);
417 self.stream.write_all(&self.buffer_set.write_buffer)?;
418 self.stream.flush()?;
419 Ok(())
420 }
421
422 pub fn prepare(&mut self, query: &str) -> Result<PreparedStatement> {
426 self.prepare_typed(query, &[])
427 }
428
429 pub fn prepare_batch(&mut self, queries: &[&str]) -> Result<Vec<PreparedStatement>> {
446 if queries.is_empty() {
447 return Ok(Vec::new());
448 }
449
450 let start_idx = self.name_counter + 1;
451 self.name_counter += queries.len() as u64;
452
453 let result = self.prepare_batch_inner(queries, start_idx);
454 if let Err(e) = &result
455 && e.is_connection_broken()
456 {
457 self.is_broken = true;
458 }
459 result
460 }
461
462 fn prepare_batch_inner(
463 &mut self,
464 queries: &[&str],
465 start_idx: u64,
466 ) -> Result<Vec<PreparedStatement>> {
467 use crate::state::batch_prepare::BatchPrepareStateMachine;
468
469 let mut state_machine =
470 BatchPrepareStateMachine::new(&mut self.buffer_set, queries, start_idx);
471
472 loop {
473 match state_machine.step(&mut self.buffer_set)? {
474 Action::ReadMessage => {
475 self.stream.read_message(&mut self.buffer_set)?;
476 }
477 Action::WriteAndReadMessage => {
478 self.stream.write_all(&self.buffer_set.write_buffer)?;
479 self.stream.flush()?;
480 self.stream.read_message(&mut self.buffer_set)?;
481 }
482 Action::Finished => {
483 self.transaction_status = state_machine.transaction_status();
484 break;
485 }
486 _ => return Err(Error::Protocol("Unexpected action in batch prepare".into())),
487 }
488 }
489
490 Ok(state_machine.take_statements())
491 }
492
493 pub fn prepare_typed(&mut self, query: &str, param_oids: &[u32]) -> Result<PreparedStatement> {
495 self.name_counter += 1;
496 let idx = self.name_counter;
497 let result = self.prepare_inner(idx, query, param_oids);
498 if let Err(e) = &result
499 && e.is_connection_broken()
500 {
501 self.is_broken = true;
502 }
503 result
504 }
505
506 fn prepare_inner(
507 &mut self,
508 idx: u64,
509 query: &str,
510 param_oids: &[u32],
511 ) -> Result<PreparedStatement> {
512 let mut handler = DropHandler::new();
513 let mut state_machine = ExtendedQueryStateMachine::prepare(
514 &mut handler,
515 &mut self.buffer_set,
516 idx,
517 query,
518 param_oids,
519 );
520 self.drive(&mut state_machine)?;
521 state_machine
522 .take_prepared_statement()
523 .ok_or_else(|| Error::Protocol("No prepared statement".into()))
524 }
525
526 pub fn exec<S: IntoStatement, P: ToParams, H: BinaryHandler>(
543 &mut self,
544 statement: S,
545 params: P,
546 handler: &mut H,
547 ) -> Result<()> {
548 let result = self.exec_inner(&statement, ¶ms, handler);
549 if let Err(e) = &result
550 && e.is_connection_broken()
551 {
552 self.is_broken = true;
553 }
554 result
555 }
556
557 fn exec_inner<S: IntoStatement, P: ToParams, H: BinaryHandler>(
558 &mut self,
559 statement: &S,
560 params: &P,
561 handler: &mut H,
562 ) -> Result<()> {
563 let mut state_machine = if statement.needs_parse() {
564 ExtendedQueryStateMachine::execute_sql(
565 handler,
566 &mut self.buffer_set,
567 statement.as_sql().unwrap(),
568 params,
569 )?
570 } else {
571 let stmt = statement.as_prepared().unwrap();
572 ExtendedQueryStateMachine::execute(
573 handler,
574 &mut self.buffer_set,
575 &stmt.wire_name(),
576 &stmt.param_oids,
577 params,
578 )?
579 };
580
581 self.drive(&mut state_machine)
582 }
583
584 pub fn exec_drop<S: IntoStatement, P: ToParams>(
588 &mut self,
589 statement: S,
590 params: P,
591 ) -> Result<Option<u64>> {
592 let mut handler = DropHandler::new();
593 self.exec(statement, params, &mut handler)?;
594 Ok(handler.rows_affected())
595 }
596
597 pub fn exec_collect<
611 T: for<'a> crate::conversion::FromRow<'a>,
612 S: IntoStatement,
613 P: ToParams,
614 >(
615 &mut self,
616 statement: S,
617 params: P,
618 ) -> Result<Vec<T>> {
619 let mut handler = crate::handler::CollectHandler::<T>::new();
620 self.exec(statement, params, &mut handler)?;
621 Ok(handler.into_rows())
622 }
623
624 pub fn exec_first<T: for<'a> crate::conversion::FromRow<'a>, S: IntoStatement, P: ToParams>(
638 &mut self,
639 statement: S,
640 params: P,
641 ) -> Result<Option<T>> {
642 let mut handler = crate::handler::FirstRowHandler::<T>::new();
643 self.exec(statement, params, &mut handler)?;
644 Ok(handler.into_row())
645 }
646
647 pub fn exec_batch<S: IntoStatement, P: ToParams>(
678 &mut self,
679 statement: S,
680 params_list: &[P],
681 ) -> Result<()> {
682 self.exec_batch_chunked(statement, params_list, 1000)
683 }
684
685 pub fn exec_batch_chunked<S: IntoStatement, P: ToParams>(
689 &mut self,
690 statement: S,
691 params_list: &[P],
692 chunk_size: usize,
693 ) -> Result<()> {
694 let result = self.exec_batch_inner(&statement, params_list, chunk_size);
695 if let Err(e) = &result
696 && e.is_connection_broken()
697 {
698 self.is_broken = true;
699 }
700 result
701 }
702
703 fn exec_batch_inner<S: IntoStatement, P: ToParams>(
704 &mut self,
705 statement: &S,
706 params_list: &[P],
707 chunk_size: usize,
708 ) -> Result<()> {
709 use crate::protocol::frontend::{write_bind, write_execute, write_parse, write_sync};
710 use crate::state::extended::BatchStateMachine;
711
712 if params_list.is_empty() {
713 return Ok(());
714 }
715
716 let chunk_size = chunk_size.max(1);
717 let needs_parse = statement.needs_parse();
718 let sql = statement.as_sql();
719 let prepared = statement.as_prepared();
720
721 let param_oids: Vec<u32> = if let Some(stmt) = prepared {
723 stmt.param_oids.clone()
724 } else {
725 params_list[0].natural_oids()
726 };
727
728 let stmt_name = prepared.map(|s| s.wire_name()).unwrap_or_default();
730
731 for chunk in params_list.chunks(chunk_size) {
732 self.buffer_set.write_buffer.clear();
733
734 let parse_in_chunk = needs_parse;
736 if parse_in_chunk {
737 write_parse(
738 &mut self.buffer_set.write_buffer,
739 "",
740 sql.unwrap(),
741 ¶m_oids,
742 );
743 }
744
745 for params in chunk {
747 let effective_stmt_name = if needs_parse { "" } else { &stmt_name };
748 write_bind(
749 &mut self.buffer_set.write_buffer,
750 "",
751 effective_stmt_name,
752 params,
753 ¶m_oids,
754 )?;
755 write_execute(&mut self.buffer_set.write_buffer, "", 0);
756 }
757
758 write_sync(&mut self.buffer_set.write_buffer);
760
761 let mut state_machine = BatchStateMachine::new(parse_in_chunk);
763 self.drive_batch(&mut state_machine)?;
764 self.transaction_status = state_machine.transaction_status();
765 }
766
767 Ok(())
768 }
769
770 fn drive_batch(
772 &mut self,
773 state_machine: &mut crate::state::extended::BatchStateMachine,
774 ) -> Result<()> {
775 use crate::protocol::backend::{ReadyForQuery, msg_type};
776 use crate::state::action::Action;
777
778 loop {
779 let step_result = state_machine.step(&mut self.buffer_set);
780 match step_result {
781 Ok(Action::ReadMessage) => {
782 self.stream.read_message(&mut self.buffer_set)?;
783 }
784 Ok(Action::WriteAndReadMessage) => {
785 self.stream.write_all(&self.buffer_set.write_buffer)?;
786 self.stream.flush()?;
787 self.stream.read_message(&mut self.buffer_set)?;
788 }
789 Ok(Action::Finished) => {
790 break;
791 }
792 Ok(_) => return Err(Error::Protocol("Unexpected action in batch".into())),
793 Err(e) => {
794 loop {
796 self.stream.read_message(&mut self.buffer_set)?;
797 if self.buffer_set.type_byte == msg_type::READY_FOR_QUERY {
798 let ready = ReadyForQuery::parse(&self.buffer_set.read_buffer)?;
799 self.transaction_status =
800 ready.transaction_status().unwrap_or_default();
801 break;
802 }
803 }
804 return Err(e);
805 }
806 }
807 }
808 Ok(())
809 }
810
811 pub fn close_statement(&mut self, stmt: &PreparedStatement) -> Result<()> {
813 let result = self.close_statement_inner(&stmt.wire_name());
814 if let Err(e) = &result
815 && e.is_connection_broken()
816 {
817 self.is_broken = true;
818 }
819 result
820 }
821
822 fn close_statement_inner(&mut self, name: &str) -> Result<()> {
823 let mut handler = DropHandler::new();
824 let mut state_machine =
825 ExtendedQueryStateMachine::close_statement(&mut handler, &mut self.buffer_set, name);
826 self.drive(&mut state_machine)
827 }
828
829 pub fn tx<F, R>(&mut self, f: F) -> Result<R>
839 where
840 F: FnOnce(&mut Conn, super::transaction::Transaction) -> Result<R>,
841 {
842 if self.in_transaction() {
843 return Err(Error::InvalidUsage(
844 "nested transactions are not supported".into(),
845 ));
846 }
847
848 self.query_drop("BEGIN")?;
849
850 let tx = super::transaction::Transaction::new(self.connection_id());
851 let result = f(self, tx);
852
853 if self.in_transaction() {
855 let rollback_result = self.query_drop("ROLLBACK");
856
857 if let Err(e) = result {
859 return Err(e);
860 }
861 rollback_result?;
862 }
863
864 result
865 }
866}
867
868impl Conn {
871 pub fn lowlevel_bind<P: ToParams>(
881 &mut self,
882 portal: &str,
883 statement_name: &str,
884 params: P,
885 ) -> Result<()> {
886 let result = self.lowlevel_bind_inner(portal, statement_name, ¶ms);
887 if let Err(e) = &result
888 && e.is_connection_broken()
889 {
890 self.is_broken = true;
891 }
892 result
893 }
894
895 fn lowlevel_bind_inner<P: ToParams>(
896 &mut self,
897 portal: &str,
898 statement_name: &str,
899 params: &P,
900 ) -> Result<()> {
901 use crate::protocol::backend::{BindComplete, ErrorResponse, RawMessage, msg_type};
902 use crate::protocol::frontend::{write_bind, write_flush};
903
904 let param_oids = params.natural_oids();
905 self.buffer_set.write_buffer.clear();
906 write_bind(
907 &mut self.buffer_set.write_buffer,
908 portal,
909 statement_name,
910 params,
911 ¶m_oids,
912 )?;
913 write_flush(&mut self.buffer_set.write_buffer);
914
915 self.stream.write_all(&self.buffer_set.write_buffer)?;
916 self.stream.flush()?;
917
918 loop {
919 self.stream.read_message(&mut self.buffer_set)?;
920 let type_byte = self.buffer_set.type_byte;
921
922 if RawMessage::is_async_type(type_byte) {
923 continue;
924 }
925
926 match type_byte {
927 msg_type::BIND_COMPLETE => {
928 BindComplete::parse(&self.buffer_set.read_buffer)?;
929 return Ok(());
930 }
931 msg_type::ERROR_RESPONSE => {
932 let error = ErrorResponse::parse(&self.buffer_set.read_buffer)?;
933 return Err(error.into_error());
934 }
935 _ => {
936 return Err(Error::Protocol(format!(
937 "Expected BindComplete or ErrorResponse, got '{}'",
938 type_byte as char
939 )));
940 }
941 }
942 }
943 }
944
945 pub fn lowlevel_execute<H: BinaryHandler>(
958 &mut self,
959 portal: &str,
960 max_rows: u32,
961 handler: &mut H,
962 ) -> Result<bool> {
963 let result = self.lowlevel_execute_inner(portal, max_rows, handler);
964 if let Err(e) = &result
965 && e.is_connection_broken()
966 {
967 self.is_broken = true;
968 }
969 result
970 }
971
972 fn lowlevel_execute_inner<H: BinaryHandler>(
973 &mut self,
974 portal: &str,
975 max_rows: u32,
976 handler: &mut H,
977 ) -> Result<bool> {
978 use crate::protocol::backend::{
979 CommandComplete, DataRow, ErrorResponse, NoData, PortalSuspended, RawMessage,
980 RowDescription, msg_type,
981 };
982 use crate::protocol::frontend::{write_describe_portal, write_execute, write_flush};
983
984 self.buffer_set.write_buffer.clear();
985 write_describe_portal(&mut self.buffer_set.write_buffer, portal);
986 write_execute(&mut self.buffer_set.write_buffer, portal, max_rows);
987 write_flush(&mut self.buffer_set.write_buffer);
988
989 self.stream.write_all(&self.buffer_set.write_buffer)?;
990 self.stream.flush()?;
991
992 let mut column_buffer: Vec<u8> = Vec::new();
993
994 loop {
995 self.stream.read_message(&mut self.buffer_set)?;
996 let type_byte = self.buffer_set.type_byte;
997
998 if RawMessage::is_async_type(type_byte) {
999 continue;
1000 }
1001
1002 match type_byte {
1003 msg_type::ROW_DESCRIPTION => {
1004 column_buffer.clear();
1005 column_buffer.extend_from_slice(&self.buffer_set.read_buffer);
1006 let cols = RowDescription::parse(&column_buffer)?;
1007 handler.result_start(cols)?;
1008 }
1009 msg_type::NO_DATA => {
1010 NoData::parse(&self.buffer_set.read_buffer)?;
1011 }
1012 msg_type::DATA_ROW => {
1013 let cols = RowDescription::parse(&column_buffer)?;
1014 let row = DataRow::parse(&self.buffer_set.read_buffer)?;
1015 handler.row(cols, row)?;
1016 }
1017 msg_type::COMMAND_COMPLETE => {
1018 let complete = CommandComplete::parse(&self.buffer_set.read_buffer)?;
1019 handler.result_end(complete)?;
1020 return Ok(false); }
1022 msg_type::PORTAL_SUSPENDED => {
1023 PortalSuspended::parse(&self.buffer_set.read_buffer)?;
1024 return Ok(true); }
1026 msg_type::ERROR_RESPONSE => {
1027 let error = ErrorResponse::parse(&self.buffer_set.read_buffer)?;
1028 return Err(error.into_error());
1029 }
1030 _ => {
1031 return Err(Error::Protocol(format!(
1032 "Unexpected message in execute: '{}'",
1033 type_byte as char
1034 )));
1035 }
1036 }
1037 }
1038 }
1039
1040 pub fn lowlevel_sync(&mut self) -> Result<()> {
1047 let result = self.lowlevel_sync_inner();
1048 if let Err(e) = &result
1049 && e.is_connection_broken()
1050 {
1051 self.is_broken = true;
1052 }
1053 result
1054 }
1055
1056 fn lowlevel_sync_inner(&mut self) -> Result<()> {
1057 use crate::protocol::backend::{ErrorResponse, RawMessage, ReadyForQuery, msg_type};
1058 use crate::protocol::frontend::write_sync;
1059
1060 self.buffer_set.write_buffer.clear();
1061 write_sync(&mut self.buffer_set.write_buffer);
1062
1063 self.stream.write_all(&self.buffer_set.write_buffer)?;
1064 self.stream.flush()?;
1065
1066 let mut pending_error: Option<Error> = None;
1067
1068 loop {
1069 self.stream.read_message(&mut self.buffer_set)?;
1070 let type_byte = self.buffer_set.type_byte;
1071
1072 if RawMessage::is_async_type(type_byte) {
1073 continue;
1074 }
1075
1076 match type_byte {
1077 msg_type::READY_FOR_QUERY => {
1078 let ready = ReadyForQuery::parse(&self.buffer_set.read_buffer)?;
1079 self.transaction_status = ready.transaction_status().unwrap_or_default();
1080 if let Some(e) = pending_error {
1081 return Err(e);
1082 }
1083 return Ok(());
1084 }
1085 msg_type::ERROR_RESPONSE => {
1086 let error = ErrorResponse::parse(&self.buffer_set.read_buffer)?;
1087 pending_error = Some(error.into_error());
1088 }
1089 _ => {
1090 }
1092 }
1093 }
1094 }
1095
1096 pub fn lowlevel_flush(&mut self) -> Result<()> {
1103 use crate::protocol::frontend::write_flush;
1104
1105 self.buffer_set.write_buffer.clear();
1106 write_flush(&mut self.buffer_set.write_buffer);
1107
1108 self.stream.write_all(&self.buffer_set.write_buffer)?;
1109 self.stream.flush()?;
1110 Ok(())
1111 }
1112
1113 pub fn exec_iter<S: IntoStatement, P, F, T>(
1143 &mut self,
1144 statement: S,
1145 params: P,
1146 f: F,
1147 ) -> Result<T>
1148 where
1149 P: ToParams,
1150 F: FnOnce(&mut UnnamedPortal<'_>) -> Result<T>,
1151 {
1152 let result = self.exec_iter_inner(&statement, ¶ms, f);
1153 if let Err(e) = &result
1154 && e.is_connection_broken()
1155 {
1156 self.is_broken = true;
1157 }
1158 result
1159 }
1160
1161 fn exec_iter_inner<S: IntoStatement, P, F, T>(
1162 &mut self,
1163 statement: &S,
1164 params: &P,
1165 f: F,
1166 ) -> Result<T>
1167 where
1168 P: ToParams,
1169 F: FnOnce(&mut UnnamedPortal<'_>) -> Result<T>,
1170 {
1171 let mut state_machine = if let Some(sql) = statement.as_sql() {
1173 BindStateMachine::bind_sql(&mut self.buffer_set, "", sql, params)?
1174 } else {
1175 let stmt = statement.as_prepared().unwrap();
1176 BindStateMachine::bind_prepared(
1177 &mut self.buffer_set,
1178 "",
1179 &stmt.wire_name(),
1180 &stmt.param_oids,
1181 params,
1182 )?
1183 };
1184
1185 loop {
1187 match state_machine.step(&mut self.buffer_set)? {
1188 Action::ReadMessage => {
1189 self.stream.read_message(&mut self.buffer_set)?;
1190 }
1191 Action::Write => {
1192 self.stream.write_all(&self.buffer_set.write_buffer)?;
1193 self.stream.flush()?;
1194 }
1195 Action::WriteAndReadMessage => {
1196 self.stream.write_all(&self.buffer_set.write_buffer)?;
1197 self.stream.flush()?;
1198 self.stream.read_message(&mut self.buffer_set)?;
1199 }
1200 Action::Finished => break,
1201 _ => return Err(Error::Protocol("Unexpected action in bind".into())),
1202 }
1203 }
1204
1205 let mut portal = UnnamedPortal { conn: self };
1207 let result = f(&mut portal);
1208
1209 let sync_result = portal.conn.lowlevel_sync();
1211
1212 match (result, sync_result) {
1214 (Ok(v), Ok(())) => Ok(v),
1215 (Err(e), _) => Err(e),
1216 (Ok(_), Err(e)) => Err(e),
1217 }
1218 }
1219
1220 pub fn lowlevel_close_portal(&mut self, portal: &str) -> Result<()> {
1222 let result = self.lowlevel_close_portal_inner(portal);
1223 if let Err(e) = &result
1224 && e.is_connection_broken()
1225 {
1226 self.is_broken = true;
1227 }
1228 result
1229 }
1230
1231 fn lowlevel_close_portal_inner(&mut self, portal: &str) -> Result<()> {
1232 use crate::protocol::backend::{CloseComplete, ErrorResponse, RawMessage, msg_type};
1233 use crate::protocol::frontend::{write_close_portal, write_flush};
1234
1235 self.buffer_set.write_buffer.clear();
1236 write_close_portal(&mut self.buffer_set.write_buffer, portal);
1237 write_flush(&mut self.buffer_set.write_buffer);
1238
1239 self.stream.write_all(&self.buffer_set.write_buffer)?;
1240 self.stream.flush()?;
1241
1242 loop {
1243 self.stream.read_message(&mut self.buffer_set)?;
1244 let type_byte = self.buffer_set.type_byte;
1245
1246 if RawMessage::is_async_type(type_byte) {
1247 continue;
1248 }
1249
1250 match type_byte {
1251 msg_type::CLOSE_COMPLETE => {
1252 CloseComplete::parse(&self.buffer_set.read_buffer)?;
1253 return Ok(());
1254 }
1255 msg_type::ERROR_RESPONSE => {
1256 let error = ErrorResponse::parse(&self.buffer_set.read_buffer)?;
1257 return Err(error.into_error());
1258 }
1259 _ => {
1260 return Err(Error::Protocol(format!(
1261 "Expected CloseComplete or ErrorResponse, got '{}'",
1262 type_byte as char
1263 )));
1264 }
1265 }
1266 }
1267 }
1268}
1269
1270impl Drop for Conn {
1271 fn drop(&mut self) {
1272 self.buffer_set.write_buffer.clear();
1274 write_terminate(&mut self.buffer_set.write_buffer);
1275 let _ = self.stream.write_all(&self.buffer_set.write_buffer);
1276 let _ = self.stream.flush();
1277 }
1278}