pub struct DataFrame {
    pub spark_session: SparkSession,
    pub logical_plan: LogicalPlanBuilder,
}

A DataFrame is composed of a spark_session, which connects to a remote Spark Connect enabled cluster, and a logical_plan, which represents the plan that will be submitted to the cluster when an action is called.
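
Nothing is executed until an action is called, so transformations compose at no cost. A minimal end-to-end sketch (the use path and the tokio runtime are assumptions; the session and DataFrame calls mirror the examples below):

use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build a session against a remote Spark Connect cluster.
    let spark: SparkSession = SparkSessionBuilder::default().build().await?;

    // Transformations only extend the logical plan; nothing runs yet.
    let mut df = spark.sql("SELECT * FROM range(100)");

    // `show` is an action: the plan is submitted to the cluster here.
    df.show(Some(5), None, None).await?;

    Ok(())
}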

Fields

spark_session: SparkSession

Global SparkSession connecting to the remote cluster

logical_plan: LogicalPlanBuilder

Logical Plan representing the unresolved Relation which will be submitted to the remote cluster

Implementations

impl DataFrame

pub fn new(spark_session: SparkSession, logical_plan: LogicalPlanBuilder) -> DataFrame

Creates a new DataFrame based on a SparkSession and an initial logical plan

pub fn select(&mut self, cols: Vec<&str>) -> DataFrame

Projects a set of expressions and returns a new DataFrame

Arguments:
  • cols: a vector of &str, each of which resolves to a specific column
Example:
async {
    df.select(vec!["age", "name"]).collect().await?;
}
Examples found in repository
examples/reader.rs (line 22)
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::default().build().await?;

    let paths = vec!["/opt/spark/examples/src/main/resources/people.csv".to_string()];

    let mut df = spark
        .read()
        .format("csv")
        .option("header", "True")
        .option("delimiter", ";")
        .load(paths);

    df.filter("age > 30")
        .select(vec!["name"])
        .show(Some(5), None, None)
        .await?;

    // print results
    // +-------------+
    // | show_string |
    // +-------------+
    // | +----+      |
    // | |name|      |
    // | +----+      |
    // | |Bob |      |
    // | +----+      |
    // |             |
    // +-------------+

    Ok(())
}

pub fn selectExpr(&mut self, cols: Vec<&str>) -> DataFrame

Projects a set of SQL expressions and returns a new DataFrame

This is a variant of select that accepts SQL Expressions

Example:
async {
    df.selectExpr(vec!["id * 2", "abs(id)"]).collect().await?;
}
Examples found in repository
examples/writer.rs (line 15)
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::default().build().await?;

    let df = spark
        .clone()
        .range(None, 1000, 1, Some(16))
        .selectExpr(vec!["id AS range_id"]);

    let path = "/opt/spark/examples/src/main/rust/employees/";

    df.write()
        .format("csv")
        .option("header", "true")
        .save(path)
        .await?;

    let mut df = spark
        .clone()
        .read()
        .format("csv")
        .option("header", "true")
        .load(vec![path.to_string()]);

    df.show(Some(10), None, None).await?;

    // printed results may vary slightly but should be close to the below
    // +--------------------------+
    // | show_string              |
    // +--------------------------+
    // | +--------+               |
    // | |range_id|               |
    // | +--------+               |
    // | |312     |               |
    // | |313     |               |
    // | |314     |               |
    // | |315     |               |
    // | |316     |               |
    // | |317     |               |
    // | |318     |               |
    // | |319     |               |
    // | |320     |               |
    // | |321     |               |
    // | +--------+               |
    // | only showing top 10 rows |
    // |                          |
    // +--------------------------+

    Ok(())
}

pub fn filter(&mut self, condition: &str) -> DataFrame

Filters rows using the given condition and returns a new DataFrame

Example:
async {
    df.filter("salary > 4000").collect().await?;
}
Examples found in repository
examples/reader.rs (line 21): see the full listing under select above.
examples/sql.rs (line 17)
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession =
        SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs".to_string())
            .build()
            .await?;

    let mut df =
        spark.sql("SELECT * FROM json.`/opt/spark/examples/src/main/resources/employees.json`");

    df.filter("salary > 3000").show(Some(5), None, None).await?;

    // print results
    // +-----------------+
    // | show_string     |
    // +-----------------+
    // | +------+------+ |
    // | |name  |salary| |
    // | +------+------+ |
    // | |Andy  |4500  | |
    // | |Justin|3500  | |
    // | |Berta |4000  | |
    // | +------+------+ |
    // |                 |
    // +-----------------+

    Ok(())
}

pub fn limit(&mut self, limit: i32) -> DataFrame

Limits the result count to the number specified and returns a new DataFrame

Example:
async {
    df.limit(10).collect().await?;
}

pub fn repartition(&mut self, num_partitions: i32, shuffle: Option<bool>) -> DataFrame

Returns a new DataFrame repartitioned to the given number of partitions, optionally shuffling the data

Arguments:
  • num_partitions: the target number of partitions
  • shuffle: (optional) whether to induce a shuffle; defaults to false
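
Example (a sketch, following the call pattern of the other transformations on this page):
async {
    // Repartition into 8 partitions, forcing a shuffle.
    df.repartition(8, Some(true)).collect().await?;
}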

pub async fn show(&mut self, num_rows: Option<i32>, truncate: Option<i32>, vertical: Option<bool>) -> Result<(), ArrowError>

Prints the first n rows to the console

Arguments:
  • num_rows: (int, optional) number of rows to show (default 10)
  • truncate: (int, optional) if set to 0, strings are truncated; any other value leaves the strings untruncated
  • vertical: (bool, optional) If set to true, prints output rows vertically (one line per column value).
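
Example:
async {
    // Show the first 5 rows using the default layout.
    df.show(Some(5), None, None).await?;
}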

Examples found in repository
examples/reader.rs (line 23), examples/sql.rs (line 17), and examples/writer.rs (line 32): see the full listings above.

pub async fn tail(&mut self, limit: i32) -> Result<Vec<RecordBatch>, ArrowError>

Returns the last n rows as a vector of [RecordBatch]

Running tail requires moving the data and therefore results in an action
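
Example:
async {
    // Fetch the last 3 rows as Arrow RecordBatches.
    df.tail(3).await?;
}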

pub async fn collect(&mut self) -> Result<Vec<RecordBatch>, ArrowError>

Returns all records as a vector of [RecordBatch]

Example:
async {
    df.collect().await?;
}
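
The returned batches are plain Arrow RecordBatch values, so they can be inspected with the arrow crate's utilities. A sketch, assuming the arrow crate's prettyprint feature is enabled:
async {
    let batches = df.collect().await?;
    // Render the collected batches as an ASCII table for inspection.
    println!("{}", arrow::util::pretty::pretty_format_batches(&batches)?);
}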

pub fn write(self) -> DataFrameWriter

Returns a DataFrameWriter struct based on the current DataFrame
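
Example (the format, option, and save calls mirror examples/writer.rs; path is any target location):
async {
    df.write()
        .format("csv")
        .option("header", "true")
        .save(path)
        .await?;
}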

Examples found in repository
examples/writer.rs (line 19): see the full listing under selectExpr above.

Trait Implementations

impl Clone for DataFrame

fn clone(&self) -> DataFrame

Returns a copy of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl Debug for DataFrame

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations

Blanket Implementations

impl<T> Any for T where T: 'static + ?Sized

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T where T: ?Sized

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T where T: ?Sized

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> FromRef<T> for T where T: Clone

fn from_ref(input: &T) -> T

Converts to this type from a reference to the input type.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T where U: From<T>

fn into(self) -> U

Calls U::from(self). That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoRequest<T> for T

fn into_request(self) -> Request<T>

Wraps the input message T in a tonic::Request.

impl<T> ToOwned for T where T: Clone

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T where U: Into<T>

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T where U: TryFrom<T>

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T where V: MultiLane<T>

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self> where S: Into<Dispatch>

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.