Struct spark_connect_rs::DataFrame
pub struct DataFrame { /* private fields */ }
The spark-connect-rs crate is currently just a meta-package shim for spark-connect-core.
A DataFrame is composed of a SparkSession referencing a Spark Connect enabled cluster, and a LogicalPlanBuilder which represents the unresolved spark::Plan to be submitted to the cluster when an action is called.
The LogicalPlanBuilder is a series of unresolved logical plans, and every additional transformation takes the prior spark::Plan and builds onto it. The final unresolved logical plan is submitted to the Spark Connect server.
§createDataFrame & range
A DataFrame can be created with an arrow::array::RecordBatch, or with spark.range(...).
use std::sync::Arc;
use arrow::array::{ArrayRef, Int64Array, RecordBatch, StringArray};

let name: ArrayRef = Arc::new(StringArray::from(vec!["Tom", "Alice", "Bob"]));
let age: ArrayRef = Arc::new(Int64Array::from(vec![14, 23, 16]));
let data = RecordBatch::try_from_iter(vec![("name", name), ("age", age)])?;

let df = spark.createDataFrame(&data).await?;
§sql
A DataFrame is created from a spark.sql() statement.
let df = spark.sql("SELECT * FROM json.`/opt/spark/examples/src/main/resources/employees.json`").await?;
§read & readStream
A DataFrame is also created from a spark.read() or spark.readStream() statement.
let paths = ["/opt/spark/examples/src/main/resources/people.csv"];

let df = spark
    .read()
    .format("csv")
    .option("header", "True")
    .option("delimiter", ";")
    .load(paths)?;
Implementations§
impl DataFrame
pub fn new(spark_session: SparkSession, plan: LogicalPlanBuilder) -> DataFrame
Creates a DataFrame based on a SparkSession and an initial logical plan.
pub fn agg<T>(self, exprs: T) -> DataFrame where T: ToVecExpr
Aggregate on the entire DataFrame without groups (shorthand for df.groupBy().agg()).
pub async fn cache(self) -> DataFrame
Persists the DataFrame with the default storage::StorageLevel::MemoryAndDiskDeser (MEMORY_AND_DISK_DESER).
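A minimal sketch of chaining cache into a later action (assuming df is an existing DataFrame):
// cache the plan with the default storage level, then run an action on it
let cached = df.cache().await;
let n = cached.count().await?;
println!("cached row count: {n}");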
pub fn coalesce(self, num_partitions: u32) -> DataFrame
Returns a new DataFrame that has exactly num_partitions partitions.
pub async fn count(self) -> Result<i64, SparkError>
Returns the number of rows in this DataFrame
pub fn colRegex(self, col_name: &str) -> Column
Selects column based on the column name specified as a regex and returns it as Column.
pub async fn collect(self) -> Result<RecordBatch, SparkError>
pub async fn corr(self, col1: &str, col2: &str) -> Result<f64, SparkError>
Calculates the correlation of two columns of a DataFrame as an f64. Currently only supports the Pearson Correlation Coefficient.
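For example (the column names here are hypothetical):
// compute the Pearson correlation between two numeric columns
let pearson: f64 = df.corr("age", "salary").await?;
println!("corr(age, salary) = {pearson}");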
pub async fn cov(self, col1: &str, col2: &str) -> Result<f64, SparkError>
Calculates the sample covariance for the given columns, specified by their names, as an f64.
pub async fn createTempView(self, name: &str) -> Result<(), SparkError>
Creates a local temporary view with this DataFrame.
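A minimal sketch of registering a view and then querying it with spark.sql() (the view and column names are hypothetical):
// register the DataFrame as a temporary view on the current session
df.createTempView("people").await?;
let names = spark.sql("SELECT name FROM people").await?;
names.show(Some(5), None, None).await?;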
pub async fn createGlobalTempView(self, name: &str) -> Result<(), SparkError>
pub async fn createOrReplaceGlobalTempView(self, name: &str) -> Result<(), SparkError>
pub async fn createOrReplaceTempView(self, name: &str) -> Result<(), SparkError>
Creates or replaces a local temporary view with this DataFrame
pub fn crossJoin(self, other: DataFrame) -> DataFrame
Returns the cartesian product with another DataFrame.
pub fn crosstab(self, col1: &str, col2: &str) -> DataFrame
Computes a pair-wise frequency table of the given columns. Also known as a contingency table.
pub fn cube<T>(self, cols: T) -> GroupedData where T: ToVecExpr
Create a multi-dimensional cube for the current DataFrame using the specified columns, so we can run aggregations on them.
pub fn describe<'a, I>(self, cols: Option<I>) -> DataFrame
pub fn drop<T>(self, cols: T) -> DataFrame where T: ToVecExpr
Returns a new DataFrame without the specified columns
pub fn drop_duplicates(self, cols: Option<Vec<&str>>) -> DataFrame
Returns a new DataFrame with duplicate rows removed, optionally only considering the columns given in a Vec<&str>. If no columns are supplied, all columns are used. Alias for dropDuplicates.
pub fn dropDuplicates(self, cols: Option<Vec<&str>>) -> DataFrame
Return a new DataFrame with duplicate rows removed, optionally only considering certain columns.
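For example, deduplicating on a single column (the column name is hypothetical):
// keep only the first row seen for each distinct `name`
let deduped = df.dropDuplicates(Some(vec!["name"]));
deduped.show(Some(10), None, None).await?;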
pub fn dropna(self, how: &str, threshold: Option<i32>, subset: Option<Vec<&str>>) -> DataFrame
Returns a new DataFrame omitting rows with null values.
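A minimal sketch, assuming how takes the Spark-style values "any" or "all" (the column name is hypothetical):
// drop any row where the `age` column is null
let cleaned = df.dropna("any", None, Some(vec!["age"]));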
pub async fn dtypes(self) -> Result<Vec<(String, Kind)>, SparkError>
Returns all column names and their data types as a Vec containing the field name as a String and the spark::data_type::Kind enum
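For example, listing each column with its kind (assuming spark::data_type::Kind implements Debug):
for (name, kind) in df.dtypes().await? {
    println!("{name}: {kind:?}");
}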
pub fn exceptAll(self, other: DataFrame) -> DataFrame
Return a new DataFrame containing rows in this DataFrame but not in another DataFrame while preserving duplicates.
pub async fn explain(self, mode: Option<ExplainMode>) -> Result<String, SparkError>
Prints the spark::Plan to the console
§Arguments:
mode: ExplainMode, defaults to unspecified. Possible values: simple, extended, codegen, cost, formatted, unspecified. A usage sketch follows.
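A minimal sketch (the ExplainMode variant name is assumed to mirror the simple mode listed above):
// fetch the plan explanation as a String and print it
let plan = df.explain(Some(ExplainMode::Simple)).await?;
println!("{plan}");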
pub fn filter<T>(self, condition: T) -> DataFrame where T: ToFilterExpr
pub async fn first(self) -> Result<RecordBatch, SparkError>
Returns the first row as a RecordBatch.
pub fn freqItems<'a, I>(self, cols: I, support: Option<f64>) -> DataFrame where I: IntoIterator<Item = &'a str>
Finds frequent items for columns, possibly with false positives.
pub fn groupBy<T>(self, cols: Option<T>) -> GroupedData where T: ToVecExpr
Groups the DataFrame using the specified columns, and returns a GroupedData object
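A minimal sketch, assuming GroupedData::agg accepts the same ToVecExpr-style input as DataFrame::agg, and that an aggregate helper such as F::count exists in the functions module (the column names are hypothetical):
// group by `name` and count the non-null `age` values per group
let counts = df
    .groupBy(Some([F::col("name")]))
    .agg([F::count(F::col("age"))]);
counts.show(Some(10), None, None).await?;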
pub async fn head(self, n: Option<i32>) -> Result<RecordBatch, SparkError>
Returns the first n rows.
pub fn hint<T>(self, name: &str, parameters: Option<T>) -> DataFrame where T: ToVecExpr
Specifies some hint on the current DataFrame.
pub async fn inputFiles(self) -> Result<Vec<String>, SparkError>
Returns a best-effort snapshot of the files that compose this DataFrame
pub fn intersect(self, other: DataFrame) -> DataFrame
Return a new DataFrame containing rows only in both this DataFrame and another DataFrame.
pub fn intersectAll(self, other: DataFrame) -> DataFrame
pub async fn isEmpty(self) -> Result<bool, SparkError>
Checks if the DataFrame is empty and returns a boolean value.
pub async fn isStreaming(self) -> Result<bool, SparkError>
Returns True if this DataFrame contains one or more sources that continuously return data as it arrives.
pub fn join<T>(self, other: DataFrame, on: Option<T>, how: JoinType) -> DataFrame where T: ToExpr
Joins with another DataFrame, using the given join expression.
§Example:
use spark_connect_rs::functions::col;
use spark_connect_rs::dataframe::JoinType;
async {
// join two dataframes where `id` == `name`
let condition = Some(col("id").eq(col("name")));
let df = df.join(df2, condition, JoinType::Inner);
}
pub fn melt<I, K>(self, ids: I, values: Option<K>, variable_column_name: &str, value_column_name: &str) -> DataFrame
Alias for DataFrame::unpivot
pub fn orderBy<I>(self, cols: I) -> DataFrame where I: IntoIterator<Item = Column>
pub async fn persist(self, storage_level: StorageLevel) -> DataFrame
pub async fn printSchema(self, level: Option<i32>) -> Result<String, SparkError>
Prints out the schema in the tree format to a specific level number.
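For example, fetching and printing the schema tree:
let tree = df.printSchema(None).await?;
println!("{tree}");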
pub fn rollup<T>(self, cols: T) -> GroupedData where T: ToVecExpr
Create a multi-dimensional rollup for the current DataFrame using the specified columns, and returns a GroupedData object
pub async fn sameSemantics(self, other: DataFrame) -> Result<bool, SparkError>
Returns True when the logical query plans inside both DataFrames are equal and therefore return the same results.
pub fn sample(self, lower_bound: f64, upper_bound: f64, with_replacement: Option<bool>, seed: Option<i64>) -> DataFrame
Returns a sampled subset of this DataFrame
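For example, a reproducible sample of roughly 10% of the rows without replacement (assuming the bounds are interpreted as a fraction range):
let sampled = df.sample(0.0, 0.1, Some(false), Some(42));
sampled.show(Some(10), None, None).await?;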
pub async fn schema(self) -> Result<DataType, SparkError>
Returns the schema of this DataFrame as a spark::DataType.
pub fn select<T>(self, cols: T) -> DataFrame where T: ToVecExpr
Projects a set of expressions and returns a new DataFrame
§Arguments:
cols - An object that implements ToVecExpr
§Example:
async {
df.select(vec![col("age"), col("name")]).collect().await?;
}
Examples found in repository
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::default().build().await?;
let path = ["/opt/spark/examples/src/main/resources/people.csv"];
let df = spark
.read()
.format("csv")
.option("header", "True")
.option("delimiter", ";")
.load(path)?;
df.select([
F::col("name"),
F::col("age").cast("int").alias("age_int"),
(F::lit(3.0) + F::col("age").cast("int")).alias("addition"),
])
.sort(vec![F::col("name").desc()])
.show(Some(5), None, None)
.await?;
// print results
// +--------------------------+
// | show_string |
// +--------------------------+
// | +-----+-------+--------+ |
// | |name |age_int|addition| |
// | +-----+-------+--------+ |
// | |Jorge|30 |33.0 | |
// | |Bob |32 |35.0 | |
// | +-----+-------+--------+ |
// | |
// +--------------------------+
Ok(())
}
More examples
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
.build()
.await?;
let df = spark
.clone()
.range(None, 1000, 1, Some(16))
.select(col("id").alias("range_id"));
let path = "/opt/spark/examples/src/main/rust/employees/";
df.write()
.format("csv")
.mode(SaveMode::Overwrite)
.option("header", "true")
.save(path)
.await?;
let df = spark
.clone()
.read()
.format("csv")
.option("header", "true")
.load([path])?;
df.show(Some(10), None, None).await?;
// print results may slightly vary but should be close to the below
// +--------------------------+
// | show_string |
// +--------------------------+
// | +--------+ |
// | |range_id| |
// | +--------+ |
// | |312 | |
// | |313 | |
// | |314 | |
// | |315 | |
// | |316 | |
// | |317 | |
// | |318 | |
// | |319 | |
// | |320 | |
// | |321 | |
// | +--------+ |
// | only showing top 10 rows |
// | |
// +--------------------------+
Ok(())
}
pub fn selectExpr<'a, I>(self, cols: I) -> DataFrame where I: IntoIterator<Item = &'a str>
pub async fn semanticHash(self) -> Result<i32, SparkError>
pub async fn show(self, num_rows: Option<i32>, truncate: Option<i32>, vertical: Option<bool>) -> Result<(), SparkError>
Prints the first n rows to the console.
§Arguments:
num_rows: (int, optional) number of rows to show (default 10)
truncate: (int, optional) If set to 0, it truncates the strings. Any other number will not truncate the strings
vertical: (bool, optional) If set to true, prints output rows vertically (one line per column value)
Examples found in repository
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
.build()
.await?;
let df = spark
.clone()
.sql("select 'apple' as word, 123 as count")
.await?;
df.write()
.mode(SaveMode::Overwrite)
.format("parquet")
.save("file:///tmp/spark-connect-write-example-output.parquet")
.await?;
let df = spark
.read()
.format("parquet")
.load(["file:///tmp/spark-connect-write-example-output.parquet"])?;
df.show(Some(100), None, None).await?;
// +---------------+
// | show_string |
// +---------------+
// | +-----+-----+ |
// | |word |count| |
// | +-----+-----+ |
// | |apple|123 | |
// | +-----+-----+ |
// | |
// +---------------+
Ok(())
}
More examples
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::default().build().await?;
let path = ["/opt/spark/examples/src/main/resources/people.csv"];
let df = spark
.read()
.format("csv")
.option("header", "True")
.option("delimiter", ";")
.load(path)?;
df.select([
F::col("name"),
F::col("age").cast("int").alias("age_int"),
(F::lit(3.0) + F::col("age").cast("int")).alias("addition"),
])
.sort(vec![F::col("name").desc()])
.show(Some(5), None, None)
.await?;
// print results
// +--------------------------+
// | show_string |
// +--------------------------+
// | +-----+-------+--------+ |
// | |name |age_int|addition| |
// | +-----+-------+--------+ |
// | |Jorge|30 |33.0 | |
// | |Bob |32 |35.0 | |
// | +-----+-------+--------+ |
// | |
// +--------------------------+
Ok(())
}
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
.build()
.await?;
let df = spark
.clone()
.range(None, 1000, 1, Some(16))
.select(col("id").alias("range_id"));
let path = "/opt/spark/examples/src/main/rust/employees/";
df.write()
.format("csv")
.mode(SaveMode::Overwrite)
.option("header", "true")
.save(path)
.await?;
let df = spark
.clone()
.read()
.format("csv")
.option("header", "true")
.load([path])?;
df.show(Some(10), None, None).await?;
// print results may slightly vary but should be close to the below
// +--------------------------+
// | show_string |
// +--------------------------+
// | +--------+ |
// | |range_id| |
// | +--------+ |
// | |312 | |
// | |313 | |
// | |314 | |
// | |315 | |
// | |316 | |
// | |317 | |
// | |318 | |
// | |319 | |
// | |320 | |
// | |321 | |
// | +--------+ |
// | only showing top 10 rows |
// | |
// +--------------------------+
Ok(())
}
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
.build()
.await?;
let paths = ["/opt/spark/examples/src/main/resources/people.csv"];
let df = spark
.clone()
.read()
.format("csv")
.option("header", "True")
.option("delimiter", ";")
.option("inferSchema", "True")
.load(paths)?;
df.write()
.format("delta")
.mode(SaveMode::Overwrite)
.saveAsTable("default.people_delta")
.await?;
spark
.sql("DESCRIBE HISTORY default.people_delta")
.await?
.show(Some(1), None, Some(true))
.await?;
// print results
// +-------------------------------------------------------------------------------------------------------+
// | show_string |
// +-------------------------------------------------------------------------------------------------------+
// | -RECORD 0-------------------------------------------------------------------------------------------- |
// | version | 3 |
// | timestamp | 2024-03-16 13:46:23.552 |
// | userId | NULL |
// | userName | NULL |
// | operation | CREATE OR REPLACE TABLE AS SELECT |
// | operationParameters | {isManaged -> true, description -> NULL, partitionBy -> [], properties -> {}} |
// | job | NULL |
// | notebook | NULL |
// | clusterId | NULL |
// | readVersion | 2 |
// | isolationLevel | Serializable |
// | isBlindAppend | false |
// | operationMetrics | {numFiles -> 1, numOutputRows -> 2, numOutputBytes -> 988} |
// | userMetadata | NULL |
// | engineInfo | Apache-Spark/3.5.0 Delta-Lake/3.0.0 |
// | only showing top 1 row |
// | |
// +-------------------------------------------------------------------------------------------------------+
Ok(())
}
pub fn sort<I>(self, cols: I) -> DataFrame where I: IntoIterator<Item = Column>
Examples found in repository
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::default().build().await?;
let path = ["/opt/spark/examples/src/main/resources/people.csv"];
let df = spark
.read()
.format("csv")
.option("header", "True")
.option("delimiter", ";")
.load(path)?;
df.select([
F::col("name"),
F::col("age").cast("int").alias("age_int"),
(F::lit(3.0) + F::col("age").cast("int")).alias("addition"),
])
.sort(vec![F::col("name").desc()])
.show(Some(5), None, None)
.await?;
// print results
// +--------------------------+
// | show_string |
// +--------------------------+
// | +-----+-------+--------+ |
// | |name |age_int|addition| |
// | +-----+-------+--------+ |
// | |Jorge|30 |33.0 | |
// | |Bob |32 |35.0 | |
// | +-----+-------+--------+ |
// | |
// +--------------------------+
Ok(())
}
pub fn sparkSession(self) -> Box<SparkSession>
pub async fn storageLevel(self) -> Result<StorageLevel, SparkError>
pub fn subtract(self, other: DataFrame) -> DataFrame
pub async fn tail(self, limit: i32) -> Result<RecordBatch, SparkError>
Returns the last n rows as a RecordBatch. Running tail requires moving the data and results in an action.
pub async fn take(self, n: i32) -> Result<RecordBatch, SparkError>
pub fn toDF<'a, I>(self, cols: I) -> DataFrame where I: IntoIterator<Item = &'a str>
pub fn union(self, other: DataFrame) -> DataFrame
pub fn unionAll(self, other: DataFrame) -> DataFrame
pub fn unionByName(self, other: DataFrame, allow_missing_columns: Option<bool>) -> DataFrame
pub async fn unpersist(self, blocking: Option<bool>) -> DataFrame
pub fn unpivot<I, K>(self, ids: I, values: Option<K>, variable_column_name: &str, value_column_name: &str) -> DataFrame
Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns set. This is the reverse to groupBy(…).pivot(…).agg(…), except for the aggregation, which cannot be reversed.
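A minimal sketch, assuming ids and values accept the same column collections as select (the column names are hypothetical):
// keep `id` as the identifier and melt `col_a`/`col_b` into variable/value pairs
let long_df = df.unpivot(
    [F::col("id")],
    Some([F::col("col_a"), F::col("col_b")]),
    "variable",
    "value",
);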
pub fn withColumn(self, colName: &str, col: Column) -> DataFrame
pub fn withColumns<I, K>(self, colMap: I) -> DataFrame
pub fn withColumnsRenamed<I, K, V>(self, cols: I) -> DataFrame
Returns a new DataFrame by renaming multiple columns from an iterator of key/value pairs, with the key as the existing column name and the value as the new column name.
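A minimal sketch, assuming the iterator yields (existing_name, new_name) string pairs (the column names are hypothetical):
let renamed = df.withColumnsRenamed([("age", "age_years"), ("name", "full_name")]);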
pub fn write(self) -> DataFrameWriter
Returns a DataFrameWriter struct based on the current DataFrame
Examples found in repository
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
.build()
.await?;
let df = spark
.clone()
.sql("select 'apple' as word, 123 as count")
.await?;
df.write()
.mode(SaveMode::Overwrite)
.format("parquet")
.save("file:///tmp/spark-connect-write-example-output.parquet")
.await?;
let df = spark
.read()
.format("parquet")
.load(["file:///tmp/spark-connect-write-example-output.parquet"])?;
df.show(Some(100), None, None).await?;
// +---------------+
// | show_string |
// +---------------+
// | +-----+-----+ |
// | |word |count| |
// | +-----+-----+ |
// | |apple|123 | |
// | +-----+-----+ |
// | |
// +---------------+
Ok(())
}
More examples
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
.build()
.await?;
let df = spark
.clone()
.range(None, 1000, 1, Some(16))
.select(col("id").alias("range_id"));
let path = "/opt/spark/examples/src/main/rust/employees/";
df.write()
.format("csv")
.mode(SaveMode::Overwrite)
.option("header", "true")
.save(path)
.await?;
let df = spark
.clone()
.read()
.format("csv")
.option("header", "true")
.load([path])?;
df.show(Some(10), None, None).await?;
// print results may slightly vary but should be close to the below
// +--------------------------+
// | show_string |
// +--------------------------+
// | +--------+ |
// | |range_id| |
// | +--------+ |
// | |312 | |
// | |313 | |
// | |314 | |
// | |315 | |
// | |316 | |
// | |317 | |
// | |318 | |
// | |319 | |
// | |320 | |
// | |321 | |
// | +--------+ |
// | only showing top 10 rows |
// | |
// +--------------------------+
Ok(())
}
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
.build()
.await?;
let paths = ["/opt/spark/examples/src/main/resources/people.csv"];
let df = spark
.clone()
.read()
.format("csv")
.option("header", "True")
.option("delimiter", ";")
.option("inferSchema", "True")
.load(paths)?;
df.write()
.format("delta")
.mode(SaveMode::Overwrite)
.saveAsTable("default.people_delta")
.await?;
spark
.sql("DESCRIBE HISTORY default.people_delta")
.await?
.show(Some(1), None, Some(true))
.await?;
// print results
// +-------------------------------------------------------------------------------------------------------+
// | show_string |
// +-------------------------------------------------------------------------------------------------------+
// | -RECORD 0-------------------------------------------------------------------------------------------- |
// | version | 3 |
// | timestamp | 2024-03-16 13:46:23.552 |
// | userId | NULL |
// | userName | NULL |
// | operation | CREATE OR REPLACE TABLE AS SELECT |
// | operationParameters | {isManaged -> true, description -> NULL, partitionBy -> [], properties -> {}} |
// | job | NULL |
// | notebook | NULL |
// | clusterId | NULL |
// | readVersion | 2 |
// | isolationLevel | Serializable |
// | isBlindAppend | false |
// | operationMetrics | {numFiles -> 1, numOutputRows -> 2, numOutputBytes -> 988} |
// | userMetadata | NULL |
// | engineInfo | Apache-Spark/3.5.0 Delta-Lake/3.0.0 |
// | only showing top 1 row |
// | |
// +-------------------------------------------------------------------------------------------------------+
Ok(())
}
pub fn writeStream(self) -> DataStreamWriter
Interface for DataStreamWriter to save the content of the streaming DataFrame out into external storage.
Examples found in repository
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession =
SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=stream_example")
.build()
.await?;
let df = spark
.readStream()
.format("rate")
.option("rowsPerSecond", "5")
.load(None)?;
let query = df
.writeStream()
.format("console")
.queryName("example_stream")
.outputMode(OutputMode::Append)
.trigger(Trigger::ProcessingTimeInterval("1 seconds".to_string()))
.start(None)
.await?;
// loop to get multiple progression stats
for _ in 1..5 {
thread::sleep(time::Duration::from_secs(5));
let val = &query.clone().lastProgress().await?;
println!("{}", val);
}
// stop the active stream
query.stop().await?;
Ok(())
}
Trait Implementations§
Auto Trait Implementations§
impl Freeze for DataFrame
impl !RefUnwindSafe for DataFrame
impl Send for DataFrame
impl Sync for DataFrame
impl Unpin for DataFrame
impl !UnwindSafe for DataFrame
Blanket Implementations§
impl<T> BorrowMut<T> for T where T: ?Sized
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request