Struct spark_connect_rs::dataframe::DataFrame
pub struct DataFrame {
pub spark_session: SparkSession,
pub logical_plan: LogicalPlanBuilder,
}
DataFrame is composed of a spark_session connecting to a remote Spark Connect enabled cluster, and a logical_plan which represents the Plan to be submitted to the cluster when an action is called.
Fields§
spark_session: SparkSession
Global SparkSession connecting to the remote cluster
logical_plan: LogicalPlanBuilder
Logical Plan representing the unresolved Relation which will be submitted to the remote cluster
Implementations§
impl DataFrame
pub fn new(spark_session: SparkSession, logical_plan: LogicalPlanBuilder) -> DataFrame
Creates a default DataFrame based on a Spark session and an initial logical plan.
pub async fn cache(&mut self) -> DataFrame
Persists the DataFrame with the default storage::StorageLevel::MemoryAndDiskDeser (MEMORY_AND_DISK_DESER).
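A minimal sketch, not taken from the crate's docs: it assumes a mutable DataFrame df and uses the persist, unpersist, and StorageLevel items referenced on this page.
async {
    // cache with the default storage level (MEMORY_AND_DISK_DESER)
    let mut df = df.cache().await;
    // or pick the level explicitly, and release it again when done
    let mut df = df.persist(StorageLevel::MemoryAndDiskDeser).await;
    df.unpersist(Some(false)).await;
}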
pub fn coalesce(&mut self, num_partitions: u32) -> DataFrame
Returns a new DataFrame that has exactly num_partitions partitions.
pub fn colRegex(self, col_name: &str) -> Column
Selects column based on the column name specified as a regex and returns it as Column.
pub async fn collect(&mut self) -> Result<RecordBatch, SparkError>
pub async fn corr(&mut self, col1: &str, col2: &str) -> Option<f64>
Calculates the correlation of two columns of a DataFrame as a f64.
Currently only supports the Pearson Correlation Coefficient.
pub async fn cov(&mut self, col1: &str, col2: &str) -> Option<f64>
Calculates the sample covariance for the given columns, specified by their names, as a f64.
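A hedged sketch, not from the crate docs, assuming a mutable DataFrame df with numeric columns "salary" and "bonus" (illustrative names):
async {
    // Pearson correlation and sample covariance of the two columns
    let corr: Option<f64> = df.corr("salary", "bonus").await;
    let cov: Option<f64> = df.cov("salary", "bonus").await;
    println!("corr = {:?}, cov = {:?}", corr, cov);
}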
pub async fn createTempView(&mut self, name: &str)
pub async fn createGlobalTempView(&mut self, name: &str)
pub async fn createOrReplaceGlobalTempView(&mut self, name: &str)
pub async fn createOrReplaceTempView(&mut self, name: &str)
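A minimal sketch of the temporary view methods above, assuming a mutable DataFrame df, a SparkSession spark, and an illustrative view name:
async {
    // register the DataFrame as a session-scoped temporary view, then query it with SQL
    df.createOrReplaceTempView("people").await;
    spark
        .sql("SELECT count(*) AS n FROM people")
        .await?
        .show(Some(1), None, None)
        .await?;
}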
pub fn crossJoin(&mut self, other: DataFrame) -> DataFrame
Returns the Cartesian product with another DataFrame.
pub fn crosstab(&mut self, col1: &str, col2: &str) -> DataFrame
Computes a pair-wise frequency table of the given columns. Also known as a contingency table.
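A hedged sketch, assuming a mutable DataFrame df with the illustrative columns "department" and "gender":
async {
    // pair-wise frequency (contingency) table of the two columns
    df.crosstab("department", "gender")
        .show(Some(10), None, None)
        .await?;
}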
pub fn describe(&mut self, cols: Option<Vec<&str>>) -> DataFrame
pub fn drop<T: ToVecExpr>(&mut self, cols: T) -> DataFrame
Returns a new DataFrame without the specified columns
pub fn drop_duplicates(&mut self, cols: Option<Vec<&str>>) -> DataFrame
Returns a new DataFrame with duplicate rows removed,
optionally only considering certain columns from a Vec<&str>.
If no columns are supplied then all columns are used.
Alias for dropDuplicates
pub fn dropDuplicates(&mut self, cols: Option<Vec<&str>>) -> DataFrame
pub fn dropna( &mut self, how: &str, threshold: Option<i32>, subset: Option<Vec<&str>> ) -> DataFrame
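A hedged sketch combining the clean-up methods above; a mutable DataFrame df is assumed and the column names are illustrative:
async {
    // drop a column, de-duplicate on a subset of columns,
    // then drop rows where any value in that subset is null
    df.drop("middle_name")
        .dropDuplicates(Some(vec!["first_name", "last_name"]))
        .dropna("any", None, Some(vec!["first_name", "last_name"]))
        .show(Some(10), None, None)
        .await?;
}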
pub async fn dtypes(&mut self) -> Vec<(String, Option<Kind>)>
Returns all column names and their data types as a Vec containing the field name as a String and the spark::data_type::Kind enum
pub fn exceptAll(&mut self, other: DataFrame) -> DataFrame
pub async fn explain(&mut self, mode: Option<ExplainMode>) -> Result<String, SparkError>
Prints the spark::Plan to the console
§Arguments:
mode: ExplainMode (optional) Defaults to unspecified. One of simple, extended, codegen, cost, formatted, unspecified
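A minimal sketch, assuming a mutable DataFrame df; the ExplainMode variant name is inferred from the mode list above:
async {
    // fetch the formatted plan as a String
    let plan = df.explain(Some(ExplainMode::Formatted)).await?;
    println!("{plan}");
}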
pub fn filter<T: ToFilterExpr>(&mut self, condition: T) -> DataFrame
Filters rows using the given condition and returns a new DataFrame
§Example:
async {
df.filter("salary > 4000").collect().await?;
}
Examples found in repository
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut spark: SparkSession =
SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs")
.build()
.await?;
let mut df = spark
.sql("SELECT * FROM json.`/opt/spark/examples/src/main/resources/employees.json`")
.await?;
df.filter("salary >= 3500")
.select("*")
.show(Some(5), None, None)
.await?;
// +-----------------+
// | show_string |
// +-----------------+
// | +------+------+ |
// | |name |salary| |
// | +------+------+ |
// | |Andy |4500 | |
// | |Justin|3500 | |
// | |Berta |4000 | |
// | +------+------+ |
// | |
// +-----------------+
Ok(())
}
pub async fn first(&mut self) -> RecordBatch
pub fn freqItems<'a, I>(&mut self, cols: I, support: Option<f64>) -> DataFrame
where
    I: IntoIterator<Item = &'a str>,
pub async fn head(&mut self, n: Option<i32>) -> RecordBatch
pub fn hint<T: ToVecExpr>( &mut self, name: &str, parameters: Option<T> ) -> DataFrame
pub async fn inputFiles(&mut self) -> Vec<String>
pub fn intersect(&mut self, other: DataFrame) -> DataFrame
pub fn intersectAll(&mut self, other: DataFrame) -> DataFrame
pub async fn isEmpty(&mut self) -> bool
pub fn join<T: ToExpr>(&mut self, other: DataFrame, on: Option<T>, how: JoinType) -> DataFrame
Joins with another DataFrame, using the given join expression.
§Example:
use spark_connect_rs::functions::*;
use spark_connect_rs::dataframe::JoinType;
async {
// join two dataframes where `id` == `name`
let condition = Some(col("id").eq("name"));
let df = df.join(df2, condition, JoinType::Inner);
}
pub fn offset(&mut self, num: i32) -> DataFrame
Returns a new DataFrame by skipping the first n rows
pub fn orderBy<I>(&mut self, cols: I) -> DataFrame
where
    I: IntoIterator<Item = Column>,
pub async fn persist(&mut self, storage_level: StorageLevel) -> DataFrame
pub async fn printSchema(&mut self, level: Option<i32>) -> String
pub async fn sameSemantics(&mut self, other: DataFrame) -> bool
pub fn sample(&mut self, lower_bound: f64, upper_bound: f64, with_replacement: Option<bool>, seed: Option<i64>) -> DataFrame
Returns a sampled subset of this DataFrame
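A hedged sketch, assuming a mutable DataFrame df; the bounds take roughly a 10% slice and the seed fixes it for repeatability:
async {
    df.sample(0.0, 0.1, Some(false), Some(42))
        .show(Some(10), None, None)
        .await?;
}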
pub async fn schema(&mut self) -> Option<DataType>
Returns the schema of this DataFrame as a spark::DataType
pub fn select<T: ToVecExpr>(&mut self, cols: T) -> DataFrame
Projects a set of expressions and returns a new DataFrame
§Arguments:
cols - An object that implements ToVecExpr
§Example:
async {
df.select(vec![col("age"), col("name")]).collect().await?;
}
Examples found in repository
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut spark: SparkSession =
SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs")
.build()
.await?;
let mut df = spark
.sql("SELECT * FROM json.`/opt/spark/examples/src/main/resources/employees.json`")
.await?;
df.filter("salary >= 3500")
.select("*")
.show(Some(5), None, None)
.await?;
// +-----------------+
// | show_string |
// +-----------------+
// | +------+------+ |
// | |name |salary| |
// | +------+------+ |
// | |Andy |4500 | |
// | |Justin|3500 | |
// | |Berta |4000 | |
// | +------+------+ |
// | |
// +-----------------+
Ok(())
}
More examples
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::default().build().await?;
let path = ["/opt/spark/examples/src/main/resources/people.csv"];
let mut df = spark
.read()
.format("csv")
.option("header", "True")
.option("delimiter", ";")
.load(path);
df.select([
F::col("name"),
F::col("age").cast("int").alias("age_int"),
(F::lit(3.0) + F::col("age").cast("int")).alias("addition"),
])
.sort(vec![F::col("name").desc()])
.show(Some(5), None, None)
.await?;
// print results
// +--------------------------+
// | show_string |
// +--------------------------+
// | +-----+-------+--------+ |
// | |name |age_int|addition| |
// | +-----+-------+--------+ |
// | |Jorge|30 |33.0 | |
// | |Bob |32 |35.0 | |
// | +-----+-------+--------+ |
// | |
// +--------------------------+
Ok(())
}
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::default().build().await?;
let df = spark
.clone()
.range(None, 1000, 1, Some(16))
.select(col("id").alias("range_id"));
let path = "/opt/spark/examples/src/main/rust/employees/";
df.write()
.format("csv")
.mode(SaveMode::Overwrite)
.option("header", "true")
.save(path)
.await?;
let mut df = spark
.clone()
.read()
.format("csv")
.option("header", "true")
.load([path]);
df.show(Some(10), None, None).await?;
// print results may slightly vary but should be close to the below
// +--------------------------+
// | show_string |
// +--------------------------+
// | +--------+ |
// | |range_id| |
// | +--------+ |
// | |312 | |
// | |313 | |
// | |314 | |
// | |315 | |
// | |316 | |
// | |317 | |
// | |318 | |
// | |319 | |
// | |320 | |
// | |321 | |
// | +--------+ |
// | only showing top 10 rows |
// | |
// +--------------------------+
Ok(())
}
pub fn selectExpr<'a, I>(&mut self, cols: I) -> DataFrame
where
    I: IntoIterator<Item = &'a str>,
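A minimal sketch, assuming a mutable DataFrame df with the illustrative columns "name" and "age":
async {
    // project SQL expression strings instead of Column values
    df.selectExpr(["name", "age + 1 AS age_next_year"])
        .show(Some(5), None, None)
        .await?;
}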
pub async fn semanticHash(&mut self) -> i32
pub async fn show(&mut self, num_rows: Option<i32>, truncate: Option<i32>, vertical: Option<bool>) -> Result<(), SparkError>
Prints the first n rows to the console
§Arguments:
num_rows: (int, optional) number of rows to show (default 10)
truncate: (int, optional) If set to 0, it truncates the string. Any other number will not truncate the strings
vertical: (bool, optional) If set to true, prints output rows vertically (one line per column value).
Examples found in repository
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut spark: SparkSession =
SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs")
.build()
.await?;
let mut df = spark
.sql("SELECT * FROM json.`/opt/spark/examples/src/main/resources/employees.json`")
.await?;
df.filter("salary >= 3500")
.select("*")
.show(Some(5), None, None)
.await?;
// +-----------------+
// | show_string |
// +-----------------+
// | +------+------+ |
// | |name |salary| |
// | +------+------+ |
// | |Andy |4500 | |
// | |Justin|3500 | |
// | |Berta |4000 | |
// | +------+------+ |
// | |
// +-----------------+
Ok(())
}
More examples
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::default().build().await?;
let path = ["/opt/spark/examples/src/main/resources/people.csv"];
let mut df = spark
.read()
.format("csv")
.option("header", "True")
.option("delimiter", ";")
.load(path);
df.select([
F::col("name"),
F::col("age").cast("int").alias("age_int"),
(F::lit(3.0) + F::col("age").cast("int")).alias("addition"),
])
.sort(vec![F::col("name").desc()])
.show(Some(5), None, None)
.await?;
// print results
// +--------------------------+
// | show_string |
// +--------------------------+
// | +-----+-------+--------+ |
// | |name |age_int|addition| |
// | +-----+-------+--------+ |
// | |Jorge|30 |33.0 | |
// | |Bob |32 |35.0 | |
// | +-----+-------+--------+ |
// | |
// +--------------------------+
Ok(())
}
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::default().build().await?;
let df = spark
.clone()
.range(None, 1000, 1, Some(16))
.select(col("id").alias("range_id"));
let path = "/opt/spark/examples/src/main/rust/employees/";
df.write()
.format("csv")
.mode(SaveMode::Overwrite)
.option("header", "true")
.save(path)
.await?;
let mut df = spark
.clone()
.read()
.format("csv")
.option("header", "true")
.load([path]);
df.show(Some(10), None, None).await?;
// print results may slightly vary but should be close to the below
// +--------------------------+
// | show_string |
// +--------------------------+
// | +--------+ |
// | |range_id| |
// | +--------+ |
// | |312 | |
// | |313 | |
// | |314 | |
// | |315 | |
// | |316 | |
// | |317 | |
// | |318 | |
// | |319 | |
// | |320 | |
// | |321 | |
// | +--------+ |
// | only showing top 10 rows |
// | |
// +--------------------------+
Ok(())
}
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut spark: SparkSession = SparkSessionBuilder::default().build().await?;
let paths = ["/opt/spark/examples/src/main/resources/people.csv"];
let df = spark
.clone()
.read()
.format("csv")
.option("header", "True")
.option("delimiter", ";")
.option("inferSchema", "True")
.load(paths);
df.write()
.format("delta")
.mode(SaveMode::Overwrite)
.saveAsTable("default.people_delta")
.await
.unwrap();
spark
.sql("DESCRIBE HISTORY default.people_delta")
.await?
.show(Some(1), None, Some(true))
.await
.unwrap();
// print results
// +-------------------------------------------------------------------------------------------------------+
// | show_string |
// +-------------------------------------------------------------------------------------------------------+
// | -RECORD 0-------------------------------------------------------------------------------------------- |
// | version | 3 |
// | timestamp | 2024-03-16 13:46:23.552 |
// | userId | NULL |
// | userName | NULL |
// | operation | CREATE OR REPLACE TABLE AS SELECT |
// | operationParameters | {isManaged -> true, description -> NULL, partitionBy -> [], properties -> {}} |
// | job | NULL |
// | notebook | NULL |
// | clusterId | NULL |
// | readVersion | 2 |
// | isolationLevel | Serializable |
// | isBlindAppend | false |
// | operationMetrics | {numFiles -> 1, numOutputRows -> 2, numOutputBytes -> 988} |
// | userMetadata | NULL |
// | engineInfo | Apache-Spark/3.5.0 Delta-Lake/3.0.0 |
// | only showing top 1 row |
// | |
// +-------------------------------------------------------------------------------------------------------+
Ok(())
}
pub fn sort(&mut self, cols: Vec<Column>) -> DataFrame
Examples found in repository
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::default().build().await?;
let path = ["/opt/spark/examples/src/main/resources/people.csv"];
let mut df = spark
.read()
.format("csv")
.option("header", "True")
.option("delimiter", ";")
.load(path);
df.select([
F::col("name"),
F::col("age").cast("int").alias("age_int"),
(F::lit(3.0) + F::col("age").cast("int")).alias("addition"),
])
.sort(vec![F::col("name").desc()])
.show(Some(5), None, None)
.await?;
// print results
// +--------------------------+
// | show_string |
// +--------------------------+
// | +-----+-------+--------+ |
// | |name |age_int|addition| |
// | +-----+-------+--------+ |
// | |Jorge|30 |33.0 | |
// | |Bob |32 |35.0 | |
// | +-----+-------+--------+ |
// | |
// +--------------------------+
Ok(())
}
pub fn sparkSession(self) -> SparkSession
pub async fn storageLevel(&mut self) -> StorageLevel
pub fn subtract(&mut self, other: DataFrame) -> DataFrame
pub async fn tail(&mut self, limit: i32) -> RecordBatch
Returns the last n rows as a RecordBatch
Running tail requires moving the data and results in an action
pub async fn take(&mut self, n: i32) -> RecordBatch
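A hedged sketch, assuming a mutable DataFrame df; both calls are actions and move data to the client:
async {
    let first_three = df.take(3).await;
    let last_three = df.tail(3).await;
    println!("{} and {} rows", first_three.num_rows(), last_three.num_rows());
}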
pub fn toDF<'a, I>(&mut self, cols: I) -> DataFrame
where
    I: IntoIterator<Item = &'a str>,
pub fn union(&mut self, other: DataFrame) -> DataFrame
pub fn unionAll(&mut self, other: DataFrame) -> DataFrame
pub fn unionByName( &mut self, other: DataFrame, allow_missing_columns: Option<bool> ) -> DataFrame
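A minimal sketch, assuming mutable DataFrames df1, df2, and df3 with compatible schemas (illustrative names):
async {
    // UNION ALL semantics, matching columns by position
    df1.union(df2).show(Some(10), None, None).await?;
    // match columns by name instead, tolerating columns missing on one side
    df1.unionByName(df3, Some(true)).show(Some(10), None, None).await?;
}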
pub async fn unpersist(&mut self, blocking: Option<bool>) -> DataFrame
pub fn withColumn(&mut self, colName: &str, col: Column) -> DataFrame
pub fn withColumns<I, K>(&mut self, colMap: I) -> DataFrame
pub fn withColumnsRenamed<I, K, V>(&mut self, cols: I) -> DataFrame
Returns a new DataFrame by renaming multiple columns from an iterator containing key/value pairs,
with the key as the existing column name and the value as the new column name.
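A hedged sketch, assuming a mutable DataFrame df with an "age" column and that col/lit are imported from the crate's functions module as in the examples above:
async {
    // add a derived column, then rename columns via key/value pairs
    df.withColumn("age_plus_one", lit(1.0) + col("age").cast("int"))
        .withColumnsRenamed([("age", "age_years"), ("age_plus_one", "age_next")])
        .show(Some(5), None, None)
        .await?;
}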
pub fn write(self) -> DataFrameWriter
Returns a DataFrameWriter struct based on the current DataFrame
Examples found in repository
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::default().build().await?;
let df = spark
.clone()
.range(None, 1000, 1, Some(16))
.select(col("id").alias("range_id"));
let path = "/opt/spark/examples/src/main/rust/employees/";
df.write()
.format("csv")
.mode(SaveMode::Overwrite)
.option("header", "true")
.save(path)
.await?;
let mut df = spark
.clone()
.read()
.format("csv")
.option("header", "true")
.load([path]);
df.show(Some(10), None, None).await?;
// print results may slightly vary but should be close to the below
// +--------------------------+
// | show_string |
// +--------------------------+
// | +--------+ |
// | |range_id| |
// | +--------+ |
// | |312 | |
// | |313 | |
// | |314 | |
// | |315 | |
// | |316 | |
// | |317 | |
// | |318 | |
// | |319 | |
// | |320 | |
// | |321 | |
// | +--------+ |
// | only showing top 10 rows |
// | |
// +--------------------------+
Ok(())
}
More examples
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut spark: SparkSession = SparkSessionBuilder::default().build().await?;
let paths = ["/opt/spark/examples/src/main/resources/people.csv"];
let df = spark
.clone()
.read()
.format("csv")
.option("header", "True")
.option("delimiter", ";")
.option("inferSchema", "True")
.load(paths);
df.write()
.format("delta")
.mode(SaveMode::Overwrite)
.saveAsTable("default.people_delta")
.await
.unwrap();
spark
.sql("DESCRIBE HISTORY default.people_delta")
.await?
.show(Some(1), None, Some(true))
.await
.unwrap();
// print results
// +-------------------------------------------------------------------------------------------------------+
// | show_string |
// +-------------------------------------------------------------------------------------------------------+
// | -RECORD 0-------------------------------------------------------------------------------------------- |
// | version | 3 |
// | timestamp | 2024-03-16 13:46:23.552 |
// | userId | NULL |
// | userName | NULL |
// | operation | CREATE OR REPLACE TABLE AS SELECT |
// | operationParameters | {isManaged -> true, description -> NULL, partitionBy -> [], properties -> {}} |
// | job | NULL |
// | notebook | NULL |
// | clusterId | NULL |
// | readVersion | 2 |
// | isolationLevel | Serializable |
// | isBlindAppend | false |
// | operationMetrics | {numFiles -> 1, numOutputRows -> 2, numOutputBytes -> 988} |
// | userMetadata | NULL |
// | engineInfo | Apache-Spark/3.5.0 Delta-Lake/3.0.0 |
// | only showing top 1 row |
// | |
// +-------------------------------------------------------------------------------------------------------+
Ok(())
}
Trait Implementations§
Auto Trait Implementations§
impl Freeze for DataFrame
impl !RefUnwindSafe for DataFrame
impl Send for DataFrame
impl Sync for DataFrame
impl Unpin for DataFrame
impl !UnwindSafe for DataFrame
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request