Struct spark_connect_rs::dataframe::DataFrame
pub struct DataFrame {
pub spark_session: SparkSession,
pub logical_plan: LogicalPlanBuilder,
}
A DataFrame is composed of a spark_session, which connects to a remote
Spark Connect enabled cluster, and a logical_plan, which represents
the Plan to be submitted to the cluster when an action is called.
Fields
spark_session: SparkSession
Global SparkSession connecting to the remote cluster
logical_plan: LogicalPlanBuilder
Logical Plan representing the unresolved Relation which will be submitted to the remote cluster
Implementations
impl DataFrame
pub fn new(spark_session: SparkSession, logical_plan: LogicalPlanBuilder) -> DataFrame
Creates a new DataFrame based on a SparkSession and an initial logical plan
pub fn select<T: ToVecExpr>(&mut self, cols: T) -> DataFrame
Projects a set of expressions and returns a new DataFrame
Arguments:
cols: a vector of expressions (e.g. &str or Column values) which resolve to specific columns
Example:
async {
    df.select(vec![col("age"), col("name")]).collect().await?;
}
Examples found in repository
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut spark: SparkSession =
SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs".to_string())
.build()
.await?;
let mut df = spark
.sql("SELECT * FROM json.`/opt/spark/examples/src/main/resources/employees.json`")
.await;
df.filter("salary >= 3500")
.select("*")
.show(Some(5), None, None)
.await?;
// +-----------------+
// | show_string |
// +-----------------+
// | +------+------+ |
// | |name |salary| |
// | +------+------+ |
// | |Andy |4500 | |
// | |Justin|3500 | |
// | |Berta |4000 | |
// | +------+------+ |
// | |
// +-----------------+
Ok(())
}
More examples
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::default().build().await?;
let paths = vec!["/opt/spark/examples/src/main/resources/people.csv".to_string()];
let mut df = spark
.read()
.format("csv")
.option("header", "True")
.option("delimiter", ";")
.load(paths);
df.select(vec![
F::col("name"),
F::col("age").cast("int").alias("age_int"),
(F::lit(3.0) + F::col("age").cast("int")).alias("addition"),
])
.sort(vec![F::col("name").desc()])
.show(Some(5), None, None)
.await?;
// print results
// +--------------------------+
// | show_string |
// +--------------------------+
// | +-----+-------+--------+ |
// | |name |age_int|addition| |
// | +-----+-------+--------+ |
// | |Jorge|30 |33.0 | |
// | |Bob |32 |35.0 | |
// | +-----+-------+--------+ |
// | |
// +--------------------------+
Ok(())
}
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::default().build().await?;
let df = spark
.clone()
.range(None, 1000, 1, Some(16))
.select(col("id").alias("range_id"));
let path = "/opt/spark/examples/src/main/rust/employees/";
df.write()
.format("csv")
.option("header", "true")
.save(path)
.await?;
let mut df = spark
.clone()
.read()
.format("csv")
.option("header", "true")
.load(vec![path.to_string()]);
df.show(Some(10), None, None).await?;
// printed results may slightly vary but should be close to the below
// +--------------------------+
// | show_string |
// +--------------------------+
// | +--------+ |
// | |range_id| |
// | +--------+ |
// | |312 | |
// | |313 | |
// | |314 | |
// | |315 | |
// | |316 | |
// | |317 | |
// | |318 | |
// | |319 | |
// | |320 | |
// | |321 | |
// | +--------+ |
// | only showing top 10 rows |
// | |
// +--------------------------+
Ok(())
}
pub fn selectExpr(&mut self, cols: Vec<&str>) -> DataFrame
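Example (a minimal sketch; selectExpr is undocumented here, so this assumes it projects SQL expression strings the way Spark's selectExpr does, and the column names are illustrative):
async {
    // assumed: each string is evaluated as a SQL expression against the DataFrame
    df.selectExpr(vec!["name", "salary * 2 as salary_doubled"]).collect().await?;
}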
pub fn filter<T: ToFilterExpr>(&mut self, condition: T) -> DataFrame
Filters rows using the given condition and returns a new DataFrame
Example:
async {
    df.filter("salary > 4000").collect().await?;
}
Examples found in repository
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut spark: SparkSession =
SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs".to_string())
.build()
.await?;
let mut df = spark
.sql("SELECT * FROM json.`/opt/spark/examples/src/main/resources/employees.json`")
.await;
df.filter("salary >= 3500")
.select("*")
.show(Some(5), None, None)
.await?;
// +-----------------+
// | show_string |
// +-----------------+
// | +------+------+ |
// | |name |salary| |
// | +------+------+ |
// | |Andy |4500 | |
// | |Justin|3500 | |
// | |Berta |4000 | |
// | +------+------+ |
// | |
// +-----------------+
Ok(())
}
pub fn contains(&mut self, condition: Column) -> DataFrame
pub fn sort(&mut self, cols: Vec<Column>) -> DataFrame
Sorts the DataFrame by the specified Column expressions and returns a new DataFrame
Examples found in repository
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::default().build().await?;
let paths = vec!["/opt/spark/examples/src/main/resources/people.csv".to_string()];
let mut df = spark
.read()
.format("csv")
.option("header", "True")
.option("delimiter", ";")
.load(paths);
df.select(vec![
F::col("name"),
F::col("age").cast("int").alias("age_int"),
(F::lit(3.0) + F::col("age").cast("int")).alias("addition"),
])
.sort(vec![F::col("name").desc()])
.show(Some(5), None, None)
.await?;
// print results
// +--------------------------+
// | show_string |
// +--------------------------+
// | +-----+-------+--------+ |
// | |name |age_int|addition| |
// | +-----+-------+--------+ |
// | |Jorge|30 |33.0 | |
// | |Bob |32 |35.0 | |
// | +-----+-------+--------+ |
// | |
// +--------------------------+
Ok(())
}
pub fn drop_duplicates(&mut self, cols: Option<Vec<&str>>) -> DataFrame
Returns a new DataFrame with duplicate rows removed,
optionally considering only a subset of columns given as a Vec<&str>.
If no columns are supplied, all columns are used.
Alias for dropDuplicates
pub fn dropDuplicates(&mut self, cols: Option<Vec<&str>>) -> DataFrame
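Example (a minimal sketch; the column name is illustrative):
async {
    // keep one row per distinct value of the name column
    df.drop_duplicates(Some(vec!["name"])).collect().await?;
    // or remove duplicates across all columns
    df.drop_duplicates(None).collect().await?;
}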
pub fn withColumnsRenamed(&mut self, cols: HashMap<String, String>) -> DataFrame
Returns a new DataFrame by renaming multiple columns, given a
HashMap<String, String> with the existing column name as the key
and the new name as the value.
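Example (a minimal sketch; the column names are illustrative):
async {
    // map of existing name -> new name
    let mut renames = std::collections::HashMap::new();
    renames.insert("name".to_string(), "employee_name".to_string());
    df.withColumnsRenamed(renames).collect().await?;
}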
pub fn drop(&mut self, cols: Vec<String>) -> DataFrame
Returns a new DataFrame without the specified columns
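Example (a minimal sketch; the column name is illustrative):
async {
    df.drop(vec!["salary".to_string()]).collect().await?;
}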
pub fn sample(
    &mut self,
    lower_bound: f64,
    upper_bound: f64,
    with_replacement: Option<bool>,
    seed: Option<i64>
) -> DataFrame
Returns a sampled subset of this DataFrame
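Example (a minimal sketch; the bounds and seed are illustrative):
async {
    // sample roughly a quarter of the rows, without replacement, with a fixed seed
    df.sample(0.0, 0.25, Some(false), Some(42)).collect().await?;
}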
pub fn offset(&mut self, num: i32) -> DataFrame
Returns a new DataFrame by skipping the first n rows
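Example (a minimal sketch, assuming an existing DataFrame df):
async {
    // skip the first 5 rows
    df.offset(5).collect().await?;
}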
pub async fn schema(&mut self) -> Schema
Returns the schema of this DataFrame as a spark::analyze_plan_response::Schema
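Example (a minimal sketch; printing via {:?} assumes the generated Schema message implements Debug):
async {
    let schema = df.schema().await;
    println!("{:?}", schema);
}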
pub async fn explain(&mut self, mode: &str)
Prints the spark::Plan to the console
Arguments:
mode: &str. Defaults to unspecified. Accepted values: simple, extended, codegen, cost, formatted, unspecified
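Example (a minimal sketch, assuming an existing DataFrame df):
async {
    // print the plan in the simple format
    df.explain("simple").await;
}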
pub async fn show(
    &mut self,
    num_rows: Option<i32>,
    truncate: Option<i32>,
    vertical: Option<bool>
) -> Result<(), ArrowError>
Prints the first n rows to the console
Arguments:
num_rows: (int, optional) number of rows to show (default 10)
truncate: (int, optional) If set to 0, it truncates the strings. Any other number will not truncate the strings
vertical: (bool, optional) If set to true, prints output rows vertically (one line per column value)
Examples found in repository
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut spark: SparkSession =
SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs".to_string())
.build()
.await?;
let mut df = spark
.sql("SELECT * FROM json.`/opt/spark/examples/src/main/resources/employees.json`")
.await;
df.filter("salary >= 3500")
.select("*")
.show(Some(5), None, None)
.await?;
// +-----------------+
// | show_string |
// +-----------------+
// | +------+------+ |
// | |name |salary| |
// | +------+------+ |
// | |Andy |4500 | |
// | |Justin|3500 | |
// | |Berta |4000 | |
// | +------+------+ |
// | |
// +-----------------+
Ok(())
}
More examples
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::default().build().await?;
let paths = vec!["/opt/spark/examples/src/main/resources/people.csv".to_string()];
let mut df = spark
.read()
.format("csv")
.option("header", "True")
.option("delimiter", ";")
.load(paths);
df.select(vec![
F::col("name"),
F::col("age").cast("int").alias("age_int"),
(F::lit(3.0) + F::col("age").cast("int")).alias("addition"),
])
.sort(vec![F::col("name").desc()])
.show(Some(5), None, None)
.await?;
// print results
// +--------------------------+
// | show_string |
// +--------------------------+
// | +-----+-------+--------+ |
// | |name |age_int|addition| |
// | +-----+-------+--------+ |
// | |Jorge|30 |33.0 | |
// | |Bob |32 |35.0 | |
// | +-----+-------+--------+ |
// | |
// +--------------------------+
Ok(())
}
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::default().build().await?;
let df = spark
.clone()
.range(None, 1000, 1, Some(16))
.select(col("id").alias("range_id"));
let path = "/opt/spark/examples/src/main/rust/employees/";
df.write()
.format("csv")
.option("header", "true")
.save(path)
.await?;
let mut df = spark
.clone()
.read()
.format("csv")
.option("header", "true")
.load(vec![path.to_string()]);
df.show(Some(10), None, None).await?;
// printed results may slightly vary but should be close to the below
// +--------------------------+
// | show_string |
// +--------------------------+
// | +--------+ |
// | |range_id| |
// | +--------+ |
// | |312 | |
// | |313 | |
// | |314 | |
// | |315 | |
// | |316 | |
// | |317 | |
// | |318 | |
// | |319 | |
// | |320 | |
// | |321 | |
// | +--------+ |
// | only showing top 10 rows |
// | |
// +--------------------------+
Ok(())
}
pub async fn tail(&mut self, limit: i32) -> Result<Vec<RecordBatch>, ArrowError>
Returns the last n rows as a vector of RecordBatch
Running tail requires moving the data and results in an action
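Example (a minimal sketch, assuming an existing DataFrame df):
async {
    // fetch the last 3 rows as Arrow RecordBatch values
    let records = df.tail(3).await?;
    println!("received {} record batches", records.len());
}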
pub async fn collect(&mut self) -> Result<Vec<RecordBatch>, ArrowError>
Collects all rows of this DataFrame as a vector of RecordBatch; this is an action and submits the plan to the cluster
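Example (a minimal sketch, assuming an existing DataFrame df):
async {
    // execute the plan and bring the results back as Arrow RecordBatches
    let batches = df.collect().await?;
    println!("collected {} record batches", batches.len());
}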
pub fn write(self) -> DataFrameWriter
Returns a DataFrameWriter struct based on the current DataFrame
Examples found in repository
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession = SparkSessionBuilder::default().build().await?;
let df = spark
.clone()
.range(None, 1000, 1, Some(16))
.select(col("id").alias("range_id"));
let path = "/opt/spark/examples/src/main/rust/employees/";
df.write()
.format("csv")
.option("header", "true")
.save(path)
.await?;
let mut df = spark
.clone()
.read()
.format("csv")
.option("header", "true")
.load(vec![path.to_string()]);
df.show(Some(10), None, None).await?;
// printed results may slightly vary but should be close to the below
// +--------------------------+
// | show_string |
// +--------------------------+
// | +--------+ |
// | |range_id| |
// | +--------+ |
// | |312 | |
// | |313 | |
// | |314 | |
// | |315 | |
// | |316 | |
// | |317 | |
// | |318 | |
// | |319 | |
// | |320 | |
// | |321 | |
// | +--------+ |
// | only showing top 10 rows |
// | |
// +--------------------------+
Ok(())
}
Trait Implementations
Auto Trait Implementations
impl !RefUnwindSafe for DataFrame
impl Send for DataFrame
impl Sync for DataFrame
impl Unpin for DataFrame
impl !UnwindSafe for DataFrame
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wraps the input message T in a tonic::Request