Skip to main content

tds_protocol/
token.rs

1//! TDS token stream definitions.
2//!
3//! Tokens are the fundamental units of TDS response data. The server sends
4//! a stream of tokens that describe metadata, rows, errors, and other information.
5//!
6//! ## Token Structure
7//!
8//! Each token begins with a 1-byte token type identifier, followed by
9//! token-specific data. Some tokens have fixed lengths, while others
10//! have length prefixes.
11//!
12//! ## Usage
13//!
14//! ```rust,no_run
15//! use tds_protocol::token::{Token, TokenParser};
16//! use bytes::Bytes;
17//!
18//! fn parse(data: Bytes) -> Result<(), tds_protocol::ProtocolError> {
19//!     let mut parser = TokenParser::new(data);
20//!
21//!     while let Some(token) = parser.next_token()? {
22//!         match token {
23//!             Token::Done(done) => println!("Rows affected: {}", done.row_count),
24//!             Token::Error(err) => eprintln!("Error {}: {}", err.number, err.message),
25//!             _ => {}
26//!         }
27//!     }
28//!     Ok(())
29//! }
30//! ```
31
32use bytes::{Buf, BufMut, Bytes};
33
34use crate::codec::{read_b_varchar, read_us_varchar};
35use crate::error::ProtocolError;
36use crate::prelude::*;
37use crate::types::TypeId;
38
39/// Token type identifier.
40#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
41#[repr(u8)]
42#[non_exhaustive]
43pub enum TokenType {
44    /// Column metadata (COLMETADATA).
45    ColMetaData = 0x81,
46    /// Error message (ERROR).
47    Error = 0xAA,
48    /// Informational message (INFO).
49    Info = 0xAB,
50    /// Login acknowledgment (LOGINACK).
51    LoginAck = 0xAD,
52    /// Row data (ROW).
53    Row = 0xD1,
54    /// Null bitmap compressed row (NBCROW).
55    NbcRow = 0xD2,
56    /// Environment change (ENVCHANGE).
57    EnvChange = 0xE3,
58    /// SSPI authentication (SSPI).
59    Sspi = 0xED,
60    /// Done (DONE).
61    Done = 0xFD,
62    /// Done in procedure (DONEINPROC).
63    DoneInProc = 0xFF,
64    /// Done procedure (DONEPROC).
65    DoneProc = 0xFE,
66    /// Return status (RETURNSTATUS).
67    ReturnStatus = 0x79,
68    /// Return value (RETURNVALUE).
69    ReturnValue = 0xAC,
70    /// Order (ORDER).
71    Order = 0xA9,
72    /// Feature extension acknowledgment (FEATUREEXTACK).
73    FeatureExtAck = 0xAE,
74    /// Session state (SESSIONSTATE).
75    SessionState = 0xE4,
76    /// Federated authentication info (FEDAUTHINFO).
77    FedAuthInfo = 0xEE,
78    /// Column info (COLINFO).
79    ColInfo = 0xA5,
80    /// Table name (TABNAME).
81    TabName = 0xA4,
82    /// Offset (OFFSET).
83    Offset = 0x78,
84}
85
86impl TokenType {
87    /// Create a token type from a raw byte.
88    pub fn from_u8(value: u8) -> Option<Self> {
89        match value {
90            0x81 => Some(Self::ColMetaData),
91            0xAA => Some(Self::Error),
92            0xAB => Some(Self::Info),
93            0xAD => Some(Self::LoginAck),
94            0xD1 => Some(Self::Row),
95            0xD2 => Some(Self::NbcRow),
96            0xE3 => Some(Self::EnvChange),
97            0xED => Some(Self::Sspi),
98            0xFD => Some(Self::Done),
99            0xFF => Some(Self::DoneInProc),
100            0xFE => Some(Self::DoneProc),
101            0x79 => Some(Self::ReturnStatus),
102            0xAC => Some(Self::ReturnValue),
103            0xA9 => Some(Self::Order),
104            0xAE => Some(Self::FeatureExtAck),
105            0xE4 => Some(Self::SessionState),
106            0xEE => Some(Self::FedAuthInfo),
107            0xA5 => Some(Self::ColInfo),
108            0xA4 => Some(Self::TabName),
109            0x78 => Some(Self::Offset),
110            _ => None,
111        }
112    }
113}
114
115/// Parsed TDS token.
116///
117/// This enum represents all possible tokens that can be received from SQL Server.
118/// Each variant contains the parsed token data.
119#[derive(Debug, Clone)]
120#[non_exhaustive]
121pub enum Token {
122    /// Column metadata describing result set structure.
123    ColMetaData(ColMetaData),
124    /// Row data.
125    Row(RawRow),
126    /// Null bitmap compressed row.
127    NbcRow(NbcRow),
128    /// Completion of a SQL statement.
129    Done(Done),
130    /// Completion of a stored procedure.
131    DoneProc(DoneProc),
132    /// Completion within a stored procedure.
133    DoneInProc(DoneInProc),
134    /// Return status from stored procedure.
135    ReturnStatus(i32),
136    /// Return value from stored procedure.
137    ReturnValue(ReturnValue),
138    /// Error message from server.
139    Error(ServerError),
140    /// Informational message from server.
141    Info(ServerInfo),
142    /// Login acknowledgment.
143    LoginAck(LoginAck),
144    /// Environment change notification.
145    EnvChange(EnvChange),
146    /// Column ordering information.
147    Order(Order),
148    /// Feature extension acknowledgment.
149    FeatureExtAck(FeatureExtAck),
150    /// SSPI authentication data.
151    Sspi(SspiToken),
152    /// Session state information.
153    SessionState(SessionState),
154    /// Federated authentication info.
155    FedAuthInfo(FedAuthInfo),
156}
157
158/// Column metadata token.
159#[derive(Debug, Clone, Default)]
160pub struct ColMetaData {
161    /// Column definitions.
162    pub columns: Vec<ColumnData>,
163    /// CEK table for Always Encrypted result sets.
164    /// Present only when the server sends encrypted column metadata.
165    pub cek_table: Option<crate::crypto::CekTable>,
166}
167
168/// Column definition within metadata.
169#[derive(Debug, Clone)]
170pub struct ColumnData {
171    /// Column name.
172    pub name: String,
173    /// Column data type ID.
174    pub type_id: TypeId,
175    /// Column data type raw byte (for unknown types).
176    pub col_type: u8,
177    /// Column flags.
178    pub flags: u16,
179    /// User type ID.
180    pub user_type: u32,
181    /// Type-specific metadata.
182    pub type_info: TypeInfo,
183    /// Per-column encryption metadata (Always Encrypted).
184    /// Present only for columns with the encrypted flag (0x0800) set.
185    pub crypto_metadata: Option<crate::crypto::CryptoMetadata>,
186}
187
188/// Type-specific metadata.
189#[derive(Debug, Clone, Default)]
190pub struct TypeInfo {
191    /// Maximum length for variable-length types.
192    pub max_length: Option<u32>,
193    /// Precision for numeric types.
194    pub precision: Option<u8>,
195    /// Scale for numeric types.
196    pub scale: Option<u8>,
197    /// Collation for string types.
198    pub collation: Option<Collation>,
199}
200
201/// SQL Server collation.
202///
203/// Collations in SQL Server define the character encoding and sorting rules
204/// for string data. For `VARCHAR` columns, the collation determines which
205/// code page (character encoding) is used to store the data.
206///
207/// # Encoding Support
208///
209/// When the `encoding` feature is enabled, the [`Collation::encoding()`] method
210/// returns the appropriate [`encoding_rs::Encoding`] for decoding `VARCHAR` data.
211///
212/// # Example
213///
214/// ```rust,ignore
215/// use tds_protocol::token::Collation;
216///
217/// let collation = Collation { lcid: 0x0804, sort_id: 0 }; // Chinese (PRC)
218/// if let Some(encoding) = collation.encoding() {
219///     let (decoded, _, _) = encoding.decode(raw_bytes);
220///     // decoded is now proper Chinese text
221/// }
222/// ```
223#[derive(Debug, Clone, Copy, Default)]
224pub struct Collation {
225    /// Locale ID (LCID).
226    ///
227    /// The LCID encodes both the language and region. The lower 16 bits
228    /// contain the primary language ID, and bits 16-19 contain the sort ID
229    /// for some collations.
230    ///
231    /// For UTF-8 collations (SQL Server 2019+), bit 27 (0x0800_0000) is set.
232    pub lcid: u32,
233    /// Sort ID.
234    ///
235    /// Used with certain collations to specify sorting behavior.
236    pub sort_id: u8,
237}
238
239impl Collation {
240    /// Create a `Collation` from the 5-byte TDS wire format.
241    ///
242    /// Format: 4 bytes LCID (little-endian u32) + 1 byte sort ID.
243    pub fn from_bytes(bytes: &[u8; 5]) -> Self {
244        Self {
245            lcid: u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]),
246            sort_id: bytes[4],
247        }
248    }
249
250    /// Serialize to the 5-byte TDS wire format.
251    ///
252    /// Format: 4 bytes LCID (little-endian u32) + 1 byte sort ID.
253    pub fn to_bytes(&self) -> [u8; 5] {
254        let b = self.lcid.to_le_bytes();
255        [b[0], b[1], b[2], b[3], self.sort_id]
256    }
257
258    /// Returns the character encoding for this collation.
259    ///
260    /// This method maps the collation's LCID to the appropriate character
261    /// encoding from the `encoding_rs` crate.
262    ///
263    /// # Returns
264    ///
265    /// - `Some(&Encoding)` - The encoding to use for decoding `VARCHAR` data
266    /// - `None` - If the collation uses UTF-8 (no transcoding needed) or
267    ///   the LCID is not recognized (caller should use Windows-1252 fallback)
268    ///
269    /// # UTF-8 Collations
270    ///
271    /// SQL Server 2019+ supports UTF-8 collations (identified by the `_UTF8`
272    /// suffix). These return `None` because no transcoding is needed.
273    ///
274    /// # Example
275    ///
276    /// ```rust,ignore
277    /// let collation = Collation { lcid: 0x0419, sort_id: 0 }; // Russian
278    /// if let Some(encoding) = collation.encoding() {
279    ///     // encoding is Windows-1251 for Cyrillic
280    ///     let (text, _, had_errors) = encoding.decode(&raw_bytes);
281    /// }
282    /// ```
283    #[cfg(feature = "encoding")]
284    pub fn encoding(&self) -> Option<&'static encoding_rs::Encoding> {
285        crate::collation::encoding_for_lcid(self.lcid)
286    }
287
288    /// Returns whether this collation uses UTF-8 encoding.
289    ///
290    /// UTF-8 collations were introduced in SQL Server 2019 and are
291    /// identified by the `_UTF8` suffix in the collation name.
292    #[cfg(feature = "encoding")]
293    pub fn is_utf8(&self) -> bool {
294        crate::collation::is_utf8_collation(self.lcid)
295    }
296
297    /// Returns the Windows code page number for this collation.
298    ///
299    /// Useful for error messages and debugging.
300    ///
301    /// # Returns
302    ///
303    /// The code page number (e.g., 1252 for Western European, 932 for Japanese).
304    #[cfg(feature = "encoding")]
305    pub fn code_page(&self) -> Option<u16> {
306        crate::collation::code_page_for_lcid(self.lcid)
307    }
308
309    /// Returns the encoding name for this collation.
310    ///
311    /// Useful for error messages and debugging.
312    #[cfg(feature = "encoding")]
313    pub fn encoding_name(&self) -> &'static str {
314        crate::collation::encoding_name_for_lcid(self.lcid)
315    }
316}
317
318/// Raw row data (not yet decoded).
319#[derive(Debug, Clone)]
320pub struct RawRow {
321    /// Raw column values.
322    pub data: bytes::Bytes,
323}
324
325/// Null bitmap compressed row.
326#[derive(Debug, Clone)]
327pub struct NbcRow {
328    /// Null bitmap.
329    pub null_bitmap: Vec<u8>,
330    /// Raw non-null column values.
331    pub data: bytes::Bytes,
332}
333
334/// Done token indicating statement completion.
335#[derive(Debug, Clone, Copy)]
336pub struct Done {
337    /// Status flags.
338    pub status: DoneStatus,
339    /// Current command.
340    pub cur_cmd: u16,
341    /// Row count (if applicable).
342    pub row_count: u64,
343}
344
345/// Done status flags.
346#[derive(Debug, Clone, Copy, Default)]
347#[non_exhaustive]
348pub struct DoneStatus {
349    /// More results follow.
350    pub more: bool,
351    /// Error occurred.
352    pub error: bool,
353    /// Transaction in progress.
354    pub in_xact: bool,
355    /// Row count is valid.
356    pub count: bool,
357    /// Attention acknowledgment.
358    pub attn: bool,
359    /// Server error caused statement termination.
360    pub srverror: bool,
361}
362
363/// Done in procedure token.
364#[derive(Debug, Clone, Copy)]
365pub struct DoneInProc {
366    /// Status flags.
367    pub status: DoneStatus,
368    /// Current command.
369    pub cur_cmd: u16,
370    /// Row count.
371    pub row_count: u64,
372}
373
374/// Done procedure token.
375#[derive(Debug, Clone, Copy)]
376pub struct DoneProc {
377    /// Status flags.
378    pub status: DoneStatus,
379    /// Current command.
380    pub cur_cmd: u16,
381    /// Row count.
382    pub row_count: u64,
383}
384
385/// Return value from stored procedure.
386#[derive(Debug, Clone)]
387#[non_exhaustive]
388pub struct ReturnValue {
389    /// Parameter ordinal.
390    pub param_ordinal: u16,
391    /// Parameter name.
392    pub param_name: String,
393    /// Status flags.
394    pub status: u8,
395    /// User type.
396    pub user_type: u32,
397    /// Type flags.
398    pub flags: u16,
399    /// Raw column type byte from the wire.
400    pub col_type: u8,
401    /// Type info.
402    pub type_info: TypeInfo,
403    /// Value data.
404    pub value: bytes::Bytes,
405}
406
407/// Server error message.
408#[derive(Debug, Clone)]
409pub struct ServerError {
410    /// Error number.
411    pub number: i32,
412    /// Error state.
413    pub state: u8,
414    /// Error severity class.
415    pub class: u8,
416    /// Error message text.
417    pub message: String,
418    /// Server name.
419    pub server: String,
420    /// Procedure name.
421    pub procedure: String,
422    /// Line number.
423    pub line: i32,
424}
425
426/// Server informational message.
427#[derive(Debug, Clone)]
428pub struct ServerInfo {
429    /// Info number.
430    pub number: i32,
431    /// Info state.
432    pub state: u8,
433    /// Info class (severity).
434    pub class: u8,
435    /// Info message text.
436    pub message: String,
437    /// Server name.
438    pub server: String,
439    /// Procedure name.
440    pub procedure: String,
441    /// Line number.
442    pub line: i32,
443}
444
445/// Login acknowledgment token.
446#[derive(Debug, Clone)]
447pub struct LoginAck {
448    /// Interface type.
449    pub interface: u8,
450    /// TDS version.
451    pub tds_version: u32,
452    /// Program name.
453    pub prog_name: String,
454    /// Program version.
455    pub prog_version: u32,
456}
457
458/// Environment change token.
459#[derive(Debug, Clone)]
460pub struct EnvChange {
461    /// Type of environment change.
462    pub env_type: EnvChangeType,
463    /// New value.
464    pub new_value: EnvChangeValue,
465    /// Old value.
466    pub old_value: EnvChangeValue,
467}
468
469/// Environment change type.
470#[derive(Debug, Clone, Copy, PartialEq, Eq)]
471#[repr(u8)]
472#[non_exhaustive]
473pub enum EnvChangeType {
474    /// Database changed.
475    Database = 1,
476    /// Language changed.
477    Language = 2,
478    /// Character set changed.
479    CharacterSet = 3,
480    /// Packet size changed.
481    PacketSize = 4,
482    /// Unicode data sorting locale ID.
483    UnicodeSortingLocalId = 5,
484    /// Unicode comparison flags.
485    UnicodeComparisonFlags = 6,
486    /// SQL collation.
487    SqlCollation = 7,
488    /// Begin transaction.
489    BeginTransaction = 8,
490    /// Commit transaction.
491    CommitTransaction = 9,
492    /// Rollback transaction.
493    RollbackTransaction = 10,
494    /// Enlist DTC transaction.
495    EnlistDtcTransaction = 11,
496    /// Defect DTC transaction.
497    DefectTransaction = 12,
498    /// Real-time log shipping.
499    RealTimeLogShipping = 13,
500    /// Promote transaction.
501    PromoteTransaction = 15,
502    /// Transaction manager address.
503    TransactionManagerAddress = 16,
504    /// Transaction ended.
505    TransactionEnded = 17,
506    /// Reset connection completion acknowledgment.
507    ResetConnectionCompletionAck = 18,
508    /// User instance started.
509    UserInstanceStarted = 19,
510    /// Routing information.
511    Routing = 20,
512}
513
514/// Environment change value.
515#[derive(Debug, Clone)]
516#[non_exhaustive]
517pub enum EnvChangeValue {
518    /// String value.
519    String(String),
520    /// Binary value.
521    Binary(bytes::Bytes),
522    /// Routing information.
523    Routing {
524        /// Host name.
525        host: String,
526        /// Port number.
527        port: u16,
528    },
529}
530
531/// Column ordering information.
532#[derive(Debug, Clone)]
533pub struct Order {
534    /// Ordered column indices.
535    pub columns: Vec<u16>,
536}
537
538/// Feature extension acknowledgment.
539#[derive(Debug, Clone)]
540pub struct FeatureExtAck {
541    /// Acknowledged features.
542    pub features: Vec<FeatureAck>,
543}
544
545/// Individual feature acknowledgment.
546#[derive(Debug, Clone)]
547pub struct FeatureAck {
548    /// Feature ID.
549    pub feature_id: u8,
550    /// Feature data.
551    pub data: bytes::Bytes,
552}
553
554/// SSPI authentication token.
555#[derive(Debug, Clone)]
556pub struct SspiToken {
557    /// SSPI data.
558    pub data: bytes::Bytes,
559}
560
561/// Session state token.
562#[derive(Debug, Clone)]
563pub struct SessionState {
564    /// Session state data.
565    pub data: bytes::Bytes,
566}
567
568/// Federated authentication info.
569#[derive(Debug, Clone)]
570pub struct FedAuthInfo {
571    /// STS URL.
572    pub sts_url: String,
573    /// Service principal name.
574    pub spn: String,
575}
576
577// =============================================================================
578// ColMetaData and Row Parsing Implementation
579// =============================================================================
580
581/// Decode collation information (5 bytes).
582///
583/// Shared by ColMetaData column parsing and CryptoMetadata base type parsing.
584pub(crate) fn decode_collation(src: &mut impl Buf) -> Result<Collation, ProtocolError> {
585    if src.remaining() < 5 {
586        return Err(ProtocolError::UnexpectedEof);
587    }
588    // Collation: LCID (4 bytes) + Sort ID (1 byte)
589    let lcid = src.get_u32_le();
590    let sort_id = src.get_u8();
591    Ok(Collation { lcid, sort_id })
592}
593
594/// Decode type-specific metadata for a column based on its TypeId.
595///
596/// Shared by ColMetaData column parsing and CryptoMetadata base type parsing.
597pub(crate) fn decode_type_info(
598    src: &mut impl Buf,
599    type_id: TypeId,
600    col_type: u8,
601) -> Result<TypeInfo, ProtocolError> {
602    match type_id {
603        // Fixed-length types have no additional metadata
604        TypeId::Null => Ok(TypeInfo::default()),
605        TypeId::Int1 | TypeId::Bit => Ok(TypeInfo::default()),
606        TypeId::Int2 => Ok(TypeInfo::default()),
607        TypeId::Int4 => Ok(TypeInfo::default()),
608        TypeId::Int8 => Ok(TypeInfo::default()),
609        TypeId::Float4 => Ok(TypeInfo::default()),
610        TypeId::Float8 => Ok(TypeInfo::default()),
611        TypeId::Money => Ok(TypeInfo::default()),
612        TypeId::Money4 => Ok(TypeInfo::default()),
613        TypeId::DateTime => Ok(TypeInfo::default()),
614        TypeId::DateTime4 => Ok(TypeInfo::default()),
615
616        // Variable length integer/float/money (1-byte max length)
617        TypeId::IntN | TypeId::BitN | TypeId::FloatN | TypeId::MoneyN | TypeId::DateTimeN => {
618            if src.remaining() < 1 {
619                return Err(ProtocolError::UnexpectedEof);
620            }
621            let max_length = src.get_u8() as u32;
622            Ok(TypeInfo {
623                max_length: Some(max_length),
624                ..Default::default()
625            })
626        }
627
628        // GUID has 1-byte length
629        TypeId::Guid => {
630            if src.remaining() < 1 {
631                return Err(ProtocolError::UnexpectedEof);
632            }
633            let max_length = src.get_u8() as u32;
634            Ok(TypeInfo {
635                max_length: Some(max_length),
636                ..Default::default()
637            })
638        }
639
640        // Decimal/Numeric types (1-byte length + precision + scale)
641        TypeId::Decimal | TypeId::Numeric | TypeId::DecimalN | TypeId::NumericN => {
642            if src.remaining() < 3 {
643                return Err(ProtocolError::UnexpectedEof);
644            }
645            let max_length = src.get_u8() as u32;
646            let precision = src.get_u8();
647            let scale = src.get_u8();
648            Ok(TypeInfo {
649                max_length: Some(max_length),
650                precision: Some(precision),
651                scale: Some(scale),
652                ..Default::default()
653            })
654        }
655
656        // Old-style byte-length strings (Char, VarChar, Binary, VarBinary)
657        TypeId::Char | TypeId::VarChar | TypeId::Binary | TypeId::VarBinary => {
658            if src.remaining() < 1 {
659                return Err(ProtocolError::UnexpectedEof);
660            }
661            let max_length = src.get_u8() as u32;
662            Ok(TypeInfo {
663                max_length: Some(max_length),
664                ..Default::default()
665            })
666        }
667
668        // Big varchar/binary with 2-byte length + collation for strings
669        TypeId::BigVarChar | TypeId::BigChar => {
670            if src.remaining() < 7 {
671                // 2 (length) + 5 (collation)
672                return Err(ProtocolError::UnexpectedEof);
673            }
674            let max_length = src.get_u16_le() as u32;
675            let collation = decode_collation(src)?;
676            Ok(TypeInfo {
677                max_length: Some(max_length),
678                collation: Some(collation),
679                ..Default::default()
680            })
681        }
682
683        // Big binary (2-byte length, no collation)
684        TypeId::BigVarBinary | TypeId::BigBinary => {
685            if src.remaining() < 2 {
686                return Err(ProtocolError::UnexpectedEof);
687            }
688            let max_length = src.get_u16_le() as u32;
689            Ok(TypeInfo {
690                max_length: Some(max_length),
691                ..Default::default()
692            })
693        }
694
695        // Unicode strings (NChar, NVarChar) - 2-byte length + collation
696        TypeId::NChar | TypeId::NVarChar => {
697            if src.remaining() < 7 {
698                // 2 (length) + 5 (collation)
699                return Err(ProtocolError::UnexpectedEof);
700            }
701            let max_length = src.get_u16_le() as u32;
702            let collation = decode_collation(src)?;
703            Ok(TypeInfo {
704                max_length: Some(max_length),
705                collation: Some(collation),
706                ..Default::default()
707            })
708        }
709
710        // Date type (no additional metadata)
711        TypeId::Date => Ok(TypeInfo::default()),
712
713        // Time, DateTime2, DateTimeOffset have scale
714        TypeId::Time | TypeId::DateTime2 | TypeId::DateTimeOffset => {
715            if src.remaining() < 1 {
716                return Err(ProtocolError::UnexpectedEof);
717            }
718            let scale = src.get_u8();
719            Ok(TypeInfo {
720                scale: Some(scale),
721                ..Default::default()
722            })
723        }
724
725        // Text/NText/Image (deprecated LOB types)
726        TypeId::Text | TypeId::NText | TypeId::Image => {
727            // These have complex metadata: length (4) + collation (5) + table name parts
728            if src.remaining() < 4 {
729                return Err(ProtocolError::UnexpectedEof);
730            }
731            let max_length = src.get_u32_le();
732
733            // For Text/NText, read collation
734            let collation = if type_id == TypeId::Text || type_id == TypeId::NText {
735                if src.remaining() < 5 {
736                    return Err(ProtocolError::UnexpectedEof);
737                }
738                Some(decode_collation(src)?)
739            } else {
740                None
741            };
742
743            // Skip table name parts (variable length)
744            // Format: numParts (1 byte) followed by us_varchar for each part
745            if src.remaining() < 1 {
746                return Err(ProtocolError::UnexpectedEof);
747            }
748            let num_parts = src.get_u8();
749            for _ in 0..num_parts {
750                // Read and discard table name part
751                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
752            }
753
754            Ok(TypeInfo {
755                max_length: Some(max_length),
756                collation,
757                ..Default::default()
758            })
759        }
760
761        // XML type
762        TypeId::Xml => {
763            if src.remaining() < 1 {
764                return Err(ProtocolError::UnexpectedEof);
765            }
766            let schema_present = src.get_u8();
767
768            if schema_present != 0 {
769                // Read schema info (3 us_varchar strings)
770                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // db name
771                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // owning schema
772                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // xml schema collection
773            }
774
775            Ok(TypeInfo::default())
776        }
777
778        // UDT (User-defined type) - complex metadata
779        TypeId::Udt => {
780            // Max length (2 bytes)
781            if src.remaining() < 2 {
782                return Err(ProtocolError::UnexpectedEof);
783            }
784            let max_length = src.get_u16_le() as u32;
785
786            // UDT metadata: db name, schema name, type name, assembly qualified name
787            let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // db name
788            let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // schema name
789            let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // type name
790            let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // assembly qualified name
791
792            Ok(TypeInfo {
793                max_length: Some(max_length),
794                ..Default::default()
795            })
796        }
797
798        // Table-valued parameter - complex metadata (skip for now)
799        TypeId::Tvp => {
800            // TVP has very complex metadata, not commonly used
801            // For now, we can't properly parse this
802            Err(ProtocolError::InvalidTokenType(col_type))
803        }
804
805        // SQL Variant - 4-byte length
806        TypeId::Variant => {
807            if src.remaining() < 4 {
808                return Err(ProtocolError::UnexpectedEof);
809            }
810            let max_length = src.get_u32_le();
811            Ok(TypeInfo {
812                max_length: Some(max_length),
813                ..Default::default()
814            })
815        }
816    }
817}
818
819impl ColMetaData {
820    /// Special value indicating no metadata.
821    pub const NO_METADATA: u16 = 0xFFFF;
822
823    /// Decode a COLMETADATA token from bytes.
824    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
825        if src.remaining() < 2 {
826            return Err(ProtocolError::UnexpectedEof);
827        }
828
829        let column_count = src.get_u16_le();
830
831        // 0xFFFF means no metadata present
832        if column_count == Self::NO_METADATA {
833            return Ok(Self {
834                columns: Vec::new(),
835                cek_table: None,
836            });
837        }
838
839        let mut columns = Vec::with_capacity(column_count as usize);
840
841        for _ in 0..column_count {
842            let column = Self::decode_column(src)?;
843            columns.push(column);
844        }
845
846        Ok(Self {
847            columns,
848            cek_table: None,
849        })
850    }
851
852    /// Decode a single column from the metadata.
853    fn decode_column(src: &mut impl Buf) -> Result<ColumnData, ProtocolError> {
854        // UserType (4 bytes) + Flags (2 bytes) + TypeId (1 byte)
855        if src.remaining() < 7 {
856            return Err(ProtocolError::UnexpectedEof);
857        }
858
859        let user_type = src.get_u32_le();
860        let flags = src.get_u16_le();
861        let col_type = src.get_u8();
862
863        let type_id = TypeId::from_u8(col_type).unwrap_or(TypeId::Null); // Default to Null for unknown types
864
865        // Parse type-specific metadata
866        let type_info = decode_type_info(src, type_id, col_type)?;
867
868        // Read column name (B_VARCHAR format - 1 byte length in characters)
869        let name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
870
871        Ok(ColumnData {
872            name,
873            type_id,
874            col_type,
875            flags,
876            user_type,
877            type_info,
878            crypto_metadata: None,
879        })
880    }
881
882    /// Decode a COLMETADATA token with Always Encrypted support.
883    ///
884    /// When column encryption was negotiated in Login7, the server sends a CekTable
885    /// before column definitions and per-column CryptoMetadata for encrypted columns.
886    ///
887    /// # Wire Format (with encryption)
888    ///
889    /// ```text
890    /// column_count: USHORT
891    /// cek_table: CekTable (always present when encryption negotiated)
892    /// columns: ColumnData[column_count] (with CryptoMetadata for encrypted columns)
893    /// ```
894    pub fn decode_encrypted(src: &mut impl Buf) -> Result<Self, ProtocolError> {
895        if src.remaining() < 2 {
896            return Err(ProtocolError::UnexpectedEof);
897        }
898
899        let column_count = src.get_u16_le();
900
901        if column_count == Self::NO_METADATA {
902            return Ok(Self {
903                columns: Vec::new(),
904                cek_table: None,
905            });
906        }
907
908        // Parse CEK table (always present when encryption was negotiated)
909        let cek_table = crate::crypto::CekTable::decode(src)?;
910
911        let mut columns = Vec::with_capacity(column_count as usize);
912
913        for _ in 0..column_count {
914            let column = Self::decode_column_encrypted(src)?;
915            columns.push(column);
916        }
917
918        Ok(Self {
919            columns,
920            cek_table: Some(cek_table),
921        })
922    }
923
924    /// Decode a single column definition with Always Encrypted support.
925    ///
926    /// For encrypted columns (flags & 0x0800), parses CryptoMetadata after the type info.
927    fn decode_column_encrypted(src: &mut impl Buf) -> Result<ColumnData, ProtocolError> {
928        if src.remaining() < 7 {
929            return Err(ProtocolError::UnexpectedEof);
930        }
931
932        let user_type = src.get_u32_le();
933        let flags = src.get_u16_le();
934        let col_type = src.get_u8();
935
936        let type_id = TypeId::from_u8(col_type).unwrap_or(TypeId::Null);
937
938        // Parse type-specific metadata (for encrypted columns, this is the transport type)
939        let type_info = decode_type_info(src, type_id, col_type)?;
940
941        // Parse CryptoMetadata if the column is encrypted
942        let crypto_metadata = if crate::crypto::is_column_encrypted(flags) {
943            Some(crate::crypto::CryptoMetadata::decode(src)?)
944        } else {
945            None
946        };
947
948        // Read column name
949        let name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
950
951        Ok(ColumnData {
952            name,
953            type_id,
954            col_type,
955            flags,
956            user_type,
957            type_info,
958            crypto_metadata,
959        })
960    }
961
962    /// Get the number of columns.
963    #[must_use]
964    pub fn column_count(&self) -> usize {
965        self.columns.len()
966    }
967
968    /// Check if this represents no metadata.
969    #[must_use]
970    pub fn is_empty(&self) -> bool {
971        self.columns.is_empty()
972    }
973}
974
975impl ColumnData {
976    /// Check if this column is nullable.
977    #[must_use]
978    pub fn is_nullable(&self) -> bool {
979        (self.flags & 0x0001) != 0
980    }
981
982    /// Get the fixed size in bytes for this column, if applicable.
983    ///
984    /// Returns `None` for variable-length types.
985    #[must_use]
986    pub fn fixed_size(&self) -> Option<usize> {
987        match self.type_id {
988            TypeId::Null => Some(0),
989            TypeId::Int1 | TypeId::Bit => Some(1),
990            TypeId::Int2 => Some(2),
991            TypeId::Int4 => Some(4),
992            TypeId::Int8 => Some(8),
993            TypeId::Float4 => Some(4),
994            TypeId::Float8 => Some(8),
995            TypeId::Money => Some(8),
996            TypeId::Money4 => Some(4),
997            TypeId::DateTime => Some(8),
998            TypeId::DateTime4 => Some(4),
999            TypeId::Date => Some(3),
1000            _ => None,
1001        }
1002    }
1003}
1004
1005// =============================================================================
1006// Row Parsing Implementation
1007// =============================================================================
1008
1009impl RawRow {
1010    /// Decode a ROW token from bytes.
1011    ///
1012    /// This function requires the column metadata to know how to parse the row.
1013    /// The row data is stored as raw bytes for later parsing.
1014    pub fn decode(src: &mut impl Buf, metadata: &ColMetaData) -> Result<Self, ProtocolError> {
1015        let mut data = bytes::BytesMut::new();
1016
1017        for col in &metadata.columns {
1018            Self::decode_column_value(src, col, &mut data)?;
1019        }
1020
1021        Ok(Self {
1022            data: data.freeze(),
1023        })
1024    }
1025
1026    /// Decode a single column value and append to the output buffer.
1027    fn decode_column_value(
1028        src: &mut impl Buf,
1029        col: &ColumnData,
1030        dst: &mut bytes::BytesMut,
1031    ) -> Result<(), ProtocolError> {
1032        match col.type_id {
1033            // Fixed-length types
1034            TypeId::Null => {
1035                // No data
1036            }
1037            TypeId::Int1 | TypeId::Bit => {
1038                if src.remaining() < 1 {
1039                    return Err(ProtocolError::UnexpectedEof);
1040                }
1041                dst.extend_from_slice(&[src.get_u8()]);
1042            }
1043            TypeId::Int2 => {
1044                if src.remaining() < 2 {
1045                    return Err(ProtocolError::UnexpectedEof);
1046                }
1047                dst.extend_from_slice(&src.get_u16_le().to_le_bytes());
1048            }
1049            TypeId::Int4 => {
1050                if src.remaining() < 4 {
1051                    return Err(ProtocolError::UnexpectedEof);
1052                }
1053                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
1054            }
1055            TypeId::Int8 => {
1056                if src.remaining() < 8 {
1057                    return Err(ProtocolError::UnexpectedEof);
1058                }
1059                dst.extend_from_slice(&src.get_u64_le().to_le_bytes());
1060            }
1061            TypeId::Float4 => {
1062                if src.remaining() < 4 {
1063                    return Err(ProtocolError::UnexpectedEof);
1064                }
1065                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
1066            }
1067            TypeId::Float8 => {
1068                if src.remaining() < 8 {
1069                    return Err(ProtocolError::UnexpectedEof);
1070                }
1071                dst.extend_from_slice(&src.get_u64_le().to_le_bytes());
1072            }
1073            TypeId::Money => {
1074                if src.remaining() < 8 {
1075                    return Err(ProtocolError::UnexpectedEof);
1076                }
1077                let hi = src.get_u32_le();
1078                let lo = src.get_u32_le();
1079                dst.extend_from_slice(&hi.to_le_bytes());
1080                dst.extend_from_slice(&lo.to_le_bytes());
1081            }
1082            TypeId::Money4 => {
1083                if src.remaining() < 4 {
1084                    return Err(ProtocolError::UnexpectedEof);
1085                }
1086                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
1087            }
1088            TypeId::DateTime => {
1089                if src.remaining() < 8 {
1090                    return Err(ProtocolError::UnexpectedEof);
1091                }
1092                let days = src.get_u32_le();
1093                let time = src.get_u32_le();
1094                dst.extend_from_slice(&days.to_le_bytes());
1095                dst.extend_from_slice(&time.to_le_bytes());
1096            }
1097            TypeId::DateTime4 => {
1098                if src.remaining() < 4 {
1099                    return Err(ProtocolError::UnexpectedEof);
1100                }
1101                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
1102            }
1103            // DATE type uses 1-byte length prefix (can be NULL)
1104            TypeId::Date => {
1105                Self::decode_bytelen_type(src, dst)?;
1106            }
1107
1108            // Variable-length nullable types (length-prefixed)
1109            TypeId::IntN | TypeId::BitN | TypeId::FloatN | TypeId::MoneyN | TypeId::DateTimeN => {
1110                Self::decode_bytelen_type(src, dst)?;
1111            }
1112
1113            TypeId::Guid => {
1114                Self::decode_bytelen_type(src, dst)?;
1115            }
1116
1117            TypeId::Decimal | TypeId::Numeric | TypeId::DecimalN | TypeId::NumericN => {
1118                Self::decode_bytelen_type(src, dst)?;
1119            }
1120
1121            // Old-style byte-length strings
1122            TypeId::Char | TypeId::VarChar | TypeId::Binary | TypeId::VarBinary => {
1123                Self::decode_bytelen_type(src, dst)?;
1124            }
1125
1126            // 2-byte length strings (or PLP for MAX types)
1127            TypeId::BigVarChar | TypeId::BigVarBinary => {
1128                // max_length == 0xFFFF indicates VARCHAR(MAX) or VARBINARY(MAX), which uses PLP
1129                if col.type_info.max_length == Some(0xFFFF) {
1130                    Self::decode_plp_type(src, dst)?;
1131                } else {
1132                    Self::decode_ushortlen_type(src, dst)?;
1133                }
1134            }
1135
1136            // Fixed-length types that don't have MAX variants
1137            TypeId::BigChar | TypeId::BigBinary => {
1138                Self::decode_ushortlen_type(src, dst)?;
1139            }
1140
1141            // Unicode strings (2-byte length in bytes, or PLP for NVARCHAR(MAX))
1142            TypeId::NVarChar => {
1143                // max_length == 0xFFFF indicates NVARCHAR(MAX), which uses PLP
1144                if col.type_info.max_length == Some(0xFFFF) {
1145                    Self::decode_plp_type(src, dst)?;
1146                } else {
1147                    Self::decode_ushortlen_type(src, dst)?;
1148                }
1149            }
1150
1151            // Fixed-length NCHAR doesn't have MAX variant
1152            TypeId::NChar => {
1153                Self::decode_ushortlen_type(src, dst)?;
1154            }
1155
1156            // Time types with scale
1157            TypeId::Time | TypeId::DateTime2 | TypeId::DateTimeOffset => {
1158                Self::decode_bytelen_type(src, dst)?;
1159            }
1160
1161            // TEXT/NTEXT/IMAGE - deprecated LOB types using textptr format
1162            TypeId::Text | TypeId::NText | TypeId::Image => {
1163                Self::decode_textptr_type(src, dst)?;
1164            }
1165
1166            // XML - uses actual PLP format
1167            TypeId::Xml => {
1168                Self::decode_plp_type(src, dst)?;
1169            }
1170
1171            // Complex types
1172            TypeId::Variant => {
1173                Self::decode_intlen_type(src, dst)?;
1174            }
1175
1176            TypeId::Udt => {
1177                // UDT uses PLP encoding
1178                Self::decode_plp_type(src, dst)?;
1179            }
1180
1181            TypeId::Tvp => {
1182                // TVP not supported in row data
1183                return Err(ProtocolError::InvalidTokenType(col.col_type));
1184            }
1185        }
1186
1187        Ok(())
1188    }
1189
1190    /// Decode a 1-byte length-prefixed value.
1191    fn decode_bytelen_type(
1192        src: &mut impl Buf,
1193        dst: &mut bytes::BytesMut,
1194    ) -> Result<(), ProtocolError> {
1195        if src.remaining() < 1 {
1196            return Err(ProtocolError::UnexpectedEof);
1197        }
1198        let len = src.get_u8() as usize;
1199        if len == 0xFF {
1200            // NULL value - store as zero-length with NULL marker
1201            dst.extend_from_slice(&[0xFF]);
1202        } else if len == 0 {
1203            // Empty value
1204            dst.extend_from_slice(&[0x00]);
1205        } else {
1206            if src.remaining() < len {
1207                return Err(ProtocolError::UnexpectedEof);
1208            }
1209            dst.extend_from_slice(&[len as u8]);
1210            for _ in 0..len {
1211                dst.extend_from_slice(&[src.get_u8()]);
1212            }
1213        }
1214        Ok(())
1215    }
1216
1217    /// Decode a 2-byte length-prefixed value.
1218    fn decode_ushortlen_type(
1219        src: &mut impl Buf,
1220        dst: &mut bytes::BytesMut,
1221    ) -> Result<(), ProtocolError> {
1222        if src.remaining() < 2 {
1223            return Err(ProtocolError::UnexpectedEof);
1224        }
1225        let len = src.get_u16_le() as usize;
1226        if len == 0xFFFF {
1227            // NULL value
1228            dst.extend_from_slice(&0xFFFFu16.to_le_bytes());
1229        } else if len == 0 {
1230            // Empty value
1231            dst.extend_from_slice(&0u16.to_le_bytes());
1232        } else {
1233            if src.remaining() < len {
1234                return Err(ProtocolError::UnexpectedEof);
1235            }
1236            dst.extend_from_slice(&(len as u16).to_le_bytes());
1237            for _ in 0..len {
1238                dst.extend_from_slice(&[src.get_u8()]);
1239            }
1240        }
1241        Ok(())
1242    }
1243
1244    /// Decode a 4-byte length-prefixed value.
1245    fn decode_intlen_type(
1246        src: &mut impl Buf,
1247        dst: &mut bytes::BytesMut,
1248    ) -> Result<(), ProtocolError> {
1249        if src.remaining() < 4 {
1250            return Err(ProtocolError::UnexpectedEof);
1251        }
1252        let len = src.get_u32_le() as usize;
1253        if len == 0xFFFFFFFF {
1254            // NULL value
1255            dst.extend_from_slice(&0xFFFFFFFFu32.to_le_bytes());
1256        } else if len == 0 {
1257            // Empty value
1258            dst.extend_from_slice(&0u32.to_le_bytes());
1259        } else {
1260            if src.remaining() < len {
1261                return Err(ProtocolError::UnexpectedEof);
1262            }
1263            dst.extend_from_slice(&(len as u32).to_le_bytes());
1264            for _ in 0..len {
1265                dst.extend_from_slice(&[src.get_u8()]);
1266            }
1267        }
1268        Ok(())
1269    }
1270
1271    /// Decode a TEXT/NTEXT/IMAGE type (textptr format).
1272    ///
1273    /// These deprecated LOB types use a special format:
1274    /// - 1 byte: textptr_len (0 = NULL)
1275    /// - textptr_len bytes: textptr (if not NULL)
1276    /// - 8 bytes: timestamp (if not NULL)
1277    /// - 4 bytes: data length (if not NULL)
1278    /// - data_len bytes: the actual data (if not NULL)
1279    ///
1280    /// We convert this to PLP format for the client to parse:
1281    /// - 8 bytes: total length (0xFFFFFFFFFFFFFFFF = NULL)
1282    /// - 4 bytes: chunk length (= data length)
1283    /// - chunk data
1284    /// - 4 bytes: 0 (terminator)
1285    fn decode_textptr_type(
1286        src: &mut impl Buf,
1287        dst: &mut bytes::BytesMut,
1288    ) -> Result<(), ProtocolError> {
1289        if src.remaining() < 1 {
1290            return Err(ProtocolError::UnexpectedEof);
1291        }
1292
1293        let textptr_len = src.get_u8() as usize;
1294
1295        if textptr_len == 0 {
1296            // NULL value - write PLP NULL marker
1297            dst.extend_from_slice(&0xFFFFFFFFFFFFFFFFu64.to_le_bytes());
1298            return Ok(());
1299        }
1300
1301        // Skip textptr bytes
1302        if src.remaining() < textptr_len {
1303            return Err(ProtocolError::UnexpectedEof);
1304        }
1305        src.advance(textptr_len);
1306
1307        // Skip 8-byte timestamp
1308        if src.remaining() < 8 {
1309            return Err(ProtocolError::UnexpectedEof);
1310        }
1311        src.advance(8);
1312
1313        // Read data length
1314        if src.remaining() < 4 {
1315            return Err(ProtocolError::UnexpectedEof);
1316        }
1317        let data_len = src.get_u32_le() as usize;
1318
1319        if src.remaining() < data_len {
1320            return Err(ProtocolError::UnexpectedEof);
1321        }
1322
1323        // Write in PLP format for client parsing:
1324        // - 8 bytes: total length
1325        // - 4 bytes: chunk length
1326        // - chunk data
1327        // - 4 bytes: 0 (terminator)
1328        dst.extend_from_slice(&(data_len as u64).to_le_bytes());
1329        dst.extend_from_slice(&(data_len as u32).to_le_bytes());
1330        for _ in 0..data_len {
1331            dst.extend_from_slice(&[src.get_u8()]);
1332        }
1333        dst.extend_from_slice(&0u32.to_le_bytes()); // PLP terminator
1334
1335        Ok(())
1336    }
1337
1338    /// Decode a PLP (Partially Length-Prefixed) value.
1339    ///
1340    /// PLP format:
1341    /// - 8 bytes: total length (0xFFFFFFFFFFFFFFFE = unknown, 0xFFFFFFFFFFFFFFFF = NULL)
1342    /// - If not NULL: chunks of (4 byte chunk length + data) until chunk length = 0
1343    fn decode_plp_type(src: &mut impl Buf, dst: &mut bytes::BytesMut) -> Result<(), ProtocolError> {
1344        if src.remaining() < 8 {
1345            return Err(ProtocolError::UnexpectedEof);
1346        }
1347
1348        let total_len = src.get_u64_le();
1349
1350        // Store the total length marker
1351        dst.extend_from_slice(&total_len.to_le_bytes());
1352
1353        if total_len == 0xFFFFFFFFFFFFFFFF {
1354            // NULL value - no more data
1355            return Ok(());
1356        }
1357
1358        // Read chunks until terminator
1359        loop {
1360            if src.remaining() < 4 {
1361                return Err(ProtocolError::UnexpectedEof);
1362            }
1363            let chunk_len = src.get_u32_le() as usize;
1364            dst.extend_from_slice(&(chunk_len as u32).to_le_bytes());
1365
1366            if chunk_len == 0 {
1367                // End of PLP data
1368                break;
1369            }
1370
1371            if src.remaining() < chunk_len {
1372                return Err(ProtocolError::UnexpectedEof);
1373            }
1374
1375            for _ in 0..chunk_len {
1376                dst.extend_from_slice(&[src.get_u8()]);
1377            }
1378        }
1379
1380        Ok(())
1381    }
1382}
1383
1384// =============================================================================
1385// NbcRow Parsing Implementation
1386// =============================================================================
1387
1388impl NbcRow {
1389    /// Decode an NBCROW token from bytes.
1390    ///
1391    /// NBCROW (Null Bitmap Compressed Row) stores a bitmap indicating which
1392    /// columns are NULL, followed by only the non-NULL values.
1393    pub fn decode(src: &mut impl Buf, metadata: &ColMetaData) -> Result<Self, ProtocolError> {
1394        let col_count = metadata.columns.len();
1395        let bitmap_len = col_count.div_ceil(8);
1396
1397        if src.remaining() < bitmap_len {
1398            return Err(ProtocolError::UnexpectedEof);
1399        }
1400
1401        // Read null bitmap
1402        let mut null_bitmap = vec![0u8; bitmap_len];
1403        for byte in &mut null_bitmap {
1404            *byte = src.get_u8();
1405        }
1406
1407        // Read non-null values
1408        let mut data = bytes::BytesMut::new();
1409
1410        for (i, col) in metadata.columns.iter().enumerate() {
1411            let byte_idx = i / 8;
1412            let bit_idx = i % 8;
1413            let is_null = (null_bitmap[byte_idx] & (1 << bit_idx)) != 0;
1414
1415            if !is_null {
1416                // Read the value - for NBCROW, we read without the length prefix
1417                // for fixed-length types, and with length prefix for variable types
1418                RawRow::decode_column_value(src, col, &mut data)?;
1419            }
1420        }
1421
1422        Ok(Self {
1423            null_bitmap,
1424            data: data.freeze(),
1425        })
1426    }
1427
1428    /// Check if a column at the given index is NULL.
1429    #[must_use]
1430    pub fn is_null(&self, column_index: usize) -> bool {
1431        let byte_idx = column_index / 8;
1432        let bit_idx = column_index % 8;
1433        if byte_idx < self.null_bitmap.len() {
1434            (self.null_bitmap[byte_idx] & (1 << bit_idx)) != 0
1435        } else {
1436            true // Out of bounds = NULL
1437        }
1438    }
1439}
1440
1441// =============================================================================
1442// ReturnValue Parsing Implementation
1443// =============================================================================
1444
1445impl ReturnValue {
1446    /// Decode a RETURNVALUE token from bytes.
1447    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1448        // MS-TDS §2.2.7.18: the RETURNVALUE token has no length prefix —
1449        // it begins directly with the 2-byte ParamOrdinal. The previous
1450        // spurious 2-byte read consumed the ordinal and shifted every
1451        // subsequent field, leaving the stream parser two bytes ahead and
1452        // reading value bytes as the next token type (e.g. `0x74` from a
1453        // Unicode name fragment was misread as an unknown token).
1454        if src.remaining() < 2 {
1455            return Err(ProtocolError::UnexpectedEof);
1456        }
1457        let param_ordinal = src.get_u16_le();
1458
1459        // Parameter name (B_VARCHAR)
1460        let param_name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1461
1462        // Status (1 byte)
1463        if src.remaining() < 1 {
1464            return Err(ProtocolError::UnexpectedEof);
1465        }
1466        let status = src.get_u8();
1467
1468        // User type (4 bytes) + flags (2 bytes) + type id (1 byte)
1469        if src.remaining() < 7 {
1470            return Err(ProtocolError::UnexpectedEof);
1471        }
1472        let user_type = src.get_u32_le();
1473        let flags = src.get_u16_le();
1474        let col_type = src.get_u8();
1475
1476        let type_id = TypeId::from_u8(col_type).unwrap_or(TypeId::Null);
1477
1478        // Parse type info
1479        let type_info = decode_type_info(src, type_id, col_type)?;
1480
1481        // Read the value data
1482        let mut value_buf = bytes::BytesMut::new();
1483
1484        // Create a temporary column for value parsing
1485        let temp_col = ColumnData {
1486            name: String::new(),
1487            type_id,
1488            col_type,
1489            flags,
1490            user_type,
1491            type_info: type_info.clone(),
1492            crypto_metadata: None,
1493        };
1494
1495        RawRow::decode_column_value(src, &temp_col, &mut value_buf)?;
1496
1497        Ok(Self {
1498            param_ordinal,
1499            param_name,
1500            status,
1501            user_type,
1502            flags,
1503            col_type,
1504            type_info,
1505            value: value_buf.freeze(),
1506        })
1507    }
1508}
1509
1510// =============================================================================
1511// SessionState Parsing Implementation
1512// =============================================================================
1513
1514impl SessionState {
1515    /// Decode a SESSIONSTATE token from bytes.
1516    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1517        if src.remaining() < 4 {
1518            return Err(ProtocolError::UnexpectedEof);
1519        }
1520
1521        let length = src.get_u32_le() as usize;
1522
1523        if src.remaining() < length {
1524            return Err(ProtocolError::IncompletePacket {
1525                expected: length,
1526                actual: src.remaining(),
1527            });
1528        }
1529
1530        let data = src.copy_to_bytes(length);
1531
1532        Ok(Self { data })
1533    }
1534}
1535
1536// =============================================================================
1537// Token Parsing Implementation
1538// =============================================================================
1539
1540/// Done token status flags bit positions.
1541mod done_status_bits {
1542    pub const DONE_MORE: u16 = 0x0001;
1543    pub const DONE_ERROR: u16 = 0x0002;
1544    pub const DONE_INXACT: u16 = 0x0004;
1545    pub const DONE_COUNT: u16 = 0x0010;
1546    pub const DONE_ATTN: u16 = 0x0020;
1547    pub const DONE_SRVERROR: u16 = 0x0100;
1548}
1549
1550impl DoneStatus {
1551    /// Parse done status from raw bits.
1552    #[must_use]
1553    pub fn from_bits(bits: u16) -> Self {
1554        use done_status_bits::*;
1555        Self {
1556            more: (bits & DONE_MORE) != 0,
1557            error: (bits & DONE_ERROR) != 0,
1558            in_xact: (bits & DONE_INXACT) != 0,
1559            count: (bits & DONE_COUNT) != 0,
1560            attn: (bits & DONE_ATTN) != 0,
1561            srverror: (bits & DONE_SRVERROR) != 0,
1562        }
1563    }
1564
1565    /// Convert to raw bits.
1566    #[must_use]
1567    pub fn to_bits(&self) -> u16 {
1568        use done_status_bits::*;
1569        let mut bits = 0u16;
1570        if self.more {
1571            bits |= DONE_MORE;
1572        }
1573        if self.error {
1574            bits |= DONE_ERROR;
1575        }
1576        if self.in_xact {
1577            bits |= DONE_INXACT;
1578        }
1579        if self.count {
1580            bits |= DONE_COUNT;
1581        }
1582        if self.attn {
1583            bits |= DONE_ATTN;
1584        }
1585        if self.srverror {
1586            bits |= DONE_SRVERROR;
1587        }
1588        bits
1589    }
1590}
1591
1592impl Done {
1593    /// Size of the DONE token in bytes (excluding token type byte).
1594    pub const SIZE: usize = 12; // 2 (status) + 2 (curcmd) + 8 (rowcount)
1595
1596    /// Decode a DONE token from bytes.
1597    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1598        if src.remaining() < Self::SIZE {
1599            return Err(ProtocolError::IncompletePacket {
1600                expected: Self::SIZE,
1601                actual: src.remaining(),
1602            });
1603        }
1604
1605        let status = DoneStatus::from_bits(src.get_u16_le());
1606        let cur_cmd = src.get_u16_le();
1607        let row_count = src.get_u64_le();
1608
1609        Ok(Self {
1610            status,
1611            cur_cmd,
1612            row_count,
1613        })
1614    }
1615
1616    /// Encode the DONE token to bytes.
1617    pub fn encode(&self, dst: &mut impl BufMut) {
1618        dst.put_u8(TokenType::Done as u8);
1619        dst.put_u16_le(self.status.to_bits());
1620        dst.put_u16_le(self.cur_cmd);
1621        dst.put_u64_le(self.row_count);
1622    }
1623
1624    /// Check if more results follow this DONE token.
1625    #[must_use]
1626    pub const fn has_more(&self) -> bool {
1627        self.status.more
1628    }
1629
1630    /// Check if an error occurred.
1631    #[must_use]
1632    pub const fn has_error(&self) -> bool {
1633        self.status.error
1634    }
1635
1636    /// Check if the row count is valid.
1637    #[must_use]
1638    pub const fn has_count(&self) -> bool {
1639        self.status.count
1640    }
1641}
1642
1643impl DoneProc {
1644    /// Size of the DONEPROC token in bytes (excluding token type byte).
1645    pub const SIZE: usize = 12;
1646
1647    /// Decode a DONEPROC token from bytes.
1648    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1649        if src.remaining() < Self::SIZE {
1650            return Err(ProtocolError::IncompletePacket {
1651                expected: Self::SIZE,
1652                actual: src.remaining(),
1653            });
1654        }
1655
1656        let status = DoneStatus::from_bits(src.get_u16_le());
1657        let cur_cmd = src.get_u16_le();
1658        let row_count = src.get_u64_le();
1659
1660        Ok(Self {
1661            status,
1662            cur_cmd,
1663            row_count,
1664        })
1665    }
1666
1667    /// Encode the DONEPROC token to bytes.
1668    pub fn encode(&self, dst: &mut impl BufMut) {
1669        dst.put_u8(TokenType::DoneProc as u8);
1670        dst.put_u16_le(self.status.to_bits());
1671        dst.put_u16_le(self.cur_cmd);
1672        dst.put_u64_le(self.row_count);
1673    }
1674}
1675
1676impl DoneInProc {
1677    /// Size of the DONEINPROC token in bytes (excluding token type byte).
1678    pub const SIZE: usize = 12;
1679
1680    /// Decode a DONEINPROC token from bytes.
1681    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1682        if src.remaining() < Self::SIZE {
1683            return Err(ProtocolError::IncompletePacket {
1684                expected: Self::SIZE,
1685                actual: src.remaining(),
1686            });
1687        }
1688
1689        let status = DoneStatus::from_bits(src.get_u16_le());
1690        let cur_cmd = src.get_u16_le();
1691        let row_count = src.get_u64_le();
1692
1693        Ok(Self {
1694            status,
1695            cur_cmd,
1696            row_count,
1697        })
1698    }
1699
1700    /// Encode the DONEINPROC token to bytes.
1701    pub fn encode(&self, dst: &mut impl BufMut) {
1702        dst.put_u8(TokenType::DoneInProc as u8);
1703        dst.put_u16_le(self.status.to_bits());
1704        dst.put_u16_le(self.cur_cmd);
1705        dst.put_u64_le(self.row_count);
1706    }
1707}
1708
1709impl ServerError {
1710    /// Decode an ERROR token from bytes.
1711    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1712        // ERROR token: length (2) + number (4) + state (1) + class (1) +
1713        //              message (us_varchar) + server (b_varchar) + procedure (b_varchar) + line (4)
1714        if src.remaining() < 2 {
1715            return Err(ProtocolError::UnexpectedEof);
1716        }
1717
1718        let _length = src.get_u16_le();
1719
1720        if src.remaining() < 6 {
1721            return Err(ProtocolError::UnexpectedEof);
1722        }
1723
1724        let number = src.get_i32_le();
1725        let state = src.get_u8();
1726        let class = src.get_u8();
1727
1728        let message = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1729        let server = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1730        let procedure = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1731
1732        if src.remaining() < 4 {
1733            return Err(ProtocolError::UnexpectedEof);
1734        }
1735        let line = src.get_i32_le();
1736
1737        Ok(Self {
1738            number,
1739            state,
1740            class,
1741            message,
1742            server,
1743            procedure,
1744            line,
1745        })
1746    }
1747
1748    /// Check if this is a fatal error (severity >= 20).
1749    #[must_use]
1750    pub const fn is_fatal(&self) -> bool {
1751        self.class >= 20
1752    }
1753
1754    /// Check if this error indicates the batch was aborted (severity >= 16).
1755    #[must_use]
1756    pub const fn is_batch_abort(&self) -> bool {
1757        self.class >= 16
1758    }
1759}
1760
1761impl ServerInfo {
1762    /// Decode an INFO token from bytes.
1763    ///
1764    /// INFO tokens have the same structure as ERROR tokens but with lower severity.
1765    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1766        if src.remaining() < 2 {
1767            return Err(ProtocolError::UnexpectedEof);
1768        }
1769
1770        let _length = src.get_u16_le();
1771
1772        if src.remaining() < 6 {
1773            return Err(ProtocolError::UnexpectedEof);
1774        }
1775
1776        let number = src.get_i32_le();
1777        let state = src.get_u8();
1778        let class = src.get_u8();
1779
1780        let message = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1781        let server = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1782        let procedure = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1783
1784        if src.remaining() < 4 {
1785            return Err(ProtocolError::UnexpectedEof);
1786        }
1787        let line = src.get_i32_le();
1788
1789        Ok(Self {
1790            number,
1791            state,
1792            class,
1793            message,
1794            server,
1795            procedure,
1796            line,
1797        })
1798    }
1799}
1800
1801impl LoginAck {
1802    /// Decode a LOGINACK token from bytes.
1803    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1804        // LOGINACK: length (2) + interface (1) + tds_version (4) + prog_name (b_varchar) + prog_version (4)
1805        if src.remaining() < 2 {
1806            return Err(ProtocolError::UnexpectedEof);
1807        }
1808
1809        let _length = src.get_u16_le();
1810
1811        if src.remaining() < 5 {
1812            return Err(ProtocolError::UnexpectedEof);
1813        }
1814
1815        let interface = src.get_u8();
1816        let tds_version = src.get_u32_le();
1817        let prog_name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1818
1819        if src.remaining() < 4 {
1820            return Err(ProtocolError::UnexpectedEof);
1821        }
1822        let prog_version = src.get_u32_le();
1823
1824        Ok(Self {
1825            interface,
1826            tds_version,
1827            prog_name,
1828            prog_version,
1829        })
1830    }
1831
1832    /// Get the TDS version as a `TdsVersion`.
1833    #[must_use]
1834    pub fn tds_version(&self) -> crate::version::TdsVersion {
1835        crate::version::TdsVersion::new(self.tds_version)
1836    }
1837}
1838
1839impl EnvChangeType {
1840    /// Create from raw byte value.
1841    pub fn from_u8(value: u8) -> Option<Self> {
1842        match value {
1843            1 => Some(Self::Database),
1844            2 => Some(Self::Language),
1845            3 => Some(Self::CharacterSet),
1846            4 => Some(Self::PacketSize),
1847            5 => Some(Self::UnicodeSortingLocalId),
1848            6 => Some(Self::UnicodeComparisonFlags),
1849            7 => Some(Self::SqlCollation),
1850            8 => Some(Self::BeginTransaction),
1851            9 => Some(Self::CommitTransaction),
1852            10 => Some(Self::RollbackTransaction),
1853            11 => Some(Self::EnlistDtcTransaction),
1854            12 => Some(Self::DefectTransaction),
1855            13 => Some(Self::RealTimeLogShipping),
1856            15 => Some(Self::PromoteTransaction),
1857            16 => Some(Self::TransactionManagerAddress),
1858            17 => Some(Self::TransactionEnded),
1859            18 => Some(Self::ResetConnectionCompletionAck),
1860            19 => Some(Self::UserInstanceStarted),
1861            20 => Some(Self::Routing),
1862            _ => None,
1863        }
1864    }
1865}
1866
1867impl EnvChange {
1868    /// Decode an ENVCHANGE token from bytes.
1869    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1870        if src.remaining() < 3 {
1871            return Err(ProtocolError::UnexpectedEof);
1872        }
1873
1874        let length = src.get_u16_le() as usize;
1875        if src.remaining() < length {
1876            return Err(ProtocolError::IncompletePacket {
1877                expected: length,
1878                actual: src.remaining(),
1879            });
1880        }
1881
1882        let env_type_byte = src.get_u8();
1883        let env_type = EnvChangeType::from_u8(env_type_byte)
1884            .ok_or(ProtocolError::InvalidTokenType(env_type_byte))?;
1885
1886        let (new_value, old_value) = match env_type {
1887            EnvChangeType::Routing => {
1888                // Routing has special format
1889                let new_value = Self::decode_routing_value(src)?;
1890                let old_value = EnvChangeValue::Binary(Bytes::new());
1891                (new_value, old_value)
1892            }
1893            EnvChangeType::BeginTransaction
1894            | EnvChangeType::CommitTransaction
1895            | EnvChangeType::RollbackTransaction
1896            | EnvChangeType::EnlistDtcTransaction
1897            | EnvChangeType::SqlCollation => {
1898                // These use binary format per MS-TDS spec:
1899                // - Transaction tokens: transaction descriptor (8 bytes)
1900                // - SqlCollation: collation info (5 bytes: LCID + sort flags)
1901                let new_len = src.get_u8() as usize;
1902                let new_value = if new_len > 0 && src.remaining() >= new_len {
1903                    EnvChangeValue::Binary(src.copy_to_bytes(new_len))
1904                } else {
1905                    EnvChangeValue::Binary(Bytes::new())
1906                };
1907
1908                let old_len = src.get_u8() as usize;
1909                let old_value = if old_len > 0 && src.remaining() >= old_len {
1910                    EnvChangeValue::Binary(src.copy_to_bytes(old_len))
1911                } else {
1912                    EnvChangeValue::Binary(Bytes::new())
1913                };
1914
1915                (new_value, old_value)
1916            }
1917            _ => {
1918                // String format for most env changes
1919                let new_value = read_b_varchar(src)
1920                    .map(EnvChangeValue::String)
1921                    .unwrap_or(EnvChangeValue::String(String::new()));
1922
1923                let old_value = read_b_varchar(src)
1924                    .map(EnvChangeValue::String)
1925                    .unwrap_or(EnvChangeValue::String(String::new()));
1926
1927                (new_value, old_value)
1928            }
1929        };
1930
1931        Ok(Self {
1932            env_type,
1933            new_value,
1934            old_value,
1935        })
1936    }
1937
1938    fn decode_routing_value(src: &mut impl Buf) -> Result<EnvChangeValue, ProtocolError> {
1939        // Routing format: length (2) + protocol (1) + port (2) + server_len (2) + server (utf16)
1940        if src.remaining() < 2 {
1941            return Err(ProtocolError::UnexpectedEof);
1942        }
1943
1944        let _routing_len = src.get_u16_le();
1945
1946        if src.remaining() < 5 {
1947            return Err(ProtocolError::UnexpectedEof);
1948        }
1949
1950        let _protocol = src.get_u8();
1951        let port = src.get_u16_le();
1952        let server_len = src.get_u16_le() as usize;
1953
1954        // Read UTF-16LE server name
1955        if src.remaining() < server_len * 2 {
1956            return Err(ProtocolError::UnexpectedEof);
1957        }
1958
1959        let mut chars = Vec::with_capacity(server_len);
1960        for _ in 0..server_len {
1961            chars.push(src.get_u16_le());
1962        }
1963
1964        let host = String::from_utf16(&chars).map_err(|_| {
1965            ProtocolError::StringEncoding(
1966                #[cfg(feature = "std")]
1967                "invalid UTF-16 in routing hostname".to_string(),
1968                #[cfg(not(feature = "std"))]
1969                "invalid UTF-16 in routing hostname",
1970            )
1971        })?;
1972
1973        Ok(EnvChangeValue::Routing { host, port })
1974    }
1975
1976    /// Check if this is a routing redirect.
1977    #[must_use]
1978    pub fn is_routing(&self) -> bool {
1979        self.env_type == EnvChangeType::Routing
1980    }
1981
1982    /// Get routing information if this is a routing change.
1983    #[must_use]
1984    pub fn routing_info(&self) -> Option<(&str, u16)> {
1985        if let EnvChangeValue::Routing { host, port } = &self.new_value {
1986            Some((host, *port))
1987        } else {
1988            None
1989        }
1990    }
1991
1992    /// Get the new database name if this is a database change.
1993    #[must_use]
1994    pub fn new_database(&self) -> Option<&str> {
1995        if self.env_type == EnvChangeType::Database {
1996            if let EnvChangeValue::String(s) = &self.new_value {
1997                return Some(s);
1998            }
1999        }
2000        None
2001    }
2002}
2003
2004impl Order {
2005    /// Decode an ORDER token from bytes.
2006    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
2007        if src.remaining() < 2 {
2008            return Err(ProtocolError::UnexpectedEof);
2009        }
2010
2011        let length = src.get_u16_le() as usize;
2012        let column_count = length / 2;
2013
2014        if src.remaining() < length {
2015            return Err(ProtocolError::IncompletePacket {
2016                expected: length,
2017                actual: src.remaining(),
2018            });
2019        }
2020
2021        let mut columns = Vec::with_capacity(column_count);
2022        for _ in 0..column_count {
2023            columns.push(src.get_u16_le());
2024        }
2025
2026        Ok(Self { columns })
2027    }
2028}
2029
2030impl FeatureExtAck {
2031    /// Feature terminator byte.
2032    pub const TERMINATOR: u8 = 0xFF;
2033
2034    /// Decode a FEATUREEXTACK token from bytes.
2035    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
2036        let mut features = Vec::new();
2037
2038        loop {
2039            if !src.has_remaining() {
2040                return Err(ProtocolError::UnexpectedEof);
2041            }
2042
2043            let feature_id = src.get_u8();
2044            if feature_id == Self::TERMINATOR {
2045                break;
2046            }
2047
2048            if src.remaining() < 4 {
2049                return Err(ProtocolError::UnexpectedEof);
2050            }
2051
2052            let data_len = src.get_u32_le() as usize;
2053
2054            if src.remaining() < data_len {
2055                return Err(ProtocolError::IncompletePacket {
2056                    expected: data_len,
2057                    actual: src.remaining(),
2058                });
2059            }
2060
2061            let data = src.copy_to_bytes(data_len);
2062            features.push(FeatureAck { feature_id, data });
2063        }
2064
2065        Ok(Self { features })
2066    }
2067}
2068
2069impl SspiToken {
2070    /// Decode an SSPI token from bytes.
2071    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
2072        if src.remaining() < 2 {
2073            return Err(ProtocolError::UnexpectedEof);
2074        }
2075
2076        let length = src.get_u16_le() as usize;
2077
2078        if src.remaining() < length {
2079            return Err(ProtocolError::IncompletePacket {
2080                expected: length,
2081                actual: src.remaining(),
2082            });
2083        }
2084
2085        let data = src.copy_to_bytes(length);
2086        Ok(Self { data })
2087    }
2088}
2089
2090impl FedAuthInfo {
2091    /// Decode a FEDAUTHINFO token from bytes.
2092    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
2093        if src.remaining() < 4 {
2094            return Err(ProtocolError::UnexpectedEof);
2095        }
2096
2097        let _length = src.get_u32_le();
2098
2099        if src.remaining() < 5 {
2100            return Err(ProtocolError::UnexpectedEof);
2101        }
2102
2103        let _count = src.get_u8();
2104
2105        // Read option data
2106        let mut sts_url = String::new();
2107        let mut spn = String::new();
2108
2109        // Parse info options until we have both
2110        while src.has_remaining() {
2111            if src.remaining() < 9 {
2112                break;
2113            }
2114
2115            let info_id = src.get_u8();
2116            let info_len = src.get_u32_le() as usize;
2117            let _info_offset = src.get_u32_le();
2118
2119            if src.remaining() < info_len {
2120                break;
2121            }
2122
2123            // Read UTF-16LE string
2124            let char_count = info_len / 2;
2125            let mut chars = Vec::with_capacity(char_count);
2126            for _ in 0..char_count {
2127                chars.push(src.get_u16_le());
2128            }
2129
2130            if let Ok(value) = String::from_utf16(&chars) {
2131                match info_id {
2132                    0x01 => spn = value,
2133                    0x02 => sts_url = value,
2134                    _ => {}
2135                }
2136            }
2137        }
2138
2139        Ok(Self { sts_url, spn })
2140    }
2141}
2142
2143// =============================================================================
2144// Token Parser
2145// =============================================================================
2146
2147/// Token stream parser.
2148///
2149/// Parses a stream of TDS tokens from a byte buffer.
2150///
2151/// # Basic vs Context-Aware Parsing
2152///
2153/// Some tokens (like `Done`, `Error`, `LoginAck`) can be parsed without context.
2154/// Use [`next_token()`](TokenParser::next_token) for these.
2155///
2156/// Other tokens (like `ColMetaData`, `Row`, `NbcRow`) require column metadata
2157/// to parse correctly. Use [`next_token_with_metadata()`](TokenParser::next_token_with_metadata)
2158/// for these.
2159///
2160/// # Example
2161///
2162/// ```rust,ignore
2163/// let mut parser = TokenParser::new(data);
2164/// let mut metadata = None;
2165///
2166/// while let Some(token) = parser.next_token_with_metadata(metadata.as_ref())? {
2167///     match token {
2168///         Token::ColMetaData(meta) => {
2169///             metadata = Some(meta);
2170///         }
2171///         Token::Row(row) => {
2172///             // Process row using metadata
2173///         }
2174///         Token::Done(done) => {
2175///             if !done.has_more() {
2176///                 break;
2177///             }
2178///         }
2179///         _ => {}
2180///     }
2181/// }
2182/// ```
2183pub struct TokenParser {
2184    data: Bytes,
2185    position: usize,
2186    /// Whether Always Encrypted was negotiated for this connection.
2187    /// When true, ColMetaData tokens are parsed with CekTable and per-column CryptoMetadata.
2188    encryption_enabled: bool,
2189}
2190
2191impl TokenParser {
2192    /// Create a new token parser from bytes.
2193    #[must_use]
2194    pub fn new(data: Bytes) -> Self {
2195        Self {
2196            data,
2197            position: 0,
2198            encryption_enabled: false,
2199        }
2200    }
2201
2202    /// Enable Always Encrypted metadata parsing.
2203    ///
2204    /// When enabled, ColMetaData tokens are parsed using the encrypted format
2205    /// which includes a CekTable and per-column CryptoMetadata.
2206    #[must_use]
2207    pub fn with_encryption(mut self, enabled: bool) -> Self {
2208        self.encryption_enabled = enabled;
2209        self
2210    }
2211
2212    /// Get remaining bytes in the buffer.
2213    #[must_use]
2214    pub fn remaining(&self) -> usize {
2215        self.data.len().saturating_sub(self.position)
2216    }
2217
2218    /// Check if there are more bytes to parse.
2219    #[must_use]
2220    pub fn has_remaining(&self) -> bool {
2221        self.position < self.data.len()
2222    }
2223
2224    /// Peek at the next token type without consuming it.
2225    #[must_use]
2226    pub fn peek_token_type(&self) -> Option<TokenType> {
2227        if self.position < self.data.len() {
2228            TokenType::from_u8(self.data[self.position])
2229        } else {
2230            None
2231        }
2232    }
2233
2234    /// Parse the next token from the stream.
2235    ///
2236    /// This method can only parse context-independent tokens. For tokens that
2237    /// require column metadata (ColMetaData, Row, NbcRow), use
2238    /// [`next_token_with_metadata()`](TokenParser::next_token_with_metadata).
2239    ///
2240    /// Returns `None` if no more tokens are available.
2241    pub fn next_token(&mut self) -> Result<Option<Token>, ProtocolError> {
2242        self.next_token_with_metadata(None)
2243    }
2244
2245    /// Parse the next token with optional column metadata context.
2246    ///
2247    /// When `metadata` is provided, this method can parse Row and NbcRow tokens.
2248    /// Without metadata, those tokens will return an error.
2249    ///
2250    /// Returns `None` if no more tokens are available.
2251    pub fn next_token_with_metadata(
2252        &mut self,
2253        metadata: Option<&ColMetaData>,
2254    ) -> Result<Option<Token>, ProtocolError> {
2255        if !self.has_remaining() {
2256            return Ok(None);
2257        }
2258
2259        let mut buf = &self.data[self.position..];
2260        let start_pos = self.position;
2261
2262        let token_type_byte = buf.get_u8();
2263        let token_type = TokenType::from_u8(token_type_byte);
2264
2265        let token = match token_type {
2266            Some(TokenType::Done) => {
2267                let done = Done::decode(&mut buf)?;
2268                Token::Done(done)
2269            }
2270            Some(TokenType::DoneProc) => {
2271                let done = DoneProc::decode(&mut buf)?;
2272                Token::DoneProc(done)
2273            }
2274            Some(TokenType::DoneInProc) => {
2275                let done = DoneInProc::decode(&mut buf)?;
2276                Token::DoneInProc(done)
2277            }
2278            Some(TokenType::Error) => {
2279                let error = ServerError::decode(&mut buf)?;
2280                Token::Error(error)
2281            }
2282            Some(TokenType::Info) => {
2283                let info = ServerInfo::decode(&mut buf)?;
2284                Token::Info(info)
2285            }
2286            Some(TokenType::LoginAck) => {
2287                let login_ack = LoginAck::decode(&mut buf)?;
2288                Token::LoginAck(login_ack)
2289            }
2290            Some(TokenType::EnvChange) => {
2291                let env_change = EnvChange::decode(&mut buf)?;
2292                Token::EnvChange(env_change)
2293            }
2294            Some(TokenType::Order) => {
2295                let order = Order::decode(&mut buf)?;
2296                Token::Order(order)
2297            }
2298            Some(TokenType::FeatureExtAck) => {
2299                let ack = FeatureExtAck::decode(&mut buf)?;
2300                Token::FeatureExtAck(ack)
2301            }
2302            Some(TokenType::Sspi) => {
2303                let sspi = SspiToken::decode(&mut buf)?;
2304                Token::Sspi(sspi)
2305            }
2306            Some(TokenType::FedAuthInfo) => {
2307                let info = FedAuthInfo::decode(&mut buf)?;
2308                Token::FedAuthInfo(info)
2309            }
2310            Some(TokenType::ReturnStatus) => {
2311                if buf.remaining() < 4 {
2312                    return Err(ProtocolError::UnexpectedEof);
2313                }
2314                let status = buf.get_i32_le();
2315                Token::ReturnStatus(status)
2316            }
2317            Some(TokenType::ColMetaData) => {
2318                let col_meta = if self.encryption_enabled {
2319                    ColMetaData::decode_encrypted(&mut buf)?
2320                } else {
2321                    ColMetaData::decode(&mut buf)?
2322                };
2323                Token::ColMetaData(col_meta)
2324            }
2325            Some(TokenType::Row) => {
2326                let meta = metadata.ok_or_else(|| {
2327                    ProtocolError::StringEncoding(
2328                        #[cfg(feature = "std")]
2329                        "Row token requires column metadata".to_string(),
2330                        #[cfg(not(feature = "std"))]
2331                        "Row token requires column metadata",
2332                    )
2333                })?;
2334                let row = RawRow::decode(&mut buf, meta)?;
2335                Token::Row(row)
2336            }
2337            Some(TokenType::NbcRow) => {
2338                let meta = metadata.ok_or_else(|| {
2339                    ProtocolError::StringEncoding(
2340                        #[cfg(feature = "std")]
2341                        "NbcRow token requires column metadata".to_string(),
2342                        #[cfg(not(feature = "std"))]
2343                        "NbcRow token requires column metadata",
2344                    )
2345                })?;
2346                let row = NbcRow::decode(&mut buf, meta)?;
2347                Token::NbcRow(row)
2348            }
2349            Some(TokenType::ReturnValue) => {
2350                let ret_val = ReturnValue::decode(&mut buf)?;
2351                Token::ReturnValue(ret_val)
2352            }
2353            Some(TokenType::SessionState) => {
2354                let session = SessionState::decode(&mut buf)?;
2355                Token::SessionState(session)
2356            }
2357            Some(TokenType::ColInfo) | Some(TokenType::TabName) | Some(TokenType::Offset) => {
2358                // These tokens are rarely used and have complex formats.
2359                // Skip them by reading the length and advancing.
2360                if buf.remaining() < 2 {
2361                    return Err(ProtocolError::UnexpectedEof);
2362                }
2363                let length = buf.get_u16_le() as usize;
2364                if buf.remaining() < length {
2365                    return Err(ProtocolError::IncompletePacket {
2366                        expected: length,
2367                        actual: buf.remaining(),
2368                    });
2369                }
2370                // Skip the data
2371                buf.advance(length);
2372                // Recursively get the next token
2373                self.position = start_pos + (self.data.len() - start_pos - buf.remaining());
2374                return self.next_token_with_metadata(metadata);
2375            }
2376            None => {
2377                return Err(ProtocolError::InvalidTokenType(token_type_byte));
2378            }
2379        };
2380
2381        // Update position based on how much was consumed
2382        let consumed = self.data.len() - start_pos - buf.remaining();
2383        self.position = start_pos + consumed;
2384
2385        Ok(Some(token))
2386    }
2387
2388    /// Skip the current token without fully parsing it.
2389    ///
2390    /// This is useful for skipping unknown or uninteresting tokens.
2391    pub fn skip_token(&mut self) -> Result<(), ProtocolError> {
2392        if !self.has_remaining() {
2393            return Ok(());
2394        }
2395
2396        let token_type_byte = self.data[self.position];
2397        let token_type = TokenType::from_u8(token_type_byte);
2398
2399        // Calculate how many bytes to skip based on token type
2400        let skip_amount = match token_type {
2401            // Fixed-size tokens
2402            Some(TokenType::Done) | Some(TokenType::DoneProc) | Some(TokenType::DoneInProc) => {
2403                1 + Done::SIZE // token type + 12 bytes
2404            }
2405            Some(TokenType::ReturnStatus) => {
2406                1 + 4 // token type + 4 bytes
2407            }
2408            // Variable-length tokens with 2-byte length prefix
2409            Some(TokenType::Error)
2410            | Some(TokenType::Info)
2411            | Some(TokenType::LoginAck)
2412            | Some(TokenType::EnvChange)
2413            | Some(TokenType::Order)
2414            | Some(TokenType::Sspi)
2415            | Some(TokenType::ColInfo)
2416            | Some(TokenType::TabName)
2417            | Some(TokenType::Offset)
2418            | Some(TokenType::ReturnValue) => {
2419                if self.remaining() < 3 {
2420                    return Err(ProtocolError::UnexpectedEof);
2421                }
2422                let length = u16::from_le_bytes([
2423                    self.data[self.position + 1],
2424                    self.data[self.position + 2],
2425                ]) as usize;
2426                1 + 2 + length // token type + length prefix + data
2427            }
2428            // Tokens with 4-byte length prefix
2429            Some(TokenType::SessionState) | Some(TokenType::FedAuthInfo) => {
2430                if self.remaining() < 5 {
2431                    return Err(ProtocolError::UnexpectedEof);
2432                }
2433                let length = u32::from_le_bytes([
2434                    self.data[self.position + 1],
2435                    self.data[self.position + 2],
2436                    self.data[self.position + 3],
2437                    self.data[self.position + 4],
2438                ]) as usize;
2439                1 + 4 + length
2440            }
2441            // FeatureExtAck has no length prefix - must parse
2442            Some(TokenType::FeatureExtAck) => {
2443                // Parse to find end
2444                let mut buf = &self.data[self.position + 1..];
2445                let _ = FeatureExtAck::decode(&mut buf)?;
2446                self.data.len() - self.position - buf.remaining()
2447            }
2448            // ColMetaData, Row, NbcRow require context and can't be easily skipped
2449            Some(TokenType::ColMetaData) | Some(TokenType::Row) | Some(TokenType::NbcRow) => {
2450                return Err(ProtocolError::InvalidTokenType(token_type_byte));
2451            }
2452            None => {
2453                return Err(ProtocolError::InvalidTokenType(token_type_byte));
2454            }
2455        };
2456
2457        if self.remaining() < skip_amount {
2458            return Err(ProtocolError::UnexpectedEof);
2459        }
2460
2461        self.position += skip_amount;
2462        Ok(())
2463    }
2464
2465    /// Get the current position in the buffer.
2466    #[must_use]
2467    pub fn position(&self) -> usize {
2468        self.position
2469    }
2470
2471    /// Reset the parser to the beginning.
2472    pub fn reset(&mut self) {
2473        self.position = 0;
2474    }
2475}
2476
2477// =============================================================================
2478// Tests
2479// =============================================================================
2480
2481#[cfg(test)]
2482#[allow(clippy::unwrap_used, clippy::panic)]
2483mod tests {
2484    use super::*;
2485    use bytes::BytesMut;
2486
2487    #[test]
2488    fn test_done_roundtrip() {
2489        let done = Done {
2490            status: DoneStatus {
2491                more: false,
2492                error: false,
2493                in_xact: false,
2494                count: true,
2495                attn: false,
2496                srverror: false,
2497            },
2498            cur_cmd: 193, // SELECT
2499            row_count: 42,
2500        };
2501
2502        let mut buf = BytesMut::new();
2503        done.encode(&mut buf);
2504
2505        // Skip the token type byte
2506        let mut cursor = &buf[1..];
2507        let decoded = Done::decode(&mut cursor).unwrap();
2508
2509        assert_eq!(decoded.status.count, done.status.count);
2510        assert_eq!(decoded.cur_cmd, done.cur_cmd);
2511        assert_eq!(decoded.row_count, done.row_count);
2512    }
2513
2514    #[test]
2515    fn test_done_status_bits() {
2516        let status = DoneStatus {
2517            more: true,
2518            error: true,
2519            in_xact: true,
2520            count: true,
2521            attn: false,
2522            srverror: false,
2523        };
2524
2525        let bits = status.to_bits();
2526        let restored = DoneStatus::from_bits(bits);
2527
2528        assert_eq!(status.more, restored.more);
2529        assert_eq!(status.error, restored.error);
2530        assert_eq!(status.in_xact, restored.in_xact);
2531        assert_eq!(status.count, restored.count);
2532    }
2533
2534    #[test]
2535    fn test_token_parser_done() {
2536        // DONE token: type (1) + status (2) + curcmd (2) + rowcount (8)
2537        let data = Bytes::from_static(&[
2538            0xFD, // DONE token type
2539            0x10, 0x00, // status: DONE_COUNT
2540            0xC1, 0x00, // cur_cmd: 193 (SELECT)
2541            0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count: 5
2542        ]);
2543
2544        let mut parser = TokenParser::new(data);
2545        let token = parser.next_token().unwrap().unwrap();
2546
2547        match token {
2548            Token::Done(done) => {
2549                assert!(done.status.count);
2550                assert!(!done.status.more);
2551                assert_eq!(done.cur_cmd, 193);
2552                assert_eq!(done.row_count, 5);
2553            }
2554            _ => panic!("Expected Done token"),
2555        }
2556
2557        // No more tokens
2558        assert!(parser.next_token().unwrap().is_none());
2559    }
2560
2561    #[test]
2562    fn test_env_change_type_from_u8() {
2563        assert_eq!(EnvChangeType::from_u8(1), Some(EnvChangeType::Database));
2564        assert_eq!(EnvChangeType::from_u8(20), Some(EnvChangeType::Routing));
2565        assert_eq!(EnvChangeType::from_u8(100), None);
2566    }
2567
2568    #[test]
2569    fn test_colmetadata_no_columns() {
2570        // No metadata marker (0xFFFF)
2571        let data = Bytes::from_static(&[0xFF, 0xFF]);
2572        let mut cursor: &[u8] = &data;
2573        let meta = ColMetaData::decode(&mut cursor).unwrap();
2574        assert!(meta.is_empty());
2575        assert_eq!(meta.column_count(), 0);
2576    }
2577
2578    #[test]
2579    fn test_colmetadata_single_int_column() {
2580        // COLMETADATA with 1 INT column
2581        // Format: column_count (2) + [user_type (4) + flags (2) + type_id (1) + name (b_varchar)]
2582        let mut data = BytesMut::new();
2583        data.extend_from_slice(&[0x01, 0x00]); // 1 column
2584        data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); // user_type = 0
2585        data.extend_from_slice(&[0x01, 0x00]); // flags (nullable)
2586        data.extend_from_slice(&[0x38]); // TypeId::Int4
2587        // Column name "id" as B_VARCHAR (1 byte length + UTF-16LE)
2588        data.extend_from_slice(&[0x02]); // 2 characters
2589        data.extend_from_slice(&[b'i', 0x00, b'd', 0x00]); // "id" in UTF-16LE
2590
2591        let mut cursor: &[u8] = &data;
2592        let meta = ColMetaData::decode(&mut cursor).unwrap();
2593
2594        assert_eq!(meta.column_count(), 1);
2595        assert_eq!(meta.columns[0].name, "id");
2596        assert_eq!(meta.columns[0].type_id, TypeId::Int4);
2597        assert!(meta.columns[0].is_nullable());
2598    }
2599
2600    #[test]
2601    fn test_colmetadata_nvarchar_column() {
2602        // COLMETADATA with 1 NVARCHAR(50) column
2603        let mut data = BytesMut::new();
2604        data.extend_from_slice(&[0x01, 0x00]); // 1 column
2605        data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); // user_type = 0
2606        data.extend_from_slice(&[0x01, 0x00]); // flags (nullable)
2607        data.extend_from_slice(&[0xE7]); // TypeId::NVarChar
2608        // Type info: max_length (2 bytes) + collation (5 bytes)
2609        data.extend_from_slice(&[0x64, 0x00]); // max_length = 100 (50 chars * 2)
2610        data.extend_from_slice(&[0x09, 0x04, 0xD0, 0x00, 0x34]); // collation
2611        // Column name "name"
2612        data.extend_from_slice(&[0x04]); // 4 characters
2613        data.extend_from_slice(&[b'n', 0x00, b'a', 0x00, b'm', 0x00, b'e', 0x00]);
2614
2615        let mut cursor: &[u8] = &data;
2616        let meta = ColMetaData::decode(&mut cursor).unwrap();
2617
2618        assert_eq!(meta.column_count(), 1);
2619        assert_eq!(meta.columns[0].name, "name");
2620        assert_eq!(meta.columns[0].type_id, TypeId::NVarChar);
2621        assert_eq!(meta.columns[0].type_info.max_length, Some(100));
2622        assert!(meta.columns[0].type_info.collation.is_some());
2623    }
2624
2625    #[test]
2626    fn test_raw_row_decode_int() {
2627        // Create metadata for a single INT column
2628        let metadata = ColMetaData {
2629            cek_table: None,
2630            columns: vec![ColumnData {
2631                name: "id".to_string(),
2632                type_id: TypeId::Int4,
2633                col_type: 0x38,
2634                flags: 0,
2635                user_type: 0,
2636                type_info: TypeInfo::default(),
2637                crypto_metadata: None,
2638            }],
2639        };
2640
2641        // Row data: just 4 bytes for the int value 42
2642        let data = Bytes::from_static(&[0x2A, 0x00, 0x00, 0x00]); // 42 in little-endian
2643        let mut cursor: &[u8] = &data;
2644        let row = RawRow::decode(&mut cursor, &metadata).unwrap();
2645
2646        // The raw data should contain the 4 bytes
2647        assert_eq!(row.data.len(), 4);
2648        assert_eq!(&row.data[..], &[0x2A, 0x00, 0x00, 0x00]);
2649    }
2650
2651    #[test]
2652    fn test_raw_row_decode_nullable_int() {
2653        // Create metadata for a nullable INT column (IntN)
2654        let metadata = ColMetaData {
2655            cek_table: None,
2656            columns: vec![ColumnData {
2657                name: "id".to_string(),
2658                type_id: TypeId::IntN,
2659                col_type: 0x26,
2660                flags: 0x01, // nullable
2661                user_type: 0,
2662                type_info: TypeInfo {
2663                    max_length: Some(4),
2664                    ..Default::default()
2665                },
2666                crypto_metadata: None,
2667            }],
2668        };
2669
2670        // Row data with value: 1 byte length + 4 bytes value
2671        let data = Bytes::from_static(&[0x04, 0x2A, 0x00, 0x00, 0x00]); // length=4, value=42
2672        let mut cursor: &[u8] = &data;
2673        let row = RawRow::decode(&mut cursor, &metadata).unwrap();
2674
2675        assert_eq!(row.data.len(), 5);
2676        assert_eq!(row.data[0], 4); // length
2677        assert_eq!(&row.data[1..], &[0x2A, 0x00, 0x00, 0x00]);
2678    }
2679
2680    #[test]
2681    fn test_raw_row_decode_null_value() {
2682        // Create metadata for a nullable INT column (IntN)
2683        let metadata = ColMetaData {
2684            cek_table: None,
2685            columns: vec![ColumnData {
2686                name: "id".to_string(),
2687                type_id: TypeId::IntN,
2688                col_type: 0x26,
2689                flags: 0x01, // nullable
2690                user_type: 0,
2691                type_info: TypeInfo {
2692                    max_length: Some(4),
2693                    ..Default::default()
2694                },
2695                crypto_metadata: None,
2696            }],
2697        };
2698
2699        // NULL value: length = 0xFF (for bytelen types)
2700        let data = Bytes::from_static(&[0xFF]);
2701        let mut cursor: &[u8] = &data;
2702        let row = RawRow::decode(&mut cursor, &metadata).unwrap();
2703
2704        assert_eq!(row.data.len(), 1);
2705        assert_eq!(row.data[0], 0xFF); // NULL marker
2706    }
2707
2708    #[test]
2709    fn test_nbcrow_null_bitmap() {
2710        let row = NbcRow {
2711            null_bitmap: vec![0b00000101], // columns 0 and 2 are NULL
2712            data: Bytes::new(),
2713        };
2714
2715        assert!(row.is_null(0));
2716        assert!(!row.is_null(1));
2717        assert!(row.is_null(2));
2718        assert!(!row.is_null(3));
2719    }
2720
2721    #[test]
2722    fn test_token_parser_colmetadata() {
2723        // Build a COLMETADATA token with 1 INT column
2724        let mut data = BytesMut::new();
2725        data.extend_from_slice(&[0x81]); // COLMETADATA token type
2726        data.extend_from_slice(&[0x01, 0x00]); // 1 column
2727        data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); // user_type = 0
2728        data.extend_from_slice(&[0x01, 0x00]); // flags (nullable)
2729        data.extend_from_slice(&[0x38]); // TypeId::Int4
2730        data.extend_from_slice(&[0x02]); // column name length
2731        data.extend_from_slice(&[b'i', 0x00, b'd', 0x00]); // "id"
2732
2733        let mut parser = TokenParser::new(data.freeze());
2734        let token = parser.next_token().unwrap().unwrap();
2735
2736        match token {
2737            Token::ColMetaData(meta) => {
2738                assert_eq!(meta.column_count(), 1);
2739                assert_eq!(meta.columns[0].name, "id");
2740                assert_eq!(meta.columns[0].type_id, TypeId::Int4);
2741            }
2742            _ => panic!("Expected ColMetaData token"),
2743        }
2744    }
2745
2746    #[test]
2747    fn test_token_parser_row_with_metadata() {
2748        // Build metadata
2749        let metadata = ColMetaData {
2750            cek_table: None,
2751            columns: vec![ColumnData {
2752                name: "id".to_string(),
2753                type_id: TypeId::Int4,
2754                col_type: 0x38,
2755                flags: 0,
2756                user_type: 0,
2757                type_info: TypeInfo::default(),
2758                crypto_metadata: None,
2759            }],
2760        };
2761
2762        // Build ROW token
2763        let mut data = BytesMut::new();
2764        data.extend_from_slice(&[0xD1]); // ROW token type
2765        data.extend_from_slice(&[0x2A, 0x00, 0x00, 0x00]); // value = 42
2766
2767        let mut parser = TokenParser::new(data.freeze());
2768        let token = parser
2769            .next_token_with_metadata(Some(&metadata))
2770            .unwrap()
2771            .unwrap();
2772
2773        match token {
2774            Token::Row(row) => {
2775                assert_eq!(row.data.len(), 4);
2776            }
2777            _ => panic!("Expected Row token"),
2778        }
2779    }
2780
2781    #[test]
2782    fn test_token_parser_row_without_metadata_fails() {
2783        // Build ROW token
2784        let mut data = BytesMut::new();
2785        data.extend_from_slice(&[0xD1]); // ROW token type
2786        data.extend_from_slice(&[0x2A, 0x00, 0x00, 0x00]); // value = 42
2787
2788        let mut parser = TokenParser::new(data.freeze());
2789        let result = parser.next_token(); // No metadata provided
2790
2791        assert!(result.is_err());
2792    }
2793
2794    #[test]
2795    fn test_token_parser_peek() {
2796        let data = Bytes::from_static(&[
2797            0xFD, // DONE token type
2798            0x10, 0x00, // status
2799            0xC1, 0x00, // cur_cmd
2800            0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count
2801        ]);
2802
2803        let parser = TokenParser::new(data);
2804        assert_eq!(parser.peek_token_type(), Some(TokenType::Done));
2805    }
2806
2807    #[test]
2808    fn test_column_data_fixed_size() {
2809        let col = ColumnData {
2810            name: String::new(),
2811            type_id: TypeId::Int4,
2812            col_type: 0x38,
2813            flags: 0,
2814            user_type: 0,
2815            type_info: TypeInfo::default(),
2816            crypto_metadata: None,
2817        };
2818        assert_eq!(col.fixed_size(), Some(4));
2819
2820        let col2 = ColumnData {
2821            name: String::new(),
2822            type_id: TypeId::NVarChar,
2823            col_type: 0xE7,
2824            flags: 0,
2825            user_type: 0,
2826            type_info: TypeInfo::default(),
2827            crypto_metadata: None,
2828        };
2829        assert_eq!(col2.fixed_size(), None);
2830    }
2831
2832    // ========================================================================
2833    // End-to-End Decode Tests (Wire → Stored → Verification)
2834    // ========================================================================
2835    //
2836    // These tests verify that RawRow::decode_column_value correctly stores
2837    // column values in a format that can be parsed back.
2838
2839    #[test]
2840    fn test_decode_nvarchar_then_intn_roundtrip() {
2841        // Simulate wire data for: "World" (NVarChar), 42 (IntN)
2842        // This tests the scenario from the MCP parameterized query
2843
2844        // Build wire data (what the server sends)
2845        let mut wire_data = BytesMut::new();
2846
2847        // Column 0: NVarChar "World" - 2-byte length prefix in bytes
2848        // "World" in UTF-16LE: W=0x0057, o=0x006F, r=0x0072, l=0x006C, d=0x0064
2849        let word = "World";
2850        let utf16: Vec<u16> = word.encode_utf16().collect();
2851        wire_data.put_u16_le((utf16.len() * 2) as u16); // byte length = 10
2852        for code_unit in &utf16 {
2853            wire_data.put_u16_le(*code_unit);
2854        }
2855
2856        // Column 1: IntN 42 - 1-byte length prefix
2857        wire_data.put_u8(4); // 4 bytes for INT
2858        wire_data.put_i32_le(42);
2859
2860        // Build column metadata
2861        let metadata = ColMetaData {
2862            cek_table: None,
2863            columns: vec![
2864                ColumnData {
2865                    name: "greeting".to_string(),
2866                    type_id: TypeId::NVarChar,
2867                    col_type: 0xE7,
2868                    flags: 0x01,
2869                    user_type: 0,
2870                    type_info: TypeInfo {
2871                        max_length: Some(10), // non-MAX
2872                        precision: None,
2873                        scale: None,
2874                        collation: None,
2875                    },
2876                    crypto_metadata: None,
2877                },
2878                ColumnData {
2879                    name: "number".to_string(),
2880                    type_id: TypeId::IntN,
2881                    col_type: 0x26,
2882                    flags: 0x01,
2883                    user_type: 0,
2884                    type_info: TypeInfo {
2885                        max_length: Some(4),
2886                        precision: None,
2887                        scale: None,
2888                        collation: None,
2889                    },
2890                    crypto_metadata: None,
2891                },
2892            ],
2893        };
2894
2895        // Decode the wire data into stored format
2896        let mut wire_cursor = wire_data.freeze();
2897        let raw_row = RawRow::decode(&mut wire_cursor, &metadata).unwrap();
2898
2899        // Verify wire data was fully consumed
2900        assert_eq!(
2901            wire_cursor.remaining(),
2902            0,
2903            "wire data should be fully consumed"
2904        );
2905
2906        // Now parse the stored data
2907        let mut stored_cursor: &[u8] = &raw_row.data;
2908
2909        // Parse column 0 (NVarChar)
2910        // Stored format for non-MAX NVarChar: [2-byte len][data]
2911        assert!(
2912            stored_cursor.remaining() >= 2,
2913            "need at least 2 bytes for length"
2914        );
2915        let len0 = stored_cursor.get_u16_le() as usize;
2916        assert_eq!(len0, 10, "NVarChar length should be 10 bytes");
2917        assert!(
2918            stored_cursor.remaining() >= len0,
2919            "need {len0} bytes for data"
2920        );
2921
2922        // Read UTF-16LE and convert to string
2923        let mut utf16_read = Vec::new();
2924        for _ in 0..(len0 / 2) {
2925            utf16_read.push(stored_cursor.get_u16_le());
2926        }
2927        let string0 = String::from_utf16(&utf16_read).unwrap();
2928        assert_eq!(string0, "World", "column 0 should be 'World'");
2929
2930        // Parse column 1 (IntN)
2931        // Stored format for IntN: [1-byte len][data]
2932        assert!(
2933            stored_cursor.remaining() >= 1,
2934            "need at least 1 byte for length"
2935        );
2936        let len1 = stored_cursor.get_u8();
2937        assert_eq!(len1, 4, "IntN length should be 4");
2938        assert!(stored_cursor.remaining() >= 4, "need 4 bytes for INT data");
2939        let int1 = stored_cursor.get_i32_le();
2940        assert_eq!(int1, 42, "column 1 should be 42");
2941
2942        // Verify stored data was fully consumed
2943        assert_eq!(
2944            stored_cursor.remaining(),
2945            0,
2946            "stored data should be fully consumed"
2947        );
2948    }
2949
2950    #[test]
2951    fn test_decode_nvarchar_max_then_intn_roundtrip() {
2952        // Test NVARCHAR(MAX) followed by IntN - uses PLP encoding
2953
2954        // Build wire data for PLP NVARCHAR(MAX) + IntN
2955        let mut wire_data = BytesMut::new();
2956
2957        // Column 0: NVARCHAR(MAX) "Hello" - PLP format
2958        // PLP: 8-byte total length, then chunks
2959        let word = "Hello";
2960        let utf16: Vec<u16> = word.encode_utf16().collect();
2961        let byte_len = (utf16.len() * 2) as u64;
2962
2963        wire_data.put_u64_le(byte_len); // total length = 10
2964        wire_data.put_u32_le(byte_len as u32); // chunk length = 10
2965        for code_unit in &utf16 {
2966            wire_data.put_u16_le(*code_unit);
2967        }
2968        wire_data.put_u32_le(0); // terminating zero-length chunk
2969
2970        // Column 1: IntN 99
2971        wire_data.put_u8(4);
2972        wire_data.put_i32_le(99);
2973
2974        // Build metadata with MAX type
2975        let metadata = ColMetaData {
2976            cek_table: None,
2977            columns: vec![
2978                ColumnData {
2979                    name: "text".to_string(),
2980                    type_id: TypeId::NVarChar,
2981                    col_type: 0xE7,
2982                    flags: 0x01,
2983                    user_type: 0,
2984                    type_info: TypeInfo {
2985                        max_length: Some(0xFFFF), // MAX indicator
2986                        precision: None,
2987                        scale: None,
2988                        collation: None,
2989                    },
2990                    crypto_metadata: None,
2991                },
2992                ColumnData {
2993                    name: "num".to_string(),
2994                    type_id: TypeId::IntN,
2995                    col_type: 0x26,
2996                    flags: 0x01,
2997                    user_type: 0,
2998                    type_info: TypeInfo {
2999                        max_length: Some(4),
3000                        precision: None,
3001                        scale: None,
3002                        collation: None,
3003                    },
3004                    crypto_metadata: None,
3005                },
3006            ],
3007        };
3008
3009        // Decode wire data
3010        let mut wire_cursor = wire_data.freeze();
3011        let raw_row = RawRow::decode(&mut wire_cursor, &metadata).unwrap();
3012
3013        // Verify wire data was fully consumed
3014        assert_eq!(
3015            wire_cursor.remaining(),
3016            0,
3017            "wire data should be fully consumed"
3018        );
3019
3020        // Parse stored PLP data for column 0
3021        let mut stored_cursor: &[u8] = &raw_row.data;
3022
3023        // PLP stored format: [8-byte total][chunks...][4-byte 0]
3024        let total_len = stored_cursor.get_u64_le();
3025        assert_eq!(total_len, 10, "PLP total length should be 10");
3026
3027        let chunk_len = stored_cursor.get_u32_le();
3028        assert_eq!(chunk_len, 10, "PLP chunk length should be 10");
3029
3030        let mut utf16_read = Vec::new();
3031        for _ in 0..(chunk_len / 2) {
3032            utf16_read.push(stored_cursor.get_u16_le());
3033        }
3034        let string0 = String::from_utf16(&utf16_read).unwrap();
3035        assert_eq!(string0, "Hello", "column 0 should be 'Hello'");
3036
3037        let terminator = stored_cursor.get_u32_le();
3038        assert_eq!(terminator, 0, "PLP should end with 0");
3039
3040        // Parse IntN
3041        let len1 = stored_cursor.get_u8();
3042        assert_eq!(len1, 4);
3043        let int1 = stored_cursor.get_i32_le();
3044        assert_eq!(int1, 99, "column 1 should be 99");
3045
3046        // Verify fully consumed
3047        assert_eq!(
3048            stored_cursor.remaining(),
3049            0,
3050            "stored data should be fully consumed"
3051        );
3052    }
3053
3054    // ========================================================================
3055    // ReturnStatus Token Tests
3056    // ========================================================================
3057
3058    #[test]
3059    fn test_return_status_via_parser() {
3060        // RETURNSTATUS token: type (0x79) + value (i32 LE)
3061        let data = Bytes::from_static(&[
3062            0x79, // RETURNSTATUS token type
3063            0x00, 0x00, 0x00, 0x00, // return value = 0 (success)
3064        ]);
3065
3066        let mut parser = TokenParser::new(data);
3067        let token = parser.next_token().unwrap().unwrap();
3068
3069        match token {
3070            Token::ReturnStatus(status) => {
3071                assert_eq!(status, 0);
3072            }
3073            _ => panic!("Expected ReturnStatus token, got {token:?}"),
3074        }
3075
3076        assert!(parser.next_token().unwrap().is_none());
3077    }
3078
3079    #[test]
3080    fn test_return_status_nonzero() {
3081        // Return value = -6 (common for error returns)
3082        let mut buf = BytesMut::new();
3083        buf.put_u8(0x79); // RETURNSTATUS
3084        buf.put_i32_le(-6);
3085
3086        let mut parser = TokenParser::new(buf.freeze());
3087        let token = parser.next_token().unwrap().unwrap();
3088
3089        match token {
3090            Token::ReturnStatus(status) => {
3091                assert_eq!(status, -6);
3092            }
3093            _ => panic!("Expected ReturnStatus token"),
3094        }
3095    }
3096
3097    // ========================================================================
3098    // DoneProc Token Tests
3099    // ========================================================================
3100
3101    #[test]
3102    fn test_done_proc_roundtrip() {
3103        let done = DoneProc {
3104            status: DoneStatus {
3105                more: false,
3106                error: false,
3107                in_xact: false,
3108                count: true,
3109                attn: false,
3110                srverror: false,
3111            },
3112            cur_cmd: 0x00C6, // EXECUTE (198)
3113            row_count: 100,
3114        };
3115
3116        let mut buf = BytesMut::new();
3117        done.encode(&mut buf);
3118
3119        // Verify token type byte
3120        assert_eq!(buf[0], 0xFE);
3121
3122        // Skip token type byte and decode
3123        let mut cursor = &buf[1..];
3124        let decoded = DoneProc::decode(&mut cursor).unwrap();
3125
3126        assert!(decoded.status.count);
3127        assert!(!decoded.status.more);
3128        assert!(!decoded.status.error);
3129        assert_eq!(decoded.cur_cmd, 0x00C6);
3130        assert_eq!(decoded.row_count, 100);
3131    }
3132
3133    #[test]
3134    fn test_done_proc_via_parser() {
3135        let data = Bytes::from_static(&[
3136            0xFE, // DONEPROC token type
3137            0x00, 0x00, // status: no flags
3138            0xC6, 0x00, // cur_cmd: EXECUTE (198)
3139            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count: 0
3140        ]);
3141
3142        let mut parser = TokenParser::new(data);
3143        let token = parser.next_token().unwrap().unwrap();
3144
3145        match token {
3146            Token::DoneProc(done) => {
3147                assert!(!done.status.count);
3148                assert!(!done.status.more);
3149                assert_eq!(done.cur_cmd, 198);
3150                assert_eq!(done.row_count, 0);
3151            }
3152            _ => panic!("Expected DoneProc token"),
3153        }
3154    }
3155
3156    #[test]
3157    fn test_done_proc_with_error_flag() {
3158        let mut buf = BytesMut::new();
3159        buf.put_u8(0xFE); // DONEPROC
3160        buf.put_u16_le(0x0002); // status: DONE_ERROR
3161        buf.put_u16_le(0x00C6); // cur_cmd: EXECUTE
3162        buf.put_u64_le(0); // row_count
3163
3164        let mut parser = TokenParser::new(buf.freeze());
3165        let token = parser.next_token().unwrap().unwrap();
3166
3167        match token {
3168            Token::DoneProc(done) => {
3169                assert!(done.status.error);
3170                assert!(!done.status.count);
3171                assert!(!done.status.more);
3172            }
3173            _ => panic!("Expected DoneProc token"),
3174        }
3175    }
3176
3177    // ========================================================================
3178    // DoneInProc Token Tests
3179    // ========================================================================
3180
3181    #[test]
3182    fn test_done_in_proc_roundtrip() {
3183        let done = DoneInProc {
3184            status: DoneStatus {
3185                more: true,
3186                error: false,
3187                in_xact: false,
3188                count: true,
3189                attn: false,
3190                srverror: false,
3191            },
3192            cur_cmd: 193, // SELECT
3193            row_count: 7,
3194        };
3195
3196        let mut buf = BytesMut::new();
3197        done.encode(&mut buf);
3198
3199        assert_eq!(buf[0], 0xFF);
3200
3201        let mut cursor = &buf[1..];
3202        let decoded = DoneInProc::decode(&mut cursor).unwrap();
3203
3204        assert!(decoded.status.more);
3205        assert!(decoded.status.count);
3206        assert!(!decoded.status.error);
3207        assert_eq!(decoded.cur_cmd, 193);
3208        assert_eq!(decoded.row_count, 7);
3209    }
3210
3211    #[test]
3212    fn test_done_in_proc_via_parser() {
3213        let data = Bytes::from_static(&[
3214            0xFF, // DONEINPROC token type
3215            0x11, 0x00, // status: MORE | COUNT
3216            0xC1, 0x00, // cur_cmd: SELECT (193)
3217            0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count: 3
3218        ]);
3219
3220        let mut parser = TokenParser::new(data);
3221        let token = parser.next_token().unwrap().unwrap();
3222
3223        match token {
3224            Token::DoneInProc(done) => {
3225                assert!(done.status.more);
3226                assert!(done.status.count);
3227                assert_eq!(done.cur_cmd, 193);
3228                assert_eq!(done.row_count, 3);
3229            }
3230            _ => panic!("Expected DoneInProc token"),
3231        }
3232    }
3233
3234    // ========================================================================
3235    // ServerError Token Tests
3236    // ========================================================================
3237
3238    #[test]
3239    fn test_server_error_decode() {
3240        // Build a realistic ERROR token (without the 0xAA type byte,
3241        // since decode() is called after the parser strips it).
3242        let mut buf = BytesMut::new();
3243
3244        // Construct the message fields first to compute length
3245        let msg_utf16: Vec<u16> = "Invalid column name 'foo'.".encode_utf16().collect();
3246        let srv_utf16: Vec<u16> = "SQLDB01".encode_utf16().collect();
3247        let proc_utf16: Vec<u16> = "".encode_utf16().collect();
3248
3249        // Length = number(4) + state(1) + class(1)
3250        //        + us_varchar(message): 2 + msg_utf16.len()*2
3251        //        + b_varchar(server): 1 + srv_utf16.len()*2
3252        //        + b_varchar(procedure): 1 + proc_utf16.len()*2
3253        //        + line(4)
3254        let length: u16 = (4
3255            + 1
3256            + 1
3257            + 2
3258            + (msg_utf16.len() * 2)
3259            + 1
3260            + (srv_utf16.len() * 2)
3261            + 1
3262            + (proc_utf16.len() * 2)
3263            + 4) as u16;
3264
3265        buf.put_u16_le(length);
3266        buf.put_i32_le(207); // error number: Invalid column
3267        buf.put_u8(1); // state
3268        buf.put_u8(16); // class (severity 16)
3269
3270        // Message (US_VARCHAR: 2-byte char count + UTF-16LE)
3271        buf.put_u16_le(msg_utf16.len() as u16);
3272        for &c in &msg_utf16 {
3273            buf.put_u16_le(c);
3274        }
3275
3276        // Server (B_VARCHAR: 1-byte char count + UTF-16LE)
3277        buf.put_u8(srv_utf16.len() as u8);
3278        for &c in &srv_utf16 {
3279            buf.put_u16_le(c);
3280        }
3281
3282        // Procedure (B_VARCHAR: empty)
3283        buf.put_u8(proc_utf16.len() as u8);
3284
3285        // Line number
3286        buf.put_i32_le(42);
3287
3288        let mut cursor = buf.freeze();
3289        let error = ServerError::decode(&mut cursor).unwrap();
3290
3291        assert_eq!(error.number, 207);
3292        assert_eq!(error.state, 1);
3293        assert_eq!(error.class, 16);
3294        assert_eq!(error.message, "Invalid column name 'foo'.");
3295        assert_eq!(error.server, "SQLDB01");
3296        assert_eq!(error.procedure, "");
3297        assert_eq!(error.line, 42);
3298    }
3299
3300    #[test]
3301    fn test_server_error_severity_helpers() {
3302        let fatal = ServerError {
3303            number: 4014,
3304            state: 1,
3305            class: 20,
3306            message: "Fatal error".to_string(),
3307            server: String::new(),
3308            procedure: String::new(),
3309            line: 0,
3310        };
3311        assert!(fatal.is_fatal());
3312        assert!(fatal.is_batch_abort());
3313
3314        let batch_abort = ServerError {
3315            number: 547,
3316            state: 0,
3317            class: 16,
3318            message: "Constraint violation".to_string(),
3319            server: String::new(),
3320            procedure: String::new(),
3321            line: 1,
3322        };
3323        assert!(!batch_abort.is_fatal());
3324        assert!(batch_abort.is_batch_abort());
3325
3326        let informational = ServerError {
3327            number: 5701,
3328            state: 2,
3329            class: 10,
3330            message: "Changed db context".to_string(),
3331            server: String::new(),
3332            procedure: String::new(),
3333            line: 0,
3334        };
3335        assert!(!informational.is_fatal());
3336        assert!(!informational.is_batch_abort());
3337    }
3338
3339    #[test]
3340    fn test_server_error_via_parser() {
3341        // Build an ERROR token with the 0xAA type byte for the parser
3342        let mut buf = BytesMut::new();
3343        buf.put_u8(0xAA); // ERROR token type
3344
3345        let msg_utf16: Vec<u16> = "Syntax error".encode_utf16().collect();
3346        let srv_utf16: Vec<u16> = "SRV".encode_utf16().collect();
3347        let proc_utf16: Vec<u16> = "sp_test".encode_utf16().collect();
3348
3349        let length: u16 = (4
3350            + 1
3351            + 1
3352            + 2
3353            + (msg_utf16.len() * 2)
3354            + 1
3355            + (srv_utf16.len() * 2)
3356            + 1
3357            + (proc_utf16.len() * 2)
3358            + 4) as u16;
3359
3360        buf.put_u16_le(length);
3361        buf.put_i32_le(102); // Syntax error
3362        buf.put_u8(1);
3363        buf.put_u8(15);
3364
3365        buf.put_u16_le(msg_utf16.len() as u16);
3366        for &c in &msg_utf16 {
3367            buf.put_u16_le(c);
3368        }
3369        buf.put_u8(srv_utf16.len() as u8);
3370        for &c in &srv_utf16 {
3371            buf.put_u16_le(c);
3372        }
3373        buf.put_u8(proc_utf16.len() as u8);
3374        for &c in &proc_utf16 {
3375            buf.put_u16_le(c);
3376        }
3377        buf.put_i32_le(5);
3378
3379        let mut parser = TokenParser::new(buf.freeze());
3380        let token = parser.next_token().unwrap().unwrap();
3381
3382        match token {
3383            Token::Error(err) => {
3384                assert_eq!(err.number, 102);
3385                assert_eq!(err.class, 15);
3386                assert_eq!(err.message, "Syntax error");
3387                assert_eq!(err.server, "SRV");
3388                assert_eq!(err.procedure, "sp_test");
3389                assert_eq!(err.line, 5);
3390            }
3391            _ => panic!("Expected Error token"),
3392        }
3393    }
3394
3395    // ========================================================================
3396    // ReturnValue Token Tests
3397    // ========================================================================
3398
3399    /// Helper: build a ReturnValue token (without the 0xAC type byte)
3400    /// for an IntN output parameter.
3401    fn build_return_value_intn(
3402        ordinal: u16,
3403        name: &str,
3404        status: u8,
3405        value: Option<i32>,
3406    ) -> BytesMut {
3407        let mut inner = BytesMut::new();
3408
3409        // param_ordinal
3410        inner.put_u16_le(ordinal);
3411
3412        // param_name (B_VARCHAR)
3413        let name_utf16: Vec<u16> = name.encode_utf16().collect();
3414        inner.put_u8(name_utf16.len() as u8);
3415        for &c in &name_utf16 {
3416            inner.put_u16_le(c);
3417        }
3418
3419        // status
3420        inner.put_u8(status);
3421
3422        // user_type (4 bytes)
3423        inner.put_u32_le(0);
3424
3425        // flags (2 bytes)
3426        inner.put_u16_le(0x0001); // nullable
3427
3428        // type_id: IntN = 0x26
3429        inner.put_u8(0x26);
3430
3431        // type_info for IntN: 1-byte max_length
3432        inner.put_u8(4);
3433
3434        // value (TYPE_VARBYTE for IntN: 1-byte length + data)
3435        match value {
3436            Some(v) => {
3437                inner.put_u8(4); // length = 4
3438                inner.put_i32_le(v);
3439            }
3440            None => {
3441                inner.put_u8(0); // length = 0 means NULL
3442            }
3443        }
3444
3445        // RETURNVALUE has no outer length prefix (MS-TDS §2.2.7.18) — the
3446        // decoder walks the inner fields directly after the 0xAC token byte.
3447        inner
3448    }
3449
3450    #[test]
3451    fn test_return_value_int_output() {
3452        let buf = build_return_value_intn(1, "@result", 0x01, Some(42));
3453        let mut cursor = buf.freeze();
3454        let rv = ReturnValue::decode(&mut cursor).unwrap();
3455
3456        assert_eq!(rv.param_ordinal, 1);
3457        assert_eq!(rv.param_name, "@result");
3458        assert_eq!(rv.status, 0x01); // OUTPUT
3459        assert_eq!(rv.col_type, 0x26); // IntN
3460        assert_eq!(rv.type_info.max_length, Some(4));
3461        // Value should contain: length byte (4) + i32 LE (42)
3462        assert_eq!(rv.value.len(), 5);
3463        assert_eq!(rv.value[0], 4);
3464        assert_eq!(
3465            i32::from_le_bytes([rv.value[1], rv.value[2], rv.value[3], rv.value[4]]),
3466            42
3467        );
3468    }
3469
3470    #[test]
3471    fn test_return_value_null_output() {
3472        let buf = build_return_value_intn(2, "@count", 0x01, None);
3473        let mut cursor = buf.freeze();
3474        let rv = ReturnValue::decode(&mut cursor).unwrap();
3475
3476        assert_eq!(rv.param_ordinal, 2);
3477        assert_eq!(rv.param_name, "@count");
3478        assert_eq!(rv.status, 0x01);
3479        assert_eq!(rv.col_type, 0x26);
3480        // NULL value: length byte = 0
3481        assert_eq!(rv.value.len(), 1);
3482        assert_eq!(rv.value[0], 0);
3483    }
3484
3485    #[test]
3486    fn test_return_value_udf_status() {
3487        // UDF return value has status = 0x02
3488        let buf = build_return_value_intn(0, "@RETURN_VALUE", 0x02, Some(-1));
3489        let mut cursor = buf.freeze();
3490        let rv = ReturnValue::decode(&mut cursor).unwrap();
3491
3492        assert_eq!(rv.param_ordinal, 0);
3493        assert_eq!(rv.param_name, "@RETURN_VALUE");
3494        assert_eq!(rv.status, 0x02); // UDF return value
3495        assert_eq!(rv.value[0], 4);
3496        assert_eq!(
3497            i32::from_le_bytes([rv.value[1], rv.value[2], rv.value[3], rv.value[4]]),
3498            -1
3499        );
3500    }
3501
3502    #[test]
3503    fn test_return_value_nvarchar_output() {
3504        // Build a ReturnValue for NVARCHAR(100) output parameter
3505        let mut inner = BytesMut::new();
3506
3507        // param_ordinal
3508        inner.put_u16_le(1);
3509
3510        // param_name "@name"
3511        let name_utf16: Vec<u16> = "@name".encode_utf16().collect();
3512        inner.put_u8(name_utf16.len() as u8);
3513        for &c in &name_utf16 {
3514            inner.put_u16_le(c);
3515        }
3516
3517        // status = OUTPUT
3518        inner.put_u8(0x01);
3519        // user_type
3520        inner.put_u32_le(0);
3521        // flags (nullable)
3522        inner.put_u16_le(0x0001);
3523        // type_id: NVarChar = 0xE7
3524        inner.put_u8(0xE7);
3525        // type_info for NVarChar: 2-byte max_length + 5-byte collation
3526        inner.put_u16_le(200); // max 100 chars * 2 bytes
3527        inner.put_u32_le(0x0904D000); // collation LCID
3528        inner.put_u8(0x34); // collation sort_id
3529
3530        // value: "Hello" in UTF-16LE with 2-byte length prefix
3531        let val_utf16: Vec<u16> = "Hello".encode_utf16().collect();
3532        let byte_len = (val_utf16.len() * 2) as u16;
3533        inner.put_u16_le(byte_len);
3534        for &c in &val_utf16 {
3535            inner.put_u16_le(c);
3536        }
3537
3538        let mut cursor = inner.freeze();
3539        let rv = ReturnValue::decode(&mut cursor).unwrap();
3540
3541        assert_eq!(rv.param_ordinal, 1);
3542        assert_eq!(rv.param_name, "@name");
3543        assert_eq!(rv.status, 0x01);
3544        assert_eq!(rv.col_type, 0xE7); // NVarChar
3545        assert_eq!(rv.type_info.max_length, Some(200));
3546        assert!(rv.type_info.collation.is_some());
3547
3548        // Value: 2-byte length (10) + "Hello" in UTF-16LE
3549        assert_eq!(rv.value.len(), 12); // 2 + 10
3550        let val_len = u16::from_le_bytes([rv.value[0], rv.value[1]]);
3551        assert_eq!(val_len, 10);
3552    }
3553
3554    #[test]
3555    fn test_return_value_via_parser() {
3556        // Build a full ReturnValue token with the 0xAC type byte
3557        let mut data = BytesMut::new();
3558        data.put_u8(0xAC); // RETURNVALUE token type
3559        data.extend_from_slice(&build_return_value_intn(0, "@out", 0x01, Some(99)));
3560
3561        let mut parser = TokenParser::new(data.freeze());
3562        let token = parser.next_token().unwrap().unwrap();
3563
3564        match token {
3565            Token::ReturnValue(rv) => {
3566                assert_eq!(rv.param_name, "@out");
3567                assert_eq!(rv.param_ordinal, 0);
3568                assert_eq!(rv.status, 0x01);
3569                assert_eq!(rv.col_type, 0x26);
3570            }
3571            _ => panic!("Expected ReturnValue token"),
3572        }
3573    }
3574
3575    // ========================================================================
3576    // Multi-Token Stream Tests
3577    // ========================================================================
3578
3579    #[test]
3580    fn test_multi_token_stored_proc_response() {
3581        // Simulate a stored procedure response:
3582        // DoneInProc (result set done) → ReturnStatus → DoneProc
3583        let mut data = BytesMut::new();
3584
3585        // Token 1: DONEINPROC — result set with 3 rows
3586        data.put_u8(0xFF); // DONEINPROC
3587        data.put_u16_le(0x0010); // status: COUNT
3588        data.put_u16_le(0x00C1); // cur_cmd: SELECT
3589        data.put_u64_le(3); // row_count
3590
3591        // Token 2: RETURNSTATUS — procedure returned 0
3592        data.put_u8(0x79); // RETURNSTATUS
3593        data.put_i32_le(0);
3594
3595        // Token 3: DONEPROC — final
3596        data.put_u8(0xFE); // DONEPROC
3597        data.put_u16_le(0x0000); // status: no flags
3598        data.put_u16_le(0x00C6); // cur_cmd: EXECUTE
3599        data.put_u64_le(0);
3600
3601        let mut parser = TokenParser::new(data.freeze());
3602
3603        // Token 1: DoneInProc
3604        let t1 = parser.next_token().unwrap().unwrap();
3605        match t1 {
3606            Token::DoneInProc(done) => {
3607                assert!(done.status.count);
3608                assert_eq!(done.row_count, 3);
3609                assert_eq!(done.cur_cmd, 193);
3610            }
3611            _ => panic!("Expected DoneInProc, got {t1:?}"),
3612        }
3613
3614        // Token 2: ReturnStatus
3615        let t2 = parser.next_token().unwrap().unwrap();
3616        match t2 {
3617            Token::ReturnStatus(status) => {
3618                assert_eq!(status, 0);
3619            }
3620            _ => panic!("Expected ReturnStatus, got {t2:?}"),
3621        }
3622
3623        // Token 3: DoneProc
3624        let t3 = parser.next_token().unwrap().unwrap();
3625        match t3 {
3626            Token::DoneProc(done) => {
3627                assert!(!done.status.count);
3628                assert!(!done.status.more);
3629                assert_eq!(done.cur_cmd, 198);
3630            }
3631            _ => panic!("Expected DoneProc, got {t3:?}"),
3632        }
3633
3634        // No more tokens
3635        assert!(parser.next_token().unwrap().is_none());
3636    }
3637
3638    #[test]
3639    fn test_multi_token_error_in_stream() {
3640        // Simulate: ERROR → DONE (error during query)
3641        let mut data = BytesMut::new();
3642
3643        // Token 1: ERROR
3644        data.put_u8(0xAA);
3645
3646        let msg_utf16: Vec<u16> = "Deadlock".encode_utf16().collect();
3647        let srv_utf16: Vec<u16> = "DB1".encode_utf16().collect();
3648
3649        let length: u16 = (4 + 1 + 1
3650            + 2 + (msg_utf16.len() * 2)
3651            + 1 + (srv_utf16.len() * 2)
3652            + 1  // empty procedure
3653            + 4) as u16;
3654
3655        data.put_u16_le(length);
3656        data.put_i32_le(1205); // deadlock
3657        data.put_u8(51); // state
3658        data.put_u8(13); // class
3659
3660        data.put_u16_le(msg_utf16.len() as u16);
3661        for &c in &msg_utf16 {
3662            data.put_u16_le(c);
3663        }
3664        data.put_u8(srv_utf16.len() as u8);
3665        for &c in &srv_utf16 {
3666            data.put_u16_le(c);
3667        }
3668        data.put_u8(0); // empty procedure
3669        data.put_i32_le(0);
3670
3671        // Token 2: DONE with error flag
3672        data.put_u8(0xFD);
3673        data.put_u16_le(0x0002); // DONE_ERROR
3674        data.put_u16_le(0x00C1); // SELECT
3675        data.put_u64_le(0);
3676
3677        let mut parser = TokenParser::new(data.freeze());
3678
3679        // Token 1: Error
3680        let t1 = parser.next_token().unwrap().unwrap();
3681        match t1 {
3682            Token::Error(err) => {
3683                assert_eq!(err.number, 1205);
3684                assert_eq!(err.class, 13);
3685                assert_eq!(err.message, "Deadlock");
3686                assert_eq!(err.server, "DB1");
3687            }
3688            _ => panic!("Expected Error token, got {t1:?}"),
3689        }
3690
3691        // Token 2: Done with error
3692        let t2 = parser.next_token().unwrap().unwrap();
3693        match t2 {
3694            Token::Done(done) => {
3695                assert!(done.status.error);
3696                assert!(!done.status.count);
3697            }
3698            _ => panic!("Expected Done token, got {t2:?}"),
3699        }
3700
3701        assert!(parser.next_token().unwrap().is_none());
3702    }
3703
3704    #[test]
3705    fn test_multi_token_proc_with_return_value() {
3706        // Simulate stored proc: ReturnValue → ReturnStatus → DoneProc
3707        let mut data = BytesMut::new();
3708
3709        // Token 1: ReturnValue (@result = 42)
3710        data.put_u8(0xAC);
3711        data.extend_from_slice(&build_return_value_intn(1, "@result", 0x01, Some(42)));
3712
3713        // Token 2: ReturnStatus = 0
3714        data.put_u8(0x79);
3715        data.put_i32_le(0);
3716
3717        // Token 3: DoneProc
3718        data.put_u8(0xFE);
3719        data.put_u16_le(0x0000);
3720        data.put_u16_le(0x00C6);
3721        data.put_u64_le(0);
3722
3723        let mut parser = TokenParser::new(data.freeze());
3724
3725        let t1 = parser.next_token().unwrap().unwrap();
3726        match t1 {
3727            Token::ReturnValue(rv) => {
3728                assert_eq!(rv.param_name, "@result");
3729                assert_eq!(rv.param_ordinal, 1);
3730            }
3731            _ => panic!("Expected ReturnValue, got {t1:?}"),
3732        }
3733
3734        let t2 = parser.next_token().unwrap().unwrap();
3735        assert!(matches!(t2, Token::ReturnStatus(0)));
3736
3737        let t3 = parser.next_token().unwrap().unwrap();
3738        assert!(matches!(t3, Token::DoneProc(_)));
3739
3740        assert!(parser.next_token().unwrap().is_none());
3741    }
3742
3743    // ========================================================================
3744    // EOF / Truncation Edge Cases
3745    // ========================================================================
3746
3747    #[test]
3748    fn test_return_status_truncated() {
3749        // Only 3 bytes instead of 4 for i32
3750        let data = Bytes::from_static(&[0x79, 0x01, 0x02, 0x03]);
3751        let mut parser = TokenParser::new(data);
3752        assert!(parser.next_token().is_err());
3753    }
3754
3755    #[test]
3756    fn test_done_proc_truncated() {
3757        // Only 8 bytes instead of 12
3758        let data = Bytes::from_static(&[0xFE, 0x00, 0x00, 0xC1, 0x00, 0x01, 0x00, 0x00, 0x00]);
3759        let mut parser = TokenParser::new(data);
3760        assert!(parser.next_token().is_err());
3761    }
3762
3763    #[test]
3764    fn test_server_error_truncated() {
3765        // ERROR token with only the length field (body truncated)
3766        let data = Bytes::from_static(&[0xAA, 0x20, 0x00]);
3767        let mut parser = TokenParser::new(data);
3768        assert!(parser.next_token().is_err());
3769    }
3770}