Struct spark_connect_rs::dataframe::DataStreamReader

source ·
pub struct DataStreamReader { /* private fields */ }
Expand description

DataStreamReader is the entry point for creating a streaming DataFrame.

Implementations§

source§

impl DataStreamReader

source

pub fn new(spark_session: SparkSession) -> DataStreamReader

source

pub fn format(self, format: &str) -> DataStreamReader

Specifies the input data source format

Examples found in repository?
examples/readstream.rs (line 18)
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession =
        SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=stream_example")
            .build()
            .await?;

    let df = spark
        .readStream()
        .format("rate") // input data source format
        .option("rowsPerSecond", "5") // input option for the underlying data source
        .load(None)?; // no path supplied; `?` propagates the error on failure

    let query = df
        .writeStream()
        .format("console")
        .queryName("example_stream")
        .outputMode(OutputMode::Append)
        .trigger(Trigger::ProcessingTimeInterval("1 seconds".to_string()))
        .start(None)
        .await?;

    // Print the last progress four times (`1..5` is half-open), sleeping 5 seconds before each.
    for _ in 1..5 {
        thread::sleep(time::Duration::from_secs(5)); // NOTE(review): looks like a blocking sleep in an async fn
        let val = &query.clone().lastProgress().await?; // called on a clone so `query` stays usable below
        println!("{}", val);
    }

    // Stop the streaming query before returning.
    query.stop().await?;

    Ok(())
}
source

pub fn schema(self, schema: &str) -> DataStreamReader

Schema of the stream in DDL format (e.g. "name string, age int")

source

pub fn option(self, key: &str, value: &str) -> DataStreamReader

Adds an input option for the underlying data source

Examples found in repository?
examples/readstream.rs (line 19)
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession =
        SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=stream_example")
            .build()
            .await?;

    let df = spark
        .readStream()
        .format("rate") // input data source format
        .option("rowsPerSecond", "5") // input option for the underlying data source
        .load(None)?; // no path supplied; `?` propagates the error on failure

    let query = df
        .writeStream()
        .format("console")
        .queryName("example_stream")
        .outputMode(OutputMode::Append)
        .trigger(Trigger::ProcessingTimeInterval("1 seconds".to_string()))
        .start(None)
        .await?;

    // Print the last progress four times (`1..5` is half-open), sleeping 5 seconds before each.
    for _ in 1..5 {
        thread::sleep(time::Duration::from_secs(5)); // NOTE(review): looks like a blocking sleep in an async fn
        let val = &query.clone().lastProgress().await?; // called on a clone so `query` stays usable below
        println!("{}", val);
    }

    // Stop the streaming query before returning.
    query.stop().await?;

    Ok(())
}
source

pub fn options<I, K, V>(self, options: I) -> DataStreamReader
where I: IntoIterator<Item = (K, V)>, K: AsRef<str>, V: AsRef<str>,

Sets multiple input options for the underlying data source from an iterator of (key, value) pairs

source

pub fn load(self, path: Option<&str>) -> Result<DataFrame, SparkError>

Examples found in repository?
examples/readstream.rs (line 20)
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession =
        SparkSessionBuilder::remote("sc://127.0.0.1:15002/;user_id=stream_example")
            .build()
            .await?;

    let df = spark
        .readStream()
        .format("rate") // input data source format
        .option("rowsPerSecond", "5") // input option for the underlying data source
        .load(None)?; // no path supplied; `?` propagates the error on failure

    let query = df
        .writeStream()
        .format("console")
        .queryName("example_stream")
        .outputMode(OutputMode::Append)
        .trigger(Trigger::ProcessingTimeInterval("1 seconds".to_string()))
        .start(None)
        .await?;

    // Print the last progress four times (`1..5` is half-open), sleeping 5 seconds before each.
    for _ in 1..5 {
        thread::sleep(time::Duration::from_secs(5)); // NOTE(review): looks like a blocking sleep in an async fn
        let val = &query.clone().lastProgress().await?; // called on a clone so `query` stays usable below
        println!("{}", val);
    }

    // Stop the streaming query before returning.
    query.stop().await?;

    Ok(())
}

Trait Implementations§

source§

impl Clone for DataStreamReader

source§

fn clone(&self) -> DataStreamReader

Returns a copy of the value. Read more
1.0.0 · source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
source§

impl Debug for DataStreamReader

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error>

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T> FromRef<T> for T
where T: Clone,

source§

fn from_ref(input: &T) -> T

Converts to this type from a reference to the input type.
source§

impl<T> Instrument for T

source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> IntoRequest<T> for T

source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
source§

impl<T> ToOwned for T
where T: Clone,

§

type Owned = T

The resulting type after obtaining ownership.
source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

source§

fn vzip(self) -> V

source§

impl<T> WithSubscriber for T

source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more