use mysql_common::row::convert::FromRowError;
use std::{borrow::Cow, fmt, marker::PhantomData, result::Result as StdResult, sync::Arc};
use crate::{
conn::{routines::NextSetRoutine, PendingResult},
connection_like::Connection,
error::*,
prelude::{FromRow, Protocol},
Column, Row,
};
pub mod result_set_stream;
mod tests;
/// Metadata of a result set, tagged by the kind of result set it belongs to.
///
/// Both variants carry the same payload: the shared, reference-counted column
/// definitions of the set.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum ResultSetMeta {
/// Columns of a text result set.
/// NOTE(review): presumably the MySQL text protocol — confirm.
Text(Arc<[Column]>),
/// Columns of a binary result set.
/// NOTE(review): presumably the MySQL binary protocol — confirm.
Binary(Arc<[Column]>),
}
impl ResultSetMeta {
fn columns(&self) -> &Arc<[Column]> {
match self {
ResultSetMeta::Text(cols) | ResultSetMeta::Binary(cols) => cols,
}
}
}
/// Result of a query, generic over the protocol marker `P`.
///
/// The value wraps the connection the query was issued on; its methods read
/// and advance that connection's pending result state.
pub struct QueryResult<'a, 't: 'a, P> {
/// Connection the result is read from.
conn: Connection<'a, 't>,
/// Zero-sized marker that ties the value to `P` without storing a `P`.
__phantom: PhantomData<P>,
}
impl<'a, 't: 'a, P> fmt::Debug for QueryResult<'a, 't, P> {
    /// Formats the result as a `QueryResult` struct.
    ///
    /// Implemented by hand so that no `Debug` bound is needed on `P`: the
    /// marker field is rendered as a fixed placeholder string.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("QueryResult");
        builder.field("conn", &self.conn);
        builder.field("__phantom", &"PhantomData<P>");
        builder.finish()
    }
}
impl<'a, 't: 'a, P> QueryResult<'a, 't, P>
where
    P: Protocol,
{
    /// Wraps anything convertible into a [`Connection`] as a query result.
    pub fn new<T: Into<Connection<'a, 't>>>(conn: T) -> Self {
        QueryResult {
            conn: conn.into(),
            __phantom: PhantomData,
        }
    }

    /// Returns `true` if the connection's pending result set has at least one column.
    ///
    /// A missing pending result, or a failure to obtain it, counts as "no rows".
    fn has_rows(&self) -> bool {
        self.conn
            .get_pending_result()
            .map(|pending_result| match pending_result {
                // `Pending` and `Taken` wrap different types, so the arms stay separate.
                Some(PendingResult::Pending(meta)) => !meta.columns().is_empty(),
                Some(PendingResult::Taken(meta)) => !meta.columns().is_empty(),
                None => false,
            })
            .unwrap_or(false)
    }

    /// Returns `true` if the pending result set has no columns and the connection
    /// reports no further result sets.
    pub fn is_empty(&self) -> bool {
        !self.has_rows() && !self.conn.more_results_exists()
    }

    /// Reads the next row of the current result set.
    ///
    /// Returns `Ok(None)` and clears the connection's pending result when the set
    /// has no columns or when the packet read is the result set terminator.
    ///
    /// # Errors
    ///
    /// Returns an error if reading the packet, parsing the row, or updating the
    /// pending result fails.
    async fn next_row(&mut self, columns: Arc<[Column]>) -> crate::Result<Option<Row>> {
        let mut row = None;

        if columns.is_empty() {
            // A set without columns carries no rows: just mark it as consumed.
            self.conn.as_mut().set_pending_result(None)?;
        } else {
            let packet = match self.conn.as_mut().read_packet().await {
                Ok(packet) => packet,
                Err(err) => {
                    // Reading failed: clear the pending result before reporting the error.
                    self.conn.as_mut().set_pending_result(None)?;
                    return Err(err);
                }
            };

            if P::is_last_result_set_packet(self.conn.capabilities(), &packet) {
                // The packet terminates the result set.
                self.conn.as_mut().set_pending_result(None)?;
            } else {
                // The packet is a row of the result set.
                row = Some(P::read_result_set_row(&packet, columns)?);
            }
        }

        Ok(row)
    }

    /// Advances to the next result set when the connection reports that one exists.
    ///
    /// Returns whether the connection has a pending result afterwards.
    async fn next_set(&mut self) -> crate::Result<bool> {
        if self.conn.more_results_exists() {
            self.conn
                .as_mut()
                .routine(NextSetRoutine::<P>::new())
                .await?;
        }
        Ok(self.conn.has_pending_result())
    }

    /// Same as [`Self::next_row_or_next_set2`], taking the columns from `meta`.
    async fn next_row_or_next_set(&mut self, meta: ResultSetMeta) -> crate::Result<Option<Row>> {
        let columns = meta.columns().clone();
        self.next_row_or_next_set2(columns).await
    }

    /// Returns the next row of the current set, or, once the set is exhausted,
    /// tries to advance to the next set and returns `Ok(None)`.
    async fn next_row_or_next_set2(
        &mut self,
        columns: Arc<[Column]>,
    ) -> crate::Result<Option<Row>> {
        if let Some(row) = self.next_row(columns).await? {
            Ok(Some(row))
        } else {
            // Whether a next set exists is observed by callers through the connection state.
            self.next_set().await?;
            Ok(None)
        }
    }

    /// Reads and discards the remaining rows of a result set in the `Taken` state.
    async fn skip_taken(&mut self, meta: Arc<ResultSetMeta>) -> crate::Result<()> {
        // The columns do not change while skipping, so extract them once and only
        // bump the reference count per row.
        let columns = meta.columns().clone();
        while self
            .next_row_or_next_set2(Arc::clone(&columns))
            .await?
            .is_some()
        {}
        Ok(())
    }

    /// Returns the next row of the current result set.
    ///
    /// `Ok(None)` is returned when the current set is exhausted (the next set, if
    /// any, becomes current) and when the connection has no pending result at all.
    /// A result set in the `Taken` state is skipped entirely before continuing.
    #[doc(hidden)]
    pub async fn next(&mut self) -> Result<Option<Row>> {
        loop {
            match self.conn.as_mut().use_pending_result()?.cloned() {
                Some(PendingResult::Pending(meta)) => return self.next_row_or_next_set(meta).await,
                Some(PendingResult::Taken(meta)) => self.skip_taken(meta).await?,
                None => return Ok(None),
            }
        }
    }

    /// Last insert id as reported by the connection, if any.
    pub fn last_insert_id(&self) -> Option<u64> {
        self.conn.last_insert_id()
    }

    /// Number of affected rows as reported by the connection.
    pub fn affected_rows(&self) -> u64 {
        self.conn.affected_rows()
    }

    /// Info text as reported by the connection.
    pub fn info(&self) -> Cow<'_, str> {
        self.conn.info()
    }

    /// Number of warnings as reported by the connection.
    pub fn warnings(&self) -> u16 {
        self.conn.get_warnings()
    }

    /// Collects the rows of the current result set into a `Vec<R>`.
    ///
    /// Every row is converted with `FromRow::from_row`.
    /// NOTE(review): that conversion presumably panics on a row that is not
    /// convertible to `R` — confirm; use [`Self::try_collect`] to get per-row errors.
    pub async fn collect<R>(&mut self) -> Result<Vec<R>>
    where
        R: FromRow + Send + 'static,
    {
        self.reduce(Vec::new(), |mut acc, row| {
            acc.push(FromRow::from_row(row));
            acc
        })
        .await
    }

    /// Collects the rows of the current result set, keeping the outcome of each
    /// row conversion (`FromRow::from_row_opt`) as a separate `Result`.
    pub async fn try_collect<R>(&mut self) -> Result<Vec<StdResult<R, FromRowError>>>
    where
        R: FromRow + Send + 'static,
    {
        self.reduce(Vec::new(), |mut acc, row| {
            acc.push(FromRow::from_row_opt(row));
            acc
        })
        .await
    }

    /// Like [`Self::collect`], then drops whatever is left of the result.
    pub async fn collect_and_drop<R>(mut self) -> Result<Vec<R>>
    where
        R: FromRow + Send + 'static,
    {
        let output = self.collect::<R>().await?;
        self.drop_result().await?;
        Ok(output)
    }

    /// Like [`Self::try_collect`], then drops whatever is left of the result.
    pub async fn try_collect_and_drop<R>(mut self) -> Result<Vec<StdResult<R, FromRowError>>>
    where
        R: FromRow + Send + 'static,
    {
        let output = self.try_collect().await?;
        self.drop_result().await?;
        Ok(output)
    }

    /// Calls `fun` on every row of the current result set.
    ///
    /// Does nothing (and reads nothing) when [`Self::is_empty`] is `true`.
    pub async fn for_each<F>(&mut self, mut fun: F) -> Result<()>
    where
        F: FnMut(Row),
    {
        if self.is_empty() {
            return Ok(());
        }

        while let Some(row) = self.next().await? {
            fun(row);
        }
        Ok(())
    }

    /// Like [`Self::for_each`], then drops whatever is left of the result.
    pub async fn for_each_and_drop<F>(mut self, fun: F) -> Result<()>
    where
        F: FnMut(Row),
    {
        self.for_each(fun).await?;
        self.drop_result().await?;
        Ok(())
    }

    /// Maps every row of the current result set through `fun` and collects the outputs.
    pub async fn map<F, U>(&mut self, mut fun: F) -> Result<Vec<U>>
    where
        F: FnMut(Row) -> U,
    {
        let mut acc = Vec::new();
        while let Some(row) = self.next().await? {
            // `fun` takes the `Row` itself, so no conversion is needed here.
            acc.push(fun(row));
        }
        Ok(acc)
    }

    /// Like [`Self::map`], then drops whatever is left of the result.
    pub async fn map_and_drop<F, U>(mut self, fun: F) -> Result<Vec<U>>
    where
        F: FnMut(Row) -> U,
    {
        let rows = self.map(fun).await?;
        self.drop_result().await?;
        Ok(rows)
    }

    /// Folds the rows of the current result set into a single value.
    ///
    /// Each row is converted to `T` with `crate::from_row` and combined with the
    /// accumulator (starting at `init`) by `fun`.
    /// NOTE(review): `crate::from_row` presumably panics on a row that is not
    /// convertible to `T` — confirm.
    pub async fn reduce<T, F, U>(&mut self, mut init: U, mut fun: F) -> Result<U>
    where
        F: FnMut(U, T) -> U,
        T: FromRow + Send + 'static,
    {
        while let Some(row) = self.next().await? {
            init = fun(init, crate::from_row(row));
        }
        Ok(init)
    }

    /// Like [`Self::reduce`], then drops whatever is left of the result.
    pub async fn reduce_and_drop<T, F, U>(mut self, init: U, fun: F) -> Result<U>
    where
        F: FnMut(U, T) -> U,
        T: FromRow + Send + 'static,
    {
        let acc = self.reduce(init, fun).await?;
        self.drop_result().await?;
        Ok(acc)
    }

    /// Consumes the result, reading and discarding rows until the connection has
    /// no pending result left.
    pub async fn drop_result(mut self) -> Result<()> {
        loop {
            // `next` yields `None` at the end of every set, so keep going while
            // the connection still has a pending result.
            while self.next().await?.is_some() {}
            if !self.conn.has_pending_result() {
                break Ok(());
            }
        }
    }

    /// Columns of the pending result set as a borrowed slice.
    ///
    /// Returns an empty slice when there is no pending result or it cannot be obtained.
    pub fn columns_ref(&self) -> &[Column] {
        self.conn
            .get_pending_result()
            .ok()
            .flatten()
            .map(|meta| match meta {
                PendingResult::Pending(meta) => &meta.columns()[..],
                PendingResult::Taken(meta) => &meta.columns()[..],
            })
            .unwrap_or_default()
    }

    /// Columns of the pending result set as a shared handle.
    ///
    /// Returns `None` when there is no pending result or it cannot be obtained.
    pub fn columns(&self) -> Option<Arc<[Column]>> {
        self.conn
            .get_pending_result()
            .ok()
            .flatten()
            .map(|meta| match meta {
                PendingResult::Pending(meta) => meta.columns(),
                PendingResult::Taken(meta) => meta.columns(),
            })
            .cloned()
    }
}