fluxus_api/stream/
datastream.rs

1use crate::operators::{FilterOperator, FlatMapOperator, MapOperator};
2use fluxus_core::ParallelConfig;
3use fluxus_sinks::Sink;
4use fluxus_sources::Source;
5use fluxus_transformers::{
6    InnerOperator, InnerSource, Operator, TransformSource, TransformSourceWithOperator,
7};
8use fluxus_utils::{
9    models::{StreamError, StreamResult},
10    window::WindowConfig,
11};
12use std::sync::{
13    Arc,
14    atomic::{AtomicUsize, Ordering},
15};
16
17use super::WindowedStream;
18
19/// DataStream represents a stream of data elements
20pub struct DataStream<T> {
21    pub(crate) source: Arc<InnerSource<T>>,
22    pub(crate) operators: Vec<Arc<InnerOperator<T, T>>>,
23    pub(crate) parallel_config: Option<ParallelConfig>,
24}
25
26impl<T> DataStream<T>
27where
28    T: Clone + Send + Sync + 'static,
29{
30    /// Create a new DataStream from a source
31    pub fn new<S>(source: S) -> Self
32    where
33        S: Source<T> + Send + Sync + 'static,
34    {
35        Self {
36            source: Arc::new(source),
37            operators: Vec::new(),
38            parallel_config: None,
39        }
40    }
41
42    /// Set parallelism for the stream processing
43    pub fn parallel(mut self, parallelism: usize) -> Self {
44        self.parallel_config = Some(ParallelConfig {
45            parallelism,
46            buffer_size: 1024,
47            preserve_order: true,
48        });
49        self
50    }
51
52    /// Apply a map transformation
53    pub fn map<F, R>(self, f: F) -> DataStream<R>
54    where
55        F: Fn(T) -> R + Send + Sync + 'static,
56        R: Clone + Send + Sync + 'static,
57    {
58        let mapper = MapOperator::new(f);
59        self.transform(mapper)
60    }
61
62    /// Apply a filter transformation
63    pub fn filter<F>(mut self, f: F) -> Self
64    where
65        F: Fn(&T) -> bool + Send + Sync + 'static,
66    {
67        let filter = FilterOperator::new(f);
68        self.operators.push(Arc::new(filter));
69        self
70    }
71
72    /// Apply a flat map transformation
73    pub fn flat_map<F, R, I>(self, f: F) -> DataStream<R>
74    where
75        F: Fn(T) -> I + Send + Sync + 'static,
76        R: Clone + Send + Sync + 'static,
77        I: IntoIterator<Item = R> + Send + Sync + 'static,
78    {
79        self.transform(FlatMapOperator::new(f))
80    }
81
82    /// Apply a limit transformation that keeps the first n elements
83    pub fn limit(self, n: usize) -> Self {
84        let n = AtomicUsize::new(n);
85        self.filter(move |_| {
86            if n.load(Ordering::SeqCst) > 0 {
87                n.fetch_sub(1, Ordering::SeqCst);
88                true
89            } else {
90                false
91            }
92        })
93    }
94
95    /// Transform the stream using a custom operator
96    pub fn transform<O, R>(self, operator: O) -> DataStream<R>
97    where
98        O: Operator<T, R> + Send + Sync + 'static,
99        R: Clone + Send + Sync + 'static,
100    {
101        let source = TransformSourceWithOperator::new(self.source, operator, self.operators);
102        DataStream {
103            source: Arc::new(source),
104            operators: Vec::new(),
105            parallel_config: self.parallel_config,
106        }
107    }
108
109    /// Apply windowing to the stream
110    pub fn window(self, config: WindowConfig) -> WindowedStream<T> {
111        WindowedStream {
112            stream: self,
113            window_config: config,
114        }
115    }
116
117    /// Write the stream to a sink
118    pub async fn sink<K>(self, mut sink: K) -> StreamResult<()>
119    where
120        K: Sink<T> + Send + Sync + 'static,
121    {
122        let mut source = TransformSource::new(self.source);
123        source.set_operators(self.operators);
124
125        loop {
126            match source.next().await {
127                Ok(Some(record)) => sink.write(record).await?,
128                Ok(None) => break,
129                Err(e) => match e {
130                    StreamError::EOF => break,
131                    StreamError::Wait(ms) => {
132                        tokio::time::sleep(std::time::Duration::from_millis(ms)).await
133                    }
134                    _ => return Err(e),
135                },
136            }
137        }
138
139        sink.flush().await?;
140        sink.close().await
141    }
142}
143
144impl<T> DataStream<Vec<T>>
145where
146    T: Clone + Send + Sync + 'static,
147{
148    /// Flatten the stream
149    pub fn flatten(self) -> DataStream<T> {
150        self.transform(FlatMapOperator::new(|v| v))
151    }
152}