Crate spark_connect_rs

Spark Connection Client for Rust

The Spark Connect client for Rust is currently highly experimental and should not be used in any production setting. It is a proof of concept for identifying how to interact with a Spark cluster from Rust.

Quickstart

Create a Spark Session and create a DataFrame from an arrow::array::RecordBatch.

use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, RecordBatch, StringArray};

use spark_connect_rs::{SparkSession, SparkSessionBuilder};
use spark_connect_rs::functions::{col, lit};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs")
        .build()
        .await?;

    let name: ArrayRef = Arc::new(StringArray::from(vec!["Tom", "Alice", "Bob"]));
    let age: ArrayRef = Arc::new(Int64Array::from(vec![14, 23, 16]));

    let data = RecordBatch::try_from_iter(vec![("name", name), ("age", age)])?;

    let df = spark.createDataFrame(&data).await?;

    // 2 records total
    let records = df.select("*")
        .withColumn("age_plus", col("age") + lit(4))
        .filter(col("name").contains("o"))
        .count()
        .await?;

    println!("{records:?}");

    Ok(())
}

Create a Spark Session and create a DataFrame from a SQL statement:

use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs")
        .build()
        .await?;

    let df = spark.sql("SELECT * FROM json.`/opt/spark/examples/src/main/resources/employees.json`").await?;

    // Show the first 5 records
    df.filter("salary > 3000").show(Some(5), None, None).await?;

    Ok(())
}
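
Query results can also be brought back to the client as Arrow data. Below is a minimal sketch, assuming a DataFrame::collect method that materializes the result client-side as Arrow record batches; the method's exact return shape is an assumption, not something shown on this page:

use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs")
        .build()
        .await?;

    let df = spark.sql("SELECT * FROM json.`/opt/spark/examples/src/main/resources/employees.json`").await?;

    // Assumption: `collect()` gathers the query result as Arrow data (Debug-printable)
    let batches = df.collect().await?;
    println!("{batches:?}");

    Ok(())
}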

Create a Spark Session, read a CSV file into a DataFrame, apply function transformations, and write the results:

use spark_connect_rs::{SparkSession, SparkSessionBuilder};

use spark_connect_rs::functions as F;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs")
        .build()
        .await?;

    let paths = ["/opt/spark/examples/src/main/resources/people.csv"];

    let df = spark
        .read()
        .format("csv")
        .option("header", "True")
        .option("delimiter", ";")
        .load(paths)?;

    let df = df
        .filter("age > 30")
        .select([
            F::col("name"),
            F::col("age").cast("int")
        ]);

    df.write()
      .format("csv")
      .option("header", "true")
      .save("/opt/spark/examples/src/main/rust/people/")
      .await?;

    Ok(())
}
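
To sanity-check the write, the output directory can be read back with the same reader API used above. This is a sketch of the extra lines that would go before Ok(()) in the main above; the path matches the save() call:

    // Read the just-written CSV back and display a few rows
    let df_out = spark
        .read()
        .format("csv")
        .option("header", "true")
        .load(["/opt/spark/examples/src/main/rust/people/"])?;

    df_out.show(Some(5), None, None).await?;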

Databricks Connection

Spark Connect is enabled for Databricks Runtime 13.3 LTS and above, and requires the crate's tls feature flag (feature = "tls"). The connection string for the remote session must contain the following values:

"sc://<workspace id>:443/;token=<personal access token>;x-databricks-cluster-id=<cluster-id>"

Modules

  • catalog: Spark Catalog representation through which the user may create, drop, alter or query underlying databases, tables, functions, etc.
  • column: Column represents a column in a DataFrame that holds a spark::Expression
  • dataframe: DataFrame representation for Spark Connection
  • errors: Defines a SparkError for representing failures in various Spark operations. Most of these are wrappers for tonic or arrow error messages
  • expressions: Traits for converting Rust Types to Spark Connect Expression Types
  • functions: A re-implementation of Spark functions
  • group: A DataFrame created with an aggregate statement
  • plan: Logical Plan representation
  • readwriter: DataFrameReader & DataFrameWriter representations
  • session: Spark Session containing the remote gRPC client
  • spark: Spark Connect gRPC protobuf translated using tonic
  • storage: Enum for handling Spark Storage representations
  • streaming: Streaming implementation for the Spark Connect Client
  • types: Rust Types to Spark Types
