tds_protocol/
token.rs

1//! TDS token stream definitions.
2//!
3//! Tokens are the fundamental units of TDS response data. The server sends
4//! a stream of tokens that describe metadata, rows, errors, and other information.
5//!
6//! ## Token Structure
7//!
8//! Each token begins with a 1-byte token type identifier, followed by
9//! token-specific data. Some tokens have fixed lengths, while others
10//! have length prefixes.
11//!
12//! ## Usage
13//!
14//! ```rust,ignore
15//! use tds_protocol::token::{Token, TokenParser};
16//! use bytes::Bytes;
17//!
18//! let data: Bytes = /* received from server */;
19//! let mut parser = TokenParser::new(data);
20//!
21//! while let Some(token) = parser.next_token()? {
22//!     match token {
23//!         Token::Done(done) => println!("Rows affected: {}", done.row_count),
24//!         Token::Error(err) => eprintln!("Error {}: {}", err.number, err.message),
25//!         _ => {}
26//!     }
27//! }
28//! ```
29
30use bytes::{Buf, BufMut, Bytes};
31
32use crate::codec::{read_b_varchar, read_us_varchar};
33use crate::error::ProtocolError;
34use crate::prelude::*;
35use crate::types::TypeId;
36
37/// Token type identifier.
38#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
39#[repr(u8)]
40pub enum TokenType {
41    /// Column metadata (COLMETADATA).
42    ColMetaData = 0x81,
43    /// Error message (ERROR).
44    Error = 0xAA,
45    /// Informational message (INFO).
46    Info = 0xAB,
47    /// Login acknowledgment (LOGINACK).
48    LoginAck = 0xAD,
49    /// Row data (ROW).
50    Row = 0xD1,
51    /// Null bitmap compressed row (NBCROW).
52    NbcRow = 0xD2,
53    /// Environment change (ENVCHANGE).
54    EnvChange = 0xE3,
55    /// SSPI authentication (SSPI).
56    Sspi = 0xED,
57    /// Done (DONE).
58    Done = 0xFD,
59    /// Done in procedure (DONEINPROC).
60    DoneInProc = 0xFF,
61    /// Done procedure (DONEPROC).
62    DoneProc = 0xFE,
63    /// Return status (RETURNSTATUS).
64    ReturnStatus = 0x79,
65    /// Return value (RETURNVALUE).
66    ReturnValue = 0xAC,
67    /// Order (ORDER).
68    Order = 0xA9,
69    /// Feature extension acknowledgment (FEATUREEXTACK).
70    FeatureExtAck = 0xAE,
71    /// Session state (SESSIONSTATE).
72    SessionState = 0xE4,
73    /// Federated authentication info (FEDAUTHINFO).
74    FedAuthInfo = 0xEE,
75    /// Column info (COLINFO).
76    ColInfo = 0xA5,
77    /// Table name (TABNAME).
78    TabName = 0xA4,
79    /// Offset (OFFSET).
80    Offset = 0x78,
81}
82
83impl TokenType {
84    /// Create a token type from a raw byte.
85    pub fn from_u8(value: u8) -> Option<Self> {
86        match value {
87            0x81 => Some(Self::ColMetaData),
88            0xAA => Some(Self::Error),
89            0xAB => Some(Self::Info),
90            0xAD => Some(Self::LoginAck),
91            0xD1 => Some(Self::Row),
92            0xD2 => Some(Self::NbcRow),
93            0xE3 => Some(Self::EnvChange),
94            0xED => Some(Self::Sspi),
95            0xFD => Some(Self::Done),
96            0xFF => Some(Self::DoneInProc),
97            0xFE => Some(Self::DoneProc),
98            0x79 => Some(Self::ReturnStatus),
99            0xAC => Some(Self::ReturnValue),
100            0xA9 => Some(Self::Order),
101            0xAE => Some(Self::FeatureExtAck),
102            0xE4 => Some(Self::SessionState),
103            0xEE => Some(Self::FedAuthInfo),
104            0xA5 => Some(Self::ColInfo),
105            0xA4 => Some(Self::TabName),
106            0x78 => Some(Self::Offset),
107            _ => None,
108        }
109    }
110}
111
112/// Parsed TDS token.
113///
114/// This enum represents all possible tokens that can be received from SQL Server.
115/// Each variant contains the parsed token data.
116#[derive(Debug, Clone)]
117pub enum Token {
118    /// Column metadata describing result set structure.
119    ColMetaData(ColMetaData),
120    /// Row data.
121    Row(RawRow),
122    /// Null bitmap compressed row.
123    NbcRow(NbcRow),
124    /// Completion of a SQL statement.
125    Done(Done),
126    /// Completion of a stored procedure.
127    DoneProc(DoneProc),
128    /// Completion within a stored procedure.
129    DoneInProc(DoneInProc),
130    /// Return status from stored procedure.
131    ReturnStatus(i32),
132    /// Return value from stored procedure.
133    ReturnValue(ReturnValue),
134    /// Error message from server.
135    Error(ServerError),
136    /// Informational message from server.
137    Info(ServerInfo),
138    /// Login acknowledgment.
139    LoginAck(LoginAck),
140    /// Environment change notification.
141    EnvChange(EnvChange),
142    /// Column ordering information.
143    Order(Order),
144    /// Feature extension acknowledgment.
145    FeatureExtAck(FeatureExtAck),
146    /// SSPI authentication data.
147    Sspi(SspiToken),
148    /// Session state information.
149    SessionState(SessionState),
150    /// Federated authentication info.
151    FedAuthInfo(FedAuthInfo),
152}
153
154/// Column metadata token.
155#[derive(Debug, Clone, Default)]
156pub struct ColMetaData {
157    /// Column definitions.
158    pub columns: Vec<ColumnData>,
159}
160
161/// Column definition within metadata.
162#[derive(Debug, Clone)]
163pub struct ColumnData {
164    /// Column name.
165    pub name: String,
166    /// Column data type ID.
167    pub type_id: TypeId,
168    /// Column data type raw byte (for unknown types).
169    pub col_type: u8,
170    /// Column flags.
171    pub flags: u16,
172    /// User type ID.
173    pub user_type: u32,
174    /// Type-specific metadata.
175    pub type_info: TypeInfo,
176}
177
178/// Type-specific metadata.
179#[derive(Debug, Clone, Default)]
180pub struct TypeInfo {
181    /// Maximum length for variable-length types.
182    pub max_length: Option<u32>,
183    /// Precision for numeric types.
184    pub precision: Option<u8>,
185    /// Scale for numeric types.
186    pub scale: Option<u8>,
187    /// Collation for string types.
188    pub collation: Option<Collation>,
189}
190
191/// SQL Server collation.
192///
193/// Collations in SQL Server define the character encoding and sorting rules
194/// for string data. For `VARCHAR` columns, the collation determines which
195/// code page (character encoding) is used to store the data.
196///
197/// # Encoding Support
198///
199/// When the `encoding` feature is enabled, the [`Collation::encoding()`] method
200/// returns the appropriate [`encoding_rs::Encoding`] for decoding `VARCHAR` data.
201///
202/// # Example
203///
204/// ```rust,ignore
205/// use tds_protocol::token::Collation;
206///
207/// let collation = Collation { lcid: 0x0804, sort_id: 0 }; // Chinese (PRC)
208/// if let Some(encoding) = collation.encoding() {
209///     let (decoded, _, _) = encoding.decode(raw_bytes);
210///     // decoded is now proper Chinese text
211/// }
212/// ```
213#[derive(Debug, Clone, Copy, Default)]
214pub struct Collation {
215    /// Locale ID (LCID).
216    ///
217    /// The LCID encodes both the language and region. The lower 16 bits
218    /// contain the primary language ID, and bits 16-19 contain the sort ID
219    /// for some collations.
220    ///
221    /// For UTF-8 collations (SQL Server 2019+), bit 27 (0x0800_0000) is set.
222    pub lcid: u32,
223    /// Sort ID.
224    ///
225    /// Used with certain collations to specify sorting behavior.
226    pub sort_id: u8,
227}
228
229impl Collation {
230    /// Returns the character encoding for this collation.
231    ///
232    /// This method maps the collation's LCID to the appropriate character
233    /// encoding from the `encoding_rs` crate.
234    ///
235    /// # Returns
236    ///
237    /// - `Some(&Encoding)` - The encoding to use for decoding `VARCHAR` data
238    /// - `None` - If the collation uses UTF-8 (no transcoding needed) or
239    ///   the LCID is not recognized (caller should use Windows-1252 fallback)
240    ///
241    /// # UTF-8 Collations
242    ///
243    /// SQL Server 2019+ supports UTF-8 collations (identified by the `_UTF8`
244    /// suffix). These return `None` because no transcoding is needed.
245    ///
246    /// # Example
247    ///
248    /// ```rust,ignore
249    /// let collation = Collation { lcid: 0x0419, sort_id: 0 }; // Russian
250    /// if let Some(encoding) = collation.encoding() {
251    ///     // encoding is Windows-1251 for Cyrillic
252    ///     let (text, _, had_errors) = encoding.decode(&raw_bytes);
253    /// }
254    /// ```
255    #[cfg(feature = "encoding")]
256    pub fn encoding(&self) -> Option<&'static encoding_rs::Encoding> {
257        crate::collation::encoding_for_lcid(self.lcid)
258    }
259
260    /// Returns whether this collation uses UTF-8 encoding.
261    ///
262    /// UTF-8 collations were introduced in SQL Server 2019 and are
263    /// identified by the `_UTF8` suffix in the collation name.
264    #[cfg(feature = "encoding")]
265    pub fn is_utf8(&self) -> bool {
266        crate::collation::is_utf8_collation(self.lcid)
267    }
268
269    /// Returns the Windows code page number for this collation.
270    ///
271    /// Useful for error messages and debugging.
272    ///
273    /// # Returns
274    ///
275    /// The code page number (e.g., 1252 for Western European, 932 for Japanese).
276    #[cfg(feature = "encoding")]
277    pub fn code_page(&self) -> Option<u16> {
278        crate::collation::code_page_for_lcid(self.lcid)
279    }
280
281    /// Returns the encoding name for this collation.
282    ///
283    /// Useful for error messages and debugging.
284    #[cfg(feature = "encoding")]
285    pub fn encoding_name(&self) -> &'static str {
286        crate::collation::encoding_name_for_lcid(self.lcid)
287    }
288}
289
290/// Raw row data (not yet decoded).
291#[derive(Debug, Clone)]
292pub struct RawRow {
293    /// Raw column values.
294    pub data: bytes::Bytes,
295}
296
297/// Null bitmap compressed row.
298#[derive(Debug, Clone)]
299pub struct NbcRow {
300    /// Null bitmap.
301    pub null_bitmap: Vec<u8>,
302    /// Raw non-null column values.
303    pub data: bytes::Bytes,
304}
305
306/// Done token indicating statement completion.
307#[derive(Debug, Clone, Copy)]
308pub struct Done {
309    /// Status flags.
310    pub status: DoneStatus,
311    /// Current command.
312    pub cur_cmd: u16,
313    /// Row count (if applicable).
314    pub row_count: u64,
315}
316
317/// Done status flags.
318#[derive(Debug, Clone, Copy, Default)]
319pub struct DoneStatus {
320    /// More results follow.
321    pub more: bool,
322    /// Error occurred.
323    pub error: bool,
324    /// Transaction in progress.
325    pub in_xact: bool,
326    /// Row count is valid.
327    pub count: bool,
328    /// Attention acknowledgment.
329    pub attn: bool,
330    /// Server error caused statement termination.
331    pub srverror: bool,
332}
333
334/// Done in procedure token.
335#[derive(Debug, Clone, Copy)]
336pub struct DoneInProc {
337    /// Status flags.
338    pub status: DoneStatus,
339    /// Current command.
340    pub cur_cmd: u16,
341    /// Row count.
342    pub row_count: u64,
343}
344
345/// Done procedure token.
346#[derive(Debug, Clone, Copy)]
347pub struct DoneProc {
348    /// Status flags.
349    pub status: DoneStatus,
350    /// Current command.
351    pub cur_cmd: u16,
352    /// Row count.
353    pub row_count: u64,
354}
355
356/// Return value from stored procedure.
357#[derive(Debug, Clone)]
358pub struct ReturnValue {
359    /// Parameter ordinal.
360    pub param_ordinal: u16,
361    /// Parameter name.
362    pub param_name: String,
363    /// Status flags.
364    pub status: u8,
365    /// User type.
366    pub user_type: u32,
367    /// Type flags.
368    pub flags: u16,
369    /// Type info.
370    pub type_info: TypeInfo,
371    /// Value data.
372    pub value: bytes::Bytes,
373}
374
375/// Server error message.
376#[derive(Debug, Clone)]
377pub struct ServerError {
378    /// Error number.
379    pub number: i32,
380    /// Error state.
381    pub state: u8,
382    /// Error severity class.
383    pub class: u8,
384    /// Error message text.
385    pub message: String,
386    /// Server name.
387    pub server: String,
388    /// Procedure name.
389    pub procedure: String,
390    /// Line number.
391    pub line: i32,
392}
393
394/// Server informational message.
395#[derive(Debug, Clone)]
396pub struct ServerInfo {
397    /// Info number.
398    pub number: i32,
399    /// Info state.
400    pub state: u8,
401    /// Info class (severity).
402    pub class: u8,
403    /// Info message text.
404    pub message: String,
405    /// Server name.
406    pub server: String,
407    /// Procedure name.
408    pub procedure: String,
409    /// Line number.
410    pub line: i32,
411}
412
413/// Login acknowledgment token.
414#[derive(Debug, Clone)]
415pub struct LoginAck {
416    /// Interface type.
417    pub interface: u8,
418    /// TDS version.
419    pub tds_version: u32,
420    /// Program name.
421    pub prog_name: String,
422    /// Program version.
423    pub prog_version: u32,
424}
425
426/// Environment change token.
427#[derive(Debug, Clone)]
428pub struct EnvChange {
429    /// Type of environment change.
430    pub env_type: EnvChangeType,
431    /// New value.
432    pub new_value: EnvChangeValue,
433    /// Old value.
434    pub old_value: EnvChangeValue,
435}
436
437/// Environment change type.
438#[derive(Debug, Clone, Copy, PartialEq, Eq)]
439#[repr(u8)]
440pub enum EnvChangeType {
441    /// Database changed.
442    Database = 1,
443    /// Language changed.
444    Language = 2,
445    /// Character set changed.
446    CharacterSet = 3,
447    /// Packet size changed.
448    PacketSize = 4,
449    /// Unicode data sorting locale ID.
450    UnicodeSortingLocalId = 5,
451    /// Unicode comparison flags.
452    UnicodeComparisonFlags = 6,
453    /// SQL collation.
454    SqlCollation = 7,
455    /// Begin transaction.
456    BeginTransaction = 8,
457    /// Commit transaction.
458    CommitTransaction = 9,
459    /// Rollback transaction.
460    RollbackTransaction = 10,
461    /// Enlist DTC transaction.
462    EnlistDtcTransaction = 11,
463    /// Defect DTC transaction.
464    DefectTransaction = 12,
465    /// Real-time log shipping.
466    RealTimeLogShipping = 13,
467    /// Promote transaction.
468    PromoteTransaction = 15,
469    /// Transaction manager address.
470    TransactionManagerAddress = 16,
471    /// Transaction ended.
472    TransactionEnded = 17,
473    /// Reset connection completion acknowledgment.
474    ResetConnectionCompletionAck = 18,
475    /// User instance started.
476    UserInstanceStarted = 19,
477    /// Routing information.
478    Routing = 20,
479}
480
481/// Environment change value.
482#[derive(Debug, Clone)]
483pub enum EnvChangeValue {
484    /// String value.
485    String(String),
486    /// Binary value.
487    Binary(bytes::Bytes),
488    /// Routing information.
489    Routing {
490        /// Host name.
491        host: String,
492        /// Port number.
493        port: u16,
494    },
495}
496
497/// Column ordering information.
498#[derive(Debug, Clone)]
499pub struct Order {
500    /// Ordered column indices.
501    pub columns: Vec<u16>,
502}
503
504/// Feature extension acknowledgment.
505#[derive(Debug, Clone)]
506pub struct FeatureExtAck {
507    /// Acknowledged features.
508    pub features: Vec<FeatureAck>,
509}
510
511/// Individual feature acknowledgment.
512#[derive(Debug, Clone)]
513pub struct FeatureAck {
514    /// Feature ID.
515    pub feature_id: u8,
516    /// Feature data.
517    pub data: bytes::Bytes,
518}
519
520/// SSPI authentication token.
521#[derive(Debug, Clone)]
522pub struct SspiToken {
523    /// SSPI data.
524    pub data: bytes::Bytes,
525}
526
527/// Session state token.
528#[derive(Debug, Clone)]
529pub struct SessionState {
530    /// Session state data.
531    pub data: bytes::Bytes,
532}
533
534/// Federated authentication info.
535#[derive(Debug, Clone)]
536pub struct FedAuthInfo {
537    /// STS URL.
538    pub sts_url: String,
539    /// Service principal name.
540    pub spn: String,
541}
542
543// =============================================================================
544// ColMetaData and Row Parsing Implementation
545// =============================================================================
546
547impl ColMetaData {
548    /// Special value indicating no metadata.
549    pub const NO_METADATA: u16 = 0xFFFF;
550
551    /// Decode a COLMETADATA token from bytes.
552    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
553        if src.remaining() < 2 {
554            return Err(ProtocolError::UnexpectedEof);
555        }
556
557        let column_count = src.get_u16_le();
558
559        // 0xFFFF means no metadata present
560        if column_count == Self::NO_METADATA {
561            return Ok(Self {
562                columns: Vec::new(),
563            });
564        }
565
566        let mut columns = Vec::with_capacity(column_count as usize);
567
568        for _ in 0..column_count {
569            let column = Self::decode_column(src)?;
570            columns.push(column);
571        }
572
573        Ok(Self { columns })
574    }
575
576    /// Decode a single column from the metadata.
577    fn decode_column(src: &mut impl Buf) -> Result<ColumnData, ProtocolError> {
578        // UserType (4 bytes) + Flags (2 bytes) + TypeId (1 byte)
579        if src.remaining() < 7 {
580            return Err(ProtocolError::UnexpectedEof);
581        }
582
583        let user_type = src.get_u32_le();
584        let flags = src.get_u16_le();
585        let col_type = src.get_u8();
586
587        let type_id = TypeId::from_u8(col_type).unwrap_or(TypeId::Null); // Default to Null for unknown types
588
589        // Parse type-specific metadata
590        let type_info = Self::decode_type_info(src, type_id, col_type)?;
591
592        // Read column name (B_VARCHAR format - 1 byte length in characters)
593        let name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
594
595        Ok(ColumnData {
596            name,
597            type_id,
598            col_type,
599            flags,
600            user_type,
601            type_info,
602        })
603    }
604
605    /// Decode type-specific metadata based on the type ID.
606    fn decode_type_info(
607        src: &mut impl Buf,
608        type_id: TypeId,
609        col_type: u8,
610    ) -> Result<TypeInfo, ProtocolError> {
611        match type_id {
612            // Fixed-length types have no additional metadata
613            TypeId::Null => Ok(TypeInfo::default()),
614            TypeId::Int1 | TypeId::Bit => Ok(TypeInfo::default()),
615            TypeId::Int2 => Ok(TypeInfo::default()),
616            TypeId::Int4 => Ok(TypeInfo::default()),
617            TypeId::Int8 => Ok(TypeInfo::default()),
618            TypeId::Float4 => Ok(TypeInfo::default()),
619            TypeId::Float8 => Ok(TypeInfo::default()),
620            TypeId::Money => Ok(TypeInfo::default()),
621            TypeId::Money4 => Ok(TypeInfo::default()),
622            TypeId::DateTime => Ok(TypeInfo::default()),
623            TypeId::DateTime4 => Ok(TypeInfo::default()),
624
625            // Variable length integer/float/money (1-byte max length)
626            TypeId::IntN | TypeId::BitN | TypeId::FloatN | TypeId::MoneyN | TypeId::DateTimeN => {
627                if src.remaining() < 1 {
628                    return Err(ProtocolError::UnexpectedEof);
629                }
630                let max_length = src.get_u8() as u32;
631                Ok(TypeInfo {
632                    max_length: Some(max_length),
633                    ..Default::default()
634                })
635            }
636
637            // GUID has 1-byte length
638            TypeId::Guid => {
639                if src.remaining() < 1 {
640                    return Err(ProtocolError::UnexpectedEof);
641                }
642                let max_length = src.get_u8() as u32;
643                Ok(TypeInfo {
644                    max_length: Some(max_length),
645                    ..Default::default()
646                })
647            }
648
649            // Decimal/Numeric types (1-byte length + precision + scale)
650            TypeId::Decimal | TypeId::Numeric | TypeId::DecimalN | TypeId::NumericN => {
651                if src.remaining() < 3 {
652                    return Err(ProtocolError::UnexpectedEof);
653                }
654                let max_length = src.get_u8() as u32;
655                let precision = src.get_u8();
656                let scale = src.get_u8();
657                Ok(TypeInfo {
658                    max_length: Some(max_length),
659                    precision: Some(precision),
660                    scale: Some(scale),
661                    ..Default::default()
662                })
663            }
664
665            // Old-style byte-length strings (Char, VarChar, Binary, VarBinary)
666            TypeId::Char | TypeId::VarChar | TypeId::Binary | TypeId::VarBinary => {
667                if src.remaining() < 1 {
668                    return Err(ProtocolError::UnexpectedEof);
669                }
670                let max_length = src.get_u8() as u32;
671                Ok(TypeInfo {
672                    max_length: Some(max_length),
673                    ..Default::default()
674                })
675            }
676
677            // Big varchar/binary with 2-byte length + collation for strings
678            TypeId::BigVarChar | TypeId::BigChar => {
679                if src.remaining() < 7 {
680                    // 2 (length) + 5 (collation)
681                    return Err(ProtocolError::UnexpectedEof);
682                }
683                let max_length = src.get_u16_le() as u32;
684                let collation = Self::decode_collation(src)?;
685                Ok(TypeInfo {
686                    max_length: Some(max_length),
687                    collation: Some(collation),
688                    ..Default::default()
689                })
690            }
691
692            // Big binary (2-byte length, no collation)
693            TypeId::BigVarBinary | TypeId::BigBinary => {
694                if src.remaining() < 2 {
695                    return Err(ProtocolError::UnexpectedEof);
696                }
697                let max_length = src.get_u16_le() as u32;
698                Ok(TypeInfo {
699                    max_length: Some(max_length),
700                    ..Default::default()
701                })
702            }
703
704            // Unicode strings (NChar, NVarChar) - 2-byte length + collation
705            TypeId::NChar | TypeId::NVarChar => {
706                if src.remaining() < 7 {
707                    // 2 (length) + 5 (collation)
708                    return Err(ProtocolError::UnexpectedEof);
709                }
710                let max_length = src.get_u16_le() as u32;
711                let collation = Self::decode_collation(src)?;
712                Ok(TypeInfo {
713                    max_length: Some(max_length),
714                    collation: Some(collation),
715                    ..Default::default()
716                })
717            }
718
719            // Date type (no additional metadata)
720            TypeId::Date => Ok(TypeInfo::default()),
721
722            // Time, DateTime2, DateTimeOffset have scale
723            TypeId::Time | TypeId::DateTime2 | TypeId::DateTimeOffset => {
724                if src.remaining() < 1 {
725                    return Err(ProtocolError::UnexpectedEof);
726                }
727                let scale = src.get_u8();
728                Ok(TypeInfo {
729                    scale: Some(scale),
730                    ..Default::default()
731                })
732            }
733
734            // Text/NText/Image (deprecated LOB types)
735            TypeId::Text | TypeId::NText | TypeId::Image => {
736                // These have complex metadata: length (4) + collation (5) + table name parts
737                if src.remaining() < 4 {
738                    return Err(ProtocolError::UnexpectedEof);
739                }
740                let max_length = src.get_u32_le();
741
742                // For Text/NText, read collation
743                let collation = if type_id == TypeId::Text || type_id == TypeId::NText {
744                    if src.remaining() < 5 {
745                        return Err(ProtocolError::UnexpectedEof);
746                    }
747                    Some(Self::decode_collation(src)?)
748                } else {
749                    None
750                };
751
752                // Skip table name parts (variable length)
753                // Format: numParts (1 byte) followed by us_varchar for each part
754                if src.remaining() < 1 {
755                    return Err(ProtocolError::UnexpectedEof);
756                }
757                let num_parts = src.get_u8();
758                for _ in 0..num_parts {
759                    // Read and discard table name part
760                    let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
761                }
762
763                Ok(TypeInfo {
764                    max_length: Some(max_length),
765                    collation,
766                    ..Default::default()
767                })
768            }
769
770            // XML type
771            TypeId::Xml => {
772                if src.remaining() < 1 {
773                    return Err(ProtocolError::UnexpectedEof);
774                }
775                let schema_present = src.get_u8();
776
777                if schema_present != 0 {
778                    // Read schema info (3 us_varchar strings)
779                    let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // db name
780                    let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // owning schema
781                    let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // xml schema collection
782                }
783
784                Ok(TypeInfo::default())
785            }
786
787            // UDT (User-defined type) - complex metadata
788            TypeId::Udt => {
789                // Max length (2 bytes)
790                if src.remaining() < 2 {
791                    return Err(ProtocolError::UnexpectedEof);
792                }
793                let max_length = src.get_u16_le() as u32;
794
795                // UDT metadata: db name, schema name, type name, assembly qualified name
796                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // db name
797                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // schema name
798                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // type name
799                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // assembly qualified name
800
801                Ok(TypeInfo {
802                    max_length: Some(max_length),
803                    ..Default::default()
804                })
805            }
806
807            // Table-valued parameter - complex metadata (skip for now)
808            TypeId::Tvp => {
809                // TVP has very complex metadata, not commonly used
810                // For now, we can't properly parse this
811                Err(ProtocolError::InvalidTokenType(col_type))
812            }
813
814            // SQL Variant - 4-byte length
815            TypeId::Variant => {
816                if src.remaining() < 4 {
817                    return Err(ProtocolError::UnexpectedEof);
818                }
819                let max_length = src.get_u32_le();
820                Ok(TypeInfo {
821                    max_length: Some(max_length),
822                    ..Default::default()
823                })
824            }
825        }
826    }
827
828    /// Decode collation information (5 bytes).
829    fn decode_collation(src: &mut impl Buf) -> Result<Collation, ProtocolError> {
830        if src.remaining() < 5 {
831            return Err(ProtocolError::UnexpectedEof);
832        }
833        // Collation: LCID (4 bytes) + Sort ID (1 byte)
834        let lcid = src.get_u32_le();
835        let sort_id = src.get_u8();
836        Ok(Collation { lcid, sort_id })
837    }
838
839    /// Get the number of columns.
840    #[must_use]
841    pub fn column_count(&self) -> usize {
842        self.columns.len()
843    }
844
845    /// Check if this represents no metadata.
846    #[must_use]
847    pub fn is_empty(&self) -> bool {
848        self.columns.is_empty()
849    }
850}
851
852impl ColumnData {
853    /// Check if this column is nullable.
854    #[must_use]
855    pub fn is_nullable(&self) -> bool {
856        (self.flags & 0x0001) != 0
857    }
858
859    /// Get the fixed size in bytes for this column, if applicable.
860    ///
861    /// Returns `None` for variable-length types.
862    #[must_use]
863    pub fn fixed_size(&self) -> Option<usize> {
864        match self.type_id {
865            TypeId::Null => Some(0),
866            TypeId::Int1 | TypeId::Bit => Some(1),
867            TypeId::Int2 => Some(2),
868            TypeId::Int4 => Some(4),
869            TypeId::Int8 => Some(8),
870            TypeId::Float4 => Some(4),
871            TypeId::Float8 => Some(8),
872            TypeId::Money => Some(8),
873            TypeId::Money4 => Some(4),
874            TypeId::DateTime => Some(8),
875            TypeId::DateTime4 => Some(4),
876            TypeId::Date => Some(3),
877            _ => None,
878        }
879    }
880}
881
882// =============================================================================
883// Row Parsing Implementation
884// =============================================================================
885
886impl RawRow {
887    /// Decode a ROW token from bytes.
888    ///
889    /// This function requires the column metadata to know how to parse the row.
890    /// The row data is stored as raw bytes for later parsing.
891    pub fn decode(src: &mut impl Buf, metadata: &ColMetaData) -> Result<Self, ProtocolError> {
892        let mut data = bytes::BytesMut::new();
893
894        for col in &metadata.columns {
895            Self::decode_column_value(src, col, &mut data)?;
896        }
897
898        Ok(Self {
899            data: data.freeze(),
900        })
901    }
902
903    /// Decode a single column value and append to the output buffer.
904    fn decode_column_value(
905        src: &mut impl Buf,
906        col: &ColumnData,
907        dst: &mut bytes::BytesMut,
908    ) -> Result<(), ProtocolError> {
909        match col.type_id {
910            // Fixed-length types
911            TypeId::Null => {
912                // No data
913            }
914            TypeId::Int1 | TypeId::Bit => {
915                if src.remaining() < 1 {
916                    return Err(ProtocolError::UnexpectedEof);
917                }
918                dst.extend_from_slice(&[src.get_u8()]);
919            }
920            TypeId::Int2 => {
921                if src.remaining() < 2 {
922                    return Err(ProtocolError::UnexpectedEof);
923                }
924                dst.extend_from_slice(&src.get_u16_le().to_le_bytes());
925            }
926            TypeId::Int4 => {
927                if src.remaining() < 4 {
928                    return Err(ProtocolError::UnexpectedEof);
929                }
930                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
931            }
932            TypeId::Int8 => {
933                if src.remaining() < 8 {
934                    return Err(ProtocolError::UnexpectedEof);
935                }
936                dst.extend_from_slice(&src.get_u64_le().to_le_bytes());
937            }
938            TypeId::Float4 => {
939                if src.remaining() < 4 {
940                    return Err(ProtocolError::UnexpectedEof);
941                }
942                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
943            }
944            TypeId::Float8 => {
945                if src.remaining() < 8 {
946                    return Err(ProtocolError::UnexpectedEof);
947                }
948                dst.extend_from_slice(&src.get_u64_le().to_le_bytes());
949            }
950            TypeId::Money => {
951                if src.remaining() < 8 {
952                    return Err(ProtocolError::UnexpectedEof);
953                }
954                let hi = src.get_u32_le();
955                let lo = src.get_u32_le();
956                dst.extend_from_slice(&hi.to_le_bytes());
957                dst.extend_from_slice(&lo.to_le_bytes());
958            }
959            TypeId::Money4 => {
960                if src.remaining() < 4 {
961                    return Err(ProtocolError::UnexpectedEof);
962                }
963                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
964            }
965            TypeId::DateTime => {
966                if src.remaining() < 8 {
967                    return Err(ProtocolError::UnexpectedEof);
968                }
969                let days = src.get_u32_le();
970                let time = src.get_u32_le();
971                dst.extend_from_slice(&days.to_le_bytes());
972                dst.extend_from_slice(&time.to_le_bytes());
973            }
974            TypeId::DateTime4 => {
975                if src.remaining() < 4 {
976                    return Err(ProtocolError::UnexpectedEof);
977                }
978                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
979            }
980            // DATE type uses 1-byte length prefix (can be NULL)
981            TypeId::Date => {
982                Self::decode_bytelen_type(src, dst)?;
983            }
984
985            // Variable-length nullable types (length-prefixed)
986            TypeId::IntN | TypeId::BitN | TypeId::FloatN | TypeId::MoneyN | TypeId::DateTimeN => {
987                Self::decode_bytelen_type(src, dst)?;
988            }
989
990            TypeId::Guid => {
991                Self::decode_bytelen_type(src, dst)?;
992            }
993
994            TypeId::Decimal | TypeId::Numeric | TypeId::DecimalN | TypeId::NumericN => {
995                Self::decode_bytelen_type(src, dst)?;
996            }
997
998            // Old-style byte-length strings
999            TypeId::Char | TypeId::VarChar | TypeId::Binary | TypeId::VarBinary => {
1000                Self::decode_bytelen_type(src, dst)?;
1001            }
1002
1003            // 2-byte length strings (or PLP for MAX types)
1004            TypeId::BigVarChar | TypeId::BigVarBinary => {
1005                // max_length == 0xFFFF indicates VARCHAR(MAX) or VARBINARY(MAX), which uses PLP
1006                if col.type_info.max_length == Some(0xFFFF) {
1007                    Self::decode_plp_type(src, dst)?;
1008                } else {
1009                    Self::decode_ushortlen_type(src, dst)?;
1010                }
1011            }
1012
1013            // Fixed-length types that don't have MAX variants
1014            TypeId::BigChar | TypeId::BigBinary => {
1015                Self::decode_ushortlen_type(src, dst)?;
1016            }
1017
1018            // Unicode strings (2-byte length in bytes, or PLP for NVARCHAR(MAX))
1019            TypeId::NVarChar => {
1020                // max_length == 0xFFFF indicates NVARCHAR(MAX), which uses PLP
1021                if col.type_info.max_length == Some(0xFFFF) {
1022                    Self::decode_plp_type(src, dst)?;
1023                } else {
1024                    Self::decode_ushortlen_type(src, dst)?;
1025                }
1026            }
1027
1028            // Fixed-length NCHAR doesn't have MAX variant
1029            TypeId::NChar => {
1030                Self::decode_ushortlen_type(src, dst)?;
1031            }
1032
1033            // Time types with scale
1034            TypeId::Time | TypeId::DateTime2 | TypeId::DateTimeOffset => {
1035                Self::decode_bytelen_type(src, dst)?;
1036            }
1037
1038            // TEXT/NTEXT/IMAGE - deprecated LOB types using textptr format
1039            TypeId::Text | TypeId::NText | TypeId::Image => {
1040                Self::decode_textptr_type(src, dst)?;
1041            }
1042
1043            // XML - uses actual PLP format
1044            TypeId::Xml => {
1045                Self::decode_plp_type(src, dst)?;
1046            }
1047
1048            // Complex types
1049            TypeId::Variant => {
1050                Self::decode_intlen_type(src, dst)?;
1051            }
1052
1053            TypeId::Udt => {
1054                // UDT uses PLP encoding
1055                Self::decode_plp_type(src, dst)?;
1056            }
1057
1058            TypeId::Tvp => {
1059                // TVP not supported in row data
1060                return Err(ProtocolError::InvalidTokenType(col.col_type));
1061            }
1062        }
1063
1064        Ok(())
1065    }
1066
1067    /// Decode a 1-byte length-prefixed value.
1068    fn decode_bytelen_type(
1069        src: &mut impl Buf,
1070        dst: &mut bytes::BytesMut,
1071    ) -> Result<(), ProtocolError> {
1072        if src.remaining() < 1 {
1073            return Err(ProtocolError::UnexpectedEof);
1074        }
1075        let len = src.get_u8() as usize;
1076        if len == 0xFF {
1077            // NULL value - store as zero-length with NULL marker
1078            dst.extend_from_slice(&[0xFF]);
1079        } else if len == 0 {
1080            // Empty value
1081            dst.extend_from_slice(&[0x00]);
1082        } else {
1083            if src.remaining() < len {
1084                return Err(ProtocolError::UnexpectedEof);
1085            }
1086            dst.extend_from_slice(&[len as u8]);
1087            for _ in 0..len {
1088                dst.extend_from_slice(&[src.get_u8()]);
1089            }
1090        }
1091        Ok(())
1092    }
1093
1094    /// Decode a 2-byte length-prefixed value.
1095    fn decode_ushortlen_type(
1096        src: &mut impl Buf,
1097        dst: &mut bytes::BytesMut,
1098    ) -> Result<(), ProtocolError> {
1099        if src.remaining() < 2 {
1100            return Err(ProtocolError::UnexpectedEof);
1101        }
1102        let len = src.get_u16_le() as usize;
1103        if len == 0xFFFF {
1104            // NULL value
1105            dst.extend_from_slice(&0xFFFFu16.to_le_bytes());
1106        } else if len == 0 {
1107            // Empty value
1108            dst.extend_from_slice(&0u16.to_le_bytes());
1109        } else {
1110            if src.remaining() < len {
1111                return Err(ProtocolError::UnexpectedEof);
1112            }
1113            dst.extend_from_slice(&(len as u16).to_le_bytes());
1114            for _ in 0..len {
1115                dst.extend_from_slice(&[src.get_u8()]);
1116            }
1117        }
1118        Ok(())
1119    }
1120
1121    /// Decode a 4-byte length-prefixed value.
1122    fn decode_intlen_type(
1123        src: &mut impl Buf,
1124        dst: &mut bytes::BytesMut,
1125    ) -> Result<(), ProtocolError> {
1126        if src.remaining() < 4 {
1127            return Err(ProtocolError::UnexpectedEof);
1128        }
1129        let len = src.get_u32_le() as usize;
1130        if len == 0xFFFFFFFF {
1131            // NULL value
1132            dst.extend_from_slice(&0xFFFFFFFFu32.to_le_bytes());
1133        } else if len == 0 {
1134            // Empty value
1135            dst.extend_from_slice(&0u32.to_le_bytes());
1136        } else {
1137            if src.remaining() < len {
1138                return Err(ProtocolError::UnexpectedEof);
1139            }
1140            dst.extend_from_slice(&(len as u32).to_le_bytes());
1141            for _ in 0..len {
1142                dst.extend_from_slice(&[src.get_u8()]);
1143            }
1144        }
1145        Ok(())
1146    }
1147
1148    /// Decode a TEXT/NTEXT/IMAGE type (textptr format).
1149    ///
1150    /// These deprecated LOB types use a special format:
1151    /// - 1 byte: textptr_len (0 = NULL)
1152    /// - textptr_len bytes: textptr (if not NULL)
1153    /// - 8 bytes: timestamp (if not NULL)
1154    /// - 4 bytes: data length (if not NULL)
1155    /// - data_len bytes: the actual data (if not NULL)
1156    ///
1157    /// We convert this to PLP format for the client to parse:
1158    /// - 8 bytes: total length (0xFFFFFFFFFFFFFFFF = NULL)
1159    /// - 4 bytes: chunk length (= data length)
1160    /// - chunk data
1161    /// - 4 bytes: 0 (terminator)
1162    fn decode_textptr_type(
1163        src: &mut impl Buf,
1164        dst: &mut bytes::BytesMut,
1165    ) -> Result<(), ProtocolError> {
1166        if src.remaining() < 1 {
1167            return Err(ProtocolError::UnexpectedEof);
1168        }
1169
1170        let textptr_len = src.get_u8() as usize;
1171
1172        if textptr_len == 0 {
1173            // NULL value - write PLP NULL marker
1174            dst.extend_from_slice(&0xFFFFFFFFFFFFFFFFu64.to_le_bytes());
1175            return Ok(());
1176        }
1177
1178        // Skip textptr bytes
1179        if src.remaining() < textptr_len {
1180            return Err(ProtocolError::UnexpectedEof);
1181        }
1182        src.advance(textptr_len);
1183
1184        // Skip 8-byte timestamp
1185        if src.remaining() < 8 {
1186            return Err(ProtocolError::UnexpectedEof);
1187        }
1188        src.advance(8);
1189
1190        // Read data length
1191        if src.remaining() < 4 {
1192            return Err(ProtocolError::UnexpectedEof);
1193        }
1194        let data_len = src.get_u32_le() as usize;
1195
1196        if src.remaining() < data_len {
1197            return Err(ProtocolError::UnexpectedEof);
1198        }
1199
1200        // Write in PLP format for client parsing:
1201        // - 8 bytes: total length
1202        // - 4 bytes: chunk length
1203        // - chunk data
1204        // - 4 bytes: 0 (terminator)
1205        dst.extend_from_slice(&(data_len as u64).to_le_bytes());
1206        dst.extend_from_slice(&(data_len as u32).to_le_bytes());
1207        for _ in 0..data_len {
1208            dst.extend_from_slice(&[src.get_u8()]);
1209        }
1210        dst.extend_from_slice(&0u32.to_le_bytes()); // PLP terminator
1211
1212        Ok(())
1213    }
1214
1215    /// Decode a PLP (Partially Length-Prefixed) value.
1216    ///
1217    /// PLP format:
1218    /// - 8 bytes: total length (0xFFFFFFFFFFFFFFFE = unknown, 0xFFFFFFFFFFFFFFFF = NULL)
1219    /// - If not NULL: chunks of (4 byte chunk length + data) until chunk length = 0
1220    fn decode_plp_type(src: &mut impl Buf, dst: &mut bytes::BytesMut) -> Result<(), ProtocolError> {
1221        if src.remaining() < 8 {
1222            return Err(ProtocolError::UnexpectedEof);
1223        }
1224
1225        let total_len = src.get_u64_le();
1226
1227        // Store the total length marker
1228        dst.extend_from_slice(&total_len.to_le_bytes());
1229
1230        if total_len == 0xFFFFFFFFFFFFFFFF {
1231            // NULL value - no more data
1232            return Ok(());
1233        }
1234
1235        // Read chunks until terminator
1236        loop {
1237            if src.remaining() < 4 {
1238                return Err(ProtocolError::UnexpectedEof);
1239            }
1240            let chunk_len = src.get_u32_le() as usize;
1241            dst.extend_from_slice(&(chunk_len as u32).to_le_bytes());
1242
1243            if chunk_len == 0 {
1244                // End of PLP data
1245                break;
1246            }
1247
1248            if src.remaining() < chunk_len {
1249                return Err(ProtocolError::UnexpectedEof);
1250            }
1251
1252            for _ in 0..chunk_len {
1253                dst.extend_from_slice(&[src.get_u8()]);
1254            }
1255        }
1256
1257        Ok(())
1258    }
1259}
1260
1261// =============================================================================
1262// NbcRow Parsing Implementation
1263// =============================================================================
1264
1265impl NbcRow {
1266    /// Decode an NBCROW token from bytes.
1267    ///
1268    /// NBCROW (Null Bitmap Compressed Row) stores a bitmap indicating which
1269    /// columns are NULL, followed by only the non-NULL values.
1270    pub fn decode(src: &mut impl Buf, metadata: &ColMetaData) -> Result<Self, ProtocolError> {
1271        let col_count = metadata.columns.len();
1272        let bitmap_len = (col_count + 7) / 8;
1273
1274        if src.remaining() < bitmap_len {
1275            return Err(ProtocolError::UnexpectedEof);
1276        }
1277
1278        // Read null bitmap
1279        let mut null_bitmap = vec![0u8; bitmap_len];
1280        for byte in &mut null_bitmap {
1281            *byte = src.get_u8();
1282        }
1283
1284        // Read non-null values
1285        let mut data = bytes::BytesMut::new();
1286
1287        for (i, col) in metadata.columns.iter().enumerate() {
1288            let byte_idx = i / 8;
1289            let bit_idx = i % 8;
1290            let is_null = (null_bitmap[byte_idx] & (1 << bit_idx)) != 0;
1291
1292            if !is_null {
1293                // Read the value - for NBCROW, we read without the length prefix
1294                // for fixed-length types, and with length prefix for variable types
1295                RawRow::decode_column_value(src, col, &mut data)?;
1296            }
1297        }
1298
1299        Ok(Self {
1300            null_bitmap,
1301            data: data.freeze(),
1302        })
1303    }
1304
1305    /// Check if a column at the given index is NULL.
1306    #[must_use]
1307    pub fn is_null(&self, column_index: usize) -> bool {
1308        let byte_idx = column_index / 8;
1309        let bit_idx = column_index % 8;
1310        if byte_idx < self.null_bitmap.len() {
1311            (self.null_bitmap[byte_idx] & (1 << bit_idx)) != 0
1312        } else {
1313            true // Out of bounds = NULL
1314        }
1315    }
1316}
1317
1318// =============================================================================
1319// ReturnValue Parsing Implementation
1320// =============================================================================
1321
1322impl ReturnValue {
1323    /// Decode a RETURNVALUE token from bytes.
1324    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1325        // Length (2 bytes)
1326        if src.remaining() < 2 {
1327            return Err(ProtocolError::UnexpectedEof);
1328        }
1329        let _length = src.get_u16_le();
1330
1331        // Parameter ordinal (2 bytes)
1332        if src.remaining() < 2 {
1333            return Err(ProtocolError::UnexpectedEof);
1334        }
1335        let param_ordinal = src.get_u16_le();
1336
1337        // Parameter name (B_VARCHAR)
1338        let param_name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1339
1340        // Status (1 byte)
1341        if src.remaining() < 1 {
1342            return Err(ProtocolError::UnexpectedEof);
1343        }
1344        let status = src.get_u8();
1345
1346        // User type (4 bytes) + flags (2 bytes) + type id (1 byte)
1347        if src.remaining() < 7 {
1348            return Err(ProtocolError::UnexpectedEof);
1349        }
1350        let user_type = src.get_u32_le();
1351        let flags = src.get_u16_le();
1352        let col_type = src.get_u8();
1353
1354        let type_id = TypeId::from_u8(col_type).unwrap_or(TypeId::Null);
1355
1356        // Parse type info
1357        let type_info = ColMetaData::decode_type_info(src, type_id, col_type)?;
1358
1359        // Read the value data
1360        let mut value_buf = bytes::BytesMut::new();
1361
1362        // Create a temporary column for value parsing
1363        let temp_col = ColumnData {
1364            name: String::new(),
1365            type_id,
1366            col_type,
1367            flags,
1368            user_type,
1369            type_info: type_info.clone(),
1370        };
1371
1372        RawRow::decode_column_value(src, &temp_col, &mut value_buf)?;
1373
1374        Ok(Self {
1375            param_ordinal,
1376            param_name,
1377            status,
1378            user_type,
1379            flags,
1380            type_info,
1381            value: value_buf.freeze(),
1382        })
1383    }
1384}
1385
1386// =============================================================================
1387// SessionState Parsing Implementation
1388// =============================================================================
1389
1390impl SessionState {
1391    /// Decode a SESSIONSTATE token from bytes.
1392    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1393        if src.remaining() < 4 {
1394            return Err(ProtocolError::UnexpectedEof);
1395        }
1396
1397        let length = src.get_u32_le() as usize;
1398
1399        if src.remaining() < length {
1400            return Err(ProtocolError::IncompletePacket {
1401                expected: length,
1402                actual: src.remaining(),
1403            });
1404        }
1405
1406        let data = src.copy_to_bytes(length);
1407
1408        Ok(Self { data })
1409    }
1410}
1411
1412// =============================================================================
1413// Token Parsing Implementation
1414// =============================================================================
1415
1416/// Done token status flags bit positions.
1417mod done_status_bits {
1418    pub const DONE_MORE: u16 = 0x0001;
1419    pub const DONE_ERROR: u16 = 0x0002;
1420    pub const DONE_INXACT: u16 = 0x0004;
1421    pub const DONE_COUNT: u16 = 0x0010;
1422    pub const DONE_ATTN: u16 = 0x0020;
1423    pub const DONE_SRVERROR: u16 = 0x0100;
1424}
1425
1426impl DoneStatus {
1427    /// Parse done status from raw bits.
1428    #[must_use]
1429    pub fn from_bits(bits: u16) -> Self {
1430        use done_status_bits::*;
1431        Self {
1432            more: (bits & DONE_MORE) != 0,
1433            error: (bits & DONE_ERROR) != 0,
1434            in_xact: (bits & DONE_INXACT) != 0,
1435            count: (bits & DONE_COUNT) != 0,
1436            attn: (bits & DONE_ATTN) != 0,
1437            srverror: (bits & DONE_SRVERROR) != 0,
1438        }
1439    }
1440
1441    /// Convert to raw bits.
1442    #[must_use]
1443    pub fn to_bits(&self) -> u16 {
1444        use done_status_bits::*;
1445        let mut bits = 0u16;
1446        if self.more {
1447            bits |= DONE_MORE;
1448        }
1449        if self.error {
1450            bits |= DONE_ERROR;
1451        }
1452        if self.in_xact {
1453            bits |= DONE_INXACT;
1454        }
1455        if self.count {
1456            bits |= DONE_COUNT;
1457        }
1458        if self.attn {
1459            bits |= DONE_ATTN;
1460        }
1461        if self.srverror {
1462            bits |= DONE_SRVERROR;
1463        }
1464        bits
1465    }
1466}
1467
1468impl Done {
1469    /// Size of the DONE token in bytes (excluding token type byte).
1470    pub const SIZE: usize = 12; // 2 (status) + 2 (curcmd) + 8 (rowcount)
1471
1472    /// Decode a DONE token from bytes.
1473    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1474        if src.remaining() < Self::SIZE {
1475            return Err(ProtocolError::IncompletePacket {
1476                expected: Self::SIZE,
1477                actual: src.remaining(),
1478            });
1479        }
1480
1481        let status = DoneStatus::from_bits(src.get_u16_le());
1482        let cur_cmd = src.get_u16_le();
1483        let row_count = src.get_u64_le();
1484
1485        Ok(Self {
1486            status,
1487            cur_cmd,
1488            row_count,
1489        })
1490    }
1491
1492    /// Encode the DONE token to bytes.
1493    pub fn encode(&self, dst: &mut impl BufMut) {
1494        dst.put_u8(TokenType::Done as u8);
1495        dst.put_u16_le(self.status.to_bits());
1496        dst.put_u16_le(self.cur_cmd);
1497        dst.put_u64_le(self.row_count);
1498    }
1499
1500    /// Check if more results follow this DONE token.
1501    #[must_use]
1502    pub const fn has_more(&self) -> bool {
1503        self.status.more
1504    }
1505
1506    /// Check if an error occurred.
1507    #[must_use]
1508    pub const fn has_error(&self) -> bool {
1509        self.status.error
1510    }
1511
1512    /// Check if the row count is valid.
1513    #[must_use]
1514    pub const fn has_count(&self) -> bool {
1515        self.status.count
1516    }
1517}
1518
1519impl DoneProc {
1520    /// Size of the DONEPROC token in bytes (excluding token type byte).
1521    pub const SIZE: usize = 12;
1522
1523    /// Decode a DONEPROC token from bytes.
1524    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1525        if src.remaining() < Self::SIZE {
1526            return Err(ProtocolError::IncompletePacket {
1527                expected: Self::SIZE,
1528                actual: src.remaining(),
1529            });
1530        }
1531
1532        let status = DoneStatus::from_bits(src.get_u16_le());
1533        let cur_cmd = src.get_u16_le();
1534        let row_count = src.get_u64_le();
1535
1536        Ok(Self {
1537            status,
1538            cur_cmd,
1539            row_count,
1540        })
1541    }
1542
1543    /// Encode the DONEPROC token to bytes.
1544    pub fn encode(&self, dst: &mut impl BufMut) {
1545        dst.put_u8(TokenType::DoneProc as u8);
1546        dst.put_u16_le(self.status.to_bits());
1547        dst.put_u16_le(self.cur_cmd);
1548        dst.put_u64_le(self.row_count);
1549    }
1550}
1551
1552impl DoneInProc {
1553    /// Size of the DONEINPROC token in bytes (excluding token type byte).
1554    pub const SIZE: usize = 12;
1555
1556    /// Decode a DONEINPROC token from bytes.
1557    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1558        if src.remaining() < Self::SIZE {
1559            return Err(ProtocolError::IncompletePacket {
1560                expected: Self::SIZE,
1561                actual: src.remaining(),
1562            });
1563        }
1564
1565        let status = DoneStatus::from_bits(src.get_u16_le());
1566        let cur_cmd = src.get_u16_le();
1567        let row_count = src.get_u64_le();
1568
1569        Ok(Self {
1570            status,
1571            cur_cmd,
1572            row_count,
1573        })
1574    }
1575
1576    /// Encode the DONEINPROC token to bytes.
1577    pub fn encode(&self, dst: &mut impl BufMut) {
1578        dst.put_u8(TokenType::DoneInProc as u8);
1579        dst.put_u16_le(self.status.to_bits());
1580        dst.put_u16_le(self.cur_cmd);
1581        dst.put_u64_le(self.row_count);
1582    }
1583}
1584
1585impl ServerError {
1586    /// Decode an ERROR token from bytes.
1587    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1588        // ERROR token: length (2) + number (4) + state (1) + class (1) +
1589        //              message (us_varchar) + server (b_varchar) + procedure (b_varchar) + line (4)
1590        if src.remaining() < 2 {
1591            return Err(ProtocolError::UnexpectedEof);
1592        }
1593
1594        let _length = src.get_u16_le();
1595
1596        if src.remaining() < 6 {
1597            return Err(ProtocolError::UnexpectedEof);
1598        }
1599
1600        let number = src.get_i32_le();
1601        let state = src.get_u8();
1602        let class = src.get_u8();
1603
1604        let message = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1605        let server = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1606        let procedure = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1607
1608        if src.remaining() < 4 {
1609            return Err(ProtocolError::UnexpectedEof);
1610        }
1611        let line = src.get_i32_le();
1612
1613        Ok(Self {
1614            number,
1615            state,
1616            class,
1617            message,
1618            server,
1619            procedure,
1620            line,
1621        })
1622    }
1623
1624    /// Check if this is a fatal error (severity >= 20).
1625    #[must_use]
1626    pub const fn is_fatal(&self) -> bool {
1627        self.class >= 20
1628    }
1629
1630    /// Check if this error indicates the batch was aborted (severity >= 16).
1631    #[must_use]
1632    pub const fn is_batch_abort(&self) -> bool {
1633        self.class >= 16
1634    }
1635}
1636
1637impl ServerInfo {
1638    /// Decode an INFO token from bytes.
1639    ///
1640    /// INFO tokens have the same structure as ERROR tokens but with lower severity.
1641    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1642        if src.remaining() < 2 {
1643            return Err(ProtocolError::UnexpectedEof);
1644        }
1645
1646        let _length = src.get_u16_le();
1647
1648        if src.remaining() < 6 {
1649            return Err(ProtocolError::UnexpectedEof);
1650        }
1651
1652        let number = src.get_i32_le();
1653        let state = src.get_u8();
1654        let class = src.get_u8();
1655
1656        let message = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1657        let server = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1658        let procedure = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1659
1660        if src.remaining() < 4 {
1661            return Err(ProtocolError::UnexpectedEof);
1662        }
1663        let line = src.get_i32_le();
1664
1665        Ok(Self {
1666            number,
1667            state,
1668            class,
1669            message,
1670            server,
1671            procedure,
1672            line,
1673        })
1674    }
1675}
1676
1677impl LoginAck {
1678    /// Decode a LOGINACK token from bytes.
1679    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1680        // LOGINACK: length (2) + interface (1) + tds_version (4) + prog_name (b_varchar) + prog_version (4)
1681        if src.remaining() < 2 {
1682            return Err(ProtocolError::UnexpectedEof);
1683        }
1684
1685        let _length = src.get_u16_le();
1686
1687        if src.remaining() < 5 {
1688            return Err(ProtocolError::UnexpectedEof);
1689        }
1690
1691        let interface = src.get_u8();
1692        let tds_version = src.get_u32_le();
1693        let prog_name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1694
1695        if src.remaining() < 4 {
1696            return Err(ProtocolError::UnexpectedEof);
1697        }
1698        let prog_version = src.get_u32_le();
1699
1700        Ok(Self {
1701            interface,
1702            tds_version,
1703            prog_name,
1704            prog_version,
1705        })
1706    }
1707
1708    /// Get the TDS version as a `TdsVersion`.
1709    #[must_use]
1710    pub fn tds_version(&self) -> crate::version::TdsVersion {
1711        crate::version::TdsVersion::new(self.tds_version)
1712    }
1713}
1714
1715impl EnvChangeType {
1716    /// Create from raw byte value.
1717    pub fn from_u8(value: u8) -> Option<Self> {
1718        match value {
1719            1 => Some(Self::Database),
1720            2 => Some(Self::Language),
1721            3 => Some(Self::CharacterSet),
1722            4 => Some(Self::PacketSize),
1723            5 => Some(Self::UnicodeSortingLocalId),
1724            6 => Some(Self::UnicodeComparisonFlags),
1725            7 => Some(Self::SqlCollation),
1726            8 => Some(Self::BeginTransaction),
1727            9 => Some(Self::CommitTransaction),
1728            10 => Some(Self::RollbackTransaction),
1729            11 => Some(Self::EnlistDtcTransaction),
1730            12 => Some(Self::DefectTransaction),
1731            13 => Some(Self::RealTimeLogShipping),
1732            15 => Some(Self::PromoteTransaction),
1733            16 => Some(Self::TransactionManagerAddress),
1734            17 => Some(Self::TransactionEnded),
1735            18 => Some(Self::ResetConnectionCompletionAck),
1736            19 => Some(Self::UserInstanceStarted),
1737            20 => Some(Self::Routing),
1738            _ => None,
1739        }
1740    }
1741}
1742
1743impl EnvChange {
1744    /// Decode an ENVCHANGE token from bytes.
1745    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1746        if src.remaining() < 3 {
1747            return Err(ProtocolError::UnexpectedEof);
1748        }
1749
1750        let length = src.get_u16_le() as usize;
1751        if src.remaining() < length {
1752            return Err(ProtocolError::IncompletePacket {
1753                expected: length,
1754                actual: src.remaining(),
1755            });
1756        }
1757
1758        let env_type_byte = src.get_u8();
1759        let env_type = EnvChangeType::from_u8(env_type_byte)
1760            .ok_or(ProtocolError::InvalidTokenType(env_type_byte))?;
1761
1762        let (new_value, old_value) = match env_type {
1763            EnvChangeType::Routing => {
1764                // Routing has special format
1765                let new_value = Self::decode_routing_value(src)?;
1766                let old_value = EnvChangeValue::Binary(Bytes::new());
1767                (new_value, old_value)
1768            }
1769            EnvChangeType::BeginTransaction
1770            | EnvChangeType::CommitTransaction
1771            | EnvChangeType::RollbackTransaction
1772            | EnvChangeType::EnlistDtcTransaction
1773            | EnvChangeType::SqlCollation => {
1774                // These use binary format per MS-TDS spec:
1775                // - Transaction tokens: transaction descriptor (8 bytes)
1776                // - SqlCollation: collation info (5 bytes: LCID + sort flags)
1777                let new_len = src.get_u8() as usize;
1778                let new_value = if new_len > 0 && src.remaining() >= new_len {
1779                    EnvChangeValue::Binary(src.copy_to_bytes(new_len))
1780                } else {
1781                    EnvChangeValue::Binary(Bytes::new())
1782                };
1783
1784                let old_len = src.get_u8() as usize;
1785                let old_value = if old_len > 0 && src.remaining() >= old_len {
1786                    EnvChangeValue::Binary(src.copy_to_bytes(old_len))
1787                } else {
1788                    EnvChangeValue::Binary(Bytes::new())
1789                };
1790
1791                (new_value, old_value)
1792            }
1793            _ => {
1794                // String format for most env changes
1795                let new_value = read_b_varchar(src)
1796                    .map(EnvChangeValue::String)
1797                    .unwrap_or(EnvChangeValue::String(String::new()));
1798
1799                let old_value = read_b_varchar(src)
1800                    .map(EnvChangeValue::String)
1801                    .unwrap_or(EnvChangeValue::String(String::new()));
1802
1803                (new_value, old_value)
1804            }
1805        };
1806
1807        Ok(Self {
1808            env_type,
1809            new_value,
1810            old_value,
1811        })
1812    }
1813
1814    fn decode_routing_value(src: &mut impl Buf) -> Result<EnvChangeValue, ProtocolError> {
1815        // Routing format: length (2) + protocol (1) + port (2) + server_len (2) + server (utf16)
1816        if src.remaining() < 2 {
1817            return Err(ProtocolError::UnexpectedEof);
1818        }
1819
1820        let _routing_len = src.get_u16_le();
1821
1822        if src.remaining() < 5 {
1823            return Err(ProtocolError::UnexpectedEof);
1824        }
1825
1826        let _protocol = src.get_u8();
1827        let port = src.get_u16_le();
1828        let server_len = src.get_u16_le() as usize;
1829
1830        // Read UTF-16LE server name
1831        if src.remaining() < server_len * 2 {
1832            return Err(ProtocolError::UnexpectedEof);
1833        }
1834
1835        let mut chars = Vec::with_capacity(server_len);
1836        for _ in 0..server_len {
1837            chars.push(src.get_u16_le());
1838        }
1839
1840        let host = String::from_utf16(&chars).map_err(|_| {
1841            ProtocolError::StringEncoding(
1842                #[cfg(feature = "std")]
1843                "invalid UTF-16 in routing hostname".to_string(),
1844                #[cfg(not(feature = "std"))]
1845                "invalid UTF-16 in routing hostname",
1846            )
1847        })?;
1848
1849        Ok(EnvChangeValue::Routing { host, port })
1850    }
1851
1852    /// Check if this is a routing redirect.
1853    #[must_use]
1854    pub fn is_routing(&self) -> bool {
1855        self.env_type == EnvChangeType::Routing
1856    }
1857
1858    /// Get routing information if this is a routing change.
1859    #[must_use]
1860    pub fn routing_info(&self) -> Option<(&str, u16)> {
1861        if let EnvChangeValue::Routing { host, port } = &self.new_value {
1862            Some((host, *port))
1863        } else {
1864            None
1865        }
1866    }
1867
1868    /// Get the new database name if this is a database change.
1869    #[must_use]
1870    pub fn new_database(&self) -> Option<&str> {
1871        if self.env_type == EnvChangeType::Database {
1872            if let EnvChangeValue::String(s) = &self.new_value {
1873                return Some(s);
1874            }
1875        }
1876        None
1877    }
1878}
1879
1880impl Order {
1881    /// Decode an ORDER token from bytes.
1882    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1883        if src.remaining() < 2 {
1884            return Err(ProtocolError::UnexpectedEof);
1885        }
1886
1887        let length = src.get_u16_le() as usize;
1888        let column_count = length / 2;
1889
1890        if src.remaining() < length {
1891            return Err(ProtocolError::IncompletePacket {
1892                expected: length,
1893                actual: src.remaining(),
1894            });
1895        }
1896
1897        let mut columns = Vec::with_capacity(column_count);
1898        for _ in 0..column_count {
1899            columns.push(src.get_u16_le());
1900        }
1901
1902        Ok(Self { columns })
1903    }
1904}
1905
1906impl FeatureExtAck {
1907    /// Feature terminator byte.
1908    pub const TERMINATOR: u8 = 0xFF;
1909
1910    /// Decode a FEATUREEXTACK token from bytes.
1911    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1912        let mut features = Vec::new();
1913
1914        loop {
1915            if !src.has_remaining() {
1916                return Err(ProtocolError::UnexpectedEof);
1917            }
1918
1919            let feature_id = src.get_u8();
1920            if feature_id == Self::TERMINATOR {
1921                break;
1922            }
1923
1924            if src.remaining() < 4 {
1925                return Err(ProtocolError::UnexpectedEof);
1926            }
1927
1928            let data_len = src.get_u32_le() as usize;
1929
1930            if src.remaining() < data_len {
1931                return Err(ProtocolError::IncompletePacket {
1932                    expected: data_len,
1933                    actual: src.remaining(),
1934                });
1935            }
1936
1937            let data = src.copy_to_bytes(data_len);
1938            features.push(FeatureAck { feature_id, data });
1939        }
1940
1941        Ok(Self { features })
1942    }
1943}
1944
1945impl SspiToken {
1946    /// Decode an SSPI token from bytes.
1947    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1948        if src.remaining() < 2 {
1949            return Err(ProtocolError::UnexpectedEof);
1950        }
1951
1952        let length = src.get_u16_le() as usize;
1953
1954        if src.remaining() < length {
1955            return Err(ProtocolError::IncompletePacket {
1956                expected: length,
1957                actual: src.remaining(),
1958            });
1959        }
1960
1961        let data = src.copy_to_bytes(length);
1962        Ok(Self { data })
1963    }
1964}
1965
1966impl FedAuthInfo {
1967    /// Decode a FEDAUTHINFO token from bytes.
1968    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1969        if src.remaining() < 4 {
1970            return Err(ProtocolError::UnexpectedEof);
1971        }
1972
1973        let _length = src.get_u32_le();
1974
1975        if src.remaining() < 5 {
1976            return Err(ProtocolError::UnexpectedEof);
1977        }
1978
1979        let _count = src.get_u8();
1980
1981        // Read option data
1982        let mut sts_url = String::new();
1983        let mut spn = String::new();
1984
1985        // Parse info options until we have both
1986        while src.has_remaining() {
1987            if src.remaining() < 9 {
1988                break;
1989            }
1990
1991            let info_id = src.get_u8();
1992            let info_len = src.get_u32_le() as usize;
1993            let _info_offset = src.get_u32_le();
1994
1995            if src.remaining() < info_len {
1996                break;
1997            }
1998
1999            // Read UTF-16LE string
2000            let char_count = info_len / 2;
2001            let mut chars = Vec::with_capacity(char_count);
2002            for _ in 0..char_count {
2003                chars.push(src.get_u16_le());
2004            }
2005
2006            if let Ok(value) = String::from_utf16(&chars) {
2007                match info_id {
2008                    0x01 => spn = value,
2009                    0x02 => sts_url = value,
2010                    _ => {}
2011                }
2012            }
2013        }
2014
2015        Ok(Self { sts_url, spn })
2016    }
2017}
2018
2019// =============================================================================
2020// Token Parser
2021// =============================================================================
2022
2023/// Token stream parser.
2024///
2025/// Parses a stream of TDS tokens from a byte buffer.
2026///
2027/// # Basic vs Context-Aware Parsing
2028///
2029/// Some tokens (like `Done`, `Error`, `LoginAck`) can be parsed without context.
2030/// Use [`next_token()`](TokenParser::next_token) for these.
2031///
2032/// Other tokens (like `ColMetaData`, `Row`, `NbcRow`) require column metadata
2033/// to parse correctly. Use [`next_token_with_metadata()`](TokenParser::next_token_with_metadata)
2034/// for these.
2035///
2036/// # Example
2037///
2038/// ```rust,ignore
2039/// let mut parser = TokenParser::new(data);
2040/// let mut metadata = None;
2041///
2042/// while let Some(token) = parser.next_token_with_metadata(metadata.as_ref())? {
2043///     match token {
2044///         Token::ColMetaData(meta) => {
2045///             metadata = Some(meta);
2046///         }
2047///         Token::Row(row) => {
2048///             // Process row using metadata
2049///         }
2050///         Token::Done(done) => {
2051///             if !done.has_more() {
2052///                 break;
2053///             }
2054///         }
2055///         _ => {}
2056///     }
2057/// }
2058/// ```
2059pub struct TokenParser {
2060    data: Bytes,
2061    position: usize,
2062}
2063
2064impl TokenParser {
2065    /// Create a new token parser from bytes.
2066    #[must_use]
2067    pub fn new(data: Bytes) -> Self {
2068        Self { data, position: 0 }
2069    }
2070
2071    /// Get remaining bytes in the buffer.
2072    #[must_use]
2073    pub fn remaining(&self) -> usize {
2074        self.data.len().saturating_sub(self.position)
2075    }
2076
2077    /// Check if there are more bytes to parse.
2078    #[must_use]
2079    pub fn has_remaining(&self) -> bool {
2080        self.position < self.data.len()
2081    }
2082
2083    /// Peek at the next token type without consuming it.
2084    #[must_use]
2085    pub fn peek_token_type(&self) -> Option<TokenType> {
2086        if self.position < self.data.len() {
2087            TokenType::from_u8(self.data[self.position])
2088        } else {
2089            None
2090        }
2091    }
2092
2093    /// Parse the next token from the stream.
2094    ///
2095    /// This method can only parse context-independent tokens. For tokens that
2096    /// require column metadata (ColMetaData, Row, NbcRow), use
2097    /// [`next_token_with_metadata()`](TokenParser::next_token_with_metadata).
2098    ///
2099    /// Returns `None` if no more tokens are available.
2100    pub fn next_token(&mut self) -> Result<Option<Token>, ProtocolError> {
2101        self.next_token_with_metadata(None)
2102    }
2103
2104    /// Parse the next token with optional column metadata context.
2105    ///
2106    /// When `metadata` is provided, this method can parse Row and NbcRow tokens.
2107    /// Without metadata, those tokens will return an error.
2108    ///
2109    /// Returns `None` if no more tokens are available.
2110    pub fn next_token_with_metadata(
2111        &mut self,
2112        metadata: Option<&ColMetaData>,
2113    ) -> Result<Option<Token>, ProtocolError> {
2114        if !self.has_remaining() {
2115            return Ok(None);
2116        }
2117
2118        let mut buf = &self.data[self.position..];
2119        let start_pos = self.position;
2120
2121        let token_type_byte = buf.get_u8();
2122        let token_type = TokenType::from_u8(token_type_byte);
2123
2124        let token = match token_type {
2125            Some(TokenType::Done) => {
2126                let done = Done::decode(&mut buf)?;
2127                Token::Done(done)
2128            }
2129            Some(TokenType::DoneProc) => {
2130                let done = DoneProc::decode(&mut buf)?;
2131                Token::DoneProc(done)
2132            }
2133            Some(TokenType::DoneInProc) => {
2134                let done = DoneInProc::decode(&mut buf)?;
2135                Token::DoneInProc(done)
2136            }
2137            Some(TokenType::Error) => {
2138                let error = ServerError::decode(&mut buf)?;
2139                Token::Error(error)
2140            }
2141            Some(TokenType::Info) => {
2142                let info = ServerInfo::decode(&mut buf)?;
2143                Token::Info(info)
2144            }
2145            Some(TokenType::LoginAck) => {
2146                let login_ack = LoginAck::decode(&mut buf)?;
2147                Token::LoginAck(login_ack)
2148            }
2149            Some(TokenType::EnvChange) => {
2150                let env_change = EnvChange::decode(&mut buf)?;
2151                Token::EnvChange(env_change)
2152            }
2153            Some(TokenType::Order) => {
2154                let order = Order::decode(&mut buf)?;
2155                Token::Order(order)
2156            }
2157            Some(TokenType::FeatureExtAck) => {
2158                let ack = FeatureExtAck::decode(&mut buf)?;
2159                Token::FeatureExtAck(ack)
2160            }
2161            Some(TokenType::Sspi) => {
2162                let sspi = SspiToken::decode(&mut buf)?;
2163                Token::Sspi(sspi)
2164            }
2165            Some(TokenType::FedAuthInfo) => {
2166                let info = FedAuthInfo::decode(&mut buf)?;
2167                Token::FedAuthInfo(info)
2168            }
2169            Some(TokenType::ReturnStatus) => {
2170                if buf.remaining() < 4 {
2171                    return Err(ProtocolError::UnexpectedEof);
2172                }
2173                let status = buf.get_i32_le();
2174                Token::ReturnStatus(status)
2175            }
2176            Some(TokenType::ColMetaData) => {
2177                let col_meta = ColMetaData::decode(&mut buf)?;
2178                Token::ColMetaData(col_meta)
2179            }
2180            Some(TokenType::Row) => {
2181                let meta = metadata.ok_or_else(|| {
2182                    ProtocolError::StringEncoding(
2183                        #[cfg(feature = "std")]
2184                        "Row token requires column metadata".to_string(),
2185                        #[cfg(not(feature = "std"))]
2186                        "Row token requires column metadata",
2187                    )
2188                })?;
2189                let row = RawRow::decode(&mut buf, meta)?;
2190                Token::Row(row)
2191            }
2192            Some(TokenType::NbcRow) => {
2193                let meta = metadata.ok_or_else(|| {
2194                    ProtocolError::StringEncoding(
2195                        #[cfg(feature = "std")]
2196                        "NbcRow token requires column metadata".to_string(),
2197                        #[cfg(not(feature = "std"))]
2198                        "NbcRow token requires column metadata",
2199                    )
2200                })?;
2201                let row = NbcRow::decode(&mut buf, meta)?;
2202                Token::NbcRow(row)
2203            }
2204            Some(TokenType::ReturnValue) => {
2205                let ret_val = ReturnValue::decode(&mut buf)?;
2206                Token::ReturnValue(ret_val)
2207            }
2208            Some(TokenType::SessionState) => {
2209                let session = SessionState::decode(&mut buf)?;
2210                Token::SessionState(session)
2211            }
2212            Some(TokenType::ColInfo) | Some(TokenType::TabName) | Some(TokenType::Offset) => {
2213                // These tokens are rarely used and have complex formats.
2214                // Skip them by reading the length and advancing.
2215                if buf.remaining() < 2 {
2216                    return Err(ProtocolError::UnexpectedEof);
2217                }
2218                let length = buf.get_u16_le() as usize;
2219                if buf.remaining() < length {
2220                    return Err(ProtocolError::IncompletePacket {
2221                        expected: length,
2222                        actual: buf.remaining(),
2223                    });
2224                }
2225                // Skip the data
2226                buf.advance(length);
2227                // Recursively get the next token
2228                self.position = start_pos + (self.data.len() - start_pos - buf.remaining());
2229                return self.next_token_with_metadata(metadata);
2230            }
2231            None => {
2232                return Err(ProtocolError::InvalidTokenType(token_type_byte));
2233            }
2234        };
2235
2236        // Update position based on how much was consumed
2237        let consumed = self.data.len() - start_pos - buf.remaining();
2238        self.position = start_pos + consumed;
2239
2240        Ok(Some(token))
2241    }
2242
2243    /// Skip the current token without fully parsing it.
2244    ///
2245    /// This is useful for skipping unknown or uninteresting tokens.
2246    pub fn skip_token(&mut self) -> Result<(), ProtocolError> {
2247        if !self.has_remaining() {
2248            return Ok(());
2249        }
2250
2251        let token_type_byte = self.data[self.position];
2252        let token_type = TokenType::from_u8(token_type_byte);
2253
2254        // Calculate how many bytes to skip based on token type
2255        let skip_amount = match token_type {
2256            // Fixed-size tokens
2257            Some(TokenType::Done) | Some(TokenType::DoneProc) | Some(TokenType::DoneInProc) => {
2258                1 + Done::SIZE // token type + 12 bytes
2259            }
2260            Some(TokenType::ReturnStatus) => {
2261                1 + 4 // token type + 4 bytes
2262            }
2263            // Variable-length tokens with 2-byte length prefix
2264            Some(TokenType::Error)
2265            | Some(TokenType::Info)
2266            | Some(TokenType::LoginAck)
2267            | Some(TokenType::EnvChange)
2268            | Some(TokenType::Order)
2269            | Some(TokenType::Sspi)
2270            | Some(TokenType::ColInfo)
2271            | Some(TokenType::TabName)
2272            | Some(TokenType::Offset)
2273            | Some(TokenType::ReturnValue) => {
2274                if self.remaining() < 3 {
2275                    return Err(ProtocolError::UnexpectedEof);
2276                }
2277                let length = u16::from_le_bytes([
2278                    self.data[self.position + 1],
2279                    self.data[self.position + 2],
2280                ]) as usize;
2281                1 + 2 + length // token type + length prefix + data
2282            }
2283            // Tokens with 4-byte length prefix
2284            Some(TokenType::SessionState) | Some(TokenType::FedAuthInfo) => {
2285                if self.remaining() < 5 {
2286                    return Err(ProtocolError::UnexpectedEof);
2287                }
2288                let length = u32::from_le_bytes([
2289                    self.data[self.position + 1],
2290                    self.data[self.position + 2],
2291                    self.data[self.position + 3],
2292                    self.data[self.position + 4],
2293                ]) as usize;
2294                1 + 4 + length
2295            }
2296            // FeatureExtAck has no length prefix - must parse
2297            Some(TokenType::FeatureExtAck) => {
2298                // Parse to find end
2299                let mut buf = &self.data[self.position + 1..];
2300                let _ = FeatureExtAck::decode(&mut buf)?;
2301                self.data.len() - self.position - buf.remaining()
2302            }
2303            // ColMetaData, Row, NbcRow require context and can't be easily skipped
2304            Some(TokenType::ColMetaData) | Some(TokenType::Row) | Some(TokenType::NbcRow) => {
2305                return Err(ProtocolError::InvalidTokenType(token_type_byte));
2306            }
2307            None => {
2308                return Err(ProtocolError::InvalidTokenType(token_type_byte));
2309            }
2310        };
2311
2312        if self.remaining() < skip_amount {
2313            return Err(ProtocolError::UnexpectedEof);
2314        }
2315
2316        self.position += skip_amount;
2317        Ok(())
2318    }
2319
2320    /// Get the current position in the buffer.
2321    #[must_use]
2322    pub fn position(&self) -> usize {
2323        self.position
2324    }
2325
2326    /// Reset the parser to the beginning.
2327    pub fn reset(&mut self) {
2328        self.position = 0;
2329    }
2330}
2331
2332// =============================================================================
2333// Tests
2334// =============================================================================
2335
2336#[cfg(test)]
2337#[allow(clippy::unwrap_used, clippy::panic)]
2338mod tests {
2339    use super::*;
2340    use bytes::BytesMut;
2341
2342    #[test]
2343    fn test_done_roundtrip() {
2344        let done = Done {
2345            status: DoneStatus {
2346                more: false,
2347                error: false,
2348                in_xact: false,
2349                count: true,
2350                attn: false,
2351                srverror: false,
2352            },
2353            cur_cmd: 193, // SELECT
2354            row_count: 42,
2355        };
2356
2357        let mut buf = BytesMut::new();
2358        done.encode(&mut buf);
2359
2360        // Skip the token type byte
2361        let mut cursor = &buf[1..];
2362        let decoded = Done::decode(&mut cursor).unwrap();
2363
2364        assert_eq!(decoded.status.count, done.status.count);
2365        assert_eq!(decoded.cur_cmd, done.cur_cmd);
2366        assert_eq!(decoded.row_count, done.row_count);
2367    }
2368
2369    #[test]
2370    fn test_done_status_bits() {
2371        let status = DoneStatus {
2372            more: true,
2373            error: true,
2374            in_xact: true,
2375            count: true,
2376            attn: false,
2377            srverror: false,
2378        };
2379
2380        let bits = status.to_bits();
2381        let restored = DoneStatus::from_bits(bits);
2382
2383        assert_eq!(status.more, restored.more);
2384        assert_eq!(status.error, restored.error);
2385        assert_eq!(status.in_xact, restored.in_xact);
2386        assert_eq!(status.count, restored.count);
2387    }
2388
2389    #[test]
2390    fn test_token_parser_done() {
2391        // DONE token: type (1) + status (2) + curcmd (2) + rowcount (8)
2392        let data = Bytes::from_static(&[
2393            0xFD, // DONE token type
2394            0x10, 0x00, // status: DONE_COUNT
2395            0xC1, 0x00, // cur_cmd: 193 (SELECT)
2396            0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count: 5
2397        ]);
2398
2399        let mut parser = TokenParser::new(data);
2400        let token = parser.next_token().unwrap().unwrap();
2401
2402        match token {
2403            Token::Done(done) => {
2404                assert!(done.status.count);
2405                assert!(!done.status.more);
2406                assert_eq!(done.cur_cmd, 193);
2407                assert_eq!(done.row_count, 5);
2408            }
2409            _ => panic!("Expected Done token"),
2410        }
2411
2412        // No more tokens
2413        assert!(parser.next_token().unwrap().is_none());
2414    }
2415
2416    #[test]
2417    fn test_env_change_type_from_u8() {
2418        assert_eq!(EnvChangeType::from_u8(1), Some(EnvChangeType::Database));
2419        assert_eq!(EnvChangeType::from_u8(20), Some(EnvChangeType::Routing));
2420        assert_eq!(EnvChangeType::from_u8(100), None);
2421    }
2422
2423    #[test]
2424    fn test_colmetadata_no_columns() {
2425        // No metadata marker (0xFFFF)
2426        let data = Bytes::from_static(&[0xFF, 0xFF]);
2427        let mut cursor: &[u8] = &data;
2428        let meta = ColMetaData::decode(&mut cursor).unwrap();
2429        assert!(meta.is_empty());
2430        assert_eq!(meta.column_count(), 0);
2431    }
2432
2433    #[test]
2434    fn test_colmetadata_single_int_column() {
2435        // COLMETADATA with 1 INT column
2436        // Format: column_count (2) + [user_type (4) + flags (2) + type_id (1) + name (b_varchar)]
2437        let mut data = BytesMut::new();
2438        data.extend_from_slice(&[0x01, 0x00]); // 1 column
2439        data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); // user_type = 0
2440        data.extend_from_slice(&[0x01, 0x00]); // flags (nullable)
2441        data.extend_from_slice(&[0x38]); // TypeId::Int4
2442        // Column name "id" as B_VARCHAR (1 byte length + UTF-16LE)
2443        data.extend_from_slice(&[0x02]); // 2 characters
2444        data.extend_from_slice(&[b'i', 0x00, b'd', 0x00]); // "id" in UTF-16LE
2445
2446        let mut cursor: &[u8] = &data;
2447        let meta = ColMetaData::decode(&mut cursor).unwrap();
2448
2449        assert_eq!(meta.column_count(), 1);
2450        assert_eq!(meta.columns[0].name, "id");
2451        assert_eq!(meta.columns[0].type_id, TypeId::Int4);
2452        assert!(meta.columns[0].is_nullable());
2453    }
2454
2455    #[test]
2456    fn test_colmetadata_nvarchar_column() {
2457        // COLMETADATA with 1 NVARCHAR(50) column
2458        let mut data = BytesMut::new();
2459        data.extend_from_slice(&[0x01, 0x00]); // 1 column
2460        data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); // user_type = 0
2461        data.extend_from_slice(&[0x01, 0x00]); // flags (nullable)
2462        data.extend_from_slice(&[0xE7]); // TypeId::NVarChar
2463        // Type info: max_length (2 bytes) + collation (5 bytes)
2464        data.extend_from_slice(&[0x64, 0x00]); // max_length = 100 (50 chars * 2)
2465        data.extend_from_slice(&[0x09, 0x04, 0xD0, 0x00, 0x34]); // collation
2466        // Column name "name"
2467        data.extend_from_slice(&[0x04]); // 4 characters
2468        data.extend_from_slice(&[b'n', 0x00, b'a', 0x00, b'm', 0x00, b'e', 0x00]);
2469
2470        let mut cursor: &[u8] = &data;
2471        let meta = ColMetaData::decode(&mut cursor).unwrap();
2472
2473        assert_eq!(meta.column_count(), 1);
2474        assert_eq!(meta.columns[0].name, "name");
2475        assert_eq!(meta.columns[0].type_id, TypeId::NVarChar);
2476        assert_eq!(meta.columns[0].type_info.max_length, Some(100));
2477        assert!(meta.columns[0].type_info.collation.is_some());
2478    }
2479
2480    #[test]
2481    fn test_raw_row_decode_int() {
2482        // Create metadata for a single INT column
2483        let metadata = ColMetaData {
2484            columns: vec![ColumnData {
2485                name: "id".to_string(),
2486                type_id: TypeId::Int4,
2487                col_type: 0x38,
2488                flags: 0,
2489                user_type: 0,
2490                type_info: TypeInfo::default(),
2491            }],
2492        };
2493
2494        // Row data: just 4 bytes for the int value 42
2495        let data = Bytes::from_static(&[0x2A, 0x00, 0x00, 0x00]); // 42 in little-endian
2496        let mut cursor: &[u8] = &data;
2497        let row = RawRow::decode(&mut cursor, &metadata).unwrap();
2498
2499        // The raw data should contain the 4 bytes
2500        assert_eq!(row.data.len(), 4);
2501        assert_eq!(&row.data[..], &[0x2A, 0x00, 0x00, 0x00]);
2502    }
2503
2504    #[test]
2505    fn test_raw_row_decode_nullable_int() {
2506        // Create metadata for a nullable INT column (IntN)
2507        let metadata = ColMetaData {
2508            columns: vec![ColumnData {
2509                name: "id".to_string(),
2510                type_id: TypeId::IntN,
2511                col_type: 0x26,
2512                flags: 0x01, // nullable
2513                user_type: 0,
2514                type_info: TypeInfo {
2515                    max_length: Some(4),
2516                    ..Default::default()
2517                },
2518            }],
2519        };
2520
2521        // Row data with value: 1 byte length + 4 bytes value
2522        let data = Bytes::from_static(&[0x04, 0x2A, 0x00, 0x00, 0x00]); // length=4, value=42
2523        let mut cursor: &[u8] = &data;
2524        let row = RawRow::decode(&mut cursor, &metadata).unwrap();
2525
2526        assert_eq!(row.data.len(), 5);
2527        assert_eq!(row.data[0], 4); // length
2528        assert_eq!(&row.data[1..], &[0x2A, 0x00, 0x00, 0x00]);
2529    }
2530
2531    #[test]
2532    fn test_raw_row_decode_null_value() {
2533        // Create metadata for a nullable INT column (IntN)
2534        let metadata = ColMetaData {
2535            columns: vec![ColumnData {
2536                name: "id".to_string(),
2537                type_id: TypeId::IntN,
2538                col_type: 0x26,
2539                flags: 0x01, // nullable
2540                user_type: 0,
2541                type_info: TypeInfo {
2542                    max_length: Some(4),
2543                    ..Default::default()
2544                },
2545            }],
2546        };
2547
2548        // NULL value: length = 0xFF (for bytelen types)
2549        let data = Bytes::from_static(&[0xFF]);
2550        let mut cursor: &[u8] = &data;
2551        let row = RawRow::decode(&mut cursor, &metadata).unwrap();
2552
2553        assert_eq!(row.data.len(), 1);
2554        assert_eq!(row.data[0], 0xFF); // NULL marker
2555    }
2556
2557    #[test]
2558    fn test_nbcrow_null_bitmap() {
2559        let row = NbcRow {
2560            null_bitmap: vec![0b00000101], // columns 0 and 2 are NULL
2561            data: Bytes::new(),
2562        };
2563
2564        assert!(row.is_null(0));
2565        assert!(!row.is_null(1));
2566        assert!(row.is_null(2));
2567        assert!(!row.is_null(3));
2568    }
2569
2570    #[test]
2571    fn test_token_parser_colmetadata() {
2572        // Build a COLMETADATA token with 1 INT column
2573        let mut data = BytesMut::new();
2574        data.extend_from_slice(&[0x81]); // COLMETADATA token type
2575        data.extend_from_slice(&[0x01, 0x00]); // 1 column
2576        data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); // user_type = 0
2577        data.extend_from_slice(&[0x01, 0x00]); // flags (nullable)
2578        data.extend_from_slice(&[0x38]); // TypeId::Int4
2579        data.extend_from_slice(&[0x02]); // column name length
2580        data.extend_from_slice(&[b'i', 0x00, b'd', 0x00]); // "id"
2581
2582        let mut parser = TokenParser::new(data.freeze());
2583        let token = parser.next_token().unwrap().unwrap();
2584
2585        match token {
2586            Token::ColMetaData(meta) => {
2587                assert_eq!(meta.column_count(), 1);
2588                assert_eq!(meta.columns[0].name, "id");
2589                assert_eq!(meta.columns[0].type_id, TypeId::Int4);
2590            }
2591            _ => panic!("Expected ColMetaData token"),
2592        }
2593    }
2594
2595    #[test]
2596    fn test_token_parser_row_with_metadata() {
2597        // Build metadata
2598        let metadata = ColMetaData {
2599            columns: vec![ColumnData {
2600                name: "id".to_string(),
2601                type_id: TypeId::Int4,
2602                col_type: 0x38,
2603                flags: 0,
2604                user_type: 0,
2605                type_info: TypeInfo::default(),
2606            }],
2607        };
2608
2609        // Build ROW token
2610        let mut data = BytesMut::new();
2611        data.extend_from_slice(&[0xD1]); // ROW token type
2612        data.extend_from_slice(&[0x2A, 0x00, 0x00, 0x00]); // value = 42
2613
2614        let mut parser = TokenParser::new(data.freeze());
2615        let token = parser
2616            .next_token_with_metadata(Some(&metadata))
2617            .unwrap()
2618            .unwrap();
2619
2620        match token {
2621            Token::Row(row) => {
2622                assert_eq!(row.data.len(), 4);
2623            }
2624            _ => panic!("Expected Row token"),
2625        }
2626    }
2627
2628    #[test]
2629    fn test_token_parser_row_without_metadata_fails() {
2630        // Build ROW token
2631        let mut data = BytesMut::new();
2632        data.extend_from_slice(&[0xD1]); // ROW token type
2633        data.extend_from_slice(&[0x2A, 0x00, 0x00, 0x00]); // value = 42
2634
2635        let mut parser = TokenParser::new(data.freeze());
2636        let result = parser.next_token(); // No metadata provided
2637
2638        assert!(result.is_err());
2639    }
2640
2641    #[test]
2642    fn test_token_parser_peek() {
2643        let data = Bytes::from_static(&[
2644            0xFD, // DONE token type
2645            0x10, 0x00, // status
2646            0xC1, 0x00, // cur_cmd
2647            0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count
2648        ]);
2649
2650        let parser = TokenParser::new(data);
2651        assert_eq!(parser.peek_token_type(), Some(TokenType::Done));
2652    }
2653
2654    #[test]
2655    fn test_column_data_fixed_size() {
2656        let col = ColumnData {
2657            name: String::new(),
2658            type_id: TypeId::Int4,
2659            col_type: 0x38,
2660            flags: 0,
2661            user_type: 0,
2662            type_info: TypeInfo::default(),
2663        };
2664        assert_eq!(col.fixed_size(), Some(4));
2665
2666        let col2 = ColumnData {
2667            name: String::new(),
2668            type_id: TypeId::NVarChar,
2669            col_type: 0xE7,
2670            flags: 0,
2671            user_type: 0,
2672            type_info: TypeInfo::default(),
2673        };
2674        assert_eq!(col2.fixed_size(), None);
2675    }
2676
2677    // ========================================================================
2678    // End-to-End Decode Tests (Wire → Stored → Verification)
2679    // ========================================================================
2680    //
2681    // These tests verify that RawRow::decode_column_value correctly stores
2682    // column values in a format that can be parsed back.
2683
2684    #[test]
2685    fn test_decode_nvarchar_then_intn_roundtrip() {
2686        // Simulate wire data for: "World" (NVarChar), 42 (IntN)
2687        // This tests the scenario from the MCP parameterized query
2688
2689        // Build wire data (what the server sends)
2690        let mut wire_data = BytesMut::new();
2691
2692        // Column 0: NVarChar "World" - 2-byte length prefix in bytes
2693        // "World" in UTF-16LE: W=0x0057, o=0x006F, r=0x0072, l=0x006C, d=0x0064
2694        let word = "World";
2695        let utf16: Vec<u16> = word.encode_utf16().collect();
2696        wire_data.put_u16_le((utf16.len() * 2) as u16); // byte length = 10
2697        for code_unit in &utf16 {
2698            wire_data.put_u16_le(*code_unit);
2699        }
2700
2701        // Column 1: IntN 42 - 1-byte length prefix
2702        wire_data.put_u8(4); // 4 bytes for INT
2703        wire_data.put_i32_le(42);
2704
2705        // Build column metadata
2706        let metadata = ColMetaData {
2707            columns: vec![
2708                ColumnData {
2709                    name: "greeting".to_string(),
2710                    type_id: TypeId::NVarChar,
2711                    col_type: 0xE7,
2712                    flags: 0x01,
2713                    user_type: 0,
2714                    type_info: TypeInfo {
2715                        max_length: Some(10), // non-MAX
2716                        precision: None,
2717                        scale: None,
2718                        collation: None,
2719                    },
2720                },
2721                ColumnData {
2722                    name: "number".to_string(),
2723                    type_id: TypeId::IntN,
2724                    col_type: 0x26,
2725                    flags: 0x01,
2726                    user_type: 0,
2727                    type_info: TypeInfo {
2728                        max_length: Some(4),
2729                        precision: None,
2730                        scale: None,
2731                        collation: None,
2732                    },
2733                },
2734            ],
2735        };
2736
2737        // Decode the wire data into stored format
2738        let mut wire_cursor = wire_data.freeze();
2739        let raw_row = RawRow::decode(&mut wire_cursor, &metadata).unwrap();
2740
2741        // Verify wire data was fully consumed
2742        assert_eq!(
2743            wire_cursor.remaining(),
2744            0,
2745            "wire data should be fully consumed"
2746        );
2747
2748        // Now parse the stored data
2749        let mut stored_cursor: &[u8] = &raw_row.data;
2750
2751        // Parse column 0 (NVarChar)
2752        // Stored format for non-MAX NVarChar: [2-byte len][data]
2753        assert!(
2754            stored_cursor.remaining() >= 2,
2755            "need at least 2 bytes for length"
2756        );
2757        let len0 = stored_cursor.get_u16_le() as usize;
2758        assert_eq!(len0, 10, "NVarChar length should be 10 bytes");
2759        assert!(
2760            stored_cursor.remaining() >= len0,
2761            "need {len0} bytes for data"
2762        );
2763
2764        // Read UTF-16LE and convert to string
2765        let mut utf16_read = Vec::new();
2766        for _ in 0..(len0 / 2) {
2767            utf16_read.push(stored_cursor.get_u16_le());
2768        }
2769        let string0 = String::from_utf16(&utf16_read).unwrap();
2770        assert_eq!(string0, "World", "column 0 should be 'World'");
2771
2772        // Parse column 1 (IntN)
2773        // Stored format for IntN: [1-byte len][data]
2774        assert!(
2775            stored_cursor.remaining() >= 1,
2776            "need at least 1 byte for length"
2777        );
2778        let len1 = stored_cursor.get_u8();
2779        assert_eq!(len1, 4, "IntN length should be 4");
2780        assert!(stored_cursor.remaining() >= 4, "need 4 bytes for INT data");
2781        let int1 = stored_cursor.get_i32_le();
2782        assert_eq!(int1, 42, "column 1 should be 42");
2783
2784        // Verify stored data was fully consumed
2785        assert_eq!(
2786            stored_cursor.remaining(),
2787            0,
2788            "stored data should be fully consumed"
2789        );
2790    }
2791
2792    #[test]
2793    fn test_decode_nvarchar_max_then_intn_roundtrip() {
2794        // Test NVARCHAR(MAX) followed by IntN - uses PLP encoding
2795
2796        // Build wire data for PLP NVARCHAR(MAX) + IntN
2797        let mut wire_data = BytesMut::new();
2798
2799        // Column 0: NVARCHAR(MAX) "Hello" - PLP format
2800        // PLP: 8-byte total length, then chunks
2801        let word = "Hello";
2802        let utf16: Vec<u16> = word.encode_utf16().collect();
2803        let byte_len = (utf16.len() * 2) as u64;
2804
2805        wire_data.put_u64_le(byte_len); // total length = 10
2806        wire_data.put_u32_le(byte_len as u32); // chunk length = 10
2807        for code_unit in &utf16 {
2808            wire_data.put_u16_le(*code_unit);
2809        }
2810        wire_data.put_u32_le(0); // terminating zero-length chunk
2811
2812        // Column 1: IntN 99
2813        wire_data.put_u8(4);
2814        wire_data.put_i32_le(99);
2815
2816        // Build metadata with MAX type
2817        let metadata = ColMetaData {
2818            columns: vec![
2819                ColumnData {
2820                    name: "text".to_string(),
2821                    type_id: TypeId::NVarChar,
2822                    col_type: 0xE7,
2823                    flags: 0x01,
2824                    user_type: 0,
2825                    type_info: TypeInfo {
2826                        max_length: Some(0xFFFF), // MAX indicator
2827                        precision: None,
2828                        scale: None,
2829                        collation: None,
2830                    },
2831                },
2832                ColumnData {
2833                    name: "num".to_string(),
2834                    type_id: TypeId::IntN,
2835                    col_type: 0x26,
2836                    flags: 0x01,
2837                    user_type: 0,
2838                    type_info: TypeInfo {
2839                        max_length: Some(4),
2840                        precision: None,
2841                        scale: None,
2842                        collation: None,
2843                    },
2844                },
2845            ],
2846        };
2847
2848        // Decode wire data
2849        let mut wire_cursor = wire_data.freeze();
2850        let raw_row = RawRow::decode(&mut wire_cursor, &metadata).unwrap();
2851
2852        // Verify wire data was fully consumed
2853        assert_eq!(
2854            wire_cursor.remaining(),
2855            0,
2856            "wire data should be fully consumed"
2857        );
2858
2859        // Parse stored PLP data for column 0
2860        let mut stored_cursor: &[u8] = &raw_row.data;
2861
2862        // PLP stored format: [8-byte total][chunks...][4-byte 0]
2863        let total_len = stored_cursor.get_u64_le();
2864        assert_eq!(total_len, 10, "PLP total length should be 10");
2865
2866        let chunk_len = stored_cursor.get_u32_le();
2867        assert_eq!(chunk_len, 10, "PLP chunk length should be 10");
2868
2869        let mut utf16_read = Vec::new();
2870        for _ in 0..(chunk_len / 2) {
2871            utf16_read.push(stored_cursor.get_u16_le());
2872        }
2873        let string0 = String::from_utf16(&utf16_read).unwrap();
2874        assert_eq!(string0, "Hello", "column 0 should be 'Hello'");
2875
2876        let terminator = stored_cursor.get_u32_le();
2877        assert_eq!(terminator, 0, "PLP should end with 0");
2878
2879        // Parse IntN
2880        let len1 = stored_cursor.get_u8();
2881        assert_eq!(len1, 4);
2882        let int1 = stored_cursor.get_i32_le();
2883        assert_eq!(int1, 99, "column 1 should be 99");
2884
2885        // Verify fully consumed
2886        assert_eq!(
2887            stored_cursor.remaining(),
2888            0,
2889            "stored data should be fully consumed"
2890        );
2891    }
2892}