// This example demonstrates creating a Spark DataFrame from a CSV with read options.
// The resulting DataFrame is saved in the `delta` format as a `managed` table,
// and `spark.sql` queries are run against the Delta table.
//
// The remote Spark session must have the Spark package
// `io.delta:delta-spark_2.12:{DELTA_VERSION}` enabled, where `DELTA_VERSION` is
// the desired Delta Lake version.
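//
// For example (a sketch, assuming Spark 3.5.x with Delta Lake 3.0.0; adjust the
// versions to match your cluster), the connect server could be started with:
//
//   ./sbin/start-connect-server.sh \
//     --packages org.apache.spark:spark-connect_2.12:3.5.0,io.delta:delta-spark_2.12:3.0.0 \
//     --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
//     --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"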

use spark_connect_rs::{SparkSession, SparkSessionBuilder};

use spark_connect_rs::dataframe::SaveMode;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
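    // `SparkSessionBuilder::default()` targets the default Spark Connect
    // endpoint (`sc://localhost:15002`); a different endpoint can be supplied
    // with `SparkSessionBuilder::remote("sc://<host>:<port>/;user_id=...")`.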
    let mut spark: SparkSession = SparkSessionBuilder::default().build().await?;

    let paths = ["/opt/spark/examples/src/main/resources/people.csv"];
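    // (`people.csv` ships with the Spark distribution, semicolon-delimited with
    // a header row; the path must be readable by the remote server, assumed
    // here to live under `/opt/spark`.)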

    let df = spark
        .clone()
        .read()
        .format("csv")
        .option("header", "True")
        .option("delimiter", ";")
        .option("inferSchema", "True")
        .load(paths);
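
    // Before writing, transformations could be chained onto `df`, e.g.
    // (a sketch, assuming the `name`/`age` columns from `people.csv` and the
    // `spark_connect_rs::functions::col` helper):
    //
    // let df = df.select(vec![col("name"), col("age")]).sort(vec![col("age")]);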

    df.write()
        .format("delta")
        .mode(SaveMode::Overwrite)
        .saveAsTable("default.people_delta")
        .await?;

    spark
        .sql("DESCRIBE HISTORY default.people_delta")
        .await?
        .show(Some(1), None, Some(true))
        .await?;

    // Printed result: one history record, rendered vertically because the third
    // `show` argument (`vertical`) is `Some(true)`:
    // +-------------------------------------------------------------------------------------------------------+
    // | show_string                                                                                           |
    // +-------------------------------------------------------------------------------------------------------+
    // | -RECORD 0-------------------------------------------------------------------------------------------- |
    // |  version             | 3                                                                              |
    // |  timestamp           | 2024-03-16 13:46:23.552                                                        |
    // |  userId              | NULL                                                                           |
    // |  userName            | NULL                                                                           |
    // |  operation           | CREATE OR REPLACE TABLE AS SELECT                                              |
    // |  operationParameters | {isManaged -> true, description -> NULL, partitionBy -> [], properties -> {}}  |
    // |  job                 | NULL                                                                           |
    // |  notebook            | NULL                                                                           |
    // |  clusterId           | NULL                                                                           |
    // |  readVersion         | 2                                                                              |
    // |  isolationLevel      | Serializable                                                                   |
    // |  isBlindAppend       | false                                                                          |
    // |  operationMetrics    | {numFiles -> 1, numOutputRows -> 2, numOutputBytes -> 988}                     |
    // |  userMetadata        | NULL                                                                           |
    // |  engineInfo          | Apache-Spark/3.5.0 Delta-Lake/3.0.0                                            |
    // | only showing top 1 row                                                                                |
    // |                                                                                                       |
    // +-------------------------------------------------------------------------------------------------------+
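
    // Additional SQL can be run against the managed table in the same way, e.g.
    // (a sketch; the column names come from the CSV header):
    //
    // spark
    //     .sql("SELECT name, age FROM default.people_delta ORDER BY age DESC")
    //     .await?
    //     .show(Some(10), None, None)
    //     .await?;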

    Ok(())
}