1use std::collections::VecDeque;
33use std::pin::Pin;
34use std::task::{Context, Poll};
35
36use futures_core::Stream;
37use tds_protocol::token::{ColMetaData, NbcRow, RawRow};
38
39use crate::error::Error;
40use crate::row::{Column, Row};
41
/// A row buffered by a result stream.
///
/// A row is either already decoded (`Parsed`) or still held in its wire token
/// form (`Raw` / `Nbc`) and converted to a [`Row`] only when it is pulled.
#[derive(Debug, Clone)]
pub(crate) enum PendingRow {
    /// A fully decoded row; handed out unchanged.
    Parsed(Row),
    /// An undecoded `RawRow` token, converted on demand via `column_parser::convert_raw_row`.
    Raw(RawRow),
    /// An undecoded `NbcRow` token, converted on demand via `column_parser::convert_nbc_row`.
    Nbc(NbcRow),
}
57
/// A buffered sequence of query result rows.
///
/// Rows are kept in an in-memory queue and decoded one at a time as they are
/// pulled through the [`Stream`] or [`Iterator`] implementations.
#[must_use = "streams must be consumed; dropping a stream discards remaining rows"]
pub struct QueryStream<'a> {
    /// Column descriptions for the rows in this stream.
    columns: Vec<Column>,
    /// Rows that have not been yielded yet, front = next row.
    rows: VecDeque<PendingRow>,
    /// Wire metadata required to decode `Raw`/`Nbc` rows; the constructors that
    /// only hold pre-parsed rows (or no rows) leave this as `None`.
    meta: Option<ColMetaData>,
    /// Optional decryptor; when present, wire rows are decoded through the
    /// `*_decrypted` converters instead of the plain ones.
    #[cfg(feature = "always-encrypted")]
    decryptor: Option<std::sync::Arc<crate::column_decryptor::ColumnDecryptor>>,
    /// Latched to `true` once the queue has been observed empty (and set up
    /// front by `empty()`).
    finished: bool,
    /// Carries the `'a` lifetime parameter; no borrowed data is actually stored.
    _marker: std::marker::PhantomData<&'a ()>,
}
95
96impl QueryStream<'_> {
97 #[cfg(test)]
104 pub(crate) fn new(columns: Vec<Column>, rows: Vec<Row>) -> Self {
105 Self {
106 columns,
107 rows: rows.into_iter().map(PendingRow::Parsed).collect(),
108 meta: None,
109 #[cfg(feature = "always-encrypted")]
110 decryptor: None,
111 finished: false,
112 _marker: std::marker::PhantomData,
113 }
114 }
115
116 pub(crate) fn from_raw(
122 columns: Vec<Column>,
123 pending: Vec<PendingRow>,
124 meta: ColMetaData,
125 #[cfg(feature = "always-encrypted")] decryptor: Option<
126 std::sync::Arc<crate::column_decryptor::ColumnDecryptor>,
127 >,
128 ) -> Self {
129 Self {
130 columns,
131 rows: pending.into(),
132 meta: Some(meta),
133 #[cfg(feature = "always-encrypted")]
134 decryptor,
135 finished: false,
136 _marker: std::marker::PhantomData,
137 }
138 }
139
140 #[allow(dead_code)]
142 pub(crate) fn empty() -> Self {
143 Self {
144 columns: Vec::new(),
145 rows: VecDeque::new(),
146 meta: None,
147 #[cfg(feature = "always-encrypted")]
148 decryptor: None,
149 finished: true,
150 _marker: std::marker::PhantomData,
151 }
152 }
153
154 #[must_use]
156 pub fn columns(&self) -> &[Column] {
157 &self.columns
158 }
159
160 #[must_use]
162 pub fn is_finished(&self) -> bool {
163 self.finished
164 }
165
166 #[must_use]
168 pub fn rows_remaining(&self) -> usize {
169 self.rows.len()
170 }
171
172 pub async fn collect_all(mut self) -> Result<Vec<Row>, Error> {
182 let mut out = Vec::with_capacity(self.rows.len());
183 while let Some(pending) = self.rows.pop_front() {
184 out.push(self.decode(pending)?);
185 }
186 self.finished = true;
187 Ok(out)
188 }
189
190 pub fn try_next(&mut self) -> Option<Row> {
196 self.next().and_then(|r| r.ok())
197 }
198
199 fn decode(&self, pending: PendingRow) -> Result<Row, Error> {
201 match pending {
202 PendingRow::Parsed(row) => Ok(row),
203 PendingRow::Raw(raw) => {
204 let meta = self
205 .meta
206 .as_ref()
207 .ok_or_else(|| Error::Protocol("row metadata missing for raw row".into()))?;
208 #[cfg(feature = "always-encrypted")]
209 if let Some(ref dec) = self.decryptor {
210 return crate::column_parser::convert_raw_row_decrypted(
211 &raw,
212 meta,
213 &self.columns,
214 dec,
215 );
216 }
217 crate::column_parser::convert_raw_row(&raw, meta, &self.columns)
218 }
219 PendingRow::Nbc(nbc) => {
220 let meta = self
221 .meta
222 .as_ref()
223 .ok_or_else(|| Error::Protocol("row metadata missing for NBC row".into()))?;
224 #[cfg(feature = "always-encrypted")]
225 if let Some(ref dec) = self.decryptor {
226 return crate::column_parser::convert_nbc_row_decrypted(
227 &nbc,
228 meta,
229 &self.columns,
230 dec,
231 );
232 }
233 crate::column_parser::convert_nbc_row(&nbc, meta, &self.columns)
234 }
235 }
236 }
237}
238
239impl Stream for QueryStream<'_> {
240 type Item = Result<Row, Error>;
241
242 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
243 let this = self.get_mut();
244
245 if this.finished {
246 return Poll::Ready(None);
247 }
248
249 match this.rows.pop_front() {
250 Some(pending) => Poll::Ready(Some(this.decode(pending))),
251 None => {
252 this.finished = true;
253 Poll::Ready(None)
254 }
255 }
256 }
257}
258
// `size_hint` on the `Iterator` impl reports the buffered queue length as both
// the lower and the upper bound, which is the exactness `ExactSizeIterator`
// relies on for its default `len()`.
impl ExactSizeIterator for QueryStream<'_> {}
260
261impl Iterator for QueryStream<'_> {
262 type Item = Result<Row, Error>;
263
264 fn next(&mut self) -> Option<Self::Item> {
265 if self.finished {
266 return None;
267 }
268
269 match self.rows.pop_front() {
270 Some(pending) => Some(self.decode(pending)),
271 None => {
272 self.finished = true;
273 None
274 }
275 }
276 }
277
278 fn size_hint(&self) -> (usize, Option<usize>) {
279 let remaining = self.rows.len();
280 (remaining, Some(remaining))
281 }
282}
283
/// Result of an execution: an affected-row count plus any output parameters.
#[derive(Debug, Clone)]
#[non_exhaustive]
#[must_use]
pub struct ExecuteResult {
    /// Number of rows reported as affected.
    pub rows_affected: u64,
    /// Output parameters attached to this result; empty when built via `new`.
    pub output_params: Vec<OutputParam>,
}
296
/// A named output parameter value.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct OutputParam {
    /// Parameter name as stored; lookups in this file compare it
    /// ASCII-case-insensitively.
    pub name: String,
    /// The parameter's value.
    pub value: mssql_types::SqlValue,
}
306
307impl ExecuteResult {
308 pub fn new(rows_affected: u64) -> Self {
310 Self {
311 rows_affected,
312 output_params: Vec::new(),
313 }
314 }
315
316 pub fn with_outputs(rows_affected: u64, output_params: Vec<OutputParam>) -> Self {
318 Self {
319 rows_affected,
320 output_params,
321 }
322 }
323
324 #[must_use]
326 pub fn get_output(&self, name: &str) -> Option<&OutputParam> {
327 self.output_params
328 .iter()
329 .find(|p| p.name.eq_ignore_ascii_case(name))
330 }
331}
332
/// Aggregated outcome of a procedure call: return value, affected-row count,
/// output parameters and any result sets.
#[derive(Debug, Clone)]
#[non_exhaustive]
#[must_use]
pub struct ProcedureResult {
    /// The procedure's integer return value; `0` in a freshly created result.
    pub return_value: i32,
    /// Number of rows reported as affected.
    pub rows_affected: u64,
    /// Output parameters attached to this result.
    pub output_params: Vec<OutputParam>,
    /// Result sets attached to this result, in stored order.
    pub result_sets: Vec<ResultSet>,
}
370
371impl ProcedureResult {
372 pub(crate) fn new() -> Self {
374 Self {
375 return_value: 0,
376 rows_affected: 0,
377 output_params: Vec::new(),
378 result_sets: Vec::new(),
379 }
380 }
381
382 #[must_use]
387 pub fn get_return_value(&self) -> i32 {
388 self.return_value
389 }
390
391 #[must_use]
410 pub fn get_output(&self, name: &str) -> Option<&OutputParam> {
411 let search = name.strip_prefix('@').unwrap_or(name);
412 self.output_params.iter().find(|p| {
413 let stored = p.name.strip_prefix('@').unwrap_or(&p.name);
414 stored.eq_ignore_ascii_case(search)
415 })
416 }
417
418 #[must_use]
422 pub fn first_result_set(&self) -> Option<&ResultSet> {
423 self.result_sets.first()
424 }
425
426 #[must_use]
428 pub fn has_result_sets(&self) -> bool {
429 !self.result_sets.is_empty()
430 }
431}
432
/// One result set: its column descriptions plus a queue of rows that are
/// decoded lazily as they are pulled.
#[derive(Debug, Clone)]
#[must_use]
pub struct ResultSet {
    /// Column descriptions for the rows in this set.
    columns: Vec<Column>,
    /// Rows not yet handed out, front = next row.
    pending_rows: VecDeque<PendingRow>,
    /// Wire metadata required to decode `Raw`/`Nbc` rows; `None` when the set
    /// was built from already-decoded rows via `new`.
    meta: Option<ColMetaData>,
    /// Optional decryptor; when present, wire rows are decoded through the
    /// `*_decrypted` converters instead of the plain ones.
    #[cfg(feature = "always-encrypted")]
    decryptor: Option<std::sync::Arc<crate::column_decryptor::ColumnDecryptor>>,
}
458
459impl ResultSet {
460 pub fn new(columns: Vec<Column>, rows: Vec<Row>) -> Self {
466 Self {
467 columns,
468 pending_rows: rows.into_iter().map(PendingRow::Parsed).collect(),
469 meta: None,
470 #[cfg(feature = "always-encrypted")]
471 decryptor: None,
472 }
473 }
474
475 pub(crate) fn from_raw(
482 columns: Vec<Column>,
483 pending: Vec<PendingRow>,
484 meta: ColMetaData,
485 #[cfg(feature = "always-encrypted")] decryptor: Option<
486 std::sync::Arc<crate::column_decryptor::ColumnDecryptor>,
487 >,
488 ) -> Self {
489 Self {
490 columns,
491 pending_rows: pending.into(),
492 meta: Some(meta),
493 #[cfg(feature = "always-encrypted")]
494 decryptor,
495 }
496 }
497
498 #[must_use]
500 pub fn columns(&self) -> &[Column] {
501 &self.columns
502 }
503
504 #[must_use]
506 pub fn rows_remaining(&self) -> usize {
507 self.pending_rows.len()
508 }
509
510 pub fn next_row(&mut self) -> Option<Result<Row, Error>> {
516 self.pending_rows.pop_front().map(|p| self.decode(p))
517 }
518
519 #[must_use]
521 pub fn is_empty(&self) -> bool {
522 self.pending_rows.is_empty()
523 }
524
525 pub fn collect_all(&mut self) -> Result<Vec<Row>, Error> {
529 let mut out = Vec::with_capacity(self.pending_rows.len());
530 while let Some(pending) = self.pending_rows.pop_front() {
531 out.push(self.decode(pending)?);
532 }
533 Ok(out)
534 }
535
536 fn decode(&self, pending: PendingRow) -> Result<Row, Error> {
538 match pending {
539 PendingRow::Parsed(row) => Ok(row),
540 PendingRow::Raw(raw) => {
541 let meta = self
542 .meta
543 .as_ref()
544 .ok_or_else(|| Error::Protocol("row metadata missing for raw row".into()))?;
545 #[cfg(feature = "always-encrypted")]
546 if let Some(ref dec) = self.decryptor {
547 return crate::column_parser::convert_raw_row_decrypted(
548 &raw,
549 meta,
550 &self.columns,
551 dec,
552 );
553 }
554 crate::column_parser::convert_raw_row(&raw, meta, &self.columns)
555 }
556 PendingRow::Nbc(nbc) => {
557 let meta = self
558 .meta
559 .as_ref()
560 .ok_or_else(|| Error::Protocol("row metadata missing for NBC row".into()))?;
561 #[cfg(feature = "always-encrypted")]
562 if let Some(ref dec) = self.decryptor {
563 return crate::column_parser::convert_nbc_row_decrypted(
564 &nbc,
565 meta,
566 &self.columns,
567 dec,
568 );
569 }
570 crate::column_parser::convert_nbc_row(&nbc, meta, &self.columns)
571 }
572 }
573 }
574
575 fn into_query_stream<'a>(self) -> QueryStream<'a> {
581 QueryStream {
582 columns: self.columns,
583 rows: self.pending_rows,
584 meta: self.meta,
585 #[cfg(feature = "always-encrypted")]
586 decryptor: self.decryptor,
587 finished: false,
588 _marker: std::marker::PhantomData,
589 }
590 }
591}
592
/// A cursor over several buffered result sets.
///
/// One result set is "current" at a time; rows are pulled from it and the
/// cursor is advanced explicitly with `next_result`.
#[must_use = "streams must be consumed; dropping a stream discards remaining results"]
pub struct MultiResultStream<'a> {
    /// All result sets, in order.
    result_sets: Vec<ResultSet>,
    /// Index into `result_sets` of the current result set; starts at 0.
    current_result: usize,
    /// Carries the `'a` lifetime parameter; no borrowed data is actually stored.
    _marker: std::marker::PhantomData<&'a ()>,
}
625
626impl<'a> MultiResultStream<'a> {
627 pub(crate) fn new(result_sets: Vec<ResultSet>) -> Self {
629 Self {
630 result_sets,
631 current_result: 0,
632 _marker: std::marker::PhantomData,
633 }
634 }
635
636 #[allow(dead_code)]
638 pub(crate) fn empty() -> Self {
639 Self {
640 result_sets: Vec::new(),
641 current_result: 0,
642 _marker: std::marker::PhantomData,
643 }
644 }
645
646 #[must_use]
648 pub fn current_result_index(&self) -> usize {
649 self.current_result
650 }
651
652 #[must_use]
654 pub fn result_count(&self) -> usize {
655 self.result_sets.len()
656 }
657
658 #[must_use]
660 pub fn has_more_results(&self) -> bool {
661 self.current_result + 1 < self.result_sets.len()
662 }
663
664 #[must_use]
668 pub fn columns(&self) -> Option<&[Column]> {
669 self.result_sets
670 .get(self.current_result)
671 .map(|rs| rs.columns())
672 }
673
674 pub async fn next_result(&mut self) -> Result<bool, Error> {
678 if self.current_result + 1 < self.result_sets.len() {
679 self.current_result += 1;
680 Ok(true)
681 } else {
682 Ok(false)
683 }
684 }
685
686 pub async fn next_row(&mut self) -> Result<Option<Row>, Error> {
695 if let Some(result_set) = self.result_sets.get_mut(self.current_result) {
696 result_set.next_row().transpose()
697 } else {
698 Ok(None)
699 }
700 }
701
702 #[must_use]
704 pub fn current_result_set(&mut self) -> Option<&mut ResultSet> {
705 self.result_sets.get_mut(self.current_result)
706 }
707
708 pub fn collect_current(&mut self) -> Result<Vec<Row>, Error> {
714 match self.result_sets.get_mut(self.current_result) {
715 Some(rs) => rs.collect_all(),
716 None => Ok(Vec::new()),
717 }
718 }
719
720 pub fn into_query_streams(self) -> Vec<QueryStream<'a>> {
722 self.result_sets
723 .into_iter()
724 .map(ResultSet::into_query_stream)
725 .collect()
726 }
727}
728
729#[cfg(test)]
730#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
731mod tests {
732 use super::*;
733
734 #[test]
735 fn test_execute_result() {
736 let result = ExecuteResult::new(42);
737 assert_eq!(result.rows_affected, 42);
738 assert!(result.output_params.is_empty());
739 }
740
741 #[test]
742 fn test_procedure_result_defaults() {
743 let result = ProcedureResult::new();
744 assert_eq!(result.return_value, 0);
745 assert_eq!(result.rows_affected, 0);
746 assert!(result.output_params.is_empty());
747 assert!(result.result_sets.is_empty());
748 assert!(!result.has_result_sets());
749 assert!(result.first_result_set().is_none());
750 }
751
752 #[test]
753 fn test_procedure_result_get_output() {
754 let mut result = ProcedureResult::new();
755 result.output_params.push(OutputParam {
756 name: "@Total".to_string(),
757 value: mssql_types::SqlValue::Int(42),
758 });
759 result.output_params.push(OutputParam {
760 name: "@Message".to_string(),
761 value: mssql_types::SqlValue::String("ok".to_string()),
762 });
763
764 assert!(result.get_output("@Total").is_some());
766 assert!(result.get_output("@total").is_some());
767 assert!(result.get_output("@TOTAL").is_some());
768
769 assert!(result.get_output("Total").is_some());
771 assert!(result.get_output("total").is_some());
772
773 assert!(result.get_output("@NotHere").is_none());
775 assert!(result.get_output("NotHere").is_none());
776 }
777
778 #[test]
779 fn test_procedure_result_with_result_sets() {
780 use mssql_types::SqlValue;
781
782 let columns = vec![Column {
783 name: "id".to_string(),
784 index: 0,
785 type_name: "INT".to_string(),
786 nullable: false,
787 max_length: Some(4),
788 precision: None,
789 scale: None,
790 collation: None,
791 }];
792 let rows = vec![Row::from_values(columns.clone(), vec![SqlValue::Int(1)])];
793 let rs = ResultSet::new(columns, rows);
794
795 let mut result = ProcedureResult::new();
796 result.result_sets.push(rs);
797 result.return_value = 7;
798 result.rows_affected = 5;
799
800 assert!(result.has_result_sets());
801 assert_eq!(result.get_return_value(), 7);
802 assert_eq!(result.first_result_set().unwrap().columns().len(), 1);
803 }
804
805 #[test]
806 fn test_execute_result_with_outputs() {
807 let outputs = vec![OutputParam {
808 name: "ReturnValue".to_string(),
809 value: mssql_types::SqlValue::Int(100),
810 }];
811
812 let result = ExecuteResult::with_outputs(10, outputs);
813 assert_eq!(result.rows_affected, 10);
814 assert!(result.get_output("ReturnValue").is_some());
815 assert!(result.get_output("returnvalue").is_some()); assert!(result.get_output("NotFound").is_none());
817 }
818
819 #[test]
820 fn test_query_stream_columns() {
821 let columns = vec![Column {
822 name: "id".to_string(),
823 index: 0,
824 type_name: "INT".to_string(),
825 nullable: false,
826 max_length: Some(4),
827 precision: Some(0),
828 scale: Some(0),
829 collation: None,
830 }];
831
832 let stream = QueryStream::new(columns, Vec::new());
833 assert_eq!(stream.columns().len(), 1);
834 assert_eq!(stream.columns()[0].name, "id");
835 assert!(!stream.is_finished());
836 }
837
838 #[test]
839 fn test_query_stream_with_rows() {
840 use mssql_types::SqlValue;
841
842 let columns = vec![
843 Column {
844 name: "id".to_string(),
845 index: 0,
846 type_name: "INT".to_string(),
847 nullable: false,
848 max_length: Some(4),
849 precision: None,
850 scale: None,
851 collation: None,
852 },
853 Column {
854 name: "name".to_string(),
855 index: 1,
856 type_name: "NVARCHAR".to_string(),
857 nullable: true,
858 max_length: Some(100),
859 precision: None,
860 scale: None,
861 collation: None,
862 },
863 ];
864
865 let rows = vec![
866 Row::from_values(
867 columns.clone(),
868 vec![SqlValue::Int(1), SqlValue::String("Alice".to_string())],
869 ),
870 Row::from_values(
871 columns.clone(),
872 vec![SqlValue::Int(2), SqlValue::String("Bob".to_string())],
873 ),
874 ];
875
876 let mut stream = QueryStream::new(columns, rows);
877 assert_eq!(stream.columns().len(), 2);
878 assert_eq!(stream.rows_remaining(), 2);
879 assert!(!stream.is_finished());
880
881 let row1 = stream.try_next().unwrap();
883 assert_eq!(row1.get::<i32>(0).unwrap(), 1);
884 assert_eq!(row1.get_by_name::<String>("name").unwrap(), "Alice");
885
886 let row2 = stream.try_next().unwrap();
888 assert_eq!(row2.get::<i32>(0).unwrap(), 2);
889 assert_eq!(row2.get_by_name::<String>("name").unwrap(), "Bob");
890
891 assert!(stream.try_next().is_none());
893 assert!(stream.is_finished());
894 }
895
896 #[test]
897 fn test_query_stream_iterator() {
898 use mssql_types::SqlValue;
899
900 let columns = vec![Column {
901 name: "val".to_string(),
902 index: 0,
903 type_name: "INT".to_string(),
904 nullable: false,
905 max_length: None,
906 precision: None,
907 scale: None,
908 collation: None,
909 }];
910
911 let rows = vec![
912 Row::from_values(columns.clone(), vec![SqlValue::Int(10)]),
913 Row::from_values(columns.clone(), vec![SqlValue::Int(20)]),
914 Row::from_values(columns.clone(), vec![SqlValue::Int(30)]),
915 ];
916
917 let mut stream = QueryStream::new(columns, rows);
918
919 let values: Vec<i32> = stream
923 .by_ref()
924 .map(|r| r.unwrap().get::<i32>(0).unwrap())
925 .collect();
926
927 assert_eq!(values, vec![10, 20, 30]);
928 assert!(stream.is_finished());
929 }
930
931 #[test]
932 fn test_query_stream_empty() {
933 let stream = QueryStream::empty();
934 assert!(stream.columns().is_empty());
935 assert_eq!(stream.rows_remaining(), 0);
936 assert!(stream.is_finished());
937 }
938
939 #[test]
944 fn test_query_stream_lazy_raw_row_decoding() {
945 use bytes::Bytes;
946 use tds_protocol::token::{ColMetaData, ColumnData, RawRow, TypeInfo};
947 use tds_protocol::types::TypeId;
948
949 let mut data = Vec::new();
951 data.push(4); data.extend_from_slice(&42i32.to_le_bytes());
953 data.push(0); let meta = ColMetaData {
956 columns: vec![
957 ColumnData {
958 name: "a".to_string(),
959 type_id: TypeId::IntN,
960 col_type: 0x26,
961 flags: 0x00,
962 user_type: 0,
963 type_info: TypeInfo {
964 max_length: Some(4),
965 precision: None,
966 scale: None,
967 collation: None,
968 },
969 crypto_metadata: None,
970 },
971 ColumnData {
972 name: "b".to_string(),
973 type_id: TypeId::IntN,
974 col_type: 0x26,
975 flags: 0x01,
976 user_type: 0,
977 type_info: TypeInfo {
978 max_length: Some(4),
979 precision: None,
980 scale: None,
981 collation: None,
982 },
983 crypto_metadata: None,
984 },
985 ],
986 cek_table: None,
987 };
988
989 let columns = vec![
990 Column {
991 name: "a".to_string(),
992 index: 0,
993 type_name: "INT".to_string(),
994 nullable: false,
995 max_length: Some(4),
996 precision: None,
997 scale: None,
998 collation: None,
999 },
1000 Column {
1001 name: "b".to_string(),
1002 index: 1,
1003 type_name: "INT".to_string(),
1004 nullable: true,
1005 max_length: Some(4),
1006 precision: None,
1007 scale: None,
1008 collation: None,
1009 },
1010 ];
1011
1012 let pending = vec![PendingRow::Raw(RawRow {
1013 data: Bytes::from(data),
1014 })];
1015
1016 #[cfg(feature = "always-encrypted")]
1017 let mut stream = QueryStream::from_raw(columns, pending, meta, None);
1018 #[cfg(not(feature = "always-encrypted"))]
1019 let mut stream = QueryStream::from_raw(columns, pending, meta);
1020
1021 assert_eq!(stream.rows_remaining(), 1);
1022 let row = stream
1023 .next()
1024 .expect("one row pending")
1025 .expect("row decoded successfully");
1026 assert_eq!(row.get::<i32>(0).unwrap(), 42);
1027 assert!(row.is_null(1));
1028 assert!(stream.next().is_none());
1029 assert!(stream.is_finished());
1030 }
1031
1032 #[test]
1036 fn test_query_stream_lazy_decode_error_propagates() {
1037 use bytes::Bytes;
1038 use tds_protocol::token::{ColMetaData, ColumnData, RawRow, TypeInfo};
1039 use tds_protocol::types::TypeId;
1040
1041 let data = vec![0x01u8, 0x02];
1043
1044 let meta = ColMetaData {
1045 columns: vec![ColumnData {
1046 name: "a".to_string(),
1047 type_id: TypeId::Int4,
1048 col_type: 0x38,
1049 flags: 0x00,
1050 user_type: 0,
1051 type_info: TypeInfo {
1052 max_length: Some(4),
1053 precision: None,
1054 scale: None,
1055 collation: None,
1056 },
1057 crypto_metadata: None,
1058 }],
1059 cek_table: None,
1060 };
1061
1062 let columns = vec![Column {
1063 name: "a".to_string(),
1064 index: 0,
1065 type_name: "INT".to_string(),
1066 nullable: false,
1067 max_length: Some(4),
1068 precision: None,
1069 scale: None,
1070 collation: None,
1071 }];
1072
1073 let pending = vec![PendingRow::Raw(RawRow {
1074 data: Bytes::from(data),
1075 })];
1076
1077 #[cfg(feature = "always-encrypted")]
1078 let mut stream = QueryStream::from_raw(columns, pending, meta, None);
1079 #[cfg(not(feature = "always-encrypted"))]
1080 let mut stream = QueryStream::from_raw(columns, pending, meta);
1081
1082 let item = stream.next().expect("pending row present");
1083 assert!(item.is_err(), "truncated bytes must surface a decode error");
1084 assert!(stream.next().is_none());
1085 }
1086
1087 #[cfg(test)]
1090 fn intn_meta_and_columns(
1091 col_name: &str,
1092 nullable: bool,
1093 ) -> (tds_protocol::token::ColMetaData, Vec<Column>) {
1094 use tds_protocol::token::{ColMetaData, ColumnData, TypeInfo};
1095 use tds_protocol::types::TypeId;
1096 (
1097 ColMetaData {
1098 columns: vec![ColumnData {
1099 name: col_name.to_string(),
1100 type_id: TypeId::IntN,
1101 col_type: 0x26,
1102 flags: if nullable { 0x01 } else { 0x00 },
1103 user_type: 0,
1104 type_info: TypeInfo {
1105 max_length: Some(4),
1106 precision: None,
1107 scale: None,
1108 collation: None,
1109 },
1110 crypto_metadata: None,
1111 }],
1112 cek_table: None,
1113 },
1114 vec![Column {
1115 name: col_name.to_string(),
1116 index: 0,
1117 type_name: "INT".to_string(),
1118 nullable,
1119 max_length: Some(4),
1120 precision: None,
1121 scale: None,
1122 collation: None,
1123 }],
1124 )
1125 }
1126
1127 #[test]
1131 fn test_result_set_lazy_raw_row_decoding() {
1132 use bytes::Bytes;
1133 use tds_protocol::token::RawRow;
1134
1135 let (meta, columns) = intn_meta_and_columns("a", false);
1136
1137 let pending = vec![
1139 PendingRow::Raw(RawRow {
1140 data: {
1141 let mut b = Vec::with_capacity(5);
1142 b.push(4);
1143 b.extend_from_slice(&7i32.to_le_bytes());
1144 Bytes::from(b)
1145 },
1146 }),
1147 PendingRow::Raw(RawRow {
1148 data: {
1149 let mut b = Vec::with_capacity(5);
1150 b.push(4);
1151 b.extend_from_slice(&11i32.to_le_bytes());
1152 Bytes::from(b)
1153 },
1154 }),
1155 ];
1156
1157 #[cfg(feature = "always-encrypted")]
1158 let mut rs = ResultSet::from_raw(columns, pending, meta, None);
1159 #[cfg(not(feature = "always-encrypted"))]
1160 let mut rs = ResultSet::from_raw(columns, pending, meta);
1161
1162 assert_eq!(rs.rows_remaining(), 2);
1163 assert!(!rs.is_empty());
1164
1165 let row1 = rs.next_row().expect("row present").expect("decodes");
1166 assert_eq!(row1.get::<i32>(0).unwrap(), 7);
1167
1168 let row2 = rs.next_row().expect("row present").expect("decodes");
1169 assert_eq!(row2.get::<i32>(0).unwrap(), 11);
1170
1171 assert!(rs.next_row().is_none());
1172 assert!(rs.is_empty());
1173 }
1174
1175 #[test]
1179 fn test_result_set_lazy_decode_error_propagates() {
1180 use bytes::Bytes;
1181 use tds_protocol::token::{ColMetaData, ColumnData, RawRow, TypeInfo};
1182 use tds_protocol::types::TypeId;
1183
1184 let meta = ColMetaData {
1186 columns: vec![ColumnData {
1187 name: "a".to_string(),
1188 type_id: TypeId::Int4,
1189 col_type: 0x38,
1190 flags: 0x00,
1191 user_type: 0,
1192 type_info: TypeInfo {
1193 max_length: Some(4),
1194 precision: None,
1195 scale: None,
1196 collation: None,
1197 },
1198 crypto_metadata: None,
1199 }],
1200 cek_table: None,
1201 };
1202 let columns = vec![Column {
1203 name: "a".to_string(),
1204 index: 0,
1205 type_name: "INT".to_string(),
1206 nullable: false,
1207 max_length: Some(4),
1208 precision: None,
1209 scale: None,
1210 collation: None,
1211 }];
1212
1213 let pending = vec![PendingRow::Raw(RawRow {
1214 data: Bytes::from(vec![0x01u8, 0x02]),
1215 })];
1216
1217 #[cfg(feature = "always-encrypted")]
1218 let mut rs = ResultSet::from_raw(columns, pending, meta, None);
1219 #[cfg(not(feature = "always-encrypted"))]
1220 let mut rs = ResultSet::from_raw(columns, pending, meta);
1221
1222 let first = rs.next_row().expect("pending row present");
1223 assert!(
1224 first.is_err(),
1225 "truncated bytes must surface a decode error"
1226 );
1227 assert!(rs.next_row().is_none());
1228 }
1229
1230 #[test]
1234 fn test_result_set_lazy_collect_all_success_and_error() {
1235 use bytes::Bytes;
1236 use tds_protocol::token::RawRow;
1237
1238 let (meta_ok, cols_ok) = intn_meta_and_columns("a", false);
1240 let pending_ok = vec![
1241 PendingRow::Raw(RawRow {
1242 data: {
1243 let mut b = Vec::with_capacity(5);
1244 b.push(4);
1245 b.extend_from_slice(&10i32.to_le_bytes());
1246 Bytes::from(b)
1247 },
1248 }),
1249 PendingRow::Raw(RawRow {
1250 data: {
1251 let mut b = Vec::with_capacity(5);
1252 b.push(4);
1253 b.extend_from_slice(&20i32.to_le_bytes());
1254 Bytes::from(b)
1255 },
1256 }),
1257 ];
1258
1259 #[cfg(feature = "always-encrypted")]
1260 let mut rs_ok = ResultSet::from_raw(cols_ok, pending_ok, meta_ok, None);
1261 #[cfg(not(feature = "always-encrypted"))]
1262 let mut rs_ok = ResultSet::from_raw(cols_ok, pending_ok, meta_ok);
1263 let rows = rs_ok.collect_all().expect("all rows decode");
1264 assert_eq!(rows.len(), 2);
1265 assert_eq!(rows[0].get::<i32>(0).unwrap(), 10);
1266 assert_eq!(rows[1].get::<i32>(0).unwrap(), 20);
1267 assert!(rs_ok.is_empty());
1268
1269 use tds_protocol::token::{ColMetaData, ColumnData, TypeInfo};
1272 use tds_protocol::types::TypeId;
1273 let meta_err = ColMetaData {
1274 columns: vec![ColumnData {
1275 name: "a".to_string(),
1276 type_id: TypeId::Int4,
1277 col_type: 0x38,
1278 flags: 0x00,
1279 user_type: 0,
1280 type_info: TypeInfo {
1281 max_length: Some(4),
1282 precision: None,
1283 scale: None,
1284 collation: None,
1285 },
1286 crypto_metadata: None,
1287 }],
1288 cek_table: None,
1289 };
1290 let cols_err = vec![Column {
1291 name: "a".to_string(),
1292 index: 0,
1293 type_name: "INT".to_string(),
1294 nullable: false,
1295 max_length: Some(4),
1296 precision: None,
1297 scale: None,
1298 collation: None,
1299 }];
1300 let pending_err = vec![PendingRow::Raw(RawRow {
1301 data: Bytes::from(vec![0x01u8, 0x02]),
1302 })];
1303
1304 #[cfg(feature = "always-encrypted")]
1305 let mut rs_err = ResultSet::from_raw(cols_err, pending_err, meta_err, None);
1306 #[cfg(not(feature = "always-encrypted"))]
1307 let mut rs_err = ResultSet::from_raw(cols_err, pending_err, meta_err);
1308 let err = rs_err.collect_all();
1309 assert!(err.is_err(), "collect_all must propagate decode error");
1310 }
1311
1312 #[tokio::test]
1316 async fn test_multi_result_stream_lazy_decode_across_result_sets() {
1317 use bytes::Bytes;
1318 use tds_protocol::token::RawRow;
1319
1320 let (meta1, cols1) = intn_meta_and_columns("a", false);
1321 let pending1 = vec![PendingRow::Raw(RawRow {
1322 data: {
1323 let mut b = Vec::with_capacity(5);
1324 b.push(4);
1325 b.extend_from_slice(&101i32.to_le_bytes());
1326 Bytes::from(b)
1327 },
1328 })];
1329 #[cfg(feature = "always-encrypted")]
1330 let rs1 = ResultSet::from_raw(cols1, pending1, meta1, None);
1331 #[cfg(not(feature = "always-encrypted"))]
1332 let rs1 = ResultSet::from_raw(cols1, pending1, meta1);
1333
1334 let (meta2, cols2) = intn_meta_and_columns("b", false);
1335 let pending2 = vec![PendingRow::Raw(RawRow {
1336 data: {
1337 let mut b = Vec::with_capacity(5);
1338 b.push(4);
1339 b.extend_from_slice(&202i32.to_le_bytes());
1340 Bytes::from(b)
1341 },
1342 })];
1343 #[cfg(feature = "always-encrypted")]
1344 let rs2 = ResultSet::from_raw(cols2, pending2, meta2, None);
1345 #[cfg(not(feature = "always-encrypted"))]
1346 let rs2 = ResultSet::from_raw(cols2, pending2, meta2);
1347
1348 let mut stream = MultiResultStream::new(vec![rs1, rs2]);
1349 assert_eq!(stream.result_count(), 2);
1350 assert_eq!(stream.current_result_index(), 0);
1351
1352 let row = stream
1353 .next_row()
1354 .await
1355 .expect("first row success")
1356 .expect("row present");
1357 assert_eq!(row.get::<i32>(0).unwrap(), 101);
1358 assert!(stream.next_row().await.expect("no more rows").is_none());
1359
1360 assert!(stream.has_more_results());
1361 assert!(stream.next_result().await.expect("advance ok"));
1362 assert_eq!(stream.current_result_index(), 1);
1363
1364 let row = stream
1365 .next_row()
1366 .await
1367 .expect("second row success")
1368 .expect("row present");
1369 assert_eq!(row.get::<i32>(0).unwrap(), 202);
1370 }
1371}