use crate::HashMap;
use crate::common::StatementCache;
use crate::connection::{sasl, stream::PgStream};
use crate::error::Error;
use crate::io::Decode;
use crate::message::{
Authentication, BackendKeyData, MessageFormat, Password, ReadyForQuery, Startup,
};
use crate::types::Oid;
use crate::{PgConnectOptions, PgConnection};
impl PgConnection {
pub(crate) async fn establish(options: &PgConnectOptions) -> Result<Self, Error> {
let mut stream = PgStream::connect(options).await?;
let mut params = vec![
("DateStyle", "ISO, MDY"),
("client_encoding", "UTF8"),
("TimeZone", "UTC"),
];
if let Some(ref extra_float_digits) = options.extra_float_digits {
params.push(("extra_float_digits", extra_float_digits));
}
if let Some(ref application_name) = options.application_name {
params.push(("application_name", application_name));
}
if let Some(ref options) = options.options {
params.push(("options", options));
}
stream
.send(Startup {
username: Some(&options.username),
database: options.database.as_deref(),
params: ¶ms,
})
.await?;
let mut process_id = 0;
let mut secret_key = 0;
let transaction_status;
loop {
let message = stream.recv().await?;
match message.format {
MessageFormat::Authentication => match message.decode()? {
Authentication::Ok => {
}
Authentication::CleartextPassword => {
stream
.send(Password::Cleartext(
options.password.as_deref().unwrap_or_default(),
))
.await?;
}
Authentication::Md5Password(body) => {
stream
.send(Password::Md5 {
username: &options.username,
password: options.password.as_deref().unwrap_or_default(),
salt: body.salt,
})
.await?;
}
Authentication::Sasl(body) => {
sasl::authenticate(&mut stream, options, body).await?;
}
method => {
return Err(err_protocol!(
"unsupported authentication method: {:?}",
method
));
}
},
MessageFormat::BackendKeyData => {
let data: BackendKeyData = message.decode()?;
process_id = data.process_id;
secret_key = data.secret_key;
}
MessageFormat::ReadyForQuery => {
transaction_status =
ReadyForQuery::decode(message.contents)?.transaction_status;
break;
}
_ => {
return Err(err_protocol!(
"establish: unexpected message: {:?}",
message.format
))
}
}
}
Ok(PgConnection {
stream,
process_id,
secret_key,
transaction_status,
transaction_depth: 0,
pending_ready_for_query_count: 0,
next_statement_id: Oid(1),
cache_statement: StatementCache::new(options.statement_cache_capacity),
cache_type_oid: HashMap::new(),
cache_type_info: HashMap::new(),
log_settings: options.log_settings.clone(),
})
}
}