mssql_client/
client.rs

1//! SQL Server client implementation.
2
3// Allow unwrap/expect for chrono date construction with known-valid constant dates
4// and for regex patterns that are compile-time constants
5#![allow(clippy::unwrap_used, clippy::expect_used, clippy::needless_range_loop)]
6
7use std::marker::PhantomData;
8use std::sync::Arc;
9
10use bytes::BytesMut;
11use mssql_codec::connection::Connection;
12use mssql_tls::{TlsConfig, TlsConnector, TlsNegotiationMode, TlsStream};
13use tds_protocol::login7::Login7;
14use tds_protocol::packet::{MAX_PACKET_SIZE, PacketType};
15use tds_protocol::prelogin::{EncryptionLevel, PreLogin};
16use tds_protocol::rpc::{RpcParam, RpcRequest, TypeInfo as RpcTypeInfo};
17use tds_protocol::token::{
18    ColMetaData, ColumnData, EnvChange, EnvChangeType, NbcRow, RawRow, Token, TokenParser,
19};
20use tds_protocol::tvp::{
21    TvpColumnDef as TvpWireColumnDef, TvpColumnFlags, TvpEncoder, TvpWireType, encode_tvp_bit,
22    encode_tvp_decimal, encode_tvp_float, encode_tvp_int, encode_tvp_null, encode_tvp_nvarchar,
23    encode_tvp_varbinary,
24};
25use tokio::net::TcpStream;
26use tokio::time::timeout;
27
28use crate::config::Config;
29use crate::error::{Error, Result};
30#[cfg(feature = "otel")]
31use crate::instrumentation::InstrumentationContext;
32use crate::state::{ConnectionState, Disconnected, InTransaction, Ready};
33use crate::statement_cache::StatementCache;
34use crate::stream::{MultiResultStream, QueryStream};
35use crate::transaction::SavePoint;
36
/// SQL Server client with type-state connection management.
///
/// The generic parameter `S` represents the current connection state,
/// ensuring at compile time that certain operations are only available
/// in appropriate states. `S` is carried only by a `PhantomData` marker,
/// so changing state never changes the data stored in the client.
pub struct Client<S: ConnectionState> {
    /// Configuration the connection was established with (a clone of the
    /// `Config` passed to the connect path).
    config: Config,
    /// Zero-sized marker that ties this value to its connection state `S`.
    _state: PhantomData<S>,
    /// The underlying connection (present only when connected)
    connection: Option<ConnectionHandle>,
    /// Server version from LoginAck (raw u32 TDS version).
    /// `None` if the login response carried no LoginAck token.
    server_version: Option<u32>,
    /// Current database from EnvChange.
    /// `None` if the server has not reported a database change.
    current_database: Option<String>,
    /// Prepared statement cache for query optimization
    statement_cache: StatementCache,
    /// Transaction descriptor from BeginTransaction EnvChange.
    /// Per MS-TDS spec, this value must be included in ALL_HEADERS for subsequent
    /// requests within an explicit transaction. 0 indicates auto-commit mode.
    transaction_descriptor: u64,
    /// OpenTelemetry instrumentation context (when otel feature is enabled)
    #[cfg(feature = "otel")]
    instrumentation: InstrumentationContext,
}
61
/// Internal connection handle wrapping the actual connection.
///
/// This is an enum to support different connection types:
/// - TLS (TDS 8.0 strict mode)
/// - TLS with PreLogin wrapping (TDS 7.x style)
/// - Plain TCP (no TLS at all, or TLS used only to protect the login)
///
/// Each connect path in `Client<Disconnected>` builds exactly one variant,
/// depending on how encryption was negotiated.
#[allow(dead_code)] // Connection will be used once query execution is implemented
enum ConnectionHandle {
    /// TLS connection (TDS 8.0 strict mode - TLS before any TDS traffic)
    Tls(Connection<TlsStream<TcpStream>>),
    /// TLS connection with PreLogin wrapping (TDS 7.x style, full encryption)
    TlsPrelogin(Connection<TlsStream<mssql_tls::TlsPreloginWrapper<TcpStream>>>),
    /// Plain TCP connection. Used when no encryption was negotiated, and also
    /// after login-only encryption, where the TLS layer is discarded once
    /// Login7 has been sent.
    Plain(Connection<TcpStream>),
}
77
78impl Client<Disconnected> {
79    /// Connect to SQL Server.
80    ///
81    /// This establishes a connection, performs TLS negotiation (if required),
82    /// and authenticates with the server.
83    ///
84    /// # Example
85    ///
86    /// ```rust,ignore
87    /// let client = Client::connect(config).await?;
88    /// ```
89    pub async fn connect(config: Config) -> Result<Client<Ready>> {
90        let max_redirects = config.redirect.max_redirects;
91        let follow_redirects = config.redirect.follow_redirects;
92        let mut attempts = 0;
93        let mut current_config = config;
94
95        loop {
96            attempts += 1;
97            if attempts > max_redirects + 1 {
98                return Err(Error::TooManyRedirects { max: max_redirects });
99            }
100
101            match Self::try_connect(&current_config).await {
102                Ok(client) => return Ok(client),
103                Err(Error::Routing { host, port }) => {
104                    if !follow_redirects {
105                        return Err(Error::Routing { host, port });
106                    }
107                    tracing::info!(
108                        host = %host,
109                        port = port,
110                        attempt = attempts,
111                        max_redirects = max_redirects,
112                        "following Azure SQL routing redirect"
113                    );
114                    current_config = current_config.with_host(&host).with_port(port);
115                    continue;
116                }
117                Err(e) => return Err(e),
118            }
119        }
120    }
121
122    async fn try_connect(config: &Config) -> Result<Client<Ready>> {
123        tracing::info!(
124            host = %config.host,
125            port = config.port,
126            database = ?config.database,
127            "connecting to SQL Server"
128        );
129
130        let addr = format!("{}:{}", config.host, config.port);
131
132        // Step 1: Establish TCP connection
133        tracing::debug!("establishing TCP connection to {}", addr);
134        let tcp_stream = timeout(config.timeouts.connect_timeout, TcpStream::connect(&addr))
135            .await
136            .map_err(|_| Error::ConnectTimeout)?
137            .map_err(|e| Error::Io(Arc::new(e)))?;
138
139        // Enable TCP nodelay for better latency
140        tcp_stream
141            .set_nodelay(true)
142            .map_err(|e| Error::Io(Arc::new(e)))?;
143
144        // Determine TLS negotiation mode
145        let tls_mode = TlsNegotiationMode::from_encrypt_mode(config.strict_mode);
146
147        // Step 2: Handle TDS 8.0 strict mode (TLS before any TDS traffic)
148        if tls_mode.is_tls_first() {
149            return Self::connect_tds_8(config, tcp_stream).await;
150        }
151
152        // Step 3: TDS 7.x flow - PreLogin first, then TLS, then Login7
153        Self::connect_tds_7x(config, tcp_stream).await
154    }
155
156    /// Connect using TDS 8.0 strict mode.
157    ///
158    /// Flow: TCP -> TLS -> PreLogin (encrypted) -> Login7 (encrypted)
159    async fn connect_tds_8(config: &Config, tcp_stream: TcpStream) -> Result<Client<Ready>> {
160        tracing::debug!("using TDS 8.0 strict mode (TLS first)");
161
162        // Build TLS configuration
163        let tls_config = TlsConfig::new()
164            .strict_mode(true)
165            .trust_server_certificate(config.trust_server_certificate);
166
167        let tls_connector = TlsConnector::new(tls_config).map_err(|e| Error::Tls(e.to_string()))?;
168
169        // Perform TLS handshake before any TDS traffic
170        let tls_stream = timeout(
171            config.timeouts.tls_timeout,
172            tls_connector.connect(tcp_stream, &config.host),
173        )
174        .await
175        .map_err(|_| Error::TlsTimeout)?
176        .map_err(|e| Error::Tls(e.to_string()))?;
177
178        tracing::debug!("TLS handshake completed (strict mode)");
179
180        // Create connection wrapper
181        let mut connection = Connection::new(tls_stream);
182
183        // Send PreLogin (encrypted in strict mode)
184        let prelogin = Self::build_prelogin(config, EncryptionLevel::Required);
185        Self::send_prelogin(&mut connection, &prelogin).await?;
186        let _prelogin_response = Self::receive_prelogin(&mut connection).await?;
187
188        // Send Login7
189        let login = Self::build_login7(config);
190        Self::send_login7(&mut connection, &login).await?;
191
192        // Process login response
193        let (server_version, current_database, routing) =
194            Self::process_login_response(&mut connection).await?;
195
196        // Handle routing redirect
197        if let Some((host, port)) = routing {
198            return Err(Error::Routing { host, port });
199        }
200
201        Ok(Client {
202            config: config.clone(),
203            _state: PhantomData,
204            connection: Some(ConnectionHandle::Tls(connection)),
205            server_version,
206            current_database: current_database.clone(),
207            statement_cache: StatementCache::with_default_size(),
208            transaction_descriptor: 0, // Auto-commit mode initially
209            #[cfg(feature = "otel")]
210            instrumentation: InstrumentationContext::new(config.host.clone(), config.port)
211                .with_database(current_database.unwrap_or_default()),
212        })
213    }
214
215    /// Connect using TDS 7.x flow.
216    ///
217    /// Flow: TCP -> PreLogin (clear) -> TLS -> Login7 (encrypted)
218    ///
219    /// Note: For TDS 7.x, the PreLogin exchange happens over raw TCP before
220    /// upgrading to TLS. We use low-level I/O for this initial exchange
221    /// since the Connection struct splits the stream immediately.
222    async fn connect_tds_7x(config: &Config, mut tcp_stream: TcpStream) -> Result<Client<Ready>> {
223        use bytes::BufMut;
224        use tds_protocol::packet::{PACKET_HEADER_SIZE, PacketHeader, PacketStatus};
225        use tokio::io::{AsyncReadExt, AsyncWriteExt};
226
227        tracing::debug!("using TDS 7.x flow (PreLogin first)");
228
229        // Build PreLogin packet
230        // Use EncryptionLevel::On if client wants encryption, Off otherwise
231        let client_encryption = if config.encrypt {
232            EncryptionLevel::On
233        } else {
234            EncryptionLevel::Off
235        };
236        let prelogin = Self::build_prelogin(config, client_encryption);
237        tracing::debug!(encryption = ?client_encryption, "sending PreLogin");
238        let prelogin_bytes = prelogin.encode();
239
240        // Manually create and send the PreLogin packet over raw TCP
241        let header = PacketHeader::new(
242            PacketType::PreLogin,
243            PacketStatus::END_OF_MESSAGE,
244            (PACKET_HEADER_SIZE + prelogin_bytes.len()) as u16,
245        );
246
247        let mut packet_buf = BytesMut::with_capacity(PACKET_HEADER_SIZE + prelogin_bytes.len());
248        header.encode(&mut packet_buf);
249        packet_buf.put_slice(&prelogin_bytes);
250
251        tcp_stream
252            .write_all(&packet_buf)
253            .await
254            .map_err(|e| Error::Io(Arc::new(e)))?;
255
256        // Read PreLogin response
257        let mut header_buf = [0u8; PACKET_HEADER_SIZE];
258        tcp_stream
259            .read_exact(&mut header_buf)
260            .await
261            .map_err(|e| Error::Io(Arc::new(e)))?;
262
263        let response_length = u16::from_be_bytes([header_buf[2], header_buf[3]]) as usize;
264        let payload_length = response_length.saturating_sub(PACKET_HEADER_SIZE);
265
266        let mut response_buf = vec![0u8; payload_length];
267        tcp_stream
268            .read_exact(&mut response_buf)
269            .await
270            .map_err(|e| Error::Io(Arc::new(e)))?;
271
272        let prelogin_response =
273            PreLogin::decode(&response_buf[..]).map_err(|e| Error::Protocol(e.to_string()))?;
274
275        // Check server encryption response
276        let server_encryption = prelogin_response.encryption;
277        tracing::debug!(encryption = ?server_encryption, "server encryption level");
278
279        // Determine negotiated encryption level (follows TDS 7.x rules)
280        // - NotSupported + NotSupported = NotSupported (no TLS at all)
281        // - Off + Off = Off (TLS for login only, then plain)
282        // - On + anything supported = On (full TLS)
283        // - Required = On with failure if not possible
284        let negotiated_encryption = match (client_encryption, server_encryption) {
285            (EncryptionLevel::NotSupported, EncryptionLevel::NotSupported) => {
286                EncryptionLevel::NotSupported
287            }
288            (EncryptionLevel::Off, EncryptionLevel::Off) => EncryptionLevel::Off,
289            (EncryptionLevel::On, EncryptionLevel::Off)
290            | (EncryptionLevel::On, EncryptionLevel::NotSupported) => {
291                return Err(Error::Protocol(
292                    "Server does not support requested encryption level".to_string(),
293                ));
294            }
295            _ => EncryptionLevel::On,
296        };
297
298        // TLS is required unless negotiated encryption is NotSupported
299        // Even with "Off", TLS is used to protect login credentials (per TDS 7.x spec)
300        let use_tls = negotiated_encryption != EncryptionLevel::NotSupported;
301
302        if use_tls {
303            // Upgrade to TLS with PreLogin wrapping (TDS 7.x style)
304            // In TDS 7.x, the TLS handshake is wrapped inside TDS PreLogin packets
305            let tls_config =
306                TlsConfig::new().trust_server_certificate(config.trust_server_certificate);
307
308            let tls_connector =
309                TlsConnector::new(tls_config).map_err(|e| Error::Tls(e.to_string()))?;
310
311            // Use PreLogin-wrapped TLS connection for TDS 7.x
312            let mut tls_stream = timeout(
313                config.timeouts.tls_timeout,
314                tls_connector.connect_with_prelogin(tcp_stream, &config.host),
315            )
316            .await
317            .map_err(|_| Error::TlsTimeout)?
318            .map_err(|e| Error::Tls(e.to_string()))?;
319
320            tracing::debug!("TLS handshake completed (PreLogin wrapped)");
321
322            // Check if we need full encryption or login-only encryption
323            let login_only_encryption = negotiated_encryption == EncryptionLevel::Off;
324
325            if login_only_encryption {
326                // Login-Only Encryption (ENCRYPT_OFF + ENCRYPT_OFF per MS-TDS spec):
327                // - Login7 is sent through TLS to protect credentials
328                // - Server responds in PLAINTEXT after receiving Login7
329                // - All subsequent communication is plaintext
330                //
331                // We must NOT use Connection with TLS stream because Connection splits
332                // the stream and we need to extract the underlying TCP afterward.
333                use tokio::io::AsyncWriteExt;
334
335                // Build and send Login7 directly through TLS
336                let login = Self::build_login7(config);
337                let login_payload = login.encode();
338
339                // Create TDS packet manually for Login7
340                let max_packet = MAX_PACKET_SIZE;
341                let max_payload = max_packet - PACKET_HEADER_SIZE;
342                let chunks: Vec<_> = login_payload.chunks(max_payload).collect();
343                let total_chunks = chunks.len();
344
345                for (i, chunk) in chunks.into_iter().enumerate() {
346                    let is_last = i == total_chunks - 1;
347                    let status = if is_last {
348                        PacketStatus::END_OF_MESSAGE
349                    } else {
350                        PacketStatus::NORMAL
351                    };
352
353                    let header = PacketHeader::new(
354                        PacketType::Tds7Login,
355                        status,
356                        (PACKET_HEADER_SIZE + chunk.len()) as u16,
357                    );
358
359                    let mut packet_buf = BytesMut::with_capacity(PACKET_HEADER_SIZE + chunk.len());
360                    header.encode(&mut packet_buf);
361                    packet_buf.put_slice(chunk);
362
363                    tls_stream
364                        .write_all(&packet_buf)
365                        .await
366                        .map_err(|e| Error::Io(Arc::new(e)))?;
367                }
368
369                // Flush TLS to ensure all data is sent
370                tls_stream
371                    .flush()
372                    .await
373                    .map_err(|e| Error::Io(Arc::new(e)))?;
374
375                tracing::debug!("Login7 sent through TLS, switching to plaintext for response");
376
377                // Extract the underlying TCP stream from the TLS layer
378                // TlsStream::into_inner() returns (IO, ClientConnection)
379                // where IO is our TlsPreloginWrapper<TcpStream>
380                let (wrapper, _client_conn) = tls_stream.into_inner();
381                let tcp_stream = wrapper.into_inner();
382
383                // Create Connection from plain TCP for reading response
384                let mut connection = Connection::new(tcp_stream);
385
386                // Process login response (comes in plaintext)
387                let (server_version, current_database, routing) =
388                    Self::process_login_response(&mut connection).await?;
389
390                // Handle routing redirect
391                if let Some((host, port)) = routing {
392                    return Err(Error::Routing { host, port });
393                }
394
395                // Store plain TCP connection for subsequent operations
396                Ok(Client {
397                    config: config.clone(),
398                    _state: PhantomData,
399                    connection: Some(ConnectionHandle::Plain(connection)),
400                    server_version,
401                    current_database: current_database.clone(),
402                    statement_cache: StatementCache::with_default_size(),
403                    transaction_descriptor: 0, // Auto-commit mode initially
404                    #[cfg(feature = "otel")]
405                    instrumentation: InstrumentationContext::new(config.host.clone(), config.port)
406                        .with_database(current_database.unwrap_or_default()),
407                })
408            } else {
409                // Full Encryption (ENCRYPT_ON per MS-TDS spec):
410                // - All communication after TLS handshake goes through TLS
411                let mut connection = Connection::new(tls_stream);
412
413                // Send Login7
414                let login = Self::build_login7(config);
415                Self::send_login7(&mut connection, &login).await?;
416
417                // Process login response
418                let (server_version, current_database, routing) =
419                    Self::process_login_response(&mut connection).await?;
420
421                // Handle routing redirect
422                if let Some((host, port)) = routing {
423                    return Err(Error::Routing { host, port });
424                }
425
426                Ok(Client {
427                    config: config.clone(),
428                    _state: PhantomData,
429                    connection: Some(ConnectionHandle::TlsPrelogin(connection)),
430                    server_version,
431                    current_database: current_database.clone(),
432                    statement_cache: StatementCache::with_default_size(),
433                    transaction_descriptor: 0, // Auto-commit mode initially
434                    #[cfg(feature = "otel")]
435                    instrumentation: InstrumentationContext::new(config.host.clone(), config.port)
436                        .with_database(current_database.unwrap_or_default()),
437                })
438            }
439        } else {
440            // Server does not require encryption and client doesn't either
441            tracing::warn!(
442                "Connecting without TLS encryption. This is insecure and should only be \
443                 used for development/testing on trusted networks."
444            );
445
446            // Build Login7 packet
447            let login = Self::build_login7(config);
448            let login_bytes = login.encode();
449            tracing::debug!("Login7 packet built: {} bytes", login_bytes.len(),);
450            // Dump the fixed header (94 bytes)
451            tracing::debug!(
452                "Login7 fixed header (94 bytes): {:02X?}",
453                &login_bytes[..login_bytes.len().min(94)]
454            );
455            // Dump variable data
456            if login_bytes.len() > 94 {
457                tracing::debug!(
458                    "Login7 variable data ({} bytes): {:02X?}",
459                    login_bytes.len() - 94,
460                    &login_bytes[94..]
461                );
462            }
463
464            // Send Login7 over raw TCP (like PreLogin)
465            let login_header = PacketHeader::new(
466                PacketType::Tds7Login,
467                PacketStatus::END_OF_MESSAGE,
468                (PACKET_HEADER_SIZE + login_bytes.len()) as u16,
469            )
470            .with_packet_id(1);
471            let mut login_packet_buf =
472                BytesMut::with_capacity(PACKET_HEADER_SIZE + login_bytes.len());
473            login_header.encode(&mut login_packet_buf);
474            login_packet_buf.put_slice(&login_bytes);
475
476            tracing::debug!(
477                "Sending Login7 packet: {} bytes total, header: {:02X?}",
478                login_packet_buf.len(),
479                &login_packet_buf[..PACKET_HEADER_SIZE]
480            );
481            tcp_stream
482                .write_all(&login_packet_buf)
483                .await
484                .map_err(|e| Error::Io(Arc::new(e)))?;
485            tcp_stream
486                .flush()
487                .await
488                .map_err(|e| Error::Io(Arc::new(e)))?;
489            tracing::debug!("Login7 sent and flushed over raw TCP");
490
491            // Read login response header
492            let mut response_header_buf = [0u8; PACKET_HEADER_SIZE];
493            tcp_stream
494                .read_exact(&mut response_header_buf)
495                .await
496                .map_err(|e| Error::Io(Arc::new(e)))?;
497
498            let response_type = response_header_buf[0];
499            let response_length =
500                u16::from_be_bytes([response_header_buf[2], response_header_buf[3]]) as usize;
501            tracing::debug!(
502                "Response header: type={:#04X}, length={}",
503                response_type,
504                response_length
505            );
506
507            // Read response payload
508            let payload_length = response_length.saturating_sub(PACKET_HEADER_SIZE);
509            let mut response_payload = vec![0u8; payload_length];
510            tcp_stream
511                .read_exact(&mut response_payload)
512                .await
513                .map_err(|e| Error::Io(Arc::new(e)))?;
514            tracing::debug!(
515                "Response payload: {} bytes, first 32: {:02X?}",
516                response_payload.len(),
517                &response_payload[..response_payload.len().min(32)]
518            );
519
520            // Now create Connection for further communication
521            let connection = Connection::new(tcp_stream);
522
523            // Parse login response
524            let response_bytes = bytes::Bytes::from(response_payload);
525            let mut parser = TokenParser::new(response_bytes);
526            let mut server_version = None;
527            let mut current_database = None;
528            let routing = None;
529
530            while let Some(token) = parser
531                .next_token()
532                .map_err(|e| Error::Protocol(e.to_string()))?
533            {
534                match token {
535                    Token::LoginAck(ack) => {
536                        tracing::info!(
537                            version = ack.tds_version,
538                            interface = ack.interface,
539                            prog_name = %ack.prog_name,
540                            "login acknowledged"
541                        );
542                        server_version = Some(ack.tds_version);
543                    }
544                    Token::EnvChange(env) => {
545                        Self::process_env_change(&env, &mut current_database, &mut None);
546                    }
547                    Token::Error(err) => {
548                        return Err(Error::Server {
549                            number: err.number,
550                            state: err.state,
551                            class: err.class,
552                            message: err.message.clone(),
553                            server: if err.server.is_empty() {
554                                None
555                            } else {
556                                Some(err.server.clone())
557                            },
558                            procedure: if err.procedure.is_empty() {
559                                None
560                            } else {
561                                Some(err.procedure.clone())
562                            },
563                            line: err.line as u32,
564                        });
565                    }
566                    Token::Info(info) => {
567                        tracing::info!(
568                            number = info.number,
569                            message = %info.message,
570                            "server info message"
571                        );
572                    }
573                    Token::Done(done) => {
574                        if done.status.error {
575                            return Err(Error::Protocol("login failed".to_string()));
576                        }
577                        break;
578                    }
579                    _ => {}
580                }
581            }
582
583            // Handle routing redirect
584            if let Some((host, port)) = routing {
585                return Err(Error::Routing { host, port });
586            }
587
588            Ok(Client {
589                config: config.clone(),
590                _state: PhantomData,
591                connection: Some(ConnectionHandle::Plain(connection)),
592                server_version,
593                current_database: current_database.clone(),
594                statement_cache: StatementCache::with_default_size(),
595                transaction_descriptor: 0, // Auto-commit mode initially
596                #[cfg(feature = "otel")]
597                instrumentation: InstrumentationContext::new(config.host.clone(), config.port)
598                    .with_database(current_database.unwrap_or_default()),
599            })
600        }
601    }
602
603    /// Build a PreLogin packet.
604    fn build_prelogin(config: &Config, encryption: EncryptionLevel) -> PreLogin {
605        let mut prelogin = PreLogin::new().with_encryption(encryption);
606
607        if config.mars {
608            prelogin = prelogin.with_mars(true);
609        }
610
611        if let Some(ref instance) = config.instance {
612            prelogin = prelogin.with_instance(instance);
613        }
614
615        prelogin
616    }
617
618    /// Build a Login7 packet.
619    fn build_login7(config: &Config) -> Login7 {
620        let mut login = Login7::new()
621            .with_packet_size(config.packet_size as u32)
622            .with_app_name(&config.application_name)
623            .with_server_name(&config.host)
624            .with_hostname(&config.host);
625
626        if let Some(ref database) = config.database {
627            login = login.with_database(database);
628        }
629
630        // Set credentials
631        match &config.credentials {
632            mssql_auth::Credentials::SqlServer { username, password } => {
633                login = login.with_sql_auth(username.as_ref(), password.as_ref());
634            }
635            // Other credential types would be handled here
636            _ => {}
637        }
638
639        login
640    }
641
642    /// Send a PreLogin packet (for use with Connection).
643    async fn send_prelogin<T>(connection: &mut Connection<T>, prelogin: &PreLogin) -> Result<()>
644    where
645        T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
646    {
647        let payload = prelogin.encode();
648        let max_packet = MAX_PACKET_SIZE;
649
650        connection
651            .send_message(PacketType::PreLogin, payload, max_packet)
652            .await
653            .map_err(|e| Error::Protocol(e.to_string()))
654    }
655
656    /// Receive a PreLogin response (for use with Connection).
657    async fn receive_prelogin<T>(connection: &mut Connection<T>) -> Result<PreLogin>
658    where
659        T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
660    {
661        let message = connection
662            .read_message()
663            .await
664            .map_err(|e| Error::Protocol(e.to_string()))?
665            .ok_or(Error::ConnectionClosed)?;
666
667        PreLogin::decode(&message.payload[..]).map_err(|e| Error::Protocol(e.to_string()))
668    }
669
670    /// Send a Login7 packet.
671    async fn send_login7<T>(connection: &mut Connection<T>, login: &Login7) -> Result<()>
672    where
673        T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
674    {
675        let payload = login.encode();
676        let max_packet = MAX_PACKET_SIZE;
677
678        connection
679            .send_message(PacketType::Tds7Login, payload, max_packet)
680            .await
681            .map_err(|e| Error::Protocol(e.to_string()))
682    }
683
684    /// Process the login response tokens.
685    ///
686    /// Returns: (server_version, database, routing_info)
687    async fn process_login_response<T>(
688        connection: &mut Connection<T>,
689    ) -> Result<(Option<u32>, Option<String>, Option<(String, u16)>)>
690    where
691        T: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
692    {
693        let message = connection
694            .read_message()
695            .await
696            .map_err(|e| Error::Protocol(e.to_string()))?
697            .ok_or(Error::ConnectionClosed)?;
698
699        let response_bytes = message.payload;
700
701        let mut parser = TokenParser::new(response_bytes);
702        let mut server_version = None;
703        let mut database = None;
704        let mut routing = None;
705
706        while let Some(token) = parser
707            .next_token()
708            .map_err(|e| Error::Protocol(e.to_string()))?
709        {
710            match token {
711                Token::LoginAck(ack) => {
712                    tracing::info!(
713                        version = ack.tds_version,
714                        interface = ack.interface,
715                        prog_name = %ack.prog_name,
716                        "login acknowledged"
717                    );
718                    server_version = Some(ack.tds_version);
719                }
720                Token::EnvChange(env) => {
721                    Self::process_env_change(&env, &mut database, &mut routing);
722                }
723                Token::Error(err) => {
724                    return Err(Error::Server {
725                        number: err.number,
726                        state: err.state,
727                        class: err.class,
728                        message: err.message.clone(),
729                        server: if err.server.is_empty() {
730                            None
731                        } else {
732                            Some(err.server.clone())
733                        },
734                        procedure: if err.procedure.is_empty() {
735                            None
736                        } else {
737                            Some(err.procedure.clone())
738                        },
739                        line: err.line as u32,
740                    });
741                }
742                Token::Info(info) => {
743                    tracing::info!(
744                        number = info.number,
745                        message = %info.message,
746                        "server info message"
747                    );
748                }
749                Token::Done(done) => {
750                    if done.status.error {
751                        return Err(Error::Protocol("login failed".to_string()));
752                    }
753                    break;
754                }
755                _ => {}
756            }
757        }
758
759        Ok((server_version, database, routing))
760    }
761
762    /// Process an EnvChange token.
763    fn process_env_change(
764        env: &EnvChange,
765        database: &mut Option<String>,
766        routing: &mut Option<(String, u16)>,
767    ) {
768        use tds_protocol::token::EnvChangeValue;
769
770        match env.env_type {
771            EnvChangeType::Database => {
772                if let EnvChangeValue::String(ref new_value) = env.new_value {
773                    tracing::debug!(database = %new_value, "database changed");
774                    *database = Some(new_value.clone());
775                }
776            }
777            EnvChangeType::Routing => {
778                if let EnvChangeValue::Routing { ref host, port } = env.new_value {
779                    tracing::info!(host = %host, port = port, "routing redirect received");
780                    *routing = Some((host.clone(), port));
781                }
782            }
783            _ => {
784                if let EnvChangeValue::String(ref new_value) = env.new_value {
785                    tracing::debug!(
786                        env_type = ?env.env_type,
787                        new_value = %new_value,
788                        "environment change"
789                    );
790                }
791            }
792        }
793    }
794}
795
796// Private helper methods available to all connection states
797impl<S: ConnectionState> Client<S> {
798    /// Process transaction-related EnvChange tokens.
799    ///
800    /// This handles BeginTransaction, CommitTransaction, and RollbackTransaction
801    /// EnvChange tokens, updating the transaction descriptor accordingly.
802    ///
803    /// This enables executing BEGIN TRANSACTION, COMMIT, and ROLLBACK via raw SQL
804    /// while still having the transaction descriptor tracked correctly.
805    fn process_transaction_env_change(env: &EnvChange, transaction_descriptor: &mut u64) {
806        use tds_protocol::token::EnvChangeValue;
807
808        match env.env_type {
809            EnvChangeType::BeginTransaction => {
810                if let EnvChangeValue::Binary(ref data) = env.new_value {
811                    if data.len() >= 8 {
812                        let descriptor = u64::from_le_bytes([
813                            data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
814                        ]);
815                        tracing::debug!(descriptor = descriptor, "transaction started via raw SQL");
816                        *transaction_descriptor = descriptor;
817                    }
818                }
819            }
820            EnvChangeType::CommitTransaction | EnvChangeType::RollbackTransaction => {
821                tracing::debug!(
822                    env_type = ?env.env_type,
823                    "transaction ended via raw SQL"
824                );
825                *transaction_descriptor = 0;
826            }
827            _ => {}
828        }
829    }
830
831    /// Send a SQL batch to the server.
832    ///
833    /// Uses the client's current transaction descriptor in ALL_HEADERS.
834    /// Per MS-TDS spec, when in an explicit transaction, the descriptor
835    /// returned by BeginTransaction must be included.
836    async fn send_sql_batch(&mut self, sql: &str) -> Result<()> {
837        let payload =
838            tds_protocol::encode_sql_batch_with_transaction(sql, self.transaction_descriptor);
839        let max_packet = self.config.packet_size as usize;
840
841        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
842
843        match connection {
844            ConnectionHandle::Tls(conn) => {
845                conn.send_message(PacketType::SqlBatch, payload, max_packet)
846                    .await
847                    .map_err(|e| Error::Protocol(e.to_string()))?;
848            }
849            ConnectionHandle::TlsPrelogin(conn) => {
850                conn.send_message(PacketType::SqlBatch, payload, max_packet)
851                    .await
852                    .map_err(|e| Error::Protocol(e.to_string()))?;
853            }
854            ConnectionHandle::Plain(conn) => {
855                conn.send_message(PacketType::SqlBatch, payload, max_packet)
856                    .await
857                    .map_err(|e| Error::Protocol(e.to_string()))?;
858            }
859        }
860
861        Ok(())
862    }
863
864    /// Send an RPC request to the server.
865    ///
866    /// Uses the client's current transaction descriptor in ALL_HEADERS.
867    async fn send_rpc(&mut self, rpc: &RpcRequest) -> Result<()> {
868        let payload = rpc.encode_with_transaction(self.transaction_descriptor);
869        let max_packet = self.config.packet_size as usize;
870
871        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
872
873        match connection {
874            ConnectionHandle::Tls(conn) => {
875                conn.send_message(PacketType::Rpc, payload, max_packet)
876                    .await
877                    .map_err(|e| Error::Protocol(e.to_string()))?;
878            }
879            ConnectionHandle::TlsPrelogin(conn) => {
880                conn.send_message(PacketType::Rpc, payload, max_packet)
881                    .await
882                    .map_err(|e| Error::Protocol(e.to_string()))?;
883            }
884            ConnectionHandle::Plain(conn) => {
885                conn.send_message(PacketType::Rpc, payload, max_packet)
886                    .await
887                    .map_err(|e| Error::Protocol(e.to_string()))?;
888            }
889        }
890
891        Ok(())
892    }
893
894    /// Convert ToSql parameters to RPC parameters.
895    fn convert_params(params: &[&(dyn crate::ToSql + Sync)]) -> Result<Vec<RpcParam>> {
896        use bytes::{BufMut, BytesMut};
897        use mssql_types::SqlValue;
898
899        params
900            .iter()
901            .enumerate()
902            .map(|(i, p)| {
903                let sql_value = p.to_sql()?;
904                let name = format!("@p{}", i + 1);
905
906                Ok(match sql_value {
907                    SqlValue::Null => RpcParam::null(&name, RpcTypeInfo::nvarchar(1)),
908                    SqlValue::Bool(v) => {
909                        let mut buf = BytesMut::with_capacity(1);
910                        buf.put_u8(if v { 1 } else { 0 });
911                        RpcParam::new(&name, RpcTypeInfo::bit(), buf.freeze())
912                    }
913                    SqlValue::TinyInt(v) => {
914                        let mut buf = BytesMut::with_capacity(1);
915                        buf.put_u8(v);
916                        RpcParam::new(&name, RpcTypeInfo::tinyint(), buf.freeze())
917                    }
918                    SqlValue::SmallInt(v) => {
919                        let mut buf = BytesMut::with_capacity(2);
920                        buf.put_i16_le(v);
921                        RpcParam::new(&name, RpcTypeInfo::smallint(), buf.freeze())
922                    }
923                    SqlValue::Int(v) => RpcParam::int(&name, v),
924                    SqlValue::BigInt(v) => RpcParam::bigint(&name, v),
925                    SqlValue::Float(v) => {
926                        let mut buf = BytesMut::with_capacity(4);
927                        buf.put_f32_le(v);
928                        RpcParam::new(&name, RpcTypeInfo::real(), buf.freeze())
929                    }
930                    SqlValue::Double(v) => {
931                        let mut buf = BytesMut::with_capacity(8);
932                        buf.put_f64_le(v);
933                        RpcParam::new(&name, RpcTypeInfo::float(), buf.freeze())
934                    }
935                    SqlValue::String(ref s) => RpcParam::nvarchar(&name, s),
936                    SqlValue::Binary(ref b) => {
937                        RpcParam::new(&name, RpcTypeInfo::varbinary(b.len() as u16), b.clone())
938                    }
939                    SqlValue::Xml(ref s) => RpcParam::nvarchar(&name, s),
940                    #[cfg(feature = "uuid")]
941                    SqlValue::Uuid(u) => {
942                        // UUID is stored in a specific byte order for SQL Server
943                        let bytes = u.as_bytes();
944                        let mut buf = BytesMut::with_capacity(16);
945                        // SQL Server stores GUIDs in mixed-endian format
946                        buf.put_u32_le(u32::from_be_bytes([
947                            bytes[0], bytes[1], bytes[2], bytes[3],
948                        ]));
949                        buf.put_u16_le(u16::from_be_bytes([bytes[4], bytes[5]]));
950                        buf.put_u16_le(u16::from_be_bytes([bytes[6], bytes[7]]));
951                        buf.put_slice(&bytes[8..16]);
952                        RpcParam::new(&name, RpcTypeInfo::uniqueidentifier(), buf.freeze())
953                    }
954                    #[cfg(feature = "decimal")]
955                    SqlValue::Decimal(d) => {
956                        // Decimal encoding is complex; use string representation for now
957                        RpcParam::nvarchar(&name, &d.to_string())
958                    }
959                    #[cfg(feature = "chrono")]
960                    SqlValue::Date(_)
961                    | SqlValue::Time(_)
962                    | SqlValue::DateTime(_)
963                    | SqlValue::DateTimeOffset(_) => {
964                        // For date/time types, use string representation for simplicity
965                        // A full implementation would encode these properly
966                        let s = match &sql_value {
967                            SqlValue::Date(d) => d.to_string(),
968                            SqlValue::Time(t) => t.to_string(),
969                            SqlValue::DateTime(dt) => dt.to_string(),
970                            SqlValue::DateTimeOffset(dto) => dto.to_rfc3339(),
971                            _ => unreachable!(),
972                        };
973                        RpcParam::nvarchar(&name, &s)
974                    }
975                    #[cfg(feature = "json")]
976                    SqlValue::Json(ref j) => RpcParam::nvarchar(&name, &j.to_string()),
977                    SqlValue::Tvp(ref tvp_data) => {
978                        // Encode TVP using the wire format
979                        Self::encode_tvp_param(&name, tvp_data)?
980                    }
981                    // Handle future SqlValue variants
982                    _ => {
983                        return Err(Error::Type(mssql_types::TypeError::UnsupportedConversion {
984                            from: sql_value.type_name().to_string(),
985                            to: "RPC parameter",
986                        }));
987                    }
988                })
989            })
990            .collect()
991    }
992
993    /// Encode a TVP parameter for RPC.
994    ///
995    /// This encodes the complete TVP structure including metadata and row data
996    /// into the TDS wire format.
997    fn encode_tvp_param(name: &str, tvp_data: &mssql_types::TvpData) -> Result<RpcParam> {
998        // Convert mssql-types column definitions to wire format
999        let wire_columns: Vec<TvpWireColumnDef> = tvp_data
1000            .columns
1001            .iter()
1002            .map(|col| {
1003                let wire_type = Self::convert_tvp_column_type(&col.column_type);
1004                TvpWireColumnDef {
1005                    wire_type,
1006                    flags: TvpColumnFlags {
1007                        nullable: col.nullable,
1008                    },
1009                }
1010            })
1011            .collect();
1012
1013        // Create encoder
1014        let encoder = TvpEncoder::new(&tvp_data.schema, &tvp_data.type_name, &wire_columns);
1015
1016        // Encode to buffer
1017        let mut buf = BytesMut::with_capacity(256);
1018
1019        // Encode metadata
1020        encoder.encode_metadata(&mut buf);
1021
1022        // Encode each row
1023        for row in &tvp_data.rows {
1024            encoder.encode_row(&mut buf, |row_buf| {
1025                for (col_idx, value) in row.iter().enumerate() {
1026                    let wire_type = &wire_columns[col_idx].wire_type;
1027                    Self::encode_tvp_value(value, wire_type, row_buf);
1028                }
1029            });
1030        }
1031
1032        // Encode end marker
1033        encoder.encode_end(&mut buf);
1034
1035        // Create RPC param with TVP type info
1036        // For TVP, we use a special type info that indicates the parameter is a TVP
1037        // The type info is encoded as part of the TVP data itself
1038        let type_info = RpcTypeInfo {
1039            type_id: 0xF3, // TVP type
1040            max_length: None,
1041            precision: None,
1042            scale: None,
1043            collation: None,
1044        };
1045
1046        Ok(RpcParam {
1047            name: name.to_string(),
1048            flags: tds_protocol::rpc::ParamFlags::default(),
1049            type_info,
1050            value: Some(buf.freeze()),
1051        })
1052    }
1053
1054    /// Convert mssql-types TvpColumnType to wire TvpWireType.
1055    fn convert_tvp_column_type(col_type: &mssql_types::TvpColumnType) -> TvpWireType {
1056        match col_type {
1057            mssql_types::TvpColumnType::Bit => TvpWireType::Bit,
1058            mssql_types::TvpColumnType::TinyInt => TvpWireType::Int { size: 1 },
1059            mssql_types::TvpColumnType::SmallInt => TvpWireType::Int { size: 2 },
1060            mssql_types::TvpColumnType::Int => TvpWireType::Int { size: 4 },
1061            mssql_types::TvpColumnType::BigInt => TvpWireType::Int { size: 8 },
1062            mssql_types::TvpColumnType::Real => TvpWireType::Float { size: 4 },
1063            mssql_types::TvpColumnType::Float => TvpWireType::Float { size: 8 },
1064            mssql_types::TvpColumnType::Decimal { precision, scale } => TvpWireType::Decimal {
1065                precision: *precision,
1066                scale: *scale,
1067            },
1068            mssql_types::TvpColumnType::NVarChar { max_length } => TvpWireType::NVarChar {
1069                max_length: *max_length,
1070            },
1071            mssql_types::TvpColumnType::VarChar { max_length } => TvpWireType::VarChar {
1072                max_length: *max_length,
1073            },
1074            mssql_types::TvpColumnType::VarBinary { max_length } => TvpWireType::VarBinary {
1075                max_length: *max_length,
1076            },
1077            mssql_types::TvpColumnType::UniqueIdentifier => TvpWireType::Guid,
1078            mssql_types::TvpColumnType::Date => TvpWireType::Date,
1079            mssql_types::TvpColumnType::Time { scale } => TvpWireType::Time { scale: *scale },
1080            mssql_types::TvpColumnType::DateTime2 { scale } => {
1081                TvpWireType::DateTime2 { scale: *scale }
1082            }
1083            mssql_types::TvpColumnType::DateTimeOffset { scale } => {
1084                TvpWireType::DateTimeOffset { scale: *scale }
1085            }
1086            mssql_types::TvpColumnType::Xml => TvpWireType::Xml,
1087        }
1088    }
1089
1090    /// Encode a single TVP column value.
1091    fn encode_tvp_value(
1092        value: &mssql_types::SqlValue,
1093        wire_type: &TvpWireType,
1094        buf: &mut BytesMut,
1095    ) {
1096        use mssql_types::SqlValue;
1097
1098        match value {
1099            SqlValue::Null => {
1100                encode_tvp_null(wire_type, buf);
1101            }
1102            SqlValue::Bool(v) => {
1103                encode_tvp_bit(*v, buf);
1104            }
1105            SqlValue::TinyInt(v) => {
1106                encode_tvp_int(*v as i64, 1, buf);
1107            }
1108            SqlValue::SmallInt(v) => {
1109                encode_tvp_int(*v as i64, 2, buf);
1110            }
1111            SqlValue::Int(v) => {
1112                encode_tvp_int(*v as i64, 4, buf);
1113            }
1114            SqlValue::BigInt(v) => {
1115                encode_tvp_int(*v, 8, buf);
1116            }
1117            SqlValue::Float(v) => {
1118                encode_tvp_float(*v as f64, 4, buf);
1119            }
1120            SqlValue::Double(v) => {
1121                encode_tvp_float(*v, 8, buf);
1122            }
1123            SqlValue::String(s) => {
1124                let max_len = match wire_type {
1125                    TvpWireType::NVarChar { max_length } => *max_length,
1126                    _ => 4000,
1127                };
1128                encode_tvp_nvarchar(s, max_len, buf);
1129            }
1130            SqlValue::Binary(b) => {
1131                let max_len = match wire_type {
1132                    TvpWireType::VarBinary { max_length } => *max_length,
1133                    _ => 8000,
1134                };
1135                encode_tvp_varbinary(b, max_len, buf);
1136            }
1137            #[cfg(feature = "decimal")]
1138            SqlValue::Decimal(d) => {
1139                let sign = if d.is_sign_negative() { 0u8 } else { 1u8 };
1140                let mantissa = d.mantissa().unsigned_abs();
1141                encode_tvp_decimal(sign, mantissa, buf);
1142            }
1143            #[cfg(feature = "uuid")]
1144            SqlValue::Uuid(u) => {
1145                let bytes = u.as_bytes();
1146                tds_protocol::tvp::encode_tvp_guid(bytes, buf);
1147            }
1148            #[cfg(feature = "chrono")]
1149            SqlValue::Date(d) => {
1150                // Calculate days since 0001-01-01
1151                let base = chrono::NaiveDate::from_ymd_opt(1, 1, 1).unwrap();
1152                let days = d.signed_duration_since(base).num_days() as u32;
1153                tds_protocol::tvp::encode_tvp_date(days, buf);
1154            }
1155            #[cfg(feature = "chrono")]
1156            SqlValue::Time(t) => {
1157                use chrono::Timelike;
1158                let nanos =
1159                    t.num_seconds_from_midnight() as u64 * 1_000_000_000 + t.nanosecond() as u64;
1160                let intervals = nanos / 100;
1161                let scale = match wire_type {
1162                    TvpWireType::Time { scale } => *scale,
1163                    _ => 7,
1164                };
1165                tds_protocol::tvp::encode_tvp_time(intervals, scale, buf);
1166            }
1167            #[cfg(feature = "chrono")]
1168            SqlValue::DateTime(dt) => {
1169                use chrono::Timelike;
1170                // Time component
1171                let nanos = dt.time().num_seconds_from_midnight() as u64 * 1_000_000_000
1172                    + dt.time().nanosecond() as u64;
1173                let intervals = nanos / 100;
1174                // Date component
1175                let base = chrono::NaiveDate::from_ymd_opt(1, 1, 1).unwrap();
1176                let days = dt.date().signed_duration_since(base).num_days() as u32;
1177                let scale = match wire_type {
1178                    TvpWireType::DateTime2 { scale } => *scale,
1179                    _ => 7,
1180                };
1181                tds_protocol::tvp::encode_tvp_datetime2(intervals, days, scale, buf);
1182            }
1183            #[cfg(feature = "chrono")]
1184            SqlValue::DateTimeOffset(dto) => {
1185                use chrono::{Offset, Timelike};
1186                // Time component (in 100-nanosecond intervals)
1187                let nanos = dto.time().num_seconds_from_midnight() as u64 * 1_000_000_000
1188                    + dto.time().nanosecond() as u64;
1189                let intervals = nanos / 100;
1190                // Date component (days since 0001-01-01)
1191                let base = chrono::NaiveDate::from_ymd_opt(1, 1, 1).unwrap();
1192                let days = dto.date_naive().signed_duration_since(base).num_days() as u32;
1193                // Timezone offset in minutes
1194                let offset_minutes = (dto.offset().fix().local_minus_utc() / 60) as i16;
1195                let scale = match wire_type {
1196                    TvpWireType::DateTimeOffset { scale } => *scale,
1197                    _ => 7,
1198                };
1199                tds_protocol::tvp::encode_tvp_datetimeoffset(
1200                    intervals,
1201                    days,
1202                    offset_minutes,
1203                    scale,
1204                    buf,
1205                );
1206            }
1207            #[cfg(feature = "json")]
1208            SqlValue::Json(j) => {
1209                // JSON is encoded as NVARCHAR
1210                encode_tvp_nvarchar(&j.to_string(), 0xFFFF, buf);
1211            }
1212            SqlValue::Xml(s) => {
1213                // XML is encoded as NVARCHAR for TVP
1214                encode_tvp_nvarchar(s, 0xFFFF, buf);
1215            }
1216            SqlValue::Tvp(_) => {
1217                // Nested TVPs are not supported
1218                encode_tvp_null(wire_type, buf);
1219            }
1220            // Handle future SqlValue variants as NULL
1221            _ => {
1222                encode_tvp_null(wire_type, buf);
1223            }
1224        }
1225    }
1226
1227    /// Read complete query response including columns and rows.
1228    async fn read_query_response(
1229        &mut self,
1230    ) -> Result<(Vec<crate::row::Column>, Vec<crate::row::Row>)> {
1231        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
1232
1233        let message = match connection {
1234            ConnectionHandle::Tls(conn) => conn
1235                .read_message()
1236                .await
1237                .map_err(|e| Error::Protocol(e.to_string()))?,
1238            ConnectionHandle::TlsPrelogin(conn) => conn
1239                .read_message()
1240                .await
1241                .map_err(|e| Error::Protocol(e.to_string()))?,
1242            ConnectionHandle::Plain(conn) => conn
1243                .read_message()
1244                .await
1245                .map_err(|e| Error::Protocol(e.to_string()))?,
1246        }
1247        .ok_or(Error::ConnectionClosed)?;
1248
1249        let mut parser = TokenParser::new(message.payload);
1250        let mut columns: Vec<crate::row::Column> = Vec::new();
1251        let mut rows: Vec<crate::row::Row> = Vec::new();
1252        let mut protocol_metadata: Option<ColMetaData> = None;
1253
1254        loop {
1255            // Use next_token_with_metadata to properly parse Row/NbcRow tokens
1256            let token = parser
1257                .next_token_with_metadata(protocol_metadata.as_ref())
1258                .map_err(|e| Error::Protocol(e.to_string()))?;
1259
1260            let Some(token) = token else {
1261                break;
1262            };
1263
1264            match token {
1265                Token::ColMetaData(meta) => {
1266                    // New result set starting - clear previous rows
1267                    // This enables multi-statement batches to return the last result set
1268                    rows.clear();
1269
1270                    columns = meta
1271                        .columns
1272                        .iter()
1273                        .enumerate()
1274                        .map(|(i, col)| {
1275                            let type_name = format!("{:?}", col.type_id);
1276                            let mut column = crate::row::Column::new(&col.name, i, type_name)
1277                                .with_nullable(col.flags & 0x01 != 0);
1278
1279                            if let Some(max_len) = col.type_info.max_length {
1280                                column = column.with_max_length(max_len);
1281                            }
1282                            if let (Some(prec), Some(scale)) =
1283                                (col.type_info.precision, col.type_info.scale)
1284                            {
1285                                column = column.with_precision_scale(prec, scale);
1286                            }
1287                            column
1288                        })
1289                        .collect();
1290
1291                    tracing::debug!(columns = columns.len(), "received column metadata");
1292                    protocol_metadata = Some(meta);
1293                }
1294                Token::Row(raw_row) => {
1295                    if let Some(ref meta) = protocol_metadata {
1296                        let row = Self::convert_raw_row(&raw_row, meta, &columns)?;
1297                        rows.push(row);
1298                    }
1299                }
1300                Token::NbcRow(nbc_row) => {
1301                    if let Some(ref meta) = protocol_metadata {
1302                        let row = Self::convert_nbc_row(&nbc_row, meta, &columns)?;
1303                        rows.push(row);
1304                    }
1305                }
1306                Token::Error(err) => {
1307                    return Err(Error::Server {
1308                        number: err.number,
1309                        state: err.state,
1310                        class: err.class,
1311                        message: err.message.clone(),
1312                        server: if err.server.is_empty() {
1313                            None
1314                        } else {
1315                            Some(err.server.clone())
1316                        },
1317                        procedure: if err.procedure.is_empty() {
1318                            None
1319                        } else {
1320                            Some(err.procedure.clone())
1321                        },
1322                        line: err.line as u32,
1323                    });
1324                }
1325                Token::Done(done) => {
1326                    if done.status.error {
1327                        return Err(Error::Query("query failed".to_string()));
1328                    }
1329                    tracing::debug!(
1330                        row_count = done.row_count,
1331                        has_more = done.status.more,
1332                        "query complete"
1333                    );
1334                    // Only break if there are no more result sets
1335                    // This enables multi-statement batches to process all results
1336                    if !done.status.more {
1337                        break;
1338                    }
1339                }
1340                Token::DoneProc(done) => {
1341                    if done.status.error {
1342                        return Err(Error::Query("query failed".to_string()));
1343                    }
1344                }
1345                Token::DoneInProc(done) => {
1346                    if done.status.error {
1347                        return Err(Error::Query("query failed".to_string()));
1348                    }
1349                }
1350                Token::Info(info) => {
1351                    tracing::debug!(
1352                        number = info.number,
1353                        message = %info.message,
1354                        "server info message"
1355                    );
1356                }
1357                Token::EnvChange(env) => {
1358                    // Process transaction-related EnvChange tokens.
1359                    // This allows BEGIN TRANSACTION, COMMIT, ROLLBACK via raw SQL
1360                    // to properly update the transaction descriptor.
1361                    Self::process_transaction_env_change(&env, &mut self.transaction_descriptor);
1362                }
1363                _ => {}
1364            }
1365        }
1366
1367        tracing::debug!(
1368            columns = columns.len(),
1369            rows = rows.len(),
1370            "query response parsed"
1371        );
1372        Ok((columns, rows))
1373    }
1374
1375    /// Convert a RawRow to a client Row.
1376    ///
1377    /// This parses the raw bytes back into SqlValue types based on column metadata.
1378    fn convert_raw_row(
1379        raw: &RawRow,
1380        meta: &ColMetaData,
1381        columns: &[crate::row::Column],
1382    ) -> Result<crate::row::Row> {
1383        let mut values = Vec::with_capacity(meta.columns.len());
1384        let mut buf = raw.data.as_ref();
1385
1386        for col in &meta.columns {
1387            let value = Self::parse_column_value(&mut buf, col)?;
1388            values.push(value);
1389        }
1390
1391        Ok(crate::row::Row::from_values(columns.to_vec(), values))
1392    }
1393
1394    /// Convert an NbcRow to a client Row.
1395    ///
1396    /// NbcRow has a null bitmap followed by only non-null values.
1397    fn convert_nbc_row(
1398        nbc: &NbcRow,
1399        meta: &ColMetaData,
1400        columns: &[crate::row::Column],
1401    ) -> Result<crate::row::Row> {
1402        let mut values = Vec::with_capacity(meta.columns.len());
1403        let mut buf = nbc.data.as_ref();
1404
1405        for (i, col) in meta.columns.iter().enumerate() {
1406            if nbc.is_null(i) {
1407                values.push(mssql_types::SqlValue::Null);
1408            } else {
1409                let value = Self::parse_column_value(&mut buf, col)?;
1410                values.push(value);
1411            }
1412        }
1413
1414        Ok(crate::row::Row::from_values(columns.to_vec(), values))
1415    }
1416
1417    /// Parse a single column value from a buffer based on column metadata.
1418    fn parse_column_value(buf: &mut &[u8], col: &ColumnData) -> Result<mssql_types::SqlValue> {
1419        use bytes::Buf;
1420        use mssql_types::SqlValue;
1421        use tds_protocol::types::TypeId;
1422
1423        let value = match col.type_id {
1424            // Fixed-length null type
1425            TypeId::Null => SqlValue::Null,
1426
1427            // 1-byte types
1428            TypeId::Int1 => {
1429                if buf.remaining() < 1 {
1430                    return Err(Error::Protocol("unexpected EOF reading TINYINT".into()));
1431                }
1432                SqlValue::TinyInt(buf.get_u8())
1433            }
1434            TypeId::Bit => {
1435                if buf.remaining() < 1 {
1436                    return Err(Error::Protocol("unexpected EOF reading BIT".into()));
1437                }
1438                SqlValue::Bool(buf.get_u8() != 0)
1439            }
1440
1441            // 2-byte types
1442            TypeId::Int2 => {
1443                if buf.remaining() < 2 {
1444                    return Err(Error::Protocol("unexpected EOF reading SMALLINT".into()));
1445                }
1446                SqlValue::SmallInt(buf.get_i16_le())
1447            }
1448
1449            // 4-byte types
1450            TypeId::Int4 => {
1451                if buf.remaining() < 4 {
1452                    return Err(Error::Protocol("unexpected EOF reading INT".into()));
1453                }
1454                SqlValue::Int(buf.get_i32_le())
1455            }
1456            TypeId::Float4 => {
1457                if buf.remaining() < 4 {
1458                    return Err(Error::Protocol("unexpected EOF reading REAL".into()));
1459                }
1460                SqlValue::Float(buf.get_f32_le())
1461            }
1462
1463            // 8-byte types
1464            TypeId::Int8 => {
1465                if buf.remaining() < 8 {
1466                    return Err(Error::Protocol("unexpected EOF reading BIGINT".into()));
1467                }
1468                SqlValue::BigInt(buf.get_i64_le())
1469            }
1470            TypeId::Float8 => {
1471                if buf.remaining() < 8 {
1472                    return Err(Error::Protocol("unexpected EOF reading FLOAT".into()));
1473                }
1474                SqlValue::Double(buf.get_f64_le())
1475            }
1476            TypeId::Money => {
1477                if buf.remaining() < 8 {
1478                    return Err(Error::Protocol("unexpected EOF reading MONEY".into()));
1479                }
1480                // MONEY is stored as 8 bytes, fixed-point with 4 decimal places
1481                let high = buf.get_i32_le();
1482                let low = buf.get_u32_le();
1483                let cents = ((high as i64) << 32) | (low as i64);
1484                let value = (cents as f64) / 10000.0;
1485                SqlValue::Double(value)
1486            }
1487            TypeId::Money4 => {
1488                if buf.remaining() < 4 {
1489                    return Err(Error::Protocol("unexpected EOF reading SMALLMONEY".into()));
1490                }
1491                let cents = buf.get_i32_le();
1492                let value = (cents as f64) / 10000.0;
1493                SqlValue::Double(value)
1494            }
1495
1496            // Variable-length nullable types (IntN, FloatN, etc.)
1497            TypeId::IntN => {
1498                if buf.remaining() < 1 {
1499                    return Err(Error::Protocol("unexpected EOF reading IntN length".into()));
1500                }
1501                let len = buf.get_u8();
1502                match len {
1503                    0 => SqlValue::Null,
1504                    1 => SqlValue::TinyInt(buf.get_u8()),
1505                    2 => SqlValue::SmallInt(buf.get_i16_le()),
1506                    4 => SqlValue::Int(buf.get_i32_le()),
1507                    8 => SqlValue::BigInt(buf.get_i64_le()),
1508                    _ => {
1509                        return Err(Error::Protocol(format!("invalid IntN length: {len}")));
1510                    }
1511                }
1512            }
1513            TypeId::FloatN => {
1514                if buf.remaining() < 1 {
1515                    return Err(Error::Protocol(
1516                        "unexpected EOF reading FloatN length".into(),
1517                    ));
1518                }
1519                let len = buf.get_u8();
1520                match len {
1521                    0 => SqlValue::Null,
1522                    4 => SqlValue::Float(buf.get_f32_le()),
1523                    8 => SqlValue::Double(buf.get_f64_le()),
1524                    _ => {
1525                        return Err(Error::Protocol(format!("invalid FloatN length: {len}")));
1526                    }
1527                }
1528            }
1529            TypeId::BitN => {
1530                if buf.remaining() < 1 {
1531                    return Err(Error::Protocol("unexpected EOF reading BitN length".into()));
1532                }
1533                let len = buf.get_u8();
1534                match len {
1535                    0 => SqlValue::Null,
1536                    1 => SqlValue::Bool(buf.get_u8() != 0),
1537                    _ => {
1538                        return Err(Error::Protocol(format!("invalid BitN length: {len}")));
1539                    }
1540                }
1541            }
1542            TypeId::MoneyN => {
1543                if buf.remaining() < 1 {
1544                    return Err(Error::Protocol(
1545                        "unexpected EOF reading MoneyN length".into(),
1546                    ));
1547                }
1548                let len = buf.get_u8();
1549                match len {
1550                    0 => SqlValue::Null,
1551                    4 => {
1552                        let cents = buf.get_i32_le();
1553                        SqlValue::Double((cents as f64) / 10000.0)
1554                    }
1555                    8 => {
1556                        let high = buf.get_i32_le();
1557                        let low = buf.get_u32_le();
1558                        let cents = ((high as i64) << 32) | (low as i64);
1559                        SqlValue::Double((cents as f64) / 10000.0)
1560                    }
1561                    _ => {
1562                        return Err(Error::Protocol(format!("invalid MoneyN length: {len}")));
1563                    }
1564                }
1565            }
1566            // DECIMAL/NUMERIC types (1-byte length prefix)
1567            TypeId::Decimal | TypeId::Numeric | TypeId::DecimalN | TypeId::NumericN => {
1568                if buf.remaining() < 1 {
1569                    return Err(Error::Protocol(
1570                        "unexpected EOF reading DECIMAL/NUMERIC length".into(),
1571                    ));
1572                }
1573                let len = buf.get_u8() as usize;
1574                if len == 0 {
1575                    SqlValue::Null
1576                } else {
1577                    if buf.remaining() < len {
1578                        return Err(Error::Protocol(
1579                            "unexpected EOF reading DECIMAL/NUMERIC data".into(),
1580                        ));
1581                    }
1582
1583                    // First byte is sign: 0 = negative, 1 = positive
1584                    let sign = buf.get_u8();
1585                    let mantissa_len = len - 1;
1586
1587                    // Read mantissa as little-endian integer (up to 16 bytes for max precision 38)
1588                    let mut mantissa_bytes = [0u8; 16];
1589                    for i in 0..mantissa_len.min(16) {
1590                        mantissa_bytes[i] = buf.get_u8();
1591                    }
1592                    // Skip any excess bytes (shouldn't happen with valid data)
1593                    for _ in 16..mantissa_len {
1594                        buf.get_u8();
1595                    }
1596
1597                    let mantissa = u128::from_le_bytes(mantissa_bytes);
1598                    let scale = col.type_info.scale.unwrap_or(0) as u32;
1599
1600                    #[cfg(feature = "decimal")]
1601                    {
1602                        use rust_decimal::Decimal;
1603                        // rust_decimal supports max scale of 28
1604                        // For scales > 28, fall back to f64 to avoid overflow/hang
1605                        if scale > 28 {
1606                            // Fall back to f64 for high-scale decimals
1607                            let divisor = 10f64.powi(scale as i32);
1608                            let value = (mantissa as f64) / divisor;
1609                            let value = if sign == 0 { -value } else { value };
1610                            SqlValue::Double(value)
1611                        } else {
1612                            let mut decimal =
1613                                Decimal::from_i128_with_scale(mantissa as i128, scale);
1614                            if sign == 0 {
1615                                decimal.set_sign_negative(true);
1616                            }
1617                            SqlValue::Decimal(decimal)
1618                        }
1619                    }
1620
1621                    #[cfg(not(feature = "decimal"))]
1622                    {
1623                        // Without the decimal feature, convert to f64
1624                        let divisor = 10f64.powi(scale as i32);
1625                        let value = (mantissa as f64) / divisor;
1626                        let value = if sign == 0 { -value } else { value };
1627                        SqlValue::Double(value)
1628                    }
1629                }
1630            }
1631
1632            // DATETIME/SMALLDATETIME nullable (1-byte length prefix)
1633            TypeId::DateTimeN => {
1634                if buf.remaining() < 1 {
1635                    return Err(Error::Protocol(
1636                        "unexpected EOF reading DateTimeN length".into(),
1637                    ));
1638                }
1639                let len = buf.get_u8() as usize;
1640                if len == 0 {
1641                    SqlValue::Null
1642                } else if buf.remaining() < len {
1643                    return Err(Error::Protocol("unexpected EOF reading DateTimeN".into()));
1644                } else {
1645                    match len {
1646                        4 => {
1647                            // SMALLDATETIME: 2 bytes days + 2 bytes minutes
1648                            let days = buf.get_u16_le() as i64;
1649                            let minutes = buf.get_u16_le() as u32;
1650                            #[cfg(feature = "chrono")]
1651                            {
1652                                let base = chrono::NaiveDate::from_ymd_opt(1900, 1, 1).unwrap();
1653                                let date = base + chrono::Duration::days(days);
1654                                let time = chrono::NaiveTime::from_num_seconds_from_midnight_opt(
1655                                    minutes * 60,
1656                                    0,
1657                                )
1658                                .unwrap();
1659                                SqlValue::DateTime(date.and_time(time))
1660                            }
1661                            #[cfg(not(feature = "chrono"))]
1662                            {
1663                                SqlValue::String(format!("SMALLDATETIME({days},{minutes})"))
1664                            }
1665                        }
1666                        8 => {
1667                            // DATETIME: 4 bytes days + 4 bytes 1/300ths of second
1668                            let days = buf.get_i32_le() as i64;
1669                            let time_300ths = buf.get_u32_le() as u64;
1670                            #[cfg(feature = "chrono")]
1671                            {
1672                                let base = chrono::NaiveDate::from_ymd_opt(1900, 1, 1).unwrap();
1673                                let date = base + chrono::Duration::days(days);
1674                                // Convert 300ths of second to nanoseconds
1675                                let total_ms = (time_300ths * 1000) / 300;
1676                                let secs = (total_ms / 1000) as u32;
1677                                let nanos = ((total_ms % 1000) * 1_000_000) as u32;
1678                                let time = chrono::NaiveTime::from_num_seconds_from_midnight_opt(
1679                                    secs, nanos,
1680                                )
1681                                .unwrap();
1682                                SqlValue::DateTime(date.and_time(time))
1683                            }
1684                            #[cfg(not(feature = "chrono"))]
1685                            {
1686                                SqlValue::String(format!("DATETIME({days},{time_300ths})"))
1687                            }
1688                        }
1689                        _ => {
1690                            return Err(Error::Protocol(format!(
1691                                "invalid DateTimeN length: {len}"
1692                            )));
1693                        }
1694                    }
1695                }
1696            }
1697
1698            // Fixed DATETIME (8 bytes)
1699            TypeId::DateTime => {
1700                if buf.remaining() < 8 {
1701                    return Err(Error::Protocol("unexpected EOF reading DATETIME".into()));
1702                }
1703                let days = buf.get_i32_le() as i64;
1704                let time_300ths = buf.get_u32_le() as u64;
1705                #[cfg(feature = "chrono")]
1706                {
1707                    let base = chrono::NaiveDate::from_ymd_opt(1900, 1, 1).unwrap();
1708                    let date = base + chrono::Duration::days(days);
1709                    let total_ms = (time_300ths * 1000) / 300;
1710                    let secs = (total_ms / 1000) as u32;
1711                    let nanos = ((total_ms % 1000) * 1_000_000) as u32;
1712                    let time =
1713                        chrono::NaiveTime::from_num_seconds_from_midnight_opt(secs, nanos).unwrap();
1714                    SqlValue::DateTime(date.and_time(time))
1715                }
1716                #[cfg(not(feature = "chrono"))]
1717                {
1718                    SqlValue::String(format!("DATETIME({days},{time_300ths})"))
1719                }
1720            }
1721
1722            // Fixed SMALLDATETIME (4 bytes)
1723            TypeId::DateTime4 => {
1724                if buf.remaining() < 4 {
1725                    return Err(Error::Protocol(
1726                        "unexpected EOF reading SMALLDATETIME".into(),
1727                    ));
1728                }
1729                let days = buf.get_u16_le() as i64;
1730                let minutes = buf.get_u16_le() as u32;
1731                #[cfg(feature = "chrono")]
1732                {
1733                    let base = chrono::NaiveDate::from_ymd_opt(1900, 1, 1).unwrap();
1734                    let date = base + chrono::Duration::days(days);
1735                    let time =
1736                        chrono::NaiveTime::from_num_seconds_from_midnight_opt(minutes * 60, 0)
1737                            .unwrap();
1738                    SqlValue::DateTime(date.and_time(time))
1739                }
1740                #[cfg(not(feature = "chrono"))]
1741                {
1742                    SqlValue::String(format!("SMALLDATETIME({days},{minutes})"))
1743                }
1744            }
1745
1746            // DATE (3 bytes, nullable with 1-byte length prefix)
1747            TypeId::Date => {
1748                if buf.remaining() < 1 {
1749                    return Err(Error::Protocol("unexpected EOF reading DATE length".into()));
1750                }
1751                let len = buf.get_u8() as usize;
1752                if len == 0 {
1753                    SqlValue::Null
1754                } else if len != 3 {
1755                    return Err(Error::Protocol(format!("invalid DATE length: {len}")));
1756                } else if buf.remaining() < 3 {
1757                    return Err(Error::Protocol("unexpected EOF reading DATE".into()));
1758                } else {
1759                    // 3 bytes little-endian days since 0001-01-01
1760                    let days = buf.get_u8() as u32
1761                        | ((buf.get_u8() as u32) << 8)
1762                        | ((buf.get_u8() as u32) << 16);
1763                    #[cfg(feature = "chrono")]
1764                    {
1765                        let base = chrono::NaiveDate::from_ymd_opt(1, 1, 1).unwrap();
1766                        let date = base + chrono::Duration::days(days as i64);
1767                        SqlValue::Date(date)
1768                    }
1769                    #[cfg(not(feature = "chrono"))]
1770                    {
1771                        SqlValue::String(format!("DATE({days})"))
1772                    }
1773                }
1774            }
1775
1776            // TIME (variable length with scale, 1-byte length prefix)
1777            TypeId::Time => {
1778                if buf.remaining() < 1 {
1779                    return Err(Error::Protocol("unexpected EOF reading TIME length".into()));
1780                }
1781                let len = buf.get_u8() as usize;
1782                if len == 0 {
1783                    SqlValue::Null
1784                } else if buf.remaining() < len {
1785                    return Err(Error::Protocol("unexpected EOF reading TIME".into()));
1786                } else {
1787                    let scale = col.type_info.scale.unwrap_or(7);
1788                    let mut time_bytes = [0u8; 8];
1789                    for byte in time_bytes.iter_mut().take(len) {
1790                        *byte = buf.get_u8();
1791                    }
1792                    let intervals = u64::from_le_bytes(time_bytes);
1793                    #[cfg(feature = "chrono")]
1794                    {
1795                        let time = Self::intervals_to_time(intervals, scale);
1796                        SqlValue::Time(time)
1797                    }
1798                    #[cfg(not(feature = "chrono"))]
1799                    {
1800                        SqlValue::String(format!("TIME({intervals})"))
1801                    }
1802                }
1803            }
1804
1805            // DATETIME2 (variable length: TIME bytes + 3 bytes date, 1-byte length prefix)
1806            TypeId::DateTime2 => {
1807                if buf.remaining() < 1 {
1808                    return Err(Error::Protocol(
1809                        "unexpected EOF reading DATETIME2 length".into(),
1810                    ));
1811                }
1812                let len = buf.get_u8() as usize;
1813                if len == 0 {
1814                    SqlValue::Null
1815                } else if buf.remaining() < len {
1816                    return Err(Error::Protocol("unexpected EOF reading DATETIME2".into()));
1817                } else {
1818                    let scale = col.type_info.scale.unwrap_or(7);
1819                    let time_len = Self::time_bytes_for_scale(scale);
1820
1821                    // Read time
1822                    let mut time_bytes = [0u8; 8];
1823                    for byte in time_bytes.iter_mut().take(time_len) {
1824                        *byte = buf.get_u8();
1825                    }
1826                    let intervals = u64::from_le_bytes(time_bytes);
1827
1828                    // Read date (3 bytes)
1829                    let days = buf.get_u8() as u32
1830                        | ((buf.get_u8() as u32) << 8)
1831                        | ((buf.get_u8() as u32) << 16);
1832
1833                    #[cfg(feature = "chrono")]
1834                    {
1835                        let base = chrono::NaiveDate::from_ymd_opt(1, 1, 1).unwrap();
1836                        let date = base + chrono::Duration::days(days as i64);
1837                        let time = Self::intervals_to_time(intervals, scale);
1838                        SqlValue::DateTime(date.and_time(time))
1839                    }
1840                    #[cfg(not(feature = "chrono"))]
1841                    {
1842                        SqlValue::String(format!("DATETIME2({days},{intervals})"))
1843                    }
1844                }
1845            }
1846
1847            // DATETIMEOFFSET (variable length: TIME bytes + 3 bytes date + 2 bytes offset)
1848            TypeId::DateTimeOffset => {
1849                if buf.remaining() < 1 {
1850                    return Err(Error::Protocol(
1851                        "unexpected EOF reading DATETIMEOFFSET length".into(),
1852                    ));
1853                }
1854                let len = buf.get_u8() as usize;
1855                if len == 0 {
1856                    SqlValue::Null
1857                } else if buf.remaining() < len {
1858                    return Err(Error::Protocol(
1859                        "unexpected EOF reading DATETIMEOFFSET".into(),
1860                    ));
1861                } else {
1862                    let scale = col.type_info.scale.unwrap_or(7);
1863                    let time_len = Self::time_bytes_for_scale(scale);
1864
1865                    // Read time
1866                    let mut time_bytes = [0u8; 8];
1867                    for byte in time_bytes.iter_mut().take(time_len) {
1868                        *byte = buf.get_u8();
1869                    }
1870                    let intervals = u64::from_le_bytes(time_bytes);
1871
1872                    // Read date (3 bytes)
1873                    let days = buf.get_u8() as u32
1874                        | ((buf.get_u8() as u32) << 8)
1875                        | ((buf.get_u8() as u32) << 16);
1876
1877                    // Read offset in minutes (2 bytes, signed)
1878                    let offset_minutes = buf.get_i16_le();
1879
1880                    #[cfg(feature = "chrono")]
1881                    {
1882                        use chrono::TimeZone;
1883                        let base = chrono::NaiveDate::from_ymd_opt(1, 1, 1).unwrap();
1884                        let date = base + chrono::Duration::days(days as i64);
1885                        let time = Self::intervals_to_time(intervals, scale);
1886                        let offset = chrono::FixedOffset::east_opt((offset_minutes as i32) * 60)
1887                            .unwrap_or_else(|| chrono::FixedOffset::east_opt(0).unwrap());
1888                        let datetime = offset
1889                            .from_local_datetime(&date.and_time(time))
1890                            .single()
1891                            .unwrap_or_else(|| offset.from_utc_datetime(&date.and_time(time)));
1892                        SqlValue::DateTimeOffset(datetime)
1893                    }
1894                    #[cfg(not(feature = "chrono"))]
1895                    {
1896                        SqlValue::String(format!(
1897                            "DATETIMEOFFSET({days},{intervals},{offset_minutes})"
1898                        ))
1899                    }
1900                }
1901            }
1902
1903            // TEXT type - always uses PLP encoding (deprecated LOB type)
1904            TypeId::Text => Self::parse_plp_varchar(buf)?,
1905
1906            // Legacy byte-length string types (Char, VarChar) - 1-byte length prefix
1907            TypeId::Char | TypeId::VarChar => {
1908                if buf.remaining() < 1 {
1909                    return Err(Error::Protocol(
1910                        "unexpected EOF reading legacy varchar length".into(),
1911                    ));
1912                }
1913                let len = buf.get_u8();
1914                if len == 0xFF {
1915                    SqlValue::Null
1916                } else if len == 0 {
1917                    SqlValue::String(String::new())
1918                } else if buf.remaining() < len as usize {
1919                    return Err(Error::Protocol(
1920                        "unexpected EOF reading legacy varchar data".into(),
1921                    ));
1922                } else {
1923                    let data = &buf[..len as usize];
1924                    let s = String::from_utf8_lossy(data).into_owned();
1925                    buf.advance(len as usize);
1926                    SqlValue::String(s)
1927                }
1928            }
1929
1930            // Variable-length string types (BigVarChar, BigChar)
1931            TypeId::BigVarChar | TypeId::BigChar => {
1932                // Check if this is a MAX type (uses PLP encoding)
1933                if col.type_info.max_length == Some(0xFFFF) {
1934                    // PLP format: 8-byte total length, then chunks
1935                    Self::parse_plp_varchar(buf)?
1936                } else {
1937                    // 2-byte length prefix for non-MAX types
1938                    if buf.remaining() < 2 {
1939                        return Err(Error::Protocol(
1940                            "unexpected EOF reading varchar length".into(),
1941                        ));
1942                    }
1943                    let len = buf.get_u16_le();
1944                    if len == 0xFFFF {
1945                        SqlValue::Null
1946                    } else if buf.remaining() < len as usize {
1947                        return Err(Error::Protocol(
1948                            "unexpected EOF reading varchar data".into(),
1949                        ));
1950                    } else {
1951                        let data = &buf[..len as usize];
1952                        let s = String::from_utf8_lossy(data).into_owned();
1953                        buf.advance(len as usize);
1954                        SqlValue::String(s)
1955                    }
1956                }
1957            }
1958
1959            // NTEXT type - always uses PLP encoding (deprecated LOB type)
1960            TypeId::NText => Self::parse_plp_nvarchar(buf)?,
1961
1962            // Variable-length Unicode string types (NVarChar, NChar)
1963            TypeId::NVarChar | TypeId::NChar => {
1964                // Check if this is a MAX type (uses PLP encoding)
1965                if col.type_info.max_length == Some(0xFFFF) {
1966                    // PLP format: 8-byte total length, then chunks
1967                    Self::parse_plp_nvarchar(buf)?
1968                } else {
1969                    // 2-byte length prefix (in bytes, not chars) for non-MAX types
1970                    if buf.remaining() < 2 {
1971                        return Err(Error::Protocol(
1972                            "unexpected EOF reading nvarchar length".into(),
1973                        ));
1974                    }
1975                    let len = buf.get_u16_le();
1976                    if len == 0xFFFF {
1977                        SqlValue::Null
1978                    } else if buf.remaining() < len as usize {
1979                        return Err(Error::Protocol(
1980                            "unexpected EOF reading nvarchar data".into(),
1981                        ));
1982                    } else {
1983                        let data = &buf[..len as usize];
1984                        // UTF-16LE to String
1985                        let utf16: Vec<u16> = data
1986                            .chunks_exact(2)
1987                            .map(|chunk| u16::from_le_bytes([chunk[0], chunk[1]]))
1988                            .collect();
1989                        let s = String::from_utf16(&utf16)
1990                            .map_err(|_| Error::Protocol("invalid UTF-16 in nvarchar".into()))?;
1991                        buf.advance(len as usize);
1992                        SqlValue::String(s)
1993                    }
1994                }
1995            }
1996
1997            // IMAGE type - always uses PLP encoding (deprecated LOB type)
1998            TypeId::Image => Self::parse_plp_varbinary(buf)?,
1999
2000            // Legacy byte-length binary types (Binary, VarBinary) - 1-byte length prefix
2001            TypeId::Binary | TypeId::VarBinary => {
2002                if buf.remaining() < 1 {
2003                    return Err(Error::Protocol(
2004                        "unexpected EOF reading legacy varbinary length".into(),
2005                    ));
2006                }
2007                let len = buf.get_u8();
2008                if len == 0xFF {
2009                    SqlValue::Null
2010                } else if len == 0 {
2011                    SqlValue::Binary(bytes::Bytes::new())
2012                } else if buf.remaining() < len as usize {
2013                    return Err(Error::Protocol(
2014                        "unexpected EOF reading legacy varbinary data".into(),
2015                    ));
2016                } else {
2017                    let data = bytes::Bytes::copy_from_slice(&buf[..len as usize]);
2018                    buf.advance(len as usize);
2019                    SqlValue::Binary(data)
2020                }
2021            }
2022
2023            // Variable-length binary types (BigVarBinary, BigBinary)
2024            TypeId::BigVarBinary | TypeId::BigBinary => {
2025                // Check if this is a MAX type (uses PLP encoding)
2026                if col.type_info.max_length == Some(0xFFFF) {
2027                    // PLP format: 8-byte total length, then chunks
2028                    Self::parse_plp_varbinary(buf)?
2029                } else {
2030                    if buf.remaining() < 2 {
2031                        return Err(Error::Protocol(
2032                            "unexpected EOF reading varbinary length".into(),
2033                        ));
2034                    }
2035                    let len = buf.get_u16_le();
2036                    if len == 0xFFFF {
2037                        SqlValue::Null
2038                    } else if buf.remaining() < len as usize {
2039                        return Err(Error::Protocol(
2040                            "unexpected EOF reading varbinary data".into(),
2041                        ));
2042                    } else {
2043                        let data = bytes::Bytes::copy_from_slice(&buf[..len as usize]);
2044                        buf.advance(len as usize);
2045                        SqlValue::Binary(data)
2046                    }
2047                }
2048            }
2049
2050            // XML type - always uses PLP encoding
2051            TypeId::Xml => {
2052                // Parse as PLP NVARCHAR (XML is UTF-16 encoded in TDS)
2053                match Self::parse_plp_nvarchar(buf)? {
2054                    SqlValue::Null => SqlValue::Null,
2055                    SqlValue::String(s) => SqlValue::Xml(s),
2056                    _ => {
2057                        return Err(Error::Protocol(
2058                            "unexpected value type when parsing XML".into(),
2059                        ));
2060                    }
2061                }
2062            }
2063
2064            // GUID/UniqueIdentifier
2065            TypeId::Guid => {
2066                if buf.remaining() < 1 {
2067                    return Err(Error::Protocol("unexpected EOF reading GUID length".into()));
2068                }
2069                let len = buf.get_u8();
2070                if len == 0 {
2071                    SqlValue::Null
2072                } else if len != 16 {
2073                    return Err(Error::Protocol(format!("invalid GUID length: {len}")));
2074                } else if buf.remaining() < 16 {
2075                    return Err(Error::Protocol("unexpected EOF reading GUID".into()));
2076                } else {
2077                    // SQL Server stores GUIDs in mixed-endian format
2078                    let data = bytes::Bytes::copy_from_slice(&buf[..16]);
2079                    buf.advance(16);
2080                    SqlValue::Binary(data)
2081                }
2082            }
2083
2084            // SQL_VARIANT - contains embedded type info
2085            TypeId::Variant => Self::parse_sql_variant(buf)?,
2086
2087            // UDT (User-Defined Type) - uses PLP encoding, return as binary
2088            TypeId::Udt => Self::parse_plp_varbinary(buf)?,
2089
2090            // Default: treat as binary with 2-byte length prefix
2091            _ => {
2092                // Try to read as variable-length with 2-byte length
2093                if buf.remaining() < 2 {
2094                    return Err(Error::Protocol(format!(
2095                        "unexpected EOF reading {:?}",
2096                        col.type_id
2097                    )));
2098                }
2099                let len = buf.get_u16_le();
2100                if len == 0xFFFF {
2101                    SqlValue::Null
2102                } else if buf.remaining() < len as usize {
2103                    return Err(Error::Protocol(format!(
2104                        "unexpected EOF reading {:?} data",
2105                        col.type_id
2106                    )));
2107                } else {
2108                    let data = bytes::Bytes::copy_from_slice(&buf[..len as usize]);
2109                    buf.advance(len as usize);
2110                    SqlValue::Binary(data)
2111                }
2112            }
2113        };
2114
2115        Ok(value)
2116    }
2117
2118    /// Parse PLP-encoded NVARCHAR(MAX) data.
2119    ///
2120    /// PLP format stored by decode_plp_type:
2121    /// - 8-byte total length (0xFFFFFFFFFFFFFFFF = NULL)
2122    /// - Chunks: 4-byte chunk length + chunk data, terminated by 0 length
2123    fn parse_plp_nvarchar(buf: &mut &[u8]) -> Result<mssql_types::SqlValue> {
2124        use bytes::Buf;
2125        use mssql_types::SqlValue;
2126
2127        if buf.remaining() < 8 {
2128            return Err(Error::Protocol(
2129                "unexpected EOF reading PLP total length".into(),
2130            ));
2131        }
2132
2133        let total_len = buf.get_u64_le();
2134        if total_len == 0xFFFFFFFFFFFFFFFF {
2135            return Ok(SqlValue::Null);
2136        }
2137
2138        // Read all chunks and concatenate the data
2139        let mut all_data = Vec::new();
2140        loop {
2141            if buf.remaining() < 4 {
2142                return Err(Error::Protocol(
2143                    "unexpected EOF reading PLP chunk length".into(),
2144                ));
2145            }
2146            let chunk_len = buf.get_u32_le() as usize;
2147            if chunk_len == 0 {
2148                break; // End of PLP data
2149            }
2150            if buf.remaining() < chunk_len {
2151                return Err(Error::Protocol(
2152                    "unexpected EOF reading PLP chunk data".into(),
2153                ));
2154            }
2155            all_data.extend_from_slice(&buf[..chunk_len]);
2156            buf.advance(chunk_len);
2157        }
2158
2159        // Convert UTF-16LE to String
2160        let utf16: Vec<u16> = all_data
2161            .chunks_exact(2)
2162            .map(|chunk| u16::from_le_bytes([chunk[0], chunk[1]]))
2163            .collect();
2164        let s = String::from_utf16(&utf16)
2165            .map_err(|_| Error::Protocol("invalid UTF-16 in PLP nvarchar".into()))?;
2166        Ok(SqlValue::String(s))
2167    }
2168
2169    /// Parse PLP-encoded VARCHAR(MAX) data.
2170    fn parse_plp_varchar(buf: &mut &[u8]) -> Result<mssql_types::SqlValue> {
2171        use bytes::Buf;
2172        use mssql_types::SqlValue;
2173
2174        if buf.remaining() < 8 {
2175            return Err(Error::Protocol(
2176                "unexpected EOF reading PLP total length".into(),
2177            ));
2178        }
2179
2180        let total_len = buf.get_u64_le();
2181        if total_len == 0xFFFFFFFFFFFFFFFF {
2182            return Ok(SqlValue::Null);
2183        }
2184
2185        // Read all chunks and concatenate the data
2186        let mut all_data = Vec::new();
2187        loop {
2188            if buf.remaining() < 4 {
2189                return Err(Error::Protocol(
2190                    "unexpected EOF reading PLP chunk length".into(),
2191                ));
2192            }
2193            let chunk_len = buf.get_u32_le() as usize;
2194            if chunk_len == 0 {
2195                break; // End of PLP data
2196            }
2197            if buf.remaining() < chunk_len {
2198                return Err(Error::Protocol(
2199                    "unexpected EOF reading PLP chunk data".into(),
2200                ));
2201            }
2202            all_data.extend_from_slice(&buf[..chunk_len]);
2203            buf.advance(chunk_len);
2204        }
2205
2206        // VARCHAR is UTF-8/ASCII
2207        let s = String::from_utf8_lossy(&all_data).into_owned();
2208        Ok(SqlValue::String(s))
2209    }
2210
2211    /// Parse PLP-encoded VARBINARY(MAX) data.
2212    fn parse_plp_varbinary(buf: &mut &[u8]) -> Result<mssql_types::SqlValue> {
2213        use bytes::Buf;
2214        use mssql_types::SqlValue;
2215
2216        if buf.remaining() < 8 {
2217            return Err(Error::Protocol(
2218                "unexpected EOF reading PLP total length".into(),
2219            ));
2220        }
2221
2222        let total_len = buf.get_u64_le();
2223        if total_len == 0xFFFFFFFFFFFFFFFF {
2224            return Ok(SqlValue::Null);
2225        }
2226
2227        // Read all chunks and concatenate the data
2228        let mut all_data = Vec::new();
2229        loop {
2230            if buf.remaining() < 4 {
2231                return Err(Error::Protocol(
2232                    "unexpected EOF reading PLP chunk length".into(),
2233                ));
2234            }
2235            let chunk_len = buf.get_u32_le() as usize;
2236            if chunk_len == 0 {
2237                break; // End of PLP data
2238            }
2239            if buf.remaining() < chunk_len {
2240                return Err(Error::Protocol(
2241                    "unexpected EOF reading PLP chunk data".into(),
2242                ));
2243            }
2244            all_data.extend_from_slice(&buf[..chunk_len]);
2245            buf.advance(chunk_len);
2246        }
2247
2248        Ok(SqlValue::Binary(bytes::Bytes::from(all_data)))
2249    }
2250
2251    /// Parse SQL_VARIANT data which contains embedded type information.
2252    ///
2253    /// SQL_VARIANT format:
2254    /// - 4 bytes: total length (0 = NULL)
2255    /// - 1 byte: base type ID
2256    /// - 1 byte: property byte count
2257    /// - N bytes: type-specific properties
2258    /// - Remaining bytes: actual data
2259    fn parse_sql_variant(buf: &mut &[u8]) -> Result<mssql_types::SqlValue> {
2260        use bytes::Buf;
2261        use mssql_types::SqlValue;
2262
2263        // Read 4-byte length
2264        if buf.remaining() < 4 {
2265            return Err(Error::Protocol(
2266                "unexpected EOF reading SQL_VARIANT length".into(),
2267            ));
2268        }
2269        let total_len = buf.get_u32_le() as usize;
2270
2271        if total_len == 0 {
2272            return Ok(SqlValue::Null);
2273        }
2274
2275        if buf.remaining() < total_len {
2276            return Err(Error::Protocol(
2277                "unexpected EOF reading SQL_VARIANT data".into(),
2278            ));
2279        }
2280
2281        // Read type info
2282        if total_len < 2 {
2283            return Err(Error::Protocol(
2284                "SQL_VARIANT too short for type info".into(),
2285            ));
2286        }
2287
2288        let base_type = buf.get_u8();
2289        let prop_count = buf.get_u8() as usize;
2290
2291        if buf.remaining() < prop_count {
2292            return Err(Error::Protocol(
2293                "unexpected EOF reading SQL_VARIANT properties".into(),
2294            ));
2295        }
2296
2297        // Data length is total_len - 2 (type, prop_count) - prop_count
2298        let data_len = total_len.saturating_sub(2).saturating_sub(prop_count);
2299
2300        // Parse based on base type
2301        // See MS-TDS SQL_VARIANT specification for type mappings
2302        match base_type {
2303            // Integer types (no properties)
2304            0x30 => {
2305                // TINYINT
2306                buf.advance(prop_count);
2307                if data_len < 1 {
2308                    return Ok(SqlValue::Null);
2309                }
2310                let v = buf.get_u8();
2311                Ok(SqlValue::TinyInt(v))
2312            }
2313            0x32 => {
2314                // BIT
2315                buf.advance(prop_count);
2316                if data_len < 1 {
2317                    return Ok(SqlValue::Null);
2318                }
2319                let v = buf.get_u8();
2320                Ok(SqlValue::Bool(v != 0))
2321            }
2322            0x34 => {
2323                // SMALLINT
2324                buf.advance(prop_count);
2325                if data_len < 2 {
2326                    return Ok(SqlValue::Null);
2327                }
2328                let v = buf.get_i16_le();
2329                Ok(SqlValue::SmallInt(v))
2330            }
2331            0x38 => {
2332                // INT
2333                buf.advance(prop_count);
2334                if data_len < 4 {
2335                    return Ok(SqlValue::Null);
2336                }
2337                let v = buf.get_i32_le();
2338                Ok(SqlValue::Int(v))
2339            }
2340            0x7F => {
2341                // BIGINT
2342                buf.advance(prop_count);
2343                if data_len < 8 {
2344                    return Ok(SqlValue::Null);
2345                }
2346                let v = buf.get_i64_le();
2347                Ok(SqlValue::BigInt(v))
2348            }
2349            0x6D => {
2350                // FLOATN - 1 prop byte (length)
2351                let float_len = if prop_count >= 1 { buf.get_u8() } else { 8 };
2352                buf.advance(prop_count.saturating_sub(1));
2353
2354                if float_len == 4 && data_len >= 4 {
2355                    let v = buf.get_f32_le();
2356                    Ok(SqlValue::Float(v))
2357                } else if data_len >= 8 {
2358                    let v = buf.get_f64_le();
2359                    Ok(SqlValue::Double(v))
2360                } else {
2361                    Ok(SqlValue::Null)
2362                }
2363            }
2364            0x6E => {
2365                // MONEYN - 1 prop byte (length)
2366                let money_len = if prop_count >= 1 { buf.get_u8() } else { 8 };
2367                buf.advance(prop_count.saturating_sub(1));
2368
2369                if money_len == 4 && data_len >= 4 {
2370                    let raw = buf.get_i32_le();
2371                    let value = raw as f64 / 10000.0;
2372                    Ok(SqlValue::Double(value))
2373                } else if data_len >= 8 {
2374                    let high = buf.get_i32_le() as i64;
2375                    let low = buf.get_u32_le() as i64;
2376                    let raw = (high << 32) | low;
2377                    let value = raw as f64 / 10000.0;
2378                    Ok(SqlValue::Double(value))
2379                } else {
2380                    Ok(SqlValue::Null)
2381                }
2382            }
2383            0x6F => {
2384                // DATETIMEN - 1 prop byte (length)
2385                let dt_len = if prop_count >= 1 { buf.get_u8() } else { 8 };
2386                buf.advance(prop_count.saturating_sub(1));
2387
2388                #[cfg(feature = "chrono")]
2389                {
2390                    use chrono::NaiveDate;
2391                    if dt_len == 4 && data_len >= 4 {
2392                        // SMALLDATETIME
2393                        let days = buf.get_u16_le() as i64;
2394                        let mins = buf.get_u16_le() as u32;
2395                        let base = NaiveDate::from_ymd_opt(1900, 1, 1)
2396                            .unwrap()
2397                            .and_hms_opt(0, 0, 0)
2398                            .unwrap();
2399                        let dt = base
2400                            + chrono::Duration::days(days)
2401                            + chrono::Duration::minutes(mins as i64);
2402                        Ok(SqlValue::DateTime(dt))
2403                    } else if data_len >= 8 {
2404                        // DATETIME
2405                        let days = buf.get_i32_le() as i64;
2406                        let ticks = buf.get_u32_le() as i64;
2407                        let base = NaiveDate::from_ymd_opt(1900, 1, 1)
2408                            .unwrap()
2409                            .and_hms_opt(0, 0, 0)
2410                            .unwrap();
2411                        let millis = (ticks * 10) / 3;
2412                        let dt = base
2413                            + chrono::Duration::days(days)
2414                            + chrono::Duration::milliseconds(millis);
2415                        Ok(SqlValue::DateTime(dt))
2416                    } else {
2417                        Ok(SqlValue::Null)
2418                    }
2419                }
2420                #[cfg(not(feature = "chrono"))]
2421                {
2422                    buf.advance(data_len);
2423                    Ok(SqlValue::Null)
2424                }
2425            }
2426            0x6A | 0x6C => {
2427                // DECIMALN/NUMERICN - 2 prop bytes (precision, scale)
2428                let _precision = if prop_count >= 1 { buf.get_u8() } else { 18 };
2429                let scale = if prop_count >= 2 { buf.get_u8() } else { 0 };
2430                buf.advance(prop_count.saturating_sub(2));
2431
2432                if data_len < 1 {
2433                    return Ok(SqlValue::Null);
2434                }
2435
2436                let sign = buf.get_u8();
2437                let mantissa_len = data_len - 1;
2438
2439                if mantissa_len > 16 {
2440                    // Too large, skip and return null
2441                    buf.advance(mantissa_len);
2442                    return Ok(SqlValue::Null);
2443                }
2444
2445                let mut mantissa_bytes = [0u8; 16];
2446                for i in 0..mantissa_len.min(16) {
2447                    mantissa_bytes[i] = buf.get_u8();
2448                }
2449                let mantissa = u128::from_le_bytes(mantissa_bytes);
2450
2451                #[cfg(feature = "decimal")]
2452                {
2453                    use rust_decimal::Decimal;
2454                    if scale > 28 {
2455                        // Fall back to f64
2456                        let divisor = 10f64.powi(scale as i32);
2457                        let value = (mantissa as f64) / divisor;
2458                        let value = if sign == 0 { -value } else { value };
2459                        Ok(SqlValue::Double(value))
2460                    } else {
2461                        let mut decimal =
2462                            Decimal::from_i128_with_scale(mantissa as i128, scale as u32);
2463                        if sign == 0 {
2464                            decimal.set_sign_negative(true);
2465                        }
2466                        Ok(SqlValue::Decimal(decimal))
2467                    }
2468                }
2469                #[cfg(not(feature = "decimal"))]
2470                {
2471                    let divisor = 10f64.powi(scale as i32);
2472                    let value = (mantissa as f64) / divisor;
2473                    let value = if sign == 0 { -value } else { value };
2474                    Ok(SqlValue::Double(value))
2475                }
2476            }
2477            0x24 => {
2478                // UNIQUEIDENTIFIER (no properties)
2479                buf.advance(prop_count);
2480                if data_len < 16 {
2481                    return Ok(SqlValue::Null);
2482                }
2483                let mut guid_bytes = [0u8; 16];
2484                for byte in &mut guid_bytes {
2485                    *byte = buf.get_u8();
2486                }
2487                Ok(SqlValue::Binary(bytes::Bytes::copy_from_slice(&guid_bytes)))
2488            }
2489            0x28 => {
2490                // DATE (no properties)
2491                buf.advance(prop_count);
2492                #[cfg(feature = "chrono")]
2493                {
2494                    if data_len < 3 {
2495                        return Ok(SqlValue::Null);
2496                    }
2497                    let mut date_bytes = [0u8; 4];
2498                    date_bytes[0] = buf.get_u8();
2499                    date_bytes[1] = buf.get_u8();
2500                    date_bytes[2] = buf.get_u8();
2501                    let days = u32::from_le_bytes(date_bytes);
2502                    let base = chrono::NaiveDate::from_ymd_opt(1, 1, 1).unwrap();
2503                    let date = base + chrono::Duration::days(days as i64);
2504                    Ok(SqlValue::Date(date))
2505                }
2506                #[cfg(not(feature = "chrono"))]
2507                {
2508                    buf.advance(data_len);
2509                    Ok(SqlValue::Null)
2510                }
2511            }
2512            0xA7 | 0x2F | 0x27 => {
2513                // BigVarChar/BigChar/VarChar/Char - 7 prop bytes (collation 5 + maxlen 2)
2514                buf.advance(prop_count);
2515                if data_len == 0 {
2516                    return Ok(SqlValue::String(String::new()));
2517                }
2518                let data = &buf[..data_len];
2519                let s = String::from_utf8_lossy(data).into_owned();
2520                buf.advance(data_len);
2521                Ok(SqlValue::String(s))
2522            }
2523            0xE7 | 0xEF => {
2524                // NVarChar/NChar - 7 prop bytes (collation 5 + maxlen 2)
2525                buf.advance(prop_count);
2526                if data_len == 0 {
2527                    return Ok(SqlValue::String(String::new()));
2528                }
2529                // UTF-16LE encoded
2530                let utf16: Vec<u16> = buf[..data_len]
2531                    .chunks_exact(2)
2532                    .map(|chunk| u16::from_le_bytes([chunk[0], chunk[1]]))
2533                    .collect();
2534                buf.advance(data_len);
2535                let s = String::from_utf16(&utf16).map_err(|_| {
2536                    Error::Protocol("invalid UTF-16 in SQL_VARIANT nvarchar".into())
2537                })?;
2538                Ok(SqlValue::String(s))
2539            }
2540            0xA5 | 0x2D | 0x25 => {
2541                // BigVarBinary/BigBinary/Binary/VarBinary - 2 prop bytes (maxlen)
2542                buf.advance(prop_count);
2543                let data = bytes::Bytes::copy_from_slice(&buf[..data_len]);
2544                buf.advance(data_len);
2545                Ok(SqlValue::Binary(data))
2546            }
2547            _ => {
2548                // Unknown type - return as binary
2549                buf.advance(prop_count);
2550                let data = bytes::Bytes::copy_from_slice(&buf[..data_len]);
2551                buf.advance(data_len);
2552                Ok(SqlValue::Binary(data))
2553            }
2554        }
2555    }
2556
2557    /// Calculate number of bytes needed for TIME based on scale.
2558    fn time_bytes_for_scale(scale: u8) -> usize {
2559        match scale {
2560            0..=2 => 3,
2561            3..=4 => 4,
2562            5..=7 => 5,
2563            _ => 5, // Default to max precision
2564        }
2565    }
2566
2567    /// Convert 100-nanosecond intervals to NaiveTime.
2568    #[cfg(feature = "chrono")]
2569    fn intervals_to_time(intervals: u64, scale: u8) -> chrono::NaiveTime {
2570        // Scale determines the unit:
2571        // scale 0: seconds
2572        // scale 1: 100ms
2573        // scale 2: 10ms
2574        // scale 3: 1ms
2575        // scale 4: 100us
2576        // scale 5: 10us
2577        // scale 6: 1us
2578        // scale 7: 100ns
2579        let nanos = match scale {
2580            0 => intervals * 1_000_000_000,
2581            1 => intervals * 100_000_000,
2582            2 => intervals * 10_000_000,
2583            3 => intervals * 1_000_000,
2584            4 => intervals * 100_000,
2585            5 => intervals * 10_000,
2586            6 => intervals * 1_000,
2587            7 => intervals * 100,
2588            _ => intervals * 100,
2589        };
2590
2591        let secs = (nanos / 1_000_000_000) as u32;
2592        let nano_part = (nanos % 1_000_000_000) as u32;
2593
2594        chrono::NaiveTime::from_num_seconds_from_midnight_opt(secs, nano_part)
2595            .unwrap_or_else(|| chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap())
2596    }
2597
2598    /// Read execute result (row count) from the response.
2599    async fn read_execute_result(&mut self) -> Result<u64> {
2600        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
2601
2602        let message = match connection {
2603            ConnectionHandle::Tls(conn) => conn
2604                .read_message()
2605                .await
2606                .map_err(|e| Error::Protocol(e.to_string()))?,
2607            ConnectionHandle::TlsPrelogin(conn) => conn
2608                .read_message()
2609                .await
2610                .map_err(|e| Error::Protocol(e.to_string()))?,
2611            ConnectionHandle::Plain(conn) => conn
2612                .read_message()
2613                .await
2614                .map_err(|e| Error::Protocol(e.to_string()))?,
2615        }
2616        .ok_or(Error::ConnectionClosed)?;
2617
2618        let mut parser = TokenParser::new(message.payload);
2619        let mut rows_affected = 0u64;
2620        let mut current_metadata: Option<ColMetaData> = None;
2621
2622        loop {
2623            // Use metadata-aware parsing to handle Row tokens from SELECT statements
2624            let token = parser
2625                .next_token_with_metadata(current_metadata.as_ref())
2626                .map_err(|e| Error::Protocol(e.to_string()))?;
2627
2628            let Some(token) = token else {
2629                break;
2630            };
2631
2632            match token {
2633                Token::ColMetaData(meta) => {
2634                    // Store metadata for subsequent Row token parsing
2635                    current_metadata = Some(meta);
2636                }
2637                Token::Row(_) | Token::NbcRow(_) => {
2638                    // Skip row data for execute() - we only care about row count
2639                    // The rows are parsed but we don't process them
2640                }
2641                Token::Done(done) => {
2642                    if done.status.error {
2643                        return Err(Error::Query("execution failed".to_string()));
2644                    }
2645                    if done.status.count {
2646                        // Accumulate row counts from all statements in a batch
2647                        rows_affected += done.row_count;
2648                    }
2649                    // Only break if there are no more result sets
2650                    // This enables multi-statement batches to report total affected rows
2651                    if !done.status.more {
2652                        break;
2653                    }
2654                }
2655                Token::DoneProc(done) => {
2656                    if done.status.count {
2657                        rows_affected += done.row_count;
2658                    }
2659                }
2660                Token::DoneInProc(done) => {
2661                    if done.status.count {
2662                        rows_affected += done.row_count;
2663                    }
2664                }
2665                Token::Error(err) => {
2666                    return Err(Error::Server {
2667                        number: err.number,
2668                        state: err.state,
2669                        class: err.class,
2670                        message: err.message.clone(),
2671                        server: if err.server.is_empty() {
2672                            None
2673                        } else {
2674                            Some(err.server.clone())
2675                        },
2676                        procedure: if err.procedure.is_empty() {
2677                            None
2678                        } else {
2679                            Some(err.procedure.clone())
2680                        },
2681                        line: err.line as u32,
2682                    });
2683                }
2684                Token::Info(info) => {
2685                    tracing::info!(
2686                        number = info.number,
2687                        message = %info.message,
2688                        "server info message"
2689                    );
2690                }
2691                Token::EnvChange(env) => {
2692                    // Process transaction-related EnvChange tokens.
2693                    // This allows BEGIN TRANSACTION, COMMIT, ROLLBACK via raw SQL
2694                    // to properly update the transaction descriptor.
2695                    Self::process_transaction_env_change(&env, &mut self.transaction_descriptor);
2696                }
2697                _ => {}
2698            }
2699        }
2700
2701        Ok(rows_affected)
2702    }
2703
2704    /// Read the response from BEGIN TRANSACTION and extract the transaction descriptor.
2705    ///
2706    /// Per MS-TDS spec, the server sends a BeginTransaction EnvChange token containing
2707    /// the transaction descriptor (8-byte value) that must be included in subsequent
2708    /// ALL_HEADERS sections for requests within this transaction.
2709    async fn read_transaction_begin_result(&mut self) -> Result<u64> {
2710        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
2711
2712        let message = match connection {
2713            ConnectionHandle::Tls(conn) => conn
2714                .read_message()
2715                .await
2716                .map_err(|e| Error::Protocol(e.to_string()))?,
2717            ConnectionHandle::TlsPrelogin(conn) => conn
2718                .read_message()
2719                .await
2720                .map_err(|e| Error::Protocol(e.to_string()))?,
2721            ConnectionHandle::Plain(conn) => conn
2722                .read_message()
2723                .await
2724                .map_err(|e| Error::Protocol(e.to_string()))?,
2725        }
2726        .ok_or(Error::ConnectionClosed)?;
2727
2728        let mut parser = TokenParser::new(message.payload);
2729        let mut transaction_descriptor: u64 = 0;
2730
2731        loop {
2732            let token = parser
2733                .next_token()
2734                .map_err(|e| Error::Protocol(e.to_string()))?;
2735
2736            let Some(token) = token else {
2737                break;
2738            };
2739
2740            match token {
2741                Token::EnvChange(env) => {
2742                    if env.env_type == EnvChangeType::BeginTransaction {
2743                        // Extract the transaction descriptor from the binary value
2744                        // Per MS-TDS spec, it's an 8-byte (ULONGLONG) value
2745                        if let tds_protocol::token::EnvChangeValue::Binary(ref data) = env.new_value
2746                        {
2747                            if data.len() >= 8 {
2748                                transaction_descriptor = u64::from_le_bytes([
2749                                    data[0], data[1], data[2], data[3], data[4], data[5], data[6],
2750                                    data[7],
2751                                ]);
2752                                tracing::debug!(
2753                                    transaction_descriptor =
2754                                        format!("0x{:016X}", transaction_descriptor),
2755                                    "transaction begun"
2756                                );
2757                            }
2758                        }
2759                    }
2760                }
2761                Token::Done(done) => {
2762                    if done.status.error {
2763                        return Err(Error::Query("BEGIN TRANSACTION failed".to_string()));
2764                    }
2765                    break;
2766                }
2767                Token::Error(err) => {
2768                    return Err(Error::Server {
2769                        number: err.number,
2770                        state: err.state,
2771                        class: err.class,
2772                        message: err.message.clone(),
2773                        server: if err.server.is_empty() {
2774                            None
2775                        } else {
2776                            Some(err.server.clone())
2777                        },
2778                        procedure: if err.procedure.is_empty() {
2779                            None
2780                        } else {
2781                            Some(err.procedure.clone())
2782                        },
2783                        line: err.line as u32,
2784                    });
2785                }
2786                Token::Info(info) => {
2787                    tracing::info!(
2788                        number = info.number,
2789                        message = %info.message,
2790                        "server info message"
2791                    );
2792                }
2793                _ => {}
2794            }
2795        }
2796
2797        Ok(transaction_descriptor)
2798    }
2799}
2800
2801impl Client<Ready> {
2802    /// Execute a query and return a streaming result set.
2803    ///
2804    /// Per ADR-007, results are streamed by default for memory efficiency.
2805    /// Use `.collect_all()` on the stream if you need all rows in memory.
2806    ///
2807    /// # Example
2808    ///
2809    /// ```rust,ignore
2810    /// use futures::StreamExt;
2811    ///
2812    /// // Streaming (memory-efficient)
2813    /// let mut stream = client.query("SELECT * FROM users WHERE id = @p1", &[&1]).await?;
2814    /// while let Some(row) = stream.next().await {
2815    ///     let row = row?;
2816    ///     process(&row);
2817    /// }
2818    ///
2819    /// // Buffered (loads all into memory)
2820    /// let rows: Vec<Row> = client
2821    ///     .query("SELECT * FROM small_table", &[])
2822    ///     .await?
2823    ///     .collect_all()
2824    ///     .await?;
2825    /// ```
2826    pub async fn query<'a>(
2827        &'a mut self,
2828        sql: &str,
2829        params: &[&(dyn crate::ToSql + Sync)],
2830    ) -> Result<QueryStream<'a>> {
2831        tracing::debug!(sql = sql, params_count = params.len(), "executing query");
2832
2833        #[cfg(feature = "otel")]
2834        let instrumentation = self.instrumentation.clone();
2835        #[cfg(feature = "otel")]
2836        let mut span = instrumentation.query_span(sql);
2837
2838        let result = async {
2839            if params.is_empty() {
2840                // Simple query without parameters - use SQL batch
2841                self.send_sql_batch(sql).await?;
2842            } else {
2843                // Parameterized query - use sp_executesql via RPC
2844                let rpc_params = Self::convert_params(params)?;
2845                let rpc = RpcRequest::execute_sql(sql, rpc_params);
2846                self.send_rpc(&rpc).await?;
2847            }
2848
2849            // Read complete response including columns and rows
2850            self.read_query_response().await
2851        }
2852        .await;
2853
2854        #[cfg(feature = "otel")]
2855        match &result {
2856            Ok(_) => InstrumentationContext::record_success(&mut span, None),
2857            Err(e) => InstrumentationContext::record_error(&mut span, e),
2858        }
2859
2860        // Drop the span before returning
2861        #[cfg(feature = "otel")]
2862        drop(span);
2863
2864        let (columns, rows) = result?;
2865        Ok(QueryStream::new(columns, rows))
2866    }
2867
2868    /// Execute a query with a specific timeout.
2869    ///
2870    /// This overrides the default `command_timeout` from the connection configuration
2871    /// for this specific query. If the query does not complete within the specified
2872    /// duration, an error is returned.
2873    ///
2874    /// # Arguments
2875    ///
2876    /// * `sql` - The SQL query to execute
2877    /// * `params` - Query parameters
2878    /// * `timeout_duration` - Maximum time to wait for the query to complete
2879    ///
2880    /// # Example
2881    ///
2882    /// ```rust,ignore
2883    /// use std::time::Duration;
2884    ///
2885    /// // Execute with a 5-second timeout
2886    /// let rows = client
2887    ///     .query_with_timeout(
2888    ///         "SELECT * FROM large_table",
2889    ///         &[],
2890    ///         Duration::from_secs(5),
2891    ///     )
2892    ///     .await?;
2893    /// ```
2894    pub async fn query_with_timeout<'a>(
2895        &'a mut self,
2896        sql: &str,
2897        params: &[&(dyn crate::ToSql + Sync)],
2898        timeout_duration: std::time::Duration,
2899    ) -> Result<QueryStream<'a>> {
2900        timeout(timeout_duration, self.query(sql, params))
2901            .await
2902            .map_err(|_| Error::CommandTimeout)?
2903    }
2904
2905    /// Execute a batch that may return multiple result sets.
2906    ///
2907    /// This is useful for stored procedures or SQL batches that contain
2908    /// multiple SELECT statements.
2909    ///
2910    /// # Example
2911    ///
2912    /// ```rust,ignore
2913    /// // Execute a batch with multiple SELECTs
2914    /// let mut results = client.query_multiple(
2915    ///     "SELECT 1 AS a; SELECT 2 AS b, 3 AS c;",
2916    ///     &[]
2917    /// ).await?;
2918    ///
2919    /// // Process first result set
2920    /// while let Some(row) = results.next_row().await? {
2921    ///     println!("Result 1: {:?}", row);
2922    /// }
2923    ///
2924    /// // Move to second result set
2925    /// if results.next_result().await? {
2926    ///     while let Some(row) = results.next_row().await? {
2927    ///         println!("Result 2: {:?}", row);
2928    ///     }
2929    /// }
2930    /// ```
2931    pub async fn query_multiple<'a>(
2932        &'a mut self,
2933        sql: &str,
2934        params: &[&(dyn crate::ToSql + Sync)],
2935    ) -> Result<MultiResultStream<'a>> {
2936        tracing::debug!(
2937            sql = sql,
2938            params_count = params.len(),
2939            "executing multi-result query"
2940        );
2941
2942        if params.is_empty() {
2943            // Simple batch without parameters - use SQL batch
2944            self.send_sql_batch(sql).await?;
2945        } else {
2946            // Parameterized query - use sp_executesql via RPC
2947            let rpc_params = Self::convert_params(params)?;
2948            let rpc = RpcRequest::execute_sql(sql, rpc_params);
2949            self.send_rpc(&rpc).await?;
2950        }
2951
2952        // Read all result sets
2953        let result_sets = self.read_multi_result_response().await?;
2954        Ok(MultiResultStream::new(result_sets))
2955    }
2956
2957    /// Read multiple result sets from a batch response.
2958    async fn read_multi_result_response(&mut self) -> Result<Vec<crate::stream::ResultSet>> {
2959        let connection = self.connection.as_mut().ok_or(Error::ConnectionClosed)?;
2960
2961        let message = match connection {
2962            ConnectionHandle::Tls(conn) => conn
2963                .read_message()
2964                .await
2965                .map_err(|e| Error::Protocol(e.to_string()))?,
2966            ConnectionHandle::TlsPrelogin(conn) => conn
2967                .read_message()
2968                .await
2969                .map_err(|e| Error::Protocol(e.to_string()))?,
2970            ConnectionHandle::Plain(conn) => conn
2971                .read_message()
2972                .await
2973                .map_err(|e| Error::Protocol(e.to_string()))?,
2974        }
2975        .ok_or(Error::ConnectionClosed)?;
2976
2977        let mut parser = TokenParser::new(message.payload);
2978        let mut result_sets: Vec<crate::stream::ResultSet> = Vec::new();
2979        let mut current_columns: Vec<crate::row::Column> = Vec::new();
2980        let mut current_rows: Vec<crate::row::Row> = Vec::new();
2981        let mut protocol_metadata: Option<ColMetaData> = None;
2982
2983        loop {
2984            let token = parser
2985                .next_token_with_metadata(protocol_metadata.as_ref())
2986                .map_err(|e| Error::Protocol(e.to_string()))?;
2987
2988            let Some(token) = token else {
2989                break;
2990            };
2991
2992            match token {
2993                Token::ColMetaData(meta) => {
2994                    // New result set starting - save the previous one if it has columns
2995                    if !current_columns.is_empty() {
2996                        result_sets.push(crate::stream::ResultSet::new(
2997                            std::mem::take(&mut current_columns),
2998                            std::mem::take(&mut current_rows),
2999                        ));
3000                    }
3001
3002                    // Parse the new column metadata
3003                    current_columns = meta
3004                        .columns
3005                        .iter()
3006                        .enumerate()
3007                        .map(|(i, col)| {
3008                            let type_name = format!("{:?}", col.type_id);
3009                            let mut column = crate::row::Column::new(&col.name, i, type_name)
3010                                .with_nullable(col.flags & 0x01 != 0);
3011
3012                            if let Some(max_len) = col.type_info.max_length {
3013                                column = column.with_max_length(max_len);
3014                            }
3015                            if let (Some(prec), Some(scale)) =
3016                                (col.type_info.precision, col.type_info.scale)
3017                            {
3018                                column = column.with_precision_scale(prec, scale);
3019                            }
3020                            column
3021                        })
3022                        .collect();
3023
3024                    tracing::debug!(
3025                        columns = current_columns.len(),
3026                        result_set = result_sets.len(),
3027                        "received column metadata for result set"
3028                    );
3029                    protocol_metadata = Some(meta);
3030                }
3031                Token::Row(raw_row) => {
3032                    if let Some(ref meta) = protocol_metadata {
3033                        let row = Self::convert_raw_row(&raw_row, meta, &current_columns)?;
3034                        current_rows.push(row);
3035                    }
3036                }
3037                Token::NbcRow(nbc_row) => {
3038                    if let Some(ref meta) = protocol_metadata {
3039                        let row = Self::convert_nbc_row(&nbc_row, meta, &current_columns)?;
3040                        current_rows.push(row);
3041                    }
3042                }
3043                Token::Error(err) => {
3044                    return Err(Error::Server {
3045                        number: err.number,
3046                        state: err.state,
3047                        class: err.class,
3048                        message: err.message.clone(),
3049                        server: if err.server.is_empty() {
3050                            None
3051                        } else {
3052                            Some(err.server.clone())
3053                        },
3054                        procedure: if err.procedure.is_empty() {
3055                            None
3056                        } else {
3057                            Some(err.procedure.clone())
3058                        },
3059                        line: err.line as u32,
3060                    });
3061                }
3062                Token::Done(done) => {
3063                    if done.status.error {
3064                        return Err(Error::Query("query failed".to_string()));
3065                    }
3066
3067                    // Save the current result set if we have columns
3068                    if !current_columns.is_empty() {
3069                        result_sets.push(crate::stream::ResultSet::new(
3070                            std::mem::take(&mut current_columns),
3071                            std::mem::take(&mut current_rows),
3072                        ));
3073                        protocol_metadata = None;
3074                    }
3075
3076                    // Check if there are more result sets
3077                    if !done.status.more {
3078                        tracing::debug!(result_sets = result_sets.len(), "all result sets parsed");
3079                        break;
3080                    }
3081                }
3082                Token::DoneInProc(done) => {
3083                    if done.status.error {
3084                        return Err(Error::Query("query failed".to_string()));
3085                    }
3086
3087                    // Save the current result set if we have columns (within stored proc)
3088                    if !current_columns.is_empty() {
3089                        result_sets.push(crate::stream::ResultSet::new(
3090                            std::mem::take(&mut current_columns),
3091                            std::mem::take(&mut current_rows),
3092                        ));
3093                        protocol_metadata = None;
3094                    }
3095
3096                    // DoneInProc may indicate more results within the batch
3097                    if !done.status.more {
3098                        // No more results from this statement, but batch may continue
3099                    }
3100                }
3101                Token::DoneProc(done) => {
3102                    if done.status.error {
3103                        return Err(Error::Query("query failed".to_string()));
3104                    }
3105                    // DoneProc marks end of stored procedure, not necessarily end of results
3106                }
3107                Token::Info(info) => {
3108                    tracing::debug!(
3109                        number = info.number,
3110                        message = %info.message,
3111                        "server info message"
3112                    );
3113                }
3114                _ => {}
3115            }
3116        }
3117
3118        // Don't forget any remaining result set that wasn't followed by Done
3119        if !current_columns.is_empty() {
3120            result_sets.push(crate::stream::ResultSet::new(current_columns, current_rows));
3121        }
3122
3123        Ok(result_sets)
3124    }
3125
3126    /// Execute a query that doesn't return rows.
3127    ///
3128    /// Returns the number of affected rows.
3129    pub async fn execute(
3130        &mut self,
3131        sql: &str,
3132        params: &[&(dyn crate::ToSql + Sync)],
3133    ) -> Result<u64> {
3134        tracing::debug!(
3135            sql = sql,
3136            params_count = params.len(),
3137            "executing statement"
3138        );
3139
3140        #[cfg(feature = "otel")]
3141        let instrumentation = self.instrumentation.clone();
3142        #[cfg(feature = "otel")]
3143        let mut span = instrumentation.query_span(sql);
3144
3145        let result = async {
3146            if params.is_empty() {
3147                // Simple statement without parameters - use SQL batch
3148                self.send_sql_batch(sql).await?;
3149            } else {
3150                // Parameterized statement - use sp_executesql via RPC
3151                let rpc_params = Self::convert_params(params)?;
3152                let rpc = RpcRequest::execute_sql(sql, rpc_params);
3153                self.send_rpc(&rpc).await?;
3154            }
3155
3156            // Read response and get row count
3157            self.read_execute_result().await
3158        }
3159        .await;
3160
3161        #[cfg(feature = "otel")]
3162        match &result {
3163            Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
3164            Err(e) => InstrumentationContext::record_error(&mut span, e),
3165        }
3166
3167        // Drop the span before returning
3168        #[cfg(feature = "otel")]
3169        drop(span);
3170
3171        result
3172    }
3173
3174    /// Execute a statement with a specific timeout.
3175    ///
3176    /// This overrides the default `command_timeout` from the connection configuration
3177    /// for this specific statement. If the statement does not complete within the
3178    /// specified duration, an error is returned.
3179    ///
3180    /// # Arguments
3181    ///
3182    /// * `sql` - The SQL statement to execute
3183    /// * `params` - Statement parameters
3184    /// * `timeout_duration` - Maximum time to wait for the statement to complete
3185    ///
3186    /// # Example
3187    ///
3188    /// ```rust,ignore
3189    /// use std::time::Duration;
3190    ///
3191    /// // Execute with a 10-second timeout
3192    /// let rows_affected = client
3193    ///     .execute_with_timeout(
3194    ///         "UPDATE large_table SET status = @p1",
3195    ///         &[&"processed"],
3196    ///         Duration::from_secs(10),
3197    ///     )
3198    ///     .await?;
3199    /// ```
3200    pub async fn execute_with_timeout(
3201        &mut self,
3202        sql: &str,
3203        params: &[&(dyn crate::ToSql + Sync)],
3204        timeout_duration: std::time::Duration,
3205    ) -> Result<u64> {
3206        timeout(timeout_duration, self.execute(sql, params))
3207            .await
3208            .map_err(|_| Error::CommandTimeout)?
3209    }
3210
3211    /// Begin a transaction.
3212    ///
3213    /// This transitions the client from `Ready` to `InTransaction` state.
3214    /// Per MS-TDS spec, the server returns a transaction descriptor in the
3215    /// BeginTransaction EnvChange token that must be included in subsequent
3216    /// ALL_HEADERS sections.
3217    pub async fn begin_transaction(mut self) -> Result<Client<InTransaction>> {
3218        tracing::debug!("beginning transaction");
3219
3220        #[cfg(feature = "otel")]
3221        let instrumentation = self.instrumentation.clone();
3222        #[cfg(feature = "otel")]
3223        let mut span = instrumentation.transaction_span("BEGIN");
3224
3225        // Execute BEGIN TRANSACTION and extract the transaction descriptor
3226        let result = async {
3227            self.send_sql_batch("BEGIN TRANSACTION").await?;
3228            self.read_transaction_begin_result().await
3229        }
3230        .await;
3231
3232        #[cfg(feature = "otel")]
3233        match &result {
3234            Ok(_) => InstrumentationContext::record_success(&mut span, None),
3235            Err(e) => InstrumentationContext::record_error(&mut span, e),
3236        }
3237
3238        // Drop the span before moving instrumentation
3239        #[cfg(feature = "otel")]
3240        drop(span);
3241
3242        let transaction_descriptor = result?;
3243
3244        Ok(Client {
3245            config: self.config,
3246            _state: PhantomData,
3247            connection: self.connection,
3248            server_version: self.server_version,
3249            current_database: self.current_database,
3250            statement_cache: self.statement_cache,
3251            transaction_descriptor, // Store the descriptor from server
3252            #[cfg(feature = "otel")]
3253            instrumentation: self.instrumentation,
3254        })
3255    }
3256
3257    /// Begin a transaction with a specific isolation level.
3258    ///
3259    /// This transitions the client from `Ready` to `InTransaction` state
3260    /// with the specified isolation level.
3261    ///
3262    /// # Example
3263    ///
3264    /// ```rust,ignore
3265    /// use mssql_client::IsolationLevel;
3266    ///
3267    /// let tx = client.begin_transaction_with_isolation(IsolationLevel::Serializable).await?;
3268    /// // All operations in this transaction use SERIALIZABLE isolation
3269    /// tx.commit().await?;
3270    /// ```
3271    pub async fn begin_transaction_with_isolation(
3272        mut self,
3273        isolation_level: crate::transaction::IsolationLevel,
3274    ) -> Result<Client<InTransaction>> {
3275        tracing::debug!(
3276            isolation_level = %isolation_level.name(),
3277            "beginning transaction with isolation level"
3278        );
3279
3280        #[cfg(feature = "otel")]
3281        let instrumentation = self.instrumentation.clone();
3282        #[cfg(feature = "otel")]
3283        let mut span = instrumentation.transaction_span("BEGIN");
3284
3285        // First set the isolation level
3286        let result = async {
3287            self.send_sql_batch(isolation_level.as_sql()).await?;
3288            self.read_execute_result().await?;
3289
3290            // Then begin the transaction
3291            self.send_sql_batch("BEGIN TRANSACTION").await?;
3292            self.read_transaction_begin_result().await
3293        }
3294        .await;
3295
3296        #[cfg(feature = "otel")]
3297        match &result {
3298            Ok(_) => InstrumentationContext::record_success(&mut span, None),
3299            Err(e) => InstrumentationContext::record_error(&mut span, e),
3300        }
3301
3302        #[cfg(feature = "otel")]
3303        drop(span);
3304
3305        let transaction_descriptor = result?;
3306
3307        Ok(Client {
3308            config: self.config,
3309            _state: PhantomData,
3310            connection: self.connection,
3311            server_version: self.server_version,
3312            current_database: self.current_database,
3313            statement_cache: self.statement_cache,
3314            transaction_descriptor,
3315            #[cfg(feature = "otel")]
3316            instrumentation: self.instrumentation,
3317        })
3318    }
3319
3320    /// Execute a simple query without parameters.
3321    ///
3322    /// This is useful for DDL statements and simple queries where you
3323    /// don't need to retrieve the affected row count.
3324    pub async fn simple_query(&mut self, sql: &str) -> Result<()> {
3325        tracing::debug!(sql = sql, "executing simple query");
3326
3327        // Send SQL batch
3328        self.send_sql_batch(sql).await?;
3329
3330        // Read and discard response
3331        let _ = self.read_execute_result().await?;
3332
3333        Ok(())
3334    }
3335
3336    /// Close the connection gracefully.
3337    pub async fn close(self) -> Result<()> {
3338        tracing::debug!("closing connection");
3339        Ok(())
3340    }
3341
3342    /// Get the current database name.
3343    #[must_use]
3344    pub fn database(&self) -> Option<&str> {
3345        self.config.database.as_deref()
3346    }
3347
3348    /// Get the server host.
3349    #[must_use]
3350    pub fn host(&self) -> &str {
3351        &self.config.host
3352    }
3353
3354    /// Get the server port.
3355    #[must_use]
3356    pub fn port(&self) -> u16 {
3357        self.config.port
3358    }
3359
3360    /// Check if the connection is currently in a transaction.
3361    ///
3362    /// This returns `true` if a transaction was started via raw SQL
3363    /// (`BEGIN TRANSACTION`) and has not yet been committed or rolled back.
3364    ///
3365    /// Note: This only tracks transactions started via raw SQL. Transactions
3366    /// started via the type-state API (`begin_transaction()`) result in a
3367    /// `Client<InTransaction>` which is a different type.
3368    ///
3369    /// # Example
3370    ///
3371    /// ```rust,ignore
3372    /// client.execute("BEGIN TRANSACTION", &[]).await?;
3373    /// assert!(client.is_in_transaction());
3374    ///
3375    /// client.execute("COMMIT", &[]).await?;
3376    /// assert!(!client.is_in_transaction());
3377    /// ```
3378    #[must_use]
3379    pub fn is_in_transaction(&self) -> bool {
3380        self.transaction_descriptor != 0
3381    }
3382
3383    /// Get a handle for cancelling the current query.
3384    ///
3385    /// The cancel handle can be cloned and sent to other tasks, enabling
3386    /// cancellation of long-running queries from a separate async context.
3387    ///
3388    /// # Example
3389    ///
3390    /// ```rust,ignore
3391    /// use std::time::Duration;
3392    ///
3393    /// let cancel_handle = client.cancel_handle();
3394    ///
3395    /// // Spawn a task to cancel after 10 seconds
3396    /// let handle = tokio::spawn(async move {
3397    ///     tokio::time::sleep(Duration::from_secs(10)).await;
3398    ///     let _ = cancel_handle.cancel().await;
3399    /// });
3400    ///
3401    /// // This query will be cancelled if it runs longer than 10 seconds
3402    /// let result = client.query("SELECT * FROM very_large_table", &[]).await;
3403    /// ```
3404    #[must_use]
3405    pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
3406        let connection = self
3407            .connection
3408            .as_ref()
3409            .expect("connection should be present");
3410        match connection {
3411            ConnectionHandle::Tls(conn) => {
3412                crate::cancel::CancelHandle::from_tls(conn.cancel_handle())
3413            }
3414            ConnectionHandle::TlsPrelogin(conn) => {
3415                crate::cancel::CancelHandle::from_tls_prelogin(conn.cancel_handle())
3416            }
3417            ConnectionHandle::Plain(conn) => {
3418                crate::cancel::CancelHandle::from_plain(conn.cancel_handle())
3419            }
3420        }
3421    }
3422}
3423
3424impl Client<InTransaction> {
3425    /// Execute a query within the transaction and return a streaming result set.
3426    ///
3427    /// See [`Client<Ready>::query`] for usage examples.
3428    pub async fn query<'a>(
3429        &'a mut self,
3430        sql: &str,
3431        params: &[&(dyn crate::ToSql + Sync)],
3432    ) -> Result<QueryStream<'a>> {
3433        tracing::debug!(
3434            sql = sql,
3435            params_count = params.len(),
3436            "executing query in transaction"
3437        );
3438
3439        #[cfg(feature = "otel")]
3440        let instrumentation = self.instrumentation.clone();
3441        #[cfg(feature = "otel")]
3442        let mut span = instrumentation.query_span(sql);
3443
3444        let result = async {
3445            if params.is_empty() {
3446                // Simple query without parameters - use SQL batch
3447                self.send_sql_batch(sql).await?;
3448            } else {
3449                // Parameterized query - use sp_executesql via RPC
3450                let rpc_params = Self::convert_params(params)?;
3451                let rpc = RpcRequest::execute_sql(sql, rpc_params);
3452                self.send_rpc(&rpc).await?;
3453            }
3454
3455            // Read complete response including columns and rows
3456            self.read_query_response().await
3457        }
3458        .await;
3459
3460        #[cfg(feature = "otel")]
3461        match &result {
3462            Ok(_) => InstrumentationContext::record_success(&mut span, None),
3463            Err(e) => InstrumentationContext::record_error(&mut span, e),
3464        }
3465
3466        // Drop the span before returning
3467        #[cfg(feature = "otel")]
3468        drop(span);
3469
3470        let (columns, rows) = result?;
3471        Ok(QueryStream::new(columns, rows))
3472    }
3473
3474    /// Execute a statement within the transaction.
3475    ///
3476    /// Returns the number of affected rows.
3477    pub async fn execute(
3478        &mut self,
3479        sql: &str,
3480        params: &[&(dyn crate::ToSql + Sync)],
3481    ) -> Result<u64> {
3482        tracing::debug!(
3483            sql = sql,
3484            params_count = params.len(),
3485            "executing statement in transaction"
3486        );
3487
3488        #[cfg(feature = "otel")]
3489        let instrumentation = self.instrumentation.clone();
3490        #[cfg(feature = "otel")]
3491        let mut span = instrumentation.query_span(sql);
3492
3493        let result = async {
3494            if params.is_empty() {
3495                // Simple statement without parameters - use SQL batch
3496                self.send_sql_batch(sql).await?;
3497            } else {
3498                // Parameterized statement - use sp_executesql via RPC
3499                let rpc_params = Self::convert_params(params)?;
3500                let rpc = RpcRequest::execute_sql(sql, rpc_params);
3501                self.send_rpc(&rpc).await?;
3502            }
3503
3504            // Read response and get row count
3505            self.read_execute_result().await
3506        }
3507        .await;
3508
3509        #[cfg(feature = "otel")]
3510        match &result {
3511            Ok(rows) => InstrumentationContext::record_success(&mut span, Some(*rows)),
3512            Err(e) => InstrumentationContext::record_error(&mut span, e),
3513        }
3514
3515        // Drop the span before returning
3516        #[cfg(feature = "otel")]
3517        drop(span);
3518
3519        result
3520    }
3521
3522    /// Execute a query within the transaction with a specific timeout.
3523    ///
3524    /// See [`Client<Ready>::query_with_timeout`] for details.
3525    pub async fn query_with_timeout<'a>(
3526        &'a mut self,
3527        sql: &str,
3528        params: &[&(dyn crate::ToSql + Sync)],
3529        timeout_duration: std::time::Duration,
3530    ) -> Result<QueryStream<'a>> {
3531        timeout(timeout_duration, self.query(sql, params))
3532            .await
3533            .map_err(|_| Error::CommandTimeout)?
3534    }
3535
3536    /// Execute a statement within the transaction with a specific timeout.
3537    ///
3538    /// See [`Client<Ready>::execute_with_timeout`] for details.
3539    pub async fn execute_with_timeout(
3540        &mut self,
3541        sql: &str,
3542        params: &[&(dyn crate::ToSql + Sync)],
3543        timeout_duration: std::time::Duration,
3544    ) -> Result<u64> {
3545        timeout(timeout_duration, self.execute(sql, params))
3546            .await
3547            .map_err(|_| Error::CommandTimeout)?
3548    }
3549
3550    /// Commit the transaction.
3551    ///
3552    /// This transitions the client back to `Ready` state.
3553    pub async fn commit(mut self) -> Result<Client<Ready>> {
3554        tracing::debug!("committing transaction");
3555
3556        #[cfg(feature = "otel")]
3557        let instrumentation = self.instrumentation.clone();
3558        #[cfg(feature = "otel")]
3559        let mut span = instrumentation.transaction_span("COMMIT");
3560
3561        // Execute COMMIT TRANSACTION
3562        let result = async {
3563            self.send_sql_batch("COMMIT TRANSACTION").await?;
3564            self.read_execute_result().await
3565        }
3566        .await;
3567
3568        #[cfg(feature = "otel")]
3569        match &result {
3570            Ok(_) => InstrumentationContext::record_success(&mut span, None),
3571            Err(e) => InstrumentationContext::record_error(&mut span, e),
3572        }
3573
3574        // Drop the span before moving instrumentation
3575        #[cfg(feature = "otel")]
3576        drop(span);
3577
3578        result?;
3579
3580        Ok(Client {
3581            config: self.config,
3582            _state: PhantomData,
3583            connection: self.connection,
3584            server_version: self.server_version,
3585            current_database: self.current_database,
3586            statement_cache: self.statement_cache,
3587            transaction_descriptor: 0, // Reset to auto-commit mode
3588            #[cfg(feature = "otel")]
3589            instrumentation: self.instrumentation,
3590        })
3591    }
3592
3593    /// Rollback the transaction.
3594    ///
3595    /// This transitions the client back to `Ready` state.
3596    pub async fn rollback(mut self) -> Result<Client<Ready>> {
3597        tracing::debug!("rolling back transaction");
3598
3599        #[cfg(feature = "otel")]
3600        let instrumentation = self.instrumentation.clone();
3601        #[cfg(feature = "otel")]
3602        let mut span = instrumentation.transaction_span("ROLLBACK");
3603
3604        // Execute ROLLBACK TRANSACTION
3605        let result = async {
3606            self.send_sql_batch("ROLLBACK TRANSACTION").await?;
3607            self.read_execute_result().await
3608        }
3609        .await;
3610
3611        #[cfg(feature = "otel")]
3612        match &result {
3613            Ok(_) => InstrumentationContext::record_success(&mut span, None),
3614            Err(e) => InstrumentationContext::record_error(&mut span, e),
3615        }
3616
3617        // Drop the span before moving instrumentation
3618        #[cfg(feature = "otel")]
3619        drop(span);
3620
3621        result?;
3622
3623        Ok(Client {
3624            config: self.config,
3625            _state: PhantomData,
3626            connection: self.connection,
3627            server_version: self.server_version,
3628            current_database: self.current_database,
3629            statement_cache: self.statement_cache,
3630            transaction_descriptor: 0, // Reset to auto-commit mode
3631            #[cfg(feature = "otel")]
3632            instrumentation: self.instrumentation,
3633        })
3634    }
3635
3636    /// Create a savepoint and return a handle for later rollback.
3637    ///
3638    /// The returned `SavePoint` handle contains the validated savepoint name.
3639    /// Use it with `rollback_to()` to partially undo transaction work.
3640    ///
3641    /// # Example
3642    ///
3643    /// ```rust,ignore
3644    /// let tx = client.begin_transaction().await?;
3645    /// tx.execute("INSERT INTO orders ...").await?;
3646    /// let sp = tx.save_point("before_items").await?;
3647    /// tx.execute("INSERT INTO items ...").await?;
3648    /// // Oops, rollback just the items
3649    /// tx.rollback_to(&sp).await?;
3650    /// tx.commit().await?;
3651    /// ```
3652    pub async fn save_point(&mut self, name: &str) -> Result<SavePoint> {
3653        validate_identifier(name)?;
3654        tracing::debug!(name = name, "creating savepoint");
3655
3656        // Execute SAVE TRANSACTION <name>
3657        // Note: name is validated by validate_identifier() to prevent SQL injection
3658        let sql = format!("SAVE TRANSACTION {}", name);
3659        self.send_sql_batch(&sql).await?;
3660        self.read_execute_result().await?;
3661
3662        Ok(SavePoint::new(name.to_string()))
3663    }
3664
3665    /// Rollback to a savepoint.
3666    ///
3667    /// This rolls back all changes made after the savepoint was created,
3668    /// but keeps the transaction active. The savepoint remains valid and
3669    /// can be rolled back to again.
3670    ///
3671    /// # Example
3672    ///
3673    /// ```rust,ignore
3674    /// let sp = tx.save_point("checkpoint").await?;
3675    /// // ... do some work ...
3676    /// tx.rollback_to(&sp).await?;  // Undo changes since checkpoint
3677    /// // Transaction is still active, savepoint is still valid
3678    /// ```
3679    pub async fn rollback_to(&mut self, savepoint: &SavePoint) -> Result<()> {
3680        tracing::debug!(name = savepoint.name(), "rolling back to savepoint");
3681
3682        // Execute ROLLBACK TRANSACTION <name>
3683        // Note: savepoint name was validated during creation
3684        let sql = format!("ROLLBACK TRANSACTION {}", savepoint.name());
3685        self.send_sql_batch(&sql).await?;
3686        self.read_execute_result().await?;
3687
3688        Ok(())
3689    }
3690
3691    /// Release a savepoint (optional cleanup).
3692    ///
3693    /// Note: SQL Server doesn't have explicit savepoint release, but this
3694    /// method is provided for API completeness. The savepoint is automatically
3695    /// released when the transaction commits or rolls back.
3696    pub async fn release_savepoint(&mut self, savepoint: SavePoint) -> Result<()> {
3697        tracing::debug!(name = savepoint.name(), "releasing savepoint");
3698
3699        // SQL Server doesn't require explicit savepoint release
3700        // The savepoint is implicitly released on commit/rollback
3701        // This method exists for API completeness
3702        drop(savepoint);
3703        Ok(())
3704    }
3705
3706    /// Get a handle for cancelling the current query within the transaction.
3707    ///
3708    /// See [`Client<Ready>::cancel_handle`] for usage examples.
3709    #[must_use]
3710    pub fn cancel_handle(&self) -> crate::cancel::CancelHandle {
3711        let connection = self
3712            .connection
3713            .as_ref()
3714            .expect("connection should be present");
3715        match connection {
3716            ConnectionHandle::Tls(conn) => {
3717                crate::cancel::CancelHandle::from_tls(conn.cancel_handle())
3718            }
3719            ConnectionHandle::TlsPrelogin(conn) => {
3720                crate::cancel::CancelHandle::from_tls_prelogin(conn.cancel_handle())
3721            }
3722            ConnectionHandle::Plain(conn) => {
3723                crate::cancel::CancelHandle::from_plain(conn.cancel_handle())
3724            }
3725        }
3726    }
3727}
3728
3729/// Validate an identifier (table name, savepoint name, etc.) to prevent SQL injection.
3730fn validate_identifier(name: &str) -> Result<()> {
3731    use once_cell::sync::Lazy;
3732    use regex::Regex;
3733
3734    static IDENTIFIER_RE: Lazy<Regex> =
3735        Lazy::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_@#$]{0,127}$").unwrap());
3736
3737    if name.is_empty() {
3738        return Err(Error::InvalidIdentifier(
3739            "identifier cannot be empty".into(),
3740        ));
3741    }
3742
3743    if !IDENTIFIER_RE.is_match(name) {
3744        return Err(Error::InvalidIdentifier(format!(
3745            "invalid identifier '{}': must start with letter/underscore, \
3746             contain only alphanumerics/_/@/#/$, and be 1-128 characters",
3747            name
3748        )));
3749    }
3750
3751    Ok(())
3752}
3753
3754impl<S: ConnectionState> std::fmt::Debug for Client<S> {
3755    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3756        f.debug_struct("Client")
3757            .field("host", &self.config.host)
3758            .field("port", &self.config.port)
3759            .field("database", &self.config.database)
3760            .finish()
3761    }
3762}
3763
3764#[cfg(test)]
3765#[allow(clippy::unwrap_used, clippy::panic)]
3766mod tests {
3767    use super::*;
3768
3769    #[test]
3770    fn test_validate_identifier_valid() {
3771        assert!(validate_identifier("my_table").is_ok());
3772        assert!(validate_identifier("Table123").is_ok());
3773        assert!(validate_identifier("_private").is_ok());
3774        assert!(validate_identifier("sp_test").is_ok());
3775    }
3776
3777    #[test]
3778    fn test_validate_identifier_invalid() {
3779        assert!(validate_identifier("").is_err());
3780        assert!(validate_identifier("123abc").is_err());
3781        assert!(validate_identifier("table-name").is_err());
3782        assert!(validate_identifier("table name").is_err());
3783        assert!(validate_identifier("table;DROP TABLE users").is_err());
3784    }
3785
3786    // ========================================================================
3787    // PLP (Partially Length-Prefixed) Parsing Tests
3788    // ========================================================================
3789    //
3790    // These tests verify that MAX type (NVARCHAR(MAX), VARCHAR(MAX), VARBINARY(MAX))
3791    // data is correctly parsed from the PLP wire format.
3792
3793    /// Helper to create PLP data with a single chunk.
3794    fn make_plp_data(total_len: u64, chunks: &[&[u8]]) -> Vec<u8> {
3795        let mut data = Vec::new();
3796        // 8-byte total length
3797        data.extend_from_slice(&total_len.to_le_bytes());
3798        // Chunks
3799        for chunk in chunks {
3800            let len = chunk.len() as u32;
3801            data.extend_from_slice(&len.to_le_bytes());
3802            data.extend_from_slice(chunk);
3803        }
3804        // Terminating zero-length chunk
3805        data.extend_from_slice(&0u32.to_le_bytes());
3806        data
3807    }
3808
3809    #[test]
3810    fn test_parse_plp_nvarchar_simple() {
3811        // "Hello" in UTF-16LE: H=0x0048, e=0x0065, l=0x006C, l=0x006C, o=0x006F
3812        let utf16_data = [0x48, 0x00, 0x65, 0x00, 0x6C, 0x00, 0x6C, 0x00, 0x6F, 0x00];
3813        let plp = make_plp_data(10, &[&utf16_data]);
3814        let mut buf: &[u8] = &plp;
3815
3816        let result = Client::<Ready>::parse_plp_nvarchar(&mut buf).unwrap();
3817        match result {
3818            mssql_types::SqlValue::String(s) => assert_eq!(s, "Hello"),
3819            _ => panic!("expected String, got {:?}", result),
3820        }
3821    }
3822
3823    #[test]
3824    fn test_parse_plp_nvarchar_null() {
3825        // NULL is indicated by total_len = 0xFFFFFFFFFFFFFFFF
3826        let plp = 0xFFFFFFFFFFFFFFFFu64.to_le_bytes();
3827        let mut buf: &[u8] = &plp;
3828
3829        let result = Client::<Ready>::parse_plp_nvarchar(&mut buf).unwrap();
3830        assert!(matches!(result, mssql_types::SqlValue::Null));
3831    }
3832
3833    #[test]
3834    fn test_parse_plp_nvarchar_empty() {
3835        // Empty string: total_len=0, single zero-length chunk
3836        let plp = make_plp_data(0, &[]);
3837        let mut buf: &[u8] = &plp;
3838
3839        let result = Client::<Ready>::parse_plp_nvarchar(&mut buf).unwrap();
3840        match result {
3841            mssql_types::SqlValue::String(s) => assert_eq!(s, ""),
3842            _ => panic!("expected empty String"),
3843        }
3844    }
3845
3846    #[test]
3847    fn test_parse_plp_nvarchar_multi_chunk() {
3848        // "Hello" split across two chunks: "Hel" + "lo"
3849        let chunk1 = [0x48, 0x00, 0x65, 0x00, 0x6C, 0x00]; // "Hel"
3850        let chunk2 = [0x6C, 0x00, 0x6F, 0x00]; // "lo"
3851        let plp = make_plp_data(10, &[&chunk1, &chunk2]);
3852        let mut buf: &[u8] = &plp;
3853
3854        let result = Client::<Ready>::parse_plp_nvarchar(&mut buf).unwrap();
3855        match result {
3856            mssql_types::SqlValue::String(s) => assert_eq!(s, "Hello"),
3857            _ => panic!("expected String"),
3858        }
3859    }
3860
3861    #[test]
3862    fn test_parse_plp_varchar_simple() {
3863        let data = b"Hello World";
3864        let plp = make_plp_data(11, &[data]);
3865        let mut buf: &[u8] = &plp;
3866
3867        let result = Client::<Ready>::parse_plp_varchar(&mut buf).unwrap();
3868        match result {
3869            mssql_types::SqlValue::String(s) => assert_eq!(s, "Hello World"),
3870            _ => panic!("expected String"),
3871        }
3872    }
3873
3874    #[test]
3875    fn test_parse_plp_varchar_null() {
3876        let plp = 0xFFFFFFFFFFFFFFFFu64.to_le_bytes();
3877        let mut buf: &[u8] = &plp;
3878
3879        let result = Client::<Ready>::parse_plp_varchar(&mut buf).unwrap();
3880        assert!(matches!(result, mssql_types::SqlValue::Null));
3881    }
3882
3883    #[test]
3884    fn test_parse_plp_varbinary_simple() {
3885        let data = [0x01, 0x02, 0x03, 0x04, 0x05];
3886        let plp = make_plp_data(5, &[&data]);
3887        let mut buf: &[u8] = &plp;
3888
3889        let result = Client::<Ready>::parse_plp_varbinary(&mut buf).unwrap();
3890        match result {
3891            mssql_types::SqlValue::Binary(b) => assert_eq!(&b[..], &[0x01, 0x02, 0x03, 0x04, 0x05]),
3892            _ => panic!("expected Binary"),
3893        }
3894    }
3895
3896    #[test]
3897    fn test_parse_plp_varbinary_null() {
3898        let plp = 0xFFFFFFFFFFFFFFFFu64.to_le_bytes();
3899        let mut buf: &[u8] = &plp;
3900
3901        let result = Client::<Ready>::parse_plp_varbinary(&mut buf).unwrap();
3902        assert!(matches!(result, mssql_types::SqlValue::Null));
3903    }
3904
3905    #[test]
3906    fn test_parse_plp_varbinary_large() {
3907        // Test with larger data split across multiple chunks
3908        let chunk1: Vec<u8> = (0..100u8).collect();
3909        let chunk2: Vec<u8> = (100..200u8).collect();
3910        let chunk3: Vec<u8> = (200..255u8).collect();
3911        let total_len = chunk1.len() + chunk2.len() + chunk3.len();
3912        let plp = make_plp_data(total_len as u64, &[&chunk1, &chunk2, &chunk3]);
3913        let mut buf: &[u8] = &plp;
3914
3915        let result = Client::<Ready>::parse_plp_varbinary(&mut buf).unwrap();
3916        match result {
3917            mssql_types::SqlValue::Binary(b) => {
3918                assert_eq!(b.len(), 255);
3919                // Verify data integrity
3920                for (i, &byte) in b.iter().enumerate() {
3921                    assert_eq!(byte, i as u8);
3922                }
3923            }
3924            _ => panic!("expected Binary"),
3925        }
3926    }
3927
3928    // ========================================================================
3929    // Multi-Column Row Parsing Tests
3930    // ========================================================================
3931    //
3932    // These tests verify that parsing multiple columns in a row works correctly,
3933    // especially for scenarios where string columns are followed by integer columns.
3934
3935    use tds_protocol::token::{ColumnData, TypeInfo};
3936    use tds_protocol::types::TypeId;
3937
3938    /// Build raw row data for a non-MAX NVarChar followed by an IntN.
3939    /// This mimics the scenario: SELECT @name AS greeting, @value AS number
3940    fn make_nvarchar_int_row(nvarchar_value: &str, int_value: i32) -> Vec<u8> {
3941        let mut data = Vec::new();
3942
3943        // Column 0: NVarChar (non-MAX) - 2-byte length prefix (in bytes)
3944        let utf16: Vec<u16> = nvarchar_value.encode_utf16().collect();
3945        let byte_len = (utf16.len() * 2) as u16;
3946        data.extend_from_slice(&byte_len.to_le_bytes());
3947        for code_unit in utf16 {
3948            data.extend_from_slice(&code_unit.to_le_bytes());
3949        }
3950
3951        // Column 1: IntN - 1-byte length prefix
3952        data.push(4); // 4 bytes for INT
3953        data.extend_from_slice(&int_value.to_le_bytes());
3954
3955        data
3956    }
3957
3958    #[test]
3959    fn test_parse_row_nvarchar_then_int() {
3960        // Build raw row data for: "World", 42
3961        let raw_data = make_nvarchar_int_row("World", 42);
3962
3963        // Create column metadata
3964        let col0 = ColumnData {
3965            name: "greeting".to_string(),
3966            type_id: TypeId::NVarChar,
3967            col_type: 0xE7,
3968            flags: 0x01,
3969            user_type: 0,
3970            type_info: TypeInfo {
3971                max_length: Some(10), // 5 chars * 2 bytes = 10
3972                precision: None,
3973                scale: None,
3974                collation: None,
3975            },
3976        };
3977
3978        let col1 = ColumnData {
3979            name: "number".to_string(),
3980            type_id: TypeId::IntN,
3981            col_type: 0x26,
3982            flags: 0x01,
3983            user_type: 0,
3984            type_info: TypeInfo {
3985                max_length: Some(4),
3986                precision: None,
3987                scale: None,
3988                collation: None,
3989            },
3990        };
3991
3992        let mut buf: &[u8] = &raw_data;
3993
3994        // Parse column 0 (NVarChar)
3995        let value0 = Client::<Ready>::parse_column_value(&mut buf, &col0).unwrap();
3996        match value0 {
3997            mssql_types::SqlValue::String(s) => assert_eq!(s, "World"),
3998            _ => panic!("expected String, got {:?}", value0),
3999        }
4000
4001        // Parse column 1 (IntN)
4002        let value1 = Client::<Ready>::parse_column_value(&mut buf, &col1).unwrap();
4003        match value1 {
4004            mssql_types::SqlValue::Int(i) => assert_eq!(i, 42),
4005            _ => panic!("expected Int, got {:?}", value1),
4006        }
4007
4008        // Buffer should be fully consumed
4009        assert_eq!(buf.len(), 0, "buffer should be fully consumed");
4010    }
4011
4012    #[test]
4013    fn test_parse_row_multiple_types() {
4014        // Build raw data for: NULL (NVarChar), 123 (IntN), "Test" (NVarChar), NULL (IntN)
4015        let mut data = Vec::new();
4016
4017        // Column 0: NVarChar NULL (0xFFFF)
4018        data.extend_from_slice(&0xFFFFu16.to_le_bytes());
4019
4020        // Column 1: IntN with value 123
4021        data.push(4); // 4 bytes
4022        data.extend_from_slice(&123i32.to_le_bytes());
4023
4024        // Column 2: NVarChar "Test"
4025        let utf16: Vec<u16> = "Test".encode_utf16().collect();
4026        data.extend_from_slice(&((utf16.len() * 2) as u16).to_le_bytes());
4027        for code_unit in utf16 {
4028            data.extend_from_slice(&code_unit.to_le_bytes());
4029        }
4030
4031        // Column 3: IntN NULL (0 length)
4032        data.push(0);
4033
4034        // Metadata for 4 columns
4035        let col0 = ColumnData {
4036            name: "col0".to_string(),
4037            type_id: TypeId::NVarChar,
4038            col_type: 0xE7,
4039            flags: 0x01,
4040            user_type: 0,
4041            type_info: TypeInfo {
4042                max_length: Some(100),
4043                precision: None,
4044                scale: None,
4045                collation: None,
4046            },
4047        };
4048        let col1 = ColumnData {
4049            name: "col1".to_string(),
4050            type_id: TypeId::IntN,
4051            col_type: 0x26,
4052            flags: 0x01,
4053            user_type: 0,
4054            type_info: TypeInfo {
4055                max_length: Some(4),
4056                precision: None,
4057                scale: None,
4058                collation: None,
4059            },
4060        };
4061        let col2 = col0.clone();
4062        let col3 = col1.clone();
4063
4064        let mut buf: &[u8] = &data;
4065
4066        // Parse all 4 columns
4067        let v0 = Client::<Ready>::parse_column_value(&mut buf, &col0).unwrap();
4068        assert!(
4069            matches!(v0, mssql_types::SqlValue::Null),
4070            "col0 should be Null"
4071        );
4072
4073        let v1 = Client::<Ready>::parse_column_value(&mut buf, &col1).unwrap();
4074        assert!(
4075            matches!(v1, mssql_types::SqlValue::Int(123)),
4076            "col1 should be 123"
4077        );
4078
4079        let v2 = Client::<Ready>::parse_column_value(&mut buf, &col2).unwrap();
4080        match v2 {
4081            mssql_types::SqlValue::String(s) => assert_eq!(s, "Test"),
4082            _ => panic!("col2 should be 'Test'"),
4083        }
4084
4085        let v3 = Client::<Ready>::parse_column_value(&mut buf, &col3).unwrap();
4086        assert!(
4087            matches!(v3, mssql_types::SqlValue::Null),
4088            "col3 should be Null"
4089        );
4090
4091        // Buffer should be fully consumed
4092        assert_eq!(buf.len(), 0, "buffer should be fully consumed");
4093    }
4094
4095    #[test]
4096    fn test_parse_row_with_unicode() {
4097        // Test with Unicode characters that need proper UTF-16 encoding
4098        let test_str = "Héllo Wörld 日本語";
4099        let mut data = Vec::new();
4100
4101        // NVarChar with Unicode
4102        let utf16: Vec<u16> = test_str.encode_utf16().collect();
4103        data.extend_from_slice(&((utf16.len() * 2) as u16).to_le_bytes());
4104        for code_unit in utf16 {
4105            data.extend_from_slice(&code_unit.to_le_bytes());
4106        }
4107
4108        // IntN value
4109        data.push(8); // BIGINT
4110        data.extend_from_slice(&9999999999i64.to_le_bytes());
4111
4112        let col0 = ColumnData {
4113            name: "text".to_string(),
4114            type_id: TypeId::NVarChar,
4115            col_type: 0xE7,
4116            flags: 0x01,
4117            user_type: 0,
4118            type_info: TypeInfo {
4119                max_length: Some(100),
4120                precision: None,
4121                scale: None,
4122                collation: None,
4123            },
4124        };
4125        let col1 = ColumnData {
4126            name: "num".to_string(),
4127            type_id: TypeId::IntN,
4128            col_type: 0x26,
4129            flags: 0x01,
4130            user_type: 0,
4131            type_info: TypeInfo {
4132                max_length: Some(8),
4133                precision: None,
4134                scale: None,
4135                collation: None,
4136            },
4137        };
4138
4139        let mut buf: &[u8] = &data;
4140
4141        let v0 = Client::<Ready>::parse_column_value(&mut buf, &col0).unwrap();
4142        match v0 {
4143            mssql_types::SqlValue::String(s) => assert_eq!(s, test_str),
4144            _ => panic!("expected String"),
4145        }
4146
4147        let v1 = Client::<Ready>::parse_column_value(&mut buf, &col1).unwrap();
4148        match v1 {
4149            mssql_types::SqlValue::BigInt(i) => assert_eq!(i, 9999999999),
4150            _ => panic!("expected BigInt"),
4151        }
4152
4153        assert_eq!(buf.len(), 0, "buffer should be fully consumed");
4154    }
4155}