arcon/dataflow/stream/
filter.rs

1use crate::{
2    data::ArconType,
3    dataflow::{
4        builder::OperatorBuilder,
5        stream::{OperatorExt, Stream},
6    },
7    index::EmptyState,
8    stream::operator::function,
9    util::ArconFnBounds,
10};
11use std::sync::Arc;
12
13/// Extension trait for filter operations
14pub trait FilterExt<T: ArconType> {
15    /// Filter out records based on the given predicate
16    ///
17    /// # Example
18    /// ```rust
19    /// use arcon::prelude::*;
20    /// let stream: Stream<u64> = (0..100)
21    ///     .to_stream(|conf| {
22    ///         conf.set_arcon_time(ArconTime::Process);
23    ///     })
24    ///     .filter(|x| x < &50);
25    /// ```
26    fn filter<F: Fn(&T) -> bool + ArconFnBounds>(self, f: F) -> Self;
27}
28
29impl<T: ArconType> FilterExt<T> for Stream<T> {
30    #[must_use]
31    fn filter<F: Fn(&T) -> bool + ArconFnBounds>(self, f: F) -> Self {
32        self.operator(OperatorBuilder {
33            operator: Arc::new(move || function::Filter::new(f.clone())),
34            state: Arc::new(|_| EmptyState),
35            conf: Default::default(),
36        })
37    }
38}