fluxus_api/stream/
datastream.rs1use crate::{
2 InnerOperator, InnerSource,
3 operators::{FilterOperator, MapOperator},
4 source::{TransformSource, TransformSourceWithOperator},
5};
6use fluxus_core::{Operator, ParallelConfig, Sink, Source, StreamResult, WindowConfig};
7use std::sync::Arc;
8
9use super::WindowedStream;
10
11pub struct DataStream<T> {
13 pub(crate) source: Arc<InnerSource<T>>,
14 pub(crate) operators: Vec<Arc<InnerOperator<T, T>>>,
15 pub(crate) parallel_config: Option<ParallelConfig>,
16}
17
18impl<T> DataStream<T>
19where
20 T: Clone + Send + Sync + 'static,
21{
22 pub fn new<S>(source: S) -> Self
24 where
25 S: Source<T> + Send + Sync + 'static,
26 {
27 Self {
28 source: Arc::new(source),
29 operators: Vec::new(),
30 parallel_config: None,
31 }
32 }
33
34 pub fn parallel(mut self, parallelism: usize) -> Self {
36 self.parallel_config = Some(ParallelConfig {
37 parallelism,
38 buffer_size: 1024,
39 preserve_order: true,
40 });
41 self
42 }
43
44 pub fn map<F, R>(self, f: F) -> DataStream<R>
46 where
47 F: Fn(T) -> R + Send + Sync + 'static,
48 R: Clone + Send + Sync + 'static,
49 {
50 let mapper = MapOperator::new(f);
51 self.transform(mapper)
52 }
53
54 pub fn filter<F>(mut self, f: F) -> Self
56 where
57 F: Fn(&T) -> bool + Send + Sync + 'static,
58 {
59 let filter = FilterOperator::new(f);
60 self.operators.push(Arc::new(filter));
61 self
62 }
63
64 pub fn transform<O, R>(self, operator: O) -> DataStream<R>
66 where
67 O: Operator<T, R> + Send + Sync + 'static,
68 R: Clone + Send + Sync + 'static,
69 {
70 let source = TransformSourceWithOperator::new(self.source, operator);
71 DataStream {
72 source: Arc::new(source),
73 operators: Vec::new(),
74 parallel_config: self.parallel_config,
75 }
76 }
77
78 pub fn window(self, config: WindowConfig) -> WindowedStream<T> {
80 WindowedStream {
81 stream: self,
82 window_config: config,
83 }
84 }
85
86 pub async fn sink<K>(self, mut sink: K) -> StreamResult<()>
88 where
89 K: Sink<T> + Send + Sync + 'static,
90 {
91 let mut source = TransformSource::new(self.source);
92 source.set_operators(self.operators);
93
94 while let Some(record) = source.next().await? {
95 sink.write(record).await?;
96 }
97
98 sink.flush().await?;
99 sink.close().await
100 }
101}