Skip to main content

mssql_client/
stream.rs

1//! Streaming query result support.
2//!
3//! This module provides streaming result sets for memory-efficient
4//! processing of large query results.
5//!
6//! ## Buffered vs True Streaming
7//!
8//! The current implementation uses a buffered approach where all rows from
9//! the TDS response are parsed upfront. This works well because:
10//!
11//! 1. TDS responses arrive as complete messages (reassembled by mssql-codec)
12//! 2. Memory is shared via `Arc<Bytes>` pattern per ADR-004
13//! 3. No complex lifetime/borrow issues with the connection
14//!
15//! For truly large result sets, consider using OFFSET/FETCH pagination.
16
17use std::collections::VecDeque;
18use std::pin::Pin;
19use std::task::{Context, Poll};
20
21use futures_core::Stream;
22
23use crate::error::Error;
24use crate::row::{Column, Row};
25
26/// A streaming result set from a query.
27///
28/// This stream yields rows one at a time, allowing processing of
29/// large result sets without loading everything into memory.
30///
31/// # Example
32///
33/// ```rust,ignore
34/// use futures::StreamExt;
35///
36/// let mut stream = client.query("SELECT * FROM large_table", &[]).await?;
37///
38/// while let Some(row) = stream.next().await {
39///     let row = row?;
40///     process_row(&row);
41/// }
42/// ```
43#[must_use = "streams must be consumed; dropping a stream discards remaining rows"]
44pub struct QueryStream<'a> {
45    /// Column metadata for the result set.
46    columns: Vec<Column>,
47    /// Buffered rows from the response.
48    rows: VecDeque<Row>,
49    /// Whether the stream has completed.
50    finished: bool,
51    /// Lifetime tied to the connection.
52    _marker: std::marker::PhantomData<&'a ()>,
53}
54
55impl QueryStream<'_> {
56    /// Create a new query stream with columns and buffered rows.
57    pub(crate) fn new(columns: Vec<Column>, rows: Vec<Row>) -> Self {
58        Self {
59            columns,
60            rows: rows.into(),
61            finished: false,
62            _marker: std::marker::PhantomData,
63        }
64    }
65
66    /// Create an empty query stream (no results).
67    #[allow(dead_code)]
68    pub(crate) fn empty() -> Self {
69        Self {
70            columns: Vec::new(),
71            rows: VecDeque::new(),
72            finished: true,
73            _marker: std::marker::PhantomData,
74        }
75    }
76
77    /// Get the column metadata for this result set.
78    #[must_use]
79    pub fn columns(&self) -> &[Column] {
80        &self.columns
81    }
82
83    /// Check if the stream has finished.
84    #[must_use]
85    pub fn is_finished(&self) -> bool {
86        self.finished
87    }
88
89    /// Get the number of rows remaining in the buffer.
90    #[must_use]
91    pub fn rows_remaining(&self) -> usize {
92        self.rows.len()
93    }
94
95    /// Collect all remaining rows into a vector.
96    ///
97    /// This consumes the stream and loads all rows into memory.
98    /// For large result sets, consider iterating with the stream instead.
99    pub async fn collect_all(mut self) -> Result<Vec<Row>, Error> {
100        // Drain all remaining rows from the buffer
101        let rows: Vec<Row> = self.rows.drain(..).collect();
102        self.finished = true;
103        Ok(rows)
104    }
105
106    /// Try to get the next row synchronously (without async).
107    ///
108    /// Returns `None` when no more rows are available.
109    pub fn try_next(&mut self) -> Option<Row> {
110        if self.finished {
111            return None;
112        }
113
114        match self.rows.pop_front() {
115            Some(row) => Some(row),
116            None => {
117                self.finished = true;
118                None
119            }
120        }
121    }
122}
123
124impl Stream for QueryStream<'_> {
125    type Item = Result<Row, Error>;
126
127    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
128        let this = self.get_mut();
129
130        if this.finished {
131            return Poll::Ready(None);
132        }
133
134        // Pop the next row from the buffer
135        match this.rows.pop_front() {
136            Some(row) => Poll::Ready(Some(Ok(row))),
137            None => {
138                this.finished = true;
139                Poll::Ready(None)
140            }
141        }
142    }
143}
144
145impl ExactSizeIterator for QueryStream<'_> {}
146
147impl Iterator for QueryStream<'_> {
148    type Item = Result<Row, Error>;
149
150    fn next(&mut self) -> Option<Self::Item> {
151        if self.finished {
152            return None;
153        }
154
155        match self.rows.pop_front() {
156            Some(row) => Some(Ok(row)),
157            None => {
158                self.finished = true;
159                None
160            }
161        }
162    }
163
164    fn size_hint(&self) -> (usize, Option<usize>) {
165        let remaining = self.rows.len();
166        (remaining, Some(remaining))
167    }
168}
169
170/// Result of a non-query execution.
171///
172/// Contains the number of affected rows and any output parameters.
173#[derive(Debug, Clone)]
174#[non_exhaustive]
175#[must_use]
176pub struct ExecuteResult {
177    /// Number of rows affected by the statement.
178    pub rows_affected: u64,
179    /// Output parameters from stored procedures.
180    pub output_params: Vec<OutputParam>,
181}
182
183/// An output parameter from a stored procedure call.
184#[derive(Debug, Clone)]
185#[non_exhaustive]
186pub struct OutputParam {
187    /// Parameter name.
188    pub name: String,
189    /// Parameter value.
190    pub value: mssql_types::SqlValue,
191}
192
193impl ExecuteResult {
194    /// Create a new execute result.
195    pub fn new(rows_affected: u64) -> Self {
196        Self {
197            rows_affected,
198            output_params: Vec::new(),
199        }
200    }
201
202    /// Create a result with output parameters.
203    pub fn with_outputs(rows_affected: u64, output_params: Vec<OutputParam>) -> Self {
204        Self {
205            rows_affected,
206            output_params,
207        }
208    }
209
210    /// Get an output parameter by name.
211    #[must_use]
212    pub fn get_output(&self, name: &str) -> Option<&OutputParam> {
213        self.output_params
214            .iter()
215            .find(|p| p.name.eq_ignore_ascii_case(name))
216    }
217}
218
219/// A single result set within a multi-result batch.
220#[derive(Debug)]
221#[must_use]
222pub struct ResultSet {
223    /// Column metadata for this result set.
224    columns: Vec<Column>,
225    /// Rows in this result set.
226    rows: VecDeque<Row>,
227}
228
229impl ResultSet {
230    /// Create a new result set.
231    pub fn new(columns: Vec<Column>, rows: Vec<Row>) -> Self {
232        Self {
233            columns,
234            rows: rows.into(),
235        }
236    }
237
238    /// Get the column metadata.
239    #[must_use]
240    pub fn columns(&self) -> &[Column] {
241        &self.columns
242    }
243
244    /// Get the number of rows remaining.
245    #[must_use]
246    pub fn rows_remaining(&self) -> usize {
247        self.rows.len()
248    }
249
250    /// Get the next row from this result set.
251    pub fn next_row(&mut self) -> Option<Row> {
252        self.rows.pop_front()
253    }
254
255    /// Check if this result set is empty.
256    #[must_use]
257    pub fn is_empty(&self) -> bool {
258        self.rows.is_empty()
259    }
260
261    /// Collect all remaining rows into a vector.
262    pub fn collect_all(&mut self) -> Vec<Row> {
263        self.rows.drain(..).collect()
264    }
265}
266
267/// Multiple result sets from a batch or stored procedure.
268///
269/// Some queries return multiple result sets (e.g., stored procedures
270/// with multiple SELECT statements, or batches with multiple queries).
271///
272/// # Example
273///
274/// ```rust,ignore
275/// // Execute a batch with multiple SELECTs
276/// let mut results = client.query_multiple("SELECT 1 AS a; SELECT 2 AS b, 3 AS c;", &[]).await?;
277///
278/// // Process first result set
279/// while let Some(row) = results.next_row().await? {
280///     println!("Result 1: {:?}", row);
281/// }
282///
283/// // Move to second result set
284/// if results.next_result().await? {
285///     while let Some(row) = results.next_row().await? {
286///         println!("Result 2: {:?}", row);
287///     }
288/// }
289/// ```
290#[must_use = "streams must be consumed; dropping a stream discards remaining results"]
291pub struct MultiResultStream<'a> {
292    /// All result sets from the batch.
293    result_sets: Vec<ResultSet>,
294    /// Current result set index (0-based).
295    current_result: usize,
296    /// Lifetime tied to the connection.
297    _marker: std::marker::PhantomData<&'a ()>,
298}
299
300impl<'a> MultiResultStream<'a> {
301    /// Create a new multi-result stream from parsed result sets.
302    pub(crate) fn new(result_sets: Vec<ResultSet>) -> Self {
303        Self {
304            result_sets,
305            current_result: 0,
306            _marker: std::marker::PhantomData,
307        }
308    }
309
310    /// Create an empty multi-result stream.
311    #[allow(dead_code)]
312    pub(crate) fn empty() -> Self {
313        Self {
314            result_sets: Vec::new(),
315            current_result: 0,
316            _marker: std::marker::PhantomData,
317        }
318    }
319
320    /// Get the current result set index (0-based).
321    #[must_use]
322    pub fn current_result_index(&self) -> usize {
323        self.current_result
324    }
325
326    /// Get the total number of result sets.
327    #[must_use]
328    pub fn result_count(&self) -> usize {
329        self.result_sets.len()
330    }
331
332    /// Check if there are more result sets after the current one.
333    #[must_use]
334    pub fn has_more_results(&self) -> bool {
335        self.current_result + 1 < self.result_sets.len()
336    }
337
338    /// Get the column metadata for the current result set.
339    ///
340    /// Returns `None` if there are no result sets or we've moved past all of them.
341    #[must_use]
342    pub fn columns(&self) -> Option<&[Column]> {
343        self.result_sets
344            .get(self.current_result)
345            .map(|rs| rs.columns())
346    }
347
348    /// Move to the next result set.
349    ///
350    /// Returns `true` if there is another result set, `false` if no more.
351    pub async fn next_result(&mut self) -> Result<bool, Error> {
352        if self.current_result + 1 < self.result_sets.len() {
353            self.current_result += 1;
354            Ok(true)
355        } else {
356            Ok(false)
357        }
358    }
359
360    /// Get the next row from the current result set.
361    ///
362    /// Returns `None` when no more rows in the current result set.
363    /// Call `next_result()` to move to the next result set.
364    pub async fn next_row(&mut self) -> Result<Option<Row>, Error> {
365        if let Some(result_set) = self.result_sets.get_mut(self.current_result) {
366            Ok(result_set.next_row())
367        } else {
368            Ok(None)
369        }
370    }
371
372    /// Get a mutable reference to the current result set.
373    #[must_use]
374    pub fn current_result_set(&mut self) -> Option<&mut ResultSet> {
375        self.result_sets.get_mut(self.current_result)
376    }
377
378    /// Collect all rows from the current result set.
379    pub fn collect_current(&mut self) -> Vec<Row> {
380        self.result_sets
381            .get_mut(self.current_result)
382            .map(|rs| rs.collect_all())
383            .unwrap_or_default()
384    }
385
386    /// Consume the stream and return all result sets as `QueryStream`s.
387    pub fn into_query_streams(self) -> Vec<QueryStream<'a>> {
388        self.result_sets
389            .into_iter()
390            .map(|rs| QueryStream::new(rs.columns, rs.rows.into()))
391            .collect()
392    }
393}
394
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// A freshly built result reports its row count and has no outputs.
    #[test]
    fn test_execute_result() {
        let outcome = ExecuteResult::new(42);

        assert_eq!(outcome.rows_affected, 42);
        assert!(outcome.output_params.is_empty());
    }

    /// Output parameters are found by name regardless of ASCII case.
    #[test]
    fn test_execute_result_with_outputs() {
        let return_value = OutputParam {
            name: "ReturnValue".to_string(),
            value: mssql_types::SqlValue::Int(100),
        };

        let outcome = ExecuteResult::with_outputs(10, vec![return_value]);

        assert_eq!(outcome.rows_affected, 10);
        assert!(outcome.get_output("ReturnValue").is_some());
        assert!(outcome.get_output("returnvalue").is_some()); // case-insensitive
        assert!(outcome.get_output("NotFound").is_none());
    }

    /// Column metadata is exposed unchanged and a new stream is unfinished.
    #[test]
    fn test_query_stream_columns() {
        let id_column = Column {
            name: "id".to_string(),
            index: 0,
            type_name: "INT".to_string(),
            nullable: false,
            max_length: Some(4),
            precision: Some(0),
            scale: Some(0),
            collation: None,
        };

        let stream = QueryStream::new(vec![id_column], Vec::new());

        assert_eq!(stream.columns().len(), 1);
        assert_eq!(stream.columns()[0].name, "id");
        assert!(!stream.is_finished());
    }

    /// `try_next` hands rows out in order and finishes the stream afterwards.
    #[test]
    fn test_query_stream_with_rows() {
        use mssql_types::SqlValue;

        let id_column = Column {
            name: "id".to_string(),
            index: 0,
            type_name: "INT".to_string(),
            nullable: false,
            max_length: Some(4),
            precision: None,
            scale: None,
            collation: None,
        };
        let name_column = Column {
            name: "name".to_string(),
            index: 1,
            type_name: "NVARCHAR".to_string(),
            nullable: true,
            max_length: Some(100),
            precision: None,
            scale: None,
            collation: None,
        };
        let columns = vec![id_column, name_column];

        let alice = Row::from_values(
            columns.clone(),
            vec![SqlValue::Int(1), SqlValue::String("Alice".to_string())],
        );
        let bob = Row::from_values(
            columns.clone(),
            vec![SqlValue::Int(2), SqlValue::String("Bob".to_string())],
        );

        let mut stream = QueryStream::new(columns, vec![alice, bob]);
        assert_eq!(stream.columns().len(), 2);
        assert_eq!(stream.rows_remaining(), 2);
        assert!(!stream.is_finished());

        // Rows must come back in insertion order with their original values.
        for (expected_id, expected_name) in [(1, "Alice"), (2, "Bob")] {
            let row = stream.try_next().unwrap();
            assert_eq!(row.get::<i32>(0).unwrap(), expected_id);
            assert_eq!(row.get_by_name::<String>("name").unwrap(), expected_name);
        }

        // The buffer is now exhausted.
        assert!(stream.try_next().is_none());
        assert!(stream.is_finished());
    }

    /// The `Iterator` implementation drains every row and finishes the stream.
    #[test]
    fn test_query_stream_iterator() {
        use mssql_types::SqlValue;

        let columns = vec![Column {
            name: "val".to_string(),
            index: 0,
            type_name: "INT".to_string(),
            nullable: false,
            max_length: None,
            precision: None,
            scale: None,
            collation: None,
        }];

        let rows = vec![
            Row::from_values(columns.clone(), vec![SqlValue::Int(10)]),
            Row::from_values(columns.clone(), vec![SqlValue::Int(20)]),
            Row::from_values(columns.clone(), vec![SqlValue::Int(30)]),
        ];

        let mut stream = QueryStream::new(columns, rows);

        // Borrow the stream as an iterator so it can be inspected afterwards.
        let values: Vec<i32> = stream
            .by_ref()
            .filter_map(Result::ok)
            .map(|row| row.get::<i32>(0).unwrap())
            .collect();

        assert_eq!(values, vec![10, 20, 30]);
        assert!(stream.is_finished());
    }

    /// An explicitly empty stream has no columns, no rows, and is finished.
    #[test]
    fn test_query_stream_empty() {
        let stream = QueryStream::empty();

        assert!(stream.columns().is_empty());
        assert_eq!(stream.rows_remaining(), 0);
        assert!(stream.is_finished());
    }
}
539}