1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
use std::sync::Arc;

use arrow_schema::{FieldRef, Schema};
use datafusion_common::DataFusionError;
use datafusion_sql::{
    planner::SqlToRel,
    sqlparser::{dialect::GenericDialect, parser::Parser},
};
use iceberg_rust::{catalog::CatalogList, spec::types::StructType};

use crate::context::IcebergContext;

pub async fn get_schema(
    sql: &str,
    relations: &[(String, String, String)],
    catalogs: Arc<dyn CatalogList>,
    branch: Option<&str>,
) -> Result<StructType, DataFusionError> {
    let context = IcebergContext::new(relations, catalogs, branch).await?;
    let statement = Parser::parse_sql(&GenericDialect, sql)?
        .pop()
        .ok_or(DataFusionError::Internal("sql statement".to_string()))?;

    let planner = SqlToRel::new(&context);

    let logical_plan = planner.sql_statement_to_plan(statement)?;
    let fields: Vec<FieldRef> = logical_plan
        .schema()
        .fields()
        .iter()
        .map(|field| field.field())
        .cloned()
        .collect();
    let struct_type = StructType::try_from(&Schema::new(fields))
        .map_err(|err| DataFusionError::Internal(err.to_string()))?;
    Ok(struct_type)
}