batch_processing/sync/step/complex_step/
mod.rs

1use crate::sync::step::{DeciderCallback, SyncStep};
2use crate::sync::step::step_builder::StepBuilderTrait;
3
/// Builder operations for steps made of a reader, a processor and a writer.
///
/// `I` is the item type produced by the reader's iterator and accepted by the
/// processor; `O` is the item type returned by the processor and passed, in
/// batches, to the writer. Every method takes the builder by value and hands
/// a builder back, so calls can be chained.
pub trait ComplexStepBuilderTrait<I, O> {
    /// Supplies the reader for the step.
    ///
    /// # Arguments
    ///
    /// * `reader` - A `Send` factory that, when called, returns a boxed
    ///   iterator over items of type `I`.
    ///
    /// # Returns `Self`
    ///
    /// The builder, for further chaining.
    fn reader(self, reader: Box<dyn Fn() -> Box<dyn Iterator<Item = I>> + Send>) -> Self;

    /// Supplies the processor for the step.
    ///
    /// # Arguments
    ///
    /// * `processor` - A `Send` factory that, when called, returns a boxed
    ///   function turning one `I` into one `O`.
    ///
    /// # Returns `Self`
    ///
    /// The builder, for further chaining.
    fn processor(self, processor: Box<dyn Fn() -> Box<dyn Fn(I) -> O> + Send>) -> Self;

    /// Supplies the writer for the step.
    ///
    /// # Arguments
    ///
    /// * `writer` - A `Send` factory that, when called, returns a boxed
    ///   function receiving a borrowed `Vec<O>`.
    ///
    /// # Returns `Self`
    ///
    /// The builder, for further chaining.
    fn writer(self, writer: Box<dyn Fn() -> Box<dyn Fn(&Vec<O>)> + Send>) -> Self;

    /// Supplies the chunk size used when processing data in chunks.
    ///
    /// # Arguments
    ///
    /// * `chunk_size` - The number of items per chunk.
    ///
    /// # Returns `Self`
    ///
    /// The builder, for further chaining.
    fn chunk_size(self, chunk_size: usize) -> Self;
}
50
/// Number of items per chunk when the builder was given no explicit chunk size.
const DEFAULT_CHUNK_SIZE: usize = 1_000;
53
54impl<I: Sized + 'static, O: Sized + 'static> ComplexStepBuilderTrait<I, O> for ComplexStepBuilder<I, O> {
55    fn reader(self, reader: Box<dyn Fn() -> Box<dyn Iterator<Item=I>> + Send>) -> Self {
56        ComplexStepBuilder {
57            reader: Some(reader),
58            ..self
59        }
60    }
61
62    fn processor(self, processor: Box<dyn Fn() -> Box<dyn Fn(I) -> O> + Send>) -> Self {
63        ComplexStepBuilder {
64            processor: Some(processor),
65            ..self
66        }
67    }
68
69    fn writer(self, writer: Box<dyn Fn() -> Box<dyn Fn(&Vec<O>) -> ()> + Send>) -> Self {
70        ComplexStepBuilder {
71            writer: Some(writer),
72            ..self
73        }
74    }
75
76    fn chunk_size(self, chunk_size: usize) -> Self {
77        ComplexStepBuilder {
78            chunk_size: Some(chunk_size),
79            ..self
80        }
81    }
82}
83
84/// A builder struct for constructing complex synchronous steps.
85pub struct ComplexStepBuilder<I: Sized, O: Sized> {
86    /// The reader function for the step.
87    reader: Option<Box<dyn Fn() -> Box<dyn Iterator<Item=I>> + Send>>,
88    /// The processor function for the step.
89    processor: Option<Box<dyn Fn() -> Box<dyn Fn(I) -> O> + Send>>,
90    /// The writer function for the step.
91    writer: Option<Box<dyn Fn() -> Box<dyn Fn(&Vec<O>) -> ()> + Send>>,
92    /// The chunk size for processing data in chunks.
93    chunk_size: Option<usize>,
94    /// The synchronous step being constructed.
95    step: SyncStep,
96}
97
98impl<I: Sized + 'static, O: Sized + 'static> StepBuilderTrait for ComplexStepBuilder<I, O> where Self: Sized {
99    /// Sets the decider callback for the step.
100    ///
101    /// # Arguments
102    ///
103    /// * `decider` - The decider callback function.
104    ///
105    /// # Returns `Self`
106    ///
107    /// Returns a modified builder instance.
108    fn decider(self, decider: DeciderCallback) -> Self {
109        ComplexStepBuilder {
110            step: SyncStep {
111                decider: Some(decider),
112                ..self.step
113            },
114            ..self
115        }
116    }
117
118    /// Configures the step to be tolerant to thrown exceptions.
119    ///
120    /// # Returns `Self`
121    ///
122    /// Returns a modified builder instance.
123    fn throw_tolerant(self) -> Self {
124        ComplexStepBuilder {
125            step: SyncStep {
126                throw_tolerant: Some(true),
127                ..self.step
128            },
129            ..self
130        }
131    }
132
133    /// Initializes a new builder instance with the given name.
134    ///
135    /// # Arguments
136    ///
137    /// * `name` - The name of the step.
138    ///
139    /// # Returns `Self`
140    ///
141    /// Returns a new builder instance.
142    #[inline]
143    fn get(name: String) -> Self {
144        ComplexStepBuilder {
145            reader: None,
146            processor: None,
147            writer: None,
148            chunk_size: None,
149            step: SyncStep {
150                name,
151                callback: None,
152                decider: None,
153                end_time: None,
154                start_time: None,
155                throw_tolerant: None,
156            },
157        }
158    }
159
160    /// Validates the builder configuration.
161    ///
162    /// # Returns `Self`
163    ///
164    /// Returns a modified builder instance if validation succeeds.
165    fn validate(self) -> Self {
166        if self.step.name.is_empty() {
167            panic!("Name is required");
168        }
169
170        if self.reader.is_none() {
171            panic!("Reader is required");
172        }
173
174        if self.processor.is_none() {
175            panic!("Processor is required");
176        }
177
178        if self.writer.is_none() {
179            panic!("Writer is required");
180        }
181
182        return self;
183    }
184
185    /// Builds and returns the configured synchronous step.
186    ///
187    /// # Returns `SyncStep`
188    ///
189    /// Returns the configured synchronous step.
190    fn build(self) -> SyncStep {
191        let mut current_self = self.validate();
192
193        current_self.step.callback = Some(Box::new(move || {
194            let reader = current_self.reader.unwrap();
195            let processor = current_self.processor.unwrap().as_mut()();
196            let writer = current_self.writer.unwrap().as_mut()();
197            let chunk_size = current_self.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE);
198            let mut vec = Vec::with_capacity(chunk_size);
199
200            for chunk in reader() {
201                vec.push(processor(chunk));
202
203                if vec.len() == chunk_size {
204                    writer(&vec);
205                    vec.clear();
206                }
207            }
208
209            if !vec.is_empty() {
210                writer(&vec);
211            }
212        }));
213
214        return current_self.step;
215    }
216}
217
218/// Initializes a new complex step builder with the given name.
219///
220/// # Arguments
221///
222/// * `name` - The name of the step.
223///
224/// # Returns `ComplexStepBuilder`
225///
226/// Returns a new complex step builder instance.
227pub fn get<I: Sized + 'static, O: Sized + 'static>(name: String) -> ComplexStepBuilder<I, O> {
228    ComplexStepBuilder::get(name)
229}