// fluxus_api/stream/datastream.rs
use std::sync::{
    Arc,
    atomic::{AtomicUsize, Ordering},
};
use std::time::Duration;

use fluxus_core::ParallelConfig;
use fluxus_sinks::Sink;
use fluxus_sources::Source;
use fluxus_transformers::{
    InnerOperator, InnerSource, Operator, TransformSource, TransformSourceWithOperator,
};
use fluxus_utils::{
    models::{StreamError, StreamResult},
    window::WindowConfig,
};

use super::WindowedStream;
use crate::operators::{FilterOperator, FlatMapOperator, MapOperator};

19pub struct DataStream<T> {
21 pub(crate) source: Arc<InnerSource<T>>,
22 pub(crate) operators: Vec<Arc<InnerOperator<T, T>>>,
23 pub(crate) parallel_config: Option<ParallelConfig>,
24}
25
26impl<T> DataStream<T>
27where
28 T: Clone + Send + Sync + 'static,
29{
30 pub fn new<S>(source: S) -> Self
32 where
33 S: Source<T> + Send + Sync + 'static,
34 {
35 Self {
36 source: Arc::new(source),
37 operators: Vec::new(),
38 parallel_config: None,
39 }
40 }
41
42 pub fn parallel(mut self, parallelism: usize) -> Self {
44 self.parallel_config = Some(ParallelConfig {
45 parallelism,
46 buffer_size: 1024,
47 preserve_order: true,
48 });
49 self
50 }
51
52 pub fn map<F, R>(self, f: F) -> DataStream<R>
54 where
55 F: Fn(T) -> R + Send + Sync + 'static,
56 R: Clone + Send + Sync + 'static,
57 {
58 let mapper = MapOperator::new(f);
59 self.transform(mapper)
60 }
61
62 pub fn filter<F>(mut self, f: F) -> Self
64 where
65 F: Fn(&T) -> bool + Send + Sync + 'static,
66 {
67 let filter = FilterOperator::new(f);
68 self.operators.push(Arc::new(filter));
69 self
70 }
71
72 pub fn flat_map<F, R, I>(self, f: F) -> DataStream<R>
74 where
75 F: Fn(T) -> I + Send + Sync + 'static,
76 R: Clone + Send + Sync + 'static,
77 I: IntoIterator<Item = R> + Send + Sync + 'static,
78 {
79 self.transform(FlatMapOperator::new(f))
80 }
81
82 pub fn limit(self, n: usize) -> Self {
84 let n = AtomicUsize::new(n);
85 self.filter(move |_| {
86 if n.load(Ordering::SeqCst) > 0 {
87 n.fetch_sub(1, Ordering::SeqCst);
88 true
89 } else {
90 false
91 }
92 })
93 }
94
95 pub fn transform<O, R>(self, operator: O) -> DataStream<R>
97 where
98 O: Operator<T, R> + Send + Sync + 'static,
99 R: Clone + Send + Sync + 'static,
100 {
101 let source = TransformSourceWithOperator::new(self.source, operator, self.operators);
102 DataStream {
103 source: Arc::new(source),
104 operators: Vec::new(),
105 parallel_config: self.parallel_config,
106 }
107 }
108
109 pub fn window(self, config: WindowConfig) -> WindowedStream<T> {
111 WindowedStream {
112 stream: self,
113 window_config: config,
114 }
115 }
116
117 pub async fn sink<K>(self, mut sink: K) -> StreamResult<()>
119 where
120 K: Sink<T> + Send + Sync + 'static,
121 {
122 let mut source = TransformSource::new(self.source);
123 source.set_operators(self.operators);
124
125 loop {
126 match source.next().await {
127 Ok(Some(record)) => sink.write(record).await?,
128 Ok(None) => break,
129 Err(e) => match e {
130 StreamError::EOF => break,
131 StreamError::Wait(ms) => {
132 tokio::time::sleep(std::time::Duration::from_millis(ms)).await
133 }
134 _ => return Err(e),
135 },
136 }
137 }
138
139 sink.flush().await?;
140 sink.close().await
141 }
142}
143
144impl<T> DataStream<Vec<T>>
145where
146 T: Clone + Send + Sync + 'static,
147{
148 pub fn flatten(self) -> DataStream<T> {
150 self.transform(FlatMapOperator::new(|v| v))
151 }
152}