1use std::collections::VecDeque;
36use std::pin::Pin;
37use std::task::{Context, Poll};
38
39use futures_core::Stream;
40use tds_protocol::token::{ColMetaData, NbcRow, RawRow};
41
42use crate::error::Error;
43use crate::row::{Column, Row};
44
45#[derive(Debug, Clone)]
52pub(crate) enum PendingRow {
53 Parsed(Row),
55 Raw(RawRow),
57 Nbc(NbcRow),
59}
60
61#[must_use = "streams must be consumed; dropping a stream discards remaining rows"]
87pub struct QueryStream<'a> {
88 columns: Vec<Column>,
90 rows: VecDeque<PendingRow>,
92 meta: Option<ColMetaData>,
95 #[cfg(feature = "always-encrypted")]
100 decryptor: Option<std::sync::Arc<crate::column_decryptor::ColumnDecryptor>>,
101 finished: bool,
103 _marker: std::marker::PhantomData<&'a ()>,
105}
106
107impl QueryStream<'_> {
108 #[cfg(test)]
115 pub(crate) fn new(columns: Vec<Column>, rows: Vec<Row>) -> Self {
116 Self {
117 columns,
118 rows: rows.into_iter().map(PendingRow::Parsed).collect(),
119 meta: None,
120 #[cfg(feature = "always-encrypted")]
121 decryptor: None,
122 finished: false,
123 _marker: std::marker::PhantomData,
124 }
125 }
126
127 pub(crate) fn from_raw(
133 columns: Vec<Column>,
134 pending: Vec<PendingRow>,
135 meta: ColMetaData,
136 #[cfg(feature = "always-encrypted")] decryptor: Option<
137 std::sync::Arc<crate::column_decryptor::ColumnDecryptor>,
138 >,
139 ) -> Self {
140 Self {
141 columns,
142 rows: pending.into(),
143 meta: Some(meta),
144 #[cfg(feature = "always-encrypted")]
145 decryptor,
146 finished: false,
147 _marker: std::marker::PhantomData,
148 }
149 }
150
151 #[allow(dead_code)]
153 pub(crate) fn empty() -> Self {
154 Self {
155 columns: Vec::new(),
156 rows: VecDeque::new(),
157 meta: None,
158 #[cfg(feature = "always-encrypted")]
159 decryptor: None,
160 finished: true,
161 _marker: std::marker::PhantomData,
162 }
163 }
164
165 #[must_use]
167 pub fn columns(&self) -> &[Column] {
168 &self.columns
169 }
170
171 #[must_use]
173 pub fn is_finished(&self) -> bool {
174 self.finished
175 }
176
177 #[must_use]
179 pub fn rows_remaining(&self) -> usize {
180 self.rows.len()
181 }
182
183 pub async fn collect_all(mut self) -> Result<Vec<Row>, Error> {
193 let mut out = Vec::with_capacity(self.rows.len());
194 while let Some(pending) = self.rows.pop_front() {
195 out.push(self.decode(pending)?);
196 }
197 self.finished = true;
198 Ok(out)
199 }
200
201 pub fn try_next(&mut self) -> Option<Row> {
207 self.next().and_then(|r| r.ok())
208 }
209
210 fn decode(&self, pending: PendingRow) -> Result<Row, Error> {
212 match pending {
213 PendingRow::Parsed(row) => Ok(row),
214 PendingRow::Raw(raw) => {
215 let meta = self
216 .meta
217 .as_ref()
218 .ok_or_else(|| Error::Protocol("row metadata missing for raw row".into()))?;
219 #[cfg(feature = "always-encrypted")]
220 if let Some(ref dec) = self.decryptor {
221 return crate::column_parser::convert_raw_row_decrypted(
222 &raw,
223 meta,
224 &self.columns,
225 dec,
226 );
227 }
228 crate::column_parser::convert_raw_row(&raw, meta, &self.columns)
229 }
230 PendingRow::Nbc(nbc) => {
231 let meta = self
232 .meta
233 .as_ref()
234 .ok_or_else(|| Error::Protocol("row metadata missing for NBC row".into()))?;
235 #[cfg(feature = "always-encrypted")]
236 if let Some(ref dec) = self.decryptor {
237 return crate::column_parser::convert_nbc_row_decrypted(
238 &nbc,
239 meta,
240 &self.columns,
241 dec,
242 );
243 }
244 crate::column_parser::convert_nbc_row(&nbc, meta, &self.columns)
245 }
246 }
247 }
248}
249
250impl Stream for QueryStream<'_> {
251 type Item = Result<Row, Error>;
252
253 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
254 let this = self.get_mut();
255
256 if this.finished {
257 return Poll::Ready(None);
258 }
259
260 match this.rows.pop_front() {
261 Some(pending) => Poll::Ready(Some(this.decode(pending))),
262 None => {
263 this.finished = true;
264 Poll::Ready(None)
265 }
266 }
267 }
268}
269
270impl ExactSizeIterator for QueryStream<'_> {}
271
272impl Iterator for QueryStream<'_> {
273 type Item = Result<Row, Error>;
274
275 fn next(&mut self) -> Option<Self::Item> {
276 if self.finished {
277 return None;
278 }
279
280 match self.rows.pop_front() {
281 Some(pending) => Some(self.decode(pending)),
282 None => {
283 self.finished = true;
284 None
285 }
286 }
287 }
288
289 fn size_hint(&self) -> (usize, Option<usize>) {
290 let remaining = self.rows.len();
291 (remaining, Some(remaining))
292 }
293}
294
295#[derive(Debug, Clone)]
299#[non_exhaustive]
300#[must_use]
301pub struct ExecuteResult {
302 pub rows_affected: u64,
304 pub output_params: Vec<OutputParam>,
306}
307
308#[derive(Debug, Clone)]
310#[non_exhaustive]
311pub struct OutputParam {
312 pub name: String,
314 pub value: mssql_types::SqlValue,
316}
317
318impl ExecuteResult {
319 pub fn new(rows_affected: u64) -> Self {
321 Self {
322 rows_affected,
323 output_params: Vec::new(),
324 }
325 }
326
327 pub fn with_outputs(rows_affected: u64, output_params: Vec<OutputParam>) -> Self {
329 Self {
330 rows_affected,
331 output_params,
332 }
333 }
334
335 #[must_use]
337 pub fn get_output(&self, name: &str) -> Option<&OutputParam> {
338 self.output_params
339 .iter()
340 .find(|p| p.name.eq_ignore_ascii_case(name))
341 }
342}
343
344#[derive(Debug, Clone)]
369#[non_exhaustive]
370#[must_use]
371pub struct ProcedureResult {
372 pub return_value: i32,
377 pub rows_affected: u64,
379 pub output_params: Vec<OutputParam>,
381 pub result_sets: Vec<ResultSet>,
383}
384
385impl ProcedureResult {
386 pub(crate) fn new() -> Self {
388 Self {
389 return_value: 0,
390 rows_affected: 0,
391 output_params: Vec::new(),
392 result_sets: Vec::new(),
393 }
394 }
395
396 #[must_use]
401 pub fn get_return_value(&self) -> i32 {
402 self.return_value
403 }
404
405 #[must_use]
428 pub fn get_output(&self, name: &str) -> Option<&OutputParam> {
429 let search = name.strip_prefix('@').unwrap_or(name);
430 self.output_params.iter().find(|p| {
431 let stored = p.name.strip_prefix('@').unwrap_or(&p.name);
432 stored.eq_ignore_ascii_case(search)
433 })
434 }
435
436 #[must_use]
440 pub fn first_result_set(&self) -> Option<&ResultSet> {
441 self.result_sets.first()
442 }
443
444 #[must_use]
446 pub fn has_result_sets(&self) -> bool {
447 !self.result_sets.is_empty()
448 }
449}
450
451#[derive(Debug, Clone)]
460#[must_use]
461pub struct ResultSet {
462 columns: Vec<Column>,
464 pending_rows: VecDeque<PendingRow>,
466 meta: Option<ColMetaData>,
469 #[cfg(feature = "always-encrypted")]
474 decryptor: Option<std::sync::Arc<crate::column_decryptor::ColumnDecryptor>>,
475}
476
477impl ResultSet {
478 pub fn new(columns: Vec<Column>, rows: Vec<Row>) -> Self {
484 Self {
485 columns,
486 pending_rows: rows.into_iter().map(PendingRow::Parsed).collect(),
487 meta: None,
488 #[cfg(feature = "always-encrypted")]
489 decryptor: None,
490 }
491 }
492
493 pub(crate) fn from_raw(
500 columns: Vec<Column>,
501 pending: Vec<PendingRow>,
502 meta: ColMetaData,
503 #[cfg(feature = "always-encrypted")] decryptor: Option<
504 std::sync::Arc<crate::column_decryptor::ColumnDecryptor>,
505 >,
506 ) -> Self {
507 Self {
508 columns,
509 pending_rows: pending.into(),
510 meta: Some(meta),
511 #[cfg(feature = "always-encrypted")]
512 decryptor,
513 }
514 }
515
516 #[must_use]
518 pub fn columns(&self) -> &[Column] {
519 &self.columns
520 }
521
522 #[must_use]
524 pub fn rows_remaining(&self) -> usize {
525 self.pending_rows.len()
526 }
527
528 pub fn next_row(&mut self) -> Option<Result<Row, Error>> {
534 self.pending_rows.pop_front().map(|p| self.decode(p))
535 }
536
537 #[must_use]
539 pub fn is_empty(&self) -> bool {
540 self.pending_rows.is_empty()
541 }
542
543 pub fn collect_all(&mut self) -> Result<Vec<Row>, Error> {
547 let mut out = Vec::with_capacity(self.pending_rows.len());
548 while let Some(pending) = self.pending_rows.pop_front() {
549 out.push(self.decode(pending)?);
550 }
551 Ok(out)
552 }
553
554 fn decode(&self, pending: PendingRow) -> Result<Row, Error> {
556 match pending {
557 PendingRow::Parsed(row) => Ok(row),
558 PendingRow::Raw(raw) => {
559 let meta = self
560 .meta
561 .as_ref()
562 .ok_or_else(|| Error::Protocol("row metadata missing for raw row".into()))?;
563 #[cfg(feature = "always-encrypted")]
564 if let Some(ref dec) = self.decryptor {
565 return crate::column_parser::convert_raw_row_decrypted(
566 &raw,
567 meta,
568 &self.columns,
569 dec,
570 );
571 }
572 crate::column_parser::convert_raw_row(&raw, meta, &self.columns)
573 }
574 PendingRow::Nbc(nbc) => {
575 let meta = self
576 .meta
577 .as_ref()
578 .ok_or_else(|| Error::Protocol("row metadata missing for NBC row".into()))?;
579 #[cfg(feature = "always-encrypted")]
580 if let Some(ref dec) = self.decryptor {
581 return crate::column_parser::convert_nbc_row_decrypted(
582 &nbc,
583 meta,
584 &self.columns,
585 dec,
586 );
587 }
588 crate::column_parser::convert_nbc_row(&nbc, meta, &self.columns)
589 }
590 }
591 }
592
593 fn into_query_stream<'a>(self) -> QueryStream<'a> {
599 QueryStream {
600 columns: self.columns,
601 rows: self.pending_rows,
602 meta: self.meta,
603 #[cfg(feature = "always-encrypted")]
604 decryptor: self.decryptor,
605 finished: false,
606 _marker: std::marker::PhantomData,
607 }
608 }
609}
610
611#[must_use = "streams must be consumed; dropping a stream discards remaining results"]
638pub struct MultiResultStream<'a> {
639 result_sets: Vec<ResultSet>,
641 current_result: usize,
643 _marker: std::marker::PhantomData<&'a ()>,
645}
646
647impl<'a> MultiResultStream<'a> {
648 pub(crate) fn new(result_sets: Vec<ResultSet>) -> Self {
650 Self {
651 result_sets,
652 current_result: 0,
653 _marker: std::marker::PhantomData,
654 }
655 }
656
657 #[allow(dead_code)]
659 pub(crate) fn empty() -> Self {
660 Self {
661 result_sets: Vec::new(),
662 current_result: 0,
663 _marker: std::marker::PhantomData,
664 }
665 }
666
667 #[must_use]
669 pub fn current_result_index(&self) -> usize {
670 self.current_result
671 }
672
673 #[must_use]
675 pub fn result_count(&self) -> usize {
676 self.result_sets.len()
677 }
678
679 #[must_use]
681 pub fn has_more_results(&self) -> bool {
682 self.current_result + 1 < self.result_sets.len()
683 }
684
685 #[must_use]
689 pub fn columns(&self) -> Option<&[Column]> {
690 self.result_sets
691 .get(self.current_result)
692 .map(|rs| rs.columns())
693 }
694
695 pub async fn next_result(&mut self) -> Result<bool, Error> {
699 if self.current_result + 1 < self.result_sets.len() {
700 self.current_result += 1;
701 Ok(true)
702 } else {
703 Ok(false)
704 }
705 }
706
707 pub async fn next_row(&mut self) -> Result<Option<Row>, Error> {
716 if let Some(result_set) = self.result_sets.get_mut(self.current_result) {
717 result_set.next_row().transpose()
718 } else {
719 Ok(None)
720 }
721 }
722
723 #[must_use]
725 pub fn current_result_set(&mut self) -> Option<&mut ResultSet> {
726 self.result_sets.get_mut(self.current_result)
727 }
728
729 pub fn collect_current(&mut self) -> Result<Vec<Row>, Error> {
735 match self.result_sets.get_mut(self.current_result) {
736 Some(rs) => rs.collect_all(),
737 None => Ok(Vec::new()),
738 }
739 }
740
741 pub fn into_query_streams(self) -> Vec<QueryStream<'a>> {
743 self.result_sets
744 .into_iter()
745 .map(ResultSet::into_query_stream)
746 .collect()
747 }
748}
749
750#[cfg(test)]
751#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
752mod tests {
753 use super::*;
754
755 #[test]
756 fn test_execute_result() {
757 let result = ExecuteResult::new(42);
758 assert_eq!(result.rows_affected, 42);
759 assert!(result.output_params.is_empty());
760 }
761
762 #[test]
763 fn test_procedure_result_defaults() {
764 let result = ProcedureResult::new();
765 assert_eq!(result.return_value, 0);
766 assert_eq!(result.rows_affected, 0);
767 assert!(result.output_params.is_empty());
768 assert!(result.result_sets.is_empty());
769 assert!(!result.has_result_sets());
770 assert!(result.first_result_set().is_none());
771 }
772
773 #[test]
774 fn test_procedure_result_get_output() {
775 let mut result = ProcedureResult::new();
776 result.output_params.push(OutputParam {
777 name: "@Total".to_string(),
778 value: mssql_types::SqlValue::Int(42),
779 });
780 result.output_params.push(OutputParam {
781 name: "@Message".to_string(),
782 value: mssql_types::SqlValue::String("ok".to_string()),
783 });
784
785 assert!(result.get_output("@Total").is_some());
787 assert!(result.get_output("@total").is_some());
788 assert!(result.get_output("@TOTAL").is_some());
789
790 assert!(result.get_output("Total").is_some());
792 assert!(result.get_output("total").is_some());
793
794 assert!(result.get_output("@NotHere").is_none());
796 assert!(result.get_output("NotHere").is_none());
797 }
798
799 #[test]
800 fn test_procedure_result_with_result_sets() {
801 use mssql_types::SqlValue;
802
803 let columns = vec![Column {
804 name: "id".to_string(),
805 index: 0,
806 type_name: "INT".to_string(),
807 nullable: false,
808 max_length: Some(4),
809 precision: None,
810 scale: None,
811 collation: None,
812 }];
813 let rows = vec![Row::from_values(columns.clone(), vec![SqlValue::Int(1)])];
814 let rs = ResultSet::new(columns, rows);
815
816 let mut result = ProcedureResult::new();
817 result.result_sets.push(rs);
818 result.return_value = 7;
819 result.rows_affected = 5;
820
821 assert!(result.has_result_sets());
822 assert_eq!(result.get_return_value(), 7);
823 assert_eq!(result.first_result_set().unwrap().columns().len(), 1);
824 }
825
826 #[test]
827 fn test_execute_result_with_outputs() {
828 let outputs = vec![OutputParam {
829 name: "ReturnValue".to_string(),
830 value: mssql_types::SqlValue::Int(100),
831 }];
832
833 let result = ExecuteResult::with_outputs(10, outputs);
834 assert_eq!(result.rows_affected, 10);
835 assert!(result.get_output("ReturnValue").is_some());
836 assert!(result.get_output("returnvalue").is_some()); assert!(result.get_output("NotFound").is_none());
838 }
839
840 #[test]
841 fn test_query_stream_columns() {
842 let columns = vec![Column {
843 name: "id".to_string(),
844 index: 0,
845 type_name: "INT".to_string(),
846 nullable: false,
847 max_length: Some(4),
848 precision: Some(0),
849 scale: Some(0),
850 collation: None,
851 }];
852
853 let stream = QueryStream::new(columns, Vec::new());
854 assert_eq!(stream.columns().len(), 1);
855 assert_eq!(stream.columns()[0].name, "id");
856 assert!(!stream.is_finished());
857 }
858
859 #[test]
860 fn test_query_stream_with_rows() {
861 use mssql_types::SqlValue;
862
863 let columns = vec![
864 Column {
865 name: "id".to_string(),
866 index: 0,
867 type_name: "INT".to_string(),
868 nullable: false,
869 max_length: Some(4),
870 precision: None,
871 scale: None,
872 collation: None,
873 },
874 Column {
875 name: "name".to_string(),
876 index: 1,
877 type_name: "NVARCHAR".to_string(),
878 nullable: true,
879 max_length: Some(100),
880 precision: None,
881 scale: None,
882 collation: None,
883 },
884 ];
885
886 let rows = vec![
887 Row::from_values(
888 columns.clone(),
889 vec![SqlValue::Int(1), SqlValue::String("Alice".to_string())],
890 ),
891 Row::from_values(
892 columns.clone(),
893 vec![SqlValue::Int(2), SqlValue::String("Bob".to_string())],
894 ),
895 ];
896
897 let mut stream = QueryStream::new(columns, rows);
898 assert_eq!(stream.columns().len(), 2);
899 assert_eq!(stream.rows_remaining(), 2);
900 assert!(!stream.is_finished());
901
902 let row1 = stream.try_next().unwrap();
904 assert_eq!(row1.get::<i32>(0).unwrap(), 1);
905 assert_eq!(row1.get_by_name::<String>("name").unwrap(), "Alice");
906
907 let row2 = stream.try_next().unwrap();
909 assert_eq!(row2.get::<i32>(0).unwrap(), 2);
910 assert_eq!(row2.get_by_name::<String>("name").unwrap(), "Bob");
911
912 assert!(stream.try_next().is_none());
914 assert!(stream.is_finished());
915 }
916
917 #[test]
918 fn test_query_stream_iterator() {
919 use mssql_types::SqlValue;
920
921 let columns = vec![Column {
922 name: "val".to_string(),
923 index: 0,
924 type_name: "INT".to_string(),
925 nullable: false,
926 max_length: None,
927 precision: None,
928 scale: None,
929 collation: None,
930 }];
931
932 let rows = vec![
933 Row::from_values(columns.clone(), vec![SqlValue::Int(10)]),
934 Row::from_values(columns.clone(), vec![SqlValue::Int(20)]),
935 Row::from_values(columns.clone(), vec![SqlValue::Int(30)]),
936 ];
937
938 let mut stream = QueryStream::new(columns, rows);
939
940 let values: Vec<i32> = stream
944 .by_ref()
945 .map(|r| r.unwrap().get::<i32>(0).unwrap())
946 .collect();
947
948 assert_eq!(values, vec![10, 20, 30]);
949 assert!(stream.is_finished());
950 }
951
952 #[test]
953 fn test_query_stream_empty() {
954 let stream = QueryStream::empty();
955 assert!(stream.columns().is_empty());
956 assert_eq!(stream.rows_remaining(), 0);
957 assert!(stream.is_finished());
958 }
959
960 #[test]
965 fn test_query_stream_lazy_raw_row_decoding() {
966 use bytes::Bytes;
967 use tds_protocol::token::{ColMetaData, ColumnData, RawRow, TypeInfo};
968 use tds_protocol::types::TypeId;
969
970 let mut data = Vec::new();
972 data.push(4); data.extend_from_slice(&42i32.to_le_bytes());
974 data.push(0); let meta = ColMetaData {
977 columns: vec![
978 ColumnData {
979 name: "a".to_string(),
980 type_id: TypeId::IntN,
981 col_type: 0x26,
982 flags: 0x00,
983 user_type: 0,
984 type_info: TypeInfo {
985 max_length: Some(4),
986 precision: None,
987 scale: None,
988 collation: None,
989 },
990 crypto_metadata: None,
991 },
992 ColumnData {
993 name: "b".to_string(),
994 type_id: TypeId::IntN,
995 col_type: 0x26,
996 flags: 0x01,
997 user_type: 0,
998 type_info: TypeInfo {
999 max_length: Some(4),
1000 precision: None,
1001 scale: None,
1002 collation: None,
1003 },
1004 crypto_metadata: None,
1005 },
1006 ],
1007 cek_table: None,
1008 };
1009
1010 let columns = vec![
1011 Column {
1012 name: "a".to_string(),
1013 index: 0,
1014 type_name: "INT".to_string(),
1015 nullable: false,
1016 max_length: Some(4),
1017 precision: None,
1018 scale: None,
1019 collation: None,
1020 },
1021 Column {
1022 name: "b".to_string(),
1023 index: 1,
1024 type_name: "INT".to_string(),
1025 nullable: true,
1026 max_length: Some(4),
1027 precision: None,
1028 scale: None,
1029 collation: None,
1030 },
1031 ];
1032
1033 let pending = vec![PendingRow::Raw(RawRow {
1034 data: Bytes::from(data),
1035 })];
1036
1037 #[cfg(feature = "always-encrypted")]
1038 let mut stream = QueryStream::from_raw(columns, pending, meta, None);
1039 #[cfg(not(feature = "always-encrypted"))]
1040 let mut stream = QueryStream::from_raw(columns, pending, meta);
1041
1042 assert_eq!(stream.rows_remaining(), 1);
1043 let row = stream
1044 .next()
1045 .expect("one row pending")
1046 .expect("row decoded successfully");
1047 assert_eq!(row.get::<i32>(0).unwrap(), 42);
1048 assert!(row.is_null(1));
1049 assert!(stream.next().is_none());
1050 assert!(stream.is_finished());
1051 }
1052
1053 #[test]
1057 fn test_query_stream_lazy_decode_error_propagates() {
1058 use bytes::Bytes;
1059 use tds_protocol::token::{ColMetaData, ColumnData, RawRow, TypeInfo};
1060 use tds_protocol::types::TypeId;
1061
1062 let data = vec![0x01u8, 0x02];
1064
1065 let meta = ColMetaData {
1066 columns: vec![ColumnData {
1067 name: "a".to_string(),
1068 type_id: TypeId::Int4,
1069 col_type: 0x38,
1070 flags: 0x00,
1071 user_type: 0,
1072 type_info: TypeInfo {
1073 max_length: Some(4),
1074 precision: None,
1075 scale: None,
1076 collation: None,
1077 },
1078 crypto_metadata: None,
1079 }],
1080 cek_table: None,
1081 };
1082
1083 let columns = vec![Column {
1084 name: "a".to_string(),
1085 index: 0,
1086 type_name: "INT".to_string(),
1087 nullable: false,
1088 max_length: Some(4),
1089 precision: None,
1090 scale: None,
1091 collation: None,
1092 }];
1093
1094 let pending = vec![PendingRow::Raw(RawRow {
1095 data: Bytes::from(data),
1096 })];
1097
1098 #[cfg(feature = "always-encrypted")]
1099 let mut stream = QueryStream::from_raw(columns, pending, meta, None);
1100 #[cfg(not(feature = "always-encrypted"))]
1101 let mut stream = QueryStream::from_raw(columns, pending, meta);
1102
1103 let item = stream.next().expect("pending row present");
1104 assert!(item.is_err(), "truncated bytes must surface a decode error");
1105 assert!(stream.next().is_none());
1106 }
1107
1108 #[cfg(test)]
1111 fn intn_meta_and_columns(
1112 col_name: &str,
1113 nullable: bool,
1114 ) -> (tds_protocol::token::ColMetaData, Vec<Column>) {
1115 use tds_protocol::token::{ColMetaData, ColumnData, TypeInfo};
1116 use tds_protocol::types::TypeId;
1117 (
1118 ColMetaData {
1119 columns: vec![ColumnData {
1120 name: col_name.to_string(),
1121 type_id: TypeId::IntN,
1122 col_type: 0x26,
1123 flags: if nullable { 0x01 } else { 0x00 },
1124 user_type: 0,
1125 type_info: TypeInfo {
1126 max_length: Some(4),
1127 precision: None,
1128 scale: None,
1129 collation: None,
1130 },
1131 crypto_metadata: None,
1132 }],
1133 cek_table: None,
1134 },
1135 vec![Column {
1136 name: col_name.to_string(),
1137 index: 0,
1138 type_name: "INT".to_string(),
1139 nullable,
1140 max_length: Some(4),
1141 precision: None,
1142 scale: None,
1143 collation: None,
1144 }],
1145 )
1146 }
1147
1148 #[test]
1152 fn test_result_set_lazy_raw_row_decoding() {
1153 use bytes::Bytes;
1154 use tds_protocol::token::RawRow;
1155
1156 let (meta, columns) = intn_meta_and_columns("a", false);
1157
1158 let pending = vec![
1160 PendingRow::Raw(RawRow {
1161 data: {
1162 let mut b = Vec::with_capacity(5);
1163 b.push(4);
1164 b.extend_from_slice(&7i32.to_le_bytes());
1165 Bytes::from(b)
1166 },
1167 }),
1168 PendingRow::Raw(RawRow {
1169 data: {
1170 let mut b = Vec::with_capacity(5);
1171 b.push(4);
1172 b.extend_from_slice(&11i32.to_le_bytes());
1173 Bytes::from(b)
1174 },
1175 }),
1176 ];
1177
1178 #[cfg(feature = "always-encrypted")]
1179 let mut rs = ResultSet::from_raw(columns, pending, meta, None);
1180 #[cfg(not(feature = "always-encrypted"))]
1181 let mut rs = ResultSet::from_raw(columns, pending, meta);
1182
1183 assert_eq!(rs.rows_remaining(), 2);
1184 assert!(!rs.is_empty());
1185
1186 let row1 = rs.next_row().expect("row present").expect("decodes");
1187 assert_eq!(row1.get::<i32>(0).unwrap(), 7);
1188
1189 let row2 = rs.next_row().expect("row present").expect("decodes");
1190 assert_eq!(row2.get::<i32>(0).unwrap(), 11);
1191
1192 assert!(rs.next_row().is_none());
1193 assert!(rs.is_empty());
1194 }
1195
1196 #[test]
1200 fn test_result_set_lazy_decode_error_propagates() {
1201 use bytes::Bytes;
1202 use tds_protocol::token::{ColMetaData, ColumnData, RawRow, TypeInfo};
1203 use tds_protocol::types::TypeId;
1204
1205 let meta = ColMetaData {
1207 columns: vec![ColumnData {
1208 name: "a".to_string(),
1209 type_id: TypeId::Int4,
1210 col_type: 0x38,
1211 flags: 0x00,
1212 user_type: 0,
1213 type_info: TypeInfo {
1214 max_length: Some(4),
1215 precision: None,
1216 scale: None,
1217 collation: None,
1218 },
1219 crypto_metadata: None,
1220 }],
1221 cek_table: None,
1222 };
1223 let columns = vec![Column {
1224 name: "a".to_string(),
1225 index: 0,
1226 type_name: "INT".to_string(),
1227 nullable: false,
1228 max_length: Some(4),
1229 precision: None,
1230 scale: None,
1231 collation: None,
1232 }];
1233
1234 let pending = vec![PendingRow::Raw(RawRow {
1235 data: Bytes::from(vec![0x01u8, 0x02]),
1236 })];
1237
1238 #[cfg(feature = "always-encrypted")]
1239 let mut rs = ResultSet::from_raw(columns, pending, meta, None);
1240 #[cfg(not(feature = "always-encrypted"))]
1241 let mut rs = ResultSet::from_raw(columns, pending, meta);
1242
1243 let first = rs.next_row().expect("pending row present");
1244 assert!(
1245 first.is_err(),
1246 "truncated bytes must surface a decode error"
1247 );
1248 assert!(rs.next_row().is_none());
1249 }
1250
1251 #[test]
1255 fn test_result_set_lazy_collect_all_success_and_error() {
1256 use bytes::Bytes;
1257 use tds_protocol::token::RawRow;
1258
1259 let (meta_ok, cols_ok) = intn_meta_and_columns("a", false);
1261 let pending_ok = vec![
1262 PendingRow::Raw(RawRow {
1263 data: {
1264 let mut b = Vec::with_capacity(5);
1265 b.push(4);
1266 b.extend_from_slice(&10i32.to_le_bytes());
1267 Bytes::from(b)
1268 },
1269 }),
1270 PendingRow::Raw(RawRow {
1271 data: {
1272 let mut b = Vec::with_capacity(5);
1273 b.push(4);
1274 b.extend_from_slice(&20i32.to_le_bytes());
1275 Bytes::from(b)
1276 },
1277 }),
1278 ];
1279
1280 #[cfg(feature = "always-encrypted")]
1281 let mut rs_ok = ResultSet::from_raw(cols_ok, pending_ok, meta_ok, None);
1282 #[cfg(not(feature = "always-encrypted"))]
1283 let mut rs_ok = ResultSet::from_raw(cols_ok, pending_ok, meta_ok);
1284 let rows = rs_ok.collect_all().expect("all rows decode");
1285 assert_eq!(rows.len(), 2);
1286 assert_eq!(rows[0].get::<i32>(0).unwrap(), 10);
1287 assert_eq!(rows[1].get::<i32>(0).unwrap(), 20);
1288 assert!(rs_ok.is_empty());
1289
1290 use tds_protocol::token::{ColMetaData, ColumnData, TypeInfo};
1293 use tds_protocol::types::TypeId;
1294 let meta_err = ColMetaData {
1295 columns: vec![ColumnData {
1296 name: "a".to_string(),
1297 type_id: TypeId::Int4,
1298 col_type: 0x38,
1299 flags: 0x00,
1300 user_type: 0,
1301 type_info: TypeInfo {
1302 max_length: Some(4),
1303 precision: None,
1304 scale: None,
1305 collation: None,
1306 },
1307 crypto_metadata: None,
1308 }],
1309 cek_table: None,
1310 };
1311 let cols_err = vec![Column {
1312 name: "a".to_string(),
1313 index: 0,
1314 type_name: "INT".to_string(),
1315 nullable: false,
1316 max_length: Some(4),
1317 precision: None,
1318 scale: None,
1319 collation: None,
1320 }];
1321 let pending_err = vec![PendingRow::Raw(RawRow {
1322 data: Bytes::from(vec![0x01u8, 0x02]),
1323 })];
1324
1325 #[cfg(feature = "always-encrypted")]
1326 let mut rs_err = ResultSet::from_raw(cols_err, pending_err, meta_err, None);
1327 #[cfg(not(feature = "always-encrypted"))]
1328 let mut rs_err = ResultSet::from_raw(cols_err, pending_err, meta_err);
1329 let err = rs_err.collect_all();
1330 assert!(err.is_err(), "collect_all must propagate decode error");
1331 }
1332
1333 #[tokio::test]
1337 async fn test_multi_result_stream_lazy_decode_across_result_sets() {
1338 use bytes::Bytes;
1339 use tds_protocol::token::RawRow;
1340
1341 let (meta1, cols1) = intn_meta_and_columns("a", false);
1342 let pending1 = vec![PendingRow::Raw(RawRow {
1343 data: {
1344 let mut b = Vec::with_capacity(5);
1345 b.push(4);
1346 b.extend_from_slice(&101i32.to_le_bytes());
1347 Bytes::from(b)
1348 },
1349 })];
1350 #[cfg(feature = "always-encrypted")]
1351 let rs1 = ResultSet::from_raw(cols1, pending1, meta1, None);
1352 #[cfg(not(feature = "always-encrypted"))]
1353 let rs1 = ResultSet::from_raw(cols1, pending1, meta1);
1354
1355 let (meta2, cols2) = intn_meta_and_columns("b", false);
1356 let pending2 = vec![PendingRow::Raw(RawRow {
1357 data: {
1358 let mut b = Vec::with_capacity(5);
1359 b.push(4);
1360 b.extend_from_slice(&202i32.to_le_bytes());
1361 Bytes::from(b)
1362 },
1363 })];
1364 #[cfg(feature = "always-encrypted")]
1365 let rs2 = ResultSet::from_raw(cols2, pending2, meta2, None);
1366 #[cfg(not(feature = "always-encrypted"))]
1367 let rs2 = ResultSet::from_raw(cols2, pending2, meta2);
1368
1369 let mut stream = MultiResultStream::new(vec![rs1, rs2]);
1370 assert_eq!(stream.result_count(), 2);
1371 assert_eq!(stream.current_result_index(), 0);
1372
1373 let row = stream
1374 .next_row()
1375 .await
1376 .expect("first row success")
1377 .expect("row present");
1378 assert_eq!(row.get::<i32>(0).unwrap(), 101);
1379 assert!(stream.next_row().await.expect("no more rows").is_none());
1380
1381 assert!(stream.has_more_results());
1382 assert!(stream.next_result().await.expect("advance ok"));
1383 assert_eq!(stream.current_result_index(), 1);
1384
1385 let row = stream
1386 .next_row()
1387 .await
1388 .expect("second row success")
1389 .expect("row present");
1390 assert_eq!(row.get::<i32>(0).unwrap(), 202);
1391 }
1392}