use std::marker::PhantomData;
use bytes::BytesMut;
use mssql_codec::connection::Connection;
#[cfg(feature = "tls")]
use mssql_tls::{TlsConfig, TlsConnector, TlsNegotiationMode};
use tds_protocol::login7::Login7;
use tds_protocol::packet::MAX_PACKET_SIZE;
use tds_protocol::packet::PacketType;
use tds_protocol::prelogin::{EncryptionLevel, PreLogin};
use tds_protocol::token::{EnvChange, EnvChangeType, Token, TokenParser};
use tokio::net::TcpStream;
use tokio::time::timeout;
use crate::config::Config;
use crate::error::{Error, Result};
#[cfg(feature = "otel")]
use crate::instrumentation::InstrumentationContext;
use crate::state::{Disconnected, Ready};
use crate::statement_cache::StatementCache;
use super::{Client, ConnectionHandle};
impl Client<Disconnected> {
pub async fn connect(config: Config) -> Result<Client<Ready>> {
let max_redirects = config.redirect.max_redirects;
let follow_redirects = config.redirect.follow_redirects;
let per_attempt = config.timeouts.connect_timeout
+ config.timeouts.tls_timeout
+ config.timeouts.login_timeout;
let overall = per_attempt * (max_redirects as u32 + 1);
let overall = overall.min(std::time::Duration::from_secs(300));
let mut attempts = 0;
let initial_host = config.host.clone();
let initial_port = config.port;
let mut current_config = config;
let result = timeout(overall, async {
loop {
attempts += 1;
if attempts > max_redirects + 1 {
return Err(Error::TooManyRedirects { max: max_redirects });
}
match Self::try_connect(¤t_config).await {
Ok(client) => return Ok(client),
Err(Error::Routing { host, port }) => {
if !follow_redirects {
return Err(Error::Routing { host, port });
}
tracing::info!(
host = %host,
port = port,
attempt = attempts,
max_redirects = max_redirects,
"following Azure SQL routing redirect"
);
current_config = current_config.with_host(&host).with_port(port);
continue;
}
Err(e) => return Err(e),
}
}
})
.await;
match result {
Ok(inner) => inner,
Err(_elapsed) => Err(Error::ConnectTimeout {
host: initial_host,
port: initial_port,
}),
}
}
async fn try_connect(config: &Config) -> Result<Client<Ready>> {
tracing::info!(
host = %config.host,
port = config.port,
database = ?config.database,
"connecting to SQL Server"
);
let addr = format!("{}:{}", config.host, config.port);
tracing::debug!("establishing TCP connection to {}", addr);
let tcp_stream = timeout(config.timeouts.connect_timeout, TcpStream::connect(&addr))
.await
.map_err(|_| Error::ConnectTimeout {
host: config.host.clone(),
port: config.port,
})?
.map_err(Error::from)?;
tcp_stream.set_nodelay(true).map_err(Error::from)?;
#[cfg(feature = "tls")]
{
let tls_mode = TlsNegotiationMode::from_encrypt_mode(config.strict_mode);
if tls_mode.is_tls_first() {
return Self::connect_tds_8(config, tcp_stream).await;
}
Self::connect_tds_7x(config, tcp_stream).await
}
#[cfg(not(feature = "tls"))]
{
if config.strict_mode {
return Err(Error::Config(
"TDS 8.0 strict mode requires TLS. Enable the 'tls' feature or use Encrypt=no_tls".into()
));
}
if !config.no_tls {
return Err(Error::Config(
"TLS encryption requires the 'tls' feature. Either enable the 'tls' feature \
or use Encrypt=no_tls in your connection string for unencrypted connections."
.into(),
));
}
Self::connect_no_tls(config, tcp_stream).await
}
}
#[cfg(feature = "tls")]
async fn connect_tds_8(config: &Config, tcp_stream: TcpStream) -> Result<Client<Ready>> {
tracing::debug!("using TDS 8.0 strict mode (TLS first)");
let tls_config = TlsConfig::new()
.strict_mode(true)
.trust_server_certificate(config.trust_server_certificate)
.with_alpn_protocols(vec![b"tds/8.0".to_vec()]);
let tls_connector = TlsConnector::new(tls_config)?;
let tls_stream = timeout(
config.timeouts.tls_timeout,
tls_connector.connect(tcp_stream, &config.host),
)
.await
.map_err(|_| Error::TlsTimeout {
host: config.host.clone(),
port: config.port,
})??;
tracing::debug!("TLS handshake completed (strict mode)");
let mut connection = Connection::new(tls_stream);
let prelogin = Self::build_prelogin(config, EncryptionLevel::Required);
Self::send_prelogin(&mut connection, &prelogin).await?;
let _prelogin_response = Self::receive_prelogin(&mut connection).await?;
#[cfg(any(feature = "integrated-auth", feature = "sspi-auth"))]
let negotiator = Self::create_negotiator(config)?;
#[cfg(any(feature = "integrated-auth", feature = "sspi-auth"))]
let sspi_token = match negotiator {
Some(ref neg) => Some(neg.initialize()?),
None => None,
};
#[cfg(not(any(feature = "integrated-auth", feature = "sspi-auth")))]
let sspi_token: Option<Vec<u8>> = None;
let login = Self::build_login7(config, sspi_token);
Self::send_login7(&mut connection, &login).await?;
let (server_version, current_database, routing) = timeout(
config.timeouts.login_timeout,
Self::process_login_response(
&mut connection,
#[cfg(any(feature = "integrated-auth", feature = "sspi-auth"))]
negotiator.as_deref(),
),
)
.await
.map_err(|_| Error::LoginTimeout {
host: config.host.clone(),
port: config.port,
})??;
if let Some((host, port)) = routing {
return Err(Error::Routing { host, port });
}
Ok(Client {
config: config.clone(),
_state: PhantomData,
connection: Some(ConnectionHandle::Tls(connection)),
server_version,
current_database: current_database.clone(),
statement_cache: StatementCache::with_default_size(),
transaction_descriptor: 0, needs_reset: false, #[cfg(feature = "otel")]
instrumentation: InstrumentationContext::new(config.host.clone(), config.port)
.with_database(current_database.unwrap_or_default()),
})
}
#[cfg(feature = "tls")]
async fn connect_tds_7x(config: &Config, mut tcp_stream: TcpStream) -> Result<Client<Ready>> {
use bytes::BufMut;
use tds_protocol::packet::{PACKET_HEADER_SIZE, PacketHeader, PacketStatus};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
tracing::debug!("using TDS 7.x flow (PreLogin first)");
let client_encryption = if config.no_tls {
tracing::warn!(
"⚠️ no_tls mode enabled. Connection will be UNENCRYPTED. \
Credentials and data will be transmitted in plaintext. \
This should only be used for development/testing with legacy SQL Server."
);
EncryptionLevel::NotSupported
} else if config.encrypt {
EncryptionLevel::On
} else {
EncryptionLevel::Off
};
let prelogin = Self::build_prelogin(config, client_encryption);
tracing::debug!(encryption = ?client_encryption, "sending PreLogin");
let prelogin_bytes = prelogin.encode();
let header = PacketHeader::new(
PacketType::PreLogin,
PacketStatus::END_OF_MESSAGE,
(PACKET_HEADER_SIZE + prelogin_bytes.len()) as u16,
);
let mut packet_buf = BytesMut::with_capacity(PACKET_HEADER_SIZE + prelogin_bytes.len());
header.encode(&mut packet_buf);
packet_buf.put_slice(&prelogin_bytes);
tcp_stream
.write_all(&packet_buf)
.await
.map_err(Error::from)?;
let mut header_buf = [0u8; PACKET_HEADER_SIZE];
tcp_stream
.read_exact(&mut header_buf)
.await
.map_err(Error::from)?;
let response_length = u16::from_be_bytes([header_buf[2], header_buf[3]]) as usize;
let payload_length = response_length.saturating_sub(PACKET_HEADER_SIZE);
let mut response_buf = vec![0u8; payload_length];
tcp_stream
.read_exact(&mut response_buf)
.await
.map_err(Error::from)?;
let prelogin_response = PreLogin::decode(&response_buf[..])?;
let client_tds_version = config.tds_version;
if let Some(ref server_version) = prelogin_response.server_version {
tracing::debug!(
requested_tds_version = %client_tds_version,
server_product_version = %server_version,
server_product = server_version.product_name(),
max_tds_version = %server_version.max_tds_version(),
"PreLogin response received"
);
let server_max_tds = server_version.max_tds_version();
if server_max_tds < client_tds_version && !client_tds_version.is_tds_8() {
tracing::warn!(
requested_tds_version = %client_tds_version,
server_max_tds_version = %server_max_tds,
server_product = server_version.product_name(),
"Server supports lower TDS version than requested. \
Connection will use server's maximum: {}",
server_max_tds
);
}
if server_max_tds.is_legacy() {
tracing::warn!(
server_product = server_version.product_name(),
server_max_tds_version = %server_max_tds,
"Server uses legacy TDS version. Some features may not be available."
);
}
} else {
tracing::debug!(
requested_tds_version = %client_tds_version,
"PreLogin response received (no version info)"
);
}
let server_encryption = prelogin_response.encryption;
tracing::debug!(encryption = ?server_encryption, "server encryption level");
let negotiated_encryption = match (client_encryption, server_encryption) {
(EncryptionLevel::NotSupported, EncryptionLevel::NotSupported) => {
EncryptionLevel::NotSupported
}
(EncryptionLevel::Off, EncryptionLevel::Off) => EncryptionLevel::Off,
(EncryptionLevel::On, EncryptionLevel::Off)
| (EncryptionLevel::On, EncryptionLevel::NotSupported) => {
return Err(Error::Protocol(
"Server does not support requested encryption level".to_string(),
));
}
_ => EncryptionLevel::On,
};
let use_tls = negotiated_encryption != EncryptionLevel::NotSupported;
if use_tls {
let tls_config =
TlsConfig::new().trust_server_certificate(config.trust_server_certificate);
let tls_connector = TlsConnector::new(tls_config)?;
let mut tls_stream = timeout(
config.timeouts.tls_timeout,
tls_connector.connect_with_prelogin(tcp_stream, &config.host),
)
.await
.map_err(|_| Error::TlsTimeout {
host: config.host.clone(),
port: config.port,
})??;
tracing::debug!("TLS handshake completed (PreLogin wrapped)");
let login_only_encryption = negotiated_encryption == EncryptionLevel::Off;
if login_only_encryption {
use tokio::io::AsyncWriteExt;
#[cfg(any(feature = "integrated-auth", feature = "sspi-auth"))]
let negotiator = Self::create_negotiator(config)?;
#[cfg(any(feature = "integrated-auth", feature = "sspi-auth"))]
let sspi_token = match negotiator {
Some(ref neg) => Some(neg.initialize()?),
None => None,
};
#[cfg(not(any(feature = "integrated-auth", feature = "sspi-auth")))]
let sspi_token: Option<Vec<u8>> = None;
let login = Self::build_login7(config, sspi_token);
let login_payload = login.encode();
let max_packet = MAX_PACKET_SIZE;
let max_payload = max_packet - PACKET_HEADER_SIZE;
let chunks: Vec<_> = login_payload.chunks(max_payload).collect();
let total_chunks = chunks.len();
for (i, chunk) in chunks.into_iter().enumerate() {
let is_last = i == total_chunks - 1;
let status = if is_last {
PacketStatus::END_OF_MESSAGE
} else {
PacketStatus::NORMAL
};
let header = PacketHeader::new(
PacketType::Tds7Login,
status,
(PACKET_HEADER_SIZE + chunk.len()) as u16,
);
let mut packet_buf = BytesMut::with_capacity(PACKET_HEADER_SIZE + chunk.len());
header.encode(&mut packet_buf);
packet_buf.put_slice(chunk);
tls_stream
.write_all(&packet_buf)
.await
.map_err(Error::from)?;
}
tls_stream.flush().await.map_err(Error::from)?;
tracing::debug!("Login7 sent through TLS, switching to plaintext for response");
let (wrapper, _client_conn) = tls_stream.into_inner();
let tcp_stream = wrapper.into_inner();
let mut connection = Connection::new(tcp_stream);
let (server_version, current_database, routing) = timeout(
config.timeouts.login_timeout,
Self::process_login_response(
&mut connection,
#[cfg(any(feature = "integrated-auth", feature = "sspi-auth"))]
negotiator.as_deref(),
),
)
.await
.map_err(|_| Error::LoginTimeout {
host: config.host.clone(),
port: config.port,
})??;
if let Some((host, port)) = routing {
return Err(Error::Routing { host, port });
}
Ok(Client {
config: config.clone(),
_state: PhantomData,
connection: Some(ConnectionHandle::Plain(connection)),
server_version,
current_database: current_database.clone(),
statement_cache: StatementCache::with_default_size(),
transaction_descriptor: 0, needs_reset: false, #[cfg(feature = "otel")]
instrumentation: InstrumentationContext::new(config.host.clone(), config.port)
.with_database(current_database.unwrap_or_default()),
})
} else {
let mut connection = Connection::new(tls_stream);
#[cfg(any(feature = "integrated-auth", feature = "sspi-auth"))]
let negotiator = Self::create_negotiator(config)?;
#[cfg(any(feature = "integrated-auth", feature = "sspi-auth"))]
let sspi_token = match negotiator {
Some(ref neg) => Some(neg.initialize()?),
None => None,
};
#[cfg(not(any(feature = "integrated-auth", feature = "sspi-auth")))]
let sspi_token: Option<Vec<u8>> = None;
let login = Self::build_login7(config, sspi_token);
Self::send_login7(&mut connection, &login).await?;
let (server_version, current_database, routing) = timeout(
config.timeouts.login_timeout,
Self::process_login_response(
&mut connection,
#[cfg(any(feature = "integrated-auth", feature = "sspi-auth"))]
negotiator.as_deref(),
),
)
.await
.map_err(|_| Error::LoginTimeout {
host: config.host.clone(),
port: config.port,
})??;
if let Some((host, port)) = routing {
return Err(Error::Routing { host, port });
}
Ok(Client {
config: config.clone(),
_state: PhantomData,
connection: Some(ConnectionHandle::TlsPrelogin(connection)),
server_version,
current_database: current_database.clone(),
statement_cache: StatementCache::with_default_size(),
transaction_descriptor: 0, needs_reset: false, #[cfg(feature = "otel")]
instrumentation: InstrumentationContext::new(config.host.clone(), config.port)
.with_database(current_database.unwrap_or_default()),
})
}
} else {
tracing::warn!(
"Connecting without TLS encryption. This is insecure and should only be \
used for development/testing on trusted networks."
);
let mut connection = Connection::new(tcp_stream);
#[cfg(any(feature = "integrated-auth", feature = "sspi-auth"))]
let negotiator = Self::create_negotiator(config)?;
#[cfg(any(feature = "integrated-auth", feature = "sspi-auth"))]
let sspi_token = match negotiator {
Some(ref neg) => Some(neg.initialize()?),
None => None,
};
#[cfg(not(any(feature = "integrated-auth", feature = "sspi-auth")))]
let sspi_token: Option<Vec<u8>> = None;
let login = Self::build_login7(config, sspi_token);
Self::send_login7(&mut connection, &login).await?;
let (server_version, current_database, routing) = timeout(
config.timeouts.login_timeout,
Self::process_login_response(
&mut connection,
#[cfg(any(feature = "integrated-auth", feature = "sspi-auth"))]
negotiator.as_deref(),
),
)
.await
.map_err(|_| Error::LoginTimeout {
host: config.host.clone(),
port: config.port,
})??;
if let Some((host, port)) = routing {
return Err(Error::Routing { host, port });
}
Ok(Client {
config: config.clone(),
_state: PhantomData,
connection: Some(ConnectionHandle::Plain(connection)),
server_version,
current_database: current_database.clone(),
statement_cache: StatementCache::with_default_size(),
transaction_descriptor: 0, needs_reset: false, #[cfg(feature = "otel")]
instrumentation: InstrumentationContext::new(config.host.clone(), config.port)
.with_database(current_database.unwrap_or_default()),
})
}
}
#[cfg(not(feature = "tls"))]
async fn connect_no_tls(config: &Config, mut tcp_stream: TcpStream) -> Result<Client<Ready>> {
use bytes::BufMut;
use tds_protocol::packet::{PACKET_HEADER_SIZE, PacketHeader, PacketStatus};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
tracing::warn!(
"⚠️ Connecting without TLS (tls feature disabled). \
Credentials and data will be transmitted in plaintext."
);
let prelogin = Self::build_prelogin(config, EncryptionLevel::NotSupported);
let prelogin_bytes = prelogin.encode();
let header = PacketHeader::new(
PacketType::PreLogin,
PacketStatus::END_OF_MESSAGE,
(PACKET_HEADER_SIZE + prelogin_bytes.len()) as u16,
);
let mut packet_buf = BytesMut::with_capacity(PACKET_HEADER_SIZE + prelogin_bytes.len());
header.encode(&mut packet_buf);
packet_buf.put_slice(&prelogin_bytes);
tcp_stream
.write_all(&packet_buf)
.await
.map_err(Error::from)?;
let mut header_buf = [0u8; PACKET_HEADER_SIZE];
tcp_stream
.read_exact(&mut header_buf)
.await
.map_err(Error::from)?;
let response_length = u16::from_be_bytes([header_buf[2], header_buf[3]]) as usize;
let payload_length = response_length.saturating_sub(PACKET_HEADER_SIZE);
let mut response_buf = vec![0u8; payload_length];
tcp_stream
.read_exact(&mut response_buf)
.await
.map_err(Error::from)?;
let prelogin_response = PreLogin::decode(&response_buf[..])?;
let server_encryption = prelogin_response.encryption;
if server_encryption != EncryptionLevel::NotSupported {
return Err(Error::Config(format!(
"Server requires encryption (level: {:?}) but TLS feature is disabled. \
Either enable the 'tls' feature or configure the server to allow unencrypted connections.",
server_encryption
)));
}
tracing::debug!("Server accepted unencrypted connection");
let mut connection = Connection::new(tcp_stream);
#[cfg(any(feature = "integrated-auth", feature = "sspi-auth"))]
let negotiator = Self::create_negotiator(config)?;
#[cfg(any(feature = "integrated-auth", feature = "sspi-auth"))]
let sspi_token = match negotiator {
Some(ref neg) => Some(neg.initialize()?),
None => None,
};
#[cfg(not(any(feature = "integrated-auth", feature = "sspi-auth")))]
let sspi_token: Option<Vec<u8>> = None;
let login = Self::build_login7(config, sspi_token);
Self::send_login7(&mut connection, &login).await?;
let (server_version, current_database, routing) = timeout(
config.timeouts.login_timeout,
Self::process_login_response(
&mut connection,
#[cfg(any(feature = "integrated-auth", feature = "sspi-auth"))]
negotiator.as_deref(),
),
)
.await
.map_err(|_| Error::LoginTimeout {
host: config.host.clone(),
port: config.port,
})??;
if let Some((host, port)) = routing {
return Err(Error::Routing { host, port });
}
Ok(Client {
config: config.clone(),
_state: PhantomData,
connection: Some(ConnectionHandle::Plain(connection)),
server_version,
current_database: current_database.clone(),
statement_cache: StatementCache::with_default_size(),
transaction_descriptor: 0,
needs_reset: false,
#[cfg(feature = "otel")]
instrumentation: InstrumentationContext::new(config.host.clone(), config.port)
.with_database(current_database.unwrap_or_default()),
})
}
fn build_prelogin(config: &Config, encryption: EncryptionLevel) -> PreLogin {
let version = if config.strict_mode {
tds_protocol::version::TdsVersion::V8_0
} else {
config.tds_version
};
let mut prelogin = PreLogin::new()
.with_version(version)
.with_encryption(encryption);
if config.mars {
prelogin = prelogin.with_mars(true);
}
if let Some(ref instance) = config.instance {
prelogin = prelogin.with_instance(instance);
}
prelogin
}
fn build_login7(config: &Config, sspi_token: Option<Vec<u8>>) -> Login7 {
let version = if config.strict_mode {
tds_protocol::version::TdsVersion::V8_0
} else {
config.tds_version
};
let mut login = Login7::new()
.with_tds_version(version)
.with_packet_size(config.packet_size as u32)
.with_app_name(&config.application_name)
.with_server_name(&config.host)
.with_hostname(&config.host);
if let Some(ref database) = config.database {
login = login.with_database(database);
}
if let Some(token) = sspi_token {
login = login.with_integrated_auth(token);
} else if let mssql_auth::Credentials::SqlServer { username, password } =
&config.credentials
{
login = login.with_sql_auth(username.as_ref(), password.as_ref());
}
login
}
#[cfg(any(feature = "integrated-auth", feature = "sspi-auth"))]
fn create_negotiator(config: &Config) -> Result<Option<Box<dyn mssql_auth::SspiNegotiator>>> {
#[allow(clippy::match_like_matches_macro)]
let is_integrated = match &config.credentials {
mssql_auth::Credentials::Integrated => true,
_ => false,
};
if !is_integrated {
return Ok(None);
}
#[cfg(feature = "sspi-auth")]
let negotiator: Box<dyn mssql_auth::SspiNegotiator> =
Box::new(mssql_auth::SspiAuth::new(&config.host, config.port)?);
#[cfg(all(feature = "integrated-auth", not(feature = "sspi-auth")))]
let negotiator: Box<dyn mssql_auth::SspiNegotiator> =
Box::new(mssql_auth::IntegratedAuth::new(&config.host, config.port));
Ok(Some(negotiator))
}
#[cfg(feature = "tls")]
async fn send_prelogin<T>(connection: &mut Connection<T>, prelogin: &PreLogin) -> Result<()>
where
T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
let payload = prelogin.encode();
let max_packet = MAX_PACKET_SIZE;
connection
.send_message(PacketType::PreLogin, payload, max_packet)
.await?;
Ok(())
}
#[cfg(feature = "tls")]
async fn receive_prelogin<T>(connection: &mut Connection<T>) -> Result<PreLogin>
where
T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
let message = connection
.read_message()
.await?
.ok_or(Error::ConnectionClosed)?;
Ok(PreLogin::decode(&message.payload[..])?)
}
async fn send_login7<T>(connection: &mut Connection<T>, login: &Login7) -> Result<()>
where
T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
let payload = login.encode();
let max_packet = MAX_PACKET_SIZE;
connection
.send_message(PacketType::Tds7Login, payload, max_packet)
.await?;
Ok(())
}
#[allow(clippy::never_loop)] async fn process_login_response<T>(
connection: &mut Connection<T>,
#[cfg(any(feature = "integrated-auth", feature = "sspi-auth"))] negotiator: Option<
&dyn mssql_auth::SspiNegotiator,
>,
) -> Result<(Option<u32>, Option<String>, Option<(String, u16)>)>
where
T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
let mut server_version = None;
let mut database = None;
let mut routing = None;
'outer: loop {
let message = connection
.read_message()
.await?
.ok_or(Error::ConnectionClosed)?;
let response_bytes = message.payload;
let mut parser = TokenParser::new(response_bytes);
while let Some(token) = parser.next_token()? {
match token {
Token::LoginAck(ack) => {
tracing::info!(
version = ack.tds_version,
interface = ack.interface,
prog_name = %ack.prog_name,
"login acknowledged"
);
server_version = Some(ack.tds_version);
}
Token::EnvChange(env) => {
Self::process_env_change(&env, &mut database, &mut routing);
}
#[cfg(any(feature = "integrated-auth", feature = "sspi-auth"))]
Token::Sspi(sspi_token) => {
let neg = negotiator.ok_or_else(|| {
Error::Protocol(
"server sent SSPI challenge but no negotiator is configured"
.to_string(),
)
})?;
tracing::debug!(
challenge_len = sspi_token.data.len(),
"received SSPI challenge from server"
);
if let Some(response) = neg.step(&sspi_token.data)? {
tracing::debug!(response_len = response.len(), "sending SSPI response");
connection
.send_message(
PacketType::Sspi,
bytes::Bytes::from(response),
tds_protocol::packet::MAX_PACKET_SIZE,
)
.await?;
}
continue 'outer;
}
Token::Error(err) => {
return Err(Error::Server {
number: err.number,
state: err.state,
class: err.class,
message: err.message.clone(),
server: if err.server.is_empty() {
None
} else {
Some(err.server.clone())
},
procedure: if err.procedure.is_empty() {
None
} else {
Some(err.procedure.clone())
},
line: err.line as u32,
});
}
Token::Info(info) => {
tracing::info!(
number = info.number,
message = %info.message,
"server info message"
);
}
Token::Done(done) => {
if done.status.error {
return Err(Error::Protocol("login failed".to_string()));
}
break 'outer;
}
_ => {}
}
}
break;
}
Ok((server_version, database, routing))
}
fn process_env_change(
env: &EnvChange,
database: &mut Option<String>,
routing: &mut Option<(String, u16)>,
) {
use tds_protocol::token::EnvChangeValue;
match env.env_type {
EnvChangeType::Database => {
if let EnvChangeValue::String(ref new_value) = env.new_value {
tracing::debug!(database = %new_value, "database changed");
*database = Some(new_value.clone());
}
}
EnvChangeType::Routing => {
if let EnvChangeValue::Routing { ref host, port } = env.new_value {
tracing::info!(host = %host, port = port, "routing redirect received");
*routing = Some((host.clone(), port));
}
}
_ => {
if let EnvChangeValue::String(ref new_value) = env.new_value {
tracing::debug!(
env_type = ?env.env_type,
new_value = %new_value,
"environment change"
);
}
}
}
}
}