mssql_client/
stream.rs

1//! Streaming query result support.
2//!
3//! This module provides streaming result sets for memory-efficient
4//! processing of large query results.
5//!
6//! ## Buffered vs True Streaming
7//!
8//! The current implementation uses a buffered approach where all rows from
9//! the TDS response are parsed upfront. This works well because:
10//!
11//! 1. TDS responses arrive as complete messages (reassembled by mssql-codec)
12//! 2. Memory is shared via `Arc<Bytes>` pattern per ADR-004
13//! 3. No complex lifetime/borrow issues with the connection
14//!
15//! For truly large result sets, consider using OFFSET/FETCH pagination.
16
17use std::collections::VecDeque;
18use std::pin::Pin;
19use std::task::{Context, Poll};
20
21use futures_core::Stream;
22
23use crate::error::Error;
24use crate::row::{Column, Row};
25
26/// A streaming result set from a query.
27///
28/// This stream yields rows one at a time, allowing processing of
29/// large result sets without loading everything into memory.
30///
31/// # Example
32///
33/// ```rust,ignore
34/// use futures::StreamExt;
35///
36/// let mut stream = client.query("SELECT * FROM large_table", &[]).await?;
37///
38/// while let Some(row) = stream.next().await {
39///     let row = row?;
40///     process_row(&row);
41/// }
42/// ```
43pub struct QueryStream<'a> {
44    /// Column metadata for the result set.
45    columns: Vec<Column>,
46    /// Buffered rows from the response.
47    rows: VecDeque<Row>,
48    /// Whether the stream has completed.
49    finished: bool,
50    /// Lifetime tied to the connection.
51    _marker: std::marker::PhantomData<&'a ()>,
52}
53
54impl QueryStream<'_> {
55    /// Create a new query stream with columns and buffered rows.
56    pub(crate) fn new(columns: Vec<Column>, rows: Vec<Row>) -> Self {
57        Self {
58            columns,
59            rows: rows.into(),
60            finished: false,
61            _marker: std::marker::PhantomData,
62        }
63    }
64
65    /// Create an empty query stream (no results).
66    #[allow(dead_code)]
67    pub(crate) fn empty() -> Self {
68        Self {
69            columns: Vec::new(),
70            rows: VecDeque::new(),
71            finished: true,
72            _marker: std::marker::PhantomData,
73        }
74    }
75
76    /// Get the column metadata for this result set.
77    #[must_use]
78    pub fn columns(&self) -> &[Column] {
79        &self.columns
80    }
81
82    /// Check if the stream has finished.
83    #[must_use]
84    pub fn is_finished(&self) -> bool {
85        self.finished
86    }
87
88    /// Get the number of rows remaining in the buffer.
89    #[must_use]
90    pub fn rows_remaining(&self) -> usize {
91        self.rows.len()
92    }
93
94    /// Collect all remaining rows into a vector.
95    ///
96    /// This consumes the stream and loads all rows into memory.
97    /// For large result sets, consider iterating with the stream instead.
98    pub async fn collect_all(mut self) -> Result<Vec<Row>, Error> {
99        // Drain all remaining rows from the buffer
100        let rows: Vec<Row> = self.rows.drain(..).collect();
101        self.finished = true;
102        Ok(rows)
103    }
104
105    /// Try to get the next row synchronously (without async).
106    ///
107    /// Returns `None` when no more rows are available.
108    pub fn try_next(&mut self) -> Option<Row> {
109        if self.finished {
110            return None;
111        }
112
113        match self.rows.pop_front() {
114            Some(row) => Some(row),
115            None => {
116                self.finished = true;
117                None
118            }
119        }
120    }
121}
122
123impl Stream for QueryStream<'_> {
124    type Item = Result<Row, Error>;
125
126    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
127        let this = self.get_mut();
128
129        if this.finished {
130            return Poll::Ready(None);
131        }
132
133        // Pop the next row from the buffer
134        match this.rows.pop_front() {
135            Some(row) => Poll::Ready(Some(Ok(row))),
136            None => {
137                this.finished = true;
138                Poll::Ready(None)
139            }
140        }
141    }
142}
143
144impl ExactSizeIterator for QueryStream<'_> {}
145
146impl Iterator for QueryStream<'_> {
147    type Item = Result<Row, Error>;
148
149    fn next(&mut self) -> Option<Self::Item> {
150        if self.finished {
151            return None;
152        }
153
154        match self.rows.pop_front() {
155            Some(row) => Some(Ok(row)),
156            None => {
157                self.finished = true;
158                None
159            }
160        }
161    }
162
163    fn size_hint(&self) -> (usize, Option<usize>) {
164        let remaining = self.rows.len();
165        (remaining, Some(remaining))
166    }
167}
168
169/// Result of a non-query execution.
170///
171/// Contains the number of affected rows and any output parameters.
172#[derive(Debug, Clone)]
173pub struct ExecuteResult {
174    /// Number of rows affected by the statement.
175    pub rows_affected: u64,
176    /// Output parameters from stored procedures.
177    pub output_params: Vec<OutputParam>,
178}
179
180/// An output parameter from a stored procedure call.
181#[derive(Debug, Clone)]
182pub struct OutputParam {
183    /// Parameter name.
184    pub name: String,
185    /// Parameter value.
186    pub value: mssql_types::SqlValue,
187}
188
189impl ExecuteResult {
190    /// Create a new execute result.
191    pub fn new(rows_affected: u64) -> Self {
192        Self {
193            rows_affected,
194            output_params: Vec::new(),
195        }
196    }
197
198    /// Create a result with output parameters.
199    pub fn with_outputs(rows_affected: u64, output_params: Vec<OutputParam>) -> Self {
200        Self {
201            rows_affected,
202            output_params,
203        }
204    }
205
206    /// Get an output parameter by name.
207    #[must_use]
208    pub fn get_output(&self, name: &str) -> Option<&OutputParam> {
209        self.output_params
210            .iter()
211            .find(|p| p.name.eq_ignore_ascii_case(name))
212    }
213}
214
215/// A single result set within a multi-result batch.
216#[derive(Debug)]
217pub struct ResultSet {
218    /// Column metadata for this result set.
219    columns: Vec<Column>,
220    /// Rows in this result set.
221    rows: VecDeque<Row>,
222}
223
224impl ResultSet {
225    /// Create a new result set.
226    pub fn new(columns: Vec<Column>, rows: Vec<Row>) -> Self {
227        Self {
228            columns,
229            rows: rows.into(),
230        }
231    }
232
233    /// Get the column metadata.
234    #[must_use]
235    pub fn columns(&self) -> &[Column] {
236        &self.columns
237    }
238
239    /// Get the number of rows remaining.
240    #[must_use]
241    pub fn rows_remaining(&self) -> usize {
242        self.rows.len()
243    }
244
245    /// Get the next row from this result set.
246    pub fn next_row(&mut self) -> Option<Row> {
247        self.rows.pop_front()
248    }
249
250    /// Check if this result set is empty.
251    #[must_use]
252    pub fn is_empty(&self) -> bool {
253        self.rows.is_empty()
254    }
255
256    /// Collect all remaining rows into a vector.
257    pub fn collect_all(&mut self) -> Vec<Row> {
258        self.rows.drain(..).collect()
259    }
260}
261
262/// Multiple result sets from a batch or stored procedure.
263///
264/// Some queries return multiple result sets (e.g., stored procedures
265/// with multiple SELECT statements, or batches with multiple queries).
266///
267/// # Example
268///
269/// ```rust,ignore
270/// // Execute a batch with multiple SELECTs
271/// let mut results = client.query_multiple("SELECT 1 AS a; SELECT 2 AS b, 3 AS c;", &[]).await?;
272///
273/// // Process first result set
274/// while let Some(row) = results.next_row().await? {
275///     println!("Result 1: {:?}", row);
276/// }
277///
278/// // Move to second result set
279/// if results.next_result().await? {
280///     while let Some(row) = results.next_row().await? {
281///         println!("Result 2: {:?}", row);
282///     }
283/// }
284/// ```
285pub struct MultiResultStream<'a> {
286    /// All result sets from the batch.
287    result_sets: Vec<ResultSet>,
288    /// Current result set index (0-based).
289    current_result: usize,
290    /// Lifetime tied to the connection.
291    _marker: std::marker::PhantomData<&'a ()>,
292}
293
294impl<'a> MultiResultStream<'a> {
295    /// Create a new multi-result stream from parsed result sets.
296    pub(crate) fn new(result_sets: Vec<ResultSet>) -> Self {
297        Self {
298            result_sets,
299            current_result: 0,
300            _marker: std::marker::PhantomData,
301        }
302    }
303
304    /// Create an empty multi-result stream.
305    #[allow(dead_code)]
306    pub(crate) fn empty() -> Self {
307        Self {
308            result_sets: Vec::new(),
309            current_result: 0,
310            _marker: std::marker::PhantomData,
311        }
312    }
313
314    /// Get the current result set index (0-based).
315    #[must_use]
316    pub fn current_result_index(&self) -> usize {
317        self.current_result
318    }
319
320    /// Get the total number of result sets.
321    #[must_use]
322    pub fn result_count(&self) -> usize {
323        self.result_sets.len()
324    }
325
326    /// Check if there are more result sets after the current one.
327    #[must_use]
328    pub fn has_more_results(&self) -> bool {
329        self.current_result + 1 < self.result_sets.len()
330    }
331
332    /// Get the column metadata for the current result set.
333    ///
334    /// Returns `None` if there are no result sets or we've moved past all of them.
335    #[must_use]
336    pub fn columns(&self) -> Option<&[Column]> {
337        self.result_sets
338            .get(self.current_result)
339            .map(|rs| rs.columns())
340    }
341
342    /// Move to the next result set.
343    ///
344    /// Returns `true` if there is another result set, `false` if no more.
345    pub async fn next_result(&mut self) -> Result<bool, Error> {
346        if self.current_result + 1 < self.result_sets.len() {
347            self.current_result += 1;
348            Ok(true)
349        } else {
350            Ok(false)
351        }
352    }
353
354    /// Get the next row from the current result set.
355    ///
356    /// Returns `None` when no more rows in the current result set.
357    /// Call `next_result()` to move to the next result set.
358    pub async fn next_row(&mut self) -> Result<Option<Row>, Error> {
359        if let Some(result_set) = self.result_sets.get_mut(self.current_result) {
360            Ok(result_set.next_row())
361        } else {
362            Ok(None)
363        }
364    }
365
366    /// Get a mutable reference to the current result set.
367    #[must_use]
368    pub fn current_result_set(&mut self) -> Option<&mut ResultSet> {
369        self.result_sets.get_mut(self.current_result)
370    }
371
372    /// Collect all rows from the current result set.
373    pub fn collect_current(&mut self) -> Vec<Row> {
374        self.result_sets
375            .get_mut(self.current_result)
376            .map(|rs| rs.collect_all())
377            .unwrap_or_default()
378    }
379
380    /// Consume the stream and return all result sets as `QueryStream`s.
381    pub fn into_query_streams(self) -> Vec<QueryStream<'a>> {
382        self.result_sets
383            .into_iter()
384            .map(|rs| QueryStream::new(rs.columns, rs.rows.into()))
385            .collect()
386    }
387}
388
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// `ExecuteResult::new` stores the count and starts without output parameters.
    #[test]
    fn test_execute_result() {
        let outcome = ExecuteResult::new(42);

        assert!(outcome.output_params.is_empty());
        assert_eq!(outcome.rows_affected, 42);
    }

    /// Output parameters are found by name, ignoring ASCII case.
    #[test]
    fn test_execute_result_with_outputs() {
        let return_value = OutputParam {
            name: String::from("ReturnValue"),
            value: mssql_types::SqlValue::Int(100),
        };

        let outcome = ExecuteResult::with_outputs(10, vec![return_value]);
        assert_eq!(outcome.rows_affected, 10);

        // Exact and differently-cased spellings both resolve.
        for present in ["ReturnValue", "returnvalue"] {
            assert!(outcome.get_output(present).is_some());
        }
        assert!(outcome.get_output("NotFound").is_none());
    }

    /// A freshly built stream exposes its columns and is not finished yet.
    #[test]
    fn test_query_stream_columns() {
        let id_column = Column {
            name: String::from("id"),
            index: 0,
            type_name: String::from("INT"),
            nullable: false,
            max_length: Some(4),
            precision: Some(0),
            scale: Some(0),
            collation: None,
        };

        let stream = QueryStream::new(vec![id_column], Vec::new());
        let columns = stream.columns();

        assert_eq!(columns.len(), 1);
        assert_eq!(columns[0].name, "id");
        assert!(!stream.is_finished());
    }

    /// `try_next` hands rows out in order and flips `is_finished` at the end.
    #[test]
    fn test_query_stream_with_rows() {
        use mssql_types::SqlValue;

        let id_column = Column {
            name: String::from("id"),
            index: 0,
            type_name: String::from("INT"),
            nullable: false,
            max_length: Some(4),
            precision: None,
            scale: None,
            collation: None,
        };
        let name_column = Column {
            name: String::from("name"),
            index: 1,
            type_name: String::from("NVARCHAR"),
            nullable: true,
            max_length: Some(100),
            precision: None,
            scale: None,
            collation: None,
        };
        let columns = vec![id_column, name_column];

        let alice = Row::from_values(
            columns.clone(),
            vec![SqlValue::Int(1), SqlValue::String(String::from("Alice"))],
        );
        let bob = Row::from_values(
            columns.clone(),
            vec![SqlValue::Int(2), SqlValue::String(String::from("Bob"))],
        );

        let mut stream = QueryStream::new(columns, vec![alice, bob]);
        assert_eq!(stream.columns().len(), 2);
        assert_eq!(stream.rows_remaining(), 2);
        assert!(!stream.is_finished());

        // First row
        let first = stream.try_next().unwrap();
        assert_eq!(first.get::<i32>(0).unwrap(), 1);
        assert_eq!(first.get_by_name::<String>("name").unwrap(), "Alice");

        // Second row
        let second = stream.try_next().unwrap();
        assert_eq!(second.get::<i32>(0).unwrap(), 2);
        assert_eq!(second.get_by_name::<String>("name").unwrap(), "Bob");

        // Buffer exhausted
        assert!(stream.try_next().is_none());
        assert!(stream.is_finished());
    }

    /// The `Iterator` impl yields every row and finishes the stream.
    #[test]
    fn test_query_stream_iterator() {
        use mssql_types::SqlValue;

        let val_column = Column {
            name: String::from("val"),
            index: 0,
            type_name: String::from("INT"),
            nullable: false,
            max_length: None,
            precision: None,
            scale: None,
            collation: None,
        };
        let columns = vec![val_column];

        let rows = vec![
            Row::from_values(columns.clone(), vec![SqlValue::Int(10)]),
            Row::from_values(columns.clone(), vec![SqlValue::Int(20)]),
            Row::from_values(columns.clone(), vec![SqlValue::Int(30)]),
        ];

        let mut stream = QueryStream::new(columns, rows);

        // Drive the stream through its `Iterator` impl.
        let successes = stream.by_ref().filter_map(Result::ok);
        let values: Vec<i32> = successes.map(|row| row.get::<i32>(0).unwrap()).collect();

        assert_eq!(values, vec![10, 20, 30]);
        assert!(stream.is_finished());
    }

    /// `QueryStream::empty` has no columns, no rows, and is already finished.
    #[test]
    fn test_query_stream_empty() {
        let stream = QueryStream::empty();

        assert!(stream.is_finished());
        assert_eq!(stream.rows_remaining(), 0);
        assert!(stream.columns().is_empty());
    }
}
533}