Crate spark_connect_rs
Spark Connection Client for Rust
Currently, the Spark Connect client for Rust is highly experimental and should not be used in any production setting. It is a "proof of concept" for identifying how to interact with a Spark cluster from Rust.
§Quickstart
Create a Spark Session and create a DataFrame from an arrow::array::RecordBatch.
use spark_connect_rs::{SparkSession, SparkSessionBuilder};
use spark_connect_rs::functions::{col, lit};

use spark_connect_rs::arrow::array::{ArrayRef, Int64Array, RecordBatch, StringArray};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs")
        .build()
        .await?;

    let name: ArrayRef = Arc::new(StringArray::from(vec!["Tom", "Alice", "Bob"]));
    let age: ArrayRef = Arc::new(Int64Array::from(vec![14, 23, 16]));

    let data = RecordBatch::try_from_iter(vec![("name", name), ("age", age)])?;

    let df = spark.createDataFrame(&data).await?;

    // 2 records total ("Tom" and "Bob" contain an "o")
    let records = df.select("*")
        .withColumn("age_plus", col("age") + lit(4))
        .filter(col("name").contains("o"))
        .count()
        .await?;

    Ok(())
}
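Every example here passes the same kind of `sc://` address to SparkSessionBuilder::remote. As an illustrative sketch (the parser below is our own, not part of the crate's API), the string breaks down into a host:port endpoint followed by `;`-separated key=value parameters:

```rust
// Illustrative parser for the `sc://host:port/;key=value;...` remote string.
// This helper is NOT part of spark_connect_rs; it only demonstrates the format.
fn parse_remote(remote: &str) -> Option<(String, Vec<(String, String)>)> {
    // The scheme is always `sc://`.
    let rest = remote.strip_prefix("sc://")?;
    // Endpoint and parameters are separated by `/;`; parameters are optional.
    let (endpoint, params) = rest
        .split_once("/;")
        .unwrap_or((rest.trim_end_matches('/'), ""));
    let pairs = params
        .split(';')
        .filter(|p| !p.is_empty())
        .filter_map(|p| p.split_once('='))
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect();
    Some((endpoint.to_string(), pairs))
}

fn main() {
    let (endpoint, params) =
        parse_remote("sc://127.0.0.1:15002/;user_id=example_rs").unwrap();
    println!("endpoint: {endpoint}"); // 127.0.0.1:15002
    for (k, v) in &params {
        println!("{k} = {v}"); // user_id = example_rs
    }
}
```

The builder itself performs the real parsing; this sketch only shows why a bare `sc://host:port` works while session metadata such as `user_id` travels in the trailing parameters.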
Create a Spark Session and create a DataFrame from a SQL statement:
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs")
        .build()
        .await?;

    let df = spark.sql("SELECT * FROM json.`/opt/spark/examples/src/main/resources/employees.json`").await?;

    // Show the first 5 records
    df.filter("salary > 3000").show(Some(5), None, None).await?;

    Ok(())
}
Create a Spark Session, read a CSV file into a DataFrame, apply function transformations, and write the results:
use spark_connect_rs::{SparkSession, SparkSessionBuilder};
use spark_connect_rs::functions as F;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs")
        .build()
        .await?;

    let paths = ["/opt/spark/examples/src/main/resources/people.csv"];

    let df = spark
        .read()
        .format("csv")
        .option("header", "True")
        .option("delimiter", ";")
        .load(paths)?;

    let df = df
        .filter("age > 30")
        .select([
            F::col("name"),
            F::col("age").cast("int"),
        ]);

    df.write()
        .format("csv")
        .option("header", "true")
        .save("/opt/spark/examples/src/main/rust/people/")
        .await?;

    Ok(())
}
§Databricks Connection
Spark Connect is enabled for Databricks Runtime 13.3 LTS and above, and connecting requires the tls feature flag (feature = "tls"). The connection string for the remote session must contain the following values:
"sc://<workspace id>:443/;token=<personal access token>;x-databricks-cluster-id=<cluster-id>"
§Re-exports
pub use dataframe::DataFrame;
pub use dataframe::DataFrameReader;
pub use dataframe::DataFrameWriter;
pub use session::SparkSession;
pub use session::SparkSessionBuilder;
pub use arrow;
§Modules
- Spark Catalog representation through which the user may create, drop, alter, or query underlying databases, tables, functions, etc.
- Column represents a column in a DataFrame that holds a spark::Expression
- DataFrame representation for Spark Connection
- Defines a SparkError for representing failures in various Spark operations. Most of these are wrappers around tonic or arrow error messages.
- Traits for converting Rust Types to Spark Connect Expression Types
- A re-implementation of Spark functions
- A DataFrame created with an aggregate statement
- Logical Plan representation
- DataFrameReader & DataFrameWriter representations
- Spark Session containing the remote gRPC client
- Spark Connect gRPC protobuf translated using tonic
- Enum for handling Spark Storage representations
- Streaming implementation for the Spark Connect Client
- Rust Types to Spark Types