1#![allow(clippy::unwrap_used, clippy::expect_used, clippy::needless_range_loop)]
6
7use std::marker::PhantomData;
8use std::sync::Arc;
9
10use bytes::BytesMut;
11use mssql_codec::connection::Connection;
12use mssql_tls::{TlsConfig, TlsConnector, TlsNegotiationMode, TlsStream};
13use tds_protocol::login7::Login7;
14use tds_protocol::packet::{MAX_PACKET_SIZE, PacketType};
15use tds_protocol::prelogin::{EncryptionLevel, PreLogin};
16use tds_protocol::rpc::{RpcParam, RpcRequest, TypeInfo as RpcTypeInfo};
17use tds_protocol::token::{
18 ColMetaData, ColumnData, EnvChange, EnvChangeType, NbcRow, RawRow, Token, TokenParser,
19};
20use tds_protocol::tvp::{
21 TvpColumnDef as TvpWireColumnDef, TvpColumnFlags, TvpEncoder, TvpWireType, encode_tvp_bit,
22 encode_tvp_decimal, encode_tvp_float, encode_tvp_int, encode_tvp_null, encode_tvp_nvarchar,
23 encode_tvp_varbinary,
24};
25use tokio::net::TcpStream;
26use tokio::time::timeout;
27
28use crate::config::Config;
29use crate::error::{Error, Result};
30#[cfg(feature = "otel")]
31use crate::instrumentation::InstrumentationContext;
32use crate::state::{ConnectionState, Disconnected, InTransaction, Ready};
33use crate::statement_cache::StatementCache;
34use crate::stream::{MultiResultStream, QueryStream};
35use crate::transaction::SavePoint;
36
37pub struct Client<S: ConnectionState> {
43 config: Config,
44 _state: PhantomData<S>,
45 connection: Option<ConnectionHandle>,
47 server_version: Option<u32>,
49 current_database: Option<String>,
51 statement_cache: StatementCache,
53 transaction_descriptor: u64,
57 #[cfg(feature = "otel")]
59 instrumentation: InstrumentationContext,
60}
61
62#[allow(dead_code)] enum ConnectionHandle {
70 Tls(Connection<TlsStream<TcpStream>>),
72 TlsPrelogin(Connection<TlsStream<mssql_tls::TlsPreloginWrapper<TcpStream>>>),
74 Plain(Connection<TcpStream>),
76}
77
78impl Client<Disconnected> {
79 pub async fn connect(config: Config) -> Result<Client<Ready>> {
90 let max_redirects = config.redirect.max_redirects;
91 let follow_redirects = config.redirect.follow_redirects;
92 let mut attempts = 0;
93 let mut current_config = config;
94
95 loop {
96 attempts += 1;
97 if attempts > max_redirects + 1 {
98 return Err(Error::TooManyRedirects { max: max_redirects });
99 }
100
101 match Self::try_connect(¤t_config).await {
102 Ok(client) => return Ok(client),
103 Err(Error::Routing { host, port }) => {
104 if !follow_redirects {
105 return Err(Error::Routing { host, port });
106 }
107 tracing::info!(
108 host = %host,
109 port = port,
110 attempt = attempts,
111 max_redirects = max_redirects,
112 "following Azure SQL routing redirect"
113 );
114 current_config = current_config.with_host(&host).with_port(port);
115 continue;
116 }
117 Err(e) => return Err(e),
118 }
119 }
120 }
121
122 async fn try_connect(config: &Config) -> Result<Client<Ready>> {
123 tracing::info!(
124 host = %config.host,
125 port = config.port,
126 database = ?config.database,
127 "connecting to SQL Server"
128 );
129
130 let addr = format!("{}:{}", config.host, config.port);
131
132 tracing::debug!("establishing TCP connection to {}", addr);
134 let tcp_stream = timeout(config.timeouts.connect_timeout, TcpStream::connect(&addr))
135 .await
136 .map_err(|_| Error::ConnectTimeout)?
137 .map_err(|e| Error::Io(Arc::new(e)))?;
138
139 tcp_stream
141 .set_nodelay(true)
142 .map_err(|e| Error::Io(Arc::new(e)))?;
143
144 let tls_mode = TlsNegotiationMode::from_encrypt_mode(config.strict_mode);
146
147 if tls_mode.is_tls_first() {
149 return Self::connect_tds_8(config, tcp_stream).await;
150 }
151
152 Self::connect_tds_7x(config, tcp_stream).await
154 }
155
156 async fn connect_tds_8(config: &Config, tcp_stream: TcpStream) -> Result<Client<Ready>> {
160 tracing::debug!("using TDS 8.0 strict mode (TLS first)");
161
162 let tls_config = TlsConfig::new()
164 .strict_mode(true)
165 .trust_server_certificate(config.trust_server_certificate);
166
167 let tls_connector = TlsConnector::new(tls_config).map_err(|e| Error::Tls(e.to_string()))?;
168
169 let tls_stream = timeout(
171 config.timeouts.tls_timeout,
172 tls_connector.connect(tcp_stream, &config.host),
173 )
174 .await
175 .map_err(|_| Error::TlsTimeout)?
176 .map_err(|e| Error::Tls(e.to_string()))?;
177
178 tracing::debug!("TLS handshake completed (strict mode)");
179
180 let mut connection = Connection::new(tls_stream);
182
183 let prelogin = Self::build_prelogin(config, EncryptionLevel::Required);
185 Self::send_prelogin(&mut connection, &prelogin).await?;
186 let _prelogin_response = Self::receive_prelogin(&mut connection).await?;
187
188 let login = Self::build_login7(config);
190 Self::send_login7(&mut connection, &login).await?;
191
192 let (server_version, current_database, routing) =
194 Self::process_login_response(&mut connection).await?;
195
196 if let Some((host, port)) = routing {
198 return Err(Error::Routing { host, port });
199 }
200
201 Ok(Client {
202 config: config.clone(),
203 _state: PhantomData,
204 connection: Some(ConnectionHandle::Tls(connection)),
205 server_version,
206 current_database: current_database.clone(),
207 statement_cache: StatementCache::with_default_size(),
208 transaction_descriptor: 0, #[cfg(feature = "otel")]
210 instrumentation: InstrumentationContext::new(config.host.clone(), config.port)
211 .with_database(current_database.unwrap_or_default()),
212 })
213 }
214
215 async fn connect_tds_7x(config: &Config, mut tcp_stream: TcpStream) -> Result<Client<Ready>> {
223 use bytes::BufMut;
224 use tds_protocol::packet::{PACKET_HEADER_SIZE, PacketHeader, PacketStatus};
225 use tokio::io::{AsyncReadExt, AsyncWriteExt};
226
227 tracing::debug!("using TDS 7.x flow (PreLogin first)");
228
229 let client_encryption = if config.encrypt {
232 EncryptionLevel::On
233 } else {
234 EncryptionLevel::Off
235 };
236 let prelogin = Self::build_prelogin(config, client_encryption);
237 tracing::debug!(encryption = ?client_encryption, "sending PreLogin");
238 let prelogin_bytes = prelogin.encode();
239
240 let header = PacketHeader::new(
242 PacketType::PreLogin,
243 PacketStatus::END_OF_MESSAGE,
244 (PACKET_HEADER_SIZE + prelogin_bytes.len()) as u16,
245 );
246
247 let mut packet_buf = BytesMut::with_capacity(PACKET_HEADER_SIZE + prelogin_bytes.len());
248 header.encode(&mut packet_buf);
249 packet_buf.put_slice(&prelogin_bytes);
250
251 tcp_stream
252 .write_all(&packet_buf)
253 .await
254 .map_err(|e| Error::Io(Arc::new(e)))?;
255
256 let mut header_buf = [0u8; PACKET_HEADER_SIZE];
258 tcp_stream
259 .read_exact(&mut header_buf)
260 .await
261 .map_err(|e| Error::Io(Arc::new(e)))?;
262
263 let response_length = u16::from_be_bytes([header_buf[2], header_buf[3]]) as usize;
264 let payload_length = response_length.saturating_sub(PACKET_HEADER_SIZE);
265
266 let mut response_buf = vec![0u8; payload_length];
267 tcp_stream
268 .read_exact(&mut response_buf)
269 .await
270 .map_err(|e| Error::Io(Arc::new(e)))?;
271
272 let prelogin_response =
273 PreLogin::decode(&response_buf[..]).map_err(|e| Error::Protocol(e.to_string()))?;
274
275 let server_encryption = prelogin_response.encryption;
277 tracing::debug!(encryption = ?server_encryption, "server encryption level");
278
279 let negotiated_encryption = match (client_encryption, server_encryption) {
285 (EncryptionLevel::NotSupported, EncryptionLevel::NotSupported) => {
286 EncryptionLevel::NotSupported
287 }
288 (EncryptionLevel::Off, EncryptionLevel::Off) => EncryptionLevel::Off,
289 (EncryptionLevel::On, EncryptionLevel::Off)
290 | (EncryptionLevel::On, EncryptionLevel::NotSupported) => {
291 return Err(Error::Protocol(
292 "Server does not support requested encryption level".to_string(),
293 ));
294 }
295 _ => EncryptionLevel::On,
296 };
297
298 let use_tls = negotiated_encryption != EncryptionLevel::NotSupported;
301
302 if use_tls {
303 let tls_config =
306 TlsConfig::new().trust_server_certificate(config.trust_server_certificate);
307
308 let tls_connector =
309 TlsConnector::new(tls_config).map_err(|e| Error::Tls(e.to_string()))?;
310
311 let mut tls_stream = timeout(
313 config.timeouts.tls_timeout,
314 tls_connector.connect_with_prelogin(tcp_stream, &config.host),
315 )
316 .await
317 .map_err(|_| Error::TlsTimeout)?
318 .map_err(|e| Error::Tls(e.to_string()))?;
319
320 tracing::debug!("TLS handshake completed (PreLogin wrapped)");
321
322 let login_only_encryption = negotiated_encryption == EncryptionLevel::Off;
324
325 if login_only_encryption {
326 use tokio::io::AsyncWriteExt;
334
335 let login = Self::build_login7(config);
337 let login_payload = login.encode();
338
339 let max_packet = MAX_PACKET_SIZE;
341 let max_payload = max_packet - PACKET_HEADER_SIZE;
342 let chunks: Vec<_> = login_payload.chunks(max_payload).collect();
343 let total_chunks = chunks.len();
344
345 for (i, chunk) in chunks.into_iter().enumerate() {
346 let is_last = i == total_chunks - 1;
347 let status = if is_last {
348 PacketStatus::END_OF_MESSAGE
349 } else {
350 PacketStatus::NORMAL
351 };
352
353 let header = PacketHeader::new(
354 PacketType::Tds7Login,
355 status,
356 (PACKET_HEADER_SIZE + chunk.len()) as u16,
357 );
358
359 let mut packet_buf = BytesMut::with_capacity(PACKET_HEADER_SIZE + chunk.len());
360 header.encode(&mut packet_buf);
361 packet_buf.put_slice(chunk);
362
363 tls_stream
364 .write_all(&packet_buf)
365 .await
366 .map_err(|e| Error::Io(Arc::new(e)))?;
367 }
368
369 tls_stream
371 .flush()
372 .await
373 .map_err(|e| Error::Io(Arc::new(e)))?;
374
375 tracing::debug!("Login7 sent through TLS, switching to plaintext for response");
376
377 let (wrapper, _client_conn) = tls_stream.into_inner();
381 let tcp_stream = wrapper.into_inner();
382
383 let mut connection = Connection::new(tcp_stream);
385
386 let (server_version, current_database, routing) =
388 Self::process_login_response(&mut connection).await?;
389
390 if let Some((host, port)) = routing {
392 return Err(Error::Routing { host, port });
393 }
394
395 Ok(Client {
397 config: config.clone(),
398 _state: PhantomData,
399 connection: Some(ConnectionHandle::Plain(connection)),
400 server_version,
401 current_database: current_database.clone(),
402 statement_cache: StatementCache::with_default_size(),
403 transaction_descriptor: 0, #[cfg(feature = "otel")]
405 instrumentation: InstrumentationContext::new(config.host.clone(), config.port)
406 .with_database(current_database.unwrap_or_default()),
407 })
408 } else {
409 let mut connection = Connection::new(tls_stream);
412
413 let login = Self::build_login7(config);
415 Self::send_login7(&mut connection, &login).await?;
416
417 let (server_version, current_database, routing) =
419 Self::process_login_response(&mut connection).await?;
420
421 if let Some((host, port)) = routing {
423 return Err(Error::Routing { host, port });
424 }
425
426 Ok(Client {
427 config: config.clone(),
428 _state: PhantomData,
429 connection: Some(ConnectionHandle::TlsPrelogin(connection)),
430 server_version,
431 current_database: current_database.clone(),
432 statement_cache: StatementCache::with_default_size(),
433 transaction_descriptor: 0, #[cfg(feature = "otel")]
435 instrumentation: InstrumentationContext::new(config.host.clone(), config.port)
436 .with_database(current_database.unwrap_or_default()),
437 })
438 }
439 } else {
440 tracing::warn!(
442 "Connecting without TLS encryption. This is insecure and should only be \
443 used for development/testing on trusted networks."
444 );
445
446 let login = Self::build_login7(config);
448 let login_bytes = login.encode();
449 tracing::debug!("Login7 packet built: {} bytes", login_bytes.len(),);
450 tracing::debug!(
452 "Login7 fixed header (94 bytes): {:02X?}",
453 &login_bytes[..login_bytes.len().min(94)]
454 );
455 if login_bytes.len() > 94 {
457 tracing::debug!(
458 "Login7 variable data ({} bytes): {:02X?}",
459 login_bytes.len() - 94,
460 &login_bytes[94..]
461 );
462 }
463
464 let login_header = PacketHeader::new(
466 PacketType::Tds7Login,
467 PacketStatus::END_OF_MESSAGE,
468 (PACKET_HEADER_SIZE + login_bytes.len()) as u16,
469 )
470 .with_packet_id(1);
471 let mut login_packet_buf =
472 BytesMut::with_capacity(PACKET_HEADER_SIZE + login_bytes.len());
473 login_header.encode(&mut login_packet_buf);
474 login_packet_buf.put_slice(&login_bytes);
475
476 tracing::debug!(
477 "Sending Login7 packet: {} bytes total, header: {:02X?}",
478 login_packet_buf.len(),
479 &login_packet_buf[..PACKET_HEADER_SIZE]
480 );
481 tcp_stream
482 .write_all(&login_packet_buf)
483 .await
484 .map_err(|e| Error::Io(Arc::new(e)))?;
485 tcp_stream
486 .flush()
487 .await
488 .map_err(|e| Error::Io(Arc::new(e)))?;
489 tracing::debug!("Login7 sent and flushed over raw TCP");
490
491 let mut response_header_buf = [0u8; PACKET_HEADER_SIZE];
493 tcp_stream
494 .read_exact(&mut response_header_buf)
495 .await
496 .map_err(|e| Error::Io(Arc::new(e)))?;
497
498 let response_type = response_header_buf[0];
499 let response_length =
500 u16::from_be_bytes([response_header_buf[2], response_header_buf[3]]) as usize;
501 tracing::debug!(
502 "Response header: type={:#04X}, length={}",
503 response_type,
504 response_length
505 );
506
507 let payload_length = response_length.saturating_sub(PACKET_HEADER_SIZE);
509 let mut response_payload = vec![0u8; payload_length];
510 tcp_stream
511 .read_exact(&mut response_payload)
512 .await
513 .map_err(|e| Error::Io(Arc::new(e)))?;
514 tracing::debug!(
515 "Response payload: {} bytes, first 32: {:02X?}",
516 response_payload.len(),
517 &response_payload[..response_payload.len().min(32)]
518 );
519
520 let connection = Connection::new(tcp_stream);
522
523 let response_bytes = bytes::Bytes::from(response_payload);
525 let mut parser = TokenParser::new(response_bytes);
526 let mut server_version = None;
527 let mut current_database = None;
528 let routing = None;
529
530 while let Some(token) = parser
531 .next_token()
532 .map_err(|e| Error::Protocol(e.to_string()))?
533 {
534 match token {
535 Token::LoginAck(ack) => {
536 tracing::info!(
537 version = ack.tds_version,
538 interface = ack.interface,
539 prog_name = %ack.prog_name,
540 "login acknowledged"
541 );
542 server_version = Some(ack.tds_version);
543 }
544 Token::EnvChange(env) => {
545 Self::process_env_change(&env, &mut current_database, &mut None);
546 }
547 Token::Error(err) => {
548 return Err(Error::Server {
549 number: err.number,
550 state: err.state,
551 class: err.class,
552 message: err.message.clone(),
553 server: if err.server.is_empty() {
554 None
555 } else {
556 Some(err.server.clone())
557 },
558 procedure: if err.procedure.is_empty() {
559 None
560 } else {
561 Some(err.procedure.clone())
562 },
563 line: err.line as u32,
564 });
565 }
566 Token::Info(info) => {
567 tracing::info!(
568 number = info.number,
569 message = %info.message,
570 "server info message"
571 );
572 }
573 Token::Done(done) => {
574 if done.status.error {
575 return Err(Error::Protocol("login failed".to_string()));
576 }
577 break;
578 }
579 _ => {}
580 }
581 }
582
583 if let Some((host, port)) = routing {
585 return Err(Error::Routing { host, port });
586 }
587
588 Ok(Client {
589 config: config.clone(),
590 _state: PhantomData,
591 connection: Some(ConnectionHandle::Plain(connection)),
592 server_version,
593 current_database: current_database.clone(),
594 statement_cache: StatementCache::with_default_size(),
595 transaction_descriptor: 0, #[cfg(feature = "otel")]
597 instrumentation: InstrumentationContext::new(config.host.clone(), config.port)
598 .with_database(current_database.unwrap_or_default()),
599 })
600 }
601 }
602
603 fn build_prelogin(config: &Config, encryption: EncryptionLevel) -> PreLogin {
605 let mut prelogin = PreLogin::new().with_encryption(encryption);
606
607 if config.mars {
608 prelogin = prelogin.with_mars(true);
609 }
610
611 if let Some(ref instance) = config.instance {
612 prelogin = prelogin.with_instance(instance);
613 }
614
615 prelogin
616 }
617
618 fn build_login7(config: &Config) -> Login7 {
620 let mut login = Login7::new()
621 .with_packet_size(config.packet_size as u32)
622 .with_app_name(&config.application_name)
623 .with_server_name(&config.host)
624 .with_hostname(&config.host);
625
626 if let Some(ref database) = config.database {
627 login = login.with_database(database);
628 }
629
630 match &config.credentials {
632 mssql_auth::Credentials::SqlServer { username, password } => {
633 login = login.with_sql_auth(username.as_ref(), password.as_ref());
634 }
635 _ => {}
637 }
638
639 login
640 }
641
642 async fn send_prelogin<T>(connection: &mut Connection<T>, prelogin: &PreLogin) -> Result<()>
644 where
645 T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
646 {
647 let payload = prelogin.encode();
648 let max_packet = MAX_PACKET_SIZE;
649
650 connection
651 .send_message(PacketType::PreLogin, payload, max_packet)
652 .await
653 .map_err(|e| Error::Protocol(e.to_string()))
654 }
655
656 async fn receive_prelogin<T>(connection: &mut Connection<T>) -> Result<PreLogin>
658 where
659 T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
660 {
661 let message = connection
662 .read_message()
663 .await
664 .map_err(|e| Error::Protocol(e.to_string()))?
665 .ok_or(Error::ConnectionClosed)?;
666
667 PreLogin::decode(&message.payload[..]).map_err(|e| Error::Protocol(e.to_string()))
668 }
669
670 async fn send_login7<T>(connection: &mut Connection<T>, login: &Login7) -> Result<()>
672 where
673 T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
674 {
675 let payload = login.encode();
676 let max_packet = MAX_PACKET_SIZE;
677
678 connection
679 .send_message(PacketType::Tds7Login, payload, max_packet)
680 .await
681 .map_err(|e| Error::Protocol(e.to_string()))
682 }
683
684 async fn process_login_response<T>(
688 connection: &mut Connection<T>,
689 ) -> Result<(Option<u32>, Option<String>, Option<(String, u16)>)>
690 where
691 T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
692 {
693 let message = connection
694 .read_message()
695 .await
696 .map_err(|e| Error::Protocol(e.to_string()))?
697 .ok_or(Error::ConnectionClosed)?;
698
699 let response_bytes = message.payload;
700
701 let mut parser = TokenParser::new(response_bytes);
702 let mut server_version = None;
703 let mut database = None;
704 let mut routing = None;
705
706 while let Some(token) = parser
707 .next_token()
708 .map_err(|e| Error::Protocol(e.to_string()))?
709 {
710 match token {
711 Token::LoginAck(ack) => {
712 tracing::info!(
713 version = ack.tds_version,
714 interface = ack.interface,
715 prog_name = %ack.prog_name,
716 "login acknowledged"
717 );
718 server_version = Some(ack.tds_version);
719 }
720 Token::EnvChange(env) => {
721 Self::process_env_change(&env, &mut database, &mut routing);
722 }
723 Token::Error(err) => {
724 return Err(Error::Server {
725 number: err.number,
726 state: err.state,
727 class: err.class,
728 message: err.message.clone(),
729 server: if err.server.is_empty() {
730 None
731 } else {
732 Some(err.server.clone())
733 },
734 procedure: if err.procedure.is_empty() {
735 None
736 } else {
737 Some(err.procedure.clone())
738 },
739 line: err.line as u32,
740 });
741 }
742 Token::Info(info) => {
743 tracing::info!(
744 number = info.number,
745 message = %info.message,
746 "server info message"
747 );
748 }
749 Token::Done(done) => {
750 if done.status.error {
751 return Err(Error::Protocol("login failed".to_string()));
752 }
753 break;
754 }
755 _ => {}
756 }
757 }
758
759 Ok((server_version, database, routing))
760 }
761
762 fn process_env_change(
764 env: &EnvChange,
765 database: &mut Option<String>,
766 routing: &mut Option<(String, u16)>,
767 ) {
768 use tds_protocol::token::EnvChangeValue;
769
770 match env.env_type {
771 EnvChangeType::Database => {
772 if let EnvChangeValue::String(ref new_value) = env.new_value {
773 tracing::debug!(database = %new_value, "database changed");
774 *database = Some(new_value.clone());
775 }
776 }
777 EnvChangeType::Routing => {
778 if let EnvChangeValue::Routing { ref host, port } = env.new_value {
779 tracing::info!(host = %host, port = port, "routing redirect received");
780 *routing = Some((host.clone(), port));
781 }
782 }
783 _ => {
784 if let EnvChangeValue::String(ref new_value) = env.new_value {
785 tracing::debug!(
786 env_type = ?env.env_type,
787 new_value = %new_value,
788 "environment change"
789 );
790 }
791 }
792 }
793 }
794}
795
796impl<S: ConnectionState> Client<S> {
798 fn process_transaction_env_change(env: &EnvChange, transaction_descriptor: &mut u64) {
806 use tds_protocol::token::EnvChangeValue;
807
808 match env.env_type {
809 EnvChangeType::BeginTransaction => {
810 if let EnvChangeValue::Binary(ref data) = env.new_value {
811 if data.len() >= 8 {
812 let descriptor = u64::from_le_bytes([
813 data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
814 ]);
815 tracing::debug!(descriptor = descriptor, "transaction started via raw SQL");
816 *transaction_descriptor = descriptor;
817 }
818 }
819 }
820 EnvChangeType::CommitTransaction | EnvChangeType::RollbackTransaction => {
821 tracing::debug!(
822 env_type = ?env.env_type,
823 "transaction ended via raw SQL"
824 );
825 *transaction_descriptor = 0;
826 }
827 _ => {}
828 }
829 }
830
831 async fn send_sql_batch(&mut self, sql: &str) -> Result<()> {
837 let payload =
838 tds_protocol::encode_sql_batch_with_transaction(sql, self.transaction_descriptor);
839 let max_packet = self.config.packet_size as usize;
840
841 let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
842
843 match connection {
844 ConnectionHandle::Tls(conn) => {
845 conn.send_message(PacketType::SqlBatch, payload, max_packet)
846 .await
847 .map_err(|e| Error::Protocol(e.to_string()))?;
848 }
849 ConnectionHandle::TlsPrelogin(conn) => {
850 conn.send_message(PacketType::SqlBatch, payload, max_packet)
851 .await
852 .map_err(|e| Error::Protocol(e.to_string()))?;
853 }
854 ConnectionHandle::Plain(conn) => {
855 conn.send_message(PacketType::SqlBatch, payload, max_packet)
856 .await
857 .map_err(|e| Error::Protocol(e.to_string()))?;
858 }
859 }
860
861 Ok(())
862 }
863
864 async fn send_rpc(&mut self, rpc: &RpcRequest) -> Result<()> {
868 let payload = rpc.encode_with_transaction(self.transaction_descriptor);
869 let max_packet = self.config.packet_size as usize;
870
871 let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
872
873 match connection {
874 ConnectionHandle::Tls(conn) => {
875 conn.send_message(PacketType::Rpc, payload, max_packet)
876 .await
877 .map_err(|e| Error::Protocol(e.to_string()))?;
878 }
879 ConnectionHandle::TlsPrelogin(conn) => {
880 conn.send_message(PacketType::Rpc, payload, max_packet)
881 .await
882 .map_err(|e| Error::Protocol(e.to_string()))?;
883 }
884 ConnectionHandle::Plain(conn) => {
885 conn.send_message(PacketType::Rpc, payload, max_packet)
886 .await
887 .map_err(|e| Error::Protocol(e.to_string()))?;
888 }
889 }
890
891 Ok(())
892 }
893
894 fn convert_params(params: &[&(dyn crate::ToSql + Sync)]) -> Result<Vec<RpcParam>> {
896 use bytes::{BufMut, BytesMut};
897 use mssql_types::SqlValue;
898
899 params
900 .iter()
901 .enumerate()
902 .map(|(i, p)| {
903 let sql_value = p.to_sql()?;
904 let name = format!("@p{}", i + 1);
905
906 Ok(match sql_value {
907 SqlValue::Null => RpcParam::null(&name, RpcTypeInfo::nvarchar(1)),
908 SqlValue::Bool(v) => {
909 let mut buf = BytesMut::with_capacity(1);
910 buf.put_u8(if v { 1 } else { 0 });
911 RpcParam::new(&name, RpcTypeInfo::bit(), buf.freeze())
912 }
913 SqlValue::TinyInt(v) => {
914 let mut buf = BytesMut::with_capacity(1);
915 buf.put_u8(v);
916 RpcParam::new(&name, RpcTypeInfo::tinyint(), buf.freeze())
917 }
918 SqlValue::SmallInt(v) => {
919 let mut buf = BytesMut::with_capacity(2);
920 buf.put_i16_le(v);
921 RpcParam::new(&name, RpcTypeInfo::smallint(), buf.freeze())
922 }
923 SqlValue::Int(v) => RpcParam::int(&name, v),
924 SqlValue::BigInt(v) => RpcParam::bigint(&name, v),
925 SqlValue::Float(v) => {
926 let mut buf = BytesMut::with_capacity(4);
927 buf.put_f32_le(v);
928 RpcParam::new(&name, RpcTypeInfo::real(), buf.freeze())
929 }
930 SqlValue::Double(v) => {
931 let mut buf = BytesMut::with_capacity(8);
932 buf.put_f64_le(v);
933 RpcParam::new(&name, RpcTypeInfo::float(), buf.freeze())
934 }
935 SqlValue::String(ref s) => RpcParam::nvarchar(&name, s),
936 SqlValue::Binary(ref b) => {
937 RpcParam::new(&name, RpcTypeInfo::varbinary(b.len() as u16), b.clone())
938 }
939 SqlValue::Xml(ref s) => RpcParam::nvarchar(&name, s),
940 #[cfg(feature = "uuid")]
941 SqlValue::Uuid(u) => {
942 let bytes = u.as_bytes();
944 let mut buf = BytesMut::with_capacity(16);
945 buf.put_u32_le(u32::from_be_bytes([
947 bytes[0], bytes[1], bytes[2], bytes[3],
948 ]));
949 buf.put_u16_le(u16::from_be_bytes([bytes[4], bytes[5]]));
950 buf.put_u16_le(u16::from_be_bytes([bytes[6], bytes[7]]));
951 buf.put_slice(&bytes[8..16]);
952 RpcParam::new(&name, RpcTypeInfo::uniqueidentifier(), buf.freeze())
953 }
954 #[cfg(feature = "decimal")]
955 SqlValue::Decimal(d) => {
956 RpcParam::nvarchar(&name, &d.to_string())
958 }
959 #[cfg(feature = "chrono")]
960 SqlValue::Date(_)
961 | SqlValue::Time(_)
962 | SqlValue::DateTime(_)
963 | SqlValue::DateTimeOffset(_) => {
964 let s = match &sql_value {
967 SqlValue::Date(d) => d.to_string(),
968 SqlValue::Time(t) => t.to_string(),
969 SqlValue::DateTime(dt) => dt.to_string(),
970 SqlValue::DateTimeOffset(dto) => dto.to_rfc3339(),
971 _ => unreachable!(),
972 };
973 RpcParam::nvarchar(&name, &s)
974 }
975 #[cfg(feature = "json")]
976 SqlValue::Json(ref j) => RpcParam::nvarchar(&name, &j.to_string()),
977 SqlValue::Tvp(ref tvp_data) => {
978 Self::encode_tvp_param(&name, tvp_data)?
980 }
981 _ => {
983 return Err(Error::Type(mssql_types::TypeError::UnsupportedConversion {
984 from: sql_value.type_name().to_string(),
985 to: "RPC parameter",
986 }));
987 }
988 })
989 })
990 .collect()
991 }
992
993 fn encode_tvp_param(name: &str, tvp_data: &mssql_types::TvpData) -> Result<RpcParam> {
998 let wire_columns: Vec<TvpWireColumnDef> = tvp_data
1000 .columns
1001 .iter()
1002 .map(|col| {
1003 let wire_type = Self::convert_tvp_column_type(&col.column_type);
1004 TvpWireColumnDef {
1005 wire_type,
1006 flags: TvpColumnFlags {
1007 nullable: col.nullable,
1008 },
1009 }
1010 })
1011 .collect();
1012
1013 let encoder = TvpEncoder::new(&tvp_data.schema, &tvp_data.type_name, &wire_columns);
1015
1016 let mut buf = BytesMut::with_capacity(256);
1018
1019 encoder.encode_metadata(&mut buf);
1021
1022 for row in &tvp_data.rows {
1024 encoder.encode_row(&mut buf, |row_buf| {
1025 for (col_idx, value) in row.iter().enumerate() {
1026 let wire_type = &wire_columns[col_idx].wire_type;
1027 Self::encode_tvp_value(value, wire_type, row_buf);
1028 }
1029 });
1030 }
1031
1032 encoder.encode_end(&mut buf);
1034
1035 let type_info = RpcTypeInfo {
1039 type_id: 0xF3, max_length: None,
1041 precision: None,
1042 scale: None,
1043 collation: None,
1044 };
1045
1046 Ok(RpcParam {
1047 name: name.to_string(),
1048 flags: tds_protocol::rpc::ParamFlags::default(),
1049 type_info,
1050 value: Some(buf.freeze()),
1051 })
1052 }
1053
1054 fn convert_tvp_column_type(col_type: &mssql_types::TvpColumnType) -> TvpWireType {
1056 match col_type {
1057 mssql_types::TvpColumnType::Bit => TvpWireType::Bit,
1058 mssql_types::TvpColumnType::TinyInt => TvpWireType::Int { size: 1 },
1059 mssql_types::TvpColumnType::SmallInt => TvpWireType::Int { size: 2 },
1060 mssql_types::TvpColumnType::Int => TvpWireType::Int { size: 4 },
1061 mssql_types::TvpColumnType::BigInt => TvpWireType::Int { size: 8 },
1062 mssql_types::TvpColumnType::Real => TvpWireType::Float { size: 4 },
1063 mssql_types::TvpColumnType::Float => TvpWireType::Float { size: 8 },
1064 mssql_types::TvpColumnType::Decimal { precision, scale } => TvpWireType::Decimal {
1065 precision: *precision,
1066 scale: *scale,
1067 },
1068 mssql_types::TvpColumnType::NVarChar { max_length } => TvpWireType::NVarChar {
1069 max_length: *max_length,
1070 },
1071 mssql_types::TvpColumnType::VarChar { max_length } => TvpWireType::VarChar {
1072 max_length: *max_length,
1073 },
1074 mssql_types::TvpColumnType::VarBinary { max_length } => TvpWireType::VarBinary {
1075 max_length: *max_length,
1076 },
1077 mssql_types::TvpColumnType::UniqueIdentifier => TvpWireType::Guid,
1078 mssql_types::TvpColumnType::Date => TvpWireType::Date,
1079 mssql_types::TvpColumnType::Time { scale } => TvpWireType::Time { scale: *scale },
1080 mssql_types::TvpColumnType::DateTime2 { scale } => {
1081 TvpWireType::DateTime2 { scale: *scale }
1082 }
1083 mssql_types::TvpColumnType::DateTimeOffset { scale } => {
1084 TvpWireType::DateTimeOffset { scale: *scale }
1085 }
1086 mssql_types::TvpColumnType::Xml => TvpWireType::Xml,
1087 }
1088 }
1089
1090 fn encode_tvp_value(
1092 value: &mssql_types::SqlValue,
1093 wire_type: &TvpWireType,
1094 buf: &mut BytesMut,
1095 ) {
1096 use mssql_types::SqlValue;
1097
1098 match value {
1099 SqlValue::Null => {
1100 encode_tvp_null(wire_type, buf);
1101 }
1102 SqlValue::Bool(v) => {
1103 encode_tvp_bit(*v, buf);
1104 }
1105 SqlValue::TinyInt(v) => {
1106 encode_tvp_int(*v as i64, 1, buf);
1107 }
1108 SqlValue::SmallInt(v) => {
1109 encode_tvp_int(*v as i64, 2, buf);
1110 }
1111 SqlValue::Int(v) => {
1112 encode_tvp_int(*v as i64, 4, buf);
1113 }
1114 SqlValue::BigInt(v) => {
1115 encode_tvp_int(*v, 8, buf);
1116 }
1117 SqlValue::Float(v) => {
1118 encode_tvp_float(*v as f64, 4, buf);
1119 }
1120 SqlValue::Double(v) => {
1121 encode_tvp_float(*v, 8, buf);
1122 }
1123 SqlValue::String(s) => {
1124 let max_len = match wire_type {
1125 TvpWireType::NVarChar { max_length } => *max_length,
1126 _ => 4000,
1127 };
1128 encode_tvp_nvarchar(s, max_len, buf);
1129 }
1130 SqlValue::Binary(b) => {
1131 let max_len = match wire_type {
1132 TvpWireType::VarBinary { max_length } => *max_length,
1133 _ => 8000,
1134 };
1135 encode_tvp_varbinary(b, max_len, buf);
1136 }
1137 #[cfg(feature = "decimal")]
1138 SqlValue::Decimal(d) => {
1139 let sign = if d.is_sign_negative() { 0u8 } else { 1u8 };
1140 let mantissa = d.mantissa().unsigned_abs();
1141 encode_tvp_decimal(sign, mantissa, buf);
1142 }
1143 #[cfg(feature = "uuid")]
1144 SqlValue::Uuid(u) => {
1145 let bytes = u.as_bytes();
1146 tds_protocol::tvp::encode_tvp_guid(bytes, buf);
1147 }
1148 #[cfg(feature = "chrono")]
1149 SqlValue::Date(d) => {
1150 let base = chrono::NaiveDate::from_ymd_opt(1, 1, 1).unwrap();
1152 let days = d.signed_duration_since(base).num_days() as u32;
1153 tds_protocol::tvp::encode_tvp_date(days, buf);
1154 }
1155 #[cfg(feature = "chrono")]
1156 SqlValue::Time(t) => {
1157 use chrono::Timelike;
1158 let nanos =
1159 t.num_seconds_from_midnight() as u64 * 1_000_000_000 + t.nanosecond() as u64;
1160 let intervals = nanos / 100;
1161 let scale = match wire_type {
1162 TvpWireType::Time { scale } => *scale,
1163 _ => 7,
1164 };
1165 tds_protocol::tvp::encode_tvp_time(intervals, scale, buf);
1166 }
1167 #[cfg(feature = "chrono")]
1168 SqlValue::DateTime(dt) => {
1169 use chrono::Timelike;
1170 let nanos = dt.time().num_seconds_from_midnight() as u64 * 1_000_000_000
1172 + dt.time().nanosecond() as u64;
1173 let intervals = nanos / 100;
1174 let base = chrono::NaiveDate::from_ymd_opt(1, 1, 1).unwrap();
1176 let days = dt.date().signed_duration_since(base).num_days() as u32;
1177 let scale = match wire_type {
1178 TvpWireType::DateTime2 { scale } => *scale,
1179 _ => 7,
1180 };
1181 tds_protocol::tvp::encode_tvp_datetime2(intervals, days, scale, buf);
1182 }
1183 #[cfg(feature = "chrono")]
1184 SqlValue::DateTimeOffset(dto) => {
1185 use chrono::{Offset, Timelike};
1186 let nanos = dto.time().num_seconds_from_midnight() as u64 * 1_000_000_000
1188 + dto.time().nanosecond() as u64;
1189 let intervals = nanos / 100;
1190 let base = chrono::NaiveDate::from_ymd_opt(1, 1, 1).unwrap();
1192 let days = dto.date_naive().signed_duration_since(base).num_days() as u32;
1193 let offset_minutes = (dto.offset().fix().local_minus_utc() / 60) as i16;
1195 let scale = match wire_type {
1196 TvpWireType::DateTimeOffset { scale } => *scale,
1197 _ => 7,
1198 };
1199 tds_protocol::tvp::encode_tvp_datetimeoffset(
1200 intervals,
1201 days,
1202 offset_minutes,
1203 scale,
1204 buf,
1205 );
1206 }
1207 #[cfg(feature = "json")]
1208 SqlValue::Json(j) => {
1209 encode_tvp_nvarchar(&j.to_string(), 0xFFFF, buf);
1211 }
1212 SqlValue::Xml(s) => {
1213 encode_tvp_nvarchar(s, 0xFFFF, buf);
1215 }
1216 SqlValue::Tvp(_) => {
1217 encode_tvp_null(wire_type, buf);
1219 }
1220 _ => {
1222 encode_tvp_null(wire_type, buf);
1223 }
1224 }
1225 }
1226
1227 async fn read_query_response(
1229 &mut self,
1230 ) -> Result<(Vec<crate::row::Column>, Vec<crate::row::Row>)> {
1231 let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
1232
1233 let message = match connection {
1234 ConnectionHandle::Tls(conn) => conn
1235 .read_message()
1236 .await
1237 .map_err(|e| Error::Protocol(e.to_string()))?,
1238 ConnectionHandle::TlsPrelogin(conn) => conn
1239 .read_message()
1240 .await
1241 .map_err(|e| Error::Protocol(e.to_string()))?,
1242 ConnectionHandle::Plain(conn) => conn
1243 .read_message()
1244 .await
1245 .map_err(|e| Error::Protocol(e.to_string()))?,
1246 }
1247 .ok_or(Error::ConnectionClosed)?;
1248
1249 let mut parser = TokenParser::new(message.payload);
1250 let mut columns: Vec<crate::row::Column> = Vec::new();
1251 let mut rows: Vec<crate::row::Row> = Vec::new();
1252 let mut protocol_metadata: Option<ColMetaData> = None;
1253
1254 loop {
1255 let token = parser
1257 .next_token_with_metadata(protocol_metadata.as_ref())
1258 .map_err(|e| Error::Protocol(e.to_string()))?;
1259
1260 let Some(token) = token else {
1261 break;
1262 };
1263
1264 match token {
1265 Token::ColMetaData(meta) => {
1266 rows.clear();
1269
1270 columns = meta
1271 .columns
1272 .iter()
1273 .enumerate()
1274 .map(|(i, col)| {
1275 let type_name = format!("{:?}", col.type_id);
1276 let mut column = crate::row::Column::new(&col.name, i, type_name)
1277 .with_nullable(col.flags & 0x01 != 0);
1278
1279 if let Some(max_len) = col.type_info.max_length {
1280 column = column.with_max_length(max_len);
1281 }
1282 if let (Some(prec), Some(scale)) =
1283 (col.type_info.precision, col.type_info.scale)
1284 {
1285 column = column.with_precision_scale(prec, scale);
1286 }
1287 column
1288 })
1289 .collect();
1290
1291 tracing::debug!(columns = columns.len(), "received column metadata");
1292 protocol_metadata = Some(meta);
1293 }
1294 Token::Row(raw_row) => {
1295 if let Some(ref meta) = protocol_metadata {
1296 let row = Self::convert_raw_row(&raw_row, meta, &columns)?;
1297 rows.push(row);
1298 }
1299 }
1300 Token::NbcRow(nbc_row) => {
1301 if let Some(ref meta) = protocol_metadata {
1302 let row = Self::convert_nbc_row(&nbc_row, meta, &columns)?;
1303 rows.push(row);
1304 }
1305 }
1306 Token::Error(err) => {
1307 return Err(Error::Server {
1308 number: err.number,
1309 state: err.state,
1310 class: err.class,
1311 message: err.message.clone(),
1312 server: if err.server.is_empty() {
1313 None
1314 } else {
1315 Some(err.server.clone())
1316 },
1317 procedure: if err.procedure.is_empty() {
1318 None
1319 } else {
1320 Some(err.procedure.clone())
1321 },
1322 line: err.line as u32,
1323 });
1324 }
1325 Token::Done(done) => {
1326 if done.status.error {
1327 return Err(Error::Query("query failed".to_string()));
1328 }
1329 tracing::debug!(
1330 row_count = done.row_count,
1331 has_more = done.status.more,
1332 "query complete"
1333 );
1334 if !done.status.more {
1337 break;
1338 }
1339 }
1340 Token::DoneProc(done) => {
1341 if done.status.error {
1342 return Err(Error::Query("query failed".to_string()));
1343 }
1344 }
1345 Token::DoneInProc(done) => {
1346 if done.status.error {
1347 return Err(Error::Query("query failed".to_string()));
1348 }
1349 }
1350 Token::Info(info) => {
1351 tracing::debug!(
1352 number = info.number,
1353 message = %info.message,
1354 "server info message"
1355 );
1356 }
1357 Token::EnvChange(env) => {
1358 Self::process_transaction_env_change(&env, &mut self.transaction_descriptor);
1362 }
1363 _ => {}
1364 }
1365 }
1366
1367 tracing::debug!(
1368 columns = columns.len(),
1369 rows = rows.len(),
1370 "query response parsed"
1371 );
1372 Ok((columns, rows))
1373 }
1374
1375 fn convert_raw_row(
1379 raw: &RawRow,
1380 meta: &ColMetaData,
1381 columns: &[crate::row::Column],
1382 ) -> Result<crate::row::Row> {
1383 let mut values = Vec::with_capacity(meta.columns.len());
1384 let mut buf = raw.data.as_ref();
1385
1386 for col in &meta.columns {
1387 let value = Self::parse_column_value(&mut buf, col)?;
1388 values.push(value);
1389 }
1390
1391 Ok(crate::row::Row::from_values(columns.to_vec(), values))
1392 }
1393
1394 fn convert_nbc_row(
1398 nbc: &NbcRow,
1399 meta: &ColMetaData,
1400 columns: &[crate::row::Column],
1401 ) -> Result<crate::row::Row> {
1402 let mut values = Vec::with_capacity(meta.columns.len());
1403 let mut buf = nbc.data.as_ref();
1404
1405 for (i, col) in meta.columns.iter().enumerate() {
1406 if nbc.is_null(i) {
1407 values.push(mssql_types::SqlValue::Null);
1408 } else {
1409 let value = Self::parse_column_value(&mut buf, col)?;
1410 values.push(value);
1411 }
1412 }
1413
1414 Ok(crate::row::Row::from_values(columns.to_vec(), values))
1415 }
1416
1417 fn parse_column_value(buf: &mut &[u8], col: &ColumnData) -> Result<mssql_types::SqlValue> {
1419 use bytes::Buf;
1420 use mssql_types::SqlValue;
1421 use tds_protocol::types::TypeId;
1422
1423 let value = match col.type_id {
1424 TypeId::Null => SqlValue::Null,
1426
1427 TypeId::Int1 => {
1429 if buf.remaining() < 1 {
1430 return Err(Error::Protocol("unexpected EOF reading TINYINT".into()));
1431 }
1432 SqlValue::TinyInt(buf.get_u8())
1433 }
1434 TypeId::Bit => {
1435 if buf.remaining() < 1 {
1436 return Err(Error::Protocol("unexpected EOF reading BIT".into()));
1437 }
1438 SqlValue::Bool(buf.get_u8() != 0)
1439 }
1440
1441 TypeId::Int2 => {
1443 if buf.remaining() < 2 {
1444 return Err(Error::Protocol("unexpected EOF reading SMALLINT".into()));
1445 }
1446 SqlValue::SmallInt(buf.get_i16_le())
1447 }
1448
1449 TypeId::Int4 => {
1451 if buf.remaining() < 4 {
1452 return Err(Error::Protocol("unexpected EOF reading INT".into()));
1453 }
1454 SqlValue::Int(buf.get_i32_le())
1455 }
1456 TypeId::Float4 => {
1457 if buf.remaining() < 4 {
1458 return Err(Error::Protocol("unexpected EOF reading REAL".into()));
1459 }
1460 SqlValue::Float(buf.get_f32_le())
1461 }
1462
1463 TypeId::Int8 => {
1465 if buf.remaining() < 8 {
1466 return Err(Error::Protocol("unexpected EOF reading BIGINT".into()));
1467 }
1468 SqlValue::BigInt(buf.get_i64_le())
1469 }
1470 TypeId::Float8 => {
1471 if buf.remaining() < 8 {
1472 return Err(Error::Protocol("unexpected EOF reading FLOAT".into()));
1473 }
1474 SqlValue::Double(buf.get_f64_le())
1475 }
1476 TypeId::Money => {
1477 if buf.remaining() < 8 {
1478 return Err(Error::Protocol("unexpected EOF reading MONEY".into()));
1479 }
1480 let high = buf.get_i32_le();
1482 let low = buf.get_u32_le();
1483 let cents = ((high as i64) << 32) | (low as i64);
1484 let value = (cents as f64) / 10000.0;
1485 SqlValue::Double(value)
1486 }
1487 TypeId::Money4 => {
1488 if buf.remaining() < 4 {
1489 return Err(Error::Protocol("unexpected EOF reading SMALLMONEY".into()));
1490 }
1491 let cents = buf.get_i32_le();
1492 let value = (cents as f64) / 10000.0;
1493 SqlValue::Double(value)
1494 }
1495
1496 TypeId::IntN => {
1498 if buf.remaining() < 1 {
1499 return Err(Error::Protocol("unexpected EOF reading IntN length".into()));
1500 }
1501 let len = buf.get_u8();
1502 match len {
1503 0 => SqlValue::Null,
1504 1 => SqlValue::TinyInt(buf.get_u8()),
1505 2 => SqlValue::SmallInt(buf.get_i16_le()),
1506 4 => SqlValue::Int(buf.get_i32_le()),
1507 8 => SqlValue::BigInt(buf.get_i64_le()),
1508 _ => {
1509 return Err(Error::Protocol(format!("invalid IntN length: {len}")));
1510 }
1511 }
1512 }
1513 TypeId::FloatN => {
1514 if buf.remaining() < 1 {
1515 return Err(Error::Protocol(
1516 "unexpected EOF reading FloatN length".into(),
1517 ));
1518 }
1519 let len = buf.get_u8();
1520 match len {
1521 0 => SqlValue::Null,
1522 4 => SqlValue::Float(buf.get_f32_le()),
1523 8 => SqlValue::Double(buf.get_f64_le()),
1524 _ => {
1525 return Err(Error::Protocol(format!("invalid FloatN length: {len}")));
1526 }
1527 }
1528 }
1529 TypeId::BitN => {
1530 if buf.remaining() < 1 {
1531 return Err(Error::Protocol("unexpected EOF reading BitN length".into()));
1532 }
1533 let len = buf.get_u8();
1534 match len {
1535 0 => SqlValue::Null,
1536 1 => SqlValue::Bool(buf.get_u8() != 0),
1537 _ => {
1538 return Err(Error::Protocol(format!("invalid BitN length: {len}")));
1539 }
1540 }
1541 }
1542 TypeId::MoneyN => {
1543 if buf.remaining() < 1 {
1544 return Err(Error::Protocol(
1545 "unexpected EOF reading MoneyN length".into(),
1546 ));
1547 }
1548 let len = buf.get_u8();
1549 match len {
1550 0 => SqlValue::Null,
1551 4 => {
1552 let cents = buf.get_i32_le();
1553 SqlValue::Double((cents as f64) / 10000.0)
1554 }
1555 8 => {
1556 let high = buf.get_i32_le();
1557 let low = buf.get_u32_le();
1558 let cents = ((high as i64) << 32) | (low as i64);
1559 SqlValue::Double((cents as f64) / 10000.0)
1560 }
1561 _ => {
1562 return Err(Error::Protocol(format!("invalid MoneyN length: {len}")));
1563 }
1564 }
1565 }
1566 TypeId::Decimal | TypeId::Numeric | TypeId::DecimalN | TypeId::NumericN => {
1568 if buf.remaining() < 1 {
1569 return Err(Error::Protocol(
1570 "unexpected EOF reading DECIMAL/NUMERIC length".into(),
1571 ));
1572 }
1573 let len = buf.get_u8() as usize;
1574 if len == 0 {
1575 SqlValue::Null
1576 } else {
1577 if buf.remaining() < len {
1578 return Err(Error::Protocol(
1579 "unexpected EOF reading DECIMAL/NUMERIC data".into(),
1580 ));
1581 }
1582
1583 let sign = buf.get_u8();
1585 let mantissa_len = len - 1;
1586
1587 let mut mantissa_bytes = [0u8; 16];
1589 for i in 0..mantissa_len.min(16) {
1590 mantissa_bytes[i] = buf.get_u8();
1591 }
1592 for _ in 16..mantissa_len {
1594 buf.get_u8();
1595 }
1596
1597 let mantissa = u128::from_le_bytes(mantissa_bytes);
1598 let scale = col.type_info.scale.unwrap_or(0) as u32;
1599
1600 #[cfg(feature = "decimal")]
1601 {
1602 use rust_decimal::Decimal;
1603 if scale > 28 {
1606 let divisor = 10f64.powi(scale as i32);
1608 let value = (mantissa as f64) / divisor;
1609 let value = if sign == 0 { -value } else { value };
1610 SqlValue::Double(value)
1611 } else {
1612 let mut decimal =
1613 Decimal::from_i128_with_scale(mantissa as i128, scale);
1614 if sign == 0 {
1615 decimal.set_sign_negative(true);
1616 }
1617 SqlValue::Decimal(decimal)
1618 }
1619 }
1620
1621 #[cfg(not(feature = "decimal"))]
1622 {
1623 let divisor = 10f64.powi(scale as i32);
1625 let value = (mantissa as f64) / divisor;
1626 let value = if sign == 0 { -value } else { value };
1627 SqlValue::Double(value)
1628 }
1629 }
1630 }
1631
1632 TypeId::DateTimeN => {
1634 if buf.remaining() < 1 {
1635 return Err(Error::Protocol(
1636 "unexpected EOF reading DateTimeN length".into(),
1637 ));
1638 }
1639 let len = buf.get_u8() as usize;
1640 if len == 0 {
1641 SqlValue::Null
1642 } else if buf.remaining() < len {
1643 return Err(Error::Protocol("unexpected EOF reading DateTimeN".into()));
1644 } else {
1645 match len {
1646 4 => {
1647 let days = buf.get_u16_le() as i64;
1649 let minutes = buf.get_u16_le() as u32;
1650 #[cfg(feature = "chrono")]
1651 {
1652 let base = chrono::NaiveDate::from_ymd_opt(1900, 1, 1).unwrap();
1653 let date = base + chrono::Duration::days(days);
1654 let time = chrono::NaiveTime::from_num_seconds_from_midnight_opt(
1655 minutes * 60,
1656 0,
1657 )
1658 .unwrap();
1659 SqlValue::DateTime(date.and_time(time))
1660 }
1661 #[cfg(not(feature = "chrono"))]
1662 {
1663 SqlValue::String(format!("SMALLDATETIME({days},{minutes})"))
1664 }
1665 }
1666 8 => {
1667 let days = buf.get_i32_le() as i64;
1669 let time_300ths = buf.get_u32_le() as u64;
1670 #[cfg(feature = "chrono")]
1671 {
1672 let base = chrono::NaiveDate::from_ymd_opt(1900, 1, 1).unwrap();
1673 let date = base + chrono::Duration::days(days);
1674 let total_ms = (time_300ths * 1000) / 300;
1676 let secs = (total_ms / 1000) as u32;
1677 let nanos = ((total_ms % 1000) * 1_000_000) as u32;
1678 let time = chrono::NaiveTime::from_num_seconds_from_midnight_opt(
1679 secs, nanos,
1680 )
1681 .unwrap();
1682 SqlValue::DateTime(date.and_time(time))
1683 }
1684 #[cfg(not(feature = "chrono"))]
1685 {
1686 SqlValue::String(format!("DATETIME({days},{time_300ths})"))
1687 }
1688 }
1689 _ => {
1690 return Err(Error::Protocol(format!(
1691 "invalid DateTimeN length: {len}"
1692 )));
1693 }
1694 }
1695 }
1696 }
1697
1698 TypeId::DateTime => {
1700 if buf.remaining() < 8 {
1701 return Err(Error::Protocol("unexpected EOF reading DATETIME".into()));
1702 }
1703 let days = buf.get_i32_le() as i64;
1704 let time_300ths = buf.get_u32_le() as u64;
1705 #[cfg(feature = "chrono")]
1706 {
1707 let base = chrono::NaiveDate::from_ymd_opt(1900, 1, 1).unwrap();
1708 let date = base + chrono::Duration::days(days);
1709 let total_ms = (time_300ths * 1000) / 300;
1710 let secs = (total_ms / 1000) as u32;
1711 let nanos = ((total_ms % 1000) * 1_000_000) as u32;
1712 let time =
1713 chrono::NaiveTime::from_num_seconds_from_midnight_opt(secs, nanos).unwrap();
1714 SqlValue::DateTime(date.and_time(time))
1715 }
1716 #[cfg(not(feature = "chrono"))]
1717 {
1718 SqlValue::String(format!("DATETIME({days},{time_300ths})"))
1719 }
1720 }
1721
1722 TypeId::DateTime4 => {
1724 if buf.remaining() < 4 {
1725 return Err(Error::Protocol(
1726 "unexpected EOF reading SMALLDATETIME".into(),
1727 ));
1728 }
1729 let days = buf.get_u16_le() as i64;
1730 let minutes = buf.get_u16_le() as u32;
1731 #[cfg(feature = "chrono")]
1732 {
1733 let base = chrono::NaiveDate::from_ymd_opt(1900, 1, 1).unwrap();
1734 let date = base + chrono::Duration::days(days);
1735 let time =
1736 chrono::NaiveTime::from_num_seconds_from_midnight_opt(minutes * 60, 0)
1737 .unwrap();
1738 SqlValue::DateTime(date.and_time(time))
1739 }
1740 #[cfg(not(feature = "chrono"))]
1741 {
1742 SqlValue::String(format!("SMALLDATETIME({days},{minutes})"))
1743 }
1744 }
1745
1746 TypeId::Date => {
1748 if buf.remaining() < 1 {
1749 return Err(Error::Protocol("unexpected EOF reading DATE length".into()));
1750 }
1751 let len = buf.get_u8() as usize;
1752 if len == 0 {
1753 SqlValue::Null
1754 } else if len != 3 {
1755 return Err(Error::Protocol(format!("invalid DATE length: {len}")));
1756 } else if buf.remaining() < 3 {
1757 return Err(Error::Protocol("unexpected EOF reading DATE".into()));
1758 } else {
1759 let days = buf.get_u8() as u32
1761 | ((buf.get_u8() as u32) << 8)
1762 | ((buf.get_u8() as u32) << 16);
1763 #[cfg(feature = "chrono")]
1764 {
1765 let base = chrono::NaiveDate::from_ymd_opt(1, 1, 1).unwrap();
1766 let date = base + chrono::Duration::days(days as i64);
1767 SqlValue::Date(date)
1768 }
1769 #[cfg(not(feature = "chrono"))]
1770 {
1771 SqlValue::String(format!("DATE({days})"))
1772 }
1773 }
1774 }
1775
1776 TypeId::Time => {
1778 if buf.remaining() < 1 {
1779 return Err(Error::Protocol("unexpected EOF reading TIME length".into()));
1780 }
1781 let len = buf.get_u8() as usize;
1782 if len == 0 {
1783 SqlValue::Null
1784 } else if buf.remaining() < len {
1785 return Err(Error::Protocol("unexpected EOF reading TIME".into()));
1786 } else {
1787 let scale = col.type_info.scale.unwrap_or(7);
1788 let mut time_bytes = [0u8; 8];
1789 for byte in time_bytes.iter_mut().take(len) {
1790 *byte = buf.get_u8();
1791 }
1792 let intervals = u64::from_le_bytes(time_bytes);
1793 #[cfg(feature = "chrono")]
1794 {
1795 let time = Self::intervals_to_time(intervals, scale);
1796 SqlValue::Time(time)
1797 }
1798 #[cfg(not(feature = "chrono"))]
1799 {
1800 SqlValue::String(format!("TIME({intervals})"))
1801 }
1802 }
1803 }
1804
1805 TypeId::DateTime2 => {
1807 if buf.remaining() < 1 {
1808 return Err(Error::Protocol(
1809 "unexpected EOF reading DATETIME2 length".into(),
1810 ));
1811 }
1812 let len = buf.get_u8() as usize;
1813 if len == 0 {
1814 SqlValue::Null
1815 } else if buf.remaining() < len {
1816 return Err(Error::Protocol("unexpected EOF reading DATETIME2".into()));
1817 } else {
1818 let scale = col.type_info.scale.unwrap_or(7);
1819 let time_len = Self::time_bytes_for_scale(scale);
1820
1821 let mut time_bytes = [0u8; 8];
1823 for byte in time_bytes.iter_mut().take(time_len) {
1824 *byte = buf.get_u8();
1825 }
1826 let intervals = u64::from_le_bytes(time_bytes);
1827
1828 let days = buf.get_u8() as u32
1830 | ((buf.get_u8() as u32) << 8)
1831 | ((buf.get_u8() as u32) << 16);
1832
1833 #[cfg(feature = "chrono")]
1834 {
1835 let base = chrono::NaiveDate::from_ymd_opt(1, 1, 1).unwrap();
1836 let date = base + chrono::Duration::days(days as i64);
1837 let time = Self::intervals_to_time(intervals, scale);
1838 SqlValue::DateTime(date.and_time(time))
1839 }
1840 #[cfg(not(feature = "chrono"))]
1841 {
1842 SqlValue::String(format!("DATETIME2({days},{intervals})"))
1843 }
1844 }
1845 }
1846
1847 TypeId::DateTimeOffset => {
1849 if buf.remaining() < 1 {
1850 return Err(Error::Protocol(
1851 "unexpected EOF reading DATETIMEOFFSET length".into(),
1852 ));
1853 }
1854 let len = buf.get_u8() as usize;
1855 if len == 0 {
1856 SqlValue::Null
1857 } else if buf.remaining() < len {
1858 return Err(Error::Protocol(
1859 "unexpected EOF reading DATETIMEOFFSET".into(),
1860 ));
1861 } else {
1862 let scale = col.type_info.scale.unwrap_or(7);
1863 let time_len = Self::time_bytes_for_scale(scale);
1864
1865 let mut time_bytes = [0u8; 8];
1867 for byte in time_bytes.iter_mut().take(time_len) {
1868 *byte = buf.get_u8();
1869 }
1870 let intervals = u64::from_le_bytes(time_bytes);
1871
1872 let days = buf.get_u8() as u32
1874 | ((buf.get_u8() as u32) << 8)
1875 | ((buf.get_u8() as u32) << 16);
1876
1877 let offset_minutes = buf.get_i16_le();
1879
1880 #[cfg(feature = "chrono")]
1881 {
1882 use chrono::TimeZone;
1883 let base = chrono::NaiveDate::from_ymd_opt(1, 1, 1).unwrap();
1884 let date = base + chrono::Duration::days(days as i64);
1885 let time = Self::intervals_to_time(intervals, scale);
1886 let offset = chrono::FixedOffset::east_opt((offset_minutes as i32) * 60)
1887 .unwrap_or_else(|| chrono::FixedOffset::east_opt(0).unwrap());
1888 let datetime = offset
1889 .from_local_datetime(&date.and_time(time))
1890 .single()
1891 .unwrap_or_else(|| offset.from_utc_datetime(&date.and_time(time)));
1892 SqlValue::DateTimeOffset(datetime)
1893 }
1894 #[cfg(not(feature = "chrono"))]
1895 {
1896 SqlValue::String(format!(
1897 "DATETIMEOFFSET({days},{intervals},{offset_minutes})"
1898 ))
1899 }
1900 }
1901 }
1902
1903 TypeId::Text => Self::parse_plp_varchar(buf)?,
1905
1906 TypeId::Char | TypeId::VarChar => {
1908 if buf.remaining() < 1 {
1909 return Err(Error::Protocol(
1910 "unexpected EOF reading legacy varchar length".into(),
1911 ));
1912 }
1913 let len = buf.get_u8();
1914 if len == 0xFF {
1915 SqlValue::Null
1916 } else if len == 0 {
1917 SqlValue::String(String::new())
1918 } else if buf.remaining() < len as usize {
1919 return Err(Error::Protocol(
1920 "unexpected EOF reading legacy varchar data".into(),
1921 ));
1922 } else {
1923 let data = &buf[..len as usize];
1924 let s = String::from_utf8_lossy(data).into_owned();
1925 buf.advance(len as usize);
1926 SqlValue::String(s)
1927 }
1928 }
1929
1930 TypeId::BigVarChar | TypeId::BigChar => {
1932 if col.type_info.max_length == Some(0xFFFF) {
1934 Self::parse_plp_varchar(buf)?
1936 } else {
1937 if buf.remaining() < 2 {
1939 return Err(Error::Protocol(
1940 "unexpected EOF reading varchar length".into(),
1941 ));
1942 }
1943 let len = buf.get_u16_le();
1944 if len == 0xFFFF {
1945 SqlValue::Null
1946 } else if buf.remaining() < len as usize {
1947 return Err(Error::Protocol(
1948 "unexpected EOF reading varchar data".into(),
1949 ));
1950 } else {
1951 let data = &buf[..len as usize];
1952 let s = String::from_utf8_lossy(data).into_owned();
1953 buf.advance(len as usize);
1954 SqlValue::String(s)
1955 }
1956 }
1957 }
1958
1959 TypeId::NText => Self::parse_plp_nvarchar(buf)?,
1961
1962 TypeId::NVarChar | TypeId::NChar => {
1964 if col.type_info.max_length == Some(0xFFFF) {
1966 Self::parse_plp_nvarchar(buf)?
1968 } else {
1969 if buf.remaining() < 2 {
1971 return Err(Error::Protocol(
1972 "unexpected EOF reading nvarchar length".into(),
1973 ));
1974 }
1975 let len = buf.get_u16_le();
1976 if len == 0xFFFF {
1977 SqlValue::Null
1978 } else if buf.remaining() < len as usize {
1979 return Err(Error::Protocol(
1980 "unexpected EOF reading nvarchar data".into(),
1981 ));
1982 } else {
1983 let data = &buf[..len as usize];
1984 let utf16: Vec<u16> = data
1986 .chunks_exact(2)
1987 .map(|chunk| u16::from_le_bytes([chunk[0], chunk[1]]))
1988 .collect();
1989 let s = String::from_utf16(&utf16)
1990 .map_err(|_| Error::Protocol("invalid UTF-16 in nvarchar".into()))?;
1991 buf.advance(len as usize);
1992 SqlValue::String(s)
1993 }
1994 }
1995 }
1996
1997 TypeId::Image => Self::parse_plp_varbinary(buf)?,
1999
2000 TypeId::Binary | TypeId::VarBinary => {
2002 if buf.remaining() < 1 {
2003 return Err(Error::Protocol(
2004 "unexpected EOF reading legacy varbinary length".into(),
2005 ));
2006 }
2007 let len = buf.get_u8();
2008 if len == 0xFF {
2009 SqlValue::Null
2010 } else if len == 0 {
2011 SqlValue::Binary(bytes::Bytes::new())
2012 } else if buf.remaining() < len as usize {
2013 return Err(Error::Protocol(
2014 "unexpected EOF reading legacy varbinary data".into(),
2015 ));
2016 } else {
2017 let data = bytes::Bytes::copy_from_slice(&buf[..len as usize]);
2018 buf.advance(len as usize);
2019 SqlValue::Binary(data)
2020 }
2021 }
2022
2023 TypeId::BigVarBinary | TypeId::BigBinary => {
2025 if col.type_info.max_length == Some(0xFFFF) {
2027 Self::parse_plp_varbinary(buf)?
2029 } else {
2030 if buf.remaining() < 2 {
2031 return Err(Error::Protocol(
2032 "unexpected EOF reading varbinary length".into(),
2033 ));
2034 }
2035 let len = buf.get_u16_le();
2036 if len == 0xFFFF {
2037 SqlValue::Null
2038 } else if buf.remaining() < len as usize {
2039 return Err(Error::Protocol(
2040 "unexpected EOF reading varbinary data".into(),
2041 ));
2042 } else {
2043 let data = bytes::Bytes::copy_from_slice(&buf[..len as usize]);
2044 buf.advance(len as usize);
2045 SqlValue::Binary(data)
2046 }
2047 }
2048 }
2049
2050 TypeId::Xml => {
2052 match Self::parse_plp_nvarchar(buf)? {
2054 SqlValue::Null => SqlValue::Null,
2055 SqlValue::String(s) => SqlValue::Xml(s),
2056 _ => {
2057 return Err(Error::Protocol(
2058 "unexpected value type when parsing XML".into(),
2059 ));
2060 }
2061 }
2062 }
2063
2064 TypeId::Guid => {
2066 if buf.remaining() < 1 {
2067 return Err(Error::Protocol("unexpected EOF reading GUID length".into()));
2068 }
2069 let len = buf.get_u8();
2070 if len == 0 {
2071 SqlValue::Null
2072 } else if len != 16 {
2073 return Err(Error::Protocol(format!("invalid GUID length: {len}")));
2074 } else if buf.remaining() < 16 {
2075 return Err(Error::Protocol("unexpected EOF reading GUID".into()));
2076 } else {
2077 let data = bytes::Bytes::copy_from_slice(&buf[..16]);
2079 buf.advance(16);
2080 SqlValue::Binary(data)
2081 }
2082 }
2083
2084 TypeId::Variant => Self::parse_sql_variant(buf)?,
2086
2087 TypeId::Udt => Self::parse_plp_varbinary(buf)?,
2089
2090 _ => {
2092 if buf.remaining() < 2 {
2094 return Err(Error::Protocol(format!(
2095 "unexpected EOF reading {:?}",
2096 col.type_id
2097 )));
2098 }
2099 let len = buf.get_u16_le();
2100 if len == 0xFFFF {
2101 SqlValue::Null
2102 } else if buf.remaining() < len as usize {
2103 return Err(Error::Protocol(format!(
2104 "unexpected EOF reading {:?} data",
2105 col.type_id
2106 )));
2107 } else {
2108 let data = bytes::Bytes::copy_from_slice(&buf[..len as usize]);
2109 buf.advance(len as usize);
2110 SqlValue::Binary(data)
2111 }
2112 }
2113 };
2114
2115 Ok(value)
2116 }
2117
2118 fn parse_plp_nvarchar(buf: &mut &[u8]) -> Result<mssql_types::SqlValue> {
2124 use bytes::Buf;
2125 use mssql_types::SqlValue;
2126
2127 if buf.remaining() < 8 {
2128 return Err(Error::Protocol(
2129 "unexpected EOF reading PLP total length".into(),
2130 ));
2131 }
2132
2133 let total_len = buf.get_u64_le();
2134 if total_len == 0xFFFFFFFFFFFFFFFF {
2135 return Ok(SqlValue::Null);
2136 }
2137
2138 let mut all_data = Vec::new();
2140 loop {
2141 if buf.remaining() < 4 {
2142 return Err(Error::Protocol(
2143 "unexpected EOF reading PLP chunk length".into(),
2144 ));
2145 }
2146 let chunk_len = buf.get_u32_le() as usize;
2147 if chunk_len == 0 {
2148 break; }
2150 if buf.remaining() < chunk_len {
2151 return Err(Error::Protocol(
2152 "unexpected EOF reading PLP chunk data".into(),
2153 ));
2154 }
2155 all_data.extend_from_slice(&buf[..chunk_len]);
2156 buf.advance(chunk_len);
2157 }
2158
2159 let utf16: Vec<u16> = all_data
2161 .chunks_exact(2)
2162 .map(|chunk| u16::from_le_bytes([chunk[0], chunk[1]]))
2163 .collect();
2164 let s = String::from_utf16(&utf16)
2165 .map_err(|_| Error::Protocol("invalid UTF-16 in PLP nvarchar".into()))?;
2166 Ok(SqlValue::String(s))
2167 }
2168
2169 fn parse_plp_varchar(buf: &mut &[u8]) -> Result<mssql_types::SqlValue> {
2171 use bytes::Buf;
2172 use mssql_types::SqlValue;
2173
2174 if buf.remaining() < 8 {
2175 return Err(Error::Protocol(
2176 "unexpected EOF reading PLP total length".into(),
2177 ));
2178 }
2179
2180 let total_len = buf.get_u64_le();
2181 if total_len == 0xFFFFFFFFFFFFFFFF {
2182 return Ok(SqlValue::Null);
2183 }
2184
2185 let mut all_data = Vec::new();
2187 loop {
2188 if buf.remaining() < 4 {
2189 return Err(Error::Protocol(
2190 "unexpected EOF reading PLP chunk length".into(),
2191 ));
2192 }
2193 let chunk_len = buf.get_u32_le() as usize;
2194 if chunk_len == 0 {
2195 break; }
2197 if buf.remaining() < chunk_len {
2198 return Err(Error::Protocol(
2199 "unexpected EOF reading PLP chunk data".into(),
2200 ));
2201 }
2202 all_data.extend_from_slice(&buf[..chunk_len]);
2203 buf.advance(chunk_len);
2204 }
2205
2206 let s = String::from_utf8_lossy(&all_data).into_owned();
2208 Ok(SqlValue::String(s))
2209 }
2210
2211 fn parse_plp_varbinary(buf: &mut &[u8]) -> Result<mssql_types::SqlValue> {
2213 use bytes::Buf;
2214 use mssql_types::SqlValue;
2215
2216 if buf.remaining() < 8 {
2217 return Err(Error::Protocol(
2218 "unexpected EOF reading PLP total length".into(),
2219 ));
2220 }
2221
2222 let total_len = buf.get_u64_le();
2223 if total_len == 0xFFFFFFFFFFFFFFFF {
2224 return Ok(SqlValue::Null);
2225 }
2226
2227 let mut all_data = Vec::new();
2229 loop {
2230 if buf.remaining() < 4 {
2231 return Err(Error::Protocol(
2232 "unexpected EOF reading PLP chunk length".into(),
2233 ));
2234 }
2235 let chunk_len = buf.get_u32_le() as usize;
2236 if chunk_len == 0 {
2237 break; }
2239 if buf.remaining() < chunk_len {
2240 return Err(Error::Protocol(
2241 "unexpected EOF reading PLP chunk data".into(),
2242 ));
2243 }
2244 all_data.extend_from_slice(&buf[..chunk_len]);
2245 buf.advance(chunk_len);
2246 }
2247
2248 Ok(SqlValue::Binary(bytes::Bytes::from(all_data)))
2249 }
2250
2251 fn parse_sql_variant(buf: &mut &[u8]) -> Result<mssql_types::SqlValue> {
2260 use bytes::Buf;
2261 use mssql_types::SqlValue;
2262
2263 if buf.remaining() < 4 {
2265 return Err(Error::Protocol(
2266 "unexpected EOF reading SQL_VARIANT length".into(),
2267 ));
2268 }
2269 let total_len = buf.get_u32_le() as usize;
2270
2271 if total_len == 0 {
2272 return Ok(SqlValue::Null);
2273 }
2274
2275 if buf.remaining() < total_len {
2276 return Err(Error::Protocol(
2277 "unexpected EOF reading SQL_VARIANT data".into(),
2278 ));
2279 }
2280
2281 if total_len < 2 {
2283 return Err(Error::Protocol(
2284 "SQL_VARIANT too short for type info".into(),
2285 ));
2286 }
2287
2288 let base_type = buf.get_u8();
2289 let prop_count = buf.get_u8() as usize;
2290
2291 if buf.remaining() < prop_count {
2292 return Err(Error::Protocol(
2293 "unexpected EOF reading SQL_VARIANT properties".into(),
2294 ));
2295 }
2296
2297 let data_len = total_len.saturating_sub(2).saturating_sub(prop_count);
2299
2300 match base_type {
2303 0x30 => {
2305 buf.advance(prop_count);
2307 if data_len < 1 {
2308 return Ok(SqlValue::Null);
2309 }
2310 let v = buf.get_u8();
2311 Ok(SqlValue::TinyInt(v))
2312 }
2313 0x32 => {
2314 buf.advance(prop_count);
2316 if data_len < 1 {
2317 return Ok(SqlValue::Null);
2318 }
2319 let v = buf.get_u8();
2320 Ok(SqlValue::Bool(v != 0))
2321 }
2322 0x34 => {
2323 buf.advance(prop_count);
2325 if data_len < 2 {
2326 return Ok(SqlValue::Null);
2327 }
2328 let v = buf.get_i16_le();
2329 Ok(SqlValue::SmallInt(v))
2330 }
2331 0x38 => {
2332 buf.advance(prop_count);
2334 if data_len < 4 {
2335 return Ok(SqlValue::Null);
2336 }
2337 let v = buf.get_i32_le();
2338 Ok(SqlValue::Int(v))
2339 }
2340 0x7F => {
2341 buf.advance(prop_count);
2343 if data_len < 8 {
2344 return Ok(SqlValue::Null);
2345 }
2346 let v = buf.get_i64_le();
2347 Ok(SqlValue::BigInt(v))
2348 }
2349 0x6D => {
2350 let float_len = if prop_count >= 1 { buf.get_u8() } else { 8 };
2352 buf.advance(prop_count.saturating_sub(1));
2353
2354 if float_len == 4 && data_len >= 4 {
2355 let v = buf.get_f32_le();
2356 Ok(SqlValue::Float(v))
2357 } else if data_len >= 8 {
2358 let v = buf.get_f64_le();
2359 Ok(SqlValue::Double(v))
2360 } else {
2361 Ok(SqlValue::Null)
2362 }
2363 }
2364 0x6E => {
2365 let money_len = if prop_count >= 1 { buf.get_u8() } else { 8 };
2367 buf.advance(prop_count.saturating_sub(1));
2368
2369 if money_len == 4 && data_len >= 4 {
2370 let raw = buf.get_i32_le();
2371 let value = raw as f64 / 10000.0;
2372 Ok(SqlValue::Double(value))
2373 } else if data_len >= 8 {
2374 let high = buf.get_i32_le() as i64;
2375 let low = buf.get_u32_le() as i64;
2376 let raw = (high << 32) | low;
2377 let value = raw as f64 / 10000.0;
2378 Ok(SqlValue::Double(value))
2379 } else {
2380 Ok(SqlValue::Null)
2381 }
2382 }
2383 0x6F => {
2384 let dt_len = if prop_count >= 1 { buf.get_u8() } else { 8 };
2386 buf.advance(prop_count.saturating_sub(1));
2387
2388 #[cfg(feature = "chrono")]
2389 {
2390 use chrono::NaiveDate;
2391 if dt_len == 4 && data_len >= 4 {
2392 let days = buf.get_u16_le() as i64;
2394 let mins = buf.get_u16_le() as u32;
2395 let base = NaiveDate::from_ymd_opt(1900, 1, 1)
2396 .unwrap()
2397 .and_hms_opt(0, 0, 0)
2398 .unwrap();
2399 let dt = base
2400 + chrono::Duration::days(days)
2401 + chrono::Duration::minutes(mins as i64);
2402 Ok(SqlValue::DateTime(dt))
2403 } else if data_len >= 8 {
2404 let days = buf.get_i32_le() as i64;
2406 let ticks = buf.get_u32_le() as i64;
2407 let base = NaiveDate::from_ymd_opt(1900, 1, 1)
2408 .unwrap()
2409 .and_hms_opt(0, 0, 0)
2410 .unwrap();
2411 let millis = (ticks * 10) / 3;
2412 let dt = base
2413 + chrono::Duration::days(days)
2414 + chrono::Duration::milliseconds(millis);
2415 Ok(SqlValue::DateTime(dt))
2416 } else {
2417 Ok(SqlValue::Null)
2418 }
2419 }
2420 #[cfg(not(feature = "chrono"))]
2421 {
2422 buf.advance(data_len);
2423 Ok(SqlValue::Null)
2424 }
2425 }
2426 0x6A | 0x6C => {
2427 let _precision = if prop_count >= 1 { buf.get_u8() } else { 18 };
2429 let scale = if prop_count >= 2 { buf.get_u8() } else { 0 };
2430 buf.advance(prop_count.saturating_sub(2));
2431
2432 if data_len < 1 {
2433 return Ok(SqlValue::Null);
2434 }
2435
2436 let sign = buf.get_u8();
2437 let mantissa_len = data_len - 1;
2438
2439 if mantissa_len > 16 {
2440 buf.advance(mantissa_len);
2442 return Ok(SqlValue::Null);
2443 }
2444
2445 let mut mantissa_bytes = [0u8; 16];
2446 for i in 0..mantissa_len.min(16) {
2447 mantissa_bytes[i] = buf.get_u8();
2448 }
2449 let mantissa = u128::from_le_bytes(mantissa_bytes);
2450
2451 #[cfg(feature = "decimal")]
2452 {
2453 use rust_decimal::Decimal;
2454 if scale > 28 {
2455 let divisor = 10f64.powi(scale as i32);
2457 let value = (mantissa as f64) / divisor;
2458 let value = if sign == 0 { -value } else { value };
2459 Ok(SqlValue::Double(value))
2460 } else {
2461 let mut decimal =
2462 Decimal::from_i128_with_scale(mantissa as i128, scale as u32);
2463 if sign == 0 {
2464 decimal.set_sign_negative(true);
2465 }
2466 Ok(SqlValue::Decimal(decimal))
2467 }
2468 }
2469 #[cfg(not(feature = "decimal"))]
2470 {
2471 let divisor = 10f64.powi(scale as i32);
2472 let value = (mantissa as f64) / divisor;
2473 let value = if sign == 0 { -value } else { value };
2474 Ok(SqlValue::Double(value))
2475 }
2476 }
2477 0x24 => {
2478 buf.advance(prop_count);
2480 if data_len < 16 {
2481 return Ok(SqlValue::Null);
2482 }
2483 let mut guid_bytes = [0u8; 16];
2484 for byte in &mut guid_bytes {
2485 *byte = buf.get_u8();
2486 }
2487 Ok(SqlValue::Binary(bytes::Bytes::copy_from_slice(&guid_bytes)))
2488 }
2489 0x28 => {
2490 buf.advance(prop_count);
2492 #[cfg(feature = "chrono")]
2493 {
2494 if data_len < 3 {
2495 return Ok(SqlValue::Null);
2496 }
2497 let mut date_bytes = [0u8; 4];
2498 date_bytes[0] = buf.get_u8();
2499 date_bytes[1] = buf.get_u8();
2500 date_bytes[2] = buf.get_u8();
2501 let days = u32::from_le_bytes(date_bytes);
2502 let base = chrono::NaiveDate::from_ymd_opt(1, 1, 1).unwrap();
2503 let date = base + chrono::Duration::days(days as i64);
2504 Ok(SqlValue::Date(date))
2505 }
2506 #[cfg(not(feature = "chrono"))]
2507 {
2508 buf.advance(data_len);
2509 Ok(SqlValue::Null)
2510 }
2511 }
2512 0xA7 | 0x2F | 0x27 => {
2513 buf.advance(prop_count);
2515 if data_len == 0 {
2516 return Ok(SqlValue::String(String::new()));
2517 }
2518 let data = &buf[..data_len];
2519 let s = String::from_utf8_lossy(data).into_owned();
2520 buf.advance(data_len);
2521 Ok(SqlValue::String(s))
2522 }
2523 0xE7 | 0xEF => {
2524 buf.advance(prop_count);
2526 if data_len == 0 {
2527 return Ok(SqlValue::String(String::new()));
2528 }
2529 let utf16: Vec<u16> = buf[..data_len]
2531 .chunks_exact(2)
2532 .map(|chunk| u16::from_le_bytes([chunk[0], chunk[1]]))
2533 .collect();
2534 buf.advance(data_len);
2535 let s = String::from_utf16(&utf16).map_err(|_| {
2536 Error::Protocol("invalid UTF-16 in SQL_VARIANT nvarchar".into())
2537 })?;
2538 Ok(SqlValue::String(s))
2539 }
2540 0xA5 | 0x2D | 0x25 => {
2541 buf.advance(prop_count);
2543 let data = bytes::Bytes::copy_from_slice(&buf[..data_len]);
2544 buf.advance(data_len);
2545 Ok(SqlValue::Binary(data))
2546 }
2547 _ => {
2548 buf.advance(prop_count);
2550 let data = bytes::Bytes::copy_from_slice(&buf[..data_len]);
2551 buf.advance(data_len);
2552 Ok(SqlValue::Binary(data))
2553 }
2554 }
2555 }
2556
2557 fn time_bytes_for_scale(scale: u8) -> usize {
2559 match scale {
2560 0..=2 => 3,
2561 3..=4 => 4,
2562 5..=7 => 5,
2563 _ => 5, }
2565 }
2566
2567 #[cfg(feature = "chrono")]
2569 fn intervals_to_time(intervals: u64, scale: u8) -> chrono::NaiveTime {
2570 let nanos = match scale {
2580 0 => intervals * 1_000_000_000,
2581 1 => intervals * 100_000_000,
2582 2 => intervals * 10_000_000,
2583 3 => intervals * 1_000_000,
2584 4 => intervals * 100_000,
2585 5 => intervals * 10_000,
2586 6 => intervals * 1_000,
2587 7 => intervals * 100,
2588 _ => intervals * 100,
2589 };
2590
2591 let secs = (nanos / 1_000_000_000) as u32;
2592 let nano_part = (nanos % 1_000_000_000) as u32;
2593
2594 chrono::NaiveTime::from_num_seconds_from_midnight_opt(secs, nano_part)
2595 .unwrap_or_else(|| chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap())
2596 }
2597
2598 async fn read_execute_result(&mut self) -> Result<u64> {
2600 let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
2601
2602 let message = match connection {
2603 ConnectionHandle::Tls(conn) => conn
2604 .read_message()
2605 .await
2606 .map_err(|e| Error::Protocol(e.to_string()))?,
2607 ConnectionHandle::TlsPrelogin(conn) => conn
2608 .read_message()
2609 .await
2610 .map_err(|e| Error::Protocol(e.to_string()))?,
2611 ConnectionHandle::Plain(conn) => conn
2612 .read_message()
2613 .await
2614 .map_err(|e| Error::Protocol(e.to_string()))?,
2615 }
2616 .ok_or(Error::ConnectionClosed)?;
2617
2618 let mut parser = TokenParser::new(message.payload);
2619 let mut rows_affected = 0u64;
2620 let mut current_metadata: Option<ColMetaData> = None;
2621
2622 loop {
2623 let token = parser
2625 .next_token_with_metadata(current_metadata.as_ref())
2626 .map_err(|e| Error::Protocol(e.to_string()))?;
2627
2628 let Some(token) = token else {
2629 break;
2630 };
2631
2632 match token {
2633 Token::ColMetaData(meta) => {
2634 current_metadata = Some(meta);
2636 }
2637 Token::Row(_) | Token::NbcRow(_) => {
2638 }
2641 Token::Done(done) => {
2642 if done.status.error {
2643 return Err(Error::Query("execution failed".to_string()));
2644 }
2645 if done.status.count {
2646 rows_affected += done.row_count;
2648 }
2649 if !done.status.more {
2652 break;
2653 }
2654 }
2655 Token::DoneProc(done) => {
2656 if done.status.count {
2657 rows_affected += done.row_count;
2658 }
2659 }
2660 Token::DoneInProc(done) => {
2661 if done.status.count {
2662 rows_affected += done.row_count;
2663 }
2664 }
2665 Token::Error(err) => {
2666 return Err(Error::Server {
2667 number: err.number,
2668 state: err.state,
2669 class: err.class,
2670 message: err.message.clone(),
2671 server: if err.server.is_empty() {
2672 None
2673 } else {
2674 Some(err.server.clone())
2675 },
2676 procedure: if err.procedure.is_empty() {
2677 None
2678 } else {
2679 Some(err.procedure.clone())
2680 },
2681 line: err.line as u32,
2682 });
2683 }
2684 Token::Info(info) => {
2685 tracing::info!(
2686 number = info.number,
2687 message = %info.message,
2688 "server info message"
2689 );
2690 }
2691 Token::EnvChange(env) => {
2692 Self::process_transaction_env_change(&env, &mut self.transaction_descriptor);
2696 }
2697 _ => {}
2698 }
2699 }
2700
2701 Ok(rows_affected)
2702 }
2703
2704 async fn read_transaction_begin_result(&mut self) -> Result<u64> {
2710 let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
2711
2712 let message = match connection {
2713 ConnectionHandle::Tls(conn) => conn
2714 .read_message()
2715 .await
2716 .map_err(|e| Error::Protocol(e.to_string()))?,
2717 ConnectionHandle::TlsPrelogin(conn) => conn
2718 .read_message()
2719 .await
2720 .map_err(|e| Error::Protocol(e.to_string()))?,
2721 ConnectionHandle::Plain(conn) => conn
2722 .read_message()
2723 .await
2724 .map_err(|e| Error::Protocol(e.to_string()))?,
2725 }
2726 .ok_or(Error::ConnectionClosed)?;
2727
2728 let mut parser = TokenParser::new(message.payload);
2729 let mut transaction_descriptor: u64 = 0;
2730
2731 loop {
2732 let token = parser
2733 .next_token()
2734 .map_err(|e| Error::Protocol(e.to_string()))?;
2735
2736 let Some(token) = token else {
2737 break;
2738 };
2739
2740 match token {
2741 Token::EnvChange(env) => {
2742 if env.env_type == EnvChangeType::BeginTransaction {
2743 if let tds_protocol::token::EnvChangeValue::Binary(ref data) = env.new_value
2746 {
2747 if data.len() >= 8 {
2748 transaction_descriptor = u64::from_le_bytes([
2749 data[0], data[1], data[2], data[3], data[4], data[5], data[6],
2750 data[7],
2751 ]);
2752 tracing::debug!(
2753 transaction_descriptor =
2754 format!("0x{:016X}", transaction_descriptor),
2755 "transaction begun"
2756 );
2757 }
2758 }
2759 }
2760 }
2761 Token::Done(done) => {
2762 if done.status.error {
2763 return Err(Error::Query("BEGIN TRANSACTION failed".to_string()));
2764 }
2765 break;
2766 }
2767 Token::Error(err) => {
2768 return Err(Error::Server {
2769 number: err.number,
2770 state: err.state,
2771 class: err.class,
2772 message: err.message.clone(),
2773 server: if err.server.is_empty() {
2774 None
2775 } else {
2776 Some(err.server.clone())
2777 },
2778 procedure: if err.procedure.is_empty() {
2779 None
2780 } else {
2781 Some(err.procedure.clone())
2782 },
2783 line: err.line as u32,
2784 });
2785 }
2786 Token::Info(info) => {
2787 tracing::info!(
2788 number = info.number,
2789 message = %info.message,
2790 "server info message"
2791 );
2792 }
2793 _ => {}
2794 }
2795 }
2796
2797 Ok(transaction_descriptor)
2798 }
2799}
2800
2801impl Client<Ready> {
2802 pub async fn query<'a>(
2827 &'a mut self,
2828 sql: &str,
2829 params: &[&(dyn crate::ToSql + Sync)],
2830 ) -> Result<QueryStream<'a>> {
2831 tracing::debug!(sql = sql, params_count = params.len(), "executing query");
2832
2833 #[cfg(feature = "otel")]
2834 let instrumentation = self.instrumentation.clone();
2835 #[cfg(feature = "otel")]
2836 let mut span = instrumentation.query_span(sql);
2837
2838 let result = async {
2839 if params.is_empty() {
2840 self.send_sql_batch(sql).await?;
2842 } else {
2843 let rpc_params = Self::convert_params(params)?;
2845 let rpc = RpcRequest::execute_sql(sql, rpc_params);
2846 self.send_rpc(&rpc).await?;
2847 }
2848
2849 self.read_query_response().await
2851 }
2852 .await;
2853
2854 #[cfg(feature = "otel")]
2855 match &result {
2856 Ok(_) => InstrumentationContext::record_success(&mut span, None),
2857 Err(e) => InstrumentationContext::record_error(&mut span, e),
2858 }
2859
2860 #[cfg(feature = "otel")]
2862 drop(span);
2863
2864 let (columns, rows) = result?;
2865 Ok(QueryStream::new(columns, rows))
2866 }
2867
2868 pub async fn query_with_timeout<'a>(
2895 &'a mut self,
2896 sql: &str,
2897 params: &[&(dyn crate::ToSql + Sync)],
2898 timeout_duration: std::time::Duration,
2899 ) -> Result<QueryStream<'a>> {
2900 timeout(timeout_duration, self.query(sql, params))
2901 .await
2902 .map_err(|_| Error::CommandTimeout)?
2903 }
2904
2905 pub async fn query_multiple<'a>(
2932 &'a mut self,
2933 sql: &str,
2934 params: &[&(dyn crate::ToSql + Sync)],
2935 ) -> Result<MultiResultStream<'a>> {
2936 tracing::debug!(
2937 sql = sql,
2938 params_count = params.len(),
2939 "executing multi-result query"
2940 );
2941
2942 if params.is_empty() {
2943 self.send_sql_batch(sql).await?;
2945 } else {
2946 let rpc_params = Self::convert_params(params)?;
2948 let rpc = RpcRequest::execute_sql(sql, rpc_params);
2949 self.send_rpc(&rpc).await?;
2950 }
2951
2952 let result_sets = self.read_multi_result_response().await?;
2954 Ok(MultiResultStream::new(result_sets))
2955 }
2956
2957 async fn read_multi_result_response(&mut self) -> Result<Vec<crate::stream::ResultSet>> {
2959 let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
2960
2961 let message = match connection {
2962 ConnectionHandle::Tls(conn) => conn
2963 .read_message()
2964 .await
2965 .map_err(|e| Error::Protocol(e.to_string()))?,
2966 ConnectionHandle::TlsPrelogin(conn) => conn
2967 .read_message()
2968 .await
2969 .map_err(|e| Error::Protocol(e.to_string()))?,
2970 ConnectionHandle::Plain(conn) => conn
2971 .read_message()
2972 .await
2973 .map_err(|e| Error::Protocol(e.to_string()))?,
2974 }
2975 .ok_or(Error::ConnectionClosed)?;
2976
2977 let mut parser = TokenParser::new(message.payload);
2978 let mut result_sets: Vec<crate::stream::ResultSet> = Vec::new();
2979 let mut current_columns: Vec<crate::row::Column> = Vec::new();
2980 let mut current_rows: Vec<crate::row::Row> = Vec::new();
2981 let mut protocol_metadata: Option<ColMetaData> = None;
2982
2983 loop {
2984 let token = parser
2985 .next_token_with_metadata(protocol_metadata.as_ref())
2986 .map_err(|e| Error::Protocol(e.to_string()))?;
2987
2988 let Some(token) = token else {
2989 break;
2990 };
2991
2992 match token {
2993 Token::ColMetaData(meta) => {
2994 if !current_columns.is_empty() {
2996 result_sets.push(crate::stream::ResultSet::new(
2997 std::mem::take(&mut current_columns),
2998 std::mem::take(&mut current_rows),
2999 ));
3000 }
3001
3002 current_columns = meta
3004 .columns
3005 .iter()
3006 .enumerate()
3007 .map(|(i, col)| {
3008 let type_name = format!("{:?}", col.type_id);
3009 let mut column = crate::row::Column::new(&col.name, i, type_name)
3010 .with_nullable(col.flags & 0x01 != 0);
3011
3012 if let Some(max_len) = col.type_info.max_length {
3013 column = column.with_max_length(max_len);
3014 }
3015 if let (Some(prec), Some(scale)) =
3016 (col.type_info.precision, col.type_info.scale)
3017 {
3018 column = column.with_precision_scale(prec, scale);
3019 }
3020 column
3021 })
3022 .collect();
3023
3024 tracing::debug!(
3025 columns = current_columns.len(),
3026 result_set = result_sets.len(),
3027 "received column metadata for result set"
3028 );
3029 protocol_metadata = Some(meta);
3030 }
3031 Token::Row(raw_row) => {
3032 if let Some(ref meta) = protocol_metadata {
3033 let row = Self::convert_raw_row(&raw_row, meta, ¤t_columns)?;
3034 current_rows.push(row);
3035 }
3036 }
3037 Token::NbcRow(nbc_row) => {
3038 if let Some(ref meta) = protocol_metadata {
3039 let row = Self::convert_nbc_row(&nbc_row, meta, ¤t_columns)?;
3040 current_rows.push(row);
3041 }
3042 }
3043 Token::Error(err) => {
3044 return Err(Error::Server {
3045 number: err.number,
3046 state: err.state,
3047 class: err.class,
3048 message: err.message.clone(),
3049 server: if err.server.is_empty() {
3050 None
3051 } else {
3052 Some(err.server.clone())
3053 },
3054 procedure: if err.procedure.is_empty() {
3055 None
3056 } else {
3057 Some(err.procedure.clone())
3058 },
3059 line: err.line as u32,
3060 });
3061 }
3062 Token::Done(done) => {
3063 if done.status.error {
3064 return Err(Error::Query("query failed".to_string()));
3065 }
3066
3067 if !current_columns.is_empty() {
3069 result_sets.push(crate::stream::ResultSet::new(
3070 std::mem::take(&mut current_columns),
3071 std::mem::take(&mut current_rows),
3072 ));
3073 protocol_metadata = None;
3074 }
3075
3076 if !done.status.more {
3078 tracing::debug!(result_sets = result_sets.len(), "all result sets parsed");
3079 break;
3080 }
3081 }
3082 Token::DoneInProc(done) => {
3083 if done.status.error {
3084 return Err(Error::Query("query failed".to_string()));
3085 }
3086
3087 if !current_columns.is_empty() {
3089 result_sets.push(crate::stream::ResultSet::new(
3090 std::mem::take(&mut current_columns),
3091 std::mem::take(&mut current_rows),
3092 ));
3093 protocol_metadata = None;
3094 }
3095
3096 if !done.status.more {
3098 }
3100 }
3101 Token::DoneProc(done) => {
3102 if done.status.error {
3103 return Err(Error::Query("query failed".to_string()));
3104 }
3105 }
3107 Token::Info(info) => {
3108 tracing::debug!(
3109 number = info.number,
3110 message = %info.message,
3111 "server info message"
3112 );
3113 }
3114 _ => {}
3115 }
3116 }
3117
3118 if !current_columns.is_empty() {
3120 result_sets.push(crate::stream::ResultSet::new(current_columns, current_rows));
3121 }
3122
3123 Ok(result_sets)
3124 }
3125
3126 pub async fn execute(
3130 &mut self,
3131 sql: &str,
3132 params: &[&(dyn crate::ToSql + Sync)],
3133 ) -> Result<u64> {
3134 tracing::debug!(
3135 sql = sql,
3136 params_count = params.len(),
3137 "executing statement"
3138 );
3139
3140 #[cfg(feature = "otel")]
3141 let instrumentation = self.instrumentation.clone();
3142 #[cfg(feature = "otel")]
3143 let mut span = instrumentation.query_span(sql);
3144
3145 let result = async {
3146 if params.is_empty() {
3147 self.send_sql_batch(sql).await?;
3149 } else {
3150 let rpc_params = Self::convert_params(params)?;
3152 let rpc = RpcRequest::execute_sql(sql, rpc_params);
3153 self.send_rpc(&rpc).await?;
3154 }
3155
3156 self.read_execute_result().await
3158 }
3159 .await;
3160
3161 #[cfg(feature = "otel")]
3162 match &result {
3163 Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
3164 Err(e) => InstrumentationContext::record_error(&mut span, e),
3165 }
3166
3167 #[cfg(feature = "otel")]
3169 drop(span);
3170
3171 result
3172 }
3173
3174 pub async fn execute_with_timeout(
3201 &mut self,
3202 sql: &str,
3203 params: &[&(dyn crate::ToSql + Sync)],
3204 timeout_duration: std::time::Duration,
3205 ) -> Result<u64> {
3206 timeout(timeout_duration, self.execute(sql, params))
3207 .await
3208 .map_err(|_| Error::CommandTimeout)?
3209 }
3210
3211 pub async fn begin_transaction(mut self) -> Result<Client<InTransaction>> {
3218 tracing::debug!("beginning transaction");
3219
3220 #[cfg(feature = "otel")]
3221 let instrumentation = self.instrumentation.clone();
3222 #[cfg(feature = "otel")]
3223 let mut span = instrumentation.transaction_span("BEGIN");
3224
3225 let result = async {
3227 self.send_sql_batch("BEGIN TRANSACTION").await?;
3228 self.read_transaction_begin_result().await
3229 }
3230 .await;
3231
3232 #[cfg(feature = "otel")]
3233 match &result {
3234 Ok(_) => InstrumentationContext::record_success(&mut span, None),
3235 Err(e) => InstrumentationContext::record_error(&mut span, e),
3236 }
3237
3238 #[cfg(feature = "otel")]
3240 drop(span);
3241
3242 let transaction_descriptor = result?;
3243
3244 Ok(Client {
3245 config: self.config,
3246 _state: PhantomData,
3247 connection: self.connection,
3248 server_version: self.server_version,
3249 current_database: self.current_database,
3250 statement_cache: self.statement_cache,
3251 transaction_descriptor, #[cfg(feature = "otel")]
3253 instrumentation: self.instrumentation,
3254 })
3255 }
3256
3257 pub async fn begin_transaction_with_isolation(
3272 mut self,
3273 isolation_level: crate::transaction::IsolationLevel,
3274 ) -> Result<Client<InTransaction>> {
3275 tracing::debug!(
3276 isolation_level = %isolation_level.name(),
3277 "beginning transaction with isolation level"
3278 );
3279
3280 #[cfg(feature = "otel")]
3281 let instrumentation = self.instrumentation.clone();
3282 #[cfg(feature = "otel")]
3283 let mut span = instrumentation.transaction_span("BEGIN");
3284
3285 let result = async {
3287 self.send_sql_batch(isolation_level.as_sql()).await?;
3288 self.read_execute_result().await?;
3289
3290 self.send_sql_batch("BEGIN TRANSACTION").await?;
3292 self.read_transaction_begin_result().await
3293 }
3294 .await;
3295
3296 #[cfg(feature = "otel")]
3297 match &result {
3298 Ok(_) => InstrumentationContext::record_success(&mut span, None),
3299 Err(e) => InstrumentationContext::record_error(&mut span, e),
3300 }
3301
3302 #[cfg(feature = "otel")]
3303 drop(span);
3304
3305 let transaction_descriptor = result?;
3306
3307 Ok(Client {
3308 config: self.config,
3309 _state: PhantomData,
3310 connection: self.connection,
3311 server_version: self.server_version,
3312 current_database: self.current_database,
3313 statement_cache: self.statement_cache,
3314 transaction_descriptor,
3315 #[cfg(feature = "otel")]
3316 instrumentation: self.instrumentation,
3317 })
3318 }
3319
3320 pub async fn simple_query(&mut self, sql: &str) -> Result<()> {
3325 tracing::debug!(sql = sql, "executing simple query");
3326
3327 self.send_sql_batch(sql).await?;
3329
3330 let _ = self.read_execute_result().await?;
3332
3333 Ok(())
3334 }
3335
3336 pub async fn close(self) -> Result<()> {
3338 tracing::debug!("closing connection");
3339 Ok(())
3340 }
3341
3342 #[must_use]
3344 pub fn database(&self) -> Option<&str> {
3345 self.config.database.as_deref()
3346 }
3347
3348 #[must_use]
3350 pub fn host(&self) -> &str {
3351 &self.config.host
3352 }
3353
3354 #[must_use]
3356 pub fn port(&self) -> u16 {
3357 self.config.port
3358 }
3359
3360 #[must_use]
3379 pub fn is_in_transaction(&self) -> bool {
3380 self.transaction_descriptor != 0
3381 }
3382
3383 #[must_use]
3405 pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
3406 let connection = self
3407 .connection
3408 .as_ref()
3409 .expect("connection should be present");
3410 match connection {
3411 ConnectionHandle::Tls(conn) => {
3412 crate::cancel::CancelHandle::from_tls(conn.cancel_handle())
3413 }
3414 ConnectionHandle::TlsPrelogin(conn) => {
3415 crate::cancel::CancelHandle::from_tls_prelogin(conn.cancel_handle())
3416 }
3417 ConnectionHandle::Plain(conn) => {
3418 crate::cancel::CancelHandle::from_plain(conn.cancel_handle())
3419 }
3420 }
3421 }
3422}
3423
3424impl Client<InTransaction> {
3425 pub async fn query<'a>(
3429 &'a mut self,
3430 sql: &str,
3431 params: &[&(dyn crate::ToSql + Sync)],
3432 ) -> Result<QueryStream<'a>> {
3433 tracing::debug!(
3434 sql = sql,
3435 params_count = params.len(),
3436 "executing query in transaction"
3437 );
3438
3439 #[cfg(feature = "otel")]
3440 let instrumentation = self.instrumentation.clone();
3441 #[cfg(feature = "otel")]
3442 let mut span = instrumentation.query_span(sql);
3443
3444 let result = async {
3445 if params.is_empty() {
3446 self.send_sql_batch(sql).await?;
3448 } else {
3449 let rpc_params = Self::convert_params(params)?;
3451 let rpc = RpcRequest::execute_sql(sql, rpc_params);
3452 self.send_rpc(&rpc).await?;
3453 }
3454
3455 self.read_query_response().await
3457 }
3458 .await;
3459
3460 #[cfg(feature = "otel")]
3461 match &result {
3462 Ok(_) => InstrumentationContext::record_success(&mut span, None),
3463 Err(e) => InstrumentationContext::record_error(&mut span, e),
3464 }
3465
3466 #[cfg(feature = "otel")]
3468 drop(span);
3469
3470 let (columns, rows) = result?;
3471 Ok(QueryStream::new(columns, rows))
3472 }
3473
3474 pub async fn execute(
3478 &mut self,
3479 sql: &str,
3480 params: &[&(dyn crate::ToSql + Sync)],
3481 ) -> Result<u64> {
3482 tracing::debug!(
3483 sql = sql,
3484 params_count = params.len(),
3485 "executing statement in transaction"
3486 );
3487
3488 #[cfg(feature = "otel")]
3489 let instrumentation = self.instrumentation.clone();
3490 #[cfg(feature = "otel")]
3491 let mut span = instrumentation.query_span(sql);
3492
3493 let result = async {
3494 if params.is_empty() {
3495 self.send_sql_batch(sql).await?;
3497 } else {
3498 let rpc_params = Self::convert_params(params)?;
3500 let rpc = RpcRequest::execute_sql(sql, rpc_params);
3501 self.send_rpc(&rpc).await?;
3502 }
3503
3504 self.read_execute_result().await
3506 }
3507 .await;
3508
3509 #[cfg(feature = "otel")]
3510 match &result {
3511 Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
3512 Err(e) => InstrumentationContext::record_error(&mut span, e),
3513 }
3514
3515 #[cfg(feature = "otel")]
3517 drop(span);
3518
3519 result
3520 }
3521
3522 pub async fn query_with_timeout<'a>(
3526 &'a mut self,
3527 sql: &str,
3528 params: &[&(dyn crate::ToSql + Sync)],
3529 timeout_duration: std::time::Duration,
3530 ) -> Result<QueryStream<'a>> {
3531 timeout(timeout_duration, self.query(sql, params))
3532 .await
3533 .map_err(|_| Error::CommandTimeout)?
3534 }
3535
3536 pub async fn execute_with_timeout(
3540 &mut self,
3541 sql: &str,
3542 params: &[&(dyn crate::ToSql + Sync)],
3543 timeout_duration: std::time::Duration,
3544 ) -> Result<u64> {
3545 timeout(timeout_duration, self.execute(sql, params))
3546 .await
3547 .map_err(|_| Error::CommandTimeout)?
3548 }
3549
3550 pub async fn commit(mut self) -> Result<Client<Ready>> {
3554 tracing::debug!("committing transaction");
3555
3556 #[cfg(feature = "otel")]
3557 let instrumentation = self.instrumentation.clone();
3558 #[cfg(feature = "otel")]
3559 let mut span = instrumentation.transaction_span("COMMIT");
3560
3561 let result = async {
3563 self.send_sql_batch("COMMIT TRANSACTION").await?;
3564 self.read_execute_result().await
3565 }
3566 .await;
3567
3568 #[cfg(feature = "otel")]
3569 match &result {
3570 Ok(_) => InstrumentationContext::record_success(&mut span, None),
3571 Err(e) => InstrumentationContext::record_error(&mut span, e),
3572 }
3573
3574 #[cfg(feature = "otel")]
3576 drop(span);
3577
3578 result?;
3579
3580 Ok(Client {
3581 config: self.config,
3582 _state: PhantomData,
3583 connection: self.connection,
3584 server_version: self.server_version,
3585 current_database: self.current_database,
3586 statement_cache: self.statement_cache,
3587 transaction_descriptor: 0, #[cfg(feature = "otel")]
3589 instrumentation: self.instrumentation,
3590 })
3591 }
3592
3593 pub async fn rollback(mut self) -> Result<Client<Ready>> {
3597 tracing::debug!("rolling back transaction");
3598
3599 #[cfg(feature = "otel")]
3600 let instrumentation = self.instrumentation.clone();
3601 #[cfg(feature = "otel")]
3602 let mut span = instrumentation.transaction_span("ROLLBACK");
3603
3604 let result = async {
3606 self.send_sql_batch("ROLLBACK TRANSACTION").await?;
3607 self.read_execute_result().await
3608 }
3609 .await;
3610
3611 #[cfg(feature = "otel")]
3612 match &result {
3613 Ok(_) => InstrumentationContext::record_success(&mut span, None),
3614 Err(e) => InstrumentationContext::record_error(&mut span, e),
3615 }
3616
3617 #[cfg(feature = "otel")]
3619 drop(span);
3620
3621 result?;
3622
3623 Ok(Client {
3624 config: self.config,
3625 _state: PhantomData,
3626 connection: self.connection,
3627 server_version: self.server_version,
3628 current_database: self.current_database,
3629 statement_cache: self.statement_cache,
3630 transaction_descriptor: 0, #[cfg(feature = "otel")]
3632 instrumentation: self.instrumentation,
3633 })
3634 }
3635
3636 pub async fn save_point(&mut self, name: &str) -> Result<SavePoint> {
3653 validate_identifier(name)?;
3654 tracing::debug!(name = name, "creating savepoint");
3655
3656 let sql = format!("SAVE TRANSACTION {}", name);
3659 self.send_sql_batch(&sql).await?;
3660 self.read_execute_result().await?;
3661
3662 Ok(SavePoint::new(name.to_string()))
3663 }
3664
3665 pub async fn rollback_to(&mut self, savepoint: &SavePoint) -> Result<()> {
3680 tracing::debug!(name = savepoint.name(), "rolling back to savepoint");
3681
3682 let sql = format!("ROLLBACK TRANSACTION {}", savepoint.name());
3685 self.send_sql_batch(&sql).await?;
3686 self.read_execute_result().await?;
3687
3688 Ok(())
3689 }
3690
3691 pub async fn release_savepoint(&mut self, savepoint: SavePoint) -> Result<()> {
3697 tracing::debug!(name = savepoint.name(), "releasing savepoint");
3698
3699 drop(savepoint);
3703 Ok(())
3704 }
3705
3706 #[must_use]
3710 pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
3711 let connection = self
3712 .connection
3713 .as_ref()
3714 .expect("connection should be present");
3715 match connection {
3716 ConnectionHandle::Tls(conn) => {
3717 crate::cancel::CancelHandle::from_tls(conn.cancel_handle())
3718 }
3719 ConnectionHandle::TlsPrelogin(conn) => {
3720 crate::cancel::CancelHandle::from_tls_prelogin(conn.cancel_handle())
3721 }
3722 ConnectionHandle::Plain(conn) => {
3723 crate::cancel::CancelHandle::from_plain(conn.cancel_handle())
3724 }
3725 }
3726 }
3727}
3728
3729fn validate_identifier(name: &str) -> Result<()> {
3731 use once_cell::sync::Lazy;
3732 use regex::Regex;
3733
3734 static IDENTIFIER_RE: Lazy<Regex> =
3735 Lazy::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_@#$]{0,127}$").unwrap());
3736
3737 if name.is_empty() {
3738 return Err(Error::InvalidIdentifier(
3739 "identifier cannot be empty".into(),
3740 ));
3741 }
3742
3743 if !IDENTIFIER_RE.is_match(name) {
3744 return Err(Error::InvalidIdentifier(format!(
3745 "invalid identifier '{}': must start with letter/underscore, \
3746 contain only alphanumerics/_/@/#/$, and be 1-128 characters",
3747 name
3748 )));
3749 }
3750
3751 Ok(())
3752}
3753
3754impl<S: ConnectionState> std::fmt::Debug for Client<S> {
3755 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3756 f.debug_struct("Client")
3757 .field("host", &self.config.host)
3758 .field("port", &self.config.port)
3759 .field("database", &self.config.database)
3760 .finish()
3761 }
3762}
3763
3764#[cfg(test)]
3765#[allow(clippy::unwrap_used, clippy::panic)]
3766mod tests {
3767 use super::*;
3768
3769 #[test]
3770 fn test_validate_identifier_valid() {
3771 assert!(validate_identifier("my_table").is_ok());
3772 assert!(validate_identifier("Table123").is_ok());
3773 assert!(validate_identifier("_private").is_ok());
3774 assert!(validate_identifier("sp_test").is_ok());
3775 }
3776
3777 #[test]
3778 fn test_validate_identifier_invalid() {
3779 assert!(validate_identifier("").is_err());
3780 assert!(validate_identifier("123abc").is_err());
3781 assert!(validate_identifier("table-name").is_err());
3782 assert!(validate_identifier("table name").is_err());
3783 assert!(validate_identifier("table;DROP TABLE users").is_err());
3784 }
3785
3786 fn make_plp_data(total_len: u64, chunks: &[&[u8]]) -> Vec<u8> {
3795 let mut data = Vec::new();
3796 data.extend_from_slice(&total_len.to_le_bytes());
3798 for chunk in chunks {
3800 let len = chunk.len() as u32;
3801 data.extend_from_slice(&len.to_le_bytes());
3802 data.extend_from_slice(chunk);
3803 }
3804 data.extend_from_slice(&0u32.to_le_bytes());
3806 data
3807 }
3808
3809 #[test]
3810 fn test_parse_plp_nvarchar_simple() {
3811 let utf16_data = [0x48, 0x00, 0x65, 0x00, 0x6C, 0x00, 0x6C, 0x00, 0x6F, 0x00];
3813 let plp = make_plp_data(10, &[&utf16_data]);
3814 let mut buf: &[u8] = &plp;
3815
3816 let result = Client::<Ready>::parse_plp_nvarchar(&mut buf).unwrap();
3817 match result {
3818 mssql_types::SqlValue::String(s) => assert_eq!(s, "Hello"),
3819 _ => panic!("expected String, got {:?}", result),
3820 }
3821 }
3822
3823 #[test]
3824 fn test_parse_plp_nvarchar_null() {
3825 let plp = 0xFFFFFFFFFFFFFFFFu64.to_le_bytes();
3827 let mut buf: &[u8] = &plp;
3828
3829 let result = Client::<Ready>::parse_plp_nvarchar(&mut buf).unwrap();
3830 assert!(matches!(result, mssql_types::SqlValue::Null));
3831 }
3832
3833 #[test]
3834 fn test_parse_plp_nvarchar_empty() {
3835 let plp = make_plp_data(0, &[]);
3837 let mut buf: &[u8] = &plp;
3838
3839 let result = Client::<Ready>::parse_plp_nvarchar(&mut buf).unwrap();
3840 match result {
3841 mssql_types::SqlValue::String(s) => assert_eq!(s, ""),
3842 _ => panic!("expected empty String"),
3843 }
3844 }
3845
3846 #[test]
3847 fn test_parse_plp_nvarchar_multi_chunk() {
3848 let chunk1 = [0x48, 0x00, 0x65, 0x00, 0x6C, 0x00]; let chunk2 = [0x6C, 0x00, 0x6F, 0x00]; let plp = make_plp_data(10, &[&chunk1, &chunk2]);
3852 let mut buf: &[u8] = &plp;
3853
3854 let result = Client::<Ready>::parse_plp_nvarchar(&mut buf).unwrap();
3855 match result {
3856 mssql_types::SqlValue::String(s) => assert_eq!(s, "Hello"),
3857 _ => panic!("expected String"),
3858 }
3859 }
3860
3861 #[test]
3862 fn test_parse_plp_varchar_simple() {
3863 let data = b"Hello World";
3864 let plp = make_plp_data(11, &[data]);
3865 let mut buf: &[u8] = &plp;
3866
3867 let result = Client::<Ready>::parse_plp_varchar(&mut buf).unwrap();
3868 match result {
3869 mssql_types::SqlValue::String(s) => assert_eq!(s, "Hello World"),
3870 _ => panic!("expected String"),
3871 }
3872 }
3873
3874 #[test]
3875 fn test_parse_plp_varchar_null() {
3876 let plp = 0xFFFFFFFFFFFFFFFFu64.to_le_bytes();
3877 let mut buf: &[u8] = &plp;
3878
3879 let result = Client::<Ready>::parse_plp_varchar(&mut buf).unwrap();
3880 assert!(matches!(result, mssql_types::SqlValue::Null));
3881 }
3882
3883 #[test]
3884 fn test_parse_plp_varbinary_simple() {
3885 let data = [0x01, 0x02, 0x03, 0x04, 0x05];
3886 let plp = make_plp_data(5, &[&data]);
3887 let mut buf: &[u8] = &plp;
3888
3889 let result = Client::<Ready>::parse_plp_varbinary(&mut buf).unwrap();
3890 match result {
3891 mssql_types::SqlValue::Binary(b) => assert_eq!(&b[..], &[0x01, 0x02, 0x03, 0x04, 0x05]),
3892 _ => panic!("expected Binary"),
3893 }
3894 }
3895
3896 #[test]
3897 fn test_parse_plp_varbinary_null() {
3898 let plp = 0xFFFFFFFFFFFFFFFFu64.to_le_bytes();
3899 let mut buf: &[u8] = &plp;
3900
3901 let result = Client::<Ready>::parse_plp_varbinary(&mut buf).unwrap();
3902 assert!(matches!(result, mssql_types::SqlValue::Null));
3903 }
3904
3905 #[test]
3906 fn test_parse_plp_varbinary_large() {
3907 let chunk1: Vec<u8> = (0..100u8).collect();
3909 let chunk2: Vec<u8> = (100..200u8).collect();
3910 let chunk3: Vec<u8> = (200..255u8).collect();
3911 let total_len = chunk1.len() + chunk2.len() + chunk3.len();
3912 let plp = make_plp_data(total_len as u64, &[&chunk1, &chunk2, &chunk3]);
3913 let mut buf: &[u8] = &plp;
3914
3915 let result = Client::<Ready>::parse_plp_varbinary(&mut buf).unwrap();
3916 match result {
3917 mssql_types::SqlValue::Binary(b) => {
3918 assert_eq!(b.len(), 255);
3919 for (i, &byte) in b.iter().enumerate() {
3921 assert_eq!(byte, i as u8);
3922 }
3923 }
3924 _ => panic!("expected Binary"),
3925 }
3926 }
3927
3928 use tds_protocol::token::{ColumnData, TypeInfo};
3936 use tds_protocol::types::TypeId;
3937
3938 fn make_nvarchar_int_row(nvarchar_value: &str, int_value: i32) -> Vec<u8> {
3941 let mut data = Vec::new();
3942
3943 let utf16: Vec<u16> = nvarchar_value.encode_utf16().collect();
3945 let byte_len = (utf16.len() * 2) as u16;
3946 data.extend_from_slice(&byte_len.to_le_bytes());
3947 for code_unit in utf16 {
3948 data.extend_from_slice(&code_unit.to_le_bytes());
3949 }
3950
3951 data.push(4); data.extend_from_slice(&int_value.to_le_bytes());
3954
3955 data
3956 }
3957
3958 #[test]
3959 fn test_parse_row_nvarchar_then_int() {
3960 let raw_data = make_nvarchar_int_row("World", 42);
3962
3963 let col0 = ColumnData {
3965 name: "greeting".to_string(),
3966 type_id: TypeId::NVarChar,
3967 col_type: 0xE7,
3968 flags: 0x01,
3969 user_type: 0,
3970 type_info: TypeInfo {
3971 max_length: Some(10), precision: None,
3973 scale: None,
3974 collation: None,
3975 },
3976 };
3977
3978 let col1 = ColumnData {
3979 name: "number".to_string(),
3980 type_id: TypeId::IntN,
3981 col_type: 0x26,
3982 flags: 0x01,
3983 user_type: 0,
3984 type_info: TypeInfo {
3985 max_length: Some(4),
3986 precision: None,
3987 scale: None,
3988 collation: None,
3989 },
3990 };
3991
3992 let mut buf: &[u8] = &raw_data;
3993
3994 let value0 = Client::<Ready>::parse_column_value(&mut buf, &col0).unwrap();
3996 match value0 {
3997 mssql_types::SqlValue::String(s) => assert_eq!(s, "World"),
3998 _ => panic!("expected String, got {:?}", value0),
3999 }
4000
4001 let value1 = Client::<Ready>::parse_column_value(&mut buf, &col1).unwrap();
4003 match value1 {
4004 mssql_types::SqlValue::Int(i) => assert_eq!(i, 42),
4005 _ => panic!("expected Int, got {:?}", value1),
4006 }
4007
4008 assert_eq!(buf.len(), 0, "buffer should be fully consumed");
4010 }
4011
4012 #[test]
4013 fn test_parse_row_multiple_types() {
4014 let mut data = Vec::new();
4016
4017 data.extend_from_slice(&0xFFFFu16.to_le_bytes());
4019
4020 data.push(4); data.extend_from_slice(&123i32.to_le_bytes());
4023
4024 let utf16: Vec<u16> = "Test".encode_utf16().collect();
4026 data.extend_from_slice(&((utf16.len() * 2) as u16).to_le_bytes());
4027 for code_unit in utf16 {
4028 data.extend_from_slice(&code_unit.to_le_bytes());
4029 }
4030
4031 data.push(0);
4033
4034 let col0 = ColumnData {
4036 name: "col0".to_string(),
4037 type_id: TypeId::NVarChar,
4038 col_type: 0xE7,
4039 flags: 0x01,
4040 user_type: 0,
4041 type_info: TypeInfo {
4042 max_length: Some(100),
4043 precision: None,
4044 scale: None,
4045 collation: None,
4046 },
4047 };
4048 let col1 = ColumnData {
4049 name: "col1".to_string(),
4050 type_id: TypeId::IntN,
4051 col_type: 0x26,
4052 flags: 0x01,
4053 user_type: 0,
4054 type_info: TypeInfo {
4055 max_length: Some(4),
4056 precision: None,
4057 scale: None,
4058 collation: None,
4059 },
4060 };
4061 let col2 = col0.clone();
4062 let col3 = col1.clone();
4063
4064 let mut buf: &[u8] = &data;
4065
4066 let v0 = Client::<Ready>::parse_column_value(&mut buf, &col0).unwrap();
4068 assert!(
4069 matches!(v0, mssql_types::SqlValue::Null),
4070 "col0 should be Null"
4071 );
4072
4073 let v1 = Client::<Ready>::parse_column_value(&mut buf, &col1).unwrap();
4074 assert!(
4075 matches!(v1, mssql_types::SqlValue::Int(123)),
4076 "col1 should be 123"
4077 );
4078
4079 let v2 = Client::<Ready>::parse_column_value(&mut buf, &col2).unwrap();
4080 match v2 {
4081 mssql_types::SqlValue::String(s) => assert_eq!(s, "Test"),
4082 _ => panic!("col2 should be 'Test'"),
4083 }
4084
4085 let v3 = Client::<Ready>::parse_column_value(&mut buf, &col3).unwrap();
4086 assert!(
4087 matches!(v3, mssql_types::SqlValue::Null),
4088 "col3 should be Null"
4089 );
4090
4091 assert_eq!(buf.len(), 0, "buffer should be fully consumed");
4093 }
4094
4095 #[test]
4096 fn test_parse_row_with_unicode() {
4097 let test_str = "Héllo Wörld 日本語";
4099 let mut data = Vec::new();
4100
4101 let utf16: Vec<u16> = test_str.encode_utf16().collect();
4103 data.extend_from_slice(&((utf16.len() * 2) as u16).to_le_bytes());
4104 for code_unit in utf16 {
4105 data.extend_from_slice(&code_unit.to_le_bytes());
4106 }
4107
4108 data.push(8); data.extend_from_slice(&9999999999i64.to_le_bytes());
4111
4112 let col0 = ColumnData {
4113 name: "text".to_string(),
4114 type_id: TypeId::NVarChar,
4115 col_type: 0xE7,
4116 flags: 0x01,
4117 user_type: 0,
4118 type_info: TypeInfo {
4119 max_length: Some(100),
4120 precision: None,
4121 scale: None,
4122 collation: None,
4123 },
4124 };
4125 let col1 = ColumnData {
4126 name: "num".to_string(),
4127 type_id: TypeId::IntN,
4128 col_type: 0x26,
4129 flags: 0x01,
4130 user_type: 0,
4131 type_info: TypeInfo {
4132 max_length: Some(8),
4133 precision: None,
4134 scale: None,
4135 collation: None,
4136 },
4137 };
4138
4139 let mut buf: &[u8] = &data;
4140
4141 let v0 = Client::<Ready>::parse_column_value(&mut buf, &col0).unwrap();
4142 match v0 {
4143 mssql_types::SqlValue::String(s) => assert_eq!(s, test_str),
4144 _ => panic!("expected String"),
4145 }
4146
4147 let v1 = Client::<Ready>::parse_column_value(&mut buf, &col1).unwrap();
4148 match v1 {
4149 mssql_types::SqlValue::BigInt(i) => assert_eq!(i, 9999999999),
4150 _ => panic!("expected BigInt"),
4151 }
4152
4153 assert_eq!(buf.len(), 0, "buffer should be fully consumed");
4154 }
4155}