datafusion_iceberg_sql/
schema.rs

1use std::sync::Arc;
2
3use arrow_schema::{FieldRef, Schema};
4use datafusion_common::DataFusionError;
5use datafusion_sql::{
6    planner::SqlToRel,
7    sqlparser::{dialect::GenericDialect, parser::Parser},
8};
9use iceberg_rust::{catalog::CatalogList, spec::types::StructType};
10
11use crate::context::IcebergContext;
12
13pub async fn get_schema(
14    sql: &str,
15    relations: &[(String, String, String)],
16    catalogs: Arc<dyn CatalogList>,
17    branch: Option<&str>,
18) -> Result<StructType, DataFusionError> {
19    let context = IcebergContext::new(relations, catalogs, branch).await?;
20    let statement = Parser::parse_sql(&GenericDialect, sql)?
21        .pop()
22        .ok_or(DataFusionError::Internal("sql statement".to_string()))?;
23
24    let planner = SqlToRel::new(&context);
25
26    let logical_plan = planner.sql_statement_to_plan(statement)?;
27    let fields: Vec<FieldRef> = logical_plan.schema().fields().iter().cloned().collect();
28    let struct_type = StructType::try_from(&Schema::new(fields))
29        .map_err(|err| DataFusionError::External(Box::new(err)))?;
30    Ok(struct_type)
31}