use crate::tds::stream::ReceivedToken;
use crate::{row::ColumnType, Column, Row};
use futures_util::{
ready,
stream::{BoxStream, Peekable, Stream, StreamExt, TryStreamExt},
};
use std::{
fmt::Debug,
pin::Pin,
sync::Arc,
task::{self, Poll},
};
pub struct QueryStream<'a> {
token_stream: Peekable<BoxStream<'a, crate::Result<ReceivedToken>>>,
columns: Option<Arc<Vec<Column>>>,
result_set_index: Option<usize>,
}
impl<'a> Debug for QueryStream<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("QueryStream")
.field(
"token_stream",
&"BoxStream<'a, crate::Result<ReceivedToken>>",
)
.finish()
}
}
impl<'a> QueryStream<'a> {
pub(crate) fn new(token_stream: BoxStream<'a, crate::Result<ReceivedToken>>) -> Self {
Self {
token_stream: token_stream.peekable(),
columns: None,
result_set_index: None,
}
}
pub(crate) async fn forward_to_metadata(&mut self) -> crate::Result<()> {
loop {
let item = Pin::new(&mut self.token_stream)
.peek()
.await
.map(|r| r.as_ref().map_err(|e| e.clone()))
.transpose()?;
match item {
Some(ReceivedToken::NewResultset(_)) => break,
Some(_) => {
self.token_stream.try_next().await?;
}
None => break,
}
}
Ok(())
}
pub async fn columns(&mut self) -> crate::Result<Option<&[Column]>> {
use ReceivedToken::*;
loop {
let item = Pin::new(&mut self.token_stream)
.peek()
.await
.map(|r| r.as_ref().map_err(|e| e.clone()))
.transpose()?;
match item {
Some(token) => match token {
NewResultset(metadata) => {
self.columns = Some(Arc::new(metadata.columns().collect()));
break;
}
Row(_) => {
break;
}
_ => {
self.token_stream.try_next().await?;
continue;
}
},
None => {
break;
}
}
}
Ok(self.columns.as_ref().map(|c| c.as_slice()))
}
pub async fn into_results(mut self) -> crate::Result<Vec<Vec<Row>>> {
let mut results: Vec<Vec<Row>> = Vec::new();
let mut result: Option<Vec<Row>> = None;
while let Some(item) = self.try_next().await? {
match (item, &mut result) {
(QueryItem::Row(row), None) => {
result = Some(vec![row]);
}
(QueryItem::Row(row), Some(ref mut result)) => result.push(row),
(QueryItem::Metadata(_), None) => {
result = Some(Vec::new());
}
(QueryItem::Metadata(_), ref mut previous_result) => {
results.push(previous_result.take().unwrap());
result = None;
}
}
}
if let Some(result) = result {
results.push(result);
}
Ok(results)
}
pub async fn into_first_result(self) -> crate::Result<Vec<Row>> {
let mut results = self.into_results().await?.into_iter();
let rows = results.next().unwrap_or_default();
Ok(rows)
}
pub async fn into_row(self) -> crate::Result<Option<Row>> {
let mut results = self.into_first_result().await?.into_iter();
Ok(results.next())
}
pub fn into_row_stream(self) -> BoxStream<'a, crate::Result<Row>> {
let s = self.try_filter_map(|item| async {
match item {
QueryItem::Row(row) => Ok(Some(row)),
QueryItem::Metadata(_) => Ok(None),
}
});
Box::pin(s)
}
}
#[derive(Debug, Clone)]
pub struct ResultMetadata {
columns: Arc<Vec<Column>>,
result_index: usize,
}
impl ResultMetadata {
pub fn columns(&self) -> &[Column] {
&self.columns
}
pub fn result_index(&self) -> usize {
self.result_index
}
}
#[derive(Debug)]
pub enum QueryItem {
Row(Row),
Metadata(ResultMetadata),
}
impl QueryItem {
pub(crate) fn metadata(columns: Arc<Vec<Column>>, result_index: usize) -> Self {
Self::Metadata(ResultMetadata {
columns,
result_index,
})
}
pub fn as_metadata(&self) -> Option<&ResultMetadata> {
match self {
QueryItem::Row(_) => None,
QueryItem::Metadata(ref metadata) => Some(metadata),
}
}
pub fn as_row(&self) -> Option<&Row> {
match self {
QueryItem::Row(ref row) => Some(row),
QueryItem::Metadata(_) => None,
}
}
pub fn into_metadata(self) -> Option<ResultMetadata> {
match self {
QueryItem::Row(_) => None,
QueryItem::Metadata(metadata) => Some(metadata),
}
}
pub fn into_row(self) -> Option<Row> {
match self {
QueryItem::Row(row) => Some(row),
QueryItem::Metadata(_) => None,
}
}
}
impl<'a> Stream for QueryStream<'a> {
type Item = crate::Result<QueryItem>;
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
loop {
let token = match ready!(this.token_stream.poll_next_unpin(cx)) {
Some(res) => res?,
None => return Poll::Ready(None),
};
return match token {
ReceivedToken::NewResultset(meta) => {
let column_meta = meta
.columns
.iter()
.map(|x| Column {
name: x.col_name.to_string(),
column_type: ColumnType::from(&x.base.ty),
})
.collect::<Vec<_>>();
let column_meta = Arc::new(column_meta);
this.columns = Some(column_meta.clone());
this.result_set_index = this.result_set_index.map(|i| i + 1);
let query_item =
QueryItem::metadata(column_meta, *this.result_set_index.get_or_insert(0));
return Poll::Ready(Some(Ok(query_item)));
}
ReceivedToken::Row(data) => {
let columns = this.columns.as_ref().unwrap().clone();
let result_index = this.result_set_index.unwrap();
let row = Row {
columns,
data,
result_index,
};
Poll::Ready(Some(Ok(QueryItem::Row(row))))
}
_ => continue,
};
}
}
}