sqlx_core/postgres/connection/
establish.rs

1use crate::HashMap;
2
3use crate::common::StatementCache;
4use crate::error::Error;
5use crate::io::Decode;
6use crate::postgres::connection::{sasl, stream::PgStream, tls};
7use crate::postgres::message::{
8    Authentication, BackendKeyData, MessageFormat, Password, ReadyForQuery, Startup,
9};
10use crate::postgres::types::Oid;
11use crate::postgres::{PgConnectOptions, PgConnection};
12
13// https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.3
14// https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.11
15
16impl PgConnection {
17    pub(crate) async fn establish(options: &PgConnectOptions) -> Result<Self, Error> {
18        let mut stream = PgStream::connect(options).await?;
19
20        // Upgrade to TLS if we were asked to and the server supports it
21        tls::maybe_upgrade(&mut stream, options).await?;
22
23        // To begin a session, a frontend opens a connection to the server
24        // and sends a startup message.
25
26        let mut params = vec![
27            // Sets the display format for date and time values,
28            // as well as the rules for interpreting ambiguous date input values.
29            ("DateStyle", "ISO, MDY"),
30            // Sets the client-side encoding (character set).
31            // <https://www.postgresql.org/docs/devel/multibyte.html#MULTIBYTE-CHARSET-SUPPORTED>
32            ("client_encoding", "UTF8"),
33            // Sets the time zone for displaying and interpreting time stamps.
34            ("TimeZone", "UTC"),
35        ];
36
37        if let Some(ref extra_float_digits) = options.extra_float_digits {
38            params.push(("extra_float_digits", extra_float_digits));
39        }
40
41        if let Some(ref application_name) = options.application_name {
42            params.push(("application_name", application_name));
43        }
44
45        if let Some(ref options) = options.options {
46            params.push(("options", options));
47        }
48
49        stream
50            .send(Startup {
51                username: Some(&options.username),
52                database: options.database.as_deref(),
53                params: &params,
54            })
55            .await?;
56
57        // The server then uses this information and the contents of
58        // its configuration files (such as pg_hba.conf) to determine whether the connection is
59        // provisionally acceptable, and what additional
60        // authentication is required (if any).
61
62        let mut process_id = 0;
63        let mut secret_key = 0;
64        let transaction_status;
65
66        loop {
67            let message = stream.recv().await?;
68            match message.format {
69                MessageFormat::Authentication => match message.decode()? {
70                    Authentication::Ok => {
71                        // the authentication exchange is successfully completed
72                        // do nothing; no more information is required to continue
73                    }
74
75                    Authentication::CleartextPassword => {
76                        // The frontend must now send a [PasswordMessage] containing the
77                        // password in clear-text form.
78
79                        stream
80                            .send(Password::Cleartext(
81                                options.password.as_deref().unwrap_or_default(),
82                            ))
83                            .await?;
84                    }
85
86                    Authentication::Md5Password(body) => {
87                        // The frontend must now send a [PasswordMessage] containing the
88                        // password (with user name) encrypted via MD5, then encrypted again
89                        // using the 4-byte random salt specified in the
90                        // [AuthenticationMD5Password] message.
91
92                        stream
93                            .send(Password::Md5 {
94                                username: &options.username,
95                                password: options.password.as_deref().unwrap_or_default(),
96                                salt: body.salt,
97                            })
98                            .await?;
99                    }
100
101                    Authentication::Sasl(body) => {
102                        sasl::authenticate(&mut stream, options, body).await?;
103                    }
104
105                    method => {
106                        return Err(err_protocol!(
107                            "unsupported authentication method: {:?}",
108                            method
109                        ));
110                    }
111                },
112
113                MessageFormat::BackendKeyData => {
114                    // provides secret-key data that the frontend must save if it wants to be
115                    // able to issue cancel requests later
116
117                    let data: BackendKeyData = message.decode()?;
118
119                    process_id = data.process_id;
120                    secret_key = data.secret_key;
121                }
122
123                MessageFormat::ReadyForQuery => {
124                    // start-up is completed. The frontend can now issue commands
125                    transaction_status =
126                        ReadyForQuery::decode(message.contents)?.transaction_status;
127
128                    break;
129                }
130
131                _ => {
132                    return Err(err_protocol!(
133                        "establish: unexpected message: {:?}",
134                        message.format
135                    ))
136                }
137            }
138        }
139
140        Ok(PgConnection {
141            stream,
142            process_id,
143            secret_key,
144            transaction_status,
145            transaction_depth: 0,
146            pending_ready_for_query_count: 0,
147            next_statement_id: Oid(1),
148            cache_statement: StatementCache::new(options.statement_cache_capacity),
149            cache_type_oid: HashMap::new(),
150            cache_type_info: HashMap::new(),
151            log_settings: options.log_settings.clone(),
152        })
153    }
154}