1use crate::BatchError;
93use log::{debug, error, info, warn};
94use std::time::{Duration, Instant};
95use uuid::Uuid;
96
97use super::item::{ItemProcessor, ItemReader, ItemWriter};
98
99pub trait Tasklet {
125 fn execute(&self, step_execution: &StepExecution) -> Result<RepeatStatus, BatchError>;
134}
135
136pub struct TaskletStep<'a> {
159 name: String,
160 tasklet: &'a dyn Tasklet,
161}
162
163impl Step for TaskletStep<'_> {
164 fn execute(&self, step_execution: &mut StepExecution) -> Result<(), BatchError> {
165 step_execution.status = StepStatus::Started;
166 let start_time = Instant::now();
167
168 info!(
169 "Start of step: {}, id: {}",
170 step_execution.name, step_execution.id
171 );
172
173 loop {
174 let result = self.tasklet.execute(step_execution);
175 match result {
176 Ok(RepeatStatus::Continuable) => {}
177 Ok(RepeatStatus::Finished) => {
178 step_execution.status = StepStatus::Success;
179 break;
180 }
181 Err(e) => {
182 error!(
183 "Error in step: {}, id: {}, error: {}",
184 step_execution.name, step_execution.id, e
185 );
186 step_execution.status = StepStatus::Failed;
187 step_execution.end_time = Some(Instant::now());
188 step_execution.duration = Some(start_time.elapsed());
189 return Err(e);
190 }
191 }
192 }
193
194 step_execution.start_time = Some(start_time);
196 step_execution.end_time = Some(Instant::now());
197 step_execution.duration = Some(start_time.elapsed());
198
199 Ok(())
200 }
201
202 fn get_name(&self) -> &str {
203 &self.name
204 }
205}
206
207pub struct TaskletBuilder<'a> {
231 name: String,
232 tasklet: Option<&'a dyn Tasklet>,
233}
234
235impl<'a> TaskletBuilder<'a> {
236 pub fn new(name: &str) -> Self {
249 Self {
250 name: name.to_string(),
251 tasklet: None,
252 }
253 }
254
255 pub fn tasklet(mut self, tasklet: &'a dyn Tasklet) -> Self {
278 self.tasklet = Some(tasklet);
279 self
280 }
281
282 pub fn build(self) -> TaskletStep<'a> {
306 TaskletStep {
307 name: self.name,
308 tasklet: self
309 .tasklet
310 .expect("Tasklet is required for building a step"),
311 }
312 }
313}
314
315#[derive(Clone)]
331pub struct StepExecution {
332 pub id: Uuid,
334 pub name: String,
336 pub status: StepStatus,
338 pub start_time: Option<Instant>,
340 pub end_time: Option<Instant>,
342 pub duration: Option<Duration>,
344 pub read_count: usize,
346 pub write_count: usize,
348 pub read_error_count: usize,
350 pub process_count: usize,
352 pub process_error_count: usize,
354 pub write_error_count: usize,
356}
357
358impl StepExecution {
359 pub fn new(name: &str) -> Self {
378 Self {
379 id: Uuid::new_v4(),
380 name: name.to_string(),
381 status: StepStatus::Starting,
382 start_time: None,
383 end_time: None,
384 duration: None,
385 read_count: 0,
386 write_count: 0,
387 read_error_count: 0,
388 process_count: 0,
389 process_error_count: 0,
390 write_error_count: 0,
391 }
392 }
393}
394
/// Coarse lifecycle states of a batch execution.
///
/// NOTE(review): none of the steps visible in this file assign these
/// values; the per-variant meaning below is taken from the names only and
/// should be confirmed against the code that uses this enum.
pub enum BatchStatus {
    /// Presumably: finished successfully.
    COMPLETED,
    /// Presumably: about to start.
    STARTING,
    /// Presumably: currently running.
    STARTED,
    /// Presumably: a stop was requested and is in progress.
    STOPPING,
    /// Presumably: stopped before completion.
    STOPPED,
    /// Presumably: ended with an error.
    FAILED,
    /// Presumably: given up on and not to be restarted.
    ABANDONED,
    /// Presumably: state could not be determined.
    UNKNOWN,
}
430
431pub trait Step {
459 fn execute(&self, step_execution: &mut StepExecution) -> Result<(), BatchError>;
492
493 fn get_name(&self) -> &str;
509}
510
/// Tells the caller of a [`Tasklet`] whether it wants to be invoked again.
#[derive(Debug, PartialEq)]
pub enum RepeatStatus {
    /// The tasklet has more work to do and must be called again.
    Continuable,
    /// The tasklet is done; the step can complete.
    Finished,
}
540
541pub struct ChunkOrientedStep<'a, I, O> {
588 name: String,
589 reader: &'a dyn ItemReader<I>,
591 processor: &'a dyn ItemProcessor<I, O>,
593 writer: &'a dyn ItemWriter<O>,
595 chunk_size: u16,
597 skip_limit: u16,
599}
600
601impl<I, O> Step for ChunkOrientedStep<'_, I, O> {
602 fn execute(&self, step_execution: &mut StepExecution) -> Result<(), BatchError> {
603 let start_time = Instant::now();
605 info!(
606 "Start of step: {}, id: {}",
607 step_execution.name, step_execution.id
608 );
609
610 Self::manage_error(self.writer.open());
612
613 loop {
615 let (read_items, chunk_status) = match self.read_chunk(step_execution) {
617 Ok(chunk_data) => chunk_data,
618 Err(_) => {
619 step_execution.status = StepStatus::ReadError;
620 break;
621 }
622 };
623
624 if read_items.is_empty() {
626 step_execution.status = StepStatus::Success;
627 break;
628 }
629
630 if self
632 .process_and_write_chunk(step_execution, &read_items)
633 .is_err()
634 {
635 break; }
637
638 if chunk_status == ChunkStatus::Finished {
640 step_execution.status = StepStatus::Success;
641 break;
642 }
643 }
644
645 Self::manage_error(self.writer.close());
647
648 info!(
650 "End of step: {}, id: {}",
651 step_execution.name, step_execution.id
652 );
653
654 step_execution.start_time = Some(start_time);
656 step_execution.end_time = Some(Instant::now());
657 step_execution.duration = Some(start_time.elapsed());
658
659 if StepStatus::Success == step_execution.status {
662 Ok(())
663 } else {
664 Err(BatchError::Step(step_execution.name.clone()))
665 }
666 }
667
668 fn get_name(&self) -> &str {
669 &self.name
670 }
671}
672
673impl<I, O> ChunkOrientedStep<'_, I, O> {
674 fn process_and_write_chunk(
687 &self,
688 step_execution: &mut StepExecution,
689 read_items: &[I],
690 ) -> Result<(), BatchError> {
691 let processed_items = match self.process_chunk(step_execution, read_items) {
693 Ok(items) => items,
694 Err(error) => {
695 step_execution.status = StepStatus::ProcessorError;
696 return Err(error);
697 }
698 };
699
700 match self.write_chunk(step_execution, &processed_items) {
702 Ok(()) => Ok(()),
703 Err(error) => {
704 step_execution.status = StepStatus::WriteError;
705 Err(error)
706 }
707 }
708 }
709
710 fn read_chunk(
726 &self,
727 step_execution: &mut StepExecution,
728 ) -> Result<(Vec<I>, ChunkStatus), BatchError> {
729 debug!("Start reading chunk");
730
731 let mut read_items = Vec::with_capacity(self.chunk_size as usize);
732
733 loop {
734 let read_result = self.reader.read();
735
736 match read_result {
737 Ok(item) => {
738 match item {
739 Some(item) => {
740 read_items.push(item);
741 step_execution.read_count += 1;
742
743 if read_items.len() >= self.chunk_size as usize {
744 return Ok((read_items, ChunkStatus::Full));
745 }
746 }
747 None => {
748 if read_items.is_empty() {
749 return Ok((read_items, ChunkStatus::Finished));
750 } else {
751 return Ok((read_items, ChunkStatus::Full));
752 }
753 }
754 };
755 }
756 Err(error) => {
757 warn!("Error reading item: {}", error);
758 step_execution.read_error_count += 1;
759
760 if self.is_skip_limit_reached(step_execution) {
761 step_execution.status = StepStatus::ReadError;
763 return Err(error);
764 }
765 }
766 }
767 }
768 }
769
770 fn process_chunk(
782 &self,
783 step_execution: &mut StepExecution,
784 read_items: &[I],
785 ) -> Result<Vec<O>, BatchError> {
786 debug!("Processing chunk of {} items", read_items.len());
787 let mut result = Vec::with_capacity(read_items.len());
788
789 for item in read_items {
790 match self.processor.process(item) {
791 Ok(processed_item) => {
792 result.push(processed_item);
793 step_execution.process_count += 1;
794 }
795 Err(error) => {
796 warn!("Error processing item: {}", error);
797 step_execution.process_error_count += 1;
798
799 if self.is_skip_limit_reached(step_execution) {
800 step_execution.status = StepStatus::ProcessorError;
802 return Err(error);
803 }
804 }
805 }
806 }
807
808 Ok(result)
809 }
810
811 fn write_chunk(
823 &self,
824 step_execution: &mut StepExecution,
825 processed_items: &[O],
826 ) -> Result<(), BatchError> {
827 debug!("Writing chunk of {} items", processed_items.len());
828
829 if processed_items.is_empty() {
830 debug!("No items to write, skipping write call");
831 return Ok(());
832 }
833
834 match self.writer.write(processed_items) {
835 Ok(()) => {
836 step_execution.write_count += processed_items.len();
837 Self::manage_error(self.writer.flush());
838 Ok(())
839 }
840 Err(error) => {
841 warn!("Error writing items: {}", error);
842 step_execution.write_error_count += processed_items.len();
843
844 if self.is_skip_limit_reached(step_execution) {
845 step_execution.status = StepStatus::WriteError;
847 return Err(error);
848 }
849 Ok(())
850 }
851 }
852 }
853
854 fn is_skip_limit_reached(&self, step_execution: &StepExecution) -> bool {
855 step_execution.read_error_count
856 + step_execution.write_error_count
857 + step_execution.process_error_count
858 > self.skip_limit.into()
859 }
860 fn manage_error(result: Result<(), BatchError>) {
868 if let Err(error) = result {
869 warn!("Non-fatal error: {}", error);
870 }
871 }
872}
873
874pub struct ChunkOrientedStepBuilder<'a, I, O> {
918 name: String,
920 reader: Option<&'a dyn ItemReader<I>>,
922 processor: Option<&'a dyn ItemProcessor<I, O>>,
924 writer: Option<&'a dyn ItemWriter<O>>,
926 chunk_size: u16,
928 skip_limit: u16,
930}
931
932impl<'a, I, O> ChunkOrientedStepBuilder<'a, I, O> {
933 pub fn new(name: &str) -> Self {
950 Self {
951 name: name.to_string(),
952 reader: None,
953 processor: None,
954 writer: None,
955 chunk_size: 10,
956 skip_limit: 0,
957 }
958 }
959
960 pub fn reader(mut self, reader: &'a dyn ItemReader<I>) -> Self {
983 self.reader = Some(reader);
984 self
985 }
986
987 pub fn processor(mut self, processor: &'a dyn ItemProcessor<I, O>) -> Self {
1016 self.processor = Some(processor);
1017 self
1018 }
1019
1020 pub fn writer(mut self, writer: &'a dyn ItemWriter<O>) -> Self {
1058 self.writer = Some(writer);
1059 self
1060 }
1061
1062 pub fn chunk_size(mut self, chunk_size: u16) -> Self {
1080 self.chunk_size = chunk_size;
1081 self
1082 }
1083
1084 pub fn skip_limit(mut self, skip_limit: u16) -> Self {
1101 self.skip_limit = skip_limit;
1102 self
1103 }
1104
1105 pub fn build(self) -> ChunkOrientedStep<'a, I, O> {
1144 ChunkOrientedStep {
1145 name: self.name,
1146 reader: self.reader.expect("Reader is required for building a step"),
1147 processor: self
1148 .processor
1149 .expect("Processor is required for building a step"),
1150 writer: self.writer.expect("Writer is required for building a step"),
1151 chunk_size: self.chunk_size,
1152 skip_limit: self.skip_limit,
1153 }
1154 }
1155}
1156
/// Entry point for building either kind of step: pick a name first, then
/// choose between a tasklet step and a chunk-oriented step.
pub struct StepBuilder {
    /// Name handed over to the specialised builder.
    name: String,
}
1224
1225impl StepBuilder {
1226 pub fn new(name: &str) -> Self {
1239 Self {
1240 name: name.to_string(),
1241 }
1242 }
1243
1244 pub fn tasklet(self, tasklet: &dyn Tasklet) -> TaskletBuilder<'_> {
1269 TaskletBuilder::new(&self.name).tasklet(tasklet)
1270 }
1271
1272 pub fn chunk<'a, I, O>(self, chunk_size: u16) -> ChunkOrientedStepBuilder<'a, I, O> {
1313 ChunkOrientedStepBuilder::new(&self.name).chunk_size(chunk_size)
1314 }
1315}
1316
/// Outcome of reading one chunk from the item reader.
#[derive(Debug, PartialEq)]
pub enum ChunkStatus {
    /// The reader has no more items to offer.
    Finished,

    /// A chunk was collected and is ready to be processed.
    Full,
}
1350
/// Status stored in a [`StepExecution`].
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum StepStatus {
    /// The step ran to completion.
    Success,

    /// Reading failed and the step was aborted.
    ReadError,

    /// Processing failed and the step was aborted.
    ProcessorError,

    /// Writing failed and the step was aborted.
    WriteError,

    /// Initial status of a freshly created execution.
    Starting,

    /// A tasklet returned an error.
    Failed,

    /// The step has begun executing.
    Started,
}
1423
1424#[cfg(test)]
1425mod tests {
1426 use anyhow::Result;
1427 use mockall::mock;
1428 use serde::{Deserialize, Serialize};
1429
1430 use crate::{
1431 core::{
1432 item::{
1433 ItemProcessor, ItemProcessorResult, ItemReader, ItemReaderResult, ItemWriter,
1434 ItemWriterResult,
1435 },
1436 step::{StepExecution, StepStatus},
1437 },
1438 BatchError,
1439 };
1440
1441 use super::{
1442 BatchStatus, ChunkOrientedStepBuilder, ChunkStatus, RepeatStatus, Step, StepBuilder,
1443 Tasklet, TaskletBuilder,
1444 };
1445
1446 mock! {
1447 pub TestItemReader {}
1448 impl ItemReader<Car> for TestItemReader {
1449 fn read(&self) -> ItemReaderResult<Car>;
1450 }
1451 }
1452
1453 mock! {
1454 pub TestProcessor {}
1455 impl ItemProcessor<Car, Car> for TestProcessor {
1456 fn process(&self, item: &Car) -> ItemProcessorResult<Car>;
1457 }
1458 }
1459
1460 mock! {
1461 pub TestItemWriter {}
1462 impl ItemWriter<Car> for TestItemWriter {
1463 fn write(&self, items: &[Car]) -> ItemWriterResult;
1464 fn flush(&self) -> ItemWriterResult;
1465 fn open(&self) -> ItemWriterResult;
1466 fn close(&self) -> ItemWriterResult;
1467 }
1468 }
1469
1470 mock! {
1471 pub TestTasklet {}
1472 impl Tasklet for TestTasklet {
1473 fn execute(&self, step_execution: &StepExecution) -> Result<RepeatStatus, BatchError>;
1474 }
1475 }
1476
1477 #[derive(Deserialize, Serialize, Debug, Clone)]
1478 struct Car {
1479 year: u16,
1480 make: String,
1481 model: String,
1482 description: String,
1483 }
1484
1485 fn mock_read(i: &mut u16, error_count: u16, end_count: u16) -> ItemReaderResult<Car> {
1486 if end_count > 0 && *i == end_count {
1487 return Ok(None);
1488 } else if error_count > 0 && *i == error_count {
1489 return Err(BatchError::ItemReader("mock read error".to_string()));
1490 }
1491
1492 let car = Car {
1493 year: 1979,
1494 make: "make".to_owned(),
1495 model: "model".to_owned(),
1496 description: "description".to_owned(),
1497 };
1498 *i += 1;
1499 Ok(Some(car))
1500 }
1501
1502 fn mock_process(i: &mut u16, error_at: &[u16]) -> ItemProcessorResult<Car> {
1503 *i += 1;
1504 if error_at.contains(i) {
1505 return Err(BatchError::ItemProcessor("mock process error".to_string()));
1506 }
1507
1508 let car = Car {
1509 year: 1979,
1510 make: "make".to_owned(),
1511 model: "model".to_owned(),
1512 description: "description".to_owned(),
1513 };
1514 Ok(car)
1515 }
1516
1517 #[test]
1518 fn step_should_succeded_with_empty_data() -> Result<()> {
1519 let mut reader = MockTestItemReader::default();
1520 let reader_result = Ok(None);
1521 reader.expect_read().return_once(move || reader_result);
1522
1523 let mut processor = MockTestProcessor::default();
1524 processor.expect_process().never();
1525
1526 let mut writer = MockTestItemWriter::default();
1527 writer.expect_open().times(1).returning(|| Ok(()));
1528 writer.expect_write().never();
1529 writer.expect_close().times(1).returning(|| Ok(()));
1530
1531 let step = StepBuilder::new("test")
1532 .chunk(3)
1533 .reader(&reader)
1534 .processor(&processor)
1535 .writer(&writer)
1536 .build();
1537
1538 let mut step_execution = StepExecution::new(&step.name);
1539
1540 let result = step.execute(&mut step_execution);
1541
1542 assert!(result.is_ok());
1543 assert_eq!(step.get_name(), "test");
1544 assert!(!step.get_name().is_empty());
1545 assert!(!step_execution.id.is_nil());
1546 assert_eq!(step_execution.status, StepStatus::Success);
1547
1548 Ok(())
1549 }
1550
1551 #[test]
1552 fn step_should_failed_with_processor_error() -> Result<()> {
1553 let mut i = 0;
1554 let mut reader = MockTestItemReader::default();
1555 reader
1556 .expect_read()
1557 .returning(move || mock_read(&mut i, 0, 4));
1558
1559 let mut processor = MockTestProcessor::default();
1560 let mut i = 0;
1561 processor
1562 .expect_process()
1563 .returning(move |_| mock_process(&mut i, &[2]));
1564
1565 let mut writer = MockTestItemWriter::default();
1566 writer.expect_open().times(1).returning(|| Ok(()));
1567 writer.expect_write().never();
1568 writer.expect_close().times(1).returning(|| Ok(()));
1569
1570 let step = StepBuilder::new("test")
1571 .chunk(3)
1572 .reader(&reader)
1573 .processor(&processor)
1574 .writer(&writer)
1575 .build();
1576
1577 let mut step_execution = StepExecution::new(&step.name);
1578
1579 let result = step.execute(&mut step_execution);
1580
1581 assert!(result.is_err());
1582 assert_eq!(step_execution.status, StepStatus::ProcessorError);
1583
1584 Ok(())
1585 }
1586
1587 #[test]
1588 fn step_should_failed_with_write_error() -> Result<()> {
1589 let mut i = 0;
1590 let mut reader = MockTestItemReader::default();
1591 reader
1592 .expect_read()
1593 .returning(move || mock_read(&mut i, 0, 4));
1594
1595 let mut processor = MockTestProcessor::default();
1596 let mut i = 0;
1597 processor
1598 .expect_process()
1599 .returning(move |_| mock_process(&mut i, &[]));
1600
1601 let mut writer = MockTestItemWriter::default();
1602 writer.expect_open().times(1).returning(|| Ok(()));
1603 let result = Err(BatchError::ItemWriter("mock write error".to_string()));
1604 writer.expect_write().return_once(move |_| result);
1605 writer.expect_close().times(1).returning(|| Ok(()));
1606
1607 let step = StepBuilder::new("test")
1608 .chunk(3)
1609 .reader(&reader)
1610 .processor(&processor)
1611 .writer(&writer)
1612 .build();
1613
1614 let mut step_execution = StepExecution::new(&step.name);
1615
1616 let result = step.execute(&mut step_execution);
1617
1618 assert!(result.is_err());
1619 assert_eq!(step_execution.status, StepStatus::WriteError);
1620
1621 Ok(())
1622 }
1623
1624 #[test]
1625 fn step_should_succeed_even_with_processor_error() -> Result<()> {
1626 let mut i = 0;
1627 let mut reader = MockTestItemReader::default();
1628 reader
1629 .expect_read()
1630 .returning(move || mock_read(&mut i, 0, 4));
1631
1632 let mut processor = MockTestProcessor::default();
1633 let mut i = 0;
1634 processor
1635 .expect_process()
1636 .returning(move |_| mock_process(&mut i, &[2]));
1637
1638 let mut writer = MockTestItemWriter::default();
1639 writer.expect_open().times(1).returning(|| Ok(()));
1640 writer.expect_write().times(2).returning(|_| Ok(()));
1641 writer.expect_flush().times(2).returning(|| Ok(()));
1642 writer.expect_close().times(1).returning(|| Ok(()));
1643
1644 let step = StepBuilder::new("test")
1645 .chunk(3)
1646 .reader(&reader)
1647 .processor(&processor)
1648 .writer(&writer)
1649 .skip_limit(1)
1650 .build();
1651
1652 let mut step_execution = StepExecution::new(step.get_name());
1653
1654 let result = step.execute(&mut step_execution);
1655
1656 assert!(result.is_ok());
1657 assert_eq!(step_execution.status, StepStatus::Success);
1658
1659 Ok(())
1660 }
1661
1662 #[test]
1663 fn step_should_fail_with_read_error() -> Result<()> {
1664 let mut i = 0;
1665 let mut reader = MockTestItemReader::default();
1666 reader
1667 .expect_read()
1668 .returning(move || mock_read(&mut i, 1, 4));
1669
1670 let mut processor = MockTestProcessor::default();
1671 let mut i = 0;
1672 processor
1673 .expect_process()
1674 .returning(move |_| mock_process(&mut i, &[]));
1675
1676 let mut writer = MockTestItemWriter::default();
1677 writer.expect_open().times(1).returning(|| Ok(()));
1678 writer.expect_write().never();
1679 writer.expect_close().times(1).returning(|| Ok(()));
1680
1681 let step = StepBuilder::new("test")
1682 .chunk(3)
1683 .reader(&reader)
1684 .processor(&processor)
1685 .writer(&writer)
1686 .build();
1687
1688 let mut step_execution = StepExecution::new(&step.name);
1689
1690 let result = step.execute(&mut step_execution);
1691
1692 assert!(result.is_err());
1693 assert_eq!(step_execution.status, StepStatus::ReadError);
1694 assert_eq!(step_execution.read_error_count, 1);
1695
1696 Ok(())
1697 }
1698
1699 #[test]
1700 fn step_should_respect_chunk_size() -> Result<()> {
1701 let mut i = 0;
1702 let mut reader = MockTestItemReader::default();
1703 reader
1704 .expect_read()
1705 .returning(move || mock_read(&mut i, 0, 6));
1706
1707 let mut processor = MockTestProcessor::default();
1708 let mut i = 0;
1709 processor
1710 .expect_process()
1711 .returning(move |_| mock_process(&mut i, &[]));
1712
1713 let mut writer = MockTestItemWriter::default();
1714 writer.expect_open().times(1).returning(|| Ok(()));
1715 writer.expect_write().times(2).returning(|_| Ok(()));
1716 writer.expect_flush().times(2).returning(|| Ok(()));
1717 writer.expect_close().times(1).returning(|| Ok(()));
1718
1719 let step = StepBuilder::new("test")
1720 .chunk(3)
1721 .reader(&reader)
1722 .processor(&processor)
1723 .writer(&writer)
1724 .build();
1725
1726 let mut step_execution = StepExecution::new(&step.name);
1727
1728 let result = step.execute(&mut step_execution);
1729
1730 assert!(result.is_ok());
1731 assert_eq!(step_execution.status, StepStatus::Success);
1732 assert_eq!(step_execution.read_count, 6);
1733 assert_eq!(step_execution.write_count, 6);
1734
1735 Ok(())
1736 }
1737
1738 #[test]
1739 fn step_should_track_error_counts() -> Result<()> {
1740 let mut i = 0;
1741 let mut reader = MockTestItemReader::default();
1742 reader
1743 .expect_read()
1744 .returning(move || mock_read(&mut i, 0, 4));
1745
1746 let mut processor = MockTestProcessor::default();
1747 let mut i = 0;
1748 processor
1749 .expect_process()
1750 .returning(move |_| mock_process(&mut i, &[1, 2]));
1751
1752 let mut writer = MockTestItemWriter::default();
1753 writer.expect_open().times(1).returning(|| Ok(()));
1754 writer.expect_write().times(2).returning(|_| Ok(()));
1755 writer.expect_flush().times(2).returning(|| Ok(()));
1756 writer.expect_close().times(1).returning(|| Ok(()));
1757
1758 let step = StepBuilder::new("test")
1759 .chunk(3)
1760 .reader(&reader)
1761 .processor(&processor)
1762 .writer(&writer)
1763 .skip_limit(2)
1764 .build();
1765
1766 let mut step_execution = StepExecution::new(&step.name);
1767
1768 let result = step.execute(&mut step_execution);
1769
1770 assert!(result.is_ok());
1771 assert_eq!(step_execution.status, StepStatus::Success);
1772 assert_eq!(step_execution.process_error_count, 2);
1773
1774 Ok(())
1775 }
1776
1777 #[test]
1778 fn step_should_measure_execution_time() -> Result<()> {
1779 let mut i = 0;
1780 let mut reader = MockTestItemReader::default();
1781 reader
1782 .expect_read()
1783 .returning(move || mock_read(&mut i, 0, 2));
1784
1785 let mut processor = MockTestProcessor::default();
1786 let mut i = 0;
1787 processor
1788 .expect_process()
1789 .returning(move |_| mock_process(&mut i, &[]));
1790
1791 let mut writer = MockTestItemWriter::default();
1792 writer.expect_open().times(1).returning(|| Ok(()));
1793 writer.expect_write().times(1).returning(|_| Ok(()));
1794 writer.expect_flush().times(1).returning(|| Ok(()));
1795 writer.expect_close().times(1).returning(|| Ok(()));
1796
1797 let step = StepBuilder::new("test")
1798 .chunk(3)
1799 .reader(&reader)
1800 .processor(&processor)
1801 .writer(&writer)
1802 .build();
1803
1804 let mut step_execution = StepExecution::new(&step.name);
1805
1806 let result = step.execute(&mut step_execution);
1807
1808 assert!(result.is_ok());
1809 assert!(step_execution.duration.unwrap().as_nanos() > 0);
1810 assert!(step_execution.start_time.unwrap() <= step_execution.end_time.unwrap());
1811
1812 Ok(())
1813 }
1814
1815 #[test]
1816 fn step_should_handle_empty_chunk_at_end() -> Result<()> {
1817 let mut i = 0;
1818 let mut reader = MockTestItemReader::default();
1819 reader
1820 .expect_read()
1821 .returning(move || mock_read(&mut i, 0, 1));
1822
1823 let mut processor = MockTestProcessor::default();
1824 let mut i = 0;
1825 processor
1826 .expect_process()
1827 .returning(move |_| mock_process(&mut i, &[]));
1828
1829 let mut writer = MockTestItemWriter::default();
1830 writer.expect_open().times(1).returning(|| Ok(()));
1831 writer.expect_write().times(1).returning(|items| {
1832 assert_eq!(items.len(), 1); Ok(())
1834 });
1835 writer.expect_flush().times(1).returning(|| Ok(()));
1836 writer.expect_close().times(1).returning(|| Ok(()));
1837
1838 let step = StepBuilder::new("test")
1839 .chunk(3)
1840 .reader(&reader)
1841 .processor(&processor)
1842 .writer(&writer)
1843 .build();
1844
1845 let mut step_execution = StepExecution::new(&step.name);
1846
1847 let result = step.execute(&mut step_execution);
1848
1849 assert!(result.is_ok());
1850 assert_eq!(step_execution.status, StepStatus::Success);
1851 assert_eq!(step_execution.read_count, 1);
1852 assert_eq!(step_execution.write_count, 1);
1853
1854 Ok(())
1855 }
1856
1857 #[test]
1858 fn step_execution_should_initialize_correctly() -> Result<()> {
1859 let step_execution = StepExecution::new("test_step");
1860
1861 assert_eq!(step_execution.name, "test_step");
1862 assert_eq!(step_execution.status, StepStatus::Starting);
1863 assert!(step_execution.start_time.is_none());
1864 assert!(step_execution.end_time.is_none());
1865 assert!(step_execution.duration.is_none());
1866 assert_eq!(step_execution.read_count, 0);
1867 assert_eq!(step_execution.write_count, 0);
1868 assert_eq!(step_execution.read_error_count, 0);
1869 assert_eq!(step_execution.process_count, 0);
1870 assert_eq!(step_execution.process_error_count, 0);
1871 assert_eq!(step_execution.write_error_count, 0);
1872 assert!(!step_execution.id.is_nil());
1873
1874 Ok(())
1875 }
1876
1877 #[test]
1878 fn tasklet_step_should_execute_successfully() -> Result<()> {
1879 let mut tasklet = MockTestTasklet::default();
1880 tasklet
1881 .expect_execute()
1882 .times(1)
1883 .returning(|_| Ok(RepeatStatus::Finished));
1884
1885 let step = StepBuilder::new("tasklet_test").tasklet(&tasklet).build();
1886
1887 let mut step_execution = StepExecution::new(&step.name);
1888
1889 let result = step.execute(&mut step_execution);
1890
1891 assert!(result.is_ok());
1892 assert_eq!(step.get_name(), "tasklet_test");
1893
1894 Ok(())
1895 }
1896
1897 #[test]
1898 fn tasklet_step_should_handle_tasklet_error() -> Result<()> {
1899 let mut tasklet = MockTestTasklet::default();
1900 tasklet
1901 .expect_execute()
1902 .times(1)
1903 .returning(|_| Err(BatchError::Step("tasklet error".to_string())));
1904
1905 let step = StepBuilder::new("tasklet_test").tasklet(&tasklet).build();
1906
1907 let mut step_execution = StepExecution::new(&step.name);
1908
1909 let result = step.execute(&mut step_execution);
1910
1911 assert!(result.is_err());
1913 if let Err(BatchError::Step(msg)) = result {
1914 assert_eq!(msg, "tasklet error");
1915 } else {
1916 panic!("Expected Step error");
1917 }
1918
1919 Ok(())
1920 }
1921
1922 #[test]
1923 fn tasklet_step_should_handle_continuable_status() -> Result<()> {
1924 use std::cell::Cell;
1925
1926 let call_count = Cell::new(0);
1927 let mut tasklet = MockTestTasklet::default();
1928 tasklet.expect_execute().times(4).returning(move |_| {
1929 let count = call_count.get();
1930 call_count.set(count + 1);
1931 if count < 3 {
1932 Ok(RepeatStatus::Continuable)
1933 } else {
1934 Ok(RepeatStatus::Finished)
1935 }
1936 });
1937
1938 let step = StepBuilder::new("continuable_tasklet_test")
1939 .tasklet(&tasklet)
1940 .build();
1941
1942 let mut step_execution = StepExecution::new(&step.name);
1943
1944 let result = step.execute(&mut step_execution);
1945
1946 assert!(result.is_ok());
1947 assert_eq!(step.get_name(), "continuable_tasklet_test");
1948
1949 Ok(())
1950 }
1951
1952 #[test]
1953 fn tasklet_step_should_handle_multiple_continuable_cycles() -> Result<()> {
1954 use std::cell::Cell;
1955
1956 let call_count = Cell::new(0);
1957 let mut tasklet = MockTestTasklet::default();
1958
1959 tasklet.expect_execute().times(6).returning(move |_| {
1961 let count = call_count.get();
1962 call_count.set(count + 1);
1963 if count < 5 {
1964 Ok(RepeatStatus::Continuable)
1965 } else {
1966 Ok(RepeatStatus::Finished)
1967 }
1968 });
1969
1970 let step = StepBuilder::new("multi_cycle_tasklet_test")
1971 .tasklet(&tasklet)
1972 .build();
1973
1974 let mut step_execution = StepExecution::new(&step.name);
1975
1976 let result = step.execute(&mut step_execution);
1977
1978 assert!(result.is_ok());
1979 assert_eq!(step.get_name(), "multi_cycle_tasklet_test");
1980
1981 Ok(())
1982 }
1983
1984 #[test]
1985 fn tasklet_step_should_handle_error_after_continuable() -> Result<()> {
1986 use std::cell::Cell;
1987
1988 let call_count = Cell::new(0);
1989 let mut tasklet = MockTestTasklet::default();
1990
1991 tasklet.expect_execute().times(3).returning(move |_| {
1993 let count = call_count.get();
1994 call_count.set(count + 1);
1995 if count < 2 {
1996 Ok(RepeatStatus::Continuable)
1997 } else {
1998 Err(BatchError::Step("error after continuable".to_string()))
1999 }
2000 });
2001
2002 let step = StepBuilder::new("error_after_continuable_test")
2003 .tasklet(&tasklet)
2004 .build();
2005
2006 let mut step_execution = StepExecution::new(&step.name);
2007
2008 let result = step.execute(&mut step_execution);
2009
2010 assert!(result.is_err());
2011 if let Err(BatchError::Step(msg)) = result {
2012 assert_eq!(msg, "error after continuable");
2013 } else {
2014 panic!("Expected Step error");
2015 }
2016
2017 Ok(())
2018 }
2019
2020 #[test]
2021 fn tasklet_step_should_handle_immediate_finished_status() -> Result<()> {
2022 let mut tasklet = MockTestTasklet::default();
2023 tasklet
2024 .expect_execute()
2025 .times(1)
2026 .returning(|_| Ok(RepeatStatus::Finished));
2027
2028 let step = StepBuilder::new("immediate_finished_test")
2029 .tasklet(&tasklet)
2030 .build();
2031
2032 let mut step_execution = StepExecution::new(&step.name);
2033
2034 let result = step.execute(&mut step_execution);
2035
2036 assert!(result.is_ok());
2037 assert_eq!(step.get_name(), "immediate_finished_test");
2038
2039 Ok(())
2040 }
2041
2042 #[test]
2043 fn tasklet_step_should_access_step_execution_context() -> Result<()> {
2044 let mut tasklet = MockTestTasklet::default();
2045 tasklet
2046 .expect_execute()
2047 .times(1)
2048 .withf(|step_execution| {
2049 step_execution.name == "context_test"
2051 && step_execution.status == StepStatus::Started
2052 })
2053 .returning(|_| Ok(RepeatStatus::Finished));
2054
2055 let step = StepBuilder::new("context_test").tasklet(&tasklet).build();
2056
2057 let mut step_execution = StepExecution::new(&step.name);
2058
2059 let result = step.execute(&mut step_execution);
2060
2061 assert!(result.is_ok());
2062
2063 Ok(())
2064 }
2065
2066 #[test]
2067 fn tasklet_builder_should_create_valid_tasklet_step() -> Result<()> {
2068 let mut tasklet = MockTestTasklet::default();
2069 tasklet
2070 .expect_execute()
2071 .times(1)
2072 .returning(|_| Ok(RepeatStatus::Finished));
2073
2074 let step = TaskletBuilder::new("builder_test")
2075 .tasklet(&tasklet)
2076 .build();
2077
2078 let mut step_execution = StepExecution::new(&step.name);
2079
2080 let result = step.execute(&mut step_execution);
2081
2082 assert!(result.is_ok());
2083 assert_eq!(step.get_name(), "builder_test");
2084
2085 Ok(())
2086 }
2087
2088 #[test]
2089 fn tasklet_builder_should_panic_without_tasklet() {
2090 let result = std::panic::catch_unwind(|| TaskletBuilder::new("test").build());
2091
2092 assert!(result.is_err());
2093 }
2094
2095 #[test]
2096 fn step_should_handle_writer_open_error() -> Result<()> {
2097 let mut reader = MockTestItemReader::default();
2098 let reader_result = Ok(None);
2099 reader.expect_read().return_once(move || reader_result);
2100
2101 let mut processor = MockTestProcessor::default();
2102 processor.expect_process().never();
2103
2104 let mut writer = MockTestItemWriter::default();
2105 writer
2106 .expect_open()
2107 .times(1)
2108 .returning(|| Err(BatchError::ItemWriter("open error".to_string())));
2109 writer.expect_close().times(1).returning(|| Ok(()));
2110
2111 let step = StepBuilder::new("test")
2112 .chunk(3)
2113 .reader(&reader)
2114 .processor(&processor)
2115 .writer(&writer)
2116 .build();
2117
2118 let mut step_execution = StepExecution::new(&step.name);
2119
2120 let result = step.execute(&mut step_execution);
2121
2122 assert!(result.is_ok());
2124 assert_eq!(step_execution.status, StepStatus::Success);
2125
2126 Ok(())
2127 }
2128
2129 #[test]
2130 fn step_should_handle_writer_close_error() -> Result<()> {
2131 let mut reader = MockTestItemReader::default();
2132 let reader_result = Ok(None);
2133 reader.expect_read().return_once(move || reader_result);
2134
2135 let mut processor = MockTestProcessor::default();
2136 processor.expect_process().never();
2137
2138 let mut writer = MockTestItemWriter::default();
2139 writer.expect_open().times(1).returning(|| Ok(()));
2140 writer.expect_write().never();
2141 writer
2142 .expect_close()
2143 .times(1)
2144 .returning(|| Err(BatchError::ItemWriter("close error".to_string())));
2145
2146 let step = StepBuilder::new("test")
2147 .chunk(3)
2148 .reader(&reader)
2149 .processor(&processor)
2150 .writer(&writer)
2151 .build();
2152
2153 let mut step_execution = StepExecution::new(&step.name);
2154
2155 let result = step.execute(&mut step_execution);
2156
2157 assert!(result.is_ok());
2159 assert_eq!(step_execution.status, StepStatus::Success);
2160
2161 Ok(())
2162 }
2163
/// A writer whose `flush` returns an error is expected not to fail the step:
/// the test asserts `execute` returns `Ok` and the status is `Success`.
#[test]
fn step_should_handle_writer_flush_error() -> Result<()> {
    let mut read_cursor = 0;
    let mut reader = MockTestItemReader::default();
    reader
        .expect_read()
        .returning(move || mock_read(&mut read_cursor, 0, 2));

    // No processing failures are configured (empty failure list).
    let mut process_cursor = 0;
    let mut processor = MockTestProcessor::default();
    processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_cursor, &[]));

    // One successful write followed by a single failing flush.
    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer.expect_write().times(1).returning(|_| Ok(()));
    writer
        .expect_flush()
        .times(1)
        .returning(|| Err(BatchError::ItemWriter("flush error".to_string())));
    writer.expect_close().times(1).returning(|| Ok(()));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let mut execution = StepExecution::new(&step.name);
    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert_eq!(execution.status, StepStatus::Success);

    Ok(())
}
2204
/// With a chunk size of 3, the test expects two writes of exactly three items
/// each, and read and write counts of 6.
#[test]
fn step_should_handle_multiple_chunks_with_exact_chunk_size() -> Result<()> {
    let mut read_cursor = 0;
    let mut reader = MockTestItemReader::default();
    reader
        .expect_read()
        .returning(move || mock_read(&mut read_cursor, 0, 6));

    let mut process_cursor = 0;
    let mut processor = MockTestProcessor::default();
    processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_cursor, &[]));

    // Two write/flush rounds, each write receiving a full chunk.
    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer.expect_write().times(2).returning(|chunk| {
        assert_eq!(chunk.len(), 3);
        Ok(())
    });
    writer.expect_flush().times(2).returning(|| Ok(()));
    writer.expect_close().times(1).returning(|| Ok(()));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let mut execution = StepExecution::new(&step.name);
    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert_eq!(execution.status, StepStatus::Success);
    assert_eq!(execution.read_count, 6);
    assert_eq!(execution.write_count, 6);

    Ok(())
}
2246
/// With `skip_limit(2)` and a processor configured with the failure list
/// `[1, 2]`, the test expects the step to succeed and to report exactly two
/// processing errors.
#[test]
fn step_should_handle_skip_limit_boundary() -> Result<()> {
    let mut read_cursor = 0;
    let mut reader = MockTestItemReader::default();
    reader
        .expect_read()
        .returning(move || mock_read(&mut read_cursor, 0, 4));

    let mut process_cursor = 0;
    let mut processor = MockTestProcessor::default();
    processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_cursor, &[1, 2]));

    // Two write/flush rounds are expected.
    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer.expect_write().times(2).returning(|_| Ok(()));
    writer.expect_flush().times(2).returning(|| Ok(()));
    writer.expect_close().times(1).returning(|| Ok(()));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .skip_limit(2)
        .build();

    let mut execution = StepExecution::new(&step.name);
    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert_eq!(execution.status, StepStatus::Success);
    assert_eq!(execution.process_error_count, 2);

    Ok(())
}
2285
/// With `skip_limit(2)` and a processor configured with the failure list
/// `[1, 2, 3]`, the test expects `execute` to fail with status
/// `ProcessorError`, three processing errors, and no write.
#[test]
fn step_should_fail_when_skip_limit_exceeded() -> Result<()> {
    let mut read_cursor = 0;
    let mut reader = MockTestItemReader::default();
    reader
        .expect_read()
        .returning(move || mock_read(&mut read_cursor, 0, 4));

    let mut process_cursor = 0;
    let mut processor = MockTestProcessor::default();
    processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_cursor, &[1, 2, 3]));

    // The writer is opened and closed but must never receive items.
    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer.expect_write().never();
    writer.expect_close().times(1).returning(|| Ok(()));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .skip_limit(2)
        .build();

    let mut execution = StepExecution::new(&step.name);
    let outcome = step.execute(&mut execution);

    assert!(outcome.is_err());
    assert_eq!(execution.status, StepStatus::ProcessorError);
    assert_eq!(execution.process_error_count, 3);

    Ok(())
}
2323
/// With `skip_limit(3)` and a processor configured with the failure list
/// `[1, 2, 3, 4]`, the test expects the step to succeed with three processing
/// errors, a write count of zero, and no call to `write`.
#[test]
fn step_should_handle_empty_processed_chunk() -> Result<()> {
    let mut read_cursor = 0;
    let mut reader = MockTestItemReader::default();
    reader
        .expect_read()
        .returning(move || mock_read(&mut read_cursor, 0, 3));

    let mut process_cursor = 0;
    let mut processor = MockTestProcessor::default();
    processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_cursor, &[1, 2, 3, 4]));

    // The writer is opened and closed but must never receive items.
    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer.expect_write().never();
    writer.expect_close().times(1).returning(|| Ok(()));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .skip_limit(3)
        .build();

    let mut execution = StepExecution::new(&step.name);
    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert_eq!(execution.status, StepStatus::Success);
    assert_eq!(execution.process_error_count, 3);
    assert_eq!(execution.write_count, 0);

    Ok(())
}
2362
/// `ChunkStatus` values compare equal to themselves and unequal across variants.
#[test]
fn chunk_status_should_be_comparable() {
    let finished = ChunkStatus::Finished;
    let full = ChunkStatus::Full;

    assert_eq!(finished, ChunkStatus::Finished);
    assert_eq!(full, ChunkStatus::Full);
    assert_ne!(finished, full);
}
2369
/// `StepStatus` variants compare equal to themselves and unequal to other variants.
#[test]
fn step_status_should_be_comparable() {
    // Reflexive equality for each variant under test.
    let success = StepStatus::Success;
    let read_error = StepStatus::ReadError;
    let processor_error = StepStatus::ProcessorError;
    let write_error = StepStatus::WriteError;
    let starting = StepStatus::Starting;

    assert_eq!(success, StepStatus::Success);
    assert_eq!(read_error, StepStatus::ReadError);
    assert_eq!(processor_error, StepStatus::ProcessorError);
    assert_eq!(write_error, StepStatus::WriteError);
    assert_eq!(starting, StepStatus::Starting);

    // Distinct variants must differ.
    assert_ne!(success, read_error);
    assert_ne!(processor_error, write_error);
}
2381
/// `RepeatStatus` values compare equal to themselves and unequal across variants.
#[test]
fn repeat_status_should_be_comparable() {
    let continuable = RepeatStatus::Continuable;
    let finished = RepeatStatus::Finished;

    assert_eq!(continuable, RepeatStatus::Continuable);
    assert_eq!(finished, RepeatStatus::Finished);
    assert_ne!(continuable, finished);
}
2388
/// `StepBuilder` with `chunk`, reader, processor, writer and `skip_limit`
/// yields a step that executes successfully and reports the name it was given.
#[test]
fn step_builder_should_create_chunk_oriented_step() -> Result<()> {
    // Writer is opened and closed once, never written to.
    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer.expect_write().never();
    writer.expect_close().times(1).returning(|| Ok(()));

    let mut processor = MockTestProcessor::default();
    processor.expect_process().never();

    // Reader reports end of input on its single call.
    let mut reader = MockTestItemReader::default();
    reader.expect_read().return_once(|| Ok(None));

    let step = StepBuilder::new("builder_test")
        .chunk(5)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .skip_limit(10)
        .build();

    let mut execution = StepExecution::new(&step.name);
    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert_eq!(step.get_name(), "builder_test");

    Ok(())
}
2418
/// With a chunk size of 100, the test expects a single write carrying five
/// items and read/write counts of 5.
#[test]
fn step_should_handle_large_chunk_size() -> Result<()> {
    let mut read_cursor = 0;
    let mut reader = MockTestItemReader::default();
    reader
        .expect_read()
        .returning(move || mock_read(&mut read_cursor, 0, 5));

    let mut process_cursor = 0;
    let mut processor = MockTestProcessor::default();
    processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_cursor, &[]));

    // A single write/flush round containing all five items.
    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer.expect_write().times(1).returning(|chunk| {
        assert_eq!(chunk.len(), 5);
        Ok(())
    });
    writer.expect_flush().times(1).returning(|| Ok(()));
    writer.expect_close().times(1).returning(|| Ok(()));

    let step = StepBuilder::new("test")
        .chunk(100)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let mut execution = StepExecution::new(&step.name);
    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert_eq!(execution.status, StepStatus::Success);
    assert_eq!(execution.read_count, 5);
    assert_eq!(execution.write_count, 5);

    Ok(())
}
2460
/// One read error plus a processor configured with the failure list `[2]`,
/// under `skip_limit(2)`: the test expects success with one read error and
/// one processing error recorded.
#[test]
fn step_should_handle_mixed_errors_within_skip_limit() -> Result<()> {
    use std::cell::Cell;

    // The reader fails when its counter equals 2 and defers to `mock_read`
    // otherwise; the counter is advanced on both paths.
    let read_position = Cell::new(0u16);
    let mut reader = MockTestItemReader::default();
    reader.expect_read().returning(move || {
        let position = read_position.get();
        if position != 2 {
            let mut cursor = position;
            let item = mock_read(&mut cursor, 0, 6);
            read_position.set(cursor);
            return item;
        }
        read_position.set(position + 1);
        Err(BatchError::ItemReader("read error".to_string()))
    });

    let mut process_cursor = 0;
    let mut processor = MockTestProcessor::default();
    processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_cursor, &[2]));

    // Two write/flush rounds are expected.
    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer.expect_write().times(2).returning(|_| Ok(()));
    writer.expect_flush().times(2).returning(|| Ok(()));
    writer.expect_close().times(1).returning(|| Ok(()));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .skip_limit(2)
        .build();

    let mut execution = StepExecution::new(&step.name);
    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert_eq!(execution.status, StepStatus::Success);
    assert_eq!(execution.read_error_count, 1);
    assert_eq!(execution.process_error_count, 1);

    Ok(())
}
2511
/// Cloning a fresh `StepExecution` preserves its id, name, status and counters.
#[test]
fn step_execution_should_be_cloneable() -> Result<()> {
    let original = StepExecution::new("test_step");
    let copy = original.clone();

    assert_eq!(original.id, copy.id);
    assert_eq!(original.name, copy.name);
    assert_eq!(original.status, copy.status);
    assert_eq!(original.read_count, copy.read_count);
    assert_eq!(original.write_count, copy.write_count);

    Ok(())
}
2525
/// Runs a step over an empty reader and expects success.
///
/// NOTE(review): despite the test name, the step is configured with
/// `chunk(1)`; a chunk size of zero is not exercised here.
#[test]
fn step_should_handle_zero_chunk_size() -> Result<()> {
    // Writer is opened and closed once, never written to.
    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer.expect_write().never();
    writer.expect_close().times(1).returning(|| Ok(()));

    let mut processor = MockTestProcessor::default();
    processor.expect_process().never();

    // Reader reports end of input on its single call.
    let mut reader = MockTestItemReader::default();
    reader.expect_read().return_once(|| Ok(None));

    let step = StepBuilder::new("test")
        .chunk(1)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let mut execution = StepExecution::new(&step.name);
    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert_eq!(execution.status, StepStatus::Success);

    Ok(())
}
2556
/// A reader that errors on its first three calls, under `skip_limit(2)`: the
/// test expects `execute` to fail with status `ReadError` and three read
/// errors recorded.
#[test]
fn step_should_handle_continuous_read_errors_until_skip_limit() -> Result<()> {
    use std::cell::Cell;

    // Calls 0, 1 and 2 fail; any later call reports end of input.
    let calls = Cell::new(0u16);
    let mut reader = MockTestItemReader::default();
    reader.expect_read().returning(move || {
        let attempt = calls.get();
        calls.set(attempt + 1);
        match attempt {
            0..=2 => Err(BatchError::ItemReader("continuous read error".to_string())),
            _ => Ok(None),
        }
    });

    let mut processor = MockTestProcessor::default();
    processor.expect_process().never();

    // Writer is opened and closed once, never written to.
    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer.expect_write().never();
    writer.expect_close().times(1).returning(|| Ok(()));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .skip_limit(2)
        .build();

    let mut execution = StepExecution::new(&step.name);
    let outcome = step.execute(&mut execution);

    assert!(outcome.is_err());
    assert_eq!(execution.status, StepStatus::ReadError);
    assert_eq!(execution.read_error_count, 3);

    Ok(())
}
2599
/// A failing `write` under `skip_limit(0)`: the test expects `execute` to
/// fail with status `WriteError` and a write error count of 3.
#[test]
fn step_should_handle_write_error_with_skip_limit() -> Result<()> {
    let mut read_cursor = 0;
    let mut reader = MockTestItemReader::default();
    reader
        .expect_read()
        .returning(move || mock_read(&mut read_cursor, 0, 4));

    let mut process_cursor = 0;
    let mut processor = MockTestProcessor::default();
    processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_cursor, &[]));

    // The only write attempt fails; the writer is still closed once.
    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer
        .expect_write()
        .times(1)
        .returning(|_| Err(BatchError::ItemWriter("write error".to_string())));
    writer.expect_close().times(1).returning(|| Ok(()));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .skip_limit(0)
        .build();

    let mut execution = StepExecution::new(&step.name);
    let outcome = step.execute(&mut execution);

    assert!(outcome.is_err());
    assert_eq!(execution.status, StepStatus::WriteError);
    assert_eq!(execution.write_error_count, 3);

    Ok(())
}
2640
/// A failing `write` under `skip_limit(3)`: the test expects the step to
/// succeed with a write error count of 3 and a write count of 0.
#[test]
fn step_should_succeed_when_write_error_within_skip_limit() -> Result<()> {
    let mut read_cursor = 0;
    let mut reader = MockTestItemReader::default();
    reader
        .expect_read()
        .returning(move || mock_read(&mut read_cursor, 0, 3));

    let mut process_cursor = 0;
    let mut processor = MockTestProcessor::default();
    processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_cursor, &[]));

    // The only write attempt fails; the writer is still closed once.
    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer
        .expect_write()
        .times(1)
        .returning(|_| Err(BatchError::ItemWriter("write error".to_string())));
    writer.expect_close().times(1).returning(|| Ok(()));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .skip_limit(3)
        .build();

    let mut execution = StepExecution::new(&step.name);
    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert_eq!(execution.status, StepStatus::Success);
    assert_eq!(execution.write_error_count, 3);
    assert_eq!(execution.write_count, 0);

    Ok(())
}
2682
/// With a chunk size of 3, the test expects a single write of two items and
/// read/write counts of 2.
#[test]
fn step_should_handle_partial_chunk_at_end() -> Result<()> {
    let mut read_cursor = 0;
    let mut reader = MockTestItemReader::default();
    reader
        .expect_read()
        .returning(move || mock_read(&mut read_cursor, 0, 2));

    let mut process_cursor = 0;
    let mut processor = MockTestProcessor::default();
    processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_cursor, &[]));

    // A single write/flush round carrying the partial chunk.
    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer.expect_write().times(1).returning(|chunk| {
        assert_eq!(chunk.len(), 2);
        Ok(())
    });
    writer.expect_flush().times(1).returning(|| Ok(()));
    writer.expect_close().times(1).returning(|| Ok(()));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let mut execution = StepExecution::new(&step.name);
    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert_eq!(execution.status, StepStatus::Success);
    assert_eq!(execution.read_count, 2);
    assert_eq!(execution.write_count, 2);

    Ok(())
}
2724
/// Compile-time check that every listed `BatchStatus` variant exists; the
/// values themselves are not inspected.
#[test]
fn batch_status_should_have_all_variants() {
    let _variants = [
        BatchStatus::COMPLETED,
        BatchStatus::STARTING,
        BatchStatus::STARTED,
        BatchStatus::STOPPING,
        BatchStatus::STOPPED,
        BatchStatus::FAILED,
        BatchStatus::ABANDONED,
        BatchStatus::UNKNOWN,
    ];
}
2737
/// `TaskletBuilder::build` requires a tasklet: it succeeds once one has been
/// supplied and panics when none was set.
#[test]
fn tasklet_builder_should_require_tasklet() {
    let mut tasklet = MockTestTasklet::default();
    // Building a step must never run the tasklet.
    tasklet.expect_execute().never();

    // Happy path: a builder that was given a tasklet builds without panicking.
    let builder = TaskletBuilder::new("test").tasklet(&tasklet);
    let _step = builder.build();

    // Missing tasklet: `build` unwraps the optional tasklet with `expect`, so
    // it must panic. The closure captures nothing, so it is unwind-safe.
    let missing_tasklet = std::panic::catch_unwind(|| TaskletBuilder::new("test").build());
    assert!(missing_tasklet.is_err());
}
2748
/// A `ChunkOrientedStepBuilder` given reader, processor, writer, chunk size
/// and skip limit builds a step that runs successfully and keeps its name.
#[test]
fn chunk_oriented_step_builder_should_require_all_components() -> Result<()> {
    // Writer is opened and closed once, never written to.
    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer.expect_write().never();
    writer.expect_close().times(1).returning(|| Ok(()));

    let mut processor = MockTestProcessor::default();
    processor.expect_process().never();

    // Reader reports end of input on its single call.
    let mut reader = MockTestItemReader::default();
    reader.expect_read().return_once(|| Ok(None));

    let step = ChunkOrientedStepBuilder::new("test")
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .chunk_size(10)
        .skip_limit(5)
        .build();

    let mut execution = StepExecution::new(&step.name);
    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert_eq!(step.get_name(), "test");

    Ok(())
}
2779
/// With `skip_limit(u16::MAX)` and a processor configured with the failure
/// list `[1, 2, 3]`, the test expects success, three processing errors, and
/// no call to `write`.
#[test]
fn step_should_handle_maximum_skip_limit() -> Result<()> {
    let mut read_cursor = 0;
    let mut reader = MockTestItemReader::default();
    reader
        .expect_read()
        .returning(move || mock_read(&mut read_cursor, 0, 3));

    let mut process_cursor = 0;
    let mut processor = MockTestProcessor::default();
    processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_cursor, &[1, 2, 3]));

    // The writer is opened and closed but must never receive items.
    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer.expect_write().never();
    writer.expect_close().times(1).returning(|| Ok(()));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .skip_limit(u16::MAX)
        .build();

    let mut execution = StepExecution::new(&step.name);
    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert_eq!(execution.status, StepStatus::Success);
    assert_eq!(execution.process_error_count, 3);

    Ok(())
}
2817
/// After a tasklet step finishes, the execution must carry a start time, an
/// end time, and a non-zero duration.
#[test]
fn step_should_handle_tasklet_step_timing() -> Result<()> {
    // The tasklet finishes on its first and only invocation.
    let mut tasklet = MockTestTasklet::default();
    tasklet
        .expect_execute()
        .times(1)
        .returning(|_| Ok(RepeatStatus::Finished));

    let step = StepBuilder::new("timing_test").tasklet(&tasklet).build();

    let mut execution = StepExecution::new(&step.name);
    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert!(execution.start_time.is_some());
    assert!(execution.end_time.is_some());
    assert!(execution.duration.is_some());

    let elapsed = execution.duration.unwrap();
    assert!(elapsed.as_nanos() > 0);

    Ok(())
}
2840
/// Tracks the status of a tasklet step execution: `Starting` before the run,
/// `Started` while the tasklet executes, and `Success` once it has finished.
#[test]
fn step_should_handle_tasklet_step_status_transitions() -> Result<()> {
    let mut tasklet = MockTestTasklet::default();
    tasklet.expect_execute().times(1).returning(|in_flight| {
        // The step must already be marked as started when the tasklet runs.
        assert_eq!(in_flight.status, StepStatus::Started);
        Ok(RepeatStatus::Finished)
    });

    let step = StepBuilder::new("status_test").tasklet(&tasklet).build();

    let mut execution = StepExecution::new(&step.name);
    assert_eq!(execution.status, StepStatus::Starting);

    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert_eq!(execution.status, StepStatus::Success);

    Ok(())
}
2865
/// A tasklet that returns an error makes the step fail: the status becomes
/// `Failed`, and the end time and duration are still recorded.
#[test]
fn step_should_handle_tasklet_step_failed_status() -> Result<()> {
    // The tasklet fails on its first and only invocation.
    let mut tasklet = MockTestTasklet::default();
    tasklet
        .expect_execute()
        .times(1)
        .returning(|_| Err(BatchError::Step("tasklet failure".to_string())));

    let step = StepBuilder::new("failed_test").tasklet(&tasklet).build();

    let mut execution = StepExecution::new(&step.name);
    let outcome = step.execute(&mut execution);

    assert!(outcome.is_err());
    assert_eq!(execution.status, StepStatus::Failed);
    assert!(execution.end_time.is_some());
    assert!(execution.duration.is_some());

    Ok(())
}
2887
/// Building a chunk-oriented step without a reader is expected to panic.
#[test]
fn chunk_oriented_step_builder_should_panic_without_reader() {
    // Neither mock may be touched: the build is expected to abort first.
    let mut writer = MockTestItemWriter::default();
    writer.expect_open().never();

    let mut processor = MockTestProcessor::default();
    processor.expect_process().never();

    let build_attempt = std::panic::catch_unwind(|| {
        ChunkOrientedStepBuilder::new("test")
            .processor(&processor)
            .writer(&writer)
            .build()
    });

    assert!(build_attempt.is_err());
}
2905
/// Building a chunk-oriented step without a processor is expected to panic.
#[test]
fn chunk_oriented_step_builder_should_panic_without_processor() {
    // Neither mock may be touched: the build is expected to abort first.
    let mut writer = MockTestItemWriter::default();
    writer.expect_open().never();

    let mut reader = MockTestItemReader::default();
    reader.expect_read().never();

    let build_attempt = std::panic::catch_unwind(|| {
        ChunkOrientedStepBuilder::new("test")
            .reader(&reader)
            .writer(&writer)
            .build()
    });

    assert!(build_attempt.is_err());
}
2923
/// Building a chunk-oriented step without a writer is expected to panic.
#[test]
fn chunk_oriented_step_builder_should_panic_without_writer() {
    // Neither mock may be touched: the build is expected to abort first.
    let mut processor = MockTestProcessor::default();
    processor.expect_process().never();

    let mut reader = MockTestItemReader::default();
    reader.expect_read().never();

    let build_attempt = std::panic::catch_unwind(|| {
        ChunkOrientedStepBuilder::new("test")
            .reader(&reader)
            .processor(&processor)
            .build()
    });

    assert!(build_attempt.is_err());
}
2941
/// With a chunk size of 3, the test expects two writes of at most three
/// items each and read/write counts of 4.
#[test]
fn step_should_handle_read_chunk_with_full_chunk() -> Result<()> {
    let mut read_cursor = 0;
    let mut reader = MockTestItemReader::default();
    reader
        .expect_read()
        .returning(move || mock_read(&mut read_cursor, 0, 4));

    let mut process_cursor = 0;
    let mut processor = MockTestProcessor::default();
    processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_cursor, &[]));

    // Two write/flush rounds; no write may exceed the chunk size.
    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer.expect_write().times(2).returning(|chunk| {
        assert!(chunk.len() <= 3);
        Ok(())
    });
    writer.expect_flush().times(2).returning(|| Ok(()));
    writer.expect_close().times(1).returning(|| Ok(()));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let mut execution = StepExecution::new(&step.name);
    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert_eq!(execution.status, StepStatus::Success);
    assert_eq!(execution.read_count, 4);
    assert_eq!(execution.write_count, 4);

    Ok(())
}
2984
/// With `skip_limit(5)` and a processor configured with the failure list
/// `[1, 2, 3]`, the test expects success, three processing errors, a write
/// count of zero, and no call to `write`.
#[test]
fn step_should_handle_process_chunk_with_all_errors() -> Result<()> {
    let mut read_cursor = 0;
    let mut reader = MockTestItemReader::default();
    reader
        .expect_read()
        .returning(move || mock_read(&mut read_cursor, 0, 3));

    let mut process_cursor = 0;
    let mut processor = MockTestProcessor::default();
    processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_cursor, &[1, 2, 3]));

    // The writer is opened and closed but must never receive items.
    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer.expect_write().never();
    writer.expect_close().times(1).returning(|| Ok(()));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .skip_limit(5)
        .build();

    let mut execution = StepExecution::new(&step.name);
    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert_eq!(execution.status, StepStatus::Success);
    assert_eq!(execution.process_error_count, 3);
    assert_eq!(execution.write_count, 0);

    Ok(())
}
3023
/// With an immediately exhausted reader, the test expects success, zero
/// read/write counts, and no call to `write`.
#[test]
fn step_should_handle_write_chunk_with_empty_items() -> Result<()> {
    // Writer is opened and closed once, never written to.
    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer.expect_write().never();
    writer.expect_close().times(1).returning(|| Ok(()));

    let mut processor = MockTestProcessor::default();
    processor.expect_process().never();

    // Reader reports end of input on its single call.
    let mut reader = MockTestItemReader::default();
    reader.expect_read().return_once(|| Ok(None));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let mut execution = StepExecution::new(&step.name);
    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert_eq!(execution.status, StepStatus::Success);
    assert_eq!(execution.read_count, 0);
    assert_eq!(execution.write_count, 0);

    Ok(())
}
3055
/// Boundary case for the skip limit: with `skip_limit(2)` and a processor
/// configured with the failure list `[1, 2]`, the test expects the step to
/// succeed with exactly two processing errors.
#[test]
fn step_should_handle_is_skip_limit_reached_boundary_conditions() -> Result<()> {
    let mut read_cursor = 0;
    let mut reader = MockTestItemReader::default();
    reader
        .expect_read()
        .returning(move || mock_read(&mut read_cursor, 0, 4));

    let mut process_cursor = 0;
    let mut processor = MockTestProcessor::default();
    processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_cursor, &[1, 2]));

    // Two write/flush rounds are expected.
    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer.expect_write().times(2).returning(|_| Ok(()));
    writer.expect_flush().times(2).returning(|| Ok(()));
    writer.expect_close().times(1).returning(|| Ok(()));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .skip_limit(2)
        .build();

    let mut execution = StepExecution::new(&step.name);
    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert_eq!(execution.status, StepStatus::Success);
    assert_eq!(execution.process_error_count, 2);

    Ok(())
}
3094
/// Both `open` and `close` on the writer return errors; the test expects the
/// step to report success regardless.
#[test]
fn step_should_handle_manage_error_with_various_errors() -> Result<()> {
    // Writer fails on open and on close, and must never be written to.
    let mut writer = MockTestItemWriter::default();
    writer
        .expect_open()
        .times(1)
        .returning(|| Err(BatchError::ItemWriter("open error".to_string())));
    writer.expect_write().never();
    writer
        .expect_close()
        .times(1)
        .returning(|| Err(BatchError::ItemWriter("close error".to_string())));

    let mut processor = MockTestProcessor::default();
    processor.expect_process().never();

    // Reader reports end of input on its single call.
    let mut reader = MockTestItemReader::default();
    reader.expect_read().return_once(|| Ok(None));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();

    let mut execution = StepExecution::new(&step.name);
    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert_eq!(execution.status, StepStatus::Success);

    Ok(())
}
3131
/// Two independently created `StepExecution`s must not share an id.
#[test]
fn step_execution_should_have_unique_ids() -> Result<()> {
    let first = StepExecution::new("test1");
    let second = StepExecution::new("test2");

    assert_ne!(first.id, second.id);

    Ok(())
}
3141
/// Cloning a `StepExecution` after mutating it carries over the id, name,
/// status and the modified counters.
#[test]
fn step_execution_should_clone_with_same_values() -> Result<()> {
    let mut original = StepExecution::new("test_step");
    original.read_count = 10;
    original.write_count = 8;
    original.status = StepStatus::Success;

    let copy = original.clone();

    assert_eq!(original.id, copy.id);
    assert_eq!(original.name, copy.name);
    assert_eq!(original.status, copy.status);
    assert_eq!(original.read_count, copy.read_count);
    assert_eq!(original.write_count, copy.write_count);

    Ok(())
}
3159
/// `StepStatus` is `Copy`: the source binding stays usable after assignment.
#[test]
fn step_status_should_support_copy_trait() {
    let original = StepStatus::Success;
    let copied = original;

    assert_eq!(original, copied);
    // `original` is still valid here, which only compiles for a `Copy` type.
    assert_eq!(original, StepStatus::Success);
}
3168
/// The `Debug` output of `StepStatus::ProcessorError` contains the variant name.
#[test]
fn step_status_should_support_debug_trait() {
    let rendered = format!("{:?}", StepStatus::ProcessorError);

    assert!(rendered.contains("ProcessorError"));
}
3176
/// The `Debug` output of `ChunkStatus::Full` contains the variant name.
#[test]
fn chunk_status_should_support_debug_trait() {
    let rendered = format!("{:?}", ChunkStatus::Full);

    assert!(rendered.contains("Full"));
}
3184
/// The `Debug` output of `RepeatStatus::Continuable` contains the variant name.
#[test]
fn repeat_status_should_support_debug_trait() {
    let rendered = format!("{:?}", RepeatStatus::Continuable);

    assert!(rendered.contains("Continuable"));
}
3192}