Skip to main content

tds_protocol/
token.rs

1//! TDS token stream definitions.
2//!
3//! Tokens are the fundamental units of TDS response data. The server sends
4//! a stream of tokens that describe metadata, rows, errors, and other information.
5//!
6//! ## Token Structure
7//!
8//! Each token begins with a 1-byte token type identifier, followed by
9//! token-specific data. Some tokens have fixed lengths, while others
10//! have length prefixes.
11//!
12//! ## Usage
13//!
14//! ```rust,no_run
15//! use tds_protocol::token::{Token, TokenParser};
16//! use bytes::Bytes;
17//!
18//! fn parse(data: Bytes) -> Result<(), tds_protocol::ProtocolError> {
19//!     let mut parser = TokenParser::new(data);
20//!
21//!     while let Some(token) = parser.next_token()? {
22//!         match token {
23//!             Token::Done(done) => println!("Rows affected: {}", done.row_count),
24//!             Token::Error(err) => eprintln!("Error {}: {}", err.number, err.message),
25//!             _ => {}
26//!         }
27//!     }
28//!     Ok(())
29//! }
30//! ```
31
32use bytes::{Buf, BufMut, Bytes};
33
34use crate::codec::{read_b_varchar, read_us_varchar};
35use crate::error::ProtocolError;
36use crate::prelude::*;
37use crate::types::TypeId;
38
39/// Token type identifier.
40#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
41#[repr(u8)]
42#[non_exhaustive]
43pub enum TokenType {
44    /// Column metadata (COLMETADATA).
45    ColMetaData = 0x81,
46    /// Error message (ERROR).
47    Error = 0xAA,
48    /// Informational message (INFO).
49    Info = 0xAB,
50    /// Login acknowledgment (LOGINACK).
51    LoginAck = 0xAD,
52    /// Row data (ROW).
53    Row = 0xD1,
54    /// Null bitmap compressed row (NBCROW).
55    NbcRow = 0xD2,
56    /// Environment change (ENVCHANGE).
57    EnvChange = 0xE3,
58    /// SSPI authentication (SSPI).
59    Sspi = 0xED,
60    /// Done (DONE).
61    Done = 0xFD,
62    /// Done in procedure (DONEINPROC).
63    DoneInProc = 0xFF,
64    /// Done procedure (DONEPROC).
65    DoneProc = 0xFE,
66    /// Return status (RETURNSTATUS).
67    ReturnStatus = 0x79,
68    /// Return value (RETURNVALUE).
69    ReturnValue = 0xAC,
70    /// Order (ORDER).
71    Order = 0xA9,
72    /// Feature extension acknowledgment (FEATUREEXTACK).
73    FeatureExtAck = 0xAE,
74    /// Session state (SESSIONSTATE).
75    SessionState = 0xE4,
76    /// Federated authentication info (FEDAUTHINFO).
77    FedAuthInfo = 0xEE,
78    /// Column info (COLINFO).
79    ColInfo = 0xA5,
80    /// Table name (TABNAME).
81    TabName = 0xA4,
82    /// Offset (OFFSET).
83    Offset = 0x78,
84    /// Compute (COMPUTE BY) column metadata (ALTMETADATA). Deprecated in TDS 7.4.
85    AltMetaData = 0x88,
86    /// Compute (COMPUTE BY) row data (ALTROW). Deprecated in TDS 7.4.
87    AltRow = 0xD3,
88}
89
90impl TokenType {
91    /// Create a token type from a raw byte.
92    pub fn from_u8(value: u8) -> Option<Self> {
93        match value {
94            0x81 => Some(Self::ColMetaData),
95            0xAA => Some(Self::Error),
96            0xAB => Some(Self::Info),
97            0xAD => Some(Self::LoginAck),
98            0xD1 => Some(Self::Row),
99            0xD2 => Some(Self::NbcRow),
100            0xE3 => Some(Self::EnvChange),
101            0xED => Some(Self::Sspi),
102            0xFD => Some(Self::Done),
103            0xFF => Some(Self::DoneInProc),
104            0xFE => Some(Self::DoneProc),
105            0x79 => Some(Self::ReturnStatus),
106            0xAC => Some(Self::ReturnValue),
107            0xA9 => Some(Self::Order),
108            0xAE => Some(Self::FeatureExtAck),
109            0xE4 => Some(Self::SessionState),
110            0xEE => Some(Self::FedAuthInfo),
111            0xA5 => Some(Self::ColInfo),
112            0xA4 => Some(Self::TabName),
113            0x78 => Some(Self::Offset),
114            0x88 => Some(Self::AltMetaData),
115            0xD3 => Some(Self::AltRow),
116            _ => None,
117        }
118    }
119}
120
121/// Parsed TDS token.
122///
123/// This enum represents all possible tokens that can be received from SQL Server.
124/// Each variant contains the parsed token data.
125#[derive(Debug, Clone)]
126#[non_exhaustive]
127pub enum Token {
128    /// Column metadata describing result set structure.
129    ColMetaData(ColMetaData),
130    /// Row data.
131    Row(RawRow),
132    /// Null bitmap compressed row.
133    NbcRow(NbcRow),
134    /// Completion of a SQL statement.
135    Done(Done),
136    /// Completion of a stored procedure.
137    DoneProc(DoneProc),
138    /// Completion within a stored procedure.
139    DoneInProc(DoneInProc),
140    /// Return status from stored procedure.
141    ReturnStatus(i32),
142    /// Return value from stored procedure.
143    ReturnValue(ReturnValue),
144    /// Error message from server.
145    Error(ServerError),
146    /// Informational message from server.
147    Info(ServerInfo),
148    /// Login acknowledgment.
149    LoginAck(LoginAck),
150    /// Environment change notification.
151    EnvChange(EnvChange),
152    /// Column ordering information.
153    Order(Order),
154    /// Feature extension acknowledgment.
155    FeatureExtAck(FeatureExtAck),
156    /// SSPI authentication data.
157    Sspi(SspiToken),
158    /// Session state information.
159    SessionState(SessionState),
160    /// Federated authentication info.
161    FedAuthInfo(FedAuthInfo),
162}
163
164/// Column metadata token.
165#[derive(Debug, Clone, Default)]
166pub struct ColMetaData {
167    /// Column definitions.
168    pub columns: Vec<ColumnData>,
169    /// CEK table for Always Encrypted result sets.
170    /// Present only when the server sends encrypted column metadata.
171    pub cek_table: Option<crate::crypto::CekTable>,
172}
173
174/// Column definition within metadata.
175#[derive(Debug, Clone)]
176pub struct ColumnData {
177    /// Column name.
178    pub name: String,
179    /// Column data type ID.
180    pub type_id: TypeId,
181    /// Column data type raw byte (for unknown types).
182    pub col_type: u8,
183    /// Column flags.
184    pub flags: u16,
185    /// User type ID.
186    pub user_type: u32,
187    /// Type-specific metadata.
188    pub type_info: TypeInfo,
189    /// Per-column encryption metadata (Always Encrypted).
190    /// Present only for columns with the encrypted flag (0x0800) set.
191    pub crypto_metadata: Option<crate::crypto::CryptoMetadata>,
192}
193
194/// Type-specific metadata.
195#[derive(Debug, Clone, Default)]
196pub struct TypeInfo {
197    /// Maximum length for variable-length types.
198    pub max_length: Option<u32>,
199    /// Precision for numeric types.
200    pub precision: Option<u8>,
201    /// Scale for numeric types.
202    pub scale: Option<u8>,
203    /// Collation for string types.
204    pub collation: Option<Collation>,
205}
206
207/// SQL Server collation.
208///
209/// Collations in SQL Server define the character encoding and sorting rules
210/// for string data. For `VARCHAR` columns, the collation determines which
211/// code page (character encoding) is used to store the data.
212///
213/// # Encoding Support
214///
215/// When the `encoding` feature is enabled, the [`Collation::encoding()`] method
216/// returns the appropriate [`encoding_rs::Encoding`] for decoding `VARCHAR` data.
217///
218/// # Example
219///
220/// ```rust,ignore
221/// use tds_protocol::token::Collation;
222///
223/// let collation = Collation { lcid: 0x0804, sort_id: 0 }; // Chinese (PRC)
224/// if let Some(encoding) = collation.encoding() {
225///     let (decoded, _, _) = encoding.decode(raw_bytes);
226///     // decoded is now proper Chinese text
227/// }
228/// ```
229#[derive(Debug, Clone, Copy, Default)]
230pub struct Collation {
231    /// Locale ID (LCID).
232    ///
233    /// The LCID encodes both the language and region. The lower 16 bits
234    /// contain the primary language ID, and bits 16-19 contain the sort ID
235    /// for some collations.
236    ///
237    /// For UTF-8 collations (SQL Server 2019+), fUTF8 (bit 26, 0x0400_0000) is set.
238    pub lcid: u32,
239    /// Sort ID.
240    ///
241    /// Used with certain collations to specify sorting behavior.
242    pub sort_id: u8,
243}
244
245impl Collation {
246    /// Create a `Collation` from the 5-byte TDS wire format.
247    ///
248    /// Format: 4 bytes LCID (little-endian u32) + 1 byte sort ID.
249    pub fn from_bytes(bytes: &[u8; 5]) -> Self {
250        Self {
251            lcid: u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]),
252            sort_id: bytes[4],
253        }
254    }
255
256    /// Serialize to the 5-byte TDS wire format.
257    ///
258    /// Format: 4 bytes LCID (little-endian u32) + 1 byte sort ID.
259    pub fn to_bytes(&self) -> [u8; 5] {
260        let b = self.lcid.to_le_bytes();
261        [b[0], b[1], b[2], b[3], self.sort_id]
262    }
263
264    /// Returns the character encoding for this collation.
265    ///
266    /// This method maps the collation's LCID to the appropriate character
267    /// encoding from the `encoding_rs` crate.
268    ///
269    /// # Returns
270    ///
271    /// - `Some(&Encoding)` - The encoding to use for decoding `VARCHAR` data
272    /// - `None` - If the collation uses UTF-8 (no transcoding needed) or
273    ///   the LCID is not recognized (caller should use Windows-1252 fallback)
274    ///
275    /// # UTF-8 Collations
276    ///
277    /// SQL Server 2019+ supports UTF-8 collations (identified by the `_UTF8`
278    /// suffix). These return `None` because no transcoding is needed.
279    ///
280    /// # Example
281    ///
282    /// ```rust,ignore
283    /// let collation = Collation { lcid: 0x0419, sort_id: 0 }; // Russian
284    /// if let Some(encoding) = collation.encoding() {
285    ///     // encoding is Windows-1251 for Cyrillic
286    ///     let (text, _, had_errors) = encoding.decode(&raw_bytes);
287    /// }
288    /// ```
289    #[cfg(feature = "encoding")]
290    pub fn encoding(&self) -> Option<&'static encoding_rs::Encoding> {
291        // A non-zero SortId means a SQL collation, whose code page derives
292        // from the SortId, not the LCID (MS-TDS). Consulting only the LCID
293        // silently decoded these as windows-1252 (issue #158).
294        if self.sort_id != 0 {
295            return crate::collation::encoding_for_sort_id(self.sort_id);
296        }
297        crate::collation::encoding_for_lcid(self.lcid)
298    }
299
300    /// Returns whether this collation uses UTF-8 encoding.
301    ///
302    /// UTF-8 collations were introduced in SQL Server 2019 and are
303    /// identified by the `_UTF8` suffix in the collation name.
304    #[cfg(feature = "encoding")]
305    pub fn is_utf8(&self) -> bool {
306        crate::collation::is_utf8_collation(self.lcid)
307    }
308
309    /// Returns the Windows code page number for this collation.
310    ///
311    /// Useful for error messages and debugging.
312    ///
313    /// # Returns
314    ///
315    /// The code page number (e.g., 1252 for Western European, 932 for Japanese).
316    #[cfg(feature = "encoding")]
317    pub fn code_page(&self) -> Option<u16> {
318        // SQL collations (non-zero SortId) derive their code page from the
319        // SortId, not the LCID (issue #158).
320        if self.sort_id != 0 {
321            return crate::collation::code_page_for_sort_id(self.sort_id);
322        }
323        crate::collation::code_page_for_lcid(self.lcid)
324    }
325
326    /// Returns the encoding name for this collation.
327    ///
328    /// Useful for error messages and debugging.
329    #[cfg(feature = "encoding")]
330    pub fn encoding_name(&self) -> &'static str {
331        if self.sort_id != 0 {
332            return match crate::collation::encoding_for_sort_id(self.sort_id) {
333                Some(enc) => enc.name(),
334                None => "unsupported",
335            };
336        }
337        crate::collation::encoding_name_for_lcid(self.lcid)
338    }
339}
340
341/// Raw row data (not yet decoded).
342#[derive(Debug, Clone)]
343pub struct RawRow {
344    /// Raw column values.
345    pub data: bytes::Bytes,
346}
347
348/// Null bitmap compressed row.
349#[derive(Debug, Clone)]
350pub struct NbcRow {
351    /// Null bitmap.
352    pub null_bitmap: Vec<u8>,
353    /// Raw non-null column values.
354    pub data: bytes::Bytes,
355}
356
357/// Done token indicating statement completion.
358#[derive(Debug, Clone, Copy)]
359pub struct Done {
360    /// Status flags.
361    pub status: DoneStatus,
362    /// Current command.
363    pub cur_cmd: u16,
364    /// Row count (if applicable).
365    pub row_count: u64,
366}
367
368/// Done status flags.
369#[derive(Debug, Clone, Copy, Default)]
370#[non_exhaustive]
371pub struct DoneStatus {
372    /// More results follow.
373    pub more: bool,
374    /// Error occurred.
375    pub error: bool,
376    /// Transaction in progress.
377    pub in_xact: bool,
378    /// Row count is valid.
379    pub count: bool,
380    /// Attention acknowledgment.
381    pub attn: bool,
382    /// Server error caused statement termination.
383    pub srverror: bool,
384}
385
386/// Done in procedure token.
387#[derive(Debug, Clone, Copy)]
388pub struct DoneInProc {
389    /// Status flags.
390    pub status: DoneStatus,
391    /// Current command.
392    pub cur_cmd: u16,
393    /// Row count.
394    pub row_count: u64,
395}
396
397/// Done procedure token.
398#[derive(Debug, Clone, Copy)]
399pub struct DoneProc {
400    /// Status flags.
401    pub status: DoneStatus,
402    /// Current command.
403    pub cur_cmd: u16,
404    /// Row count.
405    pub row_count: u64,
406}
407
408/// Return value from stored procedure.
409#[derive(Debug, Clone)]
410#[non_exhaustive]
411pub struct ReturnValue {
412    /// Parameter ordinal.
413    pub param_ordinal: u16,
414    /// Parameter name.
415    pub param_name: String,
416    /// Status flags.
417    pub status: u8,
418    /// User type.
419    pub user_type: u32,
420    /// Type flags.
421    pub flags: u16,
422    /// Raw column type byte from the wire.
423    pub col_type: u8,
424    /// Type info.
425    pub type_info: TypeInfo,
426    /// Value data.
427    pub value: bytes::Bytes,
428}
429
430/// Server error message.
431#[derive(Debug, Clone)]
432pub struct ServerError {
433    /// Error number.
434    pub number: i32,
435    /// Error state.
436    pub state: u8,
437    /// Error severity class.
438    pub class: u8,
439    /// Error message text.
440    pub message: String,
441    /// Server name.
442    pub server: String,
443    /// Procedure name.
444    pub procedure: String,
445    /// Line number.
446    pub line: i32,
447}
448
449/// Server informational message.
450#[derive(Debug, Clone)]
451pub struct ServerInfo {
452    /// Info number.
453    pub number: i32,
454    /// Info state.
455    pub state: u8,
456    /// Info class (severity).
457    pub class: u8,
458    /// Info message text.
459    pub message: String,
460    /// Server name.
461    pub server: String,
462    /// Procedure name.
463    pub procedure: String,
464    /// Line number.
465    pub line: i32,
466}
467
468/// Login acknowledgment token.
469#[derive(Debug, Clone)]
470pub struct LoginAck {
471    /// Interface type.
472    pub interface: u8,
473    /// TDS version.
474    pub tds_version: u32,
475    /// Program name.
476    pub prog_name: String,
477    /// Program version.
478    pub prog_version: u32,
479}
480
481/// Environment change token.
482#[derive(Debug, Clone)]
483pub struct EnvChange {
484    /// Type of environment change.
485    pub env_type: EnvChangeType,
486    /// New value.
487    pub new_value: EnvChangeValue,
488    /// Old value.
489    pub old_value: EnvChangeValue,
490}
491
492/// Environment change type.
493#[derive(Debug, Clone, Copy, PartialEq, Eq)]
494#[repr(u8)]
495#[non_exhaustive]
496pub enum EnvChangeType {
497    /// Database changed.
498    Database = 1,
499    /// Language changed.
500    Language = 2,
501    /// Character set changed.
502    CharacterSet = 3,
503    /// Packet size changed.
504    PacketSize = 4,
505    /// Unicode data sorting locale ID.
506    UnicodeSortingLocalId = 5,
507    /// Unicode comparison flags.
508    UnicodeComparisonFlags = 6,
509    /// SQL collation.
510    SqlCollation = 7,
511    /// Begin transaction.
512    BeginTransaction = 8,
513    /// Commit transaction.
514    CommitTransaction = 9,
515    /// Rollback transaction.
516    RollbackTransaction = 10,
517    /// Enlist DTC transaction.
518    EnlistDtcTransaction = 11,
519    /// Defect DTC transaction.
520    DefectTransaction = 12,
521    /// Real-time log shipping.
522    RealTimeLogShipping = 13,
523    /// Promote transaction.
524    PromoteTransaction = 15,
525    /// Transaction manager address.
526    TransactionManagerAddress = 16,
527    /// Transaction ended.
528    TransactionEnded = 17,
529    /// Reset connection completion acknowledgment.
530    ResetConnectionCompletionAck = 18,
531    /// User instance started.
532    UserInstanceStarted = 19,
533    /// Routing information.
534    Routing = 20,
535}
536
537/// Environment change value.
538#[derive(Debug, Clone)]
539#[non_exhaustive]
540pub enum EnvChangeValue {
541    /// String value.
542    String(String),
543    /// Binary value.
544    Binary(bytes::Bytes),
545    /// Routing information.
546    Routing {
547        /// Host name.
548        host: String,
549        /// Port number.
550        port: u16,
551    },
552}
553
554/// Column ordering information.
555#[derive(Debug, Clone)]
556pub struct Order {
557    /// Ordered column indices.
558    pub columns: Vec<u16>,
559}
560
561/// Feature extension acknowledgment.
562#[derive(Debug, Clone)]
563pub struct FeatureExtAck {
564    /// Acknowledged features.
565    pub features: Vec<FeatureAck>,
566}
567
568/// Individual feature acknowledgment.
569#[derive(Debug, Clone)]
570pub struct FeatureAck {
571    /// Feature ID.
572    pub feature_id: u8,
573    /// Feature data.
574    pub data: bytes::Bytes,
575}
576
577/// SSPI authentication token.
578#[derive(Debug, Clone)]
579pub struct SspiToken {
580    /// SSPI data.
581    pub data: bytes::Bytes,
582}
583
584/// Session state token.
585#[derive(Debug, Clone)]
586pub struct SessionState {
587    /// Session state data.
588    pub data: bytes::Bytes,
589}
590
591/// Federated authentication info.
592#[derive(Debug, Clone)]
593pub struct FedAuthInfo {
594    /// STS URL.
595    pub sts_url: String,
596    /// Service principal name.
597    pub spn: String,
598}
599
600// =============================================================================
601// ColMetaData and Row Parsing Implementation
602// =============================================================================
603
604/// Decode collation information (5 bytes).
605///
606/// Shared by ColMetaData column parsing and CryptoMetadata base type parsing.
607pub(crate) fn decode_collation(src: &mut impl Buf) -> Result<Collation, ProtocolError> {
608    if src.remaining() < 5 {
609        return Err(ProtocolError::UnexpectedEof);
610    }
611    // Collation: LCID (4 bytes) + Sort ID (1 byte)
612    let lcid = src.get_u32_le();
613    let sort_id = src.get_u8();
614    Ok(Collation { lcid, sort_id })
615}
616
617/// Decode type-specific metadata for a column based on its TypeId.
618///
619/// Shared by ColMetaData column parsing and CryptoMetadata base type parsing.
620pub(crate) fn decode_type_info(
621    src: &mut impl Buf,
622    type_id: TypeId,
623    col_type: u8,
624) -> Result<TypeInfo, ProtocolError> {
625    match type_id {
626        // Fixed-length types have no additional metadata
627        TypeId::Null => Ok(TypeInfo::default()),
628        TypeId::Int1 | TypeId::Bit => Ok(TypeInfo::default()),
629        TypeId::Int2 => Ok(TypeInfo::default()),
630        TypeId::Int4 => Ok(TypeInfo::default()),
631        TypeId::Int8 => Ok(TypeInfo::default()),
632        TypeId::Float4 => Ok(TypeInfo::default()),
633        TypeId::Float8 => Ok(TypeInfo::default()),
634        TypeId::Money => Ok(TypeInfo::default()),
635        TypeId::Money4 => Ok(TypeInfo::default()),
636        TypeId::DateTime => Ok(TypeInfo::default()),
637        TypeId::DateTime4 => Ok(TypeInfo::default()),
638
639        // Variable length integer/float/money (1-byte max length)
640        TypeId::IntN | TypeId::BitN | TypeId::FloatN | TypeId::MoneyN | TypeId::DateTimeN => {
641            if src.remaining() < 1 {
642                return Err(ProtocolError::UnexpectedEof);
643            }
644            let max_length = src.get_u8() as u32;
645            Ok(TypeInfo {
646                max_length: Some(max_length),
647                ..Default::default()
648            })
649        }
650
651        // GUID has 1-byte length
652        TypeId::Guid => {
653            if src.remaining() < 1 {
654                return Err(ProtocolError::UnexpectedEof);
655            }
656            let max_length = src.get_u8() as u32;
657            Ok(TypeInfo {
658                max_length: Some(max_length),
659                ..Default::default()
660            })
661        }
662
663        // Decimal/Numeric types (1-byte length + precision + scale)
664        TypeId::Decimal | TypeId::Numeric | TypeId::DecimalN | TypeId::NumericN => {
665            if src.remaining() < 3 {
666                return Err(ProtocolError::UnexpectedEof);
667            }
668            let max_length = src.get_u8() as u32;
669            let precision = src.get_u8();
670            let scale = src.get_u8();
671            Ok(TypeInfo {
672                max_length: Some(max_length),
673                precision: Some(precision),
674                scale: Some(scale),
675                ..Default::default()
676            })
677        }
678
679        // Old-style byte-length strings (Char, VarChar, Binary, VarBinary)
680        TypeId::Char | TypeId::VarChar | TypeId::Binary | TypeId::VarBinary => {
681            if src.remaining() < 1 {
682                return Err(ProtocolError::UnexpectedEof);
683            }
684            let max_length = src.get_u8() as u32;
685            Ok(TypeInfo {
686                max_length: Some(max_length),
687                ..Default::default()
688            })
689        }
690
691        // Big varchar/binary with 2-byte length + collation for strings
692        TypeId::BigVarChar | TypeId::BigChar => {
693            if src.remaining() < 7 {
694                // 2 (length) + 5 (collation)
695                return Err(ProtocolError::UnexpectedEof);
696            }
697            let max_length = src.get_u16_le() as u32;
698            let collation = decode_collation(src)?;
699            Ok(TypeInfo {
700                max_length: Some(max_length),
701                collation: Some(collation),
702                ..Default::default()
703            })
704        }
705
706        // Big binary (2-byte length, no collation)
707        TypeId::BigVarBinary | TypeId::BigBinary => {
708            if src.remaining() < 2 {
709                return Err(ProtocolError::UnexpectedEof);
710            }
711            let max_length = src.get_u16_le() as u32;
712            Ok(TypeInfo {
713                max_length: Some(max_length),
714                ..Default::default()
715            })
716        }
717
718        // Unicode strings (NChar, NVarChar) - 2-byte length + collation
719        TypeId::NChar | TypeId::NVarChar => {
720            if src.remaining() < 7 {
721                // 2 (length) + 5 (collation)
722                return Err(ProtocolError::UnexpectedEof);
723            }
724            let max_length = src.get_u16_le() as u32;
725            let collation = decode_collation(src)?;
726            Ok(TypeInfo {
727                max_length: Some(max_length),
728                collation: Some(collation),
729                ..Default::default()
730            })
731        }
732
733        // Date type (no additional metadata)
734        TypeId::Date => Ok(TypeInfo::default()),
735
736        // Time, DateTime2, DateTimeOffset have scale
737        TypeId::Time | TypeId::DateTime2 | TypeId::DateTimeOffset => {
738            if src.remaining() < 1 {
739                return Err(ProtocolError::UnexpectedEof);
740            }
741            let scale = src.get_u8();
742            Ok(TypeInfo {
743                scale: Some(scale),
744                ..Default::default()
745            })
746        }
747
748        // Text/NText/Image (deprecated LOB types)
749        TypeId::Text | TypeId::NText | TypeId::Image => {
750            // These have complex metadata: length (4) + collation (5) + table name parts
751            if src.remaining() < 4 {
752                return Err(ProtocolError::UnexpectedEof);
753            }
754            let max_length = src.get_u32_le();
755
756            // For Text/NText, read collation
757            let collation = if type_id == TypeId::Text || type_id == TypeId::NText {
758                if src.remaining() < 5 {
759                    return Err(ProtocolError::UnexpectedEof);
760                }
761                Some(decode_collation(src)?)
762            } else {
763                None
764            };
765
766            // Skip table name parts (variable length)
767            // Format: numParts (1 byte) followed by us_varchar for each part
768            if src.remaining() < 1 {
769                return Err(ProtocolError::UnexpectedEof);
770            }
771            let num_parts = src.get_u8();
772            for _ in 0..num_parts {
773                // Read and discard table name part
774                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
775            }
776
777            Ok(TypeInfo {
778                max_length: Some(max_length),
779                collation,
780                ..Default::default()
781            })
782        }
783
784        // XML type
785        TypeId::Xml => {
786            if src.remaining() < 1 {
787                return Err(ProtocolError::UnexpectedEof);
788            }
789            let schema_present = src.get_u8();
790
791            if schema_present != 0 {
792                // XML_INFO per MS-TDS §2.2.5.5.3: DBNAME and OWNING_SCHEMA are
793                // B_VARCHAR (1-byte length); only XML_SCHEMA_COLLECTION is
794                // US_VARCHAR (2-byte length).
795                let _ = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // db name
796                let _ = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // owning schema
797                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // xml schema collection
798            }
799
800            Ok(TypeInfo::default())
801        }
802
803        // UDT (User-defined type) - complex metadata
804        TypeId::Udt => {
805            // Max length (2 bytes)
806            if src.remaining() < 2 {
807                return Err(ProtocolError::UnexpectedEof);
808            }
809            let max_length = src.get_u16_le() as u32;
810
811            // UDT_INFO per MS-TDS: DB_NAME, SCHEMA_NAME, and TYPE_NAME are
812            // B_VARCHAR (1-byte length); only ASSEMBLY_QUALIFIED_NAME is
813            // US_VARCHAR (2-byte length).
814            let _ = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // db name
815            let _ = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // schema name
816            let _ = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // type name
817            let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // assembly qualified name
818
819            Ok(TypeInfo {
820                max_length: Some(max_length),
821                ..Default::default()
822            })
823        }
824
825        // Table-valued parameter - complex metadata (skip for now)
826        TypeId::Tvp => {
827            // TVP has very complex metadata, not commonly used
828            // For now, we can't properly parse this
829            Err(ProtocolError::InvalidTokenType(col_type))
830        }
831
832        // SQL Variant - 4-byte length
833        TypeId::Variant => {
834            if src.remaining() < 4 {
835                return Err(ProtocolError::UnexpectedEof);
836            }
837            let max_length = src.get_u32_le();
838            Ok(TypeInfo {
839                max_length: Some(max_length),
840                ..Default::default()
841            })
842        }
843    }
844}
845
846impl ColMetaData {
847    /// Special value indicating no metadata.
848    pub const NO_METADATA: u16 = 0xFFFF;
849
850    /// Decode a COLMETADATA token from bytes.
851    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
852        if src.remaining() < 2 {
853            return Err(ProtocolError::UnexpectedEof);
854        }
855
856        let column_count = src.get_u16_le();
857
858        // 0xFFFF means no metadata present
859        if column_count == Self::NO_METADATA {
860            return Ok(Self {
861                columns: Vec::new(),
862                cek_table: None,
863            });
864        }
865
866        let mut columns = Vec::with_capacity(column_count as usize);
867
868        for _ in 0..column_count {
869            let column = Self::decode_column(src)?;
870            columns.push(column);
871        }
872
873        Ok(Self {
874            columns,
875            cek_table: None,
876        })
877    }
878
879    /// Decode an ALTMETADATA (COMPUTE BY) token body, returning the compute Id
880    /// and the column metadata describing the matching ALTROW values.
881    ///
882    /// The token type byte (0x88) must already be consumed. ALTMETADATA was
883    /// deprecated in TDS 7.4, but legacy `COMPUTE BY` queries still emit it. The
884    /// driver parses it only to learn the ALTROW value layout so those rows can
885    /// be consumed and dropped, keeping the token stream byte-aligned.
886    ///
887    /// Wire format (MS-TDS 2.2.7.1):
888    /// `Count(USHORT) Id(USHORT) ByCols(UCHAR) ByCols*ColNum(USHORT) Count*ComputeData`,
889    /// where each `ComputeData` is `Op(BYTE) Operand(USHORT)` followed by a
890    /// column definition identical to COLMETADATA (TableName is never sent for
891    /// COMPUTE, which excludes text/ntext/image columns).
892    fn decode_alt(src: &mut impl Buf) -> Result<(u16, Self), ProtocolError> {
893        // Count (USHORT) + Id (USHORT) + ByCols (UCHAR)
894        if src.remaining() < 5 {
895            return Err(ProtocolError::UnexpectedEof);
896        }
897        let count = src.get_u16_le();
898        let id = src.get_u16_le();
899        let by_cols = src.get_u8() as usize;
900
901        // ColNum: USHORT repeated ByCols times (the grouping columns). Not needed
902        // to parse ALTROW values, but must be consumed to stay byte-aligned.
903        if src.remaining() < by_cols * 2 {
904            return Err(ProtocolError::UnexpectedEof);
905        }
906        for _ in 0..by_cols {
907            let _ = src.get_u16_le();
908        }
909
910        let mut columns = Vec::with_capacity(count as usize);
911        for _ in 0..count {
912            // ComputeData prefix: Op (BYTE) + Operand (USHORT).
913            if src.remaining() < 3 {
914                return Err(ProtocolError::UnexpectedEof);
915            }
916            let _op = src.get_u8();
917            let _operand = src.get_u16_le();
918            columns.push(Self::decode_column(src)?);
919        }
920
921        Ok((
922            id,
923            Self {
924                columns,
925                cek_table: None,
926            },
927        ))
928    }
929
930    /// Decode a single column from the metadata.
931    fn decode_column(src: &mut impl Buf) -> Result<ColumnData, ProtocolError> {
932        // UserType (4 bytes) + Flags (2 bytes) + TypeId (1 byte)
933        if src.remaining() < 7 {
934            return Err(ProtocolError::UnexpectedEof);
935        }
936
937        let user_type = src.get_u32_le();
938        let flags = src.get_u16_le();
939        let col_type = src.get_u8();
940
941        // An unknown type byte must be a hard error: treating it as Null (a
942        // zero-length column) misaligns every subsequent column and row,
943        // producing plausible garbage values (issue #157).
944        let type_id = TypeId::from_u8(col_type).ok_or(ProtocolError::InvalidDataType(col_type))?;
945
946        // Parse type-specific metadata
947        let type_info = decode_type_info(src, type_id, col_type)?;
948
949        // Read column name (B_VARCHAR format - 1 byte length in characters)
950        let name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
951
952        Ok(ColumnData {
953            name,
954            type_id,
955            col_type,
956            flags,
957            user_type,
958            type_info,
959            crypto_metadata: None,
960        })
961    }
962
963    /// Decode a COLMETADATA token with Always Encrypted support.
964    ///
965    /// When column encryption was negotiated in Login7, the server sends a CekTable
966    /// before column definitions and per-column CryptoMetadata for encrypted columns.
967    ///
968    /// # Wire Format (with encryption)
969    ///
970    /// ```text
971    /// column_count: USHORT
972    /// cek_table: CekTable (always present when encryption negotiated)
973    /// columns: ColumnData[column_count] (with CryptoMetadata for encrypted columns)
974    /// ```
975    pub fn decode_encrypted(src: &mut impl Buf) -> Result<Self, ProtocolError> {
976        if src.remaining() < 2 {
977            return Err(ProtocolError::UnexpectedEof);
978        }
979
980        let column_count = src.get_u16_le();
981
982        if column_count == Self::NO_METADATA {
983            return Ok(Self {
984                columns: Vec::new(),
985                cek_table: None,
986            });
987        }
988
989        // Parse CEK table (always present when encryption was negotiated)
990        let cek_table = crate::crypto::CekTable::decode(src)?;
991
992        let mut columns = Vec::with_capacity(column_count as usize);
993
994        for _ in 0..column_count {
995            let column = Self::decode_column_encrypted(src)?;
996            columns.push(column);
997        }
998
999        Ok(Self {
1000            columns,
1001            cek_table: Some(cek_table),
1002        })
1003    }
1004
1005    /// Decode a single column definition with Always Encrypted support.
1006    ///
1007    /// For encrypted columns (flags & 0x0800), parses CryptoMetadata after the type info.
1008    fn decode_column_encrypted(src: &mut impl Buf) -> Result<ColumnData, ProtocolError> {
1009        if src.remaining() < 7 {
1010            return Err(ProtocolError::UnexpectedEof);
1011        }
1012
1013        let user_type = src.get_u32_le();
1014        let flags = src.get_u16_le();
1015        let col_type = src.get_u8();
1016
1017        let type_id = TypeId::from_u8(col_type).ok_or(ProtocolError::InvalidDataType(col_type))?;
1018
1019        // Parse type-specific metadata (for encrypted columns, this is the transport type)
1020        let type_info = decode_type_info(src, type_id, col_type)?;
1021
1022        // Parse CryptoMetadata if the column is encrypted
1023        let crypto_metadata = if crate::crypto::is_column_encrypted(flags) {
1024            Some(crate::crypto::CryptoMetadata::decode(src)?)
1025        } else {
1026            None
1027        };
1028
1029        // Read column name
1030        let name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1031
1032        Ok(ColumnData {
1033            name,
1034            type_id,
1035            col_type,
1036            flags,
1037            user_type,
1038            type_info,
1039            crypto_metadata,
1040        })
1041    }
1042
1043    /// Get the number of columns.
1044    #[must_use]
1045    pub fn column_count(&self) -> usize {
1046        self.columns.len()
1047    }
1048
1049    /// Check if this represents no metadata.
1050    #[must_use]
1051    pub fn is_empty(&self) -> bool {
1052        self.columns.is_empty()
1053    }
1054}
1055
1056impl ColumnData {
1057    /// Check if this column is nullable.
1058    #[must_use]
1059    pub fn is_nullable(&self) -> bool {
1060        (self.flags & 0x0001) != 0
1061    }
1062
1063    /// Get the fixed size in bytes for this column, if applicable.
1064    ///
1065    /// Returns `None` for variable-length types.
1066    #[must_use]
1067    pub fn fixed_size(&self) -> Option<usize> {
1068        match self.type_id {
1069            TypeId::Null => Some(0),
1070            TypeId::Int1 | TypeId::Bit => Some(1),
1071            TypeId::Int2 => Some(2),
1072            TypeId::Int4 => Some(4),
1073            TypeId::Int8 => Some(8),
1074            TypeId::Float4 => Some(4),
1075            TypeId::Float8 => Some(8),
1076            TypeId::Money => Some(8),
1077            TypeId::Money4 => Some(4),
1078            TypeId::DateTime => Some(8),
1079            TypeId::DateTime4 => Some(4),
1080            TypeId::Date => Some(3),
1081            _ => None,
1082        }
1083    }
1084}
1085
1086// =============================================================================
1087// Row Parsing Implementation
1088// =============================================================================
1089
1090impl RawRow {
1091    /// Decode a ROW token from bytes.
1092    ///
1093    /// This function requires the column metadata to know how to parse the row.
1094    /// The row data is stored as raw bytes for later parsing.
1095    pub fn decode(src: &mut impl Buf, metadata: &ColMetaData) -> Result<Self, ProtocolError> {
1096        let mut data = bytes::BytesMut::new();
1097
1098        for col in &metadata.columns {
1099            Self::decode_column_value(src, col, &mut data)?;
1100        }
1101
1102        Ok(Self {
1103            data: data.freeze(),
1104        })
1105    }
1106
1107    /// Decode only the first `prefix_len` columns of a ROW token, leaving `src`
1108    /// positioned at the start of column `prefix_len`.
1109    ///
1110    /// Used by the BLOB streaming path to decode the leading scalar columns of a
1111    /// row and stop at a trailing MAX column, whose PLP value is then streamed
1112    /// directly from the socket rather than buffered.
1113    pub fn decode_prefix(
1114        src: &mut impl Buf,
1115        metadata: &ColMetaData,
1116        prefix_len: usize,
1117    ) -> Result<Self, ProtocolError> {
1118        let mut data = bytes::BytesMut::new();
1119        for col in metadata.columns.iter().take(prefix_len) {
1120            Self::decode_column_value(src, col, &mut data)?;
1121        }
1122        Ok(Self {
1123            data: data.freeze(),
1124        })
1125    }
1126
1127    /// Decode a single column value and append to the output buffer.
1128    fn decode_column_value(
1129        src: &mut impl Buf,
1130        col: &ColumnData,
1131        dst: &mut bytes::BytesMut,
1132    ) -> Result<(), ProtocolError> {
1133        match col.type_id {
1134            // Fixed-length types
1135            TypeId::Null => {
1136                // No data
1137            }
1138            TypeId::Int1 | TypeId::Bit => {
1139                if src.remaining() < 1 {
1140                    return Err(ProtocolError::UnexpectedEof);
1141                }
1142                dst.extend_from_slice(&[src.get_u8()]);
1143            }
1144            TypeId::Int2 => {
1145                if src.remaining() < 2 {
1146                    return Err(ProtocolError::UnexpectedEof);
1147                }
1148                dst.extend_from_slice(&src.get_u16_le().to_le_bytes());
1149            }
1150            TypeId::Int4 => {
1151                if src.remaining() < 4 {
1152                    return Err(ProtocolError::UnexpectedEof);
1153                }
1154                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
1155            }
1156            TypeId::Int8 => {
1157                if src.remaining() < 8 {
1158                    return Err(ProtocolError::UnexpectedEof);
1159                }
1160                dst.extend_from_slice(&src.get_u64_le().to_le_bytes());
1161            }
1162            TypeId::Float4 => {
1163                if src.remaining() < 4 {
1164                    return Err(ProtocolError::UnexpectedEof);
1165                }
1166                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
1167            }
1168            TypeId::Float8 => {
1169                if src.remaining() < 8 {
1170                    return Err(ProtocolError::UnexpectedEof);
1171                }
1172                dst.extend_from_slice(&src.get_u64_le().to_le_bytes());
1173            }
1174            TypeId::Money => {
1175                if src.remaining() < 8 {
1176                    return Err(ProtocolError::UnexpectedEof);
1177                }
1178                let hi = src.get_u32_le();
1179                let lo = src.get_u32_le();
1180                dst.extend_from_slice(&hi.to_le_bytes());
1181                dst.extend_from_slice(&lo.to_le_bytes());
1182            }
1183            TypeId::Money4 => {
1184                if src.remaining() < 4 {
1185                    return Err(ProtocolError::UnexpectedEof);
1186                }
1187                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
1188            }
1189            TypeId::DateTime => {
1190                if src.remaining() < 8 {
1191                    return Err(ProtocolError::UnexpectedEof);
1192                }
1193                let days = src.get_u32_le();
1194                let time = src.get_u32_le();
1195                dst.extend_from_slice(&days.to_le_bytes());
1196                dst.extend_from_slice(&time.to_le_bytes());
1197            }
1198            TypeId::DateTime4 => {
1199                if src.remaining() < 4 {
1200                    return Err(ProtocolError::UnexpectedEof);
1201                }
1202                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
1203            }
1204            // DATE type uses 1-byte length prefix (can be NULL)
1205            TypeId::Date => {
1206                Self::decode_bytelen_type(src, dst)?;
1207            }
1208
1209            // Variable-length nullable types (length-prefixed)
1210            TypeId::IntN | TypeId::BitN | TypeId::FloatN | TypeId::MoneyN | TypeId::DateTimeN => {
1211                Self::decode_bytelen_type(src, dst)?;
1212            }
1213
1214            TypeId::Guid => {
1215                Self::decode_bytelen_type(src, dst)?;
1216            }
1217
1218            TypeId::Decimal | TypeId::Numeric | TypeId::DecimalN | TypeId::NumericN => {
1219                Self::decode_bytelen_type(src, dst)?;
1220            }
1221
1222            // Old-style byte-length strings
1223            TypeId::Char | TypeId::VarChar | TypeId::Binary | TypeId::VarBinary => {
1224                Self::decode_bytelen_type(src, dst)?;
1225            }
1226
1227            // 2-byte length strings (or PLP for MAX types)
1228            TypeId::BigVarChar | TypeId::BigVarBinary => {
1229                // max_length == 0xFFFF indicates VARCHAR(MAX) or VARBINARY(MAX), which uses PLP
1230                if col.type_info.max_length == Some(0xFFFF) {
1231                    Self::decode_plp_type(src, dst)?;
1232                } else {
1233                    Self::decode_ushortlen_type(src, dst)?;
1234                }
1235            }
1236
1237            // Fixed-length types that don't have MAX variants
1238            TypeId::BigChar | TypeId::BigBinary => {
1239                Self::decode_ushortlen_type(src, dst)?;
1240            }
1241
1242            // Unicode strings (2-byte length in bytes, or PLP for NVARCHAR(MAX))
1243            TypeId::NVarChar => {
1244                // max_length == 0xFFFF indicates NVARCHAR(MAX), which uses PLP
1245                if col.type_info.max_length == Some(0xFFFF) {
1246                    Self::decode_plp_type(src, dst)?;
1247                } else {
1248                    Self::decode_ushortlen_type(src, dst)?;
1249                }
1250            }
1251
1252            // Fixed-length NCHAR doesn't have MAX variant
1253            TypeId::NChar => {
1254                Self::decode_ushortlen_type(src, dst)?;
1255            }
1256
1257            // Time types with scale
1258            TypeId::Time | TypeId::DateTime2 | TypeId::DateTimeOffset => {
1259                Self::decode_bytelen_type(src, dst)?;
1260            }
1261
1262            // TEXT/NTEXT/IMAGE - deprecated LOB types using textptr format
1263            TypeId::Text | TypeId::NText | TypeId::Image => {
1264                Self::decode_textptr_type(src, dst)?;
1265            }
1266
1267            // XML - uses actual PLP format
1268            TypeId::Xml => {
1269                Self::decode_plp_type(src, dst)?;
1270            }
1271
1272            // Complex types
1273            TypeId::Variant => {
1274                Self::decode_intlen_type(src, dst)?;
1275            }
1276
1277            TypeId::Udt => {
1278                // UDT uses PLP encoding
1279                Self::decode_plp_type(src, dst)?;
1280            }
1281
1282            TypeId::Tvp => {
1283                // TVP not supported in row data
1284                return Err(ProtocolError::InvalidTokenType(col.col_type));
1285            }
1286        }
1287
1288        Ok(())
1289    }
1290
1291    /// Decode a 1-byte length-prefixed value.
1292    fn decode_bytelen_type(
1293        src: &mut impl Buf,
1294        dst: &mut bytes::BytesMut,
1295    ) -> Result<(), ProtocolError> {
1296        if src.remaining() < 1 {
1297            return Err(ProtocolError::UnexpectedEof);
1298        }
1299        let len = src.get_u8() as usize;
1300        if len == 0xFF {
1301            // NULL value - store as zero-length with NULL marker
1302            dst.extend_from_slice(&[0xFF]);
1303        } else if len == 0 {
1304            // Empty value
1305            dst.extend_from_slice(&[0x00]);
1306        } else {
1307            if src.remaining() < len {
1308                return Err(ProtocolError::UnexpectedEof);
1309            }
1310            dst.extend_from_slice(&[len as u8]);
1311            for _ in 0..len {
1312                dst.extend_from_slice(&[src.get_u8()]);
1313            }
1314        }
1315        Ok(())
1316    }
1317
1318    /// Decode a 2-byte length-prefixed value.
1319    fn decode_ushortlen_type(
1320        src: &mut impl Buf,
1321        dst: &mut bytes::BytesMut,
1322    ) -> Result<(), ProtocolError> {
1323        if src.remaining() < 2 {
1324            return Err(ProtocolError::UnexpectedEof);
1325        }
1326        let len = src.get_u16_le() as usize;
1327        if len == 0xFFFF {
1328            // NULL value
1329            dst.extend_from_slice(&0xFFFFu16.to_le_bytes());
1330        } else if len == 0 {
1331            // Empty value
1332            dst.extend_from_slice(&0u16.to_le_bytes());
1333        } else {
1334            if src.remaining() < len {
1335                return Err(ProtocolError::UnexpectedEof);
1336            }
1337            dst.extend_from_slice(&(len as u16).to_le_bytes());
1338            for _ in 0..len {
1339                dst.extend_from_slice(&[src.get_u8()]);
1340            }
1341        }
1342        Ok(())
1343    }
1344
1345    /// Decode a 4-byte length-prefixed value.
1346    fn decode_intlen_type(
1347        src: &mut impl Buf,
1348        dst: &mut bytes::BytesMut,
1349    ) -> Result<(), ProtocolError> {
1350        if src.remaining() < 4 {
1351            return Err(ProtocolError::UnexpectedEof);
1352        }
1353        let len = src.get_u32_le() as usize;
1354        if len == 0xFFFFFFFF {
1355            // NULL value
1356            dst.extend_from_slice(&0xFFFFFFFFu32.to_le_bytes());
1357        } else if len == 0 {
1358            // Empty value
1359            dst.extend_from_slice(&0u32.to_le_bytes());
1360        } else {
1361            if src.remaining() < len {
1362                return Err(ProtocolError::UnexpectedEof);
1363            }
1364            dst.extend_from_slice(&(len as u32).to_le_bytes());
1365            for _ in 0..len {
1366                dst.extend_from_slice(&[src.get_u8()]);
1367            }
1368        }
1369        Ok(())
1370    }
1371
1372    /// Decode a TEXT/NTEXT/IMAGE type (textptr format).
1373    ///
1374    /// These deprecated LOB types use a special format:
1375    /// - 1 byte: textptr_len (0 = NULL)
1376    /// - textptr_len bytes: textptr (if not NULL)
1377    /// - 8 bytes: timestamp (if not NULL)
1378    /// - 4 bytes: data length (if not NULL)
1379    /// - data_len bytes: the actual data (if not NULL)
1380    ///
1381    /// We convert this to PLP format for the client to parse:
1382    /// - 8 bytes: total length (0xFFFFFFFFFFFFFFFF = NULL)
1383    /// - 4 bytes: chunk length (= data length)
1384    /// - chunk data
1385    /// - 4 bytes: 0 (terminator)
1386    fn decode_textptr_type(
1387        src: &mut impl Buf,
1388        dst: &mut bytes::BytesMut,
1389    ) -> Result<(), ProtocolError> {
1390        if src.remaining() < 1 {
1391            return Err(ProtocolError::UnexpectedEof);
1392        }
1393
1394        let textptr_len = src.get_u8() as usize;
1395
1396        if textptr_len == 0 {
1397            // NULL value - write PLP NULL marker
1398            dst.extend_from_slice(&0xFFFFFFFFFFFFFFFFu64.to_le_bytes());
1399            return Ok(());
1400        }
1401
1402        // Skip textptr bytes
1403        if src.remaining() < textptr_len {
1404            return Err(ProtocolError::UnexpectedEof);
1405        }
1406        src.advance(textptr_len);
1407
1408        // Skip 8-byte timestamp
1409        if src.remaining() < 8 {
1410            return Err(ProtocolError::UnexpectedEof);
1411        }
1412        src.advance(8);
1413
1414        // Read data length
1415        if src.remaining() < 4 {
1416            return Err(ProtocolError::UnexpectedEof);
1417        }
1418        let data_len = src.get_u32_le() as usize;
1419
1420        if src.remaining() < data_len {
1421            return Err(ProtocolError::UnexpectedEof);
1422        }
1423
1424        // Write in PLP format for client parsing:
1425        // - 8 bytes: total length
1426        // - 4 bytes: chunk length
1427        // - chunk data
1428        // - 4 bytes: 0 (terminator)
1429        dst.extend_from_slice(&(data_len as u64).to_le_bytes());
1430        dst.extend_from_slice(&(data_len as u32).to_le_bytes());
1431        for _ in 0..data_len {
1432            dst.extend_from_slice(&[src.get_u8()]);
1433        }
1434        dst.extend_from_slice(&0u32.to_le_bytes()); // PLP terminator
1435
1436        Ok(())
1437    }
1438
1439    /// Decode a PLP (Partially Length-Prefixed) value.
1440    ///
1441    /// PLP format:
1442    /// - 8 bytes: total length (0xFFFFFFFFFFFFFFFE = unknown, 0xFFFFFFFFFFFFFFFF = NULL)
1443    /// - If not NULL: chunks of (4 byte chunk length + data) until chunk length = 0
1444    fn decode_plp_type(src: &mut impl Buf, dst: &mut bytes::BytesMut) -> Result<(), ProtocolError> {
1445        if src.remaining() < 8 {
1446            return Err(ProtocolError::UnexpectedEof);
1447        }
1448
1449        let total_len = src.get_u64_le();
1450
1451        // Store the total length marker
1452        dst.extend_from_slice(&total_len.to_le_bytes());
1453
1454        if total_len == 0xFFFFFFFFFFFFFFFF {
1455            // NULL value - no more data
1456            return Ok(());
1457        }
1458
1459        // Read chunks until terminator
1460        loop {
1461            if src.remaining() < 4 {
1462                return Err(ProtocolError::UnexpectedEof);
1463            }
1464            let chunk_len = src.get_u32_le() as usize;
1465            dst.extend_from_slice(&(chunk_len as u32).to_le_bytes());
1466
1467            if chunk_len == 0 {
1468                // End of PLP data
1469                break;
1470            }
1471
1472            if src.remaining() < chunk_len {
1473                return Err(ProtocolError::UnexpectedEof);
1474            }
1475
1476            for _ in 0..chunk_len {
1477                dst.extend_from_slice(&[src.get_u8()]);
1478            }
1479        }
1480
1481        Ok(())
1482    }
1483}
1484
1485// =============================================================================
1486// NbcRow Parsing Implementation
1487// =============================================================================
1488
1489impl NbcRow {
1490    /// Decode an NBCROW token from bytes.
1491    ///
1492    /// NBCROW (Null Bitmap Compressed Row) stores a bitmap indicating which
1493    /// columns are NULL, followed by only the non-NULL values.
1494    pub fn decode(src: &mut impl Buf, metadata: &ColMetaData) -> Result<Self, ProtocolError> {
1495        let col_count = metadata.columns.len();
1496        let bitmap_len = col_count.div_ceil(8);
1497
1498        if src.remaining() < bitmap_len {
1499            return Err(ProtocolError::UnexpectedEof);
1500        }
1501
1502        // Read null bitmap
1503        let mut null_bitmap = vec![0u8; bitmap_len];
1504        for byte in &mut null_bitmap {
1505            *byte = src.get_u8();
1506        }
1507
1508        // Read non-null values
1509        let mut data = bytes::BytesMut::new();
1510
1511        for (i, col) in metadata.columns.iter().enumerate() {
1512            let byte_idx = i / 8;
1513            let bit_idx = i % 8;
1514            let is_null = (null_bitmap[byte_idx] & (1 << bit_idx)) != 0;
1515
1516            if !is_null {
1517                // Read the value - for NBCROW, we read without the length prefix
1518                // for fixed-length types, and with length prefix for variable types
1519                RawRow::decode_column_value(src, col, &mut data)?;
1520            }
1521        }
1522
1523        Ok(Self {
1524            null_bitmap,
1525            data: data.freeze(),
1526        })
1527    }
1528
1529    /// Decode the null bitmap and the first `prefix_len` columns of an NBCROW,
1530    /// leaving `src` positioned at column `prefix_len`'s value (when that column
1531    /// is non-NULL per the bitmap). The returned row carries the full bitmap and
1532    /// the leading non-NULL values; query the trailing column's nullness with
1533    /// [`is_null`](Self::is_null).
1534    pub fn decode_prefix(
1535        src: &mut impl Buf,
1536        metadata: &ColMetaData,
1537        prefix_len: usize,
1538    ) -> Result<Self, ProtocolError> {
1539        let col_count = metadata.columns.len();
1540        let bitmap_len = col_count.div_ceil(8);
1541
1542        if src.remaining() < bitmap_len {
1543            return Err(ProtocolError::UnexpectedEof);
1544        }
1545
1546        let mut null_bitmap = vec![0u8; bitmap_len];
1547        for byte in &mut null_bitmap {
1548            *byte = src.get_u8();
1549        }
1550
1551        let mut data = bytes::BytesMut::new();
1552        for (i, col) in metadata.columns.iter().enumerate().take(prefix_len) {
1553            let is_null = (null_bitmap[i / 8] & (1 << (i % 8))) != 0;
1554            if !is_null {
1555                RawRow::decode_column_value(src, col, &mut data)?;
1556            }
1557        }
1558
1559        Ok(Self {
1560            null_bitmap,
1561            data: data.freeze(),
1562        })
1563    }
1564
1565    /// Check if a column at the given index is NULL.
1566    #[must_use]
1567    pub fn is_null(&self, column_index: usize) -> bool {
1568        let byte_idx = column_index / 8;
1569        let bit_idx = column_index % 8;
1570        if byte_idx < self.null_bitmap.len() {
1571            (self.null_bitmap[byte_idx] & (1 << bit_idx)) != 0
1572        } else {
1573            true // Out of bounds = NULL
1574        }
1575    }
1576}
1577
1578// =============================================================================
1579// ReturnValue Parsing Implementation
1580// =============================================================================
1581
1582impl ReturnValue {
1583    /// Decode a RETURNVALUE token from bytes.
1584    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1585        // MS-TDS §2.2.7.18: the RETURNVALUE token has no length prefix —
1586        // it begins directly with the 2-byte ParamOrdinal. The previous
1587        // spurious 2-byte read consumed the ordinal and shifted every
1588        // subsequent field, leaving the stream parser two bytes ahead and
1589        // reading value bytes as the next token type (e.g. `0x74` from a
1590        // Unicode name fragment was misread as an unknown token).
1591        if src.remaining() < 2 {
1592            return Err(ProtocolError::UnexpectedEof);
1593        }
1594        let param_ordinal = src.get_u16_le();
1595
1596        // Parameter name (B_VARCHAR)
1597        let param_name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1598
1599        // Status (1 byte)
1600        if src.remaining() < 1 {
1601            return Err(ProtocolError::UnexpectedEof);
1602        }
1603        let status = src.get_u8();
1604
1605        // User type (4 bytes) + flags (2 bytes) + type id (1 byte)
1606        if src.remaining() < 7 {
1607            return Err(ProtocolError::UnexpectedEof);
1608        }
1609        let user_type = src.get_u32_le();
1610        let flags = src.get_u16_le();
1611        let col_type = src.get_u8();
1612
1613        let type_id = TypeId::from_u8(col_type).ok_or(ProtocolError::InvalidDataType(col_type))?;
1614
1615        // Parse type info
1616        let type_info = decode_type_info(src, type_id, col_type)?;
1617
1618        // Read the value data
1619        let mut value_buf = bytes::BytesMut::new();
1620
1621        // Create a temporary column for value parsing
1622        let temp_col = ColumnData {
1623            name: String::new(),
1624            type_id,
1625            col_type,
1626            flags,
1627            user_type,
1628            type_info: type_info.clone(),
1629            crypto_metadata: None,
1630        };
1631
1632        RawRow::decode_column_value(src, &temp_col, &mut value_buf)?;
1633
1634        Ok(Self {
1635            param_ordinal,
1636            param_name,
1637            status,
1638            user_type,
1639            flags,
1640            col_type,
1641            type_info,
1642            value: value_buf.freeze(),
1643        })
1644    }
1645}
1646
1647// =============================================================================
1648// SessionState Parsing Implementation
1649// =============================================================================
1650
1651impl SessionState {
1652    /// Decode a SESSIONSTATE token from bytes.
1653    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1654        if src.remaining() < 4 {
1655            return Err(ProtocolError::UnexpectedEof);
1656        }
1657
1658        let length = src.get_u32_le() as usize;
1659
1660        if src.remaining() < length {
1661            return Err(ProtocolError::IncompletePacket {
1662                expected: length,
1663                actual: src.remaining(),
1664            });
1665        }
1666
1667        let data = src.copy_to_bytes(length);
1668
1669        Ok(Self { data })
1670    }
1671}
1672
1673// =============================================================================
1674// Token Parsing Implementation
1675// =============================================================================
1676
1677/// Done token status flags bit positions.
1678mod done_status_bits {
1679    pub const DONE_MORE: u16 = 0x0001;
1680    pub const DONE_ERROR: u16 = 0x0002;
1681    pub const DONE_INXACT: u16 = 0x0004;
1682    pub const DONE_COUNT: u16 = 0x0010;
1683    pub const DONE_ATTN: u16 = 0x0020;
1684    pub const DONE_SRVERROR: u16 = 0x0100;
1685}
1686
1687impl DoneStatus {
1688    /// Parse done status from raw bits.
1689    #[must_use]
1690    pub fn from_bits(bits: u16) -> Self {
1691        use done_status_bits::*;
1692        Self {
1693            more: (bits & DONE_MORE) != 0,
1694            error: (bits & DONE_ERROR) != 0,
1695            in_xact: (bits & DONE_INXACT) != 0,
1696            count: (bits & DONE_COUNT) != 0,
1697            attn: (bits & DONE_ATTN) != 0,
1698            srverror: (bits & DONE_SRVERROR) != 0,
1699        }
1700    }
1701
1702    /// Convert to raw bits.
1703    #[must_use]
1704    pub fn to_bits(&self) -> u16 {
1705        use done_status_bits::*;
1706        let mut bits = 0u16;
1707        if self.more {
1708            bits |= DONE_MORE;
1709        }
1710        if self.error {
1711            bits |= DONE_ERROR;
1712        }
1713        if self.in_xact {
1714            bits |= DONE_INXACT;
1715        }
1716        if self.count {
1717            bits |= DONE_COUNT;
1718        }
1719        if self.attn {
1720            bits |= DONE_ATTN;
1721        }
1722        if self.srverror {
1723            bits |= DONE_SRVERROR;
1724        }
1725        bits
1726    }
1727}
1728
1729impl Done {
1730    /// Size of the DONE token in bytes (excluding token type byte).
1731    pub const SIZE: usize = 12; // 2 (status) + 2 (curcmd) + 8 (rowcount)
1732
1733    /// Decode a DONE token from bytes.
1734    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1735        if src.remaining() < Self::SIZE {
1736            return Err(ProtocolError::IncompletePacket {
1737                expected: Self::SIZE,
1738                actual: src.remaining(),
1739            });
1740        }
1741
1742        let status = DoneStatus::from_bits(src.get_u16_le());
1743        let cur_cmd = src.get_u16_le();
1744        let row_count = src.get_u64_le();
1745
1746        Ok(Self {
1747            status,
1748            cur_cmd,
1749            row_count,
1750        })
1751    }
1752
1753    /// Encode the DONE token to bytes.
1754    pub fn encode(&self, dst: &mut impl BufMut) {
1755        dst.put_u8(TokenType::Done as u8);
1756        dst.put_u16_le(self.status.to_bits());
1757        dst.put_u16_le(self.cur_cmd);
1758        dst.put_u64_le(self.row_count);
1759    }
1760
1761    /// Check if more results follow this DONE token.
1762    #[must_use]
1763    pub const fn has_more(&self) -> bool {
1764        self.status.more
1765    }
1766
1767    /// Check if an error occurred.
1768    #[must_use]
1769    pub const fn has_error(&self) -> bool {
1770        self.status.error
1771    }
1772
1773    /// Check if the row count is valid.
1774    #[must_use]
1775    pub const fn has_count(&self) -> bool {
1776        self.status.count
1777    }
1778}
1779
1780impl DoneProc {
1781    /// Size of the DONEPROC token in bytes (excluding token type byte).
1782    pub const SIZE: usize = 12;
1783
1784    /// Decode a DONEPROC token from bytes.
1785    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1786        if src.remaining() < Self::SIZE {
1787            return Err(ProtocolError::IncompletePacket {
1788                expected: Self::SIZE,
1789                actual: src.remaining(),
1790            });
1791        }
1792
1793        let status = DoneStatus::from_bits(src.get_u16_le());
1794        let cur_cmd = src.get_u16_le();
1795        let row_count = src.get_u64_le();
1796
1797        Ok(Self {
1798            status,
1799            cur_cmd,
1800            row_count,
1801        })
1802    }
1803
1804    /// Encode the DONEPROC token to bytes.
1805    pub fn encode(&self, dst: &mut impl BufMut) {
1806        dst.put_u8(TokenType::DoneProc as u8);
1807        dst.put_u16_le(self.status.to_bits());
1808        dst.put_u16_le(self.cur_cmd);
1809        dst.put_u64_le(self.row_count);
1810    }
1811}
1812
1813impl DoneInProc {
1814    /// Size of the DONEINPROC token in bytes (excluding token type byte).
1815    pub const SIZE: usize = 12;
1816
1817    /// Decode a DONEINPROC token from bytes.
1818    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1819        if src.remaining() < Self::SIZE {
1820            return Err(ProtocolError::IncompletePacket {
1821                expected: Self::SIZE,
1822                actual: src.remaining(),
1823            });
1824        }
1825
1826        let status = DoneStatus::from_bits(src.get_u16_le());
1827        let cur_cmd = src.get_u16_le();
1828        let row_count = src.get_u64_le();
1829
1830        Ok(Self {
1831            status,
1832            cur_cmd,
1833            row_count,
1834        })
1835    }
1836
1837    /// Encode the DONEINPROC token to bytes.
1838    pub fn encode(&self, dst: &mut impl BufMut) {
1839        dst.put_u8(TokenType::DoneInProc as u8);
1840        dst.put_u16_le(self.status.to_bits());
1841        dst.put_u16_le(self.cur_cmd);
1842        dst.put_u64_le(self.row_count);
1843    }
1844}
1845
1846impl ServerError {
1847    /// Decode an ERROR token from bytes.
1848    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1849        // ERROR token: length (2) + number (4) + state (1) + class (1) +
1850        //              message (us_varchar) + server (b_varchar) + procedure (b_varchar) + line (4)
1851        if src.remaining() < 2 {
1852            return Err(ProtocolError::UnexpectedEof);
1853        }
1854
1855        let _length = src.get_u16_le();
1856
1857        if src.remaining() < 6 {
1858            return Err(ProtocolError::UnexpectedEof);
1859        }
1860
1861        let number = src.get_i32_le();
1862        let state = src.get_u8();
1863        let class = src.get_u8();
1864
1865        let message = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1866        let server = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1867        let procedure = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1868
1869        if src.remaining() < 4 {
1870            return Err(ProtocolError::UnexpectedEof);
1871        }
1872        let line = src.get_i32_le();
1873
1874        Ok(Self {
1875            number,
1876            state,
1877            class,
1878            message,
1879            server,
1880            procedure,
1881            line,
1882        })
1883    }
1884
1885    /// Check if this is a fatal error (severity >= 20).
1886    #[must_use]
1887    pub const fn is_fatal(&self) -> bool {
1888        self.class >= 20
1889    }
1890
1891    /// Check if this error indicates the batch was aborted (severity >= 16).
1892    #[must_use]
1893    pub const fn is_batch_abort(&self) -> bool {
1894        self.class >= 16
1895    }
1896}
1897
1898impl ServerInfo {
1899    /// Decode an INFO token from bytes.
1900    ///
1901    /// INFO tokens have the same structure as ERROR tokens but with lower severity.
1902    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1903        if src.remaining() < 2 {
1904            return Err(ProtocolError::UnexpectedEof);
1905        }
1906
1907        let _length = src.get_u16_le();
1908
1909        if src.remaining() < 6 {
1910            return Err(ProtocolError::UnexpectedEof);
1911        }
1912
1913        let number = src.get_i32_le();
1914        let state = src.get_u8();
1915        let class = src.get_u8();
1916
1917        let message = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1918        let server = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1919        let procedure = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1920
1921        if src.remaining() < 4 {
1922            return Err(ProtocolError::UnexpectedEof);
1923        }
1924        let line = src.get_i32_le();
1925
1926        Ok(Self {
1927            number,
1928            state,
1929            class,
1930            message,
1931            server,
1932            procedure,
1933            line,
1934        })
1935    }
1936}
1937
1938impl LoginAck {
1939    /// Decode a LOGINACK token from bytes.
1940    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1941        // LOGINACK: length (2) + interface (1) + tds_version (4) + prog_name (b_varchar) + prog_version (4)
1942        if src.remaining() < 2 {
1943            return Err(ProtocolError::UnexpectedEof);
1944        }
1945
1946        let _length = src.get_u16_le();
1947
1948        if src.remaining() < 5 {
1949            return Err(ProtocolError::UnexpectedEof);
1950        }
1951
1952        let interface = src.get_u8();
1953        let tds_version = src.get_u32_le();
1954        let prog_name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1955
1956        if src.remaining() < 4 {
1957            return Err(ProtocolError::UnexpectedEof);
1958        }
1959        let prog_version = src.get_u32_le();
1960
1961        Ok(Self {
1962            interface,
1963            tds_version,
1964            prog_name,
1965            prog_version,
1966        })
1967    }
1968
1969    /// Get the TDS version as a `TdsVersion`.
1970    #[must_use]
1971    pub fn tds_version(&self) -> crate::version::TdsVersion {
1972        crate::version::TdsVersion::new(self.tds_version)
1973    }
1974}
1975
1976impl EnvChangeType {
1977    /// Create from raw byte value.
1978    pub fn from_u8(value: u8) -> Option<Self> {
1979        match value {
1980            1 => Some(Self::Database),
1981            2 => Some(Self::Language),
1982            3 => Some(Self::CharacterSet),
1983            4 => Some(Self::PacketSize),
1984            5 => Some(Self::UnicodeSortingLocalId),
1985            6 => Some(Self::UnicodeComparisonFlags),
1986            7 => Some(Self::SqlCollation),
1987            8 => Some(Self::BeginTransaction),
1988            9 => Some(Self::CommitTransaction),
1989            10 => Some(Self::RollbackTransaction),
1990            11 => Some(Self::EnlistDtcTransaction),
1991            12 => Some(Self::DefectTransaction),
1992            13 => Some(Self::RealTimeLogShipping),
1993            15 => Some(Self::PromoteTransaction),
1994            16 => Some(Self::TransactionManagerAddress),
1995            17 => Some(Self::TransactionEnded),
1996            18 => Some(Self::ResetConnectionCompletionAck),
1997            19 => Some(Self::UserInstanceStarted),
1998            20 => Some(Self::Routing),
1999            _ => None,
2000        }
2001    }
2002}
2003
2004impl EnvChange {
2005    /// Decode an ENVCHANGE token from bytes.
2006    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
2007        if src.remaining() < 3 {
2008            return Err(ProtocolError::UnexpectedEof);
2009        }
2010
2011        let length = src.get_u16_le() as usize;
2012        if length == 0 {
2013            // The frame must at least contain the type byte; reading it from
2014            // outside a zero-length frame would consume the next token.
2015            return Err(ProtocolError::UnexpectedEof);
2016        }
2017        if src.remaining() < length {
2018            return Err(ProtocolError::IncompletePacket {
2019                expected: length,
2020                actual: src.remaining(),
2021            });
2022        }
2023
2024        // Frame-strict decoding (issue #145): the value decoders below only
2025        // bounds-check against the *buffer*, so on an under-declared frame
2026        // they could read past the declared length into the next token's
2027        // bytes. Slice exactly the declared frame and decode from that:
2028        // over-read attempts now hit frame end and take the lenient
2029        // empty-value fallbacks (preserving the #140 hostile-input
2030        // behavior), and the outer buffer always advances by exactly
2031        // `length`.
2032        let mut frame = src.copy_to_bytes(length);
2033        let src = &mut frame;
2034
2035        let env_type_byte = src.get_u8();
2036        let env_type = EnvChangeType::from_u8(env_type_byte)
2037            .ok_or(ProtocolError::InvalidTokenType(env_type_byte))?;
2038
2039        let (new_value, old_value) = match env_type {
2040            EnvChangeType::Routing => {
2041                // Routing has special format
2042                let new_value = Self::decode_routing_value(src)?;
2043                let old_value = EnvChangeValue::Binary(Bytes::new());
2044                (new_value, old_value)
2045            }
2046            EnvChangeType::BeginTransaction
2047            | EnvChangeType::CommitTransaction
2048            | EnvChangeType::RollbackTransaction
2049            | EnvChangeType::EnlistDtcTransaction
2050            | EnvChangeType::SqlCollation => {
2051                // These use binary format per MS-TDS spec:
2052                // - Transaction tokens: transaction descriptor (8 bytes)
2053                // - SqlCollation: collation info (5 bytes: LCID + sort flags)
2054                // The declared ENVCHANGE `length` can be shorter than this
2055                // branch needs (e.g. covers only the type byte), so the
2056                // length-prefix reads must be bounds-checked individually:
2057                // `get_u8` on an empty buffer panics. Match the branch's
2058                // existing graceful style — a missing prefix means empty.
2059                let new_len = if src.has_remaining() {
2060                    src.get_u8() as usize
2061                } else {
2062                    0
2063                };
2064                let new_value = if new_len > 0 && src.remaining() >= new_len {
2065                    EnvChangeValue::Binary(src.copy_to_bytes(new_len))
2066                } else {
2067                    EnvChangeValue::Binary(Bytes::new())
2068                };
2069
2070                let old_len = if src.has_remaining() {
2071                    src.get_u8() as usize
2072                } else {
2073                    0
2074                };
2075                let old_value = if old_len > 0 && src.remaining() >= old_len {
2076                    EnvChangeValue::Binary(src.copy_to_bytes(old_len))
2077                } else {
2078                    EnvChangeValue::Binary(Bytes::new())
2079                };
2080
2081                (new_value, old_value)
2082            }
2083            _ => {
2084                // String format for most env changes
2085                let new_value = read_b_varchar(src)
2086                    .map(EnvChangeValue::String)
2087                    .unwrap_or(EnvChangeValue::String(String::new()));
2088
2089                let old_value = read_b_varchar(src)
2090                    .map(EnvChangeValue::String)
2091                    .unwrap_or(EnvChangeValue::String(String::new()));
2092
2093                (new_value, old_value)
2094            }
2095        };
2096
2097        // No frame-boundary fixup needed: the whole declared frame was
2098        // consumed from the outer buffer up front, so decoders that
2099        // under-consume (e.g. Routing's implicit zero-length OldValue) just
2100        // leave bytes behind in the dropped sub-frame.
2101
2102        Ok(Self {
2103            env_type,
2104            new_value,
2105            old_value,
2106        })
2107    }
2108
2109    fn decode_routing_value(src: &mut impl Buf) -> Result<EnvChangeValue, ProtocolError> {
2110        // Routing format: length (2) + protocol (1) + port (2) + server_len (2) + server (utf16)
2111        if src.remaining() < 2 {
2112            return Err(ProtocolError::UnexpectedEof);
2113        }
2114
2115        let _routing_len = src.get_u16_le();
2116
2117        if src.remaining() < 5 {
2118            return Err(ProtocolError::UnexpectedEof);
2119        }
2120
2121        let _protocol = src.get_u8();
2122        let port = src.get_u16_le();
2123        let server_len = src.get_u16_le() as usize;
2124
2125        // Read UTF-16LE server name
2126        if src.remaining() < server_len * 2 {
2127            return Err(ProtocolError::UnexpectedEof);
2128        }
2129
2130        let mut chars = Vec::with_capacity(server_len);
2131        for _ in 0..server_len {
2132            chars.push(src.get_u16_le());
2133        }
2134
2135        let host = String::from_utf16(&chars).map_err(|_| {
2136            ProtocolError::StringEncoding(
2137                #[cfg(feature = "std")]
2138                "invalid UTF-16 in routing hostname".to_string(),
2139                #[cfg(not(feature = "std"))]
2140                "invalid UTF-16 in routing hostname",
2141            )
2142        })?;
2143
2144        Ok(EnvChangeValue::Routing { host, port })
2145    }
2146
2147    /// Check if this is a routing redirect.
2148    #[must_use]
2149    pub fn is_routing(&self) -> bool {
2150        self.env_type == EnvChangeType::Routing
2151    }
2152
2153    /// Get routing information if this is a routing change.
2154    #[must_use]
2155    pub fn routing_info(&self) -> Option<(&str, u16)> {
2156        if let EnvChangeValue::Routing { host, port } = &self.new_value {
2157            Some((host, *port))
2158        } else {
2159            None
2160        }
2161    }
2162
2163    /// Get the new database name if this is a database change.
2164    #[must_use]
2165    pub fn new_database(&self) -> Option<&str> {
2166        if self.env_type == EnvChangeType::Database {
2167            if let EnvChangeValue::String(s) = &self.new_value {
2168                return Some(s);
2169            }
2170        }
2171        None
2172    }
2173}
2174
2175impl Order {
2176    /// Decode an ORDER token from bytes.
2177    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
2178        if src.remaining() < 2 {
2179            return Err(ProtocolError::UnexpectedEof);
2180        }
2181
2182        let length = src.get_u16_le() as usize;
2183        let column_count = length / 2;
2184
2185        if src.remaining() < length {
2186            return Err(ProtocolError::IncompletePacket {
2187                expected: length,
2188                actual: src.remaining(),
2189            });
2190        }
2191
2192        let mut columns = Vec::with_capacity(column_count);
2193        for _ in 0..column_count {
2194            columns.push(src.get_u16_le());
2195        }
2196
2197        Ok(Self { columns })
2198    }
2199}
2200
2201impl FeatureExtAck {
2202    /// Feature terminator byte.
2203    pub const TERMINATOR: u8 = 0xFF;
2204
2205    /// Decode a FEATUREEXTACK token from bytes.
2206    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
2207        let mut features = Vec::new();
2208
2209        loop {
2210            if !src.has_remaining() {
2211                return Err(ProtocolError::UnexpectedEof);
2212            }
2213
2214            let feature_id = src.get_u8();
2215            if feature_id == Self::TERMINATOR {
2216                break;
2217            }
2218
2219            if src.remaining() < 4 {
2220                return Err(ProtocolError::UnexpectedEof);
2221            }
2222
2223            let data_len = src.get_u32_le() as usize;
2224
2225            if src.remaining() < data_len {
2226                return Err(ProtocolError::IncompletePacket {
2227                    expected: data_len,
2228                    actual: src.remaining(),
2229                });
2230            }
2231
2232            let data = src.copy_to_bytes(data_len);
2233            features.push(FeatureAck { feature_id, data });
2234        }
2235
2236        Ok(Self { features })
2237    }
2238}
2239
2240impl SspiToken {
2241    /// Decode an SSPI token from bytes.
2242    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
2243        if src.remaining() < 2 {
2244            return Err(ProtocolError::UnexpectedEof);
2245        }
2246
2247        let length = src.get_u16_le() as usize;
2248
2249        if src.remaining() < length {
2250            return Err(ProtocolError::IncompletePacket {
2251                expected: length,
2252                actual: src.remaining(),
2253            });
2254        }
2255
2256        let data = src.copy_to_bytes(length);
2257        Ok(Self { data })
2258    }
2259}
2260
2261impl FedAuthInfo {
2262    /// `FedAuthInfoID` for the STS URL (MS-TDS §2.2.7.12: %0x01 = STSURL).
2263    const ID_STSURL: u8 = 0x01;
2264    /// `FedAuthInfoID` for the service principal name (MS-TDS §2.2.7.12:
2265    /// %0x02 = SPN).
2266    const ID_SPN: u8 = 0x02;
2267    /// Size of one `FedAuthInfoOpt` header: ID (1) + DataLen (4) + DataOffset (4).
2268    const OPT_HEADER_LEN: usize = 9;
2269
2270    /// Decode a FEDAUTHINFO token from bytes.
2271    ///
2272    /// Wire layout per MS-TDS §2.2.7.12 (after the 0xEE token byte):
2273    /// `TokenLength` (DWORD) covering everything that follows, then
2274    /// `CountOfInfoIDs` (DWORD), then `CountOfInfoIDs` option headers of
2275    /// ID (BYTE) + `FedAuthInfoDataLen` (DWORD) + `FedAuthInfoDataOffset`
2276    /// (DWORD), then the data block. Offsets are relative to the start of
2277    /// the `CountOfInfoIDs` field, and the option data is UTF-16LE.
2278    ///
2279    /// Exactly `TokenLength` bytes are consumed, so tokens that follow
2280    /// FEDAUTHINFO in the login stream (LOGINACK, DONE) are preserved.
2281    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
2282        if src.remaining() < 4 {
2283            return Err(ProtocolError::UnexpectedEof);
2284        }
2285        let token_len = src.get_u32_le() as usize;
2286        if src.remaining() < token_len {
2287            return Err(ProtocolError::UnexpectedEof);
2288        }
2289
2290        // Offsets in the option headers are relative to the start of this
2291        // region (the CountOfInfoIDs field), so address into it directly.
2292        let region = src.copy_to_bytes(token_len);
2293        if region.len() < 4 {
2294            return Err(ProtocolError::UnexpectedEof);
2295        }
2296        let count = u32::from_le_bytes([region[0], region[1], region[2], region[3]]) as usize;
2297
2298        // All headers must fit between the count field and the end of the
2299        // token. The checked math also rejects hostile counts that would
2300        // overflow the offset arithmetic.
2301        let headers_end = count
2302            .checked_mul(Self::OPT_HEADER_LEN)
2303            .and_then(|n| n.checked_add(4))
2304            .ok_or(ProtocolError::UnexpectedEof)?;
2305        if headers_end > region.len() {
2306            return Err(ProtocolError::UnexpectedEof);
2307        }
2308
2309        let mut sts_url = String::new();
2310        let mut spn = String::new();
2311
2312        for i in 0..count {
2313            let h = 4 + i * Self::OPT_HEADER_LEN;
2314            let info_id = region[h];
2315            let data_len =
2316                u32::from_le_bytes([region[h + 1], region[h + 2], region[h + 3], region[h + 4]])
2317                    as usize;
2318            let data_off =
2319                u32::from_le_bytes([region[h + 5], region[h + 6], region[h + 7], region[h + 8]])
2320                    as usize;
2321
2322            // Unknown IDs are skipped without validating their data, per the
2323            // spec's instruction to ignore unrecognized options.
2324            if info_id != Self::ID_SPN && info_id != Self::ID_STSURL {
2325                continue;
2326            }
2327
2328            let data_end = data_off
2329                .checked_add(data_len)
2330                .ok_or(ProtocolError::UnexpectedEof)?;
2331            if data_end > region.len() {
2332                return Err(ProtocolError::UnexpectedEof);
2333            }
2334            if data_len % 2 != 0 {
2335                return Err(ProtocolError::StringEncoding(
2336                    #[cfg(feature = "std")]
2337                    "FEDAUTHINFO option data has odd length, not UTF-16".to_string(),
2338                    #[cfg(not(feature = "std"))]
2339                    "FEDAUTHINFO option data has odd length, not UTF-16",
2340                ));
2341            }
2342
2343            let chars: Vec<u16> = region[data_off..data_end]
2344                .chunks_exact(2)
2345                .map(|b| u16::from_le_bytes([b[0], b[1]]))
2346                .collect();
2347            let value = String::from_utf16(&chars).map_err(|_| {
2348                ProtocolError::StringEncoding(
2349                    #[cfg(feature = "std")]
2350                    "invalid UTF-16 in FEDAUTHINFO option".to_string(),
2351                    #[cfg(not(feature = "std"))]
2352                    "invalid UTF-16 in FEDAUTHINFO option",
2353                )
2354            })?;
2355
2356            if info_id == Self::ID_SPN {
2357                spn = value;
2358            } else {
2359                sts_url = value;
2360            }
2361        }
2362
2363        Ok(Self { sts_url, spn })
2364    }
2365}
2366
2367// =============================================================================
2368// Token Parser
2369// =============================================================================
2370
2371/// Token stream parser.
2372///
2373/// Parses a stream of TDS tokens from a byte buffer.
2374///
2375/// # Basic vs Context-Aware Parsing
2376///
2377/// Some tokens (like `Done`, `Error`, `LoginAck`) can be parsed without context.
2378/// Use [`next_token()`](TokenParser::next_token) for these.
2379///
2380/// Other tokens (like `ColMetaData`, `Row`, `NbcRow`) require column metadata
2381/// to parse correctly. Use [`next_token_with_metadata()`](TokenParser::next_token_with_metadata)
2382/// for these.
2383///
2384/// # Example
2385///
2386/// ```rust,ignore
2387/// let mut parser = TokenParser::new(data);
2388/// let mut metadata = None;
2389///
2390/// while let Some(token) = parser.next_token_with_metadata(metadata.as_ref())? {
2391///     match token {
2392///         Token::ColMetaData(meta) => {
2393///             metadata = Some(meta);
2394///         }
2395///         Token::Row(row) => {
2396///             // Process row using metadata
2397///         }
2398///         Token::Done(done) => {
2399///             if !done.has_more() {
2400///                 break;
2401///             }
2402///         }
2403///         _ => {}
2404///     }
2405/// }
2406/// ```
2407pub struct TokenParser {
2408    data: Bytes,
2409    position: usize,
2410    /// Whether Always Encrypted was negotiated for this connection.
2411    /// When true, ColMetaData tokens are parsed with CekTable and per-column CryptoMetadata.
2412    encryption_enabled: bool,
2413    /// COMPUTE BY (ALTMETADATA) column layouts keyed by compute Id, used to
2414    /// parse and drop the matching ALTROW tokens. Cleared on each new
2415    /// ColMetaData (result-set boundary), where compute Ids become unique again.
2416    alt_metadata: Vec<(u16, ColMetaData)>,
2417}
2418
2419impl TokenParser {
2420    /// Create a new token parser from bytes.
2421    #[must_use]
2422    pub fn new(data: Bytes) -> Self {
2423        Self {
2424            data,
2425            position: 0,
2426            encryption_enabled: false,
2427            alt_metadata: Vec::new(),
2428        }
2429    }
2430
2431    /// Enable Always Encrypted metadata parsing.
2432    ///
2433    /// When enabled, ColMetaData tokens are parsed using the encrypted format
2434    /// which includes a CekTable and per-column CryptoMetadata.
2435    #[must_use]
2436    pub fn with_encryption(mut self, enabled: bool) -> Self {
2437        self.encryption_enabled = enabled;
2438        self
2439    }
2440
2441    /// Get remaining bytes in the buffer.
2442    #[must_use]
2443    pub fn remaining(&self) -> usize {
2444        self.data.len().saturating_sub(self.position)
2445    }
2446
2447    /// Check if there are more bytes to parse.
2448    #[must_use]
2449    pub fn has_remaining(&self) -> bool {
2450        self.position < self.data.len()
2451    }
2452
2453    /// Peek at the next token type without consuming it.
2454    #[must_use]
2455    pub fn peek_token_type(&self) -> Option<TokenType> {
2456        if self.position < self.data.len() {
2457            TokenType::from_u8(self.data[self.position])
2458        } else {
2459            None
2460        }
2461    }
2462
2463    /// Parse the next token from the stream.
2464    ///
2465    /// This method can only parse context-independent tokens. For tokens that
2466    /// require column metadata (ColMetaData, Row, NbcRow), use
2467    /// [`next_token_with_metadata()`](TokenParser::next_token_with_metadata).
2468    ///
2469    /// Returns `None` if no more tokens are available.
2470    pub fn next_token(&mut self) -> Result<Option<Token>, ProtocolError> {
2471        self.next_token_with_metadata(None)
2472    }
2473
2474    /// Parse the next token with optional column metadata context.
2475    ///
2476    /// When `metadata` is provided, this method can parse Row and NbcRow tokens.
2477    /// Without metadata, those tokens will return an error.
2478    ///
2479    /// Returns `None` if no more tokens are available.
2480    pub fn next_token_with_metadata(
2481        &mut self,
2482        metadata: Option<&ColMetaData>,
2483    ) -> Result<Option<Token>, ProtocolError> {
2484        loop {
2485            if !self.has_remaining() {
2486                return Ok(None);
2487            }
2488
2489            let mut buf = &self.data[self.position..];
2490            let start_pos = self.position;
2491
2492            let token_type_byte = buf.get_u8();
2493            let token_type = TokenType::from_u8(token_type_byte);
2494
2495            let token = match token_type {
2496                Some(TokenType::Done) => {
2497                    let done = Done::decode(&mut buf)?;
2498                    Token::Done(done)
2499                }
2500                Some(TokenType::DoneProc) => {
2501                    let done = DoneProc::decode(&mut buf)?;
2502                    Token::DoneProc(done)
2503                }
2504                Some(TokenType::DoneInProc) => {
2505                    let done = DoneInProc::decode(&mut buf)?;
2506                    Token::DoneInProc(done)
2507                }
2508                Some(TokenType::Error) => {
2509                    let error = ServerError::decode(&mut buf)?;
2510                    Token::Error(error)
2511                }
2512                Some(TokenType::Info) => {
2513                    let info = ServerInfo::decode(&mut buf)?;
2514                    Token::Info(info)
2515                }
2516                Some(TokenType::LoginAck) => {
2517                    let login_ack = LoginAck::decode(&mut buf)?;
2518                    Token::LoginAck(login_ack)
2519                }
2520                Some(TokenType::EnvChange) => {
2521                    let env_change = EnvChange::decode(&mut buf)?;
2522                    Token::EnvChange(env_change)
2523                }
2524                Some(TokenType::Order) => {
2525                    let order = Order::decode(&mut buf)?;
2526                    Token::Order(order)
2527                }
2528                Some(TokenType::FeatureExtAck) => {
2529                    let ack = FeatureExtAck::decode(&mut buf)?;
2530                    Token::FeatureExtAck(ack)
2531                }
2532                Some(TokenType::Sspi) => {
2533                    let sspi = SspiToken::decode(&mut buf)?;
2534                    Token::Sspi(sspi)
2535                }
2536                Some(TokenType::FedAuthInfo) => {
2537                    let info = FedAuthInfo::decode(&mut buf)?;
2538                    Token::FedAuthInfo(info)
2539                }
2540                Some(TokenType::ReturnStatus) => {
2541                    if buf.remaining() < 4 {
2542                        return Err(ProtocolError::UnexpectedEof);
2543                    }
2544                    let status = buf.get_i32_le();
2545                    Token::ReturnStatus(status)
2546                }
2547                Some(TokenType::ColMetaData) => {
2548                    let col_meta = if self.encryption_enabled {
2549                        ColMetaData::decode_encrypted(&mut buf)?
2550                    } else {
2551                        ColMetaData::decode(&mut buf)?
2552                    };
2553                    // New result set: prior COMPUTE BY layouts no longer apply
2554                    // and their Ids may be reused.
2555                    self.alt_metadata.clear();
2556                    Token::ColMetaData(col_meta)
2557                }
2558                Some(TokenType::Row) => {
2559                    let meta = metadata.ok_or_else(|| {
2560                        ProtocolError::StringEncoding(
2561                            #[cfg(feature = "std")]
2562                            "Row token requires column metadata".to_string(),
2563                            #[cfg(not(feature = "std"))]
2564                            "Row token requires column metadata",
2565                        )
2566                    })?;
2567                    let row = RawRow::decode(&mut buf, meta)?;
2568                    Token::Row(row)
2569                }
2570                Some(TokenType::NbcRow) => {
2571                    let meta = metadata.ok_or_else(|| {
2572                        ProtocolError::StringEncoding(
2573                            #[cfg(feature = "std")]
2574                            "NbcRow token requires column metadata".to_string(),
2575                            #[cfg(not(feature = "std"))]
2576                            "NbcRow token requires column metadata",
2577                        )
2578                    })?;
2579                    let row = NbcRow::decode(&mut buf, meta)?;
2580                    Token::NbcRow(row)
2581                }
2582                Some(TokenType::ReturnValue) => {
2583                    let ret_val = ReturnValue::decode(&mut buf)?;
2584                    Token::ReturnValue(ret_val)
2585                }
2586                Some(TokenType::SessionState) => {
2587                    let session = SessionState::decode(&mut buf)?;
2588                    Token::SessionState(session)
2589                }
2590                Some(TokenType::ColInfo) | Some(TokenType::TabName) | Some(TokenType::Offset) => {
2591                    // These tokens are rarely used and have complex formats.
2592                    // Skip them by reading the length and advancing.
2593                    if buf.remaining() < 2 {
2594                        return Err(ProtocolError::UnexpectedEof);
2595                    }
2596                    let length = buf.get_u16_le() as usize;
2597                    if buf.remaining() < length {
2598                        return Err(ProtocolError::IncompletePacket {
2599                            expected: length,
2600                            actual: buf.remaining(),
2601                        });
2602                    }
2603                    // Skip the data
2604                    buf.advance(length);
2605                    // #273: advance past the skipped token and iterate. The skip
2606                    // path must NOT recurse — a server-controlled flat run of these
2607                    // tokens would otherwise add one stack frame per token and
2608                    // overflow the stack (remote DoS).
2609                    self.position = start_pos + (self.data.len() - start_pos - buf.remaining());
2610                    continue;
2611                }
2612                Some(TokenType::AltMetaData) => {
2613                    // COMPUTE BY metadata (#275). Parse to learn the ALTROW value
2614                    // layout, store it keyed by Id, then drop the token — compute
2615                    // rows are not surfaced, but their bytes must be consumed so
2616                    // the base result set stays parseable.
2617                    let (id, alt_meta) = ColMetaData::decode_alt(&mut buf)?;
2618                    self.position = start_pos + (self.data.len() - start_pos - buf.remaining());
2619                    self.alt_metadata.push((id, alt_meta));
2620                    continue;
2621                }
2622                Some(TokenType::AltRow) => {
2623                    // COMPUTE BY row (#275). Parse using the matching ALTMETADATA
2624                    // to consume its bytes, then drop it. Like the ColInfo skip
2625                    // path, this MUST NOT recurse (remote DoS via a flat token run).
2626                    if buf.remaining() < 2 {
2627                        return Err(ProtocolError::UnexpectedEof);
2628                    }
2629                    let id = buf.get_u16_le();
2630                    let alt_meta = self
2631                        .alt_metadata
2632                        .iter()
2633                        .find(|(stored_id, _)| *stored_id == id)
2634                        .map(|(_, meta)| meta)
2635                        .ok_or_else(|| {
2636                            ProtocolError::StringEncoding(
2637                                #[cfg(feature = "std")]
2638                                "ALTROW token without matching ALTMETADATA".to_string(),
2639                                #[cfg(not(feature = "std"))]
2640                                "ALTROW token without matching ALTMETADATA",
2641                            )
2642                        })?;
2643                    let _ = RawRow::decode(&mut buf, alt_meta)?;
2644                    self.position = start_pos + (self.data.len() - start_pos - buf.remaining());
2645                    continue;
2646                }
2647                None => {
2648                    return Err(ProtocolError::InvalidTokenType(token_type_byte));
2649                }
2650            };
2651
2652            // Update position based on how much was consumed
2653            let consumed = self.data.len() - start_pos - buf.remaining();
2654            self.position = start_pos + consumed;
2655
2656            return Ok(Some(token));
2657        }
2658    }
2659
2660    /// Skip the current token without fully parsing it.
2661    ///
2662    /// This is useful for skipping unknown or uninteresting tokens.
2663    pub fn skip_token(&mut self) -> Result<(), ProtocolError> {
2664        if !self.has_remaining() {
2665            return Ok(());
2666        }
2667
2668        let token_type_byte = self.data[self.position];
2669        let token_type = TokenType::from_u8(token_type_byte);
2670
2671        // Calculate how many bytes to skip based on token type
2672        let skip_amount = match token_type {
2673            // Fixed-size tokens
2674            Some(TokenType::Done) | Some(TokenType::DoneProc) | Some(TokenType::DoneInProc) => {
2675                1 + Done::SIZE // token type + 12 bytes
2676            }
2677            Some(TokenType::ReturnStatus) => {
2678                1 + 4 // token type + 4 bytes
2679            }
2680            // Variable-length tokens with 2-byte length prefix
2681            Some(TokenType::Error)
2682            | Some(TokenType::Info)
2683            | Some(TokenType::LoginAck)
2684            | Some(TokenType::EnvChange)
2685            | Some(TokenType::Order)
2686            | Some(TokenType::Sspi)
2687            | Some(TokenType::ColInfo)
2688            | Some(TokenType::TabName)
2689            | Some(TokenType::Offset) => {
2690                if self.remaining() < 3 {
2691                    return Err(ProtocolError::UnexpectedEof);
2692                }
2693                let length = u16::from_le_bytes([
2694                    self.data[self.position + 1],
2695                    self.data[self.position + 2],
2696                ]) as usize;
2697                1 + 2 + length // token type + length prefix + data
2698            }
2699            // RETURNVALUE has no length prefix (MS-TDS §2.2.7.18): the bytes
2700            // after the token type are the 2-byte ParamOrdinal, not a length.
2701            // Parse to find its end, mirroring ReturnValue::decode in the main
2702            // loop (the length-prefix treatment skipped `1 + 2 + ParamOrdinal`).
2703            Some(TokenType::ReturnValue) => {
2704                let mut buf = &self.data[self.position + 1..];
2705                let _ = ReturnValue::decode(&mut buf)?;
2706                self.data.len() - self.position - buf.remaining()
2707            }
2708            // Tokens with 4-byte length prefix
2709            Some(TokenType::SessionState) | Some(TokenType::FedAuthInfo) => {
2710                if self.remaining() < 5 {
2711                    return Err(ProtocolError::UnexpectedEof);
2712                }
2713                let length = u32::from_le_bytes([
2714                    self.data[self.position + 1],
2715                    self.data[self.position + 2],
2716                    self.data[self.position + 3],
2717                    self.data[self.position + 4],
2718                ]) as usize;
2719                1 + 4 + length
2720            }
2721            // FeatureExtAck has no length prefix - must parse
2722            Some(TokenType::FeatureExtAck) => {
2723                // Parse to find end
2724                let mut buf = &self.data[self.position + 1..];
2725                let _ = FeatureExtAck::decode(&mut buf)?;
2726                self.data.len() - self.position - buf.remaining()
2727            }
2728            // COMPUTE BY metadata (#275) has no length prefix; parse to find its
2729            // end, recording the layout so a following ALTROW can be skipped.
2730            Some(TokenType::AltMetaData) => {
2731                let mut buf = &self.data[self.position + 1..];
2732                let (id, alt_meta) = ColMetaData::decode_alt(&mut buf)?;
2733                let consumed = self.data.len() - self.position - buf.remaining();
2734                self.alt_metadata.push((id, alt_meta));
2735                consumed
2736            }
2737            // COMPUTE BY row (#275); length depends on the matching ALTMETADATA.
2738            Some(TokenType::AltRow) => {
2739                let mut buf = &self.data[self.position + 1..];
2740                if buf.remaining() < 2 {
2741                    return Err(ProtocolError::UnexpectedEof);
2742                }
2743                let id = buf.get_u16_le();
2744                let alt_meta = self
2745                    .alt_metadata
2746                    .iter()
2747                    .find(|(stored_id, _)| *stored_id == id)
2748                    .map(|(_, meta)| meta)
2749                    .ok_or(ProtocolError::InvalidTokenType(token_type_byte))?;
2750                let _ = RawRow::decode(&mut buf, alt_meta)?;
2751                self.data.len() - self.position - buf.remaining()
2752            }
2753            // ColMetaData, Row, NbcRow require context and can't be easily skipped
2754            Some(TokenType::ColMetaData) | Some(TokenType::Row) | Some(TokenType::NbcRow) => {
2755                return Err(ProtocolError::InvalidTokenType(token_type_byte));
2756            }
2757            None => {
2758                return Err(ProtocolError::InvalidTokenType(token_type_byte));
2759            }
2760        };
2761
2762        if self.remaining() < skip_amount {
2763            return Err(ProtocolError::UnexpectedEof);
2764        }
2765
2766        self.position += skip_amount;
2767        Ok(())
2768    }
2769
2770    /// Get the current position in the buffer.
2771    #[must_use]
2772    pub fn position(&self) -> usize {
2773        self.position
2774    }
2775
2776    /// Reset the parser to the beginning.
2777    pub fn reset(&mut self) {
2778        self.position = 0;
2779    }
2780}
2781
2782// =============================================================================
2783// Tests
2784// =============================================================================
2785
2786#[cfg(test)]
2787#[allow(clippy::unwrap_used, clippy::panic)]
2788mod tests {
2789    use super::*;
2790    use bytes::BytesMut;
2791
2792    #[test]
2793    fn test_done_roundtrip() {
2794        let done = Done {
2795            status: DoneStatus {
2796                more: false,
2797                error: false,
2798                in_xact: false,
2799                count: true,
2800                attn: false,
2801                srverror: false,
2802            },
2803            cur_cmd: 193, // SELECT
2804            row_count: 42,
2805        };
2806
2807        let mut buf = BytesMut::new();
2808        done.encode(&mut buf);
2809
2810        // Skip the token type byte
2811        let mut cursor = &buf[1..];
2812        let decoded = Done::decode(&mut cursor).unwrap();
2813
2814        assert_eq!(decoded.status.count, done.status.count);
2815        assert_eq!(decoded.cur_cmd, done.cur_cmd);
2816        assert_eq!(decoded.row_count, done.row_count);
2817    }
2818
2819    #[test]
2820    fn test_done_status_bits() {
2821        let status = DoneStatus {
2822            more: true,
2823            error: true,
2824            in_xact: true,
2825            count: true,
2826            attn: false,
2827            srverror: false,
2828        };
2829
2830        let bits = status.to_bits();
2831        let restored = DoneStatus::from_bits(bits);
2832
2833        assert_eq!(status.more, restored.more);
2834        assert_eq!(status.error, restored.error);
2835        assert_eq!(status.in_xact, restored.in_xact);
2836        assert_eq!(status.count, restored.count);
2837    }
2838
2839    #[test]
2840    fn test_token_parser_done() {
2841        // DONE token: type (1) + status (2) + curcmd (2) + rowcount (8)
2842        let data = Bytes::from_static(&[
2843            0xFD, // DONE token type
2844            0x10, 0x00, // status: DONE_COUNT
2845            0xC1, 0x00, // cur_cmd: 193 (SELECT)
2846            0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count: 5
2847        ]);
2848
2849        let mut parser = TokenParser::new(data);
2850        let token = parser.next_token().unwrap().unwrap();
2851
2852        match token {
2853            Token::Done(done) => {
2854                assert!(done.status.count);
2855                assert!(!done.status.more);
2856                assert_eq!(done.cur_cmd, 193);
2857                assert_eq!(done.row_count, 5);
2858            }
2859            _ => panic!("Expected Done token"),
2860        }
2861
2862        // No more tokens
2863        assert!(parser.next_token().unwrap().is_none());
2864    }
2865
2866    #[test]
2867    fn test_env_change_type_from_u8() {
2868        assert_eq!(EnvChangeType::from_u8(1), Some(EnvChangeType::Database));
2869        assert_eq!(EnvChangeType::from_u8(20), Some(EnvChangeType::Routing));
2870        assert_eq!(EnvChangeType::from_u8(100), None);
2871    }
2872
2873    /// A spec-faithful Routing ENVCHANGE (MS-TDS 2.2.7.9) carries a
2874    /// zero-length OldValue (two bytes) after the routing data. The decoder
2875    /// reads only the NewValue, so it must skip to the declared frame
2876    /// boundary — otherwise the leftover `00 00` is misparsed as the next
2877    /// token type and the rest of the login response is garbage. Azure SQL
2878    /// Gateway redirects send exactly this shape.
2879    #[test]
2880    fn test_env_change_routing_consumes_declared_length() {
2881        let host = "redirect.example";
2882        let host_utf16: Vec<u16> = host.encode_utf16().collect();
2883
2884        let mut data = BytesMut::new();
2885        // RoutingDataValue: length + protocol + port + server_len + server
2886        let routing_len = 1 + 2 + 2 + host_utf16.len() * 2;
2887        // ENVCHANGE length: type byte + routing length prefix + routing data
2888        // + zero-length OldValue
2889        let env_len = 1 + 2 + routing_len + 2;
2890        data.put_u16_le(env_len as u16);
2891        data.put_u8(20); // Routing
2892        data.put_u16_le(routing_len as u16);
2893        data.put_u8(0); // protocol: TCP
2894        data.put_u16_le(11000); // port
2895        data.put_u16_le(host_utf16.len() as u16);
2896        for c in &host_utf16 {
2897            data.put_u16_le(*c);
2898        }
2899        data.put_u16_le(0); // OldValue: zero-length US_VARBYTE
2900        // A trailing DONE token type byte that must remain for the next read.
2901        data.put_u8(0xFD);
2902
2903        let mut buf: &[u8] = &data;
2904        let env = EnvChange::decode(&mut buf).unwrap();
2905        assert_eq!(env.routing_info(), Some((host, 11000)));
2906        assert_eq!(
2907            buf,
2908            &[0xFD],
2909            "decode must consume exactly the declared ENVCHANGE frame"
2910        );
2911    }
2912
2913    fn put_b_varchar(buf: &mut BytesMut, s: &str) {
2914        let utf16: Vec<u16> = s.encode_utf16().collect();
2915        buf.put_u8(utf16.len() as u8);
2916        for c in utf16 {
2917            buf.put_u16_le(c);
2918        }
2919    }
2920
2921    fn put_us_varchar(buf: &mut BytesMut, s: &str) {
2922        let utf16: Vec<u16> = s.encode_utf16().collect();
2923        buf.put_u16_le(utf16.len() as u16);
2924        for c in utf16 {
2925            buf.put_u16_le(c);
2926        }
2927    }
2928
2929    /// UDT_INFO regression (issue #154): per MS-TDS, DB_NAME, SCHEMA_NAME,
2930    /// and TYPE_NAME are B_VARCHAR (1-byte length); only
2931    /// ASSEMBLY_QUALIFIED_NAME is US_VARCHAR. Reading all four as US_VARCHAR
2932    /// misaligned the stream, so every query selecting a UDT column
2933    /// (geography, geometry, hierarchyid, CLR UDTs) failed with
2934    /// UnexpectedEof.
2935    #[test]
2936    fn test_udt_info_metadata_uses_b_varchar_names() {
2937        let mut data = BytesMut::new();
2938        data.put_u16_le(0xFFFF); // MAX_BYTE_SIZE
2939        put_b_varchar(&mut data, "master");
2940        put_b_varchar(&mut data, "dbo");
2941        put_b_varchar(&mut data, "hierarchyid");
2942        put_us_varchar(
2943            &mut data,
2944            "Microsoft.SqlServer.Types.SqlHierarchyId, Microsoft.SqlServer.Types",
2945        );
2946        // A trailing token type byte that must remain for the next read.
2947        data.put_u8(0xFD);
2948
2949        let mut buf: &[u8] = &data;
2950        let info = decode_type_info(&mut buf, TypeId::Udt, TypeId::Udt as u8).unwrap();
2951        assert_eq!(info.max_length, Some(0xFFFF));
2952        assert_eq!(
2953            buf,
2954            &[0xFD],
2955            "decode must consume exactly the UDT_INFO frame"
2956        );
2957    }
2958
2959    /// XML_INFO regression (issue #154): per MS-TDS §2.2.5.5.3, DBNAME and
2960    /// OWNING_SCHEMA are B_VARCHAR; only XML_SCHEMA_COLLECTION is US_VARCHAR.
2961    /// Schema-bound xml columns (SCHEMA_PRESENT=1) previously misparsed.
2962    #[test]
2963    fn test_xml_info_schema_bound_uses_b_varchar_names() {
2964        let mut data = BytesMut::new();
2965        data.put_u8(1); // SCHEMA_PRESENT
2966        put_b_varchar(&mut data, "master");
2967        put_b_varchar(&mut data, "dbo");
2968        put_us_varchar(&mut data, "MyXmlSchemaCollection");
2969        data.put_u8(0xFD);
2970
2971        let mut buf: &[u8] = &data;
2972        decode_type_info(&mut buf, TypeId::Xml, TypeId::Xml as u8).unwrap();
2973        assert_eq!(
2974            buf,
2975            &[0xFD],
2976            "decode must consume exactly the XML_INFO frame"
2977        );
2978    }
2979
2980    #[test]
2981    fn hostile_env_change_binary_truncated_is_not_panic() {
2982        // length=1 covers only the type byte (0x08 = BeginTransaction, a
2983        // binary-format type); the new_len/old_len prefix reads then hit an
2984        // empty buffer. Must decode gracefully, never panic (found by the
2985        // parse_env_change and parse_token fuzz targets).
2986        let data = [0x01, 0x00, 0x08];
2987        let mut buf: &[u8] = &data;
2988        let env = EnvChange::decode(&mut buf).unwrap();
2989        assert_eq!(env.env_type, EnvChangeType::BeginTransaction);
2990    }
2991
2992    /// Issue #145: an under-declared frame must not let the value decoders
2993    /// read past the declared length into the next token's bytes.
2994    #[test]
2995    fn hostile_env_change_under_declared_cannot_steal_following_bytes() {
2996        // length=1 covers only the type byte; the bytes after the frame are
2997        // shaped exactly like the transaction-descriptor payload the old
2998        // buffer-bounded decoder would have consumed (new_len=8, descriptor,
2999        // old_len=0).
3000        let mut data = BytesMut::new();
3001        data.put_u16_le(1); // declared frame: type byte only
3002        data.put_u8(0x08); // BeginTransaction
3003        let following: &[u8] = &[0x08, 1, 2, 3, 4, 5, 6, 7, 8, 0x00];
3004        data.extend_from_slice(following);
3005
3006        let mut buf: &[u8] = &data;
3007        let env = EnvChange::decode(&mut buf).unwrap();
3008        assert_eq!(env.env_type, EnvChangeType::BeginTransaction);
3009        match &env.new_value {
3010            EnvChangeValue::Binary(b) => {
3011                assert!(
3012                    b.is_empty(),
3013                    "under-declared frame yields the lenient empty value"
3014                );
3015            }
3016            other => panic!("expected empty Binary value, got {other:?}"),
3017        }
3018        assert_eq!(
3019            buf, following,
3020            "bytes beyond the declared frame belong to the next token"
3021        );
3022    }
3023
3024    /// Issue #145: a zero-length frame cannot supply a type byte; reading
3025    /// one from beyond the frame would consume the next token.
3026    #[test]
3027    fn hostile_env_change_zero_length_frame_errors() {
3028        let data = [0x00, 0x00, 0xFD];
3029        let mut buf: &[u8] = &data;
3030        assert!(EnvChange::decode(&mut buf).is_err());
3031    }
3032
3033    #[test]
3034    fn test_colmetadata_no_columns() {
3035        // No metadata marker (0xFFFF)
3036        let data = Bytes::from_static(&[0xFF, 0xFF]);
3037        let mut cursor: &[u8] = &data;
3038        let meta = ColMetaData::decode(&mut cursor).unwrap();
3039        assert!(meta.is_empty());
3040        assert_eq!(meta.column_count(), 0);
3041    }
3042
3043    #[test]
3044    fn test_colmetadata_single_int_column() {
3045        // COLMETADATA with 1 INT column
3046        // Format: column_count (2) + [user_type (4) + flags (2) + type_id (1) + name (b_varchar)]
3047        let mut data = BytesMut::new();
3048        data.extend_from_slice(&[0x01, 0x00]); // 1 column
3049        data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); // user_type = 0
3050        data.extend_from_slice(&[0x01, 0x00]); // flags (nullable)
3051        data.extend_from_slice(&[0x38]); // TypeId::Int4
3052        // Column name "id" as B_VARCHAR (1 byte length + UTF-16LE)
3053        data.extend_from_slice(&[0x02]); // 2 characters
3054        data.extend_from_slice(&[b'i', 0x00, b'd', 0x00]); // "id" in UTF-16LE
3055
3056        let mut cursor: &[u8] = &data;
3057        let meta = ColMetaData::decode(&mut cursor).unwrap();
3058
3059        assert_eq!(meta.column_count(), 1);
3060        assert_eq!(meta.columns[0].name, "id");
3061        assert_eq!(meta.columns[0].type_id, TypeId::Int4);
3062        assert!(meta.columns[0].is_nullable());
3063    }
3064
3065    #[test]
3066    fn test_colmetadata_nvarchar_column() {
3067        // COLMETADATA with 1 NVARCHAR(50) column
3068        let mut data = BytesMut::new();
3069        data.extend_from_slice(&[0x01, 0x00]); // 1 column
3070        data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); // user_type = 0
3071        data.extend_from_slice(&[0x01, 0x00]); // flags (nullable)
3072        data.extend_from_slice(&[0xE7]); // TypeId::NVarChar
3073        // Type info: max_length (2 bytes) + collation (5 bytes)
3074        data.extend_from_slice(&[0x64, 0x00]); // max_length = 100 (50 chars * 2)
3075        data.extend_from_slice(&[0x09, 0x04, 0xD0, 0x00, 0x34]); // collation
3076        // Column name "name"
3077        data.extend_from_slice(&[0x04]); // 4 characters
3078        data.extend_from_slice(&[b'n', 0x00, b'a', 0x00, b'm', 0x00, b'e', 0x00]);
3079
3080        let mut cursor: &[u8] = &data;
3081        let meta = ColMetaData::decode(&mut cursor).unwrap();
3082
3083        assert_eq!(meta.column_count(), 1);
3084        assert_eq!(meta.columns[0].name, "name");
3085        assert_eq!(meta.columns[0].type_id, TypeId::NVarChar);
3086        assert_eq!(meta.columns[0].type_info.max_length, Some(100));
3087        assert!(meta.columns[0].type_info.collation.is_some());
3088    }
3089
3090    #[test]
3091    fn test_raw_row_decode_int() {
3092        // Create metadata for a single INT column
3093        let metadata = ColMetaData {
3094            cek_table: None,
3095            columns: vec![ColumnData {
3096                name: "id".to_string(),
3097                type_id: TypeId::Int4,
3098                col_type: 0x38,
3099                flags: 0,
3100                user_type: 0,
3101                type_info: TypeInfo::default(),
3102                crypto_metadata: None,
3103            }],
3104        };
3105
3106        // Row data: just 4 bytes for the int value 42
3107        let data = Bytes::from_static(&[0x2A, 0x00, 0x00, 0x00]); // 42 in little-endian
3108        let mut cursor: &[u8] = &data;
3109        let row = RawRow::decode(&mut cursor, &metadata).unwrap();
3110
3111        // The raw data should contain the 4 bytes
3112        assert_eq!(row.data.len(), 4);
3113        assert_eq!(&row.data[..], &[0x2A, 0x00, 0x00, 0x00]);
3114    }
3115
3116    #[test]
3117    fn test_raw_row_decode_nullable_int() {
3118        // Create metadata for a nullable INT column (IntN)
3119        let metadata = ColMetaData {
3120            cek_table: None,
3121            columns: vec![ColumnData {
3122                name: "id".to_string(),
3123                type_id: TypeId::IntN,
3124                col_type: 0x26,
3125                flags: 0x01, // nullable
3126                user_type: 0,
3127                type_info: TypeInfo {
3128                    max_length: Some(4),
3129                    ..Default::default()
3130                },
3131                crypto_metadata: None,
3132            }],
3133        };
3134
3135        // Row data with value: 1 byte length + 4 bytes value
3136        let data = Bytes::from_static(&[0x04, 0x2A, 0x00, 0x00, 0x00]); // length=4, value=42
3137        let mut cursor: &[u8] = &data;
3138        let row = RawRow::decode(&mut cursor, &metadata).unwrap();
3139
3140        assert_eq!(row.data.len(), 5);
3141        assert_eq!(row.data[0], 4); // length
3142        assert_eq!(&row.data[1..], &[0x2A, 0x00, 0x00, 0x00]);
3143    }
3144
3145    #[test]
3146    fn test_raw_row_decode_null_value() {
3147        // Create metadata for a nullable INT column (IntN)
3148        let metadata = ColMetaData {
3149            cek_table: None,
3150            columns: vec![ColumnData {
3151                name: "id".to_string(),
3152                type_id: TypeId::IntN,
3153                col_type: 0x26,
3154                flags: 0x01, // nullable
3155                user_type: 0,
3156                type_info: TypeInfo {
3157                    max_length: Some(4),
3158                    ..Default::default()
3159                },
3160                crypto_metadata: None,
3161            }],
3162        };
3163
3164        // NULL value: length = 0xFF (for bytelen types)
3165        let data = Bytes::from_static(&[0xFF]);
3166        let mut cursor: &[u8] = &data;
3167        let row = RawRow::decode(&mut cursor, &metadata).unwrap();
3168
3169        assert_eq!(row.data.len(), 1);
3170        assert_eq!(row.data[0], 0xFF); // NULL marker
3171    }
3172
3173    #[test]
3174    fn test_nbcrow_null_bitmap() {
3175        let row = NbcRow {
3176            null_bitmap: vec![0b00000101], // columns 0 and 2 are NULL
3177            data: Bytes::new(),
3178        };
3179
3180        assert!(row.is_null(0));
3181        assert!(!row.is_null(1));
3182        assert!(row.is_null(2));
3183        assert!(!row.is_null(3));
3184    }
3185
3186    #[test]
3187    fn test_token_parser_colmetadata() {
3188        // Build a COLMETADATA token with 1 INT column
3189        let mut data = BytesMut::new();
3190        data.extend_from_slice(&[0x81]); // COLMETADATA token type
3191        data.extend_from_slice(&[0x01, 0x00]); // 1 column
3192        data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); // user_type = 0
3193        data.extend_from_slice(&[0x01, 0x00]); // flags (nullable)
3194        data.extend_from_slice(&[0x38]); // TypeId::Int4
3195        data.extend_from_slice(&[0x02]); // column name length
3196        data.extend_from_slice(&[b'i', 0x00, b'd', 0x00]); // "id"
3197
3198        let mut parser = TokenParser::new(data.freeze());
3199        let token = parser.next_token().unwrap().unwrap();
3200
3201        match token {
3202            Token::ColMetaData(meta) => {
3203                assert_eq!(meta.column_count(), 1);
3204                assert_eq!(meta.columns[0].name, "id");
3205                assert_eq!(meta.columns[0].type_id, TypeId::Int4);
3206            }
3207            _ => panic!("Expected ColMetaData token"),
3208        }
3209    }
3210
3211    #[test]
3212    fn test_token_parser_row_with_metadata() {
3213        // Build metadata
3214        let metadata = ColMetaData {
3215            cek_table: None,
3216            columns: vec![ColumnData {
3217                name: "id".to_string(),
3218                type_id: TypeId::Int4,
3219                col_type: 0x38,
3220                flags: 0,
3221                user_type: 0,
3222                type_info: TypeInfo::default(),
3223                crypto_metadata: None,
3224            }],
3225        };
3226
3227        // Build ROW token
3228        let mut data = BytesMut::new();
3229        data.extend_from_slice(&[0xD1]); // ROW token type
3230        data.extend_from_slice(&[0x2A, 0x00, 0x00, 0x00]); // value = 42
3231
3232        let mut parser = TokenParser::new(data.freeze());
3233        let token = parser
3234            .next_token_with_metadata(Some(&metadata))
3235            .unwrap()
3236            .unwrap();
3237
3238        match token {
3239            Token::Row(row) => {
3240                assert_eq!(row.data.len(), 4);
3241            }
3242            _ => panic!("Expected Row token"),
3243        }
3244    }
3245
3246    #[test]
3247    fn test_token_parser_row_without_metadata_fails() {
3248        // Build ROW token
3249        let mut data = BytesMut::new();
3250        data.extend_from_slice(&[0xD1]); // ROW token type
3251        data.extend_from_slice(&[0x2A, 0x00, 0x00, 0x00]); // value = 42
3252
3253        let mut parser = TokenParser::new(data.freeze());
3254        let result = parser.next_token(); // No metadata provided
3255
3256        assert!(result.is_err());
3257    }
3258
3259    #[test]
3260    fn test_token_parser_peek() {
3261        let data = Bytes::from_static(&[
3262            0xFD, // DONE token type
3263            0x10, 0x00, // status
3264            0xC1, 0x00, // cur_cmd
3265            0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count
3266        ]);
3267
3268        let parser = TokenParser::new(data);
3269        assert_eq!(parser.peek_token_type(), Some(TokenType::Done));
3270    }
3271
3272    #[test]
3273    fn test_column_data_fixed_size() {
3274        let col = ColumnData {
3275            name: String::new(),
3276            type_id: TypeId::Int4,
3277            col_type: 0x38,
3278            flags: 0,
3279            user_type: 0,
3280            type_info: TypeInfo::default(),
3281            crypto_metadata: None,
3282        };
3283        assert_eq!(col.fixed_size(), Some(4));
3284
3285        let col2 = ColumnData {
3286            name: String::new(),
3287            type_id: TypeId::NVarChar,
3288            col_type: 0xE7,
3289            flags: 0,
3290            user_type: 0,
3291            type_info: TypeInfo::default(),
3292            crypto_metadata: None,
3293        };
3294        assert_eq!(col2.fixed_size(), None);
3295    }
3296
3297    // ========================================================================
3298    // End-to-End Decode Tests (Wire → Stored → Verification)
3299    // ========================================================================
3300    //
3301    // These tests verify that RawRow::decode_column_value correctly stores
3302    // column values in a format that can be parsed back.
3303
3304    #[test]
3305    fn test_decode_nvarchar_then_intn_roundtrip() {
3306        // Simulate wire data for: "World" (NVarChar), 42 (IntN)
3307        // This tests the scenario from the MCP parameterized query
3308
3309        // Build wire data (what the server sends)
3310        let mut wire_data = BytesMut::new();
3311
3312        // Column 0: NVarChar "World" - 2-byte length prefix in bytes
3313        // "World" in UTF-16LE: W=0x0057, o=0x006F, r=0x0072, l=0x006C, d=0x0064
3314        let word = "World";
3315        let utf16: Vec<u16> = word.encode_utf16().collect();
3316        wire_data.put_u16_le((utf16.len() * 2) as u16); // byte length = 10
3317        for code_unit in &utf16 {
3318            wire_data.put_u16_le(*code_unit);
3319        }
3320
3321        // Column 1: IntN 42 - 1-byte length prefix
3322        wire_data.put_u8(4); // 4 bytes for INT
3323        wire_data.put_i32_le(42);
3324
3325        // Build column metadata
3326        let metadata = ColMetaData {
3327            cek_table: None,
3328            columns: vec![
3329                ColumnData {
3330                    name: "greeting".to_string(),
3331                    type_id: TypeId::NVarChar,
3332                    col_type: 0xE7,
3333                    flags: 0x01,
3334                    user_type: 0,
3335                    type_info: TypeInfo {
3336                        max_length: Some(10), // non-MAX
3337                        precision: None,
3338                        scale: None,
3339                        collation: None,
3340                    },
3341                    crypto_metadata: None,
3342                },
3343                ColumnData {
3344                    name: "number".to_string(),
3345                    type_id: TypeId::IntN,
3346                    col_type: 0x26,
3347                    flags: 0x01,
3348                    user_type: 0,
3349                    type_info: TypeInfo {
3350                        max_length: Some(4),
3351                        precision: None,
3352                        scale: None,
3353                        collation: None,
3354                    },
3355                    crypto_metadata: None,
3356                },
3357            ],
3358        };
3359
3360        // Decode the wire data into stored format
3361        let mut wire_cursor = wire_data.freeze();
3362        let raw_row = RawRow::decode(&mut wire_cursor, &metadata).unwrap();
3363
3364        // Verify wire data was fully consumed
3365        assert_eq!(
3366            wire_cursor.remaining(),
3367            0,
3368            "wire data should be fully consumed"
3369        );
3370
3371        // Now parse the stored data
3372        let mut stored_cursor: &[u8] = &raw_row.data;
3373
3374        // Parse column 0 (NVarChar)
3375        // Stored format for non-MAX NVarChar: [2-byte len][data]
3376        assert!(
3377            stored_cursor.remaining() >= 2,
3378            "need at least 2 bytes for length"
3379        );
3380        let len0 = stored_cursor.get_u16_le() as usize;
3381        assert_eq!(len0, 10, "NVarChar length should be 10 bytes");
3382        assert!(
3383            stored_cursor.remaining() >= len0,
3384            "need {len0} bytes for data"
3385        );
3386
3387        // Read UTF-16LE and convert to string
3388        let mut utf16_read = Vec::new();
3389        for _ in 0..(len0 / 2) {
3390            utf16_read.push(stored_cursor.get_u16_le());
3391        }
3392        let string0 = String::from_utf16(&utf16_read).unwrap();
3393        assert_eq!(string0, "World", "column 0 should be 'World'");
3394
3395        // Parse column 1 (IntN)
3396        // Stored format for IntN: [1-byte len][data]
3397        assert!(
3398            stored_cursor.remaining() >= 1,
3399            "need at least 1 byte for length"
3400        );
3401        let len1 = stored_cursor.get_u8();
3402        assert_eq!(len1, 4, "IntN length should be 4");
3403        assert!(stored_cursor.remaining() >= 4, "need 4 bytes for INT data");
3404        let int1 = stored_cursor.get_i32_le();
3405        assert_eq!(int1, 42, "column 1 should be 42");
3406
3407        // Verify stored data was fully consumed
3408        assert_eq!(
3409            stored_cursor.remaining(),
3410            0,
3411            "stored data should be fully consumed"
3412        );
3413    }
3414
3415    #[test]
3416    fn test_decode_nvarchar_max_then_intn_roundtrip() {
3417        // Test NVARCHAR(MAX) followed by IntN - uses PLP encoding
3418
3419        // Build wire data for PLP NVARCHAR(MAX) + IntN
3420        let mut wire_data = BytesMut::new();
3421
3422        // Column 0: NVARCHAR(MAX) "Hello" - PLP format
3423        // PLP: 8-byte total length, then chunks
3424        let word = "Hello";
3425        let utf16: Vec<u16> = word.encode_utf16().collect();
3426        let byte_len = (utf16.len() * 2) as u64;
3427
3428        wire_data.put_u64_le(byte_len); // total length = 10
3429        wire_data.put_u32_le(byte_len as u32); // chunk length = 10
3430        for code_unit in &utf16 {
3431            wire_data.put_u16_le(*code_unit);
3432        }
3433        wire_data.put_u32_le(0); // terminating zero-length chunk
3434
3435        // Column 1: IntN 99
3436        wire_data.put_u8(4);
3437        wire_data.put_i32_le(99);
3438
3439        // Build metadata with MAX type
3440        let metadata = ColMetaData {
3441            cek_table: None,
3442            columns: vec![
3443                ColumnData {
3444                    name: "text".to_string(),
3445                    type_id: TypeId::NVarChar,
3446                    col_type: 0xE7,
3447                    flags: 0x01,
3448                    user_type: 0,
3449                    type_info: TypeInfo {
3450                        max_length: Some(0xFFFF), // MAX indicator
3451                        precision: None,
3452                        scale: None,
3453                        collation: None,
3454                    },
3455                    crypto_metadata: None,
3456                },
3457                ColumnData {
3458                    name: "num".to_string(),
3459                    type_id: TypeId::IntN,
3460                    col_type: 0x26,
3461                    flags: 0x01,
3462                    user_type: 0,
3463                    type_info: TypeInfo {
3464                        max_length: Some(4),
3465                        precision: None,
3466                        scale: None,
3467                        collation: None,
3468                    },
3469                    crypto_metadata: None,
3470                },
3471            ],
3472        };
3473
3474        // Decode wire data
3475        let mut wire_cursor = wire_data.freeze();
3476        let raw_row = RawRow::decode(&mut wire_cursor, &metadata).unwrap();
3477
3478        // Verify wire data was fully consumed
3479        assert_eq!(
3480            wire_cursor.remaining(),
3481            0,
3482            "wire data should be fully consumed"
3483        );
3484
3485        // Parse stored PLP data for column 0
3486        let mut stored_cursor: &[u8] = &raw_row.data;
3487
3488        // PLP stored format: [8-byte total][chunks...][4-byte 0]
3489        let total_len = stored_cursor.get_u64_le();
3490        assert_eq!(total_len, 10, "PLP total length should be 10");
3491
3492        let chunk_len = stored_cursor.get_u32_le();
3493        assert_eq!(chunk_len, 10, "PLP chunk length should be 10");
3494
3495        let mut utf16_read = Vec::new();
3496        for _ in 0..(chunk_len / 2) {
3497            utf16_read.push(stored_cursor.get_u16_le());
3498        }
3499        let string0 = String::from_utf16(&utf16_read).unwrap();
3500        assert_eq!(string0, "Hello", "column 0 should be 'Hello'");
3501
3502        let terminator = stored_cursor.get_u32_le();
3503        assert_eq!(terminator, 0, "PLP should end with 0");
3504
3505        // Parse IntN
3506        let len1 = stored_cursor.get_u8();
3507        assert_eq!(len1, 4);
3508        let int1 = stored_cursor.get_i32_le();
3509        assert_eq!(int1, 99, "column 1 should be 99");
3510
3511        // Verify fully consumed
3512        assert_eq!(
3513            stored_cursor.remaining(),
3514            0,
3515            "stored data should be fully consumed"
3516        );
3517    }
3518
3519    // ========================================================================
3520    // ReturnStatus Token Tests
3521    // ========================================================================
3522
3523    #[test]
3524    fn test_return_status_via_parser() {
3525        // RETURNSTATUS token: type (0x79) + value (i32 LE)
3526        let data = Bytes::from_static(&[
3527            0x79, // RETURNSTATUS token type
3528            0x00, 0x00, 0x00, 0x00, // return value = 0 (success)
3529        ]);
3530
3531        let mut parser = TokenParser::new(data);
3532        let token = parser.next_token().unwrap().unwrap();
3533
3534        match token {
3535            Token::ReturnStatus(status) => {
3536                assert_eq!(status, 0);
3537            }
3538            _ => panic!("Expected ReturnStatus token, got {token:?}"),
3539        }
3540
3541        assert!(parser.next_token().unwrap().is_none());
3542    }
3543
3544    #[test]
3545    fn test_return_status_nonzero() {
3546        // Return value = -6 (common for error returns)
3547        let mut buf = BytesMut::new();
3548        buf.put_u8(0x79); // RETURNSTATUS
3549        buf.put_i32_le(-6);
3550
3551        let mut parser = TokenParser::new(buf.freeze());
3552        let token = parser.next_token().unwrap().unwrap();
3553
3554        match token {
3555            Token::ReturnStatus(status) => {
3556                assert_eq!(status, -6);
3557            }
3558            _ => panic!("Expected ReturnStatus token"),
3559        }
3560    }
3561
3562    // ========================================================================
3563    // DoneProc Token Tests
3564    // ========================================================================
3565
3566    #[test]
3567    fn test_done_proc_roundtrip() {
3568        let done = DoneProc {
3569            status: DoneStatus {
3570                more: false,
3571                error: false,
3572                in_xact: false,
3573                count: true,
3574                attn: false,
3575                srverror: false,
3576            },
3577            cur_cmd: 0x00C6, // EXECUTE (198)
3578            row_count: 100,
3579        };
3580
3581        let mut buf = BytesMut::new();
3582        done.encode(&mut buf);
3583
3584        // Verify token type byte
3585        assert_eq!(buf[0], 0xFE);
3586
3587        // Skip token type byte and decode
3588        let mut cursor = &buf[1..];
3589        let decoded = DoneProc::decode(&mut cursor).unwrap();
3590
3591        assert!(decoded.status.count);
3592        assert!(!decoded.status.more);
3593        assert!(!decoded.status.error);
3594        assert_eq!(decoded.cur_cmd, 0x00C6);
3595        assert_eq!(decoded.row_count, 100);
3596    }
3597
3598    #[test]
3599    fn test_done_proc_via_parser() {
3600        let data = Bytes::from_static(&[
3601            0xFE, // DONEPROC token type
3602            0x00, 0x00, // status: no flags
3603            0xC6, 0x00, // cur_cmd: EXECUTE (198)
3604            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count: 0
3605        ]);
3606
3607        let mut parser = TokenParser::new(data);
3608        let token = parser.next_token().unwrap().unwrap();
3609
3610        match token {
3611            Token::DoneProc(done) => {
3612                assert!(!done.status.count);
3613                assert!(!done.status.more);
3614                assert_eq!(done.cur_cmd, 198);
3615                assert_eq!(done.row_count, 0);
3616            }
3617            _ => panic!("Expected DoneProc token"),
3618        }
3619    }
3620
3621    #[test]
3622    fn test_done_proc_with_error_flag() {
3623        let mut buf = BytesMut::new();
3624        buf.put_u8(0xFE); // DONEPROC
3625        buf.put_u16_le(0x0002); // status: DONE_ERROR
3626        buf.put_u16_le(0x00C6); // cur_cmd: EXECUTE
3627        buf.put_u64_le(0); // row_count
3628
3629        let mut parser = TokenParser::new(buf.freeze());
3630        let token = parser.next_token().unwrap().unwrap();
3631
3632        match token {
3633            Token::DoneProc(done) => {
3634                assert!(done.status.error);
3635                assert!(!done.status.count);
3636                assert!(!done.status.more);
3637            }
3638            _ => panic!("Expected DoneProc token"),
3639        }
3640    }
3641
3642    // ========================================================================
3643    // DoneInProc Token Tests
3644    // ========================================================================
3645
3646    #[test]
3647    fn test_done_in_proc_roundtrip() {
3648        let done = DoneInProc {
3649            status: DoneStatus {
3650                more: true,
3651                error: false,
3652                in_xact: false,
3653                count: true,
3654                attn: false,
3655                srverror: false,
3656            },
3657            cur_cmd: 193, // SELECT
3658            row_count: 7,
3659        };
3660
3661        let mut buf = BytesMut::new();
3662        done.encode(&mut buf);
3663
3664        assert_eq!(buf[0], 0xFF);
3665
3666        let mut cursor = &buf[1..];
3667        let decoded = DoneInProc::decode(&mut cursor).unwrap();
3668
3669        assert!(decoded.status.more);
3670        assert!(decoded.status.count);
3671        assert!(!decoded.status.error);
3672        assert_eq!(decoded.cur_cmd, 193);
3673        assert_eq!(decoded.row_count, 7);
3674    }
3675
3676    #[test]
3677    fn test_done_in_proc_via_parser() {
3678        let data = Bytes::from_static(&[
3679            0xFF, // DONEINPROC token type
3680            0x11, 0x00, // status: MORE | COUNT
3681            0xC1, 0x00, // cur_cmd: SELECT (193)
3682            0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count: 3
3683        ]);
3684
3685        let mut parser = TokenParser::new(data);
3686        let token = parser.next_token().unwrap().unwrap();
3687
3688        match token {
3689            Token::DoneInProc(done) => {
3690                assert!(done.status.more);
3691                assert!(done.status.count);
3692                assert_eq!(done.cur_cmd, 193);
3693                assert_eq!(done.row_count, 3);
3694            }
3695            _ => panic!("Expected DoneInProc token"),
3696        }
3697    }
3698
3699    // ========================================================================
3700    // ServerError Token Tests
3701    // ========================================================================
3702
3703    #[test]
3704    fn test_server_error_decode() {
3705        // Build a realistic ERROR token (without the 0xAA type byte,
3706        // since decode() is called after the parser strips it).
3707        let mut buf = BytesMut::new();
3708
3709        // Construct the message fields first to compute length
3710        let msg_utf16: Vec<u16> = "Invalid column name 'foo'.".encode_utf16().collect();
3711        let srv_utf16: Vec<u16> = "SQLDB01".encode_utf16().collect();
3712        let proc_utf16: Vec<u16> = "".encode_utf16().collect();
3713
3714        // Length = number(4) + state(1) + class(1)
3715        //        + us_varchar(message): 2 + msg_utf16.len()*2
3716        //        + b_varchar(server): 1 + srv_utf16.len()*2
3717        //        + b_varchar(procedure): 1 + proc_utf16.len()*2
3718        //        + line(4)
3719        let length: u16 = (4
3720            + 1
3721            + 1
3722            + 2
3723            + (msg_utf16.len() * 2)
3724            + 1
3725            + (srv_utf16.len() * 2)
3726            + 1
3727            + (proc_utf16.len() * 2)
3728            + 4) as u16;
3729
3730        buf.put_u16_le(length);
3731        buf.put_i32_le(207); // error number: Invalid column
3732        buf.put_u8(1); // state
3733        buf.put_u8(16); // class (severity 16)
3734
3735        // Message (US_VARCHAR: 2-byte char count + UTF-16LE)
3736        buf.put_u16_le(msg_utf16.len() as u16);
3737        for &c in &msg_utf16 {
3738            buf.put_u16_le(c);
3739        }
3740
3741        // Server (B_VARCHAR: 1-byte char count + UTF-16LE)
3742        buf.put_u8(srv_utf16.len() as u8);
3743        for &c in &srv_utf16 {
3744            buf.put_u16_le(c);
3745        }
3746
3747        // Procedure (B_VARCHAR: empty)
3748        buf.put_u8(proc_utf16.len() as u8);
3749
3750        // Line number
3751        buf.put_i32_le(42);
3752
3753        let mut cursor = buf.freeze();
3754        let error = ServerError::decode(&mut cursor).unwrap();
3755
3756        assert_eq!(error.number, 207);
3757        assert_eq!(error.state, 1);
3758        assert_eq!(error.class, 16);
3759        assert_eq!(error.message, "Invalid column name 'foo'.");
3760        assert_eq!(error.server, "SQLDB01");
3761        assert_eq!(error.procedure, "");
3762        assert_eq!(error.line, 42);
3763    }
3764
3765    #[test]
3766    fn test_server_error_severity_helpers() {
3767        let fatal = ServerError {
3768            number: 4014,
3769            state: 1,
3770            class: 20,
3771            message: "Fatal error".to_string(),
3772            server: String::new(),
3773            procedure: String::new(),
3774            line: 0,
3775        };
3776        assert!(fatal.is_fatal());
3777        assert!(fatal.is_batch_abort());
3778
3779        let batch_abort = ServerError {
3780            number: 547,
3781            state: 0,
3782            class: 16,
3783            message: "Constraint violation".to_string(),
3784            server: String::new(),
3785            procedure: String::new(),
3786            line: 1,
3787        };
3788        assert!(!batch_abort.is_fatal());
3789        assert!(batch_abort.is_batch_abort());
3790
3791        let informational = ServerError {
3792            number: 5701,
3793            state: 2,
3794            class: 10,
3795            message: "Changed db context".to_string(),
3796            server: String::new(),
3797            procedure: String::new(),
3798            line: 0,
3799        };
3800        assert!(!informational.is_fatal());
3801        assert!(!informational.is_batch_abort());
3802    }
3803
3804    #[test]
3805    fn test_server_error_via_parser() {
3806        // Build an ERROR token with the 0xAA type byte for the parser
3807        let mut buf = BytesMut::new();
3808        buf.put_u8(0xAA); // ERROR token type
3809
3810        let msg_utf16: Vec<u16> = "Syntax error".encode_utf16().collect();
3811        let srv_utf16: Vec<u16> = "SRV".encode_utf16().collect();
3812        let proc_utf16: Vec<u16> = "sp_test".encode_utf16().collect();
3813
3814        let length: u16 = (4
3815            + 1
3816            + 1
3817            + 2
3818            + (msg_utf16.len() * 2)
3819            + 1
3820            + (srv_utf16.len() * 2)
3821            + 1
3822            + (proc_utf16.len() * 2)
3823            + 4) as u16;
3824
3825        buf.put_u16_le(length);
3826        buf.put_i32_le(102); // Syntax error
3827        buf.put_u8(1);
3828        buf.put_u8(15);
3829
3830        buf.put_u16_le(msg_utf16.len() as u16);
3831        for &c in &msg_utf16 {
3832            buf.put_u16_le(c);
3833        }
3834        buf.put_u8(srv_utf16.len() as u8);
3835        for &c in &srv_utf16 {
3836            buf.put_u16_le(c);
3837        }
3838        buf.put_u8(proc_utf16.len() as u8);
3839        for &c in &proc_utf16 {
3840            buf.put_u16_le(c);
3841        }
3842        buf.put_i32_le(5);
3843
3844        let mut parser = TokenParser::new(buf.freeze());
3845        let token = parser.next_token().unwrap().unwrap();
3846
3847        match token {
3848            Token::Error(err) => {
3849                assert_eq!(err.number, 102);
3850                assert_eq!(err.class, 15);
3851                assert_eq!(err.message, "Syntax error");
3852                assert_eq!(err.server, "SRV");
3853                assert_eq!(err.procedure, "sp_test");
3854                assert_eq!(err.line, 5);
3855            }
3856            _ => panic!("Expected Error token"),
3857        }
3858    }
3859
3860    // ========================================================================
3861    // ReturnValue Token Tests
3862    // ========================================================================
3863
3864    /// Helper: build a ReturnValue token (without the 0xAC type byte)
3865    /// for an IntN output parameter.
3866    fn build_return_value_intn(
3867        ordinal: u16,
3868        name: &str,
3869        status: u8,
3870        value: Option<i32>,
3871    ) -> BytesMut {
3872        let mut inner = BytesMut::new();
3873
3874        // param_ordinal
3875        inner.put_u16_le(ordinal);
3876
3877        // param_name (B_VARCHAR)
3878        let name_utf16: Vec<u16> = name.encode_utf16().collect();
3879        inner.put_u8(name_utf16.len() as u8);
3880        for &c in &name_utf16 {
3881            inner.put_u16_le(c);
3882        }
3883
3884        // status
3885        inner.put_u8(status);
3886
3887        // user_type (4 bytes)
3888        inner.put_u32_le(0);
3889
3890        // flags (2 bytes)
3891        inner.put_u16_le(0x0001); // nullable
3892
3893        // type_id: IntN = 0x26
3894        inner.put_u8(0x26);
3895
3896        // type_info for IntN: 1-byte max_length
3897        inner.put_u8(4);
3898
3899        // value (TYPE_VARBYTE for IntN: 1-byte length + data)
3900        match value {
3901            Some(v) => {
3902                inner.put_u8(4); // length = 4
3903                inner.put_i32_le(v);
3904            }
3905            None => {
3906                inner.put_u8(0); // length = 0 means NULL
3907            }
3908        }
3909
3910        // RETURNVALUE has no outer length prefix (MS-TDS §2.2.7.18) — the
3911        // decoder walks the inner fields directly after the 0xAC token byte.
3912        inner
3913    }
3914
3915    #[test]
3916    fn test_return_value_int_output() {
3917        let buf = build_return_value_intn(1, "@result", 0x01, Some(42));
3918        let mut cursor = buf.freeze();
3919        let rv = ReturnValue::decode(&mut cursor).unwrap();
3920
3921        assert_eq!(rv.param_ordinal, 1);
3922        assert_eq!(rv.param_name, "@result");
3923        assert_eq!(rv.status, 0x01); // OUTPUT
3924        assert_eq!(rv.col_type, 0x26); // IntN
3925        assert_eq!(rv.type_info.max_length, Some(4));
3926        // Value should contain: length byte (4) + i32 LE (42)
3927        assert_eq!(rv.value.len(), 5);
3928        assert_eq!(rv.value[0], 4);
3929        assert_eq!(
3930            i32::from_le_bytes([rv.value[1], rv.value[2], rv.value[3], rv.value[4]]),
3931            42
3932        );
3933    }
3934
3935    #[test]
3936    fn test_return_value_null_output() {
3937        let buf = build_return_value_intn(2, "@count", 0x01, None);
3938        let mut cursor = buf.freeze();
3939        let rv = ReturnValue::decode(&mut cursor).unwrap();
3940
3941        assert_eq!(rv.param_ordinal, 2);
3942        assert_eq!(rv.param_name, "@count");
3943        assert_eq!(rv.status, 0x01);
3944        assert_eq!(rv.col_type, 0x26);
3945        // NULL value: length byte = 0
3946        assert_eq!(rv.value.len(), 1);
3947        assert_eq!(rv.value[0], 0);
3948    }
3949
3950    #[test]
3951    fn test_return_value_udf_status() {
3952        // UDF return value has status = 0x02
3953        let buf = build_return_value_intn(0, "@RETURN_VALUE", 0x02, Some(-1));
3954        let mut cursor = buf.freeze();
3955        let rv = ReturnValue::decode(&mut cursor).unwrap();
3956
3957        assert_eq!(rv.param_ordinal, 0);
3958        assert_eq!(rv.param_name, "@RETURN_VALUE");
3959        assert_eq!(rv.status, 0x02); // UDF return value
3960        assert_eq!(rv.value[0], 4);
3961        assert_eq!(
3962            i32::from_le_bytes([rv.value[1], rv.value[2], rv.value[3], rv.value[4]]),
3963            -1
3964        );
3965    }
3966
3967    #[test]
3968    fn test_return_value_nvarchar_output() {
3969        // Build a ReturnValue for NVARCHAR(100) output parameter
3970        let mut inner = BytesMut::new();
3971
3972        // param_ordinal
3973        inner.put_u16_le(1);
3974
3975        // param_name "@name"
3976        let name_utf16: Vec<u16> = "@name".encode_utf16().collect();
3977        inner.put_u8(name_utf16.len() as u8);
3978        for &c in &name_utf16 {
3979            inner.put_u16_le(c);
3980        }
3981
3982        // status = OUTPUT
3983        inner.put_u8(0x01);
3984        // user_type
3985        inner.put_u32_le(0);
3986        // flags (nullable)
3987        inner.put_u16_le(0x0001);
3988        // type_id: NVarChar = 0xE7
3989        inner.put_u8(0xE7);
3990        // type_info for NVarChar: 2-byte max_length + 5-byte collation
3991        inner.put_u16_le(200); // max 100 chars * 2 bytes
3992        inner.put_u32_le(0x0904D000); // collation LCID
3993        inner.put_u8(0x34); // collation sort_id
3994
3995        // value: "Hello" in UTF-16LE with 2-byte length prefix
3996        let val_utf16: Vec<u16> = "Hello".encode_utf16().collect();
3997        let byte_len = (val_utf16.len() * 2) as u16;
3998        inner.put_u16_le(byte_len);
3999        for &c in &val_utf16 {
4000            inner.put_u16_le(c);
4001        }
4002
4003        let mut cursor = inner.freeze();
4004        let rv = ReturnValue::decode(&mut cursor).unwrap();
4005
4006        assert_eq!(rv.param_ordinal, 1);
4007        assert_eq!(rv.param_name, "@name");
4008        assert_eq!(rv.status, 0x01);
4009        assert_eq!(rv.col_type, 0xE7); // NVarChar
4010        assert_eq!(rv.type_info.max_length, Some(200));
4011        assert!(rv.type_info.collation.is_some());
4012
4013        // Value: 2-byte length (10) + "Hello" in UTF-16LE
4014        assert_eq!(rv.value.len(), 12); // 2 + 10
4015        let val_len = u16::from_le_bytes([rv.value[0], rv.value[1]]);
4016        assert_eq!(val_len, 10);
4017    }
4018
4019    #[test]
4020    fn test_return_value_via_parser() {
4021        // Build a full ReturnValue token with the 0xAC type byte
4022        let mut data = BytesMut::new();
4023        data.put_u8(0xAC); // RETURNVALUE token type
4024        data.extend_from_slice(&build_return_value_intn(0, "@out", 0x01, Some(99)));
4025
4026        let mut parser = TokenParser::new(data.freeze());
4027        let token = parser.next_token().unwrap().unwrap();
4028
4029        match token {
4030            Token::ReturnValue(rv) => {
4031                assert_eq!(rv.param_name, "@out");
4032                assert_eq!(rv.param_ordinal, 0);
4033                assert_eq!(rv.status, 0x01);
4034                assert_eq!(rv.col_type, 0x26);
4035            }
4036            _ => panic!("Expected ReturnValue token"),
4037        }
4038    }
4039
4040    // ========================================================================
4041    // Multi-Token Stream Tests
4042    // ========================================================================
4043
4044    #[test]
4045    fn test_multi_token_stored_proc_response() {
4046        // Simulate a stored procedure response:
4047        // DoneInProc (result set done) → ReturnStatus → DoneProc
4048        let mut data = BytesMut::new();
4049
4050        // Token 1: DONEINPROC — result set with 3 rows
4051        data.put_u8(0xFF); // DONEINPROC
4052        data.put_u16_le(0x0010); // status: COUNT
4053        data.put_u16_le(0x00C1); // cur_cmd: SELECT
4054        data.put_u64_le(3); // row_count
4055
4056        // Token 2: RETURNSTATUS — procedure returned 0
4057        data.put_u8(0x79); // RETURNSTATUS
4058        data.put_i32_le(0);
4059
4060        // Token 3: DONEPROC — final
4061        data.put_u8(0xFE); // DONEPROC
4062        data.put_u16_le(0x0000); // status: no flags
4063        data.put_u16_le(0x00C6); // cur_cmd: EXECUTE
4064        data.put_u64_le(0);
4065
4066        let mut parser = TokenParser::new(data.freeze());
4067
4068        // Token 1: DoneInProc
4069        let t1 = parser.next_token().unwrap().unwrap();
4070        match t1 {
4071            Token::DoneInProc(done) => {
4072                assert!(done.status.count);
4073                assert_eq!(done.row_count, 3);
4074                assert_eq!(done.cur_cmd, 193);
4075            }
4076            _ => panic!("Expected DoneInProc, got {t1:?}"),
4077        }
4078
4079        // Token 2: ReturnStatus
4080        let t2 = parser.next_token().unwrap().unwrap();
4081        match t2 {
4082            Token::ReturnStatus(status) => {
4083                assert_eq!(status, 0);
4084            }
4085            _ => panic!("Expected ReturnStatus, got {t2:?}"),
4086        }
4087
4088        // Token 3: DoneProc
4089        let t3 = parser.next_token().unwrap().unwrap();
4090        match t3 {
4091            Token::DoneProc(done) => {
4092                assert!(!done.status.count);
4093                assert!(!done.status.more);
4094                assert_eq!(done.cur_cmd, 198);
4095            }
4096            _ => panic!("Expected DoneProc, got {t3:?}"),
4097        }
4098
4099        // No more tokens
4100        assert!(parser.next_token().unwrap().is_none());
4101    }
4102
4103    #[test]
4104    fn test_multi_token_error_in_stream() {
4105        // Simulate: ERROR → DONE (error during query)
4106        let mut data = BytesMut::new();
4107
4108        // Token 1: ERROR
4109        data.put_u8(0xAA);
4110
4111        let msg_utf16: Vec<u16> = "Deadlock".encode_utf16().collect();
4112        let srv_utf16: Vec<u16> = "DB1".encode_utf16().collect();
4113
4114        let length: u16 = (4 + 1 + 1
4115            + 2 + (msg_utf16.len() * 2)
4116            + 1 + (srv_utf16.len() * 2)
4117            + 1  // empty procedure
4118            + 4) as u16;
4119
4120        data.put_u16_le(length);
4121        data.put_i32_le(1205); // deadlock
4122        data.put_u8(51); // state
4123        data.put_u8(13); // class
4124
4125        data.put_u16_le(msg_utf16.len() as u16);
4126        for &c in &msg_utf16 {
4127            data.put_u16_le(c);
4128        }
4129        data.put_u8(srv_utf16.len() as u8);
4130        for &c in &srv_utf16 {
4131            data.put_u16_le(c);
4132        }
4133        data.put_u8(0); // empty procedure
4134        data.put_i32_le(0);
4135
4136        // Token 2: DONE with error flag
4137        data.put_u8(0xFD);
4138        data.put_u16_le(0x0002); // DONE_ERROR
4139        data.put_u16_le(0x00C1); // SELECT
4140        data.put_u64_le(0);
4141
4142        let mut parser = TokenParser::new(data.freeze());
4143
4144        // Token 1: Error
4145        let t1 = parser.next_token().unwrap().unwrap();
4146        match t1 {
4147            Token::Error(err) => {
4148                assert_eq!(err.number, 1205);
4149                assert_eq!(err.class, 13);
4150                assert_eq!(err.message, "Deadlock");
4151                assert_eq!(err.server, "DB1");
4152            }
4153            _ => panic!("Expected Error token, got {t1:?}"),
4154        }
4155
4156        // Token 2: Done with error
4157        let t2 = parser.next_token().unwrap().unwrap();
4158        match t2 {
4159            Token::Done(done) => {
4160                assert!(done.status.error);
4161                assert!(!done.status.count);
4162            }
4163            _ => panic!("Expected Done token, got {t2:?}"),
4164        }
4165
4166        assert!(parser.next_token().unwrap().is_none());
4167    }
4168
4169    #[test]
4170    fn test_multi_token_proc_with_return_value() {
4171        // Simulate stored proc: ReturnValue → ReturnStatus → DoneProc
4172        let mut data = BytesMut::new();
4173
4174        // Token 1: ReturnValue (@result = 42)
4175        data.put_u8(0xAC);
4176        data.extend_from_slice(&build_return_value_intn(1, "@result", 0x01, Some(42)));
4177
4178        // Token 2: ReturnStatus = 0
4179        data.put_u8(0x79);
4180        data.put_i32_le(0);
4181
4182        // Token 3: DoneProc
4183        data.put_u8(0xFE);
4184        data.put_u16_le(0x0000);
4185        data.put_u16_le(0x00C6);
4186        data.put_u64_le(0);
4187
4188        let mut parser = TokenParser::new(data.freeze());
4189
4190        let t1 = parser.next_token().unwrap().unwrap();
4191        match t1 {
4192            Token::ReturnValue(rv) => {
4193                assert_eq!(rv.param_name, "@result");
4194                assert_eq!(rv.param_ordinal, 1);
4195            }
4196            _ => panic!("Expected ReturnValue, got {t1:?}"),
4197        }
4198
4199        let t2 = parser.next_token().unwrap().unwrap();
4200        assert!(matches!(t2, Token::ReturnStatus(0)));
4201
4202        let t3 = parser.next_token().unwrap().unwrap();
4203        assert!(matches!(t3, Token::DoneProc(_)));
4204
4205        assert!(parser.next_token().unwrap().is_none());
4206    }
4207
4208    // ========================================================================
4209    // EOF / Truncation Edge Cases
4210    // ========================================================================
4211
4212    #[test]
4213    fn test_return_status_truncated() {
4214        // Only 3 bytes instead of 4 for i32
4215        let data = Bytes::from_static(&[0x79, 0x01, 0x02, 0x03]);
4216        let mut parser = TokenParser::new(data);
4217        assert!(parser.next_token().is_err());
4218    }
4219
4220    #[test]
4221    fn test_done_proc_truncated() {
4222        // Only 8 bytes instead of 12
4223        let data = Bytes::from_static(&[0xFE, 0x00, 0x00, 0xC1, 0x00, 0x01, 0x00, 0x00, 0x00]);
4224        let mut parser = TokenParser::new(data);
4225        assert!(parser.next_token().is_err());
4226    }
4227
4228    #[test]
4229    fn test_server_error_truncated() {
4230        // ERROR token with only the length field (body truncated)
4231        let data = Bytes::from_static(&[0xAA, 0x20, 0x00]);
4232        let mut parser = TokenParser::new(data);
4233        assert!(parser.next_token().is_err());
4234    }
4235
4236    // ========================================================================
4237    // FEDAUTHINFO (issue #189: parser must follow MS-TDS §2.2.7.12)
4238    // ========================================================================
4239
4240    /// Build a spec-exact FEDAUTHINFO token (including the 0xEE type byte):
4241    /// DWORD TokenLength, DWORD CountOfInfoIDs, option headers of
4242    /// ID/DataLen/DataOffset, then UTF-16LE data addressed by the offsets
4243    /// (relative to the start of the count field).
4244    fn build_fed_auth_info_token(options: &[(u8, &str)]) -> Vec<u8> {
4245        let headers_end = 4 + options.len() * 9;
4246        let mut data_block = Vec::new();
4247        let mut headers = Vec::new();
4248        for (id, value) in options {
4249            let encoded: Vec<u8> = value.encode_utf16().flat_map(u16::to_le_bytes).collect();
4250            let offset = headers_end + data_block.len();
4251            headers.push(*id);
4252            headers.extend_from_slice(&u32::try_from(encoded.len()).unwrap().to_le_bytes());
4253            headers.extend_from_slice(&u32::try_from(offset).unwrap().to_le_bytes());
4254            data_block.extend_from_slice(&encoded);
4255        }
4256
4257        let token_len = 4 + headers.len() + data_block.len();
4258        let mut out = vec![0xEE];
4259        out.extend_from_slice(&u32::try_from(token_len).unwrap().to_le_bytes());
4260        out.extend_from_slice(&u32::try_from(options.len()).unwrap().to_le_bytes());
4261        out.extend_from_slice(&headers);
4262        out.extend_from_slice(&data_block);
4263        out
4264    }
4265
4266    #[test]
4267    fn test_fed_auth_info_decodes_spec_layout() {
4268        const STS: &str = "https://login.microsoftonline.com/common";
4269        const SPN: &str = "https://database.windows.net/";
4270        // STSURL (0x01) listed first, SPN (0x02) second. Real Azure servers
4271        // list SPN first (see the captured-token test below); decoding must
4272        // not depend on option order.
4273        let token = build_fed_auth_info_token(&[(0x01, STS), (0x02, SPN)]);
4274
4275        let mut parser = TokenParser::new(Bytes::from(token));
4276        let parsed = parser.next_token().unwrap().unwrap();
4277        let Token::FedAuthInfo(info) = parsed else {
4278            panic!("expected FedAuthInfo, got {parsed:?}");
4279        };
4280        assert_eq!(info.sts_url, STS);
4281        assert_eq!(info.spn, SPN);
4282        assert!(parser.next_token().unwrap().is_none(), "exact consumption");
4283    }
4284
4285    #[test]
4286    fn test_fed_auth_info_preserves_following_tokens() {
4287        // The old parser looped over the whole remaining stream, swallowing
4288        // the tokens that follow FEDAUTHINFO during login. A DONE token
4289        // appended after it must survive.
4290        let mut stream = build_fed_auth_info_token(&[
4291            (0x01, "https://sts.example/"),
4292            (0x02, "https://db.example/"),
4293        ]);
4294        stream.push(0xFD); // DONE
4295        stream.extend_from_slice(&0u16.to_le_bytes()); // status
4296        stream.extend_from_slice(&0u16.to_le_bytes()); // curcmd
4297        stream.extend_from_slice(&0u64.to_le_bytes()); // rowcount
4298
4299        let mut parser = TokenParser::new(Bytes::from(stream));
4300        assert!(matches!(
4301            parser.next_token().unwrap(),
4302            Some(Token::FedAuthInfo(_))
4303        ));
4304        assert!(
4305            matches!(parser.next_token().unwrap(), Some(Token::Done(_))),
4306            "DONE after FEDAUTHINFO must not be swallowed"
4307        );
4308        assert!(parser.next_token().unwrap().is_none());
4309    }
4310
4311    #[test]
4312    fn test_fed_auth_info_unknown_ids_ignored() {
4313        // Spec: unrecognized FedAuthInfoIDs must be ignored.
4314        let token =
4315            build_fed_auth_info_token(&[(0x7F, "ignore-me"), (0x01, "https://sts.example/")]);
4316        let mut parser = TokenParser::new(Bytes::from(token));
4317        let Some(Token::FedAuthInfo(info)) = parser.next_token().unwrap() else {
4318            panic!("expected FedAuthInfo");
4319        };
4320        assert_eq!(info.sts_url, "https://sts.example/");
4321        assert_eq!(info.spn, "");
4322    }
4323
4324    #[test]
4325    fn test_fed_auth_info_hostile_inputs_error() {
4326        // TokenLength longer than the buffer.
4327        let mut truncated = build_fed_auth_info_token(&[(0x02, "https://sts.example/")]);
4328        truncated.truncate(truncated.len() - 4);
4329        assert!(
4330            TokenParser::new(Bytes::from(truncated))
4331                .next_token()
4332                .is_err()
4333        );
4334
4335        // CountOfInfoIDs claims more headers than the token holds
4336        // (also covers hostile counts whose header math would overflow).
4337        let mut bad_count = build_fed_auth_info_token(&[(0x02, "https://sts.example/")]);
4338        bad_count[5..9].copy_from_slice(&u32::MAX.to_le_bytes());
4339        assert!(
4340            TokenParser::new(Bytes::from(bad_count))
4341                .next_token()
4342                .is_err()
4343        );
4344
4345        // Data offset pointing past the end of the token.
4346        let mut bad_offset = build_fed_auth_info_token(&[(0x02, "https://sts.example/")]);
4347        bad_offset[14..18].copy_from_slice(&u32::MAX.to_le_bytes());
4348        assert!(
4349            TokenParser::new(Bytes::from(bad_offset))
4350                .next_token()
4351                .is_err()
4352        );
4353
4354        // Odd data length cannot be UTF-16.
4355        let mut odd_len = build_fed_auth_info_token(&[(0x02, "https://sts.example/")]);
4356        odd_len[10..14].copy_from_slice(&3u32.to_le_bytes());
4357        assert!(TokenParser::new(Bytes::from(odd_len)).next_token().is_err());
4358    }
4359
4360    #[test]
4361    fn test_fed_auth_info_parse_and_skip_agree() {
4362        // Issue #189: decode() and skip_token() must consume the same bytes
4363        // (the old decode ran past the token while skip honored the length).
4364        let token = build_fed_auth_info_token(&[(0x02, "https://sts.example/")]);
4365        let total = token.len();
4366
4367        let mut parser = TokenParser::new(Bytes::from(token.clone()));
4368        parser.next_token().unwrap();
4369        assert_eq!(parser.position(), total, "decode consumption");
4370
4371        let mut skipper = TokenParser::new(Bytes::from(token));
4372        skipper.skip_token().unwrap();
4373        assert_eq!(skipper.position(), total, "skip consumption");
4374    }
4375
4376    /// A FEDAUTHINFO token captured from a live Azure SQL Database login on
4377    /// 2026-06-12 (the client declared the ADAL library in LOGIN7; the server
4378    /// responded with this token). The tenant GUID inside the STS URL is
4379    /// replaced with an all-zero GUID of identical length, so every offset
4380    /// and length is byte-identical to the wire capture.
4381    ///
4382    /// This is the regression test deferred from PR #193, and it earns its
4383    /// keep: the real token proves FedAuthInfoID 0x01 = STSURL and
4384    /// 0x02 = SPN (Azure lists SPN first), which the synthetic tests
4385    /// originally had swapped.
4386    #[test]
4387    fn test_fed_auth_info_captured_from_azure() {
4388        const CAPTURED: &[u8] = &[
4389            0xEE, 0xCC, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x02, 0x3A, 0x00, 0x00, 0x00,
4390            0x16, 0x00, 0x00, 0x00, 0x01, 0x7C, 0x00, 0x00, 0x00, 0x50, 0x00, 0x00, 0x00, 0x68,
4391            0x00, 0x74, 0x00, 0x74, 0x00, 0x70, 0x00, 0x73, 0x00, 0x3A, 0x00, 0x2F, 0x00, 0x2F,
4392            0x00, 0x64, 0x00, 0x61, 0x00, 0x74, 0x00, 0x61, 0x00, 0x62, 0x00, 0x61, 0x00, 0x73,
4393            0x00, 0x65, 0x00, 0x2E, 0x00, 0x77, 0x00, 0x69, 0x00, 0x6E, 0x00, 0x64, 0x00, 0x6F,
4394            0x00, 0x77, 0x00, 0x73, 0x00, 0x2E, 0x00, 0x6E, 0x00, 0x65, 0x00, 0x74, 0x00, 0x2F,
4395            0x00, 0x68, 0x00, 0x74, 0x00, 0x74, 0x00, 0x70, 0x00, 0x73, 0x00, 0x3A, 0x00, 0x2F,
4396            0x00, 0x2F, 0x00, 0x6C, 0x00, 0x6F, 0x00, 0x67, 0x00, 0x69, 0x00, 0x6E, 0x00, 0x2E,
4397            0x00, 0x77, 0x00, 0x69, 0x00, 0x6E, 0x00, 0x64, 0x00, 0x6F, 0x00, 0x77, 0x00, 0x73,
4398            0x00, 0x2E, 0x00, 0x6E, 0x00, 0x65, 0x00, 0x74, 0x00, 0x2F, 0x00, 0x30, 0x00, 0x30,
4399            0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00, 0x2D,
4400            0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00, 0x2D, 0x00, 0x30, 0x00, 0x30,
4401            0x00, 0x30, 0x00, 0x30, 0x00, 0x2D, 0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00, 0x30,
4402            0x00, 0x2D, 0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00, 0x30,
4403            0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00, 0x30, 0x00,
4404        ];
4405
4406        let mut parser = TokenParser::new(Bytes::from_static(CAPTURED));
4407        let Some(Token::FedAuthInfo(info)) = parser.next_token().unwrap() else {
4408            panic!("expected FedAuthInfo");
4409        };
4410        assert_eq!(
4411            info.sts_url,
4412            "https://login.windows.net/00000000-0000-0000-0000-000000000000"
4413        );
4414        assert_eq!(info.spn, "https://database.windows.net/");
4415        assert!(
4416            parser.next_token().unwrap().is_none(),
4417            "the captured token must be consumed exactly"
4418        );
4419    }
4420
4421    /// Regression test for #273 (remote DoS via unbounded token-skip recursion).
4422    ///
4423    /// COLINFO/TABNAME/OFFSET tokens are skipped, and the skip path used to
4424    /// self-recurse to fetch the next token — one real stack frame per skipped
4425    /// token (Rust guarantees no tail-call elimination). Because
4426    /// `Connection::read_message` reassembles every packet into one `Bytes`
4427    /// before tokenizing, a server-controlled multi-MB message of these 3-byte
4428    /// tokens drove ~10^5–10^6 frames and aborted the process.
4429    ///
4430    /// This feeds a flat run of 200_000 skip-tokens — far deeper than the
4431    /// (~2 MB) test-thread stack could survive by recursion — followed by a
4432    /// real DONE. Pre-fix, the recursive parser aborted (SIGABRT) on this
4433    /// input. Post-fix it must return that DONE with the whole buffer consumed,
4434    /// proving the skip path is bounded-stack and that the input was
4435    /// non-degenerate (all 200_000 tokens were actually traversed).
4436    #[test]
4437    fn skip_tokens_iterate_not_recurse_273() {
4438        const SKIP_COUNT: usize = 200_000;
4439        let mut buf = BytesMut::with_capacity(SKIP_COUNT * 3 + 13);
4440        for _ in 0..SKIP_COUNT {
4441            buf.put_u8(TokenType::ColInfo as u8);
4442            buf.put_u16_le(0); // zero-length body: 3 bytes total per skip-token
4443        }
4444        let done = Done {
4445            status: DoneStatus {
4446                more: false,
4447                error: false,
4448                in_xact: false,
4449                count: true,
4450                attn: false,
4451                srverror: false,
4452            },
4453            cur_cmd: 0xABCD,
4454            row_count: 99,
4455        };
4456        done.encode(&mut buf);
4457        let total_len = buf.len();
4458
4459        let mut parser = TokenParser::new(buf.freeze());
4460
4461        // A specific outcome — the DONE that follows the skip run — not a
4462        // generic is_err an unrelated parse failure could also satisfy.
4463        let Some(Token::Done(decoded)) = parser.next_token().unwrap() else {
4464            panic!("expected the DONE token after the skip run");
4465        };
4466        assert_eq!(decoded.cur_cmd, 0xABCD);
4467        assert_eq!(decoded.row_count, 99);
4468
4469        // Every skipped token was traversed: proof the input was non-trivial.
4470        assert_eq!(parser.position(), total_len);
4471        assert!(parser.next_token().unwrap().is_none());
4472    }
4473
4474    /// `skip_token` must parse a RETURNVALUE to find its end, not treat the
4475    /// leading ParamOrdinal as a 2-byte length prefix (issue #281). With the
4476    /// old length-prefix treatment it skipped `1 + 2 + ParamOrdinal` bytes,
4477    /// landing mid-token and corrupting the rest of the stream.
4478    #[test]
4479    fn skip_token_returnvalue_advances_to_token_end() {
4480        let rv = build_return_value_intn(1, "@result", 0x01, Some(42));
4481        let rv_token_len = 1 + rv.len(); // 0xAC type byte + inner fields
4482
4483        let mut buf = BytesMut::new();
4484        buf.put_u8(TokenType::ReturnValue as u8);
4485        buf.extend_from_slice(&rv);
4486        // A trailing DONE proves the parser realigns exactly after the skip.
4487        let done = Done {
4488            status: DoneStatus {
4489                more: false,
4490                error: false,
4491                in_xact: false,
4492                count: true,
4493                attn: false,
4494                srverror: false,
4495            },
4496            cur_cmd: 0x1234,
4497            row_count: 7,
4498        };
4499        done.encode(&mut buf);
4500
4501        let mut parser = TokenParser::new(buf.freeze());
4502        parser.skip_token().unwrap();
4503        assert_eq!(parser.position(), rv_token_len);
4504
4505        let Some(Token::Done(decoded)) = parser.next_token().unwrap() else {
4506            panic!("expected the DONE token after skipping the RETURNVALUE");
4507        };
4508        assert_eq!(decoded.cur_cmd, 0x1234);
4509        assert_eq!(decoded.row_count, 7);
4510    }
4511
4512    /// Build an ALTMETADATA token describing one `SUM` aggregate over an `int`
4513    /// column (empty name), followed by `rows` ALTROW tokens each carrying that
4514    /// int value. Mirrors the wire shape of a `COMPUTE BY` result set.
4515    fn compute_by_tokens(id: u16, rows: &[i32]) -> BytesMut {
4516        let mut buf = BytesMut::new();
4517        // ALTMETADATA
4518        buf.put_u8(TokenType::AltMetaData as u8);
4519        buf.put_u16_le(1); // Count = 1 ComputeData
4520        buf.put_u16_le(id); // Id
4521        buf.put_u8(1); // ByCols = 1
4522        buf.put_u16_le(1); // ColNum[0]
4523        buf.put_u8(0x4D); // Op = AOPSUM
4524        buf.put_u16_le(1); // Operand (column 1)
4525        buf.put_u32_le(0); // UserType (ULONG)
4526        buf.put_u16_le(0); // Flags
4527        buf.put_u8(TypeId::Int4 as u8); // TYPE_INFO (fixed-length int4)
4528        buf.put_u8(0); // ColName: B_VARCHAR length 0
4529        // ALTROW(s)
4530        for &v in rows {
4531            buf.put_u8(TokenType::AltRow as u8);
4532            buf.put_u16_le(id); // Id matching the ALTMETADATA
4533            buf.put_i32_le(v); // int4 value
4534        }
4535        buf
4536    }
4537
4538    fn trailing_done(buf: &mut BytesMut) {
4539        let done = Done {
4540            status: DoneStatus {
4541                more: false,
4542                error: false,
4543                in_xact: false,
4544                count: true,
4545                attn: false,
4546                srverror: false,
4547            },
4548            cur_cmd: 0xBEEF,
4549            row_count: 3,
4550        };
4551        done.encode(buf);
4552    }
4553
4554    /// #275: a COMPUTE BY result set (ALTMETADATA 0x88 + ALTROW 0xD3) must be
4555    /// silently consumed, not hard-error. The DONE following the compute tokens
4556    /// must still be delivered, and every byte traversed.
4557    #[test]
4558    fn compute_by_alt_tokens_skipped_275() {
4559        let mut buf = compute_by_tokens(7, &[42, -1]);
4560        trailing_done(&mut buf);
4561        let total_len = buf.len();
4562
4563        let mut parser = TokenParser::new(buf.freeze());
4564
4565        // The first token surfaced is the DONE — both ALTROWs and the
4566        // ALTMETADATA were dropped, not returned.
4567        let Some(Token::Done(done)) = parser.next_token().unwrap() else {
4568            panic!("expected DONE after the COMPUTE BY tokens were skipped");
4569        };
4570        assert_eq!(done.cur_cmd, 0xBEEF);
4571
4572        // All alt-token bytes were consumed to reach the DONE.
4573        assert_eq!(parser.position(), total_len);
4574        assert!(parser.next_token().unwrap().is_none());
4575    }
4576
4577    /// #275: ALTMETADATA must be remembered across calls so a following
4578    /// ColMetaData (new result set) clears stale compute Ids.
4579    #[test]
4580    fn compute_by_alt_metadata_cleared_on_new_colmetadata_275() {
4581        let mut buf = compute_by_tokens(7, &[42]);
4582        // New result set: a COLMETADATA token clears alt_metadata. Encode an
4583        // empty (NO_METADATA) ColMetaData, then reuse Id 7 for a fresh compute.
4584        buf.put_u8(TokenType::ColMetaData as u8);
4585        buf.put_u16_le(ColMetaData::NO_METADATA);
4586        buf.extend_from_slice(&compute_by_tokens(7, &[99]));
4587        trailing_done(&mut buf);
4588        let total_len = buf.len();
4589
4590        let mut parser = TokenParser::new(buf.freeze());
4591
4592        // First surfaced token: the empty ColMetaData (compute tokens dropped).
4593        let Some(Token::ColMetaData(_)) = parser.next_token().unwrap() else {
4594            panic!("expected ColMetaData between the two compute groups");
4595        };
4596        // Then the DONE, after the second compute group is also dropped.
4597        let Some(Token::Done(_)) = parser.next_token().unwrap() else {
4598            panic!("expected DONE after the second COMPUTE BY group");
4599        };
4600        assert_eq!(parser.position(), total_len);
4601    }
4602
4603    /// #275: an ALTROW with no matching ALTMETADATA cannot be length-decoded;
4604    /// it must error cleanly rather than panic or desync.
4605    #[test]
4606    fn altrow_without_altmetadata_errors_275() {
4607        let mut buf = BytesMut::new();
4608        buf.put_u8(TokenType::AltRow as u8);
4609        buf.put_u16_le(7); // Id with no preceding ALTMETADATA
4610        buf.put_i32_le(42);
4611
4612        let mut parser = TokenParser::new(buf.freeze());
4613        assert!(parser.next_token().is_err());
4614    }
4615}