Struct deltalake::datafusion::prelude::SessionContext

pub struct SessionContext { /* private fields */ }

Main interface for executing queries with DataFusion. Maintains the state of the connection between a user and an instance of the DataFusion engine.

§Overview

SessionContext provides the following functionality:

  • Create a DataFrame from a CSV or Parquet data source.
  • Register a CSV or Parquet data source as a table that can be referenced from a SQL query.
  • Register a custom data source that can be referenced from a SQL query.
  • Execute a SQL query.

§Example: DataFrame API

The following example demonstrates how to use the context to execute a query against a CSV data source using the DataFrame API:

use datafusion::assert_batches_eq;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // Keep rows where a <= b, take MIN(b) per value of a, return at most 100 rows.
    let df = df
        .filter(col("a").lt_eq(col("b")))?
        .aggregate(vec![col("a")], vec![min(col("b"))])?
        .limit(0, Some(100))?;
    let results = df.collect().await?;
    assert_batches_eq!(
        &[
            "+---+----------------+",
            "| a | MIN(?table?.b) |",
            "+---+----------------+",
            "| 1 | 2              |",
            "+---+----------------+",
        ],
        &results
    );
    Ok(())
}

§Example: SQL API

The following example demonstrates how to execute the same query using SQL:

use datafusion::assert_batches_eq;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
    let results = ctx
        .sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")
        .await?
        .collect()
        .await?;
    assert_batches_eq!(
        &[
            "+---+----------------+",
            "| a | MIN(example.b) |",
            "+---+----------------+",
            "| 1 | 2              |",
            "+---+----------------+",
        ],
        &results
    );
    Ok(())
}

§SessionContext, SessionState, and TaskContext

A SessionContext can be created from a SessionConfig and stores the state for a particular query session. A single SessionContext can run multiple queries.

SessionState contains information available during query planning (creating LogicalPlans and ExecutionPlans).

TaskContext contains the state available during query execution (ExecutionPlan::execute). It contains a subset of the information in SessionState and is created from a SessionContext or a SessionState.

Implementations§

impl SessionContext

pub async fn read_avro<P>( &self, table_paths: P, options: AvroReadOptions<'_> ) -> Result<DataFrame, DataFusionError>
where P: DataFilePaths,

Creates a DataFrame for reading an Avro data source.

For more control, such as reading multiple files, you can use read_table with a ListingTable.

For an example, see read_csv

pub async fn register_avro( &self, name: &str, table_path: &str, options: AvroReadOptions<'_> ) -> Result<(), DataFusionError>

Registers an Avro file as a table that can be referenced from SQL statements executed against this context.

impl SessionContext

pub async fn read_csv<P>( &self, table_paths: P, options: CsvReadOptions<'_> ) -> Result<DataFrame, DataFusionError>
where P: DataFilePaths,

Creates a DataFrame for reading a CSV data source.

For more control, such as reading multiple files, you can use read_table with a ListingTable.

Example usage is given below:

use datafusion::error::Result;
use datafusion::prelude::*;
#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // You can read a single file using `read_csv`
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
    // You can also read multiple files into a single DataFrame:
    let df = ctx.read_csv(vec!["tests/data/example.csv", "tests/data/example.csv"], CsvReadOptions::new()).await?;
    Ok(())
}

pub async fn register_csv( &self, name: &str, table_path: &str, options: CsvReadOptions<'_> ) -> Result<(), DataFusionError>

Registers a CSV file as a table that can be referenced from SQL statements executed against this context.

pub async fn write_csv( &self, plan: Arc<dyn ExecutionPlan>, path: impl AsRef<str> ) -> Result<(), DataFusionError>

Executes a query and writes the results to a partitioned CSV file.

impl SessionContext

pub async fn read_json<P>( &self, table_paths: P, options: NdJsonReadOptions<'_> ) -> Result<DataFrame, DataFusionError>
where P: DataFilePaths,

Creates a DataFrame for reading a newline-delimited JSON data source.

For more control, such as reading multiple files, you can use read_table with a ListingTable.

For an example, see read_csv

pub async fn register_json( &self, name: &str, table_path: &str, options: NdJsonReadOptions<'_> ) -> Result<(), DataFusionError>

Registers a JSON file as a table that can be referenced from SQL statements executed against this context.

pub async fn write_json( &self, plan: Arc<dyn ExecutionPlan>, path: impl AsRef<str> ) -> Result<(), DataFusionError>

Executes a query and writes the results to a partitioned JSON file.

impl SessionContext

pub async fn read_parquet<P>( &self, table_paths: P, options: ParquetReadOptions<'_> ) -> Result<DataFrame, DataFusionError>
where P: DataFilePaths,

Creates a DataFrame for reading a Parquet data source.

For more control, such as reading multiple files, you can use read_table with a ListingTable.

For an example, see read_csv

pub async fn register_parquet( &self, name: &str, table_path: &str, options: ParquetReadOptions<'_> ) -> Result<(), DataFusionError>

Registers a Parquet file as a table that can be referenced from SQL statements executed against this context.

pub async fn write_parquet( &self, plan: Arc<dyn ExecutionPlan>, path: impl AsRef<str>, writer_properties: Option<WriterProperties> ) -> Result<(), DataFusionError>

Executes a query and writes the results to a partitioned Parquet file.

impl SessionContext

pub fn new() -> SessionContext

Creates a new SessionContext using the default SessionConfig.

pub async fn refresh_catalogs(&self) -> Result<(), DataFusionError>

Finds any ListingSchemaProviders and instructs them to reload tables from “disk”

pub fn new_with_config(config: SessionConfig) -> SessionContext

Creates a new SessionContext using the provided SessionConfig and a new RuntimeEnv.

See Self::new_with_config_rt for more details on resource limits.

pub fn with_config(config: SessionConfig) -> SessionContext

👎Deprecated since 32.0.0: Use SessionContext::new_with_config

Creates a new SessionContext using the provided SessionConfig and a new RuntimeEnv.

pub fn new_with_config_rt( config: SessionConfig, runtime: Arc<RuntimeEnv> ) -> SessionContext

Creates a new SessionContext using the provided SessionConfig and a RuntimeEnv.

§Resource Limits

By default, each new SessionContext creates a new RuntimeEnv, and therefore will not enforce memory or disk limits for queries run on different SessionContexts.

To enforce resource limits (e.g. to limit the total amount of memory used) across all DataFusion queries in a process, all SessionContexts should be configured with the same RuntimeEnv.

pub fn with_config_rt( config: SessionConfig, runtime: Arc<RuntimeEnv> ) -> SessionContext

👎Deprecated since 32.0.0: Use SessionContext::new_with_config_rt

Creates a new SessionContext using the provided SessionConfig and a RuntimeEnv.

pub fn new_with_state(state: SessionState) -> SessionContext

Creates a new SessionContext using the provided SessionState

pub fn with_state(state: SessionState) -> SessionContext

👎Deprecated since 32.0.0: Use SessionContext::new_with_state

Creates a new SessionContext using the provided SessionState

pub fn session_start_time(&self) -> DateTime<Utc>

Returns the time this SessionContext was created

pub fn with_function_factory( self, function_factory: Arc<dyn FunctionFactory> ) -> SessionContext

Registers a FunctionFactory to handle CREATE FUNCTION statements

pub fn register_batch( &self, table_name: &str, batch: RecordBatch ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError>

Registers the RecordBatch as an in-memory table with the specified name.

pub fn runtime_env(&self) -> Arc<RuntimeEnv>

Return the RuntimeEnv used to run queries with this SessionContext

pub fn session_id(&self) -> String

Returns an id that uniquely identifies this SessionContext.

pub fn table_factory( &self, file_type: &str ) -> Option<Arc<dyn TableProviderFactory>>

Return the TableProviderFactory that is registered for the specified file type, if any.

pub fn enable_ident_normalization(&self) -> bool

Returns whether identifier normalization (enable_ident_normalization) is enabled for this session.

pub fn copied_config(&self) -> SessionConfig

Returns a copy of the SessionConfig for this session.

pub fn copied_table_options(&self) -> TableOptions

Returns a copy of the TableOptions for this session.

pub async fn sql(&self, sql: &str) -> Result<DataFrame, DataFusionError>

Creates a DataFrame from SQL query text.

Note: This API implements DDL statements such as CREATE TABLE and CREATE VIEW, and DML statements such as INSERT INTO, using in-memory default implementations. To restrict which kinds of statements may run, see Self::sql_with_options.

§Example: Running SQL queries

See the SQL API example in the SessionContext description above.

§Example: Creating a Table with SQL
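use datafusion::error::Result;
use datafusion::prelude::*;
#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // DDL runs against the in-memory default: this creates an empty table `foo`.
    ctx.sql("CREATE TABLE foo (x INTEGER)").await?.collect().await?;
    assert!(ctx.table_exist("foo")?);
    Ok(())
}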

pub async fn sql_with_options( &self, sql: &str, options: SQLOptions ) -> Result<DataFrame, DataFusionError>

Creates a DataFrame from SQL query text, first validating that the queries are allowed by options

§Example: Preventing Creating a Table with SQL

If you want to avoid creating tables, or modifying data or the session, set SQLOptions appropriately:

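use datafusion::error::Result;
use datafusion::execution::context::SQLOptions;
use datafusion::prelude::*;
#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Reject DDL such as CREATE TABLE before any plan is executed.
    let options = SQLOptions::new().with_allow_ddl(false);
    let err = ctx
        .sql_with_options("CREATE TABLE foo (x INTEGER)", options)
        .await
        .unwrap_err();
    assert!(err
        .to_string()
        .starts_with("Error during planning: DDL not supported: CreateMemoryTable"));
    Ok(())
}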

pub async fn execute_logical_plan( &self, plan: LogicalPlan ) -> Result<DataFrame, DataFusionError>

Executes the LogicalPlan and returns a DataFrame. This API is not feature-limited, so all statements, including CREATE TABLE and COPY, will be run.

If you wish to limit the type of plan that can be run from SQL, see Self::sql_with_options and SQLOptions::verify_plan.

pub fn register_variable( &self, variable_type: VarType, provider: Arc<dyn VarProvider + Sync + Send> )

Registers a variable provider within this context.

pub fn register_udtf(&self, name: &str, fun: Arc<dyn TableFunctionImpl>)

Registers a user-defined table function (UDTF) with this context.

pub fn register_udf(&self, f: ScalarUDF)

Registers a scalar UDF within this context.

Note in SQL queries, function names are looked up using lowercase unless the query uses quotes. For example,

  • SELECT MY_FUNC(x)... will look for a function named "my_func"
  • SELECT "my_FUNC"(x) will look for a function named "my_FUNC"

Any functions registered with the UDF name or its aliases will be overwritten with this new function.
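
The lookup rule in the list above can be modelled with the standard library alone. This is not DataFusion's implementation. `lookup_key` and `Registry` are hypothetical names that only mimic the documented behavior: unquoted names are lowercased, and double-quoted names keep their exact spelling.

```rust
use std::collections::HashMap;

/// Hypothetical helper: the key a SQL function identifier is looked up under.
/// Double-quoted identifiers keep their spelling; unquoted ones are lowercased.
pub fn lookup_key(ident: &str) -> String {
    match ident.strip_prefix('"').and_then(|rest| rest.strip_suffix('"')) {
        Some(quoted) => quoted.to_string(),
        None => ident.to_lowercase(),
    }
}

/// Hypothetical registry keyed by the exact name a function was registered with.
#[derive(Default)]
pub struct Registry {
    functions: HashMap<String, &'static str>,
}

impl Registry {
    /// Registering an existing name overwrites the previous entry.
    pub fn register(&mut self, name: &str, description: &'static str) {
        self.functions.insert(name.to_string(), description);
    }

    /// Resolves an identifier as it is written in SQL text.
    pub fn resolve(&self, ident_in_sql: &str) -> Option<&'static str> {
        self.functions.get(&lookup_key(ident_in_sql)).copied()
    }
}
```

Under this model, a function registered as `my_FUNC` is not found by `SELECT MY_FUNC(x)`, because that identifier resolves to `my_func`. It is found only by `SELECT "my_FUNC"(x)`.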

pub fn register_udaf(&self, f: AggregateUDF)

Registers an aggregate UDF within this context.

Note in SQL queries, aggregate names are looked up using lowercase unless the query uses quotes. For example,

  • SELECT MY_UDAF(x)... will look for an aggregate named "my_udaf"
  • SELECT "my_UDAF"(x) will look for an aggregate named "my_UDAF"

pub fn register_udwf(&self, f: WindowUDF)

Registers a window UDF within this context.

Note in SQL queries, window function names are looked up using lowercase unless the query uses quotes. For example,

  • SELECT MY_UDWF(x)... will look for a window function named "my_udwf"
  • SELECT "my_UDWF"(x) will look for a window function named "my_UDWF"

pub fn deregister_udf(&self, name: &str)

Deregisters a scalar UDF from this context.

pub fn deregister_udaf(&self, name: &str)

Deregisters a UDAF from this context.

pub fn deregister_udwf(&self, name: &str)

Deregisters a UDWF from this context.

pub async fn read_arrow<P>( &self, table_paths: P, options: ArrowReadOptions<'_> ) -> Result<DataFrame, DataFusionError>
where P: DataFilePaths,

Creates a DataFrame for reading an Arrow data source.

For more control such as reading multiple files, you can use read_table with a ListingTable.

For an example, see read_csv

pub fn read_empty(&self) -> Result<DataFrame, DataFusionError>

Creates an empty DataFrame.

pub fn read_table( &self, provider: Arc<dyn TableProvider> ) -> Result<DataFrame, DataFusionError>

Creates a DataFrame for a TableProvider such as a ListingTable or a custom user defined provider.

pub fn read_batch( &self, batch: RecordBatch ) -> Result<DataFrame, DataFusionError>

Creates a DataFrame for reading a RecordBatch

pub fn read_batches( &self, batches: impl IntoIterator<Item = RecordBatch> ) -> Result<DataFrame, DataFusionError>

Creates a DataFrame for reading multiple RecordBatches, passed as any iterator of RecordBatch (for example a Vec<RecordBatch>).

pub async fn register_listing_table( &self, name: &str, table_path: impl AsRef<str>, options: ListingOptions, provided_schema: Option<Arc<Schema>>, sql_definition: Option<String> ) -> Result<(), DataFusionError>

Registers a ListingTable that can assemble multiple files from locations in an ObjectStore instance into a single table.

This method is async because it might need to resolve the schema.

pub async fn register_arrow( &self, name: &str, table_path: &str, options: ArrowReadOptions<'_> ) -> Result<(), DataFusionError>

Registers an Arrow file as a table that can be referenced from SQL statements executed against this context.

pub fn register_catalog( &self, name: impl Into<String>, catalog: Arc<dyn CatalogProvider> ) -> Option<Arc<dyn CatalogProvider>>

Registers a named catalog using a custom CatalogProvider so that it can be referenced from SQL statements executed against this context.

Returns the CatalogProvider previously registered for this name, if any

pub fn catalog_names(&self) -> Vec<String>

Retrieves the list of available catalog names.

pub fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>>

Retrieves a CatalogProvider instance by name

pub fn register_table<'a>( &'a self, table_ref: impl Into<TableReference<'a>>, provider: Arc<dyn TableProvider> ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError>

Registers a TableProvider as a table that can be referenced from SQL statements executed against this context.

Returns the TableProvider previously registered for this reference, if any

pub fn deregister_table<'a>( &'a self, table_ref: impl Into<TableReference<'a>> ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError>

Deregisters the given table.

Returns the provider that was removed, if any.

pub fn table_exist<'a>( &'a self, table_ref: impl Into<TableReference<'a>> ) -> Result<bool, DataFusionError>

Return true if the specified table exists in the schema provider.

pub async fn table<'a>( &self, table_ref: impl Into<TableReference<'a>> ) -> Result<DataFrame, DataFusionError>

Retrieves a DataFrame representing a table previously registered by calling the register_table function.

Returns an error if no table has been registered with the provided reference.

pub async fn table_provider<'a>( &self, table_ref: impl Into<TableReference<'a>> ) -> Result<Arc<dyn TableProvider>, DataFusionError>

Return a TableProvider for the specified table.

pub fn task_ctx(&self) -> Arc<TaskContext>

Get a new TaskContext to run in this session

pub fn state(&self) -> SessionState

Returns a snapshot of this SessionContext's SessionState, with query_execution_start_time set to the current time.

pub fn state_weak_ref(&self) -> Weak<RwLock<RawRwLock, SessionState>>

Get weak reference to SessionState

pub fn register_catalog_list( &mut self, catalog_list: Arc<dyn CatalogProviderList> )

pub fn register_table_options_extension<T>(&self, extension: T)
where T: ConfigExtension,

Registers a ConfigExtension as a table option extension that can be referenced from SQL statements executed against this context.

Trait Implementations§

impl Clone for SessionContext

fn clone(&self) -> SessionContext

Returns a copy of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl Default for SessionContext

fn default() -> SessionContext

Returns the “default value” for a type.

impl From<&SessionContext> for TaskContext

Create a new task context instance from SessionContext

fn from(session: &SessionContext) -> TaskContext

Converts to this type from the input type.

impl From<DeltaSessionContext> for SessionContext

fn from(value: DeltaSessionContext) -> SessionContext

Converts to this type from the input type.

impl FunctionRegistry for SessionContext

fn udfs(&self) -> HashSet<String>

Set of all available udfs.

fn udf(&self, name: &str) -> Result<Arc<ScalarUDF>, DataFusionError>

Returns a reference to the user defined scalar function (udf) named name.

fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>, DataFusionError>

Returns a reference to the user defined aggregate function (udaf) named name.

fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>, DataFusionError>

Returns a reference to the user defined window function (udwf) named name.

fn register_udf( &mut self, udf: Arc<ScalarUDF> ) -> Result<Option<Arc<ScalarUDF>>, DataFusionError>

Registers a new ScalarUDF, returning any previously registered implementation.

fn register_udaf( &mut self, udaf: Arc<AggregateUDF> ) -> Result<Option<Arc<AggregateUDF>>, DataFusionError>

Registers a new AggregateUDF, returning any previously registered implementation.

fn register_udwf( &mut self, udwf: Arc<WindowUDF> ) -> Result<Option<Arc<WindowUDF>>, DataFusionError>

Registers a new WindowUDF, returning any previously registered implementation.

fn register_function_rewrite( &mut self, rewrite: Arc<dyn FunctionRewrite + Sync + Send> ) -> Result<(), DataFusionError>

Registers a new FunctionRewrite with the registry.

fn deregister_udf( &mut self, _name: &str ) -> Result<Option<Arc<ScalarUDF>>, DataFusionError>

Deregisters a ScalarUDF, returning the implementation that was deregistered.

fn deregister_udaf( &mut self, _name: &str ) -> Result<Option<Arc<AggregateUDF>>, DataFusionError>

Deregisters an AggregateUDF, returning the implementation that was deregistered.

fn deregister_udwf( &mut self, _name: &str ) -> Result<Option<Arc<WindowUDF>>, DataFusionError>

Deregisters a WindowUDF, returning the implementation that was deregistered.

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<Unshared, Shared> IntoShared<Shared> for Unshared
where Shared: FromUnshared<Unshared>,

fn into_shared(self) -> Shared

Creates a shared type from an unshared type.

impl<T> Same for T

type Output = T

Should always be Self

impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.

impl<T> Ungil for T
where T: Send,