1use std::collections::VecDeque;
36use std::pin::Pin;
37use std::sync::Arc;
38use std::task::{Context, Poll};
39
40use futures_core::Stream;
41use tds_protocol::token::{ColMetaData, NbcRow, RawRow};
42
43use crate::error::Error;
44use crate::row::{Column, Row};
45
46#[derive(Debug, Clone)]
53pub(crate) enum PendingRow {
54 Parsed(Row),
56 Raw(RawRow),
58 Nbc(NbcRow),
60}
61
62#[must_use = "streams must be consumed; dropping a stream discards remaining rows"]
88pub struct QueryStream<'a> {
89 row_meta: Arc<crate::row::ColMetaData>,
91 rows: VecDeque<PendingRow>,
93 meta: Option<ColMetaData>,
96 #[cfg(feature = "always-encrypted")]
101 decryptor: Option<std::sync::Arc<crate::column_decryptor::ColumnDecryptor>>,
102 finished: bool,
104 _marker: std::marker::PhantomData<&'a ()>,
106}
107
108impl QueryStream<'_> {
109 #[cfg(test)]
116 pub(crate) fn new(columns: Vec<Column>, rows: Vec<Row>) -> Self {
117 Self {
118 row_meta: Arc::new(crate::row::ColMetaData::new(columns)),
119 rows: rows.into_iter().map(PendingRow::Parsed).collect(),
120 meta: None,
121 #[cfg(feature = "always-encrypted")]
122 decryptor: None,
123 finished: false,
124 _marker: std::marker::PhantomData,
125 }
126 }
127
128 pub(crate) fn from_raw(
134 columns: Vec<Column>,
135 pending: Vec<PendingRow>,
136 meta: ColMetaData,
137 #[cfg(feature = "always-encrypted")] decryptor: Option<
138 std::sync::Arc<crate::column_decryptor::ColumnDecryptor>,
139 >,
140 ) -> Self {
141 Self {
142 row_meta: Arc::new(crate::row::ColMetaData::new(columns)),
143 rows: pending.into(),
144 meta: Some(meta),
145 #[cfg(feature = "always-encrypted")]
146 decryptor,
147 finished: false,
148 _marker: std::marker::PhantomData,
149 }
150 }
151
152 #[allow(dead_code)]
154 pub(crate) fn empty() -> Self {
155 Self {
156 row_meta: Arc::new(crate::row::ColMetaData::new(Vec::new())),
157 rows: VecDeque::new(),
158 meta: None,
159 #[cfg(feature = "always-encrypted")]
160 decryptor: None,
161 finished: true,
162 _marker: std::marker::PhantomData,
163 }
164 }
165
166 #[must_use]
168 pub fn columns(&self) -> &[Column] {
169 &self.row_meta.columns
170 }
171
172 #[must_use]
174 pub fn is_finished(&self) -> bool {
175 self.finished
176 }
177
178 #[must_use]
180 pub fn rows_remaining(&self) -> usize {
181 self.rows.len()
182 }
183
184 pub async fn collect_all(mut self) -> Result<Vec<Row>, Error> {
194 let mut out = Vec::with_capacity(self.rows.len());
195 while let Some(pending) = self.rows.pop_front() {
196 out.push(self.decode(pending)?);
197 }
198 self.finished = true;
199 Ok(out)
200 }
201
202 pub fn try_next(&mut self) -> Option<Row> {
208 self.next().and_then(|r| r.ok())
209 }
210
211 fn decode(&self, pending: PendingRow) -> Result<Row, Error> {
213 match pending {
214 PendingRow::Parsed(row) => Ok(row),
215 PendingRow::Raw(raw) => {
216 let meta = self
217 .meta
218 .as_ref()
219 .ok_or_else(|| Error::Protocol("row metadata missing for raw row".into()))?;
220 #[cfg(feature = "always-encrypted")]
221 if let Some(ref dec) = self.decryptor {
222 return crate::column_parser::convert_raw_row_decrypted(
223 &raw,
224 meta,
225 &self.row_meta,
226 dec,
227 );
228 }
229 crate::column_parser::convert_raw_row(&raw, meta, &self.row_meta)
230 }
231 PendingRow::Nbc(nbc) => {
232 let meta = self
233 .meta
234 .as_ref()
235 .ok_or_else(|| Error::Protocol("row metadata missing for NBC row".into()))?;
236 #[cfg(feature = "always-encrypted")]
237 if let Some(ref dec) = self.decryptor {
238 return crate::column_parser::convert_nbc_row_decrypted(
239 &nbc,
240 meta,
241 &self.row_meta,
242 dec,
243 );
244 }
245 crate::column_parser::convert_nbc_row(&nbc, meta, &self.row_meta)
246 }
247 }
248 }
249}
250
251impl Stream for QueryStream<'_> {
252 type Item = Result<Row, Error>;
253
254 fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
255 let this = self.get_mut();
256
257 if this.finished {
258 return Poll::Ready(None);
259 }
260
261 match this.rows.pop_front() {
262 Some(pending) => Poll::Ready(Some(this.decode(pending))),
263 None => {
264 this.finished = true;
265 Poll::Ready(None)
266 }
267 }
268 }
269}
270
271impl ExactSizeIterator for QueryStream<'_> {}
272
273impl Iterator for QueryStream<'_> {
274 type Item = Result<Row, Error>;
275
276 fn next(&mut self) -> Option<Self::Item> {
277 if self.finished {
278 return None;
279 }
280
281 match self.rows.pop_front() {
282 Some(pending) => Some(self.decode(pending)),
283 None => {
284 self.finished = true;
285 None
286 }
287 }
288 }
289
290 fn size_hint(&self) -> (usize, Option<usize>) {
291 let remaining = self.rows.len();
292 (remaining, Some(remaining))
293 }
294}
295
296#[derive(Debug, Clone)]
300#[non_exhaustive]
301#[must_use]
302pub struct ExecuteResult {
303 pub rows_affected: u64,
305 pub output_params: Vec<OutputParam>,
307}
308
309#[derive(Debug, Clone)]
311#[non_exhaustive]
312pub struct OutputParam {
313 pub name: String,
315 pub value: mssql_types::SqlValue,
317}
318
319impl ExecuteResult {
320 pub fn new(rows_affected: u64) -> Self {
322 Self {
323 rows_affected,
324 output_params: Vec::new(),
325 }
326 }
327
328 pub fn with_outputs(rows_affected: u64, output_params: Vec<OutputParam>) -> Self {
330 Self {
331 rows_affected,
332 output_params,
333 }
334 }
335
336 #[must_use]
338 pub fn get_output(&self, name: &str) -> Option<&OutputParam> {
339 self.output_params
340 .iter()
341 .find(|p| p.name.eq_ignore_ascii_case(name))
342 }
343}
344
345#[derive(Debug, Clone)]
370#[non_exhaustive]
371#[must_use]
372pub struct ProcedureResult {
373 pub return_value: i32,
378 pub rows_affected: u64,
380 pub output_params: Vec<OutputParam>,
382 pub result_sets: Vec<ResultSet>,
384}
385
386impl ProcedureResult {
387 pub(crate) fn new() -> Self {
389 Self {
390 return_value: 0,
391 rows_affected: 0,
392 output_params: Vec::new(),
393 result_sets: Vec::new(),
394 }
395 }
396
397 #[must_use]
402 pub fn get_return_value(&self) -> i32 {
403 self.return_value
404 }
405
406 #[must_use]
429 pub fn get_output(&self, name: &str) -> Option<&OutputParam> {
430 let search = name.strip_prefix('@').unwrap_or(name);
431 self.output_params.iter().find(|p| {
432 let stored = p.name.strip_prefix('@').unwrap_or(&p.name);
433 stored.eq_ignore_ascii_case(search)
434 })
435 }
436
437 #[must_use]
441 pub fn first_result_set(&self) -> Option<&ResultSet> {
442 self.result_sets.first()
443 }
444
445 #[must_use]
447 pub fn has_result_sets(&self) -> bool {
448 !self.result_sets.is_empty()
449 }
450}
451
452#[derive(Debug, Clone)]
461#[must_use]
462pub struct ResultSet {
463 row_meta: Arc<crate::row::ColMetaData>,
465 pending_rows: VecDeque<PendingRow>,
467 meta: Option<ColMetaData>,
470 #[cfg(feature = "always-encrypted")]
475 decryptor: Option<std::sync::Arc<crate::column_decryptor::ColumnDecryptor>>,
476}
477
478impl ResultSet {
479 pub fn new(columns: Vec<Column>, rows: Vec<Row>) -> Self {
485 Self {
486 row_meta: Arc::new(crate::row::ColMetaData::new(columns)),
487 pending_rows: rows.into_iter().map(PendingRow::Parsed).collect(),
488 meta: None,
489 #[cfg(feature = "always-encrypted")]
490 decryptor: None,
491 }
492 }
493
494 pub(crate) fn from_raw(
501 columns: Vec<Column>,
502 pending: Vec<PendingRow>,
503 meta: ColMetaData,
504 #[cfg(feature = "always-encrypted")] decryptor: Option<
505 std::sync::Arc<crate::column_decryptor::ColumnDecryptor>,
506 >,
507 ) -> Self {
508 Self {
509 row_meta: Arc::new(crate::row::ColMetaData::new(columns)),
510 pending_rows: pending.into(),
511 meta: Some(meta),
512 #[cfg(feature = "always-encrypted")]
513 decryptor,
514 }
515 }
516
517 #[must_use]
519 pub fn columns(&self) -> &[Column] {
520 &self.row_meta.columns
521 }
522
523 #[must_use]
525 pub fn rows_remaining(&self) -> usize {
526 self.pending_rows.len()
527 }
528
529 pub fn next_row(&mut self) -> Option<Result<Row, Error>> {
535 self.pending_rows.pop_front().map(|p| self.decode(p))
536 }
537
538 #[must_use]
540 pub fn is_empty(&self) -> bool {
541 self.pending_rows.is_empty()
542 }
543
544 pub fn collect_all(&mut self) -> Result<Vec<Row>, Error> {
548 let mut out = Vec::with_capacity(self.pending_rows.len());
549 while let Some(pending) = self.pending_rows.pop_front() {
550 out.push(self.decode(pending)?);
551 }
552 Ok(out)
553 }
554
555 fn decode(&self, pending: PendingRow) -> Result<Row, Error> {
557 match pending {
558 PendingRow::Parsed(row) => Ok(row),
559 PendingRow::Raw(raw) => {
560 let meta = self
561 .meta
562 .as_ref()
563 .ok_or_else(|| Error::Protocol("row metadata missing for raw row".into()))?;
564 #[cfg(feature = "always-encrypted")]
565 if let Some(ref dec) = self.decryptor {
566 return crate::column_parser::convert_raw_row_decrypted(
567 &raw,
568 meta,
569 &self.row_meta,
570 dec,
571 );
572 }
573 crate::column_parser::convert_raw_row(&raw, meta, &self.row_meta)
574 }
575 PendingRow::Nbc(nbc) => {
576 let meta = self
577 .meta
578 .as_ref()
579 .ok_or_else(|| Error::Protocol("row metadata missing for NBC row".into()))?;
580 #[cfg(feature = "always-encrypted")]
581 if let Some(ref dec) = self.decryptor {
582 return crate::column_parser::convert_nbc_row_decrypted(
583 &nbc,
584 meta,
585 &self.row_meta,
586 dec,
587 );
588 }
589 crate::column_parser::convert_nbc_row(&nbc, meta, &self.row_meta)
590 }
591 }
592 }
593
594 fn into_query_stream<'a>(self) -> QueryStream<'a> {
600 QueryStream {
601 row_meta: self.row_meta,
602 rows: self.pending_rows,
603 meta: self.meta,
604 #[cfg(feature = "always-encrypted")]
605 decryptor: self.decryptor,
606 finished: false,
607 _marker: std::marker::PhantomData,
608 }
609 }
610}
611
612#[must_use = "streams must be consumed; dropping a stream discards remaining results"]
639pub struct MultiResultStream<'a> {
640 result_sets: Vec<ResultSet>,
642 current_result: usize,
644 _marker: std::marker::PhantomData<&'a ()>,
646}
647
648impl<'a> MultiResultStream<'a> {
649 pub(crate) fn new(result_sets: Vec<ResultSet>) -> Self {
651 Self {
652 result_sets,
653 current_result: 0,
654 _marker: std::marker::PhantomData,
655 }
656 }
657
658 #[allow(dead_code)]
660 pub(crate) fn empty() -> Self {
661 Self {
662 result_sets: Vec::new(),
663 current_result: 0,
664 _marker: std::marker::PhantomData,
665 }
666 }
667
668 #[must_use]
670 pub fn current_result_index(&self) -> usize {
671 self.current_result
672 }
673
674 #[must_use]
676 pub fn result_count(&self) -> usize {
677 self.result_sets.len()
678 }
679
680 #[must_use]
682 pub fn has_more_results(&self) -> bool {
683 self.current_result + 1 < self.result_sets.len()
684 }
685
686 #[must_use]
690 pub fn columns(&self) -> Option<&[Column]> {
691 self.result_sets
692 .get(self.current_result)
693 .map(|rs| rs.columns())
694 }
695
696 pub async fn next_result(&mut self) -> Result<bool, Error> {
700 if self.current_result + 1 < self.result_sets.len() {
701 self.current_result += 1;
702 Ok(true)
703 } else {
704 Ok(false)
705 }
706 }
707
708 pub async fn next_row(&mut self) -> Result<Option<Row>, Error> {
717 if let Some(result_set) = self.result_sets.get_mut(self.current_result) {
718 result_set.next_row().transpose()
719 } else {
720 Ok(None)
721 }
722 }
723
724 #[must_use]
726 pub fn current_result_set(&mut self) -> Option<&mut ResultSet> {
727 self.result_sets.get_mut(self.current_result)
728 }
729
730 pub fn collect_current(&mut self) -> Result<Vec<Row>, Error> {
736 match self.result_sets.get_mut(self.current_result) {
737 Some(rs) => rs.collect_all(),
738 None => Ok(Vec::new()),
739 }
740 }
741
742 pub fn into_query_streams(self) -> Vec<QueryStream<'a>> {
744 self.result_sets
745 .into_iter()
746 .map(ResultSet::into_query_stream)
747 .collect()
748 }
749}
750
751#[cfg(test)]
752#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
753mod tests {
754 use super::*;
755
756 #[test]
757 fn test_execute_result() {
758 let result = ExecuteResult::new(42);
759 assert_eq!(result.rows_affected, 42);
760 assert!(result.output_params.is_empty());
761 }
762
763 #[test]
764 fn test_procedure_result_defaults() {
765 let result = ProcedureResult::new();
766 assert_eq!(result.return_value, 0);
767 assert_eq!(result.rows_affected, 0);
768 assert!(result.output_params.is_empty());
769 assert!(result.result_sets.is_empty());
770 assert!(!result.has_result_sets());
771 assert!(result.first_result_set().is_none());
772 }
773
774 #[test]
775 fn test_procedure_result_get_output() {
776 let mut result = ProcedureResult::new();
777 result.output_params.push(OutputParam {
778 name: "@Total".to_string(),
779 value: mssql_types::SqlValue::Int(42),
780 });
781 result.output_params.push(OutputParam {
782 name: "@Message".to_string(),
783 value: mssql_types::SqlValue::String("ok".to_string()),
784 });
785
786 assert!(result.get_output("@Total").is_some());
788 assert!(result.get_output("@total").is_some());
789 assert!(result.get_output("@TOTAL").is_some());
790
791 assert!(result.get_output("Total").is_some());
793 assert!(result.get_output("total").is_some());
794
795 assert!(result.get_output("@NotHere").is_none());
797 assert!(result.get_output("NotHere").is_none());
798 }
799
800 #[test]
801 fn test_procedure_result_with_result_sets() {
802 use mssql_types::SqlValue;
803
804 let columns = vec![Column {
805 name: "id".to_string(),
806 index: 0,
807 type_name: "INT".to_string(),
808 nullable: false,
809 max_length: Some(4),
810 precision: None,
811 scale: None,
812 collation: None,
813 }];
814 let rows = vec![Row::from_values(columns.clone(), vec![SqlValue::Int(1)])];
815 let rs = ResultSet::new(columns, rows);
816
817 let mut result = ProcedureResult::new();
818 result.result_sets.push(rs);
819 result.return_value = 7;
820 result.rows_affected = 5;
821
822 assert!(result.has_result_sets());
823 assert_eq!(result.get_return_value(), 7);
824 assert_eq!(result.first_result_set().unwrap().columns().len(), 1);
825 }
826
827 #[test]
828 fn test_execute_result_with_outputs() {
829 let outputs = vec![OutputParam {
830 name: "ReturnValue".to_string(),
831 value: mssql_types::SqlValue::Int(100),
832 }];
833
834 let result = ExecuteResult::with_outputs(10, outputs);
835 assert_eq!(result.rows_affected, 10);
836 assert!(result.get_output("ReturnValue").is_some());
837 assert!(result.get_output("returnvalue").is_some()); assert!(result.get_output("NotFound").is_none());
839 }
840
841 #[test]
842 fn test_query_stream_columns() {
843 let columns = vec![Column {
844 name: "id".to_string(),
845 index: 0,
846 type_name: "INT".to_string(),
847 nullable: false,
848 max_length: Some(4),
849 precision: Some(0),
850 scale: Some(0),
851 collation: None,
852 }];
853
854 let stream = QueryStream::new(columns, Vec::new());
855 assert_eq!(stream.columns().len(), 1);
856 assert_eq!(stream.columns()[0].name, "id");
857 assert!(!stream.is_finished());
858 }
859
860 #[test]
861 fn test_query_stream_with_rows() {
862 use mssql_types::SqlValue;
863
864 let columns = vec![
865 Column {
866 name: "id".to_string(),
867 index: 0,
868 type_name: "INT".to_string(),
869 nullable: false,
870 max_length: Some(4),
871 precision: None,
872 scale: None,
873 collation: None,
874 },
875 Column {
876 name: "name".to_string(),
877 index: 1,
878 type_name: "NVARCHAR".to_string(),
879 nullable: true,
880 max_length: Some(100),
881 precision: None,
882 scale: None,
883 collation: None,
884 },
885 ];
886
887 let rows = vec![
888 Row::from_values(
889 columns.clone(),
890 vec![SqlValue::Int(1), SqlValue::String("Alice".to_string())],
891 ),
892 Row::from_values(
893 columns.clone(),
894 vec![SqlValue::Int(2), SqlValue::String("Bob".to_string())],
895 ),
896 ];
897
898 let mut stream = QueryStream::new(columns, rows);
899 assert_eq!(stream.columns().len(), 2);
900 assert_eq!(stream.rows_remaining(), 2);
901 assert!(!stream.is_finished());
902
903 let row1 = stream.try_next().unwrap();
905 assert_eq!(row1.get::<i32>(0).unwrap(), 1);
906 assert_eq!(row1.get_by_name::<String>("name").unwrap(), "Alice");
907
908 let row2 = stream.try_next().unwrap();
910 assert_eq!(row2.get::<i32>(0).unwrap(), 2);
911 assert_eq!(row2.get_by_name::<String>("name").unwrap(), "Bob");
912
913 assert!(stream.try_next().is_none());
915 assert!(stream.is_finished());
916 }
917
918 #[test]
919 fn test_query_stream_iterator() {
920 use mssql_types::SqlValue;
921
922 let columns = vec![Column {
923 name: "val".to_string(),
924 index: 0,
925 type_name: "INT".to_string(),
926 nullable: false,
927 max_length: None,
928 precision: None,
929 scale: None,
930 collation: None,
931 }];
932
933 let rows = vec![
934 Row::from_values(columns.clone(), vec![SqlValue::Int(10)]),
935 Row::from_values(columns.clone(), vec![SqlValue::Int(20)]),
936 Row::from_values(columns.clone(), vec![SqlValue::Int(30)]),
937 ];
938
939 let mut stream = QueryStream::new(columns, rows);
940
941 let values: Vec<i32> = stream
945 .by_ref()
946 .map(|r| r.unwrap().get::<i32>(0).unwrap())
947 .collect();
948
949 assert_eq!(values, vec![10, 20, 30]);
950 assert!(stream.is_finished());
951 }
952
953 #[test]
954 fn test_query_stream_empty() {
955 let stream = QueryStream::empty();
956 assert!(stream.columns().is_empty());
957 assert_eq!(stream.rows_remaining(), 0);
958 assert!(stream.is_finished());
959 }
960
961 #[test]
966 fn test_query_stream_lazy_raw_row_decoding() {
967 use bytes::Bytes;
968 use tds_protocol::token::{ColMetaData, ColumnData, RawRow, TypeInfo};
969 use tds_protocol::types::TypeId;
970
971 let mut data = Vec::new();
973 data.push(4); data.extend_from_slice(&42i32.to_le_bytes());
975 data.push(0); let meta = ColMetaData {
978 columns: vec![
979 ColumnData {
980 name: "a".to_string(),
981 type_id: TypeId::IntN,
982 col_type: 0x26,
983 flags: 0x00,
984 user_type: 0,
985 type_info: TypeInfo {
986 max_length: Some(4),
987 precision: None,
988 scale: None,
989 collation: None,
990 },
991 crypto_metadata: None,
992 },
993 ColumnData {
994 name: "b".to_string(),
995 type_id: TypeId::IntN,
996 col_type: 0x26,
997 flags: 0x01,
998 user_type: 0,
999 type_info: TypeInfo {
1000 max_length: Some(4),
1001 precision: None,
1002 scale: None,
1003 collation: None,
1004 },
1005 crypto_metadata: None,
1006 },
1007 ],
1008 cek_table: None,
1009 };
1010
1011 let columns = vec![
1012 Column {
1013 name: "a".to_string(),
1014 index: 0,
1015 type_name: "INT".to_string(),
1016 nullable: false,
1017 max_length: Some(4),
1018 precision: None,
1019 scale: None,
1020 collation: None,
1021 },
1022 Column {
1023 name: "b".to_string(),
1024 index: 1,
1025 type_name: "INT".to_string(),
1026 nullable: true,
1027 max_length: Some(4),
1028 precision: None,
1029 scale: None,
1030 collation: None,
1031 },
1032 ];
1033
1034 let pending = vec![PendingRow::Raw(RawRow {
1035 data: Bytes::from(data),
1036 })];
1037
1038 #[cfg(feature = "always-encrypted")]
1039 let mut stream = QueryStream::from_raw(columns, pending, meta, None);
1040 #[cfg(not(feature = "always-encrypted"))]
1041 let mut stream = QueryStream::from_raw(columns, pending, meta);
1042
1043 assert_eq!(stream.rows_remaining(), 1);
1044 let row = stream
1045 .next()
1046 .expect("one row pending")
1047 .expect("row decoded successfully");
1048 assert_eq!(row.get::<i32>(0).unwrap(), 42);
1049 assert!(row.is_null(1));
1050 assert!(stream.next().is_none());
1051 assert!(stream.is_finished());
1052 }
1053
1054 #[test]
1058 fn test_query_stream_lazy_decode_error_propagates() {
1059 use bytes::Bytes;
1060 use tds_protocol::token::{ColMetaData, ColumnData, RawRow, TypeInfo};
1061 use tds_protocol::types::TypeId;
1062
1063 let data = vec![0x01u8, 0x02];
1065
1066 let meta = ColMetaData {
1067 columns: vec![ColumnData {
1068 name: "a".to_string(),
1069 type_id: TypeId::Int4,
1070 col_type: 0x38,
1071 flags: 0x00,
1072 user_type: 0,
1073 type_info: TypeInfo {
1074 max_length: Some(4),
1075 precision: None,
1076 scale: None,
1077 collation: None,
1078 },
1079 crypto_metadata: None,
1080 }],
1081 cek_table: None,
1082 };
1083
1084 let columns = vec![Column {
1085 name: "a".to_string(),
1086 index: 0,
1087 type_name: "INT".to_string(),
1088 nullable: false,
1089 max_length: Some(4),
1090 precision: None,
1091 scale: None,
1092 collation: None,
1093 }];
1094
1095 let pending = vec![PendingRow::Raw(RawRow {
1096 data: Bytes::from(data),
1097 })];
1098
1099 #[cfg(feature = "always-encrypted")]
1100 let mut stream = QueryStream::from_raw(columns, pending, meta, None);
1101 #[cfg(not(feature = "always-encrypted"))]
1102 let mut stream = QueryStream::from_raw(columns, pending, meta);
1103
1104 let item = stream.next().expect("pending row present");
1105 assert!(item.is_err(), "truncated bytes must surface a decode error");
1106 assert!(stream.next().is_none());
1107 }
1108
1109 #[cfg(test)]
1112 fn intn_meta_and_columns(
1113 col_name: &str,
1114 nullable: bool,
1115 ) -> (tds_protocol::token::ColMetaData, Vec<Column>) {
1116 use tds_protocol::token::{ColMetaData, ColumnData, TypeInfo};
1117 use tds_protocol::types::TypeId;
1118 (
1119 ColMetaData {
1120 columns: vec![ColumnData {
1121 name: col_name.to_string(),
1122 type_id: TypeId::IntN,
1123 col_type: 0x26,
1124 flags: if nullable { 0x01 } else { 0x00 },
1125 user_type: 0,
1126 type_info: TypeInfo {
1127 max_length: Some(4),
1128 precision: None,
1129 scale: None,
1130 collation: None,
1131 },
1132 crypto_metadata: None,
1133 }],
1134 cek_table: None,
1135 },
1136 vec![Column {
1137 name: col_name.to_string(),
1138 index: 0,
1139 type_name: "INT".to_string(),
1140 nullable,
1141 max_length: Some(4),
1142 precision: None,
1143 scale: None,
1144 collation: None,
1145 }],
1146 )
1147 }
1148
1149 #[test]
1153 fn test_result_set_lazy_raw_row_decoding() {
1154 use bytes::Bytes;
1155 use tds_protocol::token::RawRow;
1156
1157 let (meta, columns) = intn_meta_and_columns("a", false);
1158
1159 let pending = vec![
1161 PendingRow::Raw(RawRow {
1162 data: {
1163 let mut b = Vec::with_capacity(5);
1164 b.push(4);
1165 b.extend_from_slice(&7i32.to_le_bytes());
1166 Bytes::from(b)
1167 },
1168 }),
1169 PendingRow::Raw(RawRow {
1170 data: {
1171 let mut b = Vec::with_capacity(5);
1172 b.push(4);
1173 b.extend_from_slice(&11i32.to_le_bytes());
1174 Bytes::from(b)
1175 },
1176 }),
1177 ];
1178
1179 #[cfg(feature = "always-encrypted")]
1180 let mut rs = ResultSet::from_raw(columns, pending, meta, None);
1181 #[cfg(not(feature = "always-encrypted"))]
1182 let mut rs = ResultSet::from_raw(columns, pending, meta);
1183
1184 assert_eq!(rs.rows_remaining(), 2);
1185 assert!(!rs.is_empty());
1186
1187 let row1 = rs.next_row().expect("row present").expect("decodes");
1188 assert_eq!(row1.get::<i32>(0).unwrap(), 7);
1189
1190 let row2 = rs.next_row().expect("row present").expect("decodes");
1191 assert_eq!(row2.get::<i32>(0).unwrap(), 11);
1192
1193 assert!(rs.next_row().is_none());
1194 assert!(rs.is_empty());
1195 }
1196
1197 #[test]
1201 fn test_result_set_lazy_decode_error_propagates() {
1202 use bytes::Bytes;
1203 use tds_protocol::token::{ColMetaData, ColumnData, RawRow, TypeInfo};
1204 use tds_protocol::types::TypeId;
1205
1206 let meta = ColMetaData {
1208 columns: vec![ColumnData {
1209 name: "a".to_string(),
1210 type_id: TypeId::Int4,
1211 col_type: 0x38,
1212 flags: 0x00,
1213 user_type: 0,
1214 type_info: TypeInfo {
1215 max_length: Some(4),
1216 precision: None,
1217 scale: None,
1218 collation: None,
1219 },
1220 crypto_metadata: None,
1221 }],
1222 cek_table: None,
1223 };
1224 let columns = vec![Column {
1225 name: "a".to_string(),
1226 index: 0,
1227 type_name: "INT".to_string(),
1228 nullable: false,
1229 max_length: Some(4),
1230 precision: None,
1231 scale: None,
1232 collation: None,
1233 }];
1234
1235 let pending = vec![PendingRow::Raw(RawRow {
1236 data: Bytes::from(vec![0x01u8, 0x02]),
1237 })];
1238
1239 #[cfg(feature = "always-encrypted")]
1240 let mut rs = ResultSet::from_raw(columns, pending, meta, None);
1241 #[cfg(not(feature = "always-encrypted"))]
1242 let mut rs = ResultSet::from_raw(columns, pending, meta);
1243
1244 let first = rs.next_row().expect("pending row present");
1245 assert!(
1246 first.is_err(),
1247 "truncated bytes must surface a decode error"
1248 );
1249 assert!(rs.next_row().is_none());
1250 }
1251
1252 #[test]
1256 fn test_result_set_lazy_collect_all_success_and_error() {
1257 use bytes::Bytes;
1258 use tds_protocol::token::RawRow;
1259
1260 let (meta_ok, cols_ok) = intn_meta_and_columns("a", false);
1262 let pending_ok = vec![
1263 PendingRow::Raw(RawRow {
1264 data: {
1265 let mut b = Vec::with_capacity(5);
1266 b.push(4);
1267 b.extend_from_slice(&10i32.to_le_bytes());
1268 Bytes::from(b)
1269 },
1270 }),
1271 PendingRow::Raw(RawRow {
1272 data: {
1273 let mut b = Vec::with_capacity(5);
1274 b.push(4);
1275 b.extend_from_slice(&20i32.to_le_bytes());
1276 Bytes::from(b)
1277 },
1278 }),
1279 ];
1280
1281 #[cfg(feature = "always-encrypted")]
1282 let mut rs_ok = ResultSet::from_raw(cols_ok, pending_ok, meta_ok, None);
1283 #[cfg(not(feature = "always-encrypted"))]
1284 let mut rs_ok = ResultSet::from_raw(cols_ok, pending_ok, meta_ok);
1285 let rows = rs_ok.collect_all().expect("all rows decode");
1286 assert_eq!(rows.len(), 2);
1287 assert_eq!(rows[0].get::<i32>(0).unwrap(), 10);
1288 assert_eq!(rows[1].get::<i32>(0).unwrap(), 20);
1289 assert!(rs_ok.is_empty());
1290
1291 use tds_protocol::token::{ColMetaData, ColumnData, TypeInfo};
1294 use tds_protocol::types::TypeId;
1295 let meta_err = ColMetaData {
1296 columns: vec![ColumnData {
1297 name: "a".to_string(),
1298 type_id: TypeId::Int4,
1299 col_type: 0x38,
1300 flags: 0x00,
1301 user_type: 0,
1302 type_info: TypeInfo {
1303 max_length: Some(4),
1304 precision: None,
1305 scale: None,
1306 collation: None,
1307 },
1308 crypto_metadata: None,
1309 }],
1310 cek_table: None,
1311 };
1312 let cols_err = vec![Column {
1313 name: "a".to_string(),
1314 index: 0,
1315 type_name: "INT".to_string(),
1316 nullable: false,
1317 max_length: Some(4),
1318 precision: None,
1319 scale: None,
1320 collation: None,
1321 }];
1322 let pending_err = vec![PendingRow::Raw(RawRow {
1323 data: Bytes::from(vec![0x01u8, 0x02]),
1324 })];
1325
1326 #[cfg(feature = "always-encrypted")]
1327 let mut rs_err = ResultSet::from_raw(cols_err, pending_err, meta_err, None);
1328 #[cfg(not(feature = "always-encrypted"))]
1329 let mut rs_err = ResultSet::from_raw(cols_err, pending_err, meta_err);
1330 let err = rs_err.collect_all();
1331 assert!(err.is_err(), "collect_all must propagate decode error");
1332 }
1333
1334 #[tokio::test]
1338 async fn test_multi_result_stream_lazy_decode_across_result_sets() {
1339 use bytes::Bytes;
1340 use tds_protocol::token::RawRow;
1341
1342 let (meta1, cols1) = intn_meta_and_columns("a", false);
1343 let pending1 = vec![PendingRow::Raw(RawRow {
1344 data: {
1345 let mut b = Vec::with_capacity(5);
1346 b.push(4);
1347 b.extend_from_slice(&101i32.to_le_bytes());
1348 Bytes::from(b)
1349 },
1350 })];
1351 #[cfg(feature = "always-encrypted")]
1352 let rs1 = ResultSet::from_raw(cols1, pending1, meta1, None);
1353 #[cfg(not(feature = "always-encrypted"))]
1354 let rs1 = ResultSet::from_raw(cols1, pending1, meta1);
1355
1356 let (meta2, cols2) = intn_meta_and_columns("b", false);
1357 let pending2 = vec![PendingRow::Raw(RawRow {
1358 data: {
1359 let mut b = Vec::with_capacity(5);
1360 b.push(4);
1361 b.extend_from_slice(&202i32.to_le_bytes());
1362 Bytes::from(b)
1363 },
1364 })];
1365 #[cfg(feature = "always-encrypted")]
1366 let rs2 = ResultSet::from_raw(cols2, pending2, meta2, None);
1367 #[cfg(not(feature = "always-encrypted"))]
1368 let rs2 = ResultSet::from_raw(cols2, pending2, meta2);
1369
1370 let mut stream = MultiResultStream::new(vec![rs1, rs2]);
1371 assert_eq!(stream.result_count(), 2);
1372 assert_eq!(stream.current_result_index(), 0);
1373
1374 let row = stream
1375 .next_row()
1376 .await
1377 .expect("first row success")
1378 .expect("row present");
1379 assert_eq!(row.get::<i32>(0).unwrap(), 101);
1380 assert!(stream.next_row().await.expect("no more rows").is_none());
1381
1382 assert!(stream.has_more_results());
1383 assert!(stream.next_result().await.expect("advance ok"));
1384 assert_eq!(stream.current_result_index(), 1);
1385
1386 let row = stream
1387 .next_row()
1388 .await
1389 .expect("second row success")
1390 .expect("row present");
1391 assert_eq!(row.get::<i32>(0).unwrap(), 202);
1392 }
1393}