pub struct DataFrame { /* private fields */ }

DataFrame represents a logical set of rows with the same named columns, similar to a Pandas DataFrame or a Spark DataFrame.

DataFrames are typically created by the read_csv and read_parquet methods on the SessionContext and can then be modified by calling transformation methods, such as filter, select, aggregate, and limit, to build up a query definition.

The query can be executed by calling the collect method.

use datafusion::prelude::*;

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.filter(col("a").lt_eq(col("b")))?
           .aggregate(vec![col("a")], vec![min(col("b"))])?
           .limit(0, Some(100))?;
let results = df.collect().await?;

Implementations

impl DataFrame

pub async fn write_parquet( self, path: &str, options: DataFrameWriteOptions, writer_properties: Option<WriterProperties> ) -> Result<Vec<RecordBatch>, DataFusionError>

Write a DataFrame to a Parquet file.
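
For illustration, a minimal sketch writing to a hypothetical local path with default write options; passing None for writer_properties uses the default Parquet writer settings:

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
df.write_parquet(
    "output.parquet",             // hypothetical output path
    DataFrameWriteOptions::new(), // default write options
    None,                         // default writer properties
).await?;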

impl DataFrame

pub fn new(session_state: SessionState, plan: LogicalPlan) -> Self

Create a new DataFrame based on an existing logical plan
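
For example, a hedged sketch building a DataFrame from a hand-constructed plan (LogicalPlanBuilder::empty produces a plan with no rows; ctx.state() supplies the SessionState):

let ctx = SessionContext::new();
let plan = LogicalPlanBuilder::empty(false).build()?;
let df = DataFrame::new(ctx.state(), plan);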

pub async fn create_physical_plan(self) -> Result<Arc<dyn ExecutionPlan>>

Create a physical plan
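
For example, the following sketch turns a DataFrame's logical plan into an executable physical plan:

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let physical_plan = df.create_physical_plan().await?;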

pub fn select_columns(self, columns: &[&str]) -> Result<DataFrame>

Filter the DataFrame by column. Returns a new DataFrame only containing the specified columns.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.select_columns(&["a", "b"])?;

pub fn select(self, expr_list: Vec<Expr>) -> Result<DataFrame>

Create a projection based on arbitrary expressions.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.select(vec![col("a") * col("b"), col("c")])?;

pub fn unnest_column(self, column: &str) -> Result<DataFrame>

Expand each list element of a column to multiple rows.

See also:

  1. UnnestOptions documentation for the behavior of unnest
  2. Self::unnest_column_with_options
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.unnest_column("a")?;

pub fn unnest_column_with_options( self, column: &str, options: UnnestOptions ) -> Result<DataFrame>

Expand each list element of a column to multiple rows, with behavior controlled by UnnestOptions.

Please see the documentation on UnnestOptions for more details about the meaning of unnest.
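
A minimal sketch, assuming the with_preserve_nulls builder method on UnnestOptions:

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// drop rows where the unnested list is null, rather than keeping them as null
let options = UnnestOptions::new().with_preserve_nulls(false);
let df = df.unnest_column_with_options("a", options)?;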

pub fn filter(self, predicate: Expr) -> Result<DataFrame>

Filter a DataFrame to only include rows that match the specified filter expression.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.filter(col("a").lt_eq(col("b")))?;

pub fn aggregate( self, group_expr: Vec<Expr>, aggr_expr: Vec<Expr> ) -> Result<DataFrame>

Perform an aggregate query with optional grouping expressions.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;

// The following use is the equivalent of "SELECT MIN(b) GROUP BY a"
let _ = df.clone().aggregate(vec![col("a")], vec![min(col("b"))])?;

// The following use is the equivalent of "SELECT MIN(b)"
let _ = df.aggregate(vec![], vec![min(col("b"))])?;

pub fn window(self, window_exprs: Vec<Expr>) -> Result<DataFrame>

Apply one or more window functions (Expr::WindowFunction) to extend the schema
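
A hedged sketch; the exact constructor for window expressions varies between DataFusion versions, so treat the expr::WindowFunction::new arguments below as an assumption to verify against your version:

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// roughly `SELECT *, MAX(b) OVER (PARTITION BY a) FROM ...`
let max_b = Expr::WindowFunction(expr::WindowFunction::new(
    window_function::WindowFunction::AggregateFunction(AggregateFunction::Max),
    vec![col("b")],          // function arguments
    vec![col("a")],          // PARTITION BY
    vec![],                  // ORDER BY
    WindowFrame::new(false), // default frame
));
let df = df.window(vec![max_b])?;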

pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<DataFrame>

Limit the number of rows returned from this DataFrame.

skip - Number of rows to skip before fetching any rows

fetch - Maximum number of rows to fetch, after skipping skip rows.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.limit(0, Some(100))?;

pub fn union(self, dataframe: DataFrame) -> Result<DataFrame>

Calculate the union of two DataFrames, preserving duplicate rows. The two DataFrames must have exactly the same schema

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let d2 = df.clone();
let df = df.union(d2)?;

pub fn union_distinct(self, dataframe: DataFrame) -> Result<DataFrame>

Calculate the distinct union of two DataFrames. The two DataFrames must have exactly the same schema

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let d2 = df.clone();
let df = df.union_distinct(d2)?;

pub fn distinct(self) -> Result<DataFrame>

Filter out duplicate rows

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.distinct()?;

pub async fn describe(self) -> Result<Self>

Summary statistics for a DataFrame. Only numeric datatypes are summarized at the moment; nulls are returned for non-numeric datatypes. The output aims to be similar to pandas

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/tpch-csv/customer.csv", CsvReadOptions::new()).await?;
let summary = df.describe().await?;

pub fn sort(self, expr: Vec<Expr>) -> Result<DataFrame>

Sort the DataFrame by the specified sorting expressions. Any expression can be turned into a sort expression by calling its sort method.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.sort(vec![col("a").sort(true, true), col("b").sort(false, false)])?;

pub fn join( self, right: DataFrame, join_type: JoinType, left_cols: &[&str], right_cols: &[&str], filter: Option<Expr> ) -> Result<DataFrame>

Join this DataFrame with another DataFrame using explicitly specified columns and an optional filter expression.

See join_on for a more concise way to specify the join condition. Since DataFusion automatically identifies and optimizes equality predicates, there is no performance difference between this function and join_on

left_cols and right_cols are used to form “equijoin” predicates (see example below), which are then combined with the optional filter expression.

Note that in the case of an outer join, the filter is applied only to matched rows.

Example
let ctx = SessionContext::new();
let left = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let right = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
  .select(vec![
    col("a").alias("a2"),
    col("b").alias("b2"),
    col("c").alias("c2")])?;
// Perform the equivalent of `left INNER JOIN right ON (a = a2 AND b = b2)`
// finding all pairs of rows from `left` and `right` where `a = a2` and `b = b2`.
let join = left.join(right, JoinType::Inner, &["a", "b"], &["a2", "b2"], None)?;
let batches = join.collect().await?;

pub fn join_on( self, right: DataFrame, join_type: JoinType, on_exprs: impl IntoIterator<Item = Expr> ) -> Result<DataFrame>

Join this DataFrame with another DataFrame using the specified expressions.

Note that DataFusion automatically optimizes joins, including identifying and optimizing equality predicates.

Example
let ctx = SessionContext::new();
let left = ctx
    .read_csv("tests/data/example.csv", CsvReadOptions::new())
    .await?;
let right = ctx
    .read_csv("tests/data/example.csv", CsvReadOptions::new())
    .await?
    .select(vec![
        col("a").alias("a2"),
        col("b").alias("b2"),
        col("c").alias("c2"),
    ])?;

// Perform the equivalent of `left INNER JOIN right ON (a != a2 AND b != b2)`
// finding all pairs of rows from `left` and `right`
// where `a != a2` and `b != b2`.
let join_on = left.join_on(
    right,
    JoinType::Inner,
    [col("a").not_eq(col("a2")), col("b").not_eq(col("b2"))],
)?;
let batches = join_on.collect().await?;

pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<DataFrame>

Repartition a DataFrame based on a logical partitioning scheme.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df1 = df.repartition(Partitioning::RoundRobinBatch(4))?;

pub async fn count(self) -> Result<usize>

Run a count aggregate on the DataFrame and execute it, returning the total number of rows in the result as a usize.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let count = df.count().await?;

pub async fn collect(self) -> Result<Vec<RecordBatch>>

Convert the logical plan represented by this DataFrame into a physical plan and execute it, collecting all resulting batches into memory as a vector of RecordBatch.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let batches = df.collect().await?;

pub async fn show(self) -> Result<()>

Print results.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
df.show().await?;

pub async fn show_limit(self, num: usize) -> Result<()>

Print results and limit rows.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
df.show_limit(10).await?;

pub fn task_ctx(&self) -> TaskContext

Get a new TaskContext to run in this session

pub async fn execute_stream(self) -> Result<SendableRecordBatchStream>

Executes this DataFrame and returns a stream over a single partition

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let stream = df.execute_stream().await?;

pub async fn collect_partitioned(self) -> Result<Vec<Vec<RecordBatch>>>

Executes this DataFrame and collects all results into a vector of vectors of RecordBatch, maintaining the input partitioning.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let batches = df.collect_partitioned().await?;

pub async fn execute_stream_partitioned( self ) -> Result<Vec<SendableRecordBatchStream>>

Executes this DataFrame and returns one stream per partition.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let streams = df.execute_stream_partitioned().await?;

pub fn schema(&self) -> &DFSchema

Returns the schema describing the output of this DataFrame in terms of columns returned, where each column has a name, data type, and nullability attribute.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let schema = df.schema();

pub fn logical_plan(&self) -> &LogicalPlan

Return the unoptimized logical plan

pub fn into_parts(self) -> (SessionState, LogicalPlan)

Returns both the LogicalPlan and SessionState that comprise this DataFrame

pub fn into_unoptimized_plan(self) -> LogicalPlan

Return the logical plan represented by this DataFrame without running the optimizers

Note: This method should not be used outside testing, as it loses the snapshot of the SessionState attached to this DataFrame and consequently subsequent operations may take place against a different state

pub fn into_optimized_plan(self) -> Result<LogicalPlan>

Return the optimized logical plan represented by this DataFrame.

Note: This method should not be used outside testing, as it loses the snapshot of the SessionState attached to this DataFrame and consequently subsequent operations may take place against a different state

pub fn into_view(self) -> Arc<dyn TableProvider>

Converts this DataFrame into a TableProvider that can be registered as a table view using SessionContext::register_table.

Note: This discards the SessionState associated with this DataFrame in favour of the one passed to TableProvider::scan
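
For example, a sketch registering the DataFrame as a view (the view name "example_view" is arbitrary) and querying it with SQL:

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
ctx.register_table("example_view", df.into_view())?;
let results = ctx.sql("SELECT a, b FROM example_view").await?.collect().await?;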

pub fn to_logical_plan(self) -> Result<LogicalPlan>

Deprecated since 23.0.0: use DataFrame::into_optimized_plan instead

Return the optimized logical plan represented by this DataFrame.

Note: This method should not be used outside testing, as it loses the snapshot of the SessionState attached to this DataFrame and consequently subsequent operations may take place against a different state

pub fn explain(self, verbose: bool, analyze: bool) -> Result<DataFrame>

Return a DataFrame with the explanation of its plan so far.

If analyze is specified, the plan is executed and metrics are reported

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let batches = df.limit(0, Some(100))?.explain(false, false)?.collect().await?;

pub fn registry(&self) -> &dyn FunctionRegistry

Return a FunctionRegistry used to plan UDF calls

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let f = df.registry();
// use f.udf("name", vec![...]) to use the udf

pub fn intersect(self, dataframe: DataFrame) -> Result<DataFrame>

Calculate the intersection of two DataFrames. The two DataFrames must have exactly the same schema

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let d2 = df.clone();
let df = df.intersect(d2)?;

pub fn except(self, dataframe: DataFrame) -> Result<DataFrame>

Calculate the set difference (EXCEPT) of two DataFrames. The two DataFrames must have exactly the same schema

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let d2 = df.clone();
let df = df.except(d2)?;

pub async fn write_table( self, table_name: &str, write_options: DataFrameWriteOptions ) -> Result<Vec<RecordBatch>, DataFusionError>

Write this DataFrame to the referenced table. This method uses the same underlying implementation as the SQL INSERT INTO statement. Unlike most other DataFrame methods, this method executes eagerly: it writes the data and returns the count of rows written.
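
A minimal sketch, assuming an insertable table provider has already been registered with the session under the hypothetical name "my_table":

let ctx = SessionContext::new();
// ... register a table named "my_table" with the context first ...
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
df.write_table("my_table", DataFrameWriteOptions::new()).await?;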

pub async fn write_csv( self, path: &str, options: DataFrameWriteOptions, writer_properties: Option<WriterBuilder> ) -> Result<Vec<RecordBatch>, DataFusionError>

Write a DataFrame to a CSV file.
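
A minimal sketch with a hypothetical output path; passing None for writer_properties uses the default CSV writer settings:

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
df.write_csv("output.csv", DataFrameWriteOptions::new(), None).await?;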

pub async fn write_json( self, path: &str, options: DataFrameWriteOptions ) -> Result<Vec<RecordBatch>, DataFusionError>

Executes a query and writes the results to a partitioned JSON file.
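
A minimal sketch with a hypothetical output path:

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
df.write_json("output_dir", DataFrameWriteOptions::new()).await?;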

pub fn with_column(self, name: &str, expr: Expr) -> Result<DataFrame>

Add an additional column to the DataFrame.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.with_column("ab_sum", col("a") + col("b"))?;

pub fn with_column_renamed( self, old_name: impl Into<String>, new_name: &str ) -> Result<DataFrame>

Rename one column by applying a new projection. This is a no-op if the column to be renamed does not exist.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.with_column_renamed("ab_sum", "total")?;

pub fn with_param_values(self, param_values: Vec<ScalarValue>) -> Result<Self>

Replace all parameters in logical plan with the specified values, in preparation for execution.

Example
use datafusion::prelude::*;
let mut ctx = SessionContext::new();
// the "example" table must be registered with the context first
ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
let results = ctx
  .sql("SELECT a FROM example WHERE b = $1")
  .await?
   // replace $1 with value 2
  .with_param_values(vec![
     // value at index 0 --> $1
     ScalarValue::from(2i64)
   ])?
  .collect()
  .await?;
assert_batches_eq!(
 &[
   "+---+",
   "| a |",
   "+---+",
   "| 1 |",
   "+---+",
 ],
 &results
);

pub async fn cache(self) -> Result<DataFrame>

Cache DataFrame as a memory table.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.cache().await?;

Trait Implementations

impl Clone for DataFrame

fn clone(&self) -> DataFrame

Returns a copy of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl Debug for DataFrame

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.
