1use std::pin::Pin;
16use std::task::Context;
17use std::task::Poll;
18
19use serde::Deserialize;
20use tokio_stream::{Stream, StreamExt};
21
22#[cfg(feature = "flight-sql")]
23use arrow::record_batch::RecordBatch;
24
25use crate::error::{Error, Result};
26use crate::schema::SchemaRef;
27use crate::value::Value;
28
29#[derive(Clone, Debug)]
30pub enum RowWithStats {
31 Row(Row),
32 Stats(ServerStats),
33}
34
35#[derive(Deserialize, Clone, Debug, Default)]
36pub struct ServerStats {
37 #[serde(default)]
38 pub total_rows: usize,
39 #[serde(default)]
40 pub total_bytes: usize,
41
42 #[serde(default)]
43 pub read_rows: usize,
44 #[serde(default)]
45 pub read_bytes: usize,
46
47 #[serde(default)]
48 pub write_rows: usize,
49 #[serde(default)]
50 pub write_bytes: usize,
51
52 #[serde(default)]
53 pub running_time_ms: f64,
54
55 #[serde(default)]
56 pub spill_file_nums: usize,
57
58 #[serde(default)]
59 pub spill_bytes: usize,
60}
61
62impl ServerStats {
63 pub fn normalize(&mut self) {
64 if self.total_rows == 0 {
65 self.total_rows = self.read_rows;
66 }
67 if self.total_bytes == 0 {
68 self.total_bytes = self.read_bytes;
69 }
70 }
71
72 pub fn merge(&mut self, other: &ServerStats) {
73 self.total_rows += other.total_rows;
74 self.total_bytes += other.total_bytes;
75 self.read_rows += other.read_rows;
76 self.read_bytes += other.read_bytes;
77 self.write_rows += other.write_rows;
78 self.write_bytes += other.write_bytes;
79 self.running_time_ms += other.running_time_ms;
80 self.spill_file_nums += other.spill_file_nums;
81 self.spill_bytes += other.spill_bytes;
82 }
83}
84
85impl From<databend_client::QueryStats> for ServerStats {
86 fn from(stats: databend_client::QueryStats) -> Self {
87 let mut p = Self {
88 total_rows: 0,
89 total_bytes: 0,
90 read_rows: stats.progresses.scan_progress.rows,
91 read_bytes: stats.progresses.scan_progress.bytes,
92 write_rows: stats.progresses.write_progress.rows,
93 write_bytes: stats.progresses.write_progress.bytes,
94 spill_file_nums: stats.progresses.spill_progress.file_nums,
95 spill_bytes: stats.progresses.spill_progress.bytes,
96 running_time_ms: stats.running_time_ms,
97 };
98 if let Some(total) = stats.progresses.total_scan {
99 p.total_rows = total.rows;
100 p.total_bytes = total.bytes;
101 }
102 p
103 }
104}
105
106#[derive(Clone, Debug, Default)]
107pub struct Row {
108 schema: SchemaRef,
109 values: Vec<Value>,
110}
111
112impl Row {
113 pub fn new(schema: SchemaRef, values: Vec<Value>) -> Self {
114 Self { schema, values }
115 }
116
117 pub fn len(&self) -> usize {
118 self.values.len()
119 }
120
121 pub fn is_empty(&self) -> bool {
122 self.values.is_empty()
123 }
124
125 pub fn values(&self) -> &[Value] {
126 &self.values
127 }
128
129 pub fn schema(&self) -> SchemaRef {
130 self.schema.clone()
131 }
132
133 pub fn from_vec(schema: SchemaRef, values: Vec<Value>) -> Self {
134 Self { schema, values }
135 }
136}
137
138impl TryFrom<(SchemaRef, Vec<Option<String>>)> for Row {
139 type Error = Error;
140
141 fn try_from((schema, data): (SchemaRef, Vec<Option<String>>)) -> Result<Self> {
142 let mut values: Vec<Value> = Vec::new();
143 for (i, field) in schema.fields().iter().enumerate() {
144 let val: Option<&str> = data.get(i).and_then(|v| v.as_deref());
145 values.push(Value::try_from((&field.data_type, val))?);
146 }
147 Ok(Self::new(schema, values))
148 }
149}
150
151impl IntoIterator for Row {
152 type Item = Value;
153 type IntoIter = std::vec::IntoIter<Self::Item>;
154
155 fn into_iter(self) -> Self::IntoIter {
156 self.values.into_iter()
157 }
158}
159
160#[derive(Clone, Debug)]
161pub struct Rows {
162 rows: Vec<Row>,
163}
164
165impl Rows {
166 pub fn new(rows: Vec<Row>) -> Self {
167 Self { rows }
168 }
169
170 pub fn rows(&self) -> &[Row] {
175 &self.rows
176 }
177
178 pub fn len(&self) -> usize {
179 self.rows.len()
180 }
181
182 pub fn is_empty(&self) -> bool {
183 self.rows.is_empty()
184 }
185}
186
#[cfg(feature = "flight-sql")]
impl TryFrom<RecordBatch> for Rows {
    type Error = Error;

    /// Converts a record batch into rows, one `Row` per batch row.
    ///
    /// The batch schema is converted once and the resulting schema handle is
    /// cloned into every row. Each cell is converted with `Value::try_from`
    /// from its field, its column and the row index.
    ///
    /// # Errors
    ///
    /// Returns the error of the schema conversion or of the first failing
    /// value conversion.
    fn try_from(batch: RecordBatch) -> Result<Self> {
        let batch_schema = batch.schema();
        let schema = SchemaRef::new(batch_schema.clone().try_into()?);

        // Both dimensions are known up front: size the vectors once and keep
        // the loop-invariant column count out of the row loop.
        let num_rows = batch.num_rows();
        let num_columns = batch_schema.fields().len();

        let mut rows: Vec<Row> = Vec::with_capacity(num_rows);
        for i in 0..num_rows {
            let mut values: Vec<Value> = Vec::with_capacity(num_columns);
            for j in 0..num_columns {
                let v = batch.column(j);
                let field = batch_schema.field(j);
                let value = Value::try_from((field, v, i))?;
                values.push(value);
            }
            rows.push(Row::new(schema.clone(), values));
        }
        Ok(Self::new(rows))
    }
}
207
208impl IntoIterator for Rows {
209 type Item = Row;
210 type IntoIter = std::vec::IntoIter<Self::Item>;
211
212 fn into_iter(self) -> Self::IntoIter {
213 self.rows.into_iter()
214 }
215}
216
// Expands to the given expression and discards the token tree in front of it.
// Used to emit one expression per macro repetition without using the
// repeated token itself.
macro_rules! replace_expr {
    ($_discarded:tt $replacement:expr) => {
        $replacement
    };
}
222
// Implements `TryFrom<Row>` for a tuple whose element types are the given
// type parameters; every element type must implement `TryFrom<Value>`.
//
// The conversion fails with a `String` message when the row's length differs
// from the tuple's arity, or when a column cannot be converted into its
// element type.
macro_rules! impl_tuple_from_row {
    ( $($Ti:tt),+ ) => {
        impl<$($Ti),+> TryFrom<Row> for ($($Ti,)+)
        where
            $($Ti: TryFrom<Value>),+
        {
            type Error = String;

            fn try_from(row: Row) -> Result<Self, String> {
                // `macro_rules!` cannot count repetitions directly, so build an
                // array with one unit element per type parameter and take its
                // length.
                let arity = [$(replace_expr!(($Ti) ())),*].len();
                let actual = row.len();
                if actual != arity {
                    return Err(format!("row size mismatch: expected {} columns, got {}", arity, actual));
                }

                let mut columns = row.into_iter().enumerate();
                Ok((
                    $(
                        {
                            // The length check above guarantees one value per
                            // tuple element, so this `unwrap` cannot fail.
                            let (index, value) = columns.next().unwrap();
                            // Capture the type before `value` is consumed by
                            // the conversion; it is only used for the message.
                            let source_type = value.get_type();
                            match $Ti::try_from(value) {
                                Ok(converted) => converted,
                                Err(_) => {
                                    return Err(format!("failed converting column {} from type({:?}) to type({})", index, source_type, std::any::type_name::<$Ti>()));
                                }
                            }
                        }
                    ,)+
                ))
            }
        }
    };
}
259
260impl_tuple_from_row!(T1);
262impl_tuple_from_row!(T1, T2);
263impl_tuple_from_row!(T1, T2, T3);
264impl_tuple_from_row!(T1, T2, T3, T4);
265impl_tuple_from_row!(T1, T2, T3, T4, T5);
266impl_tuple_from_row!(T1, T2, T3, T4, T5, T6);
267impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7);
268impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8);
269impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9);
270impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10);
271impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11);
272impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12);
273impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13);
274impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14);
275impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15);
276impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16);
277
278pub struct RowIterator {
279 schema: SchemaRef,
280 it: Pin<Box<dyn Stream<Item = Result<Row>> + Send>>,
281}
282
283impl RowIterator {
284 pub fn new(schema: SchemaRef, it: Pin<Box<dyn Stream<Item = Result<Row>> + Send>>) -> Self {
285 Self { schema, it }
286 }
287
288 pub fn schema(&self) -> SchemaRef {
289 self.schema.clone()
290 }
291
292 pub async fn try_collect<T>(mut self) -> Result<Vec<T>>
293 where
294 T: TryFrom<Row>,
295 T::Error: std::fmt::Display,
296 {
297 let mut ret = Vec::new();
298 while let Some(row) = self.it.next().await {
299 let v = T::try_from(row?).map_err(|e| Error::Parsing(e.to_string()))?;
300 ret.push(v)
301 }
302 Ok(ret)
303 }
304}
305
306impl Stream for RowIterator {
307 type Item = Result<Row>;
308
309 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
310 Pin::new(&mut self.it).poll_next(cx)
311 }
312}
313
314pub struct RowStatsIterator {
315 schema: SchemaRef,
316 it: Pin<Box<dyn Stream<Item = Result<RowWithStats>> + Send>>,
317}
318
319impl RowStatsIterator {
320 pub fn new(
321 schema: SchemaRef,
322 it: Pin<Box<dyn Stream<Item = Result<RowWithStats>> + Send>>,
323 ) -> Self {
324 Self { schema, it }
325 }
326
327 pub fn schema(&self) -> SchemaRef {
328 self.schema.clone()
329 }
330
331 pub async fn filter_rows(self) -> RowIterator {
332 let it = self.it.filter_map(|r| match r {
333 Ok(RowWithStats::Row(r)) => Some(Ok(r)),
334 Ok(_) => None,
335 Err(err) => Some(Err(err)),
336 });
337 RowIterator::new(self.schema, Box::pin(it))
338 }
339}
340
341impl Stream for RowStatsIterator {
342 type Item = Result<RowWithStats>;
343
344 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
345 Pin::new(&mut self.it).poll_next(cx)
346 }
347}