Struct spark_connect_rs::DataFrame

pub struct DataFrame { /* private fields */ }

The spark-connect-rs crate is currently just a meta-package shim for spark-connect-core. A DataFrame is composed of a SparkSession referencing a Spark Connect enabled cluster, and a LogicalPlanBuilder which represents the unresolved spark::Plan to be submitted to the cluster when an action is called.

The LogicalPlanBuilder is a series of unresolved logical plans, and every additional transformation takes the prior spark::Plan and builds onto it. The final unresolved logical plan is submitted to the spark connect server.

§createDataFrame & range

A DataFrame can be created with an arrow::array::RecordBatch, or with spark.range(...)

// requires the `arrow` crate
use std::sync::Arc;
use arrow::array::{ArrayRef, Int64Array, RecordBatch, StringArray};

let name: ArrayRef = Arc::new(StringArray::from(vec!["Tom", "Alice", "Bob"]));
let age: ArrayRef = Arc::new(Int64Array::from(vec![14, 23, 16]));

let data = RecordBatch::try_from_iter(vec![("name", name), ("age", age)])?;

let df = spark.createDataFrame(&data).await?;
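
range is shown in examples/writer.rs further down this page; a minimal sketch of the same call (optional start, end, step, optional partition count):

let df = spark.range(None, 1000, 1, Some(16));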

§sql

A DataFrame is created from a spark.sql() statement

let df = spark.sql("SELECT * FROM json.`/opt/spark/examples/src/main/resources/employees.json`").await?;

§read & readStream

A DataFrame is also created from spark.read() and spark.readStream() statements.

let paths = ["/opt/spark/examples/src/main/resources/people.csv"];

let df = spark
    .read()
    .format("csv")
    .option("header", "True")
    .option("delimiter", ";")
    .load(paths)?;
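
A readStream sketch, lifted from examples/readstream.rs at the bottom of this page:

let df = spark
    .readStream()
    .format("rate")
    .option("rowsPerSecond", "5")
    .load(None)?;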

Implementations§

impl DataFrame

pub fn new(spark_session: SparkSession, plan: LogicalPlanBuilder) -> DataFrame

Creates a new DataFrame based on a SparkSession and an initial logical plan

pub fn agg<T>(self, exprs: T) -> DataFrame
where T: ToVecExpr,

Aggregate on the entire DataFrame without groups (shorthand for df.groupBy().agg())

pub fn alias(self, alias: &str) -> DataFrame

Returns a new DataFrame with an alias set.

pub async fn cache(self) -> DataFrame

Persists the DataFrame with the default storage::StorageLevel::MemoryAndDiskDeser (MEMORY_AND_DISK_DESER).

pub fn coalesce(self, num_partitions: u32) -> DataFrame

Returns a new DataFrame that has exactly num_partitions partitions.

pub async fn count(self) -> Result<i64, SparkError>

Returns the number of rows in this DataFrame

pub fn colRegex(self, col_name: &str) -> Column

Selects column based on the column name specified as a regex and returns it as Column.

pub async fn collect(self) -> Result<RecordBatch, SparkError>

Returns all records as a RecordBatch

§Example:
async {
    df.collect().await?;
}

pub async fn columns(self) -> Result<Vec<String>, SparkError>

Retrieves the names of all columns in the DataFrame as a Vec<String>. The order of the column names in the list reflects their order in the DataFrame.
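
A minimal sketch, following the async fragment style of the other examples on this page:

async {
    let cols: Vec<String> = df.columns().await?;
}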

pub async fn corr(self, col1: &str, col2: &str) -> Result<f64, SparkError>

Calculates the correlation of two columns of a DataFrame as a f64. Currently only supports the Pearson Correlation Coefficient.
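
A minimal sketch based on the signature; the column names are illustrative:

async {
    let r: f64 = df.corr("col1", "col2").await?;
}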

pub async fn cov(self, col1: &str, col2: &str) -> Result<f64, SparkError>

Calculate the sample covariance for the given columns, specified by their names, as a f64

pub async fn createTempView(self, name: &str) -> Result<(), SparkError>

Creates a local temporary view with this DataFrame.
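
A sketch pairing the view with spark.sql; the view name is illustrative:

async {
    df.createTempView("people").await?;
    let df2 = spark.sql("SELECT * FROM people").await?;
}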

pub async fn createGlobalTempView(self, name: &str) -> Result<(), SparkError>

pub async fn createOrReplaceGlobalTempView( self, name: &str ) -> Result<(), SparkError>

pub async fn createOrReplaceTempView(self, name: &str) -> Result<(), SparkError>

Creates or replaces a local temporary view with this DataFrame

pub fn crossJoin(self, other: DataFrame) -> DataFrame

Returns the cartesian product with another DataFrame.

pub fn crosstab(self, col1: &str, col2: &str) -> DataFrame

Computes a pair-wise frequency table of the given columns. Also known as a contingency table.

pub fn cube<T>(self, cols: T) -> GroupedData
where T: ToVecExpr,

Create a multi-dimensional cube for the current DataFrame using the specified columns, so we can run aggregations on them.

pub fn describe<'a, I>(self, cols: Option<I>) -> DataFrame
where I: IntoIterator<Item = &'a str> + Default,

pub fn distinct(self) -> DataFrame

Returns a new DataFrame containing the distinct rows in this DataFrame.

pub fn drop<T>(self, cols: T) -> DataFrame
where T: ToVecExpr,

Returns a new DataFrame without the specified columns

pub fn drop_duplicates(self, cols: Option<Vec<&str>>) -> DataFrame

Return a new DataFrame with duplicate rows removed, optionally considering only the columns given in a Vec<&str>

If no columns are supplied, all columns are used

Alias for dropDuplicates

pub fn dropDuplicates(self, cols: Option<Vec<&str>>) -> DataFrame

Return a new DataFrame with duplicate rows removed, optionally only considering certain columns.

pub fn dropna( self, how: &str, threshold: Option<i32>, subset: Option<Vec<&str>> ) -> DataFrame

Returns a new DataFrame omitting rows with null values.
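
A sketch based on the signature; passing "any" for how follows the usual Spark convention (drop a row if any value is null) and is assumed here:

let df = df.dropna("any", None, None);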

pub async fn dtypes(self) -> Result<Vec<(String, Kind)>, SparkError>

Returns all column names and their data types as a Vec containing the field name as a String and the spark::data_type::Kind enum

pub fn exceptAll(self, other: DataFrame) -> DataFrame

Return a new DataFrame containing rows in this DataFrame but not in another DataFrame while preserving duplicates.

pub async fn explain( self, mode: Option<ExplainMode> ) -> Result<String, SparkError>

Prints the spark::Plan to the console

§Arguments:
  • mode: ExplainMode. Defaults to unspecified (a usage sketch follows this list). One of:
    • simple
    • extended
    • codegen
    • cost
    • formatted
    • unspecified
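
A minimal sketch using the default (unspecified) mode:

async {
    let plan = df.explain(None).await?;
    println!("{}", plan);
}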

pub fn filter<T>(self, condition: T) -> DataFrame
where T: ToFilterExpr,

Filters rows using the given condition and returns a new DataFrame

§Example:
async {
    df.filter("salary > 4000").collect().await?;
}

pub async fn first(self) -> Result<RecordBatch, SparkError>

Returns the first row as a RecordBatch.

pub fn freqItems<'a, I>(self, cols: I, support: Option<f64>) -> DataFrame
where I: IntoIterator<Item = &'a str>,

Finding frequent items for columns, possibly with false positives.
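
A sketch based on the signature; the column names and the support threshold are illustrative:

let df = df.freqItems(vec!["name", "age"], Some(0.4));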

pub fn groupBy<T>(self, cols: Option<T>) -> GroupedData
where T: ToVecExpr,

Groups the DataFrame using the specified columns, and returns a GroupedData object

pub async fn head(self, n: Option<i32>) -> Result<RecordBatch, SparkError>

Returns the first n rows.

pub fn hint<T>(self, name: &str, parameters: Option<T>) -> DataFrame
where T: ToVecExpr,

Specifies some hint on the current DataFrame.

pub async fn inputFiles(self) -> Result<Vec<String>, SparkError>

Returns a best-effort snapshot of the files that compose this DataFrame

pub fn intersect(self, other: DataFrame) -> DataFrame

Return a new DataFrame containing rows only in both this DataFrame and another DataFrame.

pub fn intersectAll(self, other: DataFrame) -> DataFrame

pub async fn isEmpty(self) -> Result<bool, SparkError>

Checks if the DataFrame is empty and returns a boolean value.

pub async fn isStreaming(self) -> Result<bool, SparkError>

Returns True if this DataFrame contains one or more sources that continuously return data as it arrives.

pub fn join<T>( self, other: DataFrame, on: Option<T>, how: JoinType ) -> DataFrame
where T: ToExpr,

Joins with another DataFrame, using the given join expression.

§Example:
use spark_connect_rs::functions::col;
use spark_connect_rs::dataframe::JoinType;

async {
    // join two dataframes where `id` == `name`
    let condition = Some(col("id").eq(col("name")));
    let df = df.join(df2, condition, JoinType::Inner);
}

pub fn limit(self, limit: i32) -> DataFrame

Limits the result count to the number specified and returns a new DataFrame

§Example:
async {
    df.limit(10).collect().await?;
}

pub fn melt<I, K>( self, ids: I, values: Option<K>, variable_column_name: &str, value_column_name: &str ) -> DataFrame
where I: ToVecExpr, K: ToVecExpr,

pub fn offset(self, num: i32) -> DataFrame

Returns a new DataFrame by skipping the first n rows

pub fn orderBy<I>(self, cols: I) -> DataFrame
where I: IntoIterator<Item = Column>,

pub async fn persist(self, storage_level: StorageLevel) -> DataFrame

pub async fn printSchema(self, level: Option<i32>) -> Result<String, SparkError>

Prints out the schema in the tree format to a specific level number.
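
A minimal sketch with no level limit:

async {
    let tree = df.printSchema(None).await?;
    println!("{}", tree);
}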

pub fn repartition( self, num_partitions: u32, shuffle: Option<bool> ) -> DataFrame

Returns a new DataFrame partitioned by the given partition number and shuffle option

§Arguments
  • num_partitions: the target number of partitions
  • (optional) shuffle: whether to induce a shuffle. Defaults to false (see the sketch below)
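
A sketch based on the signature; the partition count is illustrative:

let df = df.repartition(8, Some(true));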

pub fn rollup<T>(self, cols: T) -> GroupedData
where T: ToVecExpr,

Create a multi-dimensional rollup for the current DataFrame using the specified columns, and returns a GroupedData object

pub async fn sameSemantics(self, other: DataFrame) -> Result<bool, SparkError>

Returns True when the logical query plans inside both DataFrames are equal and therefore return the same results.

pub fn sample( self, lower_bound: f64, upper_bound: f64, with_replacement: Option<bool>, seed: Option<i64> ) -> DataFrame

Returns a sampled subset of this DataFrame
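
A sketch based on the signature; the bounds, replacement flag, and seed are illustrative:

let df = df.sample(0.0, 0.1, Some(false), Some(42));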

pub async fn schema(self) -> Result<DataType, SparkError>

Returns the schema of this DataFrame as a spark::DataType

pub fn select<T>(self, cols: T) -> DataFrame
where T: ToVecExpr,

Projects a set of expressions and returns a new DataFrame

§Example:
async {
    df.select(vec![col("age"), col("name")]).collect().await?;
}
Examples found in repository:
examples/reader.rs (lines 22-26)
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::default().build().await?;

    let path = ["/opt/spark/examples/src/main/resources/people.csv"];

    let df = spark
        .read()
        .format("csv")
        .option("header", "True")
        .option("delimiter", ";")
        .load(path)?;

    df.select([
        F::col("name"),
        F::col("age").cast("int").alias("age_int"),
        (F::lit(3.0) + F::col("age").cast("int")).alias("addition"),
    ])
    .sort(vec![F::col("name").desc()])
    .show(Some(5), None, None)
    .await?;

    // print results
    // +--------------------------+
    // | show_string              |
    // +--------------------------+
    // | +-----+-------+--------+ |
    // | |name |age_int|addition| |
    // | +-----+-------+--------+ |
    // | |Jorge|30     |33.0    | |
    // | |Bob  |32     |35.0    | |
    // | +-----+-------+--------+ |
    // |                          |
    // +--------------------------+

    Ok(())
}
examples/writer.rs (line 21)
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    let df = spark
        .clone()
        .range(None, 1000, 1, Some(16))
        .select(col("id").alias("range_id"));

    let path = "/opt/spark/examples/src/main/rust/employees/";

    df.write()
        .format("csv")
        .mode(SaveMode::Overwrite)
        .option("header", "true")
        .save(path)
        .await?;

    let df = spark
        .clone()
        .read()
        .format("csv")
        .option("header", "true")
        .load([path])?;

    df.show(Some(10), None, None).await?;

    // print results may slighty vary but should be close to the below
    // +--------------------------+
    // | show_string              |
    // +--------------------------+
    // | +--------+               |
    // | |range_id|               |
    // | +--------+               |
    // | |312     |               |
    // | |313     |               |
    // | |314     |               |
    // | |315     |               |
    // | |316     |               |
    // | |317     |               |
    // | |318     |               |
    // | |319     |               |
    // | |320     |               |
    // | |321     |               |
    // | +--------+               |
    // | only showing top 10 rows |
    // |                          |
    // +--------------------------+

    Ok(())
}

pub fn selectExpr<'a, I>(self, cols: I) -> DataFrame
where I: IntoIterator<Item = &'a str>,

Projects a set of SQL expressions and returns a new DataFrame

This is a variant of select that accepts SQL expressions

§Example:
async {
    df.selectExpr(vec!["id * 2", "abs(id)"]).collect().await?;
}

pub async fn semanticHash(self) -> Result<i32, SparkError>

pub async fn show( self, num_rows: Option<i32>, truncate: Option<i32>, vertical: Option<bool> ) -> Result<(), SparkError>

Prints the first n rows to the console

§Arguments:
  • num_rows: (int, optional) number of rows to show (default 10)
  • truncate: (int, optional) if set to 0, strings are truncated; any other number leaves the strings untruncated
  • vertical: (bool, optional) If set to true, prints output rows vertically (one line per column value).
Examples found in repository:
examples/sql.rs (line 29)
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    let df = spark
        .clone()
        .sql("select 'apple' as word, 123 as count")
        .await?;

    df.write()
        .mode(SaveMode::Overwrite)
        .format("parquet")
        .save("file:///tmp/spark-connect-write-example-output.parquet")
        .await?;

    let df = spark
        .read()
        .format("parquet")
        .load(["file:///tmp/spark-connect-write-example-output.parquet"])?;

    df.show(Some(100), None, None).await?;

    // +---------------+
    // | show_string   |
    // +---------------+
    // | +-----+-----+ |
    // | |word |count| |
    // | +-----+-----+ |
    // | |apple|123  | |
    // | +-----+-----+ |
    // |               |
    // +---------------+

    Ok(())
}
examples/reader.rs (line 28)
examples/writer.rs (line 39)
examples/delta.rs (line 39)
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    let paths = ["/opt/spark/examples/src/main/resources/people.csv"];

    let df = spark
        .clone()
        .read()
        .format("csv")
        .option("header", "True")
        .option("delimiter", ";")
        .option("inferSchema", "True")
        .load(paths)?;

    df.write()
        .format("delta")
        .mode(SaveMode::Overwrite)
        .saveAsTable("default.people_delta")
        .await?;

    spark
        .sql("DESCRIBE HISTORY default.people_delta")
        .await?
        .show(Some(1), None, Some(true))
        .await?;

    // print results
    // +-------------------------------------------------------------------------------------------------------+
    // | show_string                                                                                           |
    // +-------------------------------------------------------------------------------------------------------+
    // | -RECORD 0-------------------------------------------------------------------------------------------- |
    // |  version             | 3                                                                              |
    // |  timestamp           | 2024-03-16 13:46:23.552                                                        |
    // |  userId              | NULL                                                                           |
    // |  userName            | NULL                                                                           |
    // |  operation           | CREATE OR REPLACE TABLE AS SELECT                                              |
    // |  operationParameters | {isManaged -> true, description -> NULL, partitionBy -> [], properties -> {}}  |
    // |  job                 | NULL                                                                           |
    // |  notebook            | NULL                                                                           |
    // |  clusterId           | NULL                                                                           |
    // |  readVersion         | 2                                                                              |
    // |  isolationLevel      | Serializable                                                                   |
    // |  isBlindAppend       | false                                                                          |
    // |  operationMetrics    | {numFiles -> 1, numOutputRows -> 2, numOutputBytes -> 988}                     |
    // |  userMetadata        | NULL                                                                           |
    // |  engineInfo          | Apache-Spark/3.5.0 Delta-Lake/3.0.0                                            |
    // | only showing top 1 row                                                                                |
    // |                                                                                                       |
    // +-------------------------------------------------------------------------------------------------------+

    Ok(())
}

pub fn sort<I>(self, cols: I) -> DataFrame
where I: IntoIterator<Item = Column>,

Examples found in repository:
examples/reader.rs (line 27)

pub fn sparkSession(self) -> Box<SparkSession>

pub async fn storageLevel(self) -> Result<StorageLevel, SparkError>

pub fn subtract(self, other: DataFrame) -> DataFrame

pub async fn tail(self, limit: i32) -> Result<RecordBatch, SparkError>

Returns the last n rows as a RecordBatch

Running tail requires moving the data and results in an action
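
A minimal sketch in the same style as the collect example above:

async {
    let batch = df.tail(5).await?;
}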

pub async fn take(self, n: i32) -> Result<RecordBatch, SparkError>

pub fn toDF<'a, I>(self, cols: I) -> DataFrame
where I: IntoIterator<Item = &'a str>,

pub fn union(self, other: DataFrame) -> DataFrame

pub fn unionAll(self, other: DataFrame) -> DataFrame

pub fn unionByName( self, other: DataFrame, allow_missing_columns: Option<bool> ) -> DataFrame

pub async fn unpersist(self, blocking: Option<bool>) -> DataFrame

pub fn unpivot<I, K>( self, ids: I, values: Option<K>, variable_column_name: &str, value_column_name: &str ) -> DataFrame
where I: ToVecExpr, K: ToVecExpr,

Unpivots a DataFrame from wide format to long format, optionally leaving identifier columns set. This is the reverse of groupBy(…).pivot(…).agg(…), except for the aggregation, which cannot be reversed.
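
A sketch based on the signature; the identifier column, value columns, and output column names here are hypothetical:

let df = df.unpivot(
    vec![col("id")],
    Some(vec![col("q1"), col("q2")]),
    "quarter",
    "sales",
);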

pub fn withColumn(self, colName: &str, col: Column) -> DataFrame

pub fn withColumns<I, K>(self, colMap: I) -> DataFrame
where I: IntoIterator<Item = (K, Column)>, K: ToString,

pub fn withColumnsRenamed<I, K, V>(self, cols: I) -> DataFrame
where I: IntoIterator<Item = (K, V)>, K: AsRef<str>, V: AsRef<str>,

Returns a new DataFrame by renaming multiple columns, given an iterator of key/value pairs where the key is the existing column name and the value is the new column name.
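
A sketch based on the signature; the rename pairs are illustrative:

let df = df.withColumnsRenamed([("age", "age_int"), ("name", "full_name")]);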

pub fn write(self) -> DataFrameWriter

Returns a DataFrameWriter struct based on the current DataFrame

Examples found in repository:
examples/sql.rs (line 18)
examples/writer.rs (line 25)
examples/delta.rs (line 30)

pub fn writeStream(self) -> DataStreamWriter

Interface for DataStreamWriter to save the content of the streaming DataFrame out into external storage.

Examples found in repository:
examples/readstream.rs (line 23)
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession =
        SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=stream_example")
            .build()
            .await?;

    let df = spark
        .readStream()
        .format("rate")
        .option("rowsPerSecond", "5")
        .load(None)?;

    let query = df
        .writeStream()
        .format("console")
        .queryName("example_stream")
        .outputMode(OutputMode::Append)
        .trigger(Trigger::ProcessingTimeInterval("1 seconds".to_string()))
        .start(None)
        .await?;

    // loop to get multiple progression stats
    for _ in 1..5 {
        thread::sleep(time::Duration::from_secs(5));
        let val = &query.clone().lastProgress().await?;
        println!("{}", val);
    }

    // stop the active stream
    query.stop().await?;

    Ok(())
}

Trait Implementations§

impl Clone for DataFrame

fn clone(&self) -> DataFrame

Returns a copy of the value. Read more
fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
impl Debug for DataFrame

fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error>

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> FromRef<T> for T
where T: Clone,

fn from_ref(input: &T) -> T

Converts to this type from a reference to the input type.
impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoRequest<T> for T

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.
fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.
fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more