use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use serde::Deserialize;
use tokio_stream::{Stream, StreamExt};
#[cfg(feature = "flight-sql")]
use arrow::record_batch::RecordBatch;
use crate::error::{Error, Result};
use crate::schema::SchemaRef;
use crate::value::Value;
/// One item of a [`RowStatsIterator`] stream: either a data row or a
/// statistics value.
#[derive(Clone, Debug)]
pub enum RowWithStats {
/// A data row.
Row(Row),
/// A [`ServerStats`] value carried in the same stream as the rows.
Stats(ServerStats),
}
/// Row/byte counters and running time for a query.
///
/// Every field carries `#[serde(default)]`, so a field that is absent from
/// the deserialized input is left at its zero value.
#[derive(Deserialize, Clone, Debug, Default)]
pub struct ServerStats {
/// Total rows. When built from `QueryStats` this is taken from `total_scan`
/// if present, otherwise 0; `normalize` replaces a zero with `read_rows`.
#[serde(default)]
pub total_rows: usize,
/// Total bytes. When built from `QueryStats` this is taken from `total_scan`
/// if present, otherwise 0; `normalize` replaces a zero with `read_bytes`.
#[serde(default)]
pub total_bytes: usize,
/// Rows read; filled from `scan_progress.rows` when built from `QueryStats`.
#[serde(default)]
pub read_rows: usize,
/// Bytes read; filled from `scan_progress.bytes` when built from `QueryStats`.
#[serde(default)]
pub read_bytes: usize,
/// Rows written; filled from `write_progress.rows` when built from `QueryStats`.
#[serde(default)]
pub write_rows: usize,
/// Bytes written; filled from `write_progress.bytes` when built from `QueryStats`.
#[serde(default)]
pub write_bytes: usize,
/// Running time copied from `QueryStats::running_time_ms`
/// (presumably milliseconds, going by the name).
#[serde(default)]
pub running_time_ms: f64,
}
impl ServerStats {
    /// Fills in zero totals from the matching read counters.
    ///
    /// If `total_rows` is 0 it becomes `read_rows`; if `total_bytes` is 0 it
    /// becomes `read_bytes`. Non-zero totals are left untouched.
    pub fn normalize(&mut self) {
        // Overwrites `total` with `read` only when `total` is still zero.
        fn backfill(total: &mut usize, read: usize) {
            if *total == 0 {
                *total = read;
            }
        }

        backfill(&mut self.total_rows, self.read_rows);
        backfill(&mut self.total_bytes, self.read_bytes);
    }
}
impl From<databend_client::response::QueryStats> for ServerStats {
    /// Builds `ServerStats` from the client's `QueryStats`.
    ///
    /// Read counters come from `scan_progress`, write counters from
    /// `write_progress`, and the totals from `total_scan` when it is `Some`
    /// (otherwise both totals are 0).
    fn from(stats: databend_client::response::QueryStats) -> Self {
        // Resolve the optional totals first so the struct can be built in one go.
        let (total_rows, total_bytes) = match stats.progresses.total_scan {
            Some(total) => (total.rows, total.bytes),
            None => (0, 0),
        };

        Self {
            total_rows,
            total_bytes,
            read_rows: stats.progresses.scan_progress.rows,
            read_bytes: stats.progresses.scan_progress.bytes,
            write_rows: stats.progresses.write_progress.rows,
            write_bytes: stats.progresses.write_progress.bytes,
            running_time_ms: stats.running_time_ms,
        }
    }
}
/// A single result row: an ordered list of [`Value`]s.
///
/// The default value is a row with no values.
#[derive(Clone, Debug, Default)]
pub struct Row(Vec<Value>);
impl TryFrom<(SchemaRef, &Vec<String>)> for Row {
type Error = Error;
fn try_from((schema, data): (SchemaRef, &Vec<String>)) -> Result<Self> {
let mut values: Vec<Value> = Vec::new();
for (i, field) in schema.fields().iter().enumerate() {
values.push(Value::try_from((&field.data_type, data[i].as_str()))?);
}
Ok(Self(values))
}
}
impl Row {
pub fn len(&self) -> usize {
self.0.len()
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn values(&self) -> &[Value] {
&self.0
}
pub fn from_vec(values: Vec<Value>) -> Self {
Self(values)
}
}
impl IntoIterator for Row {
type Item = Value;
type IntoIter = std::vec::IntoIter<Self::Item>;
fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}
/// A materialized batch of rows together with the schema they share.
#[derive(Clone, Debug)]
pub struct Rows {
// Schema handle returned (cloned) by `Rows::schema`.
schema: SchemaRef,
// The rows themselves, in order.
rows: Vec<Row>,
}
#[cfg(feature = "flight-sql")]
impl TryFrom<RecordBatch> for Rows {
    type Error = Error;

    /// Converts an Arrow `RecordBatch` into [`Rows`].
    ///
    /// For every row index, one [`Value`] is built per column from the
    /// `(field, column, row index)` triple. The batch schema is converted
    /// only after all values have been built, so a value error is reported
    /// before a schema error.
    fn try_from(batch: RecordBatch) -> Result<Self> {
        let arrow_schema = batch.schema();
        let column_count = arrow_schema.fields().len();

        let mut rows: Vec<Row> = Vec::new();
        for row_idx in 0..batch.num_rows() {
            let mut cells: Vec<Value> = Vec::new();
            for col_idx in 0..column_count {
                let column = batch.column(col_idx);
                let field = arrow_schema.field(col_idx);
                cells.push(Value::try_from((field, column, row_idx))?);
            }
            rows.push(Row(cells));
        }

        let schema: SchemaRef = Arc::new(arrow_schema.try_into()?);
        Ok(Self { schema, rows })
    }
}
impl IntoIterator for Rows {
type Item = Row;
type IntoIter = std::vec::IntoIter<Self::Item>;
fn into_iter(self) -> Self::IntoIter {
self.rows.into_iter()
}
}
impl Rows {
pub fn schema(&self) -> SchemaRef {
self.schema.clone()
}
pub fn rows(&self) -> &[Row] {
&self.rows
}
pub fn len(&self) -> usize {
self.rows.len()
}
pub fn is_empty(&self) -> bool {
self.rows.is_empty()
}
}
/// A stream of `Result<Row>` items paired with a schema handle.
///
/// Implements `Stream` by forwarding to the wrapped stream.
pub struct RowIterator {
// Schema handle returned (cloned) by `RowIterator::schema`.
schema: SchemaRef,
// Boxed, pinned, `Send` stream that produces the rows.
it: Pin<Box<dyn Stream<Item = Result<Row>> + Send>>,
}
impl RowIterator {
pub fn new(schema: SchemaRef, it: Pin<Box<dyn Stream<Item = Result<Row>> + Send>>) -> Self {
Self { schema, it }
}
pub fn schema(&self) -> SchemaRef {
self.schema.clone()
}
pub async fn try_collect<T>(mut self) -> Result<Vec<T>>
where
T: TryFrom<Row>,
T::Error: std::fmt::Display,
{
let mut ret = Vec::new();
while let Some(row) = self.it.next().await {
let v = T::try_from(row?).map_err(|e| Error::Parsing(e.to_string()))?;
ret.push(v)
}
Ok(ret)
}
}
impl Stream for RowIterator {
    type Item = Result<Row>;

    /// Forwards the poll to the wrapped stream.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        this.it.as_mut().poll_next(cx)
    }
}
/// A stream of `Result<RowWithStats>` items paired with a schema handle.
///
/// Implements `Stream` by forwarding to the wrapped stream; `filter_rows`
/// turns it into a [`RowIterator`] that yields rows only.
pub struct RowStatsIterator {
// Schema handle returned (cloned) by `RowStatsIterator::schema`.
schema: SchemaRef,
// Boxed, pinned, `Send` stream that produces rows and statistics items.
it: Pin<Box<dyn Stream<Item = Result<RowWithStats>> + Send>>,
}
impl RowStatsIterator {
pub fn new(
schema: SchemaRef,
it: Pin<Box<dyn Stream<Item = Result<RowWithStats>> + Send>>,
) -> Self {
Self { schema, it }
}
pub fn schema(&self) -> SchemaRef {
self.schema.clone()
}
pub async fn filter_rows(self) -> RowIterator {
let it = self.it.filter_map(|r| match r {
Ok(RowWithStats::Row(r)) => Some(Ok(r)),
Ok(_) => None,
Err(err) => Some(Err(err)),
});
RowIterator::new(self.schema, Box::pin(it))
}
}
impl Stream for RowStatsIterator {
    type Item = Result<RowWithStats>;

    /// Forwards the poll to the wrapped stream.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        this.it.as_mut().poll_next(cx)
    }
}