tds_protocol/
token.rs

1//! TDS token stream definitions.
2//!
3//! Tokens are the fundamental units of TDS response data. The server sends
4//! a stream of tokens that describe metadata, rows, errors, and other information.
5//!
6//! ## Token Structure
7//!
8//! Each token begins with a 1-byte token type identifier, followed by
9//! token-specific data. Some tokens have fixed lengths, while others
10//! have length prefixes.
11//!
12//! ## Usage
13//!
14//! ```rust,ignore
15//! use tds_protocol::token::{Token, TokenParser};
16//! use bytes::Bytes;
17//!
18//! let data: Bytes = /* received from server */;
19//! let mut parser = TokenParser::new(data);
20//!
21//! while let Some(token) = parser.next_token()? {
22//!     match token {
23//!         Token::Done(done) => println!("Rows affected: {}", done.row_count),
24//!         Token::Error(err) => eprintln!("Error {}: {}", err.number, err.message),
25//!         _ => {}
26//!     }
27//! }
28//! ```
29
30use bytes::{Buf, BufMut, Bytes};
31
32use crate::codec::{read_b_varchar, read_us_varchar};
33use crate::error::ProtocolError;
34use crate::types::TypeId;
35
/// One-byte identifier that introduces each token in a TDS response stream.
///
/// Every variant's discriminant is the byte used on the wire, so a variant can
/// be converted back to its wire byte with `as u8`. Variants are listed in
/// ascending byte order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum TokenType {
    /// `OFFSET` (0x78): offset.
    Offset = 0x78,
    /// `RETURNSTATUS` (0x79): return status.
    ReturnStatus = 0x79,
    /// `COLMETADATA` (0x81): column metadata.
    ColMetaData = 0x81,
    /// `TABNAME` (0xA4): table name.
    TabName = 0xA4,
    /// `COLINFO` (0xA5): column info.
    ColInfo = 0xA5,
    /// `ORDER` (0xA9): column ordering.
    Order = 0xA9,
    /// `ERROR` (0xAA): error message.
    Error = 0xAA,
    /// `INFO` (0xAB): informational message.
    Info = 0xAB,
    /// `RETURNVALUE` (0xAC): return value.
    ReturnValue = 0xAC,
    /// `LOGINACK` (0xAD): login acknowledgment.
    LoginAck = 0xAD,
    /// `FEATUREEXTACK` (0xAE): feature extension acknowledgment.
    FeatureExtAck = 0xAE,
    /// `ROW` (0xD1): row data.
    Row = 0xD1,
    /// `NBCROW` (0xD2): null-bitmap-compressed row.
    NbcRow = 0xD2,
    /// `ENVCHANGE` (0xE3): environment change.
    EnvChange = 0xE3,
    /// `SESSIONSTATE` (0xE4): session state.
    SessionState = 0xE4,
    /// `SSPI` (0xED): SSPI authentication.
    Sspi = 0xED,
    /// `FEDAUTHINFO` (0xEE): federated authentication info.
    FedAuthInfo = 0xEE,
    /// `DONE` (0xFD): done.
    Done = 0xFD,
    /// `DONEPROC` (0xFE): done procedure.
    DoneProc = 0xFE,
    /// `DONEINPROC` (0xFF): done in procedure.
    DoneInProc = 0xFF,
}

impl TokenType {
    /// Map a raw token byte onto its [`TokenType`].
    ///
    /// Returns `None` when `value` is not one of the identifiers declared on
    /// the enum.
    pub fn from_u8(value: u8) -> Option<Self> {
        let token = match value {
            0x78 => Self::Offset,
            0x79 => Self::ReturnStatus,
            0x81 => Self::ColMetaData,
            0xA4 => Self::TabName,
            0xA5 => Self::ColInfo,
            0xA9 => Self::Order,
            0xAA => Self::Error,
            0xAB => Self::Info,
            0xAC => Self::ReturnValue,
            0xAD => Self::LoginAck,
            0xAE => Self::FeatureExtAck,
            0xD1 => Self::Row,
            0xD2 => Self::NbcRow,
            0xE3 => Self::EnvChange,
            0xE4 => Self::SessionState,
            0xED => Self::Sspi,
            0xEE => Self::FedAuthInfo,
            0xFD => Self::Done,
            0xFE => Self::DoneProc,
            0xFF => Self::DoneInProc,
            _ => return None,
        };
        Some(token)
    }
}
110
111/// Parsed TDS token.
112///
113/// This enum represents all possible tokens that can be received from SQL Server.
114/// Each variant contains the parsed token data.
115#[derive(Debug, Clone)]
116pub enum Token {
117    /// Column metadata describing result set structure.
118    ColMetaData(ColMetaData),
119    /// Row data.
120    Row(RawRow),
121    /// Null bitmap compressed row.
122    NbcRow(NbcRow),
123    /// Completion of a SQL statement.
124    Done(Done),
125    /// Completion of a stored procedure.
126    DoneProc(DoneProc),
127    /// Completion within a stored procedure.
128    DoneInProc(DoneInProc),
129    /// Return status from stored procedure.
130    ReturnStatus(i32),
131    /// Return value from stored procedure.
132    ReturnValue(ReturnValue),
133    /// Error message from server.
134    Error(ServerError),
135    /// Informational message from server.
136    Info(ServerInfo),
137    /// Login acknowledgment.
138    LoginAck(LoginAck),
139    /// Environment change notification.
140    EnvChange(EnvChange),
141    /// Column ordering information.
142    Order(Order),
143    /// Feature extension acknowledgment.
144    FeatureExtAck(FeatureExtAck),
145    /// SSPI authentication data.
146    Sspi(SspiToken),
147    /// Session state information.
148    SessionState(SessionState),
149    /// Federated authentication info.
150    FedAuthInfo(FedAuthInfo),
151}
152
153/// Column metadata token.
154#[derive(Debug, Clone, Default)]
155pub struct ColMetaData {
156    /// Column definitions.
157    pub columns: Vec<ColumnData>,
158}
159
160/// Column definition within metadata.
161#[derive(Debug, Clone)]
162pub struct ColumnData {
163    /// Column name.
164    pub name: String,
165    /// Column data type ID.
166    pub type_id: TypeId,
167    /// Column data type raw byte (for unknown types).
168    pub col_type: u8,
169    /// Column flags.
170    pub flags: u16,
171    /// User type ID.
172    pub user_type: u32,
173    /// Type-specific metadata.
174    pub type_info: TypeInfo,
175}
176
/// Metadata that only some data types carry; absent parts are `None`.
#[derive(Debug, Clone, Default)]
pub struct TypeInfo {
    /// Declared maximum length, for types that have a length field.
    pub max_length: Option<u32>,
    /// Precision, for numeric types.
    pub precision: Option<u8>,
    /// Scale, for numeric types and the scaled time types.
    pub scale: Option<u8>,
    /// Collation, for string types.
    pub collation: Option<Collation>,
}

/// SQL Server collation as carried in type metadata (5 bytes on the wire).
#[derive(Debug, Clone, Copy, Default)]
pub struct Collation {
    /// Locale ID, taken from the first four collation bytes.
    pub lcid: u32,
    /// Sort ID, taken from the fifth collation byte.
    pub sort_id: u8,
}
198
199/// Raw row data (not yet decoded).
200#[derive(Debug, Clone)]
201pub struct RawRow {
202    /// Raw column values.
203    pub data: bytes::Bytes,
204}
205
206/// Null bitmap compressed row.
207#[derive(Debug, Clone)]
208pub struct NbcRow {
209    /// Null bitmap.
210    pub null_bitmap: Vec<u8>,
211    /// Raw non-null column values.
212    pub data: bytes::Bytes,
213}
214
/// DONE token: a SQL statement has completed.
#[derive(Debug, Clone, Copy)]
pub struct Done {
    /// Decoded status flags.
    pub status: DoneStatus,
    /// Current command.
    pub cur_cmd: u16,
    /// Number of rows, where applicable (see [`DoneStatus::count`]).
    pub row_count: u64,
}

/// Status flags shared by the DONE, DONEPROC and DONEINPROC tokens.
#[derive(Debug, Clone, Copy, Default)]
pub struct DoneStatus {
    /// Further results follow this token.
    pub more: bool,
    /// An error occurred.
    pub error: bool,
    /// A transaction is in progress.
    pub in_xact: bool,
    /// The accompanying row count is valid.
    pub count: bool,
    /// This token acknowledges an attention request.
    pub attn: bool,
    /// A server error terminated the statement.
    pub srverror: bool,
}

/// DONEINPROC token: a statement inside a stored procedure has completed.
#[derive(Debug, Clone, Copy)]
pub struct DoneInProc {
    /// Decoded status flags.
    pub status: DoneStatus,
    /// Current command.
    pub cur_cmd: u16,
    /// Number of rows.
    pub row_count: u64,
}

/// DONEPROC token: a stored procedure has completed.
#[derive(Debug, Clone, Copy)]
pub struct DoneProc {
    /// Decoded status flags.
    pub status: DoneStatus,
    /// Current command.
    pub cur_cmd: u16,
    /// Number of rows.
    pub row_count: u64,
}
264
265/// Return value from stored procedure.
266#[derive(Debug, Clone)]
267pub struct ReturnValue {
268    /// Parameter ordinal.
269    pub param_ordinal: u16,
270    /// Parameter name.
271    pub param_name: String,
272    /// Status flags.
273    pub status: u8,
274    /// User type.
275    pub user_type: u32,
276    /// Type flags.
277    pub flags: u16,
278    /// Type info.
279    pub type_info: TypeInfo,
280    /// Value data.
281    pub value: bytes::Bytes,
282}
283
/// ERROR token: an error message reported by the server.
#[derive(Debug, Clone)]
pub struct ServerError {
    /// Error number.
    pub number: i32,
    /// Error state.
    pub state: u8,
    /// Severity class of the error.
    pub class: u8,
    /// Text of the error message.
    pub message: String,
    /// Name of the server.
    pub server: String,
    /// Name of the procedure.
    pub procedure: String,
    /// Line number.
    pub line: i32,
}
302
/// INFO token: an informational message reported by the server.
#[derive(Debug, Clone)]
pub struct ServerInfo {
    /// Info number.
    pub number: i32,
    /// Info state.
    pub state: u8,
    /// Class (severity) of the message.
    pub class: u8,
    /// Text of the message.
    pub message: String,
    /// Name of the server.
    pub server: String,
    /// Name of the procedure.
    pub procedure: String,
    /// Line number.
    pub line: i32,
}
321
/// LOGINACK token: the server's acknowledgment of a login.
#[derive(Debug, Clone)]
pub struct LoginAck {
    /// Interface type.
    pub interface: u8,
    /// TDS version value.
    pub tds_version: u32,
    /// Program name.
    pub prog_name: String,
    /// Program version value.
    pub prog_version: u32,
}
334
335/// Environment change token.
336#[derive(Debug, Clone)]
337pub struct EnvChange {
338    /// Type of environment change.
339    pub env_type: EnvChangeType,
340    /// New value.
341    pub new_value: EnvChangeValue,
342    /// Old value.
343    pub old_value: EnvChangeValue,
344}
345
/// Kind of change announced by an [`EnvChange`] token.
///
/// Discriminants are explicit; note that no variant uses the value 14.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum EnvChangeType {
    /// The database changed (1).
    Database = 1,
    /// The language changed (2).
    Language = 2,
    /// The character set changed (3).
    CharacterSet = 3,
    /// The packet size changed (4).
    PacketSize = 4,
    /// Locale ID used for sorting Unicode data (5).
    UnicodeSortingLocalId = 5,
    /// Flags used for comparing Unicode data (6).
    UnicodeComparisonFlags = 6,
    /// SQL collation (7).
    SqlCollation = 7,
    /// A transaction began (8).
    BeginTransaction = 8,
    /// A transaction was committed (9).
    CommitTransaction = 9,
    /// A transaction was rolled back (10).
    RollbackTransaction = 10,
    /// Enlist DTC transaction (11).
    EnlistDtcTransaction = 11,
    /// Defect DTC transaction (12).
    DefectTransaction = 12,
    /// Real-time log shipping (13).
    RealTimeLogShipping = 13,
    /// Promote transaction (15).
    PromoteTransaction = 15,
    /// Transaction manager address (16).
    TransactionManagerAddress = 16,
    /// A transaction ended (17).
    TransactionEnded = 17,
    /// Acknowledgment that a connection reset completed (18).
    ResetConnectionCompletionAck = 18,
    /// A user instance started (19).
    UserInstanceStarted = 19,
    /// Routing information (20).
    Routing = 20,
}
389
390/// Environment change value.
391#[derive(Debug, Clone)]
392pub enum EnvChangeValue {
393    /// String value.
394    String(String),
395    /// Binary value.
396    Binary(bytes::Bytes),
397    /// Routing information.
398    Routing {
399        /// Host name.
400        host: String,
401        /// Port number.
402        port: u16,
403    },
404}
405
/// ORDER token: column ordering information.
#[derive(Debug, Clone)]
pub struct Order {
    /// Indices of the ordered columns.
    pub columns: Vec<u16>,
}
412
413/// Feature extension acknowledgment.
414#[derive(Debug, Clone)]
415pub struct FeatureExtAck {
416    /// Acknowledged features.
417    pub features: Vec<FeatureAck>,
418}
419
420/// Individual feature acknowledgment.
421#[derive(Debug, Clone)]
422pub struct FeatureAck {
423    /// Feature ID.
424    pub feature_id: u8,
425    /// Feature data.
426    pub data: bytes::Bytes,
427}
428
429/// SSPI authentication token.
430#[derive(Debug, Clone)]
431pub struct SspiToken {
432    /// SSPI data.
433    pub data: bytes::Bytes,
434}
435
436/// Session state token.
437#[derive(Debug, Clone)]
438pub struct SessionState {
439    /// Session state data.
440    pub data: bytes::Bytes,
441}
442
/// FEDAUTHINFO token: information for federated authentication.
#[derive(Debug, Clone)]
pub struct FedAuthInfo {
    /// URL of the STS.
    pub sts_url: String,
    /// Service principal name.
    pub spn: String,
}
451
452// =============================================================================
453// ColMetaData and Row Parsing Implementation
454// =============================================================================
455
456impl ColMetaData {
457    /// Special value indicating no metadata.
458    pub const NO_METADATA: u16 = 0xFFFF;
459
460    /// Decode a COLMETADATA token from bytes.
461    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
462        if src.remaining() < 2 {
463            return Err(ProtocolError::UnexpectedEof);
464        }
465
466        let column_count = src.get_u16_le();
467
468        // 0xFFFF means no metadata present
469        if column_count == Self::NO_METADATA {
470            return Ok(Self {
471                columns: Vec::new(),
472            });
473        }
474
475        let mut columns = Vec::with_capacity(column_count as usize);
476
477        for _ in 0..column_count {
478            let column = Self::decode_column(src)?;
479            columns.push(column);
480        }
481
482        Ok(Self { columns })
483    }
484
485    /// Decode a single column from the metadata.
486    fn decode_column(src: &mut impl Buf) -> Result<ColumnData, ProtocolError> {
487        // UserType (4 bytes) + Flags (2 bytes) + TypeId (1 byte)
488        if src.remaining() < 7 {
489            return Err(ProtocolError::UnexpectedEof);
490        }
491
492        let user_type = src.get_u32_le();
493        let flags = src.get_u16_le();
494        let col_type = src.get_u8();
495
496        let type_id = TypeId::from_u8(col_type).unwrap_or(TypeId::Null); // Default to Null for unknown types
497
498        // Parse type-specific metadata
499        let type_info = Self::decode_type_info(src, type_id, col_type)?;
500
501        // Read column name (B_VARCHAR format - 1 byte length in characters)
502        let name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
503
504        Ok(ColumnData {
505            name,
506            type_id,
507            col_type,
508            flags,
509            user_type,
510            type_info,
511        })
512    }
513
514    /// Decode type-specific metadata based on the type ID.
515    fn decode_type_info(
516        src: &mut impl Buf,
517        type_id: TypeId,
518        col_type: u8,
519    ) -> Result<TypeInfo, ProtocolError> {
520        match type_id {
521            // Fixed-length types have no additional metadata
522            TypeId::Null => Ok(TypeInfo::default()),
523            TypeId::Int1 | TypeId::Bit => Ok(TypeInfo::default()),
524            TypeId::Int2 => Ok(TypeInfo::default()),
525            TypeId::Int4 => Ok(TypeInfo::default()),
526            TypeId::Int8 => Ok(TypeInfo::default()),
527            TypeId::Float4 => Ok(TypeInfo::default()),
528            TypeId::Float8 => Ok(TypeInfo::default()),
529            TypeId::Money => Ok(TypeInfo::default()),
530            TypeId::Money4 => Ok(TypeInfo::default()),
531            TypeId::DateTime => Ok(TypeInfo::default()),
532            TypeId::DateTime4 => Ok(TypeInfo::default()),
533
534            // Variable length integer/float/money (1-byte max length)
535            TypeId::IntN | TypeId::BitN | TypeId::FloatN | TypeId::MoneyN | TypeId::DateTimeN => {
536                if src.remaining() < 1 {
537                    return Err(ProtocolError::UnexpectedEof);
538                }
539                let max_length = src.get_u8() as u32;
540                Ok(TypeInfo {
541                    max_length: Some(max_length),
542                    ..Default::default()
543                })
544            }
545
546            // GUID has 1-byte length
547            TypeId::Guid => {
548                if src.remaining() < 1 {
549                    return Err(ProtocolError::UnexpectedEof);
550                }
551                let max_length = src.get_u8() as u32;
552                Ok(TypeInfo {
553                    max_length: Some(max_length),
554                    ..Default::default()
555                })
556            }
557
558            // Decimal/Numeric types (1-byte length + precision + scale)
559            TypeId::Decimal | TypeId::Numeric | TypeId::DecimalN | TypeId::NumericN => {
560                if src.remaining() < 3 {
561                    return Err(ProtocolError::UnexpectedEof);
562                }
563                let max_length = src.get_u8() as u32;
564                let precision = src.get_u8();
565                let scale = src.get_u8();
566                Ok(TypeInfo {
567                    max_length: Some(max_length),
568                    precision: Some(precision),
569                    scale: Some(scale),
570                    ..Default::default()
571                })
572            }
573
574            // Old-style byte-length strings (Char, VarChar, Binary, VarBinary)
575            TypeId::Char | TypeId::VarChar | TypeId::Binary | TypeId::VarBinary => {
576                if src.remaining() < 1 {
577                    return Err(ProtocolError::UnexpectedEof);
578                }
579                let max_length = src.get_u8() as u32;
580                Ok(TypeInfo {
581                    max_length: Some(max_length),
582                    ..Default::default()
583                })
584            }
585
586            // Big varchar/binary with 2-byte length + collation for strings
587            TypeId::BigVarChar | TypeId::BigChar => {
588                if src.remaining() < 7 {
589                    // 2 (length) + 5 (collation)
590                    return Err(ProtocolError::UnexpectedEof);
591                }
592                let max_length = src.get_u16_le() as u32;
593                let collation = Self::decode_collation(src)?;
594                Ok(TypeInfo {
595                    max_length: Some(max_length),
596                    collation: Some(collation),
597                    ..Default::default()
598                })
599            }
600
601            // Big binary (2-byte length, no collation)
602            TypeId::BigVarBinary | TypeId::BigBinary => {
603                if src.remaining() < 2 {
604                    return Err(ProtocolError::UnexpectedEof);
605                }
606                let max_length = src.get_u16_le() as u32;
607                Ok(TypeInfo {
608                    max_length: Some(max_length),
609                    ..Default::default()
610                })
611            }
612
613            // Unicode strings (NChar, NVarChar) - 2-byte length + collation
614            TypeId::NChar | TypeId::NVarChar => {
615                if src.remaining() < 7 {
616                    // 2 (length) + 5 (collation)
617                    return Err(ProtocolError::UnexpectedEof);
618                }
619                let max_length = src.get_u16_le() as u32;
620                let collation = Self::decode_collation(src)?;
621                Ok(TypeInfo {
622                    max_length: Some(max_length),
623                    collation: Some(collation),
624                    ..Default::default()
625                })
626            }
627
628            // Date type (no additional metadata)
629            TypeId::Date => Ok(TypeInfo::default()),
630
631            // Time, DateTime2, DateTimeOffset have scale
632            TypeId::Time | TypeId::DateTime2 | TypeId::DateTimeOffset => {
633                if src.remaining() < 1 {
634                    return Err(ProtocolError::UnexpectedEof);
635                }
636                let scale = src.get_u8();
637                Ok(TypeInfo {
638                    scale: Some(scale),
639                    ..Default::default()
640                })
641            }
642
643            // Text/NText/Image (deprecated LOB types)
644            TypeId::Text | TypeId::NText | TypeId::Image => {
645                // These have complex metadata: length (4) + collation (5) + table name parts
646                if src.remaining() < 4 {
647                    return Err(ProtocolError::UnexpectedEof);
648                }
649                let max_length = src.get_u32_le();
650
651                // For Text/NText, read collation
652                let collation = if type_id == TypeId::Text || type_id == TypeId::NText {
653                    if src.remaining() < 5 {
654                        return Err(ProtocolError::UnexpectedEof);
655                    }
656                    Some(Self::decode_collation(src)?)
657                } else {
658                    None
659                };
660
661                // Skip table name parts (variable length)
662                // Format: numParts (1 byte) followed by us_varchar for each part
663                if src.remaining() < 1 {
664                    return Err(ProtocolError::UnexpectedEof);
665                }
666                let num_parts = src.get_u8();
667                for _ in 0..num_parts {
668                    // Read and discard table name part
669                    let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
670                }
671
672                Ok(TypeInfo {
673                    max_length: Some(max_length),
674                    collation,
675                    ..Default::default()
676                })
677            }
678
679            // XML type
680            TypeId::Xml => {
681                if src.remaining() < 1 {
682                    return Err(ProtocolError::UnexpectedEof);
683                }
684                let schema_present = src.get_u8();
685
686                if schema_present != 0 {
687                    // Read schema info (3 us_varchar strings)
688                    let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // db name
689                    let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // owning schema
690                    let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // xml schema collection
691                }
692
693                Ok(TypeInfo::default())
694            }
695
696            // UDT (User-defined type) - complex metadata
697            TypeId::Udt => {
698                // Max length (2 bytes)
699                if src.remaining() < 2 {
700                    return Err(ProtocolError::UnexpectedEof);
701                }
702                let max_length = src.get_u16_le() as u32;
703
704                // UDT metadata: db name, schema name, type name, assembly qualified name
705                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // db name
706                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // schema name
707                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // type name
708                let _ = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?; // assembly qualified name
709
710                Ok(TypeInfo {
711                    max_length: Some(max_length),
712                    ..Default::default()
713                })
714            }
715
716            // Table-valued parameter - complex metadata (skip for now)
717            TypeId::Tvp => {
718                // TVP has very complex metadata, not commonly used
719                // For now, we can't properly parse this
720                Err(ProtocolError::InvalidTokenType(col_type))
721            }
722
723            // SQL Variant - 4-byte length
724            TypeId::Variant => {
725                if src.remaining() < 4 {
726                    return Err(ProtocolError::UnexpectedEof);
727                }
728                let max_length = src.get_u32_le();
729                Ok(TypeInfo {
730                    max_length: Some(max_length),
731                    ..Default::default()
732                })
733            }
734        }
735    }
736
737    /// Decode collation information (5 bytes).
738    fn decode_collation(src: &mut impl Buf) -> Result<Collation, ProtocolError> {
739        if src.remaining() < 5 {
740            return Err(ProtocolError::UnexpectedEof);
741        }
742        // Collation: LCID (4 bytes) + Sort ID (1 byte)
743        let lcid = src.get_u32_le();
744        let sort_id = src.get_u8();
745        Ok(Collation { lcid, sort_id })
746    }
747
748    /// Get the number of columns.
749    #[must_use]
750    pub fn column_count(&self) -> usize {
751        self.columns.len()
752    }
753
754    /// Check if this represents no metadata.
755    #[must_use]
756    pub fn is_empty(&self) -> bool {
757        self.columns.is_empty()
758    }
759}
760
761impl ColumnData {
762    /// Check if this column is nullable.
763    #[must_use]
764    pub fn is_nullable(&self) -> bool {
765        (self.flags & 0x0001) != 0
766    }
767
768    /// Get the fixed size in bytes for this column, if applicable.
769    ///
770    /// Returns `None` for variable-length types.
771    #[must_use]
772    pub fn fixed_size(&self) -> Option<usize> {
773        match self.type_id {
774            TypeId::Null => Some(0),
775            TypeId::Int1 | TypeId::Bit => Some(1),
776            TypeId::Int2 => Some(2),
777            TypeId::Int4 => Some(4),
778            TypeId::Int8 => Some(8),
779            TypeId::Float4 => Some(4),
780            TypeId::Float8 => Some(8),
781            TypeId::Money => Some(8),
782            TypeId::Money4 => Some(4),
783            TypeId::DateTime => Some(8),
784            TypeId::DateTime4 => Some(4),
785            TypeId::Date => Some(3),
786            _ => None,
787        }
788    }
789}
790
791// =============================================================================
792// Row Parsing Implementation
793// =============================================================================
794
795impl RawRow {
796    /// Decode a ROW token from bytes.
797    ///
798    /// This function requires the column metadata to know how to parse the row.
799    /// The row data is stored as raw bytes for later parsing.
800    pub fn decode(src: &mut impl Buf, metadata: &ColMetaData) -> Result<Self, ProtocolError> {
801        let mut data = bytes::BytesMut::new();
802
803        for col in &metadata.columns {
804            Self::decode_column_value(src, col, &mut data)?;
805        }
806
807        Ok(Self {
808            data: data.freeze(),
809        })
810    }
811
812    /// Decode a single column value and append to the output buffer.
813    fn decode_column_value(
814        src: &mut impl Buf,
815        col: &ColumnData,
816        dst: &mut bytes::BytesMut,
817    ) -> Result<(), ProtocolError> {
818        match col.type_id {
819            // Fixed-length types
820            TypeId::Null => {
821                // No data
822            }
823            TypeId::Int1 | TypeId::Bit => {
824                if src.remaining() < 1 {
825                    return Err(ProtocolError::UnexpectedEof);
826                }
827                dst.extend_from_slice(&[src.get_u8()]);
828            }
829            TypeId::Int2 => {
830                if src.remaining() < 2 {
831                    return Err(ProtocolError::UnexpectedEof);
832                }
833                dst.extend_from_slice(&src.get_u16_le().to_le_bytes());
834            }
835            TypeId::Int4 => {
836                if src.remaining() < 4 {
837                    return Err(ProtocolError::UnexpectedEof);
838                }
839                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
840            }
841            TypeId::Int8 => {
842                if src.remaining() < 8 {
843                    return Err(ProtocolError::UnexpectedEof);
844                }
845                dst.extend_from_slice(&src.get_u64_le().to_le_bytes());
846            }
847            TypeId::Float4 => {
848                if src.remaining() < 4 {
849                    return Err(ProtocolError::UnexpectedEof);
850                }
851                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
852            }
853            TypeId::Float8 => {
854                if src.remaining() < 8 {
855                    return Err(ProtocolError::UnexpectedEof);
856                }
857                dst.extend_from_slice(&src.get_u64_le().to_le_bytes());
858            }
859            TypeId::Money => {
860                if src.remaining() < 8 {
861                    return Err(ProtocolError::UnexpectedEof);
862                }
863                let hi = src.get_u32_le();
864                let lo = src.get_u32_le();
865                dst.extend_from_slice(&hi.to_le_bytes());
866                dst.extend_from_slice(&lo.to_le_bytes());
867            }
868            TypeId::Money4 => {
869                if src.remaining() < 4 {
870                    return Err(ProtocolError::UnexpectedEof);
871                }
872                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
873            }
874            TypeId::DateTime => {
875                if src.remaining() < 8 {
876                    return Err(ProtocolError::UnexpectedEof);
877                }
878                let days = src.get_u32_le();
879                let time = src.get_u32_le();
880                dst.extend_from_slice(&days.to_le_bytes());
881                dst.extend_from_slice(&time.to_le_bytes());
882            }
883            TypeId::DateTime4 => {
884                if src.remaining() < 4 {
885                    return Err(ProtocolError::UnexpectedEof);
886                }
887                dst.extend_from_slice(&src.get_u32_le().to_le_bytes());
888            }
889            // DATE type uses 1-byte length prefix (can be NULL)
890            TypeId::Date => {
891                Self::decode_bytelen_type(src, dst)?;
892            }
893
894            // Variable-length nullable types (length-prefixed)
895            TypeId::IntN | TypeId::BitN | TypeId::FloatN | TypeId::MoneyN | TypeId::DateTimeN => {
896                Self::decode_bytelen_type(src, dst)?;
897            }
898
899            TypeId::Guid => {
900                Self::decode_bytelen_type(src, dst)?;
901            }
902
903            TypeId::Decimal | TypeId::Numeric | TypeId::DecimalN | TypeId::NumericN => {
904                Self::decode_bytelen_type(src, dst)?;
905            }
906
907            // Old-style byte-length strings
908            TypeId::Char | TypeId::VarChar | TypeId::Binary | TypeId::VarBinary => {
909                Self::decode_bytelen_type(src, dst)?;
910            }
911
912            // 2-byte length strings (or PLP for MAX types)
913            TypeId::BigVarChar | TypeId::BigVarBinary => {
914                // max_length == 0xFFFF indicates VARCHAR(MAX) or VARBINARY(MAX), which uses PLP
915                if col.type_info.max_length == Some(0xFFFF) {
916                    Self::decode_plp_type(src, dst)?;
917                } else {
918                    Self::decode_ushortlen_type(src, dst)?;
919                }
920            }
921
922            // Fixed-length types that don't have MAX variants
923            TypeId::BigChar | TypeId::BigBinary => {
924                Self::decode_ushortlen_type(src, dst)?;
925            }
926
927            // Unicode strings (2-byte length in bytes, or PLP for NVARCHAR(MAX))
928            TypeId::NVarChar => {
929                // max_length == 0xFFFF indicates NVARCHAR(MAX), which uses PLP
930                if col.type_info.max_length == Some(0xFFFF) {
931                    Self::decode_plp_type(src, dst)?;
932                } else {
933                    Self::decode_ushortlen_type(src, dst)?;
934                }
935            }
936
937            // Fixed-length NCHAR doesn't have MAX variant
938            TypeId::NChar => {
939                Self::decode_ushortlen_type(src, dst)?;
940            }
941
942            // Time types with scale
943            TypeId::Time | TypeId::DateTime2 | TypeId::DateTimeOffset => {
944                Self::decode_bytelen_type(src, dst)?;
945            }
946
947            // TEXT/NTEXT/IMAGE - deprecated LOB types using textptr format
948            TypeId::Text | TypeId::NText | TypeId::Image => {
949                Self::decode_textptr_type(src, dst)?;
950            }
951
952            // XML - uses actual PLP format
953            TypeId::Xml => {
954                Self::decode_plp_type(src, dst)?;
955            }
956
957            // Complex types
958            TypeId::Variant => {
959                Self::decode_intlen_type(src, dst)?;
960            }
961
962            TypeId::Udt => {
963                // UDT uses PLP encoding
964                Self::decode_plp_type(src, dst)?;
965            }
966
967            TypeId::Tvp => {
968                // TVP not supported in row data
969                return Err(ProtocolError::InvalidTokenType(col.col_type));
970            }
971        }
972
973        Ok(())
974    }
975
976    /// Decode a 1-byte length-prefixed value.
977    fn decode_bytelen_type(
978        src: &mut impl Buf,
979        dst: &mut bytes::BytesMut,
980    ) -> Result<(), ProtocolError> {
981        if src.remaining() < 1 {
982            return Err(ProtocolError::UnexpectedEof);
983        }
984        let len = src.get_u8() as usize;
985        if len == 0xFF {
986            // NULL value - store as zero-length with NULL marker
987            dst.extend_from_slice(&[0xFF]);
988        } else if len == 0 {
989            // Empty value
990            dst.extend_from_slice(&[0x00]);
991        } else {
992            if src.remaining() < len {
993                return Err(ProtocolError::UnexpectedEof);
994            }
995            dst.extend_from_slice(&[len as u8]);
996            for _ in 0..len {
997                dst.extend_from_slice(&[src.get_u8()]);
998            }
999        }
1000        Ok(())
1001    }
1002
1003    /// Decode a 2-byte length-prefixed value.
1004    fn decode_ushortlen_type(
1005        src: &mut impl Buf,
1006        dst: &mut bytes::BytesMut,
1007    ) -> Result<(), ProtocolError> {
1008        if src.remaining() < 2 {
1009            return Err(ProtocolError::UnexpectedEof);
1010        }
1011        let len = src.get_u16_le() as usize;
1012        if len == 0xFFFF {
1013            // NULL value
1014            dst.extend_from_slice(&0xFFFFu16.to_le_bytes());
1015        } else if len == 0 {
1016            // Empty value
1017            dst.extend_from_slice(&0u16.to_le_bytes());
1018        } else {
1019            if src.remaining() < len {
1020                return Err(ProtocolError::UnexpectedEof);
1021            }
1022            dst.extend_from_slice(&(len as u16).to_le_bytes());
1023            for _ in 0..len {
1024                dst.extend_from_slice(&[src.get_u8()]);
1025            }
1026        }
1027        Ok(())
1028    }
1029
1030    /// Decode a 4-byte length-prefixed value.
1031    fn decode_intlen_type(
1032        src: &mut impl Buf,
1033        dst: &mut bytes::BytesMut,
1034    ) -> Result<(), ProtocolError> {
1035        if src.remaining() < 4 {
1036            return Err(ProtocolError::UnexpectedEof);
1037        }
1038        let len = src.get_u32_le() as usize;
1039        if len == 0xFFFFFFFF {
1040            // NULL value
1041            dst.extend_from_slice(&0xFFFFFFFFu32.to_le_bytes());
1042        } else if len == 0 {
1043            // Empty value
1044            dst.extend_from_slice(&0u32.to_le_bytes());
1045        } else {
1046            if src.remaining() < len {
1047                return Err(ProtocolError::UnexpectedEof);
1048            }
1049            dst.extend_from_slice(&(len as u32).to_le_bytes());
1050            for _ in 0..len {
1051                dst.extend_from_slice(&[src.get_u8()]);
1052            }
1053        }
1054        Ok(())
1055    }
1056
1057    /// Decode a TEXT/NTEXT/IMAGE type (textptr format).
1058    ///
1059    /// These deprecated LOB types use a special format:
1060    /// - 1 byte: textptr_len (0 = NULL)
1061    /// - textptr_len bytes: textptr (if not NULL)
1062    /// - 8 bytes: timestamp (if not NULL)
1063    /// - 4 bytes: data length (if not NULL)
1064    /// - data_len bytes: the actual data (if not NULL)
1065    ///
1066    /// We convert this to PLP format for the client to parse:
1067    /// - 8 bytes: total length (0xFFFFFFFFFFFFFFFF = NULL)
1068    /// - 4 bytes: chunk length (= data length)
1069    /// - chunk data
1070    /// - 4 bytes: 0 (terminator)
1071    fn decode_textptr_type(
1072        src: &mut impl Buf,
1073        dst: &mut bytes::BytesMut,
1074    ) -> Result<(), ProtocolError> {
1075        if src.remaining() < 1 {
1076            return Err(ProtocolError::UnexpectedEof);
1077        }
1078
1079        let textptr_len = src.get_u8() as usize;
1080
1081        if textptr_len == 0 {
1082            // NULL value - write PLP NULL marker
1083            dst.extend_from_slice(&0xFFFFFFFFFFFFFFFFu64.to_le_bytes());
1084            return Ok(());
1085        }
1086
1087        // Skip textptr bytes
1088        if src.remaining() < textptr_len {
1089            return Err(ProtocolError::UnexpectedEof);
1090        }
1091        src.advance(textptr_len);
1092
1093        // Skip 8-byte timestamp
1094        if src.remaining() < 8 {
1095            return Err(ProtocolError::UnexpectedEof);
1096        }
1097        src.advance(8);
1098
1099        // Read data length
1100        if src.remaining() < 4 {
1101            return Err(ProtocolError::UnexpectedEof);
1102        }
1103        let data_len = src.get_u32_le() as usize;
1104
1105        if src.remaining() < data_len {
1106            return Err(ProtocolError::UnexpectedEof);
1107        }
1108
1109        // Write in PLP format for client parsing:
1110        // - 8 bytes: total length
1111        // - 4 bytes: chunk length
1112        // - chunk data
1113        // - 4 bytes: 0 (terminator)
1114        dst.extend_from_slice(&(data_len as u64).to_le_bytes());
1115        dst.extend_from_slice(&(data_len as u32).to_le_bytes());
1116        for _ in 0..data_len {
1117            dst.extend_from_slice(&[src.get_u8()]);
1118        }
1119        dst.extend_from_slice(&0u32.to_le_bytes()); // PLP terminator
1120
1121        Ok(())
1122    }
1123
1124    /// Decode a PLP (Partially Length-Prefixed) value.
1125    ///
1126    /// PLP format:
1127    /// - 8 bytes: total length (0xFFFFFFFFFFFFFFFE = unknown, 0xFFFFFFFFFFFFFFFF = NULL)
1128    /// - If not NULL: chunks of (4 byte chunk length + data) until chunk length = 0
1129    fn decode_plp_type(src: &mut impl Buf, dst: &mut bytes::BytesMut) -> Result<(), ProtocolError> {
1130        if src.remaining() < 8 {
1131            return Err(ProtocolError::UnexpectedEof);
1132        }
1133
1134        let total_len = src.get_u64_le();
1135
1136        // Store the total length marker
1137        dst.extend_from_slice(&total_len.to_le_bytes());
1138
1139        if total_len == 0xFFFFFFFFFFFFFFFF {
1140            // NULL value - no more data
1141            return Ok(());
1142        }
1143
1144        // Read chunks until terminator
1145        loop {
1146            if src.remaining() < 4 {
1147                return Err(ProtocolError::UnexpectedEof);
1148            }
1149            let chunk_len = src.get_u32_le() as usize;
1150            dst.extend_from_slice(&(chunk_len as u32).to_le_bytes());
1151
1152            if chunk_len == 0 {
1153                // End of PLP data
1154                break;
1155            }
1156
1157            if src.remaining() < chunk_len {
1158                return Err(ProtocolError::UnexpectedEof);
1159            }
1160
1161            for _ in 0..chunk_len {
1162                dst.extend_from_slice(&[src.get_u8()]);
1163            }
1164        }
1165
1166        Ok(())
1167    }
1168}
1169
1170// =============================================================================
1171// NbcRow Parsing Implementation
1172// =============================================================================
1173
1174impl NbcRow {
1175    /// Decode an NBCROW token from bytes.
1176    ///
1177    /// NBCROW (Null Bitmap Compressed Row) stores a bitmap indicating which
1178    /// columns are NULL, followed by only the non-NULL values.
1179    pub fn decode(src: &mut impl Buf, metadata: &ColMetaData) -> Result<Self, ProtocolError> {
1180        let col_count = metadata.columns.len();
1181        let bitmap_len = (col_count + 7) / 8;
1182
1183        if src.remaining() < bitmap_len {
1184            return Err(ProtocolError::UnexpectedEof);
1185        }
1186
1187        // Read null bitmap
1188        let mut null_bitmap = vec![0u8; bitmap_len];
1189        for byte in &mut null_bitmap {
1190            *byte = src.get_u8();
1191        }
1192
1193        // Read non-null values
1194        let mut data = bytes::BytesMut::new();
1195
1196        for (i, col) in metadata.columns.iter().enumerate() {
1197            let byte_idx = i / 8;
1198            let bit_idx = i % 8;
1199            let is_null = (null_bitmap[byte_idx] & (1 << bit_idx)) != 0;
1200
1201            if !is_null {
1202                // Read the value - for NBCROW, we read without the length prefix
1203                // for fixed-length types, and with length prefix for variable types
1204                RawRow::decode_column_value(src, col, &mut data)?;
1205            }
1206        }
1207
1208        Ok(Self {
1209            null_bitmap,
1210            data: data.freeze(),
1211        })
1212    }
1213
1214    /// Check if a column at the given index is NULL.
1215    #[must_use]
1216    pub fn is_null(&self, column_index: usize) -> bool {
1217        let byte_idx = column_index / 8;
1218        let bit_idx = column_index % 8;
1219        if byte_idx < self.null_bitmap.len() {
1220            (self.null_bitmap[byte_idx] & (1 << bit_idx)) != 0
1221        } else {
1222            true // Out of bounds = NULL
1223        }
1224    }
1225}
1226
1227// =============================================================================
1228// ReturnValue Parsing Implementation
1229// =============================================================================
1230
1231impl ReturnValue {
1232    /// Decode a RETURNVALUE token from bytes.
1233    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1234        // Length (2 bytes)
1235        if src.remaining() < 2 {
1236            return Err(ProtocolError::UnexpectedEof);
1237        }
1238        let _length = src.get_u16_le();
1239
1240        // Parameter ordinal (2 bytes)
1241        if src.remaining() < 2 {
1242            return Err(ProtocolError::UnexpectedEof);
1243        }
1244        let param_ordinal = src.get_u16_le();
1245
1246        // Parameter name (B_VARCHAR)
1247        let param_name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1248
1249        // Status (1 byte)
1250        if src.remaining() < 1 {
1251            return Err(ProtocolError::UnexpectedEof);
1252        }
1253        let status = src.get_u8();
1254
1255        // User type (4 bytes) + flags (2 bytes) + type id (1 byte)
1256        if src.remaining() < 7 {
1257            return Err(ProtocolError::UnexpectedEof);
1258        }
1259        let user_type = src.get_u32_le();
1260        let flags = src.get_u16_le();
1261        let col_type = src.get_u8();
1262
1263        let type_id = TypeId::from_u8(col_type).unwrap_or(TypeId::Null);
1264
1265        // Parse type info
1266        let type_info = ColMetaData::decode_type_info(src, type_id, col_type)?;
1267
1268        // Read the value data
1269        let mut value_buf = bytes::BytesMut::new();
1270
1271        // Create a temporary column for value parsing
1272        let temp_col = ColumnData {
1273            name: String::new(),
1274            type_id,
1275            col_type,
1276            flags,
1277            user_type,
1278            type_info: type_info.clone(),
1279        };
1280
1281        RawRow::decode_column_value(src, &temp_col, &mut value_buf)?;
1282
1283        Ok(Self {
1284            param_ordinal,
1285            param_name,
1286            status,
1287            user_type,
1288            flags,
1289            type_info,
1290            value: value_buf.freeze(),
1291        })
1292    }
1293}
1294
1295// =============================================================================
1296// SessionState Parsing Implementation
1297// =============================================================================
1298
1299impl SessionState {
1300    /// Decode a SESSIONSTATE token from bytes.
1301    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1302        if src.remaining() < 4 {
1303            return Err(ProtocolError::UnexpectedEof);
1304        }
1305
1306        let length = src.get_u32_le() as usize;
1307
1308        if src.remaining() < length {
1309            return Err(ProtocolError::IncompletePacket {
1310                expected: length,
1311                actual: src.remaining(),
1312            });
1313        }
1314
1315        let data = src.copy_to_bytes(length);
1316
1317        Ok(Self { data })
1318    }
1319}
1320
1321// =============================================================================
1322// Token Parsing Implementation
1323// =============================================================================
1324
/// Bit masks for the 16-bit status field of the DONE family of tokens.
mod done_status_bits {
    /// More results follow this token.
    pub const DONE_MORE: u16 = 1 << 0;
    /// An error occurred.
    pub const DONE_ERROR: u16 = 1 << 1;
    /// A transaction is in progress (DONE_INXACT in MS-TDS).
    pub const DONE_INXACT: u16 = 1 << 2;
    /// The row count field is valid.
    pub const DONE_COUNT: u16 = 1 << 4;
    /// Acknowledges a client attention request (DONE_ATTN in MS-TDS).
    pub const DONE_ATTN: u16 = 1 << 5;
    /// Severe server error (DONE_SRVERROR in MS-TDS).
    pub const DONE_SRVERROR: u16 = 1 << 8;
}
1334
1335impl DoneStatus {
1336    /// Parse done status from raw bits.
1337    #[must_use]
1338    pub fn from_bits(bits: u16) -> Self {
1339        use done_status_bits::*;
1340        Self {
1341            more: (bits & DONE_MORE) != 0,
1342            error: (bits & DONE_ERROR) != 0,
1343            in_xact: (bits & DONE_INXACT) != 0,
1344            count: (bits & DONE_COUNT) != 0,
1345            attn: (bits & DONE_ATTN) != 0,
1346            srverror: (bits & DONE_SRVERROR) != 0,
1347        }
1348    }
1349
1350    /// Convert to raw bits.
1351    #[must_use]
1352    pub fn to_bits(&self) -> u16 {
1353        use done_status_bits::*;
1354        let mut bits = 0u16;
1355        if self.more {
1356            bits |= DONE_MORE;
1357        }
1358        if self.error {
1359            bits |= DONE_ERROR;
1360        }
1361        if self.in_xact {
1362            bits |= DONE_INXACT;
1363        }
1364        if self.count {
1365            bits |= DONE_COUNT;
1366        }
1367        if self.attn {
1368            bits |= DONE_ATTN;
1369        }
1370        if self.srverror {
1371            bits |= DONE_SRVERROR;
1372        }
1373        bits
1374    }
1375}
1376
1377impl Done {
1378    /// Size of the DONE token in bytes (excluding token type byte).
1379    pub const SIZE: usize = 12; // 2 (status) + 2 (curcmd) + 8 (rowcount)
1380
1381    /// Decode a DONE token from bytes.
1382    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1383        if src.remaining() < Self::SIZE {
1384            return Err(ProtocolError::IncompletePacket {
1385                expected: Self::SIZE,
1386                actual: src.remaining(),
1387            });
1388        }
1389
1390        let status = DoneStatus::from_bits(src.get_u16_le());
1391        let cur_cmd = src.get_u16_le();
1392        let row_count = src.get_u64_le();
1393
1394        Ok(Self {
1395            status,
1396            cur_cmd,
1397            row_count,
1398        })
1399    }
1400
1401    /// Encode the DONE token to bytes.
1402    pub fn encode(&self, dst: &mut impl BufMut) {
1403        dst.put_u8(TokenType::Done as u8);
1404        dst.put_u16_le(self.status.to_bits());
1405        dst.put_u16_le(self.cur_cmd);
1406        dst.put_u64_le(self.row_count);
1407    }
1408
1409    /// Check if more results follow this DONE token.
1410    #[must_use]
1411    pub const fn has_more(&self) -> bool {
1412        self.status.more
1413    }
1414
1415    /// Check if an error occurred.
1416    #[must_use]
1417    pub const fn has_error(&self) -> bool {
1418        self.status.error
1419    }
1420
1421    /// Check if the row count is valid.
1422    #[must_use]
1423    pub const fn has_count(&self) -> bool {
1424        self.status.count
1425    }
1426}
1427
1428impl DoneProc {
1429    /// Size of the DONEPROC token in bytes (excluding token type byte).
1430    pub const SIZE: usize = 12;
1431
1432    /// Decode a DONEPROC token from bytes.
1433    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1434        if src.remaining() < Self::SIZE {
1435            return Err(ProtocolError::IncompletePacket {
1436                expected: Self::SIZE,
1437                actual: src.remaining(),
1438            });
1439        }
1440
1441        let status = DoneStatus::from_bits(src.get_u16_le());
1442        let cur_cmd = src.get_u16_le();
1443        let row_count = src.get_u64_le();
1444
1445        Ok(Self {
1446            status,
1447            cur_cmd,
1448            row_count,
1449        })
1450    }
1451
1452    /// Encode the DONEPROC token to bytes.
1453    pub fn encode(&self, dst: &mut impl BufMut) {
1454        dst.put_u8(TokenType::DoneProc as u8);
1455        dst.put_u16_le(self.status.to_bits());
1456        dst.put_u16_le(self.cur_cmd);
1457        dst.put_u64_le(self.row_count);
1458    }
1459}
1460
1461impl DoneInProc {
1462    /// Size of the DONEINPROC token in bytes (excluding token type byte).
1463    pub const SIZE: usize = 12;
1464
1465    /// Decode a DONEINPROC token from bytes.
1466    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1467        if src.remaining() < Self::SIZE {
1468            return Err(ProtocolError::IncompletePacket {
1469                expected: Self::SIZE,
1470                actual: src.remaining(),
1471            });
1472        }
1473
1474        let status = DoneStatus::from_bits(src.get_u16_le());
1475        let cur_cmd = src.get_u16_le();
1476        let row_count = src.get_u64_le();
1477
1478        Ok(Self {
1479            status,
1480            cur_cmd,
1481            row_count,
1482        })
1483    }
1484
1485    /// Encode the DONEINPROC token to bytes.
1486    pub fn encode(&self, dst: &mut impl BufMut) {
1487        dst.put_u8(TokenType::DoneInProc as u8);
1488        dst.put_u16_le(self.status.to_bits());
1489        dst.put_u16_le(self.cur_cmd);
1490        dst.put_u64_le(self.row_count);
1491    }
1492}
1493
1494impl ServerError {
1495    /// Decode an ERROR token from bytes.
1496    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1497        // ERROR token: length (2) + number (4) + state (1) + class (1) +
1498        //              message (us_varchar) + server (b_varchar) + procedure (b_varchar) + line (4)
1499        if src.remaining() < 2 {
1500            return Err(ProtocolError::UnexpectedEof);
1501        }
1502
1503        let _length = src.get_u16_le();
1504
1505        if src.remaining() < 6 {
1506            return Err(ProtocolError::UnexpectedEof);
1507        }
1508
1509        let number = src.get_i32_le();
1510        let state = src.get_u8();
1511        let class = src.get_u8();
1512
1513        let message = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1514        let server = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1515        let procedure = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1516
1517        if src.remaining() < 4 {
1518            return Err(ProtocolError::UnexpectedEof);
1519        }
1520        let line = src.get_i32_le();
1521
1522        Ok(Self {
1523            number,
1524            state,
1525            class,
1526            message,
1527            server,
1528            procedure,
1529            line,
1530        })
1531    }
1532
1533    /// Check if this is a fatal error (severity >= 20).
1534    #[must_use]
1535    pub const fn is_fatal(&self) -> bool {
1536        self.class >= 20
1537    }
1538
1539    /// Check if this error indicates the batch was aborted (severity >= 16).
1540    #[must_use]
1541    pub const fn is_batch_abort(&self) -> bool {
1542        self.class >= 16
1543    }
1544}
1545
1546impl ServerInfo {
1547    /// Decode an INFO token from bytes.
1548    ///
1549    /// INFO tokens have the same structure as ERROR tokens but with lower severity.
1550    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1551        if src.remaining() < 2 {
1552            return Err(ProtocolError::UnexpectedEof);
1553        }
1554
1555        let _length = src.get_u16_le();
1556
1557        if src.remaining() < 6 {
1558            return Err(ProtocolError::UnexpectedEof);
1559        }
1560
1561        let number = src.get_i32_le();
1562        let state = src.get_u8();
1563        let class = src.get_u8();
1564
1565        let message = read_us_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1566        let server = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1567        let procedure = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1568
1569        if src.remaining() < 4 {
1570            return Err(ProtocolError::UnexpectedEof);
1571        }
1572        let line = src.get_i32_le();
1573
1574        Ok(Self {
1575            number,
1576            state,
1577            class,
1578            message,
1579            server,
1580            procedure,
1581            line,
1582        })
1583    }
1584}
1585
1586impl LoginAck {
1587    /// Decode a LOGINACK token from bytes.
1588    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1589        // LOGINACK: length (2) + interface (1) + tds_version (4) + prog_name (b_varchar) + prog_version (4)
1590        if src.remaining() < 2 {
1591            return Err(ProtocolError::UnexpectedEof);
1592        }
1593
1594        let _length = src.get_u16_le();
1595
1596        if src.remaining() < 5 {
1597            return Err(ProtocolError::UnexpectedEof);
1598        }
1599
1600        let interface = src.get_u8();
1601        let tds_version = src.get_u32_le();
1602        let prog_name = read_b_varchar(src).ok_or(ProtocolError::UnexpectedEof)?;
1603
1604        if src.remaining() < 4 {
1605            return Err(ProtocolError::UnexpectedEof);
1606        }
1607        let prog_version = src.get_u32_le();
1608
1609        Ok(Self {
1610            interface,
1611            tds_version,
1612            prog_name,
1613            prog_version,
1614        })
1615    }
1616
1617    /// Get the TDS version as a `TdsVersion`.
1618    #[must_use]
1619    pub fn tds_version(&self) -> crate::version::TdsVersion {
1620        crate::version::TdsVersion::new(self.tds_version)
1621    }
1622}
1623
1624impl EnvChangeType {
1625    /// Create from raw byte value.
1626    pub fn from_u8(value: u8) -> Option<Self> {
1627        match value {
1628            1 => Some(Self::Database),
1629            2 => Some(Self::Language),
1630            3 => Some(Self::CharacterSet),
1631            4 => Some(Self::PacketSize),
1632            5 => Some(Self::UnicodeSortingLocalId),
1633            6 => Some(Self::UnicodeComparisonFlags),
1634            7 => Some(Self::SqlCollation),
1635            8 => Some(Self::BeginTransaction),
1636            9 => Some(Self::CommitTransaction),
1637            10 => Some(Self::RollbackTransaction),
1638            11 => Some(Self::EnlistDtcTransaction),
1639            12 => Some(Self::DefectTransaction),
1640            13 => Some(Self::RealTimeLogShipping),
1641            15 => Some(Self::PromoteTransaction),
1642            16 => Some(Self::TransactionManagerAddress),
1643            17 => Some(Self::TransactionEnded),
1644            18 => Some(Self::ResetConnectionCompletionAck),
1645            19 => Some(Self::UserInstanceStarted),
1646            20 => Some(Self::Routing),
1647            _ => None,
1648        }
1649    }
1650}
1651
1652impl EnvChange {
1653    /// Decode an ENVCHANGE token from bytes.
1654    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1655        if src.remaining() < 3 {
1656            return Err(ProtocolError::UnexpectedEof);
1657        }
1658
1659        let length = src.get_u16_le() as usize;
1660        if src.remaining() < length {
1661            return Err(ProtocolError::IncompletePacket {
1662                expected: length,
1663                actual: src.remaining(),
1664            });
1665        }
1666
1667        let env_type_byte = src.get_u8();
1668        let env_type = EnvChangeType::from_u8(env_type_byte)
1669            .ok_or(ProtocolError::InvalidTokenType(env_type_byte))?;
1670
1671        let (new_value, old_value) = match env_type {
1672            EnvChangeType::Routing => {
1673                // Routing has special format
1674                let new_value = Self::decode_routing_value(src)?;
1675                let old_value = EnvChangeValue::Binary(Bytes::new());
1676                (new_value, old_value)
1677            }
1678            EnvChangeType::BeginTransaction
1679            | EnvChangeType::CommitTransaction
1680            | EnvChangeType::RollbackTransaction
1681            | EnvChangeType::EnlistDtcTransaction
1682            | EnvChangeType::SqlCollation => {
1683                // These use binary format per MS-TDS spec:
1684                // - Transaction tokens: transaction descriptor (8 bytes)
1685                // - SqlCollation: collation info (5 bytes: LCID + sort flags)
1686                let new_len = src.get_u8() as usize;
1687                let new_value = if new_len > 0 && src.remaining() >= new_len {
1688                    EnvChangeValue::Binary(src.copy_to_bytes(new_len))
1689                } else {
1690                    EnvChangeValue::Binary(Bytes::new())
1691                };
1692
1693                let old_len = src.get_u8() as usize;
1694                let old_value = if old_len > 0 && src.remaining() >= old_len {
1695                    EnvChangeValue::Binary(src.copy_to_bytes(old_len))
1696                } else {
1697                    EnvChangeValue::Binary(Bytes::new())
1698                };
1699
1700                (new_value, old_value)
1701            }
1702            _ => {
1703                // String format for most env changes
1704                let new_value = read_b_varchar(src)
1705                    .map(EnvChangeValue::String)
1706                    .unwrap_or(EnvChangeValue::String(String::new()));
1707
1708                let old_value = read_b_varchar(src)
1709                    .map(EnvChangeValue::String)
1710                    .unwrap_or(EnvChangeValue::String(String::new()));
1711
1712                (new_value, old_value)
1713            }
1714        };
1715
1716        Ok(Self {
1717            env_type,
1718            new_value,
1719            old_value,
1720        })
1721    }
1722
1723    fn decode_routing_value(src: &mut impl Buf) -> Result<EnvChangeValue, ProtocolError> {
1724        // Routing format: length (2) + protocol (1) + port (2) + server_len (2) + server (utf16)
1725        if src.remaining() < 2 {
1726            return Err(ProtocolError::UnexpectedEof);
1727        }
1728
1729        let _routing_len = src.get_u16_le();
1730
1731        if src.remaining() < 5 {
1732            return Err(ProtocolError::UnexpectedEof);
1733        }
1734
1735        let _protocol = src.get_u8();
1736        let port = src.get_u16_le();
1737        let server_len = src.get_u16_le() as usize;
1738
1739        // Read UTF-16LE server name
1740        if src.remaining() < server_len * 2 {
1741            return Err(ProtocolError::UnexpectedEof);
1742        }
1743
1744        let mut chars = Vec::with_capacity(server_len);
1745        for _ in 0..server_len {
1746            chars.push(src.get_u16_le());
1747        }
1748
1749        let host = String::from_utf16(&chars).map_err(|_| {
1750            ProtocolError::StringEncoding(
1751                #[cfg(feature = "std")]
1752                "invalid UTF-16 in routing hostname".to_string(),
1753                #[cfg(not(feature = "std"))]
1754                "invalid UTF-16 in routing hostname",
1755            )
1756        })?;
1757
1758        Ok(EnvChangeValue::Routing { host, port })
1759    }
1760
1761    /// Check if this is a routing redirect.
1762    #[must_use]
1763    pub fn is_routing(&self) -> bool {
1764        self.env_type == EnvChangeType::Routing
1765    }
1766
1767    /// Get routing information if this is a routing change.
1768    #[must_use]
1769    pub fn routing_info(&self) -> Option<(&str, u16)> {
1770        if let EnvChangeValue::Routing { host, port } = &self.new_value {
1771            Some((host, *port))
1772        } else {
1773            None
1774        }
1775    }
1776
1777    /// Get the new database name if this is a database change.
1778    #[must_use]
1779    pub fn new_database(&self) -> Option<&str> {
1780        if self.env_type == EnvChangeType::Database {
1781            if let EnvChangeValue::String(s) = &self.new_value {
1782                return Some(s);
1783            }
1784        }
1785        None
1786    }
1787}
1788
1789impl Order {
1790    /// Decode an ORDER token from bytes.
1791    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1792        if src.remaining() < 2 {
1793            return Err(ProtocolError::UnexpectedEof);
1794        }
1795
1796        let length = src.get_u16_le() as usize;
1797        let column_count = length / 2;
1798
1799        if src.remaining() < length {
1800            return Err(ProtocolError::IncompletePacket {
1801                expected: length,
1802                actual: src.remaining(),
1803            });
1804        }
1805
1806        let mut columns = Vec::with_capacity(column_count);
1807        for _ in 0..column_count {
1808            columns.push(src.get_u16_le());
1809        }
1810
1811        Ok(Self { columns })
1812    }
1813}
1814
1815impl FeatureExtAck {
1816    /// Feature terminator byte.
1817    pub const TERMINATOR: u8 = 0xFF;
1818
1819    /// Decode a FEATUREEXTACK token from bytes.
1820    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1821        let mut features = Vec::new();
1822
1823        loop {
1824            if !src.has_remaining() {
1825                return Err(ProtocolError::UnexpectedEof);
1826            }
1827
1828            let feature_id = src.get_u8();
1829            if feature_id == Self::TERMINATOR {
1830                break;
1831            }
1832
1833            if src.remaining() < 4 {
1834                return Err(ProtocolError::UnexpectedEof);
1835            }
1836
1837            let data_len = src.get_u32_le() as usize;
1838
1839            if src.remaining() < data_len {
1840                return Err(ProtocolError::IncompletePacket {
1841                    expected: data_len,
1842                    actual: src.remaining(),
1843                });
1844            }
1845
1846            let data = src.copy_to_bytes(data_len);
1847            features.push(FeatureAck { feature_id, data });
1848        }
1849
1850        Ok(Self { features })
1851    }
1852}
1853
1854impl SspiToken {
1855    /// Decode an SSPI token from bytes.
1856    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1857        if src.remaining() < 2 {
1858            return Err(ProtocolError::UnexpectedEof);
1859        }
1860
1861        let length = src.get_u16_le() as usize;
1862
1863        if src.remaining() < length {
1864            return Err(ProtocolError::IncompletePacket {
1865                expected: length,
1866                actual: src.remaining(),
1867            });
1868        }
1869
1870        let data = src.copy_to_bytes(length);
1871        Ok(Self { data })
1872    }
1873}
1874
1875impl FedAuthInfo {
1876    /// Decode a FEDAUTHINFO token from bytes.
1877    pub fn decode(src: &mut impl Buf) -> Result<Self, ProtocolError> {
1878        if src.remaining() < 4 {
1879            return Err(ProtocolError::UnexpectedEof);
1880        }
1881
1882        let _length = src.get_u32_le();
1883
1884        if src.remaining() < 5 {
1885            return Err(ProtocolError::UnexpectedEof);
1886        }
1887
1888        let _count = src.get_u8();
1889
1890        // Read option data
1891        let mut sts_url = String::new();
1892        let mut spn = String::new();
1893
1894        // Parse info options until we have both
1895        while src.has_remaining() {
1896            if src.remaining() < 9 {
1897                break;
1898            }
1899
1900            let info_id = src.get_u8();
1901            let info_len = src.get_u32_le() as usize;
1902            let _info_offset = src.get_u32_le();
1903
1904            if src.remaining() < info_len {
1905                break;
1906            }
1907
1908            // Read UTF-16LE string
1909            let char_count = info_len / 2;
1910            let mut chars = Vec::with_capacity(char_count);
1911            for _ in 0..char_count {
1912                chars.push(src.get_u16_le());
1913            }
1914
1915            if let Ok(value) = String::from_utf16(&chars) {
1916                match info_id {
1917                    0x01 => spn = value,
1918                    0x02 => sts_url = value,
1919                    _ => {}
1920                }
1921            }
1922        }
1923
1924        Ok(Self { sts_url, spn })
1925    }
1926}
1927
1928// =============================================================================
1929// Token Parser
1930// =============================================================================
1931
/// Token stream parser.
///
/// Parses a stream of TDS tokens from a byte buffer, keeping track of the
/// offset of the next unparsed token.
///
/// # Basic vs Context-Aware Parsing
///
/// Most tokens (like `Done`, `Error`, `LoginAck`, and `ColMetaData` itself)
/// can be parsed without context.
/// Use [`next_token()`](TokenParser::next_token) for these.
///
/// `Row` and `NbcRow` tokens can only be parsed when the column metadata
/// describing them is supplied.
/// Use [`next_token_with_metadata()`](TokenParser::next_token_with_metadata)
/// for these.
///
/// # Example
///
/// ```rust,ignore
/// let mut parser = TokenParser::new(data);
/// let mut metadata = None;
///
/// while let Some(token) = parser.next_token_with_metadata(metadata.as_ref())? {
///     match token {
///         Token::ColMetaData(meta) => {
///             metadata = Some(meta);
///         }
///         Token::Row(row) => {
///             // Process row using metadata
///         }
///         Token::Done(done) => {
///             if !done.has_more() {
///                 break;
///             }
///         }
///         _ => {}
///     }
/// }
/// ```
pub struct TokenParser {
    /// The complete token stream being parsed.
    data: Bytes,
    /// Byte offset into `data` of the next unparsed token.
    position: usize,
}
1972
1973impl TokenParser {
1974    /// Create a new token parser from bytes.
1975    #[must_use]
1976    pub fn new(data: Bytes) -> Self {
1977        Self { data, position: 0 }
1978    }
1979
1980    /// Get remaining bytes in the buffer.
1981    #[must_use]
1982    pub fn remaining(&self) -> usize {
1983        self.data.len().saturating_sub(self.position)
1984    }
1985
1986    /// Check if there are more bytes to parse.
1987    #[must_use]
1988    pub fn has_remaining(&self) -> bool {
1989        self.position < self.data.len()
1990    }
1991
1992    /// Peek at the next token type without consuming it.
1993    #[must_use]
1994    pub fn peek_token_type(&self) -> Option<TokenType> {
1995        if self.position < self.data.len() {
1996            TokenType::from_u8(self.data[self.position])
1997        } else {
1998            None
1999        }
2000    }
2001
2002    /// Parse the next token from the stream.
2003    ///
2004    /// This method can only parse context-independent tokens. For tokens that
2005    /// require column metadata (ColMetaData, Row, NbcRow), use
2006    /// [`next_token_with_metadata()`](TokenParser::next_token_with_metadata).
2007    ///
2008    /// Returns `None` if no more tokens are available.
2009    pub fn next_token(&mut self) -> Result<Option<Token>, ProtocolError> {
2010        self.next_token_with_metadata(None)
2011    }
2012
2013    /// Parse the next token with optional column metadata context.
2014    ///
2015    /// When `metadata` is provided, this method can parse Row and NbcRow tokens.
2016    /// Without metadata, those tokens will return an error.
2017    ///
2018    /// Returns `None` if no more tokens are available.
2019    pub fn next_token_with_metadata(
2020        &mut self,
2021        metadata: Option<&ColMetaData>,
2022    ) -> Result<Option<Token>, ProtocolError> {
2023        if !self.has_remaining() {
2024            return Ok(None);
2025        }
2026
2027        let mut buf = &self.data[self.position..];
2028        let start_pos = self.position;
2029
2030        let token_type_byte = buf.get_u8();
2031        let token_type = TokenType::from_u8(token_type_byte);
2032
2033        let token = match token_type {
2034            Some(TokenType::Done) => {
2035                let done = Done::decode(&mut buf)?;
2036                Token::Done(done)
2037            }
2038            Some(TokenType::DoneProc) => {
2039                let done = DoneProc::decode(&mut buf)?;
2040                Token::DoneProc(done)
2041            }
2042            Some(TokenType::DoneInProc) => {
2043                let done = DoneInProc::decode(&mut buf)?;
2044                Token::DoneInProc(done)
2045            }
2046            Some(TokenType::Error) => {
2047                let error = ServerError::decode(&mut buf)?;
2048                Token::Error(error)
2049            }
2050            Some(TokenType::Info) => {
2051                let info = ServerInfo::decode(&mut buf)?;
2052                Token::Info(info)
2053            }
2054            Some(TokenType::LoginAck) => {
2055                let login_ack = LoginAck::decode(&mut buf)?;
2056                Token::LoginAck(login_ack)
2057            }
2058            Some(TokenType::EnvChange) => {
2059                let env_change = EnvChange::decode(&mut buf)?;
2060                Token::EnvChange(env_change)
2061            }
2062            Some(TokenType::Order) => {
2063                let order = Order::decode(&mut buf)?;
2064                Token::Order(order)
2065            }
2066            Some(TokenType::FeatureExtAck) => {
2067                let ack = FeatureExtAck::decode(&mut buf)?;
2068                Token::FeatureExtAck(ack)
2069            }
2070            Some(TokenType::Sspi) => {
2071                let sspi = SspiToken::decode(&mut buf)?;
2072                Token::Sspi(sspi)
2073            }
2074            Some(TokenType::FedAuthInfo) => {
2075                let info = FedAuthInfo::decode(&mut buf)?;
2076                Token::FedAuthInfo(info)
2077            }
2078            Some(TokenType::ReturnStatus) => {
2079                if buf.remaining() < 4 {
2080                    return Err(ProtocolError::UnexpectedEof);
2081                }
2082                let status = buf.get_i32_le();
2083                Token::ReturnStatus(status)
2084            }
2085            Some(TokenType::ColMetaData) => {
2086                let col_meta = ColMetaData::decode(&mut buf)?;
2087                Token::ColMetaData(col_meta)
2088            }
2089            Some(TokenType::Row) => {
2090                let meta = metadata.ok_or_else(|| {
2091                    ProtocolError::StringEncoding(
2092                        #[cfg(feature = "std")]
2093                        "Row token requires column metadata".to_string(),
2094                        #[cfg(not(feature = "std"))]
2095                        "Row token requires column metadata",
2096                    )
2097                })?;
2098                let row = RawRow::decode(&mut buf, meta)?;
2099                Token::Row(row)
2100            }
2101            Some(TokenType::NbcRow) => {
2102                let meta = metadata.ok_or_else(|| {
2103                    ProtocolError::StringEncoding(
2104                        #[cfg(feature = "std")]
2105                        "NbcRow token requires column metadata".to_string(),
2106                        #[cfg(not(feature = "std"))]
2107                        "NbcRow token requires column metadata",
2108                    )
2109                })?;
2110                let row = NbcRow::decode(&mut buf, meta)?;
2111                Token::NbcRow(row)
2112            }
2113            Some(TokenType::ReturnValue) => {
2114                let ret_val = ReturnValue::decode(&mut buf)?;
2115                Token::ReturnValue(ret_val)
2116            }
2117            Some(TokenType::SessionState) => {
2118                let session = SessionState::decode(&mut buf)?;
2119                Token::SessionState(session)
2120            }
2121            Some(TokenType::ColInfo) | Some(TokenType::TabName) | Some(TokenType::Offset) => {
2122                // These tokens are rarely used and have complex formats.
2123                // Skip them by reading the length and advancing.
2124                if buf.remaining() < 2 {
2125                    return Err(ProtocolError::UnexpectedEof);
2126                }
2127                let length = buf.get_u16_le() as usize;
2128                if buf.remaining() < length {
2129                    return Err(ProtocolError::IncompletePacket {
2130                        expected: length,
2131                        actual: buf.remaining(),
2132                    });
2133                }
2134                // Skip the data
2135                buf.advance(length);
2136                // Recursively get the next token
2137                self.position = start_pos + (self.data.len() - start_pos - buf.remaining());
2138                return self.next_token_with_metadata(metadata);
2139            }
2140            None => {
2141                return Err(ProtocolError::InvalidTokenType(token_type_byte));
2142            }
2143        };
2144
2145        // Update position based on how much was consumed
2146        let consumed = self.data.len() - start_pos - buf.remaining();
2147        self.position = start_pos + consumed;
2148
2149        Ok(Some(token))
2150    }
2151
2152    /// Skip the current token without fully parsing it.
2153    ///
2154    /// This is useful for skipping unknown or uninteresting tokens.
2155    pub fn skip_token(&mut self) -> Result<(), ProtocolError> {
2156        if !self.has_remaining() {
2157            return Ok(());
2158        }
2159
2160        let token_type_byte = self.data[self.position];
2161        let token_type = TokenType::from_u8(token_type_byte);
2162
2163        // Calculate how many bytes to skip based on token type
2164        let skip_amount = match token_type {
2165            // Fixed-size tokens
2166            Some(TokenType::Done) | Some(TokenType::DoneProc) | Some(TokenType::DoneInProc) => {
2167                1 + Done::SIZE // token type + 12 bytes
2168            }
2169            Some(TokenType::ReturnStatus) => {
2170                1 + 4 // token type + 4 bytes
2171            }
2172            // Variable-length tokens with 2-byte length prefix
2173            Some(TokenType::Error)
2174            | Some(TokenType::Info)
2175            | Some(TokenType::LoginAck)
2176            | Some(TokenType::EnvChange)
2177            | Some(TokenType::Order)
2178            | Some(TokenType::Sspi)
2179            | Some(TokenType::ColInfo)
2180            | Some(TokenType::TabName)
2181            | Some(TokenType::Offset)
2182            | Some(TokenType::ReturnValue) => {
2183                if self.remaining() < 3 {
2184                    return Err(ProtocolError::UnexpectedEof);
2185                }
2186                let length = u16::from_le_bytes([
2187                    self.data[self.position + 1],
2188                    self.data[self.position + 2],
2189                ]) as usize;
2190                1 + 2 + length // token type + length prefix + data
2191            }
2192            // Tokens with 4-byte length prefix
2193            Some(TokenType::SessionState) | Some(TokenType::FedAuthInfo) => {
2194                if self.remaining() < 5 {
2195                    return Err(ProtocolError::UnexpectedEof);
2196                }
2197                let length = u32::from_le_bytes([
2198                    self.data[self.position + 1],
2199                    self.data[self.position + 2],
2200                    self.data[self.position + 3],
2201                    self.data[self.position + 4],
2202                ]) as usize;
2203                1 + 4 + length
2204            }
2205            // FeatureExtAck has no length prefix - must parse
2206            Some(TokenType::FeatureExtAck) => {
2207                // Parse to find end
2208                let mut buf = &self.data[self.position + 1..];
2209                let _ = FeatureExtAck::decode(&mut buf)?;
2210                self.data.len() - self.position - buf.remaining()
2211            }
2212            // ColMetaData, Row, NbcRow require context and can't be easily skipped
2213            Some(TokenType::ColMetaData) | Some(TokenType::Row) | Some(TokenType::NbcRow) => {
2214                return Err(ProtocolError::InvalidTokenType(token_type_byte));
2215            }
2216            None => {
2217                return Err(ProtocolError::InvalidTokenType(token_type_byte));
2218            }
2219        };
2220
2221        if self.remaining() < skip_amount {
2222            return Err(ProtocolError::UnexpectedEof);
2223        }
2224
2225        self.position += skip_amount;
2226        Ok(())
2227    }
2228
2229    /// Get the current position in the buffer.
2230    #[must_use]
2231    pub fn position(&self) -> usize {
2232        self.position
2233    }
2234
2235    /// Reset the parser to the beginning.
2236    pub fn reset(&mut self) {
2237        self.position = 0;
2238    }
2239}
2240
2241// =============================================================================
2242// no_std support
2243// =============================================================================
2244
2245#[cfg(not(feature = "std"))]
2246use alloc::string::String;
2247#[cfg(not(feature = "std"))]
2248use alloc::vec::Vec;
2249
2250// =============================================================================
2251// Tests
2252// =============================================================================
2253
2254#[cfg(test)]
2255#[allow(clippy::unwrap_used, clippy::panic)]
2256mod tests {
2257    use super::*;
2258    use bytes::BytesMut;
2259
2260    #[test]
2261    fn test_done_roundtrip() {
2262        let done = Done {
2263            status: DoneStatus {
2264                more: false,
2265                error: false,
2266                in_xact: false,
2267                count: true,
2268                attn: false,
2269                srverror: false,
2270            },
2271            cur_cmd: 193, // SELECT
2272            row_count: 42,
2273        };
2274
2275        let mut buf = BytesMut::new();
2276        done.encode(&mut buf);
2277
2278        // Skip the token type byte
2279        let mut cursor = &buf[1..];
2280        let decoded = Done::decode(&mut cursor).unwrap();
2281
2282        assert_eq!(decoded.status.count, done.status.count);
2283        assert_eq!(decoded.cur_cmd, done.cur_cmd);
2284        assert_eq!(decoded.row_count, done.row_count);
2285    }
2286
2287    #[test]
2288    fn test_done_status_bits() {
2289        let status = DoneStatus {
2290            more: true,
2291            error: true,
2292            in_xact: true,
2293            count: true,
2294            attn: false,
2295            srverror: false,
2296        };
2297
2298        let bits = status.to_bits();
2299        let restored = DoneStatus::from_bits(bits);
2300
2301        assert_eq!(status.more, restored.more);
2302        assert_eq!(status.error, restored.error);
2303        assert_eq!(status.in_xact, restored.in_xact);
2304        assert_eq!(status.count, restored.count);
2305    }
2306
2307    #[test]
2308    fn test_token_parser_done() {
2309        // DONE token: type (1) + status (2) + curcmd (2) + rowcount (8)
2310        let data = Bytes::from_static(&[
2311            0xFD, // DONE token type
2312            0x10, 0x00, // status: DONE_COUNT
2313            0xC1, 0x00, // cur_cmd: 193 (SELECT)
2314            0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count: 5
2315        ]);
2316
2317        let mut parser = TokenParser::new(data);
2318        let token = parser.next_token().unwrap().unwrap();
2319
2320        match token {
2321            Token::Done(done) => {
2322                assert!(done.status.count);
2323                assert!(!done.status.more);
2324                assert_eq!(done.cur_cmd, 193);
2325                assert_eq!(done.row_count, 5);
2326            }
2327            _ => panic!("Expected Done token"),
2328        }
2329
2330        // No more tokens
2331        assert!(parser.next_token().unwrap().is_none());
2332    }
2333
2334    #[test]
2335    fn test_env_change_type_from_u8() {
2336        assert_eq!(EnvChangeType::from_u8(1), Some(EnvChangeType::Database));
2337        assert_eq!(EnvChangeType::from_u8(20), Some(EnvChangeType::Routing));
2338        assert_eq!(EnvChangeType::from_u8(100), None);
2339    }
2340
2341    #[test]
2342    fn test_colmetadata_no_columns() {
2343        // No metadata marker (0xFFFF)
2344        let data = Bytes::from_static(&[0xFF, 0xFF]);
2345        let mut cursor: &[u8] = &data;
2346        let meta = ColMetaData::decode(&mut cursor).unwrap();
2347        assert!(meta.is_empty());
2348        assert_eq!(meta.column_count(), 0);
2349    }
2350
2351    #[test]
2352    fn test_colmetadata_single_int_column() {
2353        // COLMETADATA with 1 INT column
2354        // Format: column_count (2) + [user_type (4) + flags (2) + type_id (1) + name (b_varchar)]
2355        let mut data = BytesMut::new();
2356        data.extend_from_slice(&[0x01, 0x00]); // 1 column
2357        data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); // user_type = 0
2358        data.extend_from_slice(&[0x01, 0x00]); // flags (nullable)
2359        data.extend_from_slice(&[0x38]); // TypeId::Int4
2360        // Column name "id" as B_VARCHAR (1 byte length + UTF-16LE)
2361        data.extend_from_slice(&[0x02]); // 2 characters
2362        data.extend_from_slice(&[b'i', 0x00, b'd', 0x00]); // "id" in UTF-16LE
2363
2364        let mut cursor: &[u8] = &data;
2365        let meta = ColMetaData::decode(&mut cursor).unwrap();
2366
2367        assert_eq!(meta.column_count(), 1);
2368        assert_eq!(meta.columns[0].name, "id");
2369        assert_eq!(meta.columns[0].type_id, TypeId::Int4);
2370        assert!(meta.columns[0].is_nullable());
2371    }
2372
2373    #[test]
2374    fn test_colmetadata_nvarchar_column() {
2375        // COLMETADATA with 1 NVARCHAR(50) column
2376        let mut data = BytesMut::new();
2377        data.extend_from_slice(&[0x01, 0x00]); // 1 column
2378        data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); // user_type = 0
2379        data.extend_from_slice(&[0x01, 0x00]); // flags (nullable)
2380        data.extend_from_slice(&[0xE7]); // TypeId::NVarChar
2381        // Type info: max_length (2 bytes) + collation (5 bytes)
2382        data.extend_from_slice(&[0x64, 0x00]); // max_length = 100 (50 chars * 2)
2383        data.extend_from_slice(&[0x09, 0x04, 0xD0, 0x00, 0x34]); // collation
2384        // Column name "name"
2385        data.extend_from_slice(&[0x04]); // 4 characters
2386        data.extend_from_slice(&[b'n', 0x00, b'a', 0x00, b'm', 0x00, b'e', 0x00]);
2387
2388        let mut cursor: &[u8] = &data;
2389        let meta = ColMetaData::decode(&mut cursor).unwrap();
2390
2391        assert_eq!(meta.column_count(), 1);
2392        assert_eq!(meta.columns[0].name, "name");
2393        assert_eq!(meta.columns[0].type_id, TypeId::NVarChar);
2394        assert_eq!(meta.columns[0].type_info.max_length, Some(100));
2395        assert!(meta.columns[0].type_info.collation.is_some());
2396    }
2397
2398    #[test]
2399    fn test_raw_row_decode_int() {
2400        // Create metadata for a single INT column
2401        let metadata = ColMetaData {
2402            columns: vec![ColumnData {
2403                name: "id".to_string(),
2404                type_id: TypeId::Int4,
2405                col_type: 0x38,
2406                flags: 0,
2407                user_type: 0,
2408                type_info: TypeInfo::default(),
2409            }],
2410        };
2411
2412        // Row data: just 4 bytes for the int value 42
2413        let data = Bytes::from_static(&[0x2A, 0x00, 0x00, 0x00]); // 42 in little-endian
2414        let mut cursor: &[u8] = &data;
2415        let row = RawRow::decode(&mut cursor, &metadata).unwrap();
2416
2417        // The raw data should contain the 4 bytes
2418        assert_eq!(row.data.len(), 4);
2419        assert_eq!(&row.data[..], &[0x2A, 0x00, 0x00, 0x00]);
2420    }
2421
2422    #[test]
2423    fn test_raw_row_decode_nullable_int() {
2424        // Create metadata for a nullable INT column (IntN)
2425        let metadata = ColMetaData {
2426            columns: vec![ColumnData {
2427                name: "id".to_string(),
2428                type_id: TypeId::IntN,
2429                col_type: 0x26,
2430                flags: 0x01, // nullable
2431                user_type: 0,
2432                type_info: TypeInfo {
2433                    max_length: Some(4),
2434                    ..Default::default()
2435                },
2436            }],
2437        };
2438
2439        // Row data with value: 1 byte length + 4 bytes value
2440        let data = Bytes::from_static(&[0x04, 0x2A, 0x00, 0x00, 0x00]); // length=4, value=42
2441        let mut cursor: &[u8] = &data;
2442        let row = RawRow::decode(&mut cursor, &metadata).unwrap();
2443
2444        assert_eq!(row.data.len(), 5);
2445        assert_eq!(row.data[0], 4); // length
2446        assert_eq!(&row.data[1..], &[0x2A, 0x00, 0x00, 0x00]);
2447    }
2448
2449    #[test]
2450    fn test_raw_row_decode_null_value() {
2451        // Create metadata for a nullable INT column (IntN)
2452        let metadata = ColMetaData {
2453            columns: vec![ColumnData {
2454                name: "id".to_string(),
2455                type_id: TypeId::IntN,
2456                col_type: 0x26,
2457                flags: 0x01, // nullable
2458                user_type: 0,
2459                type_info: TypeInfo {
2460                    max_length: Some(4),
2461                    ..Default::default()
2462                },
2463            }],
2464        };
2465
2466        // NULL value: length = 0xFF (for bytelen types)
2467        let data = Bytes::from_static(&[0xFF]);
2468        let mut cursor: &[u8] = &data;
2469        let row = RawRow::decode(&mut cursor, &metadata).unwrap();
2470
2471        assert_eq!(row.data.len(), 1);
2472        assert_eq!(row.data[0], 0xFF); // NULL marker
2473    }
2474
2475    #[test]
2476    fn test_nbcrow_null_bitmap() {
2477        let row = NbcRow {
2478            null_bitmap: vec![0b00000101], // columns 0 and 2 are NULL
2479            data: Bytes::new(),
2480        };
2481
2482        assert!(row.is_null(0));
2483        assert!(!row.is_null(1));
2484        assert!(row.is_null(2));
2485        assert!(!row.is_null(3));
2486    }
2487
2488    #[test]
2489    fn test_token_parser_colmetadata() {
2490        // Build a COLMETADATA token with 1 INT column
2491        let mut data = BytesMut::new();
2492        data.extend_from_slice(&[0x81]); // COLMETADATA token type
2493        data.extend_from_slice(&[0x01, 0x00]); // 1 column
2494        data.extend_from_slice(&[0x00, 0x00, 0x00, 0x00]); // user_type = 0
2495        data.extend_from_slice(&[0x01, 0x00]); // flags (nullable)
2496        data.extend_from_slice(&[0x38]); // TypeId::Int4
2497        data.extend_from_slice(&[0x02]); // column name length
2498        data.extend_from_slice(&[b'i', 0x00, b'd', 0x00]); // "id"
2499
2500        let mut parser = TokenParser::new(data.freeze());
2501        let token = parser.next_token().unwrap().unwrap();
2502
2503        match token {
2504            Token::ColMetaData(meta) => {
2505                assert_eq!(meta.column_count(), 1);
2506                assert_eq!(meta.columns[0].name, "id");
2507                assert_eq!(meta.columns[0].type_id, TypeId::Int4);
2508            }
2509            _ => panic!("Expected ColMetaData token"),
2510        }
2511    }
2512
2513    #[test]
2514    fn test_token_parser_row_with_metadata() {
2515        // Build metadata
2516        let metadata = ColMetaData {
2517            columns: vec![ColumnData {
2518                name: "id".to_string(),
2519                type_id: TypeId::Int4,
2520                col_type: 0x38,
2521                flags: 0,
2522                user_type: 0,
2523                type_info: TypeInfo::default(),
2524            }],
2525        };
2526
2527        // Build ROW token
2528        let mut data = BytesMut::new();
2529        data.extend_from_slice(&[0xD1]); // ROW token type
2530        data.extend_from_slice(&[0x2A, 0x00, 0x00, 0x00]); // value = 42
2531
2532        let mut parser = TokenParser::new(data.freeze());
2533        let token = parser
2534            .next_token_with_metadata(Some(&metadata))
2535            .unwrap()
2536            .unwrap();
2537
2538        match token {
2539            Token::Row(row) => {
2540                assert_eq!(row.data.len(), 4);
2541            }
2542            _ => panic!("Expected Row token"),
2543        }
2544    }
2545
2546    #[test]
2547    fn test_token_parser_row_without_metadata_fails() {
2548        // Build ROW token
2549        let mut data = BytesMut::new();
2550        data.extend_from_slice(&[0xD1]); // ROW token type
2551        data.extend_from_slice(&[0x2A, 0x00, 0x00, 0x00]); // value = 42
2552
2553        let mut parser = TokenParser::new(data.freeze());
2554        let result = parser.next_token(); // No metadata provided
2555
2556        assert!(result.is_err());
2557    }
2558
2559    #[test]
2560    fn test_token_parser_peek() {
2561        let data = Bytes::from_static(&[
2562            0xFD, // DONE token type
2563            0x10, 0x00, // status
2564            0xC1, 0x00, // cur_cmd
2565            0x05, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // row_count
2566        ]);
2567
2568        let parser = TokenParser::new(data);
2569        assert_eq!(parser.peek_token_type(), Some(TokenType::Done));
2570    }
2571
2572    #[test]
2573    fn test_column_data_fixed_size() {
2574        let col = ColumnData {
2575            name: String::new(),
2576            type_id: TypeId::Int4,
2577            col_type: 0x38,
2578            flags: 0,
2579            user_type: 0,
2580            type_info: TypeInfo::default(),
2581        };
2582        assert_eq!(col.fixed_size(), Some(4));
2583
2584        let col2 = ColumnData {
2585            name: String::new(),
2586            type_id: TypeId::NVarChar,
2587            col_type: 0xE7,
2588            flags: 0,
2589            user_type: 0,
2590            type_info: TypeInfo::default(),
2591        };
2592        assert_eq!(col2.fixed_size(), None);
2593    }
2594
2595    // ========================================================================
2596    // End-to-End Decode Tests (Wire → Stored → Verification)
2597    // ========================================================================
2598    //
2599    // These tests verify that RawRow::decode_column_value correctly stores
2600    // column values in a format that can be parsed back.
2601
2602    #[test]
2603    fn test_decode_nvarchar_then_intn_roundtrip() {
2604        // Simulate wire data for: "World" (NVarChar), 42 (IntN)
2605        // This tests the scenario from the MCP parameterized query
2606
2607        // Build wire data (what the server sends)
2608        let mut wire_data = BytesMut::new();
2609
2610        // Column 0: NVarChar "World" - 2-byte length prefix in bytes
2611        // "World" in UTF-16LE: W=0x0057, o=0x006F, r=0x0072, l=0x006C, d=0x0064
2612        let word = "World";
2613        let utf16: Vec<u16> = word.encode_utf16().collect();
2614        wire_data.put_u16_le((utf16.len() * 2) as u16); // byte length = 10
2615        for code_unit in &utf16 {
2616            wire_data.put_u16_le(*code_unit);
2617        }
2618
2619        // Column 1: IntN 42 - 1-byte length prefix
2620        wire_data.put_u8(4); // 4 bytes for INT
2621        wire_data.put_i32_le(42);
2622
2623        // Build column metadata
2624        let metadata = ColMetaData {
2625            columns: vec![
2626                ColumnData {
2627                    name: "greeting".to_string(),
2628                    type_id: TypeId::NVarChar,
2629                    col_type: 0xE7,
2630                    flags: 0x01,
2631                    user_type: 0,
2632                    type_info: TypeInfo {
2633                        max_length: Some(10), // non-MAX
2634                        precision: None,
2635                        scale: None,
2636                        collation: None,
2637                    },
2638                },
2639                ColumnData {
2640                    name: "number".to_string(),
2641                    type_id: TypeId::IntN,
2642                    col_type: 0x26,
2643                    flags: 0x01,
2644                    user_type: 0,
2645                    type_info: TypeInfo {
2646                        max_length: Some(4),
2647                        precision: None,
2648                        scale: None,
2649                        collation: None,
2650                    },
2651                },
2652            ],
2653        };
2654
2655        // Decode the wire data into stored format
2656        let mut wire_cursor = wire_data.freeze();
2657        let raw_row = RawRow::decode(&mut wire_cursor, &metadata).unwrap();
2658
2659        // Verify wire data was fully consumed
2660        assert_eq!(
2661            wire_cursor.remaining(),
2662            0,
2663            "wire data should be fully consumed"
2664        );
2665
2666        // Now parse the stored data
2667        let mut stored_cursor: &[u8] = &raw_row.data;
2668
2669        // Parse column 0 (NVarChar)
2670        // Stored format for non-MAX NVarChar: [2-byte len][data]
2671        assert!(
2672            stored_cursor.remaining() >= 2,
2673            "need at least 2 bytes for length"
2674        );
2675        let len0 = stored_cursor.get_u16_le() as usize;
2676        assert_eq!(len0, 10, "NVarChar length should be 10 bytes");
2677        assert!(
2678            stored_cursor.remaining() >= len0,
2679            "need {len0} bytes for data"
2680        );
2681
2682        // Read UTF-16LE and convert to string
2683        let mut utf16_read = Vec::new();
2684        for _ in 0..(len0 / 2) {
2685            utf16_read.push(stored_cursor.get_u16_le());
2686        }
2687        let string0 = String::from_utf16(&utf16_read).unwrap();
2688        assert_eq!(string0, "World", "column 0 should be 'World'");
2689
2690        // Parse column 1 (IntN)
2691        // Stored format for IntN: [1-byte len][data]
2692        assert!(
2693            stored_cursor.remaining() >= 1,
2694            "need at least 1 byte for length"
2695        );
2696        let len1 = stored_cursor.get_u8();
2697        assert_eq!(len1, 4, "IntN length should be 4");
2698        assert!(stored_cursor.remaining() >= 4, "need 4 bytes for INT data");
2699        let int1 = stored_cursor.get_i32_le();
2700        assert_eq!(int1, 42, "column 1 should be 42");
2701
2702        // Verify stored data was fully consumed
2703        assert_eq!(
2704            stored_cursor.remaining(),
2705            0,
2706            "stored data should be fully consumed"
2707        );
2708    }
2709
2710    #[test]
2711    fn test_decode_nvarchar_max_then_intn_roundtrip() {
2712        // Test NVARCHAR(MAX) followed by IntN - uses PLP encoding
2713
2714        // Build wire data for PLP NVARCHAR(MAX) + IntN
2715        let mut wire_data = BytesMut::new();
2716
2717        // Column 0: NVARCHAR(MAX) "Hello" - PLP format
2718        // PLP: 8-byte total length, then chunks
2719        let word = "Hello";
2720        let utf16: Vec<u16> = word.encode_utf16().collect();
2721        let byte_len = (utf16.len() * 2) as u64;
2722
2723        wire_data.put_u64_le(byte_len); // total length = 10
2724        wire_data.put_u32_le(byte_len as u32); // chunk length = 10
2725        for code_unit in &utf16 {
2726            wire_data.put_u16_le(*code_unit);
2727        }
2728        wire_data.put_u32_le(0); // terminating zero-length chunk
2729
2730        // Column 1: IntN 99
2731        wire_data.put_u8(4);
2732        wire_data.put_i32_le(99);
2733
2734        // Build metadata with MAX type
2735        let metadata = ColMetaData {
2736            columns: vec![
2737                ColumnData {
2738                    name: "text".to_string(),
2739                    type_id: TypeId::NVarChar,
2740                    col_type: 0xE7,
2741                    flags: 0x01,
2742                    user_type: 0,
2743                    type_info: TypeInfo {
2744                        max_length: Some(0xFFFF), // MAX indicator
2745                        precision: None,
2746                        scale: None,
2747                        collation: None,
2748                    },
2749                },
2750                ColumnData {
2751                    name: "num".to_string(),
2752                    type_id: TypeId::IntN,
2753                    col_type: 0x26,
2754                    flags: 0x01,
2755                    user_type: 0,
2756                    type_info: TypeInfo {
2757                        max_length: Some(4),
2758                        precision: None,
2759                        scale: None,
2760                        collation: None,
2761                    },
2762                },
2763            ],
2764        };
2765
2766        // Decode wire data
2767        let mut wire_cursor = wire_data.freeze();
2768        let raw_row = RawRow::decode(&mut wire_cursor, &metadata).unwrap();
2769
2770        // Verify wire data was fully consumed
2771        assert_eq!(
2772            wire_cursor.remaining(),
2773            0,
2774            "wire data should be fully consumed"
2775        );
2776
2777        // Parse stored PLP data for column 0
2778        let mut stored_cursor: &[u8] = &raw_row.data;
2779
2780        // PLP stored format: [8-byte total][chunks...][4-byte 0]
2781        let total_len = stored_cursor.get_u64_le();
2782        assert_eq!(total_len, 10, "PLP total length should be 10");
2783
2784        let chunk_len = stored_cursor.get_u32_le();
2785        assert_eq!(chunk_len, 10, "PLP chunk length should be 10");
2786
2787        let mut utf16_read = Vec::new();
2788        for _ in 0..(chunk_len / 2) {
2789            utf16_read.push(stored_cursor.get_u16_le());
2790        }
2791        let string0 = String::from_utf16(&utf16_read).unwrap();
2792        assert_eq!(string0, "Hello", "column 0 should be 'Hello'");
2793
2794        let terminator = stored_cursor.get_u32_le();
2795        assert_eq!(terminator, 0, "PLP should end with 0");
2796
2797        // Parse IntN
2798        let len1 = stored_cursor.get_u8();
2799        assert_eq!(len1, 4);
2800        let int1 = stored_cursor.get_i32_le();
2801        assert_eq!(int1, 99, "column 1 should be 99");
2802
2803        // Verify fully consumed
2804        assert_eq!(
2805            stored_cursor.remaining(),
2806            0,
2807            "stored data should be fully consumed"
2808        );
2809    }
2810}