pub struct DataFrame { /* private fields */ }
Expand description
Represents a logical set of rows with the same named columns.
Similar to a Pandas DataFrame or Spark DataFrame, a DataFusion DataFrame represents a two-dimensional table of rows and columns.
The typical workflow using DataFrames looks like
-
Create a DataFrame via methods on SessionContext, such as
read_csv
and read_parquet
. -
Build a desired calculation by calling methods such as filter, select, aggregate, and limit
-
Execute into RecordBatches by calling collect
A DataFrame
is a wrapper around a LogicalPlan
and the SessionState
required for execution.
DataFrames are “lazy” in the sense that most methods do not actually compute
anything, they just build up a plan. Calling collect
executes the plan
using the same DataFusion planning and execution process used to execute SQL
and other queries.
§Example
let ctx = SessionContext::new();
// Read the data from a csv file
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// create a new dataframe that computes the equivalent of
// `SELECT a, MIN(b) FROM df WHERE a <= b GROUP BY a LIMIT 100;`
let df = df.filter(col("a").lt_eq(col("b")))?
.aggregate(vec![col("a")], vec![min(col("b"))])?
.limit(0, Some(100))?;
// Perform the actual computation
let results = df.collect().await?;
Implementations§
Source§impl DataFrame
impl DataFrame
Sourcepub async fn write_parquet(
self,
path: &str,
options: DataFrameWriteOptions,
writer_options: Option<TableParquetOptions>,
) -> Result<Vec<RecordBatch>, DataFusionError>
pub async fn write_parquet( self, path: &str, options: DataFrameWriteOptions, writer_options: Option<TableParquetOptions>, ) -> Result<Vec<RecordBatch>, DataFusionError>
Execute the DataFrame
and write the results to Parquet file(s).
§Example
use datafusion::dataframe::DataFrameWriteOptions;
let ctx = SessionContext::new();
// Sort the data by column "b" and write it to a new location
ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
.sort(vec![col("b").sort(true, true)])? // sort by b asc, nulls first
.write_parquet(
"output.parquet",
DataFrameWriteOptions::new(),
None, // can also specify parquet writing options here
).await?;
Source§impl DataFrame
impl DataFrame
Sourcepub fn new(session_state: SessionState, plan: LogicalPlan) -> Self
pub fn new(session_state: SessionState, plan: LogicalPlan) -> Self
Create a new DataFrame
based on an existing LogicalPlan
This is a low-level method and is not typically used by end users. See
SessionContext::read_csv
and other methods for creating a
DataFrame
from an existing datasource.
Sourcepub fn parse_sql_expr(&self, sql: &str) -> Result<Expr>
pub fn parse_sql_expr(&self, sql: &str) -> Result<Expr>
Creates a logical expression from SQL expression text. The expression is created and processed against the current schema.
§Example: Parsing SQL queries
// datafusion will parse number as i64 first.
let sql = "a > 1 and b in (1, 10)";
let expected = col("a").gt(lit(1 as i64))
.and(col("b").in_list(vec![lit(1 as i64), lit(10 as i64)], false));
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let expr = df.parse_sql_expr(sql)?;
assert_eq!(expected, expr);
Sourcepub async fn create_physical_plan(self) -> Result<Arc<dyn ExecutionPlan>>
pub async fn create_physical_plan(self) -> Result<Arc<dyn ExecutionPlan>>
Consume the DataFrame and produce a physical plan
Sourcepub fn select_columns(self, columns: &[&str]) -> Result<DataFrame>
pub fn select_columns(self, columns: &[&str]) -> Result<DataFrame>
Select a subset of the DataFrame's columns by name. Returns a new DataFrame only containing the specified columns.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.select_columns(&["a", "b"])?;
let expected = vec![
"+---+---+",
"| a | b |",
"+---+---+",
"| 1 | 2 |",
"+---+---+"
];
Sourcepub fn select_exprs(self, exprs: &[&str]) -> Result<DataFrame>
pub fn select_exprs(self, exprs: &[&str]) -> Result<DataFrame>
Project an arbitrary list of expression strings into a new DataFrame
.
Method will parse string expressions into logical plan expressions.
The output DataFrame
has one column for each element in exprs
.
§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df : DataFrame = df.select_exprs(&["a * b", "c"])?;
Sourcepub fn select(self, expr_list: Vec<Expr>) -> Result<DataFrame>
pub fn select(self, expr_list: Vec<Expr>) -> Result<DataFrame>
Project arbitrary expressions (like SQL SELECT expressions) into a new
DataFrame
.
The output DataFrame
has one column for each element in expr_list
.
§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.select(vec![col("a"), col("b") * col("c")])?;
let expected = vec![
"+---+-----------------------+",
"| a | ?table?.b * ?table?.c |",
"+---+-----------------------+",
"| 1 | 6 |",
"+---+-----------------------+"
];
Sourcepub fn drop_columns(self, columns: &[&str]) -> Result<DataFrame>
pub fn drop_columns(self, columns: &[&str]) -> Result<DataFrame>
Returns a new DataFrame containing all columns except the specified columns.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// +----+----+----+
// | a | b | c |
// +----+----+----+
// | 1 | 2 | 3 |
// +----+----+----+
let df = df.drop_columns(&["a"])?;
let expected = vec![
"+---+---+",
"| b | c |",
"+---+---+",
"| 2 | 3 |",
"+---+---+"
];
Sourcepub fn unnest_columns(self, columns: &[&str]) -> Result<DataFrame>
pub fn unnest_columns(self, columns: &[&str]) -> Result<DataFrame>
Expand multiple list/struct columns into a set of rows and new columns.
See also: UnnestOptions
documentation for the behavior of unnest
§Example
let ctx = SessionContext::new();
let df = ctx.read_json("tests/data/unnest.json", NdJsonReadOptions::default()).await?;
// expand into multiple columns if it's json array, flatten field name if it's nested structure
let df = df.unnest_columns(&["b","c","d"])?;
let expected = vec![
"+---+------+-------+-----+-----+",
"| a | b | c | d.e | d.f |",
"+---+------+-------+-----+-----+",
"| 1 | 2.0 | false | 1 | 2 |",
"| 1 | 1.3 | true | 1 | 2 |",
"| 1 | -6.1 | | 1 | 2 |",
"| 2 | 3.0 | false | | |",
"| 2 | 2.3 | true | | |",
"| 2 | -7.1 | | | |",
"+---+------+-------+-----+-----+"
];
Sourcepub fn unnest_columns_with_options(
self,
columns: &[&str],
options: UnnestOptions,
) -> Result<DataFrame>
pub fn unnest_columns_with_options( self, columns: &[&str], options: UnnestOptions, ) -> Result<DataFrame>
Expand multiple list columns into a set of rows, with
behavior controlled by UnnestOptions
.
Please see the documentation on UnnestOptions
for more
details about the meaning of unnest.
Sourcepub fn filter(self, predicate: Expr) -> Result<DataFrame>
pub fn filter(self, predicate: Expr) -> Result<DataFrame>
Return a DataFrame with only rows for which predicate
evaluates to
true
.
Rows for which predicate
evaluates to false
or null
are filtered out.
§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
let df = df.filter(col("a").lt_eq(col("b")))?;
// all rows where a <= b are returned
let expected = vec![
"+---+---+---+",
"| a | b | c |",
"+---+---+---+",
"| 1 | 2 | 3 |",
"| 4 | 5 | 6 |",
"| 7 | 8 | 9 |",
"+---+---+---+"
];
Sourcepub fn aggregate(
self,
group_expr: Vec<Expr>,
aggr_expr: Vec<Expr>,
) -> Result<DataFrame>
pub fn aggregate( self, group_expr: Vec<Expr>, aggr_expr: Vec<Expr>, ) -> Result<DataFrame>
Return a new DataFrame
that aggregates the rows of the current
DataFrame
, first optionally grouping by the given expressions.
§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
// The following use is the equivalent of "SELECT a, MIN(b) GROUP BY a"
let df1 = df.clone().aggregate(vec![col("a")], vec![min(col("b"))])?;
let expected1 = vec![
"+---+----------------+",
"| a | min(?table?.b) |",
"+---+----------------+",
"| 1 | 2 |",
"| 4 | 5 |",
"| 7 | 8 |",
"+---+----------------+"
];
assert_batches_sorted_eq!(expected1, &df1.collect().await?);
// The following use is the equivalent of "SELECT MIN(b)"
let df2 = df.aggregate(vec![], vec![min(col("b"))])?;
let expected2 = vec![
"+----------------+",
"| min(?table?.b) |",
"+----------------+",
"| 2 |",
"+----------------+"
];
Sourcepub fn window(self, window_exprs: Vec<Expr>) -> Result<DataFrame>
pub fn window(self, window_exprs: Vec<Expr>) -> Result<DataFrame>
Return a new DataFrame that adds the result of evaluating one or more
window functions (Expr::WindowFunction
) to the existing columns
Sourcepub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<DataFrame>
pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<DataFrame>
Returns a new DataFrame
with a limited number of rows.
§Arguments
skip
- Number of rows to skip before fetching any rows
fetch
- Maximum number of rows to return, after skipping skip
rows.
§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
let df = df.limit(1, Some(2))?;
let expected = vec![
"+---+---+---+",
"| a | b | c |",
"+---+---+---+",
"| 4 | 5 | 6 |",
"| 7 | 8 | 9 |",
"+---+---+---+"
];
Sourcepub fn union(self, dataframe: DataFrame) -> Result<DataFrame>
pub fn union(self, dataframe: DataFrame) -> Result<DataFrame>
Calculate the union of two DataFrame
s, preserving duplicate rows.
The two DataFrame
s must have exactly the same schema
§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await? ;
let d2 = df.clone();
let df = df.union(d2)?;
let expected = vec![
"+---+---+---+",
"| a | b | c |",
"+---+---+---+",
"| 1 | 2 | 3 |",
"| 1 | 2 | 3 |",
"+---+---+---+"
];
Sourcepub fn union_distinct(self, dataframe: DataFrame) -> Result<DataFrame>
pub fn union_distinct(self, dataframe: DataFrame) -> Result<DataFrame>
Calculate the distinct union of two DataFrame
s.
The two DataFrame
s must have exactly the same schema. Any duplicate
rows are discarded.
§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let d2 = df.clone();
let df = df.union_distinct(d2)?;
// d2 is a duplicate of df, so each row appears only once in the result
let expected = vec![
"+---+---+---+",
"| a | b | c |",
"+---+---+---+",
"| 1 | 2 | 3 |",
"+---+---+---+"
];
Sourcepub fn distinct(self) -> Result<DataFrame>
pub fn distinct(self) -> Result<DataFrame>
Return a new DataFrame
with all duplicated rows removed.
§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.distinct()?;
let expected = vec![
"+---+---+---+",
"| a | b | c |",
"+---+---+---+",
"| 1 | 2 | 3 |",
"+---+---+---+"
];
Sourcepub fn distinct_on(
self,
on_expr: Vec<Expr>,
select_expr: Vec<Expr>,
sort_expr: Option<Vec<SortExpr>>,
) -> Result<DataFrame>
pub fn distinct_on( self, on_expr: Vec<Expr>, select_expr: Vec<Expr>, sort_expr: Option<Vec<SortExpr>>, ) -> Result<DataFrame>
Return a new DataFrame
with duplicated rows removed as per the specified expression list
according to the provided sorting expressions grouped by the DISTINCT ON
clause
expressions.
§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
// Return a single row (a, b) for each distinct value of a
.distinct_on(vec![col("a")], vec![col("a"), col("b")], None)?;
let expected = vec![
"+---+---+",
"| a | b |",
"+---+---+",
"| 1 | 2 |",
"+---+---+"
];
Sourcepub async fn describe(self) -> Result<Self>
pub async fn describe(self) -> Result<Self>
Return a new DataFrame
that has statistics for a DataFrame.
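Statistics such as mean, median, and std are only computed for numeric datatypes at the moment; they are null for non-numeric columns (count, null_count, min, and max are still reported, as the example below shows). The output format is modeled after pandas.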
§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/tpch-csv/customer.csv", CsvReadOptions::new()).await?;
let stat = df.describe().await?;
let expected = vec![
"+------------+--------------------+--------------------+------------------------------------+--------------------+-----------------+--------------------+--------------+----------------------------------------------------------------------------------------------------------+",
"| describe | c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment |",
"+------------+--------------------+--------------------+------------------------------------+--------------------+-----------------+--------------------+--------------+----------------------------------------------------------------------------------------------------------+",
"| count | 9.0 | 9 | 9 | 9.0 | 9 | 9.0 | 9 | 9 |",
"| max | 10.0 | Customer#000000010 | xKiAFTjUsCuxfeleNqefumTrjS | 20.0 | 30-114-968-4951 | 9561.95 | MACHINERY | tions. even deposits boost according to the slyly bold packages. final accounts cajole requests. furious |",
"| mean | 6.0 | null | null | 9.88888888888889 | null | 5153.2155555555555 | null | null |",
"| median | 6.0 | null | null | 8.0 | null | 6819.74 | null | null |",
"| min | 2.0 | Customer#000000002 | 6LrEaV6KR6PLVcgl2ArL Q3rqzLzcT1 v2 | 1.0 | 11-719-748-3364 | 121.65 | AUTOMOBILE | deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov |",
"| null_count | 0.0 | 0 | 0 | 0.0 | 0 | 0.0 | 0 | 0 |",
"| std | 2.7386127875258306 | null | null | 7.2188026092359046 | null | 3522.169804254585 | null | null |",
"+------------+--------------------+--------------------+------------------------------------+--------------------+-----------------+--------------------+--------------+----------------------------------------------------------------------------------------------------------+"];
assert_batches_sorted_eq!(expected, &stat.collect().await?);
Sourcepub fn sort_by(self, expr: Vec<Expr>) -> Result<DataFrame>
pub fn sort_by(self, expr: Vec<Expr>) -> Result<DataFrame>
Apply a sort by provided expressions with default direction
Sourcepub fn sort(self, expr: Vec<SortExpr>) -> Result<DataFrame>
pub fn sort(self, expr: Vec<SortExpr>) -> Result<DataFrame>
Sort the DataFrame by the specified sorting expressions.
Note that any expression can be turned into a sort expression by calling its sort method.
§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
let df = df.sort(vec![
col("a").sort(false, true), // a DESC, nulls first
col("b").sort(true, false), // b ASC, nulls last
])?;
let expected = vec![
    "+---+---+---+",
    "| a | b | c |",
    "+---+---+---+",
    "| 7 | 8 | 9 |",
    "| 4 | 5 | 6 |",
    "| 1 | 2 | 3 |",
    "+---+---+---+",
];
Sourcepub fn join(
self,
right: DataFrame,
join_type: JoinType,
left_cols: &[&str],
right_cols: &[&str],
filter: Option<Expr>,
) -> Result<DataFrame>
pub fn join( self, right: DataFrame, join_type: JoinType, left_cols: &[&str], right_cols: &[&str], filter: Option<Expr>, ) -> Result<DataFrame>
Join this DataFrame
with another DataFrame
using explicitly specified
columns and an optional filter expression.
See join_on
for a more concise way to specify the
join condition. Since DataFusion will automatically identify and
optimize equality predicates there is no performance difference between
this function and join_on
left_cols
and right_cols
are used to form “equijoin” predicates (see
example below), which are then combined with the optional filter
expression.
Note that in case of outer join, the filter
is applied to only matched rows.
§Example
let ctx = SessionContext::new();
let left = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let right = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
.select(vec![
col("a").alias("a2"),
col("b").alias("b2"),
col("c").alias("c2")])?;
// Perform the equivalent of `left INNER JOIN right ON (a = a2 AND b = b2)`
// finding all pairs of rows from `left` and `right` where `a = a2` and `b = b2`.
let join = left.join(right, JoinType::Inner, &["a", "b"], &["a2", "b2"], None)?;
let expected = vec![
"+---+---+---+----+----+----+",
"| a | b | c | a2 | b2 | c2 |",
"+---+---+---+----+----+----+",
"| 1 | 2 | 3 | 1 | 2 | 3 |",
"+---+---+---+----+----+----+"
];
assert_batches_sorted_eq!(expected, &join.collect().await?);
Sourcepub fn join_on(
self,
right: DataFrame,
join_type: JoinType,
on_exprs: impl IntoIterator<Item = Expr>,
) -> Result<DataFrame>
pub fn join_on( self, right: DataFrame, join_type: JoinType, on_exprs: impl IntoIterator<Item = Expr>, ) -> Result<DataFrame>
Join this DataFrame
with another DataFrame
using the specified
expressions.
Note that DataFusion automatically optimizes joins, including identifying and optimizing equality predicates.
§Example
let ctx = SessionContext::new();
let left = ctx
.read_csv("tests/data/example.csv", CsvReadOptions::new())
.await?;
let right = ctx
.read_csv("tests/data/example.csv", CsvReadOptions::new())
.await?
.select(vec![
col("a").alias("a2"),
col("b").alias("b2"),
col("c").alias("c2"),
])?;
// Perform the equivalent of `left INNER JOIN right ON (a != a2 AND b != b2)`
// finding all pairs of rows from `left` and `right`
// where `a != a2` and `b != b2`.
let join_on = left.join_on(
right,
JoinType::Inner,
[col("a").not_eq(col("a2")), col("b").not_eq(col("b2"))],
)?;
let expected = vec![
"+---+---+---+----+----+----+",
"| a | b | c | a2 | b2 | c2 |",
"+---+---+---+----+----+----+",
"+---+---+---+----+----+----+"
];
Sourcepub fn repartition(self, partitioning_scheme: Partitioning) -> Result<DataFrame>
pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<DataFrame>
Repartition a DataFrame based on a logical partitioning scheme.
§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
let df1 = df.repartition(Partitioning::RoundRobinBatch(4))?;
let expected = vec![
"+---+---+---+",
"| a | b | c |",
"+---+---+---+",
"| 1 | 2 | 3 |",
"| 4 | 5 | 6 |",
"| 7 | 8 | 9 |",
"+---+---+---+"
];
Sourcepub async fn count(self) -> Result<usize>
pub async fn count(self) -> Result<usize>
Return the total number of rows in this DataFrame
.
Note that this method will actually run a plan to calculate the count, which may be slow for large or complicated DataFrames.
§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let count = df.count().await?; // 1
Sourcepub async fn collect(self) -> Result<Vec<RecordBatch>>
pub async fn collect(self) -> Result<Vec<RecordBatch>>
Execute this DataFrame
and buffer all resulting RecordBatch
es into memory.
Prior to calling collect
, modifying a DataFrame simply updates a plan
(no actual computation is performed). collect
triggers the computation.
See Self::execute_stream
to execute a DataFrame without buffering.
§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let batches = df.collect().await?;
Sourcepub async fn show(self) -> Result<()>
pub async fn show(self) -> Result<()>
Execute the DataFrame
and print the results to the console.
§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
df.show().await?;
Sourcepub async fn show_limit(self, num: usize) -> Result<()>
pub async fn show_limit(self, num: usize) -> Result<()>
Execute the DataFrame
and print only the first num
rows of the
result to the console.
§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
df.show_limit(10).await?;
Sourcepub fn task_ctx(&self) -> TaskContext
pub fn task_ctx(&self) -> TaskContext
Return a new TaskContext
which would be used to execute this DataFrame
Sourcepub async fn execute_stream(self) -> Result<SendableRecordBatchStream>
pub async fn execute_stream(self) -> Result<SendableRecordBatchStream>
Executes this DataFrame and returns a stream over a single partition
See Self::collect to buffer the RecordBatch
es in memory.
§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let stream = df.execute_stream().await?;
§Aborting Execution
Dropping the stream will abort the execution of the query, and free up any allocated resources
Sourcepub async fn collect_partitioned(self) -> Result<Vec<Vec<RecordBatch>>>
pub async fn collect_partitioned(self) -> Result<Vec<Vec<RecordBatch>>>
Executes this DataFrame and collects all results into a vector of vector of RecordBatch maintaining the input partitioning.
§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let batches = df.collect_partitioned().await?;
Sourcepub async fn execute_stream_partitioned(
self,
) -> Result<Vec<SendableRecordBatchStream>>
pub async fn execute_stream_partitioned( self, ) -> Result<Vec<SendableRecordBatchStream>>
Executes this DataFrame and returns one stream per partition.
§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let streams = df.execute_stream_partitioned().await?;
§Aborting Execution
Dropping the streams will abort the execution of the query and free up any allocated resources.
Sourcepub fn schema(&self) -> &DFSchema
pub fn schema(&self) -> &DFSchema
Returns the DFSchema
describing the output of this DataFrame.
The output DFSchema
contains information on the name, data type, and
nullability for each column.
§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let schema = df.schema();
Sourcepub fn logical_plan(&self) -> &LogicalPlan
pub fn logical_plan(&self) -> &LogicalPlan
Return a reference to the unoptimized LogicalPlan
that comprises
this DataFrame.
See Self::into_unoptimized_plan
for more details.
Sourcepub fn into_parts(self) -> (SessionState, LogicalPlan)
pub fn into_parts(self) -> (SessionState, LogicalPlan)
Returns both the LogicalPlan
and SessionState
that comprise this DataFrame
Sourcepub fn into_unoptimized_plan(self) -> LogicalPlan
pub fn into_unoptimized_plan(self) -> LogicalPlan
Return the LogicalPlan
represented by this DataFrame without running
any optimizers
Note: This method should not be used outside testing, as it loses the
snapshot of the SessionState
attached to this DataFrame
and
consequently subsequent operations may take place against a different
state (e.g. a different value of now()
)
See Self::into_parts
to retrieve the owned LogicalPlan
and
corresponding SessionState
.
Sourcepub fn into_optimized_plan(self) -> Result<LogicalPlan>
pub fn into_optimized_plan(self) -> Result<LogicalPlan>
Return the optimized LogicalPlan
represented by this DataFrame.
Note: This method should not be used outside testing – see
Self::into_unoptimized_plan
for more details.
Sourcepub fn into_view(self) -> Arc<dyn TableProvider>
pub fn into_view(self) -> Arc<dyn TableProvider>
Converts this DataFrame
into a TableProvider
that can be registered
as a table view using SessionContext::register_table
.
Note: This discards the SessionState
associated with this
DataFrame
in favour of the one passed to TableProvider::scan
Sourcepub fn explain(self, verbose: bool, analyze: bool) -> Result<DataFrame>
pub fn explain(self, verbose: bool, analyze: bool) -> Result<DataFrame>
Return a DataFrame with the explanation of its plan so far.
If analyze
is true, also runs the plan and reports metrics
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let batches = df.limit(0, Some(100))?.explain(false, false)?.collect().await?;
Sourcepub fn registry(&self) -> &dyn FunctionRegistry
pub fn registry(&self) -> &dyn FunctionRegistry
Return a FunctionRegistry
used to plan UDF calls
§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let f = df.registry();
// use f.udf("name", vec![...]) to use the udf
Sourcepub fn intersect(self, dataframe: DataFrame) -> Result<DataFrame>
pub fn intersect(self, dataframe: DataFrame) -> Result<DataFrame>
Calculate the intersection of two DataFrame
s. The two DataFrame
s must have exactly the same schema
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let d2 = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
let df = df.intersect(d2)?;
let expected = vec![
"+---+---+---+",
"| a | b | c |",
"+---+---+---+",
"| 1 | 2 | 3 |",
"+---+---+---+"
];
Sourcepub fn except(self, dataframe: DataFrame) -> Result<DataFrame>
pub fn except(self, dataframe: DataFrame) -> Result<DataFrame>
Calculate the set difference (SQL EXCEPT) of two DataFrames, i.e. the rows of this DataFrame that do not appear in the other. The two DataFrames must have exactly the same schema.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example_long.csv", CsvReadOptions::new()).await?;
let d2 = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let result = df.except(d2)?;
// those rows are not in example.csv, but are in example_long.csv
let expected = vec![
"+---+---+---+",
"| a | b | c |",
"+---+---+---+",
"| 4 | 5 | 6 |",
"| 7 | 8 | 9 |",
"+---+---+---+"
];
Sourcepub async fn write_table(
self,
table_name: &str,
write_options: DataFrameWriteOptions,
) -> Result<Vec<RecordBatch>, DataFusionError>
pub async fn write_table( self, table_name: &str, write_options: DataFrameWriteOptions, ) -> Result<Vec<RecordBatch>, DataFusionError>
Execute this DataFrame
and write the results to table_name
.
Returns a single RecordBatch containing a single column and row representing the count of total rows written.
Unlike most other DataFrame
methods, this method executes eagerly.
Data is written to the table using the TableProvider::insert_into
method. This is the same underlying implementation used by SQL INSERT INTO
statements.
Sourcepub async fn write_csv(
self,
path: &str,
options: DataFrameWriteOptions,
writer_options: Option<CsvOptions>,
) -> Result<Vec<RecordBatch>, DataFusionError>
pub async fn write_csv( self, path: &str, options: DataFrameWriteOptions, writer_options: Option<CsvOptions>, ) -> Result<Vec<RecordBatch>, DataFusionError>
Execute the DataFrame
and write the results to CSV file(s).
§Example
use datafusion::dataframe::DataFrameWriteOptions;
let ctx = SessionContext::new();
// Sort the data by column "b" and write it to a new location
ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
.sort(vec![col("b").sort(true, true)])? // sort by b asc, nulls first
.write_csv(
"output.csv",
DataFrameWriteOptions::new(),
None, // can also specify CSV writing options here
).await?;
Sourcepub async fn write_json(
self,
path: &str,
options: DataFrameWriteOptions,
writer_options: Option<JsonOptions>,
) -> Result<Vec<RecordBatch>, DataFusionError>
pub async fn write_json( self, path: &str, options: DataFrameWriteOptions, writer_options: Option<JsonOptions>, ) -> Result<Vec<RecordBatch>, DataFusionError>
Execute the DataFrame
and write the results to JSON file(s).
§Example
use datafusion::dataframe::DataFrameWriteOptions;
let ctx = SessionContext::new();
// Sort the data by column "b" and write it to a new location
ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
.sort(vec![col("b").sort(true, true)])? // sort by b asc, nulls first
.write_json(
"output.json",
DataFrameWriteOptions::new(),
None
).await?;
Sourcepub fn with_column(self, name: &str, expr: Expr) -> Result<DataFrame>
pub fn with_column(self, name: &str, expr: Expr) -> Result<DataFrame>
Add an additional column to the DataFrame.
§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.with_column("ab_sum", col("a") + col("b"))?;
Sourcepub fn with_column_renamed(
self,
old_name: impl Into<String>,
new_name: &str,
) -> Result<DataFrame>
pub fn with_column_renamed( self, old_name: impl Into<String>, new_name: &str, ) -> Result<DataFrame>
Rename one column by applying a new projection. This is a no-op if the column to be renamed does not exist.
The method supports case-sensitive renaming when the column name is wrapped in one of the following symbols ( " or ' or ` )
Alternatively setting DataFusion param datafusion.sql_parser.enable_ident_normalization
to false
will enable
case sensitive rename without need to wrap column name into special symbols
§Example
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.with_column_renamed("ab_sum", "total")?;
Sourcepub fn with_param_values(
self,
query_values: impl Into<ParamValues>,
) -> Result<Self>
pub fn with_param_values( self, query_values: impl Into<ParamValues>, ) -> Result<Self>
Replace all parameters in logical plan with the specified values, in preparation for execution.
§Example
use datafusion::prelude::*;
let ctx = SessionContext::new();
let results = ctx
.sql("SELECT a FROM example WHERE b = $1")
.await?
// replace $1 with value 2
.with_param_values(vec![
// value at index 0 --> $1
ScalarValue::from(2i64)
])?
.collect()
.await?;
assert_batches_eq!(
&[
"+---+",
"| a |",
"+---+",
"| 1 |",
"+---+",
],
&results
);
// Note you can also provide named parameters
let results = ctx
.sql("SELECT a FROM example WHERE b = $my_param")
.await?
// replace $my_param with value 2
// Note you can also use a HashMap as well
.with_param_values(vec![
("my_param", ScalarValue::from(2i64))
])?
.collect()
.await?;
assert_batches_eq!(
&[
"+---+",
"| a |",
"+---+",
"| 1 |",
"+---+",
],
&results
);
Trait Implementations§
Auto Trait Implementations§
impl Freeze for DataFrame
impl !RefUnwindSafe for DataFrame
impl Send for DataFrame
impl Sync for DataFrame
impl Unpin for DataFrame
impl !UnwindSafe for DataFrame
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Source§unsafe fn clone_to_uninit(&self, dst: *mut T)
unsafe fn clone_to_uninit(&self, dst: *mut T)
clone_to_uninit
)Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self
into a Left
variant of Either<Self, Self>
if into_left
is true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self
into a Left
variant of Either<Self, Self>
if into_left(&self)
returns true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read more