sqlx_postgres/connection/
establish.rs

1use crate::HashMap;
2
3use crate::common::StatementCache;
4use crate::connection::{sasl, stream::PgStream};
5use crate::error::Error;
6use crate::io::StatementId;
7use crate::message::{
8    Authentication, BackendKeyData, BackendMessageFormat, Password, ReadyForQuery, Startup,
9};
10use crate::{PgConnectOptions, PgConnection};
11
12use super::PgConnectionInner;
13
14// https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.3
15// https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.11
16
17impl PgConnection {
18    pub(crate) async fn establish(options: &PgConnectOptions) -> Result<Self, Error> {
19        // Upgrade to TLS if we were asked to and the server supports it
20        let mut stream = PgStream::connect(options).await?;
21
22        // To begin a session, a frontend opens a connection to the server
23        // and sends a startup message.
24
25        let mut params = vec![
26            // Sets the display format for date and time values,
27            // as well as the rules for interpreting ambiguous date input values.
28            ("DateStyle", "ISO, MDY"),
29            // Sets the client-side encoding (character set).
30            // <https://www.postgresql.org/docs/devel/multibyte.html#MULTIBYTE-CHARSET-SUPPORTED>
31            ("client_encoding", "UTF8"),
32            // Sets the time zone for displaying and interpreting time stamps.
33            ("TimeZone", "UTC"),
34        ];
35
36        if let Some(ref extra_float_digits) = options.extra_float_digits {
37            params.push(("extra_float_digits", extra_float_digits));
38        }
39
40        if let Some(ref application_name) = options.application_name {
41            params.push(("application_name", application_name));
42        }
43
44        if let Some(ref options) = options.options {
45            params.push(("options", options));
46        }
47
48        stream.write(Startup {
49            username: Some(&options.username),
50            database: options.database.as_deref(),
51            params: &params,
52        })?;
53
54        stream.flush().await?;
55
56        // The server then uses this information and the contents of
57        // its configuration files (such as pg_hba.conf) to determine whether the connection is
58        // provisionally acceptable, and what additional
59        // authentication is required (if any).
60
61        let mut process_id = 0;
62        let mut secret_key = 0;
63        let transaction_status;
64
65        loop {
66            let message = stream.recv().await?;
67            match message.format {
68                BackendMessageFormat::Authentication => match message.decode()? {
69                    Authentication::Ok => {
70                        // the authentication exchange is successfully completed
71                        // do nothing; no more information is required to continue
72                    }
73
74                    Authentication::CleartextPassword => {
75                        // The frontend must now send a [PasswordMessage] containing the
76                        // password in clear-text form.
77
78                        stream
79                            .send(Password::Cleartext(
80                                options.password.as_deref().unwrap_or_default(),
81                            ))
82                            .await?;
83                    }
84
85                    Authentication::Md5Password(body) => {
86                        // The frontend must now send a [PasswordMessage] containing the
87                        // password (with user name) encrypted via MD5, then encrypted again
88                        // using the 4-byte random salt specified in the
89                        // [AuthenticationMD5Password] message.
90
91                        stream
92                            .send(Password::Md5 {
93                                username: &options.username,
94                                password: options.password.as_deref().unwrap_or_default(),
95                                salt: body.salt,
96                            })
97                            .await?;
98                    }
99
100                    Authentication::Sasl(body) => {
101                        sasl::authenticate(&mut stream, options, body).await?;
102                    }
103
104                    method => {
105                        return Err(err_protocol!(
106                            "unsupported authentication method: {:?}",
107                            method
108                        ));
109                    }
110                },
111
112                BackendMessageFormat::BackendKeyData => {
113                    // provides secret-key data that the frontend must save if it wants to be
114                    // able to issue cancel requests later
115
116                    let data: BackendKeyData = message.decode()?;
117
118                    process_id = data.process_id;
119                    secret_key = data.secret_key;
120                }
121
122                BackendMessageFormat::ReadyForQuery => {
123                    // start-up is completed. The frontend can now issue commands
124                    transaction_status = message.decode::<ReadyForQuery>()?.transaction_status;
125
126                    break;
127                }
128
129                _ => {
130                    return Err(err_protocol!(
131                        "establish: unexpected message: {:?}",
132                        message.format
133                    ))
134                }
135            }
136        }
137
138        Ok(PgConnection {
139            inner: Box::new(PgConnectionInner {
140                stream,
141                process_id,
142                secret_key,
143                transaction_status,
144                transaction_depth: 0,
145                pending_ready_for_query_count: 0,
146                next_statement_id: StatementId::NAMED_START,
147                cache_statement: StatementCache::new(options.statement_cache_capacity),
148                cache_type_oid: HashMap::new(),
149                cache_type_info: HashMap::new(),
150                cache_elem_type_to_array: HashMap::new(),
151                log_settings: options.log_settings.clone(),
152            }),
153        })
154    }
155}