// spring_batch_rs/core/step.rs

1use crate::BatchError;
2use log::{debug, info, warn};
3use serde::{de::DeserializeOwned, Serialize};
4use std::{
5    cell::Cell,
6    time::{Duration, Instant},
7};
8use uuid::Uuid;
9
10use super::{
11    build_name,
12    item::{DefaultProcessor, ItemProcessor, ItemReader, ItemWriter},
13};
14
15type StepResult<T> = Result<T, T>;
16
17type ChunkResult<T> = Result<T, BatchError>;
18
19pub trait Step {
20    /// Executes the step.
21    ///
22    /// Returns a `StepResult` containing the execution details if the step is successful,
23    /// or an error if the step fails.
24    fn execute(&self) -> StepResult<StepExecution>;
25
26    /// Gets the status of the step.
27    ///
28    /// Returns a `StepStatus` indicating the current status of the step.
29    fn get_status(&self) -> StepStatus;
30
31    /// Gets the name of the step.
32    ///
33    /// Returns a reference to the name of the step.
34    fn get_name(&self) -> &String;
35
36    /// Gets the ID of the step.
37    ///
38    /// Returns the UUID representing the ID of the step.
39    fn get_id(&self) -> Uuid;
40
41    /// Gets the number of items read by the step.
42    ///
43    /// Returns the count of items read by the step.
44    fn get_read_count(&self) -> usize;
45
46    /// Gets the number of items written by the step.
47    ///
48    /// Returns the count of items written by the step.
49    fn get_write_count(&self) -> usize;
50
51    /// Gets the number of read errors encountered by the step.
52    ///
53    /// Returns the count of read errors encountered by the step.
54    fn get_read_error_count(&self) -> usize;
55
56    /// Gets the number of write errors encountered by the step.
57    ///
58    /// Returns the count of write errors encountered by the step.
59    fn get_write_error_count(&self) -> usize;
60}
61
/// Outcome of reading one chunk from the reader.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChunkStatus {
    /// The reader has no more items. This is the last chunk; it may hold fewer
    /// than `chunk_size` items, possibly none.
    Finished,
    /// The chunk reached `chunk_size` items and is ready to be processed; the
    /// reader may still have more items.
    Full,
}
70
/// Lifecycle state of a step: where it starts and how it ended.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum StepStatus {
    /// The step ran to completion.
    Success,
    /// The step stopped because reading failed.
    ReadError,
    /// The step stopped because processing failed.
    ProcessorError,
    /// The step stopped because writing failed.
    WriteError,
    /// Initial state, before the step has finished running.
    Starting,
}
85
/// Timing information captured for a single run of a step.
#[derive(Debug)]
pub struct StepExecution {
    /// Instant at which the run began.
    pub start: Instant,
    /// Instant at which the run ended.
    pub end: Instant,
    /// Elapsed time of the run.
    pub duration: Duration,
}
96
97/// Represents an instance of a step in a batch job.
98pub struct StepInstance<'a, R, W> {
99    id: Uuid,
100    name: String,
101    status: Cell<StepStatus>,
102    reader: &'a dyn ItemReader<R>,
103    processor: &'a dyn ItemProcessor<R, W>,
104    writer: &'a dyn ItemWriter<W>,
105    chunk_size: usize,
106    skip_limit: usize,
107    read_count: Cell<usize>,
108    write_count: Cell<usize>,
109    read_error_count: Cell<usize>,
110    process_error_count: Cell<usize>,
111    write_error_count: Cell<usize>,
112}
113
114impl<'a, R, W> Step for StepInstance<'a, R, W> {
115    fn execute(&self) -> StepResult<StepExecution> {
116        // Start the timer
117        let start = Instant::now();
118
119        // Log the start of the step
120        info!("Start of step: {}, id: {}", self.name, self.id);
121
122        // Open the writer and handle any errors
123        Self::manage_error(self.writer.open());
124
125        // Create a vector to store the read items
126        let mut read_items: Vec<R> = Vec::with_capacity(self.chunk_size);
127
128        // Loop until the chunk is finished or an error occurs
129        loop {
130            // Read a chunk of items
131            let read_chunk_result = self.read_chunk(&mut read_items);
132
133            // Handle read errors
134            if read_chunk_result.is_err() {
135                self.set_status(StepStatus::ReadError);
136                break;
137            }
138
139            // Process the chunk of items
140            let processor_chunk_result = self.process_chunk(&read_items);
141
142            // Handle processing errors
143            if processor_chunk_result.is_err() {
144                self.set_status(StepStatus::ProcessorError);
145                break;
146            }
147
148            // Write the processed items
149            let write_chunk_result = self.write_chunk(&processor_chunk_result.unwrap());
150
151            // Handle write errors
152            if write_chunk_result.is_err() {
153                self.set_status(StepStatus::WriteError);
154                break;
155            }
156
157            // Check if the chunk is finished
158            if read_chunk_result.unwrap() == ChunkStatus::Finished {
159                self.set_status(StepStatus::Success);
160                break;
161            }
162        }
163
164        // Close the writer and handle any errors
165        Self::manage_error(self.writer.close());
166
167        // Log the end of the step
168        info!("End of step: {}, id: {}", self.name, self.id);
169
170        // Calculate the step execution details
171        let step_execution = StepExecution {
172            start,
173            end: Instant::now(),
174            duration: start.elapsed(),
175        };
176
177        // Return the step execution details if the step is successful,
178        // or an error if the step failed
179        if StepStatus::Success == self.status.get() {
180            Ok(step_execution)
181        } else {
182            Err(step_execution)
183        }
184    }
185
186    fn get_status(&self) -> StepStatus {
187        self.status.get()
188    }
189
190    fn get_name(&self) -> &String {
191        &self.name
192    }
193
194    fn get_id(&self) -> Uuid {
195        self.id
196    }
197
198    fn get_read_count(&self) -> usize {
199        self.read_count.get()
200    }
201
202    fn get_write_count(&self) -> usize {
203        self.write_count.get()
204    }
205
206    fn get_read_error_count(&self) -> usize {
207        self.read_error_count.get()
208    }
209
210    fn get_write_error_count(&self) -> usize {
211        self.write_error_count.get()
212    }
213}
214
215/// Represents an instance of a step in a batch job.
216impl<'a, R, W> StepInstance<'a, R, W> {
217    /// Sets the status of the step instance.
218    ///
219    /// # Arguments
220    ///
221    /// * `status` - The status to set for the step instance.
222    fn set_status(&self, status: StepStatus) {
223        self.status.set(status);
224    }
225
226    /// Checks if the skip limit for the step instance has been reached.
227    ///
228    /// Returns `true` if the skip limit has been reached, `false` otherwise.
229    fn is_skip_limit_reached(&self) -> bool {
230        self.read_error_count.get() + self.write_error_count.get() + self.process_error_count.get()
231            > self.skip_limit
232    }
233
234    /// Reads a chunk of items from the reader.
235    ///
236    /// # Arguments
237    ///
238    /// * `read_items` - A mutable reference to a vector where the read items will be stored.
239    ///
240    /// Returns a `ChunkResult` indicating the status of the read operation.
241    fn read_chunk(&self, read_items: &mut Vec<R>) -> ChunkResult<ChunkStatus> {
242        debug!("Start reading chunk");
243        read_items.clear();
244
245        loop {
246            let read_result = self.reader.read();
247
248            match read_result {
249                Ok(item) => {
250                    match item {
251                        Some(item) => {
252                            read_items.push(item);
253                            self.inc_read_count();
254                        }
255                        None => {
256                            // All items of reader have been read
257                            debug!("End reading chunk: FINISHED");
258                            return Ok(ChunkStatus::Finished);
259                        }
260                    };
261
262                    if read_items.len() == self.chunk_size {
263                        // The chunk is full, we can process and write items
264                        debug!("End reading chunk: FULL");
265                        return Ok(ChunkStatus::Full);
266                    }
267                }
268                Err(err) => {
269                    self.inc_read_error_count();
270                    if self.is_skip_limit_reached() {
271                        return Err(BatchError::ItemReader("error limit reached".to_string()));
272                    } else {
273                        warn!("Error occurred during read item: {}", err);
274                    }
275                }
276            }
277        }
278    }
279
280    /// Processes a chunk of read items using the processor.
281    ///
282    /// # Arguments
283    ///
284    /// * `read_items` - A reference to a vector containing the read items.
285    ///
286    /// Returns a `Result` containing a vector of processed items or a `BatchError` if an error occurred.
287    fn process_chunk(&self, read_items: &Vec<R>) -> Result<Vec<W>, BatchError> {
288        let mut processed_items = Vec::with_capacity(read_items.len());
289
290        debug!("Start processing chunk");
291        for item in read_items {
292            let result = self.processor.process(item);
293
294            match result {
295                Ok(item) => {
296                    debug!("Processing item");
297                    processed_items.push(item)
298                }
299                Err(err) => {
300                    self.inc_process_error_count(1);
301                    if self.is_skip_limit_reached() {
302                        return Err(BatchError::ItemProcessor(err.to_string()));
303                    } else {
304                        warn!("ItemProcessor error: {}", err.to_string());
305                    }
306                }
307            };
308        }
309        debug!("End processing chunk");
310
311        Ok(processed_items)
312    }
313
314    /// Writes a chunk of processed items using the writer.
315    ///
316    /// # Arguments
317    ///
318    /// * `processed_items` - A slice containing the processed items to write.
319    ///
320    /// Returns a `Result` indicating the success of the write operation or a `BatchError` if an error occurred.
321    fn write_chunk(&self, processed_items: &[W]) -> Result<(), BatchError> {
322        debug!("Start writing chunk");
323
324        let result = self.writer.write(processed_items);
325        match result {
326            Ok(()) => {
327                debug!("ItemWriter success")
328            }
329            Err(err) => {
330                self.inc_write_error_count(processed_items.len());
331                if self.is_skip_limit_reached() {
332                    return Err(BatchError::ItemWriter(err.to_string()));
333                } else {
334                    warn!("Error occurred during write item: {}", err);
335                }
336            }
337        }
338
339        match self.writer.flush() {
340            Ok(()) => {
341                self.inc_write_count(processed_items.len());
342                debug!("End writing chunk");
343                Ok(())
344            }
345            Err(err) => {
346                self.inc_write_error_count(processed_items.len());
347                if self.is_skip_limit_reached() {
348                    Err(BatchError::ItemWriter(err.to_string()))
349                } else {
350                    warn!("Error occurred during flush item: {}", err);
351                    Ok(())
352                }
353            }
354        }
355    }
356
357    /// Increments the read count by 1.
358    fn inc_read_count(&self) {
359        self.read_count.set(self.read_count.get() + 1);
360    }
361
362    /// Increments the read error count by 1.
363    fn inc_read_error_count(&self) {
364        self.read_error_count.set(self.read_error_count.get() + 1);
365    }
366
367    /// Increments the write count by the specified amount.
368    ///
369    /// # Arguments
370    ///
371    /// * `write_count` - The amount to increment the write count by.
372    fn inc_write_count(&self, write_count: usize) {
373        self.write_count.set(self.write_count.get() + write_count);
374    }
375
376    /// Increments the write error count by the specified amount.
377    ///
378    /// # Arguments
379    ///
380    /// * `write_count` - The amount to increment the write error count by.
381    fn inc_write_error_count(&self, write_count: usize) {
382        self.write_error_count
383            .set(self.write_error_count.get() + write_count);
384    }
385
386    /// Increments the process error count by the specified amount.
387    ///
388    /// # Arguments
389    ///
390    /// * `write_count` - The amount to increment the process error count by.
391    fn inc_process_error_count(&self, write_count: usize) {
392        self.process_error_count
393            .set(self.process_error_count.get() + write_count);
394    }
395
396    /// Manages the error returned by a step instance operation.
397    ///
398    /// # Arguments
399    ///
400    /// * `result` - The result of the step instance operation.
401    fn manage_error(result: Result<(), BatchError>) {
402        match result {
403            Ok(()) => {}
404            Err(error) => {
405                panic!("{}", error.to_string());
406            }
407        };
408    }
409}
410
411#[derive(Default)]
412pub struct StepBuilder<'a, R, W> {
413    name: Option<String>,
414    reader: Option<&'a dyn ItemReader<R>>,
415    processor: Option<&'a dyn ItemProcessor<R, W>>,
416    writer: Option<&'a dyn ItemWriter<W>>,
417    chunk_size: usize,
418    skip_limit: usize,
419}
420
421impl<'a, R: Serialize, W: DeserializeOwned> StepBuilder<'a, R, W> {
422    pub fn new() -> StepBuilder<'a, R, W> {
423        Self {
424            name: None,
425            reader: None,
426            processor: None,
427            writer: None,
428            chunk_size: 1,
429            skip_limit: 0,
430        }
431    }
432
433    pub fn name(mut self, name: String) -> StepBuilder<'a, R, W> {
434        self.name = Some(name);
435        self
436    }
437
438    pub fn reader(mut self, reader: &'a impl ItemReader<R>) -> StepBuilder<'a, R, W> {
439        self.reader = Some(reader);
440        self
441    }
442
443    pub fn processor(mut self, processor: &'a impl ItemProcessor<R, W>) -> StepBuilder<'a, R, W> {
444        self.processor = Some(processor);
445        self
446    }
447
448    pub fn writer(mut self, writer: &'a impl ItemWriter<W>) -> StepBuilder<'a, R, W> {
449        self.writer = Some(writer);
450        self
451    }
452
453    pub fn chunk(mut self, chunk_size: usize) -> StepBuilder<'a, R, W> {
454        self.chunk_size = chunk_size;
455        self
456    }
457
458    pub fn skip_limit(mut self, skip_limit: usize) -> StepBuilder<'a, R, W> {
459        self.skip_limit = skip_limit;
460        self
461    }
462
463    pub fn build(self) -> StepInstance<'a, R, W> {
464        let default_processor = &DefaultProcessor {};
465
466        StepInstance {
467            id: Uuid::new_v4(),
468            name: self.name.unwrap_or(build_name()),
469            status: Cell::new(StepStatus::Starting),
470            reader: self.reader.unwrap(),
471            processor: self.processor.unwrap_or(default_processor),
472            writer: self.writer.unwrap(),
473            chunk_size: self.chunk_size,
474            skip_limit: self.skip_limit,
475            write_error_count: Cell::new(0),
476            process_error_count: Cell::new(0),
477            read_error_count: Cell::new(0),
478            write_count: Cell::new(0),
479            read_count: Cell::new(0),
480        }
481    }
482}
483
#[cfg(test)]
mod tests {
    use anyhow::Result;
    use mockall::mock;
    use serde::{Deserialize, Serialize};

    use crate::{
        core::{
            item::{
                ItemProcessor, ItemProcessorResult, ItemReader, ItemReaderResult, ItemWriter,
                ItemWriterResult,
            },
            step::StepStatus,
        },
        BatchError,
    };

    use super::{Step, StepBuilder, StepInstance};

    mock! {
        pub TestItemReader {}
        impl ItemReader<Car> for TestItemReader {
            fn read(&self) -> ItemReaderResult<Car>;
        }
    }

    mock! {
        pub TestProcessor {}
        impl ItemProcessor<Car, Car> for TestProcessor {
            fn process(&self, item: &Car) -> ItemProcessorResult<Car>;
        }
    }

    mock! {
        pub TestItemWriter {}
        impl ItemWriter<Car> for TestItemWriter {
            fn write(&self, items: &[Car]) -> ItemWriterResult;
        }
    }

    #[derive(Deserialize, Serialize, Debug, Clone)]
    struct Car {
        year: u16,
        make: String,
        model: String,
        description: String,
    }

    /// Returns `None` once `end_count` items were produced and an error when
    /// `*i == error_count`; a count of 0 disables that behavior.
    fn mock_read(i: &mut u16, error_count: u16, end_count: u16) -> ItemReaderResult<Car> {
        if end_count > 0 && *i == end_count {
            return Ok(None);
        } else if error_count > 0 && *i == error_count {
            return Err(BatchError::ItemReader("mock read error".to_string()));
        }

        let car = Car {
            year: 1979,
            make: "make".to_owned(),
            model: "model".to_owned(),
            description: "description".to_owned(),
        };
        *i += 1;
        Ok(Some(car))
    }

    /// Fails on the calls whose 1-based index is listed in `error_at`.
    fn mock_process(i: &mut u16, error_at: &[u16]) -> ItemProcessorResult<Car> {
        *i += 1;
        if error_at.contains(i) {
            return Err(BatchError::ItemProcessor("mock process error".to_string()));
        }

        let car = Car {
            year: 1979,
            make: "make".to_owned(),
            model: "model".to_owned(),
            description: "description".to_owned(),
        };
        Ok(car)
    }

    #[test]
    fn step_should_succeed_with_empty_data() -> Result<()> {
        let mut reader = MockTestItemReader::default();
        let reader_result = Ok(None);
        reader.expect_read().return_once(move || reader_result);

        let mut processor = MockTestProcessor::default();
        processor.expect_process().never();

        let mut writer = MockTestItemWriter::default();
        writer.expect_write().times(1).returning(|_| Ok(()));

        let step: StepInstance<Car, Car> = StepBuilder::new()
            .name("test".to_string())
            .reader(&reader)
            .processor(&processor)
            .writer(&writer)
            .chunk(3)
            .build();

        let result = step.execute();

        assert!(result.is_ok());
        assert_eq!(step.get_name(), "test");
        assert!(!step.get_name().is_empty());
        assert!(!step.get_id().is_nil());
        assert_eq!(step.get_status(), StepStatus::Success);
        assert_eq!(step.get_read_count(), 0);
        assert_eq!(step.get_write_count(), 0);

        Ok(())
    }

    #[test]
    fn step_should_fail_with_processor_error() -> Result<()> {
        let mut i = 0;
        let mut reader = MockTestItemReader::default();
        reader
            .expect_read()
            .returning(move || mock_read(&mut i, 0, 4));

        let mut processor = MockTestProcessor::default();
        let mut i = 0;
        processor
            .expect_process()
            .returning(move |_| mock_process(&mut i, &[2]));

        let mut writer = MockTestItemWriter::default();
        writer.expect_write().never();

        let step: StepInstance<Car, Car> = StepBuilder::new()
            .reader(&reader)
            .processor(&processor)
            .writer(&writer)
            .chunk(3)
            .build();

        let result = step.execute();

        assert!(result.is_err());
        assert_eq!(step.get_status(), StepStatus::ProcessorError);
        // The first chunk was fully read before processing failed.
        assert_eq!(step.get_read_count(), 3);

        Ok(())
    }

    #[test]
    fn step_should_fail_with_write_error() -> Result<()> {
        let mut i = 0;
        let mut reader = MockTestItemReader::default();
        reader
            .expect_read()
            .returning(move || mock_read(&mut i, 0, 4));

        let mut processor = MockTestProcessor::default();
        let mut i = 0;
        processor
            .expect_process()
            .returning(move |_| mock_process(&mut i, &[]));

        let mut writer = MockTestItemWriter::default();
        let result = Err(BatchError::ItemWriter("mock write error".to_string()));
        writer.expect_write().return_once(move |_| result);

        let step: StepInstance<Car, Car> = StepBuilder::new()
            .reader(&reader)
            .processor(&processor)
            .writer(&writer)
            .chunk(3)
            .build();

        let result = step.execute();

        assert!(result.is_err());
        assert_eq!(step.get_status(), StepStatus::WriteError);
        // A failed write counts one error per item of the chunk.
        assert_eq!(step.get_write_error_count(), 3);
        assert_eq!(step.get_write_count(), 0);

        Ok(())
    }

    #[test]
    fn step_should_succeed_even_with_processor_error() -> Result<()> {
        let mut i = 0;
        let mut reader = MockTestItemReader::default();
        reader
            .expect_read()
            .returning(move || mock_read(&mut i, 0, 4));

        let mut processor = MockTestProcessor::default();
        let mut i = 0;
        processor
            .expect_process()
            .returning(move |_| mock_process(&mut i, &[2]));

        let mut writer = MockTestItemWriter::default();
        writer.expect_write().times(2).returning(|_| Ok(()));

        let step: StepInstance<Car, Car> = StepBuilder::new()
            .reader(&reader)
            .processor(&processor)
            .writer(&writer)
            .chunk(3)
            .skip_limit(1)
            .build();

        let result = step.execute();

        assert!(result.is_ok());
        assert_eq!(step.get_status(), StepStatus::Success);
        assert_eq!(step.get_read_count(), 4);
        // The item that failed processing is skipped, so only 3 are written.
        assert_eq!(step.get_write_count(), 3);

        Ok(())
    }

    #[test]
    fn step_should_succeed_even_with_write_error() -> Result<()> {
        let mut i = 0;
        let mut reader = MockTestItemReader::default();
        reader
            .expect_read()
            .returning(move || mock_read(&mut i, 0, 4));

        let mut processor = MockTestProcessor::default();
        let mut i = 0;
        processor
            .expect_process()
            .returning(move |_| mock_process(&mut i, &[]));

        // The first write (chunk of 3 items) fails, the second one succeeds.
        let mut writer = MockTestItemWriter::default();
        let mut calls = 0u32;
        writer.expect_write().times(2).returning(move |_| {
            calls += 1;
            if calls == 1 {
                Err(BatchError::ItemWriter("mock write error".to_string()))
            } else {
                Ok(())
            }
        });

        // A failed write counts one error per item, so 3 errors must be tolerated.
        let step: StepInstance<Car, Car> = StepBuilder::new()
            .reader(&reader)
            .processor(&processor)
            .writer(&writer)
            .chunk(3)
            .skip_limit(3)
            .build();

        let result = step.execute();

        assert!(result.is_ok());
        assert_eq!(step.get_status(), StepStatus::Success);
        assert_eq!(step.get_read_count(), 4);
        assert_eq!(step.get_write_error_count(), 3);

        Ok(())
    }
}