Skip to main content

tds_protocol/
token.rs

1//! TDS token stream definitions.
2//!
3//! Tokens are the fundamental units of TDS response data. The server sends
4//! a stream of tokens that describe metadata, rows, errors, and other information.
5//!
6//! ## Token Structure
7//!
8//! Each token begins with a 1-byte token type identifier, followed by
9//! token-specific data. Some tokens have fixed lengths, while others
10//! have length prefixes.
11//!
12//! ## Usage
13//!
14//! ```rust,ignore
15//! use tds_protocol::token::{Token, TokenParser};
16//! use bytes::Bytes;
17//!
18//! let data: Bytes = /* received from server */;
19//! let mut parser = TokenParser::new(data);
20//!
21//! while let Some(token) = parser.next_token()? {
22//!     match token {
23//!         Token::Done(done) => println!("Rows affected: {}", done.row_count),
24//!         Token::Error(err) => eprintln!("Error {}: {}", err.number, err.message),
25//!         _ => {}
26//!     }
27//! }
28//! ```
29
30use bytes::{Buf, BufMut, Bytes};
31
32use crate::codec::{read_b_varchar, read_us_varchar};
33use crate::error::ProtocolError;
34use crate::prelude::*;
35use crate::types::TypeId;
36
37/// Token type identifier.
38#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
39#[repr(u8)]
40#[non_exhaustive]
41pub enum TokenType {
42    /// Column metadata (COLMETADATA).
43    ColMetaData = 0x81,
44    /// Error message (ERROR).
45    Error = 0xAA,
46    /// Informational message (INFO).
47    Info = 0xAB,
48    /// Login acknowledgment (LOGINACK).
49    LoginAck = 0xAD,
50    /// Row data (ROW).
51    Row = 0xD1,
52    /// Null bitmap compressed row (NBCROW).
53    NbcRow = 0xD2,
54    /// Environment change (ENVCHANGE).
55    EnvChange = 0xE3,
56    /// SSPI authentication (SSPI).
57    Sspi = 0xED,
58    /// Done (DONE).
59    Done = 0xFD,
60    /// Done in procedure (DONEINPROC).
61    DoneInProc = 0xFF,
62    /// Done procedure (DONEPROC).
63    DoneProc = 0xFE,
64    /// Return status (RETURNSTATUS).
65    ReturnStatus = 0x79,
66    /// Return value (RETURNVALUE).
67    ReturnValue = 0xAC,
68    /// Order (ORDER).
69    Order = 0xA9,
70    /// Feature extension acknowledgment (FEATUREEXTACK).
71    FeatureExtAck = 0xAE,
72    /// Session state (SESSIONSTATE).
73    SessionState = 0xE4,
74    /// Federated authentication info (FEDAUTHINFO).
75    FedAuthInfo = 0xEE,
76    /// Column info (COLINFO).
77    ColInfo = 0xA5,
78    /// Table name (TABNAME).
79    TabName = 0xA4,
80    /// Offset (OFFSET).
81    Offset = 0x78,
82}
83
84impl TokenType {
85    /// Create a token type from a raw byte.
86    pub fn from_u8(value: u8) -> Option<Self> {
87        match value {
88            0x81 => Some(Self::ColMetaData),
89            0xAA => Some(Self::Error),
90            0xAB => Some(Self::Info),
91            0xAD => Some(Self::LoginAck),
92            0xD1 => Some(Self::Row),
93            0xD2 => Some(Self::NbcRow),
94            0xE3 => Some(Self::EnvChange),
95            0xED => Some(Self::Sspi),
96            0xFD => Some(Self::Done),
97            0xFF => Some(Self::DoneInProc),
98            0xFE => Some(Self::DoneProc),
99            0x79 => Some(Self::ReturnStatus),
100            0xAC => Some(Self::ReturnValue),
101            0xA9 => Some(Self::Order),
102            0xAE => Some(Self::FeatureExtAck),
103            0xE4 => Some(Self::SessionState),
104            0xEE => Some(Self::FedAuthInfo),
105            0xA5 => Some(Self::ColInfo),
106            0xA4 => Some(Self::TabName),
107            0x78 => Some(Self::Offset),
108            _ => None,
109        }
110    }
111}
112
113/// Parsed TDS token.
114///
115/// This enum represents all possible tokens that can be received from SQL Server.
116/// Each variant contains the parsed token data.
117#[derive(Debug, Clone)]
118#[non_exhaustive]
119pub enum Token {
120    /// Column metadata describing result set structure.
121    ColMetaData(ColMetaData),
122    /// Row data.
123    Row(RawRow),
124    /// Null bitmap compressed row.
125    NbcRow(NbcRow),
126    /// Completion of a SQL statement.
127    Done(Done),
128    /// Completion of a stored procedure.
129    DoneProc(DoneProc),
130    /// Completion within a stored procedure.
131    DoneInProc(DoneInProc),
132    /// Return status from stored procedure.
133    ReturnStatus(i32),
134    /// Return value from stored procedure.
135    ReturnValue(ReturnValue),
136    /// Error message from server.
137    Error(ServerError),
138    /// Informational message from server.
139    Info(ServerInfo),
140    /// Login acknowledgment.
141    LoginAck(LoginAck),
142    /// Environment change notification.
143    EnvChange(EnvChange),
144    /// Column ordering information.
145    Order(Order),
146    /// Feature extension acknowledgment.
147    FeatureExtAck(FeatureExtAck),
148    /// SSPI authentication data.
149    Sspi(SspiToken),
150    /// Session state information.
151    SessionState(SessionState),
152    /// Federated authentication info.
153    FedAuthInfo(FedAuthInfo),
154}
155
156/// Column metadata token.
157#[derive(Debug, Clone, Default)]
158pub struct ColMetaData {
159    /// Column definitions.
160    pub columns: Vec<ColumnData>,
161}
162
163/// Column definition within metadata.
164#[derive(Debug, Clone)]
165pub struct ColumnData {
166    /// Column name.
167    pub name: String,
168    /// Column data type ID.
169    pub type_id: TypeId,
170    /// Column data type raw byte (for unknown types).
171    pub col_type: u8,
172    /// Column flags.
173    pub flags: u16,
174    /// User type ID.
175    pub user_type: u32,
176    /// Type-specific metadata.
177    pub type_info: TypeInfo,
178}
179
180/// Type-specific metadata.
181#[derive(Debug, Clone, Default)]
182pub struct TypeInfo {
183    /// Maximum length for variable-length types.
184    pub max_length: Option<u32>,
185    /// Precision for numeric types.
186    pub precision: Option<u8>,
187    /// Scale for numeric types.
188    pub scale: Option<u8>,
189    /// Collation for string types.
190    pub collation: Option<Collation>,
191}
192
193/// SQL Server collation.
194///
195/// Collations in SQL Server define the character encoding and sorting rules
196/// for string data. For `VARCHAR` columns, the collation determines which
197/// code page (character encoding) is used to store the data.
198///
199/// # Encoding Support
200///
201/// When the `encoding` feature is enabled, the [`Collation::encoding()`] method
202/// returns the appropriate [`encoding_rs::Encoding`] for decoding `VARCHAR` data.
203///
204/// # Example
205///
206/// ```rust,ignore
207/// use tds_protocol::token::Collation;
208///
209/// let collation = Collation { lcid: 0x0804, sort_id: 0 }; // Chinese (PRC)
210/// if let Some(encoding) = collation.encoding() {
211///     let (decoded, _, _) = encoding.decode(raw_bytes);
212///     // decoded is now proper Chinese text
213/// }
214/// ```
215#[derive(Debug, Clone, Copy, Default)]
216pub struct Collation {
217    /// Locale ID (LCID).
218    ///
219    /// The LCID encodes both the language and region. The lower 16 bits
220    /// contain the primary language ID, and bits 16-19 contain the sort ID
221    /// for some collations.
222    ///
223    /// For UTF-8 collations (SQL Server 2019+), bit 27 (0x0800_0000) is set.
224    pub lcid: u32,
225    /// Sort ID.
226    ///
227    /// Used with certain collations to specify sorting behavior.
228    pub sort_id: u8,
229}
230
231impl Collation {
232    /// Returns the character encoding for this collation.
233    ///
234    /// This method maps the collation's LCID to the appropriate character
235    /// encoding from the `encoding_rs` crate.
236    ///
237    /// # Returns
238    ///
239    /// - `Some(&Encoding)` - The encoding to use for decoding `VARCHAR` data
240    /// - `None` - If the collation uses UTF-8 (no transcoding needed) or
241    ///   the LCID is not recognized (caller should use Windows-1252 fallback)
242    ///
243    /// # UTF-8 Collations
244    ///
245    /// SQL Server 2019+ supports UTF-8 collations (identified by the `_UTF8`
246    /// suffix). These return `None` because no transcoding is needed.
247    ///
248    /// # Example
249    ///
250    /// ```rust,ignore
251    /// let collation = Collation { lcid: 0x0419, sort_id: 0 }; // Russian
252    /// if let Some(encoding) = collation.encoding() {
253    ///     // encoding is Windows-1251 for Cyrillic
254    ///     let (text, _, had_errors) = encoding.decode(&raw_bytes);
255    /// }
256    /// ```
257    #[cfg(feature = "encoding")]
258    pub fn encoding(&self) -> Option<&'static encoding_rs::Encoding> {
259        crate::collation::encoding_for_lcid(self.lcid)
260    }
261
262    /// Returns whether this collation uses UTF-8 encoding.
263    ///
264    /// UTF-8 collations were introduced in SQL Server 2019 and are
265    /// identified by the `_UTF8` suffix in the collation name.
266    #[cfg(feature = "encoding")]
267    pub fn is_utf8(&self) -> bool {
268        crate::collation::is_utf8_collation(self.lcid)
269    }
270
271    /// Returns the Windows code page number for this collation.
272    ///
273    /// Useful for error messages and debugging.
274    ///
275    /// # Returns
276    ///
277    /// The code page number (e.g., 1252 for Western European, 932 for Japanese).
278    #[cfg(feature = "encoding")]
279    pub fn code_page(&self) -> Option<u16> {
280        crate::collation::code_page_for_lcid(self.lcid)
281    }
282
283    /// Returns the encoding name for this collation.
284    ///
285    /// Useful for error messages and debugging.
286    #[cfg(feature = "encoding")]
287    pub fn encoding_name(&self) -> &'static str {
288        crate::collation::encoding_name_for_lcid(self.lcid)
289    }
290}
291
292/// Raw row data (not yet decoded).
293#[derive(Debug, Clone)]
294pub struct RawRow {
295    /// Raw column values.
296    pub data: bytes::Bytes,
297}
298
299/// Null bitmap compressed row.
300#[derive(Debug, Clone)]
301pub struct NbcRow {
302    /// Null bitmap.
303    pub null_bitmap: Vec<u8>,
304    /// Raw non-null column values.
305    pub data: bytes::Bytes,
306}
307
308/// Done token indicating statement completion.
309#[derive(Debug, Clone, Copy)]
310pub struct Done {
311    /// Status flags.
312    pub status: DoneStatus,
313    /// Current command.
314    pub cur_cmd: u16,
315    /// Row count (if applicable).
316    pub row_count: u64,
317}
318
319/// Done status flags.
320#[derive(Debug, Clone, Copy, Default)]
321#[non_exhaustive]
322pub struct DoneStatus {
323    /// More results follow.
324    pub more: bool,
325    /// Error occurred.
326    pub error: bool,
327    /// Transaction in progress.
328    pub in_xact: bool,
329    /// Row count is valid.
330    pub count: bool,
331    /// Attention acknowledgment.
332    pub attn: bool,
333    /// Server error caused statement termination.
334    pub srverror: bool,
335}
336
337/// Done in procedure token.
338#[derive(Debug, Clone, Copy)]
339pub struct DoneInProc {
340    /// Status flags.
341    pub status: DoneStatus,
342    /// Current command.
343    pub cur_cmd: u16,
344    /// Row count.
345    pub row_count: u64,
346}
347
348/// Done procedure token.
349#[derive(Debug, Clone, Copy)]
350pub struct DoneProc {
351    /// Status flags.
352    pub status: DoneStatus,
353    /// Current command.
354    pub cur_cmd: u16,
355    /// Row count.
356    pub row_count: u64,
357}
358
359/// Return value from stored procedure.
360#[derive(Debug, Clone)]
361#[non_exhaustive]
362pub struct ReturnValue {
363    /// Parameter ordinal.
364    pub param_ordinal: u16,
365    /// Parameter name.
366    pub param_name: String,
367    /// Status flags.
368    pub status: u8,
369    /// User type.
370    pub user_type: u32,
371    /// Type flags.
372    pub flags: u16,
373    /// Raw column type byte from the wire.
374    pub col_type: u8,
375    /// Type info.
376    pub type_info: TypeInfo,
377    /// Value data.
378    pub value: bytes::Bytes,
379}
380
381/// Server error message.
382#[derive(Debug, Clone)]
383pub struct ServerError {
384    /// Error number.
385    pub number: i32,
386    /// Error state.
387    pub state: u8,
388    /// Error severity class.
389    pub class: u8,
390    /// Error message text.
391    pub message: String,
392    /// Server name.
393    pub server: String,
394    /// Procedure name.
395    pub procedure: String,
396    /// Line number.
397    pub line: i32,
398}
399
400/// Server informational message.
401#[derive(Debug, Clone)]
402pub struct ServerInfo {
403    /// Info number.
404    pub number: i32,
405    /// Info state.
406    pub state: u8,
407    /// Info class (severity).
408    pub class: u8,
409    /// Info message text.
410    pub message: String,
411    /// Server name.
412    pub server: String,
413    /// Procedure name.
414    pub procedure: String,
415    /// Line number.
416    pub line: i32,
417}
418
419/// Login acknowledgment token.
420#[derive(Debug, Clone)]
421pub struct LoginAck {
422    /// Interface type.
423    pub interface: u8,
424    /// TDS version.
425    pub tds_version: u32,
426    /// Program name.
427    pub prog_name: String,
428    /// Program version.
429    pub prog_version: u32,
430}
431
432/// Environment change token.
433#[derive(Debug, Clone)]
434pub struct EnvChange {
435    /// Type of environment change.
436    pub env_type: EnvChangeType,
437    /// New value.
438    pub new_value: EnvChangeValue,
439    /// Old value.
440    pub old_value: EnvChangeValue,
441}
442
443/// Environment change type.
444#[derive(Debug, Clone, Copy, PartialEq, Eq)]
445#[repr(u8)]
446#[non_exhaustive]
447pub enum EnvChangeType {
448    /// Database changed.
449    Database = 1,
450    /// Language changed.
451    Language = 2,
452    /// Character set changed.
453    CharacterSet = 3,
454    /// Packet size changed.
455    PacketSize = 4,
456    /// Unicode data sorting locale ID.
457    UnicodeSortingLocalId = 5,
458    /// Unicode comparison flags.
459    UnicodeComparisonFlags = 6,
460    /// SQL collation.
461    SqlCollation = 7,
462    /// Begin transaction.
463    BeginTransaction = 8,
464    /// Commit transaction.
465    CommitTransaction = 9,
466    /// Rollback transaction.
467    RollbackTransaction = 10,
468    /// Enlist DTC transaction.
469    EnlistDtcTransaction = 11,
470    /// Defect DTC transaction.
471    DefectTransaction = 12,
472    /// Real-time log shipping.
473    RealTimeLogShipping = 13,
474    /// Promote transaction.
475    PromoteTransaction = 15,
476    /// Transaction manager address.
477    TransactionManagerAddress = 16,
478    /// Transaction ended.
479    TransactionEnded = 17,
480    /// Reset connection completion acknowledgment.
481    ResetConnectionCompletionAck = 18,
482    /// User instance started.
483    UserInstanceStarted = 19,
484    /// Routing information.
485    Routing = 20,
486}
487
488/// Environment change value.
489#[derive(Debug, Clone)]
490#[non_exhaustive]
491pub enum EnvChangeValue {
492    /// String value.
493    String(String),
494    /// Binary value.
495    Binary(bytes::Bytes),
496    /// Routing information.
497    Routing {
498        /// Host name.
499        host: String,
500        /// Port number.
501        port: u16,
502    },
503}
504
505/// Column ordering information.
506#[derive(Debug, Clone)]
507pub struct Order {
508    /// Ordered column indices.
509    pub columns: Vec<u16>,
510}
511
512/// Feature extension acknowledgment.
513#[derive(Debug, Clone)]
514pub struct FeatureExtAck {
515    /// Acknowledged features.
516    pub features: Vec<FeatureAck>,
517}
518
519/// Individual feature acknowledgment.
520#[derive(Debug, Clone)]
521pub struct FeatureAck {
522    /// Feature ID.
523    pub feature_id: u8,
524    /// Feature data.
525    pub data: bytes::Bytes,
526}
527
528/// SSPI authentication token.
529#[derive(Debug, Clone)]
530pub struct SspiToken {
531    /// SSPI data.
532    pub data: bytes::Bytes,
533}
534
535/// Session state token.
536#[derive(Debug, Clone)]
537pub struct SessionState {
538    /// Session state data.
539    pub data: bytes::Bytes,
540}
541
542/// Federated authentication info.
543#[derive(Debug, Clone)]
544pub struct FedAuthInfo {
545    /// STS URL.
546    pub sts_url: String,
547    /// Service principal name.
548    pub spn: String,
549}
550
551// =============================================================================
552// ColMetaData and Row Parsing Implementation
553// =============================================================================
554
555impl ColMetaData {
556    /// Special value indicating no metadata.
557    pub const NO_METADATA: u16 = 0xFFFF;
558
559    /// Decode a COLMETADATA token from bytes.
560    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
561        if src.remaining() < 2 {
562            return Err(ProtocolError::UnexpectedEof);
563        }
564
565        let column_count = src.get_u16_le();
566
567        // 0xFFFF means no metadata present
568        if column_count == Self::NO_METADATA {
569            return Ok(Self {
570                columns: Vec::new(),
571            });
572        }
573
574        let mut columns = Vec::with_capacity(column_count as usize);
575
576        for _ in 0..column_count {
577            let column = Self::decode_column(src)?;
578            columns.push(column);
579        }
580
581        Ok(Self { columns })
582    }
583
584    /// Decode a single column from the metadata.
585    fn decode_column(src: &mut impl Buf) -> Result<ColumnData, ProtocolError> {
586        // UserType (4 bytes) + Flags (2 bytes) + TypeId (1 byte)
587        if src.remaining() < 7 {
588            return Err(ProtocolError::UnexpectedEof);
589        }
590
591        let user_type = src.get_u32_le();
592        let flags = src.get_u16_le();
593        let col_type = src.get_u8();
594
595        let type_id = TypeId::from_u8(col_type).unwrap_or(TypeId::Null); // Default to Null for unknown types
596
597        // Parse type-specific metadata
598        let type_info = Self::decode_type_info(src, type_id, col_type)?;
599
600        // Read column name (B_VARCHAR format - 1 byte length in characters)
601        let name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
602
603        Ok(ColumnData {
604            name,
605            type_id,
606            col_type,
607            flags,
608            user_type,
609            type_info,
610        })
611    }
612
613    /// Decode type-specific metadata based on the type ID.
614    fn decode_type_info(
615        src: &mut impl Buf,
616        type_id: TypeId,
617        col_type: u8,
618    ) -> Result<TypeInfo, ProtocolError> {
619        match type_id {
620            // Fixed-length types have no additional metadata
621            TypeId::Null => Ok(TypeInfo::default()),
622            TypeId::Int1 | TypeId::Bit => Ok(TypeInfo::default()),
623            TypeId::Int2 => Ok(TypeInfo::default()),
624            TypeId::Int4 => Ok(TypeInfo::default()),
625            TypeId::Int8 => Ok(TypeInfo::default()),
626            TypeId::Float4 => Ok(TypeInfo::default()),
627            TypeId::Float8 => Ok(TypeInfo::default()),
628            TypeId::Money => Ok(TypeInfo::default()),
629            TypeId::Money4 => Ok(TypeInfo::default()),
630            TypeId::DateTime => Ok(TypeInfo::default()),
631            TypeId::DateTime4 => Ok(TypeInfo::default()),
632
633            // Variable length integer/float/money (1-byte max length)
634            TypeId::IntN | TypeId::BitN | TypeId::FloatN | TypeId::MoneyN | TypeId::DateTimeN => {
635                if src.remaining() < 1 {
636                    return Err(ProtocolError::UnexpectedEof);
637                }
638                let max_length = src.get_u8() as u32;
639                Ok(TypeInfo {
640                    max_length: Some(max_length),
641                    ..Default::default()
642                })
643            }
644
645            // GUID has 1-byte length
646            TypeId::Guid => {
647                if src.remaining() < 1 {
648                    return Err(ProtocolError::UnexpectedEof);
649                }
650                let max_length = src.get_u8() as u32;
651                Ok(TypeInfo {
652                    max_length: Some(max_length),
653                    ..Default::default()
654                })
655            }
656
657            // Decimal/Numeric types (1-byte length + precision + scale)
658            TypeId::Decimal | TypeId::Numeric | TypeId::DecimalN | TypeId::NumericN => {
659                if src.remaining() < 3 {
660                    return Err(ProtocolError::UnexpectedEof);
661                }
662                let max_length = src.get_u8() as u32;
663                let precision = src.get_u8();
664                let scale = src.get_u8();
665                Ok(TypeInfo {
666                    max_length: Some(max_length),
667                    precision: Some(precision),
668                    scale: Some(scale),
669                    ..Default::default()
670                })
671            }
672
673            // Old-style byte-length strings (Char, VarChar, Binary, VarBinary)
674            TypeId::Char | TypeId::VarChar | TypeId::Binary | TypeId::VarBinary => {
675                if src.remaining() < 1 {
676                    return Err(ProtocolError::UnexpectedEof);
677                }
678                let max_length = src.get_u8() as u32;
679                Ok(TypeInfo {
680                    max_length: Some(max_length),
681                    ..Default::default()
682                })
683            }
684
685            // Big varchar/binary with 2-byte length + collation for strings
686            TypeId::BigVarChar | TypeId::BigChar => {
687                if src.remaining() < 7 {
688                    // 2 (length) + 5 (collation)
689                    return Err(ProtocolError::UnexpectedEof);
690                }
691                let max_length = src.get_u16_le() as u32;
692                let collation = Self::decode_collation(src)?;
693                Ok(TypeInfo {
694                    max_length: Some(max_length),
695                    collation: Some(collation),
696                    ..Default::default()
697                })
698            }
699
700            // Big binary (2-byte length, no collation)
701            TypeId::BigVarBinary | TypeId::BigBinary => {
702                if src.remaining() < 2 {
703                    return Err(ProtocolError::UnexpectedEof);
704                }
705                let max_length = src.get_u16_le() as u32;
706                Ok(TypeInfo {
707                    max_length: Some(max_length),
708                    ..Default::default()
709                })
710            }
711
712            // Unicode strings (NChar, NVarChar) - 2-byte length + collation
713            TypeId::NChar | TypeId::NVarChar => {
714                if src.remaining() < 7 {
715                    // 2 (length) + 5 (collation)
716                    return Err(ProtocolError::UnexpectedEof);
717                }
718                let max_length = src.get_u16_le() as u32;
719                let collation = Self::decode_collation(src)?;
720                Ok(TypeInfo {
721                    max_length: Some(max_length),
722                    collation: Some(collation),
723                    ..Default::default()
724                })
725            }
726
727            // Date type (no additional metadata)
728            TypeId::Date => Ok(TypeInfo::default()),
729
730            // Time, DateTime2, DateTimeOffset have scale
731            TypeId::Time | TypeId::DateTime2 | TypeId::DateTimeOffset => {
732                if src.remaining() < 1 {
733                    return Err(ProtocolError::UnexpectedEof);
734                }
735                let scale = src.get_u8();
736                Ok(TypeInfo {
737                    scale: Some(scale),
738                    ..Default::default()
739                })
740            }
741
742            // Text/NText/Image (deprecated LOB types)
743            TypeId::Text | TypeId::NText | TypeId::Image => {
744                // These have complex metadata: length (4) + collation (5) + table name parts
745                if src.remaining() < 4 {
746                    return Err(ProtocolError::UnexpectedEof);
747                }
748                let max_length = src.get_u32_le();
749
750                // For Text/NText, read collation
751                let collation = if type_id == TypeId::Text || type_id == TypeId::NText {
752                    if src.remaining() < 5 {
753                        return Err(ProtocolError::UnexpectedEof);
754                    }
755                    Some(Self::decode_collation(src)?)
756                } else {
757                    None
758                };
759
760                // Skip table name parts (variable length)
761                // Format: numParts (1 byte) followed by us_varchar for each part
762                if src.remaining() < 1 {
763                    return Err(ProtocolError::UnexpectedEof);
764                }
765                let num_parts = src.get_u8();
766                for _ in 0..num_parts {
767                    // Read and discard table name part
768                    let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
769                }
770
771                Ok(TypeInfo {
772                    max_length: Some(max_length),
773                    collation,
774                    ..Default::default()
775                })
776            }
777
778            // XML type
779            TypeId::Xml => {
780                if src.remaining() < 1 {
781                    return Err(ProtocolError::UnexpectedEof);
782                }
783                let schema_present = src.get_u8();
784
785                if schema_present != 0 {
786                    // Read schema info (3 us_varchar strings)
787                    let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // db name
788                    let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // owning schema
789                    let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // xml schema collection
790                }
791
792                Ok(TypeInfo::default())
793            }
794
795            // UDT (User-defined type) - complex metadata
796            TypeId::Udt => {
797                // Max length (2 bytes)
798                if src.remaining() < 2 {
799                    return Err(ProtocolError::UnexpectedEof);
800                }
801                let max_length = src.get_u16_le() as u32;
802
803                // UDT metadata: db name, schema name, type name, assembly qualified name
804                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // db name
805                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // schema name
806                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // type name
807                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // assembly qualified name
808
809                Ok(TypeInfo {
810                    max_length: Some(max_length),
811                    ..Default::default()
812                })
813            }
814
815            // Table-valued parameter - complex metadata (skip for now)
816            TypeId::Tvp => {
817                // TVP has very complex metadata, not commonly used
818                // For now, we can't properly parse this
819                Err(ProtocolError::InvalidTokenType(col_type))
820            }
821
822            // SQL Variant - 4-byte length
823            TypeId::Variant => {
824                if src.remaining() < 4 {
825                    return Err(ProtocolError::UnexpectedEof);
826                }
827                let max_length = src.get_u32_le();
828                Ok(TypeInfo {
829                    max_length: Some(max_length),
830                    ..Default::default()
831                })
832            }
833        }
834    }
835
836    /// Decode collation information (5 bytes).
837    fn decode_collation(src: &mut impl Buf) -> Result<Collation, ProtocolError> {
838        if src.remaining() < 5 {
839            return Err(ProtocolError::UnexpectedEof);
840        }
841        // Collation: LCID (4 bytes) + Sort ID (1 byte)
842        let lcid = src.get_u32_le();
843        let sort_id = src.get_u8();
844        Ok(Collation { lcid, sort_id })
845    }
846
847    /// Get the number of columns.
848    #[must_use]
849    pub fn column_count(&self) -> usize {
850        self.columns.len()
851    }
852
853    /// Check if this represents no metadata.
854    #[must_use]
855    pub fn is_empty(&self) -> bool {
856        self.columns.is_empty()
857    }
858}
859
860impl ColumnData {
861    /// Check if this column is nullable.
862    #[must_use]
863    pub fn is_nullable(&self) -> bool {
864        (self.flags & 0x0001) != 0
865    }
866
867    /// Get the fixed size in bytes for this column, if applicable.
868    ///
869    /// Returns `None` for variable-length types.
870    #[must_use]
871    pub fn fixed_size(&self) -> Option<usize> {
872        match self.type_id {
873            TypeId::Null => Some(0),
874            TypeId::Int1 | TypeId::Bit => Some(1),
875            TypeId::Int2 => Some(2),
876            TypeId::Int4 => Some(4),
877            TypeId::Int8 => Some(8),
878            TypeId::Float4 => Some(4),
879            TypeId::Float8 => Some(8),
880            TypeId::Money => Some(8),
881            TypeId::Money4 => Some(4),
882            TypeId::DateTime => Some(8),
883            TypeId::DateTime4 => Some(4),
884            TypeId::Date => Some(3),
885            _ => None,
886        }
887    }
888}
889
890// =============================================================================
891// Row Parsing Implementation
892// =============================================================================
893
894impl RawRow {
895    /// Decode a ROW token from bytes.
896    ///
897    /// This function requires the column metadata to know how to parse the row.
898    /// The row data is stored as raw bytes for later parsing.
899    pub fn decode(src: &mut impl Buf, metadata: &ColMetaData) -> Result<Self, ProtocolError> {
900        let mut data = bytes::BytesMut::new();
901
902        for col in &metadata.columns {
903            Self::decode_column_value(src, col, &mut data)?;
904        }
905
906        Ok(Self {
907            data: data.freeze(),
908        })
909    }
910
911    /// Decode a single column value and append to the output buffer.
912    fn decode_column_value(
913        src: &mut impl Buf,
914        col: &ColumnData,
915        dst: &mut bytes::BytesMut,
916    ) -> Result<(), ProtocolError> {
917        match col.type_id {
918            // Fixed-length types
919            TypeId::Null => {
920                // No data
921            }
922            TypeId::Int1 | TypeId::Bit => {
923                if src.remaining() < 1 {
924                    return Err(ProtocolError::UnexpectedEof);
925                }
926                dst.extend_from_slice(&[src.get_u8()]);
927            }
928            TypeId::Int2 => {
929                if src.remaining() < 2 {
930                    return Err(ProtocolError::UnexpectedEof);
931                }
932                dst.extend_from_slice(&src.get_u16_le().to_le_bytes());
933            }
934            TypeId::Int4 => {
935                if src.remaining() < 4 {
936                    return Err(ProtocolError::UnexpectedEof);
937                }
938                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
939            }
940            TypeId::Int8 => {
941                if src.remaining() < 8 {
942                    return Err(ProtocolError::UnexpectedEof);
943                }
944                dst.extend_from_slice(&src.get_u64_le().to_le_bytes());
945            }
946            TypeId::Float4 => {
947                if src.remaining() < 4 {
948                    return Err(ProtocolError::UnexpectedEof);
949                }
950                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
951            }
952            TypeId::Float8 => {
953                if src.remaining() < 8 {
954                    return Err(ProtocolError::UnexpectedEof);
955                }
956                dst.extend_from_slice(&src.get_u64_le().to_le_bytes());
957            }
958            TypeId::Money => {
959                if src.remaining() < 8 {
960                    return Err(ProtocolError::UnexpectedEof);
961                }
962                let hi = src.get_u32_le();
963                let lo = src.get_u32_le();
964                dst.extend_from_slice(&hi.to_le_bytes());
965                dst.extend_from_slice(&lo.to_le_bytes());
966            }
967            TypeId::Money4 => {
968                if src.remaining() < 4 {
969                    return Err(ProtocolError::UnexpectedEof);
970                }
971                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
972            }
973            TypeId::DateTime => {
974                if src.remaining() < 8 {
975                    return Err(ProtocolError::UnexpectedEof);
976                }
977                let days = src.get_u32_le();
978                let time = src.get_u32_le();
979                dst.extend_from_slice(&days.to_le_bytes());
980                dst.extend_from_slice(&time.to_le_bytes());
981            }
982            TypeId::DateTime4 => {
983                if src.remaining() < 4 {
984                    return Err(ProtocolError::UnexpectedEof);
985                }
986                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
987            }
988            // DATE type uses 1-byte length prefix (can be NULL)
989            TypeId::Date => {
990                Self::decode_bytelen_type(src, dst)?;
991            }
992
993            // Variable-length nullable types (length-prefixed)
994            TypeId::IntN | TypeId::BitN | TypeId::FloatN | TypeId::MoneyN | TypeId::DateTimeN => {
995                Self::decode_bytelen_type(src, dst)?;
996            }
997
998            TypeId::Guid => {
999                Self::decode_bytelen_type(src, dst)?;
1000            }
1001
1002            TypeId::Decimal | TypeId::Numeric | TypeId::DecimalN | TypeId::NumericN => {
1003                Self::decode_bytelen_type(src, dst)?;
1004            }
1005
1006            // Old-style byte-length strings
1007            TypeId::Char | TypeId::VarChar | TypeId::Binary | TypeId::VarBinary => {
1008                Self::decode_bytelen_type(src, dst)?;
1009            }
1010
1011            // 2-byte length strings (or PLP for MAX types)
1012            TypeId::BigVarChar | TypeId::BigVarBinary => {
1013                // max_length == 0xFFFF indicates VARCHAR(MAX) or VARBINARY(MAX), which uses PLP
1014                if col.type_info.max_length == Some(0xFFFF) {
1015                    Self::decode_plp_type(src, dst)?;
1016                } else {
1017                    Self::decode_ushortlen_type(src, dst)?;
1018                }
1019            }
1020
1021            // Fixed-length types that don't have MAX variants
1022            TypeId::BigChar | TypeId::BigBinary => {
1023                Self::decode_ushortlen_type(src, dst)?;
1024            }
1025
1026            // Unicode strings (2-byte length in bytes, or PLP for NVARCHAR(MAX))
1027            TypeId::NVarChar => {
1028                // max_length == 0xFFFF indicates NVARCHAR(MAX), which uses PLP
1029                if col.type_info.max_length == Some(0xFFFF) {
1030                    Self::decode_plp_type(src, dst)?;
1031                } else {
1032                    Self::decode_ushortlen_type(src, dst)?;
1033                }
1034            }
1035
1036            // Fixed-length NCHAR doesn't have MAX variant
1037            TypeId::NChar => {
1038                Self::decode_ushortlen_type(src, dst)?;
1039            }
1040
1041            // Time types with scale
1042            TypeId::Time | TypeId::DateTime2 | TypeId::DateTimeOffset => {
1043                Self::decode_bytelen_type(src, dst)?;
1044            }
1045
1046            // TEXT/NTEXT/IMAGE - deprecated LOB types using textptr format
1047            TypeId::Text | TypeId::NText | TypeId::Image => {
1048                Self::decode_textptr_type(src, dst)?;
1049            }
1050
1051            // XML - uses actual PLP format
1052            TypeId::Xml => {
1053                Self::decode_plp_type(src, dst)?;
1054            }
1055
1056            // Complex types
1057            TypeId::Variant => {
1058                Self::decode_intlen_type(src, dst)?;
1059            }
1060
1061            TypeId::Udt => {
1062                // UDT uses PLP encoding
1063                Self::decode_plp_type(src, dst)?;
1064            }
1065
1066            TypeId::Tvp => {
1067                // TVP not supported in row data
1068                return Err(ProtocolError::InvalidTokenType(col.col_type));
1069            }
1070        }
1071
1072        Ok(())
1073    }
1074
1075    /// Decode a 1-byte length-prefixed value.
1076    fn decode_bytelen_type(
1077        src: &mut impl Buf,
1078        dst: &mut bytes::BytesMut,
1079    ) -> Result<(), ProtocolError> {
1080        if src.remaining() < 1 {
1081            return Err(ProtocolError::UnexpectedEof);
1082        }
1083        let len = src.get_u8() as usize;
1084        if len == 0xFF {
1085            // NULL value - store as zero-length with NULL marker
1086            dst.extend_from_slice(&[0xFF]);
1087        } else if len == 0 {
1088            // Empty value
1089            dst.extend_from_slice(&[0x00]);
1090        } else {
1091            if src.remaining() < len {
1092                return Err(ProtocolError::UnexpectedEof);
1093            }
1094            dst.extend_from_slice(&[len as u8]);
1095            for _ in 0..len {
1096                dst.extend_from_slice(&[src.get_u8()]);
1097            }
1098        }
1099        Ok(())
1100    }
1101
1102    /// Decode a 2-byte length-prefixed value.
1103    fn decode_ushortlen_type(
1104        src: &mut impl Buf,
1105        dst: &mut bytes::BytesMut,
1106    ) -> Result<(), ProtocolError> {
1107        if src.remaining() < 2 {
1108            return Err(ProtocolError::UnexpectedEof);
1109        }
1110        let len = src.get_u16_le() as usize;
1111        if len == 0xFFFF {
1112            // NULL value
1113            dst.extend_from_slice(&0xFFFFu16.to_le_bytes());
1114        } else if len == 0 {
1115            // Empty value
1116            dst.extend_from_slice(&0u16.to_le_bytes());
1117        } else {
1118            if src.remaining() < len {
1119                return Err(ProtocolError::UnexpectedEof);
1120            }
1121            dst.extend_from_slice(&(len as u16).to_le_bytes());
1122            for _ in 0..len {
1123                dst.extend_from_slice(&[src.get_u8()]);
1124            }
1125        }
1126        Ok(())
1127    }
1128
1129    /// Decode a 4-byte length-prefixed value.
1130    fn decode_intlen_type(
1131        src: &mut impl Buf,
1132        dst: &mut bytes::BytesMut,
1133    ) -> Result<(), ProtocolError> {
1134        if src.remaining() < 4 {
1135            return Err(ProtocolError::UnexpectedEof);
1136        }
1137        let len = src.get_u32_le() as usize;
1138        if len == 0xFFFFFFFF {
1139            // NULL value
1140            dst.extend_from_slice(&0xFFFFFFFFu32.to_le_bytes());
1141        } else if len == 0 {
1142            // Empty value
1143            dst.extend_from_slice(&0u32.to_le_bytes());
1144        } else {
1145            if src.remaining() < len {
1146                return Err(ProtocolError::UnexpectedEof);
1147            }
1148            dst.extend_from_slice(&(len as u32).to_le_bytes());
1149            for _ in 0..len {
1150                dst.extend_from_slice(&[src.get_u8()]);
1151            }
1152        }
1153        Ok(())
1154    }
1155
1156    /// Decode a TEXT/NTEXT/IMAGE type (textptr format).
1157    ///
1158    /// These deprecated LOB types use a special format:
1159    /// - 1 byte: textptr_len (0 = NULL)
1160    /// - textptr_len bytes: textptr (if not NULL)
1161    /// - 8 bytes: timestamp (if not NULL)
1162    /// - 4 bytes: data length (if not NULL)
1163    /// - data_len bytes: the actual data (if not NULL)
1164    ///
1165    /// We convert this to PLP format for the client to parse:
1166    /// - 8 bytes: total length (0xFFFFFFFFFFFFFFFF = NULL)
1167    /// - 4 bytes: chunk length (= data length)
1168    /// - chunk data
1169    /// - 4 bytes: 0 (terminator)
1170    fn decode_textptr_type(
1171        src: &mut impl Buf,
1172        dst: &mut bytes::BytesMut,
1173    ) -> Result<(), ProtocolError> {
1174        if src.remaining() < 1 {
1175            return Err(ProtocolError::UnexpectedEof);
1176        }
1177
1178        let textptr_len = src.get_u8() as usize;
1179
1180        if textptr_len == 0 {
1181            // NULL value - write PLP NULL marker
1182            dst.extend_from_slice(&0xFFFFFFFFFFFFFFFFu64.to_le_bytes());
1183            return Ok(());
1184        }
1185
1186        // Skip textptr bytes
1187        if src.remaining() < textptr_len {
1188            return Err(ProtocolError::UnexpectedEof);
1189        }
1190        src.advance(textptr_len);
1191
1192        // Skip 8-byte timestamp
1193        if src.remaining() < 8 {
1194            return Err(ProtocolError::UnexpectedEof);
1195        }
1196        src.advance(8);
1197
1198        // Read data length
1199        if src.remaining() < 4 {
1200            return Err(ProtocolError::UnexpectedEof);
1201        }
1202        let data_len = src.get_u32_le() as usize;
1203
1204        if src.remaining() < data_len {
1205            return Err(ProtocolError::UnexpectedEof);
1206        }
1207
1208        // Write in PLP format for client parsing:
1209        // - 8 bytes: total length
1210        // - 4 bytes: chunk length
1211        // - chunk data
1212        // - 4 bytes: 0 (terminator)
1213        dst.extend_from_slice(&(data_len as u64).to_le_bytes());
1214        dst.extend_from_slice(&(data_len as u32).to_le_bytes());
1215        for _ in 0..data_len {
1216            dst.extend_from_slice(&[src.get_u8()]);
1217        }
1218        dst.extend_from_slice(&0u32.to_le_bytes()); // PLP terminator
1219
1220        Ok(())
1221    }
1222
1223    /// Decode a PLP (Partially Length-Prefixed) value.
1224    ///
1225    /// PLP format:
1226    /// - 8 bytes: total length (0xFFFFFFFFFFFFFFFE = unknown, 0xFFFFFFFFFFFFFFFF = NULL)
1227    /// - If not NULL: chunks of (4 byte chunk length + data) until chunk length = 0
1228    fn decode_plp_type(src: &mut impl Buf, dst: &mut bytes::BytesMut) -> Result<(), ProtocolError> {
1229        if src.remaining() < 8 {
1230            return Err(ProtocolError::UnexpectedEof);
1231        }
1232
1233        let total_len = src.get_u64_le();
1234
1235        // Store the total length marker
1236        dst.extend_from_slice(&total_len.to_le_bytes());
1237
1238        if total_len == 0xFFFFFFFFFFFFFFFF {
1239            // NULL value - no more data
1240            return Ok(());
1241        }
1242
1243        // Read chunks until terminator
1244        loop {
1245            if src.remaining() < 4 {
1246                return Err(ProtocolError::UnexpectedEof);
1247            }
1248            let chunk_len = src.get_u32_le() as usize;
1249            dst.extend_from_slice(&(chunk_len as u32).to_le_bytes());
1250
1251            if chunk_len == 0 {
1252                // End of PLP data
1253                break;
1254            }
1255
1256            if src.remaining() < chunk_len {
1257                return Err(ProtocolError::UnexpectedEof);
1258            }
1259
1260            for _ in 0..chunk_len {
1261                dst.extend_from_slice(&[src.get_u8()]);
1262            }
1263        }
1264
1265        Ok(())
1266    }
1267}
1268
1269// =============================================================================
1270// NbcRow Parsing Implementation
1271// =============================================================================
1272
1273impl NbcRow {
1274    /// Decode an NBCROW token from bytes.
1275    ///
1276    /// NBCROW (Null Bitmap Compressed Row) stores a bitmap indicating which
1277    /// columns are NULL, followed by only the non-NULL values.
1278    pub fn decode(src: &mut impl Buf, metadata: &ColMetaData) -> Result<Self, ProtocolError> {
1279        let col_count = metadata.columns.len();
1280        let bitmap_len = col_count.div_ceil(8);
1281
1282        if src.remaining() < bitmap_len {
1283            return Err(ProtocolError::UnexpectedEof);
1284        }
1285
1286        // Read null bitmap
1287        let mut null_bitmap = vec![0u8; bitmap_len];
1288        for byte in &mut null_bitmap {
1289            *byte = src.get_u8();
1290        }
1291
1292        // Read non-null values
1293        let mut data = bytes::BytesMut::new();
1294
1295        for (i, col) in metadata.columns.iter().enumerate() {
1296            let byte_idx = i / 8;
1297            let bit_idx = i % 8;
1298            let is_null = (null_bitmap[byte_idx] & (1 << bit_idx)) != 0;
1299
1300            if !is_null {
1301                // Read the value - for NBCROW, we read without the length prefix
1302                // for fixed-length types, and with length prefix for variable types
1303                RawRow::decode_column_value(src, col, &mut data)?;
1304            }
1305        }
1306
1307        Ok(Self {
1308            null_bitmap,
1309            data: data.freeze(),
1310        })
1311    }
1312
1313    /// Check if a column at the given index is NULL.
1314    #[must_use]
1315    pub fn is_null(&self, column_index: usize) -> bool {
1316        let byte_idx = column_index / 8;
1317        let bit_idx = column_index % 8;
1318        if byte_idx < self.null_bitmap.len() {
1319            (self.null_bitmap[byte_idx] & (1 << bit_idx)) != 0
1320        } else {
1321            true // Out of bounds = NULL
1322        }
1323    }
1324}
1325
1326// =============================================================================
1327// ReturnValue Parsing Implementation
1328// =============================================================================
1329
1330impl ReturnValue {
1331    /// Decode a RETURNVALUE token from bytes.
1332    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1333        // Length (2 bytes)
1334        if src.remaining() < 2 {
1335            return Err(ProtocolError::UnexpectedEof);
1336        }
1337        let _length = src.get_u16_le();
1338
1339        // Parameter ordinal (2 bytes)
1340        if src.remaining() < 2 {
1341            return Err(ProtocolError::UnexpectedEof);
1342        }
1343        let param_ordinal = src.get_u16_le();
1344
1345        // Parameter name (B_VARCHAR)
1346        let param_name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1347
1348        // Status (1 byte)
1349        if src.remaining() < 1 {
1350            return Err(ProtocolError::UnexpectedEof);
1351        }
1352        let status = src.get_u8();
1353
1354        // User type (4 bytes) + flags (2 bytes) + type id (1 byte)
1355        if src.remaining() < 7 {
1356            return Err(ProtocolError::UnexpectedEof);
1357        }
1358        let user_type = src.get_u32_le();
1359        let flags = src.get_u16_le();
1360        let col_type = src.get_u8();
1361
1362        let type_id = TypeId::from_u8(col_type).unwrap_or(TypeId::Null);
1363
1364        // Parse type info
1365        let type_info = ColMetaData::decode_type_info(src, type_id, col_type)?;
1366
1367        // Read the value data
1368        let mut value_buf = bytes::BytesMut::new();
1369
1370        // Create a temporary column for value parsing
1371        let temp_col = ColumnData {
1372            name: String::new(),
1373            type_id,
1374            col_type,
1375            flags,
1376            user_type,
1377            type_info: type_info.clone(),
1378        };
1379
1380        RawRow::decode_column_value(src, &temp_col, &mut value_buf)?;
1381
1382        Ok(Self {
1383            param_ordinal,
1384            param_name,
1385            status,
1386            user_type,
1387            flags,
1388            col_type,
1389            type_info,
1390            value: value_buf.freeze(),
1391        })
1392    }
1393}
1394
1395// =============================================================================
1396// SessionState Parsing Implementation
1397// =============================================================================
1398
1399impl SessionState {
1400    /// Decode a SESSIONSTATE token from bytes.
1401    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1402        if src.remaining() < 4 {
1403            return Err(ProtocolError::UnexpectedEof);
1404        }
1405
1406        let length = src.get_u32_le() as usize;
1407
1408        if src.remaining() < length {
1409            return Err(ProtocolError::IncompletePacket {
1410                expected: length,
1411                actual: src.remaining(),
1412            });
1413        }
1414
1415        let data = src.copy_to_bytes(length);
1416
1417        Ok(Self { data })
1418    }
1419}
1420
1421// =============================================================================
1422// Token Parsing Implementation
1423// =============================================================================
1424
1425/// Done token status flags bit positions.
1426mod done_status_bits {
1427    pub const DONE_MORE: u16 = 0x0001;
1428    pub const DONE_ERROR: u16 = 0x0002;
1429    pub const DONE_INXACT: u16 = 0x0004;
1430    pub const DONE_COUNT: u16 = 0x0010;
1431    pub const DONE_ATTN: u16 = 0x0020;
1432    pub const DONE_SRVERROR: u16 = 0x0100;
1433}
1434
1435impl DoneStatus {
1436    /// Parse done status from raw bits.
1437    #[must_use]
1438    pub fn from_bits(bits: u16) -> Self {
1439        use done_status_bits::*;
1440        Self {
1441            more: (bits & DONE_MORE) != 0,
1442            error: (bits & DONE_ERROR) != 0,
1443            in_xact: (bits & DONE_INXACT) != 0,
1444            count: (bits & DONE_COUNT) != 0,
1445            attn: (bits & DONE_ATTN) != 0,
1446            srverror: (bits & DONE_SRVERROR) != 0,
1447        }
1448    }
1449
1450    /// Convert to raw bits.
1451    #[must_use]
1452    pub fn to_bits(&self) -> u16 {
1453        use done_status_bits::*;
1454        let mut bits = 0u16;
1455        if self.more {
1456            bits |= DONE_MORE;
1457        }
1458        if self.error {
1459            bits |= DONE_ERROR;
1460        }
1461        if self.in_xact {
1462            bits |= DONE_INXACT;
1463        }
1464        if self.count {
1465            bits |= DONE_COUNT;
1466        }
1467        if self.attn {
1468            bits |= DONE_ATTN;
1469        }
1470        if self.srverror {
1471            bits |= DONE_SRVERROR;
1472        }
1473        bits
1474    }
1475}
1476
1477impl Done {
1478    /// Size of the DONE token in bytes (excluding token type byte).
1479    pub const SIZE: usize = 12; // 2 (status) + 2 (curcmd) + 8 (rowcount)
1480
1481    /// Decode a DONE token from bytes.
1482    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1483        if src.remaining() < Self::SIZE {
1484            return Err(ProtocolError::IncompletePacket {
1485                expected: Self::SIZE,
1486                actual: src.remaining(),
1487            });
1488        }
1489
1490        let status = DoneStatus::from_bits(src.get_u16_le());
1491        let cur_cmd = src.get_u16_le();
1492        let row_count = src.get_u64_le();
1493
1494        Ok(Self {
1495            status,
1496            cur_cmd,
1497            row_count,
1498        })
1499    }
1500
1501    /// Encode the DONE token to bytes.
1502    pub fn encode(&self, dst: &mut impl BufMut) {
1503        dst.put_u8(TokenType::Done as u8);
1504        dst.put_u16_le(self.status.to_bits());
1505        dst.put_u16_le(self.cur_cmd);
1506        dst.put_u64_le(self.row_count);
1507    }
1508
1509    /// Check if more results follow this DONE token.
1510    #[must_use]
1511    pub const fn has_more(&self) -> bool {
1512        self.status.more
1513    }
1514
1515    /// Check if an error occurred.
1516    #[must_use]
1517    pub const fn has_error(&self) -> bool {
1518        self.status.error
1519    }
1520
1521    /// Check if the row count is valid.
1522    #[must_use]
1523    pub const fn has_count(&self) -> bool {
1524        self.status.count
1525    }
1526}
1527
1528impl DoneProc {
1529    /// Size of the DONEPROC token in bytes (excluding token type byte).
1530    pub const SIZE: usize = 12;
1531
1532    /// Decode a DONEPROC token from bytes.
1533    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1534        if src.remaining() < Self::SIZE {
1535            return Err(ProtocolError::IncompletePacket {
1536                expected: Self::SIZE,
1537                actual: src.remaining(),
1538            });
1539        }
1540
1541        let status = DoneStatus::from_bits(src.get_u16_le());
1542        let cur_cmd = src.get_u16_le();
1543        let row_count = src.get_u64_le();
1544
1545        Ok(Self {
1546            status,
1547            cur_cmd,
1548            row_count,
1549        })
1550    }
1551
1552    /// Encode the DONEPROC token to bytes.
1553    pub fn encode(&self, dst: &mut impl BufMut) {
1554        dst.put_u8(TokenType::DoneProc as u8);
1555        dst.put_u16_le(self.status.to_bits());
1556        dst.put_u16_le(self.cur_cmd);
1557        dst.put_u64_le(self.row_count);
1558    }
1559}
1560
1561impl DoneInProc {
1562    /// Size of the DONEINPROC token in bytes (excluding token type byte).
1563    pub const SIZE: usize = 12;
1564
1565    /// Decode a DONEINPROC token from bytes.
1566    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1567        if src.remaining() < Self::SIZE {
1568            return Err(ProtocolError::IncompletePacket {
1569                expected: Self::SIZE,
1570                actual: src.remaining(),
1571            });
1572        }
1573
1574        let status = DoneStatus::from_bits(src.get_u16_le());
1575        let cur_cmd = src.get_u16_le();
1576        let row_count = src.get_u64_le();
1577
1578        Ok(Self {
1579            status,
1580            cur_cmd,
1581            row_count,
1582        })
1583    }
1584
1585    /// Encode the DONEINPROC token to bytes.
1586    pub fn encode(&self, dst: &mut impl BufMut) {
1587        dst.put_u8(TokenType::DoneInProc as u8);
1588        dst.put_u16_le(self.status.to_bits());
1589        dst.put_u16_le(self.cur_cmd);
1590        dst.put_u64_le(self.row_count);
1591    }
1592}
1593
1594impl ServerError {
1595    /// Decode an ERROR token from bytes.
1596    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1597        // ERROR token: length (2) + number (4) + state (1) + class (1) +
1598        //              message (us_varchar) + server (b_varchar) + procedure (b_varchar) + line (4)
1599        if src.remaining() < 2 {
1600            return Err(ProtocolError::UnexpectedEof);
1601        }
1602
1603        let _length = src.get_u16_le();
1604
1605        if src.remaining() < 6 {
1606            return Err(ProtocolError::UnexpectedEof);
1607        }
1608
1609        let number = src.get_i32_le();
1610        let state = src.get_u8();
1611        let class = src.get_u8();
1612
1613        let message = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1614        let server = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1615        let procedure = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1616
1617        if src.remaining() < 4 {
1618            return Err(ProtocolError::UnexpectedEof);
1619        }
1620        let line = src.get_i32_le();
1621
1622        Ok(Self {
1623            number,
1624            state,
1625            class,
1626            message,
1627            server,
1628            procedure,
1629            line,
1630        })
1631    }
1632
1633    /// Check if this is a fatal error (severity >= 20).
1634    #[must_use]
1635    pub const fn is_fatal(&self) -> bool {
1636        self.class >= 20
1637    }
1638
1639    /// Check if this error indicates the batch was aborted (severity >= 16).
1640    #[must_use]
1641    pub const fn is_batch_abort(&self) -> bool {
1642        self.class >= 16
1643    }
1644}
1645
1646impl ServerInfo {
1647    /// Decode an INFO token from bytes.
1648    ///
1649    /// INFO tokens have the same structure as ERROR tokens but with lower severity.
1650    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1651        if src.remaining() < 2 {
1652            return Err(ProtocolError::UnexpectedEof);
1653        }
1654
1655        let _length = src.get_u16_le();
1656
1657        if src.remaining() < 6 {
1658            return Err(ProtocolError::UnexpectedEof);
1659        }
1660
1661        let number = src.get_i32_le();
1662        let state = src.get_u8();
1663        let class = src.get_u8();
1664
1665        let message = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1666        let server = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1667        let procedure = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1668
1669        if src.remaining() < 4 {
1670            return Err(ProtocolError::UnexpectedEof);
1671        }
1672        let line = src.get_i32_le();
1673
1674        Ok(Self {
1675            number,
1676            state,
1677            class,
1678            message,
1679            server,
1680            procedure,
1681            line,
1682        })
1683    }
1684}
1685
1686impl LoginAck {
1687    /// Decode a LOGINACK token from bytes.
1688    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1689        // LOGINACK: length (2) + interface (1) + tds_version (4) + prog_name (b_varchar) + prog_version (4)
1690        if src.remaining() < 2 {
1691            return Err(ProtocolError::UnexpectedEof);
1692        }
1693
1694        let _length = src.get_u16_le();
1695
1696        if src.remaining() < 5 {
1697            return Err(ProtocolError::UnexpectedEof);
1698        }
1699
1700        let interface = src.get_u8();
1701        let tds_version = src.get_u32_le();
1702        let prog_name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1703
1704        if src.remaining() < 4 {
1705            return Err(ProtocolError::UnexpectedEof);
1706        }
1707        let prog_version = src.get_u32_le();
1708
1709        Ok(Self {
1710            interface,
1711            tds_version,
1712            prog_name,
1713            prog_version,
1714        })
1715    }
1716
1717    /// Get the TDS version as a `TdsVersion`.
1718    #[must_use]
1719    pub fn tds_version(&self) -> crate::version::TdsVersion {
1720        crate::version::TdsVersion::new(self.tds_version)
1721    }
1722}
1723
1724impl EnvChangeType {
1725    /// Create from raw byte value.
1726    pub fn from_u8(value: u8) -> Option<Self> {
1727        match value {
1728            1 => Some(Self::Database),
1729            2 => Some(Self::Language),
1730            3 => Some(Self::CharacterSet),
1731            4 => Some(Self::PacketSize),
1732            5 => Some(Self::UnicodeSortingLocalId),
1733            6 => Some(Self::UnicodeComparisonFlags),
1734            7 => Some(Self::SqlCollation),
1735            8 => Some(Self::BeginTransaction),
1736            9 => Some(Self::CommitTransaction),
1737            10 => Some(Self::RollbackTransaction),
1738            11 => Some(Self::EnlistDtcTransaction),
1739            12 => Some(Self::DefectTransaction),
1740            13 => Some(Self::RealTimeLogShipping),
1741            15 => Some(Self::PromoteTransaction),
1742            16 => Some(Self::TransactionManagerAddress),
1743            17 => Some(Self::TransactionEnded),
1744            18 => Some(Self::ResetConnectionCompletionAck),
1745            19 => Some(Self::UserInstanceStarted),
1746            20 => Some(Self::Routing),
1747            _ => None,
1748        }
1749    }
1750}
1751
1752impl EnvChange {
1753    /// Decode an ENVCHANGE token from bytes.
1754    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1755        if src.remaining() < 3 {
1756            return Err(ProtocolError::UnexpectedEof);
1757        }
1758
1759        let length = src.get_u16_le() as usize;
1760        if src.remaining() < length {
1761            return Err(ProtocolError::IncompletePacket {
1762                expected: length,
1763                actual: src.remaining(),
1764            });
1765        }
1766
1767        let env_type_byte = src.get_u8();
1768        let env_type = EnvChangeType::from_u8(env_type_byte)
1769            .ok_or(ProtocolError::InvalidTokenType(env_type_byte))?;
1770
1771        let (new_value, old_value) = match env_type {
1772            EnvChangeType::Routing => {
1773                // Routing has special format
1774                let new_value = Self::decode_routing_value(src)?;
1775                let old_value = EnvChangeValue::Binary(Bytes::new());
1776                (new_value, old_value)
1777            }
1778            EnvChangeType::BeginTransaction
1779            | EnvChangeType::CommitTransaction
1780            | EnvChangeType::RollbackTransaction
1781            | EnvChangeType::EnlistDtcTransaction
1782            | EnvChangeType::SqlCollation => {
1783                // These use binary format per MS-TDS spec:
1784                // - Transaction tokens: transaction descriptor (8 bytes)
1785                // - SqlCollation: collation info (5 bytes: LCID + sort flags)
1786                let new_len = src.get_u8() as usize;
1787                let new_value = if new_len > 0 && src.remaining() >= new_len {
1788                    EnvChangeValue::Binary(src.copy_to_bytes(new_len))
1789                } else {
1790                    EnvChangeValue::Binary(Bytes::new())
1791                };
1792
1793                let old_len = src.get_u8() as usize;
1794                let old_value = if old_len > 0 && src.remaining() >= old_len {
1795                    EnvChangeValue::Binary(src.copy_to_bytes(old_len))
1796                } else {
1797                    EnvChangeValue::Binary(Bytes::new())
1798                };
1799
1800                (new_value, old_value)
1801            }
1802            _ => {
1803                // String format for most env changes
1804                let new_value = read_b_varchar(src)
1805                    .map(EnvChangeValue::String)
1806                    .unwrap_or(EnvChangeValue::String(String::new()));
1807
1808                let old_value = read_b_varchar(src)
1809                    .map(EnvChangeValue::String)
1810                    .unwrap_or(EnvChangeValue::String(String::new()));
1811
1812                (new_value, old_value)
1813            }
1814        };
1815
1816        Ok(Self {
1817            env_type,
1818            new_value,
1819            old_value,
1820        })
1821    }
1822
1823    fn decode_routing_value(src: &mut impl Buf) -> Result<EnvChangeValue, ProtocolError> {
1824        // Routing format: length (2) + protocol (1) + port (2) + server_len (2) + server (utf16)
1825        if src.remaining() < 2 {
1826            return Err(ProtocolError::UnexpectedEof);
1827        }
1828
1829        let _routing_len = src.get_u16_le();
1830
1831        if src.remaining() < 5 {
1832            return Err(ProtocolError::UnexpectedEof);
1833        }
1834
1835        let _protocol = src.get_u8();
1836        let port = src.get_u16_le();
1837        let server_len = src.get_u16_le() as usize;
1838
1839        // Read UTF-16LE server name
1840        if src.remaining() < server_len * 2 {
1841            return Err(ProtocolError::UnexpectedEof);
1842        }
1843
1844        let mut chars = Vec::with_capacity(server_len);
1845        for _ in 0..server_len {
1846            chars.push(src.get_u16_le());
1847        }
1848
1849        let host = String::from_utf16(&chars).map_err(|_| {
1850            ProtocolError::StringEncoding(
1851                #[cfg(feature = "std")]
1852                "invalid UTF-16 in routing hostname".to_string(),
1853                #[cfg(not(feature = "std"))]
1854                "invalid UTF-16 in routing hostname",
1855            )
1856        })?;
1857
1858        Ok(EnvChangeValue::Routing { host, port })
1859    }
1860
1861    /// Check if this is a routing redirect.
1862    #[must_use]
1863    pub fn is_routing(&self) -> bool {
1864        self.env_type == EnvChangeType::Routing
1865    }
1866
1867    /// Get routing information if this is a routing change.
1868    #[must_use]
1869    pub fn routing_info(&self) -> Option<(&str, u16)> {
1870        if let EnvChangeValue::Routing { host, port } = &self.new_value {
1871            Some((host, *port))
1872        } else {
1873            None
1874        }
1875    }
1876
1877    /// Get the new database name if this is a database change.
1878    #[must_use]
1879    pub fn new_database(&self) -> Option<&str> {
1880        if self.env_type == EnvChangeType::Database {
1881            if let EnvChangeValue::String(s) = &self.new_value {
1882                return Some(s);
1883            }
1884        }
1885        None
1886    }
1887}
1888
1889impl Order {
1890    /// Decode an ORDER token from bytes.
1891    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1892        if src.remaining() < 2 {
1893            return Err(ProtocolError::UnexpectedEof);
1894        }
1895
1896        let length = src.get_u16_le() as usize;
1897        let column_count = length / 2;
1898
1899        if src.remaining() < length {
1900            return Err(ProtocolError::IncompletePacket {
1901                expected: length,
1902                actual: src.remaining(),
1903            });
1904        }
1905
1906        let mut columns = Vec::with_capacity(column_count);
1907        for _ in 0..column_count {
1908            columns.push(src.get_u16_le());
1909        }
1910
1911        Ok(Self { columns })
1912    }
1913}
1914
1915impl FeatureExtAck {
1916    /// Feature terminator byte.
1917    pub const TERMINATOR: u8 = 0xFF;
1918
1919    /// Decode a FEATUREEXTACK token from bytes.
1920    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1921        let mut features = Vec::new();
1922
1923        loop {
1924            if !src.has_remaining() {
1925                return Err(ProtocolError::UnexpectedEof);
1926            }
1927
1928            let feature_id = src.get_u8();
1929            if feature_id == Self::TERMINATOR {
1930                break;
1931            }
1932
1933            if src.remaining() < 4 {
1934                return Err(ProtocolError::UnexpectedEof);
1935            }
1936
1937            let data_len = src.get_u32_le() as usize;
1938
1939            if src.remaining() < data_len {
1940                return Err(ProtocolError::IncompletePacket {
1941                    expected: data_len,
1942                    actual: src.remaining(),
1943                });
1944            }
1945
1946            let data = src.copy_to_bytes(data_len);
1947            features.push(FeatureAck { feature_id, data });
1948        }
1949
1950        Ok(Self { features })
1951    }
1952}
1953
1954impl SspiToken {
1955    /// Decode an SSPI token from bytes.
1956    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1957        if src.remaining() < 2 {
1958            return Err(ProtocolError::UnexpectedEof);
1959        }
1960
1961        let length = src.get_u16_le() as usize;
1962
1963        if src.remaining() < length {
1964            return Err(ProtocolError::IncompletePacket {
1965                expected: length,
1966                actual: src.remaining(),
1967            });
1968        }
1969
1970        let data = src.copy_to_bytes(length);
1971        Ok(Self { data })
1972    }
1973}
1974
1975impl FedAuthInfo {
1976    /// Decode a FEDAUTHINFO token from bytes.
1977    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1978        if src.remaining() < 4 {
1979            return Err(ProtocolError::UnexpectedEof);
1980        }
1981
1982        let _length = src.get_u32_le();
1983
1984        if src.remaining() < 5 {
1985            return Err(ProtocolError::UnexpectedEof);
1986        }
1987
1988        let _count = src.get_u8();
1989
1990        // Read option data
1991        let mut sts_url = String::new();
1992        let mut spn = String::new();
1993
1994        // Parse info options until we have both
1995        while src.has_remaining() {
1996            if src.remaining() < 9 {
1997                break;
1998            }
1999
2000            let info_id = src.get_u8();
2001            let info_len = src.get_u32_le() as usize;
2002            let _info_offset = src.get_u32_le();
2003
2004            if src.remaining() < info_len {
2005                break;
2006            }
2007
2008            // Read UTF-16LE string
2009            let char_count = info_len / 2;
2010            let mut chars = Vec::with_capacity(char_count);
2011            for _ in 0..char_count {
2012                chars.push(src.get_u16_le());
2013            }
2014
2015            if let Ok(value) = String::from_utf16(&chars) {
2016                match info_id {
2017                    0x01 => spn = value,
2018                    0x02 => sts_url = value,
2019                    _ => {}
2020                }
2021            }
2022        }
2023
2024        Ok(Self { sts_url, spn })
2025    }
2026}
2027
2028// =============================================================================
2029// Token Parser
2030// =============================================================================
2031
2032/// Token stream parser.
2033///
2034/// Parses a stream of TDS tokens from a byte buffer.
2035///
2036/// # Basic vs Context-Aware Parsing
2037///
2038/// Some tokens (like `Done`, `Error`, `LoginAck`) can be parsed without context.
2039/// Use [`next_token()`](TokenParser::next_token) for these.
2040///
2041/// Other tokens (like `ColMetaData`, `Row`, `NbcRow`) require column metadata
2042/// to parse correctly. Use [`next_token_with_metadata()`](TokenParser::next_token_with_metadata)
2043/// for these.
2044///
2045/// # Example
2046///
2047/// ```rust,ignore
2048/// let mut parser = TokenParser::new(data);
2049/// let mut metadata = None;
2050///
2051/// while let Some(token) = parser.next_token_with_metadata(metadata.as_ref())? {
2052///     match token {
2053///         Token::ColMetaData(meta) => {
2054///             metadata = Some(meta);
2055///         }
2056///         Token::Row(row) => {
2057///             // Process row using metadata
2058///         }
2059///         Token::Done(done) => {
2060///             if !done.has_more() {
2061///                 break;
2062///             }
2063///         }
2064///         _ => {}
2065///     }
2066/// }
2067/// ```
2068pub struct TokenParser {
2069    data: Bytes,
2070    position: usize,
2071}
2072
2073impl TokenParser {
2074    /// Create a new token parser from bytes.
2075    #[must_use]
2076    pub fn new(data: Bytes) -> Self {
2077        Self { data, position: 0 }
2078    }
2079
2080    /// Get remaining bytes in the buffer.
2081    #[must_use]
2082    pub fn remaining(&self) -> usize {
2083        self.data.len().saturating_sub(self.position)
2084    }
2085
2086    /// Check if there are more bytes to parse.
2087    #[must_use]
2088    pub fn has_remaining(&self) -> bool {
2089        self.position < self.data.len()
2090    }
2091
2092    /// Peek at the next token type without consuming it.
2093    #[must_use]
2094    pub fn peek_token_type(&self) -> Option<TokenType> {
2095        if self.position < self.data.len() {
2096            TokenType::from_u8(self.data[self.position])
2097        } else {
2098            None
2099        }
2100    }
2101
2102    /// Parse the next token from the stream.
2103    ///
2104    /// This method can only parse context-independent tokens. For tokens that
2105    /// require column metadata (ColMetaData, Row, NbcRow), use
2106    /// [`next_token_with_metadata()`](TokenParser::next_token_with_metadata).
2107    ///
2108    /// Returns `None` if no more tokens are available.
2109    pub fn next_token(&mut self) -> Result<Option<Token>, ProtocolError> {
2110        self.next_token_with_metadata(None)
2111    }
2112
2113    /// Parse the next token with optional column metadata context.
2114    ///
2115    /// When `metadata` is provided, this method can parse Row and NbcRow tokens.
2116    /// Without metadata, those tokens will return an error.
2117    ///
2118    /// Returns `None` if no more tokens are available.
2119    pub fn next_token_with_metadata(
2120        &mut self,
2121        metadata: Option<&ColMetaData>,
2122    ) -> Result<Option<Token>, ProtocolError> {
2123        if !self.has_remaining() {
2124            return Ok(None);
2125        }
2126
2127        let mut buf = &self.data[self.position..];
2128        let start_pos = self.position;
2129
2130        let token_type_byte = buf.get_u8();
2131        let token_type = TokenType::from_u8(token_type_byte);
2132
2133        let token = match token_type {
2134            Some(TokenType::Done) => {
2135                let done = Done::decode(&mut buf)?;
2136                Token::Done(done)
2137            }
2138            Some(TokenType::DoneProc) => {
2139                let done = DoneProc::decode(&mut buf)?;
2140                Token::DoneProc(done)
2141            }
2142            Some(TokenType::DoneInProc) => {
2143                let done = DoneInProc::decode(&mut buf)?;
2144                Token::DoneInProc(done)
2145            }
2146            Some(TokenType::Error) => {
2147                let error = ServerError::decode(&mut buf)?;
2148                Token::Error(error)
2149            }
2150            Some(TokenType::Info) => {
2151                let info = ServerInfo::decode(&mut buf)?;
2152                Token::Info(info)
2153            }
2154            Some(TokenType::LoginAck) => {
2155                let login_ack = LoginAck::decode(&mut buf)?;
2156                Token::LoginAck(login_ack)
2157            }
2158            Some(TokenType::EnvChange) => {
2159                let env_change = EnvChange::decode(&mut buf)?;
2160                Token::EnvChange(env_change)
2161            }
2162            Some(TokenType::Order) => {
2163                let order = Order::decode(&mut buf)?;
2164                Token::Order(order)
2165            }
2166            Some(TokenType::FeatureExtAck) => {
2167                let ack = FeatureExtAck::decode(&mut buf)?;
2168                Token::FeatureExtAck(ack)
2169            }
2170            Some(TokenType::Sspi) => {
2171                let sspi = SspiToken::decode(&mut buf)?;
2172                Token::Sspi(sspi)
2173            }
2174            Some(TokenType::FedAuthInfo) => {
2175                let info = FedAuthInfo::decode(&mut buf)?;
2176                Token::FedAuthInfo(info)
2177            }
2178            Some(TokenType::ReturnStatus) => {
2179                if buf.remaining() < 4 {
2180                    return Err(ProtocolError::UnexpectedEof);
2181                }
2182                let status = buf.get_i32_le();
2183                Token::ReturnStatus(status)
2184            }
2185            Some(TokenType::ColMetaData) => {
2186                let col_meta = ColMetaData::decode(&mut buf)?;
2187                Token::ColMetaData(col_meta)
2188            }
2189            Some(TokenType::Row) => {
2190                let meta = metadata.ok_or_else(|| {
2191                    ProtocolError::StringEncoding(
2192                        #[cfg(feature = "std")]
2193                        "Row token requires column metadata".to_string(),
2194                        #[cfg(not(feature = "std"))]
2195                        "Row token requires column metadata",
2196                    )
2197                })?;
2198                let row = RawRow::decode(&mut buf, meta)?;
2199                Token::Row(row)
2200            }
2201            Some(TokenType::NbcRow) => {
2202                let meta = metadata.ok_or_else(|| {
2203                    ProtocolError::StringEncoding(
2204                        #[cfg(feature = "std")]
2205                        "NbcRow token requires column metadata".to_string(),
2206                        #[cfg(not(feature = "std"))]
2207                        "NbcRow token requires column metadata",
2208                    )
2209                })?;
2210                let row = NbcRow::decode(&mut buf, meta)?;
2211                Token::NbcRow(row)
2212            }
2213            Some(TokenType::ReturnValue) => {
2214                let ret_val = ReturnValue::decode(&mut buf)?;
2215                Token::ReturnValue(ret_val)
2216            }
2217            Some(TokenType::SessionState) => {
2218                let session = SessionState::decode(&mut buf)?;
2219                Token::SessionState(session)
2220            }
2221            Some(TokenType::ColInfo) | Some(TokenType::TabName) | Some(TokenType::Offset) => {
2222                // These tokens are rarely used and have complex formats.
2223                // Skip them by reading the length and advancing.
2224                if buf.remaining() < 2 {
2225                    return Err(ProtocolError::UnexpectedEof);
2226                }
2227                let length = buf.get_u16_le() as usize;
2228                if buf.remaining() < length {
2229                    return Err(ProtocolError::IncompletePacket {
2230                        expected: length,
2231                        actual: buf.remaining(),
2232                    });
2233                }
2234                // Skip the data
2235                buf.advance(length);
2236                // Recursively get the next token
2237                self.position = start_pos + (self.data.len() - start_pos - buf.remaining());
2238                return self.next_token_with_metadata(metadata);
2239            }
2240            None => {
2241                return Err(ProtocolError::InvalidTokenType(token_type_byte));
2242            }
2243        };
2244
2245        // Update position based on how much was consumed
2246        let consumed = self.data.len() - start_pos - buf.remaining();
2247        self.position = start_pos + consumed;
2248
2249        Ok(Some(token))
2250    }
2251
2252    /// Skip the current token without fully parsing it.
2253    ///
2254    /// This is useful for skipping unknown or uninteresting tokens.
2255    pub fn skip_token(&mut self) -> Result<(), ProtocolError> {
2256        if !self.has_remaining() {
2257            return Ok(());
2258        }
2259
2260        let token_type_byte = self.data[self.position];
2261        let token_type = TokenType::from_u8(token_type_byte);
2262
2263        // Calculate how many bytes to skip based on token type
2264        let skip_amount = match token_type {
2265            // Fixed-size tokens
2266            Some(TokenType::Done) | Some(TokenType::DoneProc) | Some(TokenType::DoneInProc) => {
2267                1 + Done::SIZE // token type + 12 bytes
2268            }
2269            Some(TokenType::ReturnStatus) => {
2270                1 + 4 // token type + 4 bytes
2271            }
2272            // Variable-length tokens with 2-byte length prefix
2273            Some(TokenType::Error)
2274            | Some(TokenType::Info)
2275            | Some(TokenType::LoginAck)
2276            | Some(TokenType::EnvChange)
2277            | Some(TokenType::Order)
2278            | Some(TokenType::Sspi)
2279            | Some(TokenType::ColInfo)
2280            | Some(TokenType::TabName)
2281            | Some(TokenType::Offset)
2282            | Some(TokenType::ReturnValue) => {
2283                if self.remaining() < 3 {
2284                    return Err(ProtocolError::UnexpectedEof);
2285                }
2286                let length = u16::from_le_bytes([
2287                    self.data[self.position + 1],
2288                    self.data[self.position + 2],
2289                ]) as usize;
2290                1 + 2 + length // token type + length prefix + data
2291            }
2292            // Tokens with 4-byte length prefix
2293            Some(TokenType::SessionState) | Some(TokenType::FedAuthInfo) => {
2294                if self.remaining() < 5 {
2295                    return Err(ProtocolError::UnexpectedEof);
2296                }
2297                let length = u32::from_le_bytes([
2298                    self.data[self.position + 1],
2299                    self.data[self.position + 2],
2300                    self.data[self.position + 3],
2301                    self.data[self.position + 4],
2302                ]) as usize;
2303                1 + 4 + length
2304            }
2305            // FeatureExtAck has no length prefix - must parse
2306            Some(TokenType::FeatureExtAck) => {
2307                // Parse to find end
2308                let mut buf = &self.data[self.position + 1..];
2309                let _ = FeatureExtAck::decode(&mut buf)?;
2310                self.data.len() - self.position - buf.remaining()
2311            }
2312            // ColMetaData, Row, NbcRow require context and can't be easily skipped
2313            Some(TokenType::ColMetaData) | Some(TokenType::Row) | Some(TokenType::NbcRow) => {
2314                return Err(ProtocolError::InvalidTokenType(token_type_byte));
2315            }
2316            None => {
2317                return Err(ProtocolError::InvalidTokenType(token_type_byte));
2318            }
2319        };
2320
2321        if self.remaining() < skip_amount {
2322            return Err(ProtocolError::UnexpectedEof);
2323        }
2324
2325        self.position += skip_amount;
2326        Ok(())
2327    }
2328
2329    /// Get the current position in the buffer.
2330    #[must_use]
2331    pub fn position(&self) -> usize {
2332        self.position
2333    }
2334
2335    /// Reset the parser to the beginning.
2336    pub fn reset(&mut self) {
2337        self.position = 0;
2338    }
2339}
2340
2341// =============================================================================
2342// Tests
2343// =============================================================================
2344
2345#[cfg(test)]
2346#[allow(clippy::unwrap_used, clippy::panic)]
2347mod tests {
2348    use super::*;
2349    use bytes::BytesMut;
2350
2351    #[test]
2352    fn test_done_roundtrip() {
2353        let done = Done {
2354            status: DoneStatus {
2355                more: false,
2356                error: false,
2357                in_xact: false,
2358                count: true,
2359                attn: false,
2360                srverror: false,
2361            },
2362            cur_cmd: 193, // SELECT
2363            row_count: 42,
2364        };
2365
2366        let mut buf = BytesMut::new();
2367        done.encode(&mut buf);
2368
2369        // Skip the token type byte
2370        let mut cursor = &buf[1..];
2371        let decoded = Done::decode(&mut cursor).unwrap();
2372
2373        assert_eq!(decoded.status.count, done.status.count);
2374        assert_eq!(decoded.cur_cmd, done.cur_cmd);
2375        assert_eq!(decoded.row_count, done.row_count);
2376    }
2377
2378    #[test]
2379    fn test_done_status_bits() {
2380        let status = DoneStatus {
2381            more: true,
2382            error: true,
2383            in_xact: true,
2384            count: true,
2385            attn: false,
2386            srverror: false,
2387        };
2388
2389        let bits = status.to_bits();
2390        let restored = DoneStatus::from_bits(bits);
2391
2392        assert_eq!(status.more, restored.more);
2393        assert_eq!(status.error, restored.error);
2394        assert_eq!(status.in_xact, restored.in_xact);
2395        assert_eq!(status.count, restored.count);
2396    }
2397
2398    #[test]
2399    fn test_token_parser_done() {
2400        // DONE token: type (1) + status (2) + curcmd (2) + rowcount (8)
2401        let data = Bytes::from_static(&[
2402            0xFD, // DONE token type
2403            0x10, 0x00, // status: DONE_COUNT
2404            0xC1, 0x00, // cur_cmd: 193 (SELECT)
2405            0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count: 5
2406        ]);
2407
2408        let mut parser = TokenParser::new(data);
2409        let token = parser.next_token().unwrap().unwrap();
2410
2411        match token {
2412            Token::Done(done) => {
2413                assert!(done.status.count);
2414                assert!(!done.status.more);
2415                assert_eq!(done.cur_cmd, 193);
2416                assert_eq!(done.row_count, 5);
2417            }
2418            _ => panic!("Expected Done token"),
2419        }
2420
2421        // No more tokens
2422        assert!(parser.next_token().unwrap().is_none());
2423    }
2424
2425    #[test]
2426    fn test_env_change_type_from_u8() {
2427        assert_eq!(EnvChangeType::from_u8(1), Some(EnvChangeType::Database));
2428        assert_eq!(EnvChangeType::from_u8(20), Some(EnvChangeType::Routing));
2429        assert_eq!(EnvChangeType::from_u8(100), None);
2430    }
2431
2432    #[test]
2433    fn test_colmetadata_no_columns() {
2434        // No metadata marker (0xFFFF)
2435        let data = Bytes::from_static(&[0xFF, 0xFF]);
2436        let mut cursor: &[u8] = &data;
2437        let meta = ColMetaData::decode(&mut cursor).unwrap();
2438        assert!(meta.is_empty());
2439        assert_eq!(meta.column_count(), 0);
2440    }
2441
2442    #[test]
2443    fn test_colmetadata_single_int_column() {
2444        // COLMETADATA with 1 INT column
2445        // Format: column_count (2) + [user_type (4) + flags (2) + type_id (1) + name (b_varchar)]
2446        let mut data = BytesMut::new();
2447        data.extend_from_slice(&[0x01, 0x00]); // 1 column
2448        data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); // user_type = 0
2449        data.extend_from_slice(&[0x01, 0x00]); // flags (nullable)
2450        data.extend_from_slice(&[0x38]); // TypeId::Int4
2451        // Column name "id" as B_VARCHAR (1 byte length + UTF-16LE)
2452        data.extend_from_slice(&[0x02]); // 2 characters
2453        data.extend_from_slice(&[b'i', 0x00, b'd', 0x00]); // "id" in UTF-16LE
2454
2455        let mut cursor: &[u8] = &data;
2456        let meta = ColMetaData::decode(&mut cursor).unwrap();
2457
2458        assert_eq!(meta.column_count(), 1);
2459        assert_eq!(meta.columns[0].name, "id");
2460        assert_eq!(meta.columns[0].type_id, TypeId::Int4);
2461        assert!(meta.columns[0].is_nullable());
2462    }
2463
2464    #[test]
2465    fn test_colmetadata_nvarchar_column() {
2466        // COLMETADATA with 1 NVARCHAR(50) column
2467        let mut data = BytesMut::new();
2468        data.extend_from_slice(&[0x01, 0x00]); // 1 column
2469        data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); // user_type = 0
2470        data.extend_from_slice(&[0x01, 0x00]); // flags (nullable)
2471        data.extend_from_slice(&[0xE7]); // TypeId::NVarChar
2472        // Type info: max_length (2 bytes) + collation (5 bytes)
2473        data.extend_from_slice(&[0x64, 0x00]); // max_length = 100 (50 chars * 2)
2474        data.extend_from_slice(&[0x09, 0x04, 0xD0, 0x00, 0x34]); // collation
2475        // Column name "name"
2476        data.extend_from_slice(&[0x04]); // 4 characters
2477        data.extend_from_slice(&[b'n', 0x00, b'a', 0x00, b'm', 0x00, b'e', 0x00]);
2478
2479        let mut cursor: &[u8] = &data;
2480        let meta = ColMetaData::decode(&mut cursor).unwrap();
2481
2482        assert_eq!(meta.column_count(), 1);
2483        assert_eq!(meta.columns[0].name, "name");
2484        assert_eq!(meta.columns[0].type_id, TypeId::NVarChar);
2485        assert_eq!(meta.columns[0].type_info.max_length, Some(100));
2486        assert!(meta.columns[0].type_info.collation.is_some());
2487    }
2488
2489    #[test]
2490    fn test_raw_row_decode_int() {
2491        // Create metadata for a single INT column
2492        let metadata = ColMetaData {
2493            columns: vec![ColumnData {
2494                name: "id".to_string(),
2495                type_id: TypeId::Int4,
2496                col_type: 0x38,
2497                flags: 0,
2498                user_type: 0,
2499                type_info: TypeInfo::default(),
2500            }],
2501        };
2502
2503        // Row data: just 4 bytes for the int value 42
2504        let data = Bytes::from_static(&[0x2A, 0x00, 0x00, 0x00]); // 42 in little-endian
2505        let mut cursor: &[u8] = &data;
2506        let row = RawRow::decode(&mut cursor, &metadata).unwrap();
2507
2508        // The raw data should contain the 4 bytes
2509        assert_eq!(row.data.len(), 4);
2510        assert_eq!(&row.data[..], &[0x2A, 0x00, 0x00, 0x00]);
2511    }
2512
2513    #[test]
2514    fn test_raw_row_decode_nullable_int() {
2515        // Create metadata for a nullable INT column (IntN)
2516        let metadata = ColMetaData {
2517            columns: vec![ColumnData {
2518                name: "id".to_string(),
2519                type_id: TypeId::IntN,
2520                col_type: 0x26,
2521                flags: 0x01, // nullable
2522                user_type: 0,
2523                type_info: TypeInfo {
2524                    max_length: Some(4),
2525                    ..Default::default()
2526                },
2527            }],
2528        };
2529
2530        // Row data with value: 1 byte length + 4 bytes value
2531        let data = Bytes::from_static(&[0x04, 0x2A, 0x00, 0x00, 0x00]); // length=4, value=42
2532        let mut cursor: &[u8] = &data;
2533        let row = RawRow::decode(&mut cursor, &metadata).unwrap();
2534
2535        assert_eq!(row.data.len(), 5);
2536        assert_eq!(row.data[0], 4); // length
2537        assert_eq!(&row.data[1..], &[0x2A, 0x00, 0x00, 0x00]);
2538    }
2539
2540    #[test]
2541    fn test_raw_row_decode_null_value() {
2542        // Create metadata for a nullable INT column (IntN)
2543        let metadata = ColMetaData {
2544            columns: vec![ColumnData {
2545                name: "id".to_string(),
2546                type_id: TypeId::IntN,
2547                col_type: 0x26,
2548                flags: 0x01, // nullable
2549                user_type: 0,
2550                type_info: TypeInfo {
2551                    max_length: Some(4),
2552                    ..Default::default()
2553                },
2554            }],
2555        };
2556
2557        // NULL value: length = 0xFF (for bytelen types)
2558        let data = Bytes::from_static(&[0xFF]);
2559        let mut cursor: &[u8] = &data;
2560        let row = RawRow::decode(&mut cursor, &metadata).unwrap();
2561
2562        assert_eq!(row.data.len(), 1);
2563        assert_eq!(row.data[0], 0xFF); // NULL marker
2564    }
2565
2566    #[test]
2567    fn test_nbcrow_null_bitmap() {
2568        let row = NbcRow {
2569            null_bitmap: vec![0b00000101], // columns 0 and 2 are NULL
2570            data: Bytes::new(),
2571        };
2572
2573        assert!(row.is_null(0));
2574        assert!(!row.is_null(1));
2575        assert!(row.is_null(2));
2576        assert!(!row.is_null(3));
2577    }
2578
2579    #[test]
2580    fn test_token_parser_colmetadata() {
2581        // Build a COLMETADATA token with 1 INT column
2582        let mut data = BytesMut::new();
2583        data.extend_from_slice(&[0x81]); // COLMETADATA token type
2584        data.extend_from_slice(&[0x01, 0x00]); // 1 column
2585        data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); // user_type = 0
2586        data.extend_from_slice(&[0x01, 0x00]); // flags (nullable)
2587        data.extend_from_slice(&[0x38]); // TypeId::Int4
2588        data.extend_from_slice(&[0x02]); // column name length
2589        data.extend_from_slice(&[b'i', 0x00, b'd', 0x00]); // "id"
2590
2591        let mut parser = TokenParser::new(data.freeze());
2592        let token = parser.next_token().unwrap().unwrap();
2593
2594        match token {
2595            Token::ColMetaData(meta) => {
2596                assert_eq!(meta.column_count(), 1);
2597                assert_eq!(meta.columns[0].name, "id");
2598                assert_eq!(meta.columns[0].type_id, TypeId::Int4);
2599            }
2600            _ => panic!("Expected ColMetaData token"),
2601        }
2602    }
2603
2604    #[test]
2605    fn test_token_parser_row_with_metadata() {
2606        // Build metadata
2607        let metadata = ColMetaData {
2608            columns: vec![ColumnData {
2609                name: "id".to_string(),
2610                type_id: TypeId::Int4,
2611                col_type: 0x38,
2612                flags: 0,
2613                user_type: 0,
2614                type_info: TypeInfo::default(),
2615            }],
2616        };
2617
2618        // Build ROW token
2619        let mut data = BytesMut::new();
2620        data.extend_from_slice(&[0xD1]); // ROW token type
2621        data.extend_from_slice(&[0x2A, 0x00, 0x00, 0x00]); // value = 42
2622
2623        let mut parser = TokenParser::new(data.freeze());
2624        let token = parser
2625            .next_token_with_metadata(Some(&metadata))
2626            .unwrap()
2627            .unwrap();
2628
2629        match token {
2630            Token::Row(row) => {
2631                assert_eq!(row.data.len(), 4);
2632            }
2633            _ => panic!("Expected Row token"),
2634        }
2635    }
2636
2637    #[test]
2638    fn test_token_parser_row_without_metadata_fails() {
2639        // Build ROW token
2640        let mut data = BytesMut::new();
2641        data.extend_from_slice(&[0xD1]); // ROW token type
2642        data.extend_from_slice(&[0x2A, 0x00, 0x00, 0x00]); // value = 42
2643
2644        let mut parser = TokenParser::new(data.freeze());
2645        let result = parser.next_token(); // No metadata provided
2646
2647        assert!(result.is_err());
2648    }
2649
2650    #[test]
2651    fn test_token_parser_peek() {
2652        let data = Bytes::from_static(&[
2653            0xFD, // DONE token type
2654            0x10, 0x00, // status
2655            0xC1, 0x00, // cur_cmd
2656            0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count
2657        ]);
2658
2659        let parser = TokenParser::new(data);
2660        assert_eq!(parser.peek_token_type(), Some(TokenType::Done));
2661    }
2662
2663    #[test]
2664    fn test_column_data_fixed_size() {
2665        let col = ColumnData {
2666            name: String::new(),
2667            type_id: TypeId::Int4,
2668            col_type: 0x38,
2669            flags: 0,
2670            user_type: 0,
2671            type_info: TypeInfo::default(),
2672        };
2673        assert_eq!(col.fixed_size(), Some(4));
2674
2675        let col2 = ColumnData {
2676            name: String::new(),
2677            type_id: TypeId::NVarChar,
2678            col_type: 0xE7,
2679            flags: 0,
2680            user_type: 0,
2681            type_info: TypeInfo::default(),
2682        };
2683        assert_eq!(col2.fixed_size(), None);
2684    }
2685
2686    // ========================================================================
2687    // End-to-End Decode Tests (Wire → Stored → Verification)
2688    // ========================================================================
2689    //
2690    // These tests verify that RawRow::decode_column_value correctly stores
2691    // column values in a format that can be parsed back.
2692
2693    #[test]
2694    fn test_decode_nvarchar_then_intn_roundtrip() {
2695        // Simulate wire data for: "World" (NVarChar), 42 (IntN)
2696        // This tests the scenario from the MCP parameterized query
2697
2698        // Build wire data (what the server sends)
2699        let mut wire_data = BytesMut::new();
2700
2701        // Column 0: NVarChar "World" - 2-byte length prefix in bytes
2702        // "World" in UTF-16LE: W=0x0057, o=0x006F, r=0x0072, l=0x006C, d=0x0064
2703        let word = "World";
2704        let utf16: Vec<u16> = word.encode_utf16().collect();
2705        wire_data.put_u16_le((utf16.len() * 2) as u16); // byte length = 10
2706        for code_unit in &utf16 {
2707            wire_data.put_u16_le(*code_unit);
2708        }
2709
2710        // Column 1: IntN 42 - 1-byte length prefix
2711        wire_data.put_u8(4); // 4 bytes for INT
2712        wire_data.put_i32_le(42);
2713
2714        // Build column metadata
2715        let metadata = ColMetaData {
2716            columns: vec![
2717                ColumnData {
2718                    name: "greeting".to_string(),
2719                    type_id: TypeId::NVarChar,
2720                    col_type: 0xE7,
2721                    flags: 0x01,
2722                    user_type: 0,
2723                    type_info: TypeInfo {
2724                        max_length: Some(10), // non-MAX
2725                        precision: None,
2726                        scale: None,
2727                        collation: None,
2728                    },
2729                },
2730                ColumnData {
2731                    name: "number".to_string(),
2732                    type_id: TypeId::IntN,
2733                    col_type: 0x26,
2734                    flags: 0x01,
2735                    user_type: 0,
2736                    type_info: TypeInfo {
2737                        max_length: Some(4),
2738                        precision: None,
2739                        scale: None,
2740                        collation: None,
2741                    },
2742                },
2743            ],
2744        };
2745
2746        // Decode the wire data into stored format
2747        let mut wire_cursor = wire_data.freeze();
2748        let raw_row = RawRow::decode(&mut wire_cursor, &metadata).unwrap();
2749
2750        // Verify wire data was fully consumed
2751        assert_eq!(
2752            wire_cursor.remaining(),
2753            0,
2754            "wire data should be fully consumed"
2755        );
2756
2757        // Now parse the stored data
2758        let mut stored_cursor: &[u8] = &raw_row.data;
2759
2760        // Parse column 0 (NVarChar)
2761        // Stored format for non-MAX NVarChar: [2-byte len][data]
2762        assert!(
2763            stored_cursor.remaining() >= 2,
2764            "need at least 2 bytes for length"
2765        );
2766        let len0 = stored_cursor.get_u16_le() as usize;
2767        assert_eq!(len0, 10, "NVarChar length should be 10 bytes");
2768        assert!(
2769            stored_cursor.remaining() >= len0,
2770            "need {len0} bytes for data"
2771        );
2772
2773        // Read UTF-16LE and convert to string
2774        let mut utf16_read = Vec::new();
2775        for _ in 0..(len0 / 2) {
2776            utf16_read.push(stored_cursor.get_u16_le());
2777        }
2778        let string0 = String::from_utf16(&utf16_read).unwrap();
2779        assert_eq!(string0, "World", "column 0 should be 'World'");
2780
2781        // Parse column 1 (IntN)
2782        // Stored format for IntN: [1-byte len][data]
2783        assert!(
2784            stored_cursor.remaining() >= 1,
2785            "need at least 1 byte for length"
2786        );
2787        let len1 = stored_cursor.get_u8();
2788        assert_eq!(len1, 4, "IntN length should be 4");
2789        assert!(stored_cursor.remaining() >= 4, "need 4 bytes for INT data");
2790        let int1 = stored_cursor.get_i32_le();
2791        assert_eq!(int1, 42, "column 1 should be 42");
2792
2793        // Verify stored data was fully consumed
2794        assert_eq!(
2795            stored_cursor.remaining(),
2796            0,
2797            "stored data should be fully consumed"
2798        );
2799    }
2800
2801    #[test]
2802    fn test_decode_nvarchar_max_then_intn_roundtrip() {
2803        // Test NVARCHAR(MAX) followed by IntN - uses PLP encoding
2804
2805        // Build wire data for PLP NVARCHAR(MAX) + IntN
2806        let mut wire_data = BytesMut::new();
2807
2808        // Column 0: NVARCHAR(MAX) "Hello" - PLP format
2809        // PLP: 8-byte total length, then chunks
2810        let word = "Hello";
2811        let utf16: Vec<u16> = word.encode_utf16().collect();
2812        let byte_len = (utf16.len() * 2) as u64;
2813
2814        wire_data.put_u64_le(byte_len); // total length = 10
2815        wire_data.put_u32_le(byte_len as u32); // chunk length = 10
2816        for code_unit in &utf16 {
2817            wire_data.put_u16_le(*code_unit);
2818        }
2819        wire_data.put_u32_le(0); // terminating zero-length chunk
2820
2821        // Column 1: IntN 99
2822        wire_data.put_u8(4);
2823        wire_data.put_i32_le(99);
2824
2825        // Build metadata with MAX type
2826        let metadata = ColMetaData {
2827            columns: vec![
2828                ColumnData {
2829                    name: "text".to_string(),
2830                    type_id: TypeId::NVarChar,
2831                    col_type: 0xE7,
2832                    flags: 0x01,
2833                    user_type: 0,
2834                    type_info: TypeInfo {
2835                        max_length: Some(0xFFFF), // MAX indicator
2836                        precision: None,
2837                        scale: None,
2838                        collation: None,
2839                    },
2840                },
2841                ColumnData {
2842                    name: "num".to_string(),
2843                    type_id: TypeId::IntN,
2844                    col_type: 0x26,
2845                    flags: 0x01,
2846                    user_type: 0,
2847                    type_info: TypeInfo {
2848                        max_length: Some(4),
2849                        precision: None,
2850                        scale: None,
2851                        collation: None,
2852                    },
2853                },
2854            ],
2855        };
2856
2857        // Decode wire data
2858        let mut wire_cursor = wire_data.freeze();
2859        let raw_row = RawRow::decode(&mut wire_cursor, &metadata).unwrap();
2860
2861        // Verify wire data was fully consumed
2862        assert_eq!(
2863            wire_cursor.remaining(),
2864            0,
2865            "wire data should be fully consumed"
2866        );
2867
2868        // Parse stored PLP data for column 0
2869        let mut stored_cursor: &[u8] = &raw_row.data;
2870
2871        // PLP stored format: [8-byte total][chunks...][4-byte 0]
2872        let total_len = stored_cursor.get_u64_le();
2873        assert_eq!(total_len, 10, "PLP total length should be 10");
2874
2875        let chunk_len = stored_cursor.get_u32_le();
2876        assert_eq!(chunk_len, 10, "PLP chunk length should be 10");
2877
2878        let mut utf16_read = Vec::new();
2879        for _ in 0..(chunk_len / 2) {
2880            utf16_read.push(stored_cursor.get_u16_le());
2881        }
2882        let string0 = String::from_utf16(&utf16_read).unwrap();
2883        assert_eq!(string0, "Hello", "column 0 should be 'Hello'");
2884
2885        let terminator = stored_cursor.get_u32_le();
2886        assert_eq!(terminator, 0, "PLP should end with 0");
2887
2888        // Parse IntN
2889        let len1 = stored_cursor.get_u8();
2890        assert_eq!(len1, 4);
2891        let int1 = stored_cursor.get_i32_le();
2892        assert_eq!(int1, 99, "column 1 should be 99");
2893
2894        // Verify fully consumed
2895        assert_eq!(
2896            stored_cursor.remaining(),
2897            0,
2898            "stored data should be fully consumed"
2899        );
2900    }
2901}