Struct spark_connect_rs::dataframe::dataframe::DataFrame
pub struct DataFrame {
    pub spark_session: SparkSession,
    pub logical_plan: LogicalPlanBuilder,
}
DataFrame is composed of a spark_session connecting to a remote Spark Connect enabled cluster, and a logical_plan which represents the Plan to be submitted to the cluster when an action is called.
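Example (a minimal sketch; the session builder and sql call mirror the method examples below, and the query string is illustrative):
async {
    let spark: SparkSession = SparkSessionBuilder::default().build().await?;
    let df = spark.sql("SELECT * FROM range(100)");
}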
Fields
spark_session: SparkSession
Global SparkSession connecting to the remote cluster
logical_plan: LogicalPlanBuilder
Logical Plan representing the unresolved Relation which will be submitted to the remote cluster
Implementations
impl DataFrame
pub fn new(
    spark_session: SparkSession,
    logical_plan: LogicalPlanBuilder
) -> DataFrame
Creates a default DataFrame based on a SparkSession and an initial logical plan.
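Example (a minimal sketch; it assumes a SparkSession spark and a LogicalPlanBuilder plan are already in scope):
async {
    let df = DataFrame::new(spark, plan);
}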
pub fn select(&mut self, cols: Vec<&str>) -> DataFrame
Projects a set of expressions and returns a new DataFrame
Arguments:
cols: a vector of &str which resolve to specific columns
Example:
async {
    df.select(vec!["age", "name"]).collect().await?;
}
Examples found in repository
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::default().build().await?;

    let paths = vec!["/opt/spark/examples/src/main/resources/people.csv".to_string()];

    let mut df = spark
        .read()
        .format("csv")
        .option("header", "True")
        .option("delimiter", ";")
        .load(paths);

    df.filter("age > 30")
        .select(vec!["name"])
        .show(Some(5), None, None)
        .await?;

    // print results
    // +-------------+
    // | show_string |
    // +-------------+
    // | +----+      |
    // | |name|      |
    // | +----+      |
    // | |Bob |      |
    // | +----+      |
    // |             |
    // +-------------+

    Ok(())
}
pub fn selectExpr(&mut self, cols: Vec<&str>) -> DataFrame
Projects a set of SQL expressions and returns a new DataFrame
This is a variant of select that accepts SQL expressions
Example:
async {
    df.selectExpr(vec!["id * 2", "abs(id)"]).collect().await?;
}
Examples found in repository
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::default().build().await?;

    let df = spark
        .clone()
        .range(None, 1000, 1, Some(16))
        .selectExpr(vec!["id AS range_id"]);

    let path = "/opt/spark/examples/src/main/rust/employees/";

    df.write()
        .format("csv")
        .option("header", "true")
        .save(path)
        .await?;

    let mut df = spark
        .clone()
        .read()
        .format("csv")
        .option("header", "true")
        .load(vec![path.to_string()]);

    df.show(Some(10), None, None).await?;

    // printed results may slightly vary but should be close to the below
    // +--------------------------+
    // | show_string              |
    // +--------------------------+
    // | +--------+               |
    // | |range_id|               |
    // | +--------+               |
    // | |312     |               |
    // | |313     |               |
    // | |314     |               |
    // | |315     |               |
    // | |316     |               |
    // | |317     |               |
    // | |318     |               |
    // | |319     |               |
    // | |320     |               |
    // | |321     |               |
    // | +--------+               |
    // | only showing top 10 rows |
    // |                          |
    // +--------------------------+

    Ok(())
}
pub fn filter(&mut self, condition: &str) -> DataFrame
Filters rows using the given condition and returns a new DataFrame
Example:
async {
    df.filter("salary > 4000").collect().await?;
}
Examples found in repository
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::default().build().await?;

    let paths = vec!["/opt/spark/examples/src/main/resources/people.csv".to_string()];

    let mut df = spark
        .read()
        .format("csv")
        .option("header", "True")
        .option("delimiter", ";")
        .load(paths);

    df.filter("age > 30")
        .select(vec!["name"])
        .show(Some(5), None, None)
        .await?;

    // print results
    // +-------------+
    // | show_string |
    // +-------------+
    // | +----+      |
    // | |name|      |
    // | +----+      |
    // | |Bob |      |
    // | +----+      |
    // |             |
    // +-------------+

    Ok(())
}
More examples
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession =
        SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs".to_string())
            .build()
            .await?;

    let mut df =
        spark.sql("SELECT * FROM json.`/opt/spark/examples/src/main/resources/employees.json`");

    df.filter("salary > 3000").show(Some(5), None, None).await?;

    // print results
    // +-----------------+
    // | show_string     |
    // +-----------------+
    // | +------+------+ |
    // | |name  |salary| |
    // | +------+------+ |
    // | |Andy  |4500  | |
    // | |Justin|3500  | |
    // | |Berta |4000  | |
    // | +------+------+ |
    // |                 |
    // +-----------------+

    Ok(())
}
pub async fn show(
    &mut self,
    num_rows: Option<i32>,
    truncate: Option<i32>,
    vertical: Option<bool>
) -> Result<(), ArrowError>
Prints the first n rows to the console
Arguments:
num_rows: (int, optional) number of rows to show (default 10)
truncate: (int, optional) if set to 0, it truncates the strings; any other number will not truncate the strings
vertical: (bool, optional) if set to true, prints output rows vertically (one line per column value)
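Example (a minimal sketch mirroring the repository examples below; None keeps the default truncate and vertical behavior):
async {
    df.show(Some(10), None, None).await?;
}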
Examples found in repository
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::default().build().await?;

    let paths = vec!["/opt/spark/examples/src/main/resources/people.csv".to_string()];

    let mut df = spark
        .read()
        .format("csv")
        .option("header", "True")
        .option("delimiter", ";")
        .load(paths);

    df.filter("age > 30")
        .select(vec!["name"])
        .show(Some(5), None, None)
        .await?;

    // print results
    // +-------------+
    // | show_string |
    // +-------------+
    // | +----+      |
    // | |name|      |
    // | +----+      |
    // | |Bob |      |
    // | +----+      |
    // |             |
    // +-------------+

    Ok(())
}
More examples
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession =
        SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=example_rs".to_string())
            .build()
            .await?;

    let mut df =
        spark.sql("SELECT * FROM json.`/opt/spark/examples/src/main/resources/employees.json`");

    df.filter("salary > 3000").show(Some(5), None, None).await?;

    // print results
    // +-----------------+
    // | show_string     |
    // +-----------------+
    // | +------+------+ |
    // | |name  |salary| |
    // | +------+------+ |
    // | |Andy  |4500  | |
    // | |Justin|3500  | |
    // | |Berta |4000  | |
    // | +------+------+ |
    // |                 |
    // +-----------------+

    Ok(())
}
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::default().build().await?;

    let df = spark
        .clone()
        .range(None, 1000, 1, Some(16))
        .selectExpr(vec!["id AS range_id"]);

    let path = "/opt/spark/examples/src/main/rust/employees/";

    df.write()
        .format("csv")
        .option("header", "true")
        .save(path)
        .await?;

    let mut df = spark
        .clone()
        .read()
        .format("csv")
        .option("header", "true")
        .load(vec![path.to_string()]);

    df.show(Some(10), None, None).await?;

    // printed results may slightly vary but should be close to the below
    // +--------------------------+
    // | show_string              |
    // +--------------------------+
    // | +--------+               |
    // | |range_id|               |
    // | +--------+               |
    // | |312     |               |
    // | |313     |               |
    // | |314     |               |
    // | |315     |               |
    // | |316     |               |
    // | |317     |               |
    // | |318     |               |
    // | |319     |               |
    // | |320     |               |
    // | |321     |               |
    // | +--------+               |
    // | only showing top 10 rows |
    // |                          |
    // +--------------------------+

    Ok(())
}
pub async fn tail(&mut self, limit: i32) -> Result<Vec<RecordBatch>, ArrowError>
Returns the last n rows as a vector of RecordBatch
Running tail requires moving the data, and results in an action
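Example (a minimal sketch; the limit of 5 is illustrative):
async {
    let batches = df.tail(5).await?;
}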
pub fn write(self) -> DataFrameWriter
Returns a DataFrameWriter struct based on the current DataFrame
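Example (a minimal sketch based on the repository example below; the output path is illustrative):
async {
    df.write()
        .format("csv")
        .option("header", "true")
        .save("/tmp/employees/")
        .await?;
}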
Examples found in repository
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::default().build().await?;

    let df = spark
        .clone()
        .range(None, 1000, 1, Some(16))
        .selectExpr(vec!["id AS range_id"]);

    let path = "/opt/spark/examples/src/main/rust/employees/";

    df.write()
        .format("csv")
        .option("header", "true")
        .save(path)
        .await?;

    let mut df = spark
        .clone()
        .read()
        .format("csv")
        .option("header", "true")
        .load(vec![path.to_string()]);

    df.show(Some(10), None, None).await?;

    // printed results may slightly vary but should be close to the below
    // +--------------------------+
    // | show_string              |
    // +--------------------------+
    // | +--------+               |
    // | |range_id|               |
    // | +--------+               |
    // | |312     |               |
    // | |313     |               |
    // | |314     |               |
    // | |315     |               |
    // | |316     |               |
    // | |317     |               |
    // | |318     |               |
    // | |319     |               |
    // | |320     |               |
    // | |321     |               |
    // | +--------+               |
    // | only showing top 10 rows |
    // |                          |
    // +--------------------------+

    Ok(())
}
Trait Implementations
Auto Trait Implementations
impl !RefUnwindSafe for DataFrame
impl Send for DataFrame
impl Sync for DataFrame
impl Unpin for DataFrame
impl !UnwindSafe for DataFrame
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request