Skip to main content

mssql_client/
stream.rs

1//! Query result sets with lazy per-row decoding.
2//!
3//! The full server response is buffered in memory first; rows are then
4//! *decoded* lazily as callers pull them. This is not incremental
5//! network streaming — see the next section for what that means for memory.
6//!
7//! ## Buffered vs True Streaming
8//!
9//! The underlying TDS response is reassembled into a single [`bytes::Bytes`]
10//! payload by [`mssql-codec`](mssql_codec). That payload is handed to the
11//! token parser which walks it once and enqueues each row's raw byte slice
12//! (a cheap, refcounted slice into the original [`bytes::Bytes`] per ADR-004)
13//! into the stream. Individual [`Row`]s are then decoded lazily when callers
14//! pull them — either via the [`Stream`]/[`Iterator`] impls or
15//! [`QueryStream::collect_all`].
16//!
17//! This "lazy decode" pattern keeps peak memory at roughly the size of the
18//! raw payload instead of payload + fully-typed `Vec<Row>`. Users who iterate
19//! and drop each [`Row`] see memory proportional to a single row at a time
//! plus the shared raw payload. Users who `collect_all()` still pay for the
//! full `Vec<Row>`, exactly as they would with eager decoding.
22//!
23//! The same lazy-decode pattern applies to [`MultiResultStream`],
24//! [`ResultSet`], and [`ProcedureResult::result_sets`]: raw row bytes are
25//! stashed during response read and each [`Row`] is decoded when the caller
26//! pulls it. Because decoding can fail per row, [`ResultSet::next_row`]
27//! returns `Option<Result<Row, Error>>` rather than `Option<Row>` — callers
28//! observe decode errors at iteration time instead of at
29//! `call_procedure().await?` / `query_multiple().await?`.
30//!
31//! For truly large result sets that should not be buffered at all, use
32//! [`Client::query_stream`](crate::Client::query_stream), which reads packets
33//! from the network on demand (peak memory ~one row), or page with OFFSET/FETCH.
34
35use std::collections::VecDeque;
36use std::pin::Pin;
37use std::task::{Context, Poll};
38
39use futures_core::Stream;
40use tds_protocol::token::{ColMetaData, NbcRow, RawRow};
41
42use crate::error::Error;
43use crate::row::{Column, Row};
44
/// A row that may be already decoded or still held as raw TDS bytes.
///
/// The lazy-parse query path enqueues raw rows (cheap `Bytes` slices into
/// the original response payload) and decodes them on demand. The eager
/// path used by tests and [`MultiResultStream::into_query_streams`] wraps
/// already-decoded rows.
///
/// Both [`QueryStream`] and [`ResultSet`] keep a queue of these values and
/// turn each one into a [`Row`] in their private `decode` helpers. The
/// `Raw` and `Nbc` variants need the owning container's `ColMetaData` to be
/// decoded; `Parsed` is returned as-is.
#[derive(Debug, Clone)]
pub(crate) enum PendingRow {
    /// Already-decoded row (eager path — tests + `MultiResultStream` compat).
    /// Decoding this variant cannot fail: the row is handed back unchanged.
    Parsed(Row),
    /// Raw TDS row bytes, to be decoded on pull.
    Raw(RawRow),
    /// Null-bitmap-compressed row bytes, to be decoded on pull.
    Nbc(NbcRow),
}
60
/// A result set from a query, yielding rows one at a time.
///
/// The complete server response is already buffered in memory by the time
/// this is returned; each [`Row`] is *decoded* lazily as it is pulled, not
/// fetched incrementally from the network. Peak memory is therefore roughly
/// the size of the raw response payload regardless of how you iterate. For
/// genuinely large result sets, use
/// [`Client::query_stream`](crate::Client::query_stream) (incremental, peak
/// memory ~one row) or page with `OFFSET`/`FETCH` in SQL rather than relying on
/// this type to bound memory.
///
/// Rows can be pulled through the [`Iterator`] impl, the [`Stream`] impl,
/// [`QueryStream::try_next`], or all at once with
/// [`QueryStream::collect_all`]. Each item is a `Result<Row, Error>` because
/// decoding a pending row can fail.
///
/// # Example
///
/// ```rust,no_run
/// # use mssql_client::Row;
/// # fn process_row(_: &Row) {}
/// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
/// let stream = client.query("SELECT * FROM large_table", &[]).await?;
/// for row in stream {
///     let row = row?;
///     process_row(&row);
/// }
/// # Ok(())
/// # }
/// ```
#[must_use = "streams must be consumed; dropping a stream discards remaining rows"]
pub struct QueryStream<'a> {
    /// Column metadata for the result set.
    columns: Vec<Column>,
    /// Buffered rows (typed or raw) from the response, in the order they
    /// will be yielded. Rows are removed from the front as they are pulled.
    rows: VecDeque<PendingRow>,
    /// Protocol metadata needed to decode raw rows. `None` if every pending
    /// row is already [`PendingRow::Parsed`]. Decoding a raw or NBC row
    /// while this is `None` yields an `Error::Protocol`.
    meta: Option<ColMetaData>,
    /// Pre-resolved column decryptor for Always Encrypted result sets.
    ///
    /// Wrapped in `Arc` so lazy result-set containers (`ResultSet`) can share
    /// this state without duplicating the derived-key material.
    #[cfg(feature = "always-encrypted")]
    decryptor: Option<std::sync::Arc<crate::column_decryptor::ColumnDecryptor>>,
    /// Whether the stream has completed. Set once a pull finds the row
    /// buffer empty; after that every pull returns `None` immediately.
    finished: bool,
    /// Lifetime tied to the connection. Zero-sized marker; holds no data.
    _marker: std::marker::PhantomData<&'a ()>,
}
106
107impl QueryStream<'_> {
108    /// Create a new query stream from already-decoded rows.
109    ///
110    /// This is the eager constructor used by unit tests. Production query
111    /// paths use [`QueryStream::from_raw`] to defer row decoding.
112    /// [`MultiResultStream::into_query_streams`] uses
113    /// [`ResultSet::into_query_stream`] to preserve pending-row state.
114    #[cfg(test)]
115    pub(crate) fn new(columns: Vec<Column>, rows: Vec<Row>) -> Self {
116        Self {
117            columns,
118            rows: rows.into_iter().map(PendingRow::Parsed).collect(),
119            meta: None,
120            #[cfg(feature = "always-encrypted")]
121            decryptor: None,
122            finished: false,
123            _marker: std::marker::PhantomData,
124        }
125    }
126
127    /// Create a query stream from raw row bytes and protocol metadata.
128    ///
129    /// Rows are decoded on demand as the stream is pulled. The `meta` must
130    /// describe every row in `pending`. If decryption is configured,
131    /// `decryptor` must cover the same column set.
132    pub(crate) fn from_raw(
133        columns: Vec<Column>,
134        pending: Vec<PendingRow>,
135        meta: ColMetaData,
136        #[cfg(feature = "always-encrypted")] decryptor: Option<
137            std::sync::Arc<crate::column_decryptor::ColumnDecryptor>,
138        >,
139    ) -> Self {
140        Self {
141            columns,
142            rows: pending.into(),
143            meta: Some(meta),
144            #[cfg(feature = "always-encrypted")]
145            decryptor,
146            finished: false,
147            _marker: std::marker::PhantomData,
148        }
149    }
150
151    /// Create an empty query stream (no results).
152    #[allow(dead_code)]
153    pub(crate) fn empty() -> Self {
154        Self {
155            columns: Vec::new(),
156            rows: VecDeque::new(),
157            meta: None,
158            #[cfg(feature = "always-encrypted")]
159            decryptor: None,
160            finished: true,
161            _marker: std::marker::PhantomData,
162        }
163    }
164
165    /// Get the column metadata for this result set.
166    #[must_use]
167    pub fn columns(&self) -> &[Column] {
168        &self.columns
169    }
170
171    /// Check if the stream has finished.
172    #[must_use]
173    pub fn is_finished(&self) -> bool {
174        self.finished
175    }
176
177    /// Get the number of rows remaining in the buffer.
178    #[must_use]
179    pub fn rows_remaining(&self) -> usize {
180        self.rows.len()
181    }
182
183    /// Collect all remaining rows into a vector.
184    ///
185    /// This consumes the stream and loads all rows into memory. Each row is
186    /// decoded lazily here, so large raw payloads are freed as rows are
187    /// produced rather than held alongside the typed `Vec<Row>` throughout
188    /// the caller's query call.
189    ///
190    /// For very large result sets, consider iterating with the stream
191    /// instead.
192    pub async fn collect_all(mut self) -> Result<Vec<Row>, Error> {
193        let mut out = Vec::with_capacity(self.rows.len());
194        while let Some(pending) = self.rows.pop_front() {
195            out.push(self.decode(pending)?);
196        }
197        self.finished = true;
198        Ok(out)
199    }
200
201    /// Try to get the next row synchronously (without async).
202    ///
203    /// Returns `None` when no more rows are available or the next pending
204    /// row fails to decode. Use [`Iterator::next`] instead if you need to
205    /// observe decode errors.
206    pub fn try_next(&mut self) -> Option<Row> {
207        self.next().and_then(|r| r.ok())
208    }
209
210    /// Decode a pending row into a typed [`Row`].
211    fn decode(&self, pending: PendingRow) -> Result<Row, Error> {
212        match pending {
213            PendingRow::Parsed(row) => Ok(row),
214            PendingRow::Raw(raw) => {
215                let meta = self
216                    .meta
217                    .as_ref()
218                    .ok_or_else(|| Error::Protocol("row metadata missing for raw row".into()))?;
219                #[cfg(feature = "always-encrypted")]
220                if let Some(ref dec) = self.decryptor {
221                    return crate::column_parser::convert_raw_row_decrypted(
222                        &raw,
223                        meta,
224                        &self.columns,
225                        dec,
226                    );
227                }
228                crate::column_parser::convert_raw_row(&raw, meta, &self.columns)
229            }
230            PendingRow::Nbc(nbc) => {
231                let meta = self
232                    .meta
233                    .as_ref()
234                    .ok_or_else(|| Error::Protocol("row metadata missing for NBC row".into()))?;
235                #[cfg(feature = "always-encrypted")]
236                if let Some(ref dec) = self.decryptor {
237                    return crate::column_parser::convert_nbc_row_decrypted(
238                        &nbc,
239                        meta,
240                        &self.columns,
241                        dec,
242                    );
243                }
244                crate::column_parser::convert_nbc_row(&nbc, meta, &self.columns)
245            }
246        }
247    }
248}
249
250impl Stream for QueryStream<'_> {
251    type Item = Result<Row, Error>;
252
253    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
254        let this = self.get_mut();
255
256        if this.finished {
257            return Poll::Ready(None);
258        }
259
260        match this.rows.pop_front() {
261            Some(pending) => Poll::Ready(Some(this.decode(pending))),
262            None => {
263                this.finished = true;
264                Poll::Ready(None)
265            }
266        }
267    }
268}
269
270impl ExactSizeIterator for QueryStream<'_> {}
271
272impl Iterator for QueryStream<'_> {
273    type Item = Result<Row, Error>;
274
275    fn next(&mut self) -> Option<Self::Item> {
276        if self.finished {
277            return None;
278        }
279
280        match self.rows.pop_front() {
281            Some(pending) => Some(self.decode(pending)),
282            None => {
283                self.finished = true;
284                None
285            }
286        }
287    }
288
289    fn size_hint(&self) -> (usize, Option<usize>) {
290        let remaining = self.rows.len();
291        (remaining, Some(remaining))
292    }
293}
294
/// Result of a non-query execution.
///
/// Contains the number of affected rows and any output parameters.
/// Construct one with [`ExecuteResult::new`] or
/// [`ExecuteResult::with_outputs`]; look up a single output parameter with
/// [`ExecuteResult::get_output`].
#[derive(Debug, Clone)]
#[non_exhaustive]
#[must_use]
pub struct ExecuteResult {
    /// Number of rows affected by the statement.
    pub rows_affected: u64,
    /// Output parameters from stored procedures. Empty when the result was
    /// created with [`ExecuteResult::new`].
    pub output_params: Vec<OutputParam>,
}
307
/// An output parameter from a stored procedure call.
///
/// Stored in [`ExecuteResult::output_params`] and
/// [`ProcedureResult::output_params`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct OutputParam {
    /// Parameter name. The tests in this module use names both with and
    /// without a leading `@`.
    pub name: String,
    /// Parameter value.
    pub value: mssql_types::SqlValue,
}
317
318impl ExecuteResult {
319    /// Create a new execute result.
320    pub fn new(rows_affected: u64) -> Self {
321        Self {
322            rows_affected,
323            output_params: Vec::new(),
324        }
325    }
326
327    /// Create a result with output parameters.
328    pub fn with_outputs(rows_affected: u64, output_params: Vec<OutputParam>) -> Self {
329        Self {
330            rows_affected,
331            output_params,
332        }
333    }
334
335    /// Get an output parameter by name.
336    #[must_use]
337    pub fn get_output(&self, name: &str) -> Option<&OutputParam> {
338        self.output_params
339            .iter()
340            .find(|p| p.name.eq_ignore_ascii_case(name))
341    }
342}
343
/// Result of a stored procedure execution.
///
/// Contains the return value, affected row count, output parameters,
/// and any result sets produced by the procedure.
///
/// Rows inside each [`ResultSet`] are decoded when pulled, so decode errors
/// surface from [`ResultSet::next_row`] / [`ResultSet::collect_all`] rather
/// than from the procedure call itself.
///
/// # Example
///
/// ```rust,no_run
/// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
/// let result = client.call_procedure("dbo.GetUser", &[&1i32]).await?;
///
/// // Check the return value (RETURN statement in the proc)
/// assert_eq!(result.return_value, 0);
///
/// // Process result sets
/// for mut rs in result.result_sets {
///     while let Some(row) = rs.next_row() {
///         let row = row?;
///         println!("{:?}", row);
///     }
/// }
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
#[non_exhaustive]
#[must_use]
pub struct ProcedureResult {
    /// Return value from the stored procedure's RETURN statement.
    ///
    /// Defaults to 0 if the procedure does not explicitly return a value,
    /// which matches SQL Server's default behavior.
    pub return_value: i32,
    /// Total number of rows affected by statements within the procedure.
    pub rows_affected: u64,
    /// Output parameters returned by the procedure. Look one up by name
    /// with [`ProcedureResult::get_output`].
    pub output_params: Vec<OutputParam>,
    /// Result sets produced by SELECT statements within the procedure.
    pub result_sets: Vec<ResultSet>,
}
384
385impl ProcedureResult {
386    /// Create a new empty procedure result.
387    pub(crate) fn new() -> Self {
388        Self {
389            return_value: 0,
390            rows_affected: 0,
391            output_params: Vec::new(),
392            result_sets: Vec::new(),
393        }
394    }
395
396    /// Get the return value from the stored procedure.
397    ///
398    /// This is the value from the procedure's `RETURN` statement.
399    /// Defaults to 0 if not explicitly set by the procedure.
400    #[must_use]
401    pub fn get_return_value(&self) -> i32 {
402        self.return_value
403    }
404
405    /// Get an output parameter by name (case-insensitive).
406    ///
407    /// Strips the `@` prefix from both the search name and stored names
408    /// before comparing, so `get_output("result")` and `get_output("@result")`
409    /// are equivalent.
410    ///
411    /// # Example
412    ///
413    /// ```rust,no_run
414    /// # use mssql_client::SqlValue;
415    /// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
416    /// let result = client.procedure("dbo.CalculateSum")?
417    ///     .input("@a", &10i32)
418    ///     .input("@b", &20i32)
419    ///     .output_int("@result")
420    ///     .execute().await?;
421    ///
422    /// let output = result.get_output("@result").expect("output param exists");
423    /// assert_eq!(output.value, SqlValue::Int(30));
424    /// # Ok(())
425    /// # }
426    /// ```
427    #[must_use]
428    pub fn get_output(&self, name: &str) -> Option<&OutputParam> {
429        let search = name.strip_prefix('@').unwrap_or(name);
430        self.output_params.iter().find(|p| {
431            let stored = p.name.strip_prefix('@').unwrap_or(&p.name);
432            stored.eq_ignore_ascii_case(search)
433        })
434    }
435
436    /// Get the first result set, if any.
437    ///
438    /// Convenience method for procedures that return a single result set.
439    #[must_use]
440    pub fn first_result_set(&self) -> Option<&ResultSet> {
441        self.result_sets.first()
442    }
443
444    /// Check if the procedure produced any result sets.
445    #[must_use]
446    pub fn has_result_sets(&self) -> bool {
447        !self.result_sets.is_empty()
448    }
449}
450
/// A single result set within a multi-result batch.
///
/// Rows are stored as `PendingRow` values that may be either already-decoded
/// [`Row`]s (eager path, used by tests and direct construction) or raw TDS
/// bytes (lazy path, used by [`crate::Client::call_procedure`] and
/// [`crate::Client::query_multiple`]). Decoding happens on pull, so per-row
/// decode errors surface through [`ResultSet::next_row`] and
/// [`ResultSet::collect_all`].
///
/// Pulling a row removes it from the set: [`ResultSet::rows_remaining`] and
/// [`ResultSet::is_empty`] describe what is left, not what was received.
#[derive(Debug, Clone)]
#[must_use]
pub struct ResultSet {
    /// Column metadata for this result set.
    columns: Vec<Column>,
    /// Pending rows — either pre-parsed or raw TDS bytes awaiting decode.
    /// Consumed from the front as rows are pulled.
    pending_rows: VecDeque<PendingRow>,
    /// Protocol metadata required to decode raw rows. `None` when every
    /// pending row is already [`PendingRow::Parsed`] (eager path).
    meta: Option<ColMetaData>,
    /// Pre-resolved column decryptor for Always Encrypted result sets.
    ///
    /// Wrapped in `Arc` so cloning a [`ResultSet`] stays cheap (clones share
    /// the underlying decryptor state instead of duplicating derived keys).
    #[cfg(feature = "always-encrypted")]
    decryptor: Option<std::sync::Arc<crate::column_decryptor::ColumnDecryptor>>,
}
476
477impl ResultSet {
478    /// Create a new result set from already-decoded rows.
479    ///
480    /// This is the eager constructor used by tests and callers that already
481    /// hold typed [`Row`]s. Production query paths use `ResultSet::from_raw`
482    /// (private) to defer row decoding.
483    pub fn new(columns: Vec<Column>, rows: Vec<Row>) -> Self {
484        Self {
485            columns,
486            pending_rows: rows.into_iter().map(PendingRow::Parsed).collect(),
487            meta: None,
488            #[cfg(feature = "always-encrypted")]
489            decryptor: None,
490        }
491    }
492
493    /// Create a result set from raw row bytes and protocol metadata.
494    ///
495    /// Rows are decoded on demand as the caller pulls them via
496    /// [`ResultSet::next_row`] or [`ResultSet::collect_all`]. The `meta` must
497    /// describe every row in `pending`. If decryption is configured,
498    /// `decryptor` must cover the same column set.
499    pub(crate) fn from_raw(
500        columns: Vec<Column>,
501        pending: Vec<PendingRow>,
502        meta: ColMetaData,
503        #[cfg(feature = "always-encrypted")] decryptor: Option<
504            std::sync::Arc<crate::column_decryptor::ColumnDecryptor>,
505        >,
506    ) -> Self {
507        Self {
508            columns,
509            pending_rows: pending.into(),
510            meta: Some(meta),
511            #[cfg(feature = "always-encrypted")]
512            decryptor,
513        }
514    }
515
516    /// Get the column metadata.
517    #[must_use]
518    pub fn columns(&self) -> &[Column] {
519        &self.columns
520    }
521
522    /// Get the number of rows remaining.
523    #[must_use]
524    pub fn rows_remaining(&self) -> usize {
525        self.pending_rows.len()
526    }
527
528    /// Get the next row from this result set.
529    ///
530    /// Returns `None` when no more rows remain, or `Some(Err(_))` when the
531    /// next pending row fails to decode. The stream is not short-circuited
532    /// on decode error — the caller may continue to pull subsequent rows.
533    pub fn next_row(&mut self) -> Option<Result<Row, Error>> {
534        self.pending_rows.pop_front().map(|p| self.decode(p))
535    }
536
537    /// Check if this result set is empty.
538    #[must_use]
539    pub fn is_empty(&self) -> bool {
540        self.pending_rows.is_empty()
541    }
542
543    /// Collect all remaining rows into a vector.
544    ///
545    /// Stops at the first decode error and returns it.
546    pub fn collect_all(&mut self) -> Result<Vec<Row>, Error> {
547        let mut out = Vec::with_capacity(self.pending_rows.len());
548        while let Some(pending) = self.pending_rows.pop_front() {
549            out.push(self.decode(pending)?);
550        }
551        Ok(out)
552    }
553
554    /// Decode a pending row into a typed [`Row`].
555    fn decode(&self, pending: PendingRow) -> Result<Row, Error> {
556        match pending {
557            PendingRow::Parsed(row) => Ok(row),
558            PendingRow::Raw(raw) => {
559                let meta = self
560                    .meta
561                    .as_ref()
562                    .ok_or_else(|| Error::Protocol("row metadata missing for raw row".into()))?;
563                #[cfg(feature = "always-encrypted")]
564                if let Some(ref dec) = self.decryptor {
565                    return crate::column_parser::convert_raw_row_decrypted(
566                        &raw,
567                        meta,
568                        &self.columns,
569                        dec,
570                    );
571                }
572                crate::column_parser::convert_raw_row(&raw, meta, &self.columns)
573            }
574            PendingRow::Nbc(nbc) => {
575                let meta = self
576                    .meta
577                    .as_ref()
578                    .ok_or_else(|| Error::Protocol("row metadata missing for NBC row".into()))?;
579                #[cfg(feature = "always-encrypted")]
580                if let Some(ref dec) = self.decryptor {
581                    return crate::column_parser::convert_nbc_row_decrypted(
582                        &nbc,
583                        meta,
584                        &self.columns,
585                        dec,
586                    );
587                }
588                crate::column_parser::convert_nbc_row(&nbc, meta, &self.columns)
589            }
590        }
591    }
592
593    /// Consume this result set and produce a [`QueryStream`] that carries the
594    /// same pending rows and decode state.
595    ///
596    /// Used by [`MultiResultStream::into_query_streams`] — avoids eagerly
597    /// materializing rows when the caller wants stream-level ergonomics.
598    fn into_query_stream<'a>(self) -> QueryStream<'a> {
599        QueryStream {
600            columns: self.columns,
601            rows: self.pending_rows,
602            meta: self.meta,
603            #[cfg(feature = "always-encrypted")]
604            decryptor: self.decryptor,
605            finished: false,
606            _marker: std::marker::PhantomData,
607        }
608    }
609}
610
/// Multiple result sets from a batch or stored procedure.
///
/// Some queries return multiple result sets (e.g., stored procedures
/// with multiple SELECT statements, or batches with multiple queries).
///
/// The stream keeps a cursor on one "current" result set, starting at
/// index 0. Rows are pulled from the current set with
/// [`MultiResultStream::next_row`]; [`MultiResultStream::next_result`]
/// advances the cursor.
///
/// # Example
///
/// ```rust,no_run
/// # async fn ex(client: &mut mssql_client::Client<mssql_client::Ready>) -> Result<(), mssql_client::Error> {
/// // Execute a batch with multiple SELECT statements
/// let mut results = client.query_multiple("SELECT 1 AS a; SELECT 2 AS b, 3 AS c;", &[]).await?;
///
/// // Process first result set
/// while let Some(row) = results.next_row().await? {
///     println!("Result 1: {:?}", row);
/// }
///
/// // Move to second result set
/// if results.next_result().await? {
///     while let Some(row) = results.next_row().await? {
///         println!("Result 2: {:?}", row);
///     }
/// }
/// # Ok(())
/// # }
/// ```
#[must_use = "streams must be consumed; dropping a stream discards remaining results"]
pub struct MultiResultStream<'a> {
    /// All result sets from the batch.
    result_sets: Vec<ResultSet>,
    /// Current result set index (0-based). Only ever advanced while a later
    /// result set exists, so it stays at 0 for an empty stream.
    current_result: usize,
    /// Lifetime tied to the connection. Zero-sized marker; holds no data.
    _marker: std::marker::PhantomData<&'a ()>,
}
646
647impl<'a> MultiResultStream<'a> {
648    /// Create a new multi-result stream from parsed result sets.
649    pub(crate) fn new(result_sets: Vec<ResultSet>) -> Self {
650        Self {
651            result_sets,
652            current_result: 0,
653            _marker: std::marker::PhantomData,
654        }
655    }
656
657    /// Create an empty multi-result stream.
658    #[allow(dead_code)]
659    pub(crate) fn empty() -> Self {
660        Self {
661            result_sets: Vec::new(),
662            current_result: 0,
663            _marker: std::marker::PhantomData,
664        }
665    }
666
667    /// Get the current result set index (0-based).
668    #[must_use]
669    pub fn current_result_index(&self) -> usize {
670        self.current_result
671    }
672
673    /// Get the total number of result sets.
674    #[must_use]
675    pub fn result_count(&self) -> usize {
676        self.result_sets.len()
677    }
678
679    /// Check if there are more result sets after the current one.
680    #[must_use]
681    pub fn has_more_results(&self) -> bool {
682        self.current_result + 1 < self.result_sets.len()
683    }
684
685    /// Get the column metadata for the current result set.
686    ///
687    /// Returns `None` if there are no result sets or we've moved past all of them.
688    #[must_use]
689    pub fn columns(&self) -> Option<&[Column]> {
690        self.result_sets
691            .get(self.current_result)
692            .map(|rs| rs.columns())
693    }
694
695    /// Move to the next result set.
696    ///
697    /// Returns `true` if there is another result set, `false` if no more.
698    pub async fn next_result(&mut self) -> Result<bool, Error> {
699        if self.current_result + 1 < self.result_sets.len() {
700            self.current_result += 1;
701            Ok(true)
702        } else {
703            Ok(false)
704        }
705    }
706
707    /// Get the next row from the current result set.
708    ///
709    /// Returns `None` when no more rows in the current result set.
710    /// Call `next_result()` to move to the next result set.
711    ///
712    /// Per-row decode errors (from lazy row decoding) surface here as
713    /// `Err(_)`. Pre-2.9 this reader decoded rows eagerly and decode errors
714    /// surfaced at `query_multiple().await?` instead.
715    pub async fn next_row(&mut self) -> Result<Option<Row>, Error> {
716        if let Some(result_set) = self.result_sets.get_mut(self.current_result) {
717            result_set.next_row().transpose()
718        } else {
719            Ok(None)
720        }
721    }
722
723    /// Get a mutable reference to the current result set.
724    #[must_use]
725    pub fn current_result_set(&mut self) -> Option<&mut ResultSet> {
726        self.result_sets.get_mut(self.current_result)
727    }
728
729    /// Collect all rows from the current result set.
730    ///
731    /// Returns `Ok(vec![])` if the current result index is out of range
732    /// (e.g., all result sets have been consumed). Propagates decode errors
733    /// from the underlying lazy row parser.
734    pub fn collect_current(&mut self) -> Result<Vec<Row>, Error> {
735        match self.result_sets.get_mut(self.current_result) {
736            Some(rs) => rs.collect_all(),
737            None => Ok(Vec::new()),
738        }
739    }
740
741    /// Consume the stream and return all result sets as `QueryStream`s.
742    pub fn into_query_streams(self) -> Vec<QueryStream<'a>> {
743        self.result_sets
744            .into_iter()
745            .map(ResultSet::into_query_stream)
746            .collect()
747    }
748}
749
750#[cfg(test)]
751#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
752mod tests {
753    use super::*;
754
755    #[test]
756    fn test_execute_result() {
757        let result = ExecuteResult::new(42);
758        assert_eq!(result.rows_affected, 42);
759        assert!(result.output_params.is_empty());
760    }
761
762    #[test]
763    fn test_procedure_result_defaults() {
764        let result = ProcedureResult::new();
765        assert_eq!(result.return_value, 0);
766        assert_eq!(result.rows_affected, 0);
767        assert!(result.output_params.is_empty());
768        assert!(result.result_sets.is_empty());
769        assert!(!result.has_result_sets());
770        assert!(result.first_result_set().is_none());
771    }
772
773    #[test]
774    fn test_procedure_result_get_output() {
775        let mut result = ProcedureResult::new();
776        result.output_params.push(OutputParam {
777            name: "@Total".to_string(),
778            value: mssql_types::SqlValue::Int(42),
779        });
780        result.output_params.push(OutputParam {
781            name: "@Message".to_string(),
782            value: mssql_types::SqlValue::String("ok".to_string()),
783        });
784
785        // Exact match (case-insensitive)
786        assert!(result.get_output("@Total").is_some());
787        assert!(result.get_output("@total").is_some());
788        assert!(result.get_output("@TOTAL").is_some());
789
790        // @ prefix stripping
791        assert!(result.get_output("Total").is_some());
792        assert!(result.get_output("total").is_some());
793
794        // Non-existent
795        assert!(result.get_output("@NotHere").is_none());
796        assert!(result.get_output("NotHere").is_none());
797    }
798
799    #[test]
800    fn test_procedure_result_with_result_sets() {
801        use mssql_types::SqlValue;
802
803        let columns = vec![Column {
804            name: "id".to_string(),
805            index: 0,
806            type_name: "INT".to_string(),
807            nullable: false,
808            max_length: Some(4),
809            precision: None,
810            scale: None,
811            collation: None,
812        }];
813        let rows = vec![Row::from_values(columns.clone(), vec![SqlValue::Int(1)])];
814        let rs = ResultSet::new(columns, rows);
815
816        let mut result = ProcedureResult::new();
817        result.result_sets.push(rs);
818        result.return_value = 7;
819        result.rows_affected = 5;
820
821        assert!(result.has_result_sets());
822        assert_eq!(result.get_return_value(), 7);
823        assert_eq!(result.first_result_set().unwrap().columns().len(), 1);
824    }
825
826    #[test]
827    fn test_execute_result_with_outputs() {
828        let outputs = vec![OutputParam {
829            name: "ReturnValue".to_string(),
830            value: mssql_types::SqlValue::Int(100),
831        }];
832
833        let result = ExecuteResult::with_outputs(10, outputs);
834        assert_eq!(result.rows_affected, 10);
835        assert!(result.get_output("ReturnValue").is_some());
836        assert!(result.get_output("returnvalue").is_some()); // case-insensitive
837        assert!(result.get_output("NotFound").is_none());
838    }
839
840    #[test]
841    fn test_query_stream_columns() {
842        let columns = vec![Column {
843            name: "id".to_string(),
844            index: 0,
845            type_name: "INT".to_string(),
846            nullable: false,
847            max_length: Some(4),
848            precision: Some(0),
849            scale: Some(0),
850            collation: None,
851        }];
852
853        let stream = QueryStream::new(columns, Vec::new());
854        assert_eq!(stream.columns().len(), 1);
855        assert_eq!(stream.columns()[0].name, "id");
856        assert!(!stream.is_finished());
857    }
858
859    #[test]
860    fn test_query_stream_with_rows() {
861        use mssql_types::SqlValue;
862
863        let columns = vec![
864            Column {
865                name: "id".to_string(),
866                index: 0,
867                type_name: "INT".to_string(),
868                nullable: false,
869                max_length: Some(4),
870                precision: None,
871                scale: None,
872                collation: None,
873            },
874            Column {
875                name: "name".to_string(),
876                index: 1,
877                type_name: "NVARCHAR".to_string(),
878                nullable: true,
879                max_length: Some(100),
880                precision: None,
881                scale: None,
882                collation: None,
883            },
884        ];
885
886        let rows = vec![
887            Row::from_values(
888                columns.clone(),
889                vec![SqlValue::Int(1), SqlValue::String("Alice".to_string())],
890            ),
891            Row::from_values(
892                columns.clone(),
893                vec![SqlValue::Int(2), SqlValue::String("Bob".to_string())],
894            ),
895        ];
896
897        let mut stream = QueryStream::new(columns, rows);
898        assert_eq!(stream.columns().len(), 2);
899        assert_eq!(stream.rows_remaining(), 2);
900        assert!(!stream.is_finished());
901
902        // First row
903        let row1 = stream.try_next().unwrap();
904        assert_eq!(row1.get::<i32>(0).unwrap(), 1);
905        assert_eq!(row1.get_by_name::<String>("name").unwrap(), "Alice");
906
907        // Second row
908        let row2 = stream.try_next().unwrap();
909        assert_eq!(row2.get::<i32>(0).unwrap(), 2);
910        assert_eq!(row2.get_by_name::<String>("name").unwrap(), "Bob");
911
912        // No more rows
913        assert!(stream.try_next().is_none());
914        assert!(stream.is_finished());
915    }
916
917    #[test]
918    fn test_query_stream_iterator() {
919        use mssql_types::SqlValue;
920
921        let columns = vec![Column {
922            name: "val".to_string(),
923            index: 0,
924            type_name: "INT".to_string(),
925            nullable: false,
926            max_length: None,
927            precision: None,
928            scale: None,
929            collation: None,
930        }];
931
932        let rows = vec![
933            Row::from_values(columns.clone(), vec![SqlValue::Int(10)]),
934            Row::from_values(columns.clone(), vec![SqlValue::Int(20)]),
935            Row::from_values(columns.clone(), vec![SqlValue::Int(30)]),
936        ];
937
938        let mut stream = QueryStream::new(columns, rows);
939
940        // Use iterator — unwrap each Result so test failures are visible
941        // (QueryStream's Iterator impl always yields Ok, but we should
942        // not silently swallow errors if that ever changes)
943        let values: Vec<i32> = stream
944            .by_ref()
945            .map(|r| r.unwrap().get::<i32>(0).unwrap())
946            .collect();
947
948        assert_eq!(values, vec![10, 20, 30]);
949        assert!(stream.is_finished());
950    }
951
952    #[test]
953    fn test_query_stream_empty() {
954        let stream = QueryStream::empty();
955        assert!(stream.columns().is_empty());
956        assert_eq!(stream.rows_remaining(), 0);
957        assert!(stream.is_finished());
958    }
959
960    /// Exercises the lazy-decode path: rows are stored as raw TDS bytes and
961    /// decoded only when the caller pulls them. Mirrors what
962    /// `read_query_response` now produces and pins the contract between
963    /// `PendingRow::Raw` and the per-row decode in `poll_next`/`next`.
964    #[test]
965    fn test_query_stream_lazy_raw_row_decoding() {
966        use bytes::Bytes;
967        use tds_protocol::token::{ColMetaData, ColumnData, RawRow, TypeInfo};
968        use tds_protocol::types::TypeId;
969
970        // Build raw row bytes for two columns: IntN(42) + IntN(NULL).
971        let mut data = Vec::new();
972        data.push(4); // IntN length prefix — 4 bytes
973        data.extend_from_slice(&42i32.to_le_bytes());
974        data.push(0); // IntN NULL (zero-length)
975
976        let meta = ColMetaData {
977            columns: vec![
978                ColumnData {
979                    name: "a".to_string(),
980                    type_id: TypeId::IntN,
981                    col_type: 0x26,
982                    flags: 0x00,
983                    user_type: 0,
984                    type_info: TypeInfo {
985                        max_length: Some(4),
986                        precision: None,
987                        scale: None,
988                        collation: None,
989                    },
990                    crypto_metadata: None,
991                },
992                ColumnData {
993                    name: "b".to_string(),
994                    type_id: TypeId::IntN,
995                    col_type: 0x26,
996                    flags: 0x01,
997                    user_type: 0,
998                    type_info: TypeInfo {
999                        max_length: Some(4),
1000                        precision: None,
1001                        scale: None,
1002                        collation: None,
1003                    },
1004                    crypto_metadata: None,
1005                },
1006            ],
1007            cek_table: None,
1008        };
1009
1010        let columns = vec![
1011            Column {
1012                name: "a".to_string(),
1013                index: 0,
1014                type_name: "INT".to_string(),
1015                nullable: false,
1016                max_length: Some(4),
1017                precision: None,
1018                scale: None,
1019                collation: None,
1020            },
1021            Column {
1022                name: "b".to_string(),
1023                index: 1,
1024                type_name: "INT".to_string(),
1025                nullable: true,
1026                max_length: Some(4),
1027                precision: None,
1028                scale: None,
1029                collation: None,
1030            },
1031        ];
1032
1033        let pending = vec![PendingRow::Raw(RawRow {
1034            data: Bytes::from(data),
1035        })];
1036
1037        #[cfg(feature = "always-encrypted")]
1038        let mut stream = QueryStream::from_raw(columns, pending, meta, None);
1039        #[cfg(not(feature = "always-encrypted"))]
1040        let mut stream = QueryStream::from_raw(columns, pending, meta);
1041
1042        assert_eq!(stream.rows_remaining(), 1);
1043        let row = stream
1044            .next()
1045            .expect("one row pending")
1046            .expect("row decoded successfully");
1047        assert_eq!(row.get::<i32>(0).unwrap(), 42);
1048        assert!(row.is_null(1));
1049        assert!(stream.next().is_none());
1050        assert!(stream.is_finished());
1051    }
1052
1053    /// Decoder errors must surface per-row via `Stream`/`Iterator` without
1054    /// derailing the stream state. Truncated raw bytes trigger a decode
1055    /// error that the caller observes as `Some(Err(_))`.
1056    #[test]
1057    fn test_query_stream_lazy_decode_error_propagates() {
1058        use bytes::Bytes;
1059        use tds_protocol::token::{ColMetaData, ColumnData, RawRow, TypeInfo};
1060        use tds_protocol::types::TypeId;
1061
1062        // Declare an Int4 column but provide only 2 bytes — decode must fail.
1063        let data = vec![0x01u8, 0x02];
1064
1065        let meta = ColMetaData {
1066            columns: vec![ColumnData {
1067                name: "a".to_string(),
1068                type_id: TypeId::Int4,
1069                col_type: 0x38,
1070                flags: 0x00,
1071                user_type: 0,
1072                type_info: TypeInfo {
1073                    max_length: Some(4),
1074                    precision: None,
1075                    scale: None,
1076                    collation: None,
1077                },
1078                crypto_metadata: None,
1079            }],
1080            cek_table: None,
1081        };
1082
1083        let columns = vec![Column {
1084            name: "a".to_string(),
1085            index: 0,
1086            type_name: "INT".to_string(),
1087            nullable: false,
1088            max_length: Some(4),
1089            precision: None,
1090            scale: None,
1091            collation: None,
1092        }];
1093
1094        let pending = vec![PendingRow::Raw(RawRow {
1095            data: Bytes::from(data),
1096        })];
1097
1098        #[cfg(feature = "always-encrypted")]
1099        let mut stream = QueryStream::from_raw(columns, pending, meta, None);
1100        #[cfg(not(feature = "always-encrypted"))]
1101        let mut stream = QueryStream::from_raw(columns, pending, meta);
1102
1103        let item = stream.next().expect("pending row present");
1104        assert!(item.is_err(), "truncated bytes must surface a decode error");
1105        assert!(stream.next().is_none());
1106    }
1107
1108    /// Helper to build a single-column IntN metadata block for the lazy
1109    /// `ResultSet` / `MultiResultStream` tests below.
1110    #[cfg(test)]
1111    fn intn_meta_and_columns(
1112        col_name: &str,
1113        nullable: bool,
1114    ) -> (tds_protocol::token::ColMetaData, Vec<Column>) {
1115        use tds_protocol::token::{ColMetaData, ColumnData, TypeInfo};
1116        use tds_protocol::types::TypeId;
1117        (
1118            ColMetaData {
1119                columns: vec![ColumnData {
1120                    name: col_name.to_string(),
1121                    type_id: TypeId::IntN,
1122                    col_type: 0x26,
1123                    flags: if nullable { 0x01 } else { 0x00 },
1124                    user_type: 0,
1125                    type_info: TypeInfo {
1126                        max_length: Some(4),
1127                        precision: None,
1128                        scale: None,
1129                        collation: None,
1130                    },
1131                    crypto_metadata: None,
1132                }],
1133                cek_table: None,
1134            },
1135            vec![Column {
1136                name: col_name.to_string(),
1137                index: 0,
1138                type_name: "INT".to_string(),
1139                nullable,
1140                max_length: Some(4),
1141                precision: None,
1142                scale: None,
1143                collation: None,
1144            }],
1145        )
1146    }
1147
1148    /// Exercises the `ResultSet` lazy-decode path introduced in 2.9.
1149    /// Mirrors `test_query_stream_lazy_raw_row_decoding` but via the
1150    /// result-set API that `call_procedure` / `query_multiple` expose.
1151    #[test]
1152    fn test_result_set_lazy_raw_row_decoding() {
1153        use bytes::Bytes;
1154        use tds_protocol::token::RawRow;
1155
1156        let (meta, columns) = intn_meta_and_columns("a", false);
1157
1158        // Two rows: 7 and 11 encoded as IntN(4).
1159        let pending = vec![
1160            PendingRow::Raw(RawRow {
1161                data: {
1162                    let mut b = Vec::with_capacity(5);
1163                    b.push(4);
1164                    b.extend_from_slice(&7i32.to_le_bytes());
1165                    Bytes::from(b)
1166                },
1167            }),
1168            PendingRow::Raw(RawRow {
1169                data: {
1170                    let mut b = Vec::with_capacity(5);
1171                    b.push(4);
1172                    b.extend_from_slice(&11i32.to_le_bytes());
1173                    Bytes::from(b)
1174                },
1175            }),
1176        ];
1177
1178        #[cfg(feature = "always-encrypted")]
1179        let mut rs = ResultSet::from_raw(columns, pending, meta, None);
1180        #[cfg(not(feature = "always-encrypted"))]
1181        let mut rs = ResultSet::from_raw(columns, pending, meta);
1182
1183        assert_eq!(rs.rows_remaining(), 2);
1184        assert!(!rs.is_empty());
1185
1186        let row1 = rs.next_row().expect("row present").expect("decodes");
1187        assert_eq!(row1.get::<i32>(0).unwrap(), 7);
1188
1189        let row2 = rs.next_row().expect("row present").expect("decodes");
1190        assert_eq!(row2.get::<i32>(0).unwrap(), 11);
1191
1192        assert!(rs.next_row().is_none());
1193        assert!(rs.is_empty());
1194    }
1195
1196    /// Decoder errors in `ResultSet::next_row` must surface per-row without
1197    /// derailing further calls. Same contract as
1198    /// `test_query_stream_lazy_decode_error_propagates`.
1199    #[test]
1200    fn test_result_set_lazy_decode_error_propagates() {
1201        use bytes::Bytes;
1202        use tds_protocol::token::{ColMetaData, ColumnData, RawRow, TypeInfo};
1203        use tds_protocol::types::TypeId;
1204
1205        // Int4 (not IntN) with only 2 bytes → decode must fail.
1206        let meta = ColMetaData {
1207            columns: vec![ColumnData {
1208                name: "a".to_string(),
1209                type_id: TypeId::Int4,
1210                col_type: 0x38,
1211                flags: 0x00,
1212                user_type: 0,
1213                type_info: TypeInfo {
1214                    max_length: Some(4),
1215                    precision: None,
1216                    scale: None,
1217                    collation: None,
1218                },
1219                crypto_metadata: None,
1220            }],
1221            cek_table: None,
1222        };
1223        let columns = vec![Column {
1224            name: "a".to_string(),
1225            index: 0,
1226            type_name: "INT".to_string(),
1227            nullable: false,
1228            max_length: Some(4),
1229            precision: None,
1230            scale: None,
1231            collation: None,
1232        }];
1233
1234        let pending = vec![PendingRow::Raw(RawRow {
1235            data: Bytes::from(vec![0x01u8, 0x02]),
1236        })];
1237
1238        #[cfg(feature = "always-encrypted")]
1239        let mut rs = ResultSet::from_raw(columns, pending, meta, None);
1240        #[cfg(not(feature = "always-encrypted"))]
1241        let mut rs = ResultSet::from_raw(columns, pending, meta);
1242
1243        let first = rs.next_row().expect("pending row present");
1244        assert!(
1245            first.is_err(),
1246            "truncated bytes must surface a decode error"
1247        );
1248        assert!(rs.next_row().is_none());
1249    }
1250
1251    /// `collect_all` on a lazy `ResultSet` decodes every pending row and
1252    /// propagates the first decode error. Ensures 2.9's signature change is
1253    /// exercised end-to-end.
1254    #[test]
1255    fn test_result_set_lazy_collect_all_success_and_error() {
1256        use bytes::Bytes;
1257        use tds_protocol::token::RawRow;
1258
1259        // Success: two rows decode cleanly.
1260        let (meta_ok, cols_ok) = intn_meta_and_columns("a", false);
1261        let pending_ok = vec![
1262            PendingRow::Raw(RawRow {
1263                data: {
1264                    let mut b = Vec::with_capacity(5);
1265                    b.push(4);
1266                    b.extend_from_slice(&10i32.to_le_bytes());
1267                    Bytes::from(b)
1268                },
1269            }),
1270            PendingRow::Raw(RawRow {
1271                data: {
1272                    let mut b = Vec::with_capacity(5);
1273                    b.push(4);
1274                    b.extend_from_slice(&20i32.to_le_bytes());
1275                    Bytes::from(b)
1276                },
1277            }),
1278        ];
1279
1280        #[cfg(feature = "always-encrypted")]
1281        let mut rs_ok = ResultSet::from_raw(cols_ok, pending_ok, meta_ok, None);
1282        #[cfg(not(feature = "always-encrypted"))]
1283        let mut rs_ok = ResultSet::from_raw(cols_ok, pending_ok, meta_ok);
1284        let rows = rs_ok.collect_all().expect("all rows decode");
1285        assert_eq!(rows.len(), 2);
1286        assert_eq!(rows[0].get::<i32>(0).unwrap(), 10);
1287        assert_eq!(rows[1].get::<i32>(0).unwrap(), 20);
1288        assert!(rs_ok.is_empty());
1289
1290        // Error: a truncated row (declared Int4 with only 2 bytes) makes
1291        // collect_all fail. collect_all short-circuits on the first Err.
1292        use tds_protocol::token::{ColMetaData, ColumnData, TypeInfo};
1293        use tds_protocol::types::TypeId;
1294        let meta_err = ColMetaData {
1295            columns: vec![ColumnData {
1296                name: "a".to_string(),
1297                type_id: TypeId::Int4,
1298                col_type: 0x38,
1299                flags: 0x00,
1300                user_type: 0,
1301                type_info: TypeInfo {
1302                    max_length: Some(4),
1303                    precision: None,
1304                    scale: None,
1305                    collation: None,
1306                },
1307                crypto_metadata: None,
1308            }],
1309            cek_table: None,
1310        };
1311        let cols_err = vec![Column {
1312            name: "a".to_string(),
1313            index: 0,
1314            type_name: "INT".to_string(),
1315            nullable: false,
1316            max_length: Some(4),
1317            precision: None,
1318            scale: None,
1319            collation: None,
1320        }];
1321        let pending_err = vec![PendingRow::Raw(RawRow {
1322            data: Bytes::from(vec![0x01u8, 0x02]),
1323        })];
1324
1325        #[cfg(feature = "always-encrypted")]
1326        let mut rs_err = ResultSet::from_raw(cols_err, pending_err, meta_err, None);
1327        #[cfg(not(feature = "always-encrypted"))]
1328        let mut rs_err = ResultSet::from_raw(cols_err, pending_err, meta_err);
1329        let err = rs_err.collect_all();
1330        assert!(err.is_err(), "collect_all must propagate decode error");
1331    }
1332
1333    /// `MultiResultStream` end-to-end lazy-decode path: two lazy `ResultSet`s
1334    /// decoded on demand as the caller walks through via `next_row` /
1335    /// `next_result`. Pins the 2.9 refactor of `read_multi_result_response`.
1336    #[tokio::test]
1337    async fn test_multi_result_stream_lazy_decode_across_result_sets() {
1338        use bytes::Bytes;
1339        use tds_protocol::token::RawRow;
1340
1341        let (meta1, cols1) = intn_meta_and_columns("a", false);
1342        let pending1 = vec![PendingRow::Raw(RawRow {
1343            data: {
1344                let mut b = Vec::with_capacity(5);
1345                b.push(4);
1346                b.extend_from_slice(&101i32.to_le_bytes());
1347                Bytes::from(b)
1348            },
1349        })];
1350        #[cfg(feature = "always-encrypted")]
1351        let rs1 = ResultSet::from_raw(cols1, pending1, meta1, None);
1352        #[cfg(not(feature = "always-encrypted"))]
1353        let rs1 = ResultSet::from_raw(cols1, pending1, meta1);
1354
1355        let (meta2, cols2) = intn_meta_and_columns("b", false);
1356        let pending2 = vec![PendingRow::Raw(RawRow {
1357            data: {
1358                let mut b = Vec::with_capacity(5);
1359                b.push(4);
1360                b.extend_from_slice(&202i32.to_le_bytes());
1361                Bytes::from(b)
1362            },
1363        })];
1364        #[cfg(feature = "always-encrypted")]
1365        let rs2 = ResultSet::from_raw(cols2, pending2, meta2, None);
1366        #[cfg(not(feature = "always-encrypted"))]
1367        let rs2 = ResultSet::from_raw(cols2, pending2, meta2);
1368
1369        let mut stream = MultiResultStream::new(vec![rs1, rs2]);
1370        assert_eq!(stream.result_count(), 2);
1371        assert_eq!(stream.current_result_index(), 0);
1372
1373        let row = stream
1374            .next_row()
1375            .await
1376            .expect("first row success")
1377            .expect("row present");
1378        assert_eq!(row.get::<i32>(0).unwrap(), 101);
1379        assert!(stream.next_row().await.expect("no more rows").is_none());
1380
1381        assert!(stream.has_more_results());
1382        assert!(stream.next_result().await.expect("advance ok"));
1383        assert_eq!(stream.current_result_index(), 1);
1384
1385        let row = stream
1386            .next_row()
1387            .await
1388            .expect("second row success")
1389            .expect("row present");
1390        assert_eq!(row.get::<i32>(0).unwrap(), 202);
1391    }
1392}