Skip to main content

tds_protocol/
token.rs

1//! TDS token stream definitions.
2//!
3//! Tokens are the fundamental units of TDS response data. The server sends
4//! a stream of tokens that describe metadata, rows, errors, and other information.
5//!
6//! ## Token Structure
7//!
8//! Each token begins with a 1-byte token type identifier, followed by
9//! token-specific data. Some tokens have fixed lengths, while others
10//! have length prefixes.
11//!
12//! ## Usage
13//!
14//! ```rust,no_run
15//! use tds_protocol::token::{Token, TokenParser};
16//! use bytes::Bytes;
17//!
18//! fn parse(data: Bytes) -> Result<(), tds_protocol::ProtocolError> {
19//!     let mut parser = TokenParser::new(data);
20//!
21//!     while let Some(token) = parser.next_token()? {
22//!         match token {
23//!             Token::Done(done) => println!("Rows affected: {}", done.row_count),
24//!             Token::Error(err) => eprintln!("Error {}: {}", err.number, err.message),
25//!             _ => {}
26//!         }
27//!     }
28//!     Ok(())
29//! }
30//! ```
31
32use bytes::{Buf, BufMut, Bytes};
33
34use crate::codec::{read_b_varchar, read_us_varchar};
35use crate::error::ProtocolError;
36use crate::prelude::*;
37use crate::types::TypeId;
38
/// Identifies the kind of a TDS token by its leading type byte.
///
/// Every variant's discriminant is the byte that introduces that token in the
/// stream, so `variant as u8` yields the raw identifier. Variants are listed
/// in ascending byte order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
#[non_exhaustive]
pub enum TokenType {
    /// OFFSET token.
    Offset = 0x78,
    /// RETURNSTATUS token (return status).
    ReturnStatus = 0x79,
    /// COLMETADATA token (column metadata).
    ColMetaData = 0x81,
    /// ALTMETADATA token (COMPUTE BY column metadata). Deprecated in TDS 7.4.
    AltMetaData = 0x88,
    /// TABNAME token (table name).
    TabName = 0xA4,
    /// COLINFO token (column info).
    ColInfo = 0xA5,
    /// ORDER token.
    Order = 0xA9,
    /// ERROR token (error message).
    Error = 0xAA,
    /// INFO token (informational message).
    Info = 0xAB,
    /// RETURNVALUE token (return value).
    ReturnValue = 0xAC,
    /// LOGINACK token (login acknowledgment).
    LoginAck = 0xAD,
    /// FEATUREEXTACK token (feature extension acknowledgment).
    FeatureExtAck = 0xAE,
    /// ROW token (row data).
    Row = 0xD1,
    /// NBCROW token (null bitmap compressed row).
    NbcRow = 0xD2,
    /// ALTROW token (COMPUTE BY row data). Deprecated in TDS 7.4.
    AltRow = 0xD3,
    /// ENVCHANGE token (environment change).
    EnvChange = 0xE3,
    /// SESSIONSTATE token (session state).
    SessionState = 0xE4,
    /// SSPI token (SSPI authentication).
    Sspi = 0xED,
    /// FEDAUTHINFO token (federated authentication info).
    FedAuthInfo = 0xEE,
    /// DONE token.
    Done = 0xFD,
    /// DONEPROC token (done procedure).
    DoneProc = 0xFE,
    /// DONEINPROC token (done in procedure).
    DoneInProc = 0xFF,
}

impl TokenType {
    /// Maps a raw token type byte to its [`TokenType`].
    ///
    /// Returns `None` when `value` does not correspond to any variant.
    pub fn from_u8(value: u8) -> Option<Self> {
        let token_type = match value {
            0x78 => Self::Offset,
            0x79 => Self::ReturnStatus,
            0x81 => Self::ColMetaData,
            0x88 => Self::AltMetaData,
            0xA4 => Self::TabName,
            0xA5 => Self::ColInfo,
            0xA9 => Self::Order,
            0xAA => Self::Error,
            0xAB => Self::Info,
            0xAC => Self::ReturnValue,
            0xAD => Self::LoginAck,
            0xAE => Self::FeatureExtAck,
            0xD1 => Self::Row,
            0xD2 => Self::NbcRow,
            0xD3 => Self::AltRow,
            0xE3 => Self::EnvChange,
            0xE4 => Self::SessionState,
            0xED => Self::Sspi,
            0xEE => Self::FedAuthInfo,
            0xFD => Self::Done,
            0xFE => Self::DoneProc,
            0xFF => Self::DoneInProc,
            // Any other byte is not a token type this enum knows about.
            _ => return None,
        };
        Some(token_type)
    }
}
120
/// Parsed TDS token.
///
/// This enum represents the tokens that are surfaced in parsed form.
/// Each variant contains the parsed token data.
///
/// Not every [`TokenType`] has a variant here: `ColInfo`, `TabName`, `Offset`,
/// `AltMetaData` and `AltRow` are token types without a corresponding
/// `Token` variant.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum Token {
    /// Column metadata describing result set structure.
    ColMetaData(ColMetaData),
    /// Row data (still undecoded; see [`RawRow`]).
    Row(RawRow),
    /// Null bitmap compressed row.
    NbcRow(NbcRow),
    /// Completion of a SQL statement.
    Done(Done),
    /// Completion of a stored procedure.
    DoneProc(DoneProc),
    /// Completion within a stored procedure.
    DoneInProc(DoneInProc),
    /// Return status from stored procedure, carried as a plain `i32`.
    ReturnStatus(i32),
    /// Return value from stored procedure.
    ReturnValue(ReturnValue),
    /// Error message from server.
    Error(ServerError),
    /// Informational message from server.
    Info(ServerInfo),
    /// Login acknowledgment.
    LoginAck(LoginAck),
    /// Environment change notification.
    EnvChange(EnvChange),
    /// Column ordering information.
    Order(Order),
    /// Feature extension acknowledgment.
    FeatureExtAck(FeatureExtAck),
    /// SSPI authentication data.
    Sspi(SspiToken),
    /// Session state information.
    SessionState(SessionState),
    /// Federated authentication info.
    FedAuthInfo(FedAuthInfo),
}
163
/// Column metadata token.
///
/// Parsed form of a COLMETADATA token; carried by [`Token::ColMetaData`].
#[derive(Debug, Clone, Default)]
pub struct ColMetaData {
    /// Column definitions, in the order they were decoded.
    ///
    /// Empty when the token's column count was the `NO_METADATA` sentinel.
    pub columns: Vec<ColumnData>,
    /// CEK table for Always Encrypted result sets.
    /// Present only when the server sends encrypted column metadata.
    /// The non-encrypted `decode` path always leaves this as `None`.
    pub cek_table: Option<crate::crypto::CekTable>,
}
173
/// Column definition within metadata.
///
/// One entry of [`ColMetaData::columns`]. On the wire the fields arrive as
/// user type, flags, type byte, type-specific metadata, then the name.
#[derive(Debug, Clone)]
pub struct ColumnData {
    /// Column name (read as a B_VARCHAR after the type metadata).
    pub name: String,
    /// Column data type ID, derived from `col_type`.
    pub type_id: TypeId,
    /// Column data type raw byte, exactly as read from the wire.
    ///
    /// Column decoding rejects bytes that do not map to a [`TypeId`], so for
    /// a decoded column this always corresponds to `type_id`.
    pub col_type: u8,
    /// Column flags (little-endian 16-bit value from the wire).
    pub flags: u16,
    /// User type ID (little-endian 32-bit value from the wire).
    pub user_type: u32,
    /// Type-specific metadata.
    pub type_info: TypeInfo,
    /// Per-column encryption metadata (Always Encrypted).
    /// Present only for columns with the encrypted flag (0x0800) set.
    /// The plain column decoder always sets this to `None`.
    pub crypto_metadata: Option<crate::crypto::CryptoMetadata>,
}
193
/// Type-specific metadata.
///
/// Produced by `decode_type_info`. Fields that a given type does not carry
/// are left as `None`; fixed-length types, `Date` and `Xml` yield the
/// all-`None` default.
#[derive(Debug, Clone, Default)]
pub struct TypeInfo {
    /// Maximum length for variable-length types.
    ///
    /// Read from a 1-, 2- or 4-byte field depending on the type; not set for
    /// `Time`, `DateTime2` and `DateTimeOffset`, which only carry a scale.
    pub max_length: Option<u32>,
    /// Precision for numeric types (set for the decimal/numeric family only).
    pub precision: Option<u8>,
    /// Scale for numeric types.
    ///
    /// Also set for `Time`, `DateTime2` and `DateTimeOffset`.
    pub scale: Option<u8>,
    /// Collation for string types.
    ///
    /// Set for `BigVarChar`, `BigChar`, `NChar`, `NVarChar`, `Text` and `NText`.
    pub collation: Option<Collation>,
}
206
/// A SQL Server collation as carried in TDS metadata.
///
/// A collation determines the sorting rules and character encoding of string
/// data. For `VARCHAR` columns it decides which code page the stored bytes
/// are encoded in.
///
/// # Encoding Support
///
/// With the `encoding` feature enabled, [`Collation::encoding()`] yields the
/// [`encoding_rs::Encoding`] to use when decoding `VARCHAR` data.
///
/// # Example
///
/// ```rust,ignore
/// use tds_protocol::token::Collation;
///
/// let collation = Collation { lcid: 0x0804, sort_id: 0 }; // Chinese (PRC)
/// if let Some(encoding) = collation.encoding() {
///     let (decoded, _, _) = encoding.decode(raw_bytes);
///     // decoded is now proper Chinese text
/// }
/// ```
#[derive(Debug, Clone, Copy, Default)]
pub struct Collation {
    /// Locale ID (LCID).
    ///
    /// Encodes language and region: the low 16 bits hold the primary language
    /// ID, and bits 16-19 hold the sort ID for some collations.
    ///
    /// UTF-8 collations (SQL Server 2019+) have fUTF8 (bit 26, 0x0400_0000) set.
    pub lcid: u32,
    /// Sort ID.
    ///
    /// Selects sorting behavior for certain collations. A non-zero value marks
    /// a SQL collation, whose code page is derived from this ID, not the LCID.
    pub sort_id: u8,
}

impl Collation {
    /// Builds a `Collation` from its 5-byte TDS wire representation.
    ///
    /// Layout: LCID as a little-endian `u32` (4 bytes), then the sort ID (1 byte).
    pub fn from_bytes(bytes: &[u8; 5]) -> Self {
        let [b0, b1, b2, b3, sort_id] = *bytes;
        Self {
            lcid: u32::from_le_bytes([b0, b1, b2, b3]),
            sort_id,
        }
    }

    /// Produces the 5-byte TDS wire representation of this collation.
    ///
    /// Layout: LCID as a little-endian `u32` (4 bytes), then the sort ID (1 byte).
    pub fn to_bytes(&self) -> [u8; 5] {
        let [b0, b1, b2, b3] = self.lcid.to_le_bytes();
        [b0, b1, b2, b3, self.sort_id]
    }

    /// Returns the character encoding for this collation.
    ///
    /// The lookup goes through the sort ID when it is non-zero and through
    /// the LCID otherwise, producing an `encoding_rs` encoding.
    ///
    /// # Returns
    ///
    /// - `Some(&Encoding)` - the encoding to decode `VARCHAR` data with
    /// - `None` - the collation is UTF-8 (no transcoding needed) or is not
    ///   recognized (caller should use Windows-1252 fallback)
    ///
    /// # UTF-8 Collations
    ///
    /// SQL Server 2019+ supports UTF-8 collations (identified by the `_UTF8`
    /// suffix). These return `None` because no transcoding is needed.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let collation = Collation { lcid: 0x0419, sort_id: 0 }; // Russian
    /// if let Some(encoding) = collation.encoding() {
    ///     // encoding is Windows-1251 for Cyrillic
    ///     let (text, _, had_errors) = encoding.decode(&raw_bytes);
    /// }
    /// ```
    #[cfg(feature = "encoding")]
    pub fn encoding(&self) -> Option<&'static encoding_rs::Encoding> {
        match self.sort_id {
            0 => crate::collation::encoding_for_lcid(self.lcid),
            // A non-zero SortId means a SQL collation, whose code page derives
            // from the SortId, not the LCID (MS-TDS). Consulting only the LCID
            // silently decoded these as windows-1252 (issue #158).
            sort_id => crate::collation::encoding_for_sort_id(sort_id),
        }
    }

    /// Reports whether this collation uses UTF-8 encoding.
    ///
    /// UTF-8 collations were introduced in SQL Server 2019 and carry the
    /// `_UTF8` suffix in their name. The check is based on the LCID only.
    #[cfg(feature = "encoding")]
    pub fn is_utf8(&self) -> bool {
        crate::collation::is_utf8_collation(self.lcid)
    }

    /// Returns the Windows code page number for this collation.
    ///
    /// Intended for error messages and debugging.
    ///
    /// # Returns
    ///
    /// The code page number (e.g., 1252 for Western European, 932 for Japanese),
    /// or `None` when the underlying lookup has no answer.
    #[cfg(feature = "encoding")]
    pub fn code_page(&self) -> Option<u16> {
        match self.sort_id {
            0 => crate::collation::code_page_for_lcid(self.lcid),
            // SQL collations (non-zero SortId) derive their code page from the
            // SortId, not the LCID (issue #158).
            sort_id => crate::collation::code_page_for_sort_id(sort_id),
        }
    }

    /// Returns the encoding name for this collation.
    ///
    /// Intended for error messages and debugging. A SQL collation (non-zero
    /// sort ID) without a known encoding is reported as `"unsupported"`.
    #[cfg(feature = "encoding")]
    pub fn encoding_name(&self) -> &'static str {
        match self.sort_id {
            0 => crate::collation::encoding_name_for_lcid(self.lcid),
            sort_id => crate::collation::encoding_for_sort_id(sort_id)
                .map_or("unsupported", |enc| enc.name()),
        }
    }
}
340
/// Raw row data (not yet decoded).
///
/// Payload of [`Token::Row`]; the bytes are kept as-is for later decoding.
#[derive(Debug, Clone)]
pub struct RawRow {
    /// Raw column values.
    pub data: bytes::Bytes,
}
347
/// Null bitmap compressed row.
///
/// Payload of [`Token::NbcRow`]: a bitmap plus the raw bytes of the
/// non-null column values.
#[derive(Debug, Clone)]
pub struct NbcRow {
    /// Null bitmap.
    pub null_bitmap: Vec<u8>,
    /// Raw non-null column values (not yet decoded).
    pub data: bytes::Bytes,
}
356
/// Done token indicating statement completion.
///
/// Payload of [`Token::Done`]. Has the same fields as [`DoneInProc`] and
/// [`DoneProc`].
#[derive(Debug, Clone, Copy)]
pub struct Done {
    /// Status flags.
    pub status: DoneStatus,
    /// Current command.
    pub cur_cmd: u16,
    /// Row count (if applicable; see the `count` flag in `status`).
    pub row_count: u64,
}
367
/// Done status flags.
///
/// Shared by [`Done`], [`DoneInProc`] and [`DoneProc`]. The derived
/// `Default` leaves every flag cleared.
#[derive(Debug, Clone, Copy, Default)]
#[non_exhaustive]
pub struct DoneStatus {
    /// More results follow.
    pub more: bool,
    /// Error occurred.
    pub error: bool,
    /// Transaction in progress.
    pub in_xact: bool,
    /// Row count is valid.
    pub count: bool,
    /// Attention acknowledgment.
    pub attn: bool,
    /// Server error caused statement termination.
    pub srverror: bool,
}
385
/// Done in procedure token.
///
/// Payload of [`Token::DoneInProc`] (completion within a stored procedure).
#[derive(Debug, Clone, Copy)]
pub struct DoneInProc {
    /// Status flags.
    pub status: DoneStatus,
    /// Current command.
    pub cur_cmd: u16,
    /// Row count (see the `count` flag in `status`).
    pub row_count: u64,
}
396
/// Done procedure token.
///
/// Payload of [`Token::DoneProc`] (completion of a stored procedure).
#[derive(Debug, Clone, Copy)]
pub struct DoneProc {
    /// Status flags.
    pub status: DoneStatus,
    /// Current command.
    pub cur_cmd: u16,
    /// Row count (see the `count` flag in `status`).
    pub row_count: u64,
}
407
/// Return value from stored procedure.
///
/// Payload of [`Token::ReturnValue`]. The value itself is kept as raw bytes
/// alongside the type description needed to interpret it.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ReturnValue {
    /// Parameter ordinal.
    pub param_ordinal: u16,
    /// Parameter name.
    pub param_name: String,
    /// Status flags.
    pub status: u8,
    /// User type.
    pub user_type: u32,
    /// Type flags.
    pub flags: u16,
    /// Raw column type byte from the wire.
    pub col_type: u8,
    /// Type info (type-specific metadata for `col_type`).
    pub type_info: TypeInfo,
    /// Value data (raw, not yet decoded).
    pub value: bytes::Bytes,
}
429
/// Server error message.
///
/// Payload of [`Token::Error`]. Has the same fields as [`ServerInfo`].
#[derive(Debug, Clone)]
pub struct ServerError {
    /// Error number.
    pub number: i32,
    /// Error state.
    pub state: u8,
    /// Error severity class.
    pub class: u8,
    /// Error message text.
    pub message: String,
    /// Server name.
    pub server: String,
    /// Procedure name.
    pub procedure: String,
    /// Line number.
    pub line: i32,
}
448
/// Server informational message.
///
/// Payload of [`Token::Info`]. Has the same fields as [`ServerError`].
#[derive(Debug, Clone)]
pub struct ServerInfo {
    /// Info number.
    pub number: i32,
    /// Info state.
    pub state: u8,
    /// Info class (severity).
    pub class: u8,
    /// Info message text.
    pub message: String,
    /// Server name.
    pub server: String,
    /// Procedure name.
    pub procedure: String,
    /// Line number.
    pub line: i32,
}
467
/// Login acknowledgment token.
///
/// Payload of [`Token::LoginAck`].
#[derive(Debug, Clone)]
pub struct LoginAck {
    /// Interface type.
    pub interface: u8,
    /// TDS version.
    pub tds_version: u32,
    /// Program name.
    pub prog_name: String,
    /// Program version.
    pub prog_version: u32,
}
480
/// Environment change token.
///
/// Payload of [`Token::EnvChange`]: which setting changed, together with its
/// new and old values.
#[derive(Debug, Clone)]
pub struct EnvChange {
    /// Type of environment change.
    pub env_type: EnvChangeType,
    /// New value.
    pub new_value: EnvChangeValue,
    /// Old value.
    pub old_value: EnvChangeValue,
}
491
/// Environment change type.
///
/// Discriminants are the one-byte type codes (`#[repr(u8)]`). Code 14 has no
/// variant in this enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
#[non_exhaustive]
pub enum EnvChangeType {
    /// Database changed.
    Database = 1,
    /// Language changed.
    Language = 2,
    /// Character set changed.
    CharacterSet = 3,
    /// Packet size changed.
    PacketSize = 4,
    /// Unicode data sorting locale ID.
    UnicodeSortingLocalId = 5,
    /// Unicode comparison flags.
    UnicodeComparisonFlags = 6,
    /// SQL collation.
    SqlCollation = 7,
    /// Begin transaction.
    BeginTransaction = 8,
    /// Commit transaction.
    CommitTransaction = 9,
    /// Rollback transaction.
    RollbackTransaction = 10,
    /// Enlist DTC transaction.
    EnlistDtcTransaction = 11,
    /// Defect DTC transaction.
    DefectTransaction = 12,
    /// Real-time log shipping.
    RealTimeLogShipping = 13,
    /// Promote transaction.
    PromoteTransaction = 15,
    /// Transaction manager address.
    TransactionManagerAddress = 16,
    /// Transaction ended.
    TransactionEnded = 17,
    /// Reset connection completion acknowledgment.
    ResetConnectionCompletionAck = 18,
    /// User instance started.
    UserInstanceStarted = 19,
    /// Routing information.
    Routing = 20,
}
536
/// Environment change value.
///
/// Used for both the new and the old value of an [`EnvChange`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum EnvChangeValue {
    /// String value.
    String(String),
    /// Binary value (raw bytes).
    Binary(bytes::Bytes),
    /// Routing information.
    Routing {
        /// Host name.
        host: String,
        /// Port number.
        port: u16,
    },
}
553
/// Column ordering information.
///
/// Payload of [`Token::Order`].
#[derive(Debug, Clone)]
pub struct Order {
    /// Ordered column indices.
    pub columns: Vec<u16>,
}
560
/// Feature extension acknowledgment.
///
/// Payload of [`Token::FeatureExtAck`]; one [`FeatureAck`] per acknowledged
/// feature.
#[derive(Debug, Clone)]
pub struct FeatureExtAck {
    /// Acknowledged features.
    pub features: Vec<FeatureAck>,
}
567
/// Individual feature acknowledgment.
///
/// Element of [`FeatureExtAck::features`].
#[derive(Debug, Clone)]
pub struct FeatureAck {
    /// Feature ID.
    pub feature_id: u8,
    /// Feature data (raw bytes, not interpreted by this type).
    pub data: bytes::Bytes,
}
576
/// SSPI authentication token.
///
/// Payload of [`Token::Sspi`].
#[derive(Debug, Clone)]
pub struct SspiToken {
    /// SSPI data (raw bytes).
    pub data: bytes::Bytes,
}
583
/// Session state token.
///
/// Payload of [`Token::SessionState`].
#[derive(Debug, Clone)]
pub struct SessionState {
    /// Session state data (raw bytes).
    pub data: bytes::Bytes,
}
590
/// Federated authentication info.
///
/// Payload of [`Token::FedAuthInfo`].
#[derive(Debug, Clone)]
pub struct FedAuthInfo {
    /// STS URL.
    pub sts_url: String,
    /// Service principal name.
    pub spn: String,
}
599
600// =============================================================================
601// ColMetaData and Row Parsing Implementation
602// =============================================================================
603
604/// Decode collation information (5 bytes).
605///
606/// Shared by ColMetaData column parsing and CryptoMetadata base type parsing.
607pub(crate) fn decode_collation(src: &mut impl Buf) -> Result<Collation, ProtocolError> {
608    if src.remaining() < 5 {
609        return Err(ProtocolError::UnexpectedEof);
610    }
611    // Collation: LCID (4 bytes) + Sort ID (1 byte)
612    let lcid = src.get_u32_le();
613    let sort_id = src.get_u8();
614    Ok(Collation { lcid, sort_id })
615}
616
617/// Decode type-specific metadata for a column based on its TypeId.
618///
619/// Shared by ColMetaData column parsing and CryptoMetadata base type parsing.
620pub(crate) fn decode_type_info(
621    src: &mut impl Buf,
622    type_id: TypeId,
623    col_type: u8,
624) -> Result<TypeInfo, ProtocolError> {
625    match type_id {
626        // Fixed-length types have no additional metadata
627        TypeId::Null => Ok(TypeInfo::default()),
628        TypeId::Int1 | TypeId::Bit => Ok(TypeInfo::default()),
629        TypeId::Int2 => Ok(TypeInfo::default()),
630        TypeId::Int4 => Ok(TypeInfo::default()),
631        TypeId::Int8 => Ok(TypeInfo::default()),
632        TypeId::Float4 => Ok(TypeInfo::default()),
633        TypeId::Float8 => Ok(TypeInfo::default()),
634        TypeId::Money => Ok(TypeInfo::default()),
635        TypeId::Money4 => Ok(TypeInfo::default()),
636        TypeId::DateTime => Ok(TypeInfo::default()),
637        TypeId::DateTime4 => Ok(TypeInfo::default()),
638
639        // Variable length integer/float/money (1-byte max length)
640        TypeId::IntN | TypeId::BitN | TypeId::FloatN | TypeId::MoneyN | TypeId::DateTimeN => {
641            if src.remaining() < 1 {
642                return Err(ProtocolError::UnexpectedEof);
643            }
644            let max_length = src.get_u8() as u32;
645            Ok(TypeInfo {
646                max_length: Some(max_length),
647                ..Default::default()
648            })
649        }
650
651        // GUID has 1-byte length
652        TypeId::Guid => {
653            if src.remaining() < 1 {
654                return Err(ProtocolError::UnexpectedEof);
655            }
656            let max_length = src.get_u8() as u32;
657            Ok(TypeInfo {
658                max_length: Some(max_length),
659                ..Default::default()
660            })
661        }
662
663        // Decimal/Numeric types (1-byte length + precision + scale)
664        TypeId::Decimal | TypeId::Numeric | TypeId::DecimalN | TypeId::NumericN => {
665            if src.remaining() < 3 {
666                return Err(ProtocolError::UnexpectedEof);
667            }
668            let max_length = src.get_u8() as u32;
669            let precision = src.get_u8();
670            let scale = src.get_u8();
671            Ok(TypeInfo {
672                max_length: Some(max_length),
673                precision: Some(precision),
674                scale: Some(scale),
675                ..Default::default()
676            })
677        }
678
679        // Old-style byte-length strings (Char, VarChar, Binary, VarBinary)
680        TypeId::Char | TypeId::VarChar | TypeId::Binary | TypeId::VarBinary => {
681            if src.remaining() < 1 {
682                return Err(ProtocolError::UnexpectedEof);
683            }
684            let max_length = src.get_u8() as u32;
685            Ok(TypeInfo {
686                max_length: Some(max_length),
687                ..Default::default()
688            })
689        }
690
691        // Big varchar/binary with 2-byte length + collation for strings
692        TypeId::BigVarChar | TypeId::BigChar => {
693            if src.remaining() < 7 {
694                // 2 (length) + 5 (collation)
695                return Err(ProtocolError::UnexpectedEof);
696            }
697            let max_length = src.get_u16_le() as u32;
698            let collation = decode_collation(src)?;
699            Ok(TypeInfo {
700                max_length: Some(max_length),
701                collation: Some(collation),
702                ..Default::default()
703            })
704        }
705
706        // Big binary (2-byte length, no collation)
707        TypeId::BigVarBinary | TypeId::BigBinary => {
708            if src.remaining() < 2 {
709                return Err(ProtocolError::UnexpectedEof);
710            }
711            let max_length = src.get_u16_le() as u32;
712            Ok(TypeInfo {
713                max_length: Some(max_length),
714                ..Default::default()
715            })
716        }
717
718        // Unicode strings (NChar, NVarChar) - 2-byte length + collation
719        TypeId::NChar | TypeId::NVarChar => {
720            if src.remaining() < 7 {
721                // 2 (length) + 5 (collation)
722                return Err(ProtocolError::UnexpectedEof);
723            }
724            let max_length = src.get_u16_le() as u32;
725            let collation = decode_collation(src)?;
726            Ok(TypeInfo {
727                max_length: Some(max_length),
728                collation: Some(collation),
729                ..Default::default()
730            })
731        }
732
733        // Date type (no additional metadata)
734        TypeId::Date => Ok(TypeInfo::default()),
735
736        // Time, DateTime2, DateTimeOffset have scale
737        TypeId::Time | TypeId::DateTime2 | TypeId::DateTimeOffset => {
738            if src.remaining() < 1 {
739                return Err(ProtocolError::UnexpectedEof);
740            }
741            let scale = src.get_u8();
742            Ok(TypeInfo {
743                scale: Some(scale),
744                ..Default::default()
745            })
746        }
747
748        // Text/NText/Image (deprecated LOB types)
749        TypeId::Text | TypeId::NText | TypeId::Image => {
750            // These have complex metadata: length (4) + collation (5) + table name parts
751            if src.remaining() < 4 {
752                return Err(ProtocolError::UnexpectedEof);
753            }
754            let max_length = src.get_u32_le();
755
756            // For Text/NText, read collation
757            let collation = if type_id == TypeId::Text || type_id == TypeId::NText {
758                if src.remaining() < 5 {
759                    return Err(ProtocolError::UnexpectedEof);
760                }
761                Some(decode_collation(src)?)
762            } else {
763                None
764            };
765
766            // Skip table name parts (variable length)
767            // Format: numParts (1 byte) followed by us_varchar for each part
768            if src.remaining() < 1 {
769                return Err(ProtocolError::UnexpectedEof);
770            }
771            let num_parts = src.get_u8();
772            for _ in 0..num_parts {
773                // Read and discard table name part
774                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
775            }
776
777            Ok(TypeInfo {
778                max_length: Some(max_length),
779                collation,
780                ..Default::default()
781            })
782        }
783
784        // XML type
785        TypeId::Xml => {
786            if src.remaining() < 1 {
787                return Err(ProtocolError::UnexpectedEof);
788            }
789            let schema_present = src.get_u8();
790
791            if schema_present != 0 {
792                // XML_INFO per MS-TDS §2.2.5.5.3: DBNAME and OWNING_SCHEMA are
793                // B_VARCHAR (1-byte length); only XML_SCHEMA_COLLECTION is
794                // US_VARCHAR (2-byte length).
795                let _ = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // db name
796                let _ = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // owning schema
797                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // xml schema collection
798            }
799
800            Ok(TypeInfo::default())
801        }
802
803        // UDT (User-defined type) - complex metadata
804        TypeId::Udt => {
805            // Max length (2 bytes)
806            if src.remaining() < 2 {
807                return Err(ProtocolError::UnexpectedEof);
808            }
809            let max_length = src.get_u16_le() as u32;
810
811            // UDT_INFO per MS-TDS: DB_NAME, SCHEMA_NAME, and TYPE_NAME are
812            // B_VARCHAR (1-byte length); only ASSEMBLY_QUALIFIED_NAME is
813            // US_VARCHAR (2-byte length).
814            let _ = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // db name
815            let _ = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // schema name
816            let _ = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // type name
817            let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // assembly qualified name
818
819            Ok(TypeInfo {
820                max_length: Some(max_length),
821                ..Default::default()
822            })
823        }
824
825        // Table-valued parameter - complex metadata (skip for now)
826        TypeId::Tvp => {
827            // TVP has very complex metadata, not commonly used
828            // For now, we can't properly parse this
829            Err(ProtocolError::InvalidTokenType(col_type))
830        }
831
832        // SQL Variant - 4-byte length
833        TypeId::Variant => {
834            if src.remaining() < 4 {
835                return Err(ProtocolError::UnexpectedEof);
836            }
837            let max_length = src.get_u32_le();
838            Ok(TypeInfo {
839                max_length: Some(max_length),
840                ..Default::default()
841            })
842        }
843    }
844}
845
846impl ColMetaData {
    /// Special value indicating no metadata.
    ///
    /// When the column-count field of a COLMETADATA token equals this value,
    /// decoding returns an empty column list without reading any columns.
    pub const NO_METADATA: u16 = 0xFFFF;
849
850    /// Decode a COLMETADATA token from bytes.
851    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
852        if src.remaining() < 2 {
853            return Err(ProtocolError::UnexpectedEof);
854        }
855
856        let column_count = src.get_u16_le();
857
858        // 0xFFFF means no metadata present
859        if column_count == Self::NO_METADATA {
860            return Ok(Self {
861                columns: Vec::new(),
862                cek_table: None,
863            });
864        }
865
866        let mut columns = Vec::with_capacity(column_count as usize);
867
868        for _ in 0..column_count {
869            let column = Self::decode_column(src)?;
870            columns.push(column);
871        }
872
873        Ok(Self {
874            columns,
875            cek_table: None,
876        })
877    }
878
879    /// Decode an ALTMETADATA (COMPUTE BY) token body, returning the compute Id
880    /// and the column metadata describing the matching ALTROW values.
881    ///
882    /// The token type byte (0x88) must already be consumed. ALTMETADATA was
883    /// deprecated in TDS 7.4, but legacy `COMPUTE BY` queries still emit it. The
884    /// driver parses it only to learn the ALTROW value layout so those rows can
885    /// be consumed and dropped, keeping the token stream byte-aligned.
886    ///
887    /// Wire format (MS-TDS 2.2.7.1):
888    /// `Count(USHORT) Id(USHORT) ByCols(UCHAR) ByCols*ColNum(USHORT) Count*ComputeData`,
889    /// where each `ComputeData` is `Op(BYTE) Operand(USHORT)` followed by a
890    /// column definition identical to COLMETADATA (TableName is never sent for
891    /// COMPUTE, which excludes text/ntext/image columns).
892    fn decode_alt(src: &mut impl Buf) -> Result<(u16, Self), ProtocolError> {
893        // Count (USHORT) + Id (USHORT) + ByCols (UCHAR)
894        if src.remaining() < 5 {
895            return Err(ProtocolError::UnexpectedEof);
896        }
897        let count = src.get_u16_le();
898        let id = src.get_u16_le();
899        let by_cols = src.get_u8() as usize;
900
901        // ColNum: USHORT repeated ByCols times (the grouping columns). Not needed
902        // to parse ALTROW values, but must be consumed to stay byte-aligned.
903        if src.remaining() < by_cols * 2 {
904            return Err(ProtocolError::UnexpectedEof);
905        }
906        for _ in 0..by_cols {
907            let _ = src.get_u16_le();
908        }
909
910        let mut columns = Vec::with_capacity(count as usize);
911        for _ in 0..count {
912            // ComputeData prefix: Op (BYTE) + Operand (USHORT).
913            if src.remaining() < 3 {
914                return Err(ProtocolError::UnexpectedEof);
915            }
916            let _op = src.get_u8();
917            let _operand = src.get_u16_le();
918            columns.push(Self::decode_column(src)?);
919        }
920
921        Ok((
922            id,
923            Self {
924                columns,
925                cek_table: None,
926            },
927        ))
928    }
929
930    /// Decode a single column from the metadata.
931    fn decode_column(src: &mut impl Buf) -> Result<ColumnData, ProtocolError> {
932        // UserType (4 bytes) + Flags (2 bytes) + TypeId (1 byte)
933        if src.remaining() < 7 {
934            return Err(ProtocolError::UnexpectedEof);
935        }
936
937        let user_type = src.get_u32_le();
938        let flags = src.get_u16_le();
939        let col_type = src.get_u8();
940
941        // An unknown type byte must be a hard error: treating it as Null (a
942        // zero-length column) misaligns every subsequent column and row,
943        // producing plausible garbage values (issue #157).
944        let type_id = TypeId::from_u8(col_type).ok_or(ProtocolError::InvalidDataType(col_type))?;
945
946        // Parse type-specific metadata
947        let type_info = decode_type_info(src, type_id, col_type)?;
948
949        // Read column name (B_VARCHAR format - 1 byte length in characters)
950        let name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
951
952        Ok(ColumnData {
953            name,
954            type_id,
955            col_type,
956            flags,
957            user_type,
958            type_info,
959            crypto_metadata: None,
960        })
961    }
962
963    /// Decode a COLMETADATA token with Always Encrypted support.
964    ///
965    /// When column encryption was negotiated in Login7, the server sends a CekTable
966    /// before column definitions and per-column CryptoMetadata for encrypted columns.
967    ///
968    /// # Wire Format (with encryption)
969    ///
970    /// ```text
971    /// column_count: USHORT
972    /// cek_table: CekTable (always present when encryption negotiated)
973    /// columns: ColumnData[column_count] (with CryptoMetadata for encrypted columns)
974    /// ```
975    pub fn decode_encrypted(src: &mut impl Buf) -> Result<Self, ProtocolError> {
976        if src.remaining() < 2 {
977            return Err(ProtocolError::UnexpectedEof);
978        }
979
980        let column_count = src.get_u16_le();
981
982        if column_count == Self::NO_METADATA {
983            return Ok(Self {
984                columns: Vec::new(),
985                cek_table: None,
986            });
987        }
988
989        // Parse CEK table (always present when encryption was negotiated)
990        let cek_table = crate::crypto::CekTable::decode(src)?;
991
992        let mut columns = Vec::with_capacity(column_count as usize);
993
994        for _ in 0..column_count {
995            let column = Self::decode_column_encrypted(src)?;
996            columns.push(column);
997        }
998
999        Ok(Self {
1000            columns,
1001            cek_table: Some(cek_table),
1002        })
1003    }
1004
1005    /// Decode a single column definition with Always Encrypted support.
1006    ///
1007    /// For encrypted columns (flags & 0x0800), parses CryptoMetadata after the type info.
1008    fn decode_column_encrypted(src: &mut impl Buf) -> Result<ColumnData, ProtocolError> {
1009        if src.remaining() < 7 {
1010            return Err(ProtocolError::UnexpectedEof);
1011        }
1012
1013        let user_type = src.get_u32_le();
1014        let flags = src.get_u16_le();
1015        let col_type = src.get_u8();
1016
1017        let type_id = TypeId::from_u8(col_type).ok_or(ProtocolError::InvalidDataType(col_type))?;
1018
1019        // Parse type-specific metadata (for encrypted columns, this is the transport type)
1020        let type_info = decode_type_info(src, type_id, col_type)?;
1021
1022        // Parse CryptoMetadata if the column is encrypted
1023        let crypto_metadata = if crate::crypto::is_column_encrypted(flags) {
1024            Some(crate::crypto::CryptoMetadata::decode(src)?)
1025        } else {
1026            None
1027        };
1028
1029        // Read column name
1030        let name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1031
1032        Ok(ColumnData {
1033            name,
1034            type_id,
1035            col_type,
1036            flags,
1037            user_type,
1038            type_info,
1039            crypto_metadata,
1040        })
1041    }
1042
1043    /// Get the number of columns.
1044    #[must_use]
1045    pub fn column_count(&self) -> usize {
1046        self.columns.len()
1047    }
1048
1049    /// Check if this represents no metadata.
1050    #[must_use]
1051    pub fn is_empty(&self) -> bool {
1052        self.columns.is_empty()
1053    }
1054}
1055
1056impl ColumnData {
1057    /// Check if this column is nullable.
1058    #[must_use]
1059    pub fn is_nullable(&self) -> bool {
1060        (self.flags & 0x0001) != 0
1061    }
1062
1063    /// Get the fixed size in bytes for this column, if applicable.
1064    ///
1065    /// Returns `None` for variable-length types.
1066    #[must_use]
1067    pub fn fixed_size(&self) -> Option<usize> {
1068        match self.type_id {
1069            TypeId::Null => Some(0),
1070            TypeId::Int1 | TypeId::Bit => Some(1),
1071            TypeId::Int2 => Some(2),
1072            TypeId::Int4 => Some(4),
1073            TypeId::Int8 => Some(8),
1074            TypeId::Float4 => Some(4),
1075            TypeId::Float8 => Some(8),
1076            TypeId::Money => Some(8),
1077            TypeId::Money4 => Some(4),
1078            TypeId::DateTime => Some(8),
1079            TypeId::DateTime4 => Some(4),
1080            TypeId::Date => Some(3),
1081            _ => None,
1082        }
1083    }
1084}
1085
1086// =============================================================================
1087// Row Parsing Implementation
1088// =============================================================================
1089
1090impl RawRow {
1091    /// Decode a ROW token from bytes.
1092    ///
1093    /// This function requires the column metadata to know how to parse the row.
1094    /// The row data is stored as raw bytes for later parsing.
1095    pub fn decode(src: &mut impl Buf, metadata: &ColMetaData) -> Result<Self, ProtocolError> {
1096        let mut data = bytes::BytesMut::new();
1097
1098        for col in &metadata.columns {
1099            Self::decode_column_value(src, col, &mut data)?;
1100        }
1101
1102        Ok(Self {
1103            data: data.freeze(),
1104        })
1105    }
1106
1107    /// Decode only the first `prefix_len` columns of a ROW token, leaving `src`
1108    /// positioned at the start of column `prefix_len`.
1109    ///
1110    /// Used by the BLOB streaming path to decode the leading scalar columns of a
1111    /// row and stop at a trailing MAX column, whose PLP value is then streamed
1112    /// directly from the socket rather than buffered.
1113    pub fn decode_prefix(
1114        src: &mut impl Buf,
1115        metadata: &ColMetaData,
1116        prefix_len: usize,
1117    ) -> Result<Self, ProtocolError> {
1118        let mut data = bytes::BytesMut::new();
1119        for col in metadata.columns.iter().take(prefix_len) {
1120            Self::decode_column_value(src, col, &mut data)?;
1121        }
1122        Ok(Self {
1123            data: data.freeze(),
1124        })
1125    }
1126
1127    /// Decode a single column value and append to the output buffer.
1128    fn decode_column_value(
1129        src: &mut impl Buf,
1130        col: &ColumnData,
1131        dst: &mut bytes::BytesMut,
1132    ) -> Result<(), ProtocolError> {
1133        match col.type_id {
1134            // Fixed-length types
1135            TypeId::Null => {
1136                // No data
1137            }
1138            TypeId::Int1 | TypeId::Bit => {
1139                if src.remaining() < 1 {
1140                    return Err(ProtocolError::UnexpectedEof);
1141                }
1142                dst.extend_from_slice(&[src.get_u8()]);
1143            }
1144            TypeId::Int2 => {
1145                if src.remaining() < 2 {
1146                    return Err(ProtocolError::UnexpectedEof);
1147                }
1148                dst.extend_from_slice(&src.get_u16_le().to_le_bytes());
1149            }
1150            TypeId::Int4 => {
1151                if src.remaining() < 4 {
1152                    return Err(ProtocolError::UnexpectedEof);
1153                }
1154                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
1155            }
1156            TypeId::Int8 => {
1157                if src.remaining() < 8 {
1158                    return Err(ProtocolError::UnexpectedEof);
1159                }
1160                dst.extend_from_slice(&src.get_u64_le().to_le_bytes());
1161            }
1162            TypeId::Float4 => {
1163                if src.remaining() < 4 {
1164                    return Err(ProtocolError::UnexpectedEof);
1165                }
1166                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
1167            }
1168            TypeId::Float8 => {
1169                if src.remaining() < 8 {
1170                    return Err(ProtocolError::UnexpectedEof);
1171                }
1172                dst.extend_from_slice(&src.get_u64_le().to_le_bytes());
1173            }
1174            TypeId::Money => {
1175                if src.remaining() < 8 {
1176                    return Err(ProtocolError::UnexpectedEof);
1177                }
1178                let hi = src.get_u32_le();
1179                let lo = src.get_u32_le();
1180                dst.extend_from_slice(&hi.to_le_bytes());
1181                dst.extend_from_slice(&lo.to_le_bytes());
1182            }
1183            TypeId::Money4 => {
1184                if src.remaining() < 4 {
1185                    return Err(ProtocolError::UnexpectedEof);
1186                }
1187                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
1188            }
1189            TypeId::DateTime => {
1190                if src.remaining() < 8 {
1191                    return Err(ProtocolError::UnexpectedEof);
1192                }
1193                let days = src.get_u32_le();
1194                let time = src.get_u32_le();
1195                dst.extend_from_slice(&days.to_le_bytes());
1196                dst.extend_from_slice(&time.to_le_bytes());
1197            }
1198            TypeId::DateTime4 => {
1199                if src.remaining() < 4 {
1200                    return Err(ProtocolError::UnexpectedEof);
1201                }
1202                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
1203            }
1204            // DATE type uses 1-byte length prefix (can be NULL)
1205            TypeId::Date => {
1206                Self::decode_bytelen_type(src, dst)?;
1207            }
1208
1209            // Variable-length nullable types (length-prefixed)
1210            TypeId::IntN | TypeId::BitN | TypeId::FloatN | TypeId::MoneyN | TypeId::DateTimeN => {
1211                Self::decode_bytelen_type(src, dst)?;
1212            }
1213
1214            TypeId::Guid => {
1215                Self::decode_bytelen_type(src, dst)?;
1216            }
1217
1218            TypeId::Decimal | TypeId::Numeric | TypeId::DecimalN | TypeId::NumericN => {
1219                Self::decode_bytelen_type(src, dst)?;
1220            }
1221
1222            // Old-style byte-length strings
1223            TypeId::Char | TypeId::VarChar | TypeId::Binary | TypeId::VarBinary => {
1224                Self::decode_bytelen_type(src, dst)?;
1225            }
1226
1227            // 2-byte length strings (or PLP for MAX types)
1228            TypeId::BigVarChar | TypeId::BigVarBinary => {
1229                // max_length == 0xFFFF indicates VARCHAR(MAX) or VARBINARY(MAX), which uses PLP
1230                if col.type_info.max_length == Some(0xFFFF) {
1231                    Self::decode_plp_type(src, dst)?;
1232                } else {
1233                    Self::decode_ushortlen_type(src, dst)?;
1234                }
1235            }
1236
1237            // Fixed-length types that don't have MAX variants
1238            TypeId::BigChar | TypeId::BigBinary => {
1239                Self::decode_ushortlen_type(src, dst)?;
1240            }
1241
1242            // Unicode strings (2-byte length in bytes, or PLP for NVARCHAR(MAX))
1243            TypeId::NVarChar => {
1244                // max_length == 0xFFFF indicates NVARCHAR(MAX), which uses PLP
1245                if col.type_info.max_length == Some(0xFFFF) {
1246                    Self::decode_plp_type(src, dst)?;
1247                } else {
1248                    Self::decode_ushortlen_type(src, dst)?;
1249                }
1250            }
1251
1252            // Fixed-length NCHAR doesn't have MAX variant
1253            TypeId::NChar => {
1254                Self::decode_ushortlen_type(src, dst)?;
1255            }
1256
1257            // Time types with scale
1258            TypeId::Time | TypeId::DateTime2 | TypeId::DateTimeOffset => {
1259                Self::decode_bytelen_type(src, dst)?;
1260            }
1261
1262            // TEXT/NTEXT/IMAGE - deprecated LOB types using textptr format
1263            TypeId::Text | TypeId::NText | TypeId::Image => {
1264                Self::decode_textptr_type(src, dst)?;
1265            }
1266
1267            // XML - uses actual PLP format
1268            TypeId::Xml => {
1269                Self::decode_plp_type(src, dst)?;
1270            }
1271
1272            // Complex types
1273            TypeId::Variant => {
1274                Self::decode_intlen_type(src, dst)?;
1275            }
1276
1277            TypeId::Udt => {
1278                // UDT uses PLP encoding
1279                Self::decode_plp_type(src, dst)?;
1280            }
1281
1282            TypeId::Tvp => {
1283                // TVP not supported in row data
1284                return Err(ProtocolError::InvalidTokenType(col.col_type));
1285            }
1286        }
1287
1288        Ok(())
1289    }
1290
1291    /// Decode a 1-byte length-prefixed value.
1292    fn decode_bytelen_type(
1293        src: &mut impl Buf,
1294        dst: &mut bytes::BytesMut,
1295    ) -> Result<(), ProtocolError> {
1296        if src.remaining() < 1 {
1297            return Err(ProtocolError::UnexpectedEof);
1298        }
1299        let len = src.get_u8() as usize;
1300        if len == 0xFF {
1301            // NULL value - store as zero-length with NULL marker
1302            dst.extend_from_slice(&[0xFF]);
1303        } else if len == 0 {
1304            // Empty value
1305            dst.extend_from_slice(&[0x00]);
1306        } else {
1307            if src.remaining() < len {
1308                return Err(ProtocolError::UnexpectedEof);
1309            }
1310            dst.extend_from_slice(&[len as u8]);
1311            for _ in 0..len {
1312                dst.extend_from_slice(&[src.get_u8()]);
1313            }
1314        }
1315        Ok(())
1316    }
1317
1318    /// Decode a 2-byte length-prefixed value.
1319    fn decode_ushortlen_type(
1320        src: &mut impl Buf,
1321        dst: &mut bytes::BytesMut,
1322    ) -> Result<(), ProtocolError> {
1323        if src.remaining() < 2 {
1324            return Err(ProtocolError::UnexpectedEof);
1325        }
1326        let len = src.get_u16_le() as usize;
1327        if len == 0xFFFF {
1328            // NULL value
1329            dst.extend_from_slice(&0xFFFFu16.to_le_bytes());
1330        } else if len == 0 {
1331            // Empty value
1332            dst.extend_from_slice(&0u16.to_le_bytes());
1333        } else {
1334            if src.remaining() < len {
1335                return Err(ProtocolError::UnexpectedEof);
1336            }
1337            dst.extend_from_slice(&(len as u16).to_le_bytes());
1338            for _ in 0..len {
1339                dst.extend_from_slice(&[src.get_u8()]);
1340            }
1341        }
1342        Ok(())
1343    }
1344
1345    /// Decode a 4-byte length-prefixed value.
1346    fn decode_intlen_type(
1347        src: &mut impl Buf,
1348        dst: &mut bytes::BytesMut,
1349    ) -> Result<(), ProtocolError> {
1350        if src.remaining() < 4 {
1351            return Err(ProtocolError::UnexpectedEof);
1352        }
1353        let len = src.get_u32_le() as usize;
1354        if len == 0xFFFFFFFF {
1355            // NULL value
1356            dst.extend_from_slice(&0xFFFFFFFFu32.to_le_bytes());
1357        } else if len == 0 {
1358            // Empty value
1359            dst.extend_from_slice(&0u32.to_le_bytes());
1360        } else {
1361            if src.remaining() < len {
1362                return Err(ProtocolError::UnexpectedEof);
1363            }
1364            dst.extend_from_slice(&(len as u32).to_le_bytes());
1365            for _ in 0..len {
1366                dst.extend_from_slice(&[src.get_u8()]);
1367            }
1368        }
1369        Ok(())
1370    }
1371
1372    /// Decode a TEXT/NTEXT/IMAGE type (textptr format).
1373    ///
1374    /// These deprecated LOB types use a special format:
1375    /// - 1 byte: textptr_len (0 = NULL)
1376    /// - textptr_len bytes: textptr (if not NULL)
1377    /// - 8 bytes: timestamp (if not NULL)
1378    /// - 4 bytes: data length (if not NULL)
1379    /// - data_len bytes: the actual data (if not NULL)
1380    ///
1381    /// We convert this to PLP format for the client to parse:
1382    /// - 8 bytes: total length (0xFFFFFFFFFFFFFFFF = NULL)
1383    /// - 4 bytes: chunk length (= data length)
1384    /// - chunk data
1385    /// - 4 bytes: 0 (terminator)
1386    fn decode_textptr_type(
1387        src: &mut impl Buf,
1388        dst: &mut bytes::BytesMut,
1389    ) -> Result<(), ProtocolError> {
1390        if src.remaining() < 1 {
1391            return Err(ProtocolError::UnexpectedEof);
1392        }
1393
1394        let textptr_len = src.get_u8() as usize;
1395
1396        if textptr_len == 0 {
1397            // NULL value - write PLP NULL marker
1398            dst.extend_from_slice(&0xFFFFFFFFFFFFFFFFu64.to_le_bytes());
1399            return Ok(());
1400        }
1401
1402        // Skip textptr bytes
1403        if src.remaining() < textptr_len {
1404            return Err(ProtocolError::UnexpectedEof);
1405        }
1406        src.advance(textptr_len);
1407
1408        // Skip 8-byte timestamp
1409        if src.remaining() < 8 {
1410            return Err(ProtocolError::UnexpectedEof);
1411        }
1412        src.advance(8);
1413
1414        // Read data length
1415        if src.remaining() < 4 {
1416            return Err(ProtocolError::UnexpectedEof);
1417        }
1418        let data_len = src.get_u32_le() as usize;
1419
1420        if src.remaining() < data_len {
1421            return Err(ProtocolError::UnexpectedEof);
1422        }
1423
1424        // Write in PLP format for client parsing:
1425        // - 8 bytes: total length
1426        // - 4 bytes: chunk length
1427        // - chunk data
1428        // - 4 bytes: 0 (terminator)
1429        dst.extend_from_slice(&(data_len as u64).to_le_bytes());
1430        dst.extend_from_slice(&(data_len as u32).to_le_bytes());
1431        for _ in 0..data_len {
1432            dst.extend_from_slice(&[src.get_u8()]);
1433        }
1434        dst.extend_from_slice(&0u32.to_le_bytes()); // PLP terminator
1435
1436        Ok(())
1437    }
1438
1439    /// Decode a PLP (Partially Length-Prefixed) value.
1440    ///
1441    /// PLP format:
1442    /// - 8 bytes: total length (0xFFFFFFFFFFFFFFFE = unknown, 0xFFFFFFFFFFFFFFFF = NULL)
1443    /// - If not NULL: chunks of (4 byte chunk length + data) until chunk length = 0
1444    fn decode_plp_type(src: &mut impl Buf, dst: &mut bytes::BytesMut) -> Result<(), ProtocolError> {
1445        if src.remaining() < 8 {
1446            return Err(ProtocolError::UnexpectedEof);
1447        }
1448
1449        let total_len = src.get_u64_le();
1450
1451        // Store the total length marker
1452        dst.extend_from_slice(&total_len.to_le_bytes());
1453
1454        if total_len == 0xFFFFFFFFFFFFFFFF {
1455            // NULL value - no more data
1456            return Ok(());
1457        }
1458
1459        // Read chunks until terminator
1460        loop {
1461            if src.remaining() < 4 {
1462                return Err(ProtocolError::UnexpectedEof);
1463            }
1464            let chunk_len = src.get_u32_le() as usize;
1465            dst.extend_from_slice(&(chunk_len as u32).to_le_bytes());
1466
1467            if chunk_len == 0 {
1468                // End of PLP data
1469                break;
1470            }
1471
1472            if src.remaining() < chunk_len {
1473                return Err(ProtocolError::UnexpectedEof);
1474            }
1475
1476            for _ in 0..chunk_len {
1477                dst.extend_from_slice(&[src.get_u8()]);
1478            }
1479        }
1480
1481        Ok(())
1482    }
1483}
1484
1485// =============================================================================
1486// NbcRow Parsing Implementation
1487// =============================================================================
1488
1489impl NbcRow {
1490    /// Decode an NBCROW token from bytes.
1491    ///
1492    /// NBCROW (Null Bitmap Compressed Row) stores a bitmap indicating which
1493    /// columns are NULL, followed by only the non-NULL values.
1494    pub fn decode(src: &mut impl Buf, metadata: &ColMetaData) -> Result<Self, ProtocolError> {
1495        let col_count = metadata.columns.len();
1496        let bitmap_len = col_count.div_ceil(8);
1497
1498        if src.remaining() < bitmap_len {
1499            return Err(ProtocolError::UnexpectedEof);
1500        }
1501
1502        // Read null bitmap
1503        let mut null_bitmap = vec![0u8; bitmap_len];
1504        for byte in &mut null_bitmap {
1505            *byte = src.get_u8();
1506        }
1507
1508        // Read non-null values
1509        let mut data = bytes::BytesMut::new();
1510
1511        for (i, col) in metadata.columns.iter().enumerate() {
1512            let byte_idx = i / 8;
1513            let bit_idx = i % 8;
1514            let is_null = (null_bitmap[byte_idx] & (1 << bit_idx)) != 0;
1515
1516            if !is_null {
1517                // Read the value - for NBCROW, we read without the length prefix
1518                // for fixed-length types, and with length prefix for variable types
1519                RawRow::decode_column_value(src, col, &mut data)?;
1520            }
1521        }
1522
1523        Ok(Self {
1524            null_bitmap,
1525            data: data.freeze(),
1526        })
1527    }
1528
1529    /// Decode the null bitmap and the first `prefix_len` columns of an NBCROW,
1530    /// leaving `src` positioned at column `prefix_len`'s value (when that column
1531    /// is non-NULL per the bitmap). The returned row carries the full bitmap and
1532    /// the leading non-NULL values; query the trailing column's nullness with
1533    /// [`is_null`](Self::is_null).
1534    pub fn decode_prefix(
1535        src: &mut impl Buf,
1536        metadata: &ColMetaData,
1537        prefix_len: usize,
1538    ) -> Result<Self, ProtocolError> {
1539        let col_count = metadata.columns.len();
1540        let bitmap_len = col_count.div_ceil(8);
1541
1542        if src.remaining() < bitmap_len {
1543            return Err(ProtocolError::UnexpectedEof);
1544        }
1545
1546        let mut null_bitmap = vec![0u8; bitmap_len];
1547        for byte in &mut null_bitmap {
1548            *byte = src.get_u8();
1549        }
1550
1551        let mut data = bytes::BytesMut::new();
1552        for (i, col) in metadata.columns.iter().enumerate().take(prefix_len) {
1553            let is_null = (null_bitmap[i / 8] & (1 << (i % 8))) != 0;
1554            if !is_null {
1555                RawRow::decode_column_value(src, col, &mut data)?;
1556            }
1557        }
1558
1559        Ok(Self {
1560            null_bitmap,
1561            data: data.freeze(),
1562        })
1563    }
1564
1565    /// Check if a column at the given index is NULL.
1566    #[must_use]
1567    pub fn is_null(&self, column_index: usize) -> bool {
1568        let byte_idx = column_index / 8;
1569        let bit_idx = column_index % 8;
1570        if byte_idx < self.null_bitmap.len() {
1571            (self.null_bitmap[byte_idx] & (1 << bit_idx)) != 0
1572        } else {
1573            true // Out of bounds = NULL
1574        }
1575    }
1576}
1577
1578// =============================================================================
1579// ReturnValue Parsing Implementation
1580// =============================================================================
1581
1582impl ReturnValue {
1583    /// Decode a RETURNVALUE token from bytes.
1584    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1585        // MS-TDS §2.2.7.18: the RETURNVALUE token has no length prefix —
1586        // it begins directly with the 2-byte ParamOrdinal. The previous
1587        // spurious 2-byte read consumed the ordinal and shifted every
1588        // subsequent field, leaving the stream parser two bytes ahead and
1589        // reading value bytes as the next token type (e.g. `0x74` from a
1590        // Unicode name fragment was misread as an unknown token).
1591        if src.remaining() < 2 {
1592            return Err(ProtocolError::UnexpectedEof);
1593        }
1594        let param_ordinal = src.get_u16_le();
1595
1596        // Parameter name (B_VARCHAR)
1597        let param_name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1598
1599        // Status (1 byte)
1600        if src.remaining() < 1 {
1601            return Err(ProtocolError::UnexpectedEof);
1602        }
1603        let status = src.get_u8();
1604
1605        // User type (4 bytes) + flags (2 bytes) + type id (1 byte)
1606        if src.remaining() < 7 {
1607            return Err(ProtocolError::UnexpectedEof);
1608        }
1609        let user_type = src.get_u32_le();
1610        let flags = src.get_u16_le();
1611        let col_type = src.get_u8();
1612
1613        let type_id = TypeId::from_u8(col_type).ok_or(ProtocolError::InvalidDataType(col_type))?;
1614
1615        // Parse type info
1616        let type_info = decode_type_info(src, type_id, col_type)?;
1617
1618        // Read the value data
1619        let mut value_buf = bytes::BytesMut::new();
1620
1621        // Create a temporary column for value parsing
1622        let temp_col = ColumnData {
1623            name: String::new(),
1624            type_id,
1625            col_type,
1626            flags,
1627            user_type,
1628            type_info: type_info.clone(),
1629            crypto_metadata: None,
1630        };
1631
1632        RawRow::decode_column_value(src, &temp_col, &mut value_buf)?;
1633
1634        Ok(Self {
1635            param_ordinal,
1636            param_name,
1637            status,
1638            user_type,
1639            flags,
1640            col_type,
1641            type_info,
1642            value: value_buf.freeze(),
1643        })
1644    }
1645}
1646
1647// =============================================================================
1648// SessionState Parsing Implementation
1649// =============================================================================
1650
1651impl SessionState {
1652    /// Decode a SESSIONSTATE token from bytes.
1653    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1654        if src.remaining() < 4 {
1655            return Err(ProtocolError::UnexpectedEof);
1656        }
1657
1658        let length = src.get_u32_le() as usize;
1659
1660        if src.remaining() < length {
1661            return Err(ProtocolError::IncompletePacket {
1662                expected: length,
1663                actual: src.remaining(),
1664            });
1665        }
1666
1667        let data = src.copy_to_bytes(length);
1668
1669        Ok(Self { data })
1670    }
1671}
1672
1673// =============================================================================
1674// Token Parsing Implementation
1675// =============================================================================
1676
/// Bit masks for the 16-bit status word carried by DONE-family tokens.
mod done_status_bits {
    /// Bit 0: more results follow.
    pub const DONE_MORE: u16 = 0b0000_0000_0000_0001;
    /// Bit 1: an error occurred.
    pub const DONE_ERROR: u16 = 0b0000_0000_0000_0010;
    /// Bit 2: the "in transaction" flag.
    pub const DONE_INXACT: u16 = 0b0000_0000_0000_0100;
    /// Bit 4: the row count is valid.
    pub const DONE_COUNT: u16 = 0b0000_0000_0001_0000;
    /// Bit 5: the attention flag.
    pub const DONE_ATTN: u16 = 0b0000_0000_0010_0000;
    /// Bit 8: the server-error flag.
    pub const DONE_SRVERROR: u16 = 0b0000_0001_0000_0000;
}
1686
1687impl DoneStatus {
1688    /// Parse done status from raw bits.
1689    #[must_use]
1690    pub fn from_bits(bits: u16) -> Self {
1691        use done_status_bits::*;
1692        Self {
1693            more: (bits & DONE_MORE) != 0,
1694            error: (bits & DONE_ERROR) != 0,
1695            in_xact: (bits & DONE_INXACT) != 0,
1696            count: (bits & DONE_COUNT) != 0,
1697            attn: (bits & DONE_ATTN) != 0,
1698            srverror: (bits & DONE_SRVERROR) != 0,
1699        }
1700    }
1701
1702    /// Convert to raw bits.
1703    #[must_use]
1704    pub fn to_bits(&self) -> u16 {
1705        use done_status_bits::*;
1706        let mut bits = 0u16;
1707        if self.more {
1708            bits |= DONE_MORE;
1709        }
1710        if self.error {
1711            bits |= DONE_ERROR;
1712        }
1713        if self.in_xact {
1714            bits |= DONE_INXACT;
1715        }
1716        if self.count {
1717            bits |= DONE_COUNT;
1718        }
1719        if self.attn {
1720            bits |= DONE_ATTN;
1721        }
1722        if self.srverror {
1723            bits |= DONE_SRVERROR;
1724        }
1725        bits
1726    }
1727}
1728
1729impl Done {
1730    /// Size of the DONE token in bytes (excluding token type byte).
1731    pub const SIZE: usize = 12; // 2 (status) + 2 (curcmd) + 8 (rowcount)
1732
1733    /// Decode a DONE token from bytes.
1734    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1735        if src.remaining() < Self::SIZE {
1736            return Err(ProtocolError::IncompletePacket {
1737                expected: Self::SIZE,
1738                actual: src.remaining(),
1739            });
1740        }
1741
1742        let status = DoneStatus::from_bits(src.get_u16_le());
1743        let cur_cmd = src.get_u16_le();
1744        let row_count = src.get_u64_le();
1745
1746        Ok(Self {
1747            status,
1748            cur_cmd,
1749            row_count,
1750        })
1751    }
1752
1753    /// Encode the DONE token to bytes.
1754    pub fn encode(&self, dst: &mut impl BufMut) {
1755        dst.put_u8(TokenType::Done as u8);
1756        dst.put_u16_le(self.status.to_bits());
1757        dst.put_u16_le(self.cur_cmd);
1758        dst.put_u64_le(self.row_count);
1759    }
1760
1761    /// Check if more results follow this DONE token.
1762    #[must_use]
1763    pub const fn has_more(&self) -> bool {
1764        self.status.more
1765    }
1766
1767    /// Check if an error occurred.
1768    #[must_use]
1769    pub const fn has_error(&self) -> bool {
1770        self.status.error
1771    }
1772
1773    /// Check if the row count is valid.
1774    #[must_use]
1775    pub const fn has_count(&self) -> bool {
1776        self.status.count
1777    }
1778}
1779
1780impl DoneProc {
1781    /// Size of the DONEPROC token in bytes (excluding token type byte).
1782    pub const SIZE: usize = 12;
1783
1784    /// Decode a DONEPROC token from bytes.
1785    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1786        if src.remaining() < Self::SIZE {
1787            return Err(ProtocolError::IncompletePacket {
1788                expected: Self::SIZE,
1789                actual: src.remaining(),
1790            });
1791        }
1792
1793        let status = DoneStatus::from_bits(src.get_u16_le());
1794        let cur_cmd = src.get_u16_le();
1795        let row_count = src.get_u64_le();
1796
1797        Ok(Self {
1798            status,
1799            cur_cmd,
1800            row_count,
1801        })
1802    }
1803
1804    /// Encode the DONEPROC token to bytes.
1805    pub fn encode(&self, dst: &mut impl BufMut) {
1806        dst.put_u8(TokenType::DoneProc as u8);
1807        dst.put_u16_le(self.status.to_bits());
1808        dst.put_u16_le(self.cur_cmd);
1809        dst.put_u64_le(self.row_count);
1810    }
1811}
1812
1813impl DoneInProc {
1814    /// Size of the DONEINPROC token in bytes (excluding token type byte).
1815    pub const SIZE: usize = 12;
1816
1817    /// Decode a DONEINPROC token from bytes.
1818    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1819        if src.remaining() < Self::SIZE {
1820            return Err(ProtocolError::IncompletePacket {
1821                expected: Self::SIZE,
1822                actual: src.remaining(),
1823            });
1824        }
1825
1826        let status = DoneStatus::from_bits(src.get_u16_le());
1827        let cur_cmd = src.get_u16_le();
1828        let row_count = src.get_u64_le();
1829
1830        Ok(Self {
1831            status,
1832            cur_cmd,
1833            row_count,
1834        })
1835    }
1836
1837    /// Encode the DONEINPROC token to bytes.
1838    pub fn encode(&self, dst: &mut impl BufMut) {
1839        dst.put_u8(TokenType::DoneInProc as u8);
1840        dst.put_u16_le(self.status.to_bits());
1841        dst.put_u16_le(self.cur_cmd);
1842        dst.put_u64_le(self.row_count);
1843    }
1844}
1845
1846impl ServerError {
1847    /// Decode an ERROR token from bytes.
1848    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1849        // ERROR token: length (2) + number (4) + state (1) + class (1) +
1850        //              message (us_varchar) + server (b_varchar) + procedure (b_varchar) + line (4)
1851        if src.remaining() < 2 {
1852            return Err(ProtocolError::UnexpectedEof);
1853        }
1854
1855        let _length = src.get_u16_le();
1856
1857        if src.remaining() < 6 {
1858            return Err(ProtocolError::UnexpectedEof);
1859        }
1860
1861        let number = src.get_i32_le();
1862        let state = src.get_u8();
1863        let class = src.get_u8();
1864
1865        let message = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1866        let server = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1867        let procedure = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1868
1869        if src.remaining() < 4 {
1870            return Err(ProtocolError::UnexpectedEof);
1871        }
1872        let line = src.get_i32_le();
1873
1874        Ok(Self {
1875            number,
1876            state,
1877            class,
1878            message,
1879            server,
1880            procedure,
1881            line,
1882        })
1883    }
1884
1885    /// Check if this is a fatal error (severity >= 20).
1886    #[must_use]
1887    pub const fn is_fatal(&self) -> bool {
1888        self.class >= 20
1889    }
1890
1891    /// Check if this error indicates the batch was aborted (severity >= 16).
1892    #[must_use]
1893    pub const fn is_batch_abort(&self) -> bool {
1894        self.class >= 16
1895    }
1896}
1897
1898impl ServerInfo {
1899    /// Decode an INFO token from bytes.
1900    ///
1901    /// INFO tokens have the same structure as ERROR tokens but with lower severity.
1902    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1903        if src.remaining() < 2 {
1904            return Err(ProtocolError::UnexpectedEof);
1905        }
1906
1907        let _length = src.get_u16_le();
1908
1909        if src.remaining() < 6 {
1910            return Err(ProtocolError::UnexpectedEof);
1911        }
1912
1913        let number = src.get_i32_le();
1914        let state = src.get_u8();
1915        let class = src.get_u8();
1916
1917        let message = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1918        let server = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1919        let procedure = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1920
1921        if src.remaining() < 4 {
1922            return Err(ProtocolError::UnexpectedEof);
1923        }
1924        let line = src.get_i32_le();
1925
1926        Ok(Self {
1927            number,
1928            state,
1929            class,
1930            message,
1931            server,
1932            procedure,
1933            line,
1934        })
1935    }
1936}
1937
1938impl LoginAck {
1939    /// Decode a LOGINACK token from bytes.
1940    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1941        // LOGINACK: length (2) + interface (1) + tds_version (4) + prog_name (b_varchar) + prog_version (4)
1942        if src.remaining() < 2 {
1943            return Err(ProtocolError::UnexpectedEof);
1944        }
1945
1946        let _length = src.get_u16_le();
1947
1948        if src.remaining() < 5 {
1949            return Err(ProtocolError::UnexpectedEof);
1950        }
1951
1952        let interface = src.get_u8();
1953        let tds_version = src.get_u32_le();
1954        let prog_name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1955
1956        if src.remaining() < 4 {
1957            return Err(ProtocolError::UnexpectedEof);
1958        }
1959        let prog_version = src.get_u32_le();
1960
1961        Ok(Self {
1962            interface,
1963            tds_version,
1964            prog_name,
1965            prog_version,
1966        })
1967    }
1968
1969    /// Get the TDS version as a `TdsVersion`.
1970    #[must_use]
1971    pub fn tds_version(&self) -> crate::version::TdsVersion {
1972        crate::version::TdsVersion::new(self.tds_version)
1973    }
1974}
1975
1976impl EnvChangeType {
1977    /// Create from raw byte value.
1978    pub fn from_u8(value: u8) -> Option<Self> {
1979        match value {
1980            1 => Some(Self::Database),
1981            2 => Some(Self::Language),
1982            3 => Some(Self::CharacterSet),
1983            4 => Some(Self::PacketSize),
1984            5 => Some(Self::UnicodeSortingLocalId),
1985            6 => Some(Self::UnicodeComparisonFlags),
1986            7 => Some(Self::SqlCollation),
1987            8 => Some(Self::BeginTransaction),
1988            9 => Some(Self::CommitTransaction),
1989            10 => Some(Self::RollbackTransaction),
1990            11 => Some(Self::EnlistDtcTransaction),
1991            12 => Some(Self::DefectTransaction),
1992            13 => Some(Self::RealTimeLogShipping),
1993            15 => Some(Self::PromoteTransaction),
1994            16 => Some(Self::TransactionManagerAddress),
1995            17 => Some(Self::TransactionEnded),
1996            18 => Some(Self::ResetConnectionCompletionAck),
1997            19 => Some(Self::UserInstanceStarted),
1998            20 => Some(Self::Routing),
1999            _ => None,
2000        }
2001    }
2002}
2003
2004impl EnvChange {
2005    /// Decode an ENVCHANGE token from bytes.
2006    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
2007        if src.remaining() < 3 {
2008            return Err(ProtocolError::UnexpectedEof);
2009        }
2010
2011        let length = src.get_u16_le() as usize;
2012        if length == 0 {
2013            // The frame must at least contain the type byte; reading it from
2014            // outside a zero-length frame would consume the next token.
2015            return Err(ProtocolError::UnexpectedEof);
2016        }
2017        if src.remaining() < length {
2018            return Err(ProtocolError::IncompletePacket {
2019                expected: length,
2020                actual: src.remaining(),
2021            });
2022        }
2023
2024        // Frame-strict decoding (issue #145): the value decoders below only
2025        // bounds-check against the *buffer*, so on an under-declared frame
2026        // they could read past the declared length into the next token's
2027        // bytes. Slice exactly the declared frame and decode from that:
2028        // over-read attempts now hit frame end and take the lenient
2029        // empty-value fallbacks (preserving the #140 hostile-input
2030        // behavior), and the outer buffer always advances by exactly
2031        // `length`.
2032        let mut frame = src.copy_to_bytes(length);
2033        let src = &mut frame;
2034
2035        let env_type_byte = src.get_u8();
2036        let env_type = EnvChangeType::from_u8(env_type_byte)
2037            .ok_or(ProtocolError::InvalidTokenType(env_type_byte))?;
2038
2039        let (new_value, old_value) = match env_type {
2040            EnvChangeType::Routing => {
2041                // Routing has special format
2042                let new_value = Self::decode_routing_value(src)?;
2043                let old_value = EnvChangeValue::Binary(Bytes::new());
2044                (new_value, old_value)
2045            }
2046            EnvChangeType::BeginTransaction
2047            | EnvChangeType::CommitTransaction
2048            | EnvChangeType::RollbackTransaction
2049            | EnvChangeType::EnlistDtcTransaction
2050            | EnvChangeType::SqlCollation => {
2051                // These use binary format per MS-TDS spec:
2052                // - Transaction tokens: transaction descriptor (8 bytes)
2053                // - SqlCollation: collation info (5 bytes: LCID + sort flags)
2054                // The declared ENVCHANGE `length` can be shorter than this
2055                // branch needs (e.g. covers only the type byte), so the
2056                // length-prefix reads must be bounds-checked individually:
2057                // `get_u8` on an empty buffer panics. Match the branch's
2058                // existing graceful style — a missing prefix means empty.
2059                let new_len = if src.has_remaining() {
2060                    src.get_u8() as usize
2061                } else {
2062                    0
2063                };
2064                let new_value = if new_len > 0 && src.remaining() >= new_len {
2065                    EnvChangeValue::Binary(src.copy_to_bytes(new_len))
2066                } else {
2067                    EnvChangeValue::Binary(Bytes::new())
2068                };
2069
2070                let old_len = if src.has_remaining() {
2071                    src.get_u8() as usize
2072                } else {
2073                    0
2074                };
2075                let old_value = if old_len > 0 && src.remaining() >= old_len {
2076                    EnvChangeValue::Binary(src.copy_to_bytes(old_len))
2077                } else {
2078                    EnvChangeValue::Binary(Bytes::new())
2079                };
2080
2081                (new_value, old_value)
2082            }
2083            _ => {
2084                // String format for most env changes
2085                let new_value = read_b_varchar(src)
2086                    .map(EnvChangeValue::String)
2087                    .unwrap_or(EnvChangeValue::String(String::new()));
2088
2089                let old_value = read_b_varchar(src)
2090                    .map(EnvChangeValue::String)
2091                    .unwrap_or(EnvChangeValue::String(String::new()));
2092
2093                (new_value, old_value)
2094            }
2095        };
2096
2097        // No frame-boundary fixup needed: the whole declared frame was
2098        // consumed from the outer buffer up front, so decoders that
2099        // under-consume (e.g. Routing's implicit zero-length OldValue) just
2100        // leave bytes behind in the dropped sub-frame.
2101
2102        Ok(Self {
2103            env_type,
2104            new_value,
2105            old_value,
2106        })
2107    }
2108
2109    fn decode_routing_value(src: &mut impl Buf) -> Result<EnvChangeValue, ProtocolError> {
2110        // Routing format: length (2) + protocol (1) + port (2) + server_len (2) + server (utf16)
2111        if src.remaining() < 2 {
2112            return Err(ProtocolError::UnexpectedEof);
2113        }
2114
2115        let _routing_len = src.get_u16_le();
2116
2117        if src.remaining() < 5 {
2118            return Err(ProtocolError::UnexpectedEof);
2119        }
2120
2121        let _protocol = src.get_u8();
2122        let port = src.get_u16_le();
2123        let server_len = src.get_u16_le() as usize;
2124
2125        // Read UTF-16LE server name
2126        if src.remaining() < server_len * 2 {
2127            return Err(ProtocolError::UnexpectedEof);
2128        }
2129
2130        let mut chars = Vec::with_capacity(server_len);
2131        for _ in 0..server_len {
2132            chars.push(src.get_u16_le());
2133        }
2134
2135        let host = String::from_utf16(&chars).map_err(|_| {
2136            ProtocolError::StringEncoding(
2137                #[cfg(feature = "std")]
2138                "invalid UTF-16 in routing hostname".to_string(),
2139                #[cfg(not(feature = "std"))]
2140                "invalid UTF-16 in routing hostname",
2141            )
2142        })?;
2143
2144        Ok(EnvChangeValue::Routing { host, port })
2145    }
2146
2147    /// Check if this is a routing redirect.
2148    #[must_use]
2149    pub fn is_routing(&self) -> bool {
2150        self.env_type == EnvChangeType::Routing
2151    }
2152
2153    /// Get routing information if this is a routing change.
2154    #[must_use]
2155    pub fn routing_info(&self) -> Option<(&str, u16)> {
2156        if let EnvChangeValue::Routing { host, port } = &self.new_value {
2157            Some((host, *port))
2158        } else {
2159            None
2160        }
2161    }
2162
2163    /// Get the new database name if this is a database change.
2164    #[must_use]
2165    pub fn new_database(&self) -> Option<&str> {
2166        if self.env_type == EnvChangeType::Database {
2167            if let EnvChangeValue::String(s) = &self.new_value {
2168                return Some(s);
2169            }
2170        }
2171        None
2172    }
2173}
2174
2175impl Order {
2176    /// Decode an ORDER token from bytes.
2177    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
2178        if src.remaining() < 2 {
2179            return Err(ProtocolError::UnexpectedEof);
2180        }
2181
2182        let length = src.get_u16_le() as usize;
2183        let column_count = length / 2;
2184
2185        if src.remaining() < length {
2186            return Err(ProtocolError::IncompletePacket {
2187                expected: length,
2188                actual: src.remaining(),
2189            });
2190        }
2191
2192        let mut columns = Vec::with_capacity(column_count);
2193        for _ in 0..column_count {
2194            columns.push(src.get_u16_le());
2195        }
2196
2197        Ok(Self { columns })
2198    }
2199}
2200
2201impl FeatureExtAck {
2202    /// Feature terminator byte.
2203    pub const TERMINATOR: u8 = 0xFF;
2204
2205    /// Decode a FEATUREEXTACK token from bytes.
2206    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
2207        let mut features = Vec::new();
2208
2209        loop {
2210            if !src.has_remaining() {
2211                return Err(ProtocolError::UnexpectedEof);
2212            }
2213
2214            let feature_id = src.get_u8();
2215            if feature_id == Self::TERMINATOR {
2216                break;
2217            }
2218
2219            if src.remaining() < 4 {
2220                return Err(ProtocolError::UnexpectedEof);
2221            }
2222
2223            let data_len = src.get_u32_le() as usize;
2224
2225            if src.remaining() < data_len {
2226                return Err(ProtocolError::IncompletePacket {
2227                    expected: data_len,
2228                    actual: src.remaining(),
2229                });
2230            }
2231
2232            let data = src.copy_to_bytes(data_len);
2233            features.push(FeatureAck { feature_id, data });
2234        }
2235
2236        Ok(Self { features })
2237    }
2238}
2239
2240impl SspiToken {
2241    /// Decode an SSPI token from bytes.
2242    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
2243        if src.remaining() < 2 {
2244            return Err(ProtocolError::UnexpectedEof);
2245        }
2246
2247        let length = src.get_u16_le() as usize;
2248
2249        if src.remaining() < length {
2250            return Err(ProtocolError::IncompletePacket {
2251                expected: length,
2252                actual: src.remaining(),
2253            });
2254        }
2255
2256        let data = src.copy_to_bytes(length);
2257        Ok(Self { data })
2258    }
2259}
2260
2261impl FedAuthInfo {
2262    /// `FedAuthInfoID` for the STS URL (MS-TDS §2.2.7.12: %0x01 = STSURL).
2263    const ID_STSURL: u8 = 0x01;
2264    /// `FedAuthInfoID` for the service principal name (MS-TDS §2.2.7.12:
2265    /// %0x02 = SPN).
2266    const ID_SPN: u8 = 0x02;
2267    /// Size of one `FedAuthInfoOpt` header: ID (1) + DataLen (4) + DataOffset (4).
2268    const OPT_HEADER_LEN: usize = 9;
2269
2270    /// Decode a FEDAUTHINFO token from bytes.
2271    ///
2272    /// Wire layout per MS-TDS §2.2.7.12 (after the 0xEE token byte):
2273    /// `TokenLength` (DWORD) covering everything that follows, then
2274    /// `CountOfInfoIDs` (DWORD), then `CountOfInfoIDs` option headers of
2275    /// ID (BYTE) + `FedAuthInfoDataLen` (DWORD) + `FedAuthInfoDataOffset`
2276    /// (DWORD), then the data block. Offsets are relative to the start of
2277    /// the `CountOfInfoIDs` field, and the option data is UTF-16LE.
2278    ///
2279    /// Exactly `TokenLength` bytes are consumed, so tokens that follow
2280    /// FEDAUTHINFO in the login stream (LOGINACK, DONE) are preserved.
2281    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
2282        if src.remaining() < 4 {
2283            return Err(ProtocolError::UnexpectedEof);
2284        }
2285        let token_len = src.get_u32_le() as usize;
2286        if src.remaining() < token_len {
2287            return Err(ProtocolError::UnexpectedEof);
2288        }
2289
2290        // Offsets in the option headers are relative to the start of this
2291        // region (the CountOfInfoIDs field), so address into it directly.
2292        let region = src.copy_to_bytes(token_len);
2293        if region.len() < 4 {
2294            return Err(ProtocolError::UnexpectedEof);
2295        }
2296        let count = u32::from_le_bytes([region[0], region[1], region[2], region[3]]) as usize;
2297
2298        // All headers must fit between the count field and the end of the
2299        // token. The checked math also rejects hostile counts that would
2300        // overflow the offset arithmetic.
2301        let headers_end = count
2302            .checked_mul(Self::OPT_HEADER_LEN)
2303            .and_then(|n| n.checked_add(4))
2304            .ok_or(ProtocolError::UnexpectedEof)?;
2305        if headers_end > region.len() {
2306            return Err(ProtocolError::UnexpectedEof);
2307        }
2308
2309        let mut sts_url = String::new();
2310        let mut spn = String::new();
2311
2312        for i in 0..count {
2313            let h = 4 + i * Self::OPT_HEADER_LEN;
2314            let info_id = region[h];
2315            let data_len =
2316                u32::from_le_bytes([region[h + 1], region[h + 2], region[h + 3], region[h + 4]])
2317                    as usize;
2318            let data_off =
2319                u32::from_le_bytes([region[h + 5], region[h + 6], region[h + 7], region[h + 8]])
2320                    as usize;
2321
2322            // Unknown IDs are skipped without validating their data, per the
2323            // spec's instruction to ignore unrecognized options.
2324            if info_id != Self::ID_SPN && info_id != Self::ID_STSURL {
2325                continue;
2326            }
2327
2328            let data_end = data_off
2329                .checked_add(data_len)
2330                .ok_or(ProtocolError::UnexpectedEof)?;
2331            if data_end > region.len() {
2332                return Err(ProtocolError::UnexpectedEof);
2333            }
2334            if data_len % 2 != 0 {
2335                return Err(ProtocolError::StringEncoding(
2336                    #[cfg(feature = "std")]
2337                    "FEDAUTHINFO option data has odd length, not UTF-16".to_string(),
2338                    #[cfg(not(feature = "std"))]
2339                    "FEDAUTHINFO option data has odd length, not UTF-16",
2340                ));
2341            }
2342
2343            let chars: Vec<u16> = region[data_off..data_end]
2344                .chunks_exact(2)
2345                .map(|b| u16::from_le_bytes([b[0], b[1]]))
2346                .collect();
2347            let value = String::from_utf16(&chars).map_err(|_| {
2348                ProtocolError::StringEncoding(
2349                    #[cfg(feature = "std")]
2350                    "invalid UTF-16 in FEDAUTHINFO option".to_string(),
2351                    #[cfg(not(feature = "std"))]
2352                    "invalid UTF-16 in FEDAUTHINFO option",
2353                )
2354            })?;
2355
2356            if info_id == Self::ID_SPN {
2357                spn = value;
2358            } else {
2359                sts_url = value;
2360            }
2361        }
2362
2363        Ok(Self { sts_url, spn })
2364    }
2365}
2366
2367// =============================================================================
2368// Token Parser
2369// =============================================================================
2370
2371/// Token stream parser.
2372///
2373/// Parses a stream of TDS tokens from a byte buffer.
2374///
2375/// # Basic vs Context-Aware Parsing
2376///
2377/// Some tokens (like `Done`, `Error`, `LoginAck`) can be parsed without context.
2378/// Use [`next_token()`](TokenParser::next_token) for these.
2379///
2380/// Other tokens (like `ColMetaData`, `Row`, `NbcRow`) require column metadata
2381/// to parse correctly. Use [`next_token_with_metadata()`](TokenParser::next_token_with_metadata)
2382/// for these.
2383///
2384/// # Example
2385///
2386/// ```rust,ignore
2387/// let mut parser = TokenParser::new(data);
2388/// let mut metadata = None;
2389///
2390/// while let Some(token) = parser.next_token_with_metadata(metadata.as_ref())? {
2391///     match token {
2392///         Token::ColMetaData(meta) => {
2393///             metadata = Some(meta);
2394///         }
2395///         Token::Row(row) => {
2396///             // Process row using metadata
2397///         }
2398///         Token::Done(done) => {
2399///             if !done.has_more() {
2400///                 break;
2401///             }
2402///         }
2403///         _ => {}
2404///     }
2405/// }
2406/// ```
2407pub struct TokenParser {
2408    data: Bytes,
2409    position: usize,
2410    /// Whether Always Encrypted was negotiated for this connection.
2411    /// When true, ColMetaData tokens are parsed with CekTable and per-column CryptoMetadata.
2412    encryption_enabled: bool,
2413    /// COMPUTE BY (ALTMETADATA) column layouts keyed by compute Id, used to
2414    /// parse and drop the matching ALTROW tokens. Cleared on each new
2415    /// ColMetaData (result-set boundary), where compute Ids become unique again.
2416    alt_metadata: Vec<(u16, ColMetaData)>,
2417}
2418
2419impl TokenParser {
2420    /// Create a new token parser from bytes.
2421    #[must_use]
2422    pub fn new(data: Bytes) -> Self {
2423        Self {
2424            data,
2425            position: 0,
2426            encryption_enabled: false,
2427            alt_metadata: Vec::new(),
2428        }
2429    }
2430
2431    /// Enable Always Encrypted metadata parsing.
2432    ///
2433    /// When enabled, ColMetaData tokens are parsed using the encrypted format
2434    /// which includes a CekTable and per-column CryptoMetadata.
2435    #[must_use]
2436    pub fn with_encryption(mut self, enabled: bool) -> Self {
2437        self.encryption_enabled = enabled;
2438        self
2439    }
2440
2441    /// Get remaining bytes in the buffer.
2442    #[must_use]
2443    pub fn remaining(&self) -> usize {
2444        self.data.len().saturating_sub(self.position)
2445    }
2446
2447    /// Check if there are more bytes to parse.
2448    #[must_use]
2449    pub fn has_remaining(&self) -> bool {
2450        self.position < self.data.len()
2451    }
2452
2453    /// Peek at the next token type without consuming it.
2454    #[must_use]
2455    pub fn peek_token_type(&self) -> Option<TokenType> {
2456        if self.position < self.data.len() {
2457            TokenType::from_u8(self.data[self.position])
2458        } else {
2459            None
2460        }
2461    }
2462
2463    /// Parse the next token from the stream.
2464    ///
2465    /// This method can only parse context-independent tokens. For tokens that
2466    /// require column metadata (ColMetaData, Row, NbcRow), use
2467    /// [`next_token_with_metadata()`](TokenParser::next_token_with_metadata).
2468    ///
2469    /// Returns `None` if no more tokens are available.
2470    pub fn next_token(&mut self) -> Result<Option<Token>, ProtocolError> {
2471        self.next_token_with_metadata(None)
2472    }
2473
2474    /// Parse the next token with optional column metadata context.
2475    ///
2476    /// When `metadata` is provided, this method can parse Row and NbcRow tokens.
2477    /// Without metadata, those tokens will return an error.
2478    ///
2479    /// Returns `None` if no more tokens are available.
2480    pub fn next_token_with_metadata(
2481        &mut self,
2482        metadata: Option<&ColMetaData>,
2483    ) -> Result<Option<Token>, ProtocolError> {
2484        loop {
2485            if !self.has_remaining() {
2486                return Ok(None);
2487            }
2488
2489            let mut buf = &self.data[self.position..];
2490            let start_pos = self.position;
2491
2492            let token_type_byte = buf.get_u8();
2493            let token_type = TokenType::from_u8(token_type_byte);
2494
2495            let token = match token_type {
2496                Some(TokenType::Done) => {
2497                    let done = Done::decode(&mut buf)?;
2498                    Token::Done(done)
2499                }
2500                Some(TokenType::DoneProc) => {
2501                    let done = DoneProc::decode(&mut buf)?;
2502                    Token::DoneProc(done)
2503                }
2504                Some(TokenType::DoneInProc) => {
2505                    let done = DoneInProc::decode(&mut buf)?;
2506                    Token::DoneInProc(done)
2507                }
2508                Some(TokenType::Error) => {
2509                    let error = ServerError::decode(&mut buf)?;
2510                    Token::Error(error)
2511                }
2512                Some(TokenType::Info) => {
2513                    let info = ServerInfo::decode(&mut buf)?;
2514                    Token::Info(info)
2515                }
2516                Some(TokenType::LoginAck) => {
2517                    let login_ack = LoginAck::decode(&mut buf)?;
2518                    Token::LoginAck(login_ack)
2519                }
2520                Some(TokenType::EnvChange) => {
2521                    let env_change = EnvChange::decode(&mut buf)?;
2522                    Token::EnvChange(env_change)
2523                }
2524                Some(TokenType::Order) => {
2525                    let order = Order::decode(&mut buf)?;
2526                    Token::Order(order)
2527                }
2528                Some(TokenType::FeatureExtAck) => {
2529                    let ack = FeatureExtAck::decode(&mut buf)?;
2530                    Token::FeatureExtAck(ack)
2531                }
2532                Some(TokenType::Sspi) => {
2533                    let sspi = SspiToken::decode(&mut buf)?;
2534                    Token::Sspi(sspi)
2535                }
2536                Some(TokenType::FedAuthInfo) => {
2537                    let info = FedAuthInfo::decode(&mut buf)?;
2538                    Token::FedAuthInfo(info)
2539                }
2540                Some(TokenType::ReturnStatus) => {
2541                    if buf.remaining() < 4 {
2542                        return Err(ProtocolError::UnexpectedEof);
2543                    }
2544                    let status = buf.get_i32_le();
2545                    Token::ReturnStatus(status)
2546                }
2547                Some(TokenType::ColMetaData) => {
2548                    let col_meta = if self.encryption_enabled {
2549                        ColMetaData::decode_encrypted(&mut buf)?
2550                    } else {
2551                        ColMetaData::decode(&mut buf)?
2552                    };
2553                    // New result set: prior COMPUTE BY layouts no longer apply
2554                    // and their Ids may be reused.
2555                    self.alt_metadata.clear();
2556                    Token::ColMetaData(col_meta)
2557                }
2558                Some(TokenType::Row) => {
2559                    let meta = metadata.ok_or_else(|| {
2560                        ProtocolError::StringEncoding(
2561                            #[cfg(feature = "std")]
2562                            "Row token requires column metadata".to_string(),
2563                            #[cfg(not(feature = "std"))]
2564                            "Row token requires column metadata",
2565                        )
2566                    })?;
2567                    let row = RawRow::decode(&mut buf, meta)?;
2568                    Token::Row(row)
2569                }
2570                Some(TokenType::NbcRow) => {
2571                    let meta = metadata.ok_or_else(|| {
2572                        ProtocolError::StringEncoding(
2573                            #[cfg(feature = "std")]
2574                            "NbcRow token requires column metadata".to_string(),
2575                            #[cfg(not(feature = "std"))]
2576                            "NbcRow token requires column metadata",
2577                        )
2578                    })?;
2579                    let row = NbcRow::decode(&mut buf, meta)?;
2580                    Token::NbcRow(row)
2581                }
2582                Some(TokenType::ReturnValue) => {
2583                    let ret_val = ReturnValue::decode(&mut buf)?;
2584                    Token::ReturnValue(ret_val)
2585                }
2586                Some(TokenType::SessionState) => {
2587                    let session = SessionState::decode(&mut buf)?;
2588                    Token::SessionState(session)
2589                }
2590                Some(TokenType::ColInfo) | Some(TokenType::TabName) | Some(TokenType::Offset) => {
2591                    // These tokens are rarely used and have complex formats.
2592                    // Skip them by reading the length and advancing.
2593                    if buf.remaining() < 2 {
2594                        return Err(ProtocolError::UnexpectedEof);
2595                    }
2596                    let length = buf.get_u16_le() as usize;
2597                    if buf.remaining() < length {
2598                        return Err(ProtocolError::IncompletePacket {
2599                            expected: length,
2600                            actual: buf.remaining(),
2601                        });
2602                    }
2603                    // Skip the data
2604                    buf.advance(length);
2605                    // #273: advance past the skipped token and iterate. The skip
2606                    // path must NOT recurse — a server-controlled flat run of these
2607                    // tokens would otherwise add one stack frame per token and
2608                    // overflow the stack (remote DoS).
2609                    self.position = start_pos + (self.data.len() - start_pos - buf.remaining());
2610                    continue;
2611                }
2612                Some(TokenType::AltMetaData) => {
2613                    // COMPUTE BY metadata (#275). Parse to learn the ALTROW value
2614                    // layout, store it keyed by Id, then drop the token — compute
2615                    // rows are not surfaced, but their bytes must be consumed so
2616                    // the base result set stays parseable.
2617                    let (id, alt_meta) = ColMetaData::decode_alt(&mut buf)?;
2618                    self.position = start_pos + (self.data.len() - start_pos - buf.remaining());
2619                    self.alt_metadata.push((id, alt_meta));
2620                    continue;
2621                }
2622                Some(TokenType::AltRow) => {
2623                    // COMPUTE BY row (#275). Parse using the matching ALTMETADATA
2624                    // to consume its bytes, then drop it. Like the ColInfo skip
2625                    // path, this MUST NOT recurse (remote DoS via a flat token run).
2626                    if buf.remaining() < 2 {
2627                        return Err(ProtocolError::UnexpectedEof);
2628                    }
2629                    let id = buf.get_u16_le();
2630                    let alt_meta = self
2631                        .alt_metadata
2632                        .iter()
2633                        .find(|(stored_id, _)| *stored_id == id)
2634                        .map(|(_, meta)| meta)
2635                        .ok_or_else(|| {
2636                            ProtocolError::StringEncoding(
2637                                #[cfg(feature = "std")]
2638                                "ALTROW token without matching ALTMETADATA".to_string(),
2639                                #[cfg(not(feature = "std"))]
2640                                "ALTROW token without matching ALTMETADATA",
2641                            )
2642                        })?;
2643                    let _ = RawRow::decode(&mut buf, alt_meta)?;
2644                    self.position = start_pos + (self.data.len() - start_pos - buf.remaining());
2645                    continue;
2646                }
2647                None => {
2648                    return Err(ProtocolError::InvalidTokenType(token_type_byte));
2649                }
2650            };
2651
2652            // Update position based on how much was consumed
2653            let consumed = self.data.len() - start_pos - buf.remaining();
2654            self.position = start_pos + consumed;
2655
2656            return Ok(Some(token));
2657        }
2658    }
2659
2660    /// Skip the current token without fully parsing it.
2661    ///
2662    /// This is useful for skipping unknown or uninteresting tokens.
2663    pub fn skip_token(&mut self) -> Result<(), ProtocolError> {
2664        if !self.has_remaining() {
2665            return Ok(());
2666        }
2667
2668        let token_type_byte = self.data[self.position];
2669        let token_type = TokenType::from_u8(token_type_byte);
2670
2671        // Calculate how many bytes to skip based on token type
2672        let skip_amount = match token_type {
2673            // Fixed-size tokens
2674            Some(TokenType::Done) | Some(TokenType::DoneProc) | Some(TokenType::DoneInProc) => {
2675                1 + Done::SIZE // token type + 12 bytes
2676            }
2677            Some(TokenType::ReturnStatus) => {
2678                1 + 4 // token type + 4 bytes
2679            }
2680            // Variable-length tokens with 2-byte length prefix
2681            Some(TokenType::Error)
2682            | Some(TokenType::Info)
2683            | Some(TokenType::LoginAck)
2684            | Some(TokenType::EnvChange)
2685            | Some(TokenType::Order)
2686            | Some(TokenType::Sspi)
2687            | Some(TokenType::ColInfo)
2688            | Some(TokenType::TabName)
2689            | Some(TokenType::Offset)
2690            | Some(TokenType::ReturnValue) => {
2691                if self.remaining() < 3 {
2692                    return Err(ProtocolError::UnexpectedEof);
2693                }
2694                let length = u16::from_le_bytes([
2695                    self.data[self.position + 1],
2696                    self.data[self.position + 2],
2697                ]) as usize;
2698                1 + 2 + length // token type + length prefix + data
2699            }
2700            // Tokens with 4-byte length prefix
2701            Some(TokenType::SessionState) | Some(TokenType::FedAuthInfo) => {
2702                if self.remaining() < 5 {
2703                    return Err(ProtocolError::UnexpectedEof);
2704                }
2705                let length = u32::from_le_bytes([
2706                    self.data[self.position + 1],
2707                    self.data[self.position + 2],
2708                    self.data[self.position + 3],
2709                    self.data[self.position + 4],
2710                ]) as usize;
2711                1 + 4 + length
2712            }
2713            // FeatureExtAck has no length prefix - must parse
2714            Some(TokenType::FeatureExtAck) => {
2715                // Parse to find end
2716                let mut buf = &self.data[self.position + 1..];
2717                let _ = FeatureExtAck::decode(&mut buf)?;
2718                self.data.len() - self.position - buf.remaining()
2719            }
2720            // COMPUTE BY metadata (#275) has no length prefix; parse to find its
2721            // end, recording the layout so a following ALTROW can be skipped.
2722            Some(TokenType::AltMetaData) => {
2723                let mut buf = &self.data[self.position + 1..];
2724                let (id, alt_meta) = ColMetaData::decode_alt(&mut buf)?;
2725                let consumed = self.data.len() - self.position - buf.remaining();
2726                self.alt_metadata.push((id, alt_meta));
2727                consumed
2728            }
2729            // COMPUTE BY row (#275); length depends on the matching ALTMETADATA.
2730            Some(TokenType::AltRow) => {
2731                let mut buf = &self.data[self.position + 1..];
2732                if buf.remaining() < 2 {
2733                    return Err(ProtocolError::UnexpectedEof);
2734                }
2735                let id = buf.get_u16_le();
2736                let alt_meta = self
2737                    .alt_metadata
2738                    .iter()
2739                    .find(|(stored_id, _)| *stored_id == id)
2740                    .map(|(_, meta)| meta)
2741                    .ok_or(ProtocolError::InvalidTokenType(token_type_byte))?;
2742                let _ = RawRow::decode(&mut buf, alt_meta)?;
2743                self.data.len() - self.position - buf.remaining()
2744            }
2745            // ColMetaData, Row, NbcRow require context and can't be easily skipped
2746            Some(TokenType::ColMetaData) | Some(TokenType::Row) | Some(TokenType::NbcRow) => {
2747                return Err(ProtocolError::InvalidTokenType(token_type_byte));
2748            }
2749            None => {
2750                return Err(ProtocolError::InvalidTokenType(token_type_byte));
2751            }
2752        };
2753
2754        if self.remaining() < skip_amount {
2755            return Err(ProtocolError::UnexpectedEof);
2756        }
2757
2758        self.position += skip_amount;
2759        Ok(())
2760    }
2761
2762    /// Get the current position in the buffer.
2763    #[must_use]
2764    pub fn position(&self) -> usize {
2765        self.position
2766    }
2767
2768    /// Reset the parser to the beginning.
2769    pub fn reset(&mut self) {
2770        self.position = 0;
2771    }
2772}
2773
2774// =============================================================================
2775// Tests
2776// =============================================================================
2777
2778#[cfg(test)]
2779#[allow(clippy::unwrap_used, clippy::panic)]
2780mod tests {
2781    use super::*;
2782    use bytes::BytesMut;
2783
/// Encoding a DONE token and decoding the bytes that follow its type byte
/// must preserve the count flag, the current command and the row count.
#[test]
fn test_done_roundtrip() {
    let status = DoneStatus {
        more: false,
        error: false,
        in_xact: false,
        count: true,
        attn: false,
        srverror: false,
    };
    let original = Done {
        status,
        cur_cmd: 193, // SELECT
        row_count: 42,
    };

    let mut encoded = BytesMut::new();
    original.encode(&mut encoded);

    // `encode` writes the token type byte first; `decode` starts after it.
    let mut payload = &encoded[1..];
    let roundtripped = Done::decode(&mut payload).unwrap();

    assert_eq!(roundtripped.status.count, original.status.count);
    assert_eq!(roundtripped.cur_cmd, original.cur_cmd);
    assert_eq!(roundtripped.row_count, original.row_count);
}
2810
/// `DoneStatus::to_bits` followed by `DoneStatus::from_bits` must restore
/// every flag, including the ones left unset.
#[test]
fn test_done_status_bits() {
    let status = DoneStatus {
        more: true,
        error: true,
        in_xact: true,
        count: true,
        attn: false,
        srverror: false,
    };

    let bits = status.to_bits();
    let restored = DoneStatus::from_bits(bits);

    assert_eq!(status.more, restored.more);
    assert_eq!(status.error, restored.error);
    assert_eq!(status.in_xact, restored.in_xact);
    assert_eq!(status.count, restored.count);
    // The cleared flags must stay cleared; a set bit leaking into either
    // of these would otherwise go unnoticed.
    assert_eq!(status.attn, restored.attn);
    assert_eq!(status.srverror, restored.srverror);
}
2830
/// The parser must decode a lone DONE token and then report end of stream.
#[test]
fn test_token_parser_done() {
    // Layout: type (1) + status (2) + curcmd (2) + rowcount (8).
    let wire = Bytes::from_static(&[
        0xFD, // DONE token type
        0x10, 0x00, // status: DONE_COUNT
        0xC1, 0x00, // cur_cmd: 193 (SELECT)
        0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count: 5
    ]);

    let mut parser = TokenParser::new(wire);

    let done = match parser.next_token().unwrap().unwrap() {
        Token::Done(done) => done,
        _ => panic!("Expected Done token"),
    };
    assert!(done.status.count);
    assert!(!done.status.more);
    assert_eq!(done.cur_cmd, 193);
    assert_eq!(done.row_count, 5);

    // The buffer is exhausted, so the next read yields no token.
    let next = parser.next_token().unwrap();
    assert!(next.is_none());
}
2857
/// `EnvChangeType::from_u8` maps known discriminants and rejects unknown ones.
#[test]
fn test_env_change_type_from_u8() {
    let cases = [
        (1, Some(EnvChangeType::Database)),
        (20, Some(EnvChangeType::Routing)),
        (100, None),
    ];

    for (raw, expected) in cases {
        assert_eq!(EnvChangeType::from_u8(raw), expected);
    }
}
2864
/// Routing ENVCHANGE frames (MS-TDS 2.2.7.9) end with a zero-length
/// OldValue, two bytes that follow the routing data. Only the NewValue is
/// decoded, so the decoder has to advance to the declared end of the
/// frame; if it stopped early the stray `00 00` would be read as the next
/// token type and the remainder of the login response would be garbage.
/// This is the exact shape sent by Azure SQL Gateway redirects.
#[test]
fn test_env_change_routing_consumes_declared_length() {
    let host = "redirect.example";
    let host_units: Vec<u16> = host.encode_utf16().collect();

    // Routing data: protocol (1) + port (2) + server length (2) + server name.
    let routing_len = 1 + 2 + 2 + host_units.len() * 2;
    // Frame: type byte + routing length prefix + routing data + empty OldValue.
    let env_len = 1 + 2 + routing_len + 2;

    let mut frame = BytesMut::new();
    frame.put_u16_le(env_len as u16);
    frame.put_u8(20); // Routing
    frame.put_u16_le(routing_len as u16);
    frame.put_u8(0); // protocol: TCP
    frame.put_u16_le(11000); // port
    frame.put_u16_le(host_units.len() as u16);
    for &unit in &host_units {
        frame.put_u16_le(unit);
    }
    frame.put_u16_le(0); // OldValue: zero-length US_VARBYTE
    // Type byte of a following DONE token; it must be left unread.
    frame.put_u8(0xFD);

    let mut cursor: &[u8] = &frame;
    let env = EnvChange::decode(&mut cursor).unwrap();

    assert_eq!(env.routing_info(), Some((host, 11000)));
    assert_eq!(
        cursor,
        &[0xFD],
        "decode must consume exactly the declared ENVCHANGE frame"
    );
}
2904
2905    fn put_b_varchar(buf: &mut BytesMut, s: &str) {
2906        let utf16: Vec<u16> = s.encode_utf16().collect();
2907        buf.put_u8(utf16.len() as u8);
2908        for c in utf16 {
2909            buf.put_u16_le(c);
2910        }
2911    }
2912
2913    fn put_us_varchar(buf: &mut BytesMut, s: &str) {
2914        let utf16: Vec<u16> = s.encode_utf16().collect();
2915        buf.put_u16_le(utf16.len() as u16);
2916        for c in utf16 {
2917            buf.put_u16_le(c);
2918        }
2919    }
2920
/// Regression test for issue #154 (UDT_INFO). MS-TDS encodes DB_NAME,
/// SCHEMA_NAME and TYPE_NAME as B_VARCHAR (1-byte length) and only
/// ASSEMBLY_QUALIFIED_NAME as US_VARCHAR. Treating all four as US_VARCHAR
/// misaligned the stream, so any query selecting a UDT column (geography,
/// geometry, hierarchyid, CLR UDTs) failed with UnexpectedEof.
#[test]
fn test_udt_info_metadata_uses_b_varchar_names() {
    let mut wire = BytesMut::new();
    wire.put_u16_le(0xFFFF); // MAX_BYTE_SIZE
    for name in ["master", "dbo", "hierarchyid"] {
        put_b_varchar(&mut wire, name);
    }
    put_us_varchar(
        &mut wire,
        "Microsoft.SqlServer.Types.SqlHierarchyId, Microsoft.SqlServer.Types",
    );
    // Type byte of the next token; the decoder must leave it untouched.
    wire.put_u8(0xFD);

    let mut cursor: &[u8] = &wire;
    let type_info = decode_type_info(&mut cursor, TypeId::Udt, TypeId::Udt as u8).unwrap();

    assert_eq!(type_info.max_length, Some(0xFFFF));
    assert_eq!(
        cursor,
        &[0xFD],
        "decode must consume exactly the UDT_INFO frame"
    );
}
2950
/// Regression test for issue #154 (XML_INFO). MS-TDS §2.2.5.5.3 encodes
/// DBNAME and OWNING_SCHEMA as B_VARCHAR and only XML_SCHEMA_COLLECTION as
/// US_VARCHAR; schema-bound xml columns (SCHEMA_PRESENT=1) used to misparse.
#[test]
fn test_xml_info_schema_bound_uses_b_varchar_names() {
    let mut wire = BytesMut::new();
    wire.put_u8(1); // SCHEMA_PRESENT
    for name in ["master", "dbo"] {
        put_b_varchar(&mut wire, name);
    }
    put_us_varchar(&mut wire, "MyXmlSchemaCollection");
    // Type byte of the next token; the decoder must leave it untouched.
    wire.put_u8(0xFD);

    let mut cursor: &[u8] = &wire;
    decode_type_info(&mut cursor, TypeId::Xml, TypeId::Xml as u8).unwrap();

    assert_eq!(
        cursor,
        &[0xFD],
        "decode must consume exactly the XML_INFO frame"
    );
}
2971
/// A frame declaring length 1 holds only the type byte (0x08,
/// BeginTransaction, a binary-format type), so the new_len/old_len reads
/// meet an empty buffer. Decoding must succeed without panicking; the
/// case was found by the parse_env_change and parse_token fuzz targets.
#[test]
fn hostile_env_change_binary_truncated_is_not_panic() {
    let frame: [u8; 3] = [0x01, 0x00, 0x08];
    let mut cursor: &[u8] = &frame;

    let decoded = EnvChange::decode(&mut cursor).unwrap();

    assert_eq!(decoded.env_type, EnvChangeType::BeginTransaction);
}
2983
/// Issue #145: when a frame under-declares its length, the value decoders
/// must stop at the declared boundary and never read the next token's bytes.
#[test]
fn hostile_env_change_under_declared_cannot_steal_following_bytes() {
    // These bytes sit after the frame and mimic the transaction-descriptor
    // payload (new_len=8, descriptor, old_len=0) that the old
    // buffer-bounded decoder would have swallowed.
    let following: &[u8] = &[0x08, 1, 2, 3, 4, 5, 6, 7, 8, 0x00];

    let mut wire = BytesMut::new();
    wire.put_u16_le(1); // declared frame: type byte only
    wire.put_u8(0x08); // BeginTransaction
    wire.extend_from_slice(following);

    let mut cursor: &[u8] = &wire;
    let env = EnvChange::decode(&mut cursor).unwrap();

    assert_eq!(env.env_type, EnvChangeType::BeginTransaction);
    match &env.new_value {
        EnvChangeValue::Binary(payload) => assert!(
            payload.is_empty(),
            "under-declared frame yields the lenient empty value"
        ),
        other => panic!("expected empty Binary value, got {other:?}"),
    }
    assert_eq!(
        cursor, following,
        "bytes beyond the declared frame belong to the next token"
    );
}
3015
/// Issue #145: a frame of declared length zero has no room for a type
/// byte, and taking one from past the frame would eat the next token.
/// Decoding must therefore fail.
#[test]
fn hostile_env_change_zero_length_frame_errors() {
    let frame: [u8; 3] = [0x00, 0x00, 0xFD];
    let mut cursor: &[u8] = &frame;

    let outcome = EnvChange::decode(&mut cursor);

    assert!(outcome.is_err());
}
3024
/// A column count of 0xFFFF is the "no metadata" marker and must decode
/// to an empty column set.
#[test]
fn test_colmetadata_no_columns() {
    let marker: [u8; 2] = [0xFF, 0xFF];
    let mut cursor: &[u8] = &marker;

    let decoded = ColMetaData::decode(&mut cursor).unwrap();

    assert!(decoded.is_empty());
    assert_eq!(decoded.column_count(), 0);
}
3034
/// COLMETADATA describing one nullable INT column named "id".
///
/// Layout: column_count (2) + [user_type (4) + flags (2) + type_id (1)
/// + name (B_VARCHAR)].
#[test]
fn test_colmetadata_single_int_column() {
    let mut wire = BytesMut::new();
    wire.put_u16_le(1); // 1 column
    wire.put_u32_le(0); // user_type = 0
    wire.put_u16_le(0x0001); // flags (nullable)
    wire.put_u8(0x38); // TypeId::Int4
    // Column name: 1-byte character count, then UTF-16LE code units.
    put_b_varchar(&mut wire, "id");

    let mut cursor: &[u8] = &wire;
    let decoded = ColMetaData::decode(&mut cursor).unwrap();

    assert_eq!(decoded.column_count(), 1);
    let column = &decoded.columns[0];
    assert_eq!(column.name, "id");
    assert_eq!(column.type_id, TypeId::Int4);
    assert!(column.is_nullable());
}
3056
/// COLMETADATA describing one nullable NVARCHAR(50) column named "name";
/// its type info carries a 2-byte max length and a 5-byte collation.
#[test]
fn test_colmetadata_nvarchar_column() {
    let mut wire = BytesMut::new();
    wire.put_u16_le(1); // 1 column
    wire.put_u32_le(0); // user_type = 0
    wire.put_u16_le(0x0001); // flags (nullable)
    wire.put_u8(0xE7); // TypeId::NVarChar
    wire.put_u16_le(100); // max_length = 100 (50 chars * 2)
    wire.extend_from_slice(&[0x09, 0x04, 0xD0, 0x00, 0x34]); // collation
    put_b_varchar(&mut wire, "name");

    let mut cursor: &[u8] = &wire;
    let decoded = ColMetaData::decode(&mut cursor).unwrap();

    assert_eq!(decoded.column_count(), 1);
    let column = &decoded.columns[0];
    assert_eq!(column.name, "name");
    assert_eq!(column.type_id, TypeId::NVarChar);
    assert_eq!(column.type_info.max_length, Some(100));
    assert!(column.type_info.collation.is_some());
}
3081
/// A row with one fixed-size INT column is stored as its four raw bytes.
#[test]
fn test_raw_row_decode_int() {
    let int_column = ColumnData {
        name: "id".to_string(),
        type_id: TypeId::Int4,
        col_type: 0x38,
        flags: 0,
        user_type: 0,
        type_info: TypeInfo::default(),
        crypto_metadata: None,
    };
    let metadata = ColMetaData {
        cek_table: None,
        columns: vec![int_column],
    };

    // 42 as a little-endian i32: 2A 00 00 00.
    let wire = 42_i32.to_le_bytes();
    let mut cursor: &[u8] = &wire;
    let row = RawRow::decode(&mut cursor, &metadata).unwrap();

    assert_eq!(row.data.len(), 4);
    assert_eq!(&row.data[..], &[0x2A, 0x00, 0x00, 0x00]);
}
3107
/// A non-NULL IntN value is stored with its 1-byte length prefix followed
/// by the value bytes.
#[test]
fn test_raw_row_decode_nullable_int() {
    let nullable_int = ColumnData {
        name: "id".to_string(),
        type_id: TypeId::IntN,
        col_type: 0x26,
        flags: 0x01, // nullable
        user_type: 0,
        type_info: TypeInfo {
            max_length: Some(4),
            ..Default::default()
        },
        crypto_metadata: None,
    };
    let metadata = ColMetaData {
        cek_table: None,
        columns: vec![nullable_int],
    };

    // length=4, value=42
    let wire: [u8; 5] = [0x04, 0x2A, 0x00, 0x00, 0x00];
    let mut cursor: &[u8] = &wire;
    let row = RawRow::decode(&mut cursor, &metadata).unwrap();

    assert_eq!(row.data.len(), 5);
    assert_eq!(row.data[0], 4); // length
    assert_eq!(&row.data[1..], &[0x2A, 0x00, 0x00, 0x00]);
}
3136
/// A NULL IntN value is a single 0xFF length byte (the bytelen NULL
/// marker) and is stored as exactly that byte.
#[test]
fn test_raw_row_decode_null_value() {
    let nullable_int = ColumnData {
        name: "id".to_string(),
        type_id: TypeId::IntN,
        col_type: 0x26,
        flags: 0x01, // nullable
        user_type: 0,
        type_info: TypeInfo {
            max_length: Some(4),
            ..Default::default()
        },
        crypto_metadata: None,
    };
    let metadata = ColMetaData {
        cek_table: None,
        columns: vec![nullable_int],
    };

    let wire: [u8; 1] = [0xFF];
    let mut cursor: &[u8] = &wire;
    let row = RawRow::decode(&mut cursor, &metadata).unwrap();

    assert_eq!(row.data.len(), 1);
    assert_eq!(row.data[0], 0xFF); // NULL marker
}
3164
/// `NbcRow::is_null` reports the bit for each column: with bitmap
/// `0b00000101`, columns 0 and 2 are NULL and columns 1 and 3 are not.
#[test]
fn test_nbcrow_null_bitmap() {
    let row = NbcRow {
        null_bitmap: vec![0b00000101],
        data: Bytes::new(),
    };

    for (column, expected_null) in [(0, true), (1, false), (2, true), (3, false)] {
        assert_eq!(row.is_null(column), expected_null);
    }
}
3177
/// The parser must surface a COLMETADATA token describing one INT column.
#[test]
fn test_token_parser_colmetadata() {
    let mut wire = BytesMut::new();
    wire.put_u8(0x81); // COLMETADATA token type
    wire.put_u16_le(1); // 1 column
    wire.put_u32_le(0); // user_type = 0
    wire.put_u16_le(0x0001); // flags (nullable)
    wire.put_u8(0x38); // TypeId::Int4
    put_b_varchar(&mut wire, "id"); // column name

    let mut parser = TokenParser::new(wire.freeze());

    let meta = match parser.next_token().unwrap().unwrap() {
        Token::ColMetaData(meta) => meta,
        _ => panic!("Expected ColMetaData token"),
    };
    assert_eq!(meta.column_count(), 1);
    assert_eq!(meta.columns[0].name, "id");
    assert_eq!(meta.columns[0].type_id, TypeId::Int4);
}
3202
/// With column metadata supplied, a ROW token decodes into the raw bytes
/// of its single INT column.
#[test]
fn test_token_parser_row_with_metadata() {
    let int_column = ColumnData {
        name: "id".to_string(),
        type_id: TypeId::Int4,
        col_type: 0x38,
        flags: 0,
        user_type: 0,
        type_info: TypeInfo::default(),
        crypto_metadata: None,
    };
    let metadata = ColMetaData {
        cek_table: None,
        columns: vec![int_column],
    };

    let mut wire = BytesMut::new();
    wire.put_u8(0xD1); // ROW token type
    wire.put_i32_le(42); // value = 42

    let mut parser = TokenParser::new(wire.freeze());
    let parsed = parser.next_token_with_metadata(Some(&metadata));

    let row = match parsed.unwrap().unwrap() {
        Token::Row(row) => row,
        _ => panic!("Expected Row token"),
    };
    assert_eq!(row.data.len(), 4);
}
3237
/// A ROW token cannot be decoded when no column metadata is supplied, so
/// `next_token` must return an error.
#[test]
fn test_token_parser_row_without_metadata_fails() {
    let mut wire = BytesMut::new();
    wire.put_u8(0xD1); // ROW token type
    wire.put_i32_le(42); // value = 42

    let mut parser = TokenParser::new(wire.freeze());

    // `next_token` passes no metadata to the row decoder.
    let outcome = parser.next_token();
    assert!(outcome.is_err());
}
3250
/// `peek_token_type` reports the type of the token at the current position.
#[test]
fn test_token_parser_peek() {
    let mut wire = BytesMut::new();
    wire.put_u8(0xFD); // DONE token type
    wire.put_u16_le(0x0010); // status
    wire.put_u16_le(0x00C1); // cur_cmd
    wire.put_u64_le(5); // row_count

    let parser = TokenParser::new(wire.freeze());

    assert_eq!(parser.peek_token_type(), Some(TokenType::Done));
}
3263
/// `ColumnData::fixed_size` is `Some(4)` for INT and `None` for NVARCHAR.
#[test]
fn test_column_data_fixed_size() {
    // Unnamed column with default type info; only the type varies.
    let column_of = |type_id, col_type| ColumnData {
        name: String::new(),
        type_id,
        col_type,
        flags: 0,
        user_type: 0,
        type_info: TypeInfo::default(),
        crypto_metadata: None,
    };

    let int_column = column_of(TypeId::Int4, 0x38);
    assert_eq!(int_column.fixed_size(), Some(4));

    let nvarchar_column = column_of(TypeId::NVarChar, 0xE7);
    assert_eq!(nvarchar_column.fixed_size(), None);
}
3288
3289    // ========================================================================
3290    // End-to-End Decode Tests (Wire → Stored → Verification)
3291    // ========================================================================
3292    //
3293    // These tests verify that RawRow::decode_column_value correctly stores
3294    // column values in a format that can be parsed back.
3295
/// Decode a row holding "World" (NVARCHAR) and 42 (IntN), then re-read the
/// stored bytes to check each column keeps its length prefix and value.
/// This mirrors the row shape from the MCP parameterized query scenario.
#[test]
fn test_decode_nvarchar_then_intn_roundtrip() {
    // "World" in UTF-16LE: W=0x0057, o=0x006F, r=0x0072, l=0x006C, d=0x0064
    let text_units: Vec<u16> = "World".encode_utf16().collect();

    // Wire image as written in this test.
    let mut wire_data = BytesMut::new();
    // Column 0: 2-byte length prefix counted in bytes (10), then the text.
    wire_data.put_u16_le((text_units.len() * 2) as u16);
    for &unit in &text_units {
        wire_data.put_u16_le(unit);
    }
    // Column 1: 1-byte length prefix (4), then the INT value.
    wire_data.put_u8(4);
    wire_data.put_i32_le(42);

    // Both columns are nullable and differ only in name, type and max length.
    let column = |name: &str, type_id, col_type, max_length| ColumnData {
        name: name.to_string(),
        type_id,
        col_type,
        flags: 0x01,
        user_type: 0,
        type_info: TypeInfo {
            max_length: Some(max_length), // non-MAX
            precision: None,
            scale: None,
            collation: None,
        },
        crypto_metadata: None,
    };
    let metadata = ColMetaData {
        cek_table: None,
        columns: vec![
            column("greeting", TypeId::NVarChar, 0xE7, 10),
            column("number", TypeId::IntN, 0x26, 4),
        ],
    };

    // Wire -> stored form.
    let mut wire_cursor = wire_data.freeze();
    let raw_row = RawRow::decode(&mut wire_cursor, &metadata).unwrap();
    assert_eq!(
        wire_cursor.remaining(),
        0,
        "wire data should be fully consumed"
    );

    // Stored form -> values.
    let mut stored_cursor: &[u8] = &raw_row.data;

    // Column 0, stored as [2-byte len][data] for non-MAX NVARCHAR.
    assert!(
        stored_cursor.remaining() >= 2,
        "need at least 2 bytes for length"
    );
    let len0 = stored_cursor.get_u16_le() as usize;
    assert_eq!(len0, 10, "NVarChar length should be 10 bytes");
    assert!(
        stored_cursor.remaining() >= len0,
        "need {len0} bytes for data"
    );
    let decoded_units: Vec<u16> = (0..len0 / 2)
        .map(|_| stored_cursor.get_u16_le())
        .collect();
    let string0 = String::from_utf16(&decoded_units).unwrap();
    assert_eq!(string0, "World", "column 0 should be 'World'");

    // Column 1, stored as [1-byte len][data] for IntN.
    assert!(
        stored_cursor.remaining() >= 1,
        "need at least 1 byte for length"
    );
    let len1 = stored_cursor.get_u8();
    assert_eq!(len1, 4, "IntN length should be 4");
    assert!(stored_cursor.remaining() >= 4, "need 4 bytes for INT data");
    let int1 = stored_cursor.get_i32_le();
    assert_eq!(int1, 42, "column 1 should be 42");

    // Nothing may be left over in the stored row.
    assert_eq!(
        stored_cursor.remaining(),
        0,
        "stored data should be fully consumed"
    );
}
3406
3407    #[test]
3408    fn test_decode_nvarchar_max_then_intn_roundtrip() {
3409        // Test NVARCHAR(MAX) followed by IntN - uses PLP encoding
3410
3411        // Build wire data for PLP NVARCHAR(MAX) + IntN
3412        let mut wire_data = BytesMut::new();
3413
3414        // Column 0: NVARCHAR(MAX) "Hello" - PLP format
3415        // PLP: 8-byte total length, then chunks
3416        let word = "Hello";
3417        let utf16: Vec<u16> = word.encode_utf16().collect();
3418        let byte_len = (utf16.len() * 2) as u64;
3419
3420        wire_data.put_u64_le(byte_len); // total length = 10
3421        wire_data.put_u32_le(byte_len as u32); // chunk length = 10
3422        for code_unit in &utf16 {
3423            wire_data.put_u16_le(*code_unit);
3424        }
3425        wire_data.put_u32_le(0); // terminating zero-length chunk
3426
3427        // Column 1: IntN 99
3428        wire_data.put_u8(4);
3429        wire_data.put_i32_le(99);
3430
3431        // Build metadata with MAX type
3432        let metadata = ColMetaData {
3433            cek_table: None,
3434            columns: vec![
3435                ColumnData {
3436                    name: "text".to_string(),
3437                    type_id: TypeId::NVarChar,
3438                    col_type: 0xE7,
3439                    flags: 0x01,
3440                    user_type: 0,
3441                    type_info: TypeInfo {
3442                        max_length: Some(0xFFFF), // MAX indicator
3443                        precision: None,
3444                        scale: None,
3445                        collation: None,
3446                    },
3447                    crypto_metadata: None,
3448                },
3449                ColumnData {
3450                    name: "num".to_string(),
3451                    type_id: TypeId::IntN,
3452                    col_type: 0x26,
3453                    flags: 0x01,
3454                    user_type: 0,
3455                    type_info: TypeInfo {
3456                        max_length: Some(4),
3457                        precision: None,
3458                        scale: None,
3459                        collation: None,
3460                    },
3461                    crypto_metadata: None,
3462                },
3463            ],
3464        };
3465
3466        // Decode wire data
3467        let mut wire_cursor = wire_data.freeze();
3468        let raw_row = RawRow::decode(&mut wire_cursor, &metadata).unwrap();
3469
3470        // Verify wire data was fully consumed
3471        assert_eq!(
3472            wire_cursor.remaining(),
3473            0,
3474            "wire data should be fully consumed"
3475        );
3476
3477        // Parse stored PLP data for column 0
3478        let mut stored_cursor: &[u8] = &raw_row.data;
3479
3480        // PLP stored format: [8-byte total][chunks...][4-byte 0]
3481        let total_len = stored_cursor.get_u64_le();
3482        assert_eq!(total_len, 10, "PLP total length should be 10");
3483
3484        let chunk_len = stored_cursor.get_u32_le();
3485        assert_eq!(chunk_len, 10, "PLP chunk length should be 10");
3486
3487        let mut utf16_read = Vec::new();
3488        for _ in 0..(chunk_len / 2) {
3489            utf16_read.push(stored_cursor.get_u16_le());
3490        }
3491        let string0 = String::from_utf16(&utf16_read).unwrap();
3492        assert_eq!(string0, "Hello", "column 0 should be 'Hello'");
3493
3494        let terminator = stored_cursor.get_u32_le();
3495        assert_eq!(terminator, 0, "PLP should end with 0");
3496
3497        // Parse IntN
3498        let len1 = stored_cursor.get_u8();
3499        assert_eq!(len1, 4);
3500        let int1 = stored_cursor.get_i32_le();
3501        assert_eq!(int1, 99, "column 1 should be 99");
3502
3503        // Verify fully consumed
3504        assert_eq!(
3505            stored_cursor.remaining(),
3506            0,
3507            "stored data should be fully consumed"
3508        );
3509    }
3510
3511    // ========================================================================
3512    // ReturnStatus Token Tests
3513    // ========================================================================
3514
3515    #[test]
3516    fn test_return_status_via_parser() {
3517        // RETURNSTATUS token: type (0x79) + value (i32 LE)
3518        let data = Bytes::from_static(&[
3519            0x79, // RETURNSTATUS token type
3520            0x00, 0x00, 0x00, 0x00, // return value = 0 (success)
3521        ]);
3522
3523        let mut parser = TokenParser::new(data);
3524        let token = parser.next_token().unwrap().unwrap();
3525
3526        match token {
3527            Token::ReturnStatus(status) => {
3528                assert_eq!(status, 0);
3529            }
3530            _ => panic!("Expected ReturnStatus token, got {token:?}"),
3531        }
3532
3533        assert!(parser.next_token().unwrap().is_none());
3534    }
3535
3536    #[test]
3537    fn test_return_status_nonzero() {
3538        // Return value = -6 (common for error returns)
3539        let mut buf = BytesMut::new();
3540        buf.put_u8(0x79); // RETURNSTATUS
3541        buf.put_i32_le(-6);
3542
3543        let mut parser = TokenParser::new(buf.freeze());
3544        let token = parser.next_token().unwrap().unwrap();
3545
3546        match token {
3547            Token::ReturnStatus(status) => {
3548                assert_eq!(status, -6);
3549            }
3550            _ => panic!("Expected ReturnStatus token"),
3551        }
3552    }
3553
3554    // ========================================================================
3555    // DoneProc Token Tests
3556    // ========================================================================
3557
3558    #[test]
3559    fn test_done_proc_roundtrip() {
3560        let done = DoneProc {
3561            status: DoneStatus {
3562                more: false,
3563                error: false,
3564                in_xact: false,
3565                count: true,
3566                attn: false,
3567                srverror: false,
3568            },
3569            cur_cmd: 0x00C6, // EXECUTE (198)
3570            row_count: 100,
3571        };
3572
3573        let mut buf = BytesMut::new();
3574        done.encode(&mut buf);
3575
3576        // Verify token type byte
3577        assert_eq!(buf[0], 0xFE);
3578
3579        // Skip token type byte and decode
3580        let mut cursor = &buf[1..];
3581        let decoded = DoneProc::decode(&mut cursor).unwrap();
3582
3583        assert!(decoded.status.count);
3584        assert!(!decoded.status.more);
3585        assert!(!decoded.status.error);
3586        assert_eq!(decoded.cur_cmd, 0x00C6);
3587        assert_eq!(decoded.row_count, 100);
3588    }
3589
3590    #[test]
3591    fn test_done_proc_via_parser() {
3592        let data = Bytes::from_static(&[
3593            0xFE, // DONEPROC token type
3594            0x00, 0x00, // status: no flags
3595            0xC6, 0x00, // cur_cmd: EXECUTE (198)
3596            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count: 0
3597        ]);
3598
3599        let mut parser = TokenParser::new(data);
3600        let token = parser.next_token().unwrap().unwrap();
3601
3602        match token {
3603            Token::DoneProc(done) => {
3604                assert!(!done.status.count);
3605                assert!(!done.status.more);
3606                assert_eq!(done.cur_cmd, 198);
3607                assert_eq!(done.row_count, 0);
3608            }
3609            _ => panic!("Expected DoneProc token"),
3610        }
3611    }
3612
3613    #[test]
3614    fn test_done_proc_with_error_flag() {
3615        let mut buf = BytesMut::new();
3616        buf.put_u8(0xFE); // DONEPROC
3617        buf.put_u16_le(0x0002); // status: DONE_ERROR
3618        buf.put_u16_le(0x00C6); // cur_cmd: EXECUTE
3619        buf.put_u64_le(0); // row_count
3620
3621        let mut parser = TokenParser::new(buf.freeze());
3622        let token = parser.next_token().unwrap().unwrap();
3623
3624        match token {
3625            Token::DoneProc(done) => {
3626                assert!(done.status.error);
3627                assert!(!done.status.count);
3628                assert!(!done.status.more);
3629            }
3630            _ => panic!("Expected DoneProc token"),
3631        }
3632    }
3633
3634    // ========================================================================
3635    // DoneInProc Token Tests
3636    // ========================================================================
3637
3638    #[test]
3639    fn test_done_in_proc_roundtrip() {
3640        let done = DoneInProc {
3641            status: DoneStatus {
3642                more: true,
3643                error: false,
3644                in_xact: false,
3645                count: true,
3646                attn: false,
3647                srverror: false,
3648            },
3649            cur_cmd: 193, // SELECT
3650            row_count: 7,
3651        };
3652
3653        let mut buf = BytesMut::new();
3654        done.encode(&mut buf);
3655
3656        assert_eq!(buf[0], 0xFF);
3657
3658        let mut cursor = &buf[1..];
3659        let decoded = DoneInProc::decode(&mut cursor).unwrap();
3660
3661        assert!(decoded.status.more);
3662        assert!(decoded.status.count);
3663        assert!(!decoded.status.error);
3664        assert_eq!(decoded.cur_cmd, 193);
3665        assert_eq!(decoded.row_count, 7);
3666    }
3667
3668    #[test]
3669    fn test_done_in_proc_via_parser() {
3670        let data = Bytes::from_static(&[
3671            0xFF, // DONEINPROC token type
3672            0x11, 0x00, // status: MORE | COUNT
3673            0xC1, 0x00, // cur_cmd: SELECT (193)
3674            0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count: 3
3675        ]);
3676
3677        let mut parser = TokenParser::new(data);
3678        let token = parser.next_token().unwrap().unwrap();
3679
3680        match token {
3681            Token::DoneInProc(done) => {
3682                assert!(done.status.more);
3683                assert!(done.status.count);
3684                assert_eq!(done.cur_cmd, 193);
3685                assert_eq!(done.row_count, 3);
3686            }
3687            _ => panic!("Expected DoneInProc token"),
3688        }
3689    }
3690
3691    // ========================================================================
3692    // ServerError Token Tests
3693    // ========================================================================
3694
3695    #[test]
3696    fn test_server_error_decode() {
3697        // Build a realistic ERROR token (without the 0xAA type byte,
3698        // since decode() is called after the parser strips it).
3699        let mut buf = BytesMut::new();
3700
3701        // Construct the message fields first to compute length
3702        let msg_utf16: Vec<u16> = "Invalid column name 'foo'.".encode_utf16().collect();
3703        let srv_utf16: Vec<u16> = "SQLDB01".encode_utf16().collect();
3704        let proc_utf16: Vec<u16> = "".encode_utf16().collect();
3705
3706        // Length = number(4) + state(1) + class(1)
3707        //        + us_varchar(message): 2 + msg_utf16.len()*2
3708        //        + b_varchar(server): 1 + srv_utf16.len()*2
3709        //        + b_varchar(procedure): 1 + proc_utf16.len()*2
3710        //        + line(4)
3711        let length: u16 = (4
3712            + 1
3713            + 1
3714            + 2
3715            + (msg_utf16.len() * 2)
3716            + 1
3717            + (srv_utf16.len() * 2)
3718            + 1
3719            + (proc_utf16.len() * 2)
3720            + 4) as u16;
3721
3722        buf.put_u16_le(length);
3723        buf.put_i32_le(207); // error number: Invalid column
3724        buf.put_u8(1); // state
3725        buf.put_u8(16); // class (severity 16)
3726
3727        // Message (US_VARCHAR: 2-byte char count + UTF-16LE)
3728        buf.put_u16_le(msg_utf16.len() as u16);
3729        for &c in &msg_utf16 {
3730            buf.put_u16_le(c);
3731        }
3732
3733        // Server (B_VARCHAR: 1-byte char count + UTF-16LE)
3734        buf.put_u8(srv_utf16.len() as u8);
3735        for &c in &srv_utf16 {
3736            buf.put_u16_le(c);
3737        }
3738
3739        // Procedure (B_VARCHAR: empty)
3740        buf.put_u8(proc_utf16.len() as u8);
3741
3742        // Line number
3743        buf.put_i32_le(42);
3744
3745        let mut cursor = buf.freeze();
3746        let error = ServerError::decode(&mut cursor).unwrap();
3747
3748        assert_eq!(error.number, 207);
3749        assert_eq!(error.state, 1);
3750        assert_eq!(error.class, 16);
3751        assert_eq!(error.message, "Invalid column name 'foo'.");
3752        assert_eq!(error.server, "SQLDB01");
3753        assert_eq!(error.procedure, "");
3754        assert_eq!(error.line, 42);
3755    }
3756
3757    #[test]
3758    fn test_server_error_severity_helpers() {
3759        let fatal = ServerError {
3760            number: 4014,
3761            state: 1,
3762            class: 20,
3763            message: "Fatal error".to_string(),
3764            server: String::new(),
3765            procedure: String::new(),
3766            line: 0,
3767        };
3768        assert!(fatal.is_fatal());
3769        assert!(fatal.is_batch_abort());
3770
3771        let batch_abort = ServerError {
3772            number: 547,
3773            state: 0,
3774            class: 16,
3775            message: "Constraint violation".to_string(),
3776            server: String::new(),
3777            procedure: String::new(),
3778            line: 1,
3779        };
3780        assert!(!batch_abort.is_fatal());
3781        assert!(batch_abort.is_batch_abort());
3782
3783        let informational = ServerError {
3784            number: 5701,
3785            state: 2,
3786            class: 10,
3787            message: "Changed db context".to_string(),
3788            server: String::new(),
3789            procedure: String::new(),
3790            line: 0,
3791        };
3792        assert!(!informational.is_fatal());
3793        assert!(!informational.is_batch_abort());
3794    }
3795
3796    #[test]
3797    fn test_server_error_via_parser() {
3798        // Build an ERROR token with the 0xAA type byte for the parser
3799        let mut buf = BytesMut::new();
3800        buf.put_u8(0xAA); // ERROR token type
3801
3802        let msg_utf16: Vec<u16> = "Syntax error".encode_utf16().collect();
3803        let srv_utf16: Vec<u16> = "SRV".encode_utf16().collect();
3804        let proc_utf16: Vec<u16> = "sp_test".encode_utf16().collect();
3805
3806        let length: u16 = (4
3807            + 1
3808            + 1
3809            + 2
3810            + (msg_utf16.len() * 2)
3811            + 1
3812            + (srv_utf16.len() * 2)
3813            + 1
3814            + (proc_utf16.len() * 2)
3815            + 4) as u16;
3816
3817        buf.put_u16_le(length);
3818        buf.put_i32_le(102); // Syntax error
3819        buf.put_u8(1);
3820        buf.put_u8(15);
3821
3822        buf.put_u16_le(msg_utf16.len() as u16);
3823        for &c in &msg_utf16 {
3824            buf.put_u16_le(c);
3825        }
3826        buf.put_u8(srv_utf16.len() as u8);
3827        for &c in &srv_utf16 {
3828            buf.put_u16_le(c);
3829        }
3830        buf.put_u8(proc_utf16.len() as u8);
3831        for &c in &proc_utf16 {
3832            buf.put_u16_le(c);
3833        }
3834        buf.put_i32_le(5);
3835
3836        let mut parser = TokenParser::new(buf.freeze());
3837        let token = parser.next_token().unwrap().unwrap();
3838
3839        match token {
3840            Token::Error(err) => {
3841                assert_eq!(err.number, 102);
3842                assert_eq!(err.class, 15);
3843                assert_eq!(err.message, "Syntax error");
3844                assert_eq!(err.server, "SRV");
3845                assert_eq!(err.procedure, "sp_test");
3846                assert_eq!(err.line, 5);
3847            }
3848            _ => panic!("Expected Error token"),
3849        }
3850    }
3851
3852    // ========================================================================
3853    // ReturnValue Token Tests
3854    // ========================================================================
3855
3856    /// Helper: build a ReturnValue token (without the 0xAC type byte)
3857    /// for an IntN output parameter.
3858    fn build_return_value_intn(
3859        ordinal: u16,
3860        name: &str,
3861        status: u8,
3862        value: Option<i32>,
3863    ) -> BytesMut {
3864        let mut inner = BytesMut::new();
3865
3866        // param_ordinal
3867        inner.put_u16_le(ordinal);
3868
3869        // param_name (B_VARCHAR)
3870        let name_utf16: Vec<u16> = name.encode_utf16().collect();
3871        inner.put_u8(name_utf16.len() as u8);
3872        for &c in &name_utf16 {
3873            inner.put_u16_le(c);
3874        }
3875
3876        // status
3877        inner.put_u8(status);
3878
3879        // user_type (4 bytes)
3880        inner.put_u32_le(0);
3881
3882        // flags (2 bytes)
3883        inner.put_u16_le(0x0001); // nullable
3884
3885        // type_id: IntN = 0x26
3886        inner.put_u8(0x26);
3887
3888        // type_info for IntN: 1-byte max_length
3889        inner.put_u8(4);
3890
3891        // value (TYPE_VARBYTE for IntN: 1-byte length + data)
3892        match value {
3893            Some(v) => {
3894                inner.put_u8(4); // length = 4
3895                inner.put_i32_le(v);
3896            }
3897            None => {
3898                inner.put_u8(0); // length = 0 means NULL
3899            }
3900        }
3901
3902        // RETURNVALUE has no outer length prefix (MS-TDS §2.2.7.18) — the
3903        // decoder walks the inner fields directly after the 0xAC token byte.
3904        inner
3905    }
3906
3907    #[test]
3908    fn test_return_value_int_output() {
3909        let buf = build_return_value_intn(1, "@result", 0x01, Some(42));
3910        let mut cursor = buf.freeze();
3911        let rv = ReturnValue::decode(&mut cursor).unwrap();
3912
3913        assert_eq!(rv.param_ordinal, 1);
3914        assert_eq!(rv.param_name, "@result");
3915        assert_eq!(rv.status, 0x01); // OUTPUT
3916        assert_eq!(rv.col_type, 0x26); // IntN
3917        assert_eq!(rv.type_info.max_length, Some(4));
3918        // Value should contain: length byte (4) + i32 LE (42)
3919        assert_eq!(rv.value.len(), 5);
3920        assert_eq!(rv.value[0], 4);
3921        assert_eq!(
3922            i32::from_le_bytes([rv.value[1], rv.value[2], rv.value[3], rv.value[4]]),
3923            42
3924        );
3925    }
3926
3927    #[test]
3928    fn test_return_value_null_output() {
3929        let buf = build_return_value_intn(2, "@count", 0x01, None);
3930        let mut cursor = buf.freeze();
3931        let rv = ReturnValue::decode(&mut cursor).unwrap();
3932
3933        assert_eq!(rv.param_ordinal, 2);
3934        assert_eq!(rv.param_name, "@count");
3935        assert_eq!(rv.status, 0x01);
3936        assert_eq!(rv.col_type, 0x26);
3937        // NULL value: length byte = 0
3938        assert_eq!(rv.value.len(), 1);
3939        assert_eq!(rv.value[0], 0);
3940    }
3941
3942    #[test]
3943    fn test_return_value_udf_status() {
3944        // UDF return value has status = 0x02
3945        let buf = build_return_value_intn(0, "@RETURN_VALUE", 0x02, Some(-1));
3946        let mut cursor = buf.freeze();
3947        let rv = ReturnValue::decode(&mut cursor).unwrap();
3948
3949        assert_eq!(rv.param_ordinal, 0);
3950        assert_eq!(rv.param_name, "@RETURN_VALUE");
3951        assert_eq!(rv.status, 0x02); // UDF return value
3952        assert_eq!(rv.value[0], 4);
3953        assert_eq!(
3954            i32::from_le_bytes([rv.value[1], rv.value[2], rv.value[3], rv.value[4]]),
3955            -1
3956        );
3957    }
3958
3959    #[test]
3960    fn test_return_value_nvarchar_output() {
3961        // Build a ReturnValue for NVARCHAR(100) output parameter
3962        let mut inner = BytesMut::new();
3963
3964        // param_ordinal
3965        inner.put_u16_le(1);
3966
3967        // param_name "@name"
3968        let name_utf16: Vec<u16> = "@name".encode_utf16().collect();
3969        inner.put_u8(name_utf16.len() as u8);
3970        for &c in &name_utf16 {
3971            inner.put_u16_le(c);
3972        }
3973
3974        // status = OUTPUT
3975        inner.put_u8(0x01);
3976        // user_type
3977        inner.put_u32_le(0);
3978        // flags (nullable)
3979        inner.put_u16_le(0x0001);
3980        // type_id: NVarChar = 0xE7
3981        inner.put_u8(0xE7);
3982        // type_info for NVarChar: 2-byte max_length + 5-byte collation
3983        inner.put_u16_le(200); // max 100 chars * 2 bytes
3984        inner.put_u32_le(0x0904D000); // collation LCID
3985        inner.put_u8(0x34); // collation sort_id
3986
3987        // value: "Hello" in UTF-16LE with 2-byte length prefix
3988        let val_utf16: Vec<u16> = "Hello".encode_utf16().collect();
3989        let byte_len = (val_utf16.len() * 2) as u16;
3990        inner.put_u16_le(byte_len);
3991        for &c in &val_utf16 {
3992            inner.put_u16_le(c);
3993        }
3994
3995        let mut cursor = inner.freeze();
3996        let rv = ReturnValue::decode(&mut cursor).unwrap();
3997
3998        assert_eq!(rv.param_ordinal, 1);
3999        assert_eq!(rv.param_name, "@name");
4000        assert_eq!(rv.status, 0x01);
4001        assert_eq!(rv.col_type, 0xE7); // NVarChar
4002        assert_eq!(rv.type_info.max_length, Some(200));
4003        assert!(rv.type_info.collation.is_some());
4004
4005        // Value: 2-byte length (10) + "Hello" in UTF-16LE
4006        assert_eq!(rv.value.len(), 12); // 2 + 10
4007        let val_len = u16::from_le_bytes([rv.value[0], rv.value[1]]);
4008        assert_eq!(val_len, 10);
4009    }
4010
4011    #[test]
4012    fn test_return_value_via_parser() {
4013        // Build a full ReturnValue token with the 0xAC type byte
4014        let mut data = BytesMut::new();
4015        data.put_u8(0xAC); // RETURNVALUE token type
4016        data.extend_from_slice(&build_return_value_intn(0, "@out", 0x01, Some(99)));
4017
4018        let mut parser = TokenParser::new(data.freeze());
4019        let token = parser.next_token().unwrap().unwrap();
4020
4021        match token {
4022            Token::ReturnValue(rv) => {
4023                assert_eq!(rv.param_name, "@out");
4024                assert_eq!(rv.param_ordinal, 0);
4025                assert_eq!(rv.status, 0x01);
4026                assert_eq!(rv.col_type, 0x26);
4027            }
4028            _ => panic!("Expected ReturnValue token"),
4029        }
4030    }
4031
4032    // ========================================================================
4033    // Multi-Token Stream Tests
4034    // ========================================================================
4035
4036    #[test]
4037    fn test_multi_token_stored_proc_response() {
4038        // Simulate a stored procedure response:
4039        // DoneInProc (result set done) → ReturnStatus → DoneProc
4040        let mut data = BytesMut::new();
4041
4042        // Token 1: DONEINPROC — result set with 3 rows
4043        data.put_u8(0xFF); // DONEINPROC
4044        data.put_u16_le(0x0010); // status: COUNT
4045        data.put_u16_le(0x00C1); // cur_cmd: SELECT
4046        data.put_u64_le(3); // row_count
4047
4048        // Token 2: RETURNSTATUS — procedure returned 0
4049        data.put_u8(0x79); // RETURNSTATUS
4050        data.put_i32_le(0);
4051
4052        // Token 3: DONEPROC — final
4053        data.put_u8(0xFE); // DONEPROC
4054        data.put_u16_le(0x0000); // status: no flags
4055        data.put_u16_le(0x00C6); // cur_cmd: EXECUTE
4056        data.put_u64_le(0);
4057
4058        let mut parser = TokenParser::new(data.freeze());
4059
4060        // Token 1: DoneInProc
4061        let t1 = parser.next_token().unwrap().unwrap();
4062        match t1 {
4063            Token::DoneInProc(done) => {
4064                assert!(done.status.count);
4065                assert_eq!(done.row_count, 3);
4066                assert_eq!(done.cur_cmd, 193);
4067            }
4068            _ => panic!("Expected DoneInProc, got {t1:?}"),
4069        }
4070
4071        // Token 2: ReturnStatus
4072        let t2 = parser.next_token().unwrap().unwrap();
4073        match t2 {
4074            Token::ReturnStatus(status) => {
4075                assert_eq!(status, 0);
4076            }
4077            _ => panic!("Expected ReturnStatus, got {t2:?}"),
4078        }
4079
4080        // Token 3: DoneProc
4081        let t3 = parser.next_token().unwrap().unwrap();
4082        match t3 {
4083            Token::DoneProc(done) => {
4084                assert!(!done.status.count);
4085                assert!(!done.status.more);
4086                assert_eq!(done.cur_cmd, 198);
4087            }
4088            _ => panic!("Expected DoneProc, got {t3:?}"),
4089        }
4090
4091        // No more tokens
4092        assert!(parser.next_token().unwrap().is_none());
4093    }
4094
4095    #[test]
4096    fn test_multi_token_error_in_stream() {
4097        // Simulate: ERROR → DONE (error during query)
4098        let mut data = BytesMut::new();
4099
4100        // Token 1: ERROR
4101        data.put_u8(0xAA);
4102
4103        let msg_utf16: Vec<u16> = "Deadlock".encode_utf16().collect();
4104        let srv_utf16: Vec<u16> = "DB1".encode_utf16().collect();
4105
4106        let length: u16 = (4 + 1 + 1
4107            + 2 + (msg_utf16.len() * 2)
4108            + 1 + (srv_utf16.len() * 2)
4109            + 1  // empty procedure
4110            + 4) as u16;
4111
4112        data.put_u16_le(length);
4113        data.put_i32_le(1205); // deadlock
4114        data.put_u8(51); // state
4115        data.put_u8(13); // class
4116
4117        data.put_u16_le(msg_utf16.len() as u16);
4118        for &c in &msg_utf16 {
4119            data.put_u16_le(c);
4120        }
4121        data.put_u8(srv_utf16.len() as u8);
4122        for &c in &srv_utf16 {
4123            data.put_u16_le(c);
4124        }
4125        data.put_u8(0); // empty procedure
4126        data.put_i32_le(0);
4127
4128        // Token 2: DONE with error flag
4129        data.put_u8(0xFD);
4130        data.put_u16_le(0x0002); // DONE_ERROR
4131        data.put_u16_le(0x00C1); // SELECT
4132        data.put_u64_le(0);
4133
4134        let mut parser = TokenParser::new(data.freeze());
4135
4136        // Token 1: Error
4137        let t1 = parser.next_token().unwrap().unwrap();
4138        match t1 {
4139            Token::Error(err) => {
4140                assert_eq!(err.number, 1205);
4141                assert_eq!(err.class, 13);
4142                assert_eq!(err.message, "Deadlock");
4143                assert_eq!(err.server, "DB1");
4144            }
4145            _ => panic!("Expected Error token, got {t1:?}"),
4146        }
4147
4148        // Token 2: Done with error
4149        let t2 = parser.next_token().unwrap().unwrap();
4150        match t2 {
4151            Token::Done(done) => {
4152                assert!(done.status.error);
4153                assert!(!done.status.count);
4154            }
4155            _ => panic!("Expected Done token, got {t2:?}"),
4156        }
4157
4158        assert!(parser.next_token().unwrap().is_none());
4159    }
4160
4161    #[test]
4162    fn test_multi_token_proc_with_return_value() {
4163        // Simulate stored proc: ReturnValue → ReturnStatus → DoneProc
4164        let mut data = BytesMut::new();
4165
4166        // Token 1: ReturnValue (@result = 42)
4167        data.put_u8(0xAC);
4168        data.extend_from_slice(&build_return_value_intn(1, "@result", 0x01, Some(42)));
4169
4170        // Token 2: ReturnStatus = 0
4171        data.put_u8(0x79);
4172        data.put_i32_le(0);
4173
4174        // Token 3: DoneProc
4175        data.put_u8(0xFE);
4176        data.put_u16_le(0x0000);
4177        data.put_u16_le(0x00C6);
4178        data.put_u64_le(0);
4179
4180        let mut parser = TokenParser::new(data.freeze());
4181
4182        let t1 = parser.next_token().unwrap().unwrap();
4183        match t1 {
4184            Token::ReturnValue(rv) => {
4185                assert_eq!(rv.param_name, "@result");
4186                assert_eq!(rv.param_ordinal, 1);
4187            }
4188            _ => panic!("Expected ReturnValue, got {t1:?}"),
4189        }
4190
4191        let t2 = parser.next_token().unwrap().unwrap();
4192        assert!(matches!(t2, Token::ReturnStatus(0)));
4193
4194        let t3 = parser.next_token().unwrap().unwrap();
4195        assert!(matches!(t3, Token::DoneProc(_)));
4196
4197        assert!(parser.next_token().unwrap().is_none());
4198    }
4199
4200    // ========================================================================
4201    // EOF / Truncation Edge Cases
4202    // ========================================================================
4203
4204    #[test]
4205    fn test_return_status_truncated() {
4206        // Only 3 bytes instead of 4 for i32
4207        let data = Bytes::from_static(&[0x79, 0x01, 0x02, 0x03]);
4208        let mut parser = TokenParser::new(data);
4209        assert!(parser.next_token().is_err());
4210    }
4211
4212    #[test]
4213    fn test_done_proc_truncated() {
4214        // Only 8 bytes instead of 12
4215        let data = Bytes::from_static(&[0xFE, 0x00, 0x00, 0xC1, 0x00, 0x01, 0x00, 0x00, 0x00]);
4216        let mut parser = TokenParser::new(data);
4217        assert!(parser.next_token().is_err());
4218    }
4219
4220    #[test]
4221    fn test_server_error_truncated() {
4222        // ERROR token with only the length field (body truncated)
4223        let data = Bytes::from_static(&[0xAA, 0x20, 0x00]);
4224        let mut parser = TokenParser::new(data);
4225        assert!(parser.next_token().is_err());
4226    }
4227
4228    // ========================================================================
4229    // FEDAUTHINFO (issue #189: parser must follow MS-TDS §2.2.7.12)
4230    // ========================================================================
4231
4232    /// Build a spec-exact FEDAUTHINFO token (including the 0xEE type byte):
4233    /// DWORD TokenLength, DWORD CountOfInfoIDs, option headers of
4234    /// ID/DataLen/DataOffset, then UTF-16LE data addressed by the offsets
4235    /// (relative to the start of the count field).
4236    fn build_fed_auth_info_token(options: &[(u8, &str)]) -> Vec<u8> {
4237        let headers_end = 4 + options.len() * 9;
4238        let mut data_block = Vec::new();
4239        let mut headers = Vec::new();
4240        for (id, value) in options {
4241            let encoded: Vec<u8> = value.encode_utf16().flat_map(u16::to_le_bytes).collect();
4242            let offset = headers_end + data_block.len();
4243            headers.push(*id);
4244            headers.extend_from_slice(&u32::try_from(encoded.len()).unwrap().to_le_bytes());
4245            headers.extend_from_slice(&u32::try_from(offset).unwrap().to_le_bytes());
4246            data_block.extend_from_slice(&encoded);
4247        }
4248
4249        let token_len = 4 + headers.len() + data_block.len();
4250        let mut out = vec![0xEE];
4251        out.extend_from_slice(&u32::try_from(token_len).unwrap().to_le_bytes());
4252        out.extend_from_slice(&u32::try_from(options.len()).unwrap().to_le_bytes());
4253        out.extend_from_slice(&headers);
4254        out.extend_from_slice(&data_block);
4255        out
4256    }
4257
4258    #[test]
4259    fn test_fed_auth_info_decodes_spec_layout() {
4260        const STS: &str = "https://login.microsoftonline.com/common";
4261        const SPN: &str = "https://database.windows.net/";
4262        // STSURL (0x01) listed first, SPN (0x02) second. Real Azure servers
4263        // list SPN first (see the captured-token test below); decoding must
4264        // not depend on option order.
4265        let token = build_fed_auth_info_token(&[(0x01, STS), (0x02, SPN)]);
4266
4267        let mut parser = TokenParser::new(Bytes::from(token));
4268        let parsed = parser.next_token().unwrap().unwrap();
4269        let Token::FedAuthInfo(info) = parsed else {
4270            panic!("expected FedAuthInfo, got {parsed:?}");
4271        };
4272        assert_eq!(info.sts_url, STS);
4273        assert_eq!(info.spn, SPN);
4274        assert!(parser.next_token().unwrap().is_none(), "exact consumption");
4275    }
4276
4277    #[test]
4278    fn test_fed_auth_info_preserves_following_tokens() {
4279        // The old parser looped over the whole remaining stream, swallowing
4280        // the tokens that follow FEDAUTHINFO during login. A DONE token
4281        // appended after it must survive.
4282        let mut stream = build_fed_auth_info_token(&[
4283            (0x01, "https://sts.example/"),
4284            (0x02, "https://db.example/"),
4285        ]);
4286        stream.push(0xFD); // DONE
4287        stream.extend_from_slice(&0u16.to_le_bytes()); // status
4288        stream.extend_from_slice(&0u16.to_le_bytes()); // curcmd
4289        stream.extend_from_slice(&0u64.to_le_bytes()); // rowcount
4290
4291        let mut parser = TokenParser::new(Bytes::from(stream));
4292        assert!(matches!(
4293            parser.next_token().unwrap(),
4294            Some(Token::FedAuthInfo(_))
4295        ));
4296        assert!(
4297            matches!(parser.next_token().unwrap(), Some(Token::Done(_))),
4298            "DONE after FEDAUTHINFO must not be swallowed"
4299        );
4300        assert!(parser.next_token().unwrap().is_none());
4301    }
4302
4303    #[test]
4304    fn test_fed_auth_info_unknown_ids_ignored() {
4305        // Spec: unrecognized FedAuthInfoIDs must be ignored.
4306        let token =
4307            build_fed_auth_info_token(&[(0x7F, "ignore-me"), (0x01, "https://sts.example/")]);
4308        let mut parser = TokenParser::new(Bytes::from(token));
4309        let Some(Token::FedAuthInfo(info)) = parser.next_token().unwrap() else {
4310            panic!("expected FedAuthInfo");
4311        };
4312        assert_eq!(info.sts_url, "https://sts.example/");
4313        assert_eq!(info.spn, "");
4314    }
4315
4316    #[test]
4317    fn test_fed_auth_info_hostile_inputs_error() {
4318        // TokenLength longer than the buffer.
4319        let mut truncated = build_fed_auth_info_token(&[(0x02, "https://sts.example/")]);
4320        truncated.truncate(truncated.len() - 4);
4321        assert!(
4322            TokenParser::new(Bytes::from(truncated))
4323                .next_token()
4324                .is_err()
4325        );
4326
4327        // CountOfInfoIDs claims more headers than the token holds
4328        // (also covers hostile counts whose header math would overflow).
4329        let mut bad_count = build_fed_auth_info_token(&[(0x02, "https://sts.example/")]);
4330        bad_count[5..9].copy_from_slice(&u32::MAX.to_le_bytes());
4331        assert!(
4332            TokenParser::new(Bytes::from(bad_count))
4333                .next_token()
4334                .is_err()
4335        );
4336
4337        // Data offset pointing past the end of the token.
4338        let mut bad_offset = build_fed_auth_info_token(&[(0x02, "https://sts.example/")]);
4339        bad_offset[14..18].copy_from_slice(&u32::MAX.to_le_bytes());
4340        assert!(
4341            TokenParser::new(Bytes::from(bad_offset))
4342                .next_token()
4343                .is_err()
4344        );
4345
4346        // Odd data length cannot be UTF-16.
4347        let mut odd_len = build_fed_auth_info_token(&[(0x02, "https://sts.example/")]);
4348        odd_len[10..14].copy_from_slice(&3u32.to_le_bytes());
4349        assert!(TokenParser::new(Bytes::from(odd_len)).next_token().is_err());
4350    }
4351
4352    #[test]
4353    fn test_fed_auth_info_parse_and_skip_agree() {
4354        // Issue #189: decode() and skip_token() must consume the same bytes
4355        // (the old decode ran past the token while skip honored the length).
4356        let token = build_fed_auth_info_token(&[(0x02, "https://sts.example/")]);
4357        let total = token.len();
4358
4359        let mut parser = TokenParser::new(Bytes::from(token.clone()));
4360        parser.next_token().unwrap();
4361        assert_eq!(parser.position(), total, "decode consumption");
4362
4363        let mut skipper = TokenParser::new(Bytes::from(token));
4364        skipper.skip_token().unwrap();
4365        assert_eq!(skipper.position(), total, "skip consumption");
4366    }
4367
4368    /// A FEDAUTHINFO token captured from a live Azure SQL Database login on
4369    /// 2026-06-12 (the client declared the ADAL library in LOGIN7; the server
4370    /// responded with this token). The tenant GUID inside the STS URL is
4371    /// replaced with an all-zero GUID of identical length, so every offset
4372    /// and length is byte-identical to the wire capture.
4373    ///
4374    /// This is the regression test deferred from PR #193, and it earns its
4375    /// keep: the real token proves FedAuthInfoID 0x01 = STSURL and
4376    /// 0x02 = SPN (Azure lists SPN first), which the synthetic tests
4377    /// originally had swapped.
4378    #[test]
4379    fn test_fed_auth_info_captured_from_azure() {
4380        const CAPTURED: &[u8] = &[
4381            0xEE, 0xCC, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x02, 0x3A, 0x00, 0x00, 0x00,
4382            0x16, 0x00, 0x00, 0x00, 0x01, 0x7C, 0x00, 0x00, 0x00, 0x50, 0x00, 0x00, 0x00, 0x68,
4383            0x00, 0x74, 0x00, 0x74, 0x00, 0x70, 0x00, 0x73, 0x00, 0x3A, 0x00, 0x2F, 0x00, 0x2F,
4384            0x00, 0x64, 0x00, 0x61, 0x00, 0x74, 0x00, 0x61, 0x00, 0x62, 0x00, 0x61, 0x00, 0x73,
4385            0x00, 0x65, 0x00, 0x2E, 0x00, 0x77, 0x00, 0x69, 0x00, 0x6E, 0x00, 0x64, 0x00, 0x6F,
4386            0x00, 0x77, 0x00, 0x73, 0x00, 0x2E, 0x00, 0x6E, 0x00, 0x65, 0x00, 0x74, 0x00, 0x2F,
4387            0x00, 0x68, 0x00, 0x74, 0x00, 0x74, 0x00, 0x70, 0x00, 0x73, 0x00, 0x3A, 0x00, 0x2F,
4388            0x00, 0x2F, 0x00, 0x6C, 0x00, 0x6F, 0x00, 0x67, 0x00, 0x69, 0x00, 0x6E, 0x00, 0x2E,
4389            0x00, 0x77, 0x00, 0x69, 0x00, 0x6E, 0x00, 0x64, 0x00, 0x6F, 0x00, 0x77, 0x00, 0x73,
4390            0x00, 0x2E, 0x00, 0x6E, 0x00, 0x65, 0x00, 0x74, 0x00, 0x2F, 0x00, 0x30, 0x00, 0x30,
4391            0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00, 0x2D,
4392            0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00, 0x2D, 0x00, 0x30, 0x00, 0x30,
4393            0x00, 0x30, 0x00, 0x30, 0x00, 0x2D, 0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00, 0x30,
4394            0x00, 0x2D, 0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00, 0x30,
4395            0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00,
4396        ];
4397
4398        let mut parser = TokenParser::new(Bytes::from_static(CAPTURED));
4399        let Some(Token::FedAuthInfo(info)) = parser.next_token().unwrap() else {
4400            panic!("expected FedAuthInfo");
4401        };
4402        assert_eq!(
4403            info.sts_url,
4404            "https://login.windows.net/00000000-0000-0000-0000-000000000000"
4405        );
4406        assert_eq!(info.spn, "https://database.windows.net/");
4407        assert!(
4408            parser.next_token().unwrap().is_none(),
4409            "the captured token must be consumed exactly"
4410        );
4411    }
4412
4413    /// Regression test for #273 (remote DoS via unbounded token-skip recursion).
4414    ///
4415    /// COLINFO/TABNAME/OFFSET tokens are skipped, and the skip path used to
4416    /// self-recurse to fetch the next token — one real stack frame per skipped
4417    /// token (Rust guarantees no tail-call elimination). Because
4418    /// `Connection::read_message` reassembles every packet into one `Bytes`
4419    /// before tokenizing, a server-controlled multi-MB message of these 3-byte
4420    /// tokens drove ~10^5–10^6 frames and aborted the process.
4421    ///
4422    /// This feeds a flat run of 200_000 skip-tokens — far deeper than the
4423    /// (~2 MB) test-thread stack could survive by recursion — followed by a
4424    /// real DONE. Pre-fix, the recursive parser aborted (SIGABRT) on this
4425    /// input. Post-fix it must return that DONE with the whole buffer consumed,
4426    /// proving the skip path is bounded-stack and that the input was
4427    /// non-degenerate (all 200_000 tokens were actually traversed).
4428    #[test]
4429    fn skip_tokens_iterate_not_recurse_273() {
4430        const SKIP_COUNT: usize = 200_000;
4431        let mut buf = BytesMut::with_capacity(SKIP_COUNT * 3 + 13);
4432        for _ in 0..SKIP_COUNT {
4433            buf.put_u8(TokenType::ColInfo as u8);
4434            buf.put_u16_le(0); // zero-length body: 3 bytes total per skip-token
4435        }
4436        let done = Done {
4437            status: DoneStatus {
4438                more: false,
4439                error: false,
4440                in_xact: false,
4441                count: true,
4442                attn: false,
4443                srverror: false,
4444            },
4445            cur_cmd: 0xABCD,
4446            row_count: 99,
4447        };
4448        done.encode(&mut buf);
4449        let total_len = buf.len();
4450
4451        let mut parser = TokenParser::new(buf.freeze());
4452
4453        // A specific outcome — the DONE that follows the skip run — not a
4454        // generic is_err an unrelated parse failure could also satisfy.
4455        let Some(Token::Done(decoded)) = parser.next_token().unwrap() else {
4456            panic!("expected the DONE token after the skip run");
4457        };
4458        assert_eq!(decoded.cur_cmd, 0xABCD);
4459        assert_eq!(decoded.row_count, 99);
4460
4461        // Every skipped token was traversed: proof the input was non-trivial.
4462        assert_eq!(parser.position(), total_len);
4463        assert!(parser.next_token().unwrap().is_none());
4464    }
4465
4466    /// Build an ALTMETADATA token describing one `SUM` aggregate over an `int`
4467    /// column (empty name), followed by `rows` ALTROW tokens each carrying that
4468    /// int value. Mirrors the wire shape of a `COMPUTE BY` result set.
4469    fn compute_by_tokens(id: u16, rows: &[i32]) -> BytesMut {
4470        let mut buf = BytesMut::new();
4471        // ALTMETADATA
4472        buf.put_u8(TokenType::AltMetaData as u8);
4473        buf.put_u16_le(1); // Count = 1 ComputeData
4474        buf.put_u16_le(id); // Id
4475        buf.put_u8(1); // ByCols = 1
4476        buf.put_u16_le(1); // ColNum[0]
4477        buf.put_u8(0x4D); // Op = AOPSUM
4478        buf.put_u16_le(1); // Operand (column 1)
4479        buf.put_u32_le(0); // UserType (ULONG)
4480        buf.put_u16_le(0); // Flags
4481        buf.put_u8(TypeId::Int4 as u8); // TYPE_INFO (fixed-length int4)
4482        buf.put_u8(0); // ColName: B_VARCHAR length 0
4483        // ALTROW(s)
4484        for &v in rows {
4485            buf.put_u8(TokenType::AltRow as u8);
4486            buf.put_u16_le(id); // Id matching the ALTMETADATA
4487            buf.put_i32_le(v); // int4 value
4488        }
4489        buf
4490    }
4491
4492    fn trailing_done(buf: &mut BytesMut) {
4493        let done = Done {
4494            status: DoneStatus {
4495                more: false,
4496                error: false,
4497                in_xact: false,
4498                count: true,
4499                attn: false,
4500                srverror: false,
4501            },
4502            cur_cmd: 0xBEEF,
4503            row_count: 3,
4504        };
4505        done.encode(buf);
4506    }
4507
4508    /// #275: a COMPUTE BY result set (ALTMETADATA 0x88 + ALTROW 0xD3) must be
4509    /// silently consumed, not hard-error. The DONE following the compute tokens
4510    /// must still be delivered, and every byte traversed.
4511    #[test]
4512    fn compute_by_alt_tokens_skipped_275() {
4513        let mut buf = compute_by_tokens(7, &[42, -1]);
4514        trailing_done(&mut buf);
4515        let total_len = buf.len();
4516
4517        let mut parser = TokenParser::new(buf.freeze());
4518
4519        // The first token surfaced is the DONE — both ALTROWs and the
4520        // ALTMETADATA were dropped, not returned.
4521        let Some(Token::Done(done)) = parser.next_token().unwrap() else {
4522            panic!("expected DONE after the COMPUTE BY tokens were skipped");
4523        };
4524        assert_eq!(done.cur_cmd, 0xBEEF);
4525
4526        // All alt-token bytes were consumed to reach the DONE.
4527        assert_eq!(parser.position(), total_len);
4528        assert!(parser.next_token().unwrap().is_none());
4529    }
4530
4531    /// #275: ALTMETADATA must be remembered across calls so a following
4532    /// ColMetaData (new result set) clears stale compute Ids.
4533    #[test]
4534    fn compute_by_alt_metadata_cleared_on_new_colmetadata_275() {
4535        let mut buf = compute_by_tokens(7, &[42]);
4536        // New result set: a COLMETADATA token clears alt_metadata. Encode an
4537        // empty (NO_METADATA) ColMetaData, then reuse Id 7 for a fresh compute.
4538        buf.put_u8(TokenType::ColMetaData as u8);
4539        buf.put_u16_le(ColMetaData::NO_METADATA);
4540        buf.extend_from_slice(&compute_by_tokens(7, &[99]));
4541        trailing_done(&mut buf);
4542        let total_len = buf.len();
4543
4544        let mut parser = TokenParser::new(buf.freeze());
4545
4546        // First surfaced token: the empty ColMetaData (compute tokens dropped).
4547        let Some(Token::ColMetaData(_)) = parser.next_token().unwrap() else {
4548            panic!("expected ColMetaData between the two compute groups");
4549        };
4550        // Then the DONE, after the second compute group is also dropped.
4551        let Some(Token::Done(_)) = parser.next_token().unwrap() else {
4552            panic!("expected DONE after the second COMPUTE BY group");
4553        };
4554        assert_eq!(parser.position(), total_len);
4555    }
4556
4557    /// #275: an ALTROW with no matching ALTMETADATA cannot be length-decoded;
4558    /// it must error cleanly rather than panic or desync.
4559    #[test]
4560    fn altrow_without_altmetadata_errors_275() {
4561        let mut buf = BytesMut::new();
4562        buf.put_u8(TokenType::AltRow as u8);
4563        buf.put_u16_le(7); // Id with no preceding ALTMETADATA
4564        buf.put_i32_le(42);
4565
4566        let mut parser = TokenParser::new(buf.freeze());
4567        assert!(parser.next_token().is_err());
4568    }
4569}