pub struct DataFrame {
    pub spark_session: SparkSession,
    pub logical_plan: LogicalPlanBuilder,
}
Expand description

A DataFrame is composed of a spark_session, which connects to a remote Spark Connect-enabled cluster, and a logical_plan, which represents the Plan to be submitted to the cluster when an action is called.

Fields§

§spark_session: SparkSession

Global SparkSession connecting to the remote cluster

§logical_plan: LogicalPlanBuilder

Logical Plan representing the unresolved Relation which will be submitted to the remote cluster

Implementations§

source§

impl DataFrame

source

pub fn new( spark_session: SparkSession, logical_plan: LogicalPlanBuilder ) -> DataFrame

Creates a DataFrame based on a spark session and an initial logical plan

source

pub fn select<T: ToVecExpr>(&mut self, cols: T) -> DataFrame

Projects a set of expressions and returns a new DataFrame

§Arguments:
  • cols is any value implementing ToVecExpr, for example a &str such as "*" or a vector of column expressions, each of which resolves to a specific column
§Example:
async {
    df.select(vec![col("age"), col("name")]).collect().await?;
}
Examples found in repository?
examples/sql.rs (line 20)
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut spark: SparkSession =
        SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs".to_string())
            .build()
            .await?;

    let mut df = spark
        .sql("SELECT * FROM json.`/opt/spark/examples/src/main/resources/employees.json`")
        .await;

    df.filter("salary >= 3500")
        .select("*")
        .show(Some(5), None, None)
        .await?;

    // +-----------------+
    // | show_string     |
    // +-----------------+
    // | +------+------+ |
    // | |name  |salary| |
    // | +------+------+ |
    // | |Andy  |4500  | |
    // | |Justin|3500  | |
    // | |Berta |4000  | |
    // | +------+------+ |
    // |                 |
    // +-----------------+

    Ok(())
}
More examples
Hide additional examples
examples/reader.rs (lines 21-25)
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::default().build().await?;

    let paths = vec!["/opt/spark/examples/src/main/resources/people.csv".to_string()];

    let mut df = spark
        .read()
        .format("csv")
        .option("header", "True")
        .option("delimiter", ";")
        .load(paths);

    df.select(vec![
        F::col("name"),
        F::col("age").cast("int").alias("age_int"),
        (F::lit(3.0) + F::col("age").cast("int")).alias("addition"),
    ])
    .sort(vec![F::col("name").desc()])
    .show(Some(5), None, None)
    .await?;

    // print results
    // +--------------------------+
    // | show_string              |
    // +--------------------------+
    // | +-----+-------+--------+ |
    // | |name |age_int|addition| |
    // | +-----+-------+--------+ |
    // | |Jorge|30     |33.0    | |
    // | |Bob  |32     |35.0    | |
    // | +-----+-------+--------+ |
    // |                          |
    // +--------------------------+

    Ok(())
}
examples/writer.rs (line 17)
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::default().build().await?;

    let df = spark
        .clone()
        .range(None, 1000, 1, Some(16))
        .select(col("id").alias("range_id"));

    let path = "/opt/spark/examples/src/main/rust/employees/";

    df.write()
        .format("csv")
        .option("header", "true")
        .save(path)
        .await?;

    let mut df = spark
        .clone()
        .read()
        .format("csv")
        .option("header", "true")
        .load(vec![path.to_string()]);

    df.show(Some(10), None, None).await?;

    // printed results may vary slightly but should be close to the below
    // +--------------------------+
    // | show_string              |
    // +--------------------------+
    // | +--------+               |
    // | |range_id|               |
    // | +--------+               |
    // | |312     |               |
    // | |313     |               |
    // | |314     |               |
    // | |315     |               |
    // | |316     |               |
    // | |317     |               |
    // | |318     |               |
    // | |319     |               |
    // | |320     |               |
    // | |321     |               |
    // | +--------+               |
    // | only showing top 10 rows |
    // |                          |
    // +--------------------------+

    Ok(())
}
source

pub fn selectExpr(&mut self, cols: Vec<&str>) -> DataFrame

Projects a set of SQL expressions and returns a new DataFrame

This is a variant of select that accepts SQL Expressions

§Example:
async {
    df.selectExpr(vec!["id * 2", "abs(id)"]).collect().await?;
}
source

pub fn filter<T: ToFilterExpr>(&mut self, condition: T) -> DataFrame

Filters rows using the given condition and returns a new DataFrame

§Example:
async {
    df.filter("salary > 4000").collect().await?;
}
Examples found in repository?
examples/sql.rs (line 19)
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut spark: SparkSession =
        SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs".to_string())
            .build()
            .await?;

    let mut df = spark
        .sql("SELECT * FROM json.`/opt/spark/examples/src/main/resources/employees.json`")
        .await;

    df.filter("salary >= 3500")
        .select("*")
        .show(Some(5), None, None)
        .await?;

    // +-----------------+
    // | show_string     |
    // +-----------------+
    // | +------+------+ |
    // | |name  |salary| |
    // | +------+------+ |
    // | |Andy  |4500  | |
    // | |Justin|3500  | |
    // | |Berta |4000  | |
    // | +------+------+ |
    // |                 |
    // +-----------------+

    Ok(())
}
source

pub fn contains(&mut self, condition: Column) -> DataFrame

source

pub fn sort(&mut self, cols: Vec<Column>) -> DataFrame

Examples found in repository?
examples/reader.rs (line 26)
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::default().build().await?;

    let paths = vec!["/opt/spark/examples/src/main/resources/people.csv".to_string()];

    let mut df = spark
        .read()
        .format("csv")
        .option("header", "True")
        .option("delimiter", ";")
        .load(paths);

    df.select(vec![
        F::col("name"),
        F::col("age").cast("int").alias("age_int"),
        (F::lit(3.0) + F::col("age").cast("int")).alias("addition"),
    ])
    .sort(vec![F::col("name").desc()])
    .show(Some(5), None, None)
    .await?;

    // print results
    // +--------------------------+
    // | show_string              |
    // +--------------------------+
    // | +-----+-------+--------+ |
    // | |name |age_int|addition| |
    // | +-----+-------+--------+ |
    // | |Jorge|30     |33.0    | |
    // | |Bob  |32     |35.0    | |
    // | +-----+-------+--------+ |
    // |                          |
    // +--------------------------+

    Ok(())
}
source

pub fn limit(&mut self, limit: i32) -> DataFrame

Limits the result count to the number specified and returns a new DataFrame

§Example:
async {
    df.limit(10).collect().await?;
}
source

pub fn drop_duplicates(&mut self, cols: Option<Vec<&str>>) -> DataFrame

Returns a new DataFrame with duplicate rows removed, optionally only considering certain columns from a Vec<&str>

If no columns are supplied then all columns are used

Alias for dropDuplicates

source

pub fn dropDuplicates(&mut self, cols: Option<Vec<&str>>) -> DataFrame

source

pub fn withColumnsRenamed(&mut self, cols: HashMap<String, String>) -> DataFrame

Returns a new DataFrame by renaming multiple columns from a HashMap<String, String> containing the existing as the key and the new as the value.

source

pub fn drop(&mut self, cols: Vec<String>) -> DataFrame

Returns a new DataFrame without the specified columns

source

pub fn sample( &mut self, lower_bound: f64, upper_bound: f64, with_replacement: Option<bool>, seed: Option<i64> ) -> DataFrame

Returns a sampled subset of this DataFrame

source

pub fn repartition( &mut self, num_partitions: i32, shuffle: Option<bool> ) -> DataFrame

Returns a new DataFrame partitioned by the given partition number and shuffle option

§Arguments
  • num_partitions: the target number of partitions
  • (optional) shuffle: to induce a shuffle. Default is false
source

pub fn offset(&mut self, num: i32) -> DataFrame

Returns a new DataFrame by skipping the first n rows

source

pub async fn schema(&mut self) -> Schema

Returns the schema of this DataFrame as a spark::analyze_plan_response::Schema

source

pub async fn explain(&mut self, mode: &str)

Prints the spark::Plan to the console

§Arguments:
  • mode: &str. Defaults to unspecified
    • simple
    • extended
    • codegen
    • cost
    • formatted
    • unspecified
source

pub async fn show( &mut self, num_rows: Option<i32>, truncate: Option<i32>, vertical: Option<bool> ) -> Result<(), ArrowError>

Prints the first n rows to the console

§Arguments:
  • num_rows: (int, optional) number of rows to show (default 10)
  • truncate: (int, optional) If set to 0, it truncates the string. Any other number will not truncate the strings
  • vertical: (bool, optional) If set to true, prints output rows vertically (one line per column value).
Examples found in repository?
examples/sql.rs (line 21)
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut spark: SparkSession =
        SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs".to_string())
            .build()
            .await?;

    let mut df = spark
        .sql("SELECT * FROM json.`/opt/spark/examples/src/main/resources/employees.json`")
        .await;

    df.filter("salary >= 3500")
        .select("*")
        .show(Some(5), None, None)
        .await?;

    // +-----------------+
    // | show_string     |
    // +-----------------+
    // | +------+------+ |
    // | |name  |salary| |
    // | +------+------+ |
    // | |Andy  |4500  | |
    // | |Justin|3500  | |
    // | |Berta |4000  | |
    // | +------+------+ |
    // |                 |
    // +-----------------+

    Ok(())
}
More examples
Hide additional examples
examples/reader.rs (line 27)
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::default().build().await?;

    let paths = vec!["/opt/spark/examples/src/main/resources/people.csv".to_string()];

    let mut df = spark
        .read()
        .format("csv")
        .option("header", "True")
        .option("delimiter", ";")
        .load(paths);

    df.select(vec![
        F::col("name"),
        F::col("age").cast("int").alias("age_int"),
        (F::lit(3.0) + F::col("age").cast("int")).alias("addition"),
    ])
    .sort(vec![F::col("name").desc()])
    .show(Some(5), None, None)
    .await?;

    // print results
    // +--------------------------+
    // | show_string              |
    // +--------------------------+
    // | +-----+-------+--------+ |
    // | |name |age_int|addition| |
    // | +-----+-------+--------+ |
    // | |Jorge|30     |33.0    | |
    // | |Bob  |32     |35.0    | |
    // | +-----+-------+--------+ |
    // |                          |
    // +--------------------------+

    Ok(())
}
examples/writer.rs (line 34)
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::default().build().await?;

    let df = spark
        .clone()
        .range(None, 1000, 1, Some(16))
        .select(col("id").alias("range_id"));

    let path = "/opt/spark/examples/src/main/rust/employees/";

    df.write()
        .format("csv")
        .option("header", "true")
        .save(path)
        .await?;

    let mut df = spark
        .clone()
        .read()
        .format("csv")
        .option("header", "true")
        .load(vec![path.to_string()]);

    df.show(Some(10), None, None).await?;

    // printed results may vary slightly but should be close to the below
    // +--------------------------+
    // | show_string              |
    // +--------------------------+
    // | +--------+               |
    // | |range_id|               |
    // | +--------+               |
    // | |312     |               |
    // | |313     |               |
    // | |314     |               |
    // | |315     |               |
    // | |316     |               |
    // | |317     |               |
    // | |318     |               |
    // | |319     |               |
    // | |320     |               |
    // | |321     |               |
    // | +--------+               |
    // | only showing top 10 rows |
    // |                          |
    // +--------------------------+

    Ok(())
}
source

pub async fn tail(&mut self, limit: i32) -> Result<Vec<RecordBatch>, ArrowError>

Returns the last n rows as a vector of RecordBatch

Running tail requires moving the data and results in an action

source

pub async fn collect(&mut self) -> Result<Vec<RecordBatch>, ArrowError>

Returns all records as a vector of RecordBatch

§Example:
async {
    df.collect().await?;
}
source

pub fn write(self) -> DataFrameWriter

Returns a DataFrameWriter struct based on the current DataFrame

Examples found in repository?
examples/writer.rs (line 21)
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::default().build().await?;

    let df = spark
        .clone()
        .range(None, 1000, 1, Some(16))
        .select(col("id").alias("range_id"));

    let path = "/opt/spark/examples/src/main/rust/employees/";

    df.write()
        .format("csv")
        .option("header", "true")
        .save(path)
        .await?;

    let mut df = spark
        .clone()
        .read()
        .format("csv")
        .option("header", "true")
        .load(vec![path.to_string()]);

    df.show(Some(10), None, None).await?;

    // printed results may vary slightly but should be close to the below
    // +--------------------------+
    // | show_string              |
    // +--------------------------+
    // | +--------+               |
    // | |range_id|               |
    // | +--------+               |
    // | |312     |               |
    // | |313     |               |
    // | |314     |               |
    // | |315     |               |
    // | |316     |               |
    // | |317     |               |
    // | |318     |               |
    // | |319     |               |
    // | |320     |               |
    // | |321     |               |
    // | +--------+               |
    // | only showing top 10 rows |
    // |                          |
    // +--------------------------+

    Ok(())
}

Trait Implementations§

source§

impl Clone for DataFrame

source§

fn clone(&self) -> DataFrame

Returns a copy of the value. Read more
1.0.0 · source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
source§

impl Debug for DataFrame

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T> FromRef<T> for T
where T: Clone,

source§

fn from_ref(input: &T) -> T

Converts to this type from a reference to the input type.
source§

impl<T> Instrument for T

source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> IntoRequest<T> for T

source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
source§

impl<T> ToOwned for T
where T: Clone,

§

type Owned = T

The resulting type after obtaining ownership.
source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

source§

fn vzip(self) -> V

source§

impl<T> WithSubscriber for T

source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more