Struct spark_connect_rs::SparkSession
source · pub struct SparkSession { /* private fields */ }
Expand description
The spark-connect-rs crate is currently just a meta-package shim for spark-connect-core. The entry point for connecting to a Spark Cluster using the Spark Connection gRPC protocol.
Implementations§
source§impl SparkSession
impl SparkSession
pub fn new( client: SparkConnectClient<InterceptedService<Channel, MetadataInterceptor>> ) -> SparkSession
sourcepub fn range(
self,
start: Option<i64>,
end: i64,
step: i64,
num_partitions: Option<i32>
) -> DataFrame
pub fn range( self, start: Option<i64>, end: i64, step: i64, num_partitions: Option<i32> ) -> DataFrame
Create a DataFrame with a single column named `id`, containing elements in a range from `start` (default 0) to `end` (exclusive) with a step value `step`, and control the number of partitions with `num_partitions`.
Examples found in repository?
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
// Example: build a ranged DataFrame, write it out as CSV, read it back, and display it.
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Build a session from the remote connection string.
let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
.build()
.await?;
// `range` takes the session by value, so clone it to keep `spark` usable afterwards.
// Arguments: start = None (defaults to 0), end = 1000 (exclusive), step = 1,
// num_partitions = Some(16).
let df = spark
.clone()
.range(None, 1000, 1, Some(16))
// Select the `id` column under the alias `range_id`.
.select(col("id").alias("range_id"));
let path = "/opt/spark/examples/src/main/rust/employees/";
// Write the DataFrame as CSV with a header option, using overwrite save mode.
df.write()
.format("csv")
.mode(SaveMode::Overwrite)
.option("header", "true")
.save(path)
.await?;
// Read the same location back; `read` also takes the session by value, hence the clone.
let df = spark
.clone()
.read()
.format("csv")
.option("header", "true")
.load([path])?;
// Presumably the first argument is the row limit (the sample output shows the top 10 rows).
df.show(Some(10), None, None).await?;
// print results may slightly vary but should be close to the below
// +--------------------------+
// | show_string |
// +--------------------------+
// | +--------+ |
// | |range_id| |
// | +--------+ |
// | |312 | |
// | |313 | |
// | |314 | |
// | |315 | |
// | |316 | |
// | |317 | |
// | |318 | |
// | |319 | |
// | |320 | |
// | |321 | |
// | +--------+ |
// | only showing top 10 rows |
// | |
// +--------------------------+
Ok(())
}
sourcepub fn read(self) -> DataFrameReader
pub fn read(self) -> DataFrameReader
Returns a DataFrameReader that can be used to read data in as a DataFrame
Examples found in repository?
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
// Example: run a SQL query, write the result as parquet, read it back, and display it.
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Build a session from the remote connection string.
let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
.build()
.await?;
// `sql` takes the session by value, so clone it to keep `spark` usable afterwards.
let df = spark
.clone()
.sql("select 'apple' as word, 123 as count")
.await?;
// Write the query result in parquet format, using overwrite save mode.
df.write()
.mode(SaveMode::Overwrite)
.format("parquet")
.save("file:///tmp/spark-connect-write-example-output.parquet")
.await?;
// Last use of `spark`, so it is moved into `read` without cloning.
let df = spark
.read()
.format("parquet")
.load(["file:///tmp/spark-connect-write-example-output.parquet"])?;
// Presumably the first argument is the maximum number of rows to display.
df.show(Some(100), None, None).await?;
// +---------------+
// | show_string |
// +---------------+
// | +-----+-----+ |
// | |word |count| |
// | +-----+-----+ |
// | |apple|123 | |
// | +-----+-----+ |
// | |
// +---------------+
Ok(())
}
More examples
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
// Example: read a CSV file, project and cast columns, sort, and display the result.
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Build a session from the builder's default settings.
// NOTE(review): the default connection target is not visible here; confirm against the builder.
let spark: SparkSession = SparkSessionBuilder::default().build().await?;
let path = ["/opt/spark/examples/src/main/resources/people.csv"];
// `read` takes the session by value; `spark` is not used again, so no clone is needed.
let df = spark
.read()
.format("csv")
.option("header", "True")
.option("delimiter", ";")
.load(path)?;
// Select `name`, `age` cast to int (aliased `age_int`), and 3.0 + the int-cast age
// (aliased `addition`).
df.select([
F::col("name"),
F::col("age").cast("int").alias("age_int"),
(F::lit(3.0) + F::col("age").cast("int")).alias("addition"),
])
// Order by `name` descending.
.sort(vec![F::col("name").desc()])
// Presumably the first argument is the maximum number of rows to display.
.show(Some(5), None, None)
.await?;
// print results
// +--------------------------+
// | show_string |
// +--------------------------+
// | +-----+-------+--------+ |
// | |name |age_int|addition| |
// | +-----+-------+--------+ |
// | |Jorge|30 |33.0 | |
// | |Bob |32 |35.0 | |
// | +-----+-------+--------+ |
// | |
// +--------------------------+
Ok(())
}
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
// Example: build a ranged DataFrame, write it out as CSV, read it back, and display it.
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Build a session from the remote connection string.
let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
.build()
.await?;
// `range` takes the session by value, so clone it to keep `spark` usable afterwards.
// Arguments: start = None (defaults to 0), end = 1000 (exclusive), step = 1,
// num_partitions = Some(16).
let df = spark
.clone()
.range(None, 1000, 1, Some(16))
// Select the `id` column under the alias `range_id`.
.select(col("id").alias("range_id"));
let path = "/opt/spark/examples/src/main/rust/employees/";
// Write the DataFrame as CSV with a header option, using overwrite save mode.
df.write()
.format("csv")
.mode(SaveMode::Overwrite)
.option("header", "true")
.save(path)
.await?;
// Read the same location back; `read` also takes the session by value, hence the clone.
let df = spark
.clone()
.read()
.format("csv")
.option("header", "true")
.load([path])?;
// Presumably the first argument is the row limit (the sample output shows the top 10 rows).
df.show(Some(10), None, None).await?;
// print results may slightly vary but should be close to the below
// +--------------------------+
// | show_string |
// +--------------------------+
// | +--------+ |
// | |range_id| |
// | +--------+ |
// | |312 | |
// | |313 | |
// | |314 | |
// | |315 | |
// | |316 | |
// | |317 | |
// | |318 | |
// | |319 | |
// | |320 | |
// | |321 | |
// | +--------+ |
// | only showing top 10 rows |
// | |
// +--------------------------+
Ok(())
}
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
// Example: load a CSV file, save it as a Delta table, then show the table's history.
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Build a session from the remote connection string.
let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
.build()
.await?;
let paths = ["/opt/spark/examples/src/main/resources/people.csv"];
// `read` takes the session by value, so clone it to keep `spark` usable for `sql` below.
let df = spark
.clone()
.read()
.format("csv")
.option("header", "True")
.option("delimiter", ";")
.option("inferSchema", "True")
.load(paths)?;
// Save the DataFrame as the table `default.people_delta` in delta format, overwrite mode.
df.write()
.format("delta")
.mode(SaveMode::Overwrite)
.saveAsTable("default.people_delta")
.await?;
// Query the table history; last use of `spark`, so it is moved into `sql`.
spark
.sql("DESCRIBE HISTORY default.people_delta")
.await?
// Presumably: 1 row, default truncation, vertical layout (the output below is one
// record printed field-per-line) -- confirm parameter meanings against `DataFrame::show`.
.show(Some(1), None, Some(true))
.await?;
// print results
// +-------------------------------------------------------------------------------------------------------+
// | show_string |
// +-------------------------------------------------------------------------------------------------------+
// | -RECORD 0-------------------------------------------------------------------------------------------- |
// | version | 3 |
// | timestamp | 2024-03-16 13:46:23.552 |
// | userId | NULL |
// | userName | NULL |
// | operation | CREATE OR REPLACE TABLE AS SELECT |
// | operationParameters | {isManaged -> true, description -> NULL, partitionBy -> [], properties -> {}} |
// | job | NULL |
// | notebook | NULL |
// | clusterId | NULL |
// | readVersion | 2 |
// | isolationLevel | Serializable |
// | isBlindAppend | false |
// | operationMetrics | {numFiles -> 1, numOutputRows -> 2, numOutputBytes -> 988} |
// | userMetadata | NULL |
// | engineInfo | Apache-Spark/3.5.0 Delta-Lake/3.0.0 |
// | only showing top 1 row |
// | |
// +-------------------------------------------------------------------------------------------------------+
Ok(())
}
sourcepub fn readStream(self) -> DataStreamReader
pub fn readStream(self) -> DataStreamReader
Returns a DataStreamReader that can be used to read streaming data in as a DataFrame
Examples found in repository?
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
// Example: start a streaming query from the "rate" source, print its progress a few
// times, then stop it.
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Build a session from a connection string that also carries a `user_id` parameter.
let spark: SparkSession =
SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=stream_example")
.build()
.await?;
// `readStream` takes the session by value; `spark` is not used again afterwards.
let df = spark
.readStream()
.format("rate")
.option("rowsPerSecond", "5")
.load(None)?;
// Configure the streaming writer (console format, append output mode, a processing-time
// trigger built from "1 seconds") and start the query named `example_stream`.
let query = df
.writeStream()
.format("console")
.queryName("example_stream")
.outputMode(OutputMode::Append)
.trigger(Trigger::ProcessingTimeInterval("1 seconds".to_string()))
.start(None)
.await?;
// loop to get multiple progression stats
// `1..5` yields four iterations, each sleeping 5 seconds before polling.
for _ in 1..5 {
// NOTE(review): `thread::sleep` blocks the executing thread inside an async fn;
// consider an async sleep if the runtime in use provides one.
thread::sleep(time::Duration::from_secs(5));
// The handle is cloned per poll so `query` is still available for `stop()` below
// (presumably `lastProgress` consumes its receiver -- confirm).
let val = &query.clone().lastProgress().await?;
println!("{}", val);
}
// stop the active stream
query.stop().await?;
Ok(())
}
pub fn table(self, name: &str) -> Result<DataFrame, SparkError>
sourcepub fn catalog(self) -> Catalog
pub fn catalog(self) -> Catalog
Interface through which the user may create, drop, alter or query underlying databases, tables, functions, etc.
sourcepub async fn sql(self, sql_query: &str) -> Result<DataFrame, SparkError>
pub async fn sql(self, sql_query: &str) -> Result<DataFrame, SparkError>
Returns a DataFrame representing the result of the given query
Examples found in repository?
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
// Example: run a SQL query, write the result as parquet, read it back, and display it.
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Build a session from the remote connection string.
let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
.build()
.await?;
// `sql` takes the session by value, so clone it to keep `spark` usable afterwards.
let df = spark
.clone()
.sql("select 'apple' as word, 123 as count")
.await?;
// Write the query result in parquet format, using overwrite save mode.
df.write()
.mode(SaveMode::Overwrite)
.format("parquet")
.save("file:///tmp/spark-connect-write-example-output.parquet")
.await?;
// Last use of `spark`, so it is moved into `read` without cloning.
let df = spark
.read()
.format("parquet")
.load(["file:///tmp/spark-connect-write-example-output.parquet"])?;
// Presumably the first argument is the maximum number of rows to display.
df.show(Some(100), None, None).await?;
// +---------------+
// | show_string |
// +---------------+
// | +-----+-----+ |
// | |word |count| |
// | +-----+-----+ |
// | |apple|123 | |
// | +-----+-----+ |
// | |
// +---------------+
Ok(())
}
More examples
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
// Example: load a CSV file, save it as a Delta table, then show the table's history.
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Build a session from the remote connection string.
let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
.build()
.await?;
let paths = ["/opt/spark/examples/src/main/resources/people.csv"];
// `read` takes the session by value, so clone it to keep `spark` usable for `sql` below.
let df = spark
.clone()
.read()
.format("csv")
.option("header", "True")
.option("delimiter", ";")
.option("inferSchema", "True")
.load(paths)?;
// Save the DataFrame as the table `default.people_delta` in delta format, overwrite mode.
df.write()
.format("delta")
.mode(SaveMode::Overwrite)
.saveAsTable("default.people_delta")
.await?;
// Query the table history; last use of `spark`, so it is moved into `sql`.
spark
.sql("DESCRIBE HISTORY default.people_delta")
.await?
// Presumably: 1 row, default truncation, vertical layout (the output below is one
// record printed field-per-line) -- confirm parameter meanings against `DataFrame::show`.
.show(Some(1), None, Some(true))
.await?;
// print results
// +-------------------------------------------------------------------------------------------------------+
// | show_string |
// +-------------------------------------------------------------------------------------------------------+
// | -RECORD 0-------------------------------------------------------------------------------------------- |
// | version | 3 |
// | timestamp | 2024-03-16 13:46:23.552 |
// | userId | NULL |
// | userName | NULL |
// | operation | CREATE OR REPLACE TABLE AS SELECT |
// | operationParameters | {isManaged -> true, description -> NULL, partitionBy -> [], properties -> {}} |
// | job | NULL |
// | notebook | NULL |
// | clusterId | NULL |
// | readVersion | 2 |
// | isolationLevel | Serializable |
// | isBlindAppend | false |
// | operationMetrics | {numFiles -> 1, numOutputRows -> 2, numOutputBytes -> 988} |
// | userMetadata | NULL |
// | engineInfo | Apache-Spark/3.5.0 Delta-Lake/3.0.0 |
// | only showing top 1 row |
// | |
// +-------------------------------------------------------------------------------------------------------+
Ok(())
}
pub fn createDataFrame( self, data: &RecordBatch ) -> Result<DataFrame, SparkError>
sourcepub fn session_id(&self) -> &str
pub fn session_id(&self) -> &str
Return the session ID
sourcepub fn client(
self
) -> SparkConnectClient<InterceptedService<Channel, MetadataInterceptor>>
pub fn client( self ) -> SparkConnectClient<InterceptedService<Channel, MetadataInterceptor>>
Spark Connection gRPC client interface
Trait Implementations§
source§impl Clone for SparkSession
impl Clone for SparkSession
source§fn clone(&self) -> SparkSession
fn clone(&self) -> SparkSession
1.0.0 · source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source. Read more
Auto Trait Implementations§
impl Freeze for SparkSession
impl !RefUnwindSafe for SparkSession
impl Send for SparkSession
impl Sync for SparkSession
impl Unpin for SparkSession
impl !UnwindSafe for SparkSession
Blanket Implementations§
source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
source§impl<T> Instrument for T
impl<T> Instrument for T
source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request