Struct spark_connect_rs::dataframe::DataStreamWriter
source · pub struct DataStreamWriter { /* private fields */ }
Expand description
DataStreamWriter provides the ability to output a StreamingQuery, which can then be used to monitor the active stream.
Implementations§
source§impl DataStreamWriter
impl DataStreamWriter
sourcepub fn new(dataframe: DataFrame) -> DataStreamWriter
pub fn new(dataframe: DataFrame) -> DataStreamWriter
Create a new DataStreamWriter from a provided streaming DataFrame
§Defaults
format: None
output_mode: OutputMode
query_name: None
trigger: Trigger
partition_by: vec![]
write_options: HashMap::new()
sourcepub fn format(self, format: &str) -> DataStreamWriter
pub fn format(self, format: &str) -> DataStreamWriter
Target format to output the StreamingQuery
Examples found in repository?
examples/readstream.rs (line 24)
(lines 10–42)
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession =
SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=stream_example")
.build()
.await?;
let df = spark
.readStream()
.format("rate")
.option("rowsPerSecond", "5")
.load(None)?;
let query = df
.writeStream()
.format("console")
.queryName("example_stream")
.outputMode(OutputMode::Append)
.trigger(Trigger::ProcessingTimeInterval("1 seconds".to_string()))
.start(None)
.await?;
// loop to get multiple progression stats
for _ in 1..5 {
thread::sleep(time::Duration::from_secs(5));
let val = &query.clone().lastProgress().await?;
println!("{}", val);
}
// stop the active stream
query.stop().await?;
Ok(())
}
sourcepub fn outputMode(self, outputMode: OutputMode) -> DataStreamWriter
pub fn outputMode(self, outputMode: OutputMode) -> DataStreamWriter
Examples found in repository?
examples/readstream.rs (line 26)
(lines 10–42)
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession =
SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=stream_example")
.build()
.await?;
let df = spark
.readStream()
.format("rate")
.option("rowsPerSecond", "5")
.load(None)?;
let query = df
.writeStream()
.format("console")
.queryName("example_stream")
.outputMode(OutputMode::Append)
.trigger(Trigger::ProcessingTimeInterval("1 seconds".to_string()))
.start(None)
.await?;
// loop to get multiple progression stats
for _ in 1..5 {
thread::sleep(time::Duration::from_secs(5));
let val = &query.clone().lastProgress().await?;
println!("{}", val);
}
// stop the active stream
query.stop().await?;
Ok(())
}
sourcepub fn partitionBy<'a, I>(self, cols: I) -> DataStreamWriter where
I: IntoIterator<Item = &'a str>,
pub fn partitionBy<'a, I>(self, cols: I) -> DataStreamWriter where
I: IntoIterator<Item = &'a str>,
Partitions the output by the given columns on the file system
sourcepub fn option(self, key: &str, value: &str) -> DataStreamWriter
pub fn option(self, key: &str, value: &str) -> DataStreamWriter
Add an input option for the underlying data source
sourcepub fn options<I, K, V>(self, options: I) -> DataStreamWriter
pub fn options<I, K, V>(self, options: I) -> DataStreamWriter
Set many input options based on an iterator of (key/value pairs) for the underlying data source
sourcepub fn queryName(self, name: &str) -> DataStreamWriter
pub fn queryName(self, name: &str) -> DataStreamWriter
Examples found in repository?
examples/readstream.rs (line 25)
(lines 10–42)
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession =
SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=stream_example")
.build()
.await?;
let df = spark
.readStream()
.format("rate")
.option("rowsPerSecond", "5")
.load(None)?;
let query = df
.writeStream()
.format("console")
.queryName("example_stream")
.outputMode(OutputMode::Append)
.trigger(Trigger::ProcessingTimeInterval("1 seconds".to_string()))
.start(None)
.await?;
// loop to get multiple progression stats
for _ in 1..5 {
thread::sleep(time::Duration::from_secs(5));
let val = &query.clone().lastProgress().await?;
println!("{}", val);
}
// stop the active stream
query.stop().await?;
Ok(())
}
sourcepub fn trigger(self, trigger: Trigger) -> DataStreamWriter
pub fn trigger(self, trigger: Trigger) -> DataStreamWriter
Query trigger for data to be processed by
Examples found in repository?
examples/readstream.rs (line 27)
(lines 10–42)
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession =
SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=stream_example")
.build()
.await?;
let df = spark
.readStream()
.format("rate")
.option("rowsPerSecond", "5")
.load(None)?;
let query = df
.writeStream()
.format("console")
.queryName("example_stream")
.outputMode(OutputMode::Append)
.trigger(Trigger::ProcessingTimeInterval("1 seconds".to_string()))
.start(None)
.await?;
// loop to get multiple progression stats
for _ in 1..5 {
thread::sleep(time::Duration::from_secs(5));
let val = &query.clone().lastProgress().await?;
println!("{}", val);
}
// stop the active stream
query.stop().await?;
Ok(())
}
sourcepub async fn start(
self,
path: Option<&str>
) -> Result<StreamingQuery, SparkError>
pub async fn start( self, path: Option<&str> ) -> Result<StreamingQuery, SparkError>
Start a streaming job to save the contents of the StreamingQuery to a data source.
The data source is specified by the format and a set of options.
Examples found in repository?
examples/readstream.rs (line 28)
(lines 10–42)
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let spark: SparkSession =
SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=stream_example")
.build()
.await?;
let df = spark
.readStream()
.format("rate")
.option("rowsPerSecond", "5")
.load(None)?;
let query = df
.writeStream()
.format("console")
.queryName("example_stream")
.outputMode(OutputMode::Append)
.trigger(Trigger::ProcessingTimeInterval("1 seconds".to_string()))
.start(None)
.await?;
// loop to get multiple progression stats
for _ in 1..5 {
thread::sleep(time::Duration::from_secs(5));
let val = &query.clone().lastProgress().await?;
println!("{}", val);
}
// stop the active stream
query.stop().await?;
Ok(())
}
sourcepub async fn toTable(
self,
tableName: &str
) -> Result<StreamingQuery, SparkError>
pub async fn toTable( self, tableName: &str ) -> Result<StreamingQuery, SparkError>
Start a streaming job to save the contents of the StreamingQuery to a table.
Trait Implementations§
source§impl Clone for DataStreamWriter
impl Clone for DataStreamWriter
source§fn clone(&self) -> DataStreamWriter
fn clone(&self) -> DataStreamWriter
Returns a copy of the value. Read more
1.0.0 · source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source. Read more
Auto Trait Implementations§
impl Freeze for DataStreamWriter
impl !RefUnwindSafe for DataStreamWriter
impl Send for DataStreamWriter
impl Sync for DataStreamWriter
impl Unpin for DataStreamWriter
impl !UnwindSafe for DataStreamWriter
Blanket Implementations§
source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
source§impl<T> Instrument for T
impl<T> Instrument for T
source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request.