use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::Stream;
use tds_protocol::token::{ColMetaData, NbcRow, RawRow};
use crate::error::Error;
use crate::row::{Column, Row};
/// A buffered result row that may or may not have been decoded yet.
///
/// `QueryStream` and `ResultSet` queue these and turn them into [`Row`]s on
/// demand through their private `decode` helpers.
#[derive(Debug, Clone)]
pub(crate) enum PendingRow {
/// A row that is already decoded; `decode` returns it unchanged.
Parsed(Row),
/// An undecoded row token; `decode` converts it with
/// `column_parser::convert_raw_row` and needs the result-set metadata.
Raw(RawRow),
/// An undecoded NBC row token; `decode` converts it with
/// `column_parser::convert_nbc_row` and needs the result-set metadata.
Nbc(NbcRow),
}
/// Buffered rows of a single query result.
///
/// Rows are queued as [`PendingRow`]s and decoded one at a time when pulled
/// through the `Iterator` or `Stream` implementation, or all at once through
/// `collect_all`.
#[must_use = "streams must be consumed; dropping a stream discards remaining rows"]
pub struct QueryStream<'a> {
/// Column descriptors returned by `columns()` and handed to the row parser.
columns: Vec<Column>,
/// Rows not yet handed out; consumed from the front.
rows: VecDeque<PendingRow>,
/// Wire metadata needed to decode `Raw`/`Nbc` rows. `from_raw` stores
/// `Some`; the `new` and `empty` constructors store `None`.
meta: Option<ColMetaData>,
/// Optional decryptor; when it is `Some`, rows are decoded through the
/// `*_decrypted` parser functions instead of the plain ones.
#[cfg(feature = "always-encrypted")]
decryptor: Option<std::sync::Arc<crate::column_decryptor::ColumnDecryptor>>,
/// Set once the queue has been observed empty (or up front by `empty()`);
/// afterwards `next`/`poll_next` return `None` without touching the queue.
finished: bool,
/// Carries the `'a` lifetime parameter; no borrowed data is stored.
/// NOTE(review): presumably ties the stream to a connection borrow — confirm with callers.
_marker: std::marker::PhantomData<&'a ()>,
}
impl QueryStream<'_> {
#[cfg(test)]
pub(crate) fn new(columns: Vec<Column>, rows: Vec<Row>) -> Self {
Self {
columns,
rows: rows.into_iter().map(PendingRow::Parsed).collect(),
meta: None,
#[cfg(feature = "always-encrypted")]
decryptor: None,
finished: false,
_marker: std::marker::PhantomData,
}
}
pub(crate) fn from_raw(
columns: Vec<Column>,
pending: Vec<PendingRow>,
meta: ColMetaData,
#[cfg(feature = "always-encrypted")] decryptor: Option<
std::sync::Arc<crate::column_decryptor::ColumnDecryptor>,
>,
) -> Self {
Self {
columns,
rows: pending.into(),
meta: Some(meta),
#[cfg(feature = "always-encrypted")]
decryptor,
finished: false,
_marker: std::marker::PhantomData,
}
}
#[allow(dead_code)]
pub(crate) fn empty() -> Self {
Self {
columns: Vec::new(),
rows: VecDeque::new(),
meta: None,
#[cfg(feature = "always-encrypted")]
decryptor: None,
finished: true,
_marker: std::marker::PhantomData,
}
}
#[must_use]
pub fn columns(&self) -> &[Column] {
&self.columns
}
#[must_use]
pub fn is_finished(&self) -> bool {
self.finished
}
#[must_use]
pub fn rows_remaining(&self) -> usize {
self.rows.len()
}
pub async fn collect_all(mut self) -> Result<Vec<Row>, Error> {
let mut out = Vec::with_capacity(self.rows.len());
while let Some(pending) = self.rows.pop_front() {
out.push(self.decode(pending)?);
}
self.finished = true;
Ok(out)
}
pub fn try_next(&mut self) -> Option<Row> {
self.next().and_then(|r| r.ok())
}
fn decode(&self, pending: PendingRow) -> Result<Row, Error> {
match pending {
PendingRow::Parsed(row) => Ok(row),
PendingRow::Raw(raw) => {
let meta = self
.meta
.as_ref()
.ok_or_else(|| Error::Protocol("row metadata missing for raw row".into()))?;
#[cfg(feature = "always-encrypted")]
if let Some(ref dec) = self.decryptor {
return crate::column_parser::convert_raw_row_decrypted(
&raw,
meta,
&self.columns,
dec,
);
}
crate::column_parser::convert_raw_row(&raw, meta, &self.columns)
}
PendingRow::Nbc(nbc) => {
let meta = self
.meta
.as_ref()
.ok_or_else(|| Error::Protocol("row metadata missing for NBC row".into()))?;
#[cfg(feature = "always-encrypted")]
if let Some(ref dec) = self.decryptor {
return crate::column_parser::convert_nbc_row_decrypted(
&nbc,
meta,
&self.columns,
dec,
);
}
crate::column_parser::convert_nbc_row(&nbc, meta, &self.columns)
}
}
}
}
impl Stream for QueryStream<'_> {
    type Item = Result<Row, Error>;

    /// Yields the next buffered row.
    ///
    /// All rows are already in memory, so this never returns `Poll::Pending`
    /// and therefore never needs to register the waker in `_cx`. Once the
    /// queue has been observed empty the stream stays finished.
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        if this.finished {
            return Poll::Ready(None);
        }
        let pending = this.rows.pop_front();
        if pending.is_none() {
            // Latch exhaustion so later polls short-circuit above.
            this.finished = true;
        }
        Poll::Ready(pending.map(|row| this.decode(row)))
    }

    /// Reports the exact number of queued rows.
    ///
    /// Without this override the trait default `(0, None)` is used, which
    /// hides the known length from consumers that pre-size their buffers.
    /// The value matches what the `Iterator` implementation reports.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.rows.len();
        (remaining, Some(remaining))
    }
}
/// `size_hint` below reports the queue length as both bounds, which is what
/// `ExactSizeIterator` requires.
impl ExactSizeIterator for QueryStream<'_> {}

impl Iterator for QueryStream<'_> {
    type Item = Result<Row, Error>;

    /// Pops the next queued row and decodes it.
    ///
    /// Returns `None` once the queue has been observed empty; from then on
    /// the stream stays finished and the queue is not consulted again.
    fn next(&mut self) -> Option<Self::Item> {
        if self.finished {
            return None;
        }
        let pending = self.rows.pop_front();
        if pending.is_none() {
            self.finished = true;
        }
        pending.map(|row| self.decode(row))
    }

    /// Exact number of rows still queued.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let exact = self.rows.len();
        (exact, Some(exact))
    }
}
/// Outcome of executing a statement: an affected-row count plus any output
/// parameters.
#[derive(Debug, Clone)]
#[non_exhaustive]
#[must_use]
pub struct ExecuteResult {
/// Number of rows affected, as passed to the constructor.
pub rows_affected: u64,
/// Output parameters; empty when built with `ExecuteResult::new`.
pub output_params: Vec<OutputParam>,
}
/// A named output-parameter value.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct OutputParam {
/// Parameter name. The tests in this file use names both with and without
/// a leading `@` — NOTE(review): confirm which form the producer stores.
pub name: String,
/// Parameter value.
pub value: mssql_types::SqlValue,
}
impl ExecuteResult {
    /// Creates a result with the given affected-row count and no output
    /// parameters.
    pub fn new(rows_affected: u64) -> Self {
        Self::with_outputs(rows_affected, Vec::new())
    }

    /// Creates a result with the given affected-row count and output
    /// parameters.
    pub fn with_outputs(rows_affected: u64, output_params: Vec<OutputParam>) -> Self {
        Self {
            rows_affected,
            output_params,
        }
    }

    /// Looks up an output parameter by name.
    ///
    /// The comparison ignores ASCII case and at most one leading `@` on
    /// either side, matching `ProcedureResult::get_output`. `"total"`,
    /// `"@Total"` and `"TOTAL"` therefore all find a parameter stored as
    /// `"@Total"`. Returns the first match, or `None` if there is none.
    #[must_use]
    pub fn get_output(&self, name: &str) -> Option<&OutputParam> {
        let wanted = name.strip_prefix('@').unwrap_or(name);
        self.output_params.iter().find(|param| {
            let stored = param.name.strip_prefix('@').unwrap_or(&param.name);
            stored.eq_ignore_ascii_case(wanted)
        })
    }
}
/// Outcome of a stored-procedure call.
#[derive(Debug, Clone)]
#[non_exhaustive]
#[must_use]
pub struct ProcedureResult {
/// Procedure return value; initialised to `0` by `ProcedureResult::new`.
pub return_value: i32,
/// Affected-row count; initialised to `0` by `ProcedureResult::new`.
pub rows_affected: u64,
/// Output parameters, searchable by name through `get_output`.
pub output_params: Vec<OutputParam>,
/// Result sets attached to the call; `first_result_set` returns the first.
pub result_sets: Vec<ResultSet>,
}
impl ProcedureResult {
    /// Creates an empty result: zero return value, zero affected rows, no
    /// output parameters and no result sets.
    pub(crate) fn new() -> Self {
        let output_params = Vec::new();
        let result_sets = Vec::new();
        Self {
            return_value: 0,
            rows_affected: 0,
            output_params,
            result_sets,
        }
    }

    /// The procedure's return value.
    #[must_use]
    pub fn get_return_value(&self) -> i32 {
        self.return_value
    }

    /// Looks up an output parameter by name.
    ///
    /// The comparison ignores ASCII case and at most one leading `@` on
    /// either the requested or the stored name. Returns the first match.
    #[must_use]
    pub fn get_output(&self, name: &str) -> Option<&OutputParam> {
        // Strips a single leading `@`, if present.
        fn bare(raw: &str) -> &str {
            raw.strip_prefix('@').unwrap_or(raw)
        }

        let wanted = bare(name);
        self.output_params
            .iter()
            .find(|param| bare(&param.name).eq_ignore_ascii_case(wanted))
    }

    /// The first result set, if the procedure produced any.
    #[must_use]
    pub fn first_result_set(&self) -> Option<&ResultSet> {
        self.result_sets.as_slice().first()
    }

    /// `true` when at least one result set is present.
    #[must_use]
    pub fn has_result_sets(&self) -> bool {
        self.first_result_set().is_some()
    }
}
/// One buffered result set: column descriptors plus rows awaiting decode.
///
/// Rows are pulled with `next_row` or `collect_all`.
#[derive(Debug, Clone)]
#[must_use]
pub struct ResultSet {
/// Column descriptors returned by `columns()` and handed to the row parser.
columns: Vec<Column>,
/// Rows not yet handed out; consumed from the front.
pending_rows: VecDeque<PendingRow>,
/// Wire metadata needed to decode `Raw`/`Nbc` rows. `from_raw` stores
/// `Some`; `new` stores `None`.
meta: Option<ColMetaData>,
/// Optional decryptor; when it is `Some`, rows are decoded through the
/// `*_decrypted` parser functions instead of the plain ones.
#[cfg(feature = "always-encrypted")]
decryptor: Option<std::sync::Arc<crate::column_decryptor::ColumnDecryptor>>,
}
impl ResultSet {
pub fn new(columns: Vec<Column>, rows: Vec<Row>) -> Self {
Self {
columns,
pending_rows: rows.into_iter().map(PendingRow::Parsed).collect(),
meta: None,
#[cfg(feature = "always-encrypted")]
decryptor: None,
}
}
pub(crate) fn from_raw(
columns: Vec<Column>,
pending: Vec<PendingRow>,
meta: ColMetaData,
#[cfg(feature = "always-encrypted")] decryptor: Option<
std::sync::Arc<crate::column_decryptor::ColumnDecryptor>,
>,
) -> Self {
Self {
columns,
pending_rows: pending.into(),
meta: Some(meta),
#[cfg(feature = "always-encrypted")]
decryptor,
}
}
#[must_use]
pub fn columns(&self) -> &[Column] {
&self.columns
}
#[must_use]
pub fn rows_remaining(&self) -> usize {
self.pending_rows.len()
}
pub fn next_row(&mut self) -> Option<Result<Row, Error>> {
self.pending_rows.pop_front().map(|p| self.decode(p))
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.pending_rows.is_empty()
}
pub fn collect_all(&mut self) -> Result<Vec<Row>, Error> {
let mut out = Vec::with_capacity(self.pending_rows.len());
while let Some(pending) = self.pending_rows.pop_front() {
out.push(self.decode(pending)?);
}
Ok(out)
}
fn decode(&self, pending: PendingRow) -> Result<Row, Error> {
match pending {
PendingRow::Parsed(row) => Ok(row),
PendingRow::Raw(raw) => {
let meta = self
.meta
.as_ref()
.ok_or_else(|| Error::Protocol("row metadata missing for raw row".into()))?;
#[cfg(feature = "always-encrypted")]
if let Some(ref dec) = self.decryptor {
return crate::column_parser::convert_raw_row_decrypted(
&raw,
meta,
&self.columns,
dec,
);
}
crate::column_parser::convert_raw_row(&raw, meta, &self.columns)
}
PendingRow::Nbc(nbc) => {
let meta = self
.meta
.as_ref()
.ok_or_else(|| Error::Protocol("row metadata missing for NBC row".into()))?;
#[cfg(feature = "always-encrypted")]
if let Some(ref dec) = self.decryptor {
return crate::column_parser::convert_nbc_row_decrypted(
&nbc,
meta,
&self.columns,
dec,
);
}
crate::column_parser::convert_nbc_row(&nbc, meta, &self.columns)
}
}
}
fn into_query_stream<'a>(self) -> QueryStream<'a> {
QueryStream {
columns: self.columns,
rows: self.pending_rows,
meta: self.meta,
#[cfg(feature = "always-encrypted")]
decryptor: self.decryptor,
finished: false,
_marker: std::marker::PhantomData,
}
}
}
/// A sequence of buffered [`ResultSet`]s with a cursor on the current one.
#[must_use = "streams must be consumed; dropping a stream discards remaining results"]
pub struct MultiResultStream<'a> {
/// All result sets, in order.
result_sets: Vec<ResultSet>,
/// Index into `result_sets` of the set that `columns`, `next_row` and
/// `collect_current` operate on; starts at `0`.
current_result: usize,
/// Carries the `'a` lifetime parameter; no borrowed data is stored.
/// NOTE(review): presumably ties the stream to a connection borrow — confirm with callers.
_marker: std::marker::PhantomData<&'a ()>,
}
impl<'a> MultiResultStream<'a> {
    /// Creates a stream positioned on the first of `result_sets`.
    pub(crate) fn new(result_sets: Vec<ResultSet>) -> Self {
        Self {
            result_sets,
            current_result: 0,
            _marker: std::marker::PhantomData,
        }
    }

    /// Creates a stream that holds no result sets.
    #[allow(dead_code)]
    pub(crate) fn empty() -> Self {
        Self::new(Vec::new())
    }

    /// Zero-based index of the current result set.
    #[must_use]
    pub fn current_result_index(&self) -> usize {
        self.current_result
    }

    /// Total number of result sets held by the stream.
    #[must_use]
    pub fn result_count(&self) -> usize {
        self.result_sets.len()
    }

    /// `true` when a result set exists after the current one.
    #[must_use]
    pub fn has_more_results(&self) -> bool {
        self.result_sets.len() > self.current_result + 1
    }

    /// Columns of the current result set, or `None` when there is none.
    #[must_use]
    pub fn columns(&self) -> Option<&[Column]> {
        let active = self.result_sets.get(self.current_result)?;
        Some(active.columns())
    }

    /// Moves the cursor to the next result set.
    ///
    /// Returns `Ok(true)` when the cursor moved and `Ok(false)` when it was
    /// already on the last result set (or the stream is empty). As written,
    /// this never returns an error.
    pub async fn next_result(&mut self) -> Result<bool, Error> {
        let advanced = self.has_more_results();
        if advanced {
            self.current_result += 1;
        }
        Ok(advanced)
    }

    /// Pulls the next row of the current result set.
    ///
    /// Returns `Ok(None)` when the current result set has no rows left or
    /// when there is no current result set.
    ///
    /// # Errors
    ///
    /// Propagates the decode error of the pulled row.
    pub async fn next_row(&mut self) -> Result<Option<Row>, Error> {
        match self.result_sets.get_mut(self.current_result) {
            Some(active) => active.next_row().transpose(),
            None => Ok(None),
        }
    }

    /// Mutable access to the current result set, if any.
    #[must_use]
    pub fn current_result_set(&mut self) -> Option<&mut ResultSet> {
        self.result_sets.get_mut(self.current_result)
    }

    /// Decodes and returns all remaining rows of the current result set.
    ///
    /// Returns an empty vector when there is no current result set.
    ///
    /// # Errors
    ///
    /// Propagates the first decode error reported by the result set.
    pub fn collect_current(&mut self) -> Result<Vec<Row>, Error> {
        self.result_sets
            .get_mut(self.current_result)
            .map_or_else(|| Ok(Vec::new()), ResultSet::collect_all)
    }

    /// Consumes the stream and converts every result set, in order, into its
    /// own [`QueryStream`], regardless of the cursor position.
    pub fn into_query_streams(self) -> Vec<QueryStream<'a>> {
        let mut streams = Vec::with_capacity(self.result_sets.len());
        for result_set in self.result_sets {
            streams.push(result_set.into_query_stream());
        }
        streams
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    //! Unit tests for the result and stream types.
    //!
    //! Fixtures are built by the helpers below so that each test only spells
    //! out what is specific to it.

    use super::*;
    use bytes::Bytes;
    use mssql_types::SqlValue;
    use tds_protocol::token::{ColMetaData, ColumnData, RawRow, TypeInfo};
    use tds_protocol::types::TypeId;

    /// `INT` column descriptor at index 0 with `max_length = Some(4)`.
    /// Tests override individual fields with struct-update syntax.
    fn int_column(name: &str, nullable: bool) -> Column {
        Column {
            name: name.to_string(),
            index: 0,
            type_name: "INT".to_string(),
            nullable,
            max_length: Some(4),
            precision: None,
            scale: None,
            collation: None,
        }
    }

    /// Type info shared by every integer wire column in these fixtures.
    fn int_type_info() -> TypeInfo {
        TypeInfo {
            max_length: Some(4),
            precision: None,
            scale: None,
            collation: None,
        }
    }

    /// Wire metadata for a `TypeId::IntN` (`0x26`) column; `flags` is `0x01`
    /// when `nullable` and `0x00` otherwise.
    fn intn_column_data(name: &str, nullable: bool) -> ColumnData {
        ColumnData {
            name: name.to_string(),
            type_id: TypeId::IntN,
            col_type: 0x26,
            flags: if nullable { 0x01 } else { 0x00 },
            user_type: 0,
            type_info: int_type_info(),
            crypto_metadata: None,
        }
    }

    /// Wire metadata for a `TypeId::Int4` (`0x38`) column with `flags = 0x00`.
    fn int4_column_data(name: &str) -> ColumnData {
        ColumnData {
            name: name.to_string(),
            type_id: TypeId::Int4,
            col_type: 0x38,
            flags: 0x00,
            user_type: 0,
            type_info: int_type_info(),
            crypto_metadata: None,
        }
    }

    /// Metadata and matching descriptor for a single `IntN` column.
    fn intn_meta_and_columns(col_name: &str, nullable: bool) -> (ColMetaData, Vec<Column>) {
        (
            ColMetaData {
                columns: vec![intn_column_data(col_name, nullable)],
                cek_table: None,
            },
            vec![int_column(col_name, nullable)],
        )
    }

    /// Metadata for a single `Int4` column `a` paired with one raw row that
    /// carries only two bytes, so decoding it is expected to fail.
    fn truncated_int4_fixture() -> (ColMetaData, Vec<Column>, Vec<PendingRow>) {
        (
            ColMetaData {
                columns: vec![int4_column_data("a")],
                cek_table: None,
            },
            vec![int_column("a", false)],
            vec![PendingRow::Raw(RawRow {
                data: Bytes::from(vec![0x01u8, 0x02]),
            })],
        )
    }

    /// Raw row for a single `IntN` column: a `4` byte followed by `value` in
    /// little-endian order (presumably the IntN length prefix — the decoding
    /// tests below confirm the parser accepts this layout).
    fn intn_raw_row(value: i32) -> PendingRow {
        let mut bytes = Vec::with_capacity(5);
        bytes.push(4);
        bytes.extend_from_slice(&value.to_le_bytes());
        PendingRow::Raw(RawRow {
            data: Bytes::from(bytes),
        })
    }

    /// `QueryStream::from_raw` without a decryptor, for either feature setup.
    #[cfg(feature = "always-encrypted")]
    fn raw_query_stream(
        columns: Vec<Column>,
        pending: Vec<PendingRow>,
        meta: ColMetaData,
    ) -> QueryStream<'static> {
        QueryStream::from_raw(columns, pending, meta, None)
    }

    /// `QueryStream::from_raw` without a decryptor, for either feature setup.
    #[cfg(not(feature = "always-encrypted"))]
    fn raw_query_stream(
        columns: Vec<Column>,
        pending: Vec<PendingRow>,
        meta: ColMetaData,
    ) -> QueryStream<'static> {
        QueryStream::from_raw(columns, pending, meta)
    }

    /// `ResultSet::from_raw` without a decryptor, for either feature setup.
    #[cfg(feature = "always-encrypted")]
    fn raw_result_set(
        columns: Vec<Column>,
        pending: Vec<PendingRow>,
        meta: ColMetaData,
    ) -> ResultSet {
        ResultSet::from_raw(columns, pending, meta, None)
    }

    /// `ResultSet::from_raw` without a decryptor, for either feature setup.
    #[cfg(not(feature = "always-encrypted"))]
    fn raw_result_set(
        columns: Vec<Column>,
        pending: Vec<PendingRow>,
        meta: ColMetaData,
    ) -> ResultSet {
        ResultSet::from_raw(columns, pending, meta)
    }

    #[test]
    fn test_execute_result() {
        let result = ExecuteResult::new(42);
        assert_eq!(result.rows_affected, 42);
        assert!(result.output_params.is_empty());
    }

    #[test]
    fn test_procedure_result_defaults() {
        let result = ProcedureResult::new();
        assert_eq!(result.return_value, 0);
        assert_eq!(result.rows_affected, 0);
        assert!(result.output_params.is_empty());
        assert!(result.result_sets.is_empty());
        assert!(!result.has_result_sets());
        assert!(result.first_result_set().is_none());
    }

    #[test]
    fn test_procedure_result_get_output() {
        let mut result = ProcedureResult::new();
        result.output_params.push(OutputParam {
            name: "@Total".to_string(),
            value: SqlValue::Int(42),
        });
        result.output_params.push(OutputParam {
            name: "@Message".to_string(),
            value: SqlValue::String("ok".to_string()),
        });
        assert!(result.get_output("@Total").is_some());
        assert!(result.get_output("@total").is_some());
        assert!(result.get_output("@TOTAL").is_some());
        assert!(result.get_output("Total").is_some());
        assert!(result.get_output("total").is_some());
        assert!(result.get_output("@NotHere").is_none());
        assert!(result.get_output("NotHere").is_none());
    }

    #[test]
    fn test_procedure_result_with_result_sets() {
        let columns = vec![int_column("id", false)];
        let rows = vec![Row::from_values(columns.clone(), vec![SqlValue::Int(1)])];
        let rs = ResultSet::new(columns, rows);
        let mut result = ProcedureResult::new();
        result.result_sets.push(rs);
        result.return_value = 7;
        result.rows_affected = 5;
        assert!(result.has_result_sets());
        assert_eq!(result.get_return_value(), 7);
        assert_eq!(result.first_result_set().unwrap().columns().len(), 1);
    }

    #[test]
    fn test_execute_result_with_outputs() {
        let outputs = vec![OutputParam {
            name: "ReturnValue".to_string(),
            value: SqlValue::Int(100),
        }];
        let result = ExecuteResult::with_outputs(10, outputs);
        assert_eq!(result.rows_affected, 10);
        assert!(result.get_output("ReturnValue").is_some());
        assert!(result.get_output("returnvalue").is_some());
        assert!(result.get_output("NotFound").is_none());
    }

    #[test]
    fn test_query_stream_columns() {
        let columns = vec![Column {
            precision: Some(0),
            scale: Some(0),
            ..int_column("id", false)
        }];
        let stream = QueryStream::new(columns, Vec::new());
        assert_eq!(stream.columns().len(), 1);
        assert_eq!(stream.columns()[0].name, "id");
        assert!(!stream.is_finished());
    }

    #[test]
    fn test_query_stream_with_rows() {
        let columns = vec![
            int_column("id", false),
            Column {
                index: 1,
                type_name: "NVARCHAR".to_string(),
                max_length: Some(100),
                ..int_column("name", true)
            },
        ];
        let rows = vec![
            Row::from_values(
                columns.clone(),
                vec![SqlValue::Int(1), SqlValue::String("Alice".to_string())],
            ),
            Row::from_values(
                columns.clone(),
                vec![SqlValue::Int(2), SqlValue::String("Bob".to_string())],
            ),
        ];
        let mut stream = QueryStream::new(columns, rows);
        assert_eq!(stream.columns().len(), 2);
        assert_eq!(stream.rows_remaining(), 2);
        assert!(!stream.is_finished());
        let row1 = stream.try_next().unwrap();
        assert_eq!(row1.get::<i32>(0).unwrap(), 1);
        assert_eq!(row1.get_by_name::<String>("name").unwrap(), "Alice");
        let row2 = stream.try_next().unwrap();
        assert_eq!(row2.get::<i32>(0).unwrap(), 2);
        assert_eq!(row2.get_by_name::<String>("name").unwrap(), "Bob");
        assert!(stream.try_next().is_none());
        assert!(stream.is_finished());
    }

    #[test]
    fn test_query_stream_iterator() {
        let columns = vec![Column {
            max_length: None,
            ..int_column("val", false)
        }];
        let rows = vec![
            Row::from_values(columns.clone(), vec![SqlValue::Int(10)]),
            Row::from_values(columns.clone(), vec![SqlValue::Int(20)]),
            Row::from_values(columns.clone(), vec![SqlValue::Int(30)]),
        ];
        let mut stream = QueryStream::new(columns, rows);
        let values: Vec<i32> = stream
            .by_ref()
            .map(|r| r.unwrap().get::<i32>(0).unwrap())
            .collect();
        assert_eq!(values, vec![10, 20, 30]);
        assert!(stream.is_finished());
    }

    #[test]
    fn test_query_stream_empty() {
        let stream = QueryStream::empty();
        assert!(stream.columns().is_empty());
        assert_eq!(stream.rows_remaining(), 0);
        assert!(stream.is_finished());
    }

    #[test]
    fn test_query_stream_lazy_raw_row_decoding() {
        // Column `a` carries 42; column `b` is sent as a single zero byte and
        // is expected to decode as NULL.
        let mut data = vec![4u8];
        data.extend_from_slice(&42i32.to_le_bytes());
        data.push(0);
        let meta = ColMetaData {
            columns: vec![intn_column_data("a", false), intn_column_data("b", true)],
            cek_table: None,
        };
        let columns = vec![
            int_column("a", false),
            Column {
                index: 1,
                ..int_column("b", true)
            },
        ];
        let pending = vec![PendingRow::Raw(RawRow {
            data: Bytes::from(data),
        })];
        let mut stream = raw_query_stream(columns, pending, meta);
        assert_eq!(stream.rows_remaining(), 1);
        let row = stream
            .next()
            .expect("one row pending")
            .expect("row decoded successfully");
        assert_eq!(row.get::<i32>(0).unwrap(), 42);
        assert!(row.is_null(1));
        assert!(stream.next().is_none());
        assert!(stream.is_finished());
    }

    #[test]
    fn test_query_stream_lazy_decode_error_propagates() {
        let (meta, columns, pending) = truncated_int4_fixture();
        let mut stream = raw_query_stream(columns, pending, meta);
        let item = stream.next().expect("pending row present");
        assert!(item.is_err(), "truncated bytes must surface a decode error");
        assert!(stream.next().is_none());
    }

    #[test]
    fn test_result_set_lazy_raw_row_decoding() {
        let (meta, columns) = intn_meta_and_columns("a", false);
        let pending = vec![intn_raw_row(7), intn_raw_row(11)];
        let mut rs = raw_result_set(columns, pending, meta);
        assert_eq!(rs.rows_remaining(), 2);
        assert!(!rs.is_empty());
        let row1 = rs.next_row().expect("row present").expect("decodes");
        assert_eq!(row1.get::<i32>(0).unwrap(), 7);
        let row2 = rs.next_row().expect("row present").expect("decodes");
        assert_eq!(row2.get::<i32>(0).unwrap(), 11);
        assert!(rs.next_row().is_none());
        assert!(rs.is_empty());
    }

    #[test]
    fn test_result_set_lazy_decode_error_propagates() {
        let (meta, columns, pending) = truncated_int4_fixture();
        let mut rs = raw_result_set(columns, pending, meta);
        let first = rs.next_row().expect("pending row present");
        assert!(
            first.is_err(),
            "truncated bytes must surface a decode error"
        );
        assert!(rs.next_row().is_none());
    }

    #[test]
    fn test_result_set_lazy_collect_all_success_and_error() {
        // Success: every queued row decodes.
        let (meta_ok, cols_ok) = intn_meta_and_columns("a", false);
        let pending_ok = vec![intn_raw_row(10), intn_raw_row(20)];
        let mut rs_ok = raw_result_set(cols_ok, pending_ok, meta_ok);
        let rows = rs_ok.collect_all().expect("all rows decode");
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].get::<i32>(0).unwrap(), 10);
        assert_eq!(rows[1].get::<i32>(0).unwrap(), 20);
        assert!(rs_ok.is_empty());

        // Failure: the truncated row makes `collect_all` return the error.
        let (meta_err, cols_err, pending_err) = truncated_int4_fixture();
        let mut rs_err = raw_result_set(cols_err, pending_err, meta_err);
        let err = rs_err.collect_all();
        assert!(err.is_err(), "collect_all must propagate decode error");
    }

    #[tokio::test]
    async fn test_multi_result_stream_lazy_decode_across_result_sets() {
        let (meta1, cols1) = intn_meta_and_columns("a", false);
        let rs1 = raw_result_set(cols1, vec![intn_raw_row(101)], meta1);
        let (meta2, cols2) = intn_meta_and_columns("b", false);
        let rs2 = raw_result_set(cols2, vec![intn_raw_row(202)], meta2);
        let mut stream = MultiResultStream::new(vec![rs1, rs2]);
        assert_eq!(stream.result_count(), 2);
        assert_eq!(stream.current_result_index(), 0);
        let row = stream
            .next_row()
            .await
            .expect("first row success")
            .expect("row present");
        assert_eq!(row.get::<i32>(0).unwrap(), 101);
        assert!(stream.next_row().await.expect("no more rows").is_none());
        assert!(stream.has_more_results());
        assert!(stream.next_result().await.expect("advance ok"));
        assert_eq!(stream.current_result_index(), 1);
        let row = stream
            .next_row()
            .await
            .expect("second row success")
            .expect("row present");
        assert_eq!(row.get::<i32>(0).unwrap(), 202);
    }
}