fluxus_api/stream/
datastream.rs1use crate::operators::{FilterOperator, MapOperator};
2use fluxus_core::ParallelConfig;
3use fluxus_sinks::Sink;
4use fluxus_sources::Source;
5use fluxus_transformers::{
6 InnerOperator, InnerSource, Operator, TransformSource, TransformSourceWithOperator,
7};
8use fluxus_utils::{models::StreamResult, window::WindowConfig};
9use std::sync::Arc;
10
11use super::WindowedStream;
12
13pub struct DataStream<T> {
15 pub(crate) source: Arc<InnerSource<T>>,
16 pub(crate) operators: Vec<Arc<InnerOperator<T, T>>>,
17 pub(crate) parallel_config: Option<ParallelConfig>,
18}
19
20impl<T> DataStream<T>
21where
22 T: Clone + Send + Sync + 'static,
23{
24 pub fn new<S>(source: S) -> Self
26 where
27 S: Source<T> + Send + Sync + 'static,
28 {
29 Self {
30 source: Arc::new(source),
31 operators: Vec::new(),
32 parallel_config: None,
33 }
34 }
35
36 pub fn parallel(mut self, parallelism: usize) -> Self {
38 self.parallel_config = Some(ParallelConfig {
39 parallelism,
40 buffer_size: 1024,
41 preserve_order: true,
42 });
43 self
44 }
45
46 pub fn map<F, R>(self, f: F) -> DataStream<R>
48 where
49 F: Fn(T) -> R + Send + Sync + 'static,
50 R: Clone + Send + Sync + 'static,
51 {
52 let mapper = MapOperator::new(f);
53 self.transform(mapper)
54 }
55
56 pub fn filter<F>(mut self, f: F) -> Self
58 where
59 F: Fn(&T) -> bool + Send + Sync + 'static,
60 {
61 let filter = FilterOperator::new(f);
62 self.operators.push(Arc::new(filter));
63 self
64 }
65
66 pub fn transform<O, R>(self, operator: O) -> DataStream<R>
68 where
69 O: Operator<T, R> + Send + Sync + 'static,
70 R: Clone + Send + Sync + 'static,
71 {
72 let source = TransformSourceWithOperator::new(self.source, operator);
73 DataStream {
74 source: Arc::new(source),
75 operators: Vec::new(),
76 parallel_config: self.parallel_config,
77 }
78 }
79
80 pub fn window(self, config: WindowConfig) -> WindowedStream<T> {
82 WindowedStream {
83 stream: self,
84 window_config: config,
85 }
86 }
87
88 pub async fn sink<K>(self, mut sink: K) -> StreamResult<()>
90 where
91 K: Sink<T> + Send + Sync + 'static,
92 {
93 let mut source = TransformSource::new(self.source);
94 source.set_operators(self.operators);
95
96 while let Some(record) = source.next().await? {
97 sink.write(record).await?;
98 }
99
100 sink.flush().await?;
101 sink.close().await
102 }
103}