databend_driver_core/
raw_rows.rsuse std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio_stream::{Stream, StreamExt};
use crate::error::Error;
use crate::error::Result;
use crate::rows::Row;
use crate::rows::ServerStats;
use crate::schema::SchemaRef;
use crate::value::Value;
#[derive(Clone, Debug)]
pub enum RawRowWithStats {
Row(RawRow),
Stats(ServerStats),
}
#[derive(Clone, Debug, Default)]
pub struct RawRow {
pub row: Row,
pub raw_row: Vec<Option<String>>,
}
impl RawRow {
pub fn new(row: Row, raw_row: Vec<Option<String>>) -> Self {
Self { row, raw_row }
}
pub fn len(&self) -> usize {
self.raw_row.len()
}
pub fn is_empty(&self) -> bool {
self.raw_row.is_empty()
}
pub fn values(&self) -> &[Option<String>] {
&self.raw_row
}
pub fn schema(&self) -> SchemaRef {
self.row.schema()
}
}
impl TryFrom<(SchemaRef, Vec<Option<String>>)> for RawRow {
type Error = Error;
fn try_from((schema, data): (SchemaRef, Vec<Option<String>>)) -> Result<Self> {
let mut values: Vec<Value> = Vec::new();
for (i, field) in schema.fields().iter().enumerate() {
let val: Option<&str> = data.get(i).and_then(|v| v.as_deref());
values.push(Value::try_from((&field.data_type, val))?);
}
let row = Row::new(schema, values);
Ok(RawRow::new(row, data))
}
}
impl IntoIterator for RawRow {
type Item = Option<String>;
type IntoIter = std::vec::IntoIter<Self::Item>;
fn into_iter(self) -> Self::IntoIter {
self.raw_row.into_iter()
}
}
#[derive(Clone, Debug)]
pub struct RawRows {
rows: Vec<RawRow>,
}
impl RawRows {
pub fn new(rows: Vec<RawRow>) -> Self {
Self { rows }
}
pub fn rows(&self) -> &[RawRow] {
&self.rows
}
pub fn len(&self) -> usize {
self.rows.len()
}
pub fn is_empty(&self) -> bool {
self.rows.is_empty()
}
}
impl IntoIterator for RawRows {
type Item = RawRow;
type IntoIter = std::vec::IntoIter<Self::Item>;
fn into_iter(self) -> Self::IntoIter {
self.rows.into_iter()
}
}
pub struct RawRowIterator {
schema: SchemaRef,
it: Pin<Box<dyn Stream<Item = Result<RawRow>> + Send>>,
}
impl RawRowIterator {
pub fn new(
schema: SchemaRef,
it: Pin<Box<dyn Stream<Item = Result<RawRowWithStats>> + Send>>,
) -> Self {
let it = it.filter_map(|r| match r {
Ok(RawRowWithStats::Row(r)) => Some(Ok(r)),
Ok(_) => None,
Err(err) => Some(Err(err)),
});
Self {
schema,
it: Box::pin(it),
}
}
pub fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
impl Stream for RawRowIterator {
type Item = Result<RawRow>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.it).poll_next(cx)
}
}