Skip to main content

tds_protocol/
token.rs

1//! TDS token stream definitions.
2//!
3//! Tokens are the fundamental units of TDS response data. The server sends
4//! a stream of tokens that describe metadata, rows, errors, and other information.
5//!
6//! ## Token Structure
7//!
8//! Each token begins with a 1-byte token type identifier, followed by
9//! token-specific data. Some tokens have fixed lengths, while others
10//! have length prefixes.
11//!
12//! ## Usage
13//!
14//! ```rust,no_run
15//! use tds_protocol::token::{Token, TokenParser};
16//! use bytes::Bytes;
17//!
18//! fn parse(data: Bytes) -> Result<(), tds_protocol::ProtocolError> {
19//!     let mut parser = TokenParser::new(data);
20//!
21//!     while let Some(token) = parser.next_token()? {
22//!         match token {
23//!             Token::Done(done) => println!("Rows affected: {}", done.row_count),
24//!             Token::Error(err) => eprintln!("Error {}: {}", err.number, err.message),
25//!             _ => {}
26//!         }
27//!     }
28//!     Ok(())
29//! }
30//! ```
31
32use bytes::{Buf, BufMut, Bytes};
33
34use crate::codec::{read_b_varchar, read_us_varchar};
35use crate::error::ProtocolError;
36use crate::prelude::*;
37use crate::types::TypeId;
38
39/// Token type identifier.
40#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
41#[repr(u8)]
42#[non_exhaustive]
43pub enum TokenType {
44    /// Column metadata (COLMETADATA).
45    ColMetaData = 0x81,
46    /// Error message (ERROR).
47    Error = 0xAA,
48    /// Informational message (INFO).
49    Info = 0xAB,
50    /// Login acknowledgment (LOGINACK).
51    LoginAck = 0xAD,
52    /// Row data (ROW).
53    Row = 0xD1,
54    /// Null bitmap compressed row (NBCROW).
55    NbcRow = 0xD2,
56    /// Environment change (ENVCHANGE).
57    EnvChange = 0xE3,
58    /// SSPI authentication (SSPI).
59    Sspi = 0xED,
60    /// Done (DONE).
61    Done = 0xFD,
62    /// Done in procedure (DONEINPROC).
63    DoneInProc = 0xFF,
64    /// Done procedure (DONEPROC).
65    DoneProc = 0xFE,
66    /// Return status (RETURNSTATUS).
67    ReturnStatus = 0x79,
68    /// Return value (RETURNVALUE).
69    ReturnValue = 0xAC,
70    /// Order (ORDER).
71    Order = 0xA9,
72    /// Feature extension acknowledgment (FEATUREEXTACK).
73    FeatureExtAck = 0xAE,
74    /// Session state (SESSIONSTATE).
75    SessionState = 0xE4,
76    /// Federated authentication info (FEDAUTHINFO).
77    FedAuthInfo = 0xEE,
78    /// Column info (COLINFO).
79    ColInfo = 0xA5,
80    /// Table name (TABNAME).
81    TabName = 0xA4,
82    /// Offset (OFFSET).
83    Offset = 0x78,
84}
85
86impl TokenType {
87    /// Create a token type from a raw byte.
88    pub fn from_u8(value: u8) -> Option<Self> {
89        match value {
90            0x81 => Some(Self::ColMetaData),
91            0xAA => Some(Self::Error),
92            0xAB => Some(Self::Info),
93            0xAD => Some(Self::LoginAck),
94            0xD1 => Some(Self::Row),
95            0xD2 => Some(Self::NbcRow),
96            0xE3 => Some(Self::EnvChange),
97            0xED => Some(Self::Sspi),
98            0xFD => Some(Self::Done),
99            0xFF => Some(Self::DoneInProc),
100            0xFE => Some(Self::DoneProc),
101            0x79 => Some(Self::ReturnStatus),
102            0xAC => Some(Self::ReturnValue),
103            0xA9 => Some(Self::Order),
104            0xAE => Some(Self::FeatureExtAck),
105            0xE4 => Some(Self::SessionState),
106            0xEE => Some(Self::FedAuthInfo),
107            0xA5 => Some(Self::ColInfo),
108            0xA4 => Some(Self::TabName),
109            0x78 => Some(Self::Offset),
110            _ => None,
111        }
112    }
113}
114
115/// Parsed TDS token.
116///
117/// This enum represents all possible tokens that can be received from SQL Server.
118/// Each variant contains the parsed token data.
119#[derive(Debug, Clone)]
120#[non_exhaustive]
121pub enum Token {
122    /// Column metadata describing result set structure.
123    ColMetaData(ColMetaData),
124    /// Row data.
125    Row(RawRow),
126    /// Null bitmap compressed row.
127    NbcRow(NbcRow),
128    /// Completion of a SQL statement.
129    Done(Done),
130    /// Completion of a stored procedure.
131    DoneProc(DoneProc),
132    /// Completion within a stored procedure.
133    DoneInProc(DoneInProc),
134    /// Return status from stored procedure.
135    ReturnStatus(i32),
136    /// Return value from stored procedure.
137    ReturnValue(ReturnValue),
138    /// Error message from server.
139    Error(ServerError),
140    /// Informational message from server.
141    Info(ServerInfo),
142    /// Login acknowledgment.
143    LoginAck(LoginAck),
144    /// Environment change notification.
145    EnvChange(EnvChange),
146    /// Column ordering information.
147    Order(Order),
148    /// Feature extension acknowledgment.
149    FeatureExtAck(FeatureExtAck),
150    /// SSPI authentication data.
151    Sspi(SspiToken),
152    /// Session state information.
153    SessionState(SessionState),
154    /// Federated authentication info.
155    FedAuthInfo(FedAuthInfo),
156}
157
158/// Column metadata token.
159#[derive(Debug, Clone, Default)]
160pub struct ColMetaData {
161    /// Column definitions.
162    pub columns: Vec<ColumnData>,
163    /// CEK table for Always Encrypted result sets.
164    /// Present only when the server sends encrypted column metadata.
165    pub cek_table: Option<crate::crypto::CekTable>,
166}
167
168/// Column definition within metadata.
169#[derive(Debug, Clone)]
170pub struct ColumnData {
171    /// Column name.
172    pub name: String,
173    /// Column data type ID.
174    pub type_id: TypeId,
175    /// Column data type raw byte (for unknown types).
176    pub col_type: u8,
177    /// Column flags.
178    pub flags: u16,
179    /// User type ID.
180    pub user_type: u32,
181    /// Type-specific metadata.
182    pub type_info: TypeInfo,
183    /// Per-column encryption metadata (Always Encrypted).
184    /// Present only for columns with the encrypted flag (0x0800) set.
185    pub crypto_metadata: Option<crate::crypto::CryptoMetadata>,
186}
187
188/// Type-specific metadata.
189#[derive(Debug, Clone, Default)]
190pub struct TypeInfo {
191    /// Maximum length for variable-length types.
192    pub max_length: Option<u32>,
193    /// Precision for numeric types.
194    pub precision: Option<u8>,
195    /// Scale for numeric types.
196    pub scale: Option<u8>,
197    /// Collation for string types.
198    pub collation: Option<Collation>,
199}
200
201/// SQL Server collation.
202///
203/// Collations in SQL Server define the character encoding and sorting rules
204/// for string data. For `VARCHAR` columns, the collation determines which
205/// code page (character encoding) is used to store the data.
206///
207/// # Encoding Support
208///
209/// When the `encoding` feature is enabled, the [`Collation::encoding()`] method
210/// returns the appropriate [`encoding_rs::Encoding`] for decoding `VARCHAR` data.
211///
212/// # Example
213///
214/// ```rust,ignore
215/// use tds_protocol::token::Collation;
216///
217/// let collation = Collation { lcid: 0x0804, sort_id: 0 }; // Chinese (PRC)
218/// if let Some(encoding) = collation.encoding() {
219///     let (decoded, _, _) = encoding.decode(raw_bytes);
220///     // decoded is now proper Chinese text
221/// }
222/// ```
223#[derive(Debug, Clone, Copy, Default)]
224pub struct Collation {
225    /// Locale ID (LCID).
226    ///
227    /// The LCID encodes both the language and region. The lower 16 bits
228    /// contain the primary language ID, and bits 16-19 contain the sort ID
229    /// for some collations.
230    ///
231    /// For UTF-8 collations (SQL Server 2019+), fUTF8 (bit 26, 0x0400_0000) is set.
232    pub lcid: u32,
233    /// Sort ID.
234    ///
235    /// Used with certain collations to specify sorting behavior.
236    pub sort_id: u8,
237}
238
239impl Collation {
240    /// Create a `Collation` from the 5-byte TDS wire format.
241    ///
242    /// Format: 4 bytes LCID (little-endian u32) + 1 byte sort ID.
243    pub fn from_bytes(bytes: &[u8; 5]) -> Self {
244        Self {
245            lcid: u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]),
246            sort_id: bytes[4],
247        }
248    }
249
250    /// Serialize to the 5-byte TDS wire format.
251    ///
252    /// Format: 4 bytes LCID (little-endian u32) + 1 byte sort ID.
253    pub fn to_bytes(&self) -> [u8; 5] {
254        let b = self.lcid.to_le_bytes();
255        [b[0], b[1], b[2], b[3], self.sort_id]
256    }
257
258    /// Returns the character encoding for this collation.
259    ///
260    /// This method maps the collation's LCID to the appropriate character
261    /// encoding from the `encoding_rs` crate.
262    ///
263    /// # Returns
264    ///
265    /// - `Some(&Encoding)` - The encoding to use for decoding `VARCHAR` data
266    /// - `None` - If the collation uses UTF-8 (no transcoding needed) or
267    ///   the LCID is not recognized (caller should use Windows-1252 fallback)
268    ///
269    /// # UTF-8 Collations
270    ///
271    /// SQL Server 2019+ supports UTF-8 collations (identified by the `_UTF8`
272    /// suffix). These return `None` because no transcoding is needed.
273    ///
274    /// # Example
275    ///
276    /// ```rust,ignore
277    /// let collation = Collation { lcid: 0x0419, sort_id: 0 }; // Russian
278    /// if let Some(encoding) = collation.encoding() {
279    ///     // encoding is Windows-1251 for Cyrillic
280    ///     let (text, _, had_errors) = encoding.decode(&raw_bytes);
281    /// }
282    /// ```
283    #[cfg(feature = "encoding")]
284    pub fn encoding(&self) -> Option<&'static encoding_rs::Encoding> {
285        // A non-zero SortId means a SQL collation, whose code page derives
286        // from the SortId, not the LCID (MS-TDS). Consulting only the LCID
287        // silently decoded these as windows-1252 (issue #158).
288        if self.sort_id != 0 {
289            return crate::collation::encoding_for_sort_id(self.sort_id);
290        }
291        crate::collation::encoding_for_lcid(self.lcid)
292    }
293
294    /// Returns whether this collation uses UTF-8 encoding.
295    ///
296    /// UTF-8 collations were introduced in SQL Server 2019 and are
297    /// identified by the `_UTF8` suffix in the collation name.
298    #[cfg(feature = "encoding")]
299    pub fn is_utf8(&self) -> bool {
300        crate::collation::is_utf8_collation(self.lcid)
301    }
302
303    /// Returns the Windows code page number for this collation.
304    ///
305    /// Useful for error messages and debugging.
306    ///
307    /// # Returns
308    ///
309    /// The code page number (e.g., 1252 for Western European, 932 for Japanese).
310    #[cfg(feature = "encoding")]
311    pub fn code_page(&self) -> Option<u16> {
312        // SQL collations (non-zero SortId) derive their code page from the
313        // SortId, not the LCID (issue #158).
314        if self.sort_id != 0 {
315            return crate::collation::code_page_for_sort_id(self.sort_id);
316        }
317        crate::collation::code_page_for_lcid(self.lcid)
318    }
319
320    /// Returns the encoding name for this collation.
321    ///
322    /// Useful for error messages and debugging.
323    #[cfg(feature = "encoding")]
324    pub fn encoding_name(&self) -> &'static str {
325        if self.sort_id != 0 {
326            return match crate::collation::encoding_for_sort_id(self.sort_id) {
327                Some(enc) => enc.name(),
328                None => "unsupported",
329            };
330        }
331        crate::collation::encoding_name_for_lcid(self.lcid)
332    }
333}
334
335/// Raw row data (not yet decoded).
336#[derive(Debug, Clone)]
337pub struct RawRow {
338    /// Raw column values.
339    pub data: bytes::Bytes,
340}
341
342/// Null bitmap compressed row.
343#[derive(Debug, Clone)]
344pub struct NbcRow {
345    /// Null bitmap.
346    pub null_bitmap: Vec<u8>,
347    /// Raw non-null column values.
348    pub data: bytes::Bytes,
349}
350
351/// Done token indicating statement completion.
352#[derive(Debug, Clone, Copy)]
353pub struct Done {
354    /// Status flags.
355    pub status: DoneStatus,
356    /// Current command.
357    pub cur_cmd: u16,
358    /// Row count (if applicable).
359    pub row_count: u64,
360}
361
362/// Done status flags.
363#[derive(Debug, Clone, Copy, Default)]
364#[non_exhaustive]
365pub struct DoneStatus {
366    /// More results follow.
367    pub more: bool,
368    /// Error occurred.
369    pub error: bool,
370    /// Transaction in progress.
371    pub in_xact: bool,
372    /// Row count is valid.
373    pub count: bool,
374    /// Attention acknowledgment.
375    pub attn: bool,
376    /// Server error caused statement termination.
377    pub srverror: bool,
378}
379
380/// Done in procedure token.
381#[derive(Debug, Clone, Copy)]
382pub struct DoneInProc {
383    /// Status flags.
384    pub status: DoneStatus,
385    /// Current command.
386    pub cur_cmd: u16,
387    /// Row count.
388    pub row_count: u64,
389}
390
391/// Done procedure token.
392#[derive(Debug, Clone, Copy)]
393pub struct DoneProc {
394    /// Status flags.
395    pub status: DoneStatus,
396    /// Current command.
397    pub cur_cmd: u16,
398    /// Row count.
399    pub row_count: u64,
400}
401
402/// Return value from stored procedure.
403#[derive(Debug, Clone)]
404#[non_exhaustive]
405pub struct ReturnValue {
406    /// Parameter ordinal.
407    pub param_ordinal: u16,
408    /// Parameter name.
409    pub param_name: String,
410    /// Status flags.
411    pub status: u8,
412    /// User type.
413    pub user_type: u32,
414    /// Type flags.
415    pub flags: u16,
416    /// Raw column type byte from the wire.
417    pub col_type: u8,
418    /// Type info.
419    pub type_info: TypeInfo,
420    /// Value data.
421    pub value: bytes::Bytes,
422}
423
424/// Server error message.
425#[derive(Debug, Clone)]
426pub struct ServerError {
427    /// Error number.
428    pub number: i32,
429    /// Error state.
430    pub state: u8,
431    /// Error severity class.
432    pub class: u8,
433    /// Error message text.
434    pub message: String,
435    /// Server name.
436    pub server: String,
437    /// Procedure name.
438    pub procedure: String,
439    /// Line number.
440    pub line: i32,
441}
442
443/// Server informational message.
444#[derive(Debug, Clone)]
445pub struct ServerInfo {
446    /// Info number.
447    pub number: i32,
448    /// Info state.
449    pub state: u8,
450    /// Info class (severity).
451    pub class: u8,
452    /// Info message text.
453    pub message: String,
454    /// Server name.
455    pub server: String,
456    /// Procedure name.
457    pub procedure: String,
458    /// Line number.
459    pub line: i32,
460}
461
462/// Login acknowledgment token.
463#[derive(Debug, Clone)]
464pub struct LoginAck {
465    /// Interface type.
466    pub interface: u8,
467    /// TDS version.
468    pub tds_version: u32,
469    /// Program name.
470    pub prog_name: String,
471    /// Program version.
472    pub prog_version: u32,
473}
474
475/// Environment change token.
476#[derive(Debug, Clone)]
477pub struct EnvChange {
478    /// Type of environment change.
479    pub env_type: EnvChangeType,
480    /// New value.
481    pub new_value: EnvChangeValue,
482    /// Old value.
483    pub old_value: EnvChangeValue,
484}
485
486/// Environment change type.
487#[derive(Debug, Clone, Copy, PartialEq, Eq)]
488#[repr(u8)]
489#[non_exhaustive]
490pub enum EnvChangeType {
491    /// Database changed.
492    Database = 1,
493    /// Language changed.
494    Language = 2,
495    /// Character set changed.
496    CharacterSet = 3,
497    /// Packet size changed.
498    PacketSize = 4,
499    /// Unicode data sorting locale ID.
500    UnicodeSortingLocalId = 5,
501    /// Unicode comparison flags.
502    UnicodeComparisonFlags = 6,
503    /// SQL collation.
504    SqlCollation = 7,
505    /// Begin transaction.
506    BeginTransaction = 8,
507    /// Commit transaction.
508    CommitTransaction = 9,
509    /// Rollback transaction.
510    RollbackTransaction = 10,
511    /// Enlist DTC transaction.
512    EnlistDtcTransaction = 11,
513    /// Defect DTC transaction.
514    DefectTransaction = 12,
515    /// Real-time log shipping.
516    RealTimeLogShipping = 13,
517    /// Promote transaction.
518    PromoteTransaction = 15,
519    /// Transaction manager address.
520    TransactionManagerAddress = 16,
521    /// Transaction ended.
522    TransactionEnded = 17,
523    /// Reset connection completion acknowledgment.
524    ResetConnectionCompletionAck = 18,
525    /// User instance started.
526    UserInstanceStarted = 19,
527    /// Routing information.
528    Routing = 20,
529}
530
531/// Environment change value.
532#[derive(Debug, Clone)]
533#[non_exhaustive]
534pub enum EnvChangeValue {
535    /// String value.
536    String(String),
537    /// Binary value.
538    Binary(bytes::Bytes),
539    /// Routing information.
540    Routing {
541        /// Host name.
542        host: String,
543        /// Port number.
544        port: u16,
545    },
546}
547
548/// Column ordering information.
549#[derive(Debug, Clone)]
550pub struct Order {
551    /// Ordered column indices.
552    pub columns: Vec<u16>,
553}
554
555/// Feature extension acknowledgment.
556#[derive(Debug, Clone)]
557pub struct FeatureExtAck {
558    /// Acknowledged features.
559    pub features: Vec<FeatureAck>,
560}
561
562/// Individual feature acknowledgment.
563#[derive(Debug, Clone)]
564pub struct FeatureAck {
565    /// Feature ID.
566    pub feature_id: u8,
567    /// Feature data.
568    pub data: bytes::Bytes,
569}
570
571/// SSPI authentication token.
572#[derive(Debug, Clone)]
573pub struct SspiToken {
574    /// SSPI data.
575    pub data: bytes::Bytes,
576}
577
578/// Session state token.
579#[derive(Debug, Clone)]
580pub struct SessionState {
581    /// Session state data.
582    pub data: bytes::Bytes,
583}
584
585/// Federated authentication info.
586#[derive(Debug, Clone)]
587pub struct FedAuthInfo {
588    /// STS URL.
589    pub sts_url: String,
590    /// Service principal name.
591    pub spn: String,
592}
593
594// =============================================================================
595// ColMetaData and Row Parsing Implementation
596// =============================================================================
597
598/// Decode collation information (5 bytes).
599///
600/// Shared by ColMetaData column parsing and CryptoMetadata base type parsing.
601pub(crate) fn decode_collation(src: &mut impl Buf) -> Result<Collation, ProtocolError> {
602    if src.remaining() < 5 {
603        return Err(ProtocolError::UnexpectedEof);
604    }
605    // Collation: LCID (4 bytes) + Sort ID (1 byte)
606    let lcid = src.get_u32_le();
607    let sort_id = src.get_u8();
608    Ok(Collation { lcid, sort_id })
609}
610
611/// Decode type-specific metadata for a column based on its TypeId.
612///
613/// Shared by ColMetaData column parsing and CryptoMetadata base type parsing.
614pub(crate) fn decode_type_info(
615    src: &mut impl Buf,
616    type_id: TypeId,
617    col_type: u8,
618) -> Result<TypeInfo, ProtocolError> {
619    match type_id {
620        // Fixed-length types have no additional metadata
621        TypeId::Null => Ok(TypeInfo::default()),
622        TypeId::Int1 | TypeId::Bit => Ok(TypeInfo::default()),
623        TypeId::Int2 => Ok(TypeInfo::default()),
624        TypeId::Int4 => Ok(TypeInfo::default()),
625        TypeId::Int8 => Ok(TypeInfo::default()),
626        TypeId::Float4 => Ok(TypeInfo::default()),
627        TypeId::Float8 => Ok(TypeInfo::default()),
628        TypeId::Money => Ok(TypeInfo::default()),
629        TypeId::Money4 => Ok(TypeInfo::default()),
630        TypeId::DateTime => Ok(TypeInfo::default()),
631        TypeId::DateTime4 => Ok(TypeInfo::default()),
632
633        // Variable length integer/float/money (1-byte max length)
634        TypeId::IntN | TypeId::BitN | TypeId::FloatN | TypeId::MoneyN | TypeId::DateTimeN => {
635            if src.remaining() < 1 {
636                return Err(ProtocolError::UnexpectedEof);
637            }
638            let max_length = src.get_u8() as u32;
639            Ok(TypeInfo {
640                max_length: Some(max_length),
641                ..Default::default()
642            })
643        }
644
645        // GUID has 1-byte length
646        TypeId::Guid => {
647            if src.remaining() < 1 {
648                return Err(ProtocolError::UnexpectedEof);
649            }
650            let max_length = src.get_u8() as u32;
651            Ok(TypeInfo {
652                max_length: Some(max_length),
653                ..Default::default()
654            })
655        }
656
657        // Decimal/Numeric types (1-byte length + precision + scale)
658        TypeId::Decimal | TypeId::Numeric | TypeId::DecimalN | TypeId::NumericN => {
659            if src.remaining() < 3 {
660                return Err(ProtocolError::UnexpectedEof);
661            }
662            let max_length = src.get_u8() as u32;
663            let precision = src.get_u8();
664            let scale = src.get_u8();
665            Ok(TypeInfo {
666                max_length: Some(max_length),
667                precision: Some(precision),
668                scale: Some(scale),
669                ..Default::default()
670            })
671        }
672
673        // Old-style byte-length strings (Char, VarChar, Binary, VarBinary)
674        TypeId::Char | TypeId::VarChar | TypeId::Binary | TypeId::VarBinary => {
675            if src.remaining() < 1 {
676                return Err(ProtocolError::UnexpectedEof);
677            }
678            let max_length = src.get_u8() as u32;
679            Ok(TypeInfo {
680                max_length: Some(max_length),
681                ..Default::default()
682            })
683        }
684
685        // Big varchar/binary with 2-byte length + collation for strings
686        TypeId::BigVarChar | TypeId::BigChar => {
687            if src.remaining() < 7 {
688                // 2 (length) + 5 (collation)
689                return Err(ProtocolError::UnexpectedEof);
690            }
691            let max_length = src.get_u16_le() as u32;
692            let collation = decode_collation(src)?;
693            Ok(TypeInfo {
694                max_length: Some(max_length),
695                collation: Some(collation),
696                ..Default::default()
697            })
698        }
699
700        // Big binary (2-byte length, no collation)
701        TypeId::BigVarBinary | TypeId::BigBinary => {
702            if src.remaining() < 2 {
703                return Err(ProtocolError::UnexpectedEof);
704            }
705            let max_length = src.get_u16_le() as u32;
706            Ok(TypeInfo {
707                max_length: Some(max_length),
708                ..Default::default()
709            })
710        }
711
712        // Unicode strings (NChar, NVarChar) - 2-byte length + collation
713        TypeId::NChar | TypeId::NVarChar => {
714            if src.remaining() < 7 {
715                // 2 (length) + 5 (collation)
716                return Err(ProtocolError::UnexpectedEof);
717            }
718            let max_length = src.get_u16_le() as u32;
719            let collation = decode_collation(src)?;
720            Ok(TypeInfo {
721                max_length: Some(max_length),
722                collation: Some(collation),
723                ..Default::default()
724            })
725        }
726
727        // Date type (no additional metadata)
728        TypeId::Date => Ok(TypeInfo::default()),
729
730        // Time, DateTime2, DateTimeOffset have scale
731        TypeId::Time | TypeId::DateTime2 | TypeId::DateTimeOffset => {
732            if src.remaining() < 1 {
733                return Err(ProtocolError::UnexpectedEof);
734            }
735            let scale = src.get_u8();
736            Ok(TypeInfo {
737                scale: Some(scale),
738                ..Default::default()
739            })
740        }
741
742        // Text/NText/Image (deprecated LOB types)
743        TypeId::Text | TypeId::NText | TypeId::Image => {
744            // These have complex metadata: length (4) + collation (5) + table name parts
745            if src.remaining() < 4 {
746                return Err(ProtocolError::UnexpectedEof);
747            }
748            let max_length = src.get_u32_le();
749
750            // For Text/NText, read collation
751            let collation = if type_id == TypeId::Text || type_id == TypeId::NText {
752                if src.remaining() < 5 {
753                    return Err(ProtocolError::UnexpectedEof);
754                }
755                Some(decode_collation(src)?)
756            } else {
757                None
758            };
759
760            // Skip table name parts (variable length)
761            // Format: numParts (1 byte) followed by us_varchar for each part
762            if src.remaining() < 1 {
763                return Err(ProtocolError::UnexpectedEof);
764            }
765            let num_parts = src.get_u8();
766            for _ in 0..num_parts {
767                // Read and discard table name part
768                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
769            }
770
771            Ok(TypeInfo {
772                max_length: Some(max_length),
773                collation,
774                ..Default::default()
775            })
776        }
777
778        // XML type
779        TypeId::Xml => {
780            if src.remaining() < 1 {
781                return Err(ProtocolError::UnexpectedEof);
782            }
783            let schema_present = src.get_u8();
784
785            if schema_present != 0 {
786                // XML_INFO per MS-TDS §2.2.5.5.3: DBNAME and OWNING_SCHEMA are
787                // B_VARCHAR (1-byte length); only XML_SCHEMA_COLLECTION is
788                // US_VARCHAR (2-byte length).
789                let _ = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // db name
790                let _ = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // owning schema
791                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // xml schema collection
792            }
793
794            Ok(TypeInfo::default())
795        }
796
797        // UDT (User-defined type) - complex metadata
798        TypeId::Udt => {
799            // Max length (2 bytes)
800            if src.remaining() < 2 {
801                return Err(ProtocolError::UnexpectedEof);
802            }
803            let max_length = src.get_u16_le() as u32;
804
805            // UDT_INFO per MS-TDS: DB_NAME, SCHEMA_NAME, and TYPE_NAME are
806            // B_VARCHAR (1-byte length); only ASSEMBLY_QUALIFIED_NAME is
807            // US_VARCHAR (2-byte length).
808            let _ = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // db name
809            let _ = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // schema name
810            let _ = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // type name
811            let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // assembly qualified name
812
813            Ok(TypeInfo {
814                max_length: Some(max_length),
815                ..Default::default()
816            })
817        }
818
819        // Table-valued parameter - complex metadata (skip for now)
820        TypeId::Tvp => {
821            // TVP has very complex metadata, not commonly used
822            // For now, we can't properly parse this
823            Err(ProtocolError::InvalidTokenType(col_type))
824        }
825
826        // SQL Variant - 4-byte length
827        TypeId::Variant => {
828            if src.remaining() < 4 {
829                return Err(ProtocolError::UnexpectedEof);
830            }
831            let max_length = src.get_u32_le();
832            Ok(TypeInfo {
833                max_length: Some(max_length),
834                ..Default::default()
835            })
836        }
837    }
838}
839
840impl ColMetaData {
841    /// Special value indicating no metadata.
842    pub const NO_METADATA: u16 = 0xFFFF;
843
844    /// Decode a COLMETADATA token from bytes.
845    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
846        if src.remaining() < 2 {
847            return Err(ProtocolError::UnexpectedEof);
848        }
849
850        let column_count = src.get_u16_le();
851
852        // 0xFFFF means no metadata present
853        if column_count == Self::NO_METADATA {
854            return Ok(Self {
855                columns: Vec::new(),
856                cek_table: None,
857            });
858        }
859
860        let mut columns = Vec::with_capacity(column_count as usize);
861
862        for _ in 0..column_count {
863            let column = Self::decode_column(src)?;
864            columns.push(column);
865        }
866
867        Ok(Self {
868            columns,
869            cek_table: None,
870        })
871    }
872
873    /// Decode a single column from the metadata.
874    fn decode_column(src: &mut impl Buf) -> Result<ColumnData, ProtocolError> {
875        // UserType (4 bytes) + Flags (2 bytes) + TypeId (1 byte)
876        if src.remaining() < 7 {
877            return Err(ProtocolError::UnexpectedEof);
878        }
879
880        let user_type = src.get_u32_le();
881        let flags = src.get_u16_le();
882        let col_type = src.get_u8();
883
884        // An unknown type byte must be a hard error: treating it as Null (a
885        // zero-length column) misaligns every subsequent column and row,
886        // producing plausible garbage values (issue #157).
887        let type_id = TypeId::from_u8(col_type).ok_or(ProtocolError::InvalidDataType(col_type))?;
888
889        // Parse type-specific metadata
890        let type_info = decode_type_info(src, type_id, col_type)?;
891
892        // Read column name (B_VARCHAR format - 1 byte length in characters)
893        let name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
894
895        Ok(ColumnData {
896            name,
897            type_id,
898            col_type,
899            flags,
900            user_type,
901            type_info,
902            crypto_metadata: None,
903        })
904    }
905
906    /// Decode a COLMETADATA token with Always Encrypted support.
907    ///
908    /// When column encryption was negotiated in Login7, the server sends a CekTable
909    /// before column definitions and per-column CryptoMetadata for encrypted columns.
910    ///
911    /// # Wire Format (with encryption)
912    ///
913    /// ```text
914    /// column_count: USHORT
915    /// cek_table: CekTable (always present when encryption negotiated)
916    /// columns: ColumnData[column_count] (with CryptoMetadata for encrypted columns)
917    /// ```
918    pub fn decode_encrypted(src: &mut impl Buf) -> Result<Self, ProtocolError> {
919        if src.remaining() < 2 {
920            return Err(ProtocolError::UnexpectedEof);
921        }
922
923        let column_count = src.get_u16_le();
924
925        if column_count == Self::NO_METADATA {
926            return Ok(Self {
927                columns: Vec::new(),
928                cek_table: None,
929            });
930        }
931
932        // Parse CEK table (always present when encryption was negotiated)
933        let cek_table = crate::crypto::CekTable::decode(src)?;
934
935        let mut columns = Vec::with_capacity(column_count as usize);
936
937        for _ in 0..column_count {
938            let column = Self::decode_column_encrypted(src)?;
939            columns.push(column);
940        }
941
942        Ok(Self {
943            columns,
944            cek_table: Some(cek_table),
945        })
946    }
947
948    /// Decode a single column definition with Always Encrypted support.
949    ///
950    /// For encrypted columns (flags & 0x0800), parses CryptoMetadata after the type info.
951    fn decode_column_encrypted(src: &mut impl Buf) -> Result<ColumnData, ProtocolError> {
952        if src.remaining() < 7 {
953            return Err(ProtocolError::UnexpectedEof);
954        }
955
956        let user_type = src.get_u32_le();
957        let flags = src.get_u16_le();
958        let col_type = src.get_u8();
959
960        let type_id = TypeId::from_u8(col_type).ok_or(ProtocolError::InvalidDataType(col_type))?;
961
962        // Parse type-specific metadata (for encrypted columns, this is the transport type)
963        let type_info = decode_type_info(src, type_id, col_type)?;
964
965        // Parse CryptoMetadata if the column is encrypted
966        let crypto_metadata = if crate::crypto::is_column_encrypted(flags) {
967            Some(crate::crypto::CryptoMetadata::decode(src)?)
968        } else {
969            None
970        };
971
972        // Read column name
973        let name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
974
975        Ok(ColumnData {
976            name,
977            type_id,
978            col_type,
979            flags,
980            user_type,
981            type_info,
982            crypto_metadata,
983        })
984    }
985
986    /// Get the number of columns.
987    #[must_use]
988    pub fn column_count(&self) -> usize {
989        self.columns.len()
990    }
991
992    /// Check if this represents no metadata.
993    #[must_use]
994    pub fn is_empty(&self) -> bool {
995        self.columns.is_empty()
996    }
997}
998
999impl ColumnData {
1000    /// Check if this column is nullable.
1001    #[must_use]
1002    pub fn is_nullable(&self) -> bool {
1003        (self.flags & 0x0001) != 0
1004    }
1005
1006    /// Get the fixed size in bytes for this column, if applicable.
1007    ///
1008    /// Returns `None` for variable-length types.
1009    #[must_use]
1010    pub fn fixed_size(&self) -> Option<usize> {
1011        match self.type_id {
1012            TypeId::Null => Some(0),
1013            TypeId::Int1 | TypeId::Bit => Some(1),
1014            TypeId::Int2 => Some(2),
1015            TypeId::Int4 => Some(4),
1016            TypeId::Int8 => Some(8),
1017            TypeId::Float4 => Some(4),
1018            TypeId::Float8 => Some(8),
1019            TypeId::Money => Some(8),
1020            TypeId::Money4 => Some(4),
1021            TypeId::DateTime => Some(8),
1022            TypeId::DateTime4 => Some(4),
1023            TypeId::Date => Some(3),
1024            _ => None,
1025        }
1026    }
1027}
1028
1029// =============================================================================
1030// Row Parsing Implementation
1031// =============================================================================
1032
1033impl RawRow {
1034    /// Decode a ROW token from bytes.
1035    ///
1036    /// This function requires the column metadata to know how to parse the row.
1037    /// The row data is stored as raw bytes for later parsing.
1038    pub fn decode(src: &mut impl Buf, metadata: &ColMetaData) -> Result<Self, ProtocolError> {
1039        let mut data = bytes::BytesMut::new();
1040
1041        for col in &metadata.columns {
1042            Self::decode_column_value(src, col, &mut data)?;
1043        }
1044
1045        Ok(Self {
1046            data: data.freeze(),
1047        })
1048    }
1049
1050    /// Decode a single column value and append to the output buffer.
1051    fn decode_column_value(
1052        src: &mut impl Buf,
1053        col: &ColumnData,
1054        dst: &mut bytes::BytesMut,
1055    ) -> Result<(), ProtocolError> {
1056        match col.type_id {
1057            // Fixed-length types
1058            TypeId::Null => {
1059                // No data
1060            }
1061            TypeId::Int1 | TypeId::Bit => {
1062                if src.remaining() < 1 {
1063                    return Err(ProtocolError::UnexpectedEof);
1064                }
1065                dst.extend_from_slice(&[src.get_u8()]);
1066            }
1067            TypeId::Int2 => {
1068                if src.remaining() < 2 {
1069                    return Err(ProtocolError::UnexpectedEof);
1070                }
1071                dst.extend_from_slice(&src.get_u16_le().to_le_bytes());
1072            }
1073            TypeId::Int4 => {
1074                if src.remaining() < 4 {
1075                    return Err(ProtocolError::UnexpectedEof);
1076                }
1077                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
1078            }
1079            TypeId::Int8 => {
1080                if src.remaining() < 8 {
1081                    return Err(ProtocolError::UnexpectedEof);
1082                }
1083                dst.extend_from_slice(&src.get_u64_le().to_le_bytes());
1084            }
1085            TypeId::Float4 => {
1086                if src.remaining() < 4 {
1087                    return Err(ProtocolError::UnexpectedEof);
1088                }
1089                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
1090            }
1091            TypeId::Float8 => {
1092                if src.remaining() < 8 {
1093                    return Err(ProtocolError::UnexpectedEof);
1094                }
1095                dst.extend_from_slice(&src.get_u64_le().to_le_bytes());
1096            }
1097            TypeId::Money => {
1098                if src.remaining() < 8 {
1099                    return Err(ProtocolError::UnexpectedEof);
1100                }
1101                let hi = src.get_u32_le();
1102                let lo = src.get_u32_le();
1103                dst.extend_from_slice(&hi.to_le_bytes());
1104                dst.extend_from_slice(&lo.to_le_bytes());
1105            }
1106            TypeId::Money4 => {
1107                if src.remaining() < 4 {
1108                    return Err(ProtocolError::UnexpectedEof);
1109                }
1110                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
1111            }
1112            TypeId::DateTime => {
1113                if src.remaining() < 8 {
1114                    return Err(ProtocolError::UnexpectedEof);
1115                }
1116                let days = src.get_u32_le();
1117                let time = src.get_u32_le();
1118                dst.extend_from_slice(&days.to_le_bytes());
1119                dst.extend_from_slice(&time.to_le_bytes());
1120            }
1121            TypeId::DateTime4 => {
1122                if src.remaining() < 4 {
1123                    return Err(ProtocolError::UnexpectedEof);
1124                }
1125                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
1126            }
1127            // DATE type uses 1-byte length prefix (can be NULL)
1128            TypeId::Date => {
1129                Self::decode_bytelen_type(src, dst)?;
1130            }
1131
1132            // Variable-length nullable types (length-prefixed)
1133            TypeId::IntN | TypeId::BitN | TypeId::FloatN | TypeId::MoneyN | TypeId::DateTimeN => {
1134                Self::decode_bytelen_type(src, dst)?;
1135            }
1136
1137            TypeId::Guid => {
1138                Self::decode_bytelen_type(src, dst)?;
1139            }
1140
1141            TypeId::Decimal | TypeId::Numeric | TypeId::DecimalN | TypeId::NumericN => {
1142                Self::decode_bytelen_type(src, dst)?;
1143            }
1144
1145            // Old-style byte-length strings
1146            TypeId::Char | TypeId::VarChar | TypeId::Binary | TypeId::VarBinary => {
1147                Self::decode_bytelen_type(src, dst)?;
1148            }
1149
1150            // 2-byte length strings (or PLP for MAX types)
1151            TypeId::BigVarChar | TypeId::BigVarBinary => {
1152                // max_length == 0xFFFF indicates VARCHAR(MAX) or VARBINARY(MAX), which uses PLP
1153                if col.type_info.max_length == Some(0xFFFF) {
1154                    Self::decode_plp_type(src, dst)?;
1155                } else {
1156                    Self::decode_ushortlen_type(src, dst)?;
1157                }
1158            }
1159
1160            // Fixed-length types that don't have MAX variants
1161            TypeId::BigChar | TypeId::BigBinary => {
1162                Self::decode_ushortlen_type(src, dst)?;
1163            }
1164
1165            // Unicode strings (2-byte length in bytes, or PLP for NVARCHAR(MAX))
1166            TypeId::NVarChar => {
1167                // max_length == 0xFFFF indicates NVARCHAR(MAX), which uses PLP
1168                if col.type_info.max_length == Some(0xFFFF) {
1169                    Self::decode_plp_type(src, dst)?;
1170                } else {
1171                    Self::decode_ushortlen_type(src, dst)?;
1172                }
1173            }
1174
1175            // Fixed-length NCHAR doesn't have MAX variant
1176            TypeId::NChar => {
1177                Self::decode_ushortlen_type(src, dst)?;
1178            }
1179
1180            // Time types with scale
1181            TypeId::Time | TypeId::DateTime2 | TypeId::DateTimeOffset => {
1182                Self::decode_bytelen_type(src, dst)?;
1183            }
1184
1185            // TEXT/NTEXT/IMAGE - deprecated LOB types using textptr format
1186            TypeId::Text | TypeId::NText | TypeId::Image => {
1187                Self::decode_textptr_type(src, dst)?;
1188            }
1189
1190            // XML - uses actual PLP format
1191            TypeId::Xml => {
1192                Self::decode_plp_type(src, dst)?;
1193            }
1194
1195            // Complex types
1196            TypeId::Variant => {
1197                Self::decode_intlen_type(src, dst)?;
1198            }
1199
1200            TypeId::Udt => {
1201                // UDT uses PLP encoding
1202                Self::decode_plp_type(src, dst)?;
1203            }
1204
1205            TypeId::Tvp => {
1206                // TVP not supported in row data
1207                return Err(ProtocolError::InvalidTokenType(col.col_type));
1208            }
1209        }
1210
1211        Ok(())
1212    }
1213
1214    /// Decode a 1-byte length-prefixed value.
1215    fn decode_bytelen_type(
1216        src: &mut impl Buf,
1217        dst: &mut bytes::BytesMut,
1218    ) -> Result<(), ProtocolError> {
1219        if src.remaining() < 1 {
1220            return Err(ProtocolError::UnexpectedEof);
1221        }
1222        let len = src.get_u8() as usize;
1223        if len == 0xFF {
1224            // NULL value - store as zero-length with NULL marker
1225            dst.extend_from_slice(&[0xFF]);
1226        } else if len == 0 {
1227            // Empty value
1228            dst.extend_from_slice(&[0x00]);
1229        } else {
1230            if src.remaining() < len {
1231                return Err(ProtocolError::UnexpectedEof);
1232            }
1233            dst.extend_from_slice(&[len as u8]);
1234            for _ in 0..len {
1235                dst.extend_from_slice(&[src.get_u8()]);
1236            }
1237        }
1238        Ok(())
1239    }
1240
1241    /// Decode a 2-byte length-prefixed value.
1242    fn decode_ushortlen_type(
1243        src: &mut impl Buf,
1244        dst: &mut bytes::BytesMut,
1245    ) -> Result<(), ProtocolError> {
1246        if src.remaining() < 2 {
1247            return Err(ProtocolError::UnexpectedEof);
1248        }
1249        let len = src.get_u16_le() as usize;
1250        if len == 0xFFFF {
1251            // NULL value
1252            dst.extend_from_slice(&0xFFFFu16.to_le_bytes());
1253        } else if len == 0 {
1254            // Empty value
1255            dst.extend_from_slice(&0u16.to_le_bytes());
1256        } else {
1257            if src.remaining() < len {
1258                return Err(ProtocolError::UnexpectedEof);
1259            }
1260            dst.extend_from_slice(&(len as u16).to_le_bytes());
1261            for _ in 0..len {
1262                dst.extend_from_slice(&[src.get_u8()]);
1263            }
1264        }
1265        Ok(())
1266    }
1267
1268    /// Decode a 4-byte length-prefixed value.
1269    fn decode_intlen_type(
1270        src: &mut impl Buf,
1271        dst: &mut bytes::BytesMut,
1272    ) -> Result<(), ProtocolError> {
1273        if src.remaining() < 4 {
1274            return Err(ProtocolError::UnexpectedEof);
1275        }
1276        let len = src.get_u32_le() as usize;
1277        if len == 0xFFFFFFFF {
1278            // NULL value
1279            dst.extend_from_slice(&0xFFFFFFFFu32.to_le_bytes());
1280        } else if len == 0 {
1281            // Empty value
1282            dst.extend_from_slice(&0u32.to_le_bytes());
1283        } else {
1284            if src.remaining() < len {
1285                return Err(ProtocolError::UnexpectedEof);
1286            }
1287            dst.extend_from_slice(&(len as u32).to_le_bytes());
1288            for _ in 0..len {
1289                dst.extend_from_slice(&[src.get_u8()]);
1290            }
1291        }
1292        Ok(())
1293    }
1294
1295    /// Decode a TEXT/NTEXT/IMAGE type (textptr format).
1296    ///
1297    /// These deprecated LOB types use a special format:
1298    /// - 1 byte: textptr_len (0 = NULL)
1299    /// - textptr_len bytes: textptr (if not NULL)
1300    /// - 8 bytes: timestamp (if not NULL)
1301    /// - 4 bytes: data length (if not NULL)
1302    /// - data_len bytes: the actual data (if not NULL)
1303    ///
1304    /// We convert this to PLP format for the client to parse:
1305    /// - 8 bytes: total length (0xFFFFFFFFFFFFFFFF = NULL)
1306    /// - 4 bytes: chunk length (= data length)
1307    /// - chunk data
1308    /// - 4 bytes: 0 (terminator)
1309    fn decode_textptr_type(
1310        src: &mut impl Buf,
1311        dst: &mut bytes::BytesMut,
1312    ) -> Result<(), ProtocolError> {
1313        if src.remaining() < 1 {
1314            return Err(ProtocolError::UnexpectedEof);
1315        }
1316
1317        let textptr_len = src.get_u8() as usize;
1318
1319        if textptr_len == 0 {
1320            // NULL value - write PLP NULL marker
1321            dst.extend_from_slice(&0xFFFFFFFFFFFFFFFFu64.to_le_bytes());
1322            return Ok(());
1323        }
1324
1325        // Skip textptr bytes
1326        if src.remaining() < textptr_len {
1327            return Err(ProtocolError::UnexpectedEof);
1328        }
1329        src.advance(textptr_len);
1330
1331        // Skip 8-byte timestamp
1332        if src.remaining() < 8 {
1333            return Err(ProtocolError::UnexpectedEof);
1334        }
1335        src.advance(8);
1336
1337        // Read data length
1338        if src.remaining() < 4 {
1339            return Err(ProtocolError::UnexpectedEof);
1340        }
1341        let data_len = src.get_u32_le() as usize;
1342
1343        if src.remaining() < data_len {
1344            return Err(ProtocolError::UnexpectedEof);
1345        }
1346
1347        // Write in PLP format for client parsing:
1348        // - 8 bytes: total length
1349        // - 4 bytes: chunk length
1350        // - chunk data
1351        // - 4 bytes: 0 (terminator)
1352        dst.extend_from_slice(&(data_len as u64).to_le_bytes());
1353        dst.extend_from_slice(&(data_len as u32).to_le_bytes());
1354        for _ in 0..data_len {
1355            dst.extend_from_slice(&[src.get_u8()]);
1356        }
1357        dst.extend_from_slice(&0u32.to_le_bytes()); // PLP terminator
1358
1359        Ok(())
1360    }
1361
1362    /// Decode a PLP (Partially Length-Prefixed) value.
1363    ///
1364    /// PLP format:
1365    /// - 8 bytes: total length (0xFFFFFFFFFFFFFFFE = unknown, 0xFFFFFFFFFFFFFFFF = NULL)
1366    /// - If not NULL: chunks of (4 byte chunk length + data) until chunk length = 0
1367    fn decode_plp_type(src: &mut impl Buf, dst: &mut bytes::BytesMut) -> Result<(), ProtocolError> {
1368        if src.remaining() < 8 {
1369            return Err(ProtocolError::UnexpectedEof);
1370        }
1371
1372        let total_len = src.get_u64_le();
1373
1374        // Store the total length marker
1375        dst.extend_from_slice(&total_len.to_le_bytes());
1376
1377        if total_len == 0xFFFFFFFFFFFFFFFF {
1378            // NULL value - no more data
1379            return Ok(());
1380        }
1381
1382        // Read chunks until terminator
1383        loop {
1384            if src.remaining() < 4 {
1385                return Err(ProtocolError::UnexpectedEof);
1386            }
1387            let chunk_len = src.get_u32_le() as usize;
1388            dst.extend_from_slice(&(chunk_len as u32).to_le_bytes());
1389
1390            if chunk_len == 0 {
1391                // End of PLP data
1392                break;
1393            }
1394
1395            if src.remaining() < chunk_len {
1396                return Err(ProtocolError::UnexpectedEof);
1397            }
1398
1399            for _ in 0..chunk_len {
1400                dst.extend_from_slice(&[src.get_u8()]);
1401            }
1402        }
1403
1404        Ok(())
1405    }
1406}
1407
1408// =============================================================================
1409// NbcRow Parsing Implementation
1410// =============================================================================
1411
1412impl NbcRow {
1413    /// Decode an NBCROW token from bytes.
1414    ///
1415    /// NBCROW (Null Bitmap Compressed Row) stores a bitmap indicating which
1416    /// columns are NULL, followed by only the non-NULL values.
1417    pub fn decode(src: &mut impl Buf, metadata: &ColMetaData) -> Result<Self, ProtocolError> {
1418        let col_count = metadata.columns.len();
1419        let bitmap_len = col_count.div_ceil(8);
1420
1421        if src.remaining() < bitmap_len {
1422            return Err(ProtocolError::UnexpectedEof);
1423        }
1424
1425        // Read null bitmap
1426        let mut null_bitmap = vec![0u8; bitmap_len];
1427        for byte in &mut null_bitmap {
1428            *byte = src.get_u8();
1429        }
1430
1431        // Read non-null values
1432        let mut data = bytes::BytesMut::new();
1433
1434        for (i, col) in metadata.columns.iter().enumerate() {
1435            let byte_idx = i / 8;
1436            let bit_idx = i % 8;
1437            let is_null = (null_bitmap[byte_idx] & (1 << bit_idx)) != 0;
1438
1439            if !is_null {
1440                // Read the value - for NBCROW, we read without the length prefix
1441                // for fixed-length types, and with length prefix for variable types
1442                RawRow::decode_column_value(src, col, &mut data)?;
1443            }
1444        }
1445
1446        Ok(Self {
1447            null_bitmap,
1448            data: data.freeze(),
1449        })
1450    }
1451
1452    /// Check if a column at the given index is NULL.
1453    #[must_use]
1454    pub fn is_null(&self, column_index: usize) -> bool {
1455        let byte_idx = column_index / 8;
1456        let bit_idx = column_index % 8;
1457        if byte_idx < self.null_bitmap.len() {
1458            (self.null_bitmap[byte_idx] & (1 << bit_idx)) != 0
1459        } else {
1460            true // Out of bounds = NULL
1461        }
1462    }
1463}
1464
1465// =============================================================================
1466// ReturnValue Parsing Implementation
1467// =============================================================================
1468
1469impl ReturnValue {
1470    /// Decode a RETURNVALUE token from bytes.
1471    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1472        // MS-TDS §2.2.7.18: the RETURNVALUE token has no length prefix —
1473        // it begins directly with the 2-byte ParamOrdinal. The previous
1474        // spurious 2-byte read consumed the ordinal and shifted every
1475        // subsequent field, leaving the stream parser two bytes ahead and
1476        // reading value bytes as the next token type (e.g. `0x74` from a
1477        // Unicode name fragment was misread as an unknown token).
1478        if src.remaining() < 2 {
1479            return Err(ProtocolError::UnexpectedEof);
1480        }
1481        let param_ordinal = src.get_u16_le();
1482
1483        // Parameter name (B_VARCHAR)
1484        let param_name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1485
1486        // Status (1 byte)
1487        if src.remaining() < 1 {
1488            return Err(ProtocolError::UnexpectedEof);
1489        }
1490        let status = src.get_u8();
1491
1492        // User type (4 bytes) + flags (2 bytes) + type id (1 byte)
1493        if src.remaining() < 7 {
1494            return Err(ProtocolError::UnexpectedEof);
1495        }
1496        let user_type = src.get_u32_le();
1497        let flags = src.get_u16_le();
1498        let col_type = src.get_u8();
1499
1500        let type_id = TypeId::from_u8(col_type).ok_or(ProtocolError::InvalidDataType(col_type))?;
1501
1502        // Parse type info
1503        let type_info = decode_type_info(src, type_id, col_type)?;
1504
1505        // Read the value data
1506        let mut value_buf = bytes::BytesMut::new();
1507
1508        // Create a temporary column for value parsing
1509        let temp_col = ColumnData {
1510            name: String::new(),
1511            type_id,
1512            col_type,
1513            flags,
1514            user_type,
1515            type_info: type_info.clone(),
1516            crypto_metadata: None,
1517        };
1518
1519        RawRow::decode_column_value(src, &temp_col, &mut value_buf)?;
1520
1521        Ok(Self {
1522            param_ordinal,
1523            param_name,
1524            status,
1525            user_type,
1526            flags,
1527            col_type,
1528            type_info,
1529            value: value_buf.freeze(),
1530        })
1531    }
1532}
1533
1534// =============================================================================
1535// SessionState Parsing Implementation
1536// =============================================================================
1537
1538impl SessionState {
1539    /// Decode a SESSIONSTATE token from bytes.
1540    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1541        if src.remaining() < 4 {
1542            return Err(ProtocolError::UnexpectedEof);
1543        }
1544
1545        let length = src.get_u32_le() as usize;
1546
1547        if src.remaining() < length {
1548            return Err(ProtocolError::IncompletePacket {
1549                expected: length,
1550                actual: src.remaining(),
1551            });
1552        }
1553
1554        let data = src.copy_to_bytes(length);
1555
1556        Ok(Self { data })
1557    }
1558}
1559
1560// =============================================================================
1561// Token Parsing Implementation
1562// =============================================================================
1563
1564/// Done token status flags bit positions.
1565mod done_status_bits {
1566    pub const DONE_MORE: u16 = 0x0001;
1567    pub const DONE_ERROR: u16 = 0x0002;
1568    pub const DONE_INXACT: u16 = 0x0004;
1569    pub const DONE_COUNT: u16 = 0x0010;
1570    pub const DONE_ATTN: u16 = 0x0020;
1571    pub const DONE_SRVERROR: u16 = 0x0100;
1572}
1573
1574impl DoneStatus {
1575    /// Parse done status from raw bits.
1576    #[must_use]
1577    pub fn from_bits(bits: u16) -> Self {
1578        use done_status_bits::*;
1579        Self {
1580            more: (bits & DONE_MORE) != 0,
1581            error: (bits & DONE_ERROR) != 0,
1582            in_xact: (bits & DONE_INXACT) != 0,
1583            count: (bits & DONE_COUNT) != 0,
1584            attn: (bits & DONE_ATTN) != 0,
1585            srverror: (bits & DONE_SRVERROR) != 0,
1586        }
1587    }
1588
1589    /// Convert to raw bits.
1590    #[must_use]
1591    pub fn to_bits(&self) -> u16 {
1592        use done_status_bits::*;
1593        let mut bits = 0u16;
1594        if self.more {
1595            bits |= DONE_MORE;
1596        }
1597        if self.error {
1598            bits |= DONE_ERROR;
1599        }
1600        if self.in_xact {
1601            bits |= DONE_INXACT;
1602        }
1603        if self.count {
1604            bits |= DONE_COUNT;
1605        }
1606        if self.attn {
1607            bits |= DONE_ATTN;
1608        }
1609        if self.srverror {
1610            bits |= DONE_SRVERROR;
1611        }
1612        bits
1613    }
1614}
1615
1616impl Done {
1617    /// Size of the DONE token in bytes (excluding token type byte).
1618    pub const SIZE: usize = 12; // 2 (status) + 2 (curcmd) + 8 (rowcount)
1619
1620    /// Decode a DONE token from bytes.
1621    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1622        if src.remaining() < Self::SIZE {
1623            return Err(ProtocolError::IncompletePacket {
1624                expected: Self::SIZE,
1625                actual: src.remaining(),
1626            });
1627        }
1628
1629        let status = DoneStatus::from_bits(src.get_u16_le());
1630        let cur_cmd = src.get_u16_le();
1631        let row_count = src.get_u64_le();
1632
1633        Ok(Self {
1634            status,
1635            cur_cmd,
1636            row_count,
1637        })
1638    }
1639
1640    /// Encode the DONE token to bytes.
1641    pub fn encode(&self, dst: &mut impl BufMut) {
1642        dst.put_u8(TokenType::Done as u8);
1643        dst.put_u16_le(self.status.to_bits());
1644        dst.put_u16_le(self.cur_cmd);
1645        dst.put_u64_le(self.row_count);
1646    }
1647
1648    /// Check if more results follow this DONE token.
1649    #[must_use]
1650    pub const fn has_more(&self) -> bool {
1651        self.status.more
1652    }
1653
1654    /// Check if an error occurred.
1655    #[must_use]
1656    pub const fn has_error(&self) -> bool {
1657        self.status.error
1658    }
1659
1660    /// Check if the row count is valid.
1661    #[must_use]
1662    pub const fn has_count(&self) -> bool {
1663        self.status.count
1664    }
1665}
1666
1667impl DoneProc {
1668    /// Size of the DONEPROC token in bytes (excluding token type byte).
1669    pub const SIZE: usize = 12;
1670
1671    /// Decode a DONEPROC token from bytes.
1672    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1673        if src.remaining() < Self::SIZE {
1674            return Err(ProtocolError::IncompletePacket {
1675                expected: Self::SIZE,
1676                actual: src.remaining(),
1677            });
1678        }
1679
1680        let status = DoneStatus::from_bits(src.get_u16_le());
1681        let cur_cmd = src.get_u16_le();
1682        let row_count = src.get_u64_le();
1683
1684        Ok(Self {
1685            status,
1686            cur_cmd,
1687            row_count,
1688        })
1689    }
1690
1691    /// Encode the DONEPROC token to bytes.
1692    pub fn encode(&self, dst: &mut impl BufMut) {
1693        dst.put_u8(TokenType::DoneProc as u8);
1694        dst.put_u16_le(self.status.to_bits());
1695        dst.put_u16_le(self.cur_cmd);
1696        dst.put_u64_le(self.row_count);
1697    }
1698}
1699
1700impl DoneInProc {
1701    /// Size of the DONEINPROC token in bytes (excluding token type byte).
1702    pub const SIZE: usize = 12;
1703
1704    /// Decode a DONEINPROC token from bytes.
1705    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1706        if src.remaining() < Self::SIZE {
1707            return Err(ProtocolError::IncompletePacket {
1708                expected: Self::SIZE,
1709                actual: src.remaining(),
1710            });
1711        }
1712
1713        let status = DoneStatus::from_bits(src.get_u16_le());
1714        let cur_cmd = src.get_u16_le();
1715        let row_count = src.get_u64_le();
1716
1717        Ok(Self {
1718            status,
1719            cur_cmd,
1720            row_count,
1721        })
1722    }
1723
1724    /// Encode the DONEINPROC token to bytes.
1725    pub fn encode(&self, dst: &mut impl BufMut) {
1726        dst.put_u8(TokenType::DoneInProc as u8);
1727        dst.put_u16_le(self.status.to_bits());
1728        dst.put_u16_le(self.cur_cmd);
1729        dst.put_u64_le(self.row_count);
1730    }
1731}
1732
1733impl ServerError {
1734    /// Decode an ERROR token from bytes.
1735    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1736        // ERROR token: length (2) + number (4) + state (1) + class (1) +
1737        //              message (us_varchar) + server (b_varchar) + procedure (b_varchar) + line (4)
1738        if src.remaining() < 2 {
1739            return Err(ProtocolError::UnexpectedEof);
1740        }
1741
1742        let _length = src.get_u16_le();
1743
1744        if src.remaining() < 6 {
1745            return Err(ProtocolError::UnexpectedEof);
1746        }
1747
1748        let number = src.get_i32_le();
1749        let state = src.get_u8();
1750        let class = src.get_u8();
1751
1752        let message = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1753        let server = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1754        let procedure = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1755
1756        if src.remaining() < 4 {
1757            return Err(ProtocolError::UnexpectedEof);
1758        }
1759        let line = src.get_i32_le();
1760
1761        Ok(Self {
1762            number,
1763            state,
1764            class,
1765            message,
1766            server,
1767            procedure,
1768            line,
1769        })
1770    }
1771
1772    /// Check if this is a fatal error (severity >= 20).
1773    #[must_use]
1774    pub const fn is_fatal(&self) -> bool {
1775        self.class >= 20
1776    }
1777
1778    /// Check if this error indicates the batch was aborted (severity >= 16).
1779    #[must_use]
1780    pub const fn is_batch_abort(&self) -> bool {
1781        self.class >= 16
1782    }
1783}
1784
1785impl ServerInfo {
1786    /// Decode an INFO token from bytes.
1787    ///
1788    /// INFO tokens have the same structure as ERROR tokens but with lower severity.
1789    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1790        if src.remaining() < 2 {
1791            return Err(ProtocolError::UnexpectedEof);
1792        }
1793
1794        let _length = src.get_u16_le();
1795
1796        if src.remaining() < 6 {
1797            return Err(ProtocolError::UnexpectedEof);
1798        }
1799
1800        let number = src.get_i32_le();
1801        let state = src.get_u8();
1802        let class = src.get_u8();
1803
1804        let message = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1805        let server = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1806        let procedure = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1807
1808        if src.remaining() < 4 {
1809            return Err(ProtocolError::UnexpectedEof);
1810        }
1811        let line = src.get_i32_le();
1812
1813        Ok(Self {
1814            number,
1815            state,
1816            class,
1817            message,
1818            server,
1819            procedure,
1820            line,
1821        })
1822    }
1823}
1824
1825impl LoginAck {
1826    /// Decode a LOGINACK token from bytes.
1827    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1828        // LOGINACK: length (2) + interface (1) + tds_version (4) + prog_name (b_varchar) + prog_version (4)
1829        if src.remaining() < 2 {
1830            return Err(ProtocolError::UnexpectedEof);
1831        }
1832
1833        let _length = src.get_u16_le();
1834
1835        if src.remaining() < 5 {
1836            return Err(ProtocolError::UnexpectedEof);
1837        }
1838
1839        let interface = src.get_u8();
1840        let tds_version = src.get_u32_le();
1841        let prog_name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1842
1843        if src.remaining() < 4 {
1844            return Err(ProtocolError::UnexpectedEof);
1845        }
1846        let prog_version = src.get_u32_le();
1847
1848        Ok(Self {
1849            interface,
1850            tds_version,
1851            prog_name,
1852            prog_version,
1853        })
1854    }
1855
1856    /// Get the TDS version as a `TdsVersion`.
1857    #[must_use]
1858    pub fn tds_version(&self) -> crate::version::TdsVersion {
1859        crate::version::TdsVersion::new(self.tds_version)
1860    }
1861}
1862
1863impl EnvChangeType {
1864    /// Create from raw byte value.
1865    pub fn from_u8(value: u8) -> Option<Self> {
1866        match value {
1867            1 => Some(Self::Database),
1868            2 => Some(Self::Language),
1869            3 => Some(Self::CharacterSet),
1870            4 => Some(Self::PacketSize),
1871            5 => Some(Self::UnicodeSortingLocalId),
1872            6 => Some(Self::UnicodeComparisonFlags),
1873            7 => Some(Self::SqlCollation),
1874            8 => Some(Self::BeginTransaction),
1875            9 => Some(Self::CommitTransaction),
1876            10 => Some(Self::RollbackTransaction),
1877            11 => Some(Self::EnlistDtcTransaction),
1878            12 => Some(Self::DefectTransaction),
1879            13 => Some(Self::RealTimeLogShipping),
1880            15 => Some(Self::PromoteTransaction),
1881            16 => Some(Self::TransactionManagerAddress),
1882            17 => Some(Self::TransactionEnded),
1883            18 => Some(Self::ResetConnectionCompletionAck),
1884            19 => Some(Self::UserInstanceStarted),
1885            20 => Some(Self::Routing),
1886            _ => None,
1887        }
1888    }
1889}
1890
1891impl EnvChange {
1892    /// Decode an ENVCHANGE token from bytes.
1893    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1894        if src.remaining() < 3 {
1895            return Err(ProtocolError::UnexpectedEof);
1896        }
1897
1898        let length = src.get_u16_le() as usize;
1899        if src.remaining() < length {
1900            return Err(ProtocolError::IncompletePacket {
1901                expected: length,
1902                actual: src.remaining(),
1903            });
1904        }
1905        let remaining_before = src.remaining();
1906
1907        let env_type_byte = src.get_u8();
1908        let env_type = EnvChangeType::from_u8(env_type_byte)
1909            .ok_or(ProtocolError::InvalidTokenType(env_type_byte))?;
1910
1911        let (new_value, old_value) = match env_type {
1912            EnvChangeType::Routing => {
1913                // Routing has special format
1914                let new_value = Self::decode_routing_value(src)?;
1915                let old_value = EnvChangeValue::Binary(Bytes::new());
1916                (new_value, old_value)
1917            }
1918            EnvChangeType::BeginTransaction
1919            | EnvChangeType::CommitTransaction
1920            | EnvChangeType::RollbackTransaction
1921            | EnvChangeType::EnlistDtcTransaction
1922            | EnvChangeType::SqlCollation => {
1923                // These use binary format per MS-TDS spec:
1924                // - Transaction tokens: transaction descriptor (8 bytes)
1925                // - SqlCollation: collation info (5 bytes: LCID + sort flags)
1926                // The declared ENVCHANGE `length` can be shorter than this
1927                // branch needs (e.g. covers only the type byte), so the
1928                // length-prefix reads must be bounds-checked individually:
1929                // `get_u8` on an empty buffer panics. Match the branch's
1930                // existing graceful style — a missing prefix means empty.
1931                let new_len = if src.has_remaining() {
1932                    src.get_u8() as usize
1933                } else {
1934                    0
1935                };
1936                let new_value = if new_len > 0 && src.remaining() >= new_len {
1937                    EnvChangeValue::Binary(src.copy_to_bytes(new_len))
1938                } else {
1939                    EnvChangeValue::Binary(Bytes::new())
1940                };
1941
1942                let old_len = if src.has_remaining() {
1943                    src.get_u8() as usize
1944                } else {
1945                    0
1946                };
1947                let old_value = if old_len > 0 && src.remaining() >= old_len {
1948                    EnvChangeValue::Binary(src.copy_to_bytes(old_len))
1949                } else {
1950                    EnvChangeValue::Binary(Bytes::new())
1951                };
1952
1953                (new_value, old_value)
1954            }
1955            _ => {
1956                // String format for most env changes
1957                let new_value = read_b_varchar(src)
1958                    .map(EnvChangeValue::String)
1959                    .unwrap_or(EnvChangeValue::String(String::new()));
1960
1961                let old_value = read_b_varchar(src)
1962                    .map(EnvChangeValue::String)
1963                    .unwrap_or(EnvChangeValue::String(String::new()));
1964
1965                (new_value, old_value)
1966            }
1967        };
1968
1969        // The declared `length` frames the whole ENVCHANGE data. Value
1970        // decoders may legitimately consume less than that — e.g. the
1971        // Routing decoder reads only the NewValue, leaving the spec-mandated
1972        // zero-length OldValue (two bytes) behind. Skip to the frame
1973        // boundary so the leftover bytes are not misparsed as the next
1974        // token. (Decoders that tolerantly read *past* an under-declared
1975        // length are left as-is — see the transaction/collation branch.)
1976        let consumed = remaining_before - src.remaining();
1977        if consumed < length {
1978            src.advance(length - consumed);
1979        }
1980
1981        Ok(Self {
1982            env_type,
1983            new_value,
1984            old_value,
1985        })
1986    }
1987
1988    fn decode_routing_value(src: &mut impl Buf) -> Result<EnvChangeValue, ProtocolError> {
1989        // Routing format: length (2) + protocol (1) + port (2) + server_len (2) + server (utf16)
1990        if src.remaining() < 2 {
1991            return Err(ProtocolError::UnexpectedEof);
1992        }
1993
1994        let _routing_len = src.get_u16_le();
1995
1996        if src.remaining() < 5 {
1997            return Err(ProtocolError::UnexpectedEof);
1998        }
1999
2000        let _protocol = src.get_u8();
2001        let port = src.get_u16_le();
2002        let server_len = src.get_u16_le() as usize;
2003
2004        // Read UTF-16LE server name
2005        if src.remaining() < server_len * 2 {
2006            return Err(ProtocolError::UnexpectedEof);
2007        }
2008
2009        let mut chars = Vec::with_capacity(server_len);
2010        for _ in 0..server_len {
2011            chars.push(src.get_u16_le());
2012        }
2013
2014        let host = String::from_utf16(&chars).map_err(|_| {
2015            ProtocolError::StringEncoding(
2016                #[cfg(feature = "std")]
2017                "invalid UTF-16 in routing hostname".to_string(),
2018                #[cfg(not(feature = "std"))]
2019                "invalid UTF-16 in routing hostname",
2020            )
2021        })?;
2022
2023        Ok(EnvChangeValue::Routing { host, port })
2024    }
2025
2026    /// Check if this is a routing redirect.
2027    #[must_use]
2028    pub fn is_routing(&self) -> bool {
2029        self.env_type == EnvChangeType::Routing
2030    }
2031
2032    /// Get routing information if this is a routing change.
2033    #[must_use]
2034    pub fn routing_info(&self) -> Option<(&str, u16)> {
2035        if let EnvChangeValue::Routing { host, port } = &self.new_value {
2036            Some((host, *port))
2037        } else {
2038            None
2039        }
2040    }
2041
2042    /// Get the new database name if this is a database change.
2043    #[must_use]
2044    pub fn new_database(&self) -> Option<&str> {
2045        if self.env_type == EnvChangeType::Database {
2046            if let EnvChangeValue::String(s) = &self.new_value {
2047                return Some(s);
2048            }
2049        }
2050        None
2051    }
2052}
2053
2054impl Order {
2055    /// Decode an ORDER token from bytes.
2056    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
2057        if src.remaining() < 2 {
2058            return Err(ProtocolError::UnexpectedEof);
2059        }
2060
2061        let length = src.get_u16_le() as usize;
2062        let column_count = length / 2;
2063
2064        if src.remaining() < length {
2065            return Err(ProtocolError::IncompletePacket {
2066                expected: length,
2067                actual: src.remaining(),
2068            });
2069        }
2070
2071        let mut columns = Vec::with_capacity(column_count);
2072        for _ in 0..column_count {
2073            columns.push(src.get_u16_le());
2074        }
2075
2076        Ok(Self { columns })
2077    }
2078}
2079
2080impl FeatureExtAck {
2081    /// Feature terminator byte.
2082    pub const TERMINATOR: u8 = 0xFF;
2083
2084    /// Decode a FEATUREEXTACK token from bytes.
2085    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
2086        let mut features = Vec::new();
2087
2088        loop {
2089            if !src.has_remaining() {
2090                return Err(ProtocolError::UnexpectedEof);
2091            }
2092
2093            let feature_id = src.get_u8();
2094            if feature_id == Self::TERMINATOR {
2095                break;
2096            }
2097
2098            if src.remaining() < 4 {
2099                return Err(ProtocolError::UnexpectedEof);
2100            }
2101
2102            let data_len = src.get_u32_le() as usize;
2103
2104            if src.remaining() < data_len {
2105                return Err(ProtocolError::IncompletePacket {
2106                    expected: data_len,
2107                    actual: src.remaining(),
2108                });
2109            }
2110
2111            let data = src.copy_to_bytes(data_len);
2112            features.push(FeatureAck { feature_id, data });
2113        }
2114
2115        Ok(Self { features })
2116    }
2117}
2118
2119impl SspiToken {
2120    /// Decode an SSPI token from bytes.
2121    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
2122        if src.remaining() < 2 {
2123            return Err(ProtocolError::UnexpectedEof);
2124        }
2125
2126        let length = src.get_u16_le() as usize;
2127
2128        if src.remaining() < length {
2129            return Err(ProtocolError::IncompletePacket {
2130                expected: length,
2131                actual: src.remaining(),
2132            });
2133        }
2134
2135        let data = src.copy_to_bytes(length);
2136        Ok(Self { data })
2137    }
2138}
2139
2140impl FedAuthInfo {
2141    /// Decode a FEDAUTHINFO token from bytes.
2142    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
2143        if src.remaining() < 4 {
2144            return Err(ProtocolError::UnexpectedEof);
2145        }
2146
2147        let _length = src.get_u32_le();
2148
2149        if src.remaining() < 5 {
2150            return Err(ProtocolError::UnexpectedEof);
2151        }
2152
2153        let _count = src.get_u8();
2154
2155        // Read option data
2156        let mut sts_url = String::new();
2157        let mut spn = String::new();
2158
2159        // Parse info options until we have both
2160        while src.has_remaining() {
2161            if src.remaining() < 9 {
2162                break;
2163            }
2164
2165            let info_id = src.get_u8();
2166            let info_len = src.get_u32_le() as usize;
2167            let _info_offset = src.get_u32_le();
2168
2169            if src.remaining() < info_len {
2170                break;
2171            }
2172
2173            // Read UTF-16LE string
2174            let char_count = info_len / 2;
2175            let mut chars = Vec::with_capacity(char_count);
2176            for _ in 0..char_count {
2177                chars.push(src.get_u16_le());
2178            }
2179
2180            if let Ok(value) = String::from_utf16(&chars) {
2181                match info_id {
2182                    0x01 => spn = value,
2183                    0x02 => sts_url = value,
2184                    _ => {}
2185                }
2186            }
2187        }
2188
2189        Ok(Self { sts_url, spn })
2190    }
2191}
2192
2193// =============================================================================
2194// Token Parser
2195// =============================================================================
2196
2197/// Token stream parser.
2198///
2199/// Parses a stream of TDS tokens from a byte buffer.
2200///
2201/// # Basic vs Context-Aware Parsing
2202///
2203/// Some tokens (like `Done`, `Error`, `LoginAck`) can be parsed without context.
2204/// Use [`next_token()`](TokenParser::next_token) for these.
2205///
2206/// Other tokens (like `ColMetaData`, `Row`, `NbcRow`) require column metadata
2207/// to parse correctly. Use [`next_token_with_metadata()`](TokenParser::next_token_with_metadata)
2208/// for these.
2209///
2210/// # Example
2211///
2212/// ```rust,ignore
2213/// let mut parser = TokenParser::new(data);
2214/// let mut metadata = None;
2215///
2216/// while let Some(token) = parser.next_token_with_metadata(metadata.as_ref())? {
2217///     match token {
2218///         Token::ColMetaData(meta) => {
2219///             metadata = Some(meta);
2220///         }
2221///         Token::Row(row) => {
2222///             // Process row using metadata
2223///         }
2224///         Token::Done(done) => {
2225///             if !done.has_more() {
2226///                 break;
2227///             }
2228///         }
2229///         _ => {}
2230///     }
2231/// }
2232/// ```
2233pub struct TokenParser {
2234    data: Bytes,
2235    position: usize,
2236    /// Whether Always Encrypted was negotiated for this connection.
2237    /// When true, ColMetaData tokens are parsed with CekTable and per-column CryptoMetadata.
2238    encryption_enabled: bool,
2239}
2240
2241impl TokenParser {
2242    /// Create a new token parser from bytes.
2243    #[must_use]
2244    pub fn new(data: Bytes) -> Self {
2245        Self {
2246            data,
2247            position: 0,
2248            encryption_enabled: false,
2249        }
2250    }
2251
2252    /// Enable Always Encrypted metadata parsing.
2253    ///
2254    /// When enabled, ColMetaData tokens are parsed using the encrypted format
2255    /// which includes a CekTable and per-column CryptoMetadata.
2256    #[must_use]
2257    pub fn with_encryption(mut self, enabled: bool) -> Self {
2258        self.encryption_enabled = enabled;
2259        self
2260    }
2261
2262    /// Get remaining bytes in the buffer.
2263    #[must_use]
2264    pub fn remaining(&self) -> usize {
2265        self.data.len().saturating_sub(self.position)
2266    }
2267
2268    /// Check if there are more bytes to parse.
2269    #[must_use]
2270    pub fn has_remaining(&self) -> bool {
2271        self.position < self.data.len()
2272    }
2273
2274    /// Peek at the next token type without consuming it.
2275    #[must_use]
2276    pub fn peek_token_type(&self) -> Option<TokenType> {
2277        if self.position < self.data.len() {
2278            TokenType::from_u8(self.data[self.position])
2279        } else {
2280            None
2281        }
2282    }
2283
2284    /// Parse the next token from the stream.
2285    ///
2286    /// This method can only parse context-independent tokens. For tokens that
2287    /// require column metadata (ColMetaData, Row, NbcRow), use
2288    /// [`next_token_with_metadata()`](TokenParser::next_token_with_metadata).
2289    ///
2290    /// Returns `None` if no more tokens are available.
2291    pub fn next_token(&mut self) -> Result<Option<Token>, ProtocolError> {
2292        self.next_token_with_metadata(None)
2293    }
2294
2295    /// Parse the next token with optional column metadata context.
2296    ///
2297    /// When `metadata` is provided, this method can parse Row and NbcRow tokens.
2298    /// Without metadata, those tokens will return an error.
2299    ///
2300    /// Returns `None` if no more tokens are available.
2301    pub fn next_token_with_metadata(
2302        &mut self,
2303        metadata: Option<&ColMetaData>,
2304    ) -> Result<Option<Token>, ProtocolError> {
2305        if !self.has_remaining() {
2306            return Ok(None);
2307        }
2308
2309        let mut buf = &self.data[self.position..];
2310        let start_pos = self.position;
2311
2312        let token_type_byte = buf.get_u8();
2313        let token_type = TokenType::from_u8(token_type_byte);
2314
2315        let token = match token_type {
2316            Some(TokenType::Done) => {
2317                let done = Done::decode(&mut buf)?;
2318                Token::Done(done)
2319            }
2320            Some(TokenType::DoneProc) => {
2321                let done = DoneProc::decode(&mut buf)?;
2322                Token::DoneProc(done)
2323            }
2324            Some(TokenType::DoneInProc) => {
2325                let done = DoneInProc::decode(&mut buf)?;
2326                Token::DoneInProc(done)
2327            }
2328            Some(TokenType::Error) => {
2329                let error = ServerError::decode(&mut buf)?;
2330                Token::Error(error)
2331            }
2332            Some(TokenType::Info) => {
2333                let info = ServerInfo::decode(&mut buf)?;
2334                Token::Info(info)
2335            }
2336            Some(TokenType::LoginAck) => {
2337                let login_ack = LoginAck::decode(&mut buf)?;
2338                Token::LoginAck(login_ack)
2339            }
2340            Some(TokenType::EnvChange) => {
2341                let env_change = EnvChange::decode(&mut buf)?;
2342                Token::EnvChange(env_change)
2343            }
2344            Some(TokenType::Order) => {
2345                let order = Order::decode(&mut buf)?;
2346                Token::Order(order)
2347            }
2348            Some(TokenType::FeatureExtAck) => {
2349                let ack = FeatureExtAck::decode(&mut buf)?;
2350                Token::FeatureExtAck(ack)
2351            }
2352            Some(TokenType::Sspi) => {
2353                let sspi = SspiToken::decode(&mut buf)?;
2354                Token::Sspi(sspi)
2355            }
2356            Some(TokenType::FedAuthInfo) => {
2357                let info = FedAuthInfo::decode(&mut buf)?;
2358                Token::FedAuthInfo(info)
2359            }
2360            Some(TokenType::ReturnStatus) => {
2361                if buf.remaining() < 4 {
2362                    return Err(ProtocolError::UnexpectedEof);
2363                }
2364                let status = buf.get_i32_le();
2365                Token::ReturnStatus(status)
2366            }
2367            Some(TokenType::ColMetaData) => {
2368                let col_meta = if self.encryption_enabled {
2369                    ColMetaData::decode_encrypted(&mut buf)?
2370                } else {
2371                    ColMetaData::decode(&mut buf)?
2372                };
2373                Token::ColMetaData(col_meta)
2374            }
2375            Some(TokenType::Row) => {
2376                let meta = metadata.ok_or_else(|| {
2377                    ProtocolError::StringEncoding(
2378                        #[cfg(feature = "std")]
2379                        "Row token requires column metadata".to_string(),
2380                        #[cfg(not(feature = "std"))]
2381                        "Row token requires column metadata",
2382                    )
2383                })?;
2384                let row = RawRow::decode(&mut buf, meta)?;
2385                Token::Row(row)
2386            }
2387            Some(TokenType::NbcRow) => {
2388                let meta = metadata.ok_or_else(|| {
2389                    ProtocolError::StringEncoding(
2390                        #[cfg(feature = "std")]
2391                        "NbcRow token requires column metadata".to_string(),
2392                        #[cfg(not(feature = "std"))]
2393                        "NbcRow token requires column metadata",
2394                    )
2395                })?;
2396                let row = NbcRow::decode(&mut buf, meta)?;
2397                Token::NbcRow(row)
2398            }
2399            Some(TokenType::ReturnValue) => {
2400                let ret_val = ReturnValue::decode(&mut buf)?;
2401                Token::ReturnValue(ret_val)
2402            }
2403            Some(TokenType::SessionState) => {
2404                let session = SessionState::decode(&mut buf)?;
2405                Token::SessionState(session)
2406            }
2407            Some(TokenType::ColInfo) | Some(TokenType::TabName) | Some(TokenType::Offset) => {
2408                // These tokens are rarely used and have complex formats.
2409                // Skip them by reading the length and advancing.
2410                if buf.remaining() < 2 {
2411                    return Err(ProtocolError::UnexpectedEof);
2412                }
2413                let length = buf.get_u16_le() as usize;
2414                if buf.remaining() < length {
2415                    return Err(ProtocolError::IncompletePacket {
2416                        expected: length,
2417                        actual: buf.remaining(),
2418                    });
2419                }
2420                // Skip the data
2421                buf.advance(length);
2422                // Recursively get the next token
2423                self.position = start_pos + (self.data.len() - start_pos - buf.remaining());
2424                return self.next_token_with_metadata(metadata);
2425            }
2426            None => {
2427                return Err(ProtocolError::InvalidTokenType(token_type_byte));
2428            }
2429        };
2430
2431        // Update position based on how much was consumed
2432        let consumed = self.data.len() - start_pos - buf.remaining();
2433        self.position = start_pos + consumed;
2434
2435        Ok(Some(token))
2436    }
2437
2438    /// Skip the current token without fully parsing it.
2439    ///
2440    /// This is useful for skipping unknown or uninteresting tokens.
2441    pub fn skip_token(&mut self) -> Result<(), ProtocolError> {
2442        if !self.has_remaining() {
2443            return Ok(());
2444        }
2445
2446        let token_type_byte = self.data[self.position];
2447        let token_type = TokenType::from_u8(token_type_byte);
2448
2449        // Calculate how many bytes to skip based on token type
2450        let skip_amount = match token_type {
2451            // Fixed-size tokens
2452            Some(TokenType::Done) | Some(TokenType::DoneProc) | Some(TokenType::DoneInProc) => {
2453                1 + Done::SIZE // token type + 12 bytes
2454            }
2455            Some(TokenType::ReturnStatus) => {
2456                1 + 4 // token type + 4 bytes
2457            }
2458            // Variable-length tokens with 2-byte length prefix
2459            Some(TokenType::Error)
2460            | Some(TokenType::Info)
2461            | Some(TokenType::LoginAck)
2462            | Some(TokenType::EnvChange)
2463            | Some(TokenType::Order)
2464            | Some(TokenType::Sspi)
2465            | Some(TokenType::ColInfo)
2466            | Some(TokenType::TabName)
2467            | Some(TokenType::Offset)
2468            | Some(TokenType::ReturnValue) => {
2469                if self.remaining() < 3 {
2470                    return Err(ProtocolError::UnexpectedEof);
2471                }
2472                let length = u16::from_le_bytes([
2473                    self.data[self.position + 1],
2474                    self.data[self.position + 2],
2475                ]) as usize;
2476                1 + 2 + length // token type + length prefix + data
2477            }
2478            // Tokens with 4-byte length prefix
2479            Some(TokenType::SessionState) | Some(TokenType::FedAuthInfo) => {
2480                if self.remaining() < 5 {
2481                    return Err(ProtocolError::UnexpectedEof);
2482                }
2483                let length = u32::from_le_bytes([
2484                    self.data[self.position + 1],
2485                    self.data[self.position + 2],
2486                    self.data[self.position + 3],
2487                    self.data[self.position + 4],
2488                ]) as usize;
2489                1 + 4 + length
2490            }
2491            // FeatureExtAck has no length prefix - must parse
2492            Some(TokenType::FeatureExtAck) => {
2493                // Parse to find end
2494                let mut buf = &self.data[self.position + 1..];
2495                let _ = FeatureExtAck::decode(&mut buf)?;
2496                self.data.len() - self.position - buf.remaining()
2497            }
2498            // ColMetaData, Row, NbcRow require context and can't be easily skipped
2499            Some(TokenType::ColMetaData) | Some(TokenType::Row) | Some(TokenType::NbcRow) => {
2500                return Err(ProtocolError::InvalidTokenType(token_type_byte));
2501            }
2502            None => {
2503                return Err(ProtocolError::InvalidTokenType(token_type_byte));
2504            }
2505        };
2506
2507        if self.remaining() < skip_amount {
2508            return Err(ProtocolError::UnexpectedEof);
2509        }
2510
2511        self.position += skip_amount;
2512        Ok(())
2513    }
2514
2515    /// Get the current position in the buffer.
2516    #[must_use]
2517    pub fn position(&self) -> usize {
2518        self.position
2519    }
2520
2521    /// Reset the parser to the beginning.
2522    pub fn reset(&mut self) {
2523        self.position = 0;
2524    }
2525}
2526
2527// =============================================================================
2528// Tests
2529// =============================================================================
2530
2531#[cfg(test)]
2532#[allow(clippy::unwrap_used, clippy::panic)]
2533mod tests {
2534    use super::*;
2535    use bytes::BytesMut;
2536
2537    #[test]
2538    fn test_done_roundtrip() {
2539        let done = Done {
2540            status: DoneStatus {
2541                more: false,
2542                error: false,
2543                in_xact: false,
2544                count: true,
2545                attn: false,
2546                srverror: false,
2547            },
2548            cur_cmd: 193, // SELECT
2549            row_count: 42,
2550        };
2551
2552        let mut buf = BytesMut::new();
2553        done.encode(&mut buf);
2554
2555        // Skip the token type byte
2556        let mut cursor = &buf[1..];
2557        let decoded = Done::decode(&mut cursor).unwrap();
2558
2559        assert_eq!(decoded.status.count, done.status.count);
2560        assert_eq!(decoded.cur_cmd, done.cur_cmd);
2561        assert_eq!(decoded.row_count, done.row_count);
2562    }
2563
2564    #[test]
2565    fn test_done_status_bits() {
2566        let status = DoneStatus {
2567            more: true,
2568            error: true,
2569            in_xact: true,
2570            count: true,
2571            attn: false,
2572            srverror: false,
2573        };
2574
2575        let bits = status.to_bits();
2576        let restored = DoneStatus::from_bits(bits);
2577
2578        assert_eq!(status.more, restored.more);
2579        assert_eq!(status.error, restored.error);
2580        assert_eq!(status.in_xact, restored.in_xact);
2581        assert_eq!(status.count, restored.count);
2582    }
2583
2584    #[test]
2585    fn test_token_parser_done() {
2586        // DONE token: type (1) + status (2) + curcmd (2) + rowcount (8)
2587        let data = Bytes::from_static(&[
2588            0xFD, // DONE token type
2589            0x10, 0x00, // status: DONE_COUNT
2590            0xC1, 0x00, // cur_cmd: 193 (SELECT)
2591            0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count: 5
2592        ]);
2593
2594        let mut parser = TokenParser::new(data);
2595        let token = parser.next_token().unwrap().unwrap();
2596
2597        match token {
2598            Token::Done(done) => {
2599                assert!(done.status.count);
2600                assert!(!done.status.more);
2601                assert_eq!(done.cur_cmd, 193);
2602                assert_eq!(done.row_count, 5);
2603            }
2604            _ => panic!("Expected Done token"),
2605        }
2606
2607        // No more tokens
2608        assert!(parser.next_token().unwrap().is_none());
2609    }
2610
2611    #[test]
2612    fn test_env_change_type_from_u8() {
2613        assert_eq!(EnvChangeType::from_u8(1), Some(EnvChangeType::Database));
2614        assert_eq!(EnvChangeType::from_u8(20), Some(EnvChangeType::Routing));
2615        assert_eq!(EnvChangeType::from_u8(100), None);
2616    }
2617
2618    /// A spec-faithful Routing ENVCHANGE (MS-TDS 2.2.7.9) carries a
2619    /// zero-length OldValue (two bytes) after the routing data. The decoder
2620    /// reads only the NewValue, so it must skip to the declared frame
2621    /// boundary — otherwise the leftover `00 00` is misparsed as the next
2622    /// token type and the rest of the login response is garbage. Azure SQL
2623    /// Gateway redirects send exactly this shape.
2624    #[test]
2625    fn test_env_change_routing_consumes_declared_length() {
2626        let host = "redirect.example";
2627        let host_utf16: Vec<u16> = host.encode_utf16().collect();
2628
2629        let mut data = BytesMut::new();
2630        // RoutingDataValue: length + protocol + port + server_len + server
2631        let routing_len = 1 + 2 + 2 + host_utf16.len() * 2;
2632        // ENVCHANGE length: type byte + routing length prefix + routing data
2633        // + zero-length OldValue
2634        let env_len = 1 + 2 + routing_len + 2;
2635        data.put_u16_le(env_len as u16);
2636        data.put_u8(20); // Routing
2637        data.put_u16_le(routing_len as u16);
2638        data.put_u8(0); // protocol: TCP
2639        data.put_u16_le(11000); // port
2640        data.put_u16_le(host_utf16.len() as u16);
2641        for c in &host_utf16 {
2642            data.put_u16_le(*c);
2643        }
2644        data.put_u16_le(0); // OldValue: zero-length US_VARBYTE
2645        // A trailing DONE token type byte that must remain for the next read.
2646        data.put_u8(0xFD);
2647
2648        let mut buf: &[u8] = &data;
2649        let env = EnvChange::decode(&mut buf).unwrap();
2650        assert_eq!(env.routing_info(), Some((host, 11000)));
2651        assert_eq!(
2652            buf,
2653            &[0xFD],
2654            "decode must consume exactly the declared ENVCHANGE frame"
2655        );
2656    }
2657
2658    fn put_b_varchar(buf: &mut BytesMut, s: &str) {
2659        let utf16: Vec<u16> = s.encode_utf16().collect();
2660        buf.put_u8(utf16.len() as u8);
2661        for c in utf16 {
2662            buf.put_u16_le(c);
2663        }
2664    }
2665
2666    fn put_us_varchar(buf: &mut BytesMut, s: &str) {
2667        let utf16: Vec<u16> = s.encode_utf16().collect();
2668        buf.put_u16_le(utf16.len() as u16);
2669        for c in utf16 {
2670            buf.put_u16_le(c);
2671        }
2672    }
2673
2674    /// UDT_INFO regression (issue #154): per MS-TDS, DB_NAME, SCHEMA_NAME,
2675    /// and TYPE_NAME are B_VARCHAR (1-byte length); only
2676    /// ASSEMBLY_QUALIFIED_NAME is US_VARCHAR. Reading all four as US_VARCHAR
2677    /// misaligned the stream, so every query selecting a UDT column
2678    /// (geography, geometry, hierarchyid, CLR UDTs) failed with
2679    /// UnexpectedEof.
2680    #[test]
2681    fn test_udt_info_metadata_uses_b_varchar_names() {
2682        let mut data = BytesMut::new();
2683        data.put_u16_le(0xFFFF); // MAX_BYTE_SIZE
2684        put_b_varchar(&mut data, "master");
2685        put_b_varchar(&mut data, "dbo");
2686        put_b_varchar(&mut data, "hierarchyid");
2687        put_us_varchar(
2688            &mut data,
2689            "Microsoft.SqlServer.Types.SqlHierarchyId, Microsoft.SqlServer.Types",
2690        );
2691        // A trailing token type byte that must remain for the next read.
2692        data.put_u8(0xFD);
2693
2694        let mut buf: &[u8] = &data;
2695        let info = decode_type_info(&mut buf, TypeId::Udt, TypeId::Udt as u8).unwrap();
2696        assert_eq!(info.max_length, Some(0xFFFF));
2697        assert_eq!(
2698            buf,
2699            &[0xFD],
2700            "decode must consume exactly the UDT_INFO frame"
2701        );
2702    }
2703
2704    /// XML_INFO regression (issue #154): per MS-TDS §2.2.5.5.3, DBNAME and
2705    /// OWNING_SCHEMA are B_VARCHAR; only XML_SCHEMA_COLLECTION is US_VARCHAR.
2706    /// Schema-bound xml columns (SCHEMA_PRESENT=1) previously misparsed.
2707    #[test]
2708    fn test_xml_info_schema_bound_uses_b_varchar_names() {
2709        let mut data = BytesMut::new();
2710        data.put_u8(1); // SCHEMA_PRESENT
2711        put_b_varchar(&mut data, "master");
2712        put_b_varchar(&mut data, "dbo");
2713        put_us_varchar(&mut data, "MyXmlSchemaCollection");
2714        data.put_u8(0xFD);
2715
2716        let mut buf: &[u8] = &data;
2717        decode_type_info(&mut buf, TypeId::Xml, TypeId::Xml as u8).unwrap();
2718        assert_eq!(
2719            buf,
2720            &[0xFD],
2721            "decode must consume exactly the XML_INFO frame"
2722        );
2723    }
2724
2725    #[test]
2726    fn hostile_env_change_binary_truncated_is_not_panic() {
2727        // length=1 covers only the type byte (0x08 = BeginTransaction, a
2728        // binary-format type); the new_len/old_len prefix reads then hit an
2729        // empty buffer. Must decode gracefully, never panic (found by the
2730        // parse_env_change and parse_token fuzz targets).
2731        let data = [0x01, 0x00, 0x08];
2732        let mut buf: &[u8] = &data;
2733        let env = EnvChange::decode(&mut buf).unwrap();
2734        assert_eq!(env.env_type, EnvChangeType::BeginTransaction);
2735    }
2736
2737    #[test]
2738    fn test_colmetadata_no_columns() {
2739        // No metadata marker (0xFFFF)
2740        let data = Bytes::from_static(&[0xFF, 0xFF]);
2741        let mut cursor: &[u8] = &data;
2742        let meta = ColMetaData::decode(&mut cursor).unwrap();
2743        assert!(meta.is_empty());
2744        assert_eq!(meta.column_count(), 0);
2745    }
2746
2747    #[test]
2748    fn test_colmetadata_single_int_column() {
2749        // COLMETADATA with 1 INT column
2750        // Format: column_count (2) + [user_type (4) + flags (2) + type_id (1) + name (b_varchar)]
2751        let mut data = BytesMut::new();
2752        data.extend_from_slice(&[0x01, 0x00]); // 1 column
2753        data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); // user_type = 0
2754        data.extend_from_slice(&[0x01, 0x00]); // flags (nullable)
2755        data.extend_from_slice(&[0x38]); // TypeId::Int4
2756        // Column name "id" as B_VARCHAR (1 byte length + UTF-16LE)
2757        data.extend_from_slice(&[0x02]); // 2 characters
2758        data.extend_from_slice(&[b'i', 0x00, b'd', 0x00]); // "id" in UTF-16LE
2759
2760        let mut cursor: &[u8] = &data;
2761        let meta = ColMetaData::decode(&mut cursor).unwrap();
2762
2763        assert_eq!(meta.column_count(), 1);
2764        assert_eq!(meta.columns[0].name, "id");
2765        assert_eq!(meta.columns[0].type_id, TypeId::Int4);
2766        assert!(meta.columns[0].is_nullable());
2767    }
2768
2769    #[test]
2770    fn test_colmetadata_nvarchar_column() {
2771        // COLMETADATA with 1 NVARCHAR(50) column
2772        let mut data = BytesMut::new();
2773        data.extend_from_slice(&[0x01, 0x00]); // 1 column
2774        data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); // user_type = 0
2775        data.extend_from_slice(&[0x01, 0x00]); // flags (nullable)
2776        data.extend_from_slice(&[0xE7]); // TypeId::NVarChar
2777        // Type info: max_length (2 bytes) + collation (5 bytes)
2778        data.extend_from_slice(&[0x64, 0x00]); // max_length = 100 (50 chars * 2)
2779        data.extend_from_slice(&[0x09, 0x04, 0xD0, 0x00, 0x34]); // collation
2780        // Column name "name"
2781        data.extend_from_slice(&[0x04]); // 4 characters
2782        data.extend_from_slice(&[b'n', 0x00, b'a', 0x00, b'm', 0x00, b'e', 0x00]);
2783
2784        let mut cursor: &[u8] = &data;
2785        let meta = ColMetaData::decode(&mut cursor).unwrap();
2786
2787        assert_eq!(meta.column_count(), 1);
2788        assert_eq!(meta.columns[0].name, "name");
2789        assert_eq!(meta.columns[0].type_id, TypeId::NVarChar);
2790        assert_eq!(meta.columns[0].type_info.max_length, Some(100));
2791        assert!(meta.columns[0].type_info.collation.is_some());
2792    }
2793
2794    #[test]
2795    fn test_raw_row_decode_int() {
2796        // Create metadata for a single INT column
2797        let metadata = ColMetaData {
2798            cek_table: None,
2799            columns: vec![ColumnData {
2800                name: "id".to_string(),
2801                type_id: TypeId::Int4,
2802                col_type: 0x38,
2803                flags: 0,
2804                user_type: 0,
2805                type_info: TypeInfo::default(),
2806                crypto_metadata: None,
2807            }],
2808        };
2809
2810        // Row data: just 4 bytes for the int value 42
2811        let data = Bytes::from_static(&[0x2A, 0x00, 0x00, 0x00]); // 42 in little-endian
2812        let mut cursor: &[u8] = &data;
2813        let row = RawRow::decode(&mut cursor, &metadata).unwrap();
2814
2815        // The raw data should contain the 4 bytes
2816        assert_eq!(row.data.len(), 4);
2817        assert_eq!(&row.data[..], &[0x2A, 0x00, 0x00, 0x00]);
2818    }
2819
2820    #[test]
2821    fn test_raw_row_decode_nullable_int() {
2822        // Create metadata for a nullable INT column (IntN)
2823        let metadata = ColMetaData {
2824            cek_table: None,
2825            columns: vec![ColumnData {
2826                name: "id".to_string(),
2827                type_id: TypeId::IntN,
2828                col_type: 0x26,
2829                flags: 0x01, // nullable
2830                user_type: 0,
2831                type_info: TypeInfo {
2832                    max_length: Some(4),
2833                    ..Default::default()
2834                },
2835                crypto_metadata: None,
2836            }],
2837        };
2838
2839        // Row data with value: 1 byte length + 4 bytes value
2840        let data = Bytes::from_static(&[0x04, 0x2A, 0x00, 0x00, 0x00]); // length=4, value=42
2841        let mut cursor: &[u8] = &data;
2842        let row = RawRow::decode(&mut cursor, &metadata).unwrap();
2843
2844        assert_eq!(row.data.len(), 5);
2845        assert_eq!(row.data[0], 4); // length
2846        assert_eq!(&row.data[1..], &[0x2A, 0x00, 0x00, 0x00]);
2847    }
2848
2849    #[test]
2850    fn test_raw_row_decode_null_value() {
2851        // Create metadata for a nullable INT column (IntN)
2852        let metadata = ColMetaData {
2853            cek_table: None,
2854            columns: vec![ColumnData {
2855                name: "id".to_string(),
2856                type_id: TypeId::IntN,
2857                col_type: 0x26,
2858                flags: 0x01, // nullable
2859                user_type: 0,
2860                type_info: TypeInfo {
2861                    max_length: Some(4),
2862                    ..Default::default()
2863                },
2864                crypto_metadata: None,
2865            }],
2866        };
2867
2868        // NULL value: length = 0xFF (for bytelen types)
2869        let data = Bytes::from_static(&[0xFF]);
2870        let mut cursor: &[u8] = &data;
2871        let row = RawRow::decode(&mut cursor, &metadata).unwrap();
2872
2873        assert_eq!(row.data.len(), 1);
2874        assert_eq!(row.data[0], 0xFF); // NULL marker
2875    }
2876
2877    #[test]
2878    fn test_nbcrow_null_bitmap() {
2879        let row = NbcRow {
2880            null_bitmap: vec![0b00000101], // columns 0 and 2 are NULL
2881            data: Bytes::new(),
2882        };
2883
2884        assert!(row.is_null(0));
2885        assert!(!row.is_null(1));
2886        assert!(row.is_null(2));
2887        assert!(!row.is_null(3));
2888    }
2889
2890    #[test]
2891    fn test_token_parser_colmetadata() {
2892        // Build a COLMETADATA token with 1 INT column
2893        let mut data = BytesMut::new();
2894        data.extend_from_slice(&[0x81]); // COLMETADATA token type
2895        data.extend_from_slice(&[0x01, 0x00]); // 1 column
2896        data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); // user_type = 0
2897        data.extend_from_slice(&[0x01, 0x00]); // flags (nullable)
2898        data.extend_from_slice(&[0x38]); // TypeId::Int4
2899        data.extend_from_slice(&[0x02]); // column name length
2900        data.extend_from_slice(&[b'i', 0x00, b'd', 0x00]); // "id"
2901
2902        let mut parser = TokenParser::new(data.freeze());
2903        let token = parser.next_token().unwrap().unwrap();
2904
2905        match token {
2906            Token::ColMetaData(meta) => {
2907                assert_eq!(meta.column_count(), 1);
2908                assert_eq!(meta.columns[0].name, "id");
2909                assert_eq!(meta.columns[0].type_id, TypeId::Int4);
2910            }
2911            _ => panic!("Expected ColMetaData token"),
2912        }
2913    }
2914
2915    #[test]
2916    fn test_token_parser_row_with_metadata() {
2917        // Build metadata
2918        let metadata = ColMetaData {
2919            cek_table: None,
2920            columns: vec![ColumnData {
2921                name: "id".to_string(),
2922                type_id: TypeId::Int4,
2923                col_type: 0x38,
2924                flags: 0,
2925                user_type: 0,
2926                type_info: TypeInfo::default(),
2927                crypto_metadata: None,
2928            }],
2929        };
2930
2931        // Build ROW token
2932        let mut data = BytesMut::new();
2933        data.extend_from_slice(&[0xD1]); // ROW token type
2934        data.extend_from_slice(&[0x2A, 0x00, 0x00, 0x00]); // value = 42
2935
2936        let mut parser = TokenParser::new(data.freeze());
2937        let token = parser
2938            .next_token_with_metadata(Some(&metadata))
2939            .unwrap()
2940            .unwrap();
2941
2942        match token {
2943            Token::Row(row) => {
2944                assert_eq!(row.data.len(), 4);
2945            }
2946            _ => panic!("Expected Row token"),
2947        }
2948    }
2949
2950    #[test]
2951    fn test_token_parser_row_without_metadata_fails() {
2952        // Build ROW token
2953        let mut data = BytesMut::new();
2954        data.extend_from_slice(&[0xD1]); // ROW token type
2955        data.extend_from_slice(&[0x2A, 0x00, 0x00, 0x00]); // value = 42
2956
2957        let mut parser = TokenParser::new(data.freeze());
2958        let result = parser.next_token(); // No metadata provided
2959
2960        assert!(result.is_err());
2961    }
2962
2963    #[test]
2964    fn test_token_parser_peek() {
2965        let data = Bytes::from_static(&[
2966            0xFD, // DONE token type
2967            0x10, 0x00, // status
2968            0xC1, 0x00, // cur_cmd
2969            0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count
2970        ]);
2971
2972        let parser = TokenParser::new(data);
2973        assert_eq!(parser.peek_token_type(), Some(TokenType::Done));
2974    }
2975
2976    #[test]
2977    fn test_column_data_fixed_size() {
2978        let col = ColumnData {
2979            name: String::new(),
2980            type_id: TypeId::Int4,
2981            col_type: 0x38,
2982            flags: 0,
2983            user_type: 0,
2984            type_info: TypeInfo::default(),
2985            crypto_metadata: None,
2986        };
2987        assert_eq!(col.fixed_size(), Some(4));
2988
2989        let col2 = ColumnData {
2990            name: String::new(),
2991            type_id: TypeId::NVarChar,
2992            col_type: 0xE7,
2993            flags: 0,
2994            user_type: 0,
2995            type_info: TypeInfo::default(),
2996            crypto_metadata: None,
2997        };
2998        assert_eq!(col2.fixed_size(), None);
2999    }
3000
3001    // ========================================================================
3002    // End-to-End Decode Tests (Wire → Stored → Verification)
3003    // ========================================================================
3004    //
3005    // These tests verify that RawRow::decode_column_value correctly stores
3006    // column values in a format that can be parsed back.
3007
3008    #[test]
3009    fn test_decode_nvarchar_then_intn_roundtrip() {
3010        // Simulate wire data for: "World" (NVarChar), 42 (IntN)
3011        // This tests the scenario from the MCP parameterized query
3012
3013        // Build wire data (what the server sends)
3014        let mut wire_data = BytesMut::new();
3015
3016        // Column 0: NVarChar "World" - 2-byte length prefix in bytes
3017        // "World" in UTF-16LE: W=0x0057, o=0x006F, r=0x0072, l=0x006C, d=0x0064
3018        let word = "World";
3019        let utf16: Vec<u16> = word.encode_utf16().collect();
3020        wire_data.put_u16_le((utf16.len() * 2) as u16); // byte length = 10
3021        for code_unit in &utf16 {
3022            wire_data.put_u16_le(*code_unit);
3023        }
3024
3025        // Column 1: IntN 42 - 1-byte length prefix
3026        wire_data.put_u8(4); // 4 bytes for INT
3027        wire_data.put_i32_le(42);
3028
3029        // Build column metadata
3030        let metadata = ColMetaData {
3031            cek_table: None,
3032            columns: vec![
3033                ColumnData {
3034                    name: "greeting".to_string(),
3035                    type_id: TypeId::NVarChar,
3036                    col_type: 0xE7,
3037                    flags: 0x01,
3038                    user_type: 0,
3039                    type_info: TypeInfo {
3040                        max_length: Some(10), // non-MAX
3041                        precision: None,
3042                        scale: None,
3043                        collation: None,
3044                    },
3045                    crypto_metadata: None,
3046                },
3047                ColumnData {
3048                    name: "number".to_string(),
3049                    type_id: TypeId::IntN,
3050                    col_type: 0x26,
3051                    flags: 0x01,
3052                    user_type: 0,
3053                    type_info: TypeInfo {
3054                        max_length: Some(4),
3055                        precision: None,
3056                        scale: None,
3057                        collation: None,
3058                    },
3059                    crypto_metadata: None,
3060                },
3061            ],
3062        };
3063
3064        // Decode the wire data into stored format
3065        let mut wire_cursor = wire_data.freeze();
3066        let raw_row = RawRow::decode(&mut wire_cursor, &metadata).unwrap();
3067
3068        // Verify wire data was fully consumed
3069        assert_eq!(
3070            wire_cursor.remaining(),
3071            0,
3072            "wire data should be fully consumed"
3073        );
3074
3075        // Now parse the stored data
3076        let mut stored_cursor: &[u8] = &raw_row.data;
3077
3078        // Parse column 0 (NVarChar)
3079        // Stored format for non-MAX NVarChar: [2-byte len][data]
3080        assert!(
3081            stored_cursor.remaining() >= 2,
3082            "need at least 2 bytes for length"
3083        );
3084        let len0 = stored_cursor.get_u16_le() as usize;
3085        assert_eq!(len0, 10, "NVarChar length should be 10 bytes");
3086        assert!(
3087            stored_cursor.remaining() >= len0,
3088            "need {len0} bytes for data"
3089        );
3090
3091        // Read UTF-16LE and convert to string
3092        let mut utf16_read = Vec::new();
3093        for _ in 0..(len0 / 2) {
3094            utf16_read.push(stored_cursor.get_u16_le());
3095        }
3096        let string0 = String::from_utf16(&utf16_read).unwrap();
3097        assert_eq!(string0, "World", "column 0 should be 'World'");
3098
3099        // Parse column 1 (IntN)
3100        // Stored format for IntN: [1-byte len][data]
3101        assert!(
3102            stored_cursor.remaining() >= 1,
3103            "need at least 1 byte for length"
3104        );
3105        let len1 = stored_cursor.get_u8();
3106        assert_eq!(len1, 4, "IntN length should be 4");
3107        assert!(stored_cursor.remaining() >= 4, "need 4 bytes for INT data");
3108        let int1 = stored_cursor.get_i32_le();
3109        assert_eq!(int1, 42, "column 1 should be 42");
3110
3111        // Verify stored data was fully consumed
3112        assert_eq!(
3113            stored_cursor.remaining(),
3114            0,
3115            "stored data should be fully consumed"
3116        );
3117    }
3118
3119    #[test]
3120    fn test_decode_nvarchar_max_then_intn_roundtrip() {
3121        // Test NVARCHAR(MAX) followed by IntN - uses PLP encoding
3122
3123        // Build wire data for PLP NVARCHAR(MAX) + IntN
3124        let mut wire_data = BytesMut::new();
3125
3126        // Column 0: NVARCHAR(MAX) "Hello" - PLP format
3127        // PLP: 8-byte total length, then chunks
3128        let word = "Hello";
3129        let utf16: Vec<u16> = word.encode_utf16().collect();
3130        let byte_len = (utf16.len() * 2) as u64;
3131
3132        wire_data.put_u64_le(byte_len); // total length = 10
3133        wire_data.put_u32_le(byte_len as u32); // chunk length = 10
3134        for code_unit in &utf16 {
3135            wire_data.put_u16_le(*code_unit);
3136        }
3137        wire_data.put_u32_le(0); // terminating zero-length chunk
3138
3139        // Column 1: IntN 99
3140        wire_data.put_u8(4);
3141        wire_data.put_i32_le(99);
3142
3143        // Build metadata with MAX type
3144        let metadata = ColMetaData {
3145            cek_table: None,
3146            columns: vec![
3147                ColumnData {
3148                    name: "text".to_string(),
3149                    type_id: TypeId::NVarChar,
3150                    col_type: 0xE7,
3151                    flags: 0x01,
3152                    user_type: 0,
3153                    type_info: TypeInfo {
3154                        max_length: Some(0xFFFF), // MAX indicator
3155                        precision: None,
3156                        scale: None,
3157                        collation: None,
3158                    },
3159                    crypto_metadata: None,
3160                },
3161                ColumnData {
3162                    name: "num".to_string(),
3163                    type_id: TypeId::IntN,
3164                    col_type: 0x26,
3165                    flags: 0x01,
3166                    user_type: 0,
3167                    type_info: TypeInfo {
3168                        max_length: Some(4),
3169                        precision: None,
3170                        scale: None,
3171                        collation: None,
3172                    },
3173                    crypto_metadata: None,
3174                },
3175            ],
3176        };
3177
3178        // Decode wire data
3179        let mut wire_cursor = wire_data.freeze();
3180        let raw_row = RawRow::decode(&mut wire_cursor, &metadata).unwrap();
3181
3182        // Verify wire data was fully consumed
3183        assert_eq!(
3184            wire_cursor.remaining(),
3185            0,
3186            "wire data should be fully consumed"
3187        );
3188
3189        // Parse stored PLP data for column 0
3190        let mut stored_cursor: &[u8] = &raw_row.data;
3191
3192        // PLP stored format: [8-byte total][chunks...][4-byte 0]
3193        let total_len = stored_cursor.get_u64_le();
3194        assert_eq!(total_len, 10, "PLP total length should be 10");
3195
3196        let chunk_len = stored_cursor.get_u32_le();
3197        assert_eq!(chunk_len, 10, "PLP chunk length should be 10");
3198
3199        let mut utf16_read = Vec::new();
3200        for _ in 0..(chunk_len / 2) {
3201            utf16_read.push(stored_cursor.get_u16_le());
3202        }
3203        let string0 = String::from_utf16(&utf16_read).unwrap();
3204        assert_eq!(string0, "Hello", "column 0 should be 'Hello'");
3205
3206        let terminator = stored_cursor.get_u32_le();
3207        assert_eq!(terminator, 0, "PLP should end with 0");
3208
3209        // Parse IntN
3210        let len1 = stored_cursor.get_u8();
3211        assert_eq!(len1, 4);
3212        let int1 = stored_cursor.get_i32_le();
3213        assert_eq!(int1, 99, "column 1 should be 99");
3214
3215        // Verify fully consumed
3216        assert_eq!(
3217            stored_cursor.remaining(),
3218            0,
3219            "stored data should be fully consumed"
3220        );
3221    }
3222
3223    // ========================================================================
3224    // ReturnStatus Token Tests
3225    // ========================================================================
3226
3227    #[test]
3228    fn test_return_status_via_parser() {
3229        // RETURNSTATUS token: type (0x79) + value (i32 LE)
3230        let data = Bytes::from_static(&[
3231            0x79, // RETURNSTATUS token type
3232            0x00, 0x00, 0x00, 0x00, // return value = 0 (success)
3233        ]);
3234
3235        let mut parser = TokenParser::new(data);
3236        let token = parser.next_token().unwrap().unwrap();
3237
3238        match token {
3239            Token::ReturnStatus(status) => {
3240                assert_eq!(status, 0);
3241            }
3242            _ => panic!("Expected ReturnStatus token, got {token:?}"),
3243        }
3244
3245        assert!(parser.next_token().unwrap().is_none());
3246    }
3247
3248    #[test]
3249    fn test_return_status_nonzero() {
3250        // Return value = -6 (common for error returns)
3251        let mut buf = BytesMut::new();
3252        buf.put_u8(0x79); // RETURNSTATUS
3253        buf.put_i32_le(-6);
3254
3255        let mut parser = TokenParser::new(buf.freeze());
3256        let token = parser.next_token().unwrap().unwrap();
3257
3258        match token {
3259            Token::ReturnStatus(status) => {
3260                assert_eq!(status, -6);
3261            }
3262            _ => panic!("Expected ReturnStatus token"),
3263        }
3264    }
3265
3266    // ========================================================================
3267    // DoneProc Token Tests
3268    // ========================================================================
3269
3270    #[test]
3271    fn test_done_proc_roundtrip() {
3272        let done = DoneProc {
3273            status: DoneStatus {
3274                more: false,
3275                error: false,
3276                in_xact: false,
3277                count: true,
3278                attn: false,
3279                srverror: false,
3280            },
3281            cur_cmd: 0x00C6, // EXECUTE (198)
3282            row_count: 100,
3283        };
3284
3285        let mut buf = BytesMut::new();
3286        done.encode(&mut buf);
3287
3288        // Verify token type byte
3289        assert_eq!(buf[0], 0xFE);
3290
3291        // Skip token type byte and decode
3292        let mut cursor = &buf[1..];
3293        let decoded = DoneProc::decode(&mut cursor).unwrap();
3294
3295        assert!(decoded.status.count);
3296        assert!(!decoded.status.more);
3297        assert!(!decoded.status.error);
3298        assert_eq!(decoded.cur_cmd, 0x00C6);
3299        assert_eq!(decoded.row_count, 100);
3300    }
3301
3302    #[test]
3303    fn test_done_proc_via_parser() {
3304        let data = Bytes::from_static(&[
3305            0xFE, // DONEPROC token type
3306            0x00, 0x00, // status: no flags
3307            0xC6, 0x00, // cur_cmd: EXECUTE (198)
3308            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count: 0
3309        ]);
3310
3311        let mut parser = TokenParser::new(data);
3312        let token = parser.next_token().unwrap().unwrap();
3313
3314        match token {
3315            Token::DoneProc(done) => {
3316                assert!(!done.status.count);
3317                assert!(!done.status.more);
3318                assert_eq!(done.cur_cmd, 198);
3319                assert_eq!(done.row_count, 0);
3320            }
3321            _ => panic!("Expected DoneProc token"),
3322        }
3323    }
3324
3325    #[test]
3326    fn test_done_proc_with_error_flag() {
3327        let mut buf = BytesMut::new();
3328        buf.put_u8(0xFE); // DONEPROC
3329        buf.put_u16_le(0x0002); // status: DONE_ERROR
3330        buf.put_u16_le(0x00C6); // cur_cmd: EXECUTE
3331        buf.put_u64_le(0); // row_count
3332
3333        let mut parser = TokenParser::new(buf.freeze());
3334        let token = parser.next_token().unwrap().unwrap();
3335
3336        match token {
3337            Token::DoneProc(done) => {
3338                assert!(done.status.error);
3339                assert!(!done.status.count);
3340                assert!(!done.status.more);
3341            }
3342            _ => panic!("Expected DoneProc token"),
3343        }
3344    }
3345
3346    // ========================================================================
3347    // DoneInProc Token Tests
3348    // ========================================================================
3349
3350    #[test]
3351    fn test_done_in_proc_roundtrip() {
3352        let done = DoneInProc {
3353            status: DoneStatus {
3354                more: true,
3355                error: false,
3356                in_xact: false,
3357                count: true,
3358                attn: false,
3359                srverror: false,
3360            },
3361            cur_cmd: 193, // SELECT
3362            row_count: 7,
3363        };
3364
3365        let mut buf = BytesMut::new();
3366        done.encode(&mut buf);
3367
3368        assert_eq!(buf[0], 0xFF);
3369
3370        let mut cursor = &buf[1..];
3371        let decoded = DoneInProc::decode(&mut cursor).unwrap();
3372
3373        assert!(decoded.status.more);
3374        assert!(decoded.status.count);
3375        assert!(!decoded.status.error);
3376        assert_eq!(decoded.cur_cmd, 193);
3377        assert_eq!(decoded.row_count, 7);
3378    }
3379
3380    #[test]
3381    fn test_done_in_proc_via_parser() {
3382        let data = Bytes::from_static(&[
3383            0xFF, // DONEINPROC token type
3384            0x11, 0x00, // status: MORE | COUNT
3385            0xC1, 0x00, // cur_cmd: SELECT (193)
3386            0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count: 3
3387        ]);
3388
3389        let mut parser = TokenParser::new(data);
3390        let token = parser.next_token().unwrap().unwrap();
3391
3392        match token {
3393            Token::DoneInProc(done) => {
3394                assert!(done.status.more);
3395                assert!(done.status.count);
3396                assert_eq!(done.cur_cmd, 193);
3397                assert_eq!(done.row_count, 3);
3398            }
3399            _ => panic!("Expected DoneInProc token"),
3400        }
3401    }
3402
3403    // ========================================================================
3404    // ServerError Token Tests
3405    // ========================================================================
3406
3407    #[test]
3408    fn test_server_error_decode() {
3409        // Build a realistic ERROR token (without the 0xAA type byte,
3410        // since decode() is called after the parser strips it).
3411        let mut buf = BytesMut::new();
3412
3413        // Construct the message fields first to compute length
3414        let msg_utf16: Vec<u16> = "Invalid column name 'foo'.".encode_utf16().collect();
3415        let srv_utf16: Vec<u16> = "SQLDB01".encode_utf16().collect();
3416        let proc_utf16: Vec<u16> = "".encode_utf16().collect();
3417
3418        // Length = number(4) + state(1) + class(1)
3419        //        + us_varchar(message): 2 + msg_utf16.len()*2
3420        //        + b_varchar(server): 1 + srv_utf16.len()*2
3421        //        + b_varchar(procedure): 1 + proc_utf16.len()*2
3422        //        + line(4)
3423        let length: u16 = (4
3424            + 1
3425            + 1
3426            + 2
3427            + (msg_utf16.len() * 2)
3428            + 1
3429            + (srv_utf16.len() * 2)
3430            + 1
3431            + (proc_utf16.len() * 2)
3432            + 4) as u16;
3433
3434        buf.put_u16_le(length);
3435        buf.put_i32_le(207); // error number: Invalid column
3436        buf.put_u8(1); // state
3437        buf.put_u8(16); // class (severity 16)
3438
3439        // Message (US_VARCHAR: 2-byte char count + UTF-16LE)
3440        buf.put_u16_le(msg_utf16.len() as u16);
3441        for &c in &msg_utf16 {
3442            buf.put_u16_le(c);
3443        }
3444
3445        // Server (B_VARCHAR: 1-byte char count + UTF-16LE)
3446        buf.put_u8(srv_utf16.len() as u8);
3447        for &c in &srv_utf16 {
3448            buf.put_u16_le(c);
3449        }
3450
3451        // Procedure (B_VARCHAR: empty)
3452        buf.put_u8(proc_utf16.len() as u8);
3453
3454        // Line number
3455        buf.put_i32_le(42);
3456
3457        let mut cursor = buf.freeze();
3458        let error = ServerError::decode(&mut cursor).unwrap();
3459
3460        assert_eq!(error.number, 207);
3461        assert_eq!(error.state, 1);
3462        assert_eq!(error.class, 16);
3463        assert_eq!(error.message, "Invalid column name 'foo'.");
3464        assert_eq!(error.server, "SQLDB01");
3465        assert_eq!(error.procedure, "");
3466        assert_eq!(error.line, 42);
3467    }
3468
3469    #[test]
3470    fn test_server_error_severity_helpers() {
3471        let fatal = ServerError {
3472            number: 4014,
3473            state: 1,
3474            class: 20,
3475            message: "Fatal error".to_string(),
3476            server: String::new(),
3477            procedure: String::new(),
3478            line: 0,
3479        };
3480        assert!(fatal.is_fatal());
3481        assert!(fatal.is_batch_abort());
3482
3483        let batch_abort = ServerError {
3484            number: 547,
3485            state: 0,
3486            class: 16,
3487            message: "Constraint violation".to_string(),
3488            server: String::new(),
3489            procedure: String::new(),
3490            line: 1,
3491        };
3492        assert!(!batch_abort.is_fatal());
3493        assert!(batch_abort.is_batch_abort());
3494
3495        let informational = ServerError {
3496            number: 5701,
3497            state: 2,
3498            class: 10,
3499            message: "Changed db context".to_string(),
3500            server: String::new(),
3501            procedure: String::new(),
3502            line: 0,
3503        };
3504        assert!(!informational.is_fatal());
3505        assert!(!informational.is_batch_abort());
3506    }
3507
3508    #[test]
3509    fn test_server_error_via_parser() {
3510        // Build an ERROR token with the 0xAA type byte for the parser
3511        let mut buf = BytesMut::new();
3512        buf.put_u8(0xAA); // ERROR token type
3513
3514        let msg_utf16: Vec<u16> = "Syntax error".encode_utf16().collect();
3515        let srv_utf16: Vec<u16> = "SRV".encode_utf16().collect();
3516        let proc_utf16: Vec<u16> = "sp_test".encode_utf16().collect();
3517
3518        let length: u16 = (4
3519            + 1
3520            + 1
3521            + 2
3522            + (msg_utf16.len() * 2)
3523            + 1
3524            + (srv_utf16.len() * 2)
3525            + 1
3526            + (proc_utf16.len() * 2)
3527            + 4) as u16;
3528
3529        buf.put_u16_le(length);
3530        buf.put_i32_le(102); // Syntax error
3531        buf.put_u8(1);
3532        buf.put_u8(15);
3533
3534        buf.put_u16_le(msg_utf16.len() as u16);
3535        for &c in &msg_utf16 {
3536            buf.put_u16_le(c);
3537        }
3538        buf.put_u8(srv_utf16.len() as u8);
3539        for &c in &srv_utf16 {
3540            buf.put_u16_le(c);
3541        }
3542        buf.put_u8(proc_utf16.len() as u8);
3543        for &c in &proc_utf16 {
3544            buf.put_u16_le(c);
3545        }
3546        buf.put_i32_le(5);
3547
3548        let mut parser = TokenParser::new(buf.freeze());
3549        let token = parser.next_token().unwrap().unwrap();
3550
3551        match token {
3552            Token::Error(err) => {
3553                assert_eq!(err.number, 102);
3554                assert_eq!(err.class, 15);
3555                assert_eq!(err.message, "Syntax error");
3556                assert_eq!(err.server, "SRV");
3557                assert_eq!(err.procedure, "sp_test");
3558                assert_eq!(err.line, 5);
3559            }
3560            _ => panic!("Expected Error token"),
3561        }
3562    }
3563
3564    // ========================================================================
3565    // ReturnValue Token Tests
3566    // ========================================================================
3567
3568    /// Helper: build a ReturnValue token (without the 0xAC type byte)
3569    /// for an IntN output parameter.
3570    fn build_return_value_intn(
3571        ordinal: u16,
3572        name: &str,
3573        status: u8,
3574        value: Option<i32>,
3575    ) -> BytesMut {
3576        let mut inner = BytesMut::new();
3577
3578        // param_ordinal
3579        inner.put_u16_le(ordinal);
3580
3581        // param_name (B_VARCHAR)
3582        let name_utf16: Vec<u16> = name.encode_utf16().collect();
3583        inner.put_u8(name_utf16.len() as u8);
3584        for &c in &name_utf16 {
3585            inner.put_u16_le(c);
3586        }
3587
3588        // status
3589        inner.put_u8(status);
3590
3591        // user_type (4 bytes)
3592        inner.put_u32_le(0);
3593
3594        // flags (2 bytes)
3595        inner.put_u16_le(0x0001); // nullable
3596
3597        // type_id: IntN = 0x26
3598        inner.put_u8(0x26);
3599
3600        // type_info for IntN: 1-byte max_length
3601        inner.put_u8(4);
3602
3603        // value (TYPE_VARBYTE for IntN: 1-byte length + data)
3604        match value {
3605            Some(v) => {
3606                inner.put_u8(4); // length = 4
3607                inner.put_i32_le(v);
3608            }
3609            None => {
3610                inner.put_u8(0); // length = 0 means NULL
3611            }
3612        }
3613
3614        // RETURNVALUE has no outer length prefix (MS-TDS §2.2.7.18) — the
3615        // decoder walks the inner fields directly after the 0xAC token byte.
3616        inner
3617    }
3618
3619    #[test]
3620    fn test_return_value_int_output() {
3621        let buf = build_return_value_intn(1, "@result", 0x01, Some(42));
3622        let mut cursor = buf.freeze();
3623        let rv = ReturnValue::decode(&mut cursor).unwrap();
3624
3625        assert_eq!(rv.param_ordinal, 1);
3626        assert_eq!(rv.param_name, "@result");
3627        assert_eq!(rv.status, 0x01); // OUTPUT
3628        assert_eq!(rv.col_type, 0x26); // IntN
3629        assert_eq!(rv.type_info.max_length, Some(4));
3630        // Value should contain: length byte (4) + i32 LE (42)
3631        assert_eq!(rv.value.len(), 5);
3632        assert_eq!(rv.value[0], 4);
3633        assert_eq!(
3634            i32::from_le_bytes([rv.value[1], rv.value[2], rv.value[3], rv.value[4]]),
3635            42
3636        );
3637    }
3638
3639    #[test]
3640    fn test_return_value_null_output() {
3641        let buf = build_return_value_intn(2, "@count", 0x01, None);
3642        let mut cursor = buf.freeze();
3643        let rv = ReturnValue::decode(&mut cursor).unwrap();
3644
3645        assert_eq!(rv.param_ordinal, 2);
3646        assert_eq!(rv.param_name, "@count");
3647        assert_eq!(rv.status, 0x01);
3648        assert_eq!(rv.col_type, 0x26);
3649        // NULL value: length byte = 0
3650        assert_eq!(rv.value.len(), 1);
3651        assert_eq!(rv.value[0], 0);
3652    }
3653
3654    #[test]
3655    fn test_return_value_udf_status() {
3656        // UDF return value has status = 0x02
3657        let buf = build_return_value_intn(0, "@RETURN_VALUE", 0x02, Some(-1));
3658        let mut cursor = buf.freeze();
3659        let rv = ReturnValue::decode(&mut cursor).unwrap();
3660
3661        assert_eq!(rv.param_ordinal, 0);
3662        assert_eq!(rv.param_name, "@RETURN_VALUE");
3663        assert_eq!(rv.status, 0x02); // UDF return value
3664        assert_eq!(rv.value[0], 4);
3665        assert_eq!(
3666            i32::from_le_bytes([rv.value[1], rv.value[2], rv.value[3], rv.value[4]]),
3667            -1
3668        );
3669    }
3670
3671    #[test]
3672    fn test_return_value_nvarchar_output() {
3673        // Build a ReturnValue for NVARCHAR(100) output parameter
3674        let mut inner = BytesMut::new();
3675
3676        // param_ordinal
3677        inner.put_u16_le(1);
3678
3679        // param_name "@name"
3680        let name_utf16: Vec<u16> = "@name".encode_utf16().collect();
3681        inner.put_u8(name_utf16.len() as u8);
3682        for &c in &name_utf16 {
3683            inner.put_u16_le(c);
3684        }
3685
3686        // status = OUTPUT
3687        inner.put_u8(0x01);
3688        // user_type
3689        inner.put_u32_le(0);
3690        // flags (nullable)
3691        inner.put_u16_le(0x0001);
3692        // type_id: NVarChar = 0xE7
3693        inner.put_u8(0xE7);
3694        // type_info for NVarChar: 2-byte max_length + 5-byte collation
3695        inner.put_u16_le(200); // max 100 chars * 2 bytes
3696        inner.put_u32_le(0x0904D000); // collation LCID
3697        inner.put_u8(0x34); // collation sort_id
3698
3699        // value: "Hello" in UTF-16LE with 2-byte length prefix
3700        let val_utf16: Vec<u16> = "Hello".encode_utf16().collect();
3701        let byte_len = (val_utf16.len() * 2) as u16;
3702        inner.put_u16_le(byte_len);
3703        for &c in &val_utf16 {
3704            inner.put_u16_le(c);
3705        }
3706
3707        let mut cursor = inner.freeze();
3708        let rv = ReturnValue::decode(&mut cursor).unwrap();
3709
3710        assert_eq!(rv.param_ordinal, 1);
3711        assert_eq!(rv.param_name, "@name");
3712        assert_eq!(rv.status, 0x01);
3713        assert_eq!(rv.col_type, 0xE7); // NVarChar
3714        assert_eq!(rv.type_info.max_length, Some(200));
3715        assert!(rv.type_info.collation.is_some());
3716
3717        // Value: 2-byte length (10) + "Hello" in UTF-16LE
3718        assert_eq!(rv.value.len(), 12); // 2 + 10
3719        let val_len = u16::from_le_bytes([rv.value[0], rv.value[1]]);
3720        assert_eq!(val_len, 10);
3721    }
3722
3723    #[test]
3724    fn test_return_value_via_parser() {
3725        // Build a full ReturnValue token with the 0xAC type byte
3726        let mut data = BytesMut::new();
3727        data.put_u8(0xAC); // RETURNVALUE token type
3728        data.extend_from_slice(&build_return_value_intn(0, "@out", 0x01, Some(99)));
3729
3730        let mut parser = TokenParser::new(data.freeze());
3731        let token = parser.next_token().unwrap().unwrap();
3732
3733        match token {
3734            Token::ReturnValue(rv) => {
3735                assert_eq!(rv.param_name, "@out");
3736                assert_eq!(rv.param_ordinal, 0);
3737                assert_eq!(rv.status, 0x01);
3738                assert_eq!(rv.col_type, 0x26);
3739            }
3740            _ => panic!("Expected ReturnValue token"),
3741        }
3742    }
3743
3744    // ========================================================================
3745    // Multi-Token Stream Tests
3746    // ========================================================================
3747
3748    #[test]
3749    fn test_multi_token_stored_proc_response() {
3750        // Simulate a stored procedure response:
3751        // DoneInProc (result set done) → ReturnStatus → DoneProc
3752        let mut data = BytesMut::new();
3753
3754        // Token 1: DONEINPROC — result set with 3 rows
3755        data.put_u8(0xFF); // DONEINPROC
3756        data.put_u16_le(0x0010); // status: COUNT
3757        data.put_u16_le(0x00C1); // cur_cmd: SELECT
3758        data.put_u64_le(3); // row_count
3759
3760        // Token 2: RETURNSTATUS — procedure returned 0
3761        data.put_u8(0x79); // RETURNSTATUS
3762        data.put_i32_le(0);
3763
3764        // Token 3: DONEPROC — final
3765        data.put_u8(0xFE); // DONEPROC
3766        data.put_u16_le(0x0000); // status: no flags
3767        data.put_u16_le(0x00C6); // cur_cmd: EXECUTE
3768        data.put_u64_le(0);
3769
3770        let mut parser = TokenParser::new(data.freeze());
3771
3772        // Token 1: DoneInProc
3773        let t1 = parser.next_token().unwrap().unwrap();
3774        match t1 {
3775            Token::DoneInProc(done) => {
3776                assert!(done.status.count);
3777                assert_eq!(done.row_count, 3);
3778                assert_eq!(done.cur_cmd, 193);
3779            }
3780            _ => panic!("Expected DoneInProc, got {t1:?}"),
3781        }
3782
3783        // Token 2: ReturnStatus
3784        let t2 = parser.next_token().unwrap().unwrap();
3785        match t2 {
3786            Token::ReturnStatus(status) => {
3787                assert_eq!(status, 0);
3788            }
3789            _ => panic!("Expected ReturnStatus, got {t2:?}"),
3790        }
3791
3792        // Token 3: DoneProc
3793        let t3 = parser.next_token().unwrap().unwrap();
3794        match t3 {
3795            Token::DoneProc(done) => {
3796                assert!(!done.status.count);
3797                assert!(!done.status.more);
3798                assert_eq!(done.cur_cmd, 198);
3799            }
3800            _ => panic!("Expected DoneProc, got {t3:?}"),
3801        }
3802
3803        // No more tokens
3804        assert!(parser.next_token().unwrap().is_none());
3805    }
3806
3807    #[test]
3808    fn test_multi_token_error_in_stream() {
3809        // Simulate: ERROR → DONE (error during query)
3810        let mut data = BytesMut::new();
3811
3812        // Token 1: ERROR
3813        data.put_u8(0xAA);
3814
3815        let msg_utf16: Vec<u16> = "Deadlock".encode_utf16().collect();
3816        let srv_utf16: Vec<u16> = "DB1".encode_utf16().collect();
3817
3818        let length: u16 = (4 + 1 + 1
3819            + 2 + (msg_utf16.len() * 2)
3820            + 1 + (srv_utf16.len() * 2)
3821            + 1  // empty procedure
3822            + 4) as u16;
3823
3824        data.put_u16_le(length);
3825        data.put_i32_le(1205); // deadlock
3826        data.put_u8(51); // state
3827        data.put_u8(13); // class
3828
3829        data.put_u16_le(msg_utf16.len() as u16);
3830        for &c in &msg_utf16 {
3831            data.put_u16_le(c);
3832        }
3833        data.put_u8(srv_utf16.len() as u8);
3834        for &c in &srv_utf16 {
3835            data.put_u16_le(c);
3836        }
3837        data.put_u8(0); // empty procedure
3838        data.put_i32_le(0);
3839
3840        // Token 2: DONE with error flag
3841        data.put_u8(0xFD);
3842        data.put_u16_le(0x0002); // DONE_ERROR
3843        data.put_u16_le(0x00C1); // SELECT
3844        data.put_u64_le(0);
3845
3846        let mut parser = TokenParser::new(data.freeze());
3847
3848        // Token 1: Error
3849        let t1 = parser.next_token().unwrap().unwrap();
3850        match t1 {
3851            Token::Error(err) => {
3852                assert_eq!(err.number, 1205);
3853                assert_eq!(err.class, 13);
3854                assert_eq!(err.message, "Deadlock");
3855                assert_eq!(err.server, "DB1");
3856            }
3857            _ => panic!("Expected Error token, got {t1:?}"),
3858        }
3859
3860        // Token 2: Done with error
3861        let t2 = parser.next_token().unwrap().unwrap();
3862        match t2 {
3863            Token::Done(done) => {
3864                assert!(done.status.error);
3865                assert!(!done.status.count);
3866            }
3867            _ => panic!("Expected Done token, got {t2:?}"),
3868        }
3869
3870        assert!(parser.next_token().unwrap().is_none());
3871    }
3872
3873    #[test]
3874    fn test_multi_token_proc_with_return_value() {
3875        // Simulate stored proc: ReturnValue → ReturnStatus → DoneProc
3876        let mut data = BytesMut::new();
3877
3878        // Token 1: ReturnValue (@result = 42)
3879        data.put_u8(0xAC);
3880        data.extend_from_slice(&build_return_value_intn(1, "@result", 0x01, Some(42)));
3881
3882        // Token 2: ReturnStatus = 0
3883        data.put_u8(0x79);
3884        data.put_i32_le(0);
3885
3886        // Token 3: DoneProc
3887        data.put_u8(0xFE);
3888        data.put_u16_le(0x0000);
3889        data.put_u16_le(0x00C6);
3890        data.put_u64_le(0);
3891
3892        let mut parser = TokenParser::new(data.freeze());
3893
3894        let t1 = parser.next_token().unwrap().unwrap();
3895        match t1 {
3896            Token::ReturnValue(rv) => {
3897                assert_eq!(rv.param_name, "@result");
3898                assert_eq!(rv.param_ordinal, 1);
3899            }
3900            _ => panic!("Expected ReturnValue, got {t1:?}"),
3901        }
3902
3903        let t2 = parser.next_token().unwrap().unwrap();
3904        assert!(matches!(t2, Token::ReturnStatus(0)));
3905
3906        let t3 = parser.next_token().unwrap().unwrap();
3907        assert!(matches!(t3, Token::DoneProc(_)));
3908
3909        assert!(parser.next_token().unwrap().is_none());
3910    }
3911
3912    // ========================================================================
3913    // EOF / Truncation Edge Cases
3914    // ========================================================================
3915
3916    #[test]
3917    fn test_return_status_truncated() {
3918        // Only 3 bytes instead of 4 for i32
3919        let data = Bytes::from_static(&[0x79, 0x01, 0x02, 0x03]);
3920        let mut parser = TokenParser::new(data);
3921        assert!(parser.next_token().is_err());
3922    }
3923
3924    #[test]
3925    fn test_done_proc_truncated() {
3926        // Only 8 bytes instead of 12
3927        let data = Bytes::from_static(&[0xFE, 0x00, 0x00, 0xC1, 0x00, 0x01, 0x00, 0x00, 0x00]);
3928        let mut parser = TokenParser::new(data);
3929        assert!(parser.next_token().is_err());
3930    }
3931
3932    #[test]
3933    fn test_server_error_truncated() {
3934        // ERROR token with only the length field (body truncated)
3935        let data = Bytes::from_static(&[0xAA, 0x20, 0x00]);
3936        let mut parser = TokenParser::new(data);
3937        assert!(parser.next_token().is_err());
3938    }
3939}