Struct datafusion::dataframe::DataFrame
source · [−]pub struct DataFrame { /* private fields */ }
Expand description
DataFrame represents a logical set of rows with the same named columns, similar to a Pandas DataFrame or a Spark DataFrame.
DataFrames are typically created by the read_csv
and read_parquet
methods on the
SessionContext and can then be modified
by calling the transformation methods, such as filter
, select
, aggregate
, and limit
to build up a query definition.
The query can be executed by calling the collect
method.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let df = df.filter(col("a").lt_eq(col("b")))?
.aggregate(vec![col("a")], vec![min(col("b"))])?
.limit(0, Some(100))?;
let results = df.collect().await?;
Implementations
sourceimpl DataFrame
impl DataFrame
sourcepub fn new(session_state: Arc<RwLock<SessionState>>, plan: &LogicalPlan) -> Self
pub fn new(session_state: Arc<RwLock<SessionState>>, plan: &LogicalPlan) -> Self
Create a new Table based on an existing logical plan
sourcepub async fn create_physical_plan(&self) -> Result<Arc<dyn ExecutionPlan>>
pub async fn create_physical_plan(&self) -> Result<Arc<dyn ExecutionPlan>>
Create a physical plan
sourcepub fn select_columns(&self, columns: &[&str]) -> Result<Arc<DataFrame>>
pub fn select_columns(&self, columns: &[&str]) -> Result<Arc<DataFrame>>
Select columns by name. Returns a new DataFrame containing only the specified columns.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let df = df.select_columns(&["a", "b"])?;
sourcepub fn select(&self, expr_list: Vec<Expr>) -> Result<Arc<DataFrame>>
pub fn select(&self, expr_list: Vec<Expr>) -> Result<Arc<DataFrame>>
Create a projection based on arbitrary expressions.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let df = df.select(vec![col("a") * col("b"), col("c")])?;
sourcepub fn filter(&self, predicate: Expr) -> Result<Arc<DataFrame>>
pub fn filter(&self, predicate: Expr) -> Result<Arc<DataFrame>>
Filter a DataFrame to only include rows that match the specified filter expression.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let df = df.filter(col("a").lt_eq(col("b")))?;
sourcepub fn aggregate(
&self,
group_expr: Vec<Expr>,
aggr_expr: Vec<Expr>
) -> Result<Arc<DataFrame>>
pub fn aggregate(
&self,
group_expr: Vec<Expr>,
aggr_expr: Vec<Expr>
) -> Result<Arc<DataFrame>>
Perform an aggregate query with optional grouping expressions.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
// The following use is the equivalent of "SELECT MIN(b) GROUP BY a"
let _ = df.aggregate(vec![col("a")], vec![min(col("b"))])?;
// The following use is the equivalent of "SELECT MIN(b)"
let _ = df.aggregate(vec![], vec![min(col("b"))])?;
sourcepub fn limit(&self, skip: usize, fetch: Option<usize>) -> Result<Arc<DataFrame>>
pub fn limit(&self, skip: usize, fetch: Option<usize>) -> Result<Arc<DataFrame>>
Limit the number of rows returned from this DataFrame.
skip
- Number of rows to skip before fetching any rows
fetch
- Maximum number of rows to fetch, after skipping `skip`
rows.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let df = df.limit(0, Some(100))?;
sourcepub fn distinct(&self) -> Result<Arc<DataFrame>>
pub fn distinct(&self) -> Result<Arc<DataFrame>>
Filter out duplicate rows
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let df = df.distinct()?;
sourcepub fn sort(&self, expr: Vec<Expr>) -> Result<Arc<DataFrame>>
pub fn sort(&self, expr: Vec<Expr>) -> Result<Arc<DataFrame>>
Sort the DataFrame by the specified sorting expressions. Any expression can be turned into a sort expression by calling its sort method.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let df = df.sort(vec![col("a").sort(true, true), col("b").sort(false, false)])?;
sourcepub fn join(
&self,
right: Arc<DataFrame>,
join_type: JoinType,
left_cols: &[&str],
right_cols: &[&str],
filter: Option<Expr>
) -> Result<Arc<DataFrame>>
pub fn join(
&self,
right: Arc<DataFrame>,
join_type: JoinType,
left_cols: &[&str],
right_cols: &[&str],
filter: Option<Expr>
) -> Result<Arc<DataFrame>>
Join this DataFrame with another DataFrame using the specified columns as join keys.
The filter expression is expected to contain non-equality predicates that cannot be pushed down to either join input. In the case of an outer join, the filter is applied only to matched rows.
let ctx = SessionContext::new();
let left = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let right = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?
.select(vec![
col("a").alias("a2"),
col("b").alias("b2"),
col("c").alias("c2")])?;
let join = left.join(right, JoinType::Inner, &["a", "b"], &["a2", "b2"], None)?;
let batches = join.collect().await?;
sourcepub fn repartition(
&self,
partitioning_scheme: Partitioning
) -> Result<Arc<DataFrame>>
pub fn repartition(
&self,
partitioning_scheme: Partitioning
) -> Result<Arc<DataFrame>>
Repartition a DataFrame based on a logical partitioning scheme.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let df1 = df.repartition(Partitioning::RoundRobinBatch(4))?;
sourcepub async fn collect(&self) -> Result<Vec<RecordBatch>>
pub async fn collect(&self) -> Result<Vec<RecordBatch>>
Convert the logical plan represented by this DataFrame into a physical plan and execute it, collecting all resulting batches into memory. Returns the results as a vector of RecordBatch.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let batches = df.collect().await?;
sourcepub async fn show(&self) -> Result<()>
pub async fn show(&self) -> Result<()>
Print results.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
df.show().await?;
sourcepub async fn show_limit(&self, num: usize) -> Result<()>
pub async fn show_limit(&self, num: usize) -> Result<()>
Print results and limit rows.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
df.show_limit(10).await?;
sourcepub async fn execute_stream(&self) -> Result<SendableRecordBatchStream>
pub async fn execute_stream(&self) -> Result<SendableRecordBatchStream>
Executes this DataFrame and returns a stream over a single partition
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let stream = df.execute_stream().await?;
sourcepub async fn collect_partitioned(&self) -> Result<Vec<Vec<RecordBatch>>>
pub async fn collect_partitioned(&self) -> Result<Vec<Vec<RecordBatch>>>
Executes this DataFrame and collects all results into a vector of vector of RecordBatch maintaining the input partitioning.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let batches = df.collect_partitioned().await?;
sourcepub async fn execute_stream_partitioned(
&self
) -> Result<Vec<SendableRecordBatchStream>>
pub async fn execute_stream_partitioned(
&self
) -> Result<Vec<SendableRecordBatchStream>>
Executes this DataFrame and returns one stream per partition.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let batches = df.execute_stream_partitioned().await?;
sourcepub fn schema(&self) -> &DFSchema
pub fn schema(&self) -> &DFSchema
Returns the schema describing the output of this DataFrame in terms of columns returned, where each column has a name, data type, and nullability attribute.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let schema = df.schema();
sourcepub fn to_unoptimized_plan(&self) -> LogicalPlan
pub fn to_unoptimized_plan(&self) -> LogicalPlan
Return the unoptimized logical plan represented by this DataFrame.
sourcepub fn to_logical_plan(&self) -> Result<LogicalPlan>
pub fn to_logical_plan(&self) -> Result<LogicalPlan>
Return the optimized logical plan represented by this DataFrame.
sourcepub fn explain(&self, verbose: bool, analyze: bool) -> Result<Arc<DataFrame>>
pub fn explain(&self, verbose: bool, analyze: bool) -> Result<Arc<DataFrame>>
Return a DataFrame with the explanation of its plan so far.
if analyze
is specified, runs the plan and reports metrics
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let batches = df.limit(0, Some(100))?.explain(false, false)?.collect().await?;
sourcepub fn registry(&self) -> Arc<dyn FunctionRegistry>
pub fn registry(&self) -> Arc<dyn FunctionRegistry>
Return a FunctionRegistry
used to plan UDF calls
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let f = df.registry();
// use f.udf("name", vec![...]) to use the udf
sourcepub async fn write_parquet(
&self,
path: &str,
writer_properties: Option<WriterProperties>
) -> Result<()>
pub async fn write_parquet(
&self,
path: &str,
writer_properties: Option<WriterProperties>
) -> Result<()>
Write a DataFrame
to a Parquet file.
sourcepub async fn write_json(&self, path: impl AsRef<str>) -> Result<()>
pub async fn write_json(&self, path: impl AsRef<str>) -> Result<()>
Executes a query and writes the results to a partitioned JSON file.
sourcepub fn with_column(&self, name: &str, expr: Expr) -> Result<Arc<DataFrame>>
pub fn with_column(&self, name: &str, expr: Expr) -> Result<Arc<DataFrame>>
Add an additional column to the DataFrame.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let df = df.with_column("ab_sum", col("a") + col("b"))?;
sourcepub fn with_column_renamed(
&self,
old_name: &str,
new_name: &str
) -> Result<Arc<DataFrame>>
pub fn with_column_renamed(
&self,
old_name: &str,
new_name: &str
) -> Result<Arc<DataFrame>>
Rename one column by applying a new projection. This is a no-op if the column to be renamed does not exist.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let df = df.with_column_renamed("ab_sum", "total")?;