1use std::collections::VecDeque;
18use std::pin::Pin;
19use std::task::{Context, Poll};
20
21use futures_core::Stream;
22
23use crate::error::Error;
24use crate::row::{Column, Row};
25
26#[must_use = "streams must be consumed; dropping a stream discards remaining rows"]
44pub struct QueryStream<'a> {
45 columns: Vec<Column>,
47 rows: VecDeque<Row>,
49 finished: bool,
51 _marker: std::marker::PhantomData<&'a ()>,
53}
54
55impl QueryStream<'_> {
56 pub(crate) fn new(columns: Vec<Column>, rows: Vec<Row>) -> Self {
58 Self {
59 columns,
60 rows: rows.into(),
61 finished: false,
62 _marker: std::marker::PhantomData,
63 }
64 }
65
66 #[allow(dead_code)]
68 pub(crate) fn empty() -> Self {
69 Self {
70 columns: Vec::new(),
71 rows: VecDeque::new(),
72 finished: true,
73 _marker: std::marker::PhantomData,
74 }
75 }
76
77 #[must_use]
79 pub fn columns(&self) -> &[Column] {
80 &self.columns
81 }
82
83 #[must_use]
85 pub fn is_finished(&self) -> bool {
86 self.finished
87 }
88
89 #[must_use]
91 pub fn rows_remaining(&self) -> usize {
92 self.rows.len()
93 }
94
95 pub async fn collect_all(mut self) -> Result<Vec<Row>, Error> {
100 let rows: Vec<Row> = self.rows.drain(..).collect();
102 self.finished = true;
103 Ok(rows)
104 }
105
106 pub fn try_next(&mut self) -> Option<Row> {
110 if self.finished {
111 return None;
112 }
113
114 match self.rows.pop_front() {
115 Some(row) => Some(row),
116 None => {
117 self.finished = true;
118 None
119 }
120 }
121 }
122}
123
124impl Stream for QueryStream<'_> {
125 type Item = Result<Row, Error>;
126
127 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
128 let this = self.get_mut();
129
130 if this.finished {
131 return Poll::Ready(None);
132 }
133
134 match this.rows.pop_front() {
136 Some(row) => Poll::Ready(Some(Ok(row))),
137 None => {
138 this.finished = true;
139 Poll::Ready(None)
140 }
141 }
142 }
143}
144
145impl ExactSizeIterator for QueryStream<'_> {}
146
147impl Iterator for QueryStream<'_> {
148 type Item = Result<Row, Error>;
149
150 fn next(&mut self) -> Option<Self::Item> {
151 if self.finished {
152 return None;
153 }
154
155 match self.rows.pop_front() {
156 Some(row) => Some(Ok(row)),
157 None => {
158 self.finished = true;
159 None
160 }
161 }
162 }
163
164 fn size_hint(&self) -> (usize, Option<usize>) {
165 let remaining = self.rows.len();
166 (remaining, Some(remaining))
167 }
168}
169
170#[derive(Debug, Clone)]
174#[non_exhaustive]
175#[must_use]
176pub struct ExecuteResult {
177 pub rows_affected: u64,
179 pub output_params: Vec<OutputParam>,
181}
182
183#[derive(Debug, Clone)]
185#[non_exhaustive]
186pub struct OutputParam {
187 pub name: String,
189 pub value: mssql_types::SqlValue,
191}
192
193impl ExecuteResult {
194 pub fn new(rows_affected: u64) -> Self {
196 Self {
197 rows_affected,
198 output_params: Vec::new(),
199 }
200 }
201
202 pub fn with_outputs(rows_affected: u64, output_params: Vec<OutputParam>) -> Self {
204 Self {
205 rows_affected,
206 output_params,
207 }
208 }
209
210 #[must_use]
212 pub fn get_output(&self, name: &str) -> Option<&OutputParam> {
213 self.output_params
214 .iter()
215 .find(|p| p.name.eq_ignore_ascii_case(name))
216 }
217}
218
219#[derive(Debug)]
221#[must_use]
222pub struct ResultSet {
223 columns: Vec<Column>,
225 rows: VecDeque<Row>,
227}
228
229impl ResultSet {
230 pub fn new(columns: Vec<Column>, rows: Vec<Row>) -> Self {
232 Self {
233 columns,
234 rows: rows.into(),
235 }
236 }
237
238 #[must_use]
240 pub fn columns(&self) -> &[Column] {
241 &self.columns
242 }
243
244 #[must_use]
246 pub fn rows_remaining(&self) -> usize {
247 self.rows.len()
248 }
249
250 pub fn next_row(&mut self) -> Option<Row> {
252 self.rows.pop_front()
253 }
254
255 #[must_use]
257 pub fn is_empty(&self) -> bool {
258 self.rows.is_empty()
259 }
260
261 pub fn collect_all(&mut self) -> Vec<Row> {
263 self.rows.drain(..).collect()
264 }
265}
266
267#[must_use = "streams must be consumed; dropping a stream discards remaining results"]
291pub struct MultiResultStream<'a> {
292 result_sets: Vec<ResultSet>,
294 current_result: usize,
296 _marker: std::marker::PhantomData<&'a ()>,
298}
299
300impl<'a> MultiResultStream<'a> {
301 pub(crate) fn new(result_sets: Vec<ResultSet>) -> Self {
303 Self {
304 result_sets,
305 current_result: 0,
306 _marker: std::marker::PhantomData,
307 }
308 }
309
310 #[allow(dead_code)]
312 pub(crate) fn empty() -> Self {
313 Self {
314 result_sets: Vec::new(),
315 current_result: 0,
316 _marker: std::marker::PhantomData,
317 }
318 }
319
320 #[must_use]
322 pub fn current_result_index(&self) -> usize {
323 self.current_result
324 }
325
326 #[must_use]
328 pub fn result_count(&self) -> usize {
329 self.result_sets.len()
330 }
331
332 #[must_use]
334 pub fn has_more_results(&self) -> bool {
335 self.current_result + 1 < self.result_sets.len()
336 }
337
338 #[must_use]
342 pub fn columns(&self) -> Option<&[Column]> {
343 self.result_sets
344 .get(self.current_result)
345 .map(|rs| rs.columns())
346 }
347
348 pub async fn next_result(&mut self) -> Result<bool, Error> {
352 if self.current_result + 1 < self.result_sets.len() {
353 self.current_result += 1;
354 Ok(true)
355 } else {
356 Ok(false)
357 }
358 }
359
360 pub async fn next_row(&mut self) -> Result<Option<Row>, Error> {
365 if let Some(result_set) = self.result_sets.get_mut(self.current_result) {
366 Ok(result_set.next_row())
367 } else {
368 Ok(None)
369 }
370 }
371
372 #[must_use]
374 pub fn current_result_set(&mut self) -> Option<&mut ResultSet> {
375 self.result_sets.get_mut(self.current_result)
376 }
377
378 pub fn collect_current(&mut self) -> Vec<Row> {
380 self.result_sets
381 .get_mut(self.current_result)
382 .map(|rs| rs.collect_all())
383 .unwrap_or_default()
384 }
385
386 pub fn into_query_streams(self) -> Vec<QueryStream<'a>> {
388 self.result_sets
389 .into_iter()
390 .map(|rs| QueryStream::new(rs.columns, rs.rows.into()))
391 .collect()
392 }
393}
394
395#[cfg(test)]
396#[allow(clippy::unwrap_used)]
397mod tests {
398 use super::*;
399
400 #[test]
401 fn test_execute_result() {
402 let result = ExecuteResult::new(42);
403 assert_eq!(result.rows_affected, 42);
404 assert!(result.output_params.is_empty());
405 }
406
407 #[test]
408 fn test_execute_result_with_outputs() {
409 let outputs = vec![OutputParam {
410 name: "ReturnValue".to_string(),
411 value: mssql_types::SqlValue::Int(100),
412 }];
413
414 let result = ExecuteResult::with_outputs(10, outputs);
415 assert_eq!(result.rows_affected, 10);
416 assert!(result.get_output("ReturnValue").is_some());
417 assert!(result.get_output("returnvalue").is_some()); assert!(result.get_output("NotFound").is_none());
419 }
420
421 #[test]
422 fn test_query_stream_columns() {
423 let columns = vec![Column {
424 name: "id".to_string(),
425 index: 0,
426 type_name: "INT".to_string(),
427 nullable: false,
428 max_length: Some(4),
429 precision: Some(0),
430 scale: Some(0),
431 collation: None,
432 }];
433
434 let stream = QueryStream::new(columns, Vec::new());
435 assert_eq!(stream.columns().len(), 1);
436 assert_eq!(stream.columns()[0].name, "id");
437 assert!(!stream.is_finished());
438 }
439
440 #[test]
441 fn test_query_stream_with_rows() {
442 use mssql_types::SqlValue;
443
444 let columns = vec![
445 Column {
446 name: "id".to_string(),
447 index: 0,
448 type_name: "INT".to_string(),
449 nullable: false,
450 max_length: Some(4),
451 precision: None,
452 scale: None,
453 collation: None,
454 },
455 Column {
456 name: "name".to_string(),
457 index: 1,
458 type_name: "NVARCHAR".to_string(),
459 nullable: true,
460 max_length: Some(100),
461 precision: None,
462 scale: None,
463 collation: None,
464 },
465 ];
466
467 let rows = vec![
468 Row::from_values(
469 columns.clone(),
470 vec![SqlValue::Int(1), SqlValue::String("Alice".to_string())],
471 ),
472 Row::from_values(
473 columns.clone(),
474 vec![SqlValue::Int(2), SqlValue::String("Bob".to_string())],
475 ),
476 ];
477
478 let mut stream = QueryStream::new(columns, rows);
479 assert_eq!(stream.columns().len(), 2);
480 assert_eq!(stream.rows_remaining(), 2);
481 assert!(!stream.is_finished());
482
483 let row1 = stream.try_next().unwrap();
485 assert_eq!(row1.get::<i32>(0).unwrap(), 1);
486 assert_eq!(row1.get_by_name::<String>("name").unwrap(), "Alice");
487
488 let row2 = stream.try_next().unwrap();
490 assert_eq!(row2.get::<i32>(0).unwrap(), 2);
491 assert_eq!(row2.get_by_name::<String>("name").unwrap(), "Bob");
492
493 assert!(stream.try_next().is_none());
495 assert!(stream.is_finished());
496 }
497
498 #[test]
499 fn test_query_stream_iterator() {
500 use mssql_types::SqlValue;
501
502 let columns = vec![Column {
503 name: "val".to_string(),
504 index: 0,
505 type_name: "INT".to_string(),
506 nullable: false,
507 max_length: None,
508 precision: None,
509 scale: None,
510 collation: None,
511 }];
512
513 let rows = vec![
514 Row::from_values(columns.clone(), vec![SqlValue::Int(10)]),
515 Row::from_values(columns.clone(), vec![SqlValue::Int(20)]),
516 Row::from_values(columns.clone(), vec![SqlValue::Int(30)]),
517 ];
518
519 let mut stream = QueryStream::new(columns, rows);
520
521 let values: Vec<i32> = stream
523 .by_ref()
524 .filter_map(|r| r.ok())
525 .map(|r| r.get::<i32>(0).unwrap())
526 .collect();
527
528 assert_eq!(values, vec![10, 20, 30]);
529 assert!(stream.is_finished());
530 }
531
532 #[test]
533 fn test_query_stream_empty() {
534 let stream = QueryStream::empty();
535 assert!(stream.columns().is_empty());
536 assert_eq!(stream.rows_remaining(), 0);
537 assert!(stream.is_finished());
538 }
539}