Skip to main content

mssql_client/
stream.rs

1//! Streaming query result support.
2//!
3//! This module provides streaming result sets for memory-efficient
4//! processing of large query results.
5//!
6//! ## Buffered vs True Streaming
7//!
8//! The underlying TDS response is reassembled into a single [`bytes::Bytes`]
9//! payload by [`mssql-codec`](mssql_codec). That payload is handed to the
10//! token parser which walks it once and enqueues each row's raw byte slice
11//! (a cheap, refcounted slice into the original [`bytes::Bytes`] per ADR-004)
12//! into the stream. Individual [`Row`]s are then decoded lazily when callers
13//! pull them — either via the [`Stream`]/[`Iterator`] impls or
14//! [`QueryStream::collect_all`].
15//!
16//! This "lazy decode" pattern keeps peak memory at roughly the size of the
17//! raw payload instead of payload + fully-typed `Vec<Row>`. Users who iterate
18//! and drop each [`Row`] see memory proportional to a single row at a time
19//! plus the shared raw payload. Users who `collect_all()` pay for the full
20//! `Vec<Row>` just like before.
21//!
22//! The same lazy-decode pattern applies to [`MultiResultStream`],
23//! [`ResultSet`], and [`ProcedureResult::result_sets`]: raw row bytes are
24//! stashed during response read and each [`Row`] is decoded when the caller
25//! pulls it. Because decoding can fail per row, [`ResultSet::next_row`]
26//! returns `Option<Result<Row, Error>>` rather than `Option<Row>` — callers
27//! observe decode errors at iteration time instead of at
28//! `call_procedure().await?` / `query_multiple().await?`.
29//!
30//! For truly large result sets, consider using OFFSET/FETCH pagination.
31
32use std::collections::VecDeque;
33use std::pin::Pin;
34use std::task::{Context, Poll};
35
36use futures_core::Stream;
37use tds_protocol::token::{ColMetaData, NbcRow, RawRow};
38
39use crate::error::Error;
40use crate::row::{Column, Row};
41
42/// A row that may be already decoded or still held as raw TDS bytes.
43///
44/// The lazy-parse query path enqueues raw rows (cheap `Bytes` slices into
45/// the original response payload) and decodes them on demand. The eager
46/// path used by tests and [`MultiResultStream::into_query_streams`] wraps
47/// already-decoded rows.
48#[derive(Debug, Clone)]
49pub(crate) enum PendingRow {
50    /// Already-decoded row (eager path — tests + `MultiResultStream` compat).
51    Parsed(Row),
52    /// Raw TDS row bytes, to be decoded on pull.
53    Raw(RawRow),
54    /// Null-bitmap-compressed row bytes, to be decoded on pull.
55    Nbc(NbcRow),
56}
57
58/// A streaming result set from a query.
59///
60/// This stream yields rows one at a time, allowing processing of
61/// large result sets without loading everything into memory.
62///
63/// # Example
64///
65/// ```rust,ignore
66/// use futures::StreamExt;
67///
68/// let mut stream = client.query("SELECT * FROM large_table", &[]).await?;
69///
70/// while let Some(row) = stream.next().await {
71///     let row = row?;
72///     process_row(&row);
73/// }
74/// ```
75#[must_use = "streams must be consumed; dropping a stream discards remaining rows"]
76pub struct QueryStream<'a> {
77    /// Column metadata for the result set.
78    columns: Vec<Column>,
79    /// Buffered rows (typed or raw) from the response.
80    rows: VecDeque<PendingRow>,
81    /// Protocol metadata needed to decode raw rows. `None` if every pending
82    /// row is already [`PendingRow::Parsed`].
83    meta: Option<ColMetaData>,
84    /// Pre-resolved column decryptor for Always Encrypted result sets.
85    ///
86    /// Wrapped in `Arc` so lazy result-set containers (`ResultSet`) can share
87    /// this state without duplicating the derived-key material.
88    #[cfg(feature = "always-encrypted")]
89    decryptor: Option<std::sync::Arc<crate::column_decryptor::ColumnDecryptor>>,
90    /// Whether the stream has completed.
91    finished: bool,
92    /// Lifetime tied to the connection.
93    _marker: std::marker::PhantomData<&'a ()>,
94}
95
96impl QueryStream<'_> {
97    /// Create a new query stream from already-decoded rows.
98    ///
99    /// This is the eager constructor used by unit tests. Production query
100    /// paths use [`QueryStream::from_raw`] to defer row decoding.
101    /// [`MultiResultStream::into_query_streams`] uses
102    /// [`ResultSet::into_query_stream`] to preserve pending-row state.
103    #[cfg(test)]
104    pub(crate) fn new(columns: Vec<Column>, rows: Vec<Row>) -> Self {
105        Self {
106            columns,
107            rows: rows.into_iter().map(PendingRow::Parsed).collect(),
108            meta: None,
109            #[cfg(feature = "always-encrypted")]
110            decryptor: None,
111            finished: false,
112            _marker: std::marker::PhantomData,
113        }
114    }
115
116    /// Create a query stream from raw row bytes and protocol metadata.
117    ///
118    /// Rows are decoded on demand as the stream is pulled. The `meta` must
119    /// describe every row in `pending`. If decryption is configured,
120    /// `decryptor` must cover the same column set.
121    pub(crate) fn from_raw(
122        columns: Vec<Column>,
123        pending: Vec<PendingRow>,
124        meta: ColMetaData,
125        #[cfg(feature = "always-encrypted")] decryptor: Option<
126            std::sync::Arc<crate::column_decryptor::ColumnDecryptor>,
127        >,
128    ) -> Self {
129        Self {
130            columns,
131            rows: pending.into(),
132            meta: Some(meta),
133            #[cfg(feature = "always-encrypted")]
134            decryptor,
135            finished: false,
136            _marker: std::marker::PhantomData,
137        }
138    }
139
140    /// Create an empty query stream (no results).
141    #[allow(dead_code)]
142    pub(crate) fn empty() -> Self {
143        Self {
144            columns: Vec::new(),
145            rows: VecDeque::new(),
146            meta: None,
147            #[cfg(feature = "always-encrypted")]
148            decryptor: None,
149            finished: true,
150            _marker: std::marker::PhantomData,
151        }
152    }
153
154    /// Get the column metadata for this result set.
155    #[must_use]
156    pub fn columns(&self) -> &[Column] {
157        &self.columns
158    }
159
160    /// Check if the stream has finished.
161    #[must_use]
162    pub fn is_finished(&self) -> bool {
163        self.finished
164    }
165
166    /// Get the number of rows remaining in the buffer.
167    #[must_use]
168    pub fn rows_remaining(&self) -> usize {
169        self.rows.len()
170    }
171
172    /// Collect all remaining rows into a vector.
173    ///
174    /// This consumes the stream and loads all rows into memory. Each row is
175    /// decoded lazily here, so large raw payloads are freed as rows are
176    /// produced rather than held alongside the typed `Vec<Row>` throughout
177    /// the caller's query call.
178    ///
179    /// For very large result sets, consider iterating with the stream
180    /// instead.
181    pub async fn collect_all(mut self) -> Result<Vec<Row>, Error> {
182        let mut out = Vec::with_capacity(self.rows.len());
183        while let Some(pending) = self.rows.pop_front() {
184            out.push(self.decode(pending)?);
185        }
186        self.finished = true;
187        Ok(out)
188    }
189
190    /// Try to get the next row synchronously (without async).
191    ///
192    /// Returns `None` when no more rows are available or the next pending
193    /// row fails to decode. Use [`Iterator::next`] instead if you need to
194    /// observe decode errors.
195    pub fn try_next(&mut self) -> Option<Row> {
196        self.next().and_then(|r| r.ok())
197    }
198
199    /// Decode a pending row into a typed [`Row`].
200    fn decode(&self, pending: PendingRow) -> Result<Row, Error> {
201        match pending {
202            PendingRow::Parsed(row) => Ok(row),
203            PendingRow::Raw(raw) => {
204                let meta = self
205                    .meta
206                    .as_ref()
207                    .ok_or_else(|| Error::Protocol("row metadata missing for raw row".into()))?;
208                #[cfg(feature = "always-encrypted")]
209                if let Some(ref dec) = self.decryptor {
210                    return crate::column_parser::convert_raw_row_decrypted(
211                        &raw,
212                        meta,
213                        &self.columns,
214                        dec,
215                    );
216                }
217                crate::column_parser::convert_raw_row(&raw, meta, &self.columns)
218            }
219            PendingRow::Nbc(nbc) => {
220                let meta = self
221                    .meta
222                    .as_ref()
223                    .ok_or_else(|| Error::Protocol("row metadata missing for NBC row".into()))?;
224                #[cfg(feature = "always-encrypted")]
225                if let Some(ref dec) = self.decryptor {
226                    return crate::column_parser::convert_nbc_row_decrypted(
227                        &nbc,
228                        meta,
229                        &self.columns,
230                        dec,
231                    );
232                }
233                crate::column_parser::convert_nbc_row(&nbc, meta, &self.columns)
234            }
235        }
236    }
237}
238
239impl Stream for QueryStream<'_> {
240    type Item = Result<Row, Error>;
241
242    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
243        let this = self.get_mut();
244
245        if this.finished {
246            return Poll::Ready(None);
247        }
248
249        match this.rows.pop_front() {
250            Some(pending) => Poll::Ready(Some(this.decode(pending))),
251            None => {
252                this.finished = true;
253                Poll::Ready(None)
254            }
255        }
256    }
257}
258
259impl ExactSizeIterator for QueryStream<'_> {}
260
261impl Iterator for QueryStream<'_> {
262    type Item = Result<Row, Error>;
263
264    fn next(&mut self) -> Option<Self::Item> {
265        if self.finished {
266            return None;
267        }
268
269        match self.rows.pop_front() {
270            Some(pending) => Some(self.decode(pending)),
271            None => {
272                self.finished = true;
273                None
274            }
275        }
276    }
277
278    fn size_hint(&self) -> (usize, Option<usize>) {
279        let remaining = self.rows.len();
280        (remaining, Some(remaining))
281    }
282}
283
284/// Result of a non-query execution.
285///
286/// Contains the number of affected rows and any output parameters.
287#[derive(Debug, Clone)]
288#[non_exhaustive]
289#[must_use]
290pub struct ExecuteResult {
291    /// Number of rows affected by the statement.
292    pub rows_affected: u64,
293    /// Output parameters from stored procedures.
294    pub output_params: Vec<OutputParam>,
295}
296
297/// An output parameter from a stored procedure call.
298#[derive(Debug, Clone)]
299#[non_exhaustive]
300pub struct OutputParam {
301    /// Parameter name.
302    pub name: String,
303    /// Parameter value.
304    pub value: mssql_types::SqlValue,
305}
306
307impl ExecuteResult {
308    /// Create a new execute result.
309    pub fn new(rows_affected: u64) -> Self {
310        Self {
311            rows_affected,
312            output_params: Vec::new(),
313        }
314    }
315
316    /// Create a result with output parameters.
317    pub fn with_outputs(rows_affected: u64, output_params: Vec<OutputParam>) -> Self {
318        Self {
319            rows_affected,
320            output_params,
321        }
322    }
323
324    /// Get an output parameter by name.
325    #[must_use]
326    pub fn get_output(&self, name: &str) -> Option<&OutputParam> {
327        self.output_params
328            .iter()
329            .find(|p| p.name.eq_ignore_ascii_case(name))
330    }
331}
332
333/// Result of a stored procedure execution.
334///
335/// Contains the return value, affected row count, output parameters,
336/// and any result sets produced by the procedure.
337///
338/// # Example
339///
340/// ```rust,ignore
341/// let result = client.call_procedure("dbo.GetUser", &[&1i32]).await?;
342///
343/// // Check the return value (RETURN statement in the proc)
344/// assert_eq!(result.return_value, 0);
345///
346/// // Process result sets
347/// for mut rs in result.result_sets {
348///     while let Some(row) = rs.next_row() {
349///         let row = row?;
350///         println!("{:?}", row);
351///     }
352/// }
353/// ```
354#[derive(Debug, Clone)]
355#[non_exhaustive]
356#[must_use]
357pub struct ProcedureResult {
358    /// Return value from the stored procedure's RETURN statement.
359    ///
360    /// Defaults to 0 if the procedure does not explicitly return a value,
361    /// which matches SQL Server's default behavior.
362    pub return_value: i32,
363    /// Total number of rows affected by statements within the procedure.
364    pub rows_affected: u64,
365    /// Output parameters returned by the procedure.
366    pub output_params: Vec<OutputParam>,
367    /// Result sets produced by SELECT statements within the procedure.
368    pub result_sets: Vec<ResultSet>,
369}
370
371impl ProcedureResult {
372    /// Create a new empty procedure result.
373    pub(crate) fn new() -> Self {
374        Self {
375            return_value: 0,
376            rows_affected: 0,
377            output_params: Vec::new(),
378            result_sets: Vec::new(),
379        }
380    }
381
382    /// Get the return value from the stored procedure.
383    ///
384    /// This is the value from the procedure's `RETURN` statement.
385    /// Defaults to 0 if not explicitly set by the procedure.
386    #[must_use]
387    pub fn get_return_value(&self) -> i32 {
388        self.return_value
389    }
390
391    /// Get an output parameter by name (case-insensitive).
392    ///
393    /// Strips the `@` prefix from both the search name and stored names
394    /// before comparing, so `get_output("result")` and `get_output("@result")`
395    /// are equivalent.
396    ///
397    /// # Example
398    ///
399    /// ```rust,ignore
400    /// let result = client.procedure("dbo.CalculateSum")?
401    ///     .input("@a", &10i32)
402    ///     .input("@b", &20i32)
403    ///     .output_int("@result")
404    ///     .execute().await?;
405    ///
406    /// let output = result.get_output("@result").expect("output param exists");
407    /// assert_eq!(output.value, SqlValue::Int(30));
408    /// ```
409    #[must_use]
410    pub fn get_output(&self, name: &str) -> Option<&OutputParam> {
411        let search = name.strip_prefix('@').unwrap_or(name);
412        self.output_params.iter().find(|p| {
413            let stored = p.name.strip_prefix('@').unwrap_or(&p.name);
414            stored.eq_ignore_ascii_case(search)
415        })
416    }
417
418    /// Get the first result set, if any.
419    ///
420    /// Convenience method for procedures that return a single result set.
421    #[must_use]
422    pub fn first_result_set(&self) -> Option<&ResultSet> {
423        self.result_sets.first()
424    }
425
426    /// Check if the procedure produced any result sets.
427    #[must_use]
428    pub fn has_result_sets(&self) -> bool {
429        !self.result_sets.is_empty()
430    }
431}
432
433/// A single result set within a multi-result batch.
434///
435/// Rows are stored as `PendingRow` values that may be either already-decoded
436/// [`Row`]s (eager path, used by tests and direct construction) or raw TDS
437/// bytes (lazy path, used by [`crate::Client::call_procedure`] and
438/// [`crate::Client::query_multiple`]). Decoding happens on pull, so per-row
439/// decode errors surface through [`ResultSet::next_row`] and
440/// [`ResultSet::collect_all`].
441#[derive(Debug, Clone)]
442#[must_use]
443pub struct ResultSet {
444    /// Column metadata for this result set.
445    columns: Vec<Column>,
446    /// Pending rows — either pre-parsed or raw TDS bytes awaiting decode.
447    pending_rows: VecDeque<PendingRow>,
448    /// Protocol metadata required to decode raw rows. `None` when every
449    /// pending row is already [`PendingRow::Parsed`] (eager path).
450    meta: Option<ColMetaData>,
451    /// Pre-resolved column decryptor for Always Encrypted result sets.
452    ///
453    /// Wrapped in `Arc` so cloning a [`ResultSet`] stays cheap (clones share
454    /// the underlying decryptor state instead of duplicating derived keys).
455    #[cfg(feature = "always-encrypted")]
456    decryptor: Option<std::sync::Arc<crate::column_decryptor::ColumnDecryptor>>,
457}
458
459impl ResultSet {
460    /// Create a new result set from already-decoded rows.
461    ///
462    /// This is the eager constructor used by tests and callers that already
463    /// hold typed [`Row`]s. Production query paths use `ResultSet::from_raw`
464    /// (private) to defer row decoding.
465    pub fn new(columns: Vec<Column>, rows: Vec<Row>) -> Self {
466        Self {
467            columns,
468            pending_rows: rows.into_iter().map(PendingRow::Parsed).collect(),
469            meta: None,
470            #[cfg(feature = "always-encrypted")]
471            decryptor: None,
472        }
473    }
474
475    /// Create a result set from raw row bytes and protocol metadata.
476    ///
477    /// Rows are decoded on demand as the caller pulls them via
478    /// [`ResultSet::next_row`] or [`ResultSet::collect_all`]. The `meta` must
479    /// describe every row in `pending`. If decryption is configured,
480    /// `decryptor` must cover the same column set.
481    pub(crate) fn from_raw(
482        columns: Vec<Column>,
483        pending: Vec<PendingRow>,
484        meta: ColMetaData,
485        #[cfg(feature = "always-encrypted")] decryptor: Option<
486            std::sync::Arc<crate::column_decryptor::ColumnDecryptor>,
487        >,
488    ) -> Self {
489        Self {
490            columns,
491            pending_rows: pending.into(),
492            meta: Some(meta),
493            #[cfg(feature = "always-encrypted")]
494            decryptor,
495        }
496    }
497
498    /// Get the column metadata.
499    #[must_use]
500    pub fn columns(&self) -> &[Column] {
501        &self.columns
502    }
503
504    /// Get the number of rows remaining.
505    #[must_use]
506    pub fn rows_remaining(&self) -> usize {
507        self.pending_rows.len()
508    }
509
510    /// Get the next row from this result set.
511    ///
512    /// Returns `None` when no more rows remain, or `Some(Err(_))` when the
513    /// next pending row fails to decode. The stream is not short-circuited
514    /// on decode error — the caller may continue to pull subsequent rows.
515    pub fn next_row(&mut self) -> Option<Result<Row, Error>> {
516        self.pending_rows.pop_front().map(|p| self.decode(p))
517    }
518
519    /// Check if this result set is empty.
520    #[must_use]
521    pub fn is_empty(&self) -> bool {
522        self.pending_rows.is_empty()
523    }
524
525    /// Collect all remaining rows into a vector.
526    ///
527    /// Stops at the first decode error and returns it.
528    pub fn collect_all(&mut self) -> Result<Vec<Row>, Error> {
529        let mut out = Vec::with_capacity(self.pending_rows.len());
530        while let Some(pending) = self.pending_rows.pop_front() {
531            out.push(self.decode(pending)?);
532        }
533        Ok(out)
534    }
535
536    /// Decode a pending row into a typed [`Row`].
537    fn decode(&self, pending: PendingRow) -> Result<Row, Error> {
538        match pending {
539            PendingRow::Parsed(row) => Ok(row),
540            PendingRow::Raw(raw) => {
541                let meta = self
542                    .meta
543                    .as_ref()
544                    .ok_or_else(|| Error::Protocol("row metadata missing for raw row".into()))?;
545                #[cfg(feature = "always-encrypted")]
546                if let Some(ref dec) = self.decryptor {
547                    return crate::column_parser::convert_raw_row_decrypted(
548                        &raw,
549                        meta,
550                        &self.columns,
551                        dec,
552                    );
553                }
554                crate::column_parser::convert_raw_row(&raw, meta, &self.columns)
555            }
556            PendingRow::Nbc(nbc) => {
557                let meta = self
558                    .meta
559                    .as_ref()
560                    .ok_or_else(|| Error::Protocol("row metadata missing for NBC row".into()))?;
561                #[cfg(feature = "always-encrypted")]
562                if let Some(ref dec) = self.decryptor {
563                    return crate::column_parser::convert_nbc_row_decrypted(
564                        &nbc,
565                        meta,
566                        &self.columns,
567                        dec,
568                    );
569                }
570                crate::column_parser::convert_nbc_row(&nbc, meta, &self.columns)
571            }
572        }
573    }
574
575    /// Consume this result set and produce a [`QueryStream`] that carries the
576    /// same pending rows and decode state.
577    ///
578    /// Used by [`MultiResultStream::into_query_streams`] — avoids eagerly
579    /// materializing rows when the caller wants stream-level ergonomics.
580    fn into_query_stream<'a>(self) -> QueryStream<'a> {
581        QueryStream {
582            columns: self.columns,
583            rows: self.pending_rows,
584            meta: self.meta,
585            #[cfg(feature = "always-encrypted")]
586            decryptor: self.decryptor,
587            finished: false,
588            _marker: std::marker::PhantomData,
589        }
590    }
591}
592
593/// Multiple result sets from a batch or stored procedure.
594///
595/// Some queries return multiple result sets (e.g., stored procedures
596/// with multiple SELECT statements, or batches with multiple queries).
597///
598/// # Example
599///
600/// ```rust,ignore
601/// // Execute a batch with multiple SELECTs
602/// let mut results = client.query_multiple("SELECT 1 AS a; SELECT 2 AS b, 3 AS c;", &[]).await?;
603///
604/// // Process first result set
605/// while let Some(row) = results.next_row().await? {
606///     println!("Result 1: {:?}", row);
607/// }
608///
609/// // Move to second result set
610/// if results.next_result().await? {
611///     while let Some(row) = results.next_row().await? {
612///         println!("Result 2: {:?}", row);
613///     }
614/// }
615/// ```
616#[must_use = "streams must be consumed; dropping a stream discards remaining results"]
617pub struct MultiResultStream<'a> {
618    /// All result sets from the batch.
619    result_sets: Vec<ResultSet>,
620    /// Current result set index (0-based).
621    current_result: usize,
622    /// Lifetime tied to the connection.
623    _marker: std::marker::PhantomData<&'a ()>,
624}
625
626impl<'a> MultiResultStream<'a> {
627    /// Create a new multi-result stream from parsed result sets.
628    pub(crate) fn new(result_sets: Vec<ResultSet>) -> Self {
629        Self {
630            result_sets,
631            current_result: 0,
632            _marker: std::marker::PhantomData,
633        }
634    }
635
636    /// Create an empty multi-result stream.
637    #[allow(dead_code)]
638    pub(crate) fn empty() -> Self {
639        Self {
640            result_sets: Vec::new(),
641            current_result: 0,
642            _marker: std::marker::PhantomData,
643        }
644    }
645
646    /// Get the current result set index (0-based).
647    #[must_use]
648    pub fn current_result_index(&self) -> usize {
649        self.current_result
650    }
651
652    /// Get the total number of result sets.
653    #[must_use]
654    pub fn result_count(&self) -> usize {
655        self.result_sets.len()
656    }
657
658    /// Check if there are more result sets after the current one.
659    #[must_use]
660    pub fn has_more_results(&self) -> bool {
661        self.current_result + 1 < self.result_sets.len()
662    }
663
664    /// Get the column metadata for the current result set.
665    ///
666    /// Returns `None` if there are no result sets or we've moved past all of them.
667    #[must_use]
668    pub fn columns(&self) -> Option<&[Column]> {
669        self.result_sets
670            .get(self.current_result)
671            .map(|rs| rs.columns())
672    }
673
674    /// Move to the next result set.
675    ///
676    /// Returns `true` if there is another result set, `false` if no more.
677    pub async fn next_result(&mut self) -> Result<bool, Error> {
678        if self.current_result + 1 < self.result_sets.len() {
679            self.current_result += 1;
680            Ok(true)
681        } else {
682            Ok(false)
683        }
684    }
685
686    /// Get the next row from the current result set.
687    ///
688    /// Returns `None` when no more rows in the current result set.
689    /// Call `next_result()` to move to the next result set.
690    ///
691    /// Per-row decode errors (from lazy row decoding) surface here as
692    /// `Err(_)`. Pre-2.9 this reader decoded rows eagerly and decode errors
693    /// surfaced at `query_multiple().await?` instead.
694    pub async fn next_row(&mut self) -> Result<Option<Row>, Error> {
695        if let Some(result_set) = self.result_sets.get_mut(self.current_result) {
696            result_set.next_row().transpose()
697        } else {
698            Ok(None)
699        }
700    }
701
702    /// Get a mutable reference to the current result set.
703    #[must_use]
704    pub fn current_result_set(&mut self) -> Option<&mut ResultSet> {
705        self.result_sets.get_mut(self.current_result)
706    }
707
708    /// Collect all rows from the current result set.
709    ///
710    /// Returns `Ok(vec![])` if the current result index is out of range
711    /// (e.g., all result sets have been consumed). Propagates decode errors
712    /// from the underlying lazy row parser.
713    pub fn collect_current(&mut self) -> Result<Vec<Row>, Error> {
714        match self.result_sets.get_mut(self.current_result) {
715            Some(rs) => rs.collect_all(),
716            None => Ok(Vec::new()),
717        }
718    }
719
720    /// Consume the stream and return all result sets as `QueryStream`s.
721    pub fn into_query_streams(self) -> Vec<QueryStream<'a>> {
722        self.result_sets
723            .into_iter()
724            .map(ResultSet::into_query_stream)
725            .collect()
726    }
727}
728
729#[cfg(test)]
730#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
731mod tests {
732    use super::*;
733
734    #[test]
735    fn test_execute_result() {
736        let result = ExecuteResult::new(42);
737        assert_eq!(result.rows_affected, 42);
738        assert!(result.output_params.is_empty());
739    }
740
741    #[test]
742    fn test_procedure_result_defaults() {
743        let result = ProcedureResult::new();
744        assert_eq!(result.return_value, 0);
745        assert_eq!(result.rows_affected, 0);
746        assert!(result.output_params.is_empty());
747        assert!(result.result_sets.is_empty());
748        assert!(!result.has_result_sets());
749        assert!(result.first_result_set().is_none());
750    }
751
752    #[test]
753    fn test_procedure_result_get_output() {
754        let mut result = ProcedureResult::new();
755        result.output_params.push(OutputParam {
756            name: "@Total".to_string(),
757            value: mssql_types::SqlValue::Int(42),
758        });
759        result.output_params.push(OutputParam {
760            name: "@Message".to_string(),
761            value: mssql_types::SqlValue::String("ok".to_string()),
762        });
763
764        // Exact match (case-insensitive)
765        assert!(result.get_output("@Total").is_some());
766        assert!(result.get_output("@total").is_some());
767        assert!(result.get_output("@TOTAL").is_some());
768
769        // @ prefix stripping
770        assert!(result.get_output("Total").is_some());
771        assert!(result.get_output("total").is_some());
772
773        // Non-existent
774        assert!(result.get_output("@NotHere").is_none());
775        assert!(result.get_output("NotHere").is_none());
776    }
777
778    #[test]
779    fn test_procedure_result_with_result_sets() {
780        use mssql_types::SqlValue;
781
782        let columns = vec![Column {
783            name: "id".to_string(),
784            index: 0,
785            type_name: "INT".to_string(),
786            nullable: false,
787            max_length: Some(4),
788            precision: None,
789            scale: None,
790            collation: None,
791        }];
792        let rows = vec![Row::from_values(columns.clone(), vec![SqlValue::Int(1)])];
793        let rs = ResultSet::new(columns, rows);
794
795        let mut result = ProcedureResult::new();
796        result.result_sets.push(rs);
797        result.return_value = 7;
798        result.rows_affected = 5;
799
800        assert!(result.has_result_sets());
801        assert_eq!(result.get_return_value(), 7);
802        assert_eq!(result.first_result_set().unwrap().columns().len(), 1);
803    }
804
805    #[test]
806    fn test_execute_result_with_outputs() {
807        let outputs = vec![OutputParam {
808            name: "ReturnValue".to_string(),
809            value: mssql_types::SqlValue::Int(100),
810        }];
811
812        let result = ExecuteResult::with_outputs(10, outputs);
813        assert_eq!(result.rows_affected, 10);
814        assert!(result.get_output("ReturnValue").is_some());
815        assert!(result.get_output("returnvalue").is_some()); // case-insensitive
816        assert!(result.get_output("NotFound").is_none());
817    }
818
819    #[test]
820    fn test_query_stream_columns() {
821        let columns = vec![Column {
822            name: "id".to_string(),
823            index: 0,
824            type_name: "INT".to_string(),
825            nullable: false,
826            max_length: Some(4),
827            precision: Some(0),
828            scale: Some(0),
829            collation: None,
830        }];
831
832        let stream = QueryStream::new(columns, Vec::new());
833        assert_eq!(stream.columns().len(), 1);
834        assert_eq!(stream.columns()[0].name, "id");
835        assert!(!stream.is_finished());
836    }
837
838    #[test]
839    fn test_query_stream_with_rows() {
840        use mssql_types::SqlValue;
841
842        let columns = vec![
843            Column {
844                name: "id".to_string(),
845                index: 0,
846                type_name: "INT".to_string(),
847                nullable: false,
848                max_length: Some(4),
849                precision: None,
850                scale: None,
851                collation: None,
852            },
853            Column {
854                name: "name".to_string(),
855                index: 1,
856                type_name: "NVARCHAR".to_string(),
857                nullable: true,
858                max_length: Some(100),
859                precision: None,
860                scale: None,
861                collation: None,
862            },
863        ];
864
865        let rows = vec![
866            Row::from_values(
867                columns.clone(),
868                vec![SqlValue::Int(1), SqlValue::String("Alice".to_string())],
869            ),
870            Row::from_values(
871                columns.clone(),
872                vec![SqlValue::Int(2), SqlValue::String("Bob".to_string())],
873            ),
874        ];
875
876        let mut stream = QueryStream::new(columns, rows);
877        assert_eq!(stream.columns().len(), 2);
878        assert_eq!(stream.rows_remaining(), 2);
879        assert!(!stream.is_finished());
880
881        // First row
882        let row1 = stream.try_next().unwrap();
883        assert_eq!(row1.get::<i32>(0).unwrap(), 1);
884        assert_eq!(row1.get_by_name::<String>("name").unwrap(), "Alice");
885
886        // Second row
887        let row2 = stream.try_next().unwrap();
888        assert_eq!(row2.get::<i32>(0).unwrap(), 2);
889        assert_eq!(row2.get_by_name::<String>("name").unwrap(), "Bob");
890
891        // No more rows
892        assert!(stream.try_next().is_none());
893        assert!(stream.is_finished());
894    }
895
896    #[test]
897    fn test_query_stream_iterator() {
898        use mssql_types::SqlValue;
899
900        let columns = vec![Column {
901            name: "val".to_string(),
902            index: 0,
903            type_name: "INT".to_string(),
904            nullable: false,
905            max_length: None,
906            precision: None,
907            scale: None,
908            collation: None,
909        }];
910
911        let rows = vec![
912            Row::from_values(columns.clone(), vec![SqlValue::Int(10)]),
913            Row::from_values(columns.clone(), vec![SqlValue::Int(20)]),
914            Row::from_values(columns.clone(), vec![SqlValue::Int(30)]),
915        ];
916
917        let mut stream = QueryStream::new(columns, rows);
918
919        // Use iterator — unwrap each Result so test failures are visible
920        // (QueryStream's Iterator impl always yields Ok, but we should
921        // not silently swallow errors if that ever changes)
922        let values: Vec<i32> = stream
923            .by_ref()
924            .map(|r| r.unwrap().get::<i32>(0).unwrap())
925            .collect();
926
927        assert_eq!(values, vec![10, 20, 30]);
928        assert!(stream.is_finished());
929    }
930
931    #[test]
932    fn test_query_stream_empty() {
933        let stream = QueryStream::empty();
934        assert!(stream.columns().is_empty());
935        assert_eq!(stream.rows_remaining(), 0);
936        assert!(stream.is_finished());
937    }
938
939    /// Exercises the lazy-decode path: rows are stored as raw TDS bytes and
940    /// decoded only when the caller pulls them. Mirrors what
941    /// `read_query_response` now produces and pins the contract between
942    /// `PendingRow::Raw` and the per-row decode in `poll_next`/`next`.
943    #[test]
944    fn test_query_stream_lazy_raw_row_decoding() {
945        use bytes::Bytes;
946        use tds_protocol::token::{ColMetaData, ColumnData, RawRow, TypeInfo};
947        use tds_protocol::types::TypeId;
948
949        // Build raw row bytes for two columns: IntN(42) + IntN(NULL).
950        let mut data = Vec::new();
951        data.push(4); // IntN length prefix — 4 bytes
952        data.extend_from_slice(&42i32.to_le_bytes());
953        data.push(0); // IntN NULL (zero-length)
954
955        let meta = ColMetaData {
956            columns: vec![
957                ColumnData {
958                    name: "a".to_string(),
959                    type_id: TypeId::IntN,
960                    col_type: 0x26,
961                    flags: 0x00,
962                    user_type: 0,
963                    type_info: TypeInfo {
964                        max_length: Some(4),
965                        precision: None,
966                        scale: None,
967                        collation: None,
968                    },
969                    crypto_metadata: None,
970                },
971                ColumnData {
972                    name: "b".to_string(),
973                    type_id: TypeId::IntN,
974                    col_type: 0x26,
975                    flags: 0x01,
976                    user_type: 0,
977                    type_info: TypeInfo {
978                        max_length: Some(4),
979                        precision: None,
980                        scale: None,
981                        collation: None,
982                    },
983                    crypto_metadata: None,
984                },
985            ],
986            cek_table: None,
987        };
988
989        let columns = vec![
990            Column {
991                name: "a".to_string(),
992                index: 0,
993                type_name: "INT".to_string(),
994                nullable: false,
995                max_length: Some(4),
996                precision: None,
997                scale: None,
998                collation: None,
999            },
1000            Column {
1001                name: "b".to_string(),
1002                index: 1,
1003                type_name: "INT".to_string(),
1004                nullable: true,
1005                max_length: Some(4),
1006                precision: None,
1007                scale: None,
1008                collation: None,
1009            },
1010        ];
1011
1012        let pending = vec![PendingRow::Raw(RawRow {
1013            data: Bytes::from(data),
1014        })];
1015
1016        #[cfg(feature = "always-encrypted")]
1017        let mut stream = QueryStream::from_raw(columns, pending, meta, None);
1018        #[cfg(not(feature = "always-encrypted"))]
1019        let mut stream = QueryStream::from_raw(columns, pending, meta);
1020
1021        assert_eq!(stream.rows_remaining(), 1);
1022        let row = stream
1023            .next()
1024            .expect("one row pending")
1025            .expect("row decoded successfully");
1026        assert_eq!(row.get::<i32>(0).unwrap(), 42);
1027        assert!(row.is_null(1));
1028        assert!(stream.next().is_none());
1029        assert!(stream.is_finished());
1030    }
1031
1032    /// Decoder errors must surface per-row via `Stream`/`Iterator` without
1033    /// derailing the stream state. Truncated raw bytes trigger a decode
1034    /// error that the caller observes as `Some(Err(_))`.
1035    #[test]
1036    fn test_query_stream_lazy_decode_error_propagates() {
1037        use bytes::Bytes;
1038        use tds_protocol::token::{ColMetaData, ColumnData, RawRow, TypeInfo};
1039        use tds_protocol::types::TypeId;
1040
1041        // Declare an Int4 column but provide only 2 bytes — decode must fail.
1042        let data = vec![0x01u8, 0x02];
1043
1044        let meta = ColMetaData {
1045            columns: vec![ColumnData {
1046                name: "a".to_string(),
1047                type_id: TypeId::Int4,
1048                col_type: 0x38,
1049                flags: 0x00,
1050                user_type: 0,
1051                type_info: TypeInfo {
1052                    max_length: Some(4),
1053                    precision: None,
1054                    scale: None,
1055                    collation: None,
1056                },
1057                crypto_metadata: None,
1058            }],
1059            cek_table: None,
1060        };
1061
1062        let columns = vec![Column {
1063            name: "a".to_string(),
1064            index: 0,
1065            type_name: "INT".to_string(),
1066            nullable: false,
1067            max_length: Some(4),
1068            precision: None,
1069            scale: None,
1070            collation: None,
1071        }];
1072
1073        let pending = vec![PendingRow::Raw(RawRow {
1074            data: Bytes::from(data),
1075        })];
1076
1077        #[cfg(feature = "always-encrypted")]
1078        let mut stream = QueryStream::from_raw(columns, pending, meta, None);
1079        #[cfg(not(feature = "always-encrypted"))]
1080        let mut stream = QueryStream::from_raw(columns, pending, meta);
1081
1082        let item = stream.next().expect("pending row present");
1083        assert!(item.is_err(), "truncated bytes must surface a decode error");
1084        assert!(stream.next().is_none());
1085    }
1086
1087    /// Helper to build a single-column IntN metadata block for the lazy
1088    /// `ResultSet` / `MultiResultStream` tests below.
1089    #[cfg(test)]
1090    fn intn_meta_and_columns(
1091        col_name: &str,
1092        nullable: bool,
1093    ) -> (tds_protocol::token::ColMetaData, Vec<Column>) {
1094        use tds_protocol::token::{ColMetaData, ColumnData, TypeInfo};
1095        use tds_protocol::types::TypeId;
1096        (
1097            ColMetaData {
1098                columns: vec![ColumnData {
1099                    name: col_name.to_string(),
1100                    type_id: TypeId::IntN,
1101                    col_type: 0x26,
1102                    flags: if nullable { 0x01 } else { 0x00 },
1103                    user_type: 0,
1104                    type_info: TypeInfo {
1105                        max_length: Some(4),
1106                        precision: None,
1107                        scale: None,
1108                        collation: None,
1109                    },
1110                    crypto_metadata: None,
1111                }],
1112                cek_table: None,
1113            },
1114            vec![Column {
1115                name: col_name.to_string(),
1116                index: 0,
1117                type_name: "INT".to_string(),
1118                nullable,
1119                max_length: Some(4),
1120                precision: None,
1121                scale: None,
1122                collation: None,
1123            }],
1124        )
1125    }
1126
1127    /// Exercises the `ResultSet` lazy-decode path introduced in 2.9.
1128    /// Mirrors `test_query_stream_lazy_raw_row_decoding` but via the
1129    /// result-set API that `call_procedure` / `query_multiple` expose.
1130    #[test]
1131    fn test_result_set_lazy_raw_row_decoding() {
1132        use bytes::Bytes;
1133        use tds_protocol::token::RawRow;
1134
1135        let (meta, columns) = intn_meta_and_columns("a", false);
1136
1137        // Two rows: 7 and 11 encoded as IntN(4).
1138        let pending = vec![
1139            PendingRow::Raw(RawRow {
1140                data: {
1141                    let mut b = Vec::with_capacity(5);
1142                    b.push(4);
1143                    b.extend_from_slice(&7i32.to_le_bytes());
1144                    Bytes::from(b)
1145                },
1146            }),
1147            PendingRow::Raw(RawRow {
1148                data: {
1149                    let mut b = Vec::with_capacity(5);
1150                    b.push(4);
1151                    b.extend_from_slice(&11i32.to_le_bytes());
1152                    Bytes::from(b)
1153                },
1154            }),
1155        ];
1156
1157        #[cfg(feature = "always-encrypted")]
1158        let mut rs = ResultSet::from_raw(columns, pending, meta, None);
1159        #[cfg(not(feature = "always-encrypted"))]
1160        let mut rs = ResultSet::from_raw(columns, pending, meta);
1161
1162        assert_eq!(rs.rows_remaining(), 2);
1163        assert!(!rs.is_empty());
1164
1165        let row1 = rs.next_row().expect("row present").expect("decodes");
1166        assert_eq!(row1.get::<i32>(0).unwrap(), 7);
1167
1168        let row2 = rs.next_row().expect("row present").expect("decodes");
1169        assert_eq!(row2.get::<i32>(0).unwrap(), 11);
1170
1171        assert!(rs.next_row().is_none());
1172        assert!(rs.is_empty());
1173    }
1174
1175    /// Decoder errors in `ResultSet::next_row` must surface per-row without
1176    /// derailing further calls. Same contract as
1177    /// `test_query_stream_lazy_decode_error_propagates`.
1178    #[test]
1179    fn test_result_set_lazy_decode_error_propagates() {
1180        use bytes::Bytes;
1181        use tds_protocol::token::{ColMetaData, ColumnData, RawRow, TypeInfo};
1182        use tds_protocol::types::TypeId;
1183
1184        // Int4 (not IntN) with only 2 bytes → decode must fail.
1185        let meta = ColMetaData {
1186            columns: vec![ColumnData {
1187                name: "a".to_string(),
1188                type_id: TypeId::Int4,
1189                col_type: 0x38,
1190                flags: 0x00,
1191                user_type: 0,
1192                type_info: TypeInfo {
1193                    max_length: Some(4),
1194                    precision: None,
1195                    scale: None,
1196                    collation: None,
1197                },
1198                crypto_metadata: None,
1199            }],
1200            cek_table: None,
1201        };
1202        let columns = vec![Column {
1203            name: "a".to_string(),
1204            index: 0,
1205            type_name: "INT".to_string(),
1206            nullable: false,
1207            max_length: Some(4),
1208            precision: None,
1209            scale: None,
1210            collation: None,
1211        }];
1212
1213        let pending = vec![PendingRow::Raw(RawRow {
1214            data: Bytes::from(vec![0x01u8, 0x02]),
1215        })];
1216
1217        #[cfg(feature = "always-encrypted")]
1218        let mut rs = ResultSet::from_raw(columns, pending, meta, None);
1219        #[cfg(not(feature = "always-encrypted"))]
1220        let mut rs = ResultSet::from_raw(columns, pending, meta);
1221
1222        let first = rs.next_row().expect("pending row present");
1223        assert!(
1224            first.is_err(),
1225            "truncated bytes must surface a decode error"
1226        );
1227        assert!(rs.next_row().is_none());
1228    }
1229
1230    /// `collect_all` on a lazy `ResultSet` decodes every pending row and
1231    /// propagates the first decode error. Ensures 2.9's signature change is
1232    /// exercised end-to-end.
1233    #[test]
1234    fn test_result_set_lazy_collect_all_success_and_error() {
1235        use bytes::Bytes;
1236        use tds_protocol::token::RawRow;
1237
1238        // Success: two rows decode cleanly.
1239        let (meta_ok, cols_ok) = intn_meta_and_columns("a", false);
1240        let pending_ok = vec![
1241            PendingRow::Raw(RawRow {
1242                data: {
1243                    let mut b = Vec::with_capacity(5);
1244                    b.push(4);
1245                    b.extend_from_slice(&10i32.to_le_bytes());
1246                    Bytes::from(b)
1247                },
1248            }),
1249            PendingRow::Raw(RawRow {
1250                data: {
1251                    let mut b = Vec::with_capacity(5);
1252                    b.push(4);
1253                    b.extend_from_slice(&20i32.to_le_bytes());
1254                    Bytes::from(b)
1255                },
1256            }),
1257        ];
1258
1259        #[cfg(feature = "always-encrypted")]
1260        let mut rs_ok = ResultSet::from_raw(cols_ok, pending_ok, meta_ok, None);
1261        #[cfg(not(feature = "always-encrypted"))]
1262        let mut rs_ok = ResultSet::from_raw(cols_ok, pending_ok, meta_ok);
1263        let rows = rs_ok.collect_all().expect("all rows decode");
1264        assert_eq!(rows.len(), 2);
1265        assert_eq!(rows[0].get::<i32>(0).unwrap(), 10);
1266        assert_eq!(rows[1].get::<i32>(0).unwrap(), 20);
1267        assert!(rs_ok.is_empty());
1268
1269        // Error: a truncated row (declared Int4 with only 2 bytes) makes
1270        // collect_all fail. collect_all short-circuits on the first Err.
1271        use tds_protocol::token::{ColMetaData, ColumnData, TypeInfo};
1272        use tds_protocol::types::TypeId;
1273        let meta_err = ColMetaData {
1274            columns: vec![ColumnData {
1275                name: "a".to_string(),
1276                type_id: TypeId::Int4,
1277                col_type: 0x38,
1278                flags: 0x00,
1279                user_type: 0,
1280                type_info: TypeInfo {
1281                    max_length: Some(4),
1282                    precision: None,
1283                    scale: None,
1284                    collation: None,
1285                },
1286                crypto_metadata: None,
1287            }],
1288            cek_table: None,
1289        };
1290        let cols_err = vec![Column {
1291            name: "a".to_string(),
1292            index: 0,
1293            type_name: "INT".to_string(),
1294            nullable: false,
1295            max_length: Some(4),
1296            precision: None,
1297            scale: None,
1298            collation: None,
1299        }];
1300        let pending_err = vec![PendingRow::Raw(RawRow {
1301            data: Bytes::from(vec![0x01u8, 0x02]),
1302        })];
1303
1304        #[cfg(feature = "always-encrypted")]
1305        let mut rs_err = ResultSet::from_raw(cols_err, pending_err, meta_err, None);
1306        #[cfg(not(feature = "always-encrypted"))]
1307        let mut rs_err = ResultSet::from_raw(cols_err, pending_err, meta_err);
1308        let err = rs_err.collect_all();
1309        assert!(err.is_err(), "collect_all must propagate decode error");
1310    }
1311
1312    /// `MultiResultStream` end-to-end lazy-decode path: two lazy `ResultSet`s
1313    /// decoded on demand as the caller walks through via `next_row` /
1314    /// `next_result`. Pins the 2.9 refactor of `read_multi_result_response`.
1315    #[tokio::test]
1316    async fn test_multi_result_stream_lazy_decode_across_result_sets() {
1317        use bytes::Bytes;
1318        use tds_protocol::token::RawRow;
1319
1320        let (meta1, cols1) = intn_meta_and_columns("a", false);
1321        let pending1 = vec![PendingRow::Raw(RawRow {
1322            data: {
1323                let mut b = Vec::with_capacity(5);
1324                b.push(4);
1325                b.extend_from_slice(&101i32.to_le_bytes());
1326                Bytes::from(b)
1327            },
1328        })];
1329        #[cfg(feature = "always-encrypted")]
1330        let rs1 = ResultSet::from_raw(cols1, pending1, meta1, None);
1331        #[cfg(not(feature = "always-encrypted"))]
1332        let rs1 = ResultSet::from_raw(cols1, pending1, meta1);
1333
1334        let (meta2, cols2) = intn_meta_and_columns("b", false);
1335        let pending2 = vec![PendingRow::Raw(RawRow {
1336            data: {
1337                let mut b = Vec::with_capacity(5);
1338                b.push(4);
1339                b.extend_from_slice(&202i32.to_le_bytes());
1340                Bytes::from(b)
1341            },
1342        })];
1343        #[cfg(feature = "always-encrypted")]
1344        let rs2 = ResultSet::from_raw(cols2, pending2, meta2, None);
1345        #[cfg(not(feature = "always-encrypted"))]
1346        let rs2 = ResultSet::from_raw(cols2, pending2, meta2);
1347
1348        let mut stream = MultiResultStream::new(vec![rs1, rs2]);
1349        assert_eq!(stream.result_count(), 2);
1350        assert_eq!(stream.current_result_index(), 0);
1351
1352        let row = stream
1353            .next_row()
1354            .await
1355            .expect("first row success")
1356            .expect("row present");
1357        assert_eq!(row.get::<i32>(0).unwrap(), 101);
1358        assert!(stream.next_row().await.expect("no more rows").is_none());
1359
1360        assert!(stream.has_more_results());
1361        assert!(stream.next_result().await.expect("advance ok"));
1362        assert_eq!(stream.current_result_index(), 1);
1363
1364        let row = stream
1365            .next_row()
1366            .await
1367            .expect("second row success")
1368            .expect("row present");
1369        assert_eq!(row.get::<i32>(0).unwrap(), 202);
1370    }
1371}