use std::sync::Arc;
use tds_protocol::token::{ColMetaData, Token};
use crate::Client;
use crate::error::{Error, Result};
use crate::row::{Column, Row};
use crate::row_source::{Pull, RowSource};
use crate::state::{ConnectionState, Ready};
/// Incremental, pull-based view over the rows of a query response.
///
/// The stream holds the client by exclusive reference for its whole lifetime
/// and fetches rows one at a time through `try_next`. It tracks two parallel
/// descriptions of the current result set: the protocol-level `meta` and the
/// row-level `row_meta`; both are replaced together when a new `ColMetaData`
/// token is seen mid-stream.
#[must_use = "streams must be consumed; dropping a stream discards remaining rows"]
pub struct RowStream<'a, S: ConnectionState = Ready> {
/// Client the response is read from; also receives environment-change
/// tokens and the "response drained" / cancel notifications.
client: &'a mut Client<S>,
/// Token source: packets read from the client are pushed in, tokens are
/// pulled out.
source: RowSource,
/// Row-level column metadata for the current result set; backs `columns()`
/// and is handed to the row converters.
row_meta: Arc<crate::row::ColMetaData>,
/// Protocol-level column metadata for the current result set, handed to
/// the row converters alongside `row_meta`.
meta: ColMetaData,
/// Optional decryptor for the current result set. When present, rows are
/// decoded through the `*_decrypted` converters instead of the plain ones.
#[cfg(feature = "always-encrypted")]
decryptor: Option<std::sync::Arc<crate::column_decryptor::ColumnDecryptor>>,
/// Set once the stream has ended, hit a terminal error, or been cancelled;
/// `try_next` then returns `Ok(None)` without touching the client.
finished: bool,
}
impl<'a, S: ConnectionState> RowStream<'a, S> {
    /// Creates a stream over `source` whose current result set is described
    /// by `columns` (row-level) and `meta` (protocol-level).
    ///
    /// The two descriptions are stored as given; callers are expected to pass
    /// values that describe the same result set. With the `always-encrypted`
    /// feature, `decryptor` (if any) is used to decode every row until the
    /// next result-set switch.
    pub(crate) fn new(
        client: &'a mut Client<S>,
        source: RowSource,
        columns: Vec<Column>,
        meta: ColMetaData,
        #[cfg(feature = "always-encrypted")] decryptor: Option<
            std::sync::Arc<crate::column_decryptor::ColumnDecryptor>,
        >,
    ) -> Self {
        Self {
            client,
            source,
            row_meta: Arc::new(crate::row::ColMetaData::new(columns)),
            meta,
            #[cfg(feature = "always-encrypted")]
            decryptor,
            finished: false,
        }
    }

    /// Creates a stream that is already finished and has no columns.
    ///
    /// `try_next` on the result returns `Ok(None)` immediately and never
    /// touches the client. Unlike a stream that runs to completion, this
    /// constructor does not call `note_response_drained` on the client.
    pub(crate) fn empty(client: &'a mut Client<S>) -> Self {
        Self {
            client,
            // NOTE(review): the meaning of the `false` flag is defined by
            // `RowSource::new`; it is irrelevant here because a finished
            // stream never pulls from its source.
            source: RowSource::new(false),
            row_meta: Arc::new(crate::row::ColMetaData::new(Vec::new())),
            meta: ColMetaData::default(),
            #[cfg(feature = "always-encrypted")]
            decryptor: None,
            finished: true,
        }
    }

    /// Returns the columns of the result set currently being streamed.
    ///
    /// The slice changes when `try_next` encounters a new `ColMetaData`
    /// token and switches to the next result set.
    #[must_use]
    pub fn columns(&self) -> &[Column] {
        &self.row_meta.columns
    }

    /// Returns `true` once the stream has ended, failed terminally, or was
    /// created via `empty`.
    #[must_use]
    pub fn is_finished(&self) -> bool {
        self.finished
    }

    /// Fetches the next row, reading more packets from the client as needed.
    ///
    /// Returns `Ok(Some(row))` for each `Row` / `NbcRow` token and `Ok(None)`
    /// once the token source reports its end (or if the stream was already
    /// finished). A `ColMetaData` token switches the stream to the new result
    /// set and streaming continues; `EnvChange` tokens are forwarded to the
    /// client; all other tokens are skipped.
    ///
    /// # Errors
    ///
    /// The following mark the stream finished before returning:
    /// - a server `Error` token (converted via `server_token_to_error`);
    /// - a `Done` token with the error flag set (`Error::Query`);
    /// - the client reporting no further packet (`Error::ConnectionClosed`).
    ///
    /// Errors from pulling a token, reading a packet, decoding a row, or
    /// switching result sets are propagated as-is and do *not* mark the
    /// stream finished.
    pub async fn try_next(&mut self) -> Result<Option<Row>> {
        if self.finished {
            return Ok(None);
        }
        loop {
            match self.source.pull()? {
                Pull::Token(Token::Row(raw)) => return Ok(Some(self.decode_raw(&raw)?)),
                Pull::Token(Token::NbcRow(nbc)) => return Ok(Some(self.decode_nbc(&nbc)?)),
                Pull::Token(Token::ColMetaData(meta)) => {
                    // A new result set begins; subsequent rows use its layout.
                    self.switch_result_set(meta).await?;
                }
                Pull::Token(Token::Error(err)) => {
                    self.finish();
                    return Err(crate::client::response::server_token_to_error(&err));
                }
                Pull::Token(Token::Done(done)) => {
                    // A clean DONE is not the end of the stream by itself;
                    // only `Pull::End` terminates it.
                    if done.status.error {
                        self.finish();
                        return Err(Error::Query(
                            "query failed (server set error flag in DONE token)".to_string(),
                        ));
                    }
                }
                Pull::Token(Token::EnvChange(env)) => {
                    self.client.apply_transaction_env_change(&env);
                }
                Pull::Token(_) => {
                    // Tokens that carry no row data are intentionally skipped.
                }
                Pull::NeedMore => match self.client.read_response_packet().await? {
                    Some((payload, is_eom)) => self.source.push_packet(payload, is_eom),
                    None => {
                        self.finish();
                        return Err(Error::ConnectionClosed);
                    }
                },
                Pull::End => {
                    self.finish();
                    return Ok(None);
                }
            }
        }
    }

    /// Consumes the stream and gathers every remaining row into a `Vec`.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by `try_next`; rows collected up to
    /// that point are dropped.
    pub async fn collect_all(mut self) -> Result<Vec<Row>> {
        let mut out = Vec::new();
        while let Some(row) = self.try_next().await? {
            out.push(row);
        }
        Ok(out)
    }

    /// Consumes the stream, abandoning any rows not yet read.
    ///
    /// A finished stream returns `Ok(())` without contacting the client.
    /// Otherwise the stream is marked finished and the work is delegated to
    /// the client's `cancel_in_flight_response`.
    ///
    /// # Errors
    ///
    /// Propagates whatever `cancel_in_flight_response` returns.
    pub async fn cancel(mut self) -> Result<()> {
        if self.finished {
            return Ok(());
        }
        self.finished = true;
        self.client.cancel_in_flight_response().await
    }

    /// Marks the stream finished and tells the client the response has been
    /// drained.
    fn finish(&mut self) {
        self.finished = true;
        self.client.note_response_drained();
    }

    /// Replaces the current result-set description with the one in `meta`.
    ///
    /// The update is all-or-nothing: the only fallible step (resolving the
    /// decryptor, present with the `always-encrypted` feature) runs before
    /// any field is written, so on error — or if the future is dropped at the
    /// await point — `row_meta`, `meta` and `decryptor` still consistently
    /// describe the previous result set.
    ///
    /// # Errors
    ///
    /// Propagates a failure from the client's `resolve_decryptor`.
    async fn switch_result_set(&mut self, meta: ColMetaData) -> Result<()> {
        // Fallible work first; nothing on `self` has been modified yet.
        #[cfg(feature = "always-encrypted")]
        let decryptor = self
            .client
            .resolve_decryptor(&meta)
            .await?
            .map(std::sync::Arc::new);

        let columns = Client::<S>::build_columns(&meta);

        // Commit: from here on nothing can fail, so the fields stay in sync.
        self.row_meta = Arc::new(crate::row::ColMetaData::new(columns));
        #[cfg(feature = "always-encrypted")]
        {
            self.decryptor = decryptor;
        }
        self.meta = meta;
        Ok(())
    }

    /// Decodes a `Row` token against the current metadata, going through the
    /// decrypting converter when a decryptor is set.
    fn decode_raw(&self, raw: &tds_protocol::token::RawRow) -> Result<Row> {
        #[cfg(feature = "always-encrypted")]
        if let Some(ref dec) = self.decryptor {
            return crate::column_parser::convert_raw_row_decrypted(
                raw,
                &self.meta,
                &self.row_meta,
                dec,
            );
        }
        crate::column_parser::convert_raw_row(raw, &self.meta, &self.row_meta)
    }

    /// Decodes an `NbcRow` token against the current metadata, going through
    /// the decrypting converter when a decryptor is set.
    fn decode_nbc(&self, nbc: &tds_protocol::token::NbcRow) -> Result<Row> {
        #[cfg(feature = "always-encrypted")]
        if let Some(ref dec) = self.decryptor {
            return crate::column_parser::convert_nbc_row_decrypted(
                nbc,
                &self.meta,
                &self.row_meta,
                dec,
            );
        }
        crate::column_parser::convert_nbc_row(nbc, &self.meta, &self.row_meta)
    }
}