sqlx_postgres/connection/establish.rs
1use crate::HashMap;
2
3use crate::common::StatementCache;
4use crate::connection::{sasl, stream::PgStream};
5use crate::error::Error;
6use crate::io::StatementId;
7use crate::message::{
8 Authentication, BackendKeyData, BackendMessageFormat, Password, ReadyForQuery, Startup,
9};
10use crate::{PgConnectOptions, PgConnection};
11
12use super::PgConnectionInner;
13
14// https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.3
15// https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.11
16
17impl PgConnection {
18 pub(crate) async fn establish(options: &PgConnectOptions) -> Result<Self, Error> {
19 // Upgrade to TLS if we were asked to and the server supports it
20 let mut stream = PgStream::connect(options).await?;
21
22 // To begin a session, a frontend opens a connection to the server
23 // and sends a startup message.
24
25 let mut params = vec![
26 // Sets the display format for date and time values,
27 // as well as the rules for interpreting ambiguous date input values.
28 ("DateStyle", "ISO, MDY"),
29 // Sets the client-side encoding (character set).
30 // <https://www.postgresql.org/docs/devel/multibyte.html#MULTIBYTE-CHARSET-SUPPORTED>
31 ("client_encoding", "UTF8"),
32 // Sets the time zone for displaying and interpreting time stamps.
33 ("TimeZone", "UTC"),
34 ];
35
36 if let Some(ref extra_float_digits) = options.extra_float_digits {
37 params.push(("extra_float_digits", extra_float_digits));
38 }
39
40 if let Some(ref application_name) = options.application_name {
41 params.push(("application_name", application_name));
42 }
43
44 if let Some(ref options) = options.options {
45 params.push(("options", options));
46 }
47
48 stream.write(Startup {
49 username: Some(&options.username),
50 database: options.database.as_deref(),
51 params: ¶ms,
52 })?;
53
54 stream.flush().await?;
55
56 // The server then uses this information and the contents of
57 // its configuration files (such as pg_hba.conf) to determine whether the connection is
58 // provisionally acceptable, and what additional
59 // authentication is required (if any).
60
61 let mut process_id = 0;
62 let mut secret_key = 0;
63 let transaction_status;
64
65 loop {
66 let message = stream.recv().await?;
67 match message.format {
68 BackendMessageFormat::Authentication => match message.decode()? {
69 Authentication::Ok => {
70 // the authentication exchange is successfully completed
71 // do nothing; no more information is required to continue
72 }
73
74 Authentication::CleartextPassword => {
75 // The frontend must now send a [PasswordMessage] containing the
76 // password in clear-text form.
77
78 stream
79 .send(Password::Cleartext(
80 options.password.as_deref().unwrap_or_default(),
81 ))
82 .await?;
83 }
84
85 Authentication::Md5Password(body) => {
86 // The frontend must now send a [PasswordMessage] containing the
87 // password (with user name) encrypted via MD5, then encrypted again
88 // using the 4-byte random salt specified in the
89 // [AuthenticationMD5Password] message.
90
91 stream
92 .send(Password::Md5 {
93 username: &options.username,
94 password: options.password.as_deref().unwrap_or_default(),
95 salt: body.salt,
96 })
97 .await?;
98 }
99
100 Authentication::Sasl(body) => {
101 sasl::authenticate(&mut stream, options, body).await?;
102 }
103
104 method => {
105 return Err(err_protocol!(
106 "unsupported authentication method: {:?}",
107 method
108 ));
109 }
110 },
111
112 BackendMessageFormat::BackendKeyData => {
113 // provides secret-key data that the frontend must save if it wants to be
114 // able to issue cancel requests later
115
116 let data: BackendKeyData = message.decode()?;
117
118 process_id = data.process_id;
119 secret_key = data.secret_key;
120 }
121
122 BackendMessageFormat::ReadyForQuery => {
123 // start-up is completed. The frontend can now issue commands
124 transaction_status = message.decode::<ReadyForQuery>()?.transaction_status;
125
126 break;
127 }
128
129 _ => {
130 return Err(err_protocol!(
131 "establish: unexpected message: {:?}",
132 message.format
133 ))
134 }
135 }
136 }
137
138 Ok(PgConnection {
139 inner: Box::new(PgConnectionInner {
140 stream,
141 process_id,
142 secret_key,
143 transaction_status,
144 transaction_depth: 0,
145 pending_ready_for_query_count: 0,
146 next_statement_id: StatementId::NAMED_START,
147 cache_statement: StatementCache::new(options.statement_cache_capacity),
148 cache_type_oid: HashMap::new(),
149 cache_type_info: HashMap::new(),
150 cache_elem_type_to_array: HashMap::new(),
151 log_settings: options.log_settings.clone(),
152 }),
153 })
154 }
155}