1use std::collections::VecDeque;
34use std::pin::Pin;
35use std::task::{Context, Poll};
36
37use futures_core::Stream;
38use tds_protocol::token::{ColMetaData, NbcRow, RawRow};
39
40use crate::error::Error;
41use crate::row::{Column, Row};
42
43#[derive(Debug, Clone)]
50pub(crate) enum PendingRow {
51 Parsed(Row),
53 Raw(RawRow),
55 Nbc(NbcRow),
57}
58
59#[must_use = "streams must be consumed; dropping a stream discards remaining rows"]
83pub struct QueryStream<'a> {
84 columns: Vec<Column>,
86 rows: VecDeque<PendingRow>,
88 meta: Option<ColMetaData>,
91 #[cfg(feature = "always-encrypted")]
96 decryptor: Option<std::sync::Arc<crate::column_decryptor::ColumnDecryptor>>,
97 finished: bool,
99 _marker: std::marker::PhantomData<&'a ()>,
101}
102
103impl QueryStream<'_> {
104 #[cfg(test)]
111 pub(crate) fn new(columns: Vec<Column>, rows: Vec<Row>) -> Self {
112 Self {
113 columns,
114 rows: rows.into_iter().map(PendingRow::Parsed).collect(),
115 meta: None,
116 #[cfg(feature = "always-encrypted")]
117 decryptor: None,
118 finished: false,
119 _marker: std::marker::PhantomData,
120 }
121 }
122
123 pub(crate) fn from_raw(
129 columns: Vec<Column>,
130 pending: Vec<PendingRow>,
131 meta: ColMetaData,
132 #[cfg(feature = "always-encrypted")] decryptor: Option<
133 std::sync::Arc<crate::column_decryptor::ColumnDecryptor>,
134 >,
135 ) -> Self {
136 Self {
137 columns,
138 rows: pending.into(),
139 meta: Some(meta),
140 #[cfg(feature = "always-encrypted")]
141 decryptor,
142 finished: false,
143 _marker: std::marker::PhantomData,
144 }
145 }
146
147 #[allow(dead_code)]
149 pub(crate) fn empty() -> Self {
150 Self {
151 columns: Vec::new(),
152 rows: VecDeque::new(),
153 meta: None,
154 #[cfg(feature = "always-encrypted")]
155 decryptor: None,
156 finished: true,
157 _marker: std::marker::PhantomData,
158 }
159 }
160
161 #[must_use]
163 pub fn columns(&self) -> &[Column] {
164 &self.columns
165 }
166
167 #[must_use]
169 pub fn is_finished(&self) -> bool {
170 self.finished
171 }
172
173 #[must_use]
175 pub fn rows_remaining(&self) -> usize {
176 self.rows.len()
177 }
178
179 pub async fn collect_all(mut self) -> Result<Vec<Row>, Error> {
189 let mut out = Vec::with_capacity(self.rows.len());
190 while let Some(pending) = self.rows.pop_front() {
191 out.push(self.decode(pending)?);
192 }
193 self.finished = true;
194 Ok(out)
195 }
196
197 pub fn try_next(&mut self) -> Option<Row> {
203 self.next().and_then(|r| r.ok())
204 }
205
206 fn decode(&self, pending: PendingRow) -> Result<Row, Error> {
208 match pending {
209 PendingRow::Parsed(row) => Ok(row),
210 PendingRow::Raw(raw) => {
211 let meta = self
212 .meta
213 .as_ref()
214 .ok_or_else(|| Error::Protocol("row metadata missing for raw row".into()))?;
215 #[cfg(feature = "always-encrypted")]
216 if let Some(ref dec) = self.decryptor {
217 return crate::column_parser::convert_raw_row_decrypted(
218 &raw,
219 meta,
220 &self.columns,
221 dec,
222 );
223 }
224 crate::column_parser::convert_raw_row(&raw, meta, &self.columns)
225 }
226 PendingRow::Nbc(nbc) => {
227 let meta = self
228 .meta
229 .as_ref()
230 .ok_or_else(|| Error::Protocol("row metadata missing for NBC row".into()))?;
231 #[cfg(feature = "always-encrypted")]
232 if let Some(ref dec) = self.decryptor {
233 return crate::column_parser::convert_nbc_row_decrypted(
234 &nbc,
235 meta,
236 &self.columns,
237 dec,
238 );
239 }
240 crate::column_parser::convert_nbc_row(&nbc, meta, &self.columns)
241 }
242 }
243 }
244}
245
246impl Stream for QueryStream<'_> {
247 type Item = Result<Row, Error>;
248
249 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
250 let this = self.get_mut();
251
252 if this.finished {
253 return Poll::Ready(None);
254 }
255
256 match this.rows.pop_front() {
257 Some(pending) => Poll::Ready(Some(this.decode(pending))),
258 None => {
259 this.finished = true;
260 Poll::Ready(None)
261 }
262 }
263 }
264}
265
266impl ExactSizeIterator for QueryStream<'_> {}
267
268impl Iterator for QueryStream<'_> {
269 type Item = Result<Row, Error>;
270
271 fn next(&mut self) -> Option<Self::Item> {
272 if self.finished {
273 return None;
274 }
275
276 match self.rows.pop_front() {
277 Some(pending) => Some(self.decode(pending)),
278 None => {
279 self.finished = true;
280 None
281 }
282 }
283 }
284
285 fn size_hint(&self) -> (usize, Option<usize>) {
286 let remaining = self.rows.len();
287 (remaining, Some(remaining))
288 }
289}
290
291#[derive(Debug, Clone)]
295#[non_exhaustive]
296#[must_use]
297pub struct ExecuteResult {
298 pub rows_affected: u64,
300 pub output_params: Vec<OutputParam>,
302}
303
304#[derive(Debug, Clone)]
306#[non_exhaustive]
307pub struct OutputParam {
308 pub name: String,
310 pub value: mssql_types::SqlValue,
312}
313
314impl ExecuteResult {
315 pub fn new(rows_affected: u64) -> Self {
317 Self {
318 rows_affected,
319 output_params: Vec::new(),
320 }
321 }
322
323 pub fn with_outputs(rows_affected: u64, output_params: Vec<OutputParam>) -> Self {
325 Self {
326 rows_affected,
327 output_params,
328 }
329 }
330
331 #[must_use]
333 pub fn get_output(&self, name: &str) -> Option<&OutputParam> {
334 self.output_params
335 .iter()
336 .find(|p| p.name.eq_ignore_ascii_case(name))
337 }
338}
339
340#[derive(Debug, Clone)]
365#[non_exhaustive]
366#[must_use]
367pub struct ProcedureResult {
368 pub return_value: i32,
373 pub rows_affected: u64,
375 pub output_params: Vec<OutputParam>,
377 pub result_sets: Vec<ResultSet>,
379}
380
381impl ProcedureResult {
382 pub(crate) fn new() -> Self {
384 Self {
385 return_value: 0,
386 rows_affected: 0,
387 output_params: Vec::new(),
388 result_sets: Vec::new(),
389 }
390 }
391
392 #[must_use]
397 pub fn get_return_value(&self) -> i32 {
398 self.return_value
399 }
400
401 #[must_use]
424 pub fn get_output(&self, name: &str) -> Option<&OutputParam> {
425 let search = name.strip_prefix('@').unwrap_or(name);
426 self.output_params.iter().find(|p| {
427 let stored = p.name.strip_prefix('@').unwrap_or(&p.name);
428 stored.eq_ignore_ascii_case(search)
429 })
430 }
431
432 #[must_use]
436 pub fn first_result_set(&self) -> Option<&ResultSet> {
437 self.result_sets.first()
438 }
439
440 #[must_use]
442 pub fn has_result_sets(&self) -> bool {
443 !self.result_sets.is_empty()
444 }
445}
446
447#[derive(Debug, Clone)]
456#[must_use]
457pub struct ResultSet {
458 columns: Vec<Column>,
460 pending_rows: VecDeque<PendingRow>,
462 meta: Option<ColMetaData>,
465 #[cfg(feature = "always-encrypted")]
470 decryptor: Option<std::sync::Arc<crate::column_decryptor::ColumnDecryptor>>,
471}
472
473impl ResultSet {
474 pub fn new(columns: Vec<Column>, rows: Vec<Row>) -> Self {
480 Self {
481 columns,
482 pending_rows: rows.into_iter().map(PendingRow::Parsed).collect(),
483 meta: None,
484 #[cfg(feature = "always-encrypted")]
485 decryptor: None,
486 }
487 }
488
489 pub(crate) fn from_raw(
496 columns: Vec<Column>,
497 pending: Vec<PendingRow>,
498 meta: ColMetaData,
499 #[cfg(feature = "always-encrypted")] decryptor: Option<
500 std::sync::Arc<crate::column_decryptor::ColumnDecryptor>,
501 >,
502 ) -> Self {
503 Self {
504 columns,
505 pending_rows: pending.into(),
506 meta: Some(meta),
507 #[cfg(feature = "always-encrypted")]
508 decryptor,
509 }
510 }
511
512 #[must_use]
514 pub fn columns(&self) -> &[Column] {
515 &self.columns
516 }
517
518 #[must_use]
520 pub fn rows_remaining(&self) -> usize {
521 self.pending_rows.len()
522 }
523
524 pub fn next_row(&mut self) -> Option<Result<Row, Error>> {
530 self.pending_rows.pop_front().map(|p| self.decode(p))
531 }
532
533 #[must_use]
535 pub fn is_empty(&self) -> bool {
536 self.pending_rows.is_empty()
537 }
538
539 pub fn collect_all(&mut self) -> Result<Vec<Row>, Error> {
543 let mut out = Vec::with_capacity(self.pending_rows.len());
544 while let Some(pending) = self.pending_rows.pop_front() {
545 out.push(self.decode(pending)?);
546 }
547 Ok(out)
548 }
549
550 fn decode(&self, pending: PendingRow) -> Result<Row, Error> {
552 match pending {
553 PendingRow::Parsed(row) => Ok(row),
554 PendingRow::Raw(raw) => {
555 let meta = self
556 .meta
557 .as_ref()
558 .ok_or_else(|| Error::Protocol("row metadata missing for raw row".into()))?;
559 #[cfg(feature = "always-encrypted")]
560 if let Some(ref dec) = self.decryptor {
561 return crate::column_parser::convert_raw_row_decrypted(
562 &raw,
563 meta,
564 &self.columns,
565 dec,
566 );
567 }
568 crate::column_parser::convert_raw_row(&raw, meta, &self.columns)
569 }
570 PendingRow::Nbc(nbc) => {
571 let meta = self
572 .meta
573 .as_ref()
574 .ok_or_else(|| Error::Protocol("row metadata missing for NBC row".into()))?;
575 #[cfg(feature = "always-encrypted")]
576 if let Some(ref dec) = self.decryptor {
577 return crate::column_parser::convert_nbc_row_decrypted(
578 &nbc,
579 meta,
580 &self.columns,
581 dec,
582 );
583 }
584 crate::column_parser::convert_nbc_row(&nbc, meta, &self.columns)
585 }
586 }
587 }
588
589 fn into_query_stream<'a>(self) -> QueryStream<'a> {
595 QueryStream {
596 columns: self.columns,
597 rows: self.pending_rows,
598 meta: self.meta,
599 #[cfg(feature = "always-encrypted")]
600 decryptor: self.decryptor,
601 finished: false,
602 _marker: std::marker::PhantomData,
603 }
604 }
605}
606
607#[must_use = "streams must be consumed; dropping a stream discards remaining results"]
634pub struct MultiResultStream<'a> {
635 result_sets: Vec<ResultSet>,
637 current_result: usize,
639 _marker: std::marker::PhantomData<&'a ()>,
641}
642
643impl<'a> MultiResultStream<'a> {
644 pub(crate) fn new(result_sets: Vec<ResultSet>) -> Self {
646 Self {
647 result_sets,
648 current_result: 0,
649 _marker: std::marker::PhantomData,
650 }
651 }
652
653 #[allow(dead_code)]
655 pub(crate) fn empty() -> Self {
656 Self {
657 result_sets: Vec::new(),
658 current_result: 0,
659 _marker: std::marker::PhantomData,
660 }
661 }
662
663 #[must_use]
665 pub fn current_result_index(&self) -> usize {
666 self.current_result
667 }
668
669 #[must_use]
671 pub fn result_count(&self) -> usize {
672 self.result_sets.len()
673 }
674
675 #[must_use]
677 pub fn has_more_results(&self) -> bool {
678 self.current_result + 1 < self.result_sets.len()
679 }
680
681 #[must_use]
685 pub fn columns(&self) -> Option<&[Column]> {
686 self.result_sets
687 .get(self.current_result)
688 .map(|rs| rs.columns())
689 }
690
691 pub async fn next_result(&mut self) -> Result<bool, Error> {
695 if self.current_result + 1 < self.result_sets.len() {
696 self.current_result += 1;
697 Ok(true)
698 } else {
699 Ok(false)
700 }
701 }
702
703 pub async fn next_row(&mut self) -> Result<Option<Row>, Error> {
712 if let Some(result_set) = self.result_sets.get_mut(self.current_result) {
713 result_set.next_row().transpose()
714 } else {
715 Ok(None)
716 }
717 }
718
719 #[must_use]
721 pub fn current_result_set(&mut self) -> Option<&mut ResultSet> {
722 self.result_sets.get_mut(self.current_result)
723 }
724
725 pub fn collect_current(&mut self) -> Result<Vec<Row>, Error> {
731 match self.result_sets.get_mut(self.current_result) {
732 Some(rs) => rs.collect_all(),
733 None => Ok(Vec::new()),
734 }
735 }
736
737 pub fn into_query_streams(self) -> Vec<QueryStream<'a>> {
739 self.result_sets
740 .into_iter()
741 .map(ResultSet::into_query_stream)
742 .collect()
743 }
744}
745
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    use bytes::Bytes;
    use mssql_types::SqlValue;
    use tds_protocol::token::{ColumnData, TypeInfo};
    use tds_protocol::types::TypeId;

    // ---------------------------------------------------------------------
    // Shared fixtures
    // ---------------------------------------------------------------------

    /// Non-nullable `INT` column at index 0 with a maximum length of 4.
    fn int_column(name: &str) -> Column {
        Column {
            name: name.to_string(),
            index: 0,
            type_name: "INT".to_string(),
            nullable: false,
            max_length: Some(4),
            precision: None,
            scale: None,
            collation: None,
        }
    }

    /// Type info with a maximum length of 4 and nothing else set.
    fn four_byte_type_info() -> TypeInfo {
        TypeInfo {
            max_length: Some(4),
            precision: None,
            scale: None,
            collation: None,
        }
    }

    /// Wire metadata for a `TypeId::IntN` column (type byte `0x26`); the flag
    /// value is `0x01` when `nullable` and `0x00` otherwise.
    fn intn_column_data(name: &str, nullable: bool) -> ColumnData {
        ColumnData {
            name: name.to_string(),
            type_id: TypeId::IntN,
            col_type: 0x26,
            flags: if nullable { 0x01 } else { 0x00 },
            user_type: 0,
            type_info: four_byte_type_info(),
            crypto_metadata: None,
        }
    }

    /// Metadata and matching column list for a single `IntN` column.
    fn intn_meta_and_columns(
        col_name: &str,
        nullable: bool,
    ) -> (tds_protocol::token::ColMetaData, Vec<Column>) {
        let meta = ColMetaData {
            columns: vec![intn_column_data(col_name, nullable)],
            cek_table: None,
        };
        let columns = vec![Column {
            nullable,
            ..int_column(col_name)
        }];
        (meta, columns)
    }

    /// Metadata and matching column list for a single `TypeId::Int4` column
    /// named `a` (type byte `0x38`, flags `0x00`).
    fn int4_meta_and_columns() -> (ColMetaData, Vec<Column>) {
        let meta = ColMetaData {
            columns: vec![ColumnData {
                name: "a".to_string(),
                type_id: TypeId::Int4,
                col_type: 0x38,
                flags: 0x00,
                user_type: 0,
                type_info: four_byte_type_info(),
                crypto_metadata: None,
            }],
            cek_table: None,
        };
        (meta, vec![int_column("a")])
    }

    /// Raw row holding one value: a length byte of 4 followed by `value` in
    /// little-endian order.
    fn raw_intn_row(value: i32) -> PendingRow {
        let mut bytes = Vec::with_capacity(5);
        bytes.push(4);
        bytes.extend_from_slice(&value.to_le_bytes());
        PendingRow::Raw(RawRow {
            data: Bytes::from(bytes),
        })
    }

    /// Raw row containing only the two bytes `0x01 0x02`.
    fn truncated_row() -> PendingRow {
        PendingRow::Raw(RawRow {
            data: Bytes::from(vec![0x01u8, 0x02]),
        })
    }

    /// Calls `QueryStream::from_raw` with no decryptor, whichever feature set
    /// is active.
    fn query_stream_from_raw(
        columns: Vec<Column>,
        pending: Vec<PendingRow>,
        meta: ColMetaData,
    ) -> QueryStream<'static> {
        #[cfg(feature = "always-encrypted")]
        let stream = QueryStream::from_raw(columns, pending, meta, None);
        #[cfg(not(feature = "always-encrypted"))]
        let stream = QueryStream::from_raw(columns, pending, meta);
        stream
    }

    /// Calls `ResultSet::from_raw` with no decryptor, whichever feature set is
    /// active.
    fn result_set_from_raw(
        columns: Vec<Column>,
        pending: Vec<PendingRow>,
        meta: ColMetaData,
    ) -> ResultSet {
        #[cfg(feature = "always-encrypted")]
        let rs = ResultSet::from_raw(columns, pending, meta, None);
        #[cfg(not(feature = "always-encrypted"))]
        let rs = ResultSet::from_raw(columns, pending, meta);
        rs
    }

    // ---------------------------------------------------------------------
    // ExecuteResult / ProcedureResult
    // ---------------------------------------------------------------------

    #[test]
    fn test_execute_result() {
        let result = ExecuteResult::new(42);
        assert_eq!(result.rows_affected, 42);
        assert!(result.output_params.is_empty());
    }

    #[test]
    fn test_procedure_result_defaults() {
        let result = ProcedureResult::new();
        assert_eq!(result.return_value, 0);
        assert_eq!(result.rows_affected, 0);
        assert!(result.output_params.is_empty());
        assert!(result.result_sets.is_empty());
        assert!(!result.has_result_sets());
        assert!(result.first_result_set().is_none());
    }

    #[test]
    fn test_procedure_result_get_output() {
        let mut result = ProcedureResult::new();
        result.output_params.push(OutputParam {
            name: "@Total".to_string(),
            value: SqlValue::Int(42),
        });
        result.output_params.push(OutputParam {
            name: "@Message".to_string(),
            value: SqlValue::String("ok".to_string()),
        });

        // With the `@` prefix, in any ASCII case.
        for name in ["@Total", "@total", "@TOTAL"] {
            assert!(result.get_output(name).is_some());
        }

        // Without the `@` prefix.
        for name in ["Total", "total"] {
            assert!(result.get_output(name).is_some());
        }

        // Unknown names, with and without the prefix.
        for name in ["@NotHere", "NotHere"] {
            assert!(result.get_output(name).is_none());
        }
    }

    #[test]
    fn test_procedure_result_with_result_sets() {
        let columns = vec![int_column("id")];
        let rows = vec![Row::from_values(columns.clone(), vec![SqlValue::Int(1)])];
        let rs = ResultSet::new(columns, rows);

        let mut result = ProcedureResult::new();
        result.result_sets.push(rs);
        result.return_value = 7;
        result.rows_affected = 5;

        assert!(result.has_result_sets());
        assert_eq!(result.get_return_value(), 7);
        assert_eq!(result.first_result_set().unwrap().columns().len(), 1);
    }

    #[test]
    fn test_execute_result_with_outputs() {
        let outputs = vec![OutputParam {
            name: "ReturnValue".to_string(),
            value: SqlValue::Int(100),
        }];

        let result = ExecuteResult::with_outputs(10, outputs);
        assert_eq!(result.rows_affected, 10);
        assert!(result.get_output("ReturnValue").is_some());
        assert!(result.get_output("returnvalue").is_some());
        assert!(result.get_output("NotFound").is_none());
    }

    // ---------------------------------------------------------------------
    // QueryStream over pre-decoded rows
    // ---------------------------------------------------------------------

    #[test]
    fn test_query_stream_columns() {
        let columns = vec![Column {
            precision: Some(0),
            scale: Some(0),
            ..int_column("id")
        }];

        let stream = QueryStream::new(columns, Vec::new());
        assert_eq!(stream.columns().len(), 1);
        assert_eq!(stream.columns()[0].name, "id");
        assert!(!stream.is_finished());
    }

    #[test]
    fn test_query_stream_with_rows() {
        let columns = vec![
            int_column("id"),
            Column {
                name: "name".to_string(),
                index: 1,
                type_name: "NVARCHAR".to_string(),
                nullable: true,
                max_length: Some(100),
                precision: None,
                scale: None,
                collation: None,
            },
        ];

        let rows = vec![
            Row::from_values(
                columns.clone(),
                vec![SqlValue::Int(1), SqlValue::String("Alice".to_string())],
            ),
            Row::from_values(
                columns.clone(),
                vec![SqlValue::Int(2), SqlValue::String("Bob".to_string())],
            ),
        ];

        let mut stream = QueryStream::new(columns, rows);
        assert_eq!(stream.columns().len(), 2);
        assert_eq!(stream.rows_remaining(), 2);
        assert!(!stream.is_finished());

        let first = stream.try_next().unwrap();
        assert_eq!(first.get::<i32>(0).unwrap(), 1);
        assert_eq!(first.get_by_name::<String>("name").unwrap(), "Alice");

        let second = stream.try_next().unwrap();
        assert_eq!(second.get::<i32>(0).unwrap(), 2);
        assert_eq!(second.get_by_name::<String>("name").unwrap(), "Bob");

        // Exhaustion flips the finished flag.
        assert!(stream.try_next().is_none());
        assert!(stream.is_finished());
    }

    #[test]
    fn test_query_stream_iterator() {
        let columns = vec![Column {
            max_length: None,
            ..int_column("val")
        }];

        let rows: Vec<Row> = [10, 20, 30]
            .into_iter()
            .map(|v| Row::from_values(columns.clone(), vec![SqlValue::Int(v)]))
            .collect();

        let mut stream = QueryStream::new(columns, rows);

        // `by_ref` keeps the stream usable after the iterator chain.
        let values: Vec<i32> = stream
            .by_ref()
            .map(|item| item.unwrap().get::<i32>(0).unwrap())
            .collect();

        assert_eq!(values, vec![10, 20, 30]);
        assert!(stream.is_finished());
    }

    #[test]
    fn test_query_stream_empty() {
        let stream = QueryStream::empty();
        assert!(stream.columns().is_empty());
        assert_eq!(stream.rows_remaining(), 0);
        assert!(stream.is_finished());
    }

    // ---------------------------------------------------------------------
    // QueryStream over raw rows
    // ---------------------------------------------------------------------

    #[test]
    fn test_query_stream_lazy_raw_row_decoding() {
        // Column `a`: length byte 4 + 42 (little-endian); column `b`: length byte 0.
        let mut data = vec![4u8];
        data.extend_from_slice(&42i32.to_le_bytes());
        data.push(0);

        let meta = ColMetaData {
            columns: vec![intn_column_data("a", false), intn_column_data("b", true)],
            cek_table: None,
        };

        let columns = vec![
            int_column("a"),
            Column {
                index: 1,
                nullable: true,
                ..int_column("b")
            },
        ];

        let pending = vec![PendingRow::Raw(RawRow {
            data: Bytes::from(data),
        })];

        let mut stream = query_stream_from_raw(columns, pending, meta);

        assert_eq!(stream.rows_remaining(), 1);
        let row = stream
            .next()
            .expect("one row pending")
            .expect("row decoded successfully");
        assert_eq!(row.get::<i32>(0).unwrap(), 42);
        assert!(row.is_null(1));
        assert!(stream.next().is_none());
        assert!(stream.is_finished());
    }

    #[test]
    fn test_query_stream_lazy_decode_error_propagates() {
        let (meta, columns) = int4_meta_and_columns();
        let pending = vec![truncated_row()];

        let mut stream = query_stream_from_raw(columns, pending, meta);

        let item = stream.next().expect("pending row present");
        assert!(item.is_err(), "truncated bytes must surface a decode error");
        assert!(stream.next().is_none());
    }

    // ---------------------------------------------------------------------
    // ResultSet over raw rows
    // ---------------------------------------------------------------------

    #[test]
    fn test_result_set_lazy_raw_row_decoding() {
        let (meta, columns) = intn_meta_and_columns("a", false);
        let pending = vec![raw_intn_row(7), raw_intn_row(11)];

        let mut rs = result_set_from_raw(columns, pending, meta);

        assert_eq!(rs.rows_remaining(), 2);
        assert!(!rs.is_empty());

        let row1 = rs.next_row().expect("row present").expect("decodes");
        assert_eq!(row1.get::<i32>(0).unwrap(), 7);

        let row2 = rs.next_row().expect("row present").expect("decodes");
        assert_eq!(row2.get::<i32>(0).unwrap(), 11);

        assert!(rs.next_row().is_none());
        assert!(rs.is_empty());
    }

    #[test]
    fn test_result_set_lazy_decode_error_propagates() {
        let (meta, columns) = int4_meta_and_columns();
        let pending = vec![truncated_row()];

        let mut rs = result_set_from_raw(columns, pending, meta);

        let first = rs.next_row().expect("pending row present");
        assert!(
            first.is_err(),
            "truncated bytes must surface a decode error"
        );
        assert!(rs.next_row().is_none());
    }

    #[test]
    fn test_result_set_lazy_collect_all_success_and_error() {
        // Success path: two well-formed rows.
        let (meta_ok, cols_ok) = intn_meta_and_columns("a", false);
        let pending_ok = vec![raw_intn_row(10), raw_intn_row(20)];

        let mut rs_ok = result_set_from_raw(cols_ok, pending_ok, meta_ok);
        let rows = rs_ok.collect_all().expect("all rows decode");
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].get::<i32>(0).unwrap(), 10);
        assert_eq!(rows[1].get::<i32>(0).unwrap(), 20);
        assert!(rs_ok.is_empty());

        // Error path: a single truncated row.
        let (meta_err, cols_err) = int4_meta_and_columns();
        let pending_err = vec![truncated_row()];

        let mut rs_err = result_set_from_raw(cols_err, pending_err, meta_err);
        let err = rs_err.collect_all();
        assert!(err.is_err(), "collect_all must propagate decode error");
    }

    // ---------------------------------------------------------------------
    // MultiResultStream
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn test_multi_result_stream_lazy_decode_across_result_sets() {
        let (meta1, cols1) = intn_meta_and_columns("a", false);
        let rs1 = result_set_from_raw(cols1, vec![raw_intn_row(101)], meta1);

        let (meta2, cols2) = intn_meta_and_columns("b", false);
        let rs2 = result_set_from_raw(cols2, vec![raw_intn_row(202)], meta2);

        let mut stream = MultiResultStream::new(vec![rs1, rs2]);
        assert_eq!(stream.result_count(), 2);
        assert_eq!(stream.current_result_index(), 0);

        let row = stream
            .next_row()
            .await
            .expect("first row success")
            .expect("row present");
        assert_eq!(row.get::<i32>(0).unwrap(), 101);
        assert!(stream.next_row().await.expect("no more rows").is_none());

        assert!(stream.has_more_results());
        assert!(stream.next_result().await.expect("advance ok"));
        assert_eq!(stream.current_result_index(), 1);

        let row = stream
            .next_row()
            .await
            .expect("second row success")
            .expect("row present");
        assert_eq!(row.get::<i32>(0).unwrap(), 202);
    }
}