Skip to main content

mssql_client/
row.rs

1//! Row representation for query results.
2//!
3//! This module implements the `Arc<Bytes>` pattern from ADR-004 for reduced-copy
4//! row data access. The `Row` struct holds a shared reference to the raw packet
5//! buffer, deferring allocation until explicitly requested.
6//!
7//! ## Access Patterns (per ADR-004)
8//!
9//! - `get_bytes()` - Returns borrowed slice into buffer (zero additional allocation)
10//! - `get_str()` - Returns Cow - borrowed if valid UTF-8, owned if conversion needed
11//! - `get_string()` - Allocates new String (explicit allocation)
12//! - `get<T>()` - Type-converting accessor with allocation only if needed
13
14use std::borrow::Cow;
15use std::sync::Arc;
16
17use bytes::Bytes;
18
19use mssql_types::decode::{TypeInfo, decode_value};
20use mssql_types::{FromSql, SqlValue, TypeError};
21
22use crate::blob::BlobReader;
23
24/// Column slice information pointing into the row buffer.
25///
26/// This is the internal representation that enables zero-copy access
27/// to column data within the shared buffer.
28#[derive(Debug, Clone, Copy)]
29#[non_exhaustive]
30pub struct ColumnSlice {
31    /// Offset into the buffer where this column's data begins.
32    pub offset: u32,
33    /// Length of the column data in bytes.
34    pub length: u32,
35    /// Whether this column value is NULL.
36    pub is_null: bool,
37}
38
39impl ColumnSlice {
40    /// Create a new column slice.
41    pub fn new(offset: u32, length: u32, is_null: bool) -> Self {
42        Self {
43            offset,
44            length,
45            is_null,
46        }
47    }
48
49    /// Create a NULL column slice.
50    pub fn null() -> Self {
51        Self {
52            offset: 0,
53            length: 0,
54            is_null: true,
55        }
56    }
57}
58
59/// Column metadata describing a result set column.
60///
61/// This struct is marked `#[non_exhaustive]` to allow adding new fields
62/// in future versions without breaking semver compatibility. Use
63/// [`Column::new()`] or builder methods to construct instances.
64#[derive(Debug, Clone)]
65#[non_exhaustive]
66pub struct Column {
67    /// Column name.
68    pub name: String,
69    /// Column index (0-based).
70    pub index: usize,
71    /// SQL type name (e.g., "INT", "NVARCHAR").
72    pub type_name: String,
73    /// Whether the column allows NULL values.
74    pub nullable: bool,
75    /// Maximum length for variable-length types.
76    pub max_length: Option<u32>,
77    /// Precision for numeric types.
78    pub precision: Option<u8>,
79    /// Scale for numeric types.
80    pub scale: Option<u8>,
81    /// Collation for string types (VARCHAR, CHAR, TEXT).
82    ///
83    /// Used for proper encoding/decoding of non-Unicode string data.
84    /// When present, enables collation-aware decoding that correctly
85    /// handles locale-specific ANSI encodings (e.g., Shift_JIS, GB18030).
86    pub collation: Option<tds_protocol::Collation>,
87}
88
89impl Column {
90    /// Create a new column with basic metadata.
91    pub fn new(name: impl Into<String>, index: usize, type_name: impl Into<String>) -> Self {
92        Self {
93            name: name.into(),
94            index,
95            type_name: type_name.into(),
96            nullable: true,
97            max_length: None,
98            precision: None,
99            scale: None,
100            collation: None,
101        }
102    }
103
104    /// Set whether the column is nullable.
105    #[must_use]
106    pub fn with_nullable(mut self, nullable: bool) -> Self {
107        self.nullable = nullable;
108        self
109    }
110
111    /// Set the maximum length.
112    #[must_use]
113    pub fn with_max_length(mut self, max_length: u32) -> Self {
114        self.max_length = Some(max_length);
115        self
116    }
117
118    /// Set precision and scale for numeric types.
119    #[must_use]
120    pub fn with_precision_scale(mut self, precision: u8, scale: u8) -> Self {
121        self.precision = Some(precision);
122        self.scale = Some(scale);
123        self
124    }
125
126    /// Set the collation for string types.
127    ///
128    /// Used for proper encoding/decoding of non-Unicode string data (VARCHAR, CHAR, TEXT).
129    #[must_use]
130    pub fn with_collation(mut self, collation: tds_protocol::Collation) -> Self {
131        self.collation = Some(collation);
132        self
133    }
134
135    /// Get the encoding name for this column's collation.
136    ///
137    /// Returns the name of the character encoding used for this column's data,
138    /// or "unknown" if the collation is not set or the encoding feature is disabled.
139    ///
140    /// # Examples
141    ///
142    /// - `"Shift_JIS"` - Japanese encoding (LCID 0x0411)
143    /// - `"GB18030"` - Simplified Chinese (LCID 0x0804)
144    /// - `"UTF-8"` - SQL Server 2019+ UTF-8 collation
145    /// - `"windows-1252"` - Latin/Western European (LCID 0x0409)
146    /// - `"unknown"` - No collation or unsupported encoding
147    #[must_use]
148    pub fn encoding_name(&self) -> &'static str {
149        #[cfg(feature = "encoding")]
150        if let Some(ref collation) = self.collation {
151            return collation.encoding_name();
152        }
153        "unknown"
154    }
155
156    /// Check if this column uses UTF-8 encoding.
157    ///
158    /// Returns `true` if the column has a SQL Server 2019+ UTF-8 collation,
159    /// which is indicated by fUTF8 (bit 26, 0x0400_0000) being set in the
160    /// collation info field.
161    #[must_use]
162    pub fn is_utf8_collation(&self) -> bool {
163        #[cfg(feature = "encoding")]
164        if let Some(ref collation) = self.collation {
165            return collation.is_utf8();
166        }
167        false
168    }
169
170    /// Convert column metadata to TDS TypeInfo for decoding.
171    ///
172    /// Maps type names to TDS type IDs and constructs appropriate TypeInfo.
173    pub fn to_type_info(&self) -> TypeInfo {
174        let type_id = type_name_to_id(&self.type_name);
175        TypeInfo {
176            type_id,
177            length: self.max_length,
178            scale: self.scale,
179            precision: self.precision,
180            collation: self.collation.map(|c| mssql_types::decode::Collation {
181                lcid: c.lcid,
182                flags: c.sort_id,
183            }),
184        }
185    }
186}
187
188/// Map SQL type name to TDS type ID.
189fn type_name_to_id(name: &str) -> u8 {
190    match name.to_uppercase().as_str() {
191        // Integer types
192        "INT" | "INTEGER" => 0x38,
193        "BIGINT" => 0x7F,
194        "SMALLINT" => 0x34,
195        "TINYINT" => 0x30,
196        "BIT" => 0x32,
197
198        // Floating point
199        "FLOAT" => 0x3E,
200        "REAL" => 0x3B,
201
202        // Decimal/Numeric
203        "DECIMAL" | "NUMERIC" => 0x6C,
204        "MONEY" | "SMALLMONEY" => 0x6E,
205
206        // String types
207        "NVARCHAR" | "NCHAR" | "NTEXT" => 0xE7,
208        "VARCHAR" | "CHAR" | "TEXT" => 0xA7,
209
210        // Binary types
211        "VARBINARY" | "BINARY" | "IMAGE" => 0xA5,
212
213        // Date/Time types
214        "DATE" => 0x28,
215        "TIME" => 0x29,
216        "DATETIME2" => 0x2A,
217        "DATETIMEOFFSET" => 0x2B,
218        "DATETIME" => 0x3D,
219        "SMALLDATETIME" => 0x3F,
220
221        // GUID
222        "UNIQUEIDENTIFIER" => 0x24,
223
224        // XML
225        "XML" => 0xF1,
226
227        // Nullable variants (INTNTYPE, etc.)
228        _ if name.ends_with("N") => 0x26,
229
230        // Default to binary for unknown types
231        _ => 0xA5,
232    }
233}
234
235/// Shared column metadata for a result set.
236///
237/// This is shared across all rows in the result set to avoid
238/// duplicating metadata per row.
239#[derive(Debug, Clone)]
240pub struct ColMetaData {
241    /// Column definitions.
242    pub columns: Arc<[Column]>,
243}
244
245impl ColMetaData {
246    /// Create new column metadata from a list of columns.
247    pub fn new(columns: Vec<Column>) -> Self {
248        Self {
249            columns: columns.into(),
250        }
251    }
252
253    /// Get the number of columns.
254    #[must_use]
255    pub fn len(&self) -> usize {
256        self.columns.len()
257    }
258
259    /// Check if there are no columns.
260    #[must_use]
261    pub fn is_empty(&self) -> bool {
262        self.columns.is_empty()
263    }
264
265    /// Get a column by index.
266    #[must_use]
267    pub fn get(&self, index: usize) -> Option<&Column> {
268        self.columns.get(index)
269    }
270
271    /// Find a column index by name (case-insensitive).
272    #[must_use]
273    pub fn find_by_name(&self, name: &str) -> Option<usize> {
274        self.columns
275            .iter()
276            .position(|c| c.name.eq_ignore_ascii_case(name))
277    }
278}
279
280/// A row from a query result.
281///
282/// Implements the `Arc<Bytes>` pattern from ADR-004 for reduced memory allocation.
283/// The row holds a shared reference to the raw packet buffer and column slice
284/// information, deferring parsing and allocation until values are accessed.
285///
286/// # Memory Model
287///
288/// ```text
289/// Row {
290///     buffer: Arc<Bytes> ──────────► [raw packet data...]
291///     slices: Arc<[ColumnSlice]> ──► [{offset, length, is_null}, ...]
292///     metadata: Arc<ColMetaData> ──► [Column definitions...]
293/// }
294/// ```
295///
296/// Multiple `Row` instances from the same result set share the `metadata`.
297/// The `buffer` and `slices` are unique per row but use `Arc` for cheap cloning.
298///
299/// # Access Patterns
300///
301/// - **Zero-copy:** `get_bytes()`, `get_str()` (when UTF-8 valid)
302/// - **Allocating:** `get_string()`, `get::<String>()`
303/// - **Type-converting:** `get::<T>()` uses `FromSql` trait
304#[derive(Clone)]
305pub struct Row {
306    /// Shared reference to raw packet body containing row data.
307    buffer: Arc<Bytes>,
308    /// Column offsets into buffer.
309    slices: Arc<[ColumnSlice]>,
310    /// Column metadata (shared across result set).
311    metadata: Arc<ColMetaData>,
312    /// Cached parsed values (lazily populated).
313    /// This maintains backward compatibility with code expecting SqlValue access.
314    values: Option<Arc<[SqlValue]>>,
315}
316
317impl Row {
318    /// Create a new row with the `Arc<Bytes>` pattern.
319    ///
320    /// This is the primary constructor for the reduced-copy pattern.
321    pub fn new(buffer: Arc<Bytes>, slices: Arc<[ColumnSlice]>, metadata: Arc<ColMetaData>) -> Self {
322        Self {
323            buffer,
324            slices,
325            metadata,
326            values: None,
327        }
328    }
329
330    /// Create a row from pre-parsed values (backward compatibility).
331    ///
332    /// This constructor supports existing code that works with `SqlValue` directly.
333    /// It's less efficient than the buffer-based approach but maintains compatibility.
334    pub fn from_values(columns: Vec<Column>, values: Vec<SqlValue>) -> Self {
335        let metadata = Arc::new(ColMetaData::new(columns));
336        let slices: Arc<[ColumnSlice]> = values
337            .iter()
338            .enumerate()
339            .map(|(i, v)| ColumnSlice::new(i as u32, 0, v.is_null()))
340            .collect::<Vec<_>>()
341            .into();
342
343        Self {
344            buffer: Arc::new(Bytes::new()),
345            slices,
346            metadata,
347            values: Some(values.into()),
348        }
349    }
350
351    // ========================================================================
352    // Zero-Copy Access Methods (ADR-004)
353    // ========================================================================
354
355    /// Returns borrowed slice into buffer (zero additional allocation).
356    ///
357    /// This is the most efficient access method when you need raw bytes.
358    #[must_use]
359    pub fn get_bytes(&self, index: usize) -> Option<&[u8]> {
360        let slice = self.slices.get(index)?;
361        if slice.is_null {
362            return None;
363        }
364
365        let start = slice.offset as usize;
366        let end = start + slice.length as usize;
367
368        if end <= self.buffer.len() {
369            Some(&self.buffer[start..end])
370        } else {
371            None
372        }
373    }
374
375    /// Returns Cow - borrowed if valid UTF-8, owned if conversion needed.
376    ///
377    /// For UTF-8 data, this returns a borrowed reference (zero allocation).
378    /// For VARCHAR data with collation, uses collation-aware decoding.
379    /// For UTF-16 data (NVARCHAR), decodes as UTF-16LE.
380    ///
381    /// # Collation-Aware Decoding
382    ///
383    /// When the `encoding` feature is enabled and the column has collation metadata,
384    /// VARCHAR data is decoded using the appropriate character encoding based on the
385    /// collation's LCID. This correctly handles:
386    ///
387    /// - Japanese (Shift_JIS/CP932)
388    /// - Simplified Chinese (GB18030/CP936)
389    /// - Traditional Chinese (Big5/CP950)
390    /// - Korean (EUC-KR/CP949)
391    /// - Windows code pages 874, 1250-1258
392    /// - SQL Server 2019+ UTF-8 collations
393    #[must_use]
394    pub fn get_str(&self, index: usize) -> Option<Cow<'_, str>> {
395        let bytes = self.get_bytes(index)?;
396
397        // Try to interpret as UTF-8 first (zero allocation for ASCII/UTF-8 data)
398        match std::str::from_utf8(bytes) {
399            Ok(s) => Some(Cow::Borrowed(s)),
400            Err(_) => {
401                // Check if we have collation metadata for this column
402                #[cfg(feature = "encoding")]
403                if let Some(column) = self.metadata.get(index) {
404                    if let Some(ref collation) = column.collation {
405                        // Use collation-aware decoding for VARCHAR/CHAR types
406                        if let Some(encoding) = collation.encoding() {
407                            let (decoded, _, had_errors) = encoding.decode(bytes);
408                            if had_errors {
409                                tracing::warn!(
410                                    column_name = %column.name,
411                                    column_index = index,
412                                    encoding = %encoding.name(),
413                                    lcid = collation.lcid,
414                                    byte_len = bytes.len(),
415                                    "collation-aware decoding had errors, falling back to UTF-16LE"
416                                );
417                            } else {
418                                return Some(Cow::Owned(decoded.into_owned()));
419                            }
420                        } else {
421                            tracing::debug!(
422                                column_name = %column.name,
423                                column_index = index,
424                                lcid = collation.lcid,
425                                "no encoding found for LCID, falling back to UTF-16LE"
426                            );
427                        }
428                    }
429                }
430
431                // Assume UTF-16LE (SQL Server NVARCHAR encoding)
432                // This requires allocation for the conversion
433                let utf16: Vec<u16> = bytes
434                    .chunks_exact(2)
435                    .map(|chunk| u16::from_le_bytes([chunk[0], chunk[1]]))
436                    .collect();
437
438                String::from_utf16(&utf16).ok().map(Cow::Owned)
439            }
440        }
441    }
442
443    /// Allocates new String (explicit allocation).
444    ///
445    /// Use this when you need an owned String.
446    #[must_use]
447    pub fn get_string(&self, index: usize) -> Option<String> {
448        self.get_str(index).map(|cow| cow.into_owned())
449    }
450
451    // ========================================================================
452    // Streaming Access (LOB support)
453    // ========================================================================
454
455    /// Get a streaming reader for a binary/text column.
456    ///
457    /// Returns a [`BlobReader`] that implements [`tokio::io::AsyncRead`] for
458    /// streaming access to large binary or text columns. This is useful for:
459    ///
460    /// - Streaming large data to files without fully loading into memory
461    /// - Processing data in chunks with progress tracking
462    /// - Copying data between I/O destinations efficiently
463    ///
464    /// # Supported Column Types
465    ///
466    /// - `VARBINARY`, `VARBINARY(MAX)`
467    /// - `VARCHAR`, `VARCHAR(MAX)`
468    /// - `NVARCHAR`, `NVARCHAR(MAX)`
469    /// - `TEXT`, `NTEXT`, `IMAGE` (legacy types)
470    /// - `XML`
471    ///
472    /// # Example
473    ///
474    /// ```text
475    /// use tokio::io::AsyncWriteExt;
476    ///
477    /// // Stream a large VARBINARY(MAX) column to a file
478    /// let mut reader = row.get_stream(0)?;
479    /// let mut file = tokio::fs::File::create("output.bin").await?;
480    /// tokio::io::copy(&mut reader, &mut file).await?;
481    /// ```
482    ///
483    /// # Returns
484    ///
485    /// - `Some(BlobReader)` if the column contains binary/text data
486    /// - `None` if the column is NULL or the index is out of bounds
487    #[must_use]
488    pub fn get_stream(&self, index: usize) -> Option<BlobReader> {
489        let slice = self.slices.get(index)?;
490        if slice.is_null {
491            return None;
492        }
493
494        let start = slice.offset as usize;
495        let end = start + slice.length as usize;
496
497        if end <= self.buffer.len() {
498            // Use zero-copy slicing from Arc<Bytes>
499            let data = self.buffer.slice(start..end);
500            Some(BlobReader::from_bytes(data))
501        } else {
502            None
503        }
504    }
505
506    /// Get a streaming reader for a binary/text column by name.
507    ///
508    /// See [`get_stream`](Self::get_stream) for details.
509    ///
510    /// # Example
511    ///
512    /// ```text
513    /// let mut reader = row.get_stream_by_name("document_content")?;
514    /// // Process the blob stream...
515    /// ```
516    #[must_use]
517    pub fn get_stream_by_name(&self, name: &str) -> Option<BlobReader> {
518        let index = self.metadata.find_by_name(name)?;
519        self.get_stream(index)
520    }
521
522    // ========================================================================
523    // Type-Converting Access (FromSql trait)
524    // ========================================================================
525
526    /// Get a value by column index with type conversion.
527    ///
528    /// Uses the `FromSql` trait to convert the raw value to the requested type.
529    pub fn get<T: FromSql>(&self, index: usize) -> Result<T, TypeError> {
530        // If we have cached values, use them
531        if let Some(ref values) = self.values {
532            return values
533                .get(index)
534                .ok_or_else(|| TypeError::TypeMismatch {
535                    expected: "valid column index",
536                    actual: format!("index {index} out of bounds"),
537                })
538                .and_then(T::from_sql);
539        }
540
541        // Otherwise, parse on demand from the buffer
542        let slice = self
543            .slices
544            .get(index)
545            .ok_or_else(|| TypeError::TypeMismatch {
546                expected: "valid column index",
547                actual: format!("index {index} out of bounds"),
548            })?;
549
550        if slice.is_null {
551            return Err(TypeError::UnexpectedNull);
552        }
553
554        // Parse via SqlValue then convert to target type
555        // Note: parse_value uses zero-copy buffer slicing (Arc<Bytes>::slice)
556        let value = self.parse_value(index, slice)?;
557        T::from_sql(&value)
558    }
559
560    /// Get a value by column name with type conversion.
561    pub fn get_by_name<T: FromSql>(&self, name: &str) -> Result<T, TypeError> {
562        let index = self
563            .metadata
564            .find_by_name(name)
565            .ok_or_else(|| TypeError::TypeMismatch {
566                expected: "valid column name",
567                actual: format!("column '{name}' not found"),
568            })?;
569
570        self.get(index)
571    }
572
573    /// Try to get a value by column index.
574    ///
575    /// Returns `Ok(None)` when the column is NULL or the index is out of
576    /// bounds. Decode and conversion failures are errors — they were
577    /// previously swallowed as `None`, which made a type mismatch on a
578    /// nullable column silently read as NULL (issue #157).
579    ///
580    /// # Errors
581    ///
582    /// Returns [`TypeError`] if the column value cannot be decoded or
583    /// converted to `T`.
584    pub fn try_get<T: FromSql>(&self, index: usize) -> Result<Option<T>, TypeError> {
585        // If we have cached values, use them
586        if let Some(ref values) = self.values {
587            return match values.get(index) {
588                Some(v) => T::from_sql_nullable(v),
589                None => Ok(None),
590            };
591        }
592
593        // Otherwise check the slice
594        let Some(slice) = self.slices.get(index) else {
595            return Ok(None);
596        };
597        if slice.is_null {
598            return Ok(None);
599        }
600
601        self.get(index).map(Some)
602    }
603
604    /// Try to get a value by column name.
605    ///
606    /// Returns `Ok(None)` when the column is NULL or no column with this
607    /// name exists. Decode and conversion failures are errors — see
608    /// [`try_get`](Self::try_get).
609    ///
610    /// # Errors
611    ///
612    /// Returns [`TypeError`] if the column value cannot be decoded or
613    /// converted to `T`.
614    pub fn try_get_by_name<T: FromSql>(&self, name: &str) -> Result<Option<T>, TypeError> {
615        match self.metadata.find_by_name(name) {
616            Some(index) => self.try_get(index),
617            None => Ok(None),
618        }
619    }
620
621    // ========================================================================
622    // Raw Value Access (backward compatibility)
623    // ========================================================================
624
625    /// Get the raw SQL value by index.
626    ///
627    /// Note: This may allocate if values haven't been cached.
628    #[must_use]
629    pub fn get_raw(&self, index: usize) -> Option<SqlValue> {
630        if let Some(ref values) = self.values {
631            return values.get(index).cloned();
632        }
633
634        let slice = self.slices.get(index)?;
635        self.parse_value(index, slice).ok()
636    }
637
638    /// Get the raw SQL value by column name.
639    #[must_use]
640    pub fn get_raw_by_name(&self, name: &str) -> Option<SqlValue> {
641        let index = self.metadata.find_by_name(name)?;
642        self.get_raw(index)
643    }
644
645    // ========================================================================
646    // Metadata Access
647    // ========================================================================
648
649    /// Get the number of columns in the row.
650    #[must_use]
651    pub fn len(&self) -> usize {
652        self.slices.len()
653    }
654
655    /// Check if the row is empty.
656    #[must_use]
657    pub fn is_empty(&self) -> bool {
658        self.slices.is_empty()
659    }
660
661    /// Get the column metadata.
662    #[must_use]
663    pub fn columns(&self) -> &[Column] {
664        &self.metadata.columns
665    }
666
667    /// Get the shared column metadata.
668    #[must_use]
669    pub fn metadata(&self) -> &Arc<ColMetaData> {
670        &self.metadata
671    }
672
673    /// Check if a column value is NULL.
674    #[must_use]
675    pub fn is_null(&self, index: usize) -> bool {
676        self.slices.get(index).map(|s| s.is_null).unwrap_or(true)
677    }
678
679    /// Check if a column value is NULL by name.
680    #[must_use]
681    pub fn is_null_by_name(&self, name: &str) -> bool {
682        self.metadata
683            .find_by_name(name)
684            .map(|i| self.is_null(i))
685            .unwrap_or(true)
686    }
687
688    // ========================================================================
689    // Internal Helpers
690    // ========================================================================
691
692    /// Parse a value from the buffer at the given slice.
693    ///
694    /// Uses the mssql-types decode module for efficient binary parsing.
695    /// Optimized to use zero-copy buffer slicing via Arc<Bytes>.
696    fn parse_value(&self, index: usize, slice: &ColumnSlice) -> Result<SqlValue, TypeError> {
697        if slice.is_null {
698            return Ok(SqlValue::Null);
699        }
700
701        let column = self
702            .metadata
703            .get(index)
704            .ok_or_else(|| TypeError::TypeMismatch {
705                expected: "valid column metadata",
706                actual: format!("no metadata for column {index}"),
707            })?;
708
709        // Calculate byte range for this column
710        let start = slice.offset as usize;
711        let end = start + slice.length as usize;
712
713        // Validate range
714        if end > self.buffer.len() {
715            return Err(TypeError::TypeMismatch {
716                expected: "valid byte range",
717                actual: format!(
718                    "range {}..{} exceeds buffer length {}",
719                    start,
720                    end,
721                    self.buffer.len()
722                ),
723            });
724        }
725
726        // Convert column metadata to TypeInfo for the decode module
727        let type_info = column.to_type_info();
728
729        // Use zero-copy slice of the buffer instead of allocating
730        // This avoids the overhead of Bytes::copy_from_slice
731        let mut buf = self.buffer.slice(start..end);
732
733        // Use the unified decode module for efficient parsing
734        decode_value(&mut buf, &type_info)
735    }
736}
737
738impl std::fmt::Debug for Row {
739    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
740        f.debug_struct("Row")
741            .field("columns", &self.metadata.columns.len())
742            .field("buffer_size", &self.buffer.len())
743            .field("has_cached_values", &self.values.is_some())
744            .finish()
745    }
746}
747
748/// Iterator over row values as SqlValue.
749pub struct RowIter<'a> {
750    row: &'a Row,
751    index: usize,
752}
753
754impl Iterator for RowIter<'_> {
755    type Item = SqlValue;
756
757    fn next(&mut self) -> Option<Self::Item> {
758        if self.index >= self.row.len() {
759            return None;
760        }
761        let value = self.row.get_raw(self.index);
762        self.index += 1;
763        value
764    }
765
766    fn size_hint(&self) -> (usize, Option<usize>) {
767        let remaining = self.row.len() - self.index;
768        (remaining, Some(remaining))
769    }
770}
771
772impl<'a> IntoIterator for &'a Row {
773    type Item = SqlValue;
774    type IntoIter = RowIter<'a>;
775
776    fn into_iter(self) -> Self::IntoIter {
777        RowIter {
778            row: self,
779            index: 0,
780        }
781    }
782}
783
784#[cfg(test)]
785#[allow(clippy::unwrap_used, clippy::expect_used)]
786mod tests {
787    use super::*;
788
789    #[test]
790    fn test_column_slice_null() {
791        let slice = ColumnSlice::null();
792        assert!(slice.is_null);
793        assert_eq!(slice.offset, 0);
794        assert_eq!(slice.length, 0);
795    }
796
797    #[test]
798    fn test_column_metadata() {
799        let col = Column::new("id", 0, "INT")
800            .with_nullable(false)
801            .with_precision_scale(10, 0);
802
803        assert_eq!(col.name, "id");
804        assert_eq!(col.index, 0);
805        assert!(!col.nullable);
806        assert_eq!(col.precision, Some(10));
807    }
808
809    #[test]
810    fn test_col_metadata_find_by_name() {
811        let meta = ColMetaData::new(vec![
812            Column::new("id", 0, "INT"),
813            Column::new("Name", 1, "NVARCHAR"),
814        ]);
815
816        assert_eq!(meta.find_by_name("id"), Some(0));
817        assert_eq!(meta.find_by_name("ID"), Some(0)); // case-insensitive
818        assert_eq!(meta.find_by_name("name"), Some(1));
819        assert_eq!(meta.find_by_name("unknown"), None);
820    }
821
822    #[test]
823    fn test_row_from_values_backward_compat() {
824        let columns = vec![
825            Column::new("id", 0, "INT"),
826            Column::new("name", 1, "NVARCHAR"),
827        ];
828        let values = vec![SqlValue::Int(42), SqlValue::String("Alice".to_string())];
829
830        let row = Row::from_values(columns, values);
831
832        assert_eq!(row.len(), 2);
833        assert_eq!(row.get::<i32>(0).unwrap(), 42);
834        assert_eq!(row.get_by_name::<String>("name").unwrap(), "Alice");
835    }
836
837    #[test]
838    fn test_row_is_null() {
839        let columns = vec![
840            Column::new("id", 0, "INT"),
841            Column::new("nullable_col", 1, "NVARCHAR"),
842        ];
843        let values = vec![SqlValue::Int(1), SqlValue::Null];
844
845        let row = Row::from_values(columns, values);
846
847        assert!(!row.is_null(0));
848        assert!(row.is_null(1));
849        assert!(row.is_null(99)); // Out of bounds returns true
850    }
851
852    #[test]
853    fn test_row_get_bytes_with_buffer() {
854        let buffer = Arc::new(Bytes::from_static(b"Hello World"));
855        let slices: Arc<[ColumnSlice]> = vec![
856            ColumnSlice::new(0, 5, false), // "Hello"
857            ColumnSlice::new(6, 5, false), // "World"
858        ]
859        .into();
860        let meta = Arc::new(ColMetaData::new(vec![
861            Column::new("greeting", 0, "VARCHAR"),
862            Column::new("subject", 1, "VARCHAR"),
863        ]));
864
865        let row = Row::new(buffer, slices, meta);
866
867        assert_eq!(row.get_bytes(0), Some(b"Hello".as_slice()));
868        assert_eq!(row.get_bytes(1), Some(b"World".as_slice()));
869    }
870
871    #[test]
872    fn test_row_get_str() {
873        let buffer = Arc::new(Bytes::from_static(b"Test"));
874        let slices: Arc<[ColumnSlice]> = vec![ColumnSlice::new(0, 4, false)].into();
875        let meta = Arc::new(ColMetaData::new(vec![Column::new("val", 0, "VARCHAR")]));
876
877        let row = Row::new(buffer, slices, meta);
878
879        let s = row.get_str(0).unwrap();
880        assert_eq!(s, "Test");
881        // Should be borrowed for valid UTF-8
882        assert!(matches!(s, Cow::Borrowed(_)));
883    }
884
885    #[test]
886    fn test_row_metadata_access() {
887        let columns = vec![Column::new("col1", 0, "INT")];
888        let row = Row::from_values(columns, vec![SqlValue::Int(1)]);
889
890        assert_eq!(row.columns().len(), 1);
891        assert_eq!(row.columns()[0].name, "col1");
892        assert_eq!(row.metadata().len(), 1);
893    }
894
895    /// Issue #157 regression: `try_get` must distinguish SQL NULL (Ok(None))
896    /// from a decode/conversion failure (Err). Previously both collapsed to
897    /// `None`, so a type mismatch on a nullable column silently read as NULL.
898    #[test]
899    fn test_try_get_distinguishes_null_from_conversion_error() {
900        let columns = vec![Column::new("a", 0, "NVARCHAR"), Column::new("b", 1, "INT")];
901        let row = Row::from_values(
902            columns,
903            vec![SqlValue::String("not a number".into()), SqlValue::Null],
904        );
905
906        // NULL → Ok(None)
907        let b: Option<i32> = row.try_get(1).expect("NULL must be Ok(None)");
908        assert!(b.is_none());
909
910        // Missing column/index → Ok(None) (lenient lookup is unchanged)
911        let missing: Option<i32> = row.try_get(9).expect("missing index must be Ok(None)");
912        assert!(missing.is_none());
913        let missing: Option<i32> = row
914            .try_get_by_name("no_such_column")
915            .expect("missing name must be Ok(None)");
916        assert!(missing.is_none());
917
918        // Conversion failure → Err, NOT Ok(None)
919        assert!(row.try_get::<i32>(0).is_err());
920        assert!(row.try_get_by_name::<i32>("a").is_err());
921
922        // The successful typed read still works
923        let a: Option<String> = row.try_get(0).expect("string read must succeed");
924        assert_eq!(a.as_deref(), Some("not a number"));
925    }
926
927    #[test]
928    fn test_row_get_stream() {
929        let buffer = Arc::new(Bytes::from_static(b"Hello, World!"));
930        let slices: Arc<[ColumnSlice]> = vec![
931            ColumnSlice::new(0, 5, false), // "Hello"
932            ColumnSlice::new(7, 5, false), // "World"
933            ColumnSlice::null(),           // NULL column
934        ]
935        .into();
936        let meta = Arc::new(ColMetaData::new(vec![
937            Column::new("greeting", 0, "VARBINARY"),
938            Column::new("subject", 1, "VARBINARY"),
939            Column::new("nullable", 2, "VARBINARY"),
940        ]));
941
942        let row = Row::new(buffer, slices, meta);
943
944        // Get stream for first column
945        let reader = row.get_stream(0).unwrap();
946        assert_eq!(reader.len(), Some(5));
947        assert_eq!(reader.as_bytes().as_ref(), b"Hello");
948
949        // Get stream for second column
950        let reader = row.get_stream(1).unwrap();
951        assert_eq!(reader.len(), Some(5));
952        assert_eq!(reader.as_bytes().as_ref(), b"World");
953
954        // NULL column returns None
955        assert!(row.get_stream(2).is_none());
956
957        // Out of bounds returns None
958        assert!(row.get_stream(99).is_none());
959    }
960
961    #[test]
962    fn test_row_get_stream_by_name() {
963        let buffer = Arc::new(Bytes::from_static(b"Binary data here"));
964        let slices: Arc<[ColumnSlice]> = vec![ColumnSlice::new(0, 11, false)].into();
965        let meta = Arc::new(ColMetaData::new(vec![Column::new(
966            "document",
967            0,
968            "VARBINARY",
969        )]));
970
971        let row = Row::new(buffer, slices, meta);
972
973        // Get by name (case-insensitive)
974        let reader = row.get_stream_by_name("document").unwrap();
975        assert_eq!(reader.len(), Some(11));
976
977        let reader = row.get_stream_by_name("DOCUMENT").unwrap();
978        assert_eq!(reader.len(), Some(11));
979
980        // Unknown column returns None
981        assert!(row.get_stream_by_name("unknown").is_none());
982    }
983}