Struct spark_connect_rs::session::SparkSession
pub struct SparkSession {
pub client: Arc<Mutex<SparkConnectServiceClient<InterceptedService<Channel, MetadataInterceptor>>>>,
pub session_id: String,
pub metadata: Option<HashMap<String, String>>,
pub user_id: Option<String>,
pub token: Option<&'static str>,
}
The entry point for connecting to a Spark cluster using the Spark Connect gRPC protocol.
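A minimal sketch of building a session, assuming the imports below are available at the crate root, a tokio runtime, and a Spark Connect server reachable at the placeholder address (the user_id parameter is only illustrative):
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Parameters appended to the connection string (e.g. user_id)
    // are collected into the session's gRPC metadata
    let spark: SparkSession =
        SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs".to_string())
            .build()
            .await?;

    // The session can now be used to create DataFrames via range(), read(), or sql()
    let mut df = spark.range(None, 5, 1, None);
    df.show(Some(5), None, None).await?;
    Ok(())
}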
Fields
client: Arc<Mutex<SparkConnectServiceClient<InterceptedService<Channel, MetadataInterceptor>>>>
Spark Connect gRPC client interface
session_id: String
Spark Session ID
metadata: Option<HashMap<String, String>>
gRPC metadata collected from the connection string
user_id: Option<String>
token: Option<&'static str>
Implementations
impl SparkSession
pub fn range(
    self,
    start: Option<i64>,
    end: i64,
    step: i64,
    num_partitions: Option<i32>
) -> DataFrame
Create a DataFrame with a single column named id,
containing elements in a range from start (default 0) to
end (exclusive) with a step value step, and control the number
of partitions with num_partitions
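A minimal sketch, assuming a SparkSession named spark has already been built inside an async context:
// Produces ids 0, 2, 4, 6, 8 in a single partition
let mut df = spark.range(Some(0), 10, 2, Some(1));
df.show(Some(10), None, None).await?;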
Examples found in repository
examples/writer.rs (line 16)
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::default().build().await?;
let df = spark
.clone()
.range(None, 1000, 1, Some(16))
.select(col("id").alias("range_id"));
let path = "/opt/spark/examples/src/main/rust/employees/";
df.write()
.format("csv")
.option("header", "true")
.save(path)
.await?;
let mut df = spark
.clone()
.read()
.format("csv")
.option("header", "true")
.load(vec![path.to_string()]);
df.show(Some(10), None, None).await?;
// printed results may vary slightly but should be close to the below
// +--------------------------+
// | show_string |
// +--------------------------+
// | +--------+ |
// | |range_id| |
// | +--------+ |
// | |312 | |
// | |313 | |
// | |314 | |
// | |315 | |
// | |316 | |
// | |317 | |
// | |318 | |
// | |319 | |
// | |320 | |
// | |321 | |
// | +--------+ |
// | only showing top 10 rows |
// | |
// +--------------------------+
Ok(())
}
pub fn read(self) -> DataFrameReader
Returns a DataFrameReader that can be used to read data in as a DataFrame
Examples found in repository
examples/reader.rs (line 15)
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::default().build().await?;
let paths = vec!["/opt/spark/examples/src/main/resources/people.csv".to_string()];
let mut df = spark
.read()
.format("csv")
.option("header", "True")
.option("delimiter", ";")
.load(paths);
df.select(vec![
F::col("name"),
F::col("age").cast("int").alias("age_int"),
(F::lit(3.0) + F::col("age").cast("int")).alias("addition"),
])
.sort(vec![F::col("name").desc()])
.show(Some(5), None, None)
.await?;
// print results
// +--------------------------+
// | show_string |
// +--------------------------+
// | +-----+-------+--------+ |
// | |name |age_int|addition| |
// | +-----+-------+--------+ |
// | |Jorge|30 |33.0 | |
// | |Bob |32 |35.0 | |
// | +-----+-------+--------+ |
// | |
// +--------------------------+
Ok(())
}
More examples
examples/writer.rs (line 29): the same writer.rs example shown above under range.
pub async fn sql(&mut self, sql_query: &str) -> DataFrame
Returns a DataFrame representing the result of the given query
Examples found in repository
examples/sql.rs (line 16)
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut spark: SparkSession =
SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs".to_string())
.build()
.await?;
let mut df = spark
.sql("SELECT * FROM json.`/opt/spark/examples/src/main/resources/employees.json`")
.await;
df.filter("salary >= 3500")
.select("*")
.show(Some(5), None, None)
.await?;
// +-----------------+
// | show_string |
// +-----------------+
// | +------+------+ |
// | |name |salary| |
// | +------+------+ |
// | |Andy |4500 | |
// | |Justin|3500 | |
// | |Berta |4000 | |
// | +------+------+ |
// | |
// +-----------------+
Ok(())
}
pub async fn execute_plan(
    &mut self,
    plan: Option<Plan>
) -> Result<Streaming<ExecutePlanResponse>, Status>
pub async fn consume_plan(
    &mut self,
    plan: Option<Plan>
) -> Result<Vec<RecordBatch>, ArrowError>
Calls a service on the remote Spark Connect server by running the provided spark::Plan.
Consuming the plan produces a vector of RecordBatch records
pub async fn analyze_plan(&mut self, analyze: Option<Analyze>) -> Option<Result>
Trait Implementations
impl Clone for SparkSession
fn clone(&self) -> SparkSession
Returns a copy of the value.
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.
Auto Trait Implementations
impl !RefUnwindSafe for SparkSession
impl Send for SparkSession
impl Sync for SparkSession
impl Unpin for SparkSession
impl !UnwindSafe for SparkSession
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value.
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request