use std::any::type_name;
use std::borrow::Cow;
use std::sync::Arc;
use std::task::Poll;
use std::{fmt, marker::PhantomData};
use futures_core::FusedStream;
use futures_core::{future::BoxFuture, Stream};
use futures_util::FutureExt;
use mysql_common::packets::{Column, OkPacket};
use crate::{
conn::PendingResult,
prelude::{FromRow, Protocol},
QueryResult, Row,
};
/// A [`QueryResult`] held either by mutable reference or by value.
///
/// Lets the streaming code drive a query result it merely borrows (`Borrowed`)
/// and one it owns (`Owned`) through the same type.
enum CowMut<'r, 'a: 'r, 't: 'a, P> {
/// Mutable borrow of a query result that lives outside of this value.
Borrowed(&'r mut QueryResult<'a, 't, P>),
/// Query result owned by this value.
Owned(QueryResult<'a, 't, P>),
}
impl<'r, 'a: 'r, 't: 'a, P> fmt::Debug for CowMut<'r, 'a, 't, P> {
    /// Formats the value as a tuple named after the active variant
    /// (`Borrowed` or `Owned`) wrapping the debug output of the query result.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Pick the variant label and its payload first so the actual
        // formatting call is written only once.
        let (variant, query_result): (&str, &dyn fmt::Debug) = match self {
            Self::Borrowed(borrowed) => ("Borrowed", borrowed),
            Self::Owned(owned) => ("Owned", owned),
        };
        f.debug_tuple(variant).field(query_result).finish()
    }
}
impl<'r, 'a: 'r, 't: 'a, P> AsMut<QueryResult<'a, 't, P>> for CowMut<'r, 'a, 't, P> {
fn as_mut(&mut self) -> &mut QueryResult<'a, 't, P> {
match self {
CowMut::Borrowed(q) => q,
CowMut::Owned(q) => q,
}
}
}
/// State a [`ResultSetStream`] is in between two polls.
enum ResultSetStreamState<'r, 'a: 'r, 't: 'a, P> {
/// No row fetch is in flight; holds the query result the next fetch will use.
Idle(CowMut<'r, 'a, 't, P>),
/// A row fetch is in flight. The future resolves to the outcome of the fetch
/// together with the query result it was given, so that it can be stored back.
NextFut(BoxFuture<'r, (crate::Result<Option<Row>>, CowMut<'r, 'a, 't, P>)>),
}
impl<'r, 'a: 'r, 't: 'a, P> fmt::Debug for ResultSetStreamState<'r, 'a, 't, P> {
    /// Formats the state as a tuple named after the active variant.
    ///
    /// `Idle` shows the wrapped query result. The future stored in `NextFut`
    /// is not inspected, so its type name is printed in its place.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Idle(query_result) => {
                let mut tuple = f.debug_tuple("Idle");
                tuple.field(query_result);
                tuple.finish()
            }
            Self::NextFut(_) => {
                let future_type = type_name::<
                    BoxFuture<'r, (crate::Result<Option<Row>>, CowMut<'r, 'a, 't, P>)>,
                >();
                let mut tuple = f.debug_tuple("NextFut");
                tuple.field(&future_type);
                tuple.finish()
            }
        }
    }
}
/// Stream over the rows of a result set that yields every row converted to `T`.
///
/// Instances are created by `QueryResult::stream` and `QueryResult::stream_and_drop`.
#[derive(Debug)]
pub struct ResultSetStream<'r, 'a: 'r, 't: 'a, T, P> {
/// Current state of the stream; `None` once the stream has terminated.
query_result: Option<ResultSetStreamState<'r, 'a, 't, P>>,
/// Ok packet captured when the stream was set up, if there was one.
ok_packet: Option<OkPacket<'static>>,
/// Columns captured when the stream was set up.
columns: Arc<[Column]>,
/// Ties the stream to its item type `T` without storing a value of it.
__from_row_type: PhantomData<T>,
}
// `FusedStream` support: lets callers ask whether the stream is already exhausted.
impl<'r, 'a: 'r, 't: 'a, T, P> FusedStream for ResultSetStream<'r, 'a, 't, T, P>
where
P: Protocol + Unpin,
T: FromRow + Unpin + Send + 'static,
{
/// Returns `true` once the stream state has been cleared, i.e. when
/// `query_result` is `None`.
fn is_terminated(&self) -> bool {
self.query_result.is_none()
}
}
impl<'r, 'a: 'r, 't: 'a, T, P> ResultSetStream<'r, 'a, 't, T, P> {
    /// Returns the last insert id reported by the stored ok packet.
    ///
    /// `None` if there is no ok packet or the packet carries no such id.
    pub fn last_insert_id(&self) -> Option<u64> {
        self.ok_packet.as_ref()?.last_insert_id()
    }

    /// Returns the number of affected rows reported by the stored ok packet,
    /// or `0` if there is no ok packet.
    pub fn affected_rows(&self) -> u64 {
        self.ok_packet.as_ref().map_or(0, |ok| ok.affected_rows())
    }

    /// Returns the columns of this result set as a slice.
    pub fn columns_ref(&self) -> &[Column] {
        &self.columns
    }

    /// Returns a new shared handle to the columns of this result set.
    pub fn columns(&self) -> Arc<[Column]> {
        Arc::clone(&self.columns)
    }

    /// Returns the info string of the stored ok packet.
    ///
    /// Yields an empty string if there is no ok packet or it has no info string.
    pub fn info(&self) -> Cow<'_, str> {
        match &self.ok_packet {
            Some(ok) => ok.info_str().unwrap_or_default(),
            None => Cow::default(),
        }
    }

    /// Returns the warnings value of the stored ok packet, or `0` if there is
    /// no ok packet.
    pub fn get_warnings(&self) -> u16 {
        self.ok_packet.as_ref().map_or(0, |ok| ok.warnings())
    }

    /// Returns the stored ok packet, if any.
    pub fn ok_packet(&self) -> Option<&OkPacket<'static>> {
        self.ok_packet.as_ref()
    }
}
impl<'r, 'a: 'r, 't: 'a, T, P> Stream for ResultSetStream<'r, 'a, 't, T, P>
where
P: Protocol + Unpin,
T: FromRow + Unpin + Send + 'static,
{
type Item = crate::Result<T>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
loop {
let columns = this.columns.clone();
match this.query_result.take() {
Some(ResultSetStreamState::Idle(mut query_result)) => {
let fut = Box::pin(async move {
let row = query_result.as_mut().next_row_or_next_set2(columns).await;
(row, query_result)
});
this.query_result = Some(ResultSetStreamState::NextFut(fut));
}
Some(ResultSetStreamState::NextFut(mut fut)) => match fut.poll_unpin(cx) {
Poll::Ready((row, query_result)) => match row {
Ok(Some(row)) => {
this.query_result = Some(ResultSetStreamState::Idle(query_result));
return Poll::Ready(Some(Ok(crate::from_row(row))));
}
Ok(None) => return Poll::Ready(None),
Err(err) => return Poll::Ready(Some(Err(err))),
},
Poll::Pending => {
this.query_result = Some(ResultSetStreamState::NextFut(fut));
return Poll::Pending;
}
},
None => return Poll::Ready(None),
}
}
}
}
impl<'a, 't: 'a, P> QueryResult<'a, 't, P>
where
    P: Protocol + Unpin,
{
    /// Prepares the connection for streaming and collects what a
    /// [`ResultSetStream`] needs besides the query result itself.
    ///
    /// If the connection's pending result is in the `Taken` state, its metadata
    /// is first passed to `skip_taken`. Afterwards the last ok packet is cloned
    /// and the pending result is taken from the connection to obtain its columns.
    ///
    /// Returns `Ok(None)` if the connection yields no pending result at either
    /// of the two steps.
    ///
    /// # Errors
    ///
    /// Propagates errors of `use_pending_result`, `skip_taken` and
    /// `take_pending_result`.
    async fn setup_stream(
        &mut self,
    ) -> crate::Result<Option<(Option<OkPacket<'static>>, Arc<[Column]>)>> {
        // Copy the metadata out first so that nothing obtained from the
        // connection is still in use when `skip_taken` runs.
        let taken_meta = match self.conn.as_mut().use_pending_result()? {
            None => return Ok(None),
            Some(PendingResult::Taken(meta)) => Some((*meta).clone()),
            Some(_) => None,
        };
        if let Some(meta) = taken_meta {
            self.skip_taken(meta).await?;
        }

        let ok_packet = self.conn.last_ok_packet().cloned();
        let pending = self.conn.as_mut().take_pending_result()?;
        Ok(pending.map(|meta| (ok_packet, meta.columns().clone())))
    }

    /// Returns a future resolving to a stream over the current result set that
    /// mutably borrows this query result for `'r`.
    ///
    /// The future resolves to `Ok(None)` when `setup_stream` finds nothing to
    /// stream, and to an error when `setup_stream` fails.
    pub fn stream<'r, T: Unpin + FromRow + Send + 'static>(
        &'r mut self,
    ) -> BoxFuture<'r, crate::Result<Option<ResultSetStream<'r, 'a, 't, T, P>>>> {
        Box::pin(async move {
            let prepared = self.setup_stream().await?;
            Ok(prepared.map(move |(ok_packet, columns)| {
                let state = ResultSetStreamState::Idle(CowMut::Borrowed(self));
                ResultSetStream::<'r, 'a, 't, T, P> {
                    query_result: Some(state),
                    ok_packet,
                    columns,
                    __from_row_type: PhantomData,
                }
            }))
        })
    }

    /// Returns a future resolving to a stream over the current result set that
    /// takes ownership of this query result.
    ///
    /// The query result is stored inside the stream. The future resolves to
    /// `Ok(None)` when `setup_stream` finds nothing to stream, and to an error
    /// when `setup_stream` fails.
    pub fn stream_and_drop<T: Unpin + FromRow + Send + 'static>(
        mut self,
    ) -> BoxFuture<'a, crate::Result<Option<ResultSetStream<'a, 'a, 't, T, P>>>> {
        Box::pin(async move {
            let prepared = self.setup_stream().await?;
            Ok(prepared.map(|(ok_packet, columns)| {
                let state = ResultSetStreamState::Idle(CowMut::Owned(self));
                ResultSetStream::<'a, 'a, 't, T, P> {
                    query_result: Some(state),
                    ok_packet,
                    columns,
                    __from_row_type: PhantomData,
                }
            }))
        })
    }
}