Skip to main content

mssql_client/
row.rs

1//! Row representation for query results.
2//!
3//! This module implements the `Arc<Bytes>` pattern from ADR-004 for reduced-copy
4//! row data access. The `Row` struct holds a shared reference to the raw packet
5//! buffer, deferring allocation until explicitly requested.
6//!
7//! ## Access Patterns (per ADR-004)
8//!
9//! - `get_bytes()` - Returns borrowed slice into buffer (zero additional allocation)
10//! - `get_str()` - Returns Cow - borrowed if valid UTF-8, owned if conversion needed
11//! - `get_string()` - Allocates new String (explicit allocation)
12//! - `get<T>()` - Type-converting accessor with allocation only if needed
13
14use std::borrow::Cow;
15use std::sync::Arc;
16
17use bytes::Bytes;
18
19use mssql_types::__private::decode_value;
20use mssql_types::decode::TypeInfo;
21use mssql_types::{FromSql, SqlValue, TypeError};
22
23use crate::blob::BlobReader;
24
25/// Column slice information pointing into the row buffer.
26///
27/// This is the internal representation that enables zero-copy access
28/// to column data within the shared buffer.
29#[derive(Debug, Clone, Copy)]
30#[non_exhaustive]
31pub struct ColumnSlice {
32    /// Offset into the buffer where this column's data begins.
33    pub offset: u32,
34    /// Length of the column data in bytes.
35    pub length: u32,
36    /// Whether this column value is NULL.
37    pub is_null: bool,
38}
39
40impl ColumnSlice {
41    /// Create a new column slice.
42    pub fn new(offset: u32, length: u32, is_null: bool) -> Self {
43        Self {
44            offset,
45            length,
46            is_null,
47        }
48    }
49
50    /// Create a NULL column slice.
51    pub fn null() -> Self {
52        Self {
53            offset: 0,
54            length: 0,
55            is_null: true,
56        }
57    }
58}
59
60/// Column metadata describing a result set column.
61///
62/// This struct is marked `#[non_exhaustive]` to allow adding new fields
63/// in future versions without breaking semver compatibility. Use
64/// [`Column::new()`] or builder methods to construct instances.
65#[derive(Debug, Clone)]
66#[non_exhaustive]
67pub struct Column {
68    /// Column name.
69    pub name: String,
70    /// Column index (0-based).
71    pub index: usize,
72    /// SQL type name (e.g., "INT", "NVARCHAR").
73    pub type_name: String,
74    /// Whether the column allows NULL values.
75    pub nullable: bool,
76    /// Maximum length for variable-length types.
77    pub max_length: Option<u32>,
78    /// Precision for numeric types.
79    pub precision: Option<u8>,
80    /// Scale for numeric types.
81    pub scale: Option<u8>,
82    /// Collation for string types (VARCHAR, CHAR, TEXT).
83    ///
84    /// Used for proper encoding/decoding of non-Unicode string data.
85    /// When present, enables collation-aware decoding that correctly
86    /// handles locale-specific ANSI encodings (e.g., Shift_JIS, GB18030).
87    pub collation: Option<tds_protocol::Collation>,
88}
89
90impl Column {
91    /// Create a new column with basic metadata.
92    pub fn new(name: impl Into<String>, index: usize, type_name: impl Into<String>) -> Self {
93        Self {
94            name: name.into(),
95            index,
96            type_name: type_name.into(),
97            nullable: true,
98            max_length: None,
99            precision: None,
100            scale: None,
101            collation: None,
102        }
103    }
104
105    /// Set whether the column is nullable.
106    #[must_use]
107    pub fn with_nullable(mut self, nullable: bool) -> Self {
108        self.nullable = nullable;
109        self
110    }
111
112    /// Set the maximum length.
113    #[must_use]
114    pub fn with_max_length(mut self, max_length: u32) -> Self {
115        self.max_length = Some(max_length);
116        self
117    }
118
119    /// Set precision and scale for numeric types.
120    #[must_use]
121    pub fn with_precision_scale(mut self, precision: u8, scale: u8) -> Self {
122        self.precision = Some(precision);
123        self.scale = Some(scale);
124        self
125    }
126
127    /// Set the collation for string types.
128    ///
129    /// Used for proper encoding/decoding of non-Unicode string data (VARCHAR, CHAR, TEXT).
130    #[must_use]
131    pub fn with_collation(mut self, collation: tds_protocol::Collation) -> Self {
132        self.collation = Some(collation);
133        self
134    }
135
136    /// Get the encoding name for this column's collation.
137    ///
138    /// Returns the name of the character encoding used for this column's data,
139    /// or "unknown" if the collation is not set or the encoding feature is disabled.
140    ///
141    /// # Examples
142    ///
143    /// - `"Shift_JIS"` - Japanese encoding (LCID 0x0411)
144    /// - `"GB18030"` - Simplified Chinese (LCID 0x0804)
145    /// - `"UTF-8"` - SQL Server 2019+ UTF-8 collation
146    /// - `"windows-1252"` - Latin/Western European (LCID 0x0409)
147    /// - `"unknown"` - No collation or unsupported encoding
148    #[must_use]
149    pub fn encoding_name(&self) -> &'static str {
150        #[cfg(feature = "encoding")]
151        if let Some(ref collation) = self.collation {
152            return collation.encoding_name();
153        }
154        "unknown"
155    }
156
157    /// Check if this column uses UTF-8 encoding.
158    ///
159    /// Returns `true` if the column has a SQL Server 2019+ UTF-8 collation,
160    /// which is indicated by fUTF8 (bit 26, 0x0400_0000) being set in the
161    /// collation info field.
162    #[must_use]
163    pub fn is_utf8_collation(&self) -> bool {
164        #[cfg(feature = "encoding")]
165        if let Some(ref collation) = self.collation {
166            return collation.is_utf8();
167        }
168        false
169    }
170
171    /// Convert column metadata to TDS TypeInfo for decoding.
172    ///
173    /// Maps type names to TDS type IDs and constructs appropriate TypeInfo.
174    pub fn to_type_info(&self) -> TypeInfo {
175        let type_id = type_name_to_id(&self.type_name);
176        TypeInfo {
177            type_id,
178            length: self.max_length,
179            scale: self.scale,
180            precision: self.precision,
181            collation: self.collation.map(|c| mssql_types::decode::Collation {
182                lcid: c.lcid,
183                flags: c.sort_id,
184            }),
185        }
186    }
187}
188
189/// Map SQL type name to TDS type ID.
190fn type_name_to_id(name: &str) -> u8 {
191    match name.to_uppercase().as_str() {
192        // Integer types
193        "INT" | "INTEGER" => 0x38,
194        "BIGINT" => 0x7F,
195        "SMALLINT" => 0x34,
196        "TINYINT" => 0x30,
197        "BIT" => 0x32,
198
199        // Floating point
200        "FLOAT" => 0x3E,
201        "REAL" => 0x3B,
202
203        // Decimal/Numeric
204        "DECIMAL" | "NUMERIC" => 0x6C,
205        "MONEY" | "SMALLMONEY" => 0x6E,
206
207        // String types
208        "NVARCHAR" | "NCHAR" | "NTEXT" => 0xE7,
209        "VARCHAR" | "CHAR" | "TEXT" => 0xA7,
210
211        // Binary types
212        "VARBINARY" | "BINARY" | "IMAGE" => 0xA5,
213
214        // Date/Time types
215        "DATE" => 0x28,
216        "TIME" => 0x29,
217        "DATETIME2" => 0x2A,
218        "DATETIMEOFFSET" => 0x2B,
219        "DATETIME" => 0x3D,
220        "SMALLDATETIME" => 0x3F,
221
222        // GUID
223        "UNIQUEIDENTIFIER" => 0x24,
224
225        // XML
226        "XML" => 0xF1,
227
228        // Nullable variants (INTNTYPE, etc.)
229        _ if name.ends_with("N") => 0x26,
230
231        // Default to binary for unknown types
232        _ => 0xA5,
233    }
234}
235
236/// Shared column metadata for a result set.
237///
238/// This is shared across all rows in the result set to avoid
239/// duplicating metadata per row.
240#[derive(Debug, Clone)]
241pub struct ColMetaData {
242    /// Column definitions.
243    pub columns: Arc<[Column]>,
244}
245
246impl ColMetaData {
247    /// Create new column metadata from a list of columns.
248    pub fn new(columns: Vec<Column>) -> Self {
249        Self {
250            columns: columns.into(),
251        }
252    }
253
254    /// Get the number of columns.
255    #[must_use]
256    pub fn len(&self) -> usize {
257        self.columns.len()
258    }
259
260    /// Check if there are no columns.
261    #[must_use]
262    pub fn is_empty(&self) -> bool {
263        self.columns.is_empty()
264    }
265
266    /// Get a column by index.
267    #[must_use]
268    pub fn get(&self, index: usize) -> Option<&Column> {
269        self.columns.get(index)
270    }
271
272    /// Find a column index by name (case-insensitive).
273    #[must_use]
274    pub fn find_by_name(&self, name: &str) -> Option<usize> {
275        self.columns
276            .iter()
277            .position(|c| c.name.eq_ignore_ascii_case(name))
278    }
279}
280
281/// A row from a query result.
282///
283/// Implements the `Arc<Bytes>` pattern from ADR-004 for reduced memory allocation.
284/// The row holds a shared reference to the raw packet buffer and column slice
285/// information, deferring parsing and allocation until values are accessed.
286///
287/// # Memory Model
288///
289/// ```text
290/// Row {
291///     buffer: Arc<Bytes> ──────────► [raw packet data...]
292///     slices: Arc<[ColumnSlice]> ──► [{offset, length, is_null}, ...]
293///     metadata: Arc<ColMetaData> ──► [Column definitions...]
294/// }
295/// ```
296///
297/// Multiple `Row` instances from the same result set share the `metadata`.
298/// The `buffer` and `slices` are unique per row but use `Arc` for cheap cloning.
299///
300/// # Access Patterns
301///
302/// - **Zero-copy:** `get_bytes()`, `get_str()` (when UTF-8 valid)
303/// - **Allocating:** `get_string()`, `get::<String>()`
304/// - **Type-converting:** `get::<T>()` uses `FromSql` trait
305#[derive(Clone)]
306pub struct Row {
307    /// Shared reference to raw packet body containing row data.
308    buffer: Arc<Bytes>,
309    /// Column offsets into buffer.
310    slices: Arc<[ColumnSlice]>,
311    /// Column metadata (shared across result set).
312    metadata: Arc<ColMetaData>,
313    /// Cached parsed values (lazily populated).
314    /// This maintains backward compatibility with code expecting SqlValue access.
315    values: Option<Arc<[SqlValue]>>,
316}
317
318impl Row {
319    /// Create a new row with the `Arc<Bytes>` pattern.
320    ///
321    /// This is the primary constructor for the reduced-copy pattern.
322    pub fn new(buffer: Arc<Bytes>, slices: Arc<[ColumnSlice]>, metadata: Arc<ColMetaData>) -> Self {
323        Self {
324            buffer,
325            slices,
326            metadata,
327            values: None,
328        }
329    }
330
331    /// Create a row from pre-parsed values (backward compatibility).
332    ///
333    /// This constructor supports existing code that works with `SqlValue` directly.
334    /// It's less efficient than the buffer-based approach but maintains compatibility.
335    pub fn from_values(columns: Vec<Column>, values: Vec<SqlValue>) -> Self {
336        Self::from_values_shared(Arc::new(ColMetaData::new(columns)), values)
337    }
338
339    /// Like [`Row::from_values`], but shares an already-built
340    /// `Arc<ColMetaData>` rather than cloning a `Vec<Column>` and rebuilding
341    /// the metadata for every row. Every row in a result set has identical
342    /// columns, so the decode path builds the metadata once per result set and
343    /// hands each row a cheap `Arc` clone of it (#300).
344    pub(crate) fn from_values_shared(metadata: Arc<ColMetaData>, values: Vec<SqlValue>) -> Self {
345        let slices: Arc<[ColumnSlice]> = values
346            .iter()
347            .enumerate()
348            .map(|(i, v)| ColumnSlice::new(i as u32, 0, v.is_null()))
349            .collect::<Vec<_>>()
350            .into();
351
352        Self {
353            buffer: Arc::new(Bytes::new()),
354            slices,
355            metadata,
356            values: Some(values.into()),
357        }
358    }
359
360    // ========================================================================
361    // Zero-Copy Access Methods (ADR-004)
362    // ========================================================================
363
364    /// Returns borrowed slice into buffer (zero additional allocation).
365    ///
366    /// This is the most efficient access method when you need raw bytes.
367    #[must_use]
368    pub fn get_bytes(&self, index: usize) -> Option<&[u8]> {
369        let slice = self.slices.get(index)?;
370        if slice.is_null {
371            return None;
372        }
373
374        let start = slice.offset as usize;
375        let end = start + slice.length as usize;
376
377        if end <= self.buffer.len() {
378            Some(&self.buffer[start..end])
379        } else {
380            None
381        }
382    }
383
384    /// Returns Cow - borrowed if valid UTF-8, owned if conversion needed.
385    ///
386    /// For UTF-8 data, this returns a borrowed reference (zero allocation).
387    /// For VARCHAR data with collation, uses collation-aware decoding.
388    /// For UTF-16 data (NVARCHAR), decodes as UTF-16LE.
389    ///
390    /// # Collation-Aware Decoding
391    ///
392    /// When the `encoding` feature is enabled and the column has collation metadata,
393    /// VARCHAR data is decoded using the appropriate character encoding based on the
394    /// collation's LCID. This correctly handles:
395    ///
396    /// - Japanese (Shift_JIS/CP932)
397    /// - Simplified Chinese (GB18030/CP936)
398    /// - Traditional Chinese (Big5/CP950)
399    /// - Korean (EUC-KR/CP949)
400    /// - Windows code pages 874, 1250-1258
401    /// - SQL Server 2019+ UTF-8 collations
402    #[must_use]
403    pub fn get_str(&self, index: usize) -> Option<Cow<'_, str>> {
404        let bytes = self.get_bytes(index)?;
405
406        // Try to interpret as UTF-8 first (zero allocation for ASCII/UTF-8 data)
407        match std::str::from_utf8(bytes) {
408            Ok(s) => Some(Cow::Borrowed(s)),
409            Err(_) => {
410                // Check if we have collation metadata for this column
411                #[cfg(feature = "encoding")]
412                if let Some(column) = self.metadata.get(index) {
413                    if let Some(ref collation) = column.collation {
414                        // Use collation-aware decoding for VARCHAR/CHAR types
415                        if let Some(encoding) = collation.encoding() {
416                            let (decoded, _, had_errors) = encoding.decode(bytes);
417                            if had_errors {
418                                tracing::warn!(
419                                    column_name = %column.name,
420                                    column_index = index,
421                                    encoding = %encoding.name(),
422                                    lcid = collation.lcid,
423                                    byte_len = bytes.len(),
424                                    "collation-aware decoding had errors, falling back to UTF-16LE"
425                                );
426                            } else {
427                                return Some(Cow::Owned(decoded.into_owned()));
428                            }
429                        } else {
430                            tracing::debug!(
431                                column_name = %column.name,
432                                column_index = index,
433                                lcid = collation.lcid,
434                                "no encoding found for LCID, falling back to UTF-16LE"
435                            );
436                        }
437                    }
438                }
439
440                // Assume UTF-16LE (SQL Server NVARCHAR encoding)
441                // This requires allocation for the conversion
442                let utf16: Vec<u16> = bytes
443                    .chunks_exact(2)
444                    .map(|chunk| u16::from_le_bytes([chunk[0], chunk[1]]))
445                    .collect();
446
447                String::from_utf16(&utf16).ok().map(Cow::Owned)
448            }
449        }
450    }
451
452    /// Allocates new String (explicit allocation).
453    ///
454    /// Use this when you need an owned String.
455    #[must_use]
456    pub fn get_string(&self, index: usize) -> Option<String> {
457        self.get_str(index).map(|cow| cow.into_owned())
458    }
459
460    // ========================================================================
461    // Streaming Access (LOB support)
462    // ========================================================================
463
464    /// Get a streaming reader for a binary/text column.
465    ///
466    /// Returns a [`BlobReader`] that implements [`tokio::io::AsyncRead`] for
467    /// streaming access to large binary or text columns. This is useful for:
468    ///
469    /// - Streaming large data to files without fully loading into memory
470    /// - Processing data in chunks with progress tracking
471    /// - Copying data between I/O destinations efficiently
472    ///
473    /// # Supported Column Types
474    ///
475    /// - `VARBINARY`, `VARBINARY(MAX)`
476    /// - `VARCHAR`, `VARCHAR(MAX)`
477    /// - `NVARCHAR`, `NVARCHAR(MAX)`
478    /// - `TEXT`, `NTEXT`, `IMAGE` (legacy types)
479    /// - `XML`
480    ///
481    /// # Example
482    ///
483    /// ```text
484    /// use tokio::io::AsyncWriteExt;
485    ///
486    /// // Stream a large VARBINARY(MAX) column to a file
487    /// let mut reader = row.get_stream(0)?;
488    /// let mut file = tokio::fs::File::create("output.bin").await?;
489    /// tokio::io::copy(&mut reader, &mut file).await?;
490    /// ```
491    ///
492    /// # Returns
493    ///
494    /// - `Some(BlobReader)` if the column contains binary/text data
495    /// - `None` if the column is NULL or the index is out of bounds
496    #[must_use]
497    pub fn get_stream(&self, index: usize) -> Option<BlobReader> {
498        let slice = self.slices.get(index)?;
499        if slice.is_null {
500            return None;
501        }
502
503        let start = slice.offset as usize;
504        let end = start + slice.length as usize;
505
506        if end <= self.buffer.len() {
507            // Use zero-copy slicing from Arc<Bytes>
508            let data = self.buffer.slice(start..end);
509            Some(BlobReader::from_bytes(data))
510        } else {
511            None
512        }
513    }
514
515    /// Get a streaming reader for a binary/text column by name.
516    ///
517    /// See [`get_stream`](Self::get_stream) for details.
518    ///
519    /// # Example
520    ///
521    /// ```text
522    /// let mut reader = row.get_stream_by_name("document_content")?;
523    /// // Process the blob stream...
524    /// ```
525    #[must_use]
526    pub fn get_stream_by_name(&self, name: &str) -> Option<BlobReader> {
527        let index = self.metadata.find_by_name(name)?;
528        self.get_stream(index)
529    }
530
531    // ========================================================================
532    // Type-Converting Access (FromSql trait)
533    // ========================================================================
534
535    /// Get a value by column index with type conversion.
536    ///
537    /// Uses the `FromSql` trait to convert the raw value to the requested type.
538    pub fn get<T: FromSql>(&self, index: usize) -> Result<T, TypeError> {
539        // If we have cached values, use them
540        if let Some(ref values) = self.values {
541            return values
542                .get(index)
543                .ok_or_else(|| TypeError::TypeMismatch {
544                    expected: "valid column index",
545                    actual: format!("index {index} out of bounds"),
546                })
547                .and_then(T::from_sql);
548        }
549
550        // Otherwise, parse on demand from the buffer
551        let slice = self
552            .slices
553            .get(index)
554            .ok_or_else(|| TypeError::TypeMismatch {
555                expected: "valid column index",
556                actual: format!("index {index} out of bounds"),
557            })?;
558
559        if slice.is_null {
560            return Err(TypeError::UnexpectedNull);
561        }
562
563        // Parse via SqlValue then convert to target type
564        // Note: parse_value uses zero-copy buffer slicing (Arc<Bytes>::slice)
565        let value = self.parse_value(index, slice)?;
566        T::from_sql(&value)
567    }
568
569    /// Get a value by column name with type conversion.
570    pub fn get_by_name<T: FromSql>(&self, name: &str) -> Result<T, TypeError> {
571        let index = self
572            .metadata
573            .find_by_name(name)
574            .ok_or_else(|| TypeError::TypeMismatch {
575                expected: "valid column name",
576                actual: format!("column '{name}' not found"),
577            })?;
578
579        self.get(index)
580    }
581
582    /// Try to get a value by column index.
583    ///
584    /// Returns `Ok(None)` when the column is NULL or the index is out of
585    /// bounds. Decode and conversion failures are errors — they were
586    /// previously swallowed as `None`, which made a type mismatch on a
587    /// nullable column silently read as NULL (issue #157).
588    ///
589    /// # Errors
590    ///
591    /// Returns [`TypeError`] if the column value cannot be decoded or
592    /// converted to `T`.
593    pub fn try_get<T: FromSql>(&self, index: usize) -> Result<Option<T>, TypeError> {
594        // If we have cached values, use them
595        if let Some(ref values) = self.values {
596            return match values.get(index) {
597                Some(v) => T::from_sql_nullable(v),
598                None => Ok(None),
599            };
600        }
601
602        // Otherwise check the slice
603        let Some(slice) = self.slices.get(index) else {
604            return Ok(None);
605        };
606        if slice.is_null {
607            return Ok(None);
608        }
609
610        self.get(index).map(Some)
611    }
612
613    /// Try to get a value by column name.
614    ///
615    /// Returns `Ok(None)` when the column is NULL or no column with this
616    /// name exists. Decode and conversion failures are errors — see
617    /// [`try_get`](Self::try_get).
618    ///
619    /// # Errors
620    ///
621    /// Returns [`TypeError`] if the column value cannot be decoded or
622    /// converted to `T`.
623    pub fn try_get_by_name<T: FromSql>(&self, name: &str) -> Result<Option<T>, TypeError> {
624        match self.metadata.find_by_name(name) {
625            Some(index) => self.try_get(index),
626            None => Ok(None),
627        }
628    }
629
630    // ========================================================================
631    // Raw Value Access (backward compatibility)
632    // ========================================================================
633
634    /// Get the raw SQL value by index.
635    ///
636    /// Note: This may allocate if values haven't been cached.
637    #[must_use]
638    pub fn get_raw(&self, index: usize) -> Option<SqlValue> {
639        if let Some(ref values) = self.values {
640            return values.get(index).cloned();
641        }
642
643        let slice = self.slices.get(index)?;
644        self.parse_value(index, slice).ok()
645    }
646
647    /// Get the raw SQL value by column name.
648    #[must_use]
649    pub fn get_raw_by_name(&self, name: &str) -> Option<SqlValue> {
650        let index = self.metadata.find_by_name(name)?;
651        self.get_raw(index)
652    }
653
654    // ========================================================================
655    // Metadata Access
656    // ========================================================================
657
658    /// Get the number of columns in the row.
659    #[must_use]
660    pub fn len(&self) -> usize {
661        self.slices.len()
662    }
663
664    /// Check if the row is empty.
665    #[must_use]
666    pub fn is_empty(&self) -> bool {
667        self.slices.is_empty()
668    }
669
670    /// Get the column metadata.
671    #[must_use]
672    pub fn columns(&self) -> &[Column] {
673        &self.metadata.columns
674    }
675
676    /// Get the shared column metadata.
677    #[must_use]
678    pub fn metadata(&self) -> &Arc<ColMetaData> {
679        &self.metadata
680    }
681
682    /// Check if a column value is NULL.
683    #[must_use]
684    pub fn is_null(&self, index: usize) -> bool {
685        self.slices.get(index).map(|s| s.is_null).unwrap_or(true)
686    }
687
688    /// Check if a column value is NULL by name.
689    #[must_use]
690    pub fn is_null_by_name(&self, name: &str) -> bool {
691        self.metadata
692            .find_by_name(name)
693            .map(|i| self.is_null(i))
694            .unwrap_or(true)
695    }
696
697    // ========================================================================
698    // Internal Helpers
699    // ========================================================================
700
701    /// Parse a value from the buffer at the given slice.
702    ///
703    /// Uses the mssql-types decode module for efficient binary parsing.
704    /// Optimized to use zero-copy buffer slicing via Arc<Bytes>.
705    fn parse_value(&self, index: usize, slice: &ColumnSlice) -> Result<SqlValue, TypeError> {
706        if slice.is_null {
707            return Ok(SqlValue::Null);
708        }
709
710        let column = self
711            .metadata
712            .get(index)
713            .ok_or_else(|| TypeError::TypeMismatch {
714                expected: "valid column metadata",
715                actual: format!("no metadata for column {index}"),
716            })?;
717
718        // Calculate byte range for this column
719        let start = slice.offset as usize;
720        let end = start + slice.length as usize;
721
722        // Validate range
723        if end > self.buffer.len() {
724            return Err(TypeError::TypeMismatch {
725                expected: "valid byte range",
726                actual: format!(
727                    "range {}..{} exceeds buffer length {}",
728                    start,
729                    end,
730                    self.buffer.len()
731                ),
732            });
733        }
734
735        // Convert column metadata to TypeInfo for the decode module
736        let type_info = column.to_type_info();
737
738        // Use zero-copy slice of the buffer instead of allocating
739        // This avoids the overhead of Bytes::copy_from_slice
740        let mut buf = self.buffer.slice(start..end);
741
742        // Use the unified decode module for efficient parsing
743        decode_value(&mut buf, &type_info)
744    }
745}
746
747impl std::fmt::Debug for Row {
748    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
749        f.debug_struct("Row")
750            .field("columns", &self.metadata.columns.len())
751            .field("buffer_size", &self.buffer.len())
752            .field("has_cached_values", &self.values.is_some())
753            .finish()
754    }
755}
756
757/// Iterator over row values as SqlValue.
758pub struct RowIter<'a> {
759    row: &'a Row,
760    index: usize,
761}
762
763impl Iterator for RowIter<'_> {
764    type Item = SqlValue;
765
766    fn next(&mut self) -> Option<Self::Item> {
767        if self.index >= self.row.len() {
768            return None;
769        }
770        let value = self.row.get_raw(self.index);
771        self.index += 1;
772        value
773    }
774
775    fn size_hint(&self) -> (usize, Option<usize>) {
776        let remaining = self.row.len() - self.index;
777        (remaining, Some(remaining))
778    }
779}
780
781impl<'a> IntoIterator for &'a Row {
782    type Item = SqlValue;
783    type IntoIter = RowIter<'a>;
784
785    fn into_iter(self) -> Self::IntoIter {
786        RowIter {
787            row: self,
788            index: 0,
789        }
790    }
791}
792
793#[cfg(test)]
794#[allow(clippy::unwrap_used, clippy::expect_used)]
795mod tests {
796    use super::*;
797
798    #[test]
799    fn test_column_slice_null() {
800        let slice = ColumnSlice::null();
801        assert!(slice.is_null);
802        assert_eq!(slice.offset, 0);
803        assert_eq!(slice.length, 0);
804    }
805
806    #[test]
807    fn test_column_metadata() {
808        let col = Column::new("id", 0, "INT")
809            .with_nullable(false)
810            .with_precision_scale(10, 0);
811
812        assert_eq!(col.name, "id");
813        assert_eq!(col.index, 0);
814        assert!(!col.nullable);
815        assert_eq!(col.precision, Some(10));
816    }
817
818    #[test]
819    fn test_col_metadata_find_by_name() {
820        let meta = ColMetaData::new(vec![
821            Column::new("id", 0, "INT"),
822            Column::new("Name", 1, "NVARCHAR"),
823        ]);
824
825        assert_eq!(meta.find_by_name("id"), Some(0));
826        assert_eq!(meta.find_by_name("ID"), Some(0)); // case-insensitive
827        assert_eq!(meta.find_by_name("name"), Some(1));
828        assert_eq!(meta.find_by_name("unknown"), None);
829    }
830
831    #[test]
832    fn test_row_from_values_backward_compat() {
833        let columns = vec![
834            Column::new("id", 0, "INT"),
835            Column::new("name", 1, "NVARCHAR"),
836        ];
837        let values = vec![SqlValue::Int(42), SqlValue::String("Alice".to_string())];
838
839        let row = Row::from_values(columns, values);
840
841        assert_eq!(row.len(), 2);
842        assert_eq!(row.get::<i32>(0).unwrap(), 42);
843        assert_eq!(row.get_by_name::<String>("name").unwrap(), "Alice");
844    }
845
846    #[test]
847    fn test_row_is_null() {
848        let columns = vec![
849            Column::new("id", 0, "INT"),
850            Column::new("nullable_col", 1, "NVARCHAR"),
851        ];
852        let values = vec![SqlValue::Int(1), SqlValue::Null];
853
854        let row = Row::from_values(columns, values);
855
856        assert!(!row.is_null(0));
857        assert!(row.is_null(1));
858        assert!(row.is_null(99)); // Out of bounds returns true
859    }
860
861    #[test]
862    fn test_row_get_bytes_with_buffer() {
863        let buffer = Arc::new(Bytes::from_static(b"Hello World"));
864        let slices: Arc<[ColumnSlice]> = vec![
865            ColumnSlice::new(0, 5, false), // "Hello"
866            ColumnSlice::new(6, 5, false), // "World"
867        ]
868        .into();
869        let meta = Arc::new(ColMetaData::new(vec![
870            Column::new("greeting", 0, "VARCHAR"),
871            Column::new("subject", 1, "VARCHAR"),
872        ]));
873
874        let row = Row::new(buffer, slices, meta);
875
876        assert_eq!(row.get_bytes(0), Some(b"Hello".as_slice()));
877        assert_eq!(row.get_bytes(1), Some(b"World".as_slice()));
878    }
879
880    #[test]
881    fn test_row_get_str() {
882        let buffer = Arc::new(Bytes::from_static(b"Test"));
883        let slices: Arc<[ColumnSlice]> = vec![ColumnSlice::new(0, 4, false)].into();
884        let meta = Arc::new(ColMetaData::new(vec![Column::new("val", 0, "VARCHAR")]));
885
886        let row = Row::new(buffer, slices, meta);
887
888        let s = row.get_str(0).unwrap();
889        assert_eq!(s, "Test");
890        // Should be borrowed for valid UTF-8
891        assert!(matches!(s, Cow::Borrowed(_)));
892    }
893
894    #[test]
895    fn test_row_metadata_access() {
896        let columns = vec![Column::new("col1", 0, "INT")];
897        let row = Row::from_values(columns, vec![SqlValue::Int(1)]);
898
899        assert_eq!(row.columns().len(), 1);
900        assert_eq!(row.columns()[0].name, "col1");
901        assert_eq!(row.metadata().len(), 1);
902    }
903
904    /// Issue #157 regression: `try_get` must distinguish SQL NULL (Ok(None))
905    /// from a decode/conversion failure (Err). Previously both collapsed to
906    /// `None`, so a type mismatch on a nullable column silently read as NULL.
907    #[test]
908    fn test_try_get_distinguishes_null_from_conversion_error() {
909        let columns = vec![Column::new("a", 0, "NVARCHAR"), Column::new("b", 1, "INT")];
910        let row = Row::from_values(
911            columns,
912            vec![SqlValue::String("not a number".into()), SqlValue::Null],
913        );
914
915        // NULL → Ok(None)
916        let b: Option<i32> = row.try_get(1).expect("NULL must be Ok(None)");
917        assert!(b.is_none());
918
919        // Missing column/index → Ok(None) (lenient lookup is unchanged)
920        let missing: Option<i32> = row.try_get(9).expect("missing index must be Ok(None)");
921        assert!(missing.is_none());
922        let missing: Option<i32> = row
923            .try_get_by_name("no_such_column")
924            .expect("missing name must be Ok(None)");
925        assert!(missing.is_none());
926
927        // Conversion failure → Err, NOT Ok(None)
928        assert!(row.try_get::<i32>(0).is_err());
929        assert!(row.try_get_by_name::<i32>("a").is_err());
930
931        // The successful typed read still works
932        let a: Option<String> = row.try_get(0).expect("string read must succeed");
933        assert_eq!(a.as_deref(), Some("not a number"));
934    }
935
936    #[test]
937    fn test_row_get_stream() {
938        let buffer = Arc::new(Bytes::from_static(b"Hello, World!"));
939        let slices: Arc<[ColumnSlice]> = vec![
940            ColumnSlice::new(0, 5, false), // "Hello"
941            ColumnSlice::new(7, 5, false), // "World"
942            ColumnSlice::null(),           // NULL column
943        ]
944        .into();
945        let meta = Arc::new(ColMetaData::new(vec![
946            Column::new("greeting", 0, "VARBINARY"),
947            Column::new("subject", 1, "VARBINARY"),
948            Column::new("nullable", 2, "VARBINARY"),
949        ]));
950
951        let row = Row::new(buffer, slices, meta);
952
953        // Get stream for first column
954        let reader = row.get_stream(0).unwrap();
955        assert_eq!(reader.len(), Some(5));
956        assert_eq!(reader.as_bytes().as_ref(), b"Hello");
957
958        // Get stream for second column
959        let reader = row.get_stream(1).unwrap();
960        assert_eq!(reader.len(), Some(5));
961        assert_eq!(reader.as_bytes().as_ref(), b"World");
962
963        // NULL column returns None
964        assert!(row.get_stream(2).is_none());
965
966        // Out of bounds returns None
967        assert!(row.get_stream(99).is_none());
968    }
969
970    #[test]
971    fn test_row_get_stream_by_name() {
972        let buffer = Arc::new(Bytes::from_static(b"Binary data here"));
973        let slices: Arc<[ColumnSlice]> = vec![ColumnSlice::new(0, 11, false)].into();
974        let meta = Arc::new(ColMetaData::new(vec![Column::new(
975            "document",
976            0,
977            "VARBINARY",
978        )]));
979
980        let row = Row::new(buffer, slices, meta);
981
982        // Get by name (case-insensitive)
983        let reader = row.get_stream_by_name("document").unwrap();
984        assert_eq!(reader.len(), Some(11));
985
986        let reader = row.get_stream_by_name("DOCUMENT").unwrap();
987        assert_eq!(reader.len(), Some(11));
988
989        // Unknown column returns None
990        assert!(row.get_stream_by_name("unknown").is_none());
991    }
992}