mssql_client/row.rs

//! Row representation for query results.
//!
//! This module implements the `Arc<Bytes>` pattern from ADR-004 for reduced-copy
//! row data access. The `Row` struct holds a shared reference to the raw packet
//! buffer, deferring allocation until explicitly requested.
//!
//! ## Access Patterns (per ADR-004)
//!
//! - `get_bytes()` - Returns borrowed slice into buffer (zero additional allocation)
//! - `get_str()` - Returns Cow - borrowed if valid UTF-8, owned if conversion needed
//! - `get_string()` - Allocates new String (explicit allocation)
//! - `get::<T>()` - Type-converting accessor with allocation only if needed
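//!
//! ## Example
//!
//! A minimal sketch of the access patterns above, assuming a `row` already
//! obtained from a query result (the query API itself lives outside this module):
//!
//! ```rust,ignore
//! // Zero-copy access to the raw bytes of column 0.
//! let raw: Option<&[u8]> = row.get_bytes(0);
//!
//! // Borrowed when the bytes are valid UTF-8, owned when conversion is needed.
//! let name: Option<std::borrow::Cow<'_, str>> = row.get_str(1);
//!
//! // Explicit allocation of an owned String.
//! let owned: Option<String> = row.get_string(1);
//!
//! // Type-converting access via the `FromSql` trait.
//! let id: i32 = row.get::<i32>(0)?;
//! ```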

use std::borrow::Cow;
use std::sync::Arc;

use bytes::Bytes;

use mssql_types::decode::{TypeInfo, decode_value};
use mssql_types::{FromSql, SqlValue, TypeError};

use crate::blob::BlobReader;

/// Column slice information pointing into the row buffer.
///
/// This is the internal representation that enables zero-copy access
/// to column data within the shared buffer.
#[derive(Debug, Clone, Copy)]
pub struct ColumnSlice {
    /// Offset into the buffer where this column's data begins.
    pub offset: u32,
    /// Length of the column data in bytes.
    pub length: u32,
    /// Whether this column value is NULL.
    pub is_null: bool,
}

impl ColumnSlice {
    /// Create a new column slice.
    pub fn new(offset: u32, length: u32, is_null: bool) -> Self {
        Self {
            offset,
            length,
            is_null,
        }
    }

    /// Create a NULL column slice.
    pub fn null() -> Self {
        Self {
            offset: 0,
            length: 0,
            is_null: true,
        }
    }
}

/// Column metadata describing a result set column.
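///
/// # Example
///
/// A sketch of building column metadata with the builder-style setters below:
///
/// ```rust,ignore
/// let col = Column::new("price", 2, "DECIMAL")
///     .with_nullable(false)
///     .with_precision_scale(18, 2);
/// assert_eq!(col.precision, Some(18));
/// ```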
#[derive(Debug, Clone)]
pub struct Column {
    /// Column name.
    pub name: String,
    /// Column index (0-based).
    pub index: usize,
    /// SQL type name (e.g., "INT", "NVARCHAR").
    pub type_name: String,
    /// Whether the column allows NULL values.
    pub nullable: bool,
    /// Maximum length for variable-length types.
    pub max_length: Option<u32>,
    /// Precision for numeric types.
    pub precision: Option<u8>,
    /// Scale for numeric types.
    pub scale: Option<u8>,
}

impl Column {
    /// Create a new column with basic metadata.
    pub fn new(name: impl Into<String>, index: usize, type_name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            index,
            type_name: type_name.into(),
            nullable: true,
            max_length: None,
            precision: None,
            scale: None,
        }
    }

    /// Set whether the column is nullable.
    #[must_use]
    pub fn with_nullable(mut self, nullable: bool) -> Self {
        self.nullable = nullable;
        self
    }

    /// Set the maximum length.
    #[must_use]
    pub fn with_max_length(mut self, max_length: u32) -> Self {
        self.max_length = Some(max_length);
        self
    }

    /// Set precision and scale for numeric types.
    #[must_use]
    pub fn with_precision_scale(mut self, precision: u8, scale: u8) -> Self {
        self.precision = Some(precision);
        self.scale = Some(scale);
        self
    }

    /// Convert column metadata to TDS TypeInfo for decoding.
    ///
    /// Maps type names to TDS type IDs and constructs appropriate TypeInfo.
    pub fn to_type_info(&self) -> TypeInfo {
        let type_id = type_name_to_id(&self.type_name);
        TypeInfo {
            type_id,
            length: self.max_length,
            scale: self.scale,
            precision: self.precision,
            collation: None,
        }
    }
}

/// Map SQL type name to TDS type ID.
fn type_name_to_id(name: &str) -> u8 {
    // Uppercase once so every arm (including the guard below) is case-insensitive.
    let upper = name.to_uppercase();
    match upper.as_str() {
        // Integer types
        "INT" | "INTEGER" => 0x38,
        "BIGINT" => 0x7F,
        "SMALLINT" => 0x34,
        "TINYINT" => 0x30,
        "BIT" => 0x32,

        // Floating point
        "FLOAT" => 0x3E,
        "REAL" => 0x3B,

        // Decimal/Numeric
        "DECIMAL" | "NUMERIC" => 0x6C,
        "MONEY" | "SMALLMONEY" => 0x6E,

        // String types
        "NVARCHAR" | "NCHAR" | "NTEXT" => 0xE7,
        "VARCHAR" | "CHAR" | "TEXT" => 0xA7,

        // Binary types
        "VARBINARY" | "BINARY" | "IMAGE" => 0xA5,

        // Date/Time types
        "DATE" => 0x28,
        "TIME" => 0x29,
        "DATETIME2" => 0x2A,
        "DATETIMEOFFSET" => 0x2B,
        "DATETIME" => 0x3D,
        "SMALLDATETIME" => 0x3F,

        // GUID
        "UNIQUEIDENTIFIER" => 0x24,

        // XML
        "XML" => 0xF1,

        // Nullable variants (INTNTYPE, etc.)
        _ if upper.ends_with('N') => 0x26,

        // Default to binary for unknown types
        _ => 0xA5,
    }
}

/// Shared column metadata for a result set.
///
/// This is shared across all rows in the result set to avoid
/// duplicating metadata per row.
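///
/// # Example
///
/// A small sketch of building shared metadata and looking up a column by name:
///
/// ```rust,ignore
/// let meta = ColMetaData::new(vec![
///     Column::new("id", 0, "INT"),
///     Column::new("name", 1, "NVARCHAR"),
/// ]);
/// assert_eq!(meta.find_by_name("NAME"), Some(1)); // case-insensitive
/// ```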
#[derive(Debug, Clone)]
pub struct ColMetaData {
    /// Column definitions.
    pub columns: Arc<[Column]>,
}

impl ColMetaData {
    /// Create new column metadata from a list of columns.
    pub fn new(columns: Vec<Column>) -> Self {
        Self {
            columns: columns.into(),
        }
    }

    /// Get the number of columns.
    #[must_use]
    pub fn len(&self) -> usize {
        self.columns.len()
    }

    /// Check if there are no columns.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.columns.is_empty()
    }

    /// Get a column by index.
    #[must_use]
    pub fn get(&self, index: usize) -> Option<&Column> {
        self.columns.get(index)
    }

    /// Find a column index by name (case-insensitive).
    #[must_use]
    pub fn find_by_name(&self, name: &str) -> Option<usize> {
        self.columns
            .iter()
            .position(|c| c.name.eq_ignore_ascii_case(name))
    }
}

/// A row from a query result.
///
/// Implements the `Arc<Bytes>` pattern from ADR-004 for reduced memory allocation.
/// The row holds a shared reference to the raw packet buffer and column slice
/// information, deferring parsing and allocation until values are accessed.
///
/// # Memory Model
///
/// ```text
/// Row {
///     buffer: Arc<Bytes> ──────────► [raw packet data...]
///     slices: Arc<[ColumnSlice]> ──► [{offset, length, is_null}, ...]
///     metadata: Arc<ColMetaData> ──► [Column definitions...]
/// }
/// ```
///
/// Multiple `Row` instances from the same result set share the `metadata`.
/// The `buffer` and `slices` are unique per row but use `Arc` for cheap cloning.
///
/// # Access Patterns
///
/// - **Zero-copy:** `get_bytes()`, `get_str()` (when UTF-8 valid)
/// - **Allocating:** `get_string()`, `get::<String>()`
/// - **Type-converting:** `get::<T>()` uses `FromSql` trait
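///
/// # Example
///
/// A minimal sketch of by-name access, assuming a `row` from a result set:
///
/// ```rust,ignore
/// let id: i64 = row.get_by_name("id")?;
/// let email: Option<String> = row.try_get_by_name("email"); // None if NULL
/// if row.is_null_by_name("deleted_at") {
///     // handle the NULL case
/// }
/// ```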
#[derive(Clone)]
pub struct Row {
    /// Shared reference to raw packet body containing row data.
    buffer: Arc<Bytes>,
    /// Column offsets into buffer.
    slices: Arc<[ColumnSlice]>,
    /// Column metadata (shared across result set).
    metadata: Arc<ColMetaData>,
    /// Cached parsed values (lazily populated).
    /// This maintains backward compatibility with code expecting SqlValue access.
    values: Option<Arc<[SqlValue]>>,
}

impl Row {
    /// Create a new row with the `Arc<Bytes>` pattern.
    ///
    /// This is the primary constructor for the reduced-copy pattern.
    pub fn new(buffer: Arc<Bytes>, slices: Arc<[ColumnSlice]>, metadata: Arc<ColMetaData>) -> Self {
        Self {
            buffer,
            slices,
            metadata,
            values: None,
        }
    }

    /// Create a row from pre-parsed values (backward compatibility).
    ///
    /// This constructor supports existing code that works with `SqlValue` directly.
    /// It's less efficient than the buffer-based approach but maintains compatibility.
    #[allow(dead_code)]
    pub(crate) fn from_values(columns: Vec<Column>, values: Vec<SqlValue>) -> Self {
        let metadata = Arc::new(ColMetaData::new(columns));
        let slices: Arc<[ColumnSlice]> = values
            .iter()
            .enumerate()
            .map(|(i, v)| ColumnSlice::new(i as u32, 0, v.is_null()))
            .collect::<Vec<_>>()
            .into();

        Self {
            buffer: Arc::new(Bytes::new()),
            slices,
            metadata,
            values: Some(values.into()),
        }
    }

    // ========================================================================
    // Zero-Copy Access Methods (ADR-004)
    // ========================================================================

    /// Returns borrowed slice into buffer (zero additional allocation).
    ///
    /// This is the most efficient access method when you need raw bytes.
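    ///
    /// # Example
    ///
    /// A short sketch; the returned slice borrows from the shared row buffer:
    ///
    /// ```rust,ignore
    /// if let Some(bytes) = row.get_bytes(0) {
    ///     println!("column 0 holds {} bytes", bytes.len());
    /// }
    /// ```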
    #[must_use]
    pub fn get_bytes(&self, index: usize) -> Option<&[u8]> {
        let slice = self.slices.get(index)?;
        if slice.is_null {
            return None;
        }

        let start = slice.offset as usize;
        let end = start + slice.length as usize;

        if end <= self.buffer.len() {
            Some(&self.buffer[start..end])
        } else {
            None
        }
    }

    /// Returns Cow - borrowed if valid UTF-8, owned if conversion needed.
    ///
    /// For single-byte data (e.g. VARCHAR), this returns a borrowed reference
    /// when the bytes are valid UTF-8 (zero allocation).
    /// For UTF-16LE data (NVARCHAR/NCHAR/NTEXT), this allocates a new String.
    #[must_use]
    pub fn get_str(&self, index: usize) -> Option<Cow<'_, str>> {
        let bytes = self.get_bytes(index)?;

        // NVARCHAR/NCHAR/NTEXT columns carry UTF-16LE data. ASCII content in
        // that encoding is also valid UTF-8 (with interleaved NULs), so the
        // column type is consulted instead of guessing from the bytes alone.
        let is_utf16 = self
            .metadata
            .get(index)
            .map(|c| {
                matches!(
                    c.type_name.to_uppercase().as_str(),
                    "NVARCHAR" | "NCHAR" | "NTEXT"
                )
            })
            .unwrap_or(false);

        if is_utf16 {
            // UTF-16 conversion requires an allocation.
            let utf16: Vec<u16> = bytes
                .chunks_exact(2)
                .map(|chunk| u16::from_le_bytes([chunk[0], chunk[1]]))
                .collect();
            return String::from_utf16(&utf16).ok().map(Cow::Owned);
        }

        // Single-byte data: borrow when it is valid UTF-8 (zero allocation).
        std::str::from_utf8(bytes).ok().map(Cow::Borrowed)
    }

    /// Allocates new String (explicit allocation).
    ///
    /// Use this when you need an owned String.
    #[must_use]
    pub fn get_string(&self, index: usize) -> Option<String> {
        self.get_str(index).map(|cow| cow.into_owned())
    }

    // ========================================================================
    // Streaming Access (LOB support)
    // ========================================================================

    /// Get a streaming reader for a binary/text column.
    ///
    /// Returns a [`BlobReader`] that implements [`tokio::io::AsyncRead`] for
    /// streaming access to large binary or text columns. This is useful for:
    ///
    /// - Streaming large data to files without fully loading into memory
    /// - Processing data in chunks with progress tracking
    /// - Copying data between I/O destinations efficiently
    ///
    /// # Supported Column Types
    ///
    /// - `VARBINARY`, `VARBINARY(MAX)`
    /// - `VARCHAR`, `VARCHAR(MAX)`
    /// - `NVARCHAR`, `NVARCHAR(MAX)`
    /// - `TEXT`, `NTEXT`, `IMAGE` (legacy types)
    /// - `XML`
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use tokio::io::AsyncWriteExt;
    ///
    /// // Stream a large VARBINARY(MAX) column to a file
    /// let mut reader = row.get_stream(0)?;
    /// let mut file = tokio::fs::File::create("output.bin").await?;
    /// tokio::io::copy(&mut reader, &mut file).await?;
    /// ```
    ///
    /// # Returns
    ///
    /// - `Some(BlobReader)` if the column contains binary/text data
    /// - `None` if the column is NULL or the index is out of bounds
    #[must_use]
    pub fn get_stream(&self, index: usize) -> Option<BlobReader> {
        let slice = self.slices.get(index)?;
        if slice.is_null {
            return None;
        }

        let start = slice.offset as usize;
        let end = start + slice.length as usize;

        if end <= self.buffer.len() {
            // Use zero-copy slicing from Arc<Bytes>
            let data = self.buffer.slice(start..end);
            Some(BlobReader::from_bytes(data))
        } else {
            None
        }
    }

    /// Get a streaming reader for a binary/text column by name.
    ///
    /// See [`get_stream`](Self::get_stream) for details.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let mut reader = row.get_stream_by_name("document_content")?;
    /// // Process the blob stream...
    /// ```
    #[must_use]
    pub fn get_stream_by_name(&self, name: &str) -> Option<BlobReader> {
        let index = self.metadata.find_by_name(name)?;
        self.get_stream(index)
    }

    // ========================================================================
    // Type-Converting Access (FromSql trait)
    // ========================================================================

    /// Get a value by column index with type conversion.
    ///
    /// Uses the `FromSql` trait to convert the raw value to the requested type.
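    ///
    /// # Example
    ///
    /// A sketch of typed access; NULL values surface as `TypeError::UnexpectedNull`:
    ///
    /// ```rust,ignore
    /// let id: i32 = row.get(0)?;
    /// let name: String = row.get_by_name("name")?;
    /// ```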
    pub fn get<T: FromSql>(&self, index: usize) -> Result<T, TypeError> {
        // If we have cached values, use them
        if let Some(ref values) = self.values {
            return values
                .get(index)
                .ok_or_else(|| TypeError::TypeMismatch {
                    expected: "valid column index",
                    actual: format!("index {index} out of bounds"),
                })
                .and_then(T::from_sql);
        }

        // Otherwise, parse on demand from the buffer
        let slice = self
            .slices
            .get(index)
            .ok_or_else(|| TypeError::TypeMismatch {
                expected: "valid column index",
                actual: format!("index {index} out of bounds"),
            })?;

        if slice.is_null {
            return Err(TypeError::UnexpectedNull);
        }

        // Parse via SqlValue then convert to target type
        // Note: parse_value uses zero-copy buffer slicing (Arc<Bytes>::slice)
        let value = self.parse_value(index, slice)?;
        T::from_sql(&value)
    }

    /// Get a value by column name with type conversion.
    pub fn get_by_name<T: FromSql>(&self, name: &str) -> Result<T, TypeError> {
        let index = self
            .metadata
            .find_by_name(name)
            .ok_or_else(|| TypeError::TypeMismatch {
                expected: "valid column name",
                actual: format!("column '{name}' not found"),
            })?;

        self.get(index)
    }

    /// Try to get a value by column index, returning None if NULL or not found.
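    ///
    /// # Example
    ///
    /// A sketch of NULL-tolerant access:
    ///
    /// ```rust,ignore
    /// let middle_name: Option<String> = row.try_get(2); // None if NULL
    /// ```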
    pub fn try_get<T: FromSql>(&self, index: usize) -> Option<T> {
        // If we have cached values, use them
        if let Some(ref values) = self.values {
            return values
                .get(index)
                .and_then(|v| T::from_sql_nullable(v).ok().flatten());
        }

        // Otherwise check the slice
        let slice = self.slices.get(index)?;
        if slice.is_null {
            return None;
        }

        self.get(index).ok()
    }

    /// Try to get a value by column name, returning None if NULL or not found.
    pub fn try_get_by_name<T: FromSql>(&self, name: &str) -> Option<T> {
        let index = self.metadata.find_by_name(name)?;
        self.try_get(index)
    }

    // ========================================================================
    // Raw Value Access (backward compatibility)
    // ========================================================================

    /// Get the raw SQL value by index.
    ///
    /// Note: This may allocate if values haven't been cached.
    #[must_use]
    pub fn get_raw(&self, index: usize) -> Option<SqlValue> {
        if let Some(ref values) = self.values {
            return values.get(index).cloned();
        }

        let slice = self.slices.get(index)?;
        self.parse_value(index, slice).ok()
    }

    /// Get the raw SQL value by column name.
    #[must_use]
    pub fn get_raw_by_name(&self, name: &str) -> Option<SqlValue> {
        let index = self.metadata.find_by_name(name)?;
        self.get_raw(index)
    }

    // ========================================================================
    // Metadata Access
    // ========================================================================

    /// Get the number of columns in the row.
    #[must_use]
    pub fn len(&self) -> usize {
        self.slices.len()
    }

    /// Check if the row is empty.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.slices.is_empty()
    }

    /// Get the column metadata.
    #[must_use]
    pub fn columns(&self) -> &[Column] {
        &self.metadata.columns
    }

    /// Get the shared column metadata.
    #[must_use]
    pub fn metadata(&self) -> &Arc<ColMetaData> {
        &self.metadata
    }

    /// Check if a column value is NULL.
    #[must_use]
    pub fn is_null(&self, index: usize) -> bool {
        self.slices.get(index).map(|s| s.is_null).unwrap_or(true)
    }

    /// Check if a column value is NULL by name.
    #[must_use]
    pub fn is_null_by_name(&self, name: &str) -> bool {
        self.metadata
            .find_by_name(name)
            .map(|i| self.is_null(i))
            .unwrap_or(true)
    }

    // ========================================================================
    // Internal Helpers
    // ========================================================================

    /// Parse a value from the buffer at the given slice.
    ///
    /// Uses the mssql-types decode module for efficient binary parsing.
    /// Optimized to use zero-copy buffer slicing via Arc<Bytes>.
    fn parse_value(&self, index: usize, slice: &ColumnSlice) -> Result<SqlValue, TypeError> {
        if slice.is_null {
            return Ok(SqlValue::Null);
        }

        let column = self
            .metadata
            .get(index)
            .ok_or_else(|| TypeError::TypeMismatch {
                expected: "valid column metadata",
                actual: format!("no metadata for column {index}"),
            })?;

        // Calculate byte range for this column
        let start = slice.offset as usize;
        let end = start + slice.length as usize;

        // Validate range
        if end > self.buffer.len() {
            return Err(TypeError::TypeMismatch {
                expected: "valid byte range",
                actual: format!(
                    "range {}..{} exceeds buffer length {}",
                    start,
                    end,
                    self.buffer.len()
                ),
            });
        }

        // Convert column metadata to TypeInfo for the decode module
        let type_info = column.to_type_info();

        // Use zero-copy slice of the buffer instead of allocating
        // This avoids the overhead of Bytes::copy_from_slice
        let mut buf = self.buffer.slice(start..end);

        // Use the unified decode module for efficient parsing
        decode_value(&mut buf, &type_info)
    }
}

impl std::fmt::Debug for Row {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Row")
            .field("columns", &self.metadata.columns.len())
            .field("buffer_size", &self.buffer.len())
            .field("has_cached_values", &self.values.is_some())
            .finish()
    }
}

/// Iterator over row values as SqlValue.
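///
/// # Example
///
/// A sketch of iterating a row's values; `&Row` implements `IntoIterator`
/// below, and each item here is passed to a hypothetical `handle` function:
///
/// ```rust,ignore
/// for value in &row {
///     // `value` is an owned `SqlValue`.
///     handle(value);
/// }
/// ```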
pub struct RowIter<'a> {
    row: &'a Row,
    index: usize,
}

impl Iterator for RowIter<'_> {
    type Item = SqlValue;

    fn next(&mut self) -> Option<Self::Item> {
        if self.index >= self.row.len() {
            return None;
        }
        let value = self.row.get_raw(self.index);
        self.index += 1;
        value
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.row.len() - self.index;
        (remaining, Some(remaining))
    }
}

impl<'a> IntoIterator for &'a Row {
    type Item = SqlValue;
    type IntoIter = RowIter<'a>;

    fn into_iter(self) -> Self::IntoIter {
        RowIter {
            row: self,
            index: 0,
        }
    }
}

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn test_column_slice_null() {
        let slice = ColumnSlice::null();
        assert!(slice.is_null);
        assert_eq!(slice.offset, 0);
        assert_eq!(slice.length, 0);
    }

    #[test]
    fn test_column_metadata() {
        let col = Column::new("id", 0, "INT")
            .with_nullable(false)
            .with_precision_scale(10, 0);

        assert_eq!(col.name, "id");
        assert_eq!(col.index, 0);
        assert!(!col.nullable);
        assert_eq!(col.precision, Some(10));
    }
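
    // A small sketch of a test for the private `type_name_to_id` mapping; the
    // expected IDs are the ones hard-coded in that function above.
    #[test]
    fn test_type_name_to_id_mapping() {
        assert_eq!(type_name_to_id("int"), 0x38);
        assert_eq!(type_name_to_id("nvarchar"), 0xE7); // case-insensitive
        // Unrecognized names ending in N map to the nullable-variant ID.
        assert_eq!(type_name_to_id("INTN"), 0x26);
        // Unknown types fall back to the binary ID.
        assert_eq!(type_name_to_id("GEOGRAPHY"), 0xA5);
    }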

    #[test]
    fn test_col_metadata_find_by_name() {
        let meta = ColMetaData::new(vec![
            Column::new("id", 0, "INT"),
            Column::new("Name", 1, "NVARCHAR"),
        ]);

        assert_eq!(meta.find_by_name("id"), Some(0));
        assert_eq!(meta.find_by_name("ID"), Some(0)); // case-insensitive
        assert_eq!(meta.find_by_name("name"), Some(1));
        assert_eq!(meta.find_by_name("unknown"), None);
    }

    #[test]
    fn test_row_from_values_backward_compat() {
        let columns = vec![
            Column::new("id", 0, "INT"),
            Column::new("name", 1, "NVARCHAR"),
        ];
        let values = vec![SqlValue::Int(42), SqlValue::String("Alice".to_string())];

        let row = Row::from_values(columns, values);

        assert_eq!(row.len(), 2);
        assert_eq!(row.get::<i32>(0).unwrap(), 42);
        assert_eq!(row.get_by_name::<String>("name").unwrap(), "Alice");
    }

    #[test]
    fn test_row_is_null() {
        let columns = vec![
            Column::new("id", 0, "INT"),
            Column::new("nullable_col", 1, "NVARCHAR"),
        ];
        let values = vec![SqlValue::Int(1), SqlValue::Null];

        let row = Row::from_values(columns, values);

        assert!(!row.is_null(0));
        assert!(row.is_null(1));
        assert!(row.is_null(99)); // Out of bounds returns true
    }

    #[test]
    fn test_row_get_bytes_with_buffer() {
        let buffer = Arc::new(Bytes::from_static(b"Hello World"));
        let slices: Arc<[ColumnSlice]> = vec![
            ColumnSlice::new(0, 5, false), // "Hello"
            ColumnSlice::new(6, 5, false), // "World"
        ]
        .into();
        let meta = Arc::new(ColMetaData::new(vec![
            Column::new("greeting", 0, "VARCHAR"),
            Column::new("subject", 1, "VARCHAR"),
        ]));

        let row = Row::new(buffer, slices, meta);

        assert_eq!(row.get_bytes(0), Some(b"Hello".as_slice()));
        assert_eq!(row.get_bytes(1), Some(b"World".as_slice()));
    }

    #[test]
    fn test_row_get_str() {
        let buffer = Arc::new(Bytes::from_static(b"Test"));
        let slices: Arc<[ColumnSlice]> = vec![ColumnSlice::new(0, 4, false)].into();
        let meta = Arc::new(ColMetaData::new(vec![Column::new("val", 0, "VARCHAR")]));

        let row = Row::new(buffer, slices, meta);

        let s = row.get_str(0).unwrap();
        assert_eq!(s, "Test");
        // Should be borrowed for valid UTF-8
        assert!(matches!(s, Cow::Borrowed(_)));
    }
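
    // A sketch of the UTF-16LE path in `get_str`; assumes NVARCHAR column
    // bytes are UTF-16LE-encoded, as described in the `get_str` docs.
    #[test]
    fn test_row_get_str_utf16() {
        // "héllo" encoded as UTF-16LE (not valid UTF-8).
        let buffer = Arc::new(Bytes::from_static(b"h\x00\xe9\x00l\x00l\x00o\x00"));
        let slices: Arc<[ColumnSlice]> = vec![ColumnSlice::new(0, 10, false)].into();
        let meta = Arc::new(ColMetaData::new(vec![Column::new("val", 0, "NVARCHAR")]));

        let row = Row::new(buffer, slices, meta);

        let s = row.get_str(0).unwrap();
        assert_eq!(s, "héllo");
        // Conversion from UTF-16 requires an owned String.
        assert!(matches!(s, Cow::Owned(_)));
    }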

    #[test]
    fn test_row_metadata_access() {
        let columns = vec![Column::new("col1", 0, "INT")];
        let row = Row::from_values(columns, vec![SqlValue::Int(1)]);

        assert_eq!(row.columns().len(), 1);
        assert_eq!(row.columns()[0].name, "col1");
        assert_eq!(row.metadata().len(), 1);
    }

    #[test]
    fn test_row_get_stream() {
        let buffer = Arc::new(Bytes::from_static(b"Hello, World!"));
        let slices: Arc<[ColumnSlice]> = vec![
            ColumnSlice::new(0, 5, false), // "Hello"
            ColumnSlice::new(7, 5, false), // "World"
            ColumnSlice::null(),           // NULL column
        ]
        .into();
        let meta = Arc::new(ColMetaData::new(vec![
            Column::new("greeting", 0, "VARBINARY"),
            Column::new("subject", 1, "VARBINARY"),
            Column::new("nullable", 2, "VARBINARY"),
        ]));

        let row = Row::new(buffer, slices, meta);

        // Get stream for first column
        let reader = row.get_stream(0).unwrap();
        assert_eq!(reader.len(), Some(5));
        assert_eq!(reader.as_bytes().as_ref(), b"Hello");

        // Get stream for second column
        let reader = row.get_stream(1).unwrap();
        assert_eq!(reader.len(), Some(5));
        assert_eq!(reader.as_bytes().as_ref(), b"World");

        // NULL column returns None
        assert!(row.get_stream(2).is_none());

        // Out of bounds returns None
        assert!(row.get_stream(99).is_none());
    }

    #[test]
    fn test_row_get_stream_by_name() {
        let buffer = Arc::new(Bytes::from_static(b"Binary data here"));
        let slices: Arc<[ColumnSlice]> = vec![ColumnSlice::new(0, 11, false)].into();
        let meta = Arc::new(ColMetaData::new(vec![Column::new(
            "document",
            0,
            "VARBINARY",
        )]));

        let row = Row::new(buffer, slices, meta);

        // Get by name (case-insensitive)
        let reader = row.get_stream_by_name("document").unwrap();
        assert_eq!(reader.len(), Some(11));

        let reader = row.get_stream_by_name("DOCUMENT").unwrap();
        assert_eq!(reader.len(), Some(11));

        // Unknown column returns None
        assert!(row.get_stream_by_name("unknown").is_none());
823    }
824}