1use crate::BatchError;
2use log::{debug, info, warn};
3use serde::{de::DeserializeOwned, Serialize};
4use std::{
5 cell::Cell,
6 time::{Duration, Instant},
7};
8use uuid::Uuid;
9
10use super::{
11 build_name,
12 item::{DefaultProcessor, ItemProcessor, ItemReader, ItemWriter},
13};
14
/// Result of executing a step.
///
/// Both variants carry the same payload type: `Ok` signals that the step
/// succeeded, `Err` that it failed.
type StepResult<T> = Result<T, T>;
16
/// Result of a chunk-level operation that can fail with a [`BatchError`].
type ChunkResult<T> = Result<T, BatchError>;
18
/// A unit of batch work that can be executed and then inspected for its
/// outcome and counters.
pub trait Step {
    /// Executes the step.
    ///
    /// Returns the [`StepExecution`] timing details wrapped in `Ok` when the
    /// step succeeded and wrapped in `Err` when it failed.
    fn execute(&self) -> StepResult<StepExecution>;

    /// Returns the current status of the step.
    fn get_status(&self) -> StepStatus;

    /// Returns the name of the step.
    fn get_name(&self) -> &String;

    /// Returns the identifier of the step.
    fn get_id(&self) -> Uuid;

    /// Returns the number of items read.
    fn get_read_count(&self) -> usize;

    /// Returns the number of items written.
    fn get_write_count(&self) -> usize;

    /// Returns the number of read errors.
    fn get_read_error_count(&self) -> usize;

    /// Returns the number of write errors.
    fn get_write_error_count(&self) -> usize;
}
61
/// Outcome of reading one chunk of items.
#[derive(Debug, PartialEq)]
pub enum ChunkStatus {
    /// The reader returned no further item; this is the last chunk.
    Finished,
    /// The chunk reached the configured chunk size; more items may follow.
    Full,
}
70
/// Status of a step.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum StepStatus {
    /// Every chunk was read, processed and written.
    Success,
    /// The step stopped because reading a chunk failed.
    ReadError,
    /// The step stopped because processing a chunk failed.
    ProcessorError,
    /// The step stopped because writing a chunk failed.
    WriteError,
    /// Initial status assigned when the step is built.
    Starting,
}
85
/// Timing information about one execution of a step.
#[derive(Debug)]
pub struct StepExecution {
    /// Instant captured when the execution started.
    pub start: Instant,
    /// Instant captured when the execution record was created.
    pub end: Instant,
    /// Time elapsed since `start` when the execution record was created.
    pub duration: Duration,
}
96
/// A chunk-oriented step that reads items of type `R`, processes them into
/// items of type `W` and writes them chunk by chunk.
///
/// Mutable state lives in [`Cell`]s because the step is driven through
/// `&self`.
pub struct StepInstance<'a, R, W> {
    /// Identifier of this step.
    id: Uuid,
    /// Name of this step, used in log messages.
    name: String,
    /// Current status of the step.
    status: Cell<StepStatus>,
    /// Source of the items.
    reader: &'a dyn ItemReader<R>,
    /// Transformation applied to every item that was read.
    processor: &'a dyn ItemProcessor<R, W>,
    /// Destination of the processed items.
    writer: &'a dyn ItemWriter<W>,
    /// Number of items after which a chunk is considered full.
    chunk_size: usize,
    /// Total number of errors tolerated before the step fails.
    skip_limit: usize,
    /// Number of items read.
    read_count: Cell<usize>,
    /// Number of items written.
    write_count: Cell<usize>,
    /// Number of read errors.
    read_error_count: Cell<usize>,
    /// Number of processing errors.
    process_error_count: Cell<usize>,
    /// Number of items counted as write errors.
    write_error_count: Cell<usize>,
}
113
114impl<'a, R, W> Step for StepInstance<'a, R, W> {
115 fn execute(&self) -> StepResult<StepExecution> {
116 let start = Instant::now();
118
119 info!("Start of step: {}, id: {}", self.name, self.id);
121
122 Self::manage_error(self.writer.open());
124
125 let mut read_items: Vec<R> = Vec::with_capacity(self.chunk_size);
127
128 loop {
130 let read_chunk_result = self.read_chunk(&mut read_items);
132
133 if read_chunk_result.is_err() {
135 self.set_status(StepStatus::ReadError);
136 break;
137 }
138
139 let processor_chunk_result = self.process_chunk(&read_items);
141
142 if processor_chunk_result.is_err() {
144 self.set_status(StepStatus::ProcessorError);
145 break;
146 }
147
148 let write_chunk_result = self.write_chunk(&processor_chunk_result.unwrap());
150
151 if write_chunk_result.is_err() {
153 self.set_status(StepStatus::WriteError);
154 break;
155 }
156
157 if read_chunk_result.unwrap() == ChunkStatus::Finished {
159 self.set_status(StepStatus::Success);
160 break;
161 }
162 }
163
164 Self::manage_error(self.writer.close());
166
167 info!("End of step: {}, id: {}", self.name, self.id);
169
170 let step_execution = StepExecution {
172 start,
173 end: Instant::now(),
174 duration: start.elapsed(),
175 };
176
177 if StepStatus::Success == self.status.get() {
180 Ok(step_execution)
181 } else {
182 Err(step_execution)
183 }
184 }
185
186 fn get_status(&self) -> StepStatus {
187 self.status.get()
188 }
189
190 fn get_name(&self) -> &String {
191 &self.name
192 }
193
194 fn get_id(&self) -> Uuid {
195 self.id
196 }
197
198 fn get_read_count(&self) -> usize {
199 self.read_count.get()
200 }
201
202 fn get_write_count(&self) -> usize {
203 self.write_count.get()
204 }
205
206 fn get_read_error_count(&self) -> usize {
207 self.read_error_count.get()
208 }
209
210 fn get_write_error_count(&self) -> usize {
211 self.write_error_count.get()
212 }
213}
214
215impl<'a, R, W> StepInstance<'a, R, W> {
217 fn set_status(&self, status: StepStatus) {
223 self.status.set(status);
224 }
225
226 fn is_skip_limit_reached(&self) -> bool {
230 self.read_error_count.get() + self.write_error_count.get() + self.process_error_count.get()
231 > self.skip_limit
232 }
233
234 fn read_chunk(&self, read_items: &mut Vec<R>) -> ChunkResult<ChunkStatus> {
242 debug!("Start reading chunk");
243 read_items.clear();
244
245 loop {
246 let read_result = self.reader.read();
247
248 match read_result {
249 Ok(item) => {
250 match item {
251 Some(item) => {
252 read_items.push(item);
253 self.inc_read_count();
254 }
255 None => {
256 debug!("End reading chunk: FINISHED");
258 return Ok(ChunkStatus::Finished);
259 }
260 };
261
262 if read_items.len() == self.chunk_size {
263 debug!("End reading chunk: FULL");
265 return Ok(ChunkStatus::Full);
266 }
267 }
268 Err(err) => {
269 self.inc_read_error_count();
270 if self.is_skip_limit_reached() {
271 return Err(BatchError::ItemReader("error limit reached".to_string()));
272 } else {
273 warn!("Error occurred during read item: {}", err);
274 }
275 }
276 }
277 }
278 }
279
280 fn process_chunk(&self, read_items: &Vec<R>) -> Result<Vec<W>, BatchError> {
288 let mut processed_items = Vec::with_capacity(read_items.len());
289
290 debug!("Start processing chunk");
291 for item in read_items {
292 let result = self.processor.process(item);
293
294 match result {
295 Ok(item) => {
296 debug!("Processing item");
297 processed_items.push(item)
298 }
299 Err(err) => {
300 self.inc_process_error_count(1);
301 if self.is_skip_limit_reached() {
302 return Err(BatchError::ItemProcessor(err.to_string()));
303 } else {
304 warn!("ItemProcessor error: {}", err.to_string());
305 }
306 }
307 };
308 }
309 debug!("End processing chunk");
310
311 Ok(processed_items)
312 }
313
314 fn write_chunk(&self, processed_items: &[W]) -> Result<(), BatchError> {
322 debug!("Start writing chunk");
323
324 let result = self.writer.write(processed_items);
325 match result {
326 Ok(()) => {
327 debug!("ItemWriter success")
328 }
329 Err(err) => {
330 self.inc_write_error_count(processed_items.len());
331 if self.is_skip_limit_reached() {
332 return Err(BatchError::ItemWriter(err.to_string()));
333 } else {
334 warn!("Error occurred during write item: {}", err);
335 }
336 }
337 }
338
339 match self.writer.flush() {
340 Ok(()) => {
341 self.inc_write_count(processed_items.len());
342 debug!("End writing chunk");
343 Ok(())
344 }
345 Err(err) => {
346 self.inc_write_error_count(processed_items.len());
347 if self.is_skip_limit_reached() {
348 Err(BatchError::ItemWriter(err.to_string()))
349 } else {
350 warn!("Error occurred during flush item: {}", err);
351 Ok(())
352 }
353 }
354 }
355 }
356
357 fn inc_read_count(&self) {
359 self.read_count.set(self.read_count.get() + 1);
360 }
361
362 fn inc_read_error_count(&self) {
364 self.read_error_count.set(self.read_error_count.get() + 1);
365 }
366
367 fn inc_write_count(&self, write_count: usize) {
373 self.write_count.set(self.write_count.get() + write_count);
374 }
375
376 fn inc_write_error_count(&self, write_count: usize) {
382 self.write_error_count
383 .set(self.write_error_count.get() + write_count);
384 }
385
386 fn inc_process_error_count(&self, write_count: usize) {
392 self.process_error_count
393 .set(self.process_error_count.get() + write_count);
394 }
395
396 fn manage_error(result: Result<(), BatchError>) {
402 match result {
403 Ok(()) => {}
404 Err(error) => {
405 panic!("{}", error.to_string());
406 }
407 };
408 }
409}
410
411#[derive(Default)]
412pub struct StepBuilder<'a, R, W> {
413 name: Option<String>,
414 reader: Option<&'a dyn ItemReader<R>>,
415 processor: Option<&'a dyn ItemProcessor<R, W>>,
416 writer: Option<&'a dyn ItemWriter<W>>,
417 chunk_size: usize,
418 skip_limit: usize,
419}
420
421impl<'a, R: Serialize, W: DeserializeOwned> StepBuilder<'a, R, W> {
422 pub fn new() -> StepBuilder<'a, R, W> {
423 Self {
424 name: None,
425 reader: None,
426 processor: None,
427 writer: None,
428 chunk_size: 1,
429 skip_limit: 0,
430 }
431 }
432
433 pub fn name(mut self, name: String) -> StepBuilder<'a, R, W> {
434 self.name = Some(name);
435 self
436 }
437
438 pub fn reader(mut self, reader: &'a impl ItemReader<R>) -> StepBuilder<'a, R, W> {
439 self.reader = Some(reader);
440 self
441 }
442
443 pub fn processor(mut self, processor: &'a impl ItemProcessor<R, W>) -> StepBuilder<'a, R, W> {
444 self.processor = Some(processor);
445 self
446 }
447
448 pub fn writer(mut self, writer: &'a impl ItemWriter<W>) -> StepBuilder<'a, R, W> {
449 self.writer = Some(writer);
450 self
451 }
452
453 pub fn chunk(mut self, chunk_size: usize) -> StepBuilder<'a, R, W> {
454 self.chunk_size = chunk_size;
455 self
456 }
457
458 pub fn skip_limit(mut self, skip_limit: usize) -> StepBuilder<'a, R, W> {
459 self.skip_limit = skip_limit;
460 self
461 }
462
463 pub fn build(self) -> StepInstance<'a, R, W> {
464 let default_processor = &DefaultProcessor {};
465
466 StepInstance {
467 id: Uuid::new_v4(),
468 name: self.name.unwrap_or(build_name()),
469 status: Cell::new(StepStatus::Starting),
470 reader: self.reader.unwrap(),
471 processor: self.processor.unwrap_or(default_processor),
472 writer: self.writer.unwrap(),
473 chunk_size: self.chunk_size,
474 skip_limit: self.skip_limit,
475 write_error_count: Cell::new(0),
476 process_error_count: Cell::new(0),
477 read_error_count: Cell::new(0),
478 write_count: Cell::new(0),
479 read_count: Cell::new(0),
480 }
481 }
482}
483
484#[cfg(test)]
485mod tests {
486 use anyhow::Result;
487 use mockall::mock;
488 use serde::{Deserialize, Serialize};
489
490 use crate::{
491 core::{
492 item::{
493 ItemProcessor, ItemProcessorResult, ItemReader, ItemReaderResult, ItemWriter,
494 ItemWriterResult,
495 },
496 step::StepStatus,
497 },
498 BatchError,
499 };
500
501 use super::{Step, StepBuilder, StepInstance};
502
    // Generates `MockTestItemReader`, an `ItemReader<Car>` whose `read`
    // behaviour is scripted by each test.
    mock! {
        pub TestItemReader {}
        impl ItemReader<Car> for TestItemReader {
            fn read(&self) -> ItemReaderResult<Car>;
        }
    }
509
    // Generates `MockTestProcessor`, an `ItemProcessor<Car, Car>` whose
    // `process` behaviour is scripted by each test.
    mock! {
        pub TestProcessor {}
        impl ItemProcessor<Car, Car> for TestProcessor {
            fn process(&self, item: &Car) -> ItemProcessorResult<Car>;
        }
    }
516
    // Generates `MockTestItemWriter`, an `ItemWriter<Car>` whose `write`
    // behaviour is scripted by each test.
    // NOTE(review): only `write` is mocked; the other writer methods the step
    // calls (`open`, `flush`, `close`) presumably use trait defaults — confirm.
    mock! {
        pub TestItemWriter {}
        impl ItemWriter<Car> for TestItemWriter {
            fn write(&self, items: &[Car]) -> ItemWriterResult;
        }
    }
523
    /// Item type used as both the read and the written item in these tests.
    #[derive(Deserialize, Serialize, Debug, Clone)]
    struct Car {
        year: u16,
        make: String,
        model: String,
        description: String,
    }
531
532 fn mock_read(i: &mut u16, error_count: u16, end_count: u16) -> ItemReaderResult<Car> {
533 if end_count > 0 && *i == end_count {
534 return Ok(None);
535 } else if error_count > 0 && *i == error_count {
536 return Err(BatchError::ItemReader("mock read error".to_string()));
537 }
538
539 let car = Car {
540 year: 1979,
541 make: "make".to_owned(),
542 model: "model".to_owned(),
543 description: "description".to_owned(),
544 };
545 *i += 1;
546 Ok(Some(car))
547 }
548
549 fn mock_process(i: &mut u16, error_at: &[u16]) -> ItemProcessorResult<Car> {
550 *i += 1;
551 if error_at.contains(i) {
552 return Err(BatchError::ItemProcessor("mock process error".to_string()));
553 }
554
555 let car = Car {
556 year: 1979,
557 make: "make".to_owned(),
558 model: "model".to_owned(),
559 description: "description".to_owned(),
560 };
561 Ok(car)
562 }
563
564 #[test]
565 fn step_should_succeded_with_empty_data() -> Result<()> {
566 let mut reader = MockTestItemReader::default();
567 let reader_result = Ok(None);
568 reader.expect_read().return_once(move || reader_result);
569
570 let mut processor = MockTestProcessor::default();
571 processor.expect_process().never();
572
573 let mut writer = MockTestItemWriter::default();
574 writer.expect_write().times(1).returning(|_| Ok(()));
575
576 let step: StepInstance<Car, Car> = StepBuilder::new()
577 .name("test".to_string())
578 .reader(&reader)
579 .processor(&processor)
580 .writer(&writer)
581 .chunk(3)
582 .build();
583
584 let result = step.execute();
585
586 assert!(result.is_ok());
587 assert_eq!(step.get_name(), "test");
588 assert!(!step.get_name().is_empty());
589 assert!(!step.get_id().is_nil());
590 assert_eq!(step.get_status(), StepStatus::Success);
591
592 Ok(())
593 }
594
595 #[test]
596 fn step_should_failed_with_processor_error() -> Result<()> {
597 let mut i = 0;
598 let mut reader = MockTestItemReader::default();
599 reader
600 .expect_read()
601 .returning(move || mock_read(&mut i, 0, 4));
602
603 let mut processor = MockTestProcessor::default();
604 let mut i = 0;
605 processor
606 .expect_process()
607 .returning(move |_| mock_process(&mut i, &[2]));
608
609 let mut writer = MockTestItemWriter::default();
610 writer.expect_write().never();
611
612 let step: StepInstance<Car, Car> = StepBuilder::new()
613 .reader(&reader)
614 .processor(&processor)
615 .writer(&writer)
616 .chunk(3)
617 .build();
618
619 let result = step.execute();
620
621 assert!(result.is_err());
622 assert_eq!(step.get_status(), StepStatus::ProcessorError);
623
624 Ok(())
625 }
626
627 #[test]
628 fn step_should_failed_with_write_error() -> Result<()> {
629 let mut i = 0;
630 let mut reader = MockTestItemReader::default();
631 reader
632 .expect_read()
633 .returning(move || mock_read(&mut i, 0, 4));
634
635 let mut processor = MockTestProcessor::default();
636 let mut i = 0;
637 processor
638 .expect_process()
639 .returning(move |_| mock_process(&mut i, &[]));
640
641 let mut writer = MockTestItemWriter::default();
642 let result = Err(BatchError::ItemWriter("mock write error".to_string()));
643 writer.expect_write().return_once(move |_| result);
644
645 let step: StepInstance<Car, Car> = StepBuilder::new()
646 .reader(&reader)
647 .processor(&processor)
648 .writer(&writer)
649 .chunk(3)
650 .build();
651
652 let result = step.execute();
653
654 assert!(result.is_err());
655 assert_eq!(step.get_status(), StepStatus::WriteError);
656
657 Ok(())
658 }
659
660 #[test]
661 fn step_should_succeed_even_with_processor_error() -> Result<()> {
662 let mut i = 0;
663 let mut reader = MockTestItemReader::default();
664 reader
665 .expect_read()
666 .returning(move || mock_read(&mut i, 0, 4));
667
668 let mut processor = MockTestProcessor::default();
669 let mut i = 0;
670 processor
671 .expect_process()
672 .returning(move |_| mock_process(&mut i, &[2]));
673
674 let mut writer = MockTestItemWriter::default();
675 writer.expect_write().times(2).returning(|_| Ok(()));
676
677 let step: StepInstance<Car, Car> = StepBuilder::new()
678 .reader(&reader)
679 .processor(&processor)
680 .writer(&writer)
681 .chunk(3)
682 .skip_limit(1)
683 .build();
684
685 let result = step.execute();
686
687 assert!(result.is_ok());
688 assert_eq!(step.get_status(), StepStatus::Success);
689
690 Ok(())
691 }
692
693 #[test]
694 fn step_should_succeed_even_with_write_error() -> Result<()> {
695 let mut i = 0;
696 let mut reader = MockTestItemReader::default();
697 reader
698 .expect_read()
699 .returning(move || mock_read(&mut i, 0, 4));
700
701 let mut processor = MockTestProcessor::default();
702 let mut i = 0;
703 processor
704 .expect_process()
705 .returning(move |_| mock_process(&mut i, &[2]));
706
707 let mut writer = MockTestItemWriter::default();
708 writer.expect_write().times(2).returning(|_| Ok(()));
709
710 let step: StepInstance<Car, Car> = StepBuilder::new()
711 .reader(&reader)
712 .processor(&processor)
713 .writer(&writer)
714 .chunk(3)
715 .skip_limit(1)
716 .build();
717
718 let result = step.execute();
719
720 assert!(result.is_ok());
721 assert_eq!(step.get_status(), StepStatus::Success);
722
723 Ok(())
724 }
725}