pub struct DataFrame { /* private fields */ }

DataFrame represents a logical set of rows with the same named columns, similar to a Pandas DataFrame or a Spark DataFrame.

DataFrames are typically created by the read_csv and read_parquet methods on the SessionContext, and can then be modified by calling transformation methods such as filter, select, aggregate, and limit to build up a query definition.

The query can be executed by calling the collect method.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.filter(col("a").lt_eq(col("b")))?
           .aggregate(vec![col("a")], vec![min(col("b"))])?
           .limit(0, Some(100))?;
let results = df.collect().await?;

Implementations

impl DataFrame

pub fn new(session_state: SessionState, plan: LogicalPlan) -> Self

Create a new DataFrame based on an existing logical plan

pub async fn create_physical_plan(self) -> Result<Arc<dyn ExecutionPlan>>

Create a physical plan
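
For example, a minimal sketch (not from the upstream docs), reusing the example CSV from the other examples on this page:

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// Convert the DataFrame's logical plan into an executable physical plan
let physical_plan = df.create_physical_plan().await?;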

pub fn select_columns(self, columns: &[&str]) -> Result<DataFrame>

Project the DataFrame onto the specified columns, returning a new DataFrame containing only those columns.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.select_columns(&["a", "b"])?;

pub fn select(self, expr_list: Vec<Expr>) -> Result<DataFrame>

Create a projection based on arbitrary expressions.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.select(vec![col("a") * col("b"), col("c")])?;

pub fn unnest_column(self, column: &str) -> Result<DataFrame>

Expand each list element of a column to multiple rows.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.unnest_column("a")?;

pub fn filter(self, predicate: Expr) -> Result<DataFrame>

Filter a DataFrame to only include rows that match the specified filter expression.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.filter(col("a").lt_eq(col("b")))?;

pub fn aggregate( self, group_expr: Vec<Expr>, aggr_expr: Vec<Expr> ) -> Result<DataFrame>

Perform an aggregate query with optional grouping expressions.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;

// The following use is the equivalent of "SELECT MIN(b) GROUP BY a"
let _ = df.clone().aggregate(vec![col("a")], vec![min(col("b"))])?;

// The following use is the equivalent of "SELECT MIN(b)"
let _ = df.aggregate(vec![], vec![min(col("b"))])?;

pub fn window(self, window_exprs: Vec<Expr>) -> Result<DataFrame>

Apply one or more window functions (Expr::WindowFunction) to extend the schema

pub fn limit(self, skip: usize, fetch: Option<usize>) -> Result<DataFrame>

Limit the number of rows returned from this DataFrame.

skip - Number of rows to skip before fetching any rows

fetch - Maximum number of rows to fetch, after skipping skip rows.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.limit(0, Some(100))?;

pub fn union(self, dataframe: DataFrame) -> Result<DataFrame>

Calculate the union of two DataFrames, preserving duplicate rows. The two DataFrames must have exactly the same schema

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let d2 = df.clone();
let df = df.union(d2)?;

pub fn union_distinct(self, dataframe: DataFrame) -> Result<DataFrame>

Calculate the distinct union of two DataFrames. The two DataFrames must have exactly the same schema

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let d2 = df.clone();
let df = df.union_distinct(d2)?;

pub fn distinct(self) -> Result<DataFrame>

Filter out duplicate rows

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.distinct()?;

pub async fn describe(self) -> Result<Self>

Summary statistics for a DataFrame. Only numeric datatypes are summarized at the moment, and nulls are returned for non-numeric datatypes. Tries to keep the output similar to pandas.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/tpch-csv/customer.csv", CsvReadOptions::new()).await?;
df.describe().await.unwrap();

pub fn sort(self, expr: Vec<Expr>) -> Result<DataFrame>

Sort the DataFrame by the specified sorting expressions. Any expression can be turned into a sort expression by calling its sort method.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.sort(vec![col("a").sort(true, true), col("b").sort(false, false)])?;

pub fn join( self, right: DataFrame, join_type: JoinType, left_cols: &[&str], right_cols: &[&str], filter: Option<Expr> ) -> Result<DataFrame>

Join this DataFrame with another DataFrame using the specified columns as join keys.

The filter expression is expected to contain non-equality predicates that cannot be pushed down to either join input. In the case of an outer join, the filter is applied only to matched rows (see the variant sketch after the example below).

let ctx = SessionContext::new();
let left = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let right = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
  .select(vec![
    col("a").alias("a2"),
    col("b").alias("b2"),
    col("c").alias("c2")])?;
let join = left.join(right, JoinType::Inner, &["a", "b"], &["a2", "b2"], None)?;
let batches = join.collect().await?;
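
The optional filter argument is illustrated by the hedged variant below (not from the upstream docs); it replaces the join call above and assumes the same aliased columns:

let join = left.join(
    right,
    JoinType::Inner,
    &["a", "b"],
    &["a2", "b2"],
    // an extra non-equality predicate applied on top of the equi-join keys
    Some(col("c").not_eq(col("c2"))),
)?;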

pub fn join_on( self, right: DataFrame, join_type: JoinType, on_exprs: impl IntoIterator<Item = Expr> ) -> Result<DataFrame>

Join this DataFrame with another DataFrame using the specified expressions.

Simply a thin wrapper over join where the join keys are not provided, and the provided expressions are AND’ed together to form the filter expression.

let ctx = SessionContext::new();
let left = ctx
    .read_csv("tests/data/example.csv", CsvReadOptions::new())
    .await?;
let right = ctx
    .read_csv("tests/data/example.csv", CsvReadOptions::new())
    .await?
    .select(vec![
        col("a").alias("a2"),
        col("b").alias("b2"),
        col("c").alias("c2"),
    ])?;
let join_on = left.join_on(
    right,
    JoinType::Inner,
    [col("a").not_eq(col("a2")), col("b").not_eq(col("b2"))],
)?;
let batches = join_on.collect().await?;

pub fn repartition(self, partitioning_scheme: Partitioning) -> Result<DataFrame>

Repartition a DataFrame based on a logical partitioning scheme.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df1 = df.repartition(Partitioning::RoundRobinBatch(4))?;

pub async fn count(self) -> Result<usize>

Run a count aggregate on this DataFrame and execute it, returning the total number of rows in the result as a usize.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let count = df.count().await?;

pub async fn collect(self) -> Result<Vec<RecordBatch>>

Convert the logical plan represented by this DataFrame into a physical plan, execute it, and collect all resulting batches into memory, returning them as a vector of RecordBatch.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let batches = df.collect().await?;

pub async fn show(self) -> Result<()>

Print results.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
df.show().await?;

pub async fn show_limit(self, num: usize) -> Result<()>

Print results, showing at most num rows.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
df.show_limit(10).await?;

pub async fn execute_stream(self) -> Result<SendableRecordBatchStream>

Executes this DataFrame and returns a stream over a single partition

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let stream = df.execute_stream().await?;

pub async fn collect_partitioned(self) -> Result<Vec<Vec<RecordBatch>>>

Executes this DataFrame and collects all results into a vector of vectors of RecordBatch, maintaining the input partitioning.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let batches = df.collect_partitioned().await?;

pub async fn execute_stream_partitioned( self ) -> Result<Vec<SendableRecordBatchStream>>

Executes this DataFrame and returns one stream per partition.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let batches = df.execute_stream_partitioned().await?;

pub fn schema(&self) -> &DFSchema

Returns the schema describing the output of this DataFrame in terms of columns returned, where each column has a name, data type, and nullability attribute.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let schema = df.schema();

pub fn logical_plan(&self) -> &LogicalPlan

Return the unoptimized logical plan
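
For example, a brief sketch (not from the upstream docs; display_indent is used here only for printing):

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// Inspect the plan without consuming the DataFrame
println!("{}", df.logical_plan().display_indent());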

pub fn into_parts(self) -> (SessionState, LogicalPlan)

Returns both the LogicalPlan and SessionState that comprise this DataFrame
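
A round-trip sketch (not from the upstream docs) showing how the parts can be reassembled with DataFrame::new:

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let (state, plan) = df.into_parts();
// Rebuild an equivalent DataFrame from its parts
let df = DataFrame::new(state, plan);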

pub fn into_unoptimized_plan(self) -> LogicalPlan

Return the logical plan represented by this DataFrame without running the optimizers

Note: This method should not be used outside testing, as it loses the snapshot of the SessionState attached to this DataFrame and consequently subsequent operations may take place against a different state

pub fn into_optimized_plan(self) -> Result<LogicalPlan>

Return the optimized logical plan represented by this DataFrame.

Note: This method should not be used outside testing, as it loses the snapshot of the SessionState attached to this DataFrame and consequently subsequent operations may take place against a different state

pub fn into_view(self) -> Arc<dyn TableProvider>

Converts this DataFrame into a TableProvider that can be registered as a table view using SessionContext::register_table.

Note: This discards the SessionState associated with this DataFrame in favour of the one passed to TableProvider::scan
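
A hedged sketch (not from the upstream docs; the view name "example_view" is arbitrary):

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
// Register the DataFrame as a named view, then query it with SQL
ctx.register_table("example_view", df.into_view())?;
let df2 = ctx.sql("SELECT a, b FROM example_view").await?;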

pub fn to_logical_plan(self) -> Result<LogicalPlan>

👎Deprecated since 23.0.0: Use DataFrame::into_optimized_plan

Return the optimized logical plan represented by this DataFrame.

Note: This method should not be used outside testing, as it loses the snapshot of the SessionState attached to this DataFrame and consequently subsequent operations may take place against a different state

pub fn explain(self, verbose: bool, analyze: bool) -> Result<DataFrame>

Return a DataFrame with the explanation of its plan so far.

If analyze is specified, the plan is run and execution metrics are reported.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let batches = df.limit(0, Some(100))?.explain(false, false)?.collect().await?;

pub fn registry(&self) -> &dyn FunctionRegistry

Return a FunctionRegistry that can be used to plan calls to user-defined functions (UDFs)

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let f = df.registry();
// use f.udf("my_udf_name") to look up a registered UDF by name

pub fn intersect(self, dataframe: DataFrame) -> Result<DataFrame>

Calculate the intersection of two DataFrames. The two DataFrames must have exactly the same schema

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let d2 = df.clone();
let df = df.intersect(d2)?;

pub fn except(self, dataframe: DataFrame) -> Result<DataFrame>

Calculate the set difference (EXCEPT) of two DataFrames. The two DataFrames must have exactly the same schema

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let d2 = df.clone();
let df = df.except(d2)?;

pub async fn write_csv(self, path: &str) -> Result<()>

Write a DataFrame to a CSV file.
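
A minimal sketch (not from the upstream docs; the output path is a placeholder and the file layout may depend on the plan's partitioning):

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
df.write_csv("./example_csv_out").await?;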

pub async fn write_parquet( self, path: &str, writer_properties: Option<WriterProperties> ) -> Result<()>

Write a DataFrame to a Parquet file.
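
A minimal sketch (not from the upstream docs; the output path is a placeholder and None selects the default writer properties):

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
df.write_parquet("./example_parquet_out", None).await?;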

pub async fn write_json(self, path: impl AsRef<str>) -> Result<()>

Executes a query and writes the results to a partitioned JSON file.
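
A minimal sketch (not from the upstream docs; the output path is a placeholder):

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
df.write_json("./example_json_out").await?;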

pub fn with_column(self, name: &str, expr: Expr) -> Result<DataFrame>

Add an additional column to the DataFrame.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.with_column("ab_sum", col("a") + col("b"))?;

pub fn with_column_renamed( self, old_name: impl Into<Column>, new_name: &str ) -> Result<DataFrame>

Rename one column by applying a new projection. This is a no-op if the column to be renamed does not exist.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.with_column_renamed("ab_sum", "total")?;

pub fn with_param_values(self, param_values: Vec<ScalarValue>) -> Result<Self>

Convert a prepared logical plan into its inner logical plan, with all parameter placeholders replaced by their corresponding values
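
A hedged sketch (not from the upstream docs), assuming the plan is built from SQL with a $1 placeholder and that the parameter type can be inferred from the comparison:

let ctx = SessionContext::new();
ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
let df = ctx.sql("SELECT a FROM example WHERE b = $1").await?;
// Replace the $1 placeholder with a concrete value before executing
let df = df.with_param_values(vec![ScalarValue::from(2_i64)])?;
let results = df.collect().await?;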

pub async fn cache(self) -> Result<DataFrame>

Cache DataFrame as a memory table.

let ctx = SessionContext::new();
let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;
let df = df.cache().await?;

Trait Implementations

impl Clone for DataFrame

fn clone(&self) -> DataFrame

Returns a copy of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl Debug for DataFrame

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations

Blanket Implementations

impl<T> Any for T where T: 'static + ?Sized

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T where T: ?Sized

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T where T: ?Sized

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T where U: From<T>

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> Same<T> for T

type Output = T

Should always be Self

impl<T> ToOwned for T where T: Clone

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T where U: Into<T>

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T where U: TryFrom<T>

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T where V: MultiLane<T>

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self> where S: Into<Dispatch>

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.