1use std::pin::Pin;
16use std::task::Context;
17use std::task::Poll;
18
19use serde::Deserialize;
20use tokio_stream::{Stream, StreamExt};
21
22#[cfg(feature = "flight-sql")]
23use arrow::record_batch::RecordBatch;
24
25use crate::error::{Error, Result};
26use crate::schema::SchemaRef;
27use crate::value::Value;
28
29#[derive(Clone, Debug)]
30pub enum RowWithStats {
31 Row(Row),
32 Stats(ServerStats),
33}
34
35#[derive(Deserialize, Clone, Debug, Default)]
36pub struct ServerStats {
37 #[serde(default)]
38 pub total_rows: usize,
39 #[serde(default)]
40 pub total_bytes: usize,
41
42 #[serde(default)]
43 pub read_rows: usize,
44 #[serde(default)]
45 pub read_bytes: usize,
46
47 #[serde(default)]
48 pub write_rows: usize,
49 #[serde(default)]
50 pub write_bytes: usize,
51
52 #[serde(default)]
53 pub running_time_ms: f64,
54
55 #[serde(default)]
56 pub spill_file_nums: usize,
57
58 #[serde(default)]
59 pub spill_bytes: usize,
60}
61
62impl ServerStats {
63 pub fn normalize(&mut self) {
64 if self.total_rows == 0 {
65 self.total_rows = self.read_rows;
66 }
67 if self.total_bytes == 0 {
68 self.total_bytes = self.read_bytes;
69 }
70 }
71}
72
73impl From<databend_client::QueryStats> for ServerStats {
74 fn from(stats: databend_client::QueryStats) -> Self {
75 let mut p = Self {
76 total_rows: 0,
77 total_bytes: 0,
78 read_rows: stats.progresses.scan_progress.rows,
79 read_bytes: stats.progresses.scan_progress.bytes,
80 write_rows: stats.progresses.write_progress.rows,
81 write_bytes: stats.progresses.write_progress.bytes,
82 spill_file_nums: stats.progresses.spill_progress.file_nums,
83 spill_bytes: stats.progresses.spill_progress.bytes,
84 running_time_ms: stats.running_time_ms,
85 };
86 if let Some(total) = stats.progresses.total_scan {
87 p.total_rows = total.rows;
88 p.total_bytes = total.bytes;
89 }
90 p
91 }
92}
93
94#[derive(Clone, Debug, Default)]
95pub struct Row {
96 schema: SchemaRef,
97 values: Vec<Value>,
98}
99
100impl Row {
101 pub fn new(schema: SchemaRef, values: Vec<Value>) -> Self {
102 Self { schema, values }
103 }
104
105 pub fn len(&self) -> usize {
106 self.values.len()
107 }
108
109 pub fn is_empty(&self) -> bool {
110 self.values.is_empty()
111 }
112
113 pub fn values(&self) -> &[Value] {
114 &self.values
115 }
116
117 pub fn schema(&self) -> SchemaRef {
118 self.schema.clone()
119 }
120
121 pub fn from_vec(schema: SchemaRef, values: Vec<Value>) -> Self {
122 Self { schema, values }
123 }
124}
125
126impl TryFrom<(SchemaRef, Vec<Option<String>>)> for Row {
127 type Error = Error;
128
129 fn try_from((schema, data): (SchemaRef, Vec<Option<String>>)) -> Result<Self> {
130 let mut values: Vec<Value> = Vec::new();
131 for (i, field) in schema.fields().iter().enumerate() {
132 let val: Option<&str> = data.get(i).and_then(|v| v.as_deref());
133 values.push(Value::try_from((&field.data_type, val))?);
134 }
135 Ok(Self::new(schema, values))
136 }
137}
138
139impl IntoIterator for Row {
140 type Item = Value;
141 type IntoIter = std::vec::IntoIter<Self::Item>;
142
143 fn into_iter(self) -> Self::IntoIter {
144 self.values.into_iter()
145 }
146}
147
148#[derive(Clone, Debug)]
149pub struct Rows {
150 rows: Vec<Row>,
151}
152
153impl Rows {
154 pub fn new(rows: Vec<Row>) -> Self {
155 Self { rows }
156 }
157
158 pub fn rows(&self) -> &[Row] {
163 &self.rows
164 }
165
166 pub fn len(&self) -> usize {
167 self.rows.len()
168 }
169
170 pub fn is_empty(&self) -> bool {
171 self.rows.is_empty()
172 }
173}
174
175#[cfg(feature = "flight-sql")]
176impl TryFrom<RecordBatch> for Rows {
177 type Error = Error;
178 fn try_from(batch: RecordBatch) -> Result<Self> {
179 let batch_schema = batch.schema();
180 let schema = SchemaRef::new(batch_schema.clone().try_into()?);
181 let mut rows: Vec<Row> = Vec::new();
182 for i in 0..batch.num_rows() {
183 let mut values: Vec<Value> = Vec::new();
184 for j in 0..batch_schema.fields().len() {
185 let v = batch.column(j);
186 let field = batch_schema.field(j);
187 let value = Value::try_from((field, v, i))?;
188 values.push(value);
189 }
190 rows.push(Row::new(schema.clone(), values));
191 }
192 Ok(Self::new(rows))
193 }
194}
195
196impl IntoIterator for Rows {
197 type Item = Row;
198 type IntoIter = std::vec::IntoIter<Self::Item>;
199
200 fn into_iter(self) -> Self::IntoIter {
201 self.rows.into_iter()
202 }
203}
204
205macro_rules! replace_expr {
206 ($_t:tt $sub:expr) => {
207 $sub
208 };
209}
210
211macro_rules! impl_tuple_from_row {
213 ( $($Ti:tt),+ ) => {
214 impl<$($Ti),+> TryFrom<Row> for ($($Ti,)+)
215 where
216 $($Ti: TryFrom<Value>),+
217 {
218 type Error = String;
219 fn try_from(row: Row) -> Result<Self, String> {
220 let expected_len = <[()]>::len(&[$(replace_expr!(($Ti) ())),*]);
224
225 if expected_len != row.len() {
226 return Err(format!("row size mismatch: expected {} columns, got {}", expected_len, row.len()));
227 }
228 let mut vals_iter = row.into_iter().enumerate();
229
230 Ok((
231 $(
232 {
233 let (col_ix, col_value) = vals_iter
234 .next()
235 .unwrap(); let t = col_value.get_type();
238 $Ti::try_from(col_value)
239 .map_err(|_| format!("failed converting column {} from type({:?}) to type({})", col_ix, t, std::any::type_name::<$Ti>()))?
240 }
241 ,)+
242 ))
243 }
244 }
245 }
246}
247
248impl_tuple_from_row!(T1);
250impl_tuple_from_row!(T1, T2);
251impl_tuple_from_row!(T1, T2, T3);
252impl_tuple_from_row!(T1, T2, T3, T4);
253impl_tuple_from_row!(T1, T2, T3, T4, T5);
254impl_tuple_from_row!(T1, T2, T3, T4, T5, T6);
255impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7);
256impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8);
257impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9);
258impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10);
259impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11);
260impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12);
261impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13);
262impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14);
263impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15);
264impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16);
265
266pub struct RowIterator {
267 schema: SchemaRef,
268 it: Pin<Box<dyn Stream<Item = Result<Row>> + Send>>,
269}
270
271impl RowIterator {
272 pub fn new(schema: SchemaRef, it: Pin<Box<dyn Stream<Item = Result<Row>> + Send>>) -> Self {
273 Self { schema, it }
274 }
275
276 pub fn schema(&self) -> SchemaRef {
277 self.schema.clone()
278 }
279
280 pub async fn try_collect<T>(mut self) -> Result<Vec<T>>
281 where
282 T: TryFrom<Row>,
283 T::Error: std::fmt::Display,
284 {
285 let mut ret = Vec::new();
286 while let Some(row) = self.it.next().await {
287 let v = T::try_from(row?).map_err(|e| Error::Parsing(e.to_string()))?;
288 ret.push(v)
289 }
290 Ok(ret)
291 }
292}
293
294impl Stream for RowIterator {
295 type Item = Result<Row>;
296
297 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
298 Pin::new(&mut self.it).poll_next(cx)
299 }
300}
301
302pub struct RowStatsIterator {
303 schema: SchemaRef,
304 it: Pin<Box<dyn Stream<Item = Result<RowWithStats>> + Send>>,
305}
306
307impl RowStatsIterator {
308 pub fn new(
309 schema: SchemaRef,
310 it: Pin<Box<dyn Stream<Item = Result<RowWithStats>> + Send>>,
311 ) -> Self {
312 Self { schema, it }
313 }
314
315 pub fn schema(&self) -> SchemaRef {
316 self.schema.clone()
317 }
318
319 pub async fn filter_rows(self) -> RowIterator {
320 let it = self.it.filter_map(|r| match r {
321 Ok(RowWithStats::Row(r)) => Some(Ok(r)),
322 Ok(_) => None,
323 Err(err) => Some(Err(err)),
324 });
325 RowIterator::new(self.schema, Box::pin(it))
326 }
327}
328
329impl Stream for RowStatsIterator {
330 type Item = Result<RowWithStats>;
331
332 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
333 Pin::new(&mut self.it).poll_next(cx)
334 }
335}