// fluxus_api/stream/datastream.rs

1use crate::{
2    InnerOperator, InnerSource,
3    operators::{FilterOperator, MapOperator},
4    source::{TransformSource, TransformSourceWithOperator},
5};
6use fluxus_core::{Operator, ParallelConfig, Sink, Source, StreamResult, WindowConfig};
7use std::sync::Arc;
8
9use super::WindowedStream;
10
11/// DataStream represents a stream of data elements
12pub struct DataStream<T> {
13    pub(crate) source: Arc<InnerSource<T>>,
14    pub(crate) operators: Vec<Arc<InnerOperator<T, T>>>,
15    pub(crate) parallel_config: Option<ParallelConfig>,
16}
17
18impl<T> DataStream<T>
19where
20    T: Clone + Send + Sync + 'static,
21{
22    /// Create a new DataStream from a source
23    pub fn new<S>(source: S) -> Self
24    where
25        S: Source<T> + Send + Sync + 'static,
26    {
27        Self {
28            source: Arc::new(source),
29            operators: Vec::new(),
30            parallel_config: None,
31        }
32    }
33
34    /// Set parallelism for the stream processing
35    pub fn parallel(mut self, parallelism: usize) -> Self {
36        self.parallel_config = Some(ParallelConfig {
37            parallelism,
38            buffer_size: 1024,
39            preserve_order: true,
40        });
41        self
42    }
43
44    /// Apply a map transformation
45    pub fn map<F, R>(self, f: F) -> DataStream<R>
46    where
47        F: Fn(T) -> R + Send + Sync + 'static,
48        R: Clone + Send + Sync + 'static,
49    {
50        let mapper = MapOperator::new(f);
51        self.transform(mapper)
52    }
53
54    /// Apply a filter transformation
55    pub fn filter<F>(mut self, f: F) -> Self
56    where
57        F: Fn(&T) -> bool + Send + Sync + 'static,
58    {
59        let filter = FilterOperator::new(f);
60        self.operators.push(Arc::new(filter));
61        self
62    }
63
64    /// Transform the stream using a custom operator
65    pub fn transform<O, R>(self, operator: O) -> DataStream<R>
66    where
67        O: Operator<T, R> + Send + Sync + 'static,
68        R: Clone + Send + Sync + 'static,
69    {
70        let source = TransformSourceWithOperator::new(self.source, operator);
71        DataStream {
72            source: Arc::new(source),
73            operators: Vec::new(),
74            parallel_config: self.parallel_config,
75        }
76    }
77
78    /// Apply windowing to the stream
79    pub fn window(self, config: WindowConfig) -> WindowedStream<T> {
80        WindowedStream {
81            stream: self,
82            window_config: config,
83        }
84    }
85
86    /// Write the stream to a sink
87    pub async fn sink<K>(self, mut sink: K) -> StreamResult<()>
88    where
89        K: Sink<T> + Send + Sync + 'static,
90    {
91        let mut source = TransformSource::new(self.source);
92        source.set_operators(self.operators);
93
94        while let Some(record) = source.next().await? {
95            sink.write(record).await?;
96        }
97
98        sink.flush().await?;
99        sink.close().await
100    }
101}