Struct datafusion::dataframe::DataFrame
pub struct DataFrame { /* private fields */ }
DataFrame represents a logical set of rows with the same named columns, similar to a Pandas DataFrame or Spark DataFrame.

DataFrames are typically created by the read_csv and read_parquet methods on the SessionContext, and can then be modified by calling transformation methods such as filter, select, aggregate, and limit to build up a query definition. The query can be executed by calling the collect method.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let df = df.filter(col("a").lt_eq(col("b")))?
.aggregate(vec![col("a")], vec![min(col("b"))])?
.limit(0, Some(100))?;
let results = df.collect().await?;
Implementations
impl DataFrame
pub fn new(session_state: Arc<RwLock<SessionState>>, plan: &LogicalPlan) -> Self
Create a new DataFrame based on an existing logical plan.
pub async fn create_physical_plan(&self) -> Result<Arc<dyn ExecutionPlan>>
Create a physical plan
pub fn select_columns(&self, columns: &[&str]) -> Result<Arc<DataFrame>>
Select columns by name. Returns a new DataFrame containing only the specified columns.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let df = df.select_columns(&["a", "b"])?;
pub fn select(&self, expr_list: Vec<Expr>) -> Result<Arc<DataFrame>>
Create a projection based on arbitrary expressions.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let df = df.select(vec![col("a") * col("b"), col("c")])?;
pub fn filter(&self, predicate: Expr) -> Result<Arc<DataFrame>>
Filter a DataFrame to only include rows that match the specified filter expression.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let df = df.filter(col("a").lt_eq(col("b")))?;
pub fn aggregate(
&self,
group_expr: Vec<Expr>,
aggr_expr: Vec<Expr>
) -> Result<Arc<DataFrame>>
Perform an aggregate query with optional grouping expressions.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
// The following use is the equivalent of "SELECT MIN(b) GROUP BY a"
let _ = df.aggregate(vec![col("a")], vec![min(col("b"))])?;
// The following use is the equivalent of "SELECT MIN(b)"
let _ = df.aggregate(vec![], vec![min(col("b"))])?;
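The grouping semantics can be sketched in plain Rust over in-memory rows (an illustrative sketch only, not DataFusion's execution engine): for each distinct value of the group key, the aggregate keeps the minimum of the second column.

```rust
use std::collections::BTreeMap;

// Illustrative sketch of "SELECT a, MIN(b) ... GROUP BY a" over in-memory
// (a, b) rows. Plain Rust, not DataFusion code.
fn min_by_group(rows: &[(i32, i32)]) -> BTreeMap<i32, i32> {
    let mut mins = BTreeMap::new();
    for &(a, b) in rows {
        // Keep the smallest b seen so far for this value of a.
        mins.entry(a)
            .and_modify(|m: &mut i32| *m = (*m).min(b))
            .or_insert(b);
    }
    mins
}

fn main() {
    let rows = [(1, 10), (2, 5), (1, 3), (2, 8)];
    let mins = min_by_group(&rows);
    assert_eq!(mins[&1], 3);
    assert_eq!(mins[&2], 5);
    println!("{:?}", mins);
}
```

With an empty `group_expr`, as in the second call above, the whole input is one group and the result is a single row.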
pub fn limit(&self, skip: usize, fetch: Option<usize>) -> Result<Arc<DataFrame>>
Limit the number of rows returned from this DataFrame.

skip - Number of rows to skip before fetching any rows.
fetch - Maximum number of rows to fetch, after skipping skip rows.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let df = df.limit(0, Some(100))?;
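The skip-then-fetch behaviour can be sketched with plain iterators (illustrative only, not DataFusion code):

```rust
// Illustrative sketch of limit(skip, fetch) semantics: drop `skip` rows,
// then take at most `fetch` rows (or all remaining rows for None).
fn limit(rows: &[i32], skip: usize, fetch: Option<usize>) -> Vec<i32> {
    let it = rows.iter().copied().skip(skip);
    match fetch {
        Some(n) => it.take(n).collect(),
        None => it.collect(),
    }
}

fn main() {
    let rows: Vec<i32> = (0..10).collect();
    // Skip 2 rows, then fetch at most 3.
    assert_eq!(limit(&rows, 2, Some(3)), vec![2, 3, 4]);
    // Skip 8 rows, fetch everything that remains.
    assert_eq!(limit(&rows, 8, None), vec![8, 9]);
    println!("ok");
}
```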
pub fn distinct(&self) -> Result<Arc<DataFrame>>
Filter out duplicate rows.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let df = df.distinct()?;
pub fn sort(&self, expr: Vec<Expr>) -> Result<Arc<DataFrame>>
Sort the DataFrame by the specified sorting expressions. Any expression can be turned into a sort expression by calling its sort method.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let df = df.sort(vec![col("a").sort(true, true), col("b").sort(false, false)])?;
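The two boolean flags to sort are (asc, nulls_first). Their effect can be sketched on a single nullable column, modeled here as Option<i32> (an illustrative sketch, not DataFusion code):

```rust
use std::cmp::Ordering;

// Illustrative sketch of sort(asc, nulls_first) semantics on one nullable
// column. NULL is modeled as None. Not DataFusion's implementation.
fn sort_column(mut vals: Vec<Option<i32>>, asc: bool, nulls_first: bool) -> Vec<Option<i32>> {
    vals.sort_by(|a, b| match (a, b) {
        (None, None) => Ordering::Equal,
        // nulls_first decides where None values land relative to values.
        (None, Some(_)) => if nulls_first { Ordering::Less } else { Ordering::Greater },
        (Some(_), None) => if nulls_first { Ordering::Greater } else { Ordering::Less },
        // asc decides the direction for non-null values.
        (Some(x), Some(y)) => if asc { x.cmp(y) } else { y.cmp(x) },
    });
    vals
}

fn main() {
    let vals = vec![Some(3), None, Some(1)];
    // sort(true, true): ascending, NULLs first.
    assert_eq!(sort_column(vals.clone(), true, true), vec![None, Some(1), Some(3)]);
    // sort(false, false): descending, NULLs last.
    assert_eq!(sort_column(vals, false, false), vec![Some(3), Some(1), None]);
    println!("ok");
}
```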
pub fn join(
&self,
right: Arc<DataFrame>,
join_type: JoinType,
left_cols: &[&str],
right_cols: &[&str],
filter: Option<Expr>
) -> Result<Arc<DataFrame>>
Join this DataFrame with another DataFrame using the specified columns as join keys.
The filter expression is expected to contain non-equality predicates that cannot be pushed down to either join input. In the case of an outer join, the filter is applied only to matched rows.
let ctx = SessionContext::new();
let left = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let right = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?
.select(vec![
col("a").alias("a2"),
col("b").alias("b2"),
col("c").alias("c2")])?;
let join = left.join(right, JoinType::Inner, &["a", "b"], &["a2", "b2"], None)?;
let batches = join.collect().await?;
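An inner equi-join on a single key column can be sketched in the spirit of a hash join: build a hash table over one side, probe with the other (an illustrative sketch with hypothetical row types, not DataFusion's implementation):

```rust
use std::collections::HashMap;

// Illustrative sketch of an inner equi-join on one key column.
// Rows are (key, payload) pairs; output is one row per key match.
fn inner_join(left: &[(i32, &str)], right: &[(i32, &str)]) -> Vec<(i32, String, String)> {
    // Build phase: index the right side by the join key.
    let mut index: HashMap<i32, Vec<&str>> = HashMap::new();
    for &(k, v) in right {
        index.entry(k).or_default().push(v);
    }
    // Probe phase: for each left row, emit one output row per match.
    let mut out = Vec::new();
    for &(k, lv) in left {
        if let Some(matches) = index.get(&k) {
            for rv in matches {
                out.push((k, lv.to_string(), rv.to_string()));
            }
        }
    }
    out
}

fn main() {
    let left = [(1, "l1"), (2, "l2"), (3, "l3")];
    let right = [(2, "r2"), (3, "r3"), (3, "r3b")];
    let joined = inner_join(&left, &right);
    // Key 1 has no match; key 3 matches twice.
    assert_eq!(joined.len(), 3);
    assert_eq!(joined[0], (2, "l2".to_string(), "r2".to_string()));
    println!("{:?}", joined);
}
```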
pub fn repartition(
&self,
partitioning_scheme: Partitioning
) -> Result<Arc<DataFrame>>
Repartition a DataFrame based on a logical partitioning scheme.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let df1 = df.repartition(Partitioning::RoundRobinBatch(4))?;
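Round-robin repartitioning deals incoming batches out to the target partitions in turn, which can be sketched like this (illustrative only, not DataFusion code; batches are stand-in values here):

```rust
// Illustrative sketch of Partitioning::RoundRobinBatch(n): batch i goes
// to partition i % n, balancing batch counts across partitions.
fn round_robin<T: Clone>(batches: &[T], n: usize) -> Vec<Vec<T>> {
    let mut parts = vec![Vec::new(); n];
    for (i, b) in batches.iter().enumerate() {
        parts[i % n].push(b.clone());
    }
    parts
}

fn main() {
    let batches: Vec<usize> = (0..10).collect();
    let parts = round_robin(&batches, 4);
    assert_eq!(parts[0], vec![0, 4, 8]);
    assert_eq!(parts[3], vec![3, 7]);
    println!("{:?}", parts);
}
```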
pub async fn collect(&self) -> Result<Vec<RecordBatch>>
Convert the logical plan represented by this DataFrame into a physical plan and execute it, collecting all resulting batches into memory as a vector of RecordBatch.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let batches = df.collect().await?;
pub async fn show(&self) -> Result<()>
Print results.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
df.show().await?;
pub async fn show_limit(&self, num: usize) -> Result<()>
Print results, limited to num rows.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
df.show_limit(10).await?;
pub async fn execute_stream(&self) -> Result<SendableRecordBatchStream>
Executes this DataFrame and returns a stream over a single partition.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let stream = df.execute_stream().await?;
pub async fn collect_partitioned(&self) -> Result<Vec<Vec<RecordBatch>>>
Executes this DataFrame and collects all results into a vector of vectors of RecordBatch, maintaining the input partitioning.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let batches = df.collect_partitioned().await?;
pub async fn execute_stream_partitioned(
&self
) -> Result<Vec<SendableRecordBatchStream>>
Executes this DataFrame and returns one stream per partition.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let batches = df.execute_stream_partitioned().await?;
pub fn schema(&self) -> &DFSchema
Returns the schema describing the output of this DataFrame in terms of columns returned, where each column has a name, data type, and nullability attribute.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let schema = df.schema();
pub fn to_unoptimized_plan(&self) -> LogicalPlan
Return the unoptimized logical plan represented by this DataFrame.
pub fn to_logical_plan(&self) -> Result<LogicalPlan>
Return the optimized logical plan represented by this DataFrame.
pub fn explain(&self, verbose: bool, analyze: bool) -> Result<Arc<DataFrame>>
Return a DataFrame with the explanation of its plan so far.
If analyze is specified, the plan is run and metrics are reported.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let batches = df.limit(0, Some(100))?.explain(false, false)?.collect().await?;
pub fn registry(&self) -> Arc<dyn FunctionRegistry>
Return a FunctionRegistry used to plan UDF calls.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let f = df.registry();
// use f.udf("name", vec![...]) to use the udf
pub async fn write_parquet(
&self,
path: &str,
writer_properties: Option<WriterProperties>
) -> Result<()>
Write a DataFrame to a Parquet file.
pub async fn write_json(&self, path: impl AsRef<str>) -> Result<()>
Executes a query and writes the results to a partitioned JSON file.
pub fn with_column(&self, name: &str, expr: Expr) -> Result<Arc<DataFrame>>
Add an additional column to the DataFrame.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let df = df.with_column("ab_sum", col("a") + col("b"))?;
pub fn with_column_renamed(
&self,
old_name: &str,
new_name: &str
) -> Result<Arc<DataFrame>>
Rename one column by applying a new projection. This is a no-op if the column to be renamed does not exist.
let ctx = SessionContext::new();
let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
let df = df.with_column_renamed("ab_sum", "total")?;
Trait Implementations
impl TableProvider for DataFrame
fn table_type(&self) -> TableType
Get the type of this table for metadata/catalog purposes.
fn scan<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
_ctx: &'life1 SessionState,
projection: &'life2 Option<Vec<usize>>,
filters: &'life3 [Expr],
limit: Option<usize>
) -> Pin<Box<dyn Future<Output = Result<Arc<dyn ExecutionPlan>>> + Send + 'async_trait>>
where
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait,
Self: 'async_trait,
Create an ExecutionPlan that will scan the table. The table provider is usually responsible for grouping the source data into partitions that can be efficiently parallelized or distributed.
fn get_table_definition(&self) -> Option<&str>
Get the create statement used to create this table, if available.
fn supports_filter_pushdown(
&self,
_filter: &Expr
) -> Result<TableProviderFilterPushDown>
Tests whether the table provider can make use of a filter expression to optimise data retrieval.
Auto Trait Implementations
impl !RefUnwindSafe for DataFrame
impl Send for DataFrame
impl Sync for DataFrame
impl Unpin for DataFrame
impl !UnwindSafe for DataFrame
Blanket Implementations
impl<T> BorrowMut<T> for T
where
T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more