Struct spark_connect_rs::readwriter::DataFrameReader
pub struct DataFrameReader { /* private fields */ }
DataFrameReader represents the entrypoint to create a DataFrame from a specific file format.
Implementations

impl DataFrameReader

pub fn new(spark_session: SparkSession) -> Self

Create a new DataFrameReader with a SparkSession.
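A minimal sketch (not taken from the repository examples); it assumes an async context with ? error handling and a reachable Spark Connect server behind the default SparkSessionBuilder:

let spark: SparkSession = SparkSessionBuilder::default().build().await?;

// construct the reader explicitly from an existing session...
let reader = DataFrameReader::new(spark.clone());

// ...or, as in the repository examples, obtain it via the session itself
let reader = spark.read();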
pub fn format(self, format: &str) -> Self

Specifies the input data source format.
Examples found in repository: examples/reader.rs (line 16)
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::default().build().await?;

    let path = ["/opt/spark/examples/src/main/resources/people.csv"];

    let mut df = spark
        .read()
        .format("csv")
        .option("header", "True")
        .option("delimiter", ";")
        .load(path);

    df.select([
        F::col("name"),
        F::col("age").cast("int").alias("age_int"),
        (F::lit(3.0) + F::col("age").cast("int")).alias("addition"),
    ])
    .sort(vec![F::col("name").desc()])
    .show(Some(5), None, None)
    .await?;

    // print results
    // +--------------------------+
    // |       show_string        |
    // +--------------------------+
    // | +-----+-------+--------+ |
    // | |name |age_int|addition| |
    // | +-----+-------+--------+ |
    // | |Jorge|30     |33.0    | |
    // | |Bob  |32     |35.0    | |
    // | +-----+-------+--------+ |
    // |                          |
    // +--------------------------+

    Ok(())
}

More examples
examples/writer.rs (line 33)
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::default().build().await?;

    let df = spark
        .clone()
        .range(None, 1000, 1, Some(16))
        .select(col("id").alias("range_id"));

    let path = "/opt/spark/examples/src/main/rust/employees/";

    df.write()
        .format("csv")
        .mode(SaveMode::Overwrite)
        .option("header", "true")
        .save(path)
        .await?;

    let mut df = spark
        .clone()
        .read()
        .format("csv")
        .option("header", "true")
        .load([path]);

    df.show(Some(10), None, None).await?;

    // printed results may vary slightly but should be close to the below
    // +--------------------------+
    // |       show_string        |
    // +--------------------------+
    // | +--------+               |
    // | |range_id|               |
    // | +--------+               |
    // | |312     |               |
    // | |313     |               |
    // | |314     |               |
    // | |315     |               |
    // | |316     |               |
    // | |317     |               |
    // | |318     |               |
    // | |319     |               |
    // | |320     |               |
    // | |321     |               |
    // | +--------+               |
    // | only showing top 10 rows |
    // |                          |
    // +--------------------------+

    Ok(())
}

examples/delta.rs (line 22)
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut spark: SparkSession = SparkSessionBuilder::default().build().await?;

    let paths = ["/opt/spark/examples/src/main/resources/people.csv"];

    let df = spark
        .clone()
        .read()
        .format("csv")
        .option("header", "True")
        .option("delimiter", ";")
        .option("inferSchema", "True")
        .load(paths);

    df.write()
        .format("delta")
        .mode(SaveMode::Overwrite)
        .saveAsTable("default.people_delta")
        .await
        .unwrap();

    spark
        .sql("DESCRIBE HISTORY default.people_delta")
        .await?
        .show(Some(1), None, Some(true))
        .await
        .unwrap();

    // print results
    // +-------------------------------------------------------------------------------------------------------+
    // | show_string |
    // +-------------------------------------------------------------------------------------------------------+
    // | -RECORD 0-------------------------------------------------------------------------------------------- |
    // | version | 3 |
    // | timestamp | 2024-03-16 13:46:23.552 |
    // | userId | NULL |
    // | userName | NULL |
    // | operation | CREATE OR REPLACE TABLE AS SELECT |
    // | operationParameters | {isManaged -> true, description -> NULL, partitionBy -> [], properties -> {}} |
    // | job | NULL |
    // | notebook | NULL |
    // | clusterId | NULL |
    // | readVersion | 2 |
    // | isolationLevel | Serializable |
    // | isBlindAppend | false |
    // | operationMetrics | {numFiles -> 1, numOutputRows -> 2, numOutputBytes -> 988} |
    // | userMetadata | NULL |
    // | engineInfo | Apache-Spark/3.5.0 Delta-Lake/3.0.0 |
    // | only showing top 1 row |
    // | |
    // +-------------------------------------------------------------------------------------------------------+

    Ok(())
}

pub fn option(self, key: &str, value: &str) -> Self

Add an input option for the underlying data source.
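For instance, mirroring the repository examples referenced below, header handling and the field delimiter are set as string key/value options before loading:

let reader = spark
    .read()
    .format("csv")
    .option("header", "true")
    .option("delimiter", ";");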
Examples found in repository: examples/reader.rs (line 17), examples/writer.rs (line 34), examples/delta.rs (line 23); these are the same examples shown above under format.

pub fn options<I, K, V>(self, options: I) -> Self
Set many input options at once from an iterator of key/value pairs for the underlying data source.
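A minimal sketch (the exact trait bounds are not shown above, so this assumes the iterator item is a pair of string-like values, e.g. a Vec of (&str, &str) tuples); the CSV path is a placeholder:

let opts = vec![("header", "true"), ("delimiter", ";")];

let mut df = spark
    .read()
    .format("csv")
    .options(opts)
    .load(["/some/dir/people.csv"]);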
pub fn load<'a, I>(&mut self, paths: I) -> DataFrame
where
    I: IntoIterator<Item = &'a str>,

Loads data from a data source and returns it as a DataFrame.

Example:

let path = vec!["some/dir/path/on/the/remote/cluster/"];

// returns a DataFrame from a csv file with a header, read from the specified path
let mut df = spark.read().format("csv").option("header", "true").load(path);

Examples found in repository:
examples/reader.rs (line 19), examples/writer.rs (line 35), examples/delta.rs (line 26); these are the same examples shown above under format.

Trait Implementations
impl Clone for DataFrameReader

fn clone(&self) -> DataFrameReader

Returns a copy of the value. Read more

1.0.0 · fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more

Auto Trait Implementations
impl Freeze for DataFrameReader
impl !RefUnwindSafe for DataFrameReader
impl Send for DataFrameReader
impl Sync for DataFrameReader
impl Unpin for DataFrameReader
impl !UnwindSafe for DataFrameReader
Blanket Implementations

impl<T> BorrowMut<T> for T
where
    T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

fn in_current_span(self) -> Instrumented<Self>

impl<T> IntoRequest<T> for T

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request