fluxus_api/stream/
datastream.rs

1use crate::operators::{FilterOperator, MapOperator};
2use fluxus_core::ParallelConfig;
3use fluxus_sinks::Sink;
4use fluxus_sources::Source;
5use fluxus_transformers::{
6    InnerOperator, InnerSource, Operator, TransformSource, TransformSourceWithOperator,
7};
8use fluxus_utils::{models::StreamResult, window::WindowConfig};
9use std::sync::Arc;
10
11use super::WindowedStream;
12
13/// DataStream represents a stream of data elements
14pub struct DataStream<T> {
15    pub(crate) source: Arc<InnerSource<T>>,
16    pub(crate) operators: Vec<Arc<InnerOperator<T, T>>>,
17    pub(crate) parallel_config: Option<ParallelConfig>,
18}
19
20impl<T> DataStream<T>
21where
22    T: Clone + Send + Sync + 'static,
23{
24    /// Create a new DataStream from a source
25    pub fn new<S>(source: S) -> Self
26    where
27        S: Source<T> + Send + Sync + 'static,
28    {
29        Self {
30            source: Arc::new(source),
31            operators: Vec::new(),
32            parallel_config: None,
33        }
34    }
35
36    /// Set parallelism for the stream processing
37    pub fn parallel(mut self, parallelism: usize) -> Self {
38        self.parallel_config = Some(ParallelConfig {
39            parallelism,
40            buffer_size: 1024,
41            preserve_order: true,
42        });
43        self
44    }
45
46    /// Apply a map transformation
47    pub fn map<F, R>(self, f: F) -> DataStream<R>
48    where
49        F: Fn(T) -> R + Send + Sync + 'static,
50        R: Clone + Send + Sync + 'static,
51    {
52        let mapper = MapOperator::new(f);
53        self.transform(mapper)
54    }
55
56    /// Apply a filter transformation
57    pub fn filter<F>(mut self, f: F) -> Self
58    where
59        F: Fn(&T) -> bool + Send + Sync + 'static,
60    {
61        let filter = FilterOperator::new(f);
62        self.operators.push(Arc::new(filter));
63        self
64    }
65
66    /// Transform the stream using a custom operator
67    pub fn transform<O, R>(self, operator: O) -> DataStream<R>
68    where
69        O: Operator<T, R> + Send + Sync + 'static,
70        R: Clone + Send + Sync + 'static,
71    {
72        let source = TransformSourceWithOperator::new(self.source, operator);
73        DataStream {
74            source: Arc::new(source),
75            operators: Vec::new(),
76            parallel_config: self.parallel_config,
77        }
78    }
79
80    /// Apply windowing to the stream
81    pub fn window(self, config: WindowConfig) -> WindowedStream<T> {
82        WindowedStream {
83            stream: self,
84            window_config: config,
85        }
86    }
87
88    /// Write the stream to a sink
89    pub async fn sink<K>(self, mut sink: K) -> StreamResult<()>
90    where
91        K: Sink<T> + Send + Sync + 'static,
92    {
93        let mut source = TransformSource::new(self.source);
94        source.set_operators(self.operators);
95
96        while let Some(record) = source.next().await? {
97            sink.write(record).await?;
98        }
99
100        sink.flush().await?;
101        sink.close().await
102    }
103}