1use {
2 super::{context::RowContext, evaluate::evaluate_stateless, filter::check_expr},
3 crate::{
4 ast::{
5 ColumnDef, ColumnUniqueOption, Dictionary, Expr, IndexItem, Join, Query, Select,
6 SelectItem, SetExpr, TableAlias, TableFactor, TableWithJoins, ToSql, ToSqlUnquoted,
7 Values,
8 },
9 data::{Key, Row, Value, get_alias, get_index},
10 executor::{evaluate::evaluate, select::select},
11 result::Result,
12 store::{DataRow, GStore},
13 },
14 async_recursion::async_recursion,
15 futures::{
16 future,
17 stream::{self, Stream, TryStreamExt},
18 },
19 serde::Serialize,
20 std::{borrow::Cow, collections::BTreeMap, fmt::Debug, iter, sync::Arc},
21 thiserror::Error as ThisError,
22};
23
/// Errors raised by this module while fetching rows, column names, or
/// projection labels for a relation.
#[derive(ThisError, Serialize, Debug, PartialEq, Eq)]
pub enum FetchError {
    /// The storage returned no schema for the requested table name.
    #[error("table not found: {0}")]
    TableNotFound(String),

    /// A qualified wildcard names an alias that matches neither the base
    /// relation nor any joined relation.
    #[error("table alias not found: {0}")]
    TableAliasNotFound(String),

    /// The size of a `TableFactor::Series` evaluated to a negative number;
    /// carries the offending value.
    #[error("SERIES has wrong size: {0}")]
    SeriesSizeWrong(i64),

    /// More column aliases were supplied than the relation has columns.
    /// Fields, in order: relation name, columns available, aliases given.
    #[error("table '{0}' has {1} columns available but {2} column aliases specified")]
    TooManyColumnAliases(String, usize, usize),

    /// Returned by the primary-key lookup path when the table schema or its
    /// primary-key column definition is unexpectedly missing.
    #[error("unreachable")]
    Unreachable,
}
41
42pub async fn fetch<'a, T: GStore>(
43 storage: &'a T,
44 table_name: &'a str,
45 columns: Option<Arc<[String]>>,
46 where_clause: Option<&'a Expr>,
47) -> Result<impl Stream<Item = Result<(Key, Row)>> + 'a> {
48 let columns = columns.unwrap_or_else(|| Arc::from([]));
49 let rows = storage
50 .scan_data(table_name)
51 .await?
52 .try_filter_map(move |(key, data_row)| {
53 let row = match data_row {
54 DataRow::Vec(values) => Row::Vec {
55 columns: Arc::clone(&columns),
56 values,
57 },
58 DataRow::Map(values) => Row::Map(values),
59 };
60
61 async move {
62 let Some(expr) = where_clause else {
63 return Ok(Some((key, row)));
64 };
65
66 let context = RowContext::new(table_name, Cow::Borrowed(&row), None);
67
68 check_expr(storage, Some(Arc::new(context)), None, expr)
69 .await
70 .map(|pass| pass.then_some((key, row)))
71 }
72 });
73
74 Ok(rows)
75}
76
/// A single stream type covering every row source `fetch_relation_rows` can
/// return, so that each arm of its `match` produces the same concrete type.
///
/// NOTE(review): the `futures_enum::Stream` derive presumably implements
/// `Stream` by delegating to whichever variant is active — confirm against
/// the `futures_enum` documentation.
#[derive(futures_enum::Stream)]
pub enum Rows<I1, I2, I3, I4> {
    /// Rows produced by a derived table (subquery).
    Derived(I1),
    /// Rows read from a stored table.
    Table(I2),
    /// Rows generated for a `SERIES` table factor.
    Series(I3),
    /// Rows describing schema objects for a dictionary table factor.
    Dictionary(I4),
}
84
85pub async fn fetch_relation_rows<'a, T: GStore>(
86 storage: &'a T,
87 table_factor: &'a TableFactor,
88 filter_context: Option<&Arc<RowContext<'a>>>,
89) -> Result<impl Stream<Item = Result<Row>> + 'a> {
90 let columns = Arc::from(
91 fetch_relation_columns(storage, table_factor)
92 .await?
93 .unwrap_or_default(),
94 );
95
96 match table_factor {
97 TableFactor::Derived { subquery, .. } => {
98 let filter_context = filter_context.map(Arc::clone);
99 let rows =
100 select(storage, subquery, filter_context)
101 .await?
102 .map_ok(move |row| match row {
103 Row::Vec { values, .. } => Row::Vec {
104 columns: Arc::clone(&columns),
105 values,
106 },
107 Row::Map(values) => Row::Map(values),
108 });
109
110 Ok(Rows::Derived(rows))
111 }
112 TableFactor::Table { name, .. } => {
113 let rows = {
114 #[derive(futures_enum::Stream)]
115 enum Rows<I1, I2, I3, I4> {
116 Indexed(I1),
117 PrimaryKey(I2),
118 PrimaryKeyEmpty(I3),
119 FullScan(I4),
120 }
121
122 match get_index(table_factor) {
123 Some(IndexItem::NonClustered {
124 name: index_name,
125 asc,
126 cmp_expr,
127 }) => {
128 let cmp_value = match cmp_expr {
129 Some((op, expr)) => {
130 let evaluated = evaluate(storage, None, None, expr).await?;
131
132 Some((op, evaluated.try_into()?))
133 }
134 None => None,
135 };
136
137 let rows = storage
138 .scan_indexed_data(name, index_name, *asc, cmp_value)
139 .await?
140 .map_ok(move |(_, data_row)| match data_row {
141 DataRow::Vec(values) => Row::Vec {
142 columns: Arc::clone(&columns),
143 values,
144 },
145 DataRow::Map(values) => Row::Map(values),
146 });
147
148 Rows::Indexed(rows)
149 }
150 Some(IndexItem::PrimaryKey(expr)) => {
151 let schema = storage
152 .fetch_schema(name)
153 .await?
154 .ok_or(FetchError::Unreachable)?;
155
156 let filter_context = filter_context.map(Arc::clone);
157 let evaluated = evaluate(storage, filter_context, None, expr).await?;
158
159 let column_def = schema
160 .column_defs
161 .as_ref()
162 .and_then(|column_defs| {
163 column_defs.iter().find(|column_def| {
164 column_def.unique.map(|u| u.is_primary) == Some(true)
165 })
166 })
167 .ok_or(FetchError::Unreachable)?;
168
169 let value =
170 evaluated.try_into_value(&column_def.data_type, column_def.nullable)?;
171 let key = Key::try_from(value)?;
172
173 match storage.fetch_data(name, &key).await? {
174 Some(data_row) => {
175 let row = match data_row {
176 DataRow::Vec(values) => Row::Vec {
177 columns: Arc::clone(&columns),
178 values,
179 },
180 DataRow::Map(values) => Row::Map(values),
181 };
182
183 Rows::PrimaryKey(stream::once(future::ready(Ok(row))))
184 }
185 None => Rows::PrimaryKeyEmpty(stream::empty()),
186 }
187 }
188 _ => {
189 let rows = storage.scan_data(name).await?.map_ok(move |(_, data_row)| {
190 match data_row {
191 DataRow::Vec(values) => Row::Vec {
192 columns: Arc::clone(&columns),
193 values,
194 },
195 DataRow::Map(values) => Row::Map(values),
196 }
197 });
198
199 Rows::FullScan(rows)
200 }
201 }
202 };
203
204 Ok(Rows::Table(rows))
205 }
206 TableFactor::Series { size, .. } => {
207 let value: Value = evaluate_stateless(None, size).await?.try_into()?;
208 let size: i64 = value.try_into()?;
209 let size = match size {
210 n if n >= 0 => size,
211 n => return Err(FetchError::SeriesSizeWrong(n).into()),
212 };
213
214 let columns = Arc::from(vec!["N".to_owned()]);
215 let rows = (1..=size).map(move |v| {
216 Ok(Row::Vec {
217 columns: Arc::clone(&columns),
218 values: vec![Value::I64(v)],
219 })
220 });
221
222 Ok(Rows::Series(stream::iter(rows)))
223 }
224 TableFactor::Dictionary { dict, .. } => {
225 let rows = {
226 #[derive(futures_enum::Stream)]
227 enum Rows<I1, I2, I3, I4> {
228 Tables(I1),
229 TableColumns(I2),
230 Indexes(I3),
231 Objects(I4),
232 }
233
234 match dict {
235 Dictionary::GlueObjects => {
236 let schemas = storage.fetch_all_schemas().await?;
237 let table_metas = storage
238 .scan_table_meta()
239 .await?
240 .collect::<Result<BTreeMap<_, _>>>()?;
241 let rows = schemas.into_iter().flat_map(move |schema| {
242 let meta = table_metas
243 .iter()
244 .find_map(|(table_name, hash_map)| {
245 (table_name == &schema.table_name).then(|| hash_map.clone())
246 })
247 .unwrap_or_default();
248
249 let table_rows = BTreeMap::from([
250 ("OBJECT_NAME".to_owned(), Value::Str(schema.table_name)),
251 ("OBJECT_TYPE".to_owned(), Value::Str("TABLE".to_owned())),
252 ])
253 .into_iter()
254 .chain(meta)
255 .collect::<BTreeMap<_, _>>();
256
257 let index_rows = schema.indexes.into_iter().map(|index| {
258 BTreeMap::from([
259 ("OBJECT_NAME".to_owned(), Value::Str(index.name)),
260 ("OBJECT_TYPE".to_owned(), Value::Str("INDEX".to_owned())),
261 ])
262 });
263
264 iter::once(table_rows)
265 .chain(index_rows)
266 .map(|hash_map| Ok(Row::Map(hash_map)))
267 });
268
269 Rows::Objects(stream::iter(rows))
270 }
271 Dictionary::GlueTables => {
272 let schemas = storage.fetch_all_schemas().await?;
273 let rows = schemas.into_iter().map(move |schema| {
274 Ok(Row::Vec {
275 columns: Arc::clone(&columns),
276 values: vec![
277 Value::Str(schema.table_name),
278 schema.comment.map_or(Value::Null, Value::Str),
279 ],
280 })
281 });
282
283 Rows::Tables(stream::iter(rows))
284 }
285 Dictionary::GlueTableColumns => {
286 let schemas = storage.fetch_all_schemas().await?;
287 let rows = schemas.into_iter().flat_map(move |schema| {
288 let columns = Arc::clone(&columns);
289 let table_name = schema.table_name;
290
291 schema
292 .column_defs
293 .unwrap_or_default()
294 .into_iter()
295 .enumerate()
296 .map(move |(index, column_def)| {
297 let values = vec![
298 Value::Str(table_name.clone()),
299 Value::Str(column_def.name),
300 Value::I64(index as i64 + 1),
301 Value::Bool(column_def.nullable),
302 column_def.unique.map_or(Value::Null, |unique| {
303 Value::Str(unique.to_sql())
304 }),
305 column_def
306 .default
307 .map_or(Value::Null, |expr| Value::Str(expr.to_sql())),
308 column_def.comment.map_or(Value::Null, Value::Str),
309 ];
310
311 Ok(Row::Vec {
312 columns: Arc::clone(&columns),
313 values,
314 })
315 })
316 });
317
318 Rows::TableColumns(stream::iter(rows))
319 }
320 Dictionary::GlueIndexes => {
321 let schemas = storage.fetch_all_schemas().await?;
322 let rows = schemas.into_iter().flat_map(move |schema| {
323 let column_defs = schema.column_defs.unwrap_or_default();
324 let primary_column = column_defs.iter().find_map(|column_def| {
325 let ColumnDef { name, unique, .. } = column_def;
326
327 (unique == &Some(ColumnUniqueOption { is_primary: true }))
328 .then_some(name)
329 });
330
331 let clustered = match primary_column {
332 Some(column_name) => {
333 let values = vec![
334 Value::Str(schema.table_name.clone()),
335 Value::Str("PRIMARY".to_owned()),
336 Value::Str("BOTH".to_owned()),
337 Value::Str(column_name.to_owned()),
338 Value::Bool(true),
339 ];
340
341 let row = Row::Vec {
342 columns: Arc::clone(&columns),
343 values,
344 };
345
346 vec![Ok(row)]
347 }
348 None => Vec::new(),
349 };
350
351 let columns = Arc::clone(&columns);
352 let non_clustered = schema.indexes.into_iter().map(move |index| {
353 let values = vec![
354 Value::Str(schema.table_name.clone()),
355 Value::Str(index.name),
356 Value::Str(index.order.to_string()),
357 Value::Str(index.expr.to_sql_unquoted()),
358 Value::Bool(false),
359 ];
360
361 Ok(Row::Vec {
362 columns: Arc::clone(&columns),
363 values,
364 })
365 });
366
367 clustered.into_iter().chain(non_clustered)
368 });
369
370 Rows::Indexes(stream::iter(rows))
371 }
372 }
373 };
374
375 Ok(Rows::Dictionary(rows))
376 }
377 }
378}
379
380pub async fn fetch_columns<T: GStore>(
381 storage: &T,
382 table_name: &str,
383) -> Result<Option<Vec<String>>> {
384 let columns = storage
385 .fetch_schema(table_name)
386 .await?
387 .ok_or_else(|| FetchError::TableNotFound(table_name.to_owned()))?
388 .column_defs
389 .map(|column_defs| {
390 column_defs
391 .into_iter()
392 .map(|column_def| column_def.name)
393 .collect()
394 });
395
396 Ok(columns)
397}
398
399#[async_recursion]
400pub async fn fetch_relation_columns<T>(
401 storage: &T,
402 table_factor: &TableFactor,
403) -> Result<Option<Vec<String>>>
404where
405 T: GStore,
406{
407 match table_factor {
408 TableFactor::Table { name, alias, .. } => {
409 let columns = fetch_columns(storage, name).await?;
410 match (columns, alias) {
411 (columns, None) => Ok(columns),
412 (None, Some(_)) => Ok(None),
413 (Some(columns), Some(alias)) if alias.columns.len() > columns.len() => {
414 Err(FetchError::TooManyColumnAliases(
415 name.to_string(),
416 columns.len(),
417 alias.columns.len(),
418 )
419 .into())
420 }
421 (Some(columns), Some(alias)) => Ok(Some(
422 alias
423 .columns
424 .iter()
425 .cloned()
426 .chain(columns[alias.columns.len()..columns.len()].to_vec())
427 .collect(),
428 )),
429 }
430 }
431 TableFactor::Series { .. } => Ok(Some(vec!["N".to_owned()])),
432 TableFactor::Dictionary { dict, .. } => Ok(Some(match dict {
433 Dictionary::GlueObjects => vec![
434 "OBJECT_NAME".to_owned(),
435 "OBJECT_TYPE".to_owned(),
436 "CREATED".to_owned(),
437 ],
438 Dictionary::GlueTables => vec!["TABLE_NAME".to_owned(), "COMMENT".to_owned()],
439 Dictionary::GlueTableColumns => vec![
440 "TABLE_NAME".to_owned(),
441 "COLUMN_NAME".to_owned(),
442 "COLUMN_ID".to_owned(),
443 "NULLABLE".to_owned(),
444 "KEY".to_owned(),
445 "DEFAULT".to_owned(),
446 "COMMENT".to_owned(),
447 ],
448 Dictionary::GlueIndexes => vec![
449 "TABLE_NAME".to_owned(),
450 "INDEX_NAME".to_owned(),
451 "ORDER".to_owned(),
452 "EXPRESSION".to_owned(),
453 "UNIQUENESS".to_owned(),
454 ],
455 })),
456 TableFactor::Derived {
457 subquery: Query { body, .. },
458 alias:
459 TableAlias {
460 columns: alias_columns,
461 name,
462 },
463 } => match body {
464 SetExpr::Select(statement) => {
465 let Select {
466 from:
467 TableWithJoins {
468 relation, joins, ..
469 },
470 projection,
471 ..
472 } = statement.as_ref();
473
474 let labels = fetch_labels(storage, relation, joins, projection).await?;
475 match labels {
476 None => Ok(None),
477 Some(labels) if alias_columns.is_empty() => Ok(Some(labels)),
478 Some(labels) if alias_columns.len() > labels.len() => {
479 Err(FetchError::TooManyColumnAliases(
480 name.to_string(),
481 labels.len(),
482 alias_columns.len(),
483 )
484 .into())
485 }
486 Some(labels) => Ok(Some(
487 alias_columns
488 .iter()
489 .cloned()
490 .chain(labels[alias_columns.len()..labels.len()].to_vec())
491 .collect(),
492 )),
493 }
494 }
495 SetExpr::Values(Values(values_list)) => {
496 let total_len = values_list[0].len();
497 let alias_len = alias_columns.len();
498 if alias_len > total_len {
499 return Err(FetchError::TooManyColumnAliases(
500 name.into(),
501 total_len,
502 alias_len,
503 )
504 .into());
505 }
506 let labels = (alias_len + 1..=total_len).map(|i| format!("column{i}"));
507 let labels = alias_columns
508 .iter()
509 .cloned()
510 .chain(labels)
511 .collect::<Vec<_>>();
512
513 Ok(Some(labels))
514 }
515 },
516 }
517}
518
519async fn fetch_join_columns<'a, T: GStore>(
520 storage: &T,
521 joins: &'a [Join],
522) -> Result<Option<Vec<(&'a String, Vec<String>)>>> {
523 let mut all_columns = Vec::with_capacity(joins.len());
524 for join in joins {
525 if let Some(columns) = fetch_relation_columns(storage, &join.relation).await? {
526 let alias = get_alias(&join.relation);
527 all_columns.push((alias, columns));
528 } else {
529 return Ok(None);
530 }
531 }
532 Ok(Some(all_columns))
533}
534
535pub async fn fetch_labels<T: GStore>(
536 storage: &T,
537 relation: &TableFactor,
538 joins: &[Join],
539 projection: &[SelectItem],
540) -> Result<Option<Vec<String>>> {
541 let table_alias = get_alias(relation);
542 let columns = fetch_relation_columns(storage, relation).await?;
543 let join_columns = fetch_join_columns(storage, joins).await?;
544
545 if (columns.is_none() || join_columns.is_none())
546 && projection.iter().any(|item| {
547 matches!(
548 item,
549 SelectItem::Wildcard | SelectItem::QualifiedWildcard(_)
550 )
551 })
552 {
553 return Ok(None);
554 }
555
556 let columns = columns.unwrap_or_default();
557 let join_columns = join_columns.unwrap_or_default();
558
559 projection
560 .iter()
561 .flat_map(|item| match item {
562 SelectItem::Wildcard => {
563 let columns = columns.iter().cloned();
564 let join_columns = join_columns.iter().flat_map(|(_, columns)| columns.clone());
565
566 columns.chain(join_columns).map(Ok).collect()
567 }
568 SelectItem::QualifiedWildcard(target_table_alias) => {
569 if table_alias == target_table_alias {
570 return columns.iter().cloned().map(Ok).collect();
571 }
572
573 let labels = join_columns
574 .iter()
575 .find(|(table_alias, _)| table_alias == &target_table_alias)
576 .map(|(_, columns)| columns.clone());
577
578 match labels {
579 Some(columns) => columns.into_iter().map(Ok).collect(),
580 None => {
581 vec![Err(FetchError::TableAliasNotFound(
582 target_table_alias.to_owned(),
583 )
584 .into())]
585 }
586 }
587 }
588 SelectItem::Expr { label, .. } => vec![Ok(label.to_owned())],
589 })
590 .collect::<Result<_>>()
591 .map(Some)
592}