Crate spark_connect_rs

Spark Connection Client for Rust

Currently, the Spark Connect client for Rust is highly experimental and should not be used in any production setting. It is a “proof of concept” for identifying how to interact with a Spark cluster from Rust.

§Quickstart

Create a Spark Session and create a DataFrame from an arrow::array::RecordBatch:

use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, RecordBatch, StringArray};

use spark_connect_rs::{SparkSession, SparkSessionBuilder};
use spark_connect_rs::functions::{col, lit};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs")
        .build()
        .await?;

    let name: ArrayRef = Arc::new(StringArray::from(vec!["Tom", "Alice", "Bob"]));
    let age: ArrayRef = Arc::new(Int64Array::from(vec![14, 23, 16]));

    let data = RecordBatch::try_from_iter(vec![("name", name), ("age", age)])?;

    let df = spark.create_dataframe(&data).await?;

    // Only "Tom" and "Bob" contain an "o", so 2 records remain
    let records = df.select(["*"])
        .with_column("age_plus", col("age") + lit(4))
        .filter(col("name").contains("o"))
        .count()
        .await?;

    println!("total records: {records}");

    Ok(())
}

Create a Spark Session and create a DataFrame from a SQL statement:

use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs")
        .build()
        .await?;

    let df = spark.sql("SELECT * FROM json.`/datasets/employees.json`").await?;

    // Show the first 5 records
    df.filter("salary > 3000").show(Some(5), None, None).await?;

    Ok(())
}
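
Query results can also be pulled back to the client as Arrow data. A minimal sketch, assuming DataFrame::collect() materializes the result set as a single arrow RecordBatch:

use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs")
        .build()
        .await?;

    let df = spark.sql("SELECT * FROM json.`/datasets/employees.json`").await?;

    // collect() is assumed here to return the filtered rows as one
    // arrow RecordBatch that can be inspected locally
    let batch = df.filter("salary > 3000").collect().await?;

    println!("collected {} rows", batch.num_rows());

    Ok(())
}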

Create a Spark Session, read a CSV file into a DataFrame, apply function transformations, and write the results:

use spark_connect_rs::{SparkSession, SparkSessionBuilder};

use spark_connect_rs::functions as F;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs")
        .build()
        .await?;

    let paths = ["/datasets/people.csv"];

    let df = spark
        .read()
        .format("csv")
        .option("header", "True")
        .option("delimiter", ";")
        .load(paths)?;

    let df = df
        .filter("age > 30")
        .select([
            F::col("name"),
            F::col("age").cast("int")
        ]);

    df.write()
      .format("csv")
      .option("header", "true")
      .save("/opt/spark/examples/src/main/rust/people/")
      .await?;

    Ok(())
}
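
To verify the write, the saved directory can be read back with the same reader API (a sketch; the path matches the save() call above):

use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs")
        .build()
        .await?;

    // Read back the directory written above; Spark treats the folder
    // of CSV part files as a single dataset
    let df = spark
        .read()
        .format("csv")
        .option("header", "true")
        .load(["/opt/spark/examples/src/main/rust/people/"])?;

    df.show(Some(5), None, None).await?;

    Ok(())
}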

§Databricks Connection

Spark Connect is enabled for Databricks Runtime 13.3 LTS and above, and requires the crate to be built with the feature flag feature = "tls". The connection string for the remote session must contain the following values:

"sc://<workspace id>:443/;token=<personal access token>;x-databricks-cluster-id=<cluster-id>"

§Modules

  • catalog: Spark Catalog representation through which the user may create, drop, alter or query underlying databases, tables, functions, etc.
  • client: Implementation of the SparkConnectServiceClient
  • column: Column represents a column in a DataFrame that holds a spark::Expression
  • conf: Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
  • dataframe: DataFrame representation for Spark Connection
  • errors: Defines a SparkError for representing failures in various Spark operations. Most of these are wrappers for tonic or arrow error messages
  • expressions: Traits for converting Rust Types to Spark Connect Expression Types
  • functions: A re-implementation of Spark functions
  • group: A DataFrame created with an aggregate statement
  • plan: Logical Plan representation
  • readwriter: DataFrameReader & DataFrameWriter representations
  • session: Spark Session containing the remote gRPC client
  • spark: Spark Connect gRPC protobuf translated using tonic
  • storage: Enum for handling Spark Storage representations
  • streaming: Streaming implementation for the Spark Connect Client
  • types: Rust Types to Spark Types
  • window: Utility structs for defining a window over a DataFrame