Struct datafusion::dataframe::DataFrame
pub struct DataFrame { /* private fields */ }
DataFrame represents a logical set of rows with the same named columns, similar to a Pandas DataFrame or Spark DataFrame.
DataFrames are typically created by the read_csv and read_parquet methods on the SessionContext and can then be modified by calling the transformation methods, such as filter, select, aggregate, and limit, to build up a query definition. The query can be executed by calling the collect method.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.filter(col("a").lt_eq(col("b")))?
.aggregate(vec![col("a")], vec![min(col("b"))])?
.limit(0, Some(100))?;
let results = df.collect().await?;
Implementations
impl DataFrame
pub async fn write_parquet(
    self,
    path: &str,
    options: DataFrameWriteOptions,
    writer_properties: Option<WriterProperties>
) -> Result<Vec<RecordBatch>, DataFusionError>
Write a DataFrame to a Parquet file.
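For example, a minimal sketch (the path "out.parquet" is a hypothetical output location; default write options and default writer properties are assumed):
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// hypothetical output path; None uses the default Parquet writer properties
df.write_parquet("out.parquet", DataFrameWriteOptions::new(), None).await?;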
impl DataFrame
pub fn new(session_state: SessionState, plan: LogicalPlan) -> Self
Create a new DataFrame based on an existing logical plan.
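For example, a minimal round-trip sketch showing how a DataFrame can be decomposed with into_parts and reassembled with new:
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// split the DataFrame into its session state and logical plan, then rebuild it
let (state, plan) = df.into_parts();
let df = DataFrame::new(state, plan);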
pub async fn create_physical_plan(self) -> Result<Arc<dyn ExecutionPlan>>
Create a physical plan
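For example, a minimal sketch:
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// the resulting Arc<dyn ExecutionPlan> can be executed directly
let physical_plan = df.create_physical_plan().await?;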
pub fn select_columns(self, columns: &[&str]) -> Result<DataFrame>
Filter the DataFrame by column. Returns a new DataFrame only containing the specified columns.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.select_columns(&["a", "b"])?;
pub fn select(self, expr_list: Vec<Expr>) -> Result<DataFrame>
Create a projection based on arbitrary expressions.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.select(vec![col("a") * col("b"), col("c")])?;
pub fn unnest_column(self, column: &str) -> Result<DataFrame>
Expand each list element of a column to multiple rows.
See also:
UnnestOptions documentation for the behavior of unnest
Self::unnest_column_with_options
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.unnest_column("a")?;
pub fn unnest_column_with_options(
    self,
    column: &str,
    options: UnnestOptions
) -> Result<DataFrame>
Expand each list element of a column to multiple rows, with behavior controlled by UnnestOptions. Please see the documentation on UnnestOptions for more details about the meaning of unnest.
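For example, a minimal sketch (assuming UnnestOptions offers a with_preserve_nulls builder, as in recent DataFusion releases):
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// keep rows whose list value is null instead of dropping them (assumed builder API)
let options = UnnestOptions::new().with_preserve_nulls(true);
let df = df.unnest_column_with_options("a", options)?;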
pub fn filter(self, predicate: Expr) -> Result<DataFrame>
Filter a DataFrame to only include rows that match the specified filter expression.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.filter(col("a").lt_eq(col("b")))?;
pub fn aggregate(
    self,
    group_expr: Vec<Expr>,
    aggr_expr: Vec<Expr>
) -> Result<DataFrame>
Perform an aggregate query with optional grouping expressions.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// The following use is the equivalent of "SELECT MIN(b) GROUP BY a"
let _ = df.clone().aggregate(vec![col("a")], vec![min(col("b"))])?;
// The following use is the equivalent of "SELECT MIN(b)"
let _ = df.aggregate(vec![], vec![min(col("b"))])?;
pub fn window(self, window_exprs: Vec<Expr>) -> Result<DataFrame>
Apply one or more window functions (Expr::WindowFunction) to extend the schema.
pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<DataFrame>
Limit the number of rows returned from this DataFrame.
skip - Number of rows to skip before fetching any rows
fetch - Maximum number of rows to fetch, after skipping skip rows.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.limit(0, Some(100))?;
pub fn union_distinct(self, dataframe: DataFrame) -> Result<DataFrame>
Calculate the distinct union of two DataFrames; duplicate rows are discarded. Both DataFrames must have exactly the same schema.
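For example, a minimal sketch that unions a DataFrame with itself, yielding the distinct rows:
let ctx = SessionContext::new();
let d1 = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let d2 = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// rows that appear in both d1 and d2 occur once in the result
let df = d1.union_distinct(d2)?;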
pub fn distinct(self) -> Result<DataFrame>
Filter out duplicate rows
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.distinct()?;
pub async fn describe(self) -> Result<Self>
Summary statistics for a DataFrame. Only summarizes numeric datatypes at the moment and returns nulls for non-numeric datatypes. Tries to keep the output similar to pandas.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/tpch-csv/customer.csv", CsvReadOptions::new()).await?;
df.describe().await.unwrap();
pub fn sort(self, expr: Vec<Expr>) -> Result<DataFrame>
Sort the DataFrame by the specified sorting expressions. Any expression can be turned into a sort expression by calling its sort method.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.sort(vec![col("a").sort(true, true), col("b").sort(false, false)])?;
pub fn join(
    self,
    right: DataFrame,
    join_type: JoinType,
    left_cols: &[&str],
    right_cols: &[&str],
    filter: Option<Expr>
) -> Result<DataFrame>
Join this DataFrame with another DataFrame using explicitly specified columns and an optional filter expression.
See join_on for a more concise way to specify the join condition. Since DataFusion will automatically identify and optimize equality predicates there is no performance difference between this function and join_on.
left_cols and right_cols are used to form "equijoin" predicates (see example below), which are then combined with the optional filter expression.
Note that in the case of an outer join, the filter is applied to only matched rows.
Example
let ctx = SessionContext::new();
let left = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let right = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
.select(vec![
col("a").alias("a2"),
col("b").alias("b2"),
col("c").alias("c2")])?;
// Perform the equivalent of `left INNER JOIN right ON (a = a2 AND b = b2)`
// finding all pairs of rows from `left` and `right` where `a = a2` and `b = b2`.
let join = left.join(right, JoinType::Inner, &["a", "b"], &["a2", "b2"], None)?;
let batches = join.collect().await?;
pub fn join_on(
    self,
    right: DataFrame,
    join_type: JoinType,
    on_exprs: impl IntoIterator<Item = Expr>
) -> Result<DataFrame>
Join this DataFrame with another DataFrame using the specified expressions.
Note that DataFusion automatically optimizes joins, including identifying and optimizing equality predicates.
Example
let ctx = SessionContext::new();
let left = ctx
.read_csv("tests/data/example.csv", CsvReadOptions::new())
.await?;
let right = ctx
.read_csv("tests/data/example.csv", CsvReadOptions::new())
.await?
.select(vec![
col("a").alias("a2"),
col("b").alias("b2"),
col("c").alias("c2"),
])?;
// Perform the equivalent of `left INNER JOIN right ON (a != a2 AND b != b2)`
// finding all pairs of rows from `left` and `right`
// where `a != a2` and `b != b2`.
let join_on = left.join_on(
right,
JoinType::Inner,
[col("a").not_eq(col("a2")), col("b").not_eq(col("b2"))],
)?;
let batches = join_on.collect().await?;
pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<DataFrame>
Repartition a DataFrame based on a logical partitioning scheme.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df1 = df.repartition(Partitioning::RoundRobinBatch(4))?;
pub async fn count(self) -> Result<usize>
Run a count aggregate on the DataFrame and execute it, returning the total number of rows as a usize.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let count = df.count().await?;
pub async fn collect(self) -> Result<Vec<RecordBatch>>
Convert the logical plan represented by this DataFrame into a physical plan and execute it, collecting all resulting batches into memory and returning them as a vector of RecordBatch.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let batches = df.collect().await?;
pub async fn show(self) -> Result<()>
Print results.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
df.show().await?;
pub async fn show_limit(self, num: usize) -> Result<()>
Print results and limit rows.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
df.show_limit(10).await?;
pub fn task_ctx(&self) -> TaskContext
Get a new TaskContext to run in this session
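For example, a minimal sketch:
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// the TaskContext can be passed to lower-level execution APIs
let task_ctx = df.task_ctx();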
pub async fn execute_stream(self) -> Result<SendableRecordBatchStream>
Executes this DataFrame and returns a stream over a single partition
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let stream = df.execute_stream().await?;
pub async fn collect_partitioned(self) -> Result<Vec<Vec<RecordBatch>>>
Executes this DataFrame and collects all results into a vector of vectors of RecordBatch, maintaining the input partitioning.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let batches = df.collect_partitioned().await?;
pub async fn execute_stream_partitioned(self) -> Result<Vec<SendableRecordBatchStream>>
Executes this DataFrame and returns one stream per partition.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let streams = df.execute_stream_partitioned().await?;
pub fn schema(&self) -> &DFSchema
Returns the schema describing the output of this DataFrame in terms of columns returned, where each column has a name, data type, and nullability attribute.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let schema = df.schema();
pub fn logical_plan(&self) -> &LogicalPlan
Return the unoptimized logical plan
pub fn into_parts(self) -> (SessionState, LogicalPlan)
Returns both the LogicalPlan and SessionState that comprise this DataFrame.
pub fn into_unoptimized_plan(self) -> LogicalPlan
Return the logical plan represented by this DataFrame without running the optimizers.
Note: This method should not be used outside testing, as it loses the snapshot of the SessionState attached to this DataFrame, and consequently subsequent operations may take place against a different state.
pub fn into_optimized_plan(self) -> Result<LogicalPlan>
Return the optimized logical plan represented by this DataFrame.
Note: This method should not be used outside testing, as it loses the snapshot of the SessionState attached to this DataFrame, and consequently subsequent operations may take place against a different state.
pub fn into_view(self) -> Arc<dyn TableProvider>
Converts this DataFrame into a TableProvider that can be registered as a table view using SessionContext::register_table.
Note: This discards the SessionState associated with this DataFrame in favour of the one passed to TableProvider::scan.
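For example, a minimal sketch (the view name "example_view" is hypothetical):
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// register the DataFrame as a view, then query it via SQL
ctx.register_table("example_view", df.into_view())?;
let df = ctx.sql("SELECT a FROM example_view").await?;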
pub fn to_logical_plan(self) -> Result<LogicalPlan>
Deprecated since 23.0.0: Use DataFrame::into_optimized_plan
Return the optimized logical plan represented by this DataFrame.
Note: This method should not be used outside testing, as it loses the snapshot of the SessionState attached to this DataFrame, and consequently subsequent operations may take place against a different state.
pub fn explain(self, verbose: bool, analyze: bool) -> Result<DataFrame>
Return a DataFrame with the explanation of its plan so far.
If analyze is specified, runs the plan and reports metrics.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let batches = df.limit(0, Some(100))?.explain(false, false)?.collect().await?;
pub fn registry(&self) -> &dyn FunctionRegistry
Return a FunctionRegistry used to plan UDF calls.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let f = df.registry();
// use f.udf("name", vec![...]) to use the udf
pub async fn write_table(
    self,
    table_name: &str,
    write_options: DataFrameWriteOptions
) -> Result<Vec<RecordBatch>, DataFusionError>
Write this DataFrame to the referenced table. This method uses the same underlying implementation as the SQL INSERT INTO statement. Unlike most other DataFrame methods, this method executes eagerly: it writes the data and returns the count of rows written.
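For example, a minimal sketch (the table name "example_table" is hypothetical, and it is assumed the registered table's provider supports inserts):
let ctx = SessionContext::new();
// hypothetical target table, registered ahead of time
ctx.register_csv("example_table", "tests/data/example.csv", CsvReadOptions::new()).await?;
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
df.write_table("example_table", DataFrameWriteOptions::new()).await?;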
pub async fn write_csv(
    self,
    path: &str,
    options: DataFrameWriteOptions,
    writer_properties: Option<WriterBuilder>
) -> Result<Vec<RecordBatch>, DataFusionError>
Write a DataFrame to a CSV file.
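For example, a minimal sketch (the path "out.csv" is a hypothetical output location; None uses the default CSV writer settings):
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
df.write_csv("out.csv", DataFrameWriteOptions::new(), None).await?;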
pub async fn write_json(
    self,
    path: &str,
    options: DataFrameWriteOptions
) -> Result<Vec<RecordBatch>, DataFusionError>
Executes a query and writes the results to a partitioned JSON file.
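For example, a minimal sketch (the path "out.json" is a hypothetical output location):
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
df.write_json("out.json", DataFrameWriteOptions::new()).await?;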
pub fn with_column(self, name: &str, expr: Expr) -> Result<DataFrame>
Add an additional column to the DataFrame.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.with_column("ab_sum", col("a") + col("b"))?;
pub fn with_column_renamed(
    self,
    old_name: impl Into<String>,
    new_name: &str
) -> Result<DataFrame>
Rename one column by applying a new projection. This is a no-op if the column to be renamed does not exist.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.with_column_renamed("ab_sum", "total")?;
pub fn with_param_values(self, param_values: Vec<ScalarValue>) -> Result<Self>
Replace all parameters in the logical plan with the specified values, in preparation for execution.
Example
use datafusion::prelude::*;
let mut ctx = SessionContext::new();
let results = ctx
.sql("SELECT a FROM example WHERE b = $1")
.await?
// replace $1 with value 2
.with_param_values(vec![
// value at index 0 --> $1
ScalarValue::from(2i64)
])?
.collect()
.await?;
assert_batches_eq!(
&[
"+---+",
"| a |",
"+---+",
"| 1 |",
"+---+",
],
&results
);