batch_processing/sync/step/complex_step/
mod.rs1use crate::sync::step::{DeciderCallback, SyncStep};
2use crate::sync::step::step_builder::StepBuilderTrait;
3
/// Configuration methods for a builder whose step reads items of type `I`,
/// converts each one to an `O`, and writes the results.
///
/// Every method takes the builder by value and returns it, so calls can be chained.
pub trait ComplexStepBuilderTrait<I, O> {
    /// Sets the reader factory.
    ///
    /// `reader` is a closure that, when called, returns a boxed iterator over the
    /// input items.
    fn reader(self, reader: Box<dyn Fn() -> Box<dyn Iterator<Item = I>> + Send>) -> Self;

    /// Sets the processor factory.
    ///
    /// `processor` is a closure that, when called, returns the boxed function used
    /// to map one `I` into one `O`.
    fn processor(self, processor: Box<dyn Fn() -> Box<dyn Fn(I) -> O> + Send>) -> Self;

    /// Sets the writer factory.
    ///
    /// `writer` is a closure that, when called, returns the boxed function that
    /// receives a batch of processed items.
    fn writer(self, writer: Box<dyn Fn() -> Box<dyn Fn(&Vec<O>)> + Send>) -> Self;

    /// Sets how many processed items are grouped into one batch for the writer.
    fn chunk_size(self, chunk_size: usize) -> Self;
}
50
/// Batch size used by `build` when no chunk size was set on the builder.
const DEFAULT_CHUNK_SIZE: usize = 1_000;
53
54impl<I: Sized + 'static, O: Sized + 'static> ComplexStepBuilderTrait<I, O> for ComplexStepBuilder<I, O> {
55 fn reader(self, reader: Box<dyn Fn() -> Box<dyn Iterator<Item=I>> + Send>) -> Self {
56 ComplexStepBuilder {
57 reader: Some(reader),
58 ..self
59 }
60 }
61
62 fn processor(self, processor: Box<dyn Fn() -> Box<dyn Fn(I) -> O> + Send>) -> Self {
63 ComplexStepBuilder {
64 processor: Some(processor),
65 ..self
66 }
67 }
68
69 fn writer(self, writer: Box<dyn Fn() -> Box<dyn Fn(&Vec<O>) -> ()> + Send>) -> Self {
70 ComplexStepBuilder {
71 writer: Some(writer),
72 ..self
73 }
74 }
75
76 fn chunk_size(self, chunk_size: usize) -> Self {
77 ComplexStepBuilder {
78 chunk_size: Some(chunk_size),
79 ..self
80 }
81 }
82}
83
84pub struct ComplexStepBuilder<I: Sized, O: Sized> {
86 reader: Option<Box<dyn Fn() -> Box<dyn Iterator<Item=I>> + Send>>,
88 processor: Option<Box<dyn Fn() -> Box<dyn Fn(I) -> O> + Send>>,
90 writer: Option<Box<dyn Fn() -> Box<dyn Fn(&Vec<O>) -> ()> + Send>>,
92 chunk_size: Option<usize>,
94 step: SyncStep,
96}
97
98impl<I: Sized + 'static, O: Sized + 'static> StepBuilderTrait for ComplexStepBuilder<I, O> where Self: Sized {
99 fn decider(self, decider: DeciderCallback) -> Self {
109 ComplexStepBuilder {
110 step: SyncStep {
111 decider: Some(decider),
112 ..self.step
113 },
114 ..self
115 }
116 }
117
118 fn throw_tolerant(self) -> Self {
124 ComplexStepBuilder {
125 step: SyncStep {
126 throw_tolerant: Some(true),
127 ..self.step
128 },
129 ..self
130 }
131 }
132
133 #[inline]
143 fn get(name: String) -> Self {
144 ComplexStepBuilder {
145 reader: None,
146 processor: None,
147 writer: None,
148 chunk_size: None,
149 step: SyncStep {
150 name,
151 callback: None,
152 decider: None,
153 end_time: None,
154 start_time: None,
155 throw_tolerant: None,
156 },
157 }
158 }
159
160 fn validate(self) -> Self {
166 if self.step.name.is_empty() {
167 panic!("Name is required");
168 }
169
170 if self.reader.is_none() {
171 panic!("Reader is required");
172 }
173
174 if self.processor.is_none() {
175 panic!("Processor is required");
176 }
177
178 if self.writer.is_none() {
179 panic!("Writer is required");
180 }
181
182 return self;
183 }
184
185 fn build(self) -> SyncStep {
191 let mut current_self = self.validate();
192
193 current_self.step.callback = Some(Box::new(move || {
194 let reader = current_self.reader.unwrap();
195 let processor = current_self.processor.unwrap().as_mut()();
196 let writer = current_self.writer.unwrap().as_mut()();
197 let chunk_size = current_self.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE);
198 let mut vec = Vec::with_capacity(chunk_size);
199
200 for chunk in reader() {
201 vec.push(processor(chunk));
202
203 if vec.len() == chunk_size {
204 writer(&vec);
205 vec.clear();
206 }
207 }
208
209 if !vec.is_empty() {
210 writer(&vec);
211 }
212 }));
213
214 return current_self.step;
215 }
216}
217
218pub fn get<I: Sized + 'static, O: Sized + 'static>(name: String) -> ComplexStepBuilder<I, O> {
228 ComplexStepBuilder::get(name)
229}