Struct datafusion::dataframe::DataFrame
source · pub struct DataFrame { /* private fields */ }
Expand description
DataFrame represents a logical set of rows with the same named columns. Similar to a Pandas DataFrame or Spark DataFrame.
DataFrames are typically created by the read_csv
and read_parquet
methods on the
SessionContext and can then be modified
by calling the transformation methods, such as filter
, select
, aggregate
, and limit
to build up a query definition.
The query can be executed by calling the collect
method.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.filter(col("a").lt_eq(col("b")))?
.aggregate(vec![col("a")], vec![min(col("b"))])?
.limit(0, Some(100))?;
let results = df.collect();
Implementations§
source§impl DataFrame
impl DataFrame
sourcepub fn new(session_state: SessionState, plan: LogicalPlan) -> Self
pub fn new(session_state: SessionState, plan: LogicalPlan) -> Self
Create a new DataFrame based on an existing logical plan.
sourcepub async fn create_physical_plan(self) -> Result<Arc<dyn ExecutionPlan>>
pub async fn create_physical_plan(self) -> Result<Arc<dyn ExecutionPlan>>
Create a physical plan
sourcepub fn select_columns(self, columns: &[&str]) -> Result<DataFrame>
pub fn select_columns(self, columns: &[&str]) -> Result<DataFrame>
Filter the DataFrame by column. Returns a new DataFrame only containing the specified columns.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.select_columns(&["a", "b"])?;
sourcepub fn select(self, expr_list: Vec<Expr>) -> Result<DataFrame>
pub fn select(self, expr_list: Vec<Expr>) -> Result<DataFrame>
Create a projection based on arbitrary expressions.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.select(vec![col("a") * col("b"), col("c")])?;
sourcepub fn unnest_column(self, column: &str) -> Result<DataFrame>
pub fn unnest_column(self, column: &str) -> Result<DataFrame>
Expand each list element of a column to multiple rows.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.unnest_column("a")?;
sourcepub fn filter(self, predicate: Expr) -> Result<DataFrame>
pub fn filter(self, predicate: Expr) -> Result<DataFrame>
Filter a DataFrame to only include rows that match the specified filter expression.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.filter(col("a").lt_eq(col("b")))?;
sourcepub fn aggregate(
self,
group_expr: Vec<Expr>,
aggr_expr: Vec<Expr>
) -> Result<DataFrame>
pub fn aggregate( self, group_expr: Vec<Expr>, aggr_expr: Vec<Expr> ) -> Result<DataFrame>
Perform an aggregate query with optional grouping expressions.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// The following use is the equivalent of "SELECT MIN(b) GROUP BY a"
let _ = df.clone().aggregate(vec![col("a")], vec![min(col("b"))])?;
// The following use is the equivalent of "SELECT MIN(b)"
let _ = df.aggregate(vec![], vec![min(col("b"))])?;
sourcepub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<DataFrame>
pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<DataFrame>
Limit the number of rows returned from this DataFrame.
skip
- Number of rows to skip before fetching any rows
fetch
- Maximum number of rows to fetch, after skipping skip
rows.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.limit(0, Some(100))?;
sourcepub fn union_distinct(self, dataframe: DataFrame) -> Result<DataFrame>
pub fn union_distinct(self, dataframe: DataFrame) -> Result<DataFrame>
sourcepub fn distinct(self) -> Result<DataFrame>
pub fn distinct(self) -> Result<DataFrame>
Filter out duplicate rows
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.distinct()?;
sourcepub async fn describe(self) -> Result<Self>
pub async fn describe(self) -> Result<Self>
Summary statistics for a DataFrame. Only summarizes numeric datatypes at the moment and returns nulls for non-numeric datatypes. Tries to keep the output similar to pandas.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/tpch-csv/customer.csv", CsvReadOptions::new()).await?;
df.describe().await.unwrap();
sourcepub fn sort(self, expr: Vec<Expr>) -> Result<DataFrame>
pub fn sort(self, expr: Vec<Expr>) -> Result<DataFrame>
Sort the DataFrame by the specified sorting expressions. Any expression can be turned into a sort expression by calling its sort method.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.sort(vec![col("a").sort(true, true), col("b").sort(false, false)])?;
sourcepub fn join(
self,
right: DataFrame,
join_type: JoinType,
left_cols: &[&str],
right_cols: &[&str],
filter: Option<Expr>
) -> Result<DataFrame>
pub fn join( self, right: DataFrame, join_type: JoinType, left_cols: &[&str], right_cols: &[&str], filter: Option<Expr> ) -> Result<DataFrame>
Join this DataFrame with another DataFrame using the specified columns as join keys.
The filter expression is expected to contain non-equality predicates that cannot be pushed down to any of the join inputs. In the case of an outer join, the filter is applied only to matched rows.
let ctx = SessionContext::new();
let left = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let right = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
.select(vec![
col("a").alias("a2"),
col("b").alias("b2"),
col("c").alias("c2")])?;
let join = left.join(right, JoinType::Inner, &["a", "b"], &["a2", "b2"], None)?;
let batches = join.collect().await?;
sourcepub fn join_on(
self,
right: DataFrame,
join_type: JoinType,
on_exprs: impl IntoIterator<Item = Expr>
) -> Result<DataFrame>
pub fn join_on( self, right: DataFrame, join_type: JoinType, on_exprs: impl IntoIterator<Item = Expr> ) -> Result<DataFrame>
Join this DataFrame with another DataFrame using the specified expressions.
Simply a thin wrapper over join
where the join keys are not provided,
and the provided expressions are AND’ed together to form the filter expression.
let ctx = SessionContext::new();
let left = ctx
.read_csv("tests/data/example.csv", CsvReadOptions::new())
.await?;
let right = ctx
.read_csv("tests/data/example.csv", CsvReadOptions::new())
.await?
.select(vec![
col("a").alias("a2"),
col("b").alias("b2"),
col("c").alias("c2"),
])?;
let join_on = left.join_on(
right,
JoinType::Inner,
[col("a").not_eq(col("a2")), col("b").not_eq(col("b2"))],
)?;
let batches = join_on.collect().await?;
sourcepub fn repartition(self, partitioning_scheme: Partitioning) -> Result<DataFrame>
pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<DataFrame>
Repartition a DataFrame based on a logical partitioning scheme.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df1 = df.repartition(Partitioning::RoundRobinBatch(4))?;
sourcepub async fn count(self) -> Result<usize>
pub async fn count(self) -> Result<usize>
Run a count aggregate on the DataFrame and execute the DataFrame to collect this count and return it as a usize, to find the total number of rows after executing the DataFrame.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let count = df.count().await?;
sourcepub async fn collect(self) -> Result<Vec<RecordBatch>>
pub async fn collect(self) -> Result<Vec<RecordBatch>>
Convert the logical plan represented by this DataFrame into a physical plan and execute it, collecting all resulting batches into memory. Executes this DataFrame and collects all results into a vector of RecordBatch.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let batches = df.collect().await?;
sourcepub async fn show(self) -> Result<()>
pub async fn show(self) -> Result<()>
Print results.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
df.show().await?;
sourcepub async fn show_limit(self, num: usize) -> Result<()>
pub async fn show_limit(self, num: usize) -> Result<()>
Print results and limit rows.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
df.show_limit(10).await?;
sourcepub async fn execute_stream(self) -> Result<SendableRecordBatchStream>
pub async fn execute_stream(self) -> Result<SendableRecordBatchStream>
Executes this DataFrame and returns a stream over a single partition
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let stream = df.execute_stream().await?;
sourcepub async fn collect_partitioned(self) -> Result<Vec<Vec<RecordBatch>>>
pub async fn collect_partitioned(self) -> Result<Vec<Vec<RecordBatch>>>
Executes this DataFrame and collects all results into a vector of vectors of RecordBatch, maintaining the input partitioning.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let batches = df.collect_partitioned().await?;
sourcepub async fn execute_stream_partitioned(
self
) -> Result<Vec<SendableRecordBatchStream>>
pub async fn execute_stream_partitioned( self ) -> Result<Vec<SendableRecordBatchStream>>
Executes this DataFrame and returns one stream per partition.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let batches = df.execute_stream_partitioned().await?;
sourcepub fn schema(&self) -> &DFSchema
pub fn schema(&self) -> &DFSchema
Returns the schema describing the output of this DataFrame in terms of columns returned, where each column has a name, data type, and nullability attribute.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let schema = df.schema();
sourcepub fn logical_plan(&self) -> &LogicalPlan
pub fn logical_plan(&self) -> &LogicalPlan
Return the unoptimized logical plan
sourcepub fn into_parts(self) -> (SessionState, LogicalPlan)
pub fn into_parts(self) -> (SessionState, LogicalPlan)
Returns both the LogicalPlan
and SessionState
that comprise this DataFrame
sourcepub fn into_unoptimized_plan(self) -> LogicalPlan
pub fn into_unoptimized_plan(self) -> LogicalPlan
Return the logical plan represented by this DataFrame without running the optimizers
Note: This method should not be used outside testing, as it loses the snapshot
of the SessionState
attached to this DataFrame
and consequently subsequent
operations may take place against a different state
sourcepub fn into_optimized_plan(self) -> Result<LogicalPlan>
pub fn into_optimized_plan(self) -> Result<LogicalPlan>
Return the optimized logical plan represented by this DataFrame.
Note: This method should not be used outside testing, as it loses the snapshot
of the SessionState
attached to this DataFrame
and consequently subsequent
operations may take place against a different state
sourcepub fn into_view(self) -> Arc<dyn TableProvider>
pub fn into_view(self) -> Arc<dyn TableProvider>
Converts this DataFrame
into a TableProvider
that can be registered
as a table view using SessionContext::register_table
.
Note: This discards the SessionState
associated with this
DataFrame
in favour of the one passed to TableProvider::scan
sourcepub fn to_logical_plan(self) -> Result<LogicalPlan>
👎Deprecated since 23.0.0: Use DataFrame::into_optimized_plan
pub fn to_logical_plan(self) -> Result<LogicalPlan>
Return the optimized logical plan represented by this DataFrame.
Note: This method should not be used outside testing, as it loses the snapshot
of the SessionState
attached to this DataFrame
and consequently subsequent
operations may take place against a different state
sourcepub fn explain(self, verbose: bool, analyze: bool) -> Result<DataFrame>
pub fn explain(self, verbose: bool, analyze: bool) -> Result<DataFrame>
Return a DataFrame with the explanation of its plan so far.
If analyze
is specified, runs the plan and reports metrics.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let batches = df.limit(0, Some(100))?.explain(false, false)?.collect().await?;
sourcepub fn registry(&self) -> &dyn FunctionRegistry
pub fn registry(&self) -> &dyn FunctionRegistry
Return a FunctionRegistry
used to plan UDF calls
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let f = df.registry();
// use f.udf("name", vec![...]) to use the udf
sourcepub async fn write_parquet(
self,
path: &str,
writer_properties: Option<WriterProperties>
) -> Result<()>
pub async fn write_parquet( self, path: &str, writer_properties: Option<WriterProperties> ) -> Result<()>
Write a DataFrame
to a Parquet file.
sourcepub async fn write_json(self, path: impl AsRef<str>) -> Result<()>
pub async fn write_json(self, path: impl AsRef<str>) -> Result<()>
Executes a query and writes the results to a partitioned JSON file.
sourcepub fn with_column(self, name: &str, expr: Expr) -> Result<DataFrame>
pub fn with_column(self, name: &str, expr: Expr) -> Result<DataFrame>
Add an additional column to the DataFrame.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.with_column("ab_sum", col("a") + col("b"))?;
sourcepub fn with_column_renamed(
self,
old_name: impl Into<Column>,
new_name: &str
) -> Result<DataFrame>
pub fn with_column_renamed( self, old_name: impl Into<Column>, new_name: &str ) -> Result<DataFrame>
Rename one column by applying a new projection. This is a no-op if the column to be renamed does not exist.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.with_column_renamed("ab_sum", "total")?;
sourcepub fn with_param_values(self, param_values: Vec<ScalarValue>) -> Result<Self>
pub fn with_param_values(self, param_values: Vec<ScalarValue>) -> Result<Self>
Convert a Prepare logical plan into its inner logical plan, with all parameters replaced by their corresponding values.