use bytes::{Buf, Bytes, BytesMut};
use tds_protocol::ProtocolError;
use tds_protocol::token::{ColMetaData, ColumnData, NbcRow, RawRow, Token, TokenParser};
use tds_protocol::types::TypeId;
use crate::Client;
use crate::client::response::server_token_to_error;
use crate::error::{Error, Result};
use crate::plp::{PlpDecoder, PlpEvent};
use crate::row::{Column, Row};
use crate::state::{ConnectionState, Ready};
pub(crate) fn is_plp_max(col: &ColumnData) -> bool {
match col.type_id {
TypeId::BigVarChar | TypeId::BigVarBinary | TypeId::NVarChar => {
col.type_info.max_length == Some(0xFFFF)
}
TypeId::Xml => true,
_ => false,
}
}
#[must_use = "streams must be consumed; dropping a stream discards remaining rows"]
pub struct BlobStream<'a, S: ConnectionState = Ready> {
client: &'a mut Client<S>,
buf: Bytes,
eom: bool,
encryption_enabled: bool,
meta: ColMetaData,
prefix_meta: ColMetaData,
scalar_row_meta: std::sync::Arc<crate::row::ColMetaData>,
blob_index: usize,
plp: Option<PlpDecoder>,
blob_null: bool,
finished: bool,
}
impl<'a, S: ConnectionState> BlobStream<'a, S> {
pub(crate) fn new(
client: &'a mut Client<S>,
buf: Bytes,
eom: bool,
encryption_enabled: bool,
meta: ColMetaData,
blob_index: usize,
) -> Self {
let prefix_meta = ColMetaData {
columns: meta.columns.iter().take(blob_index).cloned().collect(),
cek_table: meta.cek_table.clone(),
};
let scalar_columns = Client::<S>::build_columns(&prefix_meta);
Self {
client,
buf,
eom,
encryption_enabled,
meta,
prefix_meta,
scalar_row_meta: std::sync::Arc::new(crate::row::ColMetaData::new(scalar_columns)),
blob_index,
plp: None,
blob_null: false,
finished: true, }
.started()
}
fn started(mut self) -> Self {
self.finished = false;
self
}
#[must_use]
pub fn columns(&self) -> &[Column] {
&self.scalar_row_meta.columns
}
pub async fn next(&mut self) -> Result<Option<Row>> {
if self.finished {
return Ok(None);
}
self.drain_current_blob().await?;
loop {
if self.buf.is_empty() {
if !self.pull_packet().await? {
self.finish();
return Ok(None);
}
continue;
}
match self.buf[0] {
0xD1 => return Ok(Some(self.decode_row().await?)),
0xD2 => return Ok(Some(self.decode_nbc_row().await?)),
_ => match self.parse_control_token().await? {
Control::Finished => {
self.finish();
return Ok(None);
}
Control::Continue => continue,
},
}
}
}
pub async fn read_chunk(&mut self) -> Result<Option<Bytes>> {
loop {
let event = match self.plp.as_mut() {
Some(plp) if !plp.is_done() => plp.pull(&mut self.buf)?,
_ => return Ok(None),
};
match event {
PlpEvent::Data(d) => return Ok(Some(d)),
PlpEvent::End => return Ok(None),
PlpEvent::NeedMore => {
if !self.pull_packet().await? {
return Err(Error::ConnectionClosed);
}
}
}
}
}
pub async fn copy_blob_to<W>(&mut self, w: &mut W) -> Result<u64>
where
W: tokio::io::AsyncWrite + Unpin,
{
use tokio::io::AsyncWriteExt;
let mut total = 0u64;
while let Some(chunk) = self.read_chunk().await? {
w.write_all(&chunk).await.map_err(Error::from)?;
total += chunk.len() as u64;
}
Ok(total)
}
#[must_use]
pub fn blob_len(&self) -> Option<u64> {
if self.blob_null {
return None;
}
self.plp.as_ref().and_then(PlpDecoder::total_len)
}
#[must_use]
pub fn blob_is_null(&self) -> bool {
self.blob_null
}
fn finish(&mut self) {
self.finished = true;
self.client.note_response_drained();
}
async fn decode_row(&mut self) -> Result<Row> {
loop {
let mut view: &[u8] = &self.buf[..];
let before = view.len();
view.advance(1); match RawRow::decode_prefix(&mut view, &self.meta, self.blob_index) {
Ok(raw) => {
let consumed = before - view.len();
self.buf.advance(consumed);
let row = crate::column_parser::convert_raw_row(
&raw,
&self.prefix_meta,
&self.scalar_row_meta,
)?;
self.plp = Some(PlpDecoder::new());
self.blob_null = false;
return Ok(row);
}
Err(ProtocolError::UnexpectedEof) if !self.eom => {
self.pull_packet().await?;
}
Err(e) => return Err(e.into()),
}
}
}
async fn decode_nbc_row(&mut self) -> Result<Row> {
loop {
let mut view: &[u8] = &self.buf[..];
let before = view.len();
view.advance(1); match NbcRow::decode_prefix(&mut view, &self.meta, self.blob_index) {
Ok(nbc) => {
let consumed = before - view.len();
self.buf.advance(consumed);
let blob_null = nbc.is_null(self.blob_index);
let row = crate::column_parser::convert_nbc_row(
&nbc,
&self.prefix_meta,
&self.scalar_row_meta,
)?;
self.blob_null = blob_null;
self.plp = if blob_null {
None
} else {
Some(PlpDecoder::new())
};
return Ok(row);
}
Err(ProtocolError::UnexpectedEof) if !self.eom => {
self.pull_packet().await?;
}
Err(e) => return Err(e.into()),
}
}
}
async fn parse_control_token(&mut self) -> Result<Control> {
loop {
let mut parser =
TokenParser::new(self.buf.clone()).with_encryption(self.encryption_enabled);
match parser.next_token_with_metadata(Some(&self.meta)) {
Ok(Some(token)) => {
let consumed = self.buf.len() - parser.remaining();
self.buf.advance(consumed);
return self.classify(token);
}
Ok(None) => {
if self.eom {
return Ok(Control::Finished);
}
self.pull_packet().await?;
}
Err(ProtocolError::UnexpectedEof | ProtocolError::IncompletePacket { .. })
if !self.eom =>
{
self.pull_packet().await?;
}
Err(e) => return Err(e.into()),
}
}
}
fn classify(&mut self, token: Token) -> Result<Control> {
match token {
Token::Done(d) => {
if d.status.error {
return Err(Error::Query(
"query failed (server set error flag in DONE token)".to_string(),
));
}
Ok(if d.status.more {
Control::Continue
} else {
Control::Finished
})
}
Token::Error(e) => Err(server_token_to_error(&e)),
Token::ColMetaData(_) => Err(Error::Protocol(
"query_stream_blob does not support multiple result sets".to_string(),
)),
Token::EnvChange(ref e) => {
self.client.apply_transaction_env_change(e);
Ok(Control::Continue)
}
_ => Ok(Control::Continue),
}
}
async fn drain_current_blob(&mut self) -> Result<()> {
if self.plp.is_some() && !self.blob_null {
while self.read_chunk().await?.is_some() {}
}
self.plp = None;
self.blob_null = false;
Ok(())
}
async fn pull_packet(&mut self) -> Result<bool> {
match self.client.read_response_packet().await? {
Some((payload, is_eom)) => {
if self.buf.is_empty() {
self.buf = payload;
} else {
let mut joined = BytesMut::with_capacity(self.buf.len() + payload.len());
joined.extend_from_slice(&self.buf);
joined.extend_from_slice(&payload);
self.buf = joined.freeze();
}
self.eom |= is_eom;
Ok(true)
}
None => {
self.eom = true;
Ok(false)
}
}
}
}
enum Control {
Continue,
Finished,
}