Struct datafusion::execution::context::SessionContext
pub struct SessionContext { /* private fields */ }
Main interface for executing queries with DataFusion. Maintains the state of the connection between a user and an instance of the DataFusion engine.
Overview
SessionContext provides the following functionality:
- Create a DataFrame from a CSV or Parquet data source.
- Register a CSV or Parquet data source as a table that can be referenced from a SQL query.
- Register a custom data source that can be referenced from a SQL query.
- Execute a SQL query.
Example: DataFrame API
The following example demonstrates how to use the context to execute a query against a CSV data source using the DataFrame API:
use datafusion::prelude::*;
use datafusion::assert_batches_eq;
// Runs inside an async function returning datafusion::error::Result<()>
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.filter(col("a").lt_eq(col("b")))?
    .aggregate(vec![col("a")], vec![min(col("b"))])?
    .limit(0, Some(100))?;
let results = df
    .collect()
    .await?;
assert_batches_eq!(
    &[
        "+---+----------------+",
        "| a | MIN(?table?.b) |",
        "+---+----------------+",
        "| 1 | 2              |",
        "+---+----------------+",
    ],
    &results
);
Example: SQL API
The following example demonstrates how to execute the same query using SQL:
use datafusion::prelude::*;
use datafusion::assert_batches_eq;
// Runs inside an async function returning datafusion::error::Result<()>
let ctx = SessionContext::new();
ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
let results = ctx
    .sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")
    .await?
    .collect()
    .await?;
assert_batches_eq!(
    &[
        "+---+----------------+",
        "| a | MIN(example.b) |",
        "+---+----------------+",
        "| 1 | 2              |",
        "+---+----------------+",
    ],
    &results
);
SessionContext, SessionState, and TaskContext
A SessionContext can be created from a SessionConfig and
stores the state for a particular query session. A single
SessionContext can run multiple queries.
SessionState contains information available during query
planning (creating LogicalPlans and ExecutionPlans).
TaskContext contains the state available during query
execution (ExecutionPlan::execute). It contains a subset of the
information in SessionState and is created from a
SessionContext or a SessionState.
Implementations
impl SessionContext
pub fn new() -> Self
Creates a new SessionContext using the default SessionConfig.
pub async fn refresh_catalogs(&self) -> Result<()>
Finds any ListingSchemaProviders and instructs them to reload tables from “disk”.
pub fn with_config(config: SessionConfig) -> Self
Creates a new SessionContext using the provided
SessionConfig and a new RuntimeEnv.
See Self::with_config_rt for more details on resource
limits.
pub fn with_config_rt(config: SessionConfig, runtime: Arc<RuntimeEnv>) -> Self
Creates a new SessionContext using the provided
SessionConfig and a RuntimeEnv.
Resource Limits
By default, each new SessionContext creates a new
RuntimeEnv, and therefore will not enforce memory or disk
limits for queries run on different SessionContexts.
To enforce resource limits (e.g. to limit the total amount of
memory used) across all DataFusion queries in a process,
all SessionContexts should be configured with the
same RuntimeEnv.
pub fn with_state(state: SessionState) -> Self
Creates a new SessionContext using the provided SessionState
pub fn session_start_time(&self) -> DateTime<Utc>
Returns the time this SessionContext was created.
pub fn register_batch( &self, table_name: &str, batch: RecordBatch ) -> Result<Option<Arc<dyn TableProvider>>>
Registers the RecordBatch under the specified table name.
pub fn runtime_env(&self) -> Arc<RuntimeEnv>
Returns the RuntimeEnv used to run queries with this SessionContext.
pub fn session_id(&self) -> String
Returns an id that uniquely identifies this SessionContext.
pub fn table_factory( &self, file_type: &str ) -> Option<Arc<dyn TableProviderFactory>>
Returns the TableProviderFactory that is registered for the
specified file type, if any.
pub fn enable_ident_normalization(&self) -> bool
Returns whether identifier normalization (the enable_ident_normalization setting) is enabled for this session.
pub fn copied_config(&self) -> SessionConfig
Returns a copy of the SessionConfig for this session.
pub async fn sql(&self, sql: &str) -> Result<DataFrame>
Creates a DataFrame from SQL query text.
Note: This API implements DDL statements such as CREATE TABLE and
CREATE VIEW and DML statements such as INSERT INTO with in-memory
default implementations. See Self::sql_with_options.
Example: Running SQL queries
See the SQL API example in the SessionContext overview.
Example: Creating a Table with SQL
use datafusion::prelude::*;
let ctx = SessionContext::new();
ctx
.sql("CREATE TABLE foo (x INTEGER)")
.await?
.collect()
.await?;
assert!(ctx.table_exist("foo").unwrap());
pub async fn sql_with_options( &self, sql: &str, options: SQLOptions ) -> Result<DataFrame>
Creates a DataFrame from SQL query text, first validating
that the queries are allowed by options.
Example: Preventing Creating a Table with SQL
If you want to avoid creating tables, or modifying data or the
session, set SQLOptions appropriately:
use datafusion::prelude::*;
let ctx = SessionContext::new();
let options = SQLOptions::new()
.with_allow_ddl(false);
let err = ctx.sql_with_options("CREATE TABLE foo (x INTEGER)", options)
.await
.unwrap_err();
assert!(
err.to_string().starts_with("Error during planning: DDL not supported: CreateMemoryTable")
);
pub async fn execute_logical_plan(&self, plan: LogicalPlan) -> Result<DataFrame>
Executes the LogicalPlan and returns a DataFrame. This API
is not feature-limited, so all SQL, including CREATE TABLE and
COPY, will be run.
If you wish to limit the type of plan that can be run from
SQL, see Self::sql_with_options and
SQLOptions::verify_plan.
pub fn register_variable( &self, variable_type: VarType, provider: Arc<dyn VarProvider + Send + Sync> )
Registers a variable provider within this context.
pub fn register_udf(&self, f: ScalarUDF)
Registers a scalar UDF within this context.
Note: in SQL queries, function names are looked up in lowercase unless the query quotes them. For example:
- SELECT MY_FUNC(x)... looks for a function named "my_func"
- SELECT "my_FUNC"(x) looks for a function named "my_FUNC"
pub fn register_udaf(&self, f: AggregateUDF)
Registers an aggregate UDF within this context.
Note: in SQL queries, aggregate names are looked up in lowercase unless the query quotes them. For example:
- SELECT MY_UDAF(x)... looks for an aggregate named "my_udaf"
- SELECT "my_UDAF"(x) looks for an aggregate named "my_UDAF"
pub fn register_udwf(&self, f: WindowUDF)
Registers a window UDF within this context.
Note: in SQL queries, window function names are looked up in lowercase unless the query quotes them. For example:
- SELECT MY_UDWF(x)... looks for a window function named "my_udwf"
- SELECT "my_UDWF"(x) looks for a window function named "my_UDWF"
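The lookup rule shared by register_udf, register_udaf, and register_udwf can be illustrated with a plain-Rust sketch. The function lookup_name is hypothetical and is not part of DataFusion; it only models the rule described in the notes (unquoted names are lowercased, double-quoted names are used verbatim) and ignores escaped quotes inside identifiers.

```rust
/// Hypothetical model of the name a UDF/UDAF/UDWF lookup would use:
/// a double-quoted identifier keeps its case, anything else is lowercased.
pub fn lookup_name(ident: &str) -> String {
    match ident.strip_prefix('"').and_then(|s| s.strip_suffix('"')) {
        // Quoted: strip the quotes and keep the exact spelling.
        Some(quoted) => quoted.to_string(),
        // Unquoted: normalize to lowercase.
        None => ident.to_lowercase(),
    }
}
```

In practice this means a function registered under a mixed-case name can only be called from SQL with a quoted identifier; registering it in lowercase avoids the need for quotes.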
pub async fn read_avro<P: DataFilePaths>( &self, table_paths: P, options: AvroReadOptions<'_> ) -> Result<DataFrame>
Creates a DataFrame for reading an Avro data source.
For more control such as reading multiple files, you can use
read_table with a ListingTable.
For an example, see read_csv
pub async fn read_json<P: DataFilePaths>( &self, table_paths: P, options: NdJsonReadOptions<'_> ) -> Result<DataFrame>
Creates a DataFrame for reading a newline-delimited JSON data source.
For more control such as reading multiple files, you can use
read_table with a ListingTable.
For an example, see read_csv
pub async fn read_arrow<P: DataFilePaths>( &self, table_paths: P, options: ArrowReadOptions<'_> ) -> Result<DataFrame>
Creates a DataFrame for reading an Arrow data source.
For more control such as reading multiple files, you can use
read_table with a ListingTable.
For an example, see read_csv
pub fn read_empty(&self) -> Result<DataFrame>
Creates an empty DataFrame.
pub async fn read_csv<P: DataFilePaths>( &self, table_paths: P, options: CsvReadOptions<'_> ) -> Result<DataFrame>
Creates a DataFrame for reading a CSV data source.
For more control such as reading multiple files, you can use
read_table with a ListingTable.
Example usage is given below:
use datafusion::prelude::*;
let ctx = SessionContext::new();
// You can read a single file using `read_csv`
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// you can also read multiple files:
let df = ctx.read_csv(vec!["tests/data/example.csv", "tests/data/example.csv"], CsvReadOptions::new()).await?;
pub async fn read_parquet<P: DataFilePaths>( &self, table_paths: P, options: ParquetReadOptions<'_> ) -> Result<DataFrame>
Creates a DataFrame for reading a Parquet data source.
For more control such as reading multiple files, you can use
read_table with a ListingTable.
For an example, see read_csv
pub fn read_table(&self, provider: Arc<dyn TableProvider>) -> Result<DataFrame>
Creates a DataFrame for a TableProvider such as a
ListingTable or a custom user defined provider.
pub fn read_batch(&self, batch: RecordBatch) -> Result<DataFrame>
Creates a DataFrame for reading a RecordBatch.
pub async fn register_listing_table( &self, name: &str, table_path: impl AsRef<str>, options: ListingOptions, provided_schema: Option<SchemaRef>, sql_definition: Option<String> ) -> Result<()>
Registers a ListingTable that can assemble multiple files
from locations in an ObjectStore instance into a single
table.
This method is async because it might need to resolve the schema.
pub async fn register_csv( &self, name: &str, table_path: &str, options: CsvReadOptions<'_> ) -> Result<()>
Registers a CSV file as a table that can be referenced from SQL statements executed against this context.
pub async fn register_json( &self, name: &str, table_path: &str, options: NdJsonReadOptions<'_> ) -> Result<()>
Registers a JSON file as a table that can be referenced from SQL statements executed against this context.
pub async fn register_parquet( &self, name: &str, table_path: &str, options: ParquetReadOptions<'_> ) -> Result<()>
Registers a Parquet file as a table that can be referenced from SQL statements executed against this context.
pub async fn register_avro( &self, name: &str, table_path: &str, options: AvroReadOptions<'_> ) -> Result<()>
Registers an Avro file as a table that can be referenced from SQL statements executed against this context.
pub async fn register_arrow( &self, name: &str, table_path: &str, options: ArrowReadOptions<'_> ) -> Result<()>
Registers an Arrow file as a table that can be referenced from SQL statements executed against this context.
pub fn register_catalog( &self, name: impl Into<String>, catalog: Arc<dyn CatalogProvider> ) -> Option<Arc<dyn CatalogProvider>>
Registers a named catalog using a custom CatalogProvider so that
it can be referenced from SQL statements executed against this
context.
Returns the CatalogProvider previously registered for this
name, if any.
pub fn catalog_names(&self) -> Vec<String>
Retrieves the list of available catalog names.
pub fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>>
Retrieves a CatalogProvider instance by name.
pub fn register_table<'a>( &'a self, table_ref: impl Into<TableReference<'a>>, provider: Arc<dyn TableProvider> ) -> Result<Option<Arc<dyn TableProvider>>>
Registers a TableProvider as a table that can be
referenced from SQL statements executed against this context.
Returns the TableProvider previously registered for this
reference, if any.
pub fn deregister_table<'a>( &'a self, table_ref: impl Into<TableReference<'a>> ) -> Result<Option<Arc<dyn TableProvider>>>
Deregisters the given table.
Returns the registered provider, if any.
pub fn table_exist<'a>( &'a self, table_ref: impl Into<TableReference<'a>> ) -> Result<bool>
Returns true if the specified table exists in the schema provider.
pub async fn table<'a>( &self, table_ref: impl Into<TableReference<'a>> ) -> Result<DataFrame>
Retrieves a DataFrame representing a table previously
registered by calling the register_table function.
Returns an error if no table has been registered with the provided reference.
pub async fn table_provider<'a>( &self, table_ref: impl Into<TableReference<'a>> ) -> Result<Arc<dyn TableProvider>>
Returns a TableProvider for the specified table.
👎Deprecated since 23.0.0: Please use the catalog provider interface (SessionContext::catalog) to examine available catalogs, schemas, and tables
pub fn tables(&self) -> Result<HashSet<String>>
Returns the set of available tables in the default catalog and schema.
Use table to get a specific table.
👎Deprecated since 23.0.0: Use SessionState::optimize to ensure a consistent state for planning and execution
pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan>
Optimizes the logical plan by applying optimizer rules.
👎Deprecated since 23.0.0: Use SessionState::create_physical_plan or DataFrame::create_physical_plan to ensure a consistent state for planning and execution
pub async fn create_physical_plan( &self, logical_plan: &LogicalPlan ) -> Result<Arc<dyn ExecutionPlan>>
Creates a physical plan from a logical plan.
pub async fn write_csv( &self, plan: Arc<dyn ExecutionPlan>, path: impl AsRef<str> ) -> Result<()>
Executes a query and writes the results to a partitioned CSV file.
pub async fn write_json( &self, plan: Arc<dyn ExecutionPlan>, path: impl AsRef<str> ) -> Result<()>
Executes a query and writes the results to a partitioned JSON file.
pub async fn write_parquet( &self, plan: Arc<dyn ExecutionPlan>, path: impl AsRef<str>, writer_properties: Option<WriterProperties> ) -> Result<()>
Executes a query and writes the results to a partitioned Parquet file.
pub fn task_ctx(&self) -> Arc<TaskContext>
Returns a new TaskContext for running queries in this session.
pub fn state(&self) -> SessionState
Snapshots the SessionState of this SessionContext, setting the
query_execution_start_time to the current time.
pub fn state_weak_ref(&self) -> Weak<RwLock<SessionState>>
Returns a weak reference to the SessionState.
pub fn register_catalog_list(&mut self, catalog_list: Arc<dyn CatalogList>)
Registers a CatalogList in the SessionState.
Trait Implementations
impl Clone for SessionContext
fn clone(&self) -> SessionContext
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.
impl Default for SessionContext
impl From<&SessionContext> for TaskContext
Creates a new TaskContext instance from a SessionContext.