1use std::pin::Pin;
16use std::task::Context;
17use std::task::Poll;
18
19use serde::Deserialize;
20use tokio_stream::{Stream, StreamExt};
21
22use crate::error::{Error, Result};
23use crate::value::Value;
24use arrow::record_batch::RecordBatch;
25use databend_client::schema::SchemaRef;
26use databend_client::ResultFormatSettings;
27use jiff::tz::TimeZone;
28
29#[derive(Clone, Debug)]
30pub enum RowWithStats {
31 Row(Row),
32 Stats(ServerStats),
33}
34
35#[derive(Deserialize, Clone, Debug, Default)]
36pub struct ServerStats {
37 #[serde(default)]
38 pub total_rows: usize,
39 #[serde(default)]
40 pub total_bytes: usize,
41
42 #[serde(default)]
43 pub read_rows: usize,
44 #[serde(default)]
45 pub read_bytes: usize,
46
47 #[serde(default)]
48 pub write_rows: usize,
49 #[serde(default)]
50 pub write_bytes: usize,
51
52 #[serde(default)]
53 pub running_time_ms: f64,
54
55 #[serde(default)]
56 pub spill_file_nums: usize,
57
58 #[serde(default)]
59 pub spill_bytes: usize,
60}
61
62impl ServerStats {
63 pub fn normalize(&mut self) {
64 if self.total_rows == 0 {
65 self.total_rows = self.read_rows;
66 }
67 if self.total_bytes == 0 {
68 self.total_bytes = self.read_bytes;
69 }
70 }
71
72 pub fn merge(&mut self, other: &ServerStats) {
73 self.total_rows += other.total_rows;
74 self.total_bytes += other.total_bytes;
75 self.read_rows += other.read_rows;
76 self.read_bytes += other.read_bytes;
77 self.write_rows += other.write_rows;
78 self.write_bytes += other.write_bytes;
79 self.running_time_ms += other.running_time_ms;
80 self.spill_file_nums += other.spill_file_nums;
81 self.spill_bytes += other.spill_bytes;
82 }
83}
84
85impl From<databend_client::QueryStats> for ServerStats {
86 fn from(stats: databend_client::QueryStats) -> Self {
87 let mut p = Self {
88 total_rows: 0,
89 total_bytes: 0,
90 read_rows: stats.progresses.scan_progress.rows,
91 read_bytes: stats.progresses.scan_progress.bytes,
92 write_rows: stats.progresses.write_progress.rows,
93 write_bytes: stats.progresses.write_progress.bytes,
94 spill_file_nums: stats.progresses.spill_progress.file_nums,
95 spill_bytes: stats.progresses.spill_progress.bytes,
96 running_time_ms: stats.running_time_ms,
97 };
98 if let Some(total) = stats.progresses.total_scan {
99 p.total_rows = total.rows;
100 p.total_bytes = total.bytes;
101 }
102 p
103 }
104}
105
106#[derive(Clone, Debug, Default)]
107pub struct Row {
108 schema: SchemaRef,
109 values: Vec<Value>,
110}
111
112impl Row {
113 pub fn new(schema: SchemaRef, values: Vec<Value>) -> Self {
114 Self { schema, values }
115 }
116
117 pub fn len(&self) -> usize {
118 self.values.len()
119 }
120
121 pub fn is_empty(&self) -> bool {
122 self.values.is_empty()
123 }
124
125 pub fn values(&self) -> &[Value] {
126 &self.values
127 }
128
129 pub fn schema(&self) -> SchemaRef {
130 self.schema.clone()
131 }
132
133 pub fn from_vec(schema: SchemaRef, values: Vec<Value>) -> Self {
134 Self { schema, values }
135 }
136}
137
138impl TryFrom<(SchemaRef, Vec<Option<String>>, &TimeZone)> for Row {
139 type Error = Error;
140
141 fn try_from((schema, data, tz): (SchemaRef, Vec<Option<String>>, &TimeZone)) -> Result<Self> {
142 let mut values: Vec<Value> = Vec::with_capacity(data.len());
143 for (field, val) in schema.fields().iter().zip(data.into_iter()) {
144 values.push(Value::try_from((&field.data_type, val, tz))?);
145 }
146 Ok(Self::new(schema, values))
147 }
148}
149
150impl IntoIterator for Row {
151 type Item = Value;
152 type IntoIter = std::vec::IntoIter<Self::Item>;
153
154 fn into_iter(self) -> Self::IntoIter {
155 self.values.into_iter()
156 }
157}
158
159#[derive(Clone, Debug)]
160pub struct Rows {
161 rows: Vec<Row>,
162}
163
164impl Rows {
165 pub fn new(rows: Vec<Row>) -> Self {
166 Self { rows }
167 }
168
169 pub fn rows(&self) -> &[Row] {
174 &self.rows
175 }
176
177 pub fn len(&self) -> usize {
178 self.rows.len()
179 }
180
181 pub fn is_empty(&self) -> bool {
182 self.rows.is_empty()
183 }
184}
185
186impl TryFrom<(RecordBatch, ResultFormatSettings)> for Rows {
187 type Error = Error;
188 fn try_from((batch, settings): (RecordBatch, ResultFormatSettings)) -> Result<Self> {
189 let batch_schema = batch.schema();
190 let schema = SchemaRef::new(batch_schema.clone().try_into()?);
191 let mut rows: Vec<Row> = Vec::new();
192 for i in 0..batch.num_rows() {
193 let mut values: Vec<Value> = Vec::new();
194 for j in 0..batch_schema.fields().len() {
195 let v = batch.column(j);
196 let field = batch_schema.field(j);
197 let value = Value::try_from((field, v, i, &settings))?;
198 values.push(value);
199 }
200 rows.push(Row::new(schema.clone(), values));
201 }
202 Ok(Self::new(rows))
203 }
204}
205
206impl IntoIterator for Rows {
207 type Item = Row;
208 type IntoIter = std::vec::IntoIter<Self::Item>;
209
210 fn into_iter(self) -> Self::IntoIter {
211 self.rows.into_iter()
212 }
213}
214
/// Discards the first token tree and expands to the expression that follows.
///
/// Used to emit one expression per element of a macro repetition when the
/// repeated token itself is not needed in the output.
macro_rules! replace_expr {
    ($_ignored:tt $replacement:expr) => {
        $replacement
    };
}
220
221macro_rules! impl_tuple_from_row {
223 ( $($Ti:tt),+ ) => {
224 impl<$($Ti),+> TryFrom<Row> for ($($Ti,)+)
225 where
226 $($Ti: TryFrom<Value>),+
227 {
228 type Error = String;
229 fn try_from(row: Row) -> Result<Self, String> {
230 let expected_len = <[()]>::len(&[$(replace_expr!(($Ti) ())),*]);
234
235 if expected_len != row.len() {
236 return Err(format!("row size mismatch: expected {} columns, got {}", expected_len, row.len()));
237 }
238 let mut vals_iter = row.into_iter().enumerate();
239
240 Ok((
241 $(
242 {
243 let (col_ix, col_value) = vals_iter
244 .next()
245 .unwrap(); let t = col_value.get_type();
248 $Ti::try_from(col_value)
249 .map_err(|_| format!("failed converting column {} from type({:?}) to type({})", col_ix, t, std::any::type_name::<$Ti>()))?
250 }
251 ,)+
252 ))
253 }
254 }
255 }
256}
257
258impl_tuple_from_row!(T1);
260impl_tuple_from_row!(T1, T2);
261impl_tuple_from_row!(T1, T2, T3);
262impl_tuple_from_row!(T1, T2, T3, T4);
263impl_tuple_from_row!(T1, T2, T3, T4, T5);
264impl_tuple_from_row!(T1, T2, T3, T4, T5, T6);
265impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7);
266impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8);
267impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9);
268impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10);
269impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11);
270impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12);
271impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13);
272impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14);
273impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15);
274impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16);
275
276pub struct RowIterator {
277 schema: SchemaRef,
278 it: Option<Pin<Box<dyn Stream<Item = Result<Row>> + Send>>>,
279}
280
281impl RowIterator {
282 pub fn new(schema: SchemaRef, it: Pin<Box<dyn Stream<Item = Result<Row>> + Send>>) -> Self {
283 Self {
284 schema,
285 it: Some(it),
286 }
287 }
288
289 pub fn schema(&self) -> SchemaRef {
290 self.schema.clone()
291 }
292
293 pub async fn try_collect<T>(mut self) -> Result<Vec<T>>
294 where
295 T: TryFrom<Row>,
296 T::Error: std::fmt::Display,
297 {
298 if let Some(it) = &mut self.it {
299 let mut ret = Vec::new();
300 while let Some(row) = it.next().await {
301 let v = T::try_from(row?).map_err(|e| Error::Parsing(e.to_string()))?;
302 ret.push(v)
303 }
304 Ok(ret)
305 } else {
306 Err(Error::BadArgument("RowIterator already closed".to_owned()))
307 }
308 }
309
310 pub fn close(&mut self) {
311 self.it = None;
312 }
313}
314
315impl Stream for RowIterator {
316 type Item = Result<Row>;
317
318 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
319 if let Some(it) = self.it.as_mut() {
320 Pin::new(it).poll_next(cx)
321 } else {
322 Poll::Ready(Some(Err(Error::BadArgument(
323 "RowIterator already closed".to_owned(),
324 ))))
325 }
326 }
327}
328
329pub struct RowStatsIterator {
330 schema: SchemaRef,
331 it: Option<Pin<Box<dyn Stream<Item = Result<RowWithStats>> + Send>>>,
332}
333
334impl RowStatsIterator {
335 pub fn new(
336 schema: SchemaRef,
337 it: Pin<Box<dyn Stream<Item = Result<RowWithStats>> + Send>>,
338 ) -> Self {
339 Self {
340 schema,
341 it: Some(it),
342 }
343 }
344
345 pub fn schema(&self) -> SchemaRef {
346 self.schema.clone()
347 }
348
349 pub async fn filter_rows(self) -> Result<RowIterator> {
350 if let Some(it) = self.it {
351 let it = it.filter_map(|r| match r {
352 Ok(RowWithStats::Row(r)) => Some(Ok(r)),
353 Ok(_) => None,
354 Err(err) => Some(Err(err)),
355 });
356 Ok(RowIterator::new(self.schema, Box::pin(it)))
357 } else {
358 Err(Error::BadArgument("RowIterator already closed".to_owned()))
359 }
360 }
361
362 pub fn close(&mut self) {
363 self.it = None;
364 }
365}
366
367impl Stream for RowStatsIterator {
368 type Item = Result<RowWithStats>;
369
370 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
371 if let Some(it) = self.it.as_mut() {
372 Pin::new(it).poll_next(cx)
373 } else {
374 Poll::Ready(Some(Err(Error::BadArgument(
375 "RowStatsIterator already closed".to_owned(),
376 ))))
377 }
378 }
379}