Skip to main content

datafusion_iceberg_sql/
schema.rs

1use std::sync::Arc;
2
3use arrow_schema::Schema;
4use datafusion_common::DataFusionError;
5use datafusion_sql::{
6    planner::SqlToRel,
7    sqlparser::{dialect::GenericDialect, parser::Parser},
8};
9use iceberg_rust::{
10    catalog::CatalogList,
11    spec::{arrow::schema::new_fields_with_ids, types::StructType},
12};
13
14use crate::context::IcebergContext;
15
16pub async fn get_schema(
17    sql: &str,
18    relations: &[(String, String, String)],
19    catalogs: Arc<dyn CatalogList>,
20    branch: Option<&str>,
21) -> Result<StructType, DataFusionError> {
22    let context = IcebergContext::new(relations, catalogs, branch).await?;
23    let statement = Parser::parse_sql(&GenericDialect, sql)?
24        .pop()
25        .ok_or(DataFusionError::Internal("sql statement".to_string()))?;
26
27    let planner = SqlToRel::new(&context);
28
29    let logical_plan = planner.sql_statement_to_plan(statement)?;
30    let fields = new_fields_with_ids(logical_plan.schema().fields(), &mut 1);
31    let struct_type = StructType::try_from(&Schema::new(fields))
32        .map_err(|err| DataFusionError::External(Box::new(err)))?;
33    Ok(struct_type)
34}