Skip to main content

azof_datafusion/
context.rs

1use crate::parse::rewrite_and_extract_tables;
2use crate::AzofTableProvider;
3use datafusion::dataframe::DataFrame;
4use datafusion::prelude::SessionContext;
5use object_store::local::LocalFileSystem;
6use object_store::path::Path;
7use object_store::ObjectStore;
8use std::error::Error;
9use std::sync::Arc;
10
11pub struct ExecutionContext {
12    ctx: SessionContext,
13    store_path: Path,
14    store: Arc<dyn ObjectStore>,
15}
16
17impl ExecutionContext {
18    pub fn new(path: String) -> ExecutionContext {
19        ExecutionContext {
20            ctx: SessionContext::new(),
21            store_path: Path::from(path),
22            store: Arc::new(LocalFileSystem::new()),
23        }
24    }
25
26    pub async fn sql(&self, sql: &str) -> Result<DataFrame, Box<dyn Error>> {
27        let mut stmt = self.ctx.state().sql_to_statement(sql, "snowflake")?;
28        let tables = rewrite_and_extract_tables(&mut stmt)?;
29
30        for versioned_table in tables {
31            let table_ref = versioned_table.versioned_name.to_string();
32            let provider = AzofTableProvider::new(
33                self.store_path.clone(),
34                self.store.clone(),
35                versioned_table.name.to_string(),
36                versioned_table.as_of,
37            )
38            .await?;
39
40            if !self.ctx.table_exist(&table_ref)? {
41                self.ctx.register_table(table_ref, Arc::new(provider))?;
42            }
43        }
44
45        let plan = self.ctx.state().statement_to_plan(stmt).await?;
46        Ok(self.ctx.execute_logical_plan(plan).await?)
47    }
48}