Skip to main content

tds_protocol/
token.rs

1//! TDS token stream definitions.
2//!
3//! Tokens are the fundamental units of TDS response data. The server sends
4//! a stream of tokens that describe metadata, rows, errors, and other information.
5//!
6//! ## Token Structure
7//!
8//! Each token begins with a 1-byte token type identifier, followed by
9//! token-specific data. Some tokens have fixed lengths, while others
10//! have length prefixes.
11//!
12//! ## Usage
13//!
14//! ```rust,no_run
15//! use tds_protocol::token::{Token, TokenParser};
16//! use bytes::Bytes;
17//!
18//! fn parse(data: Bytes) -> Result<(), tds_protocol::ProtocolError> {
19//!     let mut parser = TokenParser::new(data);
20//!
21//!     while let Some(token) = parser.next_token()? {
22//!         match token {
23//!             Token::Done(done) => println!("Rows affected: {}", done.row_count),
24//!             Token::Error(err) => eprintln!("Error {}: {}", err.number, err.message),
25//!             _ => {}
26//!         }
27//!     }
28//!     Ok(())
29//! }
30//! ```
31
32use bytes::{Buf, BufMut, Bytes};
33
34use crate::codec::{read_b_varchar, read_us_varchar};
35use crate::error::ProtocolError;
36use crate::prelude::*;
37use crate::types::TypeId;
38
39/// Token type identifier.
40#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
41#[repr(u8)]
42#[non_exhaustive]
43pub enum TokenType {
44    /// Column metadata (COLMETADATA).
45    ColMetaData = 0x81,
46    /// Error message (ERROR).
47    Error = 0xAA,
48    /// Informational message (INFO).
49    Info = 0xAB,
50    /// Login acknowledgment (LOGINACK).
51    LoginAck = 0xAD,
52    /// Row data (ROW).
53    Row = 0xD1,
54    /// Null bitmap compressed row (NBCROW).
55    NbcRow = 0xD2,
56    /// Environment change (ENVCHANGE).
57    EnvChange = 0xE3,
58    /// SSPI authentication (SSPI).
59    Sspi = 0xED,
60    /// Done (DONE).
61    Done = 0xFD,
62    /// Done in procedure (DONEINPROC).
63    DoneInProc = 0xFF,
64    /// Done procedure (DONEPROC).
65    DoneProc = 0xFE,
66    /// Return status (RETURNSTATUS).
67    ReturnStatus = 0x79,
68    /// Return value (RETURNVALUE).
69    ReturnValue = 0xAC,
70    /// Order (ORDER).
71    Order = 0xA9,
72    /// Feature extension acknowledgment (FEATUREEXTACK).
73    FeatureExtAck = 0xAE,
74    /// Session state (SESSIONSTATE).
75    SessionState = 0xE4,
76    /// Federated authentication info (FEDAUTHINFO).
77    FedAuthInfo = 0xEE,
78    /// Column info (COLINFO).
79    ColInfo = 0xA5,
80    /// Table name (TABNAME).
81    TabName = 0xA4,
82    /// Offset (OFFSET).
83    Offset = 0x78,
84}
85
86impl TokenType {
87    /// Create a token type from a raw byte.
88    pub fn from_u8(value: u8) -> Option<Self> {
89        match value {
90            0x81 => Some(Self::ColMetaData),
91            0xAA => Some(Self::Error),
92            0xAB => Some(Self::Info),
93            0xAD => Some(Self::LoginAck),
94            0xD1 => Some(Self::Row),
95            0xD2 => Some(Self::NbcRow),
96            0xE3 => Some(Self::EnvChange),
97            0xED => Some(Self::Sspi),
98            0xFD => Some(Self::Done),
99            0xFF => Some(Self::DoneInProc),
100            0xFE => Some(Self::DoneProc),
101            0x79 => Some(Self::ReturnStatus),
102            0xAC => Some(Self::ReturnValue),
103            0xA9 => Some(Self::Order),
104            0xAE => Some(Self::FeatureExtAck),
105            0xE4 => Some(Self::SessionState),
106            0xEE => Some(Self::FedAuthInfo),
107            0xA5 => Some(Self::ColInfo),
108            0xA4 => Some(Self::TabName),
109            0x78 => Some(Self::Offset),
110            _ => None,
111        }
112    }
113}
114
115/// Parsed TDS token.
116///
117/// This enum represents all possible tokens that can be received from SQL Server.
118/// Each variant contains the parsed token data.
119#[derive(Debug, Clone)]
120#[non_exhaustive]
121pub enum Token {
122    /// Column metadata describing result set structure.
123    ColMetaData(ColMetaData),
124    /// Row data.
125    Row(RawRow),
126    /// Null bitmap compressed row.
127    NbcRow(NbcRow),
128    /// Completion of a SQL statement.
129    Done(Done),
130    /// Completion of a stored procedure.
131    DoneProc(DoneProc),
132    /// Completion within a stored procedure.
133    DoneInProc(DoneInProc),
134    /// Return status from stored procedure.
135    ReturnStatus(i32),
136    /// Return value from stored procedure.
137    ReturnValue(ReturnValue),
138    /// Error message from server.
139    Error(ServerError),
140    /// Informational message from server.
141    Info(ServerInfo),
142    /// Login acknowledgment.
143    LoginAck(LoginAck),
144    /// Environment change notification.
145    EnvChange(EnvChange),
146    /// Column ordering information.
147    Order(Order),
148    /// Feature extension acknowledgment.
149    FeatureExtAck(FeatureExtAck),
150    /// SSPI authentication data.
151    Sspi(SspiToken),
152    /// Session state information.
153    SessionState(SessionState),
154    /// Federated authentication info.
155    FedAuthInfo(FedAuthInfo),
156}
157
158/// Column metadata token.
159#[derive(Debug, Clone, Default)]
160pub struct ColMetaData {
161    /// Column definitions.
162    pub columns: Vec<ColumnData>,
163    /// CEK table for Always Encrypted result sets.
164    /// Present only when the server sends encrypted column metadata.
165    pub cek_table: Option<crate::crypto::CekTable>,
166}
167
168/// Column definition within metadata.
169#[derive(Debug, Clone)]
170pub struct ColumnData {
171    /// Column name.
172    pub name: String,
173    /// Column data type ID.
174    pub type_id: TypeId,
175    /// Column data type raw byte (for unknown types).
176    pub col_type: u8,
177    /// Column flags.
178    pub flags: u16,
179    /// User type ID.
180    pub user_type: u32,
181    /// Type-specific metadata.
182    pub type_info: TypeInfo,
183    /// Per-column encryption metadata (Always Encrypted).
184    /// Present only for columns with the encrypted flag (0x0800) set.
185    pub crypto_metadata: Option<crate::crypto::CryptoMetadata>,
186}
187
188/// Type-specific metadata.
189#[derive(Debug, Clone, Default)]
190pub struct TypeInfo {
191    /// Maximum length for variable-length types.
192    pub max_length: Option<u32>,
193    /// Precision for numeric types.
194    pub precision: Option<u8>,
195    /// Scale for numeric types.
196    pub scale: Option<u8>,
197    /// Collation for string types.
198    pub collation: Option<Collation>,
199}
200
201/// SQL Server collation.
202///
203/// Collations in SQL Server define the character encoding and sorting rules
204/// for string data. For `VARCHAR` columns, the collation determines which
205/// code page (character encoding) is used to store the data.
206///
207/// # Encoding Support
208///
209/// When the `encoding` feature is enabled, the [`Collation::encoding()`] method
210/// returns the appropriate [`encoding_rs::Encoding`] for decoding `VARCHAR` data.
211///
212/// # Example
213///
214/// ```rust,ignore
215/// use tds_protocol::token::Collation;
216///
217/// let collation = Collation { lcid: 0x0804, sort_id: 0 }; // Chinese (PRC)
218/// if let Some(encoding) = collation.encoding() {
219///     let (decoded, _, _) = encoding.decode(raw_bytes);
220///     // decoded is now proper Chinese text
221/// }
222/// ```
223#[derive(Debug, Clone, Copy, Default)]
224pub struct Collation {
225    /// Locale ID (LCID).
226    ///
227    /// The LCID encodes both the language and region. The lower 16 bits
228    /// contain the primary language ID, and bits 16-19 contain the sort ID
229    /// for some collations.
230    ///
231    /// For UTF-8 collations (SQL Server 2019+), bit 27 (0x0800_0000) is set.
232    pub lcid: u32,
233    /// Sort ID.
234    ///
235    /// Used with certain collations to specify sorting behavior.
236    pub sort_id: u8,
237}
238
239impl Collation {
240    /// Create a `Collation` from the 5-byte TDS wire format.
241    ///
242    /// Format: 4 bytes LCID (little-endian u32) + 1 byte sort ID.
243    pub fn from_bytes(bytes: &[u8; 5]) -> Self {
244        Self {
245            lcid: u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]),
246            sort_id: bytes[4],
247        }
248    }
249
250    /// Serialize to the 5-byte TDS wire format.
251    ///
252    /// Format: 4 bytes LCID (little-endian u32) + 1 byte sort ID.
253    pub fn to_bytes(&self) -> [u8; 5] {
254        let b = self.lcid.to_le_bytes();
255        [b[0], b[1], b[2], b[3], self.sort_id]
256    }
257
258    /// Returns the character encoding for this collation.
259    ///
260    /// This method maps the collation's LCID to the appropriate character
261    /// encoding from the `encoding_rs` crate.
262    ///
263    /// # Returns
264    ///
265    /// - `Some(&Encoding)` - The encoding to use for decoding `VARCHAR` data
266    /// - `None` - If the collation uses UTF-8 (no transcoding needed) or
267    ///   the LCID is not recognized (caller should use Windows-1252 fallback)
268    ///
269    /// # UTF-8 Collations
270    ///
271    /// SQL Server 2019+ supports UTF-8 collations (identified by the `_UTF8`
272    /// suffix). These return `None` because no transcoding is needed.
273    ///
274    /// # Example
275    ///
276    /// ```rust,ignore
277    /// let collation = Collation { lcid: 0x0419, sort_id: 0 }; // Russian
278    /// if let Some(encoding) = collation.encoding() {
279    ///     // encoding is Windows-1251 for Cyrillic
280    ///     let (text, _, had_errors) = encoding.decode(&raw_bytes);
281    /// }
282    /// ```
283    #[cfg(feature = "encoding")]
284    pub fn encoding(&self) -> Option<&'static encoding_rs::Encoding> {
285        crate::collation::encoding_for_lcid(self.lcid)
286    }
287
288    /// Returns whether this collation uses UTF-8 encoding.
289    ///
290    /// UTF-8 collations were introduced in SQL Server 2019 and are
291    /// identified by the `_UTF8` suffix in the collation name.
292    #[cfg(feature = "encoding")]
293    pub fn is_utf8(&self) -> bool {
294        crate::collation::is_utf8_collation(self.lcid)
295    }
296
297    /// Returns the Windows code page number for this collation.
298    ///
299    /// Useful for error messages and debugging.
300    ///
301    /// # Returns
302    ///
303    /// The code page number (e.g., 1252 for Western European, 932 for Japanese).
304    #[cfg(feature = "encoding")]
305    pub fn code_page(&self) -> Option<u16> {
306        crate::collation::code_page_for_lcid(self.lcid)
307    }
308
309    /// Returns the encoding name for this collation.
310    ///
311    /// Useful for error messages and debugging.
312    #[cfg(feature = "encoding")]
313    pub fn encoding_name(&self) -> &'static str {
314        crate::collation::encoding_name_for_lcid(self.lcid)
315    }
316}
317
318/// Raw row data (not yet decoded).
319#[derive(Debug, Clone)]
320pub struct RawRow {
321    /// Raw column values.
322    pub data: bytes::Bytes,
323}
324
325/// Null bitmap compressed row.
326#[derive(Debug, Clone)]
327pub struct NbcRow {
328    /// Null bitmap.
329    pub null_bitmap: Vec<u8>,
330    /// Raw non-null column values.
331    pub data: bytes::Bytes,
332}
333
334/// Done token indicating statement completion.
335#[derive(Debug, Clone, Copy)]
336pub struct Done {
337    /// Status flags.
338    pub status: DoneStatus,
339    /// Current command.
340    pub cur_cmd: u16,
341    /// Row count (if applicable).
342    pub row_count: u64,
343}
344
345/// Done status flags.
346#[derive(Debug, Clone, Copy, Default)]
347#[non_exhaustive]
348pub struct DoneStatus {
349    /// More results follow.
350    pub more: bool,
351    /// Error occurred.
352    pub error: bool,
353    /// Transaction in progress.
354    pub in_xact: bool,
355    /// Row count is valid.
356    pub count: bool,
357    /// Attention acknowledgment.
358    pub attn: bool,
359    /// Server error caused statement termination.
360    pub srverror: bool,
361}
362
363/// Done in procedure token.
364#[derive(Debug, Clone, Copy)]
365pub struct DoneInProc {
366    /// Status flags.
367    pub status: DoneStatus,
368    /// Current command.
369    pub cur_cmd: u16,
370    /// Row count.
371    pub row_count: u64,
372}
373
374/// Done procedure token.
375#[derive(Debug, Clone, Copy)]
376pub struct DoneProc {
377    /// Status flags.
378    pub status: DoneStatus,
379    /// Current command.
380    pub cur_cmd: u16,
381    /// Row count.
382    pub row_count: u64,
383}
384
385/// Return value from stored procedure.
386#[derive(Debug, Clone)]
387#[non_exhaustive]
388pub struct ReturnValue {
389    /// Parameter ordinal.
390    pub param_ordinal: u16,
391    /// Parameter name.
392    pub param_name: String,
393    /// Status flags.
394    pub status: u8,
395    /// User type.
396    pub user_type: u32,
397    /// Type flags.
398    pub flags: u16,
399    /// Raw column type byte from the wire.
400    pub col_type: u8,
401    /// Type info.
402    pub type_info: TypeInfo,
403    /// Value data.
404    pub value: bytes::Bytes,
405}
406
407/// Server error message.
408#[derive(Debug, Clone)]
409pub struct ServerError {
410    /// Error number.
411    pub number: i32,
412    /// Error state.
413    pub state: u8,
414    /// Error severity class.
415    pub class: u8,
416    /// Error message text.
417    pub message: String,
418    /// Server name.
419    pub server: String,
420    /// Procedure name.
421    pub procedure: String,
422    /// Line number.
423    pub line: i32,
424}
425
426/// Server informational message.
427#[derive(Debug, Clone)]
428pub struct ServerInfo {
429    /// Info number.
430    pub number: i32,
431    /// Info state.
432    pub state: u8,
433    /// Info class (severity).
434    pub class: u8,
435    /// Info message text.
436    pub message: String,
437    /// Server name.
438    pub server: String,
439    /// Procedure name.
440    pub procedure: String,
441    /// Line number.
442    pub line: i32,
443}
444
445/// Login acknowledgment token.
446#[derive(Debug, Clone)]
447pub struct LoginAck {
448    /// Interface type.
449    pub interface: u8,
450    /// TDS version.
451    pub tds_version: u32,
452    /// Program name.
453    pub prog_name: String,
454    /// Program version.
455    pub prog_version: u32,
456}
457
458/// Environment change token.
459#[derive(Debug, Clone)]
460pub struct EnvChange {
461    /// Type of environment change.
462    pub env_type: EnvChangeType,
463    /// New value.
464    pub new_value: EnvChangeValue,
465    /// Old value.
466    pub old_value: EnvChangeValue,
467}
468
469/// Environment change type.
470#[derive(Debug, Clone, Copy, PartialEq, Eq)]
471#[repr(u8)]
472#[non_exhaustive]
473pub enum EnvChangeType {
474    /// Database changed.
475    Database = 1,
476    /// Language changed.
477    Language = 2,
478    /// Character set changed.
479    CharacterSet = 3,
480    /// Packet size changed.
481    PacketSize = 4,
482    /// Unicode data sorting locale ID.
483    UnicodeSortingLocalId = 5,
484    /// Unicode comparison flags.
485    UnicodeComparisonFlags = 6,
486    /// SQL collation.
487    SqlCollation = 7,
488    /// Begin transaction.
489    BeginTransaction = 8,
490    /// Commit transaction.
491    CommitTransaction = 9,
492    /// Rollback transaction.
493    RollbackTransaction = 10,
494    /// Enlist DTC transaction.
495    EnlistDtcTransaction = 11,
496    /// Defect DTC transaction.
497    DefectTransaction = 12,
498    /// Real-time log shipping.
499    RealTimeLogShipping = 13,
500    /// Promote transaction.
501    PromoteTransaction = 15,
502    /// Transaction manager address.
503    TransactionManagerAddress = 16,
504    /// Transaction ended.
505    TransactionEnded = 17,
506    /// Reset connection completion acknowledgment.
507    ResetConnectionCompletionAck = 18,
508    /// User instance started.
509    UserInstanceStarted = 19,
510    /// Routing information.
511    Routing = 20,
512}
513
514/// Environment change value.
515#[derive(Debug, Clone)]
516#[non_exhaustive]
517pub enum EnvChangeValue {
518    /// String value.
519    String(String),
520    /// Binary value.
521    Binary(bytes::Bytes),
522    /// Routing information.
523    Routing {
524        /// Host name.
525        host: String,
526        /// Port number.
527        port: u16,
528    },
529}
530
531/// Column ordering information.
532#[derive(Debug, Clone)]
533pub struct Order {
534    /// Ordered column indices.
535    pub columns: Vec<u16>,
536}
537
538/// Feature extension acknowledgment.
539#[derive(Debug, Clone)]
540pub struct FeatureExtAck {
541    /// Acknowledged features.
542    pub features: Vec<FeatureAck>,
543}
544
545/// Individual feature acknowledgment.
546#[derive(Debug, Clone)]
547pub struct FeatureAck {
548    /// Feature ID.
549    pub feature_id: u8,
550    /// Feature data.
551    pub data: bytes::Bytes,
552}
553
554/// SSPI authentication token.
555#[derive(Debug, Clone)]
556pub struct SspiToken {
557    /// SSPI data.
558    pub data: bytes::Bytes,
559}
560
561/// Session state token.
562#[derive(Debug, Clone)]
563pub struct SessionState {
564    /// Session state data.
565    pub data: bytes::Bytes,
566}
567
568/// Federated authentication info.
569#[derive(Debug, Clone)]
570pub struct FedAuthInfo {
571    /// STS URL.
572    pub sts_url: String,
573    /// Service principal name.
574    pub spn: String,
575}
576
577// =============================================================================
578// ColMetaData and Row Parsing Implementation
579// =============================================================================
580
581/// Decode collation information (5 bytes).
582///
583/// Shared by ColMetaData column parsing and CryptoMetadata base type parsing.
584pub(crate) fn decode_collation(src: &mut impl Buf) -> Result<Collation, ProtocolError> {
585    if src.remaining() < 5 {
586        return Err(ProtocolError::UnexpectedEof);
587    }
588    // Collation: LCID (4 bytes) + Sort ID (1 byte)
589    let lcid = src.get_u32_le();
590    let sort_id = src.get_u8();
591    Ok(Collation { lcid, sort_id })
592}
593
594/// Decode type-specific metadata for a column based on its TypeId.
595///
596/// Shared by ColMetaData column parsing and CryptoMetadata base type parsing.
597pub(crate) fn decode_type_info(
598    src: &mut impl Buf,
599    type_id: TypeId,
600    col_type: u8,
601) -> Result<TypeInfo, ProtocolError> {
602    match type_id {
603        // Fixed-length types have no additional metadata
604        TypeId::Null => Ok(TypeInfo::default()),
605        TypeId::Int1 | TypeId::Bit => Ok(TypeInfo::default()),
606        TypeId::Int2 => Ok(TypeInfo::default()),
607        TypeId::Int4 => Ok(TypeInfo::default()),
608        TypeId::Int8 => Ok(TypeInfo::default()),
609        TypeId::Float4 => Ok(TypeInfo::default()),
610        TypeId::Float8 => Ok(TypeInfo::default()),
611        TypeId::Money => Ok(TypeInfo::default()),
612        TypeId::Money4 => Ok(TypeInfo::default()),
613        TypeId::DateTime => Ok(TypeInfo::default()),
614        TypeId::DateTime4 => Ok(TypeInfo::default()),
615
616        // Variable length integer/float/money (1-byte max length)
617        TypeId::IntN | TypeId::BitN | TypeId::FloatN | TypeId::MoneyN | TypeId::DateTimeN => {
618            if src.remaining() < 1 {
619                return Err(ProtocolError::UnexpectedEof);
620            }
621            let max_length = src.get_u8() as u32;
622            Ok(TypeInfo {
623                max_length: Some(max_length),
624                ..Default::default()
625            })
626        }
627
628        // GUID has 1-byte length
629        TypeId::Guid => {
630            if src.remaining() < 1 {
631                return Err(ProtocolError::UnexpectedEof);
632            }
633            let max_length = src.get_u8() as u32;
634            Ok(TypeInfo {
635                max_length: Some(max_length),
636                ..Default::default()
637            })
638        }
639
640        // Decimal/Numeric types (1-byte length + precision + scale)
641        TypeId::Decimal | TypeId::Numeric | TypeId::DecimalN | TypeId::NumericN => {
642            if src.remaining() < 3 {
643                return Err(ProtocolError::UnexpectedEof);
644            }
645            let max_length = src.get_u8() as u32;
646            let precision = src.get_u8();
647            let scale = src.get_u8();
648            Ok(TypeInfo {
649                max_length: Some(max_length),
650                precision: Some(precision),
651                scale: Some(scale),
652                ..Default::default()
653            })
654        }
655
656        // Old-style byte-length strings (Char, VarChar, Binary, VarBinary)
657        TypeId::Char | TypeId::VarChar | TypeId::Binary | TypeId::VarBinary => {
658            if src.remaining() < 1 {
659                return Err(ProtocolError::UnexpectedEof);
660            }
661            let max_length = src.get_u8() as u32;
662            Ok(TypeInfo {
663                max_length: Some(max_length),
664                ..Default::default()
665            })
666        }
667
668        // Big varchar/binary with 2-byte length + collation for strings
669        TypeId::BigVarChar | TypeId::BigChar => {
670            if src.remaining() < 7 {
671                // 2 (length) + 5 (collation)
672                return Err(ProtocolError::UnexpectedEof);
673            }
674            let max_length = src.get_u16_le() as u32;
675            let collation = decode_collation(src)?;
676            Ok(TypeInfo {
677                max_length: Some(max_length),
678                collation: Some(collation),
679                ..Default::default()
680            })
681        }
682
683        // Big binary (2-byte length, no collation)
684        TypeId::BigVarBinary | TypeId::BigBinary => {
685            if src.remaining() < 2 {
686                return Err(ProtocolError::UnexpectedEof);
687            }
688            let max_length = src.get_u16_le() as u32;
689            Ok(TypeInfo {
690                max_length: Some(max_length),
691                ..Default::default()
692            })
693        }
694
695        // Unicode strings (NChar, NVarChar) - 2-byte length + collation
696        TypeId::NChar | TypeId::NVarChar => {
697            if src.remaining() < 7 {
698                // 2 (length) + 5 (collation)
699                return Err(ProtocolError::UnexpectedEof);
700            }
701            let max_length = src.get_u16_le() as u32;
702            let collation = decode_collation(src)?;
703            Ok(TypeInfo {
704                max_length: Some(max_length),
705                collation: Some(collation),
706                ..Default::default()
707            })
708        }
709
710        // Date type (no additional metadata)
711        TypeId::Date => Ok(TypeInfo::default()),
712
713        // Time, DateTime2, DateTimeOffset have scale
714        TypeId::Time | TypeId::DateTime2 | TypeId::DateTimeOffset => {
715            if src.remaining() < 1 {
716                return Err(ProtocolError::UnexpectedEof);
717            }
718            let scale = src.get_u8();
719            Ok(TypeInfo {
720                scale: Some(scale),
721                ..Default::default()
722            })
723        }
724
725        // Text/NText/Image (deprecated LOB types)
726        TypeId::Text | TypeId::NText | TypeId::Image => {
727            // These have complex metadata: length (4) + collation (5) + table name parts
728            if src.remaining() < 4 {
729                return Err(ProtocolError::UnexpectedEof);
730            }
731            let max_length = src.get_u32_le();
732
733            // For Text/NText, read collation
734            let collation = if type_id == TypeId::Text || type_id == TypeId::NText {
735                if src.remaining() < 5 {
736                    return Err(ProtocolError::UnexpectedEof);
737                }
738                Some(decode_collation(src)?)
739            } else {
740                None
741            };
742
743            // Skip table name parts (variable length)
744            // Format: numParts (1 byte) followed by us_varchar for each part
745            if src.remaining() < 1 {
746                return Err(ProtocolError::UnexpectedEof);
747            }
748            let num_parts = src.get_u8();
749            for _ in 0..num_parts {
750                // Read and discard table name part
751                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
752            }
753
754            Ok(TypeInfo {
755                max_length: Some(max_length),
756                collation,
757                ..Default::default()
758            })
759        }
760
761        // XML type
762        TypeId::Xml => {
763            if src.remaining() < 1 {
764                return Err(ProtocolError::UnexpectedEof);
765            }
766            let schema_present = src.get_u8();
767
768            if schema_present != 0 {
769                // Read schema info (3 us_varchar strings)
770                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // db name
771                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // owning schema
772                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // xml schema collection
773            }
774
775            Ok(TypeInfo::default())
776        }
777
778        // UDT (User-defined type) - complex metadata
779        TypeId::Udt => {
780            // Max length (2 bytes)
781            if src.remaining() < 2 {
782                return Err(ProtocolError::UnexpectedEof);
783            }
784            let max_length = src.get_u16_le() as u32;
785
786            // UDT metadata: db name, schema name, type name, assembly qualified name
787            let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // db name
788            let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // schema name
789            let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // type name
790            let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // assembly qualified name
791
792            Ok(TypeInfo {
793                max_length: Some(max_length),
794                ..Default::default()
795            })
796        }
797
798        // Table-valued parameter - complex metadata (skip for now)
799        TypeId::Tvp => {
800            // TVP has very complex metadata, not commonly used
801            // For now, we can't properly parse this
802            Err(ProtocolError::InvalidTokenType(col_type))
803        }
804
805        // SQL Variant - 4-byte length
806        TypeId::Variant => {
807            if src.remaining() < 4 {
808                return Err(ProtocolError::UnexpectedEof);
809            }
810            let max_length = src.get_u32_le();
811            Ok(TypeInfo {
812                max_length: Some(max_length),
813                ..Default::default()
814            })
815        }
816    }
817}
818
819impl ColMetaData {
820    /// Special value indicating no metadata.
821    pub const NO_METADATA: u16 = 0xFFFF;
822
823    /// Decode a COLMETADATA token from bytes.
824    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
825        if src.remaining() < 2 {
826            return Err(ProtocolError::UnexpectedEof);
827        }
828
829        let column_count = src.get_u16_le();
830
831        // 0xFFFF means no metadata present
832        if column_count == Self::NO_METADATA {
833            return Ok(Self {
834                columns: Vec::new(),
835                cek_table: None,
836            });
837        }
838
839        let mut columns = Vec::with_capacity(column_count as usize);
840
841        for _ in 0..column_count {
842            let column = Self::decode_column(src)?;
843            columns.push(column);
844        }
845
846        Ok(Self {
847            columns,
848            cek_table: None,
849        })
850    }
851
852    /// Decode a single column from the metadata.
853    fn decode_column(src: &mut impl Buf) -> Result<ColumnData, ProtocolError> {
854        // UserType (4 bytes) + Flags (2 bytes) + TypeId (1 byte)
855        if src.remaining() < 7 {
856            return Err(ProtocolError::UnexpectedEof);
857        }
858
859        let user_type = src.get_u32_le();
860        let flags = src.get_u16_le();
861        let col_type = src.get_u8();
862
863        let type_id = TypeId::from_u8(col_type).unwrap_or(TypeId::Null); // Default to Null for unknown types
864
865        // Parse type-specific metadata
866        let type_info = decode_type_info(src, type_id, col_type)?;
867
868        // Read column name (B_VARCHAR format - 1 byte length in characters)
869        let name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
870
871        Ok(ColumnData {
872            name,
873            type_id,
874            col_type,
875            flags,
876            user_type,
877            type_info,
878            crypto_metadata: None,
879        })
880    }
881
882    /// Decode a COLMETADATA token with Always Encrypted support.
883    ///
884    /// When column encryption was negotiated in Login7, the server sends a CekTable
885    /// before column definitions and per-column CryptoMetadata for encrypted columns.
886    ///
887    /// # Wire Format (with encryption)
888    ///
889    /// ```text
890    /// column_count: USHORT
891    /// cek_table: CekTable (always present when encryption negotiated)
892    /// columns: ColumnData[column_count] (with CryptoMetadata for encrypted columns)
893    /// ```
894    pub fn decode_encrypted(src: &mut impl Buf) -> Result<Self, ProtocolError> {
895        if src.remaining() < 2 {
896            return Err(ProtocolError::UnexpectedEof);
897        }
898
899        let column_count = src.get_u16_le();
900
901        if column_count == Self::NO_METADATA {
902            return Ok(Self {
903                columns: Vec::new(),
904                cek_table: None,
905            });
906        }
907
908        // Parse CEK table (always present when encryption was negotiated)
909        let cek_table = crate::crypto::CekTable::decode(src)?;
910
911        let mut columns = Vec::with_capacity(column_count as usize);
912
913        for _ in 0..column_count {
914            let column = Self::decode_column_encrypted(src)?;
915            columns.push(column);
916        }
917
918        Ok(Self {
919            columns,
920            cek_table: Some(cek_table),
921        })
922    }
923
924    /// Decode a single column definition with Always Encrypted support.
925    ///
926    /// For encrypted columns (flags & 0x0800), parses CryptoMetadata after the type info.
927    fn decode_column_encrypted(src: &mut impl Buf) -> Result<ColumnData, ProtocolError> {
928        if src.remaining() < 7 {
929            return Err(ProtocolError::UnexpectedEof);
930        }
931
932        let user_type = src.get_u32_le();
933        let flags = src.get_u16_le();
934        let col_type = src.get_u8();
935
936        let type_id = TypeId::from_u8(col_type).unwrap_or(TypeId::Null);
937
938        // Parse type-specific metadata (for encrypted columns, this is the transport type)
939        let type_info = decode_type_info(src, type_id, col_type)?;
940
941        // Parse CryptoMetadata if the column is encrypted
942        let crypto_metadata = if crate::crypto::is_column_encrypted(flags) {
943            Some(crate::crypto::CryptoMetadata::decode(src)?)
944        } else {
945            None
946        };
947
948        // Read column name
949        let name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
950
951        Ok(ColumnData {
952            name,
953            type_id,
954            col_type,
955            flags,
956            user_type,
957            type_info,
958            crypto_metadata,
959        })
960    }
961
962    /// Get the number of columns.
963    #[must_use]
964    pub fn column_count(&self) -> usize {
965        self.columns.len()
966    }
967
968    /// Check if this represents no metadata.
969    #[must_use]
970    pub fn is_empty(&self) -> bool {
971        self.columns.is_empty()
972    }
973}
974
975impl ColumnData {
976    /// Check if this column is nullable.
977    #[must_use]
978    pub fn is_nullable(&self) -> bool {
979        (self.flags & 0x0001) != 0
980    }
981
982    /// Get the fixed size in bytes for this column, if applicable.
983    ///
984    /// Returns `None` for variable-length types.
985    #[must_use]
986    pub fn fixed_size(&self) -> Option<usize> {
987        match self.type_id {
988            TypeId::Null => Some(0),
989            TypeId::Int1 | TypeId::Bit => Some(1),
990            TypeId::Int2 => Some(2),
991            TypeId::Int4 => Some(4),
992            TypeId::Int8 => Some(8),
993            TypeId::Float4 => Some(4),
994            TypeId::Float8 => Some(8),
995            TypeId::Money => Some(8),
996            TypeId::Money4 => Some(4),
997            TypeId::DateTime => Some(8),
998            TypeId::DateTime4 => Some(4),
999            TypeId::Date => Some(3),
1000            _ => None,
1001        }
1002    }
1003}
1004
1005// =============================================================================
1006// Row Parsing Implementation
1007// =============================================================================
1008
1009impl RawRow {
1010    /// Decode a ROW token from bytes.
1011    ///
1012    /// This function requires the column metadata to know how to parse the row.
1013    /// The row data is stored as raw bytes for later parsing.
1014    pub fn decode(src: &mut impl Buf, metadata: &ColMetaData) -> Result<Self, ProtocolError> {
1015        let mut data = bytes::BytesMut::new();
1016
1017        for col in &metadata.columns {
1018            Self::decode_column_value(src, col, &mut data)?;
1019        }
1020
1021        Ok(Self {
1022            data: data.freeze(),
1023        })
1024    }
1025
1026    /// Decode a single column value and append to the output buffer.
1027    fn decode_column_value(
1028        src: &mut impl Buf,
1029        col: &ColumnData,
1030        dst: &mut bytes::BytesMut,
1031    ) -> Result<(), ProtocolError> {
1032        match col.type_id {
1033            // Fixed-length types
1034            TypeId::Null => {
1035                // No data
1036            }
1037            TypeId::Int1 | TypeId::Bit => {
1038                if src.remaining() < 1 {
1039                    return Err(ProtocolError::UnexpectedEof);
1040                }
1041                dst.extend_from_slice(&[src.get_u8()]);
1042            }
1043            TypeId::Int2 => {
1044                if src.remaining() < 2 {
1045                    return Err(ProtocolError::UnexpectedEof);
1046                }
1047                dst.extend_from_slice(&src.get_u16_le().to_le_bytes());
1048            }
1049            TypeId::Int4 => {
1050                if src.remaining() < 4 {
1051                    return Err(ProtocolError::UnexpectedEof);
1052                }
1053                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
1054            }
1055            TypeId::Int8 => {
1056                if src.remaining() < 8 {
1057                    return Err(ProtocolError::UnexpectedEof);
1058                }
1059                dst.extend_from_slice(&src.get_u64_le().to_le_bytes());
1060            }
1061            TypeId::Float4 => {
1062                if src.remaining() < 4 {
1063                    return Err(ProtocolError::UnexpectedEof);
1064                }
1065                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
1066            }
1067            TypeId::Float8 => {
1068                if src.remaining() < 8 {
1069                    return Err(ProtocolError::UnexpectedEof);
1070                }
1071                dst.extend_from_slice(&src.get_u64_le().to_le_bytes());
1072            }
1073            TypeId::Money => {
1074                if src.remaining() < 8 {
1075                    return Err(ProtocolError::UnexpectedEof);
1076                }
1077                let hi = src.get_u32_le();
1078                let lo = src.get_u32_le();
1079                dst.extend_from_slice(&hi.to_le_bytes());
1080                dst.extend_from_slice(&lo.to_le_bytes());
1081            }
1082            TypeId::Money4 => {
1083                if src.remaining() < 4 {
1084                    return Err(ProtocolError::UnexpectedEof);
1085                }
1086                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
1087            }
1088            TypeId::DateTime => {
1089                if src.remaining() < 8 {
1090                    return Err(ProtocolError::UnexpectedEof);
1091                }
1092                let days = src.get_u32_le();
1093                let time = src.get_u32_le();
1094                dst.extend_from_slice(&days.to_le_bytes());
1095                dst.extend_from_slice(&time.to_le_bytes());
1096            }
1097            TypeId::DateTime4 => {
1098                if src.remaining() < 4 {
1099                    return Err(ProtocolError::UnexpectedEof);
1100                }
1101                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
1102            }
1103            // DATE type uses 1-byte length prefix (can be NULL)
1104            TypeId::Date => {
1105                Self::decode_bytelen_type(src, dst)?;
1106            }
1107
1108            // Variable-length nullable types (length-prefixed)
1109            TypeId::IntN | TypeId::BitN | TypeId::FloatN | TypeId::MoneyN | TypeId::DateTimeN => {
1110                Self::decode_bytelen_type(src, dst)?;
1111            }
1112
1113            TypeId::Guid => {
1114                Self::decode_bytelen_type(src, dst)?;
1115            }
1116
1117            TypeId::Decimal | TypeId::Numeric | TypeId::DecimalN | TypeId::NumericN => {
1118                Self::decode_bytelen_type(src, dst)?;
1119            }
1120
1121            // Old-style byte-length strings
1122            TypeId::Char | TypeId::VarChar | TypeId::Binary | TypeId::VarBinary => {
1123                Self::decode_bytelen_type(src, dst)?;
1124            }
1125
1126            // 2-byte length strings (or PLP for MAX types)
1127            TypeId::BigVarChar | TypeId::BigVarBinary => {
1128                // max_length == 0xFFFF indicates VARCHAR(MAX) or VARBINARY(MAX), which uses PLP
1129                if col.type_info.max_length == Some(0xFFFF) {
1130                    Self::decode_plp_type(src, dst)?;
1131                } else {
1132                    Self::decode_ushortlen_type(src, dst)?;
1133                }
1134            }
1135
1136            // Fixed-length types that don't have MAX variants
1137            TypeId::BigChar | TypeId::BigBinary => {
1138                Self::decode_ushortlen_type(src, dst)?;
1139            }
1140
1141            // Unicode strings (2-byte length in bytes, or PLP for NVARCHAR(MAX))
1142            TypeId::NVarChar => {
1143                // max_length == 0xFFFF indicates NVARCHAR(MAX), which uses PLP
1144                if col.type_info.max_length == Some(0xFFFF) {
1145                    Self::decode_plp_type(src, dst)?;
1146                } else {
1147                    Self::decode_ushortlen_type(src, dst)?;
1148                }
1149            }
1150
1151            // Fixed-length NCHAR doesn't have MAX variant
1152            TypeId::NChar => {
1153                Self::decode_ushortlen_type(src, dst)?;
1154            }
1155
1156            // Time types with scale
1157            TypeId::Time | TypeId::DateTime2 | TypeId::DateTimeOffset => {
1158                Self::decode_bytelen_type(src, dst)?;
1159            }
1160
1161            // TEXT/NTEXT/IMAGE - deprecated LOB types using textptr format
1162            TypeId::Text | TypeId::NText | TypeId::Image => {
1163                Self::decode_textptr_type(src, dst)?;
1164            }
1165
1166            // XML - uses actual PLP format
1167            TypeId::Xml => {
1168                Self::decode_plp_type(src, dst)?;
1169            }
1170
1171            // Complex types
1172            TypeId::Variant => {
1173                Self::decode_intlen_type(src, dst)?;
1174            }
1175
1176            TypeId::Udt => {
1177                // UDT uses PLP encoding
1178                Self::decode_plp_type(src, dst)?;
1179            }
1180
1181            TypeId::Tvp => {
1182                // TVP not supported in row data
1183                return Err(ProtocolError::InvalidTokenType(col.col_type));
1184            }
1185        }
1186
1187        Ok(())
1188    }
1189
1190    /// Decode a 1-byte length-prefixed value.
1191    fn decode_bytelen_type(
1192        src: &mut impl Buf,
1193        dst: &mut bytes::BytesMut,
1194    ) -> Result<(), ProtocolError> {
1195        if src.remaining() < 1 {
1196            return Err(ProtocolError::UnexpectedEof);
1197        }
1198        let len = src.get_u8() as usize;
1199        if len == 0xFF {
1200            // NULL value - store as zero-length with NULL marker
1201            dst.extend_from_slice(&[0xFF]);
1202        } else if len == 0 {
1203            // Empty value
1204            dst.extend_from_slice(&[0x00]);
1205        } else {
1206            if src.remaining() < len {
1207                return Err(ProtocolError::UnexpectedEof);
1208            }
1209            dst.extend_from_slice(&[len as u8]);
1210            for _ in 0..len {
1211                dst.extend_from_slice(&[src.get_u8()]);
1212            }
1213        }
1214        Ok(())
1215    }
1216
1217    /// Decode a 2-byte length-prefixed value.
1218    fn decode_ushortlen_type(
1219        src: &mut impl Buf,
1220        dst: &mut bytes::BytesMut,
1221    ) -> Result<(), ProtocolError> {
1222        if src.remaining() < 2 {
1223            return Err(ProtocolError::UnexpectedEof);
1224        }
1225        let len = src.get_u16_le() as usize;
1226        if len == 0xFFFF {
1227            // NULL value
1228            dst.extend_from_slice(&0xFFFFu16.to_le_bytes());
1229        } else if len == 0 {
1230            // Empty value
1231            dst.extend_from_slice(&0u16.to_le_bytes());
1232        } else {
1233            if src.remaining() < len {
1234                return Err(ProtocolError::UnexpectedEof);
1235            }
1236            dst.extend_from_slice(&(len as u16).to_le_bytes());
1237            for _ in 0..len {
1238                dst.extend_from_slice(&[src.get_u8()]);
1239            }
1240        }
1241        Ok(())
1242    }
1243
1244    /// Decode a 4-byte length-prefixed value.
1245    fn decode_intlen_type(
1246        src: &mut impl Buf,
1247        dst: &mut bytes::BytesMut,
1248    ) -> Result<(), ProtocolError> {
1249        if src.remaining() < 4 {
1250            return Err(ProtocolError::UnexpectedEof);
1251        }
1252        let len = src.get_u32_le() as usize;
1253        if len == 0xFFFFFFFF {
1254            // NULL value
1255            dst.extend_from_slice(&0xFFFFFFFFu32.to_le_bytes());
1256        } else if len == 0 {
1257            // Empty value
1258            dst.extend_from_slice(&0u32.to_le_bytes());
1259        } else {
1260            if src.remaining() < len {
1261                return Err(ProtocolError::UnexpectedEof);
1262            }
1263            dst.extend_from_slice(&(len as u32).to_le_bytes());
1264            for _ in 0..len {
1265                dst.extend_from_slice(&[src.get_u8()]);
1266            }
1267        }
1268        Ok(())
1269    }
1270
1271    /// Decode a TEXT/NTEXT/IMAGE type (textptr format).
1272    ///
1273    /// These deprecated LOB types use a special format:
1274    /// - 1 byte: textptr_len (0 = NULL)
1275    /// - textptr_len bytes: textptr (if not NULL)
1276    /// - 8 bytes: timestamp (if not NULL)
1277    /// - 4 bytes: data length (if not NULL)
1278    /// - data_len bytes: the actual data (if not NULL)
1279    ///
1280    /// We convert this to PLP format for the client to parse:
1281    /// - 8 bytes: total length (0xFFFFFFFFFFFFFFFF = NULL)
1282    /// - 4 bytes: chunk length (= data length)
1283    /// - chunk data
1284    /// - 4 bytes: 0 (terminator)
1285    fn decode_textptr_type(
1286        src: &mut impl Buf,
1287        dst: &mut bytes::BytesMut,
1288    ) -> Result<(), ProtocolError> {
1289        if src.remaining() < 1 {
1290            return Err(ProtocolError::UnexpectedEof);
1291        }
1292
1293        let textptr_len = src.get_u8() as usize;
1294
1295        if textptr_len == 0 {
1296            // NULL value - write PLP NULL marker
1297            dst.extend_from_slice(&0xFFFFFFFFFFFFFFFFu64.to_le_bytes());
1298            return Ok(());
1299        }
1300
1301        // Skip textptr bytes
1302        if src.remaining() < textptr_len {
1303            return Err(ProtocolError::UnexpectedEof);
1304        }
1305        src.advance(textptr_len);
1306
1307        // Skip 8-byte timestamp
1308        if src.remaining() < 8 {
1309            return Err(ProtocolError::UnexpectedEof);
1310        }
1311        src.advance(8);
1312
1313        // Read data length
1314        if src.remaining() < 4 {
1315            return Err(ProtocolError::UnexpectedEof);
1316        }
1317        let data_len = src.get_u32_le() as usize;
1318
1319        if src.remaining() < data_len {
1320            return Err(ProtocolError::UnexpectedEof);
1321        }
1322
1323        // Write in PLP format for client parsing:
1324        // - 8 bytes: total length
1325        // - 4 bytes: chunk length
1326        // - chunk data
1327        // - 4 bytes: 0 (terminator)
1328        dst.extend_from_slice(&(data_len as u64).to_le_bytes());
1329        dst.extend_from_slice(&(data_len as u32).to_le_bytes());
1330        for _ in 0..data_len {
1331            dst.extend_from_slice(&[src.get_u8()]);
1332        }
1333        dst.extend_from_slice(&0u32.to_le_bytes()); // PLP terminator
1334
1335        Ok(())
1336    }
1337
1338    /// Decode a PLP (Partially Length-Prefixed) value.
1339    ///
1340    /// PLP format:
1341    /// - 8 bytes: total length (0xFFFFFFFFFFFFFFFE = unknown, 0xFFFFFFFFFFFFFFFF = NULL)
1342    /// - If not NULL: chunks of (4 byte chunk length + data) until chunk length = 0
1343    fn decode_plp_type(src: &mut impl Buf, dst: &mut bytes::BytesMut) -> Result<(), ProtocolError> {
1344        if src.remaining() < 8 {
1345            return Err(ProtocolError::UnexpectedEof);
1346        }
1347
1348        let total_len = src.get_u64_le();
1349
1350        // Store the total length marker
1351        dst.extend_from_slice(&total_len.to_le_bytes());
1352
1353        if total_len == 0xFFFFFFFFFFFFFFFF {
1354            // NULL value - no more data
1355            return Ok(());
1356        }
1357
1358        // Read chunks until terminator
1359        loop {
1360            if src.remaining() < 4 {
1361                return Err(ProtocolError::UnexpectedEof);
1362            }
1363            let chunk_len = src.get_u32_le() as usize;
1364            dst.extend_from_slice(&(chunk_len as u32).to_le_bytes());
1365
1366            if chunk_len == 0 {
1367                // End of PLP data
1368                break;
1369            }
1370
1371            if src.remaining() < chunk_len {
1372                return Err(ProtocolError::UnexpectedEof);
1373            }
1374
1375            for _ in 0..chunk_len {
1376                dst.extend_from_slice(&[src.get_u8()]);
1377            }
1378        }
1379
1380        Ok(())
1381    }
1382}
1383
1384// =============================================================================
1385// NbcRow Parsing Implementation
1386// =============================================================================
1387
1388impl NbcRow {
1389    /// Decode an NBCROW token from bytes.
1390    ///
1391    /// NBCROW (Null Bitmap Compressed Row) stores a bitmap indicating which
1392    /// columns are NULL, followed by only the non-NULL values.
1393    pub fn decode(src: &mut impl Buf, metadata: &ColMetaData) -> Result<Self, ProtocolError> {
1394        let col_count = metadata.columns.len();
1395        let bitmap_len = col_count.div_ceil(8);
1396
1397        if src.remaining() < bitmap_len {
1398            return Err(ProtocolError::UnexpectedEof);
1399        }
1400
1401        // Read null bitmap
1402        let mut null_bitmap = vec![0u8; bitmap_len];
1403        for byte in &mut null_bitmap {
1404            *byte = src.get_u8();
1405        }
1406
1407        // Read non-null values
1408        let mut data = bytes::BytesMut::new();
1409
1410        for (i, col) in metadata.columns.iter().enumerate() {
1411            let byte_idx = i / 8;
1412            let bit_idx = i % 8;
1413            let is_null = (null_bitmap[byte_idx] & (1 << bit_idx)) != 0;
1414
1415            if !is_null {
1416                // Read the value - for NBCROW, we read without the length prefix
1417                // for fixed-length types, and with length prefix for variable types
1418                RawRow::decode_column_value(src, col, &mut data)?;
1419            }
1420        }
1421
1422        Ok(Self {
1423            null_bitmap,
1424            data: data.freeze(),
1425        })
1426    }
1427
1428    /// Check if a column at the given index is NULL.
1429    #[must_use]
1430    pub fn is_null(&self, column_index: usize) -> bool {
1431        let byte_idx = column_index / 8;
1432        let bit_idx = column_index % 8;
1433        if byte_idx < self.null_bitmap.len() {
1434            (self.null_bitmap[byte_idx] & (1 << bit_idx)) != 0
1435        } else {
1436            true // Out of bounds = NULL
1437        }
1438    }
1439}
1440
1441// =============================================================================
1442// ReturnValue Parsing Implementation
1443// =============================================================================
1444
1445impl ReturnValue {
1446    /// Decode a RETURNVALUE token from bytes.
1447    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1448        // MS-TDS §2.2.7.18: the RETURNVALUE token has no length prefix —
1449        // it begins directly with the 2-byte ParamOrdinal. The previous
1450        // spurious 2-byte read consumed the ordinal and shifted every
1451        // subsequent field, leaving the stream parser two bytes ahead and
1452        // reading value bytes as the next token type (e.g. `0x74` from a
1453        // Unicode name fragment was misread as an unknown token).
1454        if src.remaining() < 2 {
1455            return Err(ProtocolError::UnexpectedEof);
1456        }
1457        let param_ordinal = src.get_u16_le();
1458
1459        // Parameter name (B_VARCHAR)
1460        let param_name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1461
1462        // Status (1 byte)
1463        if src.remaining() < 1 {
1464            return Err(ProtocolError::UnexpectedEof);
1465        }
1466        let status = src.get_u8();
1467
1468        // User type (4 bytes) + flags (2 bytes) + type id (1 byte)
1469        if src.remaining() < 7 {
1470            return Err(ProtocolError::UnexpectedEof);
1471        }
1472        let user_type = src.get_u32_le();
1473        let flags = src.get_u16_le();
1474        let col_type = src.get_u8();
1475
1476        let type_id = TypeId::from_u8(col_type).unwrap_or(TypeId::Null);
1477
1478        // Parse type info
1479        let type_info = decode_type_info(src, type_id, col_type)?;
1480
1481        // Read the value data
1482        let mut value_buf = bytes::BytesMut::new();
1483
1484        // Create a temporary column for value parsing
1485        let temp_col = ColumnData {
1486            name: String::new(),
1487            type_id,
1488            col_type,
1489            flags,
1490            user_type,
1491            type_info: type_info.clone(),
1492            crypto_metadata: None,
1493        };
1494
1495        RawRow::decode_column_value(src, &temp_col, &mut value_buf)?;
1496
1497        Ok(Self {
1498            param_ordinal,
1499            param_name,
1500            status,
1501            user_type,
1502            flags,
1503            col_type,
1504            type_info,
1505            value: value_buf.freeze(),
1506        })
1507    }
1508}
1509
1510// =============================================================================
1511// SessionState Parsing Implementation
1512// =============================================================================
1513
1514impl SessionState {
1515    /// Decode a SESSIONSTATE token from bytes.
1516    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1517        if src.remaining() < 4 {
1518            return Err(ProtocolError::UnexpectedEof);
1519        }
1520
1521        let length = src.get_u32_le() as usize;
1522
1523        if src.remaining() < length {
1524            return Err(ProtocolError::IncompletePacket {
1525                expected: length,
1526                actual: src.remaining(),
1527            });
1528        }
1529
1530        let data = src.copy_to_bytes(length);
1531
1532        Ok(Self { data })
1533    }
1534}
1535
1536// =============================================================================
1537// Token Parsing Implementation
1538// =============================================================================
1539
1540/// Done token status flags bit positions.
1541mod done_status_bits {
1542    pub const DONE_MORE: u16 = 0x0001;
1543    pub const DONE_ERROR: u16 = 0x0002;
1544    pub const DONE_INXACT: u16 = 0x0004;
1545    pub const DONE_COUNT: u16 = 0x0010;
1546    pub const DONE_ATTN: u16 = 0x0020;
1547    pub const DONE_SRVERROR: u16 = 0x0100;
1548}
1549
1550impl DoneStatus {
1551    /// Parse done status from raw bits.
1552    #[must_use]
1553    pub fn from_bits(bits: u16) -> Self {
1554        use done_status_bits::*;
1555        Self {
1556            more: (bits & DONE_MORE) != 0,
1557            error: (bits & DONE_ERROR) != 0,
1558            in_xact: (bits & DONE_INXACT) != 0,
1559            count: (bits & DONE_COUNT) != 0,
1560            attn: (bits & DONE_ATTN) != 0,
1561            srverror: (bits & DONE_SRVERROR) != 0,
1562        }
1563    }
1564
1565    /// Convert to raw bits.
1566    #[must_use]
1567    pub fn to_bits(&self) -> u16 {
1568        use done_status_bits::*;
1569        let mut bits = 0u16;
1570        if self.more {
1571            bits |= DONE_MORE;
1572        }
1573        if self.error {
1574            bits |= DONE_ERROR;
1575        }
1576        if self.in_xact {
1577            bits |= DONE_INXACT;
1578        }
1579        if self.count {
1580            bits |= DONE_COUNT;
1581        }
1582        if self.attn {
1583            bits |= DONE_ATTN;
1584        }
1585        if self.srverror {
1586            bits |= DONE_SRVERROR;
1587        }
1588        bits
1589    }
1590}
1591
1592impl Done {
1593    /// Size of the DONE token in bytes (excluding token type byte).
1594    pub const SIZE: usize = 12; // 2 (status) + 2 (curcmd) + 8 (rowcount)
1595
1596    /// Decode a DONE token from bytes.
1597    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1598        if src.remaining() < Self::SIZE {
1599            return Err(ProtocolError::IncompletePacket {
1600                expected: Self::SIZE,
1601                actual: src.remaining(),
1602            });
1603        }
1604
1605        let status = DoneStatus::from_bits(src.get_u16_le());
1606        let cur_cmd = src.get_u16_le();
1607        let row_count = src.get_u64_le();
1608
1609        Ok(Self {
1610            status,
1611            cur_cmd,
1612            row_count,
1613        })
1614    }
1615
1616    /// Encode the DONE token to bytes.
1617    pub fn encode(&self, dst: &mut impl BufMut) {
1618        dst.put_u8(TokenType::Done as u8);
1619        dst.put_u16_le(self.status.to_bits());
1620        dst.put_u16_le(self.cur_cmd);
1621        dst.put_u64_le(self.row_count);
1622    }
1623
1624    /// Check if more results follow this DONE token.
1625    #[must_use]
1626    pub const fn has_more(&self) -> bool {
1627        self.status.more
1628    }
1629
1630    /// Check if an error occurred.
1631    #[must_use]
1632    pub const fn has_error(&self) -> bool {
1633        self.status.error
1634    }
1635
1636    /// Check if the row count is valid.
1637    #[must_use]
1638    pub const fn has_count(&self) -> bool {
1639        self.status.count
1640    }
1641}
1642
1643impl DoneProc {
1644    /// Size of the DONEPROC token in bytes (excluding token type byte).
1645    pub const SIZE: usize = 12;
1646
1647    /// Decode a DONEPROC token from bytes.
1648    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1649        if src.remaining() < Self::SIZE {
1650            return Err(ProtocolError::IncompletePacket {
1651                expected: Self::SIZE,
1652                actual: src.remaining(),
1653            });
1654        }
1655
1656        let status = DoneStatus::from_bits(src.get_u16_le());
1657        let cur_cmd = src.get_u16_le();
1658        let row_count = src.get_u64_le();
1659
1660        Ok(Self {
1661            status,
1662            cur_cmd,
1663            row_count,
1664        })
1665    }
1666
1667    /// Encode the DONEPROC token to bytes.
1668    pub fn encode(&self, dst: &mut impl BufMut) {
1669        dst.put_u8(TokenType::DoneProc as u8);
1670        dst.put_u16_le(self.status.to_bits());
1671        dst.put_u16_le(self.cur_cmd);
1672        dst.put_u64_le(self.row_count);
1673    }
1674}
1675
1676impl DoneInProc {
1677    /// Size of the DONEINPROC token in bytes (excluding token type byte).
1678    pub const SIZE: usize = 12;
1679
1680    /// Decode a DONEINPROC token from bytes.
1681    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1682        if src.remaining() < Self::SIZE {
1683            return Err(ProtocolError::IncompletePacket {
1684                expected: Self::SIZE,
1685                actual: src.remaining(),
1686            });
1687        }
1688
1689        let status = DoneStatus::from_bits(src.get_u16_le());
1690        let cur_cmd = src.get_u16_le();
1691        let row_count = src.get_u64_le();
1692
1693        Ok(Self {
1694            status,
1695            cur_cmd,
1696            row_count,
1697        })
1698    }
1699
1700    /// Encode the DONEINPROC token to bytes.
1701    pub fn encode(&self, dst: &mut impl BufMut) {
1702        dst.put_u8(TokenType::DoneInProc as u8);
1703        dst.put_u16_le(self.status.to_bits());
1704        dst.put_u16_le(self.cur_cmd);
1705        dst.put_u64_le(self.row_count);
1706    }
1707}
1708
1709impl ServerError {
1710    /// Decode an ERROR token from bytes.
1711    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1712        // ERROR token: length (2) + number (4) + state (1) + class (1) +
1713        //              message (us_varchar) + server (b_varchar) + procedure (b_varchar) + line (4)
1714        if src.remaining() < 2 {
1715            return Err(ProtocolError::UnexpectedEof);
1716        }
1717
1718        let _length = src.get_u16_le();
1719
1720        if src.remaining() < 6 {
1721            return Err(ProtocolError::UnexpectedEof);
1722        }
1723
1724        let number = src.get_i32_le();
1725        let state = src.get_u8();
1726        let class = src.get_u8();
1727
1728        let message = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1729        let server = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1730        let procedure = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1731
1732        if src.remaining() < 4 {
1733            return Err(ProtocolError::UnexpectedEof);
1734        }
1735        let line = src.get_i32_le();
1736
1737        Ok(Self {
1738            number,
1739            state,
1740            class,
1741            message,
1742            server,
1743            procedure,
1744            line,
1745        })
1746    }
1747
1748    /// Check if this is a fatal error (severity >= 20).
1749    #[must_use]
1750    pub const fn is_fatal(&self) -> bool {
1751        self.class >= 20
1752    }
1753
1754    /// Check if this error indicates the batch was aborted (severity >= 16).
1755    #[must_use]
1756    pub const fn is_batch_abort(&self) -> bool {
1757        self.class >= 16
1758    }
1759}
1760
1761impl ServerInfo {
1762    /// Decode an INFO token from bytes.
1763    ///
1764    /// INFO tokens have the same structure as ERROR tokens but with lower severity.
1765    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1766        if src.remaining() < 2 {
1767            return Err(ProtocolError::UnexpectedEof);
1768        }
1769
1770        let _length = src.get_u16_le();
1771
1772        if src.remaining() < 6 {
1773            return Err(ProtocolError::UnexpectedEof);
1774        }
1775
1776        let number = src.get_i32_le();
1777        let state = src.get_u8();
1778        let class = src.get_u8();
1779
1780        let message = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1781        let server = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1782        let procedure = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1783
1784        if src.remaining() < 4 {
1785            return Err(ProtocolError::UnexpectedEof);
1786        }
1787        let line = src.get_i32_le();
1788
1789        Ok(Self {
1790            number,
1791            state,
1792            class,
1793            message,
1794            server,
1795            procedure,
1796            line,
1797        })
1798    }
1799}
1800
1801impl LoginAck {
1802    /// Decode a LOGINACK token from bytes.
1803    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1804        // LOGINACK: length (2) + interface (1) + tds_version (4) + prog_name (b_varchar) + prog_version (4)
1805        if src.remaining() < 2 {
1806            return Err(ProtocolError::UnexpectedEof);
1807        }
1808
1809        let _length = src.get_u16_le();
1810
1811        if src.remaining() < 5 {
1812            return Err(ProtocolError::UnexpectedEof);
1813        }
1814
1815        let interface = src.get_u8();
1816        let tds_version = src.get_u32_le();
1817        let prog_name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1818
1819        if src.remaining() < 4 {
1820            return Err(ProtocolError::UnexpectedEof);
1821        }
1822        let prog_version = src.get_u32_le();
1823
1824        Ok(Self {
1825            interface,
1826            tds_version,
1827            prog_name,
1828            prog_version,
1829        })
1830    }
1831
1832    /// Get the TDS version as a `TdsVersion`.
1833    #[must_use]
1834    pub fn tds_version(&self) -> crate::version::TdsVersion {
1835        crate::version::TdsVersion::new(self.tds_version)
1836    }
1837}
1838
1839impl EnvChangeType {
1840    /// Create from raw byte value.
1841    pub fn from_u8(value: u8) -> Option<Self> {
1842        match value {
1843            1 => Some(Self::Database),
1844            2 => Some(Self::Language),
1845            3 => Some(Self::CharacterSet),
1846            4 => Some(Self::PacketSize),
1847            5 => Some(Self::UnicodeSortingLocalId),
1848            6 => Some(Self::UnicodeComparisonFlags),
1849            7 => Some(Self::SqlCollation),
1850            8 => Some(Self::BeginTransaction),
1851            9 => Some(Self::CommitTransaction),
1852            10 => Some(Self::RollbackTransaction),
1853            11 => Some(Self::EnlistDtcTransaction),
1854            12 => Some(Self::DefectTransaction),
1855            13 => Some(Self::RealTimeLogShipping),
1856            15 => Some(Self::PromoteTransaction),
1857            16 => Some(Self::TransactionManagerAddress),
1858            17 => Some(Self::TransactionEnded),
1859            18 => Some(Self::ResetConnectionCompletionAck),
1860            19 => Some(Self::UserInstanceStarted),
1861            20 => Some(Self::Routing),
1862            _ => None,
1863        }
1864    }
1865}
1866
1867impl EnvChange {
1868    /// Decode an ENVCHANGE token from bytes.
1869    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1870        if src.remaining() < 3 {
1871            return Err(ProtocolError::UnexpectedEof);
1872        }
1873
1874        let length = src.get_u16_le() as usize;
1875        if src.remaining() < length {
1876            return Err(ProtocolError::IncompletePacket {
1877                expected: length,
1878                actual: src.remaining(),
1879            });
1880        }
1881        let remaining_before = src.remaining();
1882
1883        let env_type_byte = src.get_u8();
1884        let env_type = EnvChangeType::from_u8(env_type_byte)
1885            .ok_or(ProtocolError::InvalidTokenType(env_type_byte))?;
1886
1887        let (new_value, old_value) = match env_type {
1888            EnvChangeType::Routing => {
1889                // Routing has special format
1890                let new_value = Self::decode_routing_value(src)?;
1891                let old_value = EnvChangeValue::Binary(Bytes::new());
1892                (new_value, old_value)
1893            }
1894            EnvChangeType::BeginTransaction
1895            | EnvChangeType::CommitTransaction
1896            | EnvChangeType::RollbackTransaction
1897            | EnvChangeType::EnlistDtcTransaction
1898            | EnvChangeType::SqlCollation => {
1899                // These use binary format per MS-TDS spec:
1900                // - Transaction tokens: transaction descriptor (8 bytes)
1901                // - SqlCollation: collation info (5 bytes: LCID + sort flags)
1902                // The declared ENVCHANGE `length` can be shorter than this
1903                // branch needs (e.g. covers only the type byte), so the
1904                // length-prefix reads must be bounds-checked individually:
1905                // `get_u8` on an empty buffer panics. Match the branch's
1906                // existing graceful style — a missing prefix means empty.
1907                let new_len = if src.has_remaining() {
1908                    src.get_u8() as usize
1909                } else {
1910                    0
1911                };
1912                let new_value = if new_len > 0 && src.remaining() >= new_len {
1913                    EnvChangeValue::Binary(src.copy_to_bytes(new_len))
1914                } else {
1915                    EnvChangeValue::Binary(Bytes::new())
1916                };
1917
1918                let old_len = if src.has_remaining() {
1919                    src.get_u8() as usize
1920                } else {
1921                    0
1922                };
1923                let old_value = if old_len > 0 && src.remaining() >= old_len {
1924                    EnvChangeValue::Binary(src.copy_to_bytes(old_len))
1925                } else {
1926                    EnvChangeValue::Binary(Bytes::new())
1927                };
1928
1929                (new_value, old_value)
1930            }
1931            _ => {
1932                // String format for most env changes
1933                let new_value = read_b_varchar(src)
1934                    .map(EnvChangeValue::String)
1935                    .unwrap_or(EnvChangeValue::String(String::new()));
1936
1937                let old_value = read_b_varchar(src)
1938                    .map(EnvChangeValue::String)
1939                    .unwrap_or(EnvChangeValue::String(String::new()));
1940
1941                (new_value, old_value)
1942            }
1943        };
1944
1945        // The declared `length` frames the whole ENVCHANGE data. Value
1946        // decoders may legitimately consume less than that — e.g. the
1947        // Routing decoder reads only the NewValue, leaving the spec-mandated
1948        // zero-length OldValue (two bytes) behind. Skip to the frame
1949        // boundary so the leftover bytes are not misparsed as the next
1950        // token. (Decoders that tolerantly read *past* an under-declared
1951        // length are left as-is — see the transaction/collation branch.)
1952        let consumed = remaining_before - src.remaining();
1953        if consumed < length {
1954            src.advance(length - consumed);
1955        }
1956
1957        Ok(Self {
1958            env_type,
1959            new_value,
1960            old_value,
1961        })
1962    }
1963
1964    fn decode_routing_value(src: &mut impl Buf) -> Result<EnvChangeValue, ProtocolError> {
1965        // Routing format: length (2) + protocol (1) + port (2) + server_len (2) + server (utf16)
1966        if src.remaining() < 2 {
1967            return Err(ProtocolError::UnexpectedEof);
1968        }
1969
1970        let _routing_len = src.get_u16_le();
1971
1972        if src.remaining() < 5 {
1973            return Err(ProtocolError::UnexpectedEof);
1974        }
1975
1976        let _protocol = src.get_u8();
1977        let port = src.get_u16_le();
1978        let server_len = src.get_u16_le() as usize;
1979
1980        // Read UTF-16LE server name
1981        if src.remaining() < server_len * 2 {
1982            return Err(ProtocolError::UnexpectedEof);
1983        }
1984
1985        let mut chars = Vec::with_capacity(server_len);
1986        for _ in 0..server_len {
1987            chars.push(src.get_u16_le());
1988        }
1989
1990        let host = String::from_utf16(&chars).map_err(|_| {
1991            ProtocolError::StringEncoding(
1992                #[cfg(feature = "std")]
1993                "invalid UTF-16 in routing hostname".to_string(),
1994                #[cfg(not(feature = "std"))]
1995                "invalid UTF-16 in routing hostname",
1996            )
1997        })?;
1998
1999        Ok(EnvChangeValue::Routing { host, port })
2000    }
2001
2002    /// Check if this is a routing redirect.
2003    #[must_use]
2004    pub fn is_routing(&self) -> bool {
2005        self.env_type == EnvChangeType::Routing
2006    }
2007
2008    /// Get routing information if this is a routing change.
2009    #[must_use]
2010    pub fn routing_info(&self) -> Option<(&str, u16)> {
2011        if let EnvChangeValue::Routing { host, port } = &self.new_value {
2012            Some((host, *port))
2013        } else {
2014            None
2015        }
2016    }
2017
2018    /// Get the new database name if this is a database change.
2019    #[must_use]
2020    pub fn new_database(&self) -> Option<&str> {
2021        if self.env_type == EnvChangeType::Database {
2022            if let EnvChangeValue::String(s) = &self.new_value {
2023                return Some(s);
2024            }
2025        }
2026        None
2027    }
2028}
2029
2030impl Order {
2031    /// Decode an ORDER token from bytes.
2032    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
2033        if src.remaining() < 2 {
2034            return Err(ProtocolError::UnexpectedEof);
2035        }
2036
2037        let length = src.get_u16_le() as usize;
2038        let column_count = length / 2;
2039
2040        if src.remaining() < length {
2041            return Err(ProtocolError::IncompletePacket {
2042                expected: length,
2043                actual: src.remaining(),
2044            });
2045        }
2046
2047        let mut columns = Vec::with_capacity(column_count);
2048        for _ in 0..column_count {
2049            columns.push(src.get_u16_le());
2050        }
2051
2052        Ok(Self { columns })
2053    }
2054}
2055
2056impl FeatureExtAck {
2057    /// Feature terminator byte.
2058    pub const TERMINATOR: u8 = 0xFF;
2059
2060    /// Decode a FEATUREEXTACK token from bytes.
2061    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
2062        let mut features = Vec::new();
2063
2064        loop {
2065            if !src.has_remaining() {
2066                return Err(ProtocolError::UnexpectedEof);
2067            }
2068
2069            let feature_id = src.get_u8();
2070            if feature_id == Self::TERMINATOR {
2071                break;
2072            }
2073
2074            if src.remaining() < 4 {
2075                return Err(ProtocolError::UnexpectedEof);
2076            }
2077
2078            let data_len = src.get_u32_le() as usize;
2079
2080            if src.remaining() < data_len {
2081                return Err(ProtocolError::IncompletePacket {
2082                    expected: data_len,
2083                    actual: src.remaining(),
2084                });
2085            }
2086
2087            let data = src.copy_to_bytes(data_len);
2088            features.push(FeatureAck { feature_id, data });
2089        }
2090
2091        Ok(Self { features })
2092    }
2093}
2094
2095impl SspiToken {
2096    /// Decode an SSPI token from bytes.
2097    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
2098        if src.remaining() < 2 {
2099            return Err(ProtocolError::UnexpectedEof);
2100        }
2101
2102        let length = src.get_u16_le() as usize;
2103
2104        if src.remaining() < length {
2105            return Err(ProtocolError::IncompletePacket {
2106                expected: length,
2107                actual: src.remaining(),
2108            });
2109        }
2110
2111        let data = src.copy_to_bytes(length);
2112        Ok(Self { data })
2113    }
2114}
2115
2116impl FedAuthInfo {
2117    /// Decode a FEDAUTHINFO token from bytes.
2118    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
2119        if src.remaining() < 4 {
2120            return Err(ProtocolError::UnexpectedEof);
2121        }
2122
2123        let _length = src.get_u32_le();
2124
2125        if src.remaining() < 5 {
2126            return Err(ProtocolError::UnexpectedEof);
2127        }
2128
2129        let _count = src.get_u8();
2130
2131        // Read option data
2132        let mut sts_url = String::new();
2133        let mut spn = String::new();
2134
2135        // Parse info options until we have both
2136        while src.has_remaining() {
2137            if src.remaining() < 9 {
2138                break;
2139            }
2140
2141            let info_id = src.get_u8();
2142            let info_len = src.get_u32_le() as usize;
2143            let _info_offset = src.get_u32_le();
2144
2145            if src.remaining() < info_len {
2146                break;
2147            }
2148
2149            // Read UTF-16LE string
2150            let char_count = info_len / 2;
2151            let mut chars = Vec::with_capacity(char_count);
2152            for _ in 0..char_count {
2153                chars.push(src.get_u16_le());
2154            }
2155
2156            if let Ok(value) = String::from_utf16(&chars) {
2157                match info_id {
2158                    0x01 => spn = value,
2159                    0x02 => sts_url = value,
2160                    _ => {}
2161                }
2162            }
2163        }
2164
2165        Ok(Self { sts_url, spn })
2166    }
2167}
2168
2169// =============================================================================
2170// Token Parser
2171// =============================================================================
2172
2173/// Token stream parser.
2174///
2175/// Parses a stream of TDS tokens from a byte buffer.
2176///
2177/// # Basic vs Context-Aware Parsing
2178///
2179/// Some tokens (like `Done`, `Error`, `LoginAck`) can be parsed without context.
2180/// Use [`next_token()`](TokenParser::next_token) for these.
2181///
2182/// Other tokens (like `ColMetaData`, `Row`, `NbcRow`) require column metadata
2183/// to parse correctly. Use [`next_token_with_metadata()`](TokenParser::next_token_with_metadata)
2184/// for these.
2185///
2186/// # Example
2187///
2188/// ```rust,ignore
2189/// let mut parser = TokenParser::new(data);
2190/// let mut metadata = None;
2191///
2192/// while let Some(token) = parser.next_token_with_metadata(metadata.as_ref())? {
2193///     match token {
2194///         Token::ColMetaData(meta) => {
2195///             metadata = Some(meta);
2196///         }
2197///         Token::Row(row) => {
2198///             // Process row using metadata
2199///         }
2200///         Token::Done(done) => {
2201///             if !done.has_more() {
2202///                 break;
2203///             }
2204///         }
2205///         _ => {}
2206///     }
2207/// }
2208/// ```
2209pub struct TokenParser {
2210    data: Bytes,
2211    position: usize,
2212    /// Whether Always Encrypted was negotiated for this connection.
2213    /// When true, ColMetaData tokens are parsed with CekTable and per-column CryptoMetadata.
2214    encryption_enabled: bool,
2215}
2216
2217impl TokenParser {
2218    /// Create a new token parser from bytes.
2219    #[must_use]
2220    pub fn new(data: Bytes) -> Self {
2221        Self {
2222            data,
2223            position: 0,
2224            encryption_enabled: false,
2225        }
2226    }
2227
2228    /// Enable Always Encrypted metadata parsing.
2229    ///
2230    /// When enabled, ColMetaData tokens are parsed using the encrypted format
2231    /// which includes a CekTable and per-column CryptoMetadata.
2232    #[must_use]
2233    pub fn with_encryption(mut self, enabled: bool) -> Self {
2234        self.encryption_enabled = enabled;
2235        self
2236    }
2237
2238    /// Get remaining bytes in the buffer.
2239    #[must_use]
2240    pub fn remaining(&self) -> usize {
2241        self.data.len().saturating_sub(self.position)
2242    }
2243
2244    /// Check if there are more bytes to parse.
2245    #[must_use]
2246    pub fn has_remaining(&self) -> bool {
2247        self.position < self.data.len()
2248    }
2249
2250    /// Peek at the next token type without consuming it.
2251    #[must_use]
2252    pub fn peek_token_type(&self) -> Option<TokenType> {
2253        if self.position < self.data.len() {
2254            TokenType::from_u8(self.data[self.position])
2255        } else {
2256            None
2257        }
2258    }
2259
2260    /// Parse the next token from the stream.
2261    ///
2262    /// This method can only parse context-independent tokens. For tokens that
2263    /// require column metadata (ColMetaData, Row, NbcRow), use
2264    /// [`next_token_with_metadata()`](TokenParser::next_token_with_metadata).
2265    ///
2266    /// Returns `None` if no more tokens are available.
2267    pub fn next_token(&mut self) -> Result<Option<Token>, ProtocolError> {
2268        self.next_token_with_metadata(None)
2269    }
2270
2271    /// Parse the next token with optional column metadata context.
2272    ///
2273    /// When `metadata` is provided, this method can parse Row and NbcRow tokens.
2274    /// Without metadata, those tokens will return an error.
2275    ///
2276    /// Returns `None` if no more tokens are available.
2277    pub fn next_token_with_metadata(
2278        &mut self,
2279        metadata: Option<&ColMetaData>,
2280    ) -> Result<Option<Token>, ProtocolError> {
2281        if !self.has_remaining() {
2282            return Ok(None);
2283        }
2284
2285        let mut buf = &self.data[self.position..];
2286        let start_pos = self.position;
2287
2288        let token_type_byte = buf.get_u8();
2289        let token_type = TokenType::from_u8(token_type_byte);
2290
2291        let token = match token_type {
2292            Some(TokenType::Done) => {
2293                let done = Done::decode(&mut buf)?;
2294                Token::Done(done)
2295            }
2296            Some(TokenType::DoneProc) => {
2297                let done = DoneProc::decode(&mut buf)?;
2298                Token::DoneProc(done)
2299            }
2300            Some(TokenType::DoneInProc) => {
2301                let done = DoneInProc::decode(&mut buf)?;
2302                Token::DoneInProc(done)
2303            }
2304            Some(TokenType::Error) => {
2305                let error = ServerError::decode(&mut buf)?;
2306                Token::Error(error)
2307            }
2308            Some(TokenType::Info) => {
2309                let info = ServerInfo::decode(&mut buf)?;
2310                Token::Info(info)
2311            }
2312            Some(TokenType::LoginAck) => {
2313                let login_ack = LoginAck::decode(&mut buf)?;
2314                Token::LoginAck(login_ack)
2315            }
2316            Some(TokenType::EnvChange) => {
2317                let env_change = EnvChange::decode(&mut buf)?;
2318                Token::EnvChange(env_change)
2319            }
2320            Some(TokenType::Order) => {
2321                let order = Order::decode(&mut buf)?;
2322                Token::Order(order)
2323            }
2324            Some(TokenType::FeatureExtAck) => {
2325                let ack = FeatureExtAck::decode(&mut buf)?;
2326                Token::FeatureExtAck(ack)
2327            }
2328            Some(TokenType::Sspi) => {
2329                let sspi = SspiToken::decode(&mut buf)?;
2330                Token::Sspi(sspi)
2331            }
2332            Some(TokenType::FedAuthInfo) => {
2333                let info = FedAuthInfo::decode(&mut buf)?;
2334                Token::FedAuthInfo(info)
2335            }
2336            Some(TokenType::ReturnStatus) => {
2337                if buf.remaining() < 4 {
2338                    return Err(ProtocolError::UnexpectedEof);
2339                }
2340                let status = buf.get_i32_le();
2341                Token::ReturnStatus(status)
2342            }
2343            Some(TokenType::ColMetaData) => {
2344                let col_meta = if self.encryption_enabled {
2345                    ColMetaData::decode_encrypted(&mut buf)?
2346                } else {
2347                    ColMetaData::decode(&mut buf)?
2348                };
2349                Token::ColMetaData(col_meta)
2350            }
2351            Some(TokenType::Row) => {
2352                let meta = metadata.ok_or_else(|| {
2353                    ProtocolError::StringEncoding(
2354                        #[cfg(feature = "std")]
2355                        "Row token requires column metadata".to_string(),
2356                        #[cfg(not(feature = "std"))]
2357                        "Row token requires column metadata",
2358                    )
2359                })?;
2360                let row = RawRow::decode(&mut buf, meta)?;
2361                Token::Row(row)
2362            }
2363            Some(TokenType::NbcRow) => {
2364                let meta = metadata.ok_or_else(|| {
2365                    ProtocolError::StringEncoding(
2366                        #[cfg(feature = "std")]
2367                        "NbcRow token requires column metadata".to_string(),
2368                        #[cfg(not(feature = "std"))]
2369                        "NbcRow token requires column metadata",
2370                    )
2371                })?;
2372                let row = NbcRow::decode(&mut buf, meta)?;
2373                Token::NbcRow(row)
2374            }
2375            Some(TokenType::ReturnValue) => {
2376                let ret_val = ReturnValue::decode(&mut buf)?;
2377                Token::ReturnValue(ret_val)
2378            }
2379            Some(TokenType::SessionState) => {
2380                let session = SessionState::decode(&mut buf)?;
2381                Token::SessionState(session)
2382            }
2383            Some(TokenType::ColInfo) | Some(TokenType::TabName) | Some(TokenType::Offset) => {
2384                // These tokens are rarely used and have complex formats.
2385                // Skip them by reading the length and advancing.
2386                if buf.remaining() < 2 {
2387                    return Err(ProtocolError::UnexpectedEof);
2388                }
2389                let length = buf.get_u16_le() as usize;
2390                if buf.remaining() < length {
2391                    return Err(ProtocolError::IncompletePacket {
2392                        expected: length,
2393                        actual: buf.remaining(),
2394                    });
2395                }
2396                // Skip the data
2397                buf.advance(length);
2398                // Recursively get the next token
2399                self.position = start_pos + (self.data.len() - start_pos - buf.remaining());
2400                return self.next_token_with_metadata(metadata);
2401            }
2402            None => {
2403                return Err(ProtocolError::InvalidTokenType(token_type_byte));
2404            }
2405        };
2406
2407        // Update position based on how much was consumed
2408        let consumed = self.data.len() - start_pos - buf.remaining();
2409        self.position = start_pos + consumed;
2410
2411        Ok(Some(token))
2412    }
2413
2414    /// Skip the current token without fully parsing it.
2415    ///
2416    /// This is useful for skipping unknown or uninteresting tokens.
2417    pub fn skip_token(&mut self) -> Result<(), ProtocolError> {
2418        if !self.has_remaining() {
2419            return Ok(());
2420        }
2421
2422        let token_type_byte = self.data[self.position];
2423        let token_type = TokenType::from_u8(token_type_byte);
2424
2425        // Calculate how many bytes to skip based on token type
2426        let skip_amount = match token_type {
2427            // Fixed-size tokens
2428            Some(TokenType::Done) | Some(TokenType::DoneProc) | Some(TokenType::DoneInProc) => {
2429                1 + Done::SIZE // token type + 12 bytes
2430            }
2431            Some(TokenType::ReturnStatus) => {
2432                1 + 4 // token type + 4 bytes
2433            }
2434            // Variable-length tokens with 2-byte length prefix
2435            Some(TokenType::Error)
2436            | Some(TokenType::Info)
2437            | Some(TokenType::LoginAck)
2438            | Some(TokenType::EnvChange)
2439            | Some(TokenType::Order)
2440            | Some(TokenType::Sspi)
2441            | Some(TokenType::ColInfo)
2442            | Some(TokenType::TabName)
2443            | Some(TokenType::Offset)
2444            | Some(TokenType::ReturnValue) => {
2445                if self.remaining() < 3 {
2446                    return Err(ProtocolError::UnexpectedEof);
2447                }
2448                let length = u16::from_le_bytes([
2449                    self.data[self.position + 1],
2450                    self.data[self.position + 2],
2451                ]) as usize;
2452                1 + 2 + length // token type + length prefix + data
2453            }
2454            // Tokens with 4-byte length prefix
2455            Some(TokenType::SessionState) | Some(TokenType::FedAuthInfo) => {
2456                if self.remaining() < 5 {
2457                    return Err(ProtocolError::UnexpectedEof);
2458                }
2459                let length = u32::from_le_bytes([
2460                    self.data[self.position + 1],
2461                    self.data[self.position + 2],
2462                    self.data[self.position + 3],
2463                    self.data[self.position + 4],
2464                ]) as usize;
2465                1 + 4 + length
2466            }
2467            // FeatureExtAck has no length prefix - must parse
2468            Some(TokenType::FeatureExtAck) => {
2469                // Parse to find end
2470                let mut buf = &self.data[self.position + 1..];
2471                let _ = FeatureExtAck::decode(&mut buf)?;
2472                self.data.len() - self.position - buf.remaining()
2473            }
2474            // ColMetaData, Row, NbcRow require context and can't be easily skipped
2475            Some(TokenType::ColMetaData) | Some(TokenType::Row) | Some(TokenType::NbcRow) => {
2476                return Err(ProtocolError::InvalidTokenType(token_type_byte));
2477            }
2478            None => {
2479                return Err(ProtocolError::InvalidTokenType(token_type_byte));
2480            }
2481        };
2482
2483        if self.remaining() < skip_amount {
2484            return Err(ProtocolError::UnexpectedEof);
2485        }
2486
2487        self.position += skip_amount;
2488        Ok(())
2489    }
2490
2491    /// Get the current position in the buffer.
2492    #[must_use]
2493    pub fn position(&self) -> usize {
2494        self.position
2495    }
2496
2497    /// Reset the parser to the beginning.
2498    pub fn reset(&mut self) {
2499        self.position = 0;
2500    }
2501}
2502
2503// =============================================================================
2504// Tests
2505// =============================================================================
2506
2507#[cfg(test)]
2508#[allow(clippy::unwrap_used, clippy::panic)]
2509mod tests {
2510    use super::*;
2511    use bytes::BytesMut;
2512
2513    #[test]
2514    fn test_done_roundtrip() {
2515        let done = Done {
2516            status: DoneStatus {
2517                more: false,
2518                error: false,
2519                in_xact: false,
2520                count: true,
2521                attn: false,
2522                srverror: false,
2523            },
2524            cur_cmd: 193, // SELECT
2525            row_count: 42,
2526        };
2527
2528        let mut buf = BytesMut::new();
2529        done.encode(&mut buf);
2530
2531        // Skip the token type byte
2532        let mut cursor = &buf[1..];
2533        let decoded = Done::decode(&mut cursor).unwrap();
2534
2535        assert_eq!(decoded.status.count, done.status.count);
2536        assert_eq!(decoded.cur_cmd, done.cur_cmd);
2537        assert_eq!(decoded.row_count, done.row_count);
2538    }
2539
2540    #[test]
2541    fn test_done_status_bits() {
2542        let status = DoneStatus {
2543            more: true,
2544            error: true,
2545            in_xact: true,
2546            count: true,
2547            attn: false,
2548            srverror: false,
2549        };
2550
2551        let bits = status.to_bits();
2552        let restored = DoneStatus::from_bits(bits);
2553
2554        assert_eq!(status.more, restored.more);
2555        assert_eq!(status.error, restored.error);
2556        assert_eq!(status.in_xact, restored.in_xact);
2557        assert_eq!(status.count, restored.count);
2558    }
2559
2560    #[test]
2561    fn test_token_parser_done() {
2562        // DONE token: type (1) + status (2) + curcmd (2) + rowcount (8)
2563        let data = Bytes::from_static(&[
2564            0xFD, // DONE token type
2565            0x10, 0x00, // status: DONE_COUNT
2566            0xC1, 0x00, // cur_cmd: 193 (SELECT)
2567            0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count: 5
2568        ]);
2569
2570        let mut parser = TokenParser::new(data);
2571        let token = parser.next_token().unwrap().unwrap();
2572
2573        match token {
2574            Token::Done(done) => {
2575                assert!(done.status.count);
2576                assert!(!done.status.more);
2577                assert_eq!(done.cur_cmd, 193);
2578                assert_eq!(done.row_count, 5);
2579            }
2580            _ => panic!("Expected Done token"),
2581        }
2582
2583        // No more tokens
2584        assert!(parser.next_token().unwrap().is_none());
2585    }
2586
2587    #[test]
2588    fn test_env_change_type_from_u8() {
2589        assert_eq!(EnvChangeType::from_u8(1), Some(EnvChangeType::Database));
2590        assert_eq!(EnvChangeType::from_u8(20), Some(EnvChangeType::Routing));
2591        assert_eq!(EnvChangeType::from_u8(100), None);
2592    }
2593
2594    /// A spec-faithful Routing ENVCHANGE (MS-TDS 2.2.7.9) carries a
2595    /// zero-length OldValue (two bytes) after the routing data. The decoder
2596    /// reads only the NewValue, so it must skip to the declared frame
2597    /// boundary — otherwise the leftover `00 00` is misparsed as the next
2598    /// token type and the rest of the login response is garbage. Azure SQL
2599    /// Gateway redirects send exactly this shape.
2600    #[test]
2601    fn test_env_change_routing_consumes_declared_length() {
2602        let host = "redirect.example";
2603        let host_utf16: Vec<u16> = host.encode_utf16().collect();
2604
2605        let mut data = BytesMut::new();
2606        // RoutingDataValue: length + protocol + port + server_len + server
2607        let routing_len = 1 + 2 + 2 + host_utf16.len() * 2;
2608        // ENVCHANGE length: type byte + routing length prefix + routing data
2609        // + zero-length OldValue
2610        let env_len = 1 + 2 + routing_len + 2;
2611        data.put_u16_le(env_len as u16);
2612        data.put_u8(20); // Routing
2613        data.put_u16_le(routing_len as u16);
2614        data.put_u8(0); // protocol: TCP
2615        data.put_u16_le(11000); // port
2616        data.put_u16_le(host_utf16.len() as u16);
2617        for c in &host_utf16 {
2618            data.put_u16_le(*c);
2619        }
2620        data.put_u16_le(0); // OldValue: zero-length US_VARBYTE
2621        // A trailing DONE token type byte that must remain for the next read.
2622        data.put_u8(0xFD);
2623
2624        let mut buf: &[u8] = &data;
2625        let env = EnvChange::decode(&mut buf).unwrap();
2626        assert_eq!(env.routing_info(), Some((host, 11000)));
2627        assert_eq!(
2628            buf,
2629            &[0xFD],
2630            "decode must consume exactly the declared ENVCHANGE frame"
2631        );
2632    }
2633
2634    #[test]
2635    fn hostile_env_change_binary_truncated_is_not_panic() {
2636        // length=1 covers only the type byte (0x08 = BeginTransaction, a
2637        // binary-format type); the new_len/old_len prefix reads then hit an
2638        // empty buffer. Must decode gracefully, never panic (found by the
2639        // parse_env_change and parse_token fuzz targets).
2640        let data = [0x01, 0x00, 0x08];
2641        let mut buf: &[u8] = &data;
2642        let env = EnvChange::decode(&mut buf).unwrap();
2643        assert_eq!(env.env_type, EnvChangeType::BeginTransaction);
2644    }
2645
2646    #[test]
2647    fn test_colmetadata_no_columns() {
2648        // No metadata marker (0xFFFF)
2649        let data = Bytes::from_static(&[0xFF, 0xFF]);
2650        let mut cursor: &[u8] = &data;
2651        let meta = ColMetaData::decode(&mut cursor).unwrap();
2652        assert!(meta.is_empty());
2653        assert_eq!(meta.column_count(), 0);
2654    }
2655
2656    #[test]
2657    fn test_colmetadata_single_int_column() {
2658        // COLMETADATA with 1 INT column
2659        // Format: column_count (2) + [user_type (4) + flags (2) + type_id (1) + name (b_varchar)]
2660        let mut data = BytesMut::new();
2661        data.extend_from_slice(&[0x01, 0x00]); // 1 column
2662        data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); // user_type = 0
2663        data.extend_from_slice(&[0x01, 0x00]); // flags (nullable)
2664        data.extend_from_slice(&[0x38]); // TypeId::Int4
2665        // Column name "id" as B_VARCHAR (1 byte length + UTF-16LE)
2666        data.extend_from_slice(&[0x02]); // 2 characters
2667        data.extend_from_slice(&[b'i', 0x00, b'd', 0x00]); // "id" in UTF-16LE
2668
2669        let mut cursor: &[u8] = &data;
2670        let meta = ColMetaData::decode(&mut cursor).unwrap();
2671
2672        assert_eq!(meta.column_count(), 1);
2673        assert_eq!(meta.columns[0].name, "id");
2674        assert_eq!(meta.columns[0].type_id, TypeId::Int4);
2675        assert!(meta.columns[0].is_nullable());
2676    }
2677
2678    #[test]
2679    fn test_colmetadata_nvarchar_column() {
2680        // COLMETADATA with 1 NVARCHAR(50) column
2681        let mut data = BytesMut::new();
2682        data.extend_from_slice(&[0x01, 0x00]); // 1 column
2683        data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); // user_type = 0
2684        data.extend_from_slice(&[0x01, 0x00]); // flags (nullable)
2685        data.extend_from_slice(&[0xE7]); // TypeId::NVarChar
2686        // Type info: max_length (2 bytes) + collation (5 bytes)
2687        data.extend_from_slice(&[0x64, 0x00]); // max_length = 100 (50 chars * 2)
2688        data.extend_from_slice(&[0x09, 0x04, 0xD0, 0x00, 0x34]); // collation
2689        // Column name "name"
2690        data.extend_from_slice(&[0x04]); // 4 characters
2691        data.extend_from_slice(&[b'n', 0x00, b'a', 0x00, b'm', 0x00, b'e', 0x00]);
2692
2693        let mut cursor: &[u8] = &data;
2694        let meta = ColMetaData::decode(&mut cursor).unwrap();
2695
2696        assert_eq!(meta.column_count(), 1);
2697        assert_eq!(meta.columns[0].name, "name");
2698        assert_eq!(meta.columns[0].type_id, TypeId::NVarChar);
2699        assert_eq!(meta.columns[0].type_info.max_length, Some(100));
2700        assert!(meta.columns[0].type_info.collation.is_some());
2701    }
2702
2703    #[test]
2704    fn test_raw_row_decode_int() {
2705        // Create metadata for a single INT column
2706        let metadata = ColMetaData {
2707            cek_table: None,
2708            columns: vec![ColumnData {
2709                name: "id".to_string(),
2710                type_id: TypeId::Int4,
2711                col_type: 0x38,
2712                flags: 0,
2713                user_type: 0,
2714                type_info: TypeInfo::default(),
2715                crypto_metadata: None,
2716            }],
2717        };
2718
2719        // Row data: just 4 bytes for the int value 42
2720        let data = Bytes::from_static(&[0x2A, 0x00, 0x00, 0x00]); // 42 in little-endian
2721        let mut cursor: &[u8] = &data;
2722        let row = RawRow::decode(&mut cursor, &metadata).unwrap();
2723
2724        // The raw data should contain the 4 bytes
2725        assert_eq!(row.data.len(), 4);
2726        assert_eq!(&row.data[..], &[0x2A, 0x00, 0x00, 0x00]);
2727    }
2728
2729    #[test]
2730    fn test_raw_row_decode_nullable_int() {
2731        // Create metadata for a nullable INT column (IntN)
2732        let metadata = ColMetaData {
2733            cek_table: None,
2734            columns: vec![ColumnData {
2735                name: "id".to_string(),
2736                type_id: TypeId::IntN,
2737                col_type: 0x26,
2738                flags: 0x01, // nullable
2739                user_type: 0,
2740                type_info: TypeInfo {
2741                    max_length: Some(4),
2742                    ..Default::default()
2743                },
2744                crypto_metadata: None,
2745            }],
2746        };
2747
2748        // Row data with value: 1 byte length + 4 bytes value
2749        let data = Bytes::from_static(&[0x04, 0x2A, 0x00, 0x00, 0x00]); // length=4, value=42
2750        let mut cursor: &[u8] = &data;
2751        let row = RawRow::decode(&mut cursor, &metadata).unwrap();
2752
2753        assert_eq!(row.data.len(), 5);
2754        assert_eq!(row.data[0], 4); // length
2755        assert_eq!(&row.data[1..], &[0x2A, 0x00, 0x00, 0x00]);
2756    }
2757
2758    #[test]
2759    fn test_raw_row_decode_null_value() {
2760        // Create metadata for a nullable INT column (IntN)
2761        let metadata = ColMetaData {
2762            cek_table: None,
2763            columns: vec![ColumnData {
2764                name: "id".to_string(),
2765                type_id: TypeId::IntN,
2766                col_type: 0x26,
2767                flags: 0x01, // nullable
2768                user_type: 0,
2769                type_info: TypeInfo {
2770                    max_length: Some(4),
2771                    ..Default::default()
2772                },
2773                crypto_metadata: None,
2774            }],
2775        };
2776
2777        // NULL value: length = 0xFF (for bytelen types)
2778        let data = Bytes::from_static(&[0xFF]);
2779        let mut cursor: &[u8] = &data;
2780        let row = RawRow::decode(&mut cursor, &metadata).unwrap();
2781
2782        assert_eq!(row.data.len(), 1);
2783        assert_eq!(row.data[0], 0xFF); // NULL marker
2784    }
2785
2786    #[test]
2787    fn test_nbcrow_null_bitmap() {
2788        let row = NbcRow {
2789            null_bitmap: vec![0b00000101], // columns 0 and 2 are NULL
2790            data: Bytes::new(),
2791        };
2792
2793        assert!(row.is_null(0));
2794        assert!(!row.is_null(1));
2795        assert!(row.is_null(2));
2796        assert!(!row.is_null(3));
2797    }
2798
2799    #[test]
2800    fn test_token_parser_colmetadata() {
2801        // Build a COLMETADATA token with 1 INT column
2802        let mut data = BytesMut::new();
2803        data.extend_from_slice(&[0x81]); // COLMETADATA token type
2804        data.extend_from_slice(&[0x01, 0x00]); // 1 column
2805        data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); // user_type = 0
2806        data.extend_from_slice(&[0x01, 0x00]); // flags (nullable)
2807        data.extend_from_slice(&[0x38]); // TypeId::Int4
2808        data.extend_from_slice(&[0x02]); // column name length
2809        data.extend_from_slice(&[b'i', 0x00, b'd', 0x00]); // "id"
2810
2811        let mut parser = TokenParser::new(data.freeze());
2812        let token = parser.next_token().unwrap().unwrap();
2813
2814        match token {
2815            Token::ColMetaData(meta) => {
2816                assert_eq!(meta.column_count(), 1);
2817                assert_eq!(meta.columns[0].name, "id");
2818                assert_eq!(meta.columns[0].type_id, TypeId::Int4);
2819            }
2820            _ => panic!("Expected ColMetaData token"),
2821        }
2822    }
2823
2824    #[test]
2825    fn test_token_parser_row_with_metadata() {
2826        // Build metadata
2827        let metadata = ColMetaData {
2828            cek_table: None,
2829            columns: vec![ColumnData {
2830                name: "id".to_string(),
2831                type_id: TypeId::Int4,
2832                col_type: 0x38,
2833                flags: 0,
2834                user_type: 0,
2835                type_info: TypeInfo::default(),
2836                crypto_metadata: None,
2837            }],
2838        };
2839
2840        // Build ROW token
2841        let mut data = BytesMut::new();
2842        data.extend_from_slice(&[0xD1]); // ROW token type
2843        data.extend_from_slice(&[0x2A, 0x00, 0x00, 0x00]); // value = 42
2844
2845        let mut parser = TokenParser::new(data.freeze());
2846        let token = parser
2847            .next_token_with_metadata(Some(&metadata))
2848            .unwrap()
2849            .unwrap();
2850
2851        match token {
2852            Token::Row(row) => {
2853                assert_eq!(row.data.len(), 4);
2854            }
2855            _ => panic!("Expected Row token"),
2856        }
2857    }
2858
2859    #[test]
2860    fn test_token_parser_row_without_metadata_fails() {
2861        // Build ROW token
2862        let mut data = BytesMut::new();
2863        data.extend_from_slice(&[0xD1]); // ROW token type
2864        data.extend_from_slice(&[0x2A, 0x00, 0x00, 0x00]); // value = 42
2865
2866        let mut parser = TokenParser::new(data.freeze());
2867        let result = parser.next_token(); // No metadata provided
2868
2869        assert!(result.is_err());
2870    }
2871
2872    #[test]
2873    fn test_token_parser_peek() {
2874        let data = Bytes::from_static(&[
2875            0xFD, // DONE token type
2876            0x10, 0x00, // status
2877            0xC1, 0x00, // cur_cmd
2878            0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count
2879        ]);
2880
2881        let parser = TokenParser::new(data);
2882        assert_eq!(parser.peek_token_type(), Some(TokenType::Done));
2883    }
2884
2885    #[test]
2886    fn test_column_data_fixed_size() {
2887        let col = ColumnData {
2888            name: String::new(),
2889            type_id: TypeId::Int4,
2890            col_type: 0x38,
2891            flags: 0,
2892            user_type: 0,
2893            type_info: TypeInfo::default(),
2894            crypto_metadata: None,
2895        };
2896        assert_eq!(col.fixed_size(), Some(4));
2897
2898        let col2 = ColumnData {
2899            name: String::new(),
2900            type_id: TypeId::NVarChar,
2901            col_type: 0xE7,
2902            flags: 0,
2903            user_type: 0,
2904            type_info: TypeInfo::default(),
2905            crypto_metadata: None,
2906        };
2907        assert_eq!(col2.fixed_size(), None);
2908    }
2909
2910    // ========================================================================
2911    // End-to-End Decode Tests (Wire → Stored → Verification)
2912    // ========================================================================
2913    //
2914    // These tests verify that RawRow::decode_column_value correctly stores
2915    // column values in a format that can be parsed back.
2916
2917    #[test]
2918    fn test_decode_nvarchar_then_intn_roundtrip() {
2919        // Simulate wire data for: "World" (NVarChar), 42 (IntN)
2920        // This tests the scenario from the MCP parameterized query
2921
2922        // Build wire data (what the server sends)
2923        let mut wire_data = BytesMut::new();
2924
2925        // Column 0: NVarChar "World" - 2-byte length prefix in bytes
2926        // "World" in UTF-16LE: W=0x0057, o=0x006F, r=0x0072, l=0x006C, d=0x0064
2927        let word = "World";
2928        let utf16: Vec<u16> = word.encode_utf16().collect();
2929        wire_data.put_u16_le((utf16.len() * 2) as u16); // byte length = 10
2930        for code_unit in &utf16 {
2931            wire_data.put_u16_le(*code_unit);
2932        }
2933
2934        // Column 1: IntN 42 - 1-byte length prefix
2935        wire_data.put_u8(4); // 4 bytes for INT
2936        wire_data.put_i32_le(42);
2937
2938        // Build column metadata
2939        let metadata = ColMetaData {
2940            cek_table: None,
2941            columns: vec![
2942                ColumnData {
2943                    name: "greeting".to_string(),
2944                    type_id: TypeId::NVarChar,
2945                    col_type: 0xE7,
2946                    flags: 0x01,
2947                    user_type: 0,
2948                    type_info: TypeInfo {
2949                        max_length: Some(10), // non-MAX
2950                        precision: None,
2951                        scale: None,
2952                        collation: None,
2953                    },
2954                    crypto_metadata: None,
2955                },
2956                ColumnData {
2957                    name: "number".to_string(),
2958                    type_id: TypeId::IntN,
2959                    col_type: 0x26,
2960                    flags: 0x01,
2961                    user_type: 0,
2962                    type_info: TypeInfo {
2963                        max_length: Some(4),
2964                        precision: None,
2965                        scale: None,
2966                        collation: None,
2967                    },
2968                    crypto_metadata: None,
2969                },
2970            ],
2971        };
2972
2973        // Decode the wire data into stored format
2974        let mut wire_cursor = wire_data.freeze();
2975        let raw_row = RawRow::decode(&mut wire_cursor, &metadata).unwrap();
2976
2977        // Verify wire data was fully consumed
2978        assert_eq!(
2979            wire_cursor.remaining(),
2980            0,
2981            "wire data should be fully consumed"
2982        );
2983
2984        // Now parse the stored data
2985        let mut stored_cursor: &[u8] = &raw_row.data;
2986
2987        // Parse column 0 (NVarChar)
2988        // Stored format for non-MAX NVarChar: [2-byte len][data]
2989        assert!(
2990            stored_cursor.remaining() >= 2,
2991            "need at least 2 bytes for length"
2992        );
2993        let len0 = stored_cursor.get_u16_le() as usize;
2994        assert_eq!(len0, 10, "NVarChar length should be 10 bytes");
2995        assert!(
2996            stored_cursor.remaining() >= len0,
2997            "need {len0} bytes for data"
2998        );
2999
3000        // Read UTF-16LE and convert to string
3001        let mut utf16_read = Vec::new();
3002        for _ in 0..(len0 / 2) {
3003            utf16_read.push(stored_cursor.get_u16_le());
3004        }
3005        let string0 = String::from_utf16(&utf16_read).unwrap();
3006        assert_eq!(string0, "World", "column 0 should be 'World'");
3007
3008        // Parse column 1 (IntN)
3009        // Stored format for IntN: [1-byte len][data]
3010        assert!(
3011            stored_cursor.remaining() >= 1,
3012            "need at least 1 byte for length"
3013        );
3014        let len1 = stored_cursor.get_u8();
3015        assert_eq!(len1, 4, "IntN length should be 4");
3016        assert!(stored_cursor.remaining() >= 4, "need 4 bytes for INT data");
3017        let int1 = stored_cursor.get_i32_le();
3018        assert_eq!(int1, 42, "column 1 should be 42");
3019
3020        // Verify stored data was fully consumed
3021        assert_eq!(
3022            stored_cursor.remaining(),
3023            0,
3024            "stored data should be fully consumed"
3025        );
3026    }
3027
3028    #[test]
3029    fn test_decode_nvarchar_max_then_intn_roundtrip() {
3030        // Test NVARCHAR(MAX) followed by IntN - uses PLP encoding
3031
3032        // Build wire data for PLP NVARCHAR(MAX) + IntN
3033        let mut wire_data = BytesMut::new();
3034
3035        // Column 0: NVARCHAR(MAX) "Hello" - PLP format
3036        // PLP: 8-byte total length, then chunks
3037        let word = "Hello";
3038        let utf16: Vec<u16> = word.encode_utf16().collect();
3039        let byte_len = (utf16.len() * 2) as u64;
3040
3041        wire_data.put_u64_le(byte_len); // total length = 10
3042        wire_data.put_u32_le(byte_len as u32); // chunk length = 10
3043        for code_unit in &utf16 {
3044            wire_data.put_u16_le(*code_unit);
3045        }
3046        wire_data.put_u32_le(0); // terminating zero-length chunk
3047
3048        // Column 1: IntN 99
3049        wire_data.put_u8(4);
3050        wire_data.put_i32_le(99);
3051
3052        // Build metadata with MAX type
3053        let metadata = ColMetaData {
3054            cek_table: None,
3055            columns: vec![
3056                ColumnData {
3057                    name: "text".to_string(),
3058                    type_id: TypeId::NVarChar,
3059                    col_type: 0xE7,
3060                    flags: 0x01,
3061                    user_type: 0,
3062                    type_info: TypeInfo {
3063                        max_length: Some(0xFFFF), // MAX indicator
3064                        precision: None,
3065                        scale: None,
3066                        collation: None,
3067                    },
3068                    crypto_metadata: None,
3069                },
3070                ColumnData {
3071                    name: "num".to_string(),
3072                    type_id: TypeId::IntN,
3073                    col_type: 0x26,
3074                    flags: 0x01,
3075                    user_type: 0,
3076                    type_info: TypeInfo {
3077                        max_length: Some(4),
3078                        precision: None,
3079                        scale: None,
3080                        collation: None,
3081                    },
3082                    crypto_metadata: None,
3083                },
3084            ],
3085        };
3086
3087        // Decode wire data
3088        let mut wire_cursor = wire_data.freeze();
3089        let raw_row = RawRow::decode(&mut wire_cursor, &metadata).unwrap();
3090
3091        // Verify wire data was fully consumed
3092        assert_eq!(
3093            wire_cursor.remaining(),
3094            0,
3095            "wire data should be fully consumed"
3096        );
3097
3098        // Parse stored PLP data for column 0
3099        let mut stored_cursor: &[u8] = &raw_row.data;
3100
3101        // PLP stored format: [8-byte total][chunks...][4-byte 0]
3102        let total_len = stored_cursor.get_u64_le();
3103        assert_eq!(total_len, 10, "PLP total length should be 10");
3104
3105        let chunk_len = stored_cursor.get_u32_le();
3106        assert_eq!(chunk_len, 10, "PLP chunk length should be 10");
3107
3108        let mut utf16_read = Vec::new();
3109        for _ in 0..(chunk_len / 2) {
3110            utf16_read.push(stored_cursor.get_u16_le());
3111        }
3112        let string0 = String::from_utf16(&utf16_read).unwrap();
3113        assert_eq!(string0, "Hello", "column 0 should be 'Hello'");
3114
3115        let terminator = stored_cursor.get_u32_le();
3116        assert_eq!(terminator, 0, "PLP should end with 0");
3117
3118        // Parse IntN
3119        let len1 = stored_cursor.get_u8();
3120        assert_eq!(len1, 4);
3121        let int1 = stored_cursor.get_i32_le();
3122        assert_eq!(int1, 99, "column 1 should be 99");
3123
3124        // Verify fully consumed
3125        assert_eq!(
3126            stored_cursor.remaining(),
3127            0,
3128            "stored data should be fully consumed"
3129        );
3130    }
3131
3132    // ========================================================================
3133    // ReturnStatus Token Tests
3134    // ========================================================================
3135
3136    #[test]
3137    fn test_return_status_via_parser() {
3138        // RETURNSTATUS token: type (0x79) + value (i32 LE)
3139        let data = Bytes::from_static(&[
3140            0x79, // RETURNSTATUS token type
3141            0x00, 0x00, 0x00, 0x00, // return value = 0 (success)
3142        ]);
3143
3144        let mut parser = TokenParser::new(data);
3145        let token = parser.next_token().unwrap().unwrap();
3146
3147        match token {
3148            Token::ReturnStatus(status) => {
3149                assert_eq!(status, 0);
3150            }
3151            _ => panic!("Expected ReturnStatus token, got {token:?}"),
3152        }
3153
3154        assert!(parser.next_token().unwrap().is_none());
3155    }
3156
3157    #[test]
3158    fn test_return_status_nonzero() {
3159        // Return value = -6 (common for error returns)
3160        let mut buf = BytesMut::new();
3161        buf.put_u8(0x79); // RETURNSTATUS
3162        buf.put_i32_le(-6);
3163
3164        let mut parser = TokenParser::new(buf.freeze());
3165        let token = parser.next_token().unwrap().unwrap();
3166
3167        match token {
3168            Token::ReturnStatus(status) => {
3169                assert_eq!(status, -6);
3170            }
3171            _ => panic!("Expected ReturnStatus token"),
3172        }
3173    }
3174
3175    // ========================================================================
3176    // DoneProc Token Tests
3177    // ========================================================================
3178
3179    #[test]
3180    fn test_done_proc_roundtrip() {
3181        let done = DoneProc {
3182            status: DoneStatus {
3183                more: false,
3184                error: false,
3185                in_xact: false,
3186                count: true,
3187                attn: false,
3188                srverror: false,
3189            },
3190            cur_cmd: 0x00C6, // EXECUTE (198)
3191            row_count: 100,
3192        };
3193
3194        let mut buf = BytesMut::new();
3195        done.encode(&mut buf);
3196
3197        // Verify token type byte
3198        assert_eq!(buf[0], 0xFE);
3199
3200        // Skip token type byte and decode
3201        let mut cursor = &buf[1..];
3202        let decoded = DoneProc::decode(&mut cursor).unwrap();
3203
3204        assert!(decoded.status.count);
3205        assert!(!decoded.status.more);
3206        assert!(!decoded.status.error);
3207        assert_eq!(decoded.cur_cmd, 0x00C6);
3208        assert_eq!(decoded.row_count, 100);
3209    }
3210
3211    #[test]
3212    fn test_done_proc_via_parser() {
3213        let data = Bytes::from_static(&[
3214            0xFE, // DONEPROC token type
3215            0x00, 0x00, // status: no flags
3216            0xC6, 0x00, // cur_cmd: EXECUTE (198)
3217            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count: 0
3218        ]);
3219
3220        let mut parser = TokenParser::new(data);
3221        let token = parser.next_token().unwrap().unwrap();
3222
3223        match token {
3224            Token::DoneProc(done) => {
3225                assert!(!done.status.count);
3226                assert!(!done.status.more);
3227                assert_eq!(done.cur_cmd, 198);
3228                assert_eq!(done.row_count, 0);
3229            }
3230            _ => panic!("Expected DoneProc token"),
3231        }
3232    }
3233
3234    #[test]
3235    fn test_done_proc_with_error_flag() {
3236        let mut buf = BytesMut::new();
3237        buf.put_u8(0xFE); // DONEPROC
3238        buf.put_u16_le(0x0002); // status: DONE_ERROR
3239        buf.put_u16_le(0x00C6); // cur_cmd: EXECUTE
3240        buf.put_u64_le(0); // row_count
3241
3242        let mut parser = TokenParser::new(buf.freeze());
3243        let token = parser.next_token().unwrap().unwrap();
3244
3245        match token {
3246            Token::DoneProc(done) => {
3247                assert!(done.status.error);
3248                assert!(!done.status.count);
3249                assert!(!done.status.more);
3250            }
3251            _ => panic!("Expected DoneProc token"),
3252        }
3253    }
3254
3255    // ========================================================================
3256    // DoneInProc Token Tests
3257    // ========================================================================
3258
3259    #[test]
3260    fn test_done_in_proc_roundtrip() {
3261        let done = DoneInProc {
3262            status: DoneStatus {
3263                more: true,
3264                error: false,
3265                in_xact: false,
3266                count: true,
3267                attn: false,
3268                srverror: false,
3269            },
3270            cur_cmd: 193, // SELECT
3271            row_count: 7,
3272        };
3273
3274        let mut buf = BytesMut::new();
3275        done.encode(&mut buf);
3276
3277        assert_eq!(buf[0], 0xFF);
3278
3279        let mut cursor = &buf[1..];
3280        let decoded = DoneInProc::decode(&mut cursor).unwrap();
3281
3282        assert!(decoded.status.more);
3283        assert!(decoded.status.count);
3284        assert!(!decoded.status.error);
3285        assert_eq!(decoded.cur_cmd, 193);
3286        assert_eq!(decoded.row_count, 7);
3287    }
3288
3289    #[test]
3290    fn test_done_in_proc_via_parser() {
3291        let data = Bytes::from_static(&[
3292            0xFF, // DONEINPROC token type
3293            0x11, 0x00, // status: MORE | COUNT
3294            0xC1, 0x00, // cur_cmd: SELECT (193)
3295            0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count: 3
3296        ]);
3297
3298        let mut parser = TokenParser::new(data);
3299        let token = parser.next_token().unwrap().unwrap();
3300
3301        match token {
3302            Token::DoneInProc(done) => {
3303                assert!(done.status.more);
3304                assert!(done.status.count);
3305                assert_eq!(done.cur_cmd, 193);
3306                assert_eq!(done.row_count, 3);
3307            }
3308            _ => panic!("Expected DoneInProc token"),
3309        }
3310    }
3311
3312    // ========================================================================
3313    // ServerError Token Tests
3314    // ========================================================================
3315
3316    #[test]
3317    fn test_server_error_decode() {
3318        // Build a realistic ERROR token (without the 0xAA type byte,
3319        // since decode() is called after the parser strips it).
3320        let mut buf = BytesMut::new();
3321
3322        // Construct the message fields first to compute length
3323        let msg_utf16: Vec<u16> = "Invalid column name 'foo'.".encode_utf16().collect();
3324        let srv_utf16: Vec<u16> = "SQLDB01".encode_utf16().collect();
3325        let proc_utf16: Vec<u16> = "".encode_utf16().collect();
3326
3327        // Length = number(4) + state(1) + class(1)
3328        //        + us_varchar(message): 2 + msg_utf16.len()*2
3329        //        + b_varchar(server): 1 + srv_utf16.len()*2
3330        //        + b_varchar(procedure): 1 + proc_utf16.len()*2
3331        //        + line(4)
3332        let length: u16 = (4
3333            + 1
3334            + 1
3335            + 2
3336            + (msg_utf16.len() * 2)
3337            + 1
3338            + (srv_utf16.len() * 2)
3339            + 1
3340            + (proc_utf16.len() * 2)
3341            + 4) as u16;
3342
3343        buf.put_u16_le(length);
3344        buf.put_i32_le(207); // error number: Invalid column
3345        buf.put_u8(1); // state
3346        buf.put_u8(16); // class (severity 16)
3347
3348        // Message (US_VARCHAR: 2-byte char count + UTF-16LE)
3349        buf.put_u16_le(msg_utf16.len() as u16);
3350        for &c in &msg_utf16 {
3351            buf.put_u16_le(c);
3352        }
3353
3354        // Server (B_VARCHAR: 1-byte char count + UTF-16LE)
3355        buf.put_u8(srv_utf16.len() as u8);
3356        for &c in &srv_utf16 {
3357            buf.put_u16_le(c);
3358        }
3359
3360        // Procedure (B_VARCHAR: empty)
3361        buf.put_u8(proc_utf16.len() as u8);
3362
3363        // Line number
3364        buf.put_i32_le(42);
3365
3366        let mut cursor = buf.freeze();
3367        let error = ServerError::decode(&mut cursor).unwrap();
3368
3369        assert_eq!(error.number, 207);
3370        assert_eq!(error.state, 1);
3371        assert_eq!(error.class, 16);
3372        assert_eq!(error.message, "Invalid column name 'foo'.");
3373        assert_eq!(error.server, "SQLDB01");
3374        assert_eq!(error.procedure, "");
3375        assert_eq!(error.line, 42);
3376    }
3377
3378    #[test]
3379    fn test_server_error_severity_helpers() {
3380        let fatal = ServerError {
3381            number: 4014,
3382            state: 1,
3383            class: 20,
3384            message: "Fatal error".to_string(),
3385            server: String::new(),
3386            procedure: String::new(),
3387            line: 0,
3388        };
3389        assert!(fatal.is_fatal());
3390        assert!(fatal.is_batch_abort());
3391
3392        let batch_abort = ServerError {
3393            number: 547,
3394            state: 0,
3395            class: 16,
3396            message: "Constraint violation".to_string(),
3397            server: String::new(),
3398            procedure: String::new(),
3399            line: 1,
3400        };
3401        assert!(!batch_abort.is_fatal());
3402        assert!(batch_abort.is_batch_abort());
3403
3404        let informational = ServerError {
3405            number: 5701,
3406            state: 2,
3407            class: 10,
3408            message: "Changed db context".to_string(),
3409            server: String::new(),
3410            procedure: String::new(),
3411            line: 0,
3412        };
3413        assert!(!informational.is_fatal());
3414        assert!(!informational.is_batch_abort());
3415    }
3416
3417    #[test]
3418    fn test_server_error_via_parser() {
3419        // Build an ERROR token with the 0xAA type byte for the parser
3420        let mut buf = BytesMut::new();
3421        buf.put_u8(0xAA); // ERROR token type
3422
3423        let msg_utf16: Vec<u16> = "Syntax error".encode_utf16().collect();
3424        let srv_utf16: Vec<u16> = "SRV".encode_utf16().collect();
3425        let proc_utf16: Vec<u16> = "sp_test".encode_utf16().collect();
3426
3427        let length: u16 = (4
3428            + 1
3429            + 1
3430            + 2
3431            + (msg_utf16.len() * 2)
3432            + 1
3433            + (srv_utf16.len() * 2)
3434            + 1
3435            + (proc_utf16.len() * 2)
3436            + 4) as u16;
3437
3438        buf.put_u16_le(length);
3439        buf.put_i32_le(102); // Syntax error
3440        buf.put_u8(1);
3441        buf.put_u8(15);
3442
3443        buf.put_u16_le(msg_utf16.len() as u16);
3444        for &c in &msg_utf16 {
3445            buf.put_u16_le(c);
3446        }
3447        buf.put_u8(srv_utf16.len() as u8);
3448        for &c in &srv_utf16 {
3449            buf.put_u16_le(c);
3450        }
3451        buf.put_u8(proc_utf16.len() as u8);
3452        for &c in &proc_utf16 {
3453            buf.put_u16_le(c);
3454        }
3455        buf.put_i32_le(5);
3456
3457        let mut parser = TokenParser::new(buf.freeze());
3458        let token = parser.next_token().unwrap().unwrap();
3459
3460        match token {
3461            Token::Error(err) => {
3462                assert_eq!(err.number, 102);
3463                assert_eq!(err.class, 15);
3464                assert_eq!(err.message, "Syntax error");
3465                assert_eq!(err.server, "SRV");
3466                assert_eq!(err.procedure, "sp_test");
3467                assert_eq!(err.line, 5);
3468            }
3469            _ => panic!("Expected Error token"),
3470        }
3471    }
3472
3473    // ========================================================================
3474    // ReturnValue Token Tests
3475    // ========================================================================
3476
3477    /// Helper: build a ReturnValue token (without the 0xAC type byte)
3478    /// for an IntN output parameter.
3479    fn build_return_value_intn(
3480        ordinal: u16,
3481        name: &str,
3482        status: u8,
3483        value: Option<i32>,
3484    ) -> BytesMut {
3485        let mut inner = BytesMut::new();
3486
3487        // param_ordinal
3488        inner.put_u16_le(ordinal);
3489
3490        // param_name (B_VARCHAR)
3491        let name_utf16: Vec<u16> = name.encode_utf16().collect();
3492        inner.put_u8(name_utf16.len() as u8);
3493        for &c in &name_utf16 {
3494            inner.put_u16_le(c);
3495        }
3496
3497        // status
3498        inner.put_u8(status);
3499
3500        // user_type (4 bytes)
3501        inner.put_u32_le(0);
3502
3503        // flags (2 bytes)
3504        inner.put_u16_le(0x0001); // nullable
3505
3506        // type_id: IntN = 0x26
3507        inner.put_u8(0x26);
3508
3509        // type_info for IntN: 1-byte max_length
3510        inner.put_u8(4);
3511
3512        // value (TYPE_VARBYTE for IntN: 1-byte length + data)
3513        match value {
3514            Some(v) => {
3515                inner.put_u8(4); // length = 4
3516                inner.put_i32_le(v);
3517            }
3518            None => {
3519                inner.put_u8(0); // length = 0 means NULL
3520            }
3521        }
3522
3523        // RETURNVALUE has no outer length prefix (MS-TDS §2.2.7.18) — the
3524        // decoder walks the inner fields directly after the 0xAC token byte.
3525        inner
3526    }
3527
3528    #[test]
3529    fn test_return_value_int_output() {
3530        let buf = build_return_value_intn(1, "@result", 0x01, Some(42));
3531        let mut cursor = buf.freeze();
3532        let rv = ReturnValue::decode(&mut cursor).unwrap();
3533
3534        assert_eq!(rv.param_ordinal, 1);
3535        assert_eq!(rv.param_name, "@result");
3536        assert_eq!(rv.status, 0x01); // OUTPUT
3537        assert_eq!(rv.col_type, 0x26); // IntN
3538        assert_eq!(rv.type_info.max_length, Some(4));
3539        // Value should contain: length byte (4) + i32 LE (42)
3540        assert_eq!(rv.value.len(), 5);
3541        assert_eq!(rv.value[0], 4);
3542        assert_eq!(
3543            i32::from_le_bytes([rv.value[1], rv.value[2], rv.value[3], rv.value[4]]),
3544            42
3545        );
3546    }
3547
3548    #[test]
3549    fn test_return_value_null_output() {
3550        let buf = build_return_value_intn(2, "@count", 0x01, None);
3551        let mut cursor = buf.freeze();
3552        let rv = ReturnValue::decode(&mut cursor).unwrap();
3553
3554        assert_eq!(rv.param_ordinal, 2);
3555        assert_eq!(rv.param_name, "@count");
3556        assert_eq!(rv.status, 0x01);
3557        assert_eq!(rv.col_type, 0x26);
3558        // NULL value: length byte = 0
3559        assert_eq!(rv.value.len(), 1);
3560        assert_eq!(rv.value[0], 0);
3561    }
3562
3563    #[test]
3564    fn test_return_value_udf_status() {
3565        // UDF return value has status = 0x02
3566        let buf = build_return_value_intn(0, "@RETURN_VALUE", 0x02, Some(-1));
3567        let mut cursor = buf.freeze();
3568        let rv = ReturnValue::decode(&mut cursor).unwrap();
3569
3570        assert_eq!(rv.param_ordinal, 0);
3571        assert_eq!(rv.param_name, "@RETURN_VALUE");
3572        assert_eq!(rv.status, 0x02); // UDF return value
3573        assert_eq!(rv.value[0], 4);
3574        assert_eq!(
3575            i32::from_le_bytes([rv.value[1], rv.value[2], rv.value[3], rv.value[4]]),
3576            -1
3577        );
3578    }
3579
3580    #[test]
3581    fn test_return_value_nvarchar_output() {
3582        // Build a ReturnValue for NVARCHAR(100) output parameter
3583        let mut inner = BytesMut::new();
3584
3585        // param_ordinal
3586        inner.put_u16_le(1);
3587
3588        // param_name "@name"
3589        let name_utf16: Vec<u16> = "@name".encode_utf16().collect();
3590        inner.put_u8(name_utf16.len() as u8);
3591        for &c in &name_utf16 {
3592            inner.put_u16_le(c);
3593        }
3594
3595        // status = OUTPUT
3596        inner.put_u8(0x01);
3597        // user_type
3598        inner.put_u32_le(0);
3599        // flags (nullable)
3600        inner.put_u16_le(0x0001);
3601        // type_id: NVarChar = 0xE7
3602        inner.put_u8(0xE7);
3603        // type_info for NVarChar: 2-byte max_length + 5-byte collation
3604        inner.put_u16_le(200); // max 100 chars * 2 bytes
3605        inner.put_u32_le(0x0904D000); // collation LCID
3606        inner.put_u8(0x34); // collation sort_id
3607
3608        // value: "Hello" in UTF-16LE with 2-byte length prefix
3609        let val_utf16: Vec<u16> = "Hello".encode_utf16().collect();
3610        let byte_len = (val_utf16.len() * 2) as u16;
3611        inner.put_u16_le(byte_len);
3612        for &c in &val_utf16 {
3613            inner.put_u16_le(c);
3614        }
3615
3616        let mut cursor = inner.freeze();
3617        let rv = ReturnValue::decode(&mut cursor).unwrap();
3618
3619        assert_eq!(rv.param_ordinal, 1);
3620        assert_eq!(rv.param_name, "@name");
3621        assert_eq!(rv.status, 0x01);
3622        assert_eq!(rv.col_type, 0xE7); // NVarChar
3623        assert_eq!(rv.type_info.max_length, Some(200));
3624        assert!(rv.type_info.collation.is_some());
3625
3626        // Value: 2-byte length (10) + "Hello" in UTF-16LE
3627        assert_eq!(rv.value.len(), 12); // 2 + 10
3628        let val_len = u16::from_le_bytes([rv.value[0], rv.value[1]]);
3629        assert_eq!(val_len, 10);
3630    }
3631
3632    #[test]
3633    fn test_return_value_via_parser() {
3634        // Build a full ReturnValue token with the 0xAC type byte
3635        let mut data = BytesMut::new();
3636        data.put_u8(0xAC); // RETURNVALUE token type
3637        data.extend_from_slice(&build_return_value_intn(0, "@out", 0x01, Some(99)));
3638
3639        let mut parser = TokenParser::new(data.freeze());
3640        let token = parser.next_token().unwrap().unwrap();
3641
3642        match token {
3643            Token::ReturnValue(rv) => {
3644                assert_eq!(rv.param_name, "@out");
3645                assert_eq!(rv.param_ordinal, 0);
3646                assert_eq!(rv.status, 0x01);
3647                assert_eq!(rv.col_type, 0x26);
3648            }
3649            _ => panic!("Expected ReturnValue token"),
3650        }
3651    }
3652
3653    // ========================================================================
3654    // Multi-Token Stream Tests
3655    // ========================================================================
3656
3657    #[test]
3658    fn test_multi_token_stored_proc_response() {
3659        // Simulate a stored procedure response:
3660        // DoneInProc (result set done) → ReturnStatus → DoneProc
3661        let mut data = BytesMut::new();
3662
3663        // Token 1: DONEINPROC — result set with 3 rows
3664        data.put_u8(0xFF); // DONEINPROC
3665        data.put_u16_le(0x0010); // status: COUNT
3666        data.put_u16_le(0x00C1); // cur_cmd: SELECT
3667        data.put_u64_le(3); // row_count
3668
3669        // Token 2: RETURNSTATUS — procedure returned 0
3670        data.put_u8(0x79); // RETURNSTATUS
3671        data.put_i32_le(0);
3672
3673        // Token 3: DONEPROC — final
3674        data.put_u8(0xFE); // DONEPROC
3675        data.put_u16_le(0x0000); // status: no flags
3676        data.put_u16_le(0x00C6); // cur_cmd: EXECUTE
3677        data.put_u64_le(0);
3678
3679        let mut parser = TokenParser::new(data.freeze());
3680
3681        // Token 1: DoneInProc
3682        let t1 = parser.next_token().unwrap().unwrap();
3683        match t1 {
3684            Token::DoneInProc(done) => {
3685                assert!(done.status.count);
3686                assert_eq!(done.row_count, 3);
3687                assert_eq!(done.cur_cmd, 193);
3688            }
3689            _ => panic!("Expected DoneInProc, got {t1:?}"),
3690        }
3691
3692        // Token 2: ReturnStatus
3693        let t2 = parser.next_token().unwrap().unwrap();
3694        match t2 {
3695            Token::ReturnStatus(status) => {
3696                assert_eq!(status, 0);
3697            }
3698            _ => panic!("Expected ReturnStatus, got {t2:?}"),
3699        }
3700
3701        // Token 3: DoneProc
3702        let t3 = parser.next_token().unwrap().unwrap();
3703        match t3 {
3704            Token::DoneProc(done) => {
3705                assert!(!done.status.count);
3706                assert!(!done.status.more);
3707                assert_eq!(done.cur_cmd, 198);
3708            }
3709            _ => panic!("Expected DoneProc, got {t3:?}"),
3710        }
3711
3712        // No more tokens
3713        assert!(parser.next_token().unwrap().is_none());
3714    }
3715
3716    #[test]
3717    fn test_multi_token_error_in_stream() {
3718        // Simulate: ERROR → DONE (error during query)
3719        let mut data = BytesMut::new();
3720
3721        // Token 1: ERROR
3722        data.put_u8(0xAA);
3723
3724        let msg_utf16: Vec<u16> = "Deadlock".encode_utf16().collect();
3725        let srv_utf16: Vec<u16> = "DB1".encode_utf16().collect();
3726
3727        let length: u16 = (4 + 1 + 1
3728            + 2 + (msg_utf16.len() * 2)
3729            + 1 + (srv_utf16.len() * 2)
3730            + 1  // empty procedure
3731            + 4) as u16;
3732
3733        data.put_u16_le(length);
3734        data.put_i32_le(1205); // deadlock
3735        data.put_u8(51); // state
3736        data.put_u8(13); // class
3737
3738        data.put_u16_le(msg_utf16.len() as u16);
3739        for &c in &msg_utf16 {
3740            data.put_u16_le(c);
3741        }
3742        data.put_u8(srv_utf16.len() as u8);
3743        for &c in &srv_utf16 {
3744            data.put_u16_le(c);
3745        }
3746        data.put_u8(0); // empty procedure
3747        data.put_i32_le(0);
3748
3749        // Token 2: DONE with error flag
3750        data.put_u8(0xFD);
3751        data.put_u16_le(0x0002); // DONE_ERROR
3752        data.put_u16_le(0x00C1); // SELECT
3753        data.put_u64_le(0);
3754
3755        let mut parser = TokenParser::new(data.freeze());
3756
3757        // Token 1: Error
3758        let t1 = parser.next_token().unwrap().unwrap();
3759        match t1 {
3760            Token::Error(err) => {
3761                assert_eq!(err.number, 1205);
3762                assert_eq!(err.class, 13);
3763                assert_eq!(err.message, "Deadlock");
3764                assert_eq!(err.server, "DB1");
3765            }
3766            _ => panic!("Expected Error token, got {t1:?}"),
3767        }
3768
3769        // Token 2: Done with error
3770        let t2 = parser.next_token().unwrap().unwrap();
3771        match t2 {
3772            Token::Done(done) => {
3773                assert!(done.status.error);
3774                assert!(!done.status.count);
3775            }
3776            _ => panic!("Expected Done token, got {t2:?}"),
3777        }
3778
3779        assert!(parser.next_token().unwrap().is_none());
3780    }
3781
3782    #[test]
3783    fn test_multi_token_proc_with_return_value() {
3784        // Simulate stored proc: ReturnValue → ReturnStatus → DoneProc
3785        let mut data = BytesMut::new();
3786
3787        // Token 1: ReturnValue (@result = 42)
3788        data.put_u8(0xAC);
3789        data.extend_from_slice(&build_return_value_intn(1, "@result", 0x01, Some(42)));
3790
3791        // Token 2: ReturnStatus = 0
3792        data.put_u8(0x79);
3793        data.put_i32_le(0);
3794
3795        // Token 3: DoneProc
3796        data.put_u8(0xFE);
3797        data.put_u16_le(0x0000);
3798        data.put_u16_le(0x00C6);
3799        data.put_u64_le(0);
3800
3801        let mut parser = TokenParser::new(data.freeze());
3802
3803        let t1 = parser.next_token().unwrap().unwrap();
3804        match t1 {
3805            Token::ReturnValue(rv) => {
3806                assert_eq!(rv.param_name, "@result");
3807                assert_eq!(rv.param_ordinal, 1);
3808            }
3809            _ => panic!("Expected ReturnValue, got {t1:?}"),
3810        }
3811
3812        let t2 = parser.next_token().unwrap().unwrap();
3813        assert!(matches!(t2, Token::ReturnStatus(0)));
3814
3815        let t3 = parser.next_token().unwrap().unwrap();
3816        assert!(matches!(t3, Token::DoneProc(_)));
3817
3818        assert!(parser.next_token().unwrap().is_none());
3819    }
3820
3821    // ========================================================================
3822    // EOF / Truncation Edge Cases
3823    // ========================================================================
3824
3825    #[test]
3826    fn test_return_status_truncated() {
3827        // Only 3 bytes instead of 4 for i32
3828        let data = Bytes::from_static(&[0x79, 0x01, 0x02, 0x03]);
3829        let mut parser = TokenParser::new(data);
3830        assert!(parser.next_token().is_err());
3831    }
3832
3833    #[test]
3834    fn test_done_proc_truncated() {
3835        // Only 8 bytes instead of 12
3836        let data = Bytes::from_static(&[0xFE, 0x00, 0x00, 0xC1, 0x00, 0x01, 0x00, 0x00, 0x00]);
3837        let mut parser = TokenParser::new(data);
3838        assert!(parser.next_token().is_err());
3839    }
3840
3841    #[test]
3842    fn test_server_error_truncated() {
3843        // ERROR token with only the length field (body truncated)
3844        let data = Bytes::from_static(&[0xAA, 0x20, 0x00]);
3845        let mut parser = TokenParser::new(data);
3846        assert!(parser.next_token().is_err());
3847    }
3848}