sqlx_core/postgres/connection/establish.rs
1use crate::HashMap;
2
3use crate::common::StatementCache;
4use crate::error::Error;
5use crate::io::Decode;
6use crate::postgres::connection::{sasl, stream::PgStream, tls};
7use crate::postgres::message::{
8 Authentication, BackendKeyData, MessageFormat, Password, ReadyForQuery, Startup,
9};
10use crate::postgres::types::Oid;
11use crate::postgres::{PgConnectOptions, PgConnection};
12
13// https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.3
14// https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.11
15
16impl PgConnection {
17 pub(crate) async fn establish(options: &PgConnectOptions) -> Result<Self, Error> {
18 let mut stream = PgStream::connect(options).await?;
19
20 // Upgrade to TLS if we were asked to and the server supports it
21 tls::maybe_upgrade(&mut stream, options).await?;
22
23 // To begin a session, a frontend opens a connection to the server
24 // and sends a startup message.
25
26 let mut params = vec![
27 // Sets the display format for date and time values,
28 // as well as the rules for interpreting ambiguous date input values.
29 ("DateStyle", "ISO, MDY"),
30 // Sets the client-side encoding (character set).
31 // <https://www.postgresql.org/docs/devel/multibyte.html#MULTIBYTE-CHARSET-SUPPORTED>
32 ("client_encoding", "UTF8"),
33 // Sets the time zone for displaying and interpreting time stamps.
34 ("TimeZone", "UTC"),
35 ];
36
37 if let Some(ref extra_float_digits) = options.extra_float_digits {
38 params.push(("extra_float_digits", extra_float_digits));
39 }
40
41 if let Some(ref application_name) = options.application_name {
42 params.push(("application_name", application_name));
43 }
44
45 if let Some(ref options) = options.options {
46 params.push(("options", options));
47 }
48
49 stream
50 .send(Startup {
51 username: Some(&options.username),
52 database: options.database.as_deref(),
53 params: ¶ms,
54 })
55 .await?;
56
57 // The server then uses this information and the contents of
58 // its configuration files (such as pg_hba.conf) to determine whether the connection is
59 // provisionally acceptable, and what additional
60 // authentication is required (if any).
61
62 let mut process_id = 0;
63 let mut secret_key = 0;
64 let transaction_status;
65
66 loop {
67 let message = stream.recv().await?;
68 match message.format {
69 MessageFormat::Authentication => match message.decode()? {
70 Authentication::Ok => {
71 // the authentication exchange is successfully completed
72 // do nothing; no more information is required to continue
73 }
74
75 Authentication::CleartextPassword => {
76 // The frontend must now send a [PasswordMessage] containing the
77 // password in clear-text form.
78
79 stream
80 .send(Password::Cleartext(
81 options.password.as_deref().unwrap_or_default(),
82 ))
83 .await?;
84 }
85
86 Authentication::Md5Password(body) => {
87 // The frontend must now send a [PasswordMessage] containing the
88 // password (with user name) encrypted via MD5, then encrypted again
89 // using the 4-byte random salt specified in the
90 // [AuthenticationMD5Password] message.
91
92 stream
93 .send(Password::Md5 {
94 username: &options.username,
95 password: options.password.as_deref().unwrap_or_default(),
96 salt: body.salt,
97 })
98 .await?;
99 }
100
101 Authentication::Sasl(body) => {
102 sasl::authenticate(&mut stream, options, body).await?;
103 }
104
105 method => {
106 return Err(err_protocol!(
107 "unsupported authentication method: {:?}",
108 method
109 ));
110 }
111 },
112
113 MessageFormat::BackendKeyData => {
114 // provides secret-key data that the frontend must save if it wants to be
115 // able to issue cancel requests later
116
117 let data: BackendKeyData = message.decode()?;
118
119 process_id = data.process_id;
120 secret_key = data.secret_key;
121 }
122
123 MessageFormat::ReadyForQuery => {
124 // start-up is completed. The frontend can now issue commands
125 transaction_status =
126 ReadyForQuery::decode(message.contents)?.transaction_status;
127
128 break;
129 }
130
131 _ => {
132 return Err(err_protocol!(
133 "establish: unexpected message: {:?}",
134 message.format
135 ))
136 }
137 }
138 }
139
140 Ok(PgConnection {
141 stream,
142 process_id,
143 secret_key,
144 transaction_status,
145 transaction_depth: 0,
146 pending_ready_for_query_count: 0,
147 next_statement_id: Oid(1),
148 cache_statement: StatementCache::new(options.statement_cache_capacity),
149 cache_type_oid: HashMap::new(),
150 cache_type_info: HashMap::new(),
151 log_settings: options.log_settings.clone(),
152 })
153 }
154}