Struct datafusion::execution::context::SessionContext
pub struct SessionContext { /* private fields */ }
Main interface for executing queries with DataFusion. Maintains the state of the connection between a user and an instance of the DataFusion engine.
Overview
SessionContext provides the following functionality:
- Create a DataFrame from a CSV or Parquet data source.
- Register a CSV or Parquet data source as a table that can be referenced from a SQL query.
- Register a custom data source that can be referenced from a SQL query.
- Execute a SQL query.
The following example demonstrates how to use the context to execute a query against a CSV data source using the DataFrame API:
use datafusion::prelude::*;
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.filter(col("a").lt_eq(col("b")))?
    .aggregate(vec![col("a")], vec![min(col("b"))])?
    .limit(0, Some(100))?;
// `collect` is async and returns a `Result`, so it must be awaited
let results = df.collect().await?;
The following example demonstrates how to execute the same query using SQL:
use datafusion::prelude::*;
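let ctx = SessionContext::new();
ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
// `sql` returns a DataFrame; the WHERE clause mirrors the filter in the DataFrame example
let df = ctx.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;
let results = df.collect().await?;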
SessionContext, SessionState, and TaskContext
A SessionContext can be created from a SessionConfig and stores the state for a particular query session. A single SessionContext can run multiple queries.
SessionState contains information available during query planning (creating LogicalPlans and ExecutionPlans).
TaskContext contains the state available during query execution (ExecutionPlan::execute). It contains a subset of the information in SessionState and is created from a SessionContext or a SessionState.
Implementations
impl SessionContext
pub fn new() -> Self
Creates a new SessionContext using the default SessionConfig.
pub async fn refresh_catalogs(&self) -> Result<()>
Finds any ListingSchemaProviders and instructs them to reload tables from “disk”.
pub fn with_config(config: SessionConfig) -> Self
Creates a new SessionContext using the provided SessionConfig and a new RuntimeEnv.
See Self::with_config_rt for more details on resource limits.
pub fn with_config_rt(config: SessionConfig, runtime: Arc<RuntimeEnv>) -> Self
Creates a new SessionContext using the provided SessionConfig and a RuntimeEnv.
Resource Limits
By default, each new SessionContext creates a new RuntimeEnv, so memory and disk limits are not enforced across queries run on different SessionContexts.
To enforce resource limits (e.g. to limit the total amount of memory used) across all DataFusion queries in a process, all SessionContexts should be configured with the same RuntimeEnv.
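The reasoning behind sharing one RuntimeEnv can be illustrated with a toy model that uses only the standard library. This is a sketch under stated assumptions, not DataFusion's implementation: ToyRuntime, ToySession, try_reserve, and run_query are hypothetical names, and a single byte counter stands in for whatever limits a real RuntimeEnv tracks.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a runtime environment that owns a memory budget.
struct ToyRuntime {
    limit: usize,
    used: AtomicUsize,
}

impl ToyRuntime {
    fn new(limit: usize) -> Self {
        Self { limit, used: AtomicUsize::new(0) }
    }

    /// Tries to reserve `bytes`; returns false if the budget would be exceeded.
    fn try_reserve(&self, bytes: usize) -> bool {
        let mut current = self.used.load(Ordering::SeqCst);
        loop {
            if current + bytes > self.limit {
                return false;
            }
            match self.used.compare_exchange(
                current,
                current + bytes,
                Ordering::SeqCst,
                Ordering::SeqCst,
            ) {
                Ok(_) => return true,
                Err(actual) => current = actual, // another thread raced us; retry
            }
        }
    }
}

/// Hypothetical stand-in for a session that runs queries against a runtime.
struct ToySession {
    runtime: Arc<ToyRuntime>,
}

impl ToySession {
    fn with_runtime(runtime: Arc<ToyRuntime>) -> Self {
        Self { runtime }
    }

    /// Pretends to run a query that needs `bytes` of memory.
    fn run_query(&self, bytes: usize) -> bool {
        self.runtime.try_reserve(bytes)
    }
}

fn main() {
    // Separate runtimes: each session has its own 100-byte budget,
    // so the process as a whole can use 160 bytes.
    let a = ToySession::with_runtime(Arc::new(ToyRuntime::new(100)));
    let b = ToySession::with_runtime(Arc::new(ToyRuntime::new(100)));
    assert!(a.run_query(80));
    assert!(b.run_query(80));

    // Shared runtime: both sessions draw from the same 100-byte budget.
    let shared = Arc::new(ToyRuntime::new(100));
    let c = ToySession::with_runtime(Arc::clone(&shared));
    let d = ToySession::with_runtime(Arc::clone(&shared));
    assert!(c.run_query(80));
    assert!(!d.run_query(80)); // rejected: the shared budget is exhausted
}
```

In the toy model the limit only becomes process-wide once both sessions hold the same Arc, which mirrors the advice to pass one Arc<RuntimeEnv> to every with_config_rt call.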
pub fn with_state(state: SessionState) -> Self
Creates a new SessionContext using the provided SessionState.
pub fn session_start_time(&self) -> DateTime<Utc>
Returns the time this SessionContext was created.
pub fn register_batch( &self, table_name: &str, batch: RecordBatch ) -> Result<Option<Arc<dyn TableProvider>>>
Registers the RecordBatch as a table with the specified name.
pub fn runtime_env(&self) -> Arc<RuntimeEnv>
Returns the RuntimeEnv used to run queries with this SessionContext.
pub fn session_id(&self) -> String
Returns an id that uniquely identifies this SessionContext.
pub fn table_factory( &self, file_type: &str ) -> Option<Arc<dyn TableProviderFactory>>
Returns the TableProviderFactory that is registered for the specified file type, if any.
pub fn enable_ident_normalization(&self) -> bool
Returns whether identifier normalization (the enable_ident_normalization setting) is enabled for this session.
pub fn copied_config(&self) -> SessionConfig
Returns a copy of the SessionConfig for this session.
pub async fn sql(&self, sql: &str) -> Result<DataFrame>
Creates a DataFrame that will execute a SQL query.
Note: This API implements DDL statements such as CREATE TABLE and CREATE VIEW and DML statements such as INSERT INTO with in-memory default implementations.
If this is not desirable, consider using SessionState::create_logical_plan(), which does not mutate the state based on such statements.
pub async fn execute_logical_plan(&self, plan: LogicalPlan) -> Result<DataFrame>
Executes the LogicalPlan and returns a DataFrame.
pub fn register_variable( &self, variable_type: VarType, provider: Arc<dyn VarProvider + Send + Sync> )
Registers a variable provider within this context.
pub fn register_udf(&self, f: ScalarUDF)
Registers a scalar UDF within this context.
Note that in SQL queries, function names are looked up in lowercase unless the name is quoted. For example:
- SELECT MY_FUNC(x)... will look for a function named "my_func"
- SELECT "my_FUNC"(x) will look for a function named "my_FUNC"
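The lookup rule can be modelled with a small, self-contained sketch. It is not DataFusion's implementation: normalize_ident, lookup, and the HashMap registry are hypothetical stand-ins, and escaped quotes inside quoted identifiers are ignored. It only illustrates why a function registered under a mixed-case name is reachable solely through a quoted identifier.

```rust
use std::collections::HashMap;

/// Hypothetical helper (not a DataFusion API): normalizes an identifier as
/// written in SQL text. Quoted identifiers keep their case; unquoted ones
/// are lowercased.
fn normalize_ident(raw: &str) -> String {
    match raw.strip_prefix('"').and_then(|s| s.strip_suffix('"')) {
        Some(inner) => inner.to_string(),
        None => raw.to_lowercase(),
    }
}

/// Hypothetical registry lookup keyed by the exact registered name.
fn lookup(registry: &HashMap<String, &'static str>, ident_in_sql: &str) -> Option<&'static str> {
    registry.get(&normalize_ident(ident_in_sql)).copied()
}

fn main() {
    let mut registry = HashMap::new();
    registry.insert("my_func".to_string(), "lowercase UDF");
    registry.insert("my_FUNC".to_string(), "mixed-case UDF");

    // Unquoted: lowercased before lookup.
    assert_eq!(lookup(&registry, "MY_FUNC"), Some("lowercase UDF"));
    // Quoted: looked up exactly as written.
    assert_eq!(lookup(&registry, "\"my_FUNC\""), Some("mixed-case UDF"));
    // Quoted with a case that was never registered: no match.
    assert_eq!(lookup(&registry, "\"MY_FUNC\""), None);
}
```

A practical consequence, assuming the same rule applies at registration time as the text describes, is that registering functions under lowercase names keeps them callable without quoting.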
pub fn register_udaf(&self, f: AggregateUDF)
Registers an aggregate UDF within this context.
Note that in SQL queries, aggregate names are looked up in lowercase unless the name is quoted. For example:
- SELECT MY_UDAF(x)... will look for an aggregate named "my_udaf"
- SELECT "my_UDAF"(x) will look for an aggregate named "my_UDAF"
pub async fn read_avro<P: DataFilePaths>( &self, table_paths: P, options: AvroReadOptions<'_> ) -> Result<DataFrame>
Creates a DataFrame for reading an Avro data source.
For more control such as reading multiple files, you can use read_table with a ListingTable.
For an example, see read_csv.
pub async fn read_json<P: DataFilePaths>( &self, table_paths: P, options: NdJsonReadOptions<'_> ) -> Result<DataFrame>
Creates a DataFrame for reading a JSON data source.
For more control such as reading multiple files, you can use read_table with a ListingTable.
For an example, see read_csv.
pub fn read_empty(&self) -> Result<DataFrame>
Creates an empty DataFrame.
pub async fn read_csv<P: DataFilePaths>( &self, table_paths: P, options: CsvReadOptions<'_> ) -> Result<DataFrame>
Creates a DataFrame for reading a CSV data source.
For more control such as reading multiple files, you can use read_table with a ListingTable.
Example usage is given below:
use datafusion::prelude::*;
let ctx = SessionContext::new();
// You can read a single file using `read_csv`
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// you can also read multiple files:
let df = ctx.read_csv(vec!["tests/data/example.csv", "tests/data/example.csv"], CsvReadOptions::new()).await?;
pub async fn read_parquet<P: DataFilePaths>( &self, table_paths: P, options: ParquetReadOptions<'_> ) -> Result<DataFrame>
Creates a DataFrame for reading a Parquet data source.
For more control such as reading multiple files, you can use read_table with a ListingTable.
For an example, see read_csv.
pub fn read_table(&self, provider: Arc<dyn TableProvider>) -> Result<DataFrame>
Creates a DataFrame for a TableProvider such as a ListingTable or a custom user-defined provider.
pub fn read_batch(&self, batch: RecordBatch) -> Result<DataFrame>
Creates a DataFrame for reading a RecordBatch.
pub async fn register_listing_table( &self, name: &str, table_path: impl AsRef<str>, options: ListingOptions, provided_schema: Option<SchemaRef>, sql_definition: Option<String> ) -> Result<()>
Registers a ListingTable that can assemble multiple files from locations in an ObjectStore instance into a single table.
This method is async because it might need to resolve the schema.
pub async fn register_csv( &self, name: &str, table_path: &str, options: CsvReadOptions<'_> ) -> Result<()>
Registers a CSV file as a table that can be referenced from SQL statements executed against this context.
pub async fn register_json( &self, name: &str, table_path: &str, options: NdJsonReadOptions<'_> ) -> Result<()>
Registers a JSON file as a table that can be referenced from SQL statements executed against this context.
pub async fn register_parquet( &self, name: &str, table_path: &str, options: ParquetReadOptions<'_> ) -> Result<()>
Registers a Parquet file as a table that can be referenced from SQL statements executed against this context.
pub async fn register_avro( &self, name: &str, table_path: &str, options: AvroReadOptions<'_> ) -> Result<()>
Registers an Avro file as a table that can be referenced from SQL statements executed against this context.
pub fn register_catalog( &self, name: impl Into<String>, catalog: Arc<dyn CatalogProvider> ) -> Option<Arc<dyn CatalogProvider>>
Registers a named catalog using a custom CatalogProvider so that it can be referenced from SQL statements executed against this context.
Returns the CatalogProvider previously registered for this name, if any.
pub fn catalog_names(&self) -> Vec<String>
Retrieves the list of available catalog names.
pub fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>>
Retrieves a CatalogProvider instance by name.
pub fn register_table<'a>( &'a self, table_ref: impl Into<TableReference<'a>>, provider: Arc<dyn TableProvider> ) -> Result<Option<Arc<dyn TableProvider>>>
Registers a TableProvider as a table that can be referenced from SQL statements executed against this context.
Returns the TableProvider previously registered for this reference, if any.
pub fn deregister_table<'a>( &'a self, table_ref: impl Into<TableReference<'a>> ) -> Result<Option<Arc<dyn TableProvider>>>
Deregisters the given table.
Returns the registered provider, if any
pub fn table_exist<'a>( &'a self, table_ref: impl Into<TableReference<'a>> ) -> Result<bool>
Returns true if the specified table exists in the schema provider.
pub async fn table<'a>( &self, table_ref: impl Into<TableReference<'a>> ) -> Result<DataFrame>
Retrieves a DataFrame representing a table previously registered by calling the register_table function.
Returns an error if no table has been registered with the provided reference.
pub async fn table_provider<'a>( &self, table_ref: impl Into<TableReference<'a>> ) -> Result<Arc<dyn TableProvider>>
Returns a TableProvider for the specified table.
pub fn tables(&self) -> Result<HashSet<String>>
Deprecated since 23.0.0: Please use the catalog provider interface (SessionContext::catalog) to examine available catalogs, schemas, and tables.
Returns the set of available tables in the default catalog and schema.
Use table to get a specific table.
pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan>
Deprecated since 23.0.0: Use SessionState::optimize to ensure a consistent state for planning and execution.
Optimizes the logical plan by applying optimizer rules.
pub async fn create_physical_plan( &self, logical_plan: &LogicalPlan ) -> Result<Arc<dyn ExecutionPlan>>
Deprecated since 23.0.0: Use SessionState::create_physical_plan or DataFrame::create_physical_plan to ensure a consistent state for planning and execution.
Creates a physical plan from a logical plan.
pub async fn write_csv( &self, plan: Arc<dyn ExecutionPlan>, path: impl AsRef<str> ) -> Result<()>
Executes a query and writes the results to a partitioned CSV file.
pub async fn write_json( &self, plan: Arc<dyn ExecutionPlan>, path: impl AsRef<str> ) -> Result<()>
Executes a query and writes the results to a partitioned JSON file.
pub async fn write_parquet( &self, plan: Arc<dyn ExecutionPlan>, path: impl AsRef<str>, writer_properties: Option<WriterProperties> ) -> Result<()>
Executes a query and writes the results to a partitioned Parquet file.
pub fn task_ctx(&self) -> Arc<TaskContext>
Get a new TaskContext to run in this session
pub fn state(&self) -> SessionState
Snapshots the SessionState of this SessionContext, setting the query_execution_start_time to the current time.
pub fn state_weak_ref(&self) -> Weak<RwLock<SessionState>>
Gets a weak reference to the SessionState.
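A weak reference does not keep the state alive, so holders have to upgrade it before each use and handle the case where the owning context is gone. The sketch below shows that pattern with the standard library only. ToyState, ToyContext, and session_id_via are hypothetical names, and std::sync::RwLock is used here as an assumption; the lock type behind the real signature may differ.

```rust
use std::sync::{Arc, RwLock, Weak};

/// Hypothetical stand-in for session state.
struct ToyState {
    session_id: String,
}

/// Hypothetical stand-in for a context that owns its state behind Arc<RwLock<..>>.
struct ToyContext {
    state: Arc<RwLock<ToyState>>,
}

impl ToyContext {
    fn new(id: &str) -> Self {
        Self {
            state: Arc::new(RwLock::new(ToyState { session_id: id.to_string() })),
        }
    }

    /// Hands out a weak reference that does not extend the state's lifetime.
    fn state_weak_ref(&self) -> Weak<RwLock<ToyState>> {
        Arc::downgrade(&self.state)
    }
}

/// Reads the session id through a weak reference, if the state is still alive.
fn session_id_via(weak: &Weak<RwLock<ToyState>>) -> Option<String> {
    let strong = weak.upgrade()?; // None once every owning Arc has been dropped
    let guard = strong.read().ok()?; // None if the lock was poisoned
    Some(guard.session_id.clone())
}

fn main() {
    let ctx = ToyContext::new("session-1");
    let weak = ctx.state_weak_ref();

    // While the context is alive, the weak reference can be upgraded.
    assert_eq!(session_id_via(&weak), Some("session-1".to_string()));

    // After the context is dropped, the upgrade fails instead of dangling.
    drop(ctx);
    assert_eq!(session_id_via(&weak), None);
}
```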
pub fn register_catalog_list(&mut self, catalog_list: Arc<dyn CatalogList>)
Registers a CatalogList in the SessionState.
Trait Implementations
impl Clone for SessionContext
fn clone(&self) -> SessionContext
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.
impl Default for SessionContext
impl From<&SessionContext> for TaskContext
Creates a new task context instance from a SessionContext.