// Source file: tds_protocol/token.rs

//! TDS token stream definitions.
//!
//! Tokens are the fundamental units of TDS response data. The server sends
//! a stream of tokens that describe metadata, rows, errors, and other information.
//!
//! ## Token Structure
//!
//! Each token begins with a 1-byte token type identifier, followed by
//! token-specific data. Some tokens have fixed lengths, while others
//! have length prefixes.
//!
//! ## Usage
//!
//! ```rust,ignore
//! use tds_protocol::token::{Token, TokenParser};
//! use bytes::Bytes;
//!
//! let data: Bytes = /* received from server */;
//! let mut parser = TokenParser::new(data);
//!
//! while let Some(token) = parser.next_token()? {
//!     match token {
//!         Token::Done(done) => println!("Rows affected: {}", done.row_count),
//!         Token::Error(err) => eprintln!("Error {}: {}", err.number, err.message),
//!         _ => {}
//!     }
//! }
//! ```

use bytes::{Buf, BufMut, Bytes};

use crate::codec::{read_b_varchar, read_us_varchar};
use crate::error::ProtocolError;
use crate::prelude::*;
use crate::types::TypeId;

/// Token type identifier.
///
/// Each variant's discriminant is the 1-byte identifier that introduces the
/// corresponding token on the wire (the enum is `#[repr(u8)]`), so a variant
/// can be converted to its wire byte with `as u8`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
#[non_exhaustive]
pub enum TokenType {
    /// Column metadata (COLMETADATA).
    ColMetaData = 0x81,
    /// Error message (ERROR).
    Error = 0xAA,
    /// Informational message (INFO).
    Info = 0xAB,
    /// Login acknowledgment (LOGINACK).
    LoginAck = 0xAD,
    /// Row data (ROW).
    Row = 0xD1,
    /// Null bitmap compressed row (NBCROW).
    NbcRow = 0xD2,
    /// Environment change (ENVCHANGE).
    EnvChange = 0xE3,
    /// SSPI authentication (SSPI).
    Sspi = 0xED,
    /// Done (DONE).
    Done = 0xFD,
    /// Done in procedure (DONEINPROC).
    DoneInProc = 0xFF,
    /// Done procedure (DONEPROC).
    DoneProc = 0xFE,
    /// Return status (RETURNSTATUS).
    ReturnStatus = 0x79,
    /// Return value (RETURNVALUE).
    ReturnValue = 0xAC,
    /// Order (ORDER).
    Order = 0xA9,
    /// Feature extension acknowledgment (FEATUREEXTACK).
    FeatureExtAck = 0xAE,
    /// Session state (SESSIONSTATE).
    SessionState = 0xE4,
    /// Federated authentication info (FEDAUTHINFO).
    FedAuthInfo = 0xEE,
    /// Column info (COLINFO).
    ColInfo = 0xA5,
    /// Table name (TABNAME).
    TabName = 0xA4,
    /// Offset (OFFSET).
    Offset = 0x78,
}

84impl TokenType {
85    /// Create a token type from a raw byte.
86    pub fn from_u8(value: u8) -> Option<Self> {
87        match value {
88            0x81 => Some(Self::ColMetaData),
89            0xAA => Some(Self::Error),
90            0xAB => Some(Self::Info),
91            0xAD => Some(Self::LoginAck),
92            0xD1 => Some(Self::Row),
93            0xD2 => Some(Self::NbcRow),
94            0xE3 => Some(Self::EnvChange),
95            0xED => Some(Self::Sspi),
96            0xFD => Some(Self::Done),
97            0xFF => Some(Self::DoneInProc),
98            0xFE => Some(Self::DoneProc),
99            0x79 => Some(Self::ReturnStatus),
100            0xAC => Some(Self::ReturnValue),
101            0xA9 => Some(Self::Order),
102            0xAE => Some(Self::FeatureExtAck),
103            0xE4 => Some(Self::SessionState),
104            0xEE => Some(Self::FedAuthInfo),
105            0xA5 => Some(Self::ColInfo),
106            0xA4 => Some(Self::TabName),
107            0x78 => Some(Self::Offset),
108            _ => None,
109        }
110    }
111}
112
113/// Parsed TDS token.
114///
115/// This enum represents all possible tokens that can be received from SQL Server.
116/// Each variant contains the parsed token data.
117#[derive(Debug, Clone)]
118#[non_exhaustive]
119pub enum Token {
120    /// Column metadata describing result set structure.
121    ColMetaData(ColMetaData),
122    /// Row data.
123    Row(RawRow),
124    /// Null bitmap compressed row.
125    NbcRow(NbcRow),
126    /// Completion of a SQL statement.
127    Done(Done),
128    /// Completion of a stored procedure.
129    DoneProc(DoneProc),
130    /// Completion within a stored procedure.
131    DoneInProc(DoneInProc),
132    /// Return status from stored procedure.
133    ReturnStatus(i32),
134    /// Return value from stored procedure.
135    ReturnValue(ReturnValue),
136    /// Error message from server.
137    Error(ServerError),
138    /// Informational message from server.
139    Info(ServerInfo),
140    /// Login acknowledgment.
141    LoginAck(LoginAck),
142    /// Environment change notification.
143    EnvChange(EnvChange),
144    /// Column ordering information.
145    Order(Order),
146    /// Feature extension acknowledgment.
147    FeatureExtAck(FeatureExtAck),
148    /// SSPI authentication data.
149    Sspi(SspiToken),
150    /// Session state information.
151    SessionState(SessionState),
152    /// Federated authentication info.
153    FedAuthInfo(FedAuthInfo),
154}
155
156/// Column metadata token.
157#[derive(Debug, Clone, Default)]
158pub struct ColMetaData {
159    /// Column definitions.
160    pub columns: Vec<ColumnData>,
161    /// CEK table for Always Encrypted result sets.
162    /// Present only when the server sends encrypted column metadata.
163    pub cek_table: Option<crate::crypto::CekTable>,
164}
165
166/// Column definition within metadata.
167#[derive(Debug, Clone)]
168pub struct ColumnData {
169    /// Column name.
170    pub name: String,
171    /// Column data type ID.
172    pub type_id: TypeId,
173    /// Column data type raw byte (for unknown types).
174    pub col_type: u8,
175    /// Column flags.
176    pub flags: u16,
177    /// User type ID.
178    pub user_type: u32,
179    /// Type-specific metadata.
180    pub type_info: TypeInfo,
181    /// Per-column encryption metadata (Always Encrypted).
182    /// Present only for columns with the encrypted flag (0x0800) set.
183    pub crypto_metadata: Option<crate::crypto::CryptoMetadata>,
184}
185
186/// Type-specific metadata.
187#[derive(Debug, Clone, Default)]
188pub struct TypeInfo {
189    /// Maximum length for variable-length types.
190    pub max_length: Option<u32>,
191    /// Precision for numeric types.
192    pub precision: Option<u8>,
193    /// Scale for numeric types.
194    pub scale: Option<u8>,
195    /// Collation for string types.
196    pub collation: Option<Collation>,
197}
198
/// SQL Server collation.
///
/// Collations in SQL Server define the character encoding and sorting rules
/// for string data. For `VARCHAR` columns, the collation determines which
/// code page (character encoding) is used to store the data.
///
/// # Encoding Support
///
/// When the `encoding` feature is enabled, the [`Collation::encoding()`] method
/// returns the appropriate [`encoding_rs::Encoding`] for decoding `VARCHAR` data.
///
/// # Example
///
/// ```rust,ignore
/// use tds_protocol::token::Collation;
///
/// let collation = Collation { lcid: 0x0804, sort_id: 0 }; // Chinese (PRC)
/// if let Some(encoding) = collation.encoding() {
///     let (decoded, _, _) = encoding.decode(raw_bytes);
///     // decoded is now proper Chinese text
/// }
/// ```
#[derive(Debug, Clone, Copy, Default)]
pub struct Collation {
    /// Locale ID (LCID).
    ///
    /// The LCID encodes both the language and region. The lower 16 bits
    /// contain the primary language ID, and bits 16-19 contain the sort ID
    /// for some collations.
    ///
    /// For UTF-8 collations (SQL Server 2019+), bit 27 (0x0800_0000) is set.
    pub lcid: u32,
    /// Sort ID.
    ///
    /// Used with certain collations to specify sorting behavior.
    pub sort_id: u8,
}

237impl Collation {
238    /// Returns the character encoding for this collation.
239    ///
240    /// This method maps the collation's LCID to the appropriate character
241    /// encoding from the `encoding_rs` crate.
242    ///
243    /// # Returns
244    ///
245    /// - `Some(&Encoding)` - The encoding to use for decoding `VARCHAR` data
246    /// - `None` - If the collation uses UTF-8 (no transcoding needed) or
247    ///   the LCID is not recognized (caller should use Windows-1252 fallback)
248    ///
249    /// # UTF-8 Collations
250    ///
251    /// SQL Server 2019+ supports UTF-8 collations (identified by the `_UTF8`
252    /// suffix). These return `None` because no transcoding is needed.
253    ///
254    /// # Example
255    ///
256    /// ```rust,ignore
257    /// let collation = Collation { lcid: 0x0419, sort_id: 0 }; // Russian
258    /// if let Some(encoding) = collation.encoding() {
259    ///     // encoding is Windows-1251 for Cyrillic
260    ///     let (text, _, had_errors) = encoding.decode(&raw_bytes);
261    /// }
262    /// ```
263    #[cfg(feature = "encoding")]
264    pub fn encoding(&self) -> Option<&'static encoding_rs::Encoding> {
265        crate::collation::encoding_for_lcid(self.lcid)
266    }
267
268    /// Returns whether this collation uses UTF-8 encoding.
269    ///
270    /// UTF-8 collations were introduced in SQL Server 2019 and are
271    /// identified by the `_UTF8` suffix in the collation name.
272    #[cfg(feature = "encoding")]
273    pub fn is_utf8(&self) -> bool {
274        crate::collation::is_utf8_collation(self.lcid)
275    }
276
277    /// Returns the Windows code page number for this collation.
278    ///
279    /// Useful for error messages and debugging.
280    ///
281    /// # Returns
282    ///
283    /// The code page number (e.g., 1252 for Western European, 932 for Japanese).
284    #[cfg(feature = "encoding")]
285    pub fn code_page(&self) -> Option<u16> {
286        crate::collation::code_page_for_lcid(self.lcid)
287    }
288
289    /// Returns the encoding name for this collation.
290    ///
291    /// Useful for error messages and debugging.
292    #[cfg(feature = "encoding")]
293    pub fn encoding_name(&self) -> &'static str {
294        crate::collation::encoding_name_for_lcid(self.lcid)
295    }
296}
297
298/// Raw row data (not yet decoded).
299#[derive(Debug, Clone)]
300pub struct RawRow {
301    /// Raw column values.
302    pub data: bytes::Bytes,
303}
304
305/// Null bitmap compressed row.
306#[derive(Debug, Clone)]
307pub struct NbcRow {
308    /// Null bitmap.
309    pub null_bitmap: Vec<u8>,
310    /// Raw non-null column values.
311    pub data: bytes::Bytes,
312}
313
314/// Done token indicating statement completion.
315#[derive(Debug, Clone, Copy)]
316pub struct Done {
317    /// Status flags.
318    pub status: DoneStatus,
319    /// Current command.
320    pub cur_cmd: u16,
321    /// Row count (if applicable).
322    pub row_count: u64,
323}
324
/// Done status flags.
///
/// Each flag is exposed as its own `bool`; the `Default` value has every
/// flag cleared.
#[derive(Debug, Clone, Copy, Default)]
#[non_exhaustive]
pub struct DoneStatus {
    /// More results follow.
    pub more: bool,
    /// Error occurred.
    pub error: bool,
    /// Transaction in progress.
    pub in_xact: bool,
    /// Row count is valid.
    pub count: bool,
    /// Attention acknowledgment.
    pub attn: bool,
    /// Server error caused statement termination.
    pub srverror: bool,
}

343/// Done in procedure token.
344#[derive(Debug, Clone, Copy)]
345pub struct DoneInProc {
346    /// Status flags.
347    pub status: DoneStatus,
348    /// Current command.
349    pub cur_cmd: u16,
350    /// Row count.
351    pub row_count: u64,
352}
353
354/// Done procedure token.
355#[derive(Debug, Clone, Copy)]
356pub struct DoneProc {
357    /// Status flags.
358    pub status: DoneStatus,
359    /// Current command.
360    pub cur_cmd: u16,
361    /// Row count.
362    pub row_count: u64,
363}
364
365/// Return value from stored procedure.
366#[derive(Debug, Clone)]
367#[non_exhaustive]
368pub struct ReturnValue {
369    /// Parameter ordinal.
370    pub param_ordinal: u16,
371    /// Parameter name.
372    pub param_name: String,
373    /// Status flags.
374    pub status: u8,
375    /// User type.
376    pub user_type: u32,
377    /// Type flags.
378    pub flags: u16,
379    /// Raw column type byte from the wire.
380    pub col_type: u8,
381    /// Type info.
382    pub type_info: TypeInfo,
383    /// Value data.
384    pub value: bytes::Bytes,
385}
386
/// Server error message.
#[derive(Debug, Clone)]
pub struct ServerError {
    /// Error number.
    pub number: i32,
    /// Error state.
    pub state: u8,
    /// Error severity class.
    pub class: u8,
    /// Error message text.
    pub message: String,
    /// Server name.
    pub server: String,
    /// Procedure name.
    pub procedure: String,
    /// Line number.
    pub line: i32,
}

/// Server informational message.
#[derive(Debug, Clone)]
pub struct ServerInfo {
    /// Info number.
    pub number: i32,
    /// Info state.
    pub state: u8,
    /// Info class (severity).
    pub class: u8,
    /// Info message text.
    pub message: String,
    /// Server name.
    pub server: String,
    /// Procedure name.
    pub procedure: String,
    /// Line number.
    pub line: i32,
}

/// Login acknowledgment token.
#[derive(Debug, Clone)]
pub struct LoginAck {
    /// Interface type.
    pub interface: u8,
    /// TDS version.
    pub tds_version: u32,
    /// Program name.
    pub prog_name: String,
    /// Program version.
    pub prog_version: u32,
}

438/// Environment change token.
439#[derive(Debug, Clone)]
440pub struct EnvChange {
441    /// Type of environment change.
442    pub env_type: EnvChangeType,
443    /// New value.
444    pub new_value: EnvChangeValue,
445    /// Old value.
446    pub old_value: EnvChangeValue,
447}
448
/// Environment change type.
///
/// The enum is `#[repr(u8)]`; each variant's discriminant is its numeric
/// type code. Note that the value 14 is not assigned to any variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
#[non_exhaustive]
pub enum EnvChangeType {
    /// Database changed.
    Database = 1,
    /// Language changed.
    Language = 2,
    /// Character set changed.
    CharacterSet = 3,
    /// Packet size changed.
    PacketSize = 4,
    /// Unicode data sorting locale ID.
    UnicodeSortingLocalId = 5,
    /// Unicode comparison flags.
    UnicodeComparisonFlags = 6,
    /// SQL collation.
    SqlCollation = 7,
    /// Begin transaction.
    BeginTransaction = 8,
    /// Commit transaction.
    CommitTransaction = 9,
    /// Rollback transaction.
    RollbackTransaction = 10,
    /// Enlist DTC transaction.
    EnlistDtcTransaction = 11,
    /// Defect DTC transaction.
    DefectTransaction = 12,
    /// Real-time log shipping.
    RealTimeLogShipping = 13,
    /// Promote transaction.
    PromoteTransaction = 15,
    /// Transaction manager address.
    TransactionManagerAddress = 16,
    /// Transaction ended.
    TransactionEnded = 17,
    /// Reset connection completion acknowledgment.
    ResetConnectionCompletionAck = 18,
    /// User instance started.
    UserInstanceStarted = 19,
    /// Routing information.
    Routing = 20,
}

494/// Environment change value.
495#[derive(Debug, Clone)]
496#[non_exhaustive]
497pub enum EnvChangeValue {
498    /// String value.
499    String(String),
500    /// Binary value.
501    Binary(bytes::Bytes),
502    /// Routing information.
503    Routing {
504        /// Host name.
505        host: String,
506        /// Port number.
507        port: u16,
508    },
509}
510
/// Column ordering information.
#[derive(Debug, Clone)]
pub struct Order {
    /// Ordered column indices.
    pub columns: Vec<u16>,
}

518/// Feature extension acknowledgment.
519#[derive(Debug, Clone)]
520pub struct FeatureExtAck {
521    /// Acknowledged features.
522    pub features: Vec<FeatureAck>,
523}
524
525/// Individual feature acknowledgment.
526#[derive(Debug, Clone)]
527pub struct FeatureAck {
528    /// Feature ID.
529    pub feature_id: u8,
530    /// Feature data.
531    pub data: bytes::Bytes,
532}
533
534/// SSPI authentication token.
535#[derive(Debug, Clone)]
536pub struct SspiToken {
537    /// SSPI data.
538    pub data: bytes::Bytes,
539}
540
541/// Session state token.
542#[derive(Debug, Clone)]
543pub struct SessionState {
544    /// Session state data.
545    pub data: bytes::Bytes,
546}
547
/// Federated authentication info.
#[derive(Debug, Clone)]
pub struct FedAuthInfo {
    /// STS URL.
    pub sts_url: String,
    /// Service principal name.
    pub spn: String,
}

// =============================================================================
// ColMetaData and Row Parsing Implementation
// =============================================================================

561/// Decode collation information (5 bytes).
562///
563/// Shared by ColMetaData column parsing and CryptoMetadata base type parsing.
564pub(crate) fn decode_collation(src: &mut impl Buf) -> Result<Collation, ProtocolError> {
565    if src.remaining() < 5 {
566        return Err(ProtocolError::UnexpectedEof);
567    }
568    // Collation: LCID (4 bytes) + Sort ID (1 byte)
569    let lcid = src.get_u32_le();
570    let sort_id = src.get_u8();
571    Ok(Collation { lcid, sort_id })
572}
573
574/// Decode type-specific metadata for a column based on its TypeId.
575///
576/// Shared by ColMetaData column parsing and CryptoMetadata base type parsing.
577pub(crate) fn decode_type_info(
578    src: &mut impl Buf,
579    type_id: TypeId,
580    col_type: u8,
581) -> Result<TypeInfo, ProtocolError> {
582    match type_id {
583        // Fixed-length types have no additional metadata
584        TypeId::Null => Ok(TypeInfo::default()),
585        TypeId::Int1 | TypeId::Bit => Ok(TypeInfo::default()),
586        TypeId::Int2 => Ok(TypeInfo::default()),
587        TypeId::Int4 => Ok(TypeInfo::default()),
588        TypeId::Int8 => Ok(TypeInfo::default()),
589        TypeId::Float4 => Ok(TypeInfo::default()),
590        TypeId::Float8 => Ok(TypeInfo::default()),
591        TypeId::Money => Ok(TypeInfo::default()),
592        TypeId::Money4 => Ok(TypeInfo::default()),
593        TypeId::DateTime => Ok(TypeInfo::default()),
594        TypeId::DateTime4 => Ok(TypeInfo::default()),
595
596        // Variable length integer/float/money (1-byte max length)
597        TypeId::IntN | TypeId::BitN | TypeId::FloatN | TypeId::MoneyN | TypeId::DateTimeN => {
598            if src.remaining() < 1 {
599                return Err(ProtocolError::UnexpectedEof);
600            }
601            let max_length = src.get_u8() as u32;
602            Ok(TypeInfo {
603                max_length: Some(max_length),
604                ..Default::default()
605            })
606        }
607
608        // GUID has 1-byte length
609        TypeId::Guid => {
610            if src.remaining() < 1 {
611                return Err(ProtocolError::UnexpectedEof);
612            }
613            let max_length = src.get_u8() as u32;
614            Ok(TypeInfo {
615                max_length: Some(max_length),
616                ..Default::default()
617            })
618        }
619
620        // Decimal/Numeric types (1-byte length + precision + scale)
621        TypeId::Decimal | TypeId::Numeric | TypeId::DecimalN | TypeId::NumericN => {
622            if src.remaining() < 3 {
623                return Err(ProtocolError::UnexpectedEof);
624            }
625            let max_length = src.get_u8() as u32;
626            let precision = src.get_u8();
627            let scale = src.get_u8();
628            Ok(TypeInfo {
629                max_length: Some(max_length),
630                precision: Some(precision),
631                scale: Some(scale),
632                ..Default::default()
633            })
634        }
635
636        // Old-style byte-length strings (Char, VarChar, Binary, VarBinary)
637        TypeId::Char | TypeId::VarChar | TypeId::Binary | TypeId::VarBinary => {
638            if src.remaining() < 1 {
639                return Err(ProtocolError::UnexpectedEof);
640            }
641            let max_length = src.get_u8() as u32;
642            Ok(TypeInfo {
643                max_length: Some(max_length),
644                ..Default::default()
645            })
646        }
647
648        // Big varchar/binary with 2-byte length + collation for strings
649        TypeId::BigVarChar | TypeId::BigChar => {
650            if src.remaining() < 7 {
651                // 2 (length) + 5 (collation)
652                return Err(ProtocolError::UnexpectedEof);
653            }
654            let max_length = src.get_u16_le() as u32;
655            let collation = decode_collation(src)?;
656            Ok(TypeInfo {
657                max_length: Some(max_length),
658                collation: Some(collation),
659                ..Default::default()
660            })
661        }
662
663        // Big binary (2-byte length, no collation)
664        TypeId::BigVarBinary | TypeId::BigBinary => {
665            if src.remaining() < 2 {
666                return Err(ProtocolError::UnexpectedEof);
667            }
668            let max_length = src.get_u16_le() as u32;
669            Ok(TypeInfo {
670                max_length: Some(max_length),
671                ..Default::default()
672            })
673        }
674
675        // Unicode strings (NChar, NVarChar) - 2-byte length + collation
676        TypeId::NChar | TypeId::NVarChar => {
677            if src.remaining() < 7 {
678                // 2 (length) + 5 (collation)
679                return Err(ProtocolError::UnexpectedEof);
680            }
681            let max_length = src.get_u16_le() as u32;
682            let collation = decode_collation(src)?;
683            Ok(TypeInfo {
684                max_length: Some(max_length),
685                collation: Some(collation),
686                ..Default::default()
687            })
688        }
689
690        // Date type (no additional metadata)
691        TypeId::Date => Ok(TypeInfo::default()),
692
693        // Time, DateTime2, DateTimeOffset have scale
694        TypeId::Time | TypeId::DateTime2 | TypeId::DateTimeOffset => {
695            if src.remaining() < 1 {
696                return Err(ProtocolError::UnexpectedEof);
697            }
698            let scale = src.get_u8();
699            Ok(TypeInfo {
700                scale: Some(scale),
701                ..Default::default()
702            })
703        }
704
705        // Text/NText/Image (deprecated LOB types)
706        TypeId::Text | TypeId::NText | TypeId::Image => {
707            // These have complex metadata: length (4) + collation (5) + table name parts
708            if src.remaining() < 4 {
709                return Err(ProtocolError::UnexpectedEof);
710            }
711            let max_length = src.get_u32_le();
712
713            // For Text/NText, read collation
714            let collation = if type_id == TypeId::Text || type_id == TypeId::NText {
715                if src.remaining() < 5 {
716                    return Err(ProtocolError::UnexpectedEof);
717                }
718                Some(decode_collation(src)?)
719            } else {
720                None
721            };
722
723            // Skip table name parts (variable length)
724            // Format: numParts (1 byte) followed by us_varchar for each part
725            if src.remaining() < 1 {
726                return Err(ProtocolError::UnexpectedEof);
727            }
728            let num_parts = src.get_u8();
729            for _ in 0..num_parts {
730                // Read and discard table name part
731                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
732            }
733
734            Ok(TypeInfo {
735                max_length: Some(max_length),
736                collation,
737                ..Default::default()
738            })
739        }
740
741        // XML type
742        TypeId::Xml => {
743            if src.remaining() < 1 {
744                return Err(ProtocolError::UnexpectedEof);
745            }
746            let schema_present = src.get_u8();
747
748            if schema_present != 0 {
749                // Read schema info (3 us_varchar strings)
750                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // db name
751                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // owning schema
752                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // xml schema collection
753            }
754
755            Ok(TypeInfo::default())
756        }
757
758        // UDT (User-defined type) - complex metadata
759        TypeId::Udt => {
760            // Max length (2 bytes)
761            if src.remaining() < 2 {
762                return Err(ProtocolError::UnexpectedEof);
763            }
764            let max_length = src.get_u16_le() as u32;
765
766            // UDT metadata: db name, schema name, type name, assembly qualified name
767            let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // db name
768            let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // schema name
769            let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // type name
770            let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // assembly qualified name
771
772            Ok(TypeInfo {
773                max_length: Some(max_length),
774                ..Default::default()
775            })
776        }
777
778        // Table-valued parameter - complex metadata (skip for now)
779        TypeId::Tvp => {
780            // TVP has very complex metadata, not commonly used
781            // For now, we can't properly parse this
782            Err(ProtocolError::InvalidTokenType(col_type))
783        }
784
785        // SQL Variant - 4-byte length
786        TypeId::Variant => {
787            if src.remaining() < 4 {
788                return Err(ProtocolError::UnexpectedEof);
789            }
790            let max_length = src.get_u32_le();
791            Ok(TypeInfo {
792                max_length: Some(max_length),
793                ..Default::default()
794            })
795        }
796    }
797}
798
799impl ColMetaData {
800    /// Special value indicating no metadata.
801    pub const NO_METADATA: u16 = 0xFFFF;
802
803    /// Decode a COLMETADATA token from bytes.
804    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
805        if src.remaining() < 2 {
806            return Err(ProtocolError::UnexpectedEof);
807        }
808
809        let column_count = src.get_u16_le();
810
811        // 0xFFFF means no metadata present
812        if column_count == Self::NO_METADATA {
813            return Ok(Self {
814                columns: Vec::new(),
815                cek_table: None,
816            });
817        }
818
819        let mut columns = Vec::with_capacity(column_count as usize);
820
821        for _ in 0..column_count {
822            let column = Self::decode_column(src)?;
823            columns.push(column);
824        }
825
826        Ok(Self {
827            columns,
828            cek_table: None,
829        })
830    }
831
832    /// Decode a single column from the metadata.
833    fn decode_column(src: &mut impl Buf) -> Result<ColumnData, ProtocolError> {
834        // UserType (4 bytes) + Flags (2 bytes) + TypeId (1 byte)
835        if src.remaining() < 7 {
836            return Err(ProtocolError::UnexpectedEof);
837        }
838
839        let user_type = src.get_u32_le();
840        let flags = src.get_u16_le();
841        let col_type = src.get_u8();
842
843        let type_id = TypeId::from_u8(col_type).unwrap_or(TypeId::Null); // Default to Null for unknown types
844
845        // Parse type-specific metadata
846        let type_info = decode_type_info(src, type_id, col_type)?;
847
848        // Read column name (B_VARCHAR format - 1 byte length in characters)
849        let name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
850
851        Ok(ColumnData {
852            name,
853            type_id,
854            col_type,
855            flags,
856            user_type,
857            type_info,
858            crypto_metadata: None,
859        })
860    }
861
862    /// Decode a COLMETADATA token with Always Encrypted support.
863    ///
864    /// When column encryption was negotiated in Login7, the server sends a CekTable
865    /// before column definitions and per-column CryptoMetadata for encrypted columns.
866    ///
867    /// # Wire Format (with encryption)
868    ///
869    /// ```text
870    /// column_count: USHORT
871    /// cek_table: CekTable (always present when encryption negotiated)
872    /// columns: ColumnData[column_count] (with CryptoMetadata for encrypted columns)
873    /// ```
874    pub fn decode_encrypted(src: &mut impl Buf) -> Result<Self, ProtocolError> {
875        if src.remaining() < 2 {
876            return Err(ProtocolError::UnexpectedEof);
877        }
878
879        let column_count = src.get_u16_le();
880
881        if column_count == Self::NO_METADATA {
882            return Ok(Self {
883                columns: Vec::new(),
884                cek_table: None,
885            });
886        }
887
888        // Parse CEK table (always present when encryption was negotiated)
889        let cek_table = crate::crypto::CekTable::decode(src)?;
890
891        let mut columns = Vec::with_capacity(column_count as usize);
892
893        for _ in 0..column_count {
894            let column = Self::decode_column_encrypted(src)?;
895            columns.push(column);
896        }
897
898        Ok(Self {
899            columns,
900            cek_table: Some(cek_table),
901        })
902    }
903
904    /// Decode a single column definition with Always Encrypted support.
905    ///
906    /// For encrypted columns (flags & 0x0800), parses CryptoMetadata after the type info.
907    fn decode_column_encrypted(src: &mut impl Buf) -> Result<ColumnData, ProtocolError> {
908        if src.remaining() < 7 {
909            return Err(ProtocolError::UnexpectedEof);
910        }
911
912        let user_type = src.get_u32_le();
913        let flags = src.get_u16_le();
914        let col_type = src.get_u8();
915
916        let type_id = TypeId::from_u8(col_type).unwrap_or(TypeId::Null);
917
918        // Parse type-specific metadata (for encrypted columns, this is the transport type)
919        let type_info = decode_type_info(src, type_id, col_type)?;
920
921        // Parse CryptoMetadata if the column is encrypted
922        let crypto_metadata = if crate::crypto::is_column_encrypted(flags) {
923            Some(crate::crypto::CryptoMetadata::decode(src)?)
924        } else {
925            None
926        };
927
928        // Read column name
929        let name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
930
931        Ok(ColumnData {
932            name,
933            type_id,
934            col_type,
935            flags,
936            user_type,
937            type_info,
938            crypto_metadata,
939        })
940    }
941
942    /// Get the number of columns.
943    #[must_use]
944    pub fn column_count(&self) -> usize {
945        self.columns.len()
946    }
947
948    /// Check if this represents no metadata.
949    #[must_use]
950    pub fn is_empty(&self) -> bool {
951        self.columns.is_empty()
952    }
953}
954
955impl ColumnData {
956    /// Check if this column is nullable.
957    #[must_use]
958    pub fn is_nullable(&self) -> bool {
959        (self.flags & 0x0001) != 0
960    }
961
962    /// Get the fixed size in bytes for this column, if applicable.
963    ///
964    /// Returns `None` for variable-length types.
965    #[must_use]
966    pub fn fixed_size(&self) -> Option<usize> {
967        match self.type_id {
968            TypeId::Null => Some(0),
969            TypeId::Int1 | TypeId::Bit => Some(1),
970            TypeId::Int2 => Some(2),
971            TypeId::Int4 => Some(4),
972            TypeId::Int8 => Some(8),
973            TypeId::Float4 => Some(4),
974            TypeId::Float8 => Some(8),
975            TypeId::Money => Some(8),
976            TypeId::Money4 => Some(4),
977            TypeId::DateTime => Some(8),
978            TypeId::DateTime4 => Some(4),
979            TypeId::Date => Some(3),
980            _ => None,
981        }
982    }
983}
984
985// =============================================================================
986// Row Parsing Implementation
987// =============================================================================
988
989impl RawRow {
990    /// Decode a ROW token from bytes.
991    ///
992    /// This function requires the column metadata to know how to parse the row.
993    /// The row data is stored as raw bytes for later parsing.
994    pub fn decode(src: &mut impl Buf, metadata: &ColMetaData) -> Result<Self, ProtocolError> {
995        let mut data = bytes::BytesMut::new();
996
997        for col in &metadata.columns {
998            Self::decode_column_value(src, col, &mut data)?;
999        }
1000
1001        Ok(Self {
1002            data: data.freeze(),
1003        })
1004    }
1005
1006    /// Decode a single column value and append to the output buffer.
1007    fn decode_column_value(
1008        src: &mut impl Buf,
1009        col: &ColumnData,
1010        dst: &mut bytes::BytesMut,
1011    ) -> Result<(), ProtocolError> {
1012        match col.type_id {
1013            // Fixed-length types
1014            TypeId::Null => {
1015                // No data
1016            }
1017            TypeId::Int1 | TypeId::Bit => {
1018                if src.remaining() < 1 {
1019                    return Err(ProtocolError::UnexpectedEof);
1020                }
1021                dst.extend_from_slice(&[src.get_u8()]);
1022            }
1023            TypeId::Int2 => {
1024                if src.remaining() < 2 {
1025                    return Err(ProtocolError::UnexpectedEof);
1026                }
1027                dst.extend_from_slice(&src.get_u16_le().to_le_bytes());
1028            }
1029            TypeId::Int4 => {
1030                if src.remaining() < 4 {
1031                    return Err(ProtocolError::UnexpectedEof);
1032                }
1033                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
1034            }
1035            TypeId::Int8 => {
1036                if src.remaining() < 8 {
1037                    return Err(ProtocolError::UnexpectedEof);
1038                }
1039                dst.extend_from_slice(&src.get_u64_le().to_le_bytes());
1040            }
1041            TypeId::Float4 => {
1042                if src.remaining() < 4 {
1043                    return Err(ProtocolError::UnexpectedEof);
1044                }
1045                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
1046            }
1047            TypeId::Float8 => {
1048                if src.remaining() < 8 {
1049                    return Err(ProtocolError::UnexpectedEof);
1050                }
1051                dst.extend_from_slice(&src.get_u64_le().to_le_bytes());
1052            }
1053            TypeId::Money => {
1054                if src.remaining() < 8 {
1055                    return Err(ProtocolError::UnexpectedEof);
1056                }
1057                let hi = src.get_u32_le();
1058                let lo = src.get_u32_le();
1059                dst.extend_from_slice(&hi.to_le_bytes());
1060                dst.extend_from_slice(&lo.to_le_bytes());
1061            }
1062            TypeId::Money4 => {
1063                if src.remaining() < 4 {
1064                    return Err(ProtocolError::UnexpectedEof);
1065                }
1066                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
1067            }
1068            TypeId::DateTime => {
1069                if src.remaining() < 8 {
1070                    return Err(ProtocolError::UnexpectedEof);
1071                }
1072                let days = src.get_u32_le();
1073                let time = src.get_u32_le();
1074                dst.extend_from_slice(&days.to_le_bytes());
1075                dst.extend_from_slice(&time.to_le_bytes());
1076            }
1077            TypeId::DateTime4 => {
1078                if src.remaining() < 4 {
1079                    return Err(ProtocolError::UnexpectedEof);
1080                }
1081                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
1082            }
1083            // DATE type uses 1-byte length prefix (can be NULL)
1084            TypeId::Date => {
1085                Self::decode_bytelen_type(src, dst)?;
1086            }
1087
1088            // Variable-length nullable types (length-prefixed)
1089            TypeId::IntN | TypeId::BitN | TypeId::FloatN | TypeId::MoneyN | TypeId::DateTimeN => {
1090                Self::decode_bytelen_type(src, dst)?;
1091            }
1092
1093            TypeId::Guid => {
1094                Self::decode_bytelen_type(src, dst)?;
1095            }
1096
1097            TypeId::Decimal | TypeId::Numeric | TypeId::DecimalN | TypeId::NumericN => {
1098                Self::decode_bytelen_type(src, dst)?;
1099            }
1100
1101            // Old-style byte-length strings
1102            TypeId::Char | TypeId::VarChar | TypeId::Binary | TypeId::VarBinary => {
1103                Self::decode_bytelen_type(src, dst)?;
1104            }
1105
1106            // 2-byte length strings (or PLP for MAX types)
1107            TypeId::BigVarChar | TypeId::BigVarBinary => {
1108                // max_length == 0xFFFF indicates VARCHAR(MAX) or VARBINARY(MAX), which uses PLP
1109                if col.type_info.max_length == Some(0xFFFF) {
1110                    Self::decode_plp_type(src, dst)?;
1111                } else {
1112                    Self::decode_ushortlen_type(src, dst)?;
1113                }
1114            }
1115
1116            // Fixed-length types that don't have MAX variants
1117            TypeId::BigChar | TypeId::BigBinary => {
1118                Self::decode_ushortlen_type(src, dst)?;
1119            }
1120
1121            // Unicode strings (2-byte length in bytes, or PLP for NVARCHAR(MAX))
1122            TypeId::NVarChar => {
1123                // max_length == 0xFFFF indicates NVARCHAR(MAX), which uses PLP
1124                if col.type_info.max_length == Some(0xFFFF) {
1125                    Self::decode_plp_type(src, dst)?;
1126                } else {
1127                    Self::decode_ushortlen_type(src, dst)?;
1128                }
1129            }
1130
1131            // Fixed-length NCHAR doesn't have MAX variant
1132            TypeId::NChar => {
1133                Self::decode_ushortlen_type(src, dst)?;
1134            }
1135
1136            // Time types with scale
1137            TypeId::Time | TypeId::DateTime2 | TypeId::DateTimeOffset => {
1138                Self::decode_bytelen_type(src, dst)?;
1139            }
1140
1141            // TEXT/NTEXT/IMAGE - deprecated LOB types using textptr format
1142            TypeId::Text | TypeId::NText | TypeId::Image => {
1143                Self::decode_textptr_type(src, dst)?;
1144            }
1145
1146            // XML - uses actual PLP format
1147            TypeId::Xml => {
1148                Self::decode_plp_type(src, dst)?;
1149            }
1150
1151            // Complex types
1152            TypeId::Variant => {
1153                Self::decode_intlen_type(src, dst)?;
1154            }
1155
1156            TypeId::Udt => {
1157                // UDT uses PLP encoding
1158                Self::decode_plp_type(src, dst)?;
1159            }
1160
1161            TypeId::Tvp => {
1162                // TVP not supported in row data
1163                return Err(ProtocolError::InvalidTokenType(col.col_type));
1164            }
1165        }
1166
1167        Ok(())
1168    }
1169
1170    /// Decode a 1-byte length-prefixed value.
1171    fn decode_bytelen_type(
1172        src: &mut impl Buf,
1173        dst: &mut bytes::BytesMut,
1174    ) -> Result<(), ProtocolError> {
1175        if src.remaining() < 1 {
1176            return Err(ProtocolError::UnexpectedEof);
1177        }
1178        let len = src.get_u8() as usize;
1179        if len == 0xFF {
1180            // NULL value - store as zero-length with NULL marker
1181            dst.extend_from_slice(&[0xFF]);
1182        } else if len == 0 {
1183            // Empty value
1184            dst.extend_from_slice(&[0x00]);
1185        } else {
1186            if src.remaining() < len {
1187                return Err(ProtocolError::UnexpectedEof);
1188            }
1189            dst.extend_from_slice(&[len as u8]);
1190            for _ in 0..len {
1191                dst.extend_from_slice(&[src.get_u8()]);
1192            }
1193        }
1194        Ok(())
1195    }
1196
1197    /// Decode a 2-byte length-prefixed value.
1198    fn decode_ushortlen_type(
1199        src: &mut impl Buf,
1200        dst: &mut bytes::BytesMut,
1201    ) -> Result<(), ProtocolError> {
1202        if src.remaining() < 2 {
1203            return Err(ProtocolError::UnexpectedEof);
1204        }
1205        let len = src.get_u16_le() as usize;
1206        if len == 0xFFFF {
1207            // NULL value
1208            dst.extend_from_slice(&0xFFFFu16.to_le_bytes());
1209        } else if len == 0 {
1210            // Empty value
1211            dst.extend_from_slice(&0u16.to_le_bytes());
1212        } else {
1213            if src.remaining() < len {
1214                return Err(ProtocolError::UnexpectedEof);
1215            }
1216            dst.extend_from_slice(&(len as u16).to_le_bytes());
1217            for _ in 0..len {
1218                dst.extend_from_slice(&[src.get_u8()]);
1219            }
1220        }
1221        Ok(())
1222    }
1223
1224    /// Decode a 4-byte length-prefixed value.
1225    fn decode_intlen_type(
1226        src: &mut impl Buf,
1227        dst: &mut bytes::BytesMut,
1228    ) -> Result<(), ProtocolError> {
1229        if src.remaining() < 4 {
1230            return Err(ProtocolError::UnexpectedEof);
1231        }
1232        let len = src.get_u32_le() as usize;
1233        if len == 0xFFFFFFFF {
1234            // NULL value
1235            dst.extend_from_slice(&0xFFFFFFFFu32.to_le_bytes());
1236        } else if len == 0 {
1237            // Empty value
1238            dst.extend_from_slice(&0u32.to_le_bytes());
1239        } else {
1240            if src.remaining() < len {
1241                return Err(ProtocolError::UnexpectedEof);
1242            }
1243            dst.extend_from_slice(&(len as u32).to_le_bytes());
1244            for _ in 0..len {
1245                dst.extend_from_slice(&[src.get_u8()]);
1246            }
1247        }
1248        Ok(())
1249    }
1250
1251    /// Decode a TEXT/NTEXT/IMAGE type (textptr format).
1252    ///
1253    /// These deprecated LOB types use a special format:
1254    /// - 1 byte: textptr_len (0 = NULL)
1255    /// - textptr_len bytes: textptr (if not NULL)
1256    /// - 8 bytes: timestamp (if not NULL)
1257    /// - 4 bytes: data length (if not NULL)
1258    /// - data_len bytes: the actual data (if not NULL)
1259    ///
1260    /// We convert this to PLP format for the client to parse:
1261    /// - 8 bytes: total length (0xFFFFFFFFFFFFFFFF = NULL)
1262    /// - 4 bytes: chunk length (= data length)
1263    /// - chunk data
1264    /// - 4 bytes: 0 (terminator)
1265    fn decode_textptr_type(
1266        src: &mut impl Buf,
1267        dst: &mut bytes::BytesMut,
1268    ) -> Result<(), ProtocolError> {
1269        if src.remaining() < 1 {
1270            return Err(ProtocolError::UnexpectedEof);
1271        }
1272
1273        let textptr_len = src.get_u8() as usize;
1274
1275        if textptr_len == 0 {
1276            // NULL value - write PLP NULL marker
1277            dst.extend_from_slice(&0xFFFFFFFFFFFFFFFFu64.to_le_bytes());
1278            return Ok(());
1279        }
1280
1281        // Skip textptr bytes
1282        if src.remaining() < textptr_len {
1283            return Err(ProtocolError::UnexpectedEof);
1284        }
1285        src.advance(textptr_len);
1286
1287        // Skip 8-byte timestamp
1288        if src.remaining() < 8 {
1289            return Err(ProtocolError::UnexpectedEof);
1290        }
1291        src.advance(8);
1292
1293        // Read data length
1294        if src.remaining() < 4 {
1295            return Err(ProtocolError::UnexpectedEof);
1296        }
1297        let data_len = src.get_u32_le() as usize;
1298
1299        if src.remaining() < data_len {
1300            return Err(ProtocolError::UnexpectedEof);
1301        }
1302
1303        // Write in PLP format for client parsing:
1304        // - 8 bytes: total length
1305        // - 4 bytes: chunk length
1306        // - chunk data
1307        // - 4 bytes: 0 (terminator)
1308        dst.extend_from_slice(&(data_len as u64).to_le_bytes());
1309        dst.extend_from_slice(&(data_len as u32).to_le_bytes());
1310        for _ in 0..data_len {
1311            dst.extend_from_slice(&[src.get_u8()]);
1312        }
1313        dst.extend_from_slice(&0u32.to_le_bytes()); // PLP terminator
1314
1315        Ok(())
1316    }
1317
1318    /// Decode a PLP (Partially Length-Prefixed) value.
1319    ///
1320    /// PLP format:
1321    /// - 8 bytes: total length (0xFFFFFFFFFFFFFFFE = unknown, 0xFFFFFFFFFFFFFFFF = NULL)
1322    /// - If not NULL: chunks of (4 byte chunk length + data) until chunk length = 0
1323    fn decode_plp_type(src: &mut impl Buf, dst: &mut bytes::BytesMut) -> Result<(), ProtocolError> {
1324        if src.remaining() < 8 {
1325            return Err(ProtocolError::UnexpectedEof);
1326        }
1327
1328        let total_len = src.get_u64_le();
1329
1330        // Store the total length marker
1331        dst.extend_from_slice(&total_len.to_le_bytes());
1332
1333        if total_len == 0xFFFFFFFFFFFFFFFF {
1334            // NULL value - no more data
1335            return Ok(());
1336        }
1337
1338        // Read chunks until terminator
1339        loop {
1340            if src.remaining() < 4 {
1341                return Err(ProtocolError::UnexpectedEof);
1342            }
1343            let chunk_len = src.get_u32_le() as usize;
1344            dst.extend_from_slice(&(chunk_len as u32).to_le_bytes());
1345
1346            if chunk_len == 0 {
1347                // End of PLP data
1348                break;
1349            }
1350
1351            if src.remaining() < chunk_len {
1352                return Err(ProtocolError::UnexpectedEof);
1353            }
1354
1355            for _ in 0..chunk_len {
1356                dst.extend_from_slice(&[src.get_u8()]);
1357            }
1358        }
1359
1360        Ok(())
1361    }
1362}
1363
1364// =============================================================================
1365// NbcRow Parsing Implementation
1366// =============================================================================
1367
1368impl NbcRow {
1369    /// Decode an NBCROW token from bytes.
1370    ///
1371    /// NBCROW (Null Bitmap Compressed Row) stores a bitmap indicating which
1372    /// columns are NULL, followed by only the non-NULL values.
1373    pub fn decode(src: &mut impl Buf, metadata: &ColMetaData) -> Result<Self, ProtocolError> {
1374        let col_count = metadata.columns.len();
1375        let bitmap_len = col_count.div_ceil(8);
1376
1377        if src.remaining() < bitmap_len {
1378            return Err(ProtocolError::UnexpectedEof);
1379        }
1380
1381        // Read null bitmap
1382        let mut null_bitmap = vec![0u8; bitmap_len];
1383        for byte in &mut null_bitmap {
1384            *byte = src.get_u8();
1385        }
1386
1387        // Read non-null values
1388        let mut data = bytes::BytesMut::new();
1389
1390        for (i, col) in metadata.columns.iter().enumerate() {
1391            let byte_idx = i / 8;
1392            let bit_idx = i % 8;
1393            let is_null = (null_bitmap[byte_idx] & (1 << bit_idx)) != 0;
1394
1395            if !is_null {
1396                // Read the value - for NBCROW, we read without the length prefix
1397                // for fixed-length types, and with length prefix for variable types
1398                RawRow::decode_column_value(src, col, &mut data)?;
1399            }
1400        }
1401
1402        Ok(Self {
1403            null_bitmap,
1404            data: data.freeze(),
1405        })
1406    }
1407
1408    /// Check if a column at the given index is NULL.
1409    #[must_use]
1410    pub fn is_null(&self, column_index: usize) -> bool {
1411        let byte_idx = column_index / 8;
1412        let bit_idx = column_index % 8;
1413        if byte_idx < self.null_bitmap.len() {
1414            (self.null_bitmap[byte_idx] & (1 << bit_idx)) != 0
1415        } else {
1416            true // Out of bounds = NULL
1417        }
1418    }
1419}
1420
1421// =============================================================================
1422// ReturnValue Parsing Implementation
1423// =============================================================================
1424
1425impl ReturnValue {
1426    /// Decode a RETURNVALUE token from bytes.
1427    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1428        // Length (2 bytes)
1429        if src.remaining() < 2 {
1430            return Err(ProtocolError::UnexpectedEof);
1431        }
1432        let _length = src.get_u16_le();
1433
1434        // Parameter ordinal (2 bytes)
1435        if src.remaining() < 2 {
1436            return Err(ProtocolError::UnexpectedEof);
1437        }
1438        let param_ordinal = src.get_u16_le();
1439
1440        // Parameter name (B_VARCHAR)
1441        let param_name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1442
1443        // Status (1 byte)
1444        if src.remaining() < 1 {
1445            return Err(ProtocolError::UnexpectedEof);
1446        }
1447        let status = src.get_u8();
1448
1449        // User type (4 bytes) + flags (2 bytes) + type id (1 byte)
1450        if src.remaining() < 7 {
1451            return Err(ProtocolError::UnexpectedEof);
1452        }
1453        let user_type = src.get_u32_le();
1454        let flags = src.get_u16_le();
1455        let col_type = src.get_u8();
1456
1457        let type_id = TypeId::from_u8(col_type).unwrap_or(TypeId::Null);
1458
1459        // Parse type info
1460        let type_info = decode_type_info(src, type_id, col_type)?;
1461
1462        // Read the value data
1463        let mut value_buf = bytes::BytesMut::new();
1464
1465        // Create a temporary column for value parsing
1466        let temp_col = ColumnData {
1467            name: String::new(),
1468            type_id,
1469            col_type,
1470            flags,
1471            user_type,
1472            type_info: type_info.clone(),
1473            crypto_metadata: None,
1474        };
1475
1476        RawRow::decode_column_value(src, &temp_col, &mut value_buf)?;
1477
1478        Ok(Self {
1479            param_ordinal,
1480            param_name,
1481            status,
1482            user_type,
1483            flags,
1484            col_type,
1485            type_info,
1486            value: value_buf.freeze(),
1487        })
1488    }
1489}
1490
1491// =============================================================================
1492// SessionState Parsing Implementation
1493// =============================================================================
1494
1495impl SessionState {
1496    /// Decode a SESSIONSTATE token from bytes.
1497    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1498        if src.remaining() < 4 {
1499            return Err(ProtocolError::UnexpectedEof);
1500        }
1501
1502        let length = src.get_u32_le() as usize;
1503
1504        if src.remaining() < length {
1505            return Err(ProtocolError::IncompletePacket {
1506                expected: length,
1507                actual: src.remaining(),
1508            });
1509        }
1510
1511        let data = src.copy_to_bytes(length);
1512
1513        Ok(Self { data })
1514    }
1515}
1516
1517// =============================================================================
1518// Token Parsing Implementation
1519// =============================================================================
1520
/// Bit masks for the 16-bit status field of the DONE family of tokens.
///
/// Each mask corresponds to one boolean field of `DoneStatus`.
mod done_status_bits {
    /// `0x0001` — backs `DoneStatus::more` (more results follow).
    pub const DONE_MORE: u16 = 1 << 0;
    /// `0x0002` — backs `DoneStatus::error` (an error occurred).
    pub const DONE_ERROR: u16 = 1 << 1;
    /// `0x0004` — backs `DoneStatus::in_xact`.
    pub const DONE_INXACT: u16 = 1 << 2;
    /// `0x0010` — backs `DoneStatus::count` (the row count is valid).
    pub const DONE_COUNT: u16 = 1 << 4;
    /// `0x0020` — backs `DoneStatus::attn`.
    pub const DONE_ATTN: u16 = 1 << 5;
    /// `0x0100` — backs `DoneStatus::srverror`.
    pub const DONE_SRVERROR: u16 = 1 << 8;
}
1530
1531impl DoneStatus {
1532    /// Parse done status from raw bits.
1533    #[must_use]
1534    pub fn from_bits(bits: u16) -> Self {
1535        use done_status_bits::*;
1536        Self {
1537            more: (bits & DONE_MORE) != 0,
1538            error: (bits & DONE_ERROR) != 0,
1539            in_xact: (bits & DONE_INXACT) != 0,
1540            count: (bits & DONE_COUNT) != 0,
1541            attn: (bits & DONE_ATTN) != 0,
1542            srverror: (bits & DONE_SRVERROR) != 0,
1543        }
1544    }
1545
1546    /// Convert to raw bits.
1547    #[must_use]
1548    pub fn to_bits(&self) -> u16 {
1549        use done_status_bits::*;
1550        let mut bits = 0u16;
1551        if self.more {
1552            bits |= DONE_MORE;
1553        }
1554        if self.error {
1555            bits |= DONE_ERROR;
1556        }
1557        if self.in_xact {
1558            bits |= DONE_INXACT;
1559        }
1560        if self.count {
1561            bits |= DONE_COUNT;
1562        }
1563        if self.attn {
1564            bits |= DONE_ATTN;
1565        }
1566        if self.srverror {
1567            bits |= DONE_SRVERROR;
1568        }
1569        bits
1570    }
1571}
1572
1573impl Done {
1574    /// Size of the DONE token in bytes (excluding token type byte).
1575    pub const SIZE: usize = 12; // 2 (status) + 2 (curcmd) + 8 (rowcount)
1576
1577    /// Decode a DONE token from bytes.
1578    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1579        if src.remaining() < Self::SIZE {
1580            return Err(ProtocolError::IncompletePacket {
1581                expected: Self::SIZE,
1582                actual: src.remaining(),
1583            });
1584        }
1585
1586        let status = DoneStatus::from_bits(src.get_u16_le());
1587        let cur_cmd = src.get_u16_le();
1588        let row_count = src.get_u64_le();
1589
1590        Ok(Self {
1591            status,
1592            cur_cmd,
1593            row_count,
1594        })
1595    }
1596
1597    /// Encode the DONE token to bytes.
1598    pub fn encode(&self, dst: &mut impl BufMut) {
1599        dst.put_u8(TokenType::Done as u8);
1600        dst.put_u16_le(self.status.to_bits());
1601        dst.put_u16_le(self.cur_cmd);
1602        dst.put_u64_le(self.row_count);
1603    }
1604
1605    /// Check if more results follow this DONE token.
1606    #[must_use]
1607    pub const fn has_more(&self) -> bool {
1608        self.status.more
1609    }
1610
1611    /// Check if an error occurred.
1612    #[must_use]
1613    pub const fn has_error(&self) -> bool {
1614        self.status.error
1615    }
1616
1617    /// Check if the row count is valid.
1618    #[must_use]
1619    pub const fn has_count(&self) -> bool {
1620        self.status.count
1621    }
1622}
1623
1624impl DoneProc {
1625    /// Size of the DONEPROC token in bytes (excluding token type byte).
1626    pub const SIZE: usize = 12;
1627
1628    /// Decode a DONEPROC token from bytes.
1629    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1630        if src.remaining() < Self::SIZE {
1631            return Err(ProtocolError::IncompletePacket {
1632                expected: Self::SIZE,
1633                actual: src.remaining(),
1634            });
1635        }
1636
1637        let status = DoneStatus::from_bits(src.get_u16_le());
1638        let cur_cmd = src.get_u16_le();
1639        let row_count = src.get_u64_le();
1640
1641        Ok(Self {
1642            status,
1643            cur_cmd,
1644            row_count,
1645        })
1646    }
1647
1648    /// Encode the DONEPROC token to bytes.
1649    pub fn encode(&self, dst: &mut impl BufMut) {
1650        dst.put_u8(TokenType::DoneProc as u8);
1651        dst.put_u16_le(self.status.to_bits());
1652        dst.put_u16_le(self.cur_cmd);
1653        dst.put_u64_le(self.row_count);
1654    }
1655}
1656
1657impl DoneInProc {
1658    /// Size of the DONEINPROC token in bytes (excluding token type byte).
1659    pub const SIZE: usize = 12;
1660
1661    /// Decode a DONEINPROC token from bytes.
1662    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1663        if src.remaining() < Self::SIZE {
1664            return Err(ProtocolError::IncompletePacket {
1665                expected: Self::SIZE,
1666                actual: src.remaining(),
1667            });
1668        }
1669
1670        let status = DoneStatus::from_bits(src.get_u16_le());
1671        let cur_cmd = src.get_u16_le();
1672        let row_count = src.get_u64_le();
1673
1674        Ok(Self {
1675            status,
1676            cur_cmd,
1677            row_count,
1678        })
1679    }
1680
1681    /// Encode the DONEINPROC token to bytes.
1682    pub fn encode(&self, dst: &mut impl BufMut) {
1683        dst.put_u8(TokenType::DoneInProc as u8);
1684        dst.put_u16_le(self.status.to_bits());
1685        dst.put_u16_le(self.cur_cmd);
1686        dst.put_u64_le(self.row_count);
1687    }
1688}
1689
1690impl ServerError {
1691    /// Decode an ERROR token from bytes.
1692    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1693        // ERROR token: length (2) + number (4) + state (1) + class (1) +
1694        //              message (us_varchar) + server (b_varchar) + procedure (b_varchar) + line (4)
1695        if src.remaining() < 2 {
1696            return Err(ProtocolError::UnexpectedEof);
1697        }
1698
1699        let _length = src.get_u16_le();
1700
1701        if src.remaining() < 6 {
1702            return Err(ProtocolError::UnexpectedEof);
1703        }
1704
1705        let number = src.get_i32_le();
1706        let state = src.get_u8();
1707        let class = src.get_u8();
1708
1709        let message = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1710        let server = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1711        let procedure = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1712
1713        if src.remaining() < 4 {
1714            return Err(ProtocolError::UnexpectedEof);
1715        }
1716        let line = src.get_i32_le();
1717
1718        Ok(Self {
1719            number,
1720            state,
1721            class,
1722            message,
1723            server,
1724            procedure,
1725            line,
1726        })
1727    }
1728
1729    /// Check if this is a fatal error (severity >= 20).
1730    #[must_use]
1731    pub const fn is_fatal(&self) -> bool {
1732        self.class >= 20
1733    }
1734
1735    /// Check if this error indicates the batch was aborted (severity >= 16).
1736    #[must_use]
1737    pub const fn is_batch_abort(&self) -> bool {
1738        self.class >= 16
1739    }
1740}
1741
1742impl ServerInfo {
1743    /// Decode an INFO token from bytes.
1744    ///
1745    /// INFO tokens have the same structure as ERROR tokens but with lower severity.
1746    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1747        if src.remaining() < 2 {
1748            return Err(ProtocolError::UnexpectedEof);
1749        }
1750
1751        let _length = src.get_u16_le();
1752
1753        if src.remaining() < 6 {
1754            return Err(ProtocolError::UnexpectedEof);
1755        }
1756
1757        let number = src.get_i32_le();
1758        let state = src.get_u8();
1759        let class = src.get_u8();
1760
1761        let message = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1762        let server = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1763        let procedure = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1764
1765        if src.remaining() < 4 {
1766            return Err(ProtocolError::UnexpectedEof);
1767        }
1768        let line = src.get_i32_le();
1769
1770        Ok(Self {
1771            number,
1772            state,
1773            class,
1774            message,
1775            server,
1776            procedure,
1777            line,
1778        })
1779    }
1780}
1781
1782impl LoginAck {
1783    /// Decode a LOGINACK token from bytes.
1784    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1785        // LOGINACK: length (2) + interface (1) + tds_version (4) + prog_name (b_varchar) + prog_version (4)
1786        if src.remaining() < 2 {
1787            return Err(ProtocolError::UnexpectedEof);
1788        }
1789
1790        let _length = src.get_u16_le();
1791
1792        if src.remaining() < 5 {
1793            return Err(ProtocolError::UnexpectedEof);
1794        }
1795
1796        let interface = src.get_u8();
1797        let tds_version = src.get_u32_le();
1798        let prog_name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1799
1800        if src.remaining() < 4 {
1801            return Err(ProtocolError::UnexpectedEof);
1802        }
1803        let prog_version = src.get_u32_le();
1804
1805        Ok(Self {
1806            interface,
1807            tds_version,
1808            prog_name,
1809            prog_version,
1810        })
1811    }
1812
1813    /// Get the TDS version as a `TdsVersion`.
1814    #[must_use]
1815    pub fn tds_version(&self) -> crate::version::TdsVersion {
1816        crate::version::TdsVersion::new(self.tds_version)
1817    }
1818}
1819
1820impl EnvChangeType {
1821    /// Create from raw byte value.
1822    pub fn from_u8(value: u8) -> Option<Self> {
1823        match value {
1824            1 => Some(Self::Database),
1825            2 => Some(Self::Language),
1826            3 => Some(Self::CharacterSet),
1827            4 => Some(Self::PacketSize),
1828            5 => Some(Self::UnicodeSortingLocalId),
1829            6 => Some(Self::UnicodeComparisonFlags),
1830            7 => Some(Self::SqlCollation),
1831            8 => Some(Self::BeginTransaction),
1832            9 => Some(Self::CommitTransaction),
1833            10 => Some(Self::RollbackTransaction),
1834            11 => Some(Self::EnlistDtcTransaction),
1835            12 => Some(Self::DefectTransaction),
1836            13 => Some(Self::RealTimeLogShipping),
1837            15 => Some(Self::PromoteTransaction),
1838            16 => Some(Self::TransactionManagerAddress),
1839            17 => Some(Self::TransactionEnded),
1840            18 => Some(Self::ResetConnectionCompletionAck),
1841            19 => Some(Self::UserInstanceStarted),
1842            20 => Some(Self::Routing),
1843            _ => None,
1844        }
1845    }
1846}
1847
1848impl EnvChange {
1849    /// Decode an ENVCHANGE token from bytes.
1850    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1851        if src.remaining() < 3 {
1852            return Err(ProtocolError::UnexpectedEof);
1853        }
1854
1855        let length = src.get_u16_le() as usize;
1856        if src.remaining() < length {
1857            return Err(ProtocolError::IncompletePacket {
1858                expected: length,
1859                actual: src.remaining(),
1860            });
1861        }
1862
1863        let env_type_byte = src.get_u8();
1864        let env_type = EnvChangeType::from_u8(env_type_byte)
1865            .ok_or(ProtocolError::InvalidTokenType(env_type_byte))?;
1866
1867        let (new_value, old_value) = match env_type {
1868            EnvChangeType::Routing => {
1869                // Routing has special format
1870                let new_value = Self::decode_routing_value(src)?;
1871                let old_value = EnvChangeValue::Binary(Bytes::new());
1872                (new_value, old_value)
1873            }
1874            EnvChangeType::BeginTransaction
1875            | EnvChangeType::CommitTransaction
1876            | EnvChangeType::RollbackTransaction
1877            | EnvChangeType::EnlistDtcTransaction
1878            | EnvChangeType::SqlCollation => {
1879                // These use binary format per MS-TDS spec:
1880                // - Transaction tokens: transaction descriptor (8 bytes)
1881                // - SqlCollation: collation info (5 bytes: LCID + sort flags)
1882                let new_len = src.get_u8() as usize;
1883                let new_value = if new_len > 0 && src.remaining() >= new_len {
1884                    EnvChangeValue::Binary(src.copy_to_bytes(new_len))
1885                } else {
1886                    EnvChangeValue::Binary(Bytes::new())
1887                };
1888
1889                let old_len = src.get_u8() as usize;
1890                let old_value = if old_len > 0 && src.remaining() >= old_len {
1891                    EnvChangeValue::Binary(src.copy_to_bytes(old_len))
1892                } else {
1893                    EnvChangeValue::Binary(Bytes::new())
1894                };
1895
1896                (new_value, old_value)
1897            }
1898            _ => {
1899                // String format for most env changes
1900                let new_value = read_b_varchar(src)
1901                    .map(EnvChangeValue::String)
1902                    .unwrap_or(EnvChangeValue::String(String::new()));
1903
1904                let old_value = read_b_varchar(src)
1905                    .map(EnvChangeValue::String)
1906                    .unwrap_or(EnvChangeValue::String(String::new()));
1907
1908                (new_value, old_value)
1909            }
1910        };
1911
1912        Ok(Self {
1913            env_type,
1914            new_value,
1915            old_value,
1916        })
1917    }
1918
1919    fn decode_routing_value(src: &mut impl Buf) -> Result<EnvChangeValue, ProtocolError> {
1920        // Routing format: length (2) + protocol (1) + port (2) + server_len (2) + server (utf16)
1921        if src.remaining() < 2 {
1922            return Err(ProtocolError::UnexpectedEof);
1923        }
1924
1925        let _routing_len = src.get_u16_le();
1926
1927        if src.remaining() < 5 {
1928            return Err(ProtocolError::UnexpectedEof);
1929        }
1930
1931        let _protocol = src.get_u8();
1932        let port = src.get_u16_le();
1933        let server_len = src.get_u16_le() as usize;
1934
1935        // Read UTF-16LE server name
1936        if src.remaining() < server_len * 2 {
1937            return Err(ProtocolError::UnexpectedEof);
1938        }
1939
1940        let mut chars = Vec::with_capacity(server_len);
1941        for _ in 0..server_len {
1942            chars.push(src.get_u16_le());
1943        }
1944
1945        let host = String::from_utf16(&chars).map_err(|_| {
1946            ProtocolError::StringEncoding(
1947                #[cfg(feature = "std")]
1948                "invalid UTF-16 in routing hostname".to_string(),
1949                #[cfg(not(feature = "std"))]
1950                "invalid UTF-16 in routing hostname",
1951            )
1952        })?;
1953
1954        Ok(EnvChangeValue::Routing { host, port })
1955    }
1956
1957    /// Check if this is a routing redirect.
1958    #[must_use]
1959    pub fn is_routing(&self) -> bool {
1960        self.env_type == EnvChangeType::Routing
1961    }
1962
1963    /// Get routing information if this is a routing change.
1964    #[must_use]
1965    pub fn routing_info(&self) -> Option<(&str, u16)> {
1966        if let EnvChangeValue::Routing { host, port } = &self.new_value {
1967            Some((host, *port))
1968        } else {
1969            None
1970        }
1971    }
1972
1973    /// Get the new database name if this is a database change.
1974    #[must_use]
1975    pub fn new_database(&self) -> Option<&str> {
1976        if self.env_type == EnvChangeType::Database {
1977            if let EnvChangeValue::String(s) = &self.new_value {
1978                return Some(s);
1979            }
1980        }
1981        None
1982    }
1983}
1984
1985impl Order {
1986    /// Decode an ORDER token from bytes.
1987    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1988        if src.remaining() < 2 {
1989            return Err(ProtocolError::UnexpectedEof);
1990        }
1991
1992        let length = src.get_u16_le() as usize;
1993        let column_count = length / 2;
1994
1995        if src.remaining() < length {
1996            return Err(ProtocolError::IncompletePacket {
1997                expected: length,
1998                actual: src.remaining(),
1999            });
2000        }
2001
2002        let mut columns = Vec::with_capacity(column_count);
2003        for _ in 0..column_count {
2004            columns.push(src.get_u16_le());
2005        }
2006
2007        Ok(Self { columns })
2008    }
2009}
2010
2011impl FeatureExtAck {
2012    /// Feature terminator byte.
2013    pub const TERMINATOR: u8 = 0xFF;
2014
2015    /// Decode a FEATUREEXTACK token from bytes.
2016    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
2017        let mut features = Vec::new();
2018
2019        loop {
2020            if !src.has_remaining() {
2021                return Err(ProtocolError::UnexpectedEof);
2022            }
2023
2024            let feature_id = src.get_u8();
2025            if feature_id == Self::TERMINATOR {
2026                break;
2027            }
2028
2029            if src.remaining() < 4 {
2030                return Err(ProtocolError::UnexpectedEof);
2031            }
2032
2033            let data_len = src.get_u32_le() as usize;
2034
2035            if src.remaining() < data_len {
2036                return Err(ProtocolError::IncompletePacket {
2037                    expected: data_len,
2038                    actual: src.remaining(),
2039                });
2040            }
2041
2042            let data = src.copy_to_bytes(data_len);
2043            features.push(FeatureAck { feature_id, data });
2044        }
2045
2046        Ok(Self { features })
2047    }
2048}
2049
2050impl SspiToken {
2051    /// Decode an SSPI token from bytes.
2052    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
2053        if src.remaining() < 2 {
2054            return Err(ProtocolError::UnexpectedEof);
2055        }
2056
2057        let length = src.get_u16_le() as usize;
2058
2059        if src.remaining() < length {
2060            return Err(ProtocolError::IncompletePacket {
2061                expected: length,
2062                actual: src.remaining(),
2063            });
2064        }
2065
2066        let data = src.copy_to_bytes(length);
2067        Ok(Self { data })
2068    }
2069}
2070
2071impl FedAuthInfo {
2072    /// Decode a FEDAUTHINFO token from bytes.
2073    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
2074        if src.remaining() < 4 {
2075            return Err(ProtocolError::UnexpectedEof);
2076        }
2077
2078        let _length = src.get_u32_le();
2079
2080        if src.remaining() < 5 {
2081            return Err(ProtocolError::UnexpectedEof);
2082        }
2083
2084        let _count = src.get_u8();
2085
2086        // Read option data
2087        let mut sts_url = String::new();
2088        let mut spn = String::new();
2089
2090        // Parse info options until we have both
2091        while src.has_remaining() {
2092            if src.remaining() < 9 {
2093                break;
2094            }
2095
2096            let info_id = src.get_u8();
2097            let info_len = src.get_u32_le() as usize;
2098            let _info_offset = src.get_u32_le();
2099
2100            if src.remaining() < info_len {
2101                break;
2102            }
2103
2104            // Read UTF-16LE string
2105            let char_count = info_len / 2;
2106            let mut chars = Vec::with_capacity(char_count);
2107            for _ in 0..char_count {
2108                chars.push(src.get_u16_le());
2109            }
2110
2111            if let Ok(value) = String::from_utf16(&chars) {
2112                match info_id {
2113                    0x01 => spn = value,
2114                    0x02 => sts_url = value,
2115                    _ => {}
2116                }
2117            }
2118        }
2119
2120        Ok(Self { sts_url, spn })
2121    }
2122}
2123
2124// =============================================================================
2125// Token Parser
2126// =============================================================================
2127
2128/// Token stream parser.
2129///
2130/// Parses a stream of TDS tokens from a byte buffer.
2131///
2132/// # Basic vs Context-Aware Parsing
2133///
2134/// Some tokens (like `Done`, `Error`, `LoginAck`) can be parsed without context.
2135/// Use [`next_token()`](TokenParser::next_token) for these.
2136///
2137/// Other tokens (like `ColMetaData`, `Row`, `NbcRow`) require column metadata
2138/// to parse correctly. Use [`next_token_with_metadata()`](TokenParser::next_token_with_metadata)
2139/// for these.
2140///
2141/// # Example
2142///
2143/// ```rust,ignore
2144/// let mut parser = TokenParser::new(data);
2145/// let mut metadata = None;
2146///
2147/// while let Some(token) = parser.next_token_with_metadata(metadata.as_ref())? {
2148///     match token {
2149///         Token::ColMetaData(meta) => {
2150///             metadata = Some(meta);
2151///         }
2152///         Token::Row(row) => {
2153///             // Process row using metadata
2154///         }
2155///         Token::Done(done) => {
2156///             if !done.has_more() {
2157///                 break;
2158///             }
2159///         }
2160///         _ => {}
2161///     }
2162/// }
2163/// ```
2164pub struct TokenParser {
2165    data: Bytes,
2166    position: usize,
2167    /// Whether Always Encrypted was negotiated for this connection.
2168    /// When true, ColMetaData tokens are parsed with CekTable and per-column CryptoMetadata.
2169    encryption_enabled: bool,
2170}
2171
2172impl TokenParser {
2173    /// Create a new token parser from bytes.
2174    #[must_use]
2175    pub fn new(data: Bytes) -> Self {
2176        Self {
2177            data,
2178            position: 0,
2179            encryption_enabled: false,
2180        }
2181    }
2182
2183    /// Enable Always Encrypted metadata parsing.
2184    ///
2185    /// When enabled, ColMetaData tokens are parsed using the encrypted format
2186    /// which includes a CekTable and per-column CryptoMetadata.
2187    #[must_use]
2188    pub fn with_encryption(mut self, enabled: bool) -> Self {
2189        self.encryption_enabled = enabled;
2190        self
2191    }
2192
2193    /// Get remaining bytes in the buffer.
2194    #[must_use]
2195    pub fn remaining(&self) -> usize {
2196        self.data.len().saturating_sub(self.position)
2197    }
2198
2199    /// Check if there are more bytes to parse.
2200    #[must_use]
2201    pub fn has_remaining(&self) -> bool {
2202        self.position < self.data.len()
2203    }
2204
2205    /// Peek at the next token type without consuming it.
2206    #[must_use]
2207    pub fn peek_token_type(&self) -> Option<TokenType> {
2208        if self.position < self.data.len() {
2209            TokenType::from_u8(self.data[self.position])
2210        } else {
2211            None
2212        }
2213    }
2214
2215    /// Parse the next token from the stream.
2216    ///
2217    /// This method can only parse context-independent tokens. For tokens that
2218    /// require column metadata (ColMetaData, Row, NbcRow), use
2219    /// [`next_token_with_metadata()`](TokenParser::next_token_with_metadata).
2220    ///
2221    /// Returns `None` if no more tokens are available.
2222    pub fn next_token(&mut self) -> Result<Option<Token>, ProtocolError> {
2223        self.next_token_with_metadata(None)
2224    }
2225
2226    /// Parse the next token with optional column metadata context.
2227    ///
2228    /// When `metadata` is provided, this method can parse Row and NbcRow tokens.
2229    /// Without metadata, those tokens will return an error.
2230    ///
2231    /// Returns `None` if no more tokens are available.
2232    pub fn next_token_with_metadata(
2233        &mut self,
2234        metadata: Option<&ColMetaData>,
2235    ) -> Result<Option<Token>, ProtocolError> {
2236        if !self.has_remaining() {
2237            return Ok(None);
2238        }
2239
2240        let mut buf = &self.data[self.position..];
2241        let start_pos = self.position;
2242
2243        let token_type_byte = buf.get_u8();
2244        let token_type = TokenType::from_u8(token_type_byte);
2245
2246        let token = match token_type {
2247            Some(TokenType::Done) => {
2248                let done = Done::decode(&mut buf)?;
2249                Token::Done(done)
2250            }
2251            Some(TokenType::DoneProc) => {
2252                let done = DoneProc::decode(&mut buf)?;
2253                Token::DoneProc(done)
2254            }
2255            Some(TokenType::DoneInProc) => {
2256                let done = DoneInProc::decode(&mut buf)?;
2257                Token::DoneInProc(done)
2258            }
2259            Some(TokenType::Error) => {
2260                let error = ServerError::decode(&mut buf)?;
2261                Token::Error(error)
2262            }
2263            Some(TokenType::Info) => {
2264                let info = ServerInfo::decode(&mut buf)?;
2265                Token::Info(info)
2266            }
2267            Some(TokenType::LoginAck) => {
2268                let login_ack = LoginAck::decode(&mut buf)?;
2269                Token::LoginAck(login_ack)
2270            }
2271            Some(TokenType::EnvChange) => {
2272                let env_change = EnvChange::decode(&mut buf)?;
2273                Token::EnvChange(env_change)
2274            }
2275            Some(TokenType::Order) => {
2276                let order = Order::decode(&mut buf)?;
2277                Token::Order(order)
2278            }
2279            Some(TokenType::FeatureExtAck) => {
2280                let ack = FeatureExtAck::decode(&mut buf)?;
2281                Token::FeatureExtAck(ack)
2282            }
2283            Some(TokenType::Sspi) => {
2284                let sspi = SspiToken::decode(&mut buf)?;
2285                Token::Sspi(sspi)
2286            }
2287            Some(TokenType::FedAuthInfo) => {
2288                let info = FedAuthInfo::decode(&mut buf)?;
2289                Token::FedAuthInfo(info)
2290            }
2291            Some(TokenType::ReturnStatus) => {
2292                if buf.remaining() < 4 {
2293                    return Err(ProtocolError::UnexpectedEof);
2294                }
2295                let status = buf.get_i32_le();
2296                Token::ReturnStatus(status)
2297            }
2298            Some(TokenType::ColMetaData) => {
2299                let col_meta = if self.encryption_enabled {
2300                    ColMetaData::decode_encrypted(&mut buf)?
2301                } else {
2302                    ColMetaData::decode(&mut buf)?
2303                };
2304                Token::ColMetaData(col_meta)
2305            }
2306            Some(TokenType::Row) => {
2307                let meta = metadata.ok_or_else(|| {
2308                    ProtocolError::StringEncoding(
2309                        #[cfg(feature = "std")]
2310                        "Row token requires column metadata".to_string(),
2311                        #[cfg(not(feature = "std"))]
2312                        "Row token requires column metadata",
2313                    )
2314                })?;
2315                let row = RawRow::decode(&mut buf, meta)?;
2316                Token::Row(row)
2317            }
2318            Some(TokenType::NbcRow) => {
2319                let meta = metadata.ok_or_else(|| {
2320                    ProtocolError::StringEncoding(
2321                        #[cfg(feature = "std")]
2322                        "NbcRow token requires column metadata".to_string(),
2323                        #[cfg(not(feature = "std"))]
2324                        "NbcRow token requires column metadata",
2325                    )
2326                })?;
2327                let row = NbcRow::decode(&mut buf, meta)?;
2328                Token::NbcRow(row)
2329            }
2330            Some(TokenType::ReturnValue) => {
2331                let ret_val = ReturnValue::decode(&mut buf)?;
2332                Token::ReturnValue(ret_val)
2333            }
2334            Some(TokenType::SessionState) => {
2335                let session = SessionState::decode(&mut buf)?;
2336                Token::SessionState(session)
2337            }
2338            Some(TokenType::ColInfo) | Some(TokenType::TabName) | Some(TokenType::Offset) => {
2339                // These tokens are rarely used and have complex formats.
2340                // Skip them by reading the length and advancing.
2341                if buf.remaining() < 2 {
2342                    return Err(ProtocolError::UnexpectedEof);
2343                }
2344                let length = buf.get_u16_le() as usize;
2345                if buf.remaining() < length {
2346                    return Err(ProtocolError::IncompletePacket {
2347                        expected: length,
2348                        actual: buf.remaining(),
2349                    });
2350                }
2351                // Skip the data
2352                buf.advance(length);
2353                // Recursively get the next token
2354                self.position = start_pos + (self.data.len() - start_pos - buf.remaining());
2355                return self.next_token_with_metadata(metadata);
2356            }
2357            None => {
2358                return Err(ProtocolError::InvalidTokenType(token_type_byte));
2359            }
2360        };
2361
2362        // Update position based on how much was consumed
2363        let consumed = self.data.len() - start_pos - buf.remaining();
2364        self.position = start_pos + consumed;
2365
2366        Ok(Some(token))
2367    }
2368
2369    /// Skip the current token without fully parsing it.
2370    ///
2371    /// This is useful for skipping unknown or uninteresting tokens.
2372    pub fn skip_token(&mut self) -> Result<(), ProtocolError> {
2373        if !self.has_remaining() {
2374            return Ok(());
2375        }
2376
2377        let token_type_byte = self.data[self.position];
2378        let token_type = TokenType::from_u8(token_type_byte);
2379
2380        // Calculate how many bytes to skip based on token type
2381        let skip_amount = match token_type {
2382            // Fixed-size tokens
2383            Some(TokenType::Done) | Some(TokenType::DoneProc) | Some(TokenType::DoneInProc) => {
2384                1 + Done::SIZE // token type + 12 bytes
2385            }
2386            Some(TokenType::ReturnStatus) => {
2387                1 + 4 // token type + 4 bytes
2388            }
2389            // Variable-length tokens with 2-byte length prefix
2390            Some(TokenType::Error)
2391            | Some(TokenType::Info)
2392            | Some(TokenType::LoginAck)
2393            | Some(TokenType::EnvChange)
2394            | Some(TokenType::Order)
2395            | Some(TokenType::Sspi)
2396            | Some(TokenType::ColInfo)
2397            | Some(TokenType::TabName)
2398            | Some(TokenType::Offset)
2399            | Some(TokenType::ReturnValue) => {
2400                if self.remaining() < 3 {
2401                    return Err(ProtocolError::UnexpectedEof);
2402                }
2403                let length = u16::from_le_bytes([
2404                    self.data[self.position + 1],
2405                    self.data[self.position + 2],
2406                ]) as usize;
2407                1 + 2 + length // token type + length prefix + data
2408            }
2409            // Tokens with 4-byte length prefix
2410            Some(TokenType::SessionState) | Some(TokenType::FedAuthInfo) => {
2411                if self.remaining() < 5 {
2412                    return Err(ProtocolError::UnexpectedEof);
2413                }
2414                let length = u32::from_le_bytes([
2415                    self.data[self.position + 1],
2416                    self.data[self.position + 2],
2417                    self.data[self.position + 3],
2418                    self.data[self.position + 4],
2419                ]) as usize;
2420                1 + 4 + length
2421            }
2422            // FeatureExtAck has no length prefix - must parse
2423            Some(TokenType::FeatureExtAck) => {
2424                // Parse to find end
2425                let mut buf = &self.data[self.position + 1..];
2426                let _ = FeatureExtAck::decode(&mut buf)?;
2427                self.data.len() - self.position - buf.remaining()
2428            }
2429            // ColMetaData, Row, NbcRow require context and can't be easily skipped
2430            Some(TokenType::ColMetaData) | Some(TokenType::Row) | Some(TokenType::NbcRow) => {
2431                return Err(ProtocolError::InvalidTokenType(token_type_byte));
2432            }
2433            None => {
2434                return Err(ProtocolError::InvalidTokenType(token_type_byte));
2435            }
2436        };
2437
2438        if self.remaining() < skip_amount {
2439            return Err(ProtocolError::UnexpectedEof);
2440        }
2441
2442        self.position += skip_amount;
2443        Ok(())
2444    }
2445
2446    /// Get the current position in the buffer.
2447    #[must_use]
2448    pub fn position(&self) -> usize {
2449        self.position
2450    }
2451
2452    /// Reset the parser to the beginning.
2453    pub fn reset(&mut self) {
2454        self.position = 0;
2455    }
2456}
2457
2458// =============================================================================
2459// Tests
2460// =============================================================================
2461
2462#[cfg(test)]
2463#[allow(clippy::unwrap_used, clippy::panic)]
2464mod tests {
2465    use super::*;
2466    use bytes::BytesMut;
2467
#[test]
fn test_done_roundtrip() {
    // Only the `count` flag is set; every other status flag stays clear.
    let status = DoneStatus {
        more: false,
        error: false,
        in_xact: false,
        count: true,
        attn: false,
        srverror: false,
    };
    let original = Done {
        status,
        cur_cmd: 193, // SELECT
        row_count: 42,
    };

    let mut encoded = BytesMut::new();
    original.encode(&mut encoded);

    // Decoding starts after the leading token type byte.
    let mut payload = &encoded[1..];
    let roundtripped = Done::decode(&mut payload).unwrap();

    assert_eq!(roundtripped.status.count, original.status.count);
    assert_eq!(roundtripped.cur_cmd, original.cur_cmd);
    assert_eq!(roundtripped.row_count, original.row_count);
}
2494
#[test]
fn test_done_status_bits() {
    let original = DoneStatus {
        more: true,
        error: true,
        in_xact: true,
        count: true,
        attn: false,
        srverror: false,
    };

    // Flags -> bits -> flags must preserve the four flags checked below.
    let roundtripped = DoneStatus::from_bits(original.to_bits());

    assert_eq!(original.more, roundtripped.more);
    assert_eq!(original.error, roundtripped.error);
    assert_eq!(original.in_xact, roundtripped.in_xact);
    assert_eq!(original.count, roundtripped.count);
}
2514
#[test]
fn test_token_parser_done() {
    // DONE token: type (1) + status (2) + curcmd (2) + rowcount (8)
    const WIRE: &[u8] = &[
        0xFD, // DONE token type
        0x10, 0x00, // status: DONE_COUNT
        0xC1, 0x00, // cur_cmd: 193 (SELECT)
        0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count: 5
    ];

    let mut parser = TokenParser::new(Bytes::from_static(WIRE));
    let first = parser.next_token().unwrap().unwrap();

    if let Token::Done(done) = first {
        assert!(done.status.count);
        assert!(!done.status.more);
        assert_eq!(done.cur_cmd, 193);
        assert_eq!(done.row_count, 5);
    } else {
        panic!("Expected Done token");
    }

    // The buffer held exactly one token.
    assert!(parser.next_token().unwrap().is_none());
}
2541
#[test]
fn test_env_change_type_from_u8() {
    // (raw byte, expected mapping): two assigned values and one unassigned.
    let cases = [
        (1u8, Some(EnvChangeType::Database)),
        (20u8, Some(EnvChangeType::Routing)),
        (100u8, None),
    ];

    for (raw, expected) in cases.iter() {
        assert_eq!(&EnvChangeType::from_u8(*raw), expected);
    }
}
2548
#[test]
fn test_colmetadata_no_columns() {
    // A column count of 0xFFFF is the "no metadata" marker.
    let mut cursor: &[u8] = &[0xFF, 0xFF];

    let decoded = ColMetaData::decode(&mut cursor).unwrap();

    assert!(decoded.is_empty());
    assert_eq!(decoded.column_count(), 0);
}
2558
#[test]
fn test_colmetadata_single_int_column() {
    // COLMETADATA with 1 INT column
    // Format: column_count (2) + [user_type (4) + flags (2) + type_id (1) + name (b_varchar)]
    let wire: &[u8] = &[
        0x01, 0x00, // 1 column
        0x00, 0x00, 0x00, 0x00, // user_type = 0
        0x01, 0x00, // flags (nullable)
        0x38, // TypeId::Int4
        0x02, // name length: 2 characters (B_VARCHAR)
        b'i', 0x00, b'd', 0x00, // "id" in UTF-16LE
    ];

    let mut cursor = wire;
    let decoded = ColMetaData::decode(&mut cursor).unwrap();

    assert_eq!(decoded.column_count(), 1);

    let column = &decoded.columns[0];
    assert_eq!(column.name, "id");
    assert_eq!(column.type_id, TypeId::Int4);
    assert!(column.is_nullable());
}
2580
#[test]
fn test_colmetadata_nvarchar_column() {
    // COLMETADATA with 1 NVARCHAR(50) column
    let wire: &[u8] = &[
        0x01, 0x00, // 1 column
        0x00, 0x00, 0x00, 0x00, // user_type = 0
        0x01, 0x00, // flags (nullable)
        0xE7, // TypeId::NVarChar
        // Type info: max_length (2 bytes) + collation (5 bytes)
        0x64, 0x00, // max_length = 100 (50 chars * 2)
        0x09, 0x04, 0xD0, 0x00, 0x34, // collation
        0x04, // name length: 4 characters
        b'n', 0x00, b'a', 0x00, b'm', 0x00, b'e', 0x00, // "name" in UTF-16LE
    ];

    let mut cursor = wire;
    let decoded = ColMetaData::decode(&mut cursor).unwrap();

    assert_eq!(decoded.column_count(), 1);

    let column = &decoded.columns[0];
    assert_eq!(column.name, "name");
    assert_eq!(column.type_id, TypeId::NVarChar);
    assert_eq!(column.type_info.max_length, Some(100));
    assert!(column.type_info.collation.is_some());
}
2605
#[test]
fn test_raw_row_decode_int() {
    // Metadata describing a single fixed-length INT column.
    let int_column = ColumnData {
        name: "id".to_string(),
        type_id: TypeId::Int4,
        col_type: 0x38,
        flags: 0,
        user_type: 0,
        type_info: TypeInfo::default(),
        crypto_metadata: None,
    };
    let metadata = ColMetaData {
        cek_table: None,
        columns: vec![int_column],
    };

    // Row payload: the value 42 as a 4-byte little-endian integer.
    let mut cursor: &[u8] = &[0x2A, 0x00, 0x00, 0x00];
    let row = RawRow::decode(&mut cursor, &metadata).unwrap();

    // The raw row must hold exactly those four bytes.
    assert_eq!(row.data.len(), 4);
    assert_eq!(&row.data[..], &[0x2A, 0x00, 0x00, 0x00]);
}
2631
#[test]
fn test_raw_row_decode_nullable_int() {
    // Metadata describing a single nullable INT column (IntN).
    let intn_column = ColumnData {
        name: "id".to_string(),
        type_id: TypeId::IntN,
        col_type: 0x26,
        flags: 0x01, // nullable
        user_type: 0,
        type_info: TypeInfo {
            max_length: Some(4),
            ..Default::default()
        },
        crypto_metadata: None,
    };
    let metadata = ColMetaData {
        cek_table: None,
        columns: vec![intn_column],
    };

    // Row payload: 1-byte length (4) followed by the 4-byte value 42.
    let mut cursor: &[u8] = &[0x04, 0x2A, 0x00, 0x00, 0x00];
    let row = RawRow::decode(&mut cursor, &metadata).unwrap();

    assert_eq!(row.data.len(), 5);
    assert_eq!(row.data[0], 4); // length
    assert_eq!(&row.data[1..], &[0x2A, 0x00, 0x00, 0x00]);
}
2660
#[test]
fn test_raw_row_decode_null_value() {
    // Metadata describing a single nullable INT column (IntN).
    let intn_column = ColumnData {
        name: "id".to_string(),
        type_id: TypeId::IntN,
        col_type: 0x26,
        flags: 0x01, // nullable
        user_type: 0,
        type_info: TypeInfo {
            max_length: Some(4),
            ..Default::default()
        },
        crypto_metadata: None,
    };
    let metadata = ColMetaData {
        cek_table: None,
        columns: vec![intn_column],
    };

    // Row payload: a lone 0xFF length byte, which this test treats as the NULL marker.
    let mut cursor: &[u8] = &[0xFF];
    let row = RawRow::decode(&mut cursor, &metadata).unwrap();

    assert_eq!(row.data.len(), 1);
    assert_eq!(row.data[0], 0xFF); // NULL marker
}
2688
#[test]
fn test_nbcrow_null_bitmap() {
    // Bits 0 and 2 are set, so columns 0 and 2 are NULL.
    let bitmap = vec![0b00000101];
    let row = NbcRow {
        null_bitmap: bitmap,
        data: Bytes::new(),
    };

    let nulls = [row.is_null(0), row.is_null(1), row.is_null(2), row.is_null(3)];
    assert!(nulls[0]);
    assert!(!nulls[1]);
    assert!(nulls[2]);
    assert!(!nulls[3]);
}
2701
2702    #[test]
2703    fn test_token_parser_colmetadata() {
2704        // Build a COLMETADATA token with 1 INT column
2705        let mut data = BytesMut::new();
2706        data.extend_from_slice(&[0x81]); // COLMETADATA token type
2707        data.extend_from_slice(&[0x01, 0x00]); // 1 column
2708        data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); // user_type = 0
2709        data.extend_from_slice(&[0x01, 0x00]); // flags (nullable)
2710        data.extend_from_slice(&[0x38]); // TypeId::Int4
2711        data.extend_from_slice(&[0x02]); // column name length
2712        data.extend_from_slice(&[b'i', 0x00, b'd', 0x00]); // "id"
2713
2714        let mut parser = TokenParser::new(data.freeze());
2715        let token = parser.next_token().unwrap().unwrap();
2716
2717        match token {
2718            Token::ColMetaData(meta) => {
2719                assert_eq!(meta.column_count(), 1);
2720                assert_eq!(meta.columns[0].name, "id");
2721                assert_eq!(meta.columns[0].type_id, TypeId::Int4);
2722            }
2723            _ => panic!("Expected ColMetaData token"),
2724        }
2725    }
2726
2727    #[test]
2728    fn test_token_parser_row_with_metadata() {
2729        // Build metadata
2730        let metadata = ColMetaData {
2731            cek_table: None,
2732            columns: vec![ColumnData {
2733                name: "id".to_string(),
2734                type_id: TypeId::Int4,
2735                col_type: 0x38,
2736                flags: 0,
2737                user_type: 0,
2738                type_info: TypeInfo::default(),
2739                crypto_metadata: None,
2740            }],
2741        };
2742
2743        // Build ROW token
2744        let mut data = BytesMut::new();
2745        data.extend_from_slice(&[0xD1]); // ROW token type
2746        data.extend_from_slice(&[0x2A, 0x00, 0x00, 0x00]); // value = 42
2747
2748        let mut parser = TokenParser::new(data.freeze());
2749        let token = parser
2750            .next_token_with_metadata(Some(&metadata))
2751            .unwrap()
2752            .unwrap();
2753
2754        match token {
2755            Token::Row(row) => {
2756                assert_eq!(row.data.len(), 4);
2757            }
2758            _ => panic!("Expected Row token"),
2759        }
2760    }
2761
2762    #[test]
2763    fn test_token_parser_row_without_metadata_fails() {
2764        // Build ROW token
2765        let mut data = BytesMut::new();
2766        data.extend_from_slice(&[0xD1]); // ROW token type
2767        data.extend_from_slice(&[0x2A, 0x00, 0x00, 0x00]); // value = 42
2768
2769        let mut parser = TokenParser::new(data.freeze());
2770        let result = parser.next_token(); // No metadata provided
2771
2772        assert!(result.is_err());
2773    }
2774
2775    #[test]
2776    fn test_token_parser_peek() {
2777        let data = Bytes::from_static(&[
2778            0xFD, // DONE token type
2779            0x10, 0x00, // status
2780            0xC1, 0x00, // cur_cmd
2781            0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count
2782        ]);
2783
2784        let parser = TokenParser::new(data);
2785        assert_eq!(parser.peek_token_type(), Some(TokenType::Done));
2786    }
2787
2788    #[test]
2789    fn test_column_data_fixed_size() {
2790        let col = ColumnData {
2791            name: String::new(),
2792            type_id: TypeId::Int4,
2793            col_type: 0x38,
2794            flags: 0,
2795            user_type: 0,
2796            type_info: TypeInfo::default(),
2797            crypto_metadata: None,
2798        };
2799        assert_eq!(col.fixed_size(), Some(4));
2800
2801        let col2 = ColumnData {
2802            name: String::new(),
2803            type_id: TypeId::NVarChar,
2804            col_type: 0xE7,
2805            flags: 0,
2806            user_type: 0,
2807            type_info: TypeInfo::default(),
2808            crypto_metadata: None,
2809        };
2810        assert_eq!(col2.fixed_size(), None);
2811    }
2812
2813    // ========================================================================
2814    // End-to-End Decode Tests (Wire → Stored → Verification)
2815    // ========================================================================
2816    //
2817    // These tests verify that RawRow::decode_column_value correctly stores
2818    // column values in a format that can be parsed back.
2819
2820    #[test]
2821    fn test_decode_nvarchar_then_intn_roundtrip() {
2822        // Simulate wire data for: "World" (NVarChar), 42 (IntN)
2823        // This tests the scenario from the MCP parameterized query
2824
2825        // Build wire data (what the server sends)
2826        let mut wire_data = BytesMut::new();
2827
2828        // Column 0: NVarChar "World" - 2-byte length prefix in bytes
2829        // "World" in UTF-16LE: W=0x0057, o=0x006F, r=0x0072, l=0x006C, d=0x0064
2830        let word = "World";
2831        let utf16: Vec<u16> = word.encode_utf16().collect();
2832        wire_data.put_u16_le((utf16.len() * 2) as u16); // byte length = 10
2833        for code_unit in &utf16 {
2834            wire_data.put_u16_le(*code_unit);
2835        }
2836
2837        // Column 1: IntN 42 - 1-byte length prefix
2838        wire_data.put_u8(4); // 4 bytes for INT
2839        wire_data.put_i32_le(42);
2840
2841        // Build column metadata
2842        let metadata = ColMetaData {
2843            cek_table: None,
2844            columns: vec![
2845                ColumnData {
2846                    name: "greeting".to_string(),
2847                    type_id: TypeId::NVarChar,
2848                    col_type: 0xE7,
2849                    flags: 0x01,
2850                    user_type: 0,
2851                    type_info: TypeInfo {
2852                        max_length: Some(10), // non-MAX
2853                        precision: None,
2854                        scale: None,
2855                        collation: None,
2856                    },
2857                    crypto_metadata: None,
2858                },
2859                ColumnData {
2860                    name: "number".to_string(),
2861                    type_id: TypeId::IntN,
2862                    col_type: 0x26,
2863                    flags: 0x01,
2864                    user_type: 0,
2865                    type_info: TypeInfo {
2866                        max_length: Some(4),
2867                        precision: None,
2868                        scale: None,
2869                        collation: None,
2870                    },
2871                    crypto_metadata: None,
2872                },
2873            ],
2874        };
2875
2876        // Decode the wire data into stored format
2877        let mut wire_cursor = wire_data.freeze();
2878        let raw_row = RawRow::decode(&mut wire_cursor, &metadata).unwrap();
2879
2880        // Verify wire data was fully consumed
2881        assert_eq!(
2882            wire_cursor.remaining(),
2883            0,
2884            "wire data should be fully consumed"
2885        );
2886
2887        // Now parse the stored data
2888        let mut stored_cursor: &[u8] = &raw_row.data;
2889
2890        // Parse column 0 (NVarChar)
2891        // Stored format for non-MAX NVarChar: [2-byte len][data]
2892        assert!(
2893            stored_cursor.remaining() >= 2,
2894            "need at least 2 bytes for length"
2895        );
2896        let len0 = stored_cursor.get_u16_le() as usize;
2897        assert_eq!(len0, 10, "NVarChar length should be 10 bytes");
2898        assert!(
2899            stored_cursor.remaining() >= len0,
2900            "need {len0} bytes for data"
2901        );
2902
2903        // Read UTF-16LE and convert to string
2904        let mut utf16_read = Vec::new();
2905        for _ in 0..(len0 / 2) {
2906            utf16_read.push(stored_cursor.get_u16_le());
2907        }
2908        let string0 = String::from_utf16(&utf16_read).unwrap();
2909        assert_eq!(string0, "World", "column 0 should be 'World'");
2910
2911        // Parse column 1 (IntN)
2912        // Stored format for IntN: [1-byte len][data]
2913        assert!(
2914            stored_cursor.remaining() >= 1,
2915            "need at least 1 byte for length"
2916        );
2917        let len1 = stored_cursor.get_u8();
2918        assert_eq!(len1, 4, "IntN length should be 4");
2919        assert!(stored_cursor.remaining() >= 4, "need 4 bytes for INT data");
2920        let int1 = stored_cursor.get_i32_le();
2921        assert_eq!(int1, 42, "column 1 should be 42");
2922
2923        // Verify stored data was fully consumed
2924        assert_eq!(
2925            stored_cursor.remaining(),
2926            0,
2927            "stored data should be fully consumed"
2928        );
2929    }
2930
2931    #[test]
2932    fn test_decode_nvarchar_max_then_intn_roundtrip() {
2933        // Test NVARCHAR(MAX) followed by IntN - uses PLP encoding
2934
2935        // Build wire data for PLP NVARCHAR(MAX) + IntN
2936        let mut wire_data = BytesMut::new();
2937
2938        // Column 0: NVARCHAR(MAX) "Hello" - PLP format
2939        // PLP: 8-byte total length, then chunks
2940        let word = "Hello";
2941        let utf16: Vec<u16> = word.encode_utf16().collect();
2942        let byte_len = (utf16.len() * 2) as u64;
2943
2944        wire_data.put_u64_le(byte_len); // total length = 10
2945        wire_data.put_u32_le(byte_len as u32); // chunk length = 10
2946        for code_unit in &utf16 {
2947            wire_data.put_u16_le(*code_unit);
2948        }
2949        wire_data.put_u32_le(0); // terminating zero-length chunk
2950
2951        // Column 1: IntN 99
2952        wire_data.put_u8(4);
2953        wire_data.put_i32_le(99);
2954
2955        // Build metadata with MAX type
2956        let metadata = ColMetaData {
2957            cek_table: None,
2958            columns: vec![
2959                ColumnData {
2960                    name: "text".to_string(),
2961                    type_id: TypeId::NVarChar,
2962                    col_type: 0xE7,
2963                    flags: 0x01,
2964                    user_type: 0,
2965                    type_info: TypeInfo {
2966                        max_length: Some(0xFFFF), // MAX indicator
2967                        precision: None,
2968                        scale: None,
2969                        collation: None,
2970                    },
2971                    crypto_metadata: None,
2972                },
2973                ColumnData {
2974                    name: "num".to_string(),
2975                    type_id: TypeId::IntN,
2976                    col_type: 0x26,
2977                    flags: 0x01,
2978                    user_type: 0,
2979                    type_info: TypeInfo {
2980                        max_length: Some(4),
2981                        precision: None,
2982                        scale: None,
2983                        collation: None,
2984                    },
2985                    crypto_metadata: None,
2986                },
2987            ],
2988        };
2989
2990        // Decode wire data
2991        let mut wire_cursor = wire_data.freeze();
2992        let raw_row = RawRow::decode(&mut wire_cursor, &metadata).unwrap();
2993
2994        // Verify wire data was fully consumed
2995        assert_eq!(
2996            wire_cursor.remaining(),
2997            0,
2998            "wire data should be fully consumed"
2999        );
3000
3001        // Parse stored PLP data for column 0
3002        let mut stored_cursor: &[u8] = &raw_row.data;
3003
3004        // PLP stored format: [8-byte total][chunks...][4-byte 0]
3005        let total_len = stored_cursor.get_u64_le();
3006        assert_eq!(total_len, 10, "PLP total length should be 10");
3007
3008        let chunk_len = stored_cursor.get_u32_le();
3009        assert_eq!(chunk_len, 10, "PLP chunk length should be 10");
3010
3011        let mut utf16_read = Vec::new();
3012        for _ in 0..(chunk_len / 2) {
3013            utf16_read.push(stored_cursor.get_u16_le());
3014        }
3015        let string0 = String::from_utf16(&utf16_read).unwrap();
3016        assert_eq!(string0, "Hello", "column 0 should be 'Hello'");
3017
3018        let terminator = stored_cursor.get_u32_le();
3019        assert_eq!(terminator, 0, "PLP should end with 0");
3020
3021        // Parse IntN
3022        let len1 = stored_cursor.get_u8();
3023        assert_eq!(len1, 4);
3024        let int1 = stored_cursor.get_i32_le();
3025        assert_eq!(int1, 99, "column 1 should be 99");
3026
3027        // Verify fully consumed
3028        assert_eq!(
3029            stored_cursor.remaining(),
3030            0,
3031            "stored data should be fully consumed"
3032        );
3033    }
3034
3035    // ========================================================================
3036    // ReturnStatus Token Tests
3037    // ========================================================================
3038
3039    #[test]
3040    fn test_return_status_via_parser() {
3041        // RETURNSTATUS token: type (0x79) + value (i32 LE)
3042        let data = Bytes::from_static(&[
3043            0x79, // RETURNSTATUS token type
3044            0x00, 0x00, 0x00, 0x00, // return value = 0 (success)
3045        ]);
3046
3047        let mut parser = TokenParser::new(data);
3048        let token = parser.next_token().unwrap().unwrap();
3049
3050        match token {
3051            Token::ReturnStatus(status) => {
3052                assert_eq!(status, 0);
3053            }
3054            _ => panic!("Expected ReturnStatus token, got {token:?}"),
3055        }
3056
3057        assert!(parser.next_token().unwrap().is_none());
3058    }
3059
3060    #[test]
3061    fn test_return_status_nonzero() {
3062        // Return value = -6 (common for error returns)
3063        let mut buf = BytesMut::new();
3064        buf.put_u8(0x79); // RETURNSTATUS
3065        buf.put_i32_le(-6);
3066
3067        let mut parser = TokenParser::new(buf.freeze());
3068        let token = parser.next_token().unwrap().unwrap();
3069
3070        match token {
3071            Token::ReturnStatus(status) => {
3072                assert_eq!(status, -6);
3073            }
3074            _ => panic!("Expected ReturnStatus token"),
3075        }
3076    }
3077
3078    // ========================================================================
3079    // DoneProc Token Tests
3080    // ========================================================================
3081
3082    #[test]
3083    fn test_done_proc_roundtrip() {
3084        let done = DoneProc {
3085            status: DoneStatus {
3086                more: false,
3087                error: false,
3088                in_xact: false,
3089                count: true,
3090                attn: false,
3091                srverror: false,
3092            },
3093            cur_cmd: 0x00C6, // EXECUTE (198)
3094            row_count: 100,
3095        };
3096
3097        let mut buf = BytesMut::new();
3098        done.encode(&mut buf);
3099
3100        // Verify token type byte
3101        assert_eq!(buf[0], 0xFE);
3102
3103        // Skip token type byte and decode
3104        let mut cursor = &buf[1..];
3105        let decoded = DoneProc::decode(&mut cursor).unwrap();
3106
3107        assert!(decoded.status.count);
3108        assert!(!decoded.status.more);
3109        assert!(!decoded.status.error);
3110        assert_eq!(decoded.cur_cmd, 0x00C6);
3111        assert_eq!(decoded.row_count, 100);
3112    }
3113
3114    #[test]
3115    fn test_done_proc_via_parser() {
3116        let data = Bytes::from_static(&[
3117            0xFE, // DONEPROC token type
3118            0x00, 0x00, // status: no flags
3119            0xC6, 0x00, // cur_cmd: EXECUTE (198)
3120            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count: 0
3121        ]);
3122
3123        let mut parser = TokenParser::new(data);
3124        let token = parser.next_token().unwrap().unwrap();
3125
3126        match token {
3127            Token::DoneProc(done) => {
3128                assert!(!done.status.count);
3129                assert!(!done.status.more);
3130                assert_eq!(done.cur_cmd, 198);
3131                assert_eq!(done.row_count, 0);
3132            }
3133            _ => panic!("Expected DoneProc token"),
3134        }
3135    }
3136
3137    #[test]
3138    fn test_done_proc_with_error_flag() {
3139        let mut buf = BytesMut::new();
3140        buf.put_u8(0xFE); // DONEPROC
3141        buf.put_u16_le(0x0002); // status: DONE_ERROR
3142        buf.put_u16_le(0x00C6); // cur_cmd: EXECUTE
3143        buf.put_u64_le(0); // row_count
3144
3145        let mut parser = TokenParser::new(buf.freeze());
3146        let token = parser.next_token().unwrap().unwrap();
3147
3148        match token {
3149            Token::DoneProc(done) => {
3150                assert!(done.status.error);
3151                assert!(!done.status.count);
3152                assert!(!done.status.more);
3153            }
3154            _ => panic!("Expected DoneProc token"),
3155        }
3156    }
3157
3158    // ========================================================================
3159    // DoneInProc Token Tests
3160    // ========================================================================
3161
3162    #[test]
3163    fn test_done_in_proc_roundtrip() {
3164        let done = DoneInProc {
3165            status: DoneStatus {
3166                more: true,
3167                error: false,
3168                in_xact: false,
3169                count: true,
3170                attn: false,
3171                srverror: false,
3172            },
3173            cur_cmd: 193, // SELECT
3174            row_count: 7,
3175        };
3176
3177        let mut buf = BytesMut::new();
3178        done.encode(&mut buf);
3179
3180        assert_eq!(buf[0], 0xFF);
3181
3182        let mut cursor = &buf[1..];
3183        let decoded = DoneInProc::decode(&mut cursor).unwrap();
3184
3185        assert!(decoded.status.more);
3186        assert!(decoded.status.count);
3187        assert!(!decoded.status.error);
3188        assert_eq!(decoded.cur_cmd, 193);
3189        assert_eq!(decoded.row_count, 7);
3190    }
3191
3192    #[test]
3193    fn test_done_in_proc_via_parser() {
3194        let data = Bytes::from_static(&[
3195            0xFF, // DONEINPROC token type
3196            0x11, 0x00, // status: MORE | COUNT
3197            0xC1, 0x00, // cur_cmd: SELECT (193)
3198            0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count: 3
3199        ]);
3200
3201        let mut parser = TokenParser::new(data);
3202        let token = parser.next_token().unwrap().unwrap();
3203
3204        match token {
3205            Token::DoneInProc(done) => {
3206                assert!(done.status.more);
3207                assert!(done.status.count);
3208                assert_eq!(done.cur_cmd, 193);
3209                assert_eq!(done.row_count, 3);
3210            }
3211            _ => panic!("Expected DoneInProc token"),
3212        }
3213    }
3214
3215    // ========================================================================
3216    // ServerError Token Tests
3217    // ========================================================================
3218
3219    #[test]
3220    fn test_server_error_decode() {
3221        // Build a realistic ERROR token (without the 0xAA type byte,
3222        // since decode() is called after the parser strips it).
3223        let mut buf = BytesMut::new();
3224
3225        // Construct the message fields first to compute length
3226        let msg_utf16: Vec<u16> = "Invalid column name 'foo'.".encode_utf16().collect();
3227        let srv_utf16: Vec<u16> = "SQLDB01".encode_utf16().collect();
3228        let proc_utf16: Vec<u16> = "".encode_utf16().collect();
3229
3230        // Length = number(4) + state(1) + class(1)
3231        //        + us_varchar(message): 2 + msg_utf16.len()*2
3232        //        + b_varchar(server): 1 + srv_utf16.len()*2
3233        //        + b_varchar(procedure): 1 + proc_utf16.len()*2
3234        //        + line(4)
3235        let length: u16 = (4
3236            + 1
3237            + 1
3238            + 2
3239            + (msg_utf16.len() * 2)
3240            + 1
3241            + (srv_utf16.len() * 2)
3242            + 1
3243            + (proc_utf16.len() * 2)
3244            + 4) as u16;
3245
3246        buf.put_u16_le(length);
3247        buf.put_i32_le(207); // error number: Invalid column
3248        buf.put_u8(1); // state
3249        buf.put_u8(16); // class (severity 16)
3250
3251        // Message (US_VARCHAR: 2-byte char count + UTF-16LE)
3252        buf.put_u16_le(msg_utf16.len() as u16);
3253        for &c in &msg_utf16 {
3254            buf.put_u16_le(c);
3255        }
3256
3257        // Server (B_VARCHAR: 1-byte char count + UTF-16LE)
3258        buf.put_u8(srv_utf16.len() as u8);
3259        for &c in &srv_utf16 {
3260            buf.put_u16_le(c);
3261        }
3262
3263        // Procedure (B_VARCHAR: empty)
3264        buf.put_u8(proc_utf16.len() as u8);
3265
3266        // Line number
3267        buf.put_i32_le(42);
3268
3269        let mut cursor = buf.freeze();
3270        let error = ServerError::decode(&mut cursor).unwrap();
3271
3272        assert_eq!(error.number, 207);
3273        assert_eq!(error.state, 1);
3274        assert_eq!(error.class, 16);
3275        assert_eq!(error.message, "Invalid column name 'foo'.");
3276        assert_eq!(error.server, "SQLDB01");
3277        assert_eq!(error.procedure, "");
3278        assert_eq!(error.line, 42);
3279    }
3280
3281    #[test]
3282    fn test_server_error_severity_helpers() {
3283        let fatal = ServerError {
3284            number: 4014,
3285            state: 1,
3286            class: 20,
3287            message: "Fatal error".to_string(),
3288            server: String::new(),
3289            procedure: String::new(),
3290            line: 0,
3291        };
3292        assert!(fatal.is_fatal());
3293        assert!(fatal.is_batch_abort());
3294
3295        let batch_abort = ServerError {
3296            number: 547,
3297            state: 0,
3298            class: 16,
3299            message: "Constraint violation".to_string(),
3300            server: String::new(),
3301            procedure: String::new(),
3302            line: 1,
3303        };
3304        assert!(!batch_abort.is_fatal());
3305        assert!(batch_abort.is_batch_abort());
3306
3307        let informational = ServerError {
3308            number: 5701,
3309            state: 2,
3310            class: 10,
3311            message: "Changed db context".to_string(),
3312            server: String::new(),
3313            procedure: String::new(),
3314            line: 0,
3315        };
3316        assert!(!informational.is_fatal());
3317        assert!(!informational.is_batch_abort());
3318    }
3319
3320    #[test]
3321    fn test_server_error_via_parser() {
3322        // Build an ERROR token with the 0xAA type byte for the parser
3323        let mut buf = BytesMut::new();
3324        buf.put_u8(0xAA); // ERROR token type
3325
3326        let msg_utf16: Vec<u16> = "Syntax error".encode_utf16().collect();
3327        let srv_utf16: Vec<u16> = "SRV".encode_utf16().collect();
3328        let proc_utf16: Vec<u16> = "sp_test".encode_utf16().collect();
3329
3330        let length: u16 = (4
3331            + 1
3332            + 1
3333            + 2
3334            + (msg_utf16.len() * 2)
3335            + 1
3336            + (srv_utf16.len() * 2)
3337            + 1
3338            + (proc_utf16.len() * 2)
3339            + 4) as u16;
3340
3341        buf.put_u16_le(length);
3342        buf.put_i32_le(102); // Syntax error
3343        buf.put_u8(1);
3344        buf.put_u8(15);
3345
3346        buf.put_u16_le(msg_utf16.len() as u16);
3347        for &c in &msg_utf16 {
3348            buf.put_u16_le(c);
3349        }
3350        buf.put_u8(srv_utf16.len() as u8);
3351        for &c in &srv_utf16 {
3352            buf.put_u16_le(c);
3353        }
3354        buf.put_u8(proc_utf16.len() as u8);
3355        for &c in &proc_utf16 {
3356            buf.put_u16_le(c);
3357        }
3358        buf.put_i32_le(5);
3359
3360        let mut parser = TokenParser::new(buf.freeze());
3361        let token = parser.next_token().unwrap().unwrap();
3362
3363        match token {
3364            Token::Error(err) => {
3365                assert_eq!(err.number, 102);
3366                assert_eq!(err.class, 15);
3367                assert_eq!(err.message, "Syntax error");
3368                assert_eq!(err.server, "SRV");
3369                assert_eq!(err.procedure, "sp_test");
3370                assert_eq!(err.line, 5);
3371            }
3372            _ => panic!("Expected Error token"),
3373        }
3374    }
3375
3376    // ========================================================================
3377    // ReturnValue Token Tests
3378    // ========================================================================
3379
3380    /// Helper: build a ReturnValue token (without the 0xAC type byte)
3381    /// for an IntN output parameter.
3382    fn build_return_value_intn(
3383        ordinal: u16,
3384        name: &str,
3385        status: u8,
3386        value: Option<i32>,
3387    ) -> BytesMut {
3388        let mut inner = BytesMut::new();
3389
3390        // param_ordinal
3391        inner.put_u16_le(ordinal);
3392
3393        // param_name (B_VARCHAR)
3394        let name_utf16: Vec<u16> = name.encode_utf16().collect();
3395        inner.put_u8(name_utf16.len() as u8);
3396        for &c in &name_utf16 {
3397            inner.put_u16_le(c);
3398        }
3399
3400        // status
3401        inner.put_u8(status);
3402
3403        // user_type (4 bytes)
3404        inner.put_u32_le(0);
3405
3406        // flags (2 bytes)
3407        inner.put_u16_le(0x0001); // nullable
3408
3409        // type_id: IntN = 0x26
3410        inner.put_u8(0x26);
3411
3412        // type_info for IntN: 1-byte max_length
3413        inner.put_u8(4);
3414
3415        // value (TYPE_VARBYTE for IntN: 1-byte length + data)
3416        match value {
3417            Some(v) => {
3418                inner.put_u8(4); // length = 4
3419                inner.put_i32_le(v);
3420            }
3421            None => {
3422                inner.put_u8(0); // length = 0 means NULL
3423            }
3424        }
3425
3426        // Now build the full token with the 2-byte length prefix
3427        let mut buf = BytesMut::new();
3428        buf.put_u16_le(inner.len() as u16);
3429        buf.extend_from_slice(&inner);
3430        buf
3431    }
3432
3433    #[test]
3434    fn test_return_value_int_output() {
3435        let buf = build_return_value_intn(1, "@result", 0x01, Some(42));
3436        let mut cursor = buf.freeze();
3437        let rv = ReturnValue::decode(&mut cursor).unwrap();
3438
3439        assert_eq!(rv.param_ordinal, 1);
3440        assert_eq!(rv.param_name, "@result");
3441        assert_eq!(rv.status, 0x01); // OUTPUT
3442        assert_eq!(rv.col_type, 0x26); // IntN
3443        assert_eq!(rv.type_info.max_length, Some(4));
3444        // Value should contain: length byte (4) + i32 LE (42)
3445        assert_eq!(rv.value.len(), 5);
3446        assert_eq!(rv.value[0], 4);
3447        assert_eq!(
3448            i32::from_le_bytes([rv.value[1], rv.value[2], rv.value[3], rv.value[4]]),
3449            42
3450        );
3451    }
3452
3453    #[test]
3454    fn test_return_value_null_output() {
3455        let buf = build_return_value_intn(2, "@count", 0x01, None);
3456        let mut cursor = buf.freeze();
3457        let rv = ReturnValue::decode(&mut cursor).unwrap();
3458
3459        assert_eq!(rv.param_ordinal, 2);
3460        assert_eq!(rv.param_name, "@count");
3461        assert_eq!(rv.status, 0x01);
3462        assert_eq!(rv.col_type, 0x26);
3463        // NULL value: length byte = 0
3464        assert_eq!(rv.value.len(), 1);
3465        assert_eq!(rv.value[0], 0);
3466    }
3467
3468    #[test]
3469    fn test_return_value_udf_status() {
3470        // UDF return value has status = 0x02
3471        let buf = build_return_value_intn(0, "@RETURN_VALUE", 0x02, Some(-1));
3472        let mut cursor = buf.freeze();
3473        let rv = ReturnValue::decode(&mut cursor).unwrap();
3474
3475        assert_eq!(rv.param_ordinal, 0);
3476        assert_eq!(rv.param_name, "@RETURN_VALUE");
3477        assert_eq!(rv.status, 0x02); // UDF return value
3478        assert_eq!(rv.value[0], 4);
3479        assert_eq!(
3480            i32::from_le_bytes([rv.value[1], rv.value[2], rv.value[3], rv.value[4]]),
3481            -1
3482        );
3483    }
3484
3485    #[test]
3486    fn test_return_value_nvarchar_output() {
3487        // Build a ReturnValue for NVARCHAR(100) output parameter
3488        let mut inner = BytesMut::new();
3489
3490        // param_ordinal
3491        inner.put_u16_le(1);
3492
3493        // param_name "@name"
3494        let name_utf16: Vec<u16> = "@name".encode_utf16().collect();
3495        inner.put_u8(name_utf16.len() as u8);
3496        for &c in &name_utf16 {
3497            inner.put_u16_le(c);
3498        }
3499
3500        // status = OUTPUT
3501        inner.put_u8(0x01);
3502        // user_type
3503        inner.put_u32_le(0);
3504        // flags (nullable)
3505        inner.put_u16_le(0x0001);
3506        // type_id: NVarChar = 0xE7
3507        inner.put_u8(0xE7);
3508        // type_info for NVarChar: 2-byte max_length + 5-byte collation
3509        inner.put_u16_le(200); // max 100 chars * 2 bytes
3510        inner.put_u32_le(0x0904D000); // collation LCID
3511        inner.put_u8(0x34); // collation sort_id
3512
3513        // value: "Hello" in UTF-16LE with 2-byte length prefix
3514        let val_utf16: Vec<u16> = "Hello".encode_utf16().collect();
3515        let byte_len = (val_utf16.len() * 2) as u16;
3516        inner.put_u16_le(byte_len);
3517        for &c in &val_utf16 {
3518            inner.put_u16_le(c);
3519        }
3520
3521        // Wrap with length prefix
3522        let mut buf = BytesMut::new();
3523        buf.put_u16_le(inner.len() as u16);
3524        buf.extend_from_slice(&inner);
3525
3526        let mut cursor = buf.freeze();
3527        let rv = ReturnValue::decode(&mut cursor).unwrap();
3528
3529        assert_eq!(rv.param_ordinal, 1);
3530        assert_eq!(rv.param_name, "@name");
3531        assert_eq!(rv.status, 0x01);
3532        assert_eq!(rv.col_type, 0xE7); // NVarChar
3533        assert_eq!(rv.type_info.max_length, Some(200));
3534        assert!(rv.type_info.collation.is_some());
3535
3536        // Value: 2-byte length (10) + "Hello" in UTF-16LE
3537        assert_eq!(rv.value.len(), 12); // 2 + 10
3538        let val_len = u16::from_le_bytes([rv.value[0], rv.value[1]]);
3539        assert_eq!(val_len, 10);
3540    }
3541
3542    #[test]
3543    fn test_return_value_via_parser() {
3544        // Build a full ReturnValue token with the 0xAC type byte
3545        let mut data = BytesMut::new();
3546        data.put_u8(0xAC); // RETURNVALUE token type
3547        data.extend_from_slice(&build_return_value_intn(0, "@out", 0x01, Some(99)));
3548
3549        let mut parser = TokenParser::new(data.freeze());
3550        let token = parser.next_token().unwrap().unwrap();
3551
3552        match token {
3553            Token::ReturnValue(rv) => {
3554                assert_eq!(rv.param_name, "@out");
3555                assert_eq!(rv.param_ordinal, 0);
3556                assert_eq!(rv.status, 0x01);
3557                assert_eq!(rv.col_type, 0x26);
3558            }
3559            _ => panic!("Expected ReturnValue token"),
3560        }
3561    }
3562
3563    // ========================================================================
3564    // Multi-Token Stream Tests
3565    // ========================================================================
3566
3567    #[test]
3568    fn test_multi_token_stored_proc_response() {
3569        // Simulate a stored procedure response:
3570        // DoneInProc (result set done) → ReturnStatus → DoneProc
3571        let mut data = BytesMut::new();
3572
3573        // Token 1: DONEINPROC — result set with 3 rows
3574        data.put_u8(0xFF); // DONEINPROC
3575        data.put_u16_le(0x0010); // status: COUNT
3576        data.put_u16_le(0x00C1); // cur_cmd: SELECT
3577        data.put_u64_le(3); // row_count
3578
3579        // Token 2: RETURNSTATUS — procedure returned 0
3580        data.put_u8(0x79); // RETURNSTATUS
3581        data.put_i32_le(0);
3582
3583        // Token 3: DONEPROC — final
3584        data.put_u8(0xFE); // DONEPROC
3585        data.put_u16_le(0x0000); // status: no flags
3586        data.put_u16_le(0x00C6); // cur_cmd: EXECUTE
3587        data.put_u64_le(0);
3588
3589        let mut parser = TokenParser::new(data.freeze());
3590
3591        // Token 1: DoneInProc
3592        let t1 = parser.next_token().unwrap().unwrap();
3593        match t1 {
3594            Token::DoneInProc(done) => {
3595                assert!(done.status.count);
3596                assert_eq!(done.row_count, 3);
3597                assert_eq!(done.cur_cmd, 193);
3598            }
3599            _ => panic!("Expected DoneInProc, got {t1:?}"),
3600        }
3601
3602        // Token 2: ReturnStatus
3603        let t2 = parser.next_token().unwrap().unwrap();
3604        match t2 {
3605            Token::ReturnStatus(status) => {
3606                assert_eq!(status, 0);
3607            }
3608            _ => panic!("Expected ReturnStatus, got {t2:?}"),
3609        }
3610
3611        // Token 3: DoneProc
3612        let t3 = parser.next_token().unwrap().unwrap();
3613        match t3 {
3614            Token::DoneProc(done) => {
3615                assert!(!done.status.count);
3616                assert!(!done.status.more);
3617                assert_eq!(done.cur_cmd, 198);
3618            }
3619            _ => panic!("Expected DoneProc, got {t3:?}"),
3620        }
3621
3622        // No more tokens
3623        assert!(parser.next_token().unwrap().is_none());
3624    }
3625
3626    #[test]
3627    fn test_multi_token_error_in_stream() {
3628        // Simulate: ERROR → DONE (error during query)
3629        let mut data = BytesMut::new();
3630
3631        // Token 1: ERROR
3632        data.put_u8(0xAA);
3633
3634        let msg_utf16: Vec<u16> = "Deadlock".encode_utf16().collect();
3635        let srv_utf16: Vec<u16> = "DB1".encode_utf16().collect();
3636
3637        let length: u16 = (4 + 1 + 1
3638            + 2 + (msg_utf16.len() * 2)
3639            + 1 + (srv_utf16.len() * 2)
3640            + 1  // empty procedure
3641            + 4) as u16;
3642
3643        data.put_u16_le(length);
3644        data.put_i32_le(1205); // deadlock
3645        data.put_u8(51); // state
3646        data.put_u8(13); // class
3647
3648        data.put_u16_le(msg_utf16.len() as u16);
3649        for &c in &msg_utf16 {
3650            data.put_u16_le(c);
3651        }
3652        data.put_u8(srv_utf16.len() as u8);
3653        for &c in &srv_utf16 {
3654            data.put_u16_le(c);
3655        }
3656        data.put_u8(0); // empty procedure
3657        data.put_i32_le(0);
3658
3659        // Token 2: DONE with error flag
3660        data.put_u8(0xFD);
3661        data.put_u16_le(0x0002); // DONE_ERROR
3662        data.put_u16_le(0x00C1); // SELECT
3663        data.put_u64_le(0);
3664
3665        let mut parser = TokenParser::new(data.freeze());
3666
3667        // Token 1: Error
3668        let t1 = parser.next_token().unwrap().unwrap();
3669        match t1 {
3670            Token::Error(err) => {
3671                assert_eq!(err.number, 1205);
3672                assert_eq!(err.class, 13);
3673                assert_eq!(err.message, "Deadlock");
3674                assert_eq!(err.server, "DB1");
3675            }
3676            _ => panic!("Expected Error token, got {t1:?}"),
3677        }
3678
3679        // Token 2: Done with error
3680        let t2 = parser.next_token().unwrap().unwrap();
3681        match t2 {
3682            Token::Done(done) => {
3683                assert!(done.status.error);
3684                assert!(!done.status.count);
3685            }
3686            _ => panic!("Expected Done token, got {t2:?}"),
3687        }
3688
3689        assert!(parser.next_token().unwrap().is_none());
3690    }
3691
3692    #[test]
3693    fn test_multi_token_proc_with_return_value() {
3694        // Simulate stored proc: ReturnValue → ReturnStatus → DoneProc
3695        let mut data = BytesMut::new();
3696
3697        // Token 1: ReturnValue (@result = 42)
3698        data.put_u8(0xAC);
3699        data.extend_from_slice(&build_return_value_intn(1, "@result", 0x01, Some(42)));
3700
3701        // Token 2: ReturnStatus = 0
3702        data.put_u8(0x79);
3703        data.put_i32_le(0);
3704
3705        // Token 3: DoneProc
3706        data.put_u8(0xFE);
3707        data.put_u16_le(0x0000);
3708        data.put_u16_le(0x00C6);
3709        data.put_u64_le(0);
3710
3711        let mut parser = TokenParser::new(data.freeze());
3712
3713        let t1 = parser.next_token().unwrap().unwrap();
3714        match t1 {
3715            Token::ReturnValue(rv) => {
3716                assert_eq!(rv.param_name, "@result");
3717                assert_eq!(rv.param_ordinal, 1);
3718            }
3719            _ => panic!("Expected ReturnValue, got {t1:?}"),
3720        }
3721
3722        let t2 = parser.next_token().unwrap().unwrap();
3723        assert!(matches!(t2, Token::ReturnStatus(0)));
3724
3725        let t3 = parser.next_token().unwrap().unwrap();
3726        assert!(matches!(t3, Token::DoneProc(_)));
3727
3728        assert!(parser.next_token().unwrap().is_none());
3729    }
3730
3731    // ========================================================================
3732    // EOF / Truncation Edge Cases
3733    // ========================================================================
3734
3735    #[test]
3736    fn test_return_status_truncated() {
3737        // Only 3 bytes instead of 4 for i32
3738        let data = Bytes::from_static(&[0x79, 0x01, 0x02, 0x03]);
3739        let mut parser = TokenParser::new(data);
3740        assert!(parser.next_token().is_err());
3741    }
3742
3743    #[test]
3744    fn test_done_proc_truncated() {
3745        // Only 8 bytes instead of 12
3746        let data = Bytes::from_static(&[0xFE, 0x00, 0x00, 0xC1, 0x00, 0x01, 0x00, 0x00, 0x00]);
3747        let mut parser = TokenParser::new(data);
3748        assert!(parser.next_token().is_err());
3749    }
3750
3751    #[test]
3752    fn test_server_error_truncated() {
3753        // ERROR token with only the length field (body truncated)
3754        let data = Bytes::from_static(&[0xAA, 0x20, 0x00]);
3755        let mut parser = TokenParser::new(data);
3756        assert!(parser.next_token().is_err());
3757    }
3758}