gluesql_core/executor/fetch.rs

1use {
2    super::{context::RowContext, evaluate::evaluate_stateless, filter::check_expr},
3    crate::{
4        ast::{
5            ColumnDef, ColumnUniqueOption, Dictionary, Expr, IndexItem, Join, Query, Select,
6            SelectItem, SetExpr, TableAlias, TableFactor, TableWithJoins, ToSql, ToSqlUnquoted,
7            Values,
8        },
9        data::{Key, Row, Value, get_alias, get_index},
10        executor::{evaluate::evaluate, select::select},
11        result::Result,
12        store::{DataRow, GStore},
13    },
14    async_recursion::async_recursion,
15    futures::{
16        future,
17        stream::{self, Stream, TryStreamExt},
18    },
19    serde::Serialize,
20    std::{borrow::Cow, collections::BTreeMap, fmt::Debug, iter, sync::Arc},
21    thiserror::Error as ThisError,
22};
23
24#[derive(ThisError, Serialize, Debug, PartialEq, Eq)]
25pub enum FetchError {
26    #[error("table not found: {0}")]
27    TableNotFound(String),
28
29    #[error("table alias not found: {0}")]
30    TableAliasNotFound(String),
31
32    #[error("SERIES has wrong size: {0}")]
33    SeriesSizeWrong(i64),
34
35    #[error("table '{0}' has {1} columns available but {2} column aliases specified")]
36    TooManyColumnAliases(String, usize, usize),
37
38    #[error("unreachable")]
39    Unreachable,
40}
41
42pub async fn fetch<'a, T: GStore>(
43    storage: &'a T,
44    table_name: &'a str,
45    columns: Option<Arc<[String]>>,
46    where_clause: Option<&'a Expr>,
47) -> Result<impl Stream<Item = Result<(Key, Row)>> + 'a> {
48    let columns = columns.unwrap_or_else(|| Arc::from([]));
49    let rows = storage
50        .scan_data(table_name)
51        .await?
52        .try_filter_map(move |(key, data_row)| {
53            let row = match data_row {
54                DataRow::Vec(values) => Row::Vec {
55                    columns: Arc::clone(&columns),
56                    values,
57                },
58                DataRow::Map(values) => Row::Map(values),
59            };
60
61            async move {
62                let Some(expr) = where_clause else {
63                    return Ok(Some((key, row)));
64                };
65
66                let context = RowContext::new(table_name, Cow::Borrowed(&row), None);
67
68                check_expr(storage, Some(Arc::new(context)), None, expr)
69                    .await
70                    .map(|pass| pass.then_some((key, row)))
71            }
72        });
73
74    Ok(rows)
75}
76
77#[derive(futures_enum::Stream)]
78pub enum Rows<I1, I2, I3, I4> {
79    Derived(I1),
80    Table(I2),
81    Series(I3),
82    Dictionary(I4),
83}
84
85pub async fn fetch_relation_rows<'a, T: GStore>(
86    storage: &'a T,
87    table_factor: &'a TableFactor,
88    filter_context: Option<&Arc<RowContext<'a>>>,
89) -> Result<impl Stream<Item = Result<Row>> + 'a> {
90    let columns = Arc::from(
91        fetch_relation_columns(storage, table_factor)
92            .await?
93            .unwrap_or_default(),
94    );
95
96    match table_factor {
97        TableFactor::Derived { subquery, .. } => {
98            let filter_context = filter_context.map(Arc::clone);
99            let rows =
100                select(storage, subquery, filter_context)
101                    .await?
102                    .map_ok(move |row| match row {
103                        Row::Vec { values, .. } => Row::Vec {
104                            columns: Arc::clone(&columns),
105                            values,
106                        },
107                        Row::Map(values) => Row::Map(values),
108                    });
109
110            Ok(Rows::Derived(rows))
111        }
112        TableFactor::Table { name, .. } => {
113            let rows = {
114                #[derive(futures_enum::Stream)]
115                enum Rows<I1, I2, I3, I4> {
116                    Indexed(I1),
117                    PrimaryKey(I2),
118                    PrimaryKeyEmpty(I3),
119                    FullScan(I4),
120                }
121
122                match get_index(table_factor) {
123                    Some(IndexItem::NonClustered {
124                        name: index_name,
125                        asc,
126                        cmp_expr,
127                    }) => {
128                        let cmp_value = match cmp_expr {
129                            Some((op, expr)) => {
130                                let evaluated = evaluate(storage, None, None, expr).await?;
131
132                                Some((op, evaluated.try_into()?))
133                            }
134                            None => None,
135                        };
136
137                        let rows = storage
138                            .scan_indexed_data(name, index_name, *asc, cmp_value)
139                            .await?
140                            .map_ok(move |(_, data_row)| match data_row {
141                                DataRow::Vec(values) => Row::Vec {
142                                    columns: Arc::clone(&columns),
143                                    values,
144                                },
145                                DataRow::Map(values) => Row::Map(values),
146                            });
147
148                        Rows::Indexed(rows)
149                    }
150                    Some(IndexItem::PrimaryKey(expr)) => {
151                        let schema = storage
152                            .fetch_schema(name)
153                            .await?
154                            .ok_or(FetchError::Unreachable)?;
155
156                        let filter_context = filter_context.map(Arc::clone);
157                        let evaluated = evaluate(storage, filter_context, None, expr).await?;
158
159                        let column_def = schema
160                            .column_defs
161                            .as_ref()
162                            .and_then(|column_defs| {
163                                column_defs.iter().find(|column_def| {
164                                    column_def.unique.map(|u| u.is_primary) == Some(true)
165                                })
166                            })
167                            .ok_or(FetchError::Unreachable)?;
168
169                        let value =
170                            evaluated.try_into_value(&column_def.data_type, column_def.nullable)?;
171                        let key = Key::try_from(value)?;
172
173                        match storage.fetch_data(name, &key).await? {
174                            Some(data_row) => {
175                                let row = match data_row {
176                                    DataRow::Vec(values) => Row::Vec {
177                                        columns: Arc::clone(&columns),
178                                        values,
179                                    },
180                                    DataRow::Map(values) => Row::Map(values),
181                                };
182
183                                Rows::PrimaryKey(stream::once(future::ready(Ok(row))))
184                            }
185                            None => Rows::PrimaryKeyEmpty(stream::empty()),
186                        }
187                    }
188                    _ => {
189                        let rows = storage.scan_data(name).await?.map_ok(move |(_, data_row)| {
190                            match data_row {
191                                DataRow::Vec(values) => Row::Vec {
192                                    columns: Arc::clone(&columns),
193                                    values,
194                                },
195                                DataRow::Map(values) => Row::Map(values),
196                            }
197                        });
198
199                        Rows::FullScan(rows)
200                    }
201                }
202            };
203
204            Ok(Rows::Table(rows))
205        }
206        TableFactor::Series { size, .. } => {
207            let value: Value = evaluate_stateless(None, size).await?.try_into()?;
208            let size: i64 = value.try_into()?;
209            let size = match size {
210                n if n >= 0 => size,
211                n => return Err(FetchError::SeriesSizeWrong(n).into()),
212            };
213
214            let columns = Arc::from(vec!["N".to_owned()]);
215            let rows = (1..=size).map(move |v| {
216                Ok(Row::Vec {
217                    columns: Arc::clone(&columns),
218                    values: vec![Value::I64(v)],
219                })
220            });
221
222            Ok(Rows::Series(stream::iter(rows)))
223        }
224        TableFactor::Dictionary { dict, .. } => {
225            let rows = {
226                #[derive(futures_enum::Stream)]
227                enum Rows<I1, I2, I3, I4> {
228                    Tables(I1),
229                    TableColumns(I2),
230                    Indexes(I3),
231                    Objects(I4),
232                }
233
234                match dict {
235                    Dictionary::GlueObjects => {
236                        let schemas = storage.fetch_all_schemas().await?;
237                        let table_metas = storage
238                            .scan_table_meta()
239                            .await?
240                            .collect::<Result<BTreeMap<_, _>>>()?;
241                        let rows = schemas.into_iter().flat_map(move |schema| {
242                            let meta = table_metas
243                                .iter()
244                                .find_map(|(table_name, hash_map)| {
245                                    (table_name == &schema.table_name).then(|| hash_map.clone())
246                                })
247                                .unwrap_or_default();
248
249                            let table_rows = BTreeMap::from([
250                                ("OBJECT_NAME".to_owned(), Value::Str(schema.table_name)),
251                                ("OBJECT_TYPE".to_owned(), Value::Str("TABLE".to_owned())),
252                            ])
253                            .into_iter()
254                            .chain(meta)
255                            .collect::<BTreeMap<_, _>>();
256
257                            let index_rows = schema.indexes.into_iter().map(|index| {
258                                BTreeMap::from([
259                                    ("OBJECT_NAME".to_owned(), Value::Str(index.name)),
260                                    ("OBJECT_TYPE".to_owned(), Value::Str("INDEX".to_owned())),
261                                ])
262                            });
263
264                            iter::once(table_rows)
265                                .chain(index_rows)
266                                .map(|hash_map| Ok(Row::Map(hash_map)))
267                        });
268
269                        Rows::Objects(stream::iter(rows))
270                    }
271                    Dictionary::GlueTables => {
272                        let schemas = storage.fetch_all_schemas().await?;
273                        let rows = schemas.into_iter().map(move |schema| {
274                            Ok(Row::Vec {
275                                columns: Arc::clone(&columns),
276                                values: vec![
277                                    Value::Str(schema.table_name),
278                                    schema.comment.map_or(Value::Null, Value::Str),
279                                ],
280                            })
281                        });
282
283                        Rows::Tables(stream::iter(rows))
284                    }
285                    Dictionary::GlueTableColumns => {
286                        let schemas = storage.fetch_all_schemas().await?;
287                        let rows = schemas.into_iter().flat_map(move |schema| {
288                            let columns = Arc::clone(&columns);
289                            let table_name = schema.table_name;
290
291                            schema
292                                .column_defs
293                                .unwrap_or_default()
294                                .into_iter()
295                                .enumerate()
296                                .map(move |(index, column_def)| {
297                                    let values = vec![
298                                        Value::Str(table_name.clone()),
299                                        Value::Str(column_def.name),
300                                        Value::I64(index as i64 + 1),
301                                        Value::Bool(column_def.nullable),
302                                        column_def.unique.map_or(Value::Null, |unique| {
303                                            Value::Str(unique.to_sql())
304                                        }),
305                                        column_def
306                                            .default
307                                            .map_or(Value::Null, |expr| Value::Str(expr.to_sql())),
308                                        column_def.comment.map_or(Value::Null, Value::Str),
309                                    ];
310
311                                    Ok(Row::Vec {
312                                        columns: Arc::clone(&columns),
313                                        values,
314                                    })
315                                })
316                        });
317
318                        Rows::TableColumns(stream::iter(rows))
319                    }
320                    Dictionary::GlueIndexes => {
321                        let schemas = storage.fetch_all_schemas().await?;
322                        let rows = schemas.into_iter().flat_map(move |schema| {
323                            let column_defs = schema.column_defs.unwrap_or_default();
324                            let primary_column = column_defs.iter().find_map(|column_def| {
325                                let ColumnDef { name, unique, .. } = column_def;
326
327                                (unique == &Some(ColumnUniqueOption { is_primary: true }))
328                                    .then_some(name)
329                            });
330
331                            let clustered = match primary_column {
332                                Some(column_name) => {
333                                    let values = vec![
334                                        Value::Str(schema.table_name.clone()),
335                                        Value::Str("PRIMARY".to_owned()),
336                                        Value::Str("BOTH".to_owned()),
337                                        Value::Str(column_name.to_owned()),
338                                        Value::Bool(true),
339                                    ];
340
341                                    let row = Row::Vec {
342                                        columns: Arc::clone(&columns),
343                                        values,
344                                    };
345
346                                    vec![Ok(row)]
347                                }
348                                None => Vec::new(),
349                            };
350
351                            let columns = Arc::clone(&columns);
352                            let non_clustered = schema.indexes.into_iter().map(move |index| {
353                                let values = vec![
354                                    Value::Str(schema.table_name.clone()),
355                                    Value::Str(index.name),
356                                    Value::Str(index.order.to_string()),
357                                    Value::Str(index.expr.to_sql_unquoted()),
358                                    Value::Bool(false),
359                                ];
360
361                                Ok(Row::Vec {
362                                    columns: Arc::clone(&columns),
363                                    values,
364                                })
365                            });
366
367                            clustered.into_iter().chain(non_clustered)
368                        });
369
370                        Rows::Indexes(stream::iter(rows))
371                    }
372                }
373            };
374
375            Ok(Rows::Dictionary(rows))
376        }
377    }
378}
379
380pub async fn fetch_columns<T: GStore>(
381    storage: &T,
382    table_name: &str,
383) -> Result<Option<Vec<String>>> {
384    let columns = storage
385        .fetch_schema(table_name)
386        .await?
387        .ok_or_else(|| FetchError::TableNotFound(table_name.to_owned()))?
388        .column_defs
389        .map(|column_defs| {
390            column_defs
391                .into_iter()
392                .map(|column_def| column_def.name)
393                .collect()
394        });
395
396    Ok(columns)
397}
398
399#[async_recursion]
400pub async fn fetch_relation_columns<T>(
401    storage: &T,
402    table_factor: &TableFactor,
403) -> Result<Option<Vec<String>>>
404where
405    T: GStore,
406{
407    match table_factor {
408        TableFactor::Table { name, alias, .. } => {
409            let columns = fetch_columns(storage, name).await?;
410            match (columns, alias) {
411                (columns, None) => Ok(columns),
412                (None, Some(_)) => Ok(None),
413                (Some(columns), Some(alias)) if alias.columns.len() > columns.len() => {
414                    Err(FetchError::TooManyColumnAliases(
415                        name.to_string(),
416                        columns.len(),
417                        alias.columns.len(),
418                    )
419                    .into())
420                }
421                (Some(columns), Some(alias)) => Ok(Some(
422                    alias
423                        .columns
424                        .iter()
425                        .cloned()
426                        .chain(columns[alias.columns.len()..columns.len()].to_vec())
427                        .collect(),
428                )),
429            }
430        }
431        TableFactor::Series { .. } => Ok(Some(vec!["N".to_owned()])),
432        TableFactor::Dictionary { dict, .. } => Ok(Some(match dict {
433            Dictionary::GlueObjects => vec![
434                "OBJECT_NAME".to_owned(),
435                "OBJECT_TYPE".to_owned(),
436                "CREATED".to_owned(),
437            ],
438            Dictionary::GlueTables => vec!["TABLE_NAME".to_owned(), "COMMENT".to_owned()],
439            Dictionary::GlueTableColumns => vec![
440                "TABLE_NAME".to_owned(),
441                "COLUMN_NAME".to_owned(),
442                "COLUMN_ID".to_owned(),
443                "NULLABLE".to_owned(),
444                "KEY".to_owned(),
445                "DEFAULT".to_owned(),
446                "COMMENT".to_owned(),
447            ],
448            Dictionary::GlueIndexes => vec![
449                "TABLE_NAME".to_owned(),
450                "INDEX_NAME".to_owned(),
451                "ORDER".to_owned(),
452                "EXPRESSION".to_owned(),
453                "UNIQUENESS".to_owned(),
454            ],
455        })),
456        TableFactor::Derived {
457            subquery: Query { body, .. },
458            alias:
459                TableAlias {
460                    columns: alias_columns,
461                    name,
462                },
463        } => match body {
464            SetExpr::Select(statement) => {
465                let Select {
466                    from:
467                        TableWithJoins {
468                            relation, joins, ..
469                        },
470                    projection,
471                    ..
472                } = statement.as_ref();
473
474                let labels = fetch_labels(storage, relation, joins, projection).await?;
475                match labels {
476                    None => Ok(None),
477                    Some(labels) if alias_columns.is_empty() => Ok(Some(labels)),
478                    Some(labels) if alias_columns.len() > labels.len() => {
479                        Err(FetchError::TooManyColumnAliases(
480                            name.to_string(),
481                            labels.len(),
482                            alias_columns.len(),
483                        )
484                        .into())
485                    }
486                    Some(labels) => Ok(Some(
487                        alias_columns
488                            .iter()
489                            .cloned()
490                            .chain(labels[alias_columns.len()..labels.len()].to_vec())
491                            .collect(),
492                    )),
493                }
494            }
495            SetExpr::Values(Values(values_list)) => {
496                let total_len = values_list[0].len();
497                let alias_len = alias_columns.len();
498                if alias_len > total_len {
499                    return Err(FetchError::TooManyColumnAliases(
500                        name.into(),
501                        total_len,
502                        alias_len,
503                    )
504                    .into());
505                }
506                let labels = (alias_len + 1..=total_len).map(|i| format!("column{i}"));
507                let labels = alias_columns
508                    .iter()
509                    .cloned()
510                    .chain(labels)
511                    .collect::<Vec<_>>();
512
513                Ok(Some(labels))
514            }
515        },
516    }
517}
518
519async fn fetch_join_columns<'a, T: GStore>(
520    storage: &T,
521    joins: &'a [Join],
522) -> Result<Option<Vec<(&'a String, Vec<String>)>>> {
523    let mut all_columns = Vec::with_capacity(joins.len());
524    for join in joins {
525        if let Some(columns) = fetch_relation_columns(storage, &join.relation).await? {
526            let alias = get_alias(&join.relation);
527            all_columns.push((alias, columns));
528        } else {
529            return Ok(None);
530        }
531    }
532    Ok(Some(all_columns))
533}
534
535pub async fn fetch_labels<T: GStore>(
536    storage: &T,
537    relation: &TableFactor,
538    joins: &[Join],
539    projection: &[SelectItem],
540) -> Result<Option<Vec<String>>> {
541    let table_alias = get_alias(relation);
542    let columns = fetch_relation_columns(storage, relation).await?;
543    let join_columns = fetch_join_columns(storage, joins).await?;
544
545    if (columns.is_none() || join_columns.is_none())
546        && projection.iter().any(|item| {
547            matches!(
548                item,
549                SelectItem::Wildcard | SelectItem::QualifiedWildcard(_)
550            )
551        })
552    {
553        return Ok(None);
554    }
555
556    let columns = columns.unwrap_or_default();
557    let join_columns = join_columns.unwrap_or_default();
558
559    projection
560        .iter()
561        .flat_map(|item| match item {
562            SelectItem::Wildcard => {
563                let columns = columns.iter().cloned();
564                let join_columns = join_columns.iter().flat_map(|(_, columns)| columns.clone());
565
566                columns.chain(join_columns).map(Ok).collect()
567            }
568            SelectItem::QualifiedWildcard(target_table_alias) => {
569                if table_alias == target_table_alias {
570                    return columns.iter().cloned().map(Ok).collect();
571                }
572
573                let labels = join_columns
574                    .iter()
575                    .find(|(table_alias, _)| table_alias == &target_table_alias)
576                    .map(|(_, columns)| columns.clone());
577
578                match labels {
579                    Some(columns) => columns.into_iter().map(Ok).collect(),
580                    None => {
581                        vec![Err(FetchError::TableAliasNotFound(
582                            target_table_alias.to_owned(),
583                        )
584                        .into())]
585                    }
586                }
587            }
588            SelectItem::Expr { label, .. } => vec![Ok(label.to_owned())],
589        })
590        .collect::<Result<_>>()
591        .map(Some)
592}