1use crate::BatchError;
93use log::{debug, error, info, warn};
94use std::time::{Duration, Instant};
95use uuid::Uuid;
96
97use super::item::{ItemProcessor, ItemReader, ItemWriter};
98
99pub trait Tasklet {
125 fn execute(&self, step_execution: &StepExecution) -> Result<RepeatStatus, BatchError>;
134}
135
136pub struct TaskletStep<'a> {
159 name: String,
160 tasklet: &'a dyn Tasklet,
161}
162
163impl Step for TaskletStep<'_> {
164 fn execute(&self, step_execution: &mut StepExecution) -> Result<(), BatchError> {
165 step_execution.status = StepStatus::Started;
166 let start_time = Instant::now();
167
168 info!(
169 "Start of step: {}, id: {}",
170 step_execution.name, step_execution.id
171 );
172
173 loop {
174 let result = self.tasklet.execute(step_execution);
175 match result {
176 Ok(RepeatStatus::Continuable) => {}
177 Ok(RepeatStatus::Finished) => {
178 step_execution.status = StepStatus::Success;
179 break;
180 }
181 Err(e) => {
182 error!(
183 "Error in step: {}, id: {}, error: {}",
184 step_execution.name, step_execution.id, e
185 );
186 step_execution.status = StepStatus::Failed;
187 step_execution.end_time = Some(Instant::now());
188 step_execution.duration = Some(start_time.elapsed());
189 return Err(e);
190 }
191 }
192 }
193
194 step_execution.start_time = Some(start_time);
196 step_execution.end_time = Some(Instant::now());
197 step_execution.duration = Some(start_time.elapsed());
198
199 Ok(())
200 }
201
202 fn get_name(&self) -> &str {
203 &self.name
204 }
205}
206
207pub struct TaskletBuilder<'a> {
231 name: String,
232 tasklet: Option<&'a dyn Tasklet>,
233}
234
235impl<'a> TaskletBuilder<'a> {
236 pub fn new(name: &str) -> Self {
249 Self {
250 name: name.to_string(),
251 tasklet: None,
252 }
253 }
254
255 pub fn tasklet(mut self, tasklet: &'a dyn Tasklet) -> Self {
278 self.tasklet = Some(tasklet);
279 self
280 }
281
282 pub fn build(self) -> TaskletStep<'a> {
306 TaskletStep {
307 name: self.name,
308 tasklet: self
309 .tasklet
310 .expect("Tasklet is required for building a step"),
311 }
312 }
313}
314
315#[derive(Clone)]
332pub struct StepExecution {
333 pub id: Uuid,
335 pub name: String,
337 pub status: StepStatus,
339 pub start_time: Option<Instant>,
341 pub end_time: Option<Instant>,
343 pub duration: Option<Duration>,
345 pub read_count: usize,
347 pub write_count: usize,
349 pub read_error_count: usize,
351 pub process_count: usize,
353 pub process_error_count: usize,
355 pub filter_count: usize,
357 pub write_error_count: usize,
359}
360
361impl StepExecution {
362 pub fn new(name: &str) -> Self {
381 Self {
382 id: Uuid::new_v4(),
383 name: name.to_string(),
384 status: StepStatus::Starting,
385 start_time: None,
386 end_time: None,
387 duration: None,
388 read_count: 0,
389 write_count: 0,
390 read_error_count: 0,
391 process_count: 0,
392 process_error_count: 0,
393 filter_count: 0,
394 write_error_count: 0,
395 }
396 }
397}
398
/// Coarse lifecycle states of a batch execution.
///
/// The common value traits are derived so that a status can be logged,
/// compared and passed by value.
// NOTE(review): the variant descriptions below are inferred from the names;
// nothing in this part of the file reads or assigns them — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchStatus {
    /// Execution finished successfully.
    COMPLETED,
    /// Execution is about to start.
    STARTING,
    /// Execution is running.
    STARTED,
    /// A stop has been requested and is in progress.
    STOPPING,
    /// Execution was stopped.
    STOPPED,
    /// Execution ended with an error.
    FAILED,
    /// Execution was abandoned.
    ABANDONED,
    /// Status could not be determined.
    UNKNOWN,
}
434
435pub trait Step {
463 fn execute(&self, step_execution: &mut StepExecution) -> Result<(), BatchError>;
496
497 fn get_name(&self) -> &str;
513}
514
/// Tells a tasklet-driven step whether the tasklet wants to be called again.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RepeatStatus {
    /// More work remains: the tasklet should be executed again.
    Continuable,
    /// The tasklet is done: the step can complete.
    Finished,
}
544
545pub struct ChunkOrientedStep<'a, I, O> {
592 name: String,
593 reader: &'a dyn ItemReader<I>,
595 processor: &'a dyn ItemProcessor<I, O>,
597 writer: &'a dyn ItemWriter<O>,
599 chunk_size: u16,
601 skip_limit: u16,
603}
604
605impl<I, O> Step for ChunkOrientedStep<'_, I, O> {
606 fn execute(&self, step_execution: &mut StepExecution) -> Result<(), BatchError> {
607 let start_time = Instant::now();
609 info!(
610 "Start of step: {}, id: {}",
611 step_execution.name, step_execution.id
612 );
613
614 Self::manage_error(self.writer.open());
616
617 loop {
619 let (read_items, chunk_status) = match self.read_chunk(step_execution) {
621 Ok(chunk_data) => chunk_data,
622 Err(_) => {
623 step_execution.status = StepStatus::ReadError;
624 break;
625 }
626 };
627
628 if read_items.is_empty() {
630 step_execution.status = StepStatus::Success;
631 break;
632 }
633
634 if self
636 .process_and_write_chunk(step_execution, &read_items)
637 .is_err()
638 {
639 break; }
641
642 if chunk_status == ChunkStatus::Finished {
644 step_execution.status = StepStatus::Success;
645 break;
646 }
647 }
648
649 Self::manage_error(self.writer.close());
651
652 info!(
654 "End of step: {}, id: {}",
655 step_execution.name, step_execution.id
656 );
657
658 step_execution.start_time = Some(start_time);
660 step_execution.end_time = Some(Instant::now());
661 step_execution.duration = Some(start_time.elapsed());
662
663 if StepStatus::Success == step_execution.status {
666 Ok(())
667 } else {
668 Err(BatchError::Step(step_execution.name.clone()))
669 }
670 }
671
672 fn get_name(&self) -> &str {
673 &self.name
674 }
675}
676
677impl<I, O> ChunkOrientedStep<'_, I, O> {
678 fn process_and_write_chunk(
691 &self,
692 step_execution: &mut StepExecution,
693 read_items: &[I],
694 ) -> Result<(), BatchError> {
695 let processed_items = match self.process_chunk(step_execution, read_items) {
697 Ok(items) => items,
698 Err(error) => {
699 step_execution.status = StepStatus::ProcessorError;
700 return Err(error);
701 }
702 };
703
704 match self.write_chunk(step_execution, &processed_items) {
706 Ok(()) => Ok(()),
707 Err(error) => {
708 step_execution.status = StepStatus::WriteError;
709 Err(error)
710 }
711 }
712 }
713
714 fn read_chunk(
730 &self,
731 step_execution: &mut StepExecution,
732 ) -> Result<(Vec<I>, ChunkStatus), BatchError> {
733 debug!("Start reading chunk");
734
735 let mut read_items = Vec::with_capacity(self.chunk_size as usize);
736
737 loop {
738 let read_result = self.reader.read();
739
740 match read_result {
741 Ok(item) => {
742 match item {
743 Some(item) => {
744 read_items.push(item);
745 step_execution.read_count += 1;
746
747 if read_items.len() >= self.chunk_size as usize {
748 return Ok((read_items, ChunkStatus::Full));
749 }
750 }
751 None => {
752 if read_items.is_empty() {
753 return Ok((read_items, ChunkStatus::Finished));
754 } else {
755 return Ok((read_items, ChunkStatus::Full));
756 }
757 }
758 };
759 }
760 Err(error) => {
761 warn!("Error reading item: {}", error);
762 step_execution.read_error_count += 1;
763
764 if self.is_skip_limit_reached(step_execution) {
765 step_execution.status = StepStatus::ReadError;
767 return Err(error);
768 }
769 }
770 }
771 }
772 }
773
774 fn process_chunk(
786 &self,
787 step_execution: &mut StepExecution,
788 read_items: &[I],
789 ) -> Result<Vec<O>, BatchError> {
790 debug!("Processing chunk of {} items", read_items.len());
791 let mut result = Vec::with_capacity(read_items.len());
792
793 for item in read_items {
794 match self.processor.process(item) {
795 Ok(Some(processed_item)) => {
796 result.push(processed_item);
797 step_execution.process_count += 1;
798 }
799 Ok(None) => {
800 step_execution.filter_count += 1;
801 debug!("Item filtered by processor");
802 }
803 Err(error) => {
804 warn!("Error processing item: {}", error);
805 step_execution.process_error_count += 1;
806
807 if self.is_skip_limit_reached(step_execution) {
808 step_execution.status = StepStatus::ProcessorError;
810 return Err(error);
811 }
812 }
813 }
814 }
815
816 Ok(result)
817 }
818
819 fn write_chunk(
831 &self,
832 step_execution: &mut StepExecution,
833 processed_items: &[O],
834 ) -> Result<(), BatchError> {
835 debug!("Writing chunk of {} items", processed_items.len());
836
837 if processed_items.is_empty() {
838 debug!("No items to write, skipping write call");
839 return Ok(());
840 }
841
842 match self.writer.write(processed_items) {
843 Ok(()) => {
844 step_execution.write_count += processed_items.len();
845 Self::manage_error(self.writer.flush());
846 Ok(())
847 }
848 Err(error) => {
849 warn!("Error writing items: {}", error);
850 step_execution.write_error_count += processed_items.len();
851
852 if self.is_skip_limit_reached(step_execution) {
853 step_execution.status = StepStatus::WriteError;
855 return Err(error);
856 }
857 Ok(())
858 }
859 }
860 }
861
862 fn is_skip_limit_reached(&self, step_execution: &StepExecution) -> bool {
863 step_execution.read_error_count
864 + step_execution.write_error_count
865 + step_execution.process_error_count
866 > self.skip_limit.into()
867 }
868 fn manage_error(result: Result<(), BatchError>) {
876 if let Err(error) = result {
877 warn!("Non-fatal error: {}", error);
878 }
879 }
880}
881
882pub struct ChunkOrientedStepBuilder<'a, I, O> {
926 name: String,
928 reader: Option<&'a dyn ItemReader<I>>,
930 processor: Option<&'a dyn ItemProcessor<I, O>>,
932 writer: Option<&'a dyn ItemWriter<O>>,
934 chunk_size: u16,
936 skip_limit: u16,
938}
939
940impl<'a, I, O> ChunkOrientedStepBuilder<'a, I, O> {
941 pub fn new(name: &str) -> Self {
958 Self {
959 name: name.to_string(),
960 reader: None,
961 processor: None,
962 writer: None,
963 chunk_size: 10,
964 skip_limit: 0,
965 }
966 }
967
968 pub fn reader(mut self, reader: &'a dyn ItemReader<I>) -> Self {
991 self.reader = Some(reader);
992 self
993 }
994
995 pub fn processor(mut self, processor: &'a dyn ItemProcessor<I, O>) -> Self {
1024 self.processor = Some(processor);
1025 self
1026 }
1027
1028 pub fn writer(mut self, writer: &'a dyn ItemWriter<O>) -> Self {
1066 self.writer = Some(writer);
1067 self
1068 }
1069
1070 pub fn chunk_size(mut self, chunk_size: u16) -> Self {
1088 self.chunk_size = chunk_size;
1089 self
1090 }
1091
1092 pub fn skip_limit(mut self, skip_limit: u16) -> Self {
1109 self.skip_limit = skip_limit;
1110 self
1111 }
1112
1113 pub fn build(self) -> ChunkOrientedStep<'a, I, O> {
1152 ChunkOrientedStep {
1153 name: self.name,
1154 reader: self.reader.expect("Reader is required for building a step"),
1155 processor: self
1156 .processor
1157 .expect("Processor is required for building a step"),
1158 writer: self.writer.expect("Writer is required for building a step"),
1159 chunk_size: self.chunk_size,
1160 skip_limit: self.skip_limit,
1161 }
1162 }
1163}
1164
/// Entry point for building a step: holds the step name and hands it to the
/// tasklet or chunk builder.
pub struct StepBuilder {
    /// Name given to whichever step is eventually built.
    name: String,
}
1232
1233impl StepBuilder {
1234 pub fn new(name: &str) -> Self {
1247 Self {
1248 name: name.to_string(),
1249 }
1250 }
1251
1252 pub fn tasklet(self, tasklet: &dyn Tasklet) -> TaskletBuilder<'_> {
1277 TaskletBuilder::new(&self.name).tasklet(tasklet)
1278 }
1279
1280 pub fn chunk<'a, I, O>(self, chunk_size: u16) -> ChunkOrientedStepBuilder<'a, I, O> {
1321 ChunkOrientedStepBuilder::new(&self.name).chunk_size(chunk_size)
1322 }
1323}
1324
/// Result flag returned alongside the items of a chunk read.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChunkStatus {
    /// The reader returned `None`: no further items will be produced.
    Finished,

    /// A chunk of items was returned and the reader should be polled again.
    Full,
}
1358
/// Status of a step execution.
///
/// `Eq` and `Hash` are derived in addition to the existing traits so that a
/// status can be used as a map or set key, for example to tally executions
/// by outcome.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub enum StepStatus {
    /// The step completed successfully.
    Success,

    /// Reading failed and the skip limit was exceeded.
    ReadError,

    /// Processing failed and the skip limit was exceeded.
    ProcessorError,

    /// Writing failed and the skip limit was exceeded.
    WriteError,

    /// Initial status of a freshly created execution.
    Starting,

    /// A tasklet step failed.
    Failed,

    /// The step is currently running.
    Started,
}
1431
1432#[cfg(test)]
1433mod tests {
1434 use anyhow::Result;
1435 use mockall::mock;
1436 use serde::{Deserialize, Serialize};
1437
1438 use crate::{
1439 core::{
1440 item::{
1441 ItemProcessor, ItemProcessorResult, ItemReader, ItemReaderResult, ItemWriter,
1442 ItemWriterResult,
1443 },
1444 step::{StepExecution, StepStatus},
1445 },
1446 BatchError,
1447 };
1448
1449 use super::{
1450 BatchStatus, ChunkOrientedStepBuilder, ChunkStatus, RepeatStatus, Step, StepBuilder,
1451 Tasklet, TaskletBuilder,
1452 };
1453
1454 mock! {
1455 pub TestItemReader {}
1456 impl ItemReader<Car> for TestItemReader {
1457 fn read(&self) -> ItemReaderResult<Car>;
1458 }
1459 }
1460
1461 mock! {
1462 pub TestProcessor {}
1463 impl ItemProcessor<Car, Car> for TestProcessor {
1464 fn process(&self, item: &Car) -> ItemProcessorResult<Car>;
1465 }
1466 }
1467
1468 mock! {
1469 pub TestItemWriter {}
1470 impl ItemWriter<Car> for TestItemWriter {
1471 fn write(&self, items: &[Car]) -> ItemWriterResult;
1472 fn flush(&self) -> ItemWriterResult;
1473 fn open(&self) -> ItemWriterResult;
1474 fn close(&self) -> ItemWriterResult;
1475 }
1476 }
1477
1478 mock! {
1479 pub TestTasklet {}
1480 impl Tasklet for TestTasklet {
1481 fn execute(&self, step_execution: &StepExecution) -> Result<RepeatStatus, BatchError>;
1482 }
1483 }
1484
1485 #[derive(Deserialize, Serialize, Debug, Clone)]
1486 struct Car {
1487 year: u16,
1488 make: String,
1489 model: String,
1490 description: String,
1491 }
1492
1493 fn mock_read(i: &mut u16, error_count: u16, end_count: u16) -> ItemReaderResult<Car> {
1494 if end_count > 0 && *i == end_count {
1495 return Ok(None);
1496 } else if error_count > 0 && *i == error_count {
1497 return Err(BatchError::ItemReader("mock read error".to_string()));
1498 }
1499
1500 let car = Car {
1501 year: 1979,
1502 make: "make".to_owned(),
1503 model: "model".to_owned(),
1504 description: "description".to_owned(),
1505 };
1506 *i += 1;
1507 Ok(Some(car))
1508 }
1509
1510 fn mock_process(i: &mut u16, error_at: &[u16]) -> ItemProcessorResult<Car> {
1511 *i += 1;
1512 if error_at.contains(i) {
1513 return Err(BatchError::ItemProcessor("mock process error".to_string()));
1514 }
1515
1516 let car = Car {
1517 year: 1979,
1518 make: "make".to_owned(),
1519 model: "model".to_owned(),
1520 description: "description".to_owned(),
1521 };
1522 Ok(Some(car))
1523 }
1524
/// A reader with no data at all yields a successful step that never touches
/// the processor or writes anything.
#[test]
fn step_should_succeded_with_empty_data() -> Result<()> {
    // The reader signals end of input on its very first call.
    let mut reader = MockTestItemReader::default();
    reader.expect_read().return_once(|| Ok(None));

    let mut processor = MockTestProcessor::default();
    processor.expect_process().never();

    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer.expect_write().never();
    writer.expect_close().times(1).returning(|| Ok(()));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();
    let mut execution = StepExecution::new(&step.name);

    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert_eq!(step.get_name(), "test");
    assert!(!step.get_name().is_empty());
    assert!(!execution.id.is_nil());
    assert_eq!(execution.status, StepStatus::Success);

    Ok(())
}
1558
/// With the default skip limit, a single processor error fails the step
/// before anything is written.
#[test]
fn step_should_failed_with_processor_error() -> Result<()> {
    // Four items, then end of input.
    let mut read_index = 0;
    let mut reader = MockTestItemReader::default();
    reader
        .expect_read()
        .returning(move || mock_read(&mut read_index, 0, 4));

    // The second processed item fails.
    let mut process_index = 0;
    let mut processor = MockTestProcessor::default();
    processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_index, &[2]));

    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer.expect_write().never();
    writer.expect_close().times(1).returning(|| Ok(()));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();
    let mut execution = StepExecution::new(&step.name);

    let outcome = step.execute(&mut execution);

    assert!(outcome.is_err());
    assert_eq!(execution.status, StepStatus::ProcessorError);

    Ok(())
}
1594
/// With the default skip limit, a failing write fails the step.
#[test]
fn step_should_failed_with_write_error() -> Result<()> {
    let mut read_index = 0;
    let mut reader = MockTestItemReader::default();
    reader
        .expect_read()
        .returning(move || mock_read(&mut read_index, 0, 4));

    // Processing never fails in this scenario.
    let mut process_index = 0;
    let mut processor = MockTestProcessor::default();
    processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_index, &[]));

    // The one and only write call reports an error.
    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer
        .expect_write()
        .return_once(|_| Err(BatchError::ItemWriter("mock write error".to_string())));
    writer.expect_close().times(1).returning(|| Ok(()));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();
    let mut execution = StepExecution::new(&step.name);

    let outcome = step.execute(&mut execution);

    assert!(outcome.is_err());
    assert_eq!(execution.status, StepStatus::WriteError);

    Ok(())
}
1631
/// A skip limit of one lets the step absorb a single processor error.
#[test]
fn step_should_succeed_even_with_processor_error() -> Result<()> {
    let mut read_index = 0;
    let mut reader = MockTestItemReader::default();
    reader
        .expect_read()
        .returning(move || mock_read(&mut read_index, 0, 4));

    // The second processed item fails and is skipped.
    let mut process_index = 0;
    let mut processor = MockTestProcessor::default();
    processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_index, &[2]));

    // Two chunks reach the writer, each followed by a flush.
    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer.expect_write().times(2).returning(|_| Ok(()));
    writer.expect_flush().times(2).returning(|| Ok(()));
    writer.expect_close().times(1).returning(|| Ok(()));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .skip_limit(1)
        .build();
    let mut execution = StepExecution::new(step.get_name());

    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert_eq!(execution.status, StepStatus::Success);

    Ok(())
}
1669
/// With the default skip limit, a read error fails the step and is counted.
#[test]
fn step_should_fail_with_read_error() -> Result<()> {
    // The reader yields one item, then fails.
    let mut read_index = 0;
    let mut reader = MockTestItemReader::default();
    reader
        .expect_read()
        .returning(move || mock_read(&mut read_index, 1, 4));

    let mut process_index = 0;
    let mut processor = MockTestProcessor::default();
    processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_index, &[]));

    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer.expect_write().never();
    writer.expect_close().times(1).returning(|| Ok(()));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();
    let mut execution = StepExecution::new(&step.name);

    let outcome = step.execute(&mut execution);

    assert!(outcome.is_err());
    assert_eq!(execution.status, StepStatus::ReadError);
    assert_eq!(execution.read_error_count, 1);

    Ok(())
}
1706
/// Six items with a chunk size of three are written as two chunks.
#[test]
fn step_should_respect_chunk_size() -> Result<()> {
    let mut read_index = 0;
    let mut reader = MockTestItemReader::default();
    reader
        .expect_read()
        .returning(move || mock_read(&mut read_index, 0, 6));

    let mut process_index = 0;
    let mut processor = MockTestProcessor::default();
    processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_index, &[]));

    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer.expect_write().times(2).returning(|_| Ok(()));
    writer.expect_flush().times(2).returning(|| Ok(()));
    writer.expect_close().times(1).returning(|| Ok(()));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();
    let mut execution = StepExecution::new(&step.name);

    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert_eq!(execution.status, StepStatus::Success);
    assert_eq!(execution.read_count, 6);
    assert_eq!(execution.write_count, 6);

    Ok(())
}
1745
/// Two skipped processor errors are reflected in `process_error_count`.
#[test]
fn step_should_track_error_counts() -> Result<()> {
    let mut read_index = 0;
    let mut reader = MockTestItemReader::default();
    reader
        .expect_read()
        .returning(move || mock_read(&mut read_index, 0, 4));

    // The first two processed items fail; the skip limit of two absorbs both.
    let mut process_index = 0;
    let mut processor = MockTestProcessor::default();
    processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_index, &[1, 2]));

    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer.expect_write().times(2).returning(|_| Ok(()));
    writer.expect_flush().times(2).returning(|| Ok(()));
    writer.expect_close().times(1).returning(|| Ok(()));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .skip_limit(2)
        .build();
    let mut execution = StepExecution::new(&step.name);

    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert_eq!(execution.status, StepStatus::Success);
    assert_eq!(execution.process_error_count, 2);

    Ok(())
}
1784
/// A completed step records a non-zero duration and ordered timestamps.
#[test]
fn step_should_measure_execution_time() -> Result<()> {
    let mut read_index = 0;
    let mut reader = MockTestItemReader::default();
    reader
        .expect_read()
        .returning(move || mock_read(&mut read_index, 0, 2));

    let mut process_index = 0;
    let mut processor = MockTestProcessor::default();
    processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_index, &[]));

    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer.expect_write().times(1).returning(|_| Ok(()));
    writer.expect_flush().times(1).returning(|| Ok(()));
    writer.expect_close().times(1).returning(|| Ok(()));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();
    let mut execution = StepExecution::new(&step.name);

    let outcome = step.execute(&mut execution);
    assert!(outcome.is_ok());

    let elapsed = execution.duration.unwrap();
    assert!(elapsed.as_nanos() > 0);

    let started = execution.start_time.unwrap();
    let ended = execution.end_time.unwrap();
    assert!(started <= ended);

    Ok(())
}
1822
/// A single item followed by end of input is written as one chunk of one.
#[test]
fn step_should_handle_empty_chunk_at_end() -> Result<()> {
    let mut read_index = 0;
    let mut reader = MockTestItemReader::default();
    reader
        .expect_read()
        .returning(move || mock_read(&mut read_index, 0, 1));

    let mut process_index = 0;
    let mut processor = MockTestProcessor::default();
    processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_index, &[]));

    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    // The only chunk written must contain exactly the one item read.
    writer.expect_write().times(1).returning(|written| {
        assert_eq!(written.len(), 1);
        Ok(())
    });
    writer.expect_flush().times(1).returning(|| Ok(()));
    writer.expect_close().times(1).returning(|| Ok(()));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();
    let mut execution = StepExecution::new(&step.name);

    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert_eq!(execution.status, StepStatus::Success);
    assert_eq!(execution.read_count, 1);
    assert_eq!(execution.write_count, 1);

    Ok(())
}
1864
/// A new `StepExecution` carries the given name, a non-nil id, the `Starting`
/// status, no timing information and every counter at zero.
#[test]
fn step_execution_should_initialize_correctly() -> Result<()> {
    let step_execution = StepExecution::new("test_step");

    assert_eq!(step_execution.name, "test_step");
    assert_eq!(step_execution.status, StepStatus::Starting);
    assert!(!step_execution.id.is_nil());

    assert!(step_execution.start_time.is_none());
    assert!(step_execution.end_time.is_none());
    assert!(step_execution.duration.is_none());

    assert_eq!(step_execution.read_count, 0);
    assert_eq!(step_execution.write_count, 0);
    assert_eq!(step_execution.read_error_count, 0);
    assert_eq!(step_execution.process_count, 0);
    assert_eq!(step_execution.process_error_count, 0);
    // `filter_count` was the one counter previously left unchecked.
    assert_eq!(step_execution.filter_count, 0);
    assert_eq!(step_execution.write_error_count, 0);

    Ok(())
}
1884
/// A tasklet that finishes immediately yields a successful step with its
/// status and timing fields filled in.
#[test]
fn tasklet_step_should_execute_successfully() -> Result<()> {
    let mut tasklet = MockTestTasklet::default();
    tasklet
        .expect_execute()
        .times(1)
        .returning(|_| Ok(RepeatStatus::Finished));

    let step = StepBuilder::new("tasklet_test").tasklet(&tasklet).build();
    let mut step_execution = StepExecution::new(&step.name);

    let result = step.execute(&mut step_execution);

    assert!(result.is_ok());
    assert_eq!(step.get_name(), "tasklet_test");
    // Check the recorded outcome too, not just the returned `Result`.
    assert_eq!(step_execution.status, StepStatus::Success);
    assert!(step_execution.start_time.is_some());
    assert!(step_execution.end_time.is_some());
    assert!(step_execution.duration.is_some());

    Ok(())
}
1904
/// A tasklet error is returned unchanged and the execution is marked as
/// failed, with its end time and duration recorded.
#[test]
fn tasklet_step_should_handle_tasklet_error() -> Result<()> {
    let mut tasklet = MockTestTasklet::default();
    tasklet
        .expect_execute()
        .times(1)
        .returning(|_| Err(BatchError::Step("tasklet error".to_string())));

    let step = StepBuilder::new("tasklet_test").tasklet(&tasklet).build();
    let mut step_execution = StepExecution::new(&step.name);

    let result = step.execute(&mut step_execution);

    match result {
        Err(BatchError::Step(msg)) => assert_eq!(msg, "tasklet error"),
        _ => panic!("Expected Step error"),
    }
    // The failure must also be visible on the execution record.
    assert_eq!(step_execution.status, StepStatus::Failed);
    assert!(step_execution.end_time.is_some());
    assert!(step_execution.duration.is_some());

    Ok(())
}
1929
/// The tasklet is called again for every `Continuable` and the step succeeds
/// once it reports `Finished`.
#[test]
fn tasklet_step_should_handle_continuable_status() -> Result<()> {
    // Three `Continuable` answers, then `Finished` on the fourth call.
    let mut calls = 0u32;
    let mut tasklet = MockTestTasklet::default();
    tasklet.expect_execute().times(4).returning(move |_| {
        calls += 1;
        if calls <= 3 {
            Ok(RepeatStatus::Continuable)
        } else {
            Ok(RepeatStatus::Finished)
        }
    });

    let step = StepBuilder::new("continuable_tasklet_test")
        .tasklet(&tasklet)
        .build();
    let mut step_execution = StepExecution::new(&step.name);

    let result = step.execute(&mut step_execution);

    assert!(result.is_ok());
    assert_eq!(step.get_name(), "continuable_tasklet_test");
    // Check the recorded outcome too, not just the returned `Result`.
    assert_eq!(step_execution.status, StepStatus::Success);

    Ok(())
}
1959
/// Several consecutive `Continuable` answers are all honoured before the
/// step succeeds.
#[test]
fn tasklet_step_should_handle_multiple_continuable_cycles() -> Result<()> {
    // Five `Continuable` answers, then `Finished` on the sixth call.
    let mut calls = 0u32;
    let mut tasklet = MockTestTasklet::default();
    tasklet.expect_execute().times(6).returning(move |_| {
        calls += 1;
        if calls <= 5 {
            Ok(RepeatStatus::Continuable)
        } else {
            Ok(RepeatStatus::Finished)
        }
    });

    let step = StepBuilder::new("multi_cycle_tasklet_test")
        .tasklet(&tasklet)
        .build();
    let mut step_execution = StepExecution::new(&step.name);

    let result = step.execute(&mut step_execution);

    assert!(result.is_ok());
    assert_eq!(step.get_name(), "multi_cycle_tasklet_test");
    // Check the recorded outcome too, not just the returned `Result`.
    assert_eq!(step_execution.status, StepStatus::Success);

    Ok(())
}
1991
/// An error raised after some `Continuable` rounds still fails the step and
/// marks the execution as failed.
#[test]
fn tasklet_step_should_handle_error_after_continuable() -> Result<()> {
    // Two `Continuable` answers, then an error on the third call.
    let mut calls = 0u32;
    let mut tasklet = MockTestTasklet::default();
    tasklet.expect_execute().times(3).returning(move |_| {
        calls += 1;
        if calls <= 2 {
            Ok(RepeatStatus::Continuable)
        } else {
            Err(BatchError::Step("error after continuable".to_string()))
        }
    });

    let step = StepBuilder::new("error_after_continuable_test")
        .tasklet(&tasklet)
        .build();
    let mut step_execution = StepExecution::new(&step.name);

    let result = step.execute(&mut step_execution);

    match result {
        Err(BatchError::Step(msg)) => assert_eq!(msg, "error after continuable"),
        _ => panic!("Expected Step error"),
    }
    // The failure must also be visible on the execution record.
    assert_eq!(step_execution.status, StepStatus::Failed);

    Ok(())
}
2027
/// A tasklet answering `Finished` on its first call is executed exactly once
/// and leaves the execution in the `Success` status.
#[test]
fn tasklet_step_should_handle_immediate_finished_status() -> Result<()> {
    let mut tasklet = MockTestTasklet::default();
    tasklet
        .expect_execute()
        .times(1)
        .returning(|_| Ok(RepeatStatus::Finished));

    let step = StepBuilder::new("immediate_finished_test")
        .tasklet(&tasklet)
        .build();
    let mut step_execution = StepExecution::new(&step.name);

    let result = step.execute(&mut step_execution);

    assert!(result.is_ok());
    assert_eq!(step.get_name(), "immediate_finished_test");
    // Check the recorded outcome too, not just the returned `Result`.
    assert_eq!(step_execution.status, StepStatus::Success);

    Ok(())
}
2049
/// The tasklet sees the execution it runs in: the step's name and the
/// `Started` status.
#[test]
fn tasklet_step_should_access_step_execution_context() -> Result<()> {
    let mut tasklet = MockTestTasklet::default();
    tasklet
        .expect_execute()
        .times(1)
        .withf(|seen| {
            let name_matches = seen.name == "context_test";
            name_matches && seen.status == StepStatus::Started
        })
        .returning(|_| Ok(RepeatStatus::Finished));

    let step = StepBuilder::new("context_test").tasklet(&tasklet).build();
    let mut execution = StepExecution::new(&step.name);

    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());

    Ok(())
}
2073
/// `TaskletBuilder` used directly produces a runnable step with the given
/// name.
#[test]
fn tasklet_builder_should_create_valid_tasklet_step() -> Result<()> {
    let mut tasklet = MockTestTasklet::default();
    tasklet
        .expect_execute()
        .times(1)
        .returning(|_| Ok(RepeatStatus::Finished));

    let builder = TaskletBuilder::new("builder_test");
    let step = builder.tasklet(&tasklet).build();
    let mut execution = StepExecution::new(&step.name);

    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert_eq!(step.get_name(), "builder_test");

    Ok(())
}
2095
/// Building without a tasklet panics with the documented message.
///
/// `should_panic` with `expected` also verifies the panic message, which the
/// earlier `catch_unwind` check did not.
#[test]
#[should_panic(expected = "Tasklet is required for building a step")]
fn tasklet_builder_should_panic_without_tasklet() {
    let _ = TaskletBuilder::new("test").build();
}
2102
/// An error from `open` is treated as non-fatal: the step still succeeds.
#[test]
fn step_should_handle_writer_open_error() -> Result<()> {
    let mut reader = MockTestItemReader::default();
    reader.expect_read().return_once(|| Ok(None));

    let mut processor = MockTestProcessor::default();
    processor.expect_process().never();

    // Opening fails, closing works.
    let mut writer = MockTestItemWriter::default();
    writer
        .expect_open()
        .times(1)
        .returning(|| Err(BatchError::ItemWriter("open error".to_string())));
    writer.expect_close().times(1).returning(|| Ok(()));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();
    let mut execution = StepExecution::new(&step.name);

    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert_eq!(execution.status, StepStatus::Success);

    Ok(())
}
2136
/// An error from `close` is treated as non-fatal: the step still succeeds.
#[test]
fn step_should_handle_writer_close_error() -> Result<()> {
    let mut reader = MockTestItemReader::default();
    reader.expect_read().return_once(|| Ok(None));

    let mut processor = MockTestProcessor::default();
    processor.expect_process().never();

    // Opening works, closing fails.
    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer.expect_write().never();
    writer
        .expect_close()
        .times(1)
        .returning(|| Err(BatchError::ItemWriter("close error".to_string())));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();
    let mut execution = StepExecution::new(&step.name);

    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert_eq!(execution.status, StepStatus::Success);

    Ok(())
}
2171
/// An error from `flush` after a successful write is treated as non-fatal.
#[test]
fn step_should_handle_writer_flush_error() -> Result<()> {
    let mut read_index = 0;
    let mut reader = MockTestItemReader::default();
    reader
        .expect_read()
        .returning(move || mock_read(&mut read_index, 0, 2));

    let mut process_index = 0;
    let mut processor = MockTestProcessor::default();
    processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_index, &[]));

    // The write succeeds but the following flush fails.
    let mut writer = MockTestItemWriter::default();
    writer.expect_open().times(1).returning(|| Ok(()));
    writer.expect_write().times(1).returning(|_| Ok(()));
    writer
        .expect_flush()
        .times(1)
        .returning(|| Err(BatchError::ItemWriter("flush error".to_string())));
    writer.expect_close().times(1).returning(|| Ok(()));

    let step = StepBuilder::new("test")
        .chunk(3)
        .reader(&reader)
        .processor(&processor)
        .writer(&writer)
        .build();
    let mut execution = StepExecution::new(&step.name);

    let outcome = step.execute(&mut execution);

    assert!(outcome.is_ok());
    assert_eq!(execution.status, StepStatus::Success);

    Ok(())
}
2212
/// With a chunk size of 3, the writer is expected to receive exactly two
/// batches of three items each, for six items read and six written in total.
#[test]
fn step_should_handle_multiple_chunks_with_exact_chunk_size() -> Result<()> {
    let mut read_cursor = 0;
    let mut item_reader = MockTestItemReader::default();
    item_reader
        .expect_read()
        .returning(move || mock_read(&mut read_cursor, 0, 6));

    let mut process_cursor = 0;
    let mut item_processor = MockTestProcessor::default();
    item_processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_cursor, &[]));

    let mut item_writer = MockTestItemWriter::default();
    item_writer.expect_open().times(1).returning(|| Ok(()));
    item_writer.expect_close().times(1).returning(|| Ok(()));
    // Every write call must carry a completely filled chunk.
    item_writer.expect_write().times(2).returning(|batch| {
        assert_eq!(batch.len(), 3);
        Ok(())
    });
    item_writer.expect_flush().times(2).returning(|| Ok(()));

    let chunk_step = StepBuilder::new("test")
        .chunk(3)
        .reader(&item_reader)
        .processor(&item_processor)
        .writer(&item_writer)
        .build();

    let mut execution = StepExecution::new(&chunk_step.name);

    assert!(chunk_step.execute(&mut execution).is_ok());
    assert_eq!(execution.status, StepStatus::Success);
    assert_eq!(execution.read_count, 6);
    assert_eq!(execution.write_count, 6);

    Ok(())
}
2254
/// With `skip_limit(2)` and a processor configured via `mock_process(.., &[1, 2])`,
/// the step is expected to succeed and report two processing errors.
#[test]
fn step_should_handle_skip_limit_boundary() -> Result<()> {
    let mut read_cursor = 0;
    let mut item_reader = MockTestItemReader::default();
    item_reader
        .expect_read()
        .returning(move || mock_read(&mut read_cursor, 0, 4));

    let mut process_cursor = 0;
    let mut item_processor = MockTestProcessor::default();
    item_processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_cursor, &[1, 2]));

    // Two write/flush rounds are expected despite the processing errors.
    let mut item_writer = MockTestItemWriter::default();
    item_writer.expect_open().times(1).returning(|| Ok(()));
    item_writer.expect_close().times(1).returning(|| Ok(()));
    item_writer.expect_write().times(2).returning(|_| Ok(()));
    item_writer.expect_flush().times(2).returning(|| Ok(()));

    let chunk_step = StepBuilder::new("test")
        .chunk(3)
        .reader(&item_reader)
        .processor(&item_processor)
        .writer(&item_writer)
        .skip_limit(2)
        .build();

    let mut execution = StepExecution::new(&chunk_step.name);

    assert!(chunk_step.execute(&mut execution).is_ok());
    assert_eq!(execution.status, StepStatus::Success);
    assert_eq!(execution.process_error_count, 2);

    Ok(())
}
2293
/// With `skip_limit(2)` and a processor configured via `mock_process(.., &[1, 2, 3])`,
/// the step is expected to fail with `StepStatus::ProcessorError` after three
/// processing errors, without ever writing.
#[test]
fn step_should_fail_when_skip_limit_exceeded() -> Result<()> {
    let mut read_cursor = 0;
    let mut item_reader = MockTestItemReader::default();
    item_reader
        .expect_read()
        .returning(move || mock_read(&mut read_cursor, 0, 4));

    let mut process_cursor = 0;
    let mut item_processor = MockTestProcessor::default();
    item_processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_cursor, &[1, 2, 3]));

    // The writer is opened and closed, but must never receive a batch.
    let mut item_writer = MockTestItemWriter::default();
    item_writer.expect_open().times(1).returning(|| Ok(()));
    item_writer.expect_close().times(1).returning(|| Ok(()));
    item_writer.expect_write().never();

    let chunk_step = StepBuilder::new("test")
        .chunk(3)
        .reader(&item_reader)
        .processor(&item_processor)
        .writer(&item_writer)
        .skip_limit(2)
        .build();

    let mut execution = StepExecution::new(&chunk_step.name);

    assert!(chunk_step.execute(&mut execution).is_err());
    assert_eq!(execution.status, StepStatus::ProcessorError);
    assert_eq!(execution.process_error_count, 3);

    Ok(())
}
2331
/// When processing errors stay within `skip_limit(3)` and nothing reaches the
/// writer, the step is expected to succeed with three processing errors and a
/// write count of zero.
#[test]
fn step_should_handle_empty_processed_chunk() -> Result<()> {
    let mut read_cursor = 0;
    let mut item_reader = MockTestItemReader::default();
    item_reader
        .expect_read()
        .returning(move || mock_read(&mut read_cursor, 0, 3));

    let mut process_cursor = 0;
    let mut item_processor = MockTestProcessor::default();
    item_processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_cursor, &[1, 2, 3, 4]));

    // No batch may ever be handed to the writer.
    let mut item_writer = MockTestItemWriter::default();
    item_writer.expect_open().times(1).returning(|| Ok(()));
    item_writer.expect_close().times(1).returning(|| Ok(()));
    item_writer.expect_write().never();

    let chunk_step = StepBuilder::new("test")
        .chunk(3)
        .reader(&item_reader)
        .processor(&item_processor)
        .writer(&item_writer)
        .skip_limit(3)
        .build();

    let mut execution = StepExecution::new(&chunk_step.name);

    assert!(chunk_step.execute(&mut execution).is_ok());
    assert_eq!(execution.status, StepStatus::Success);
    assert_eq!(execution.process_error_count, 3);
    assert_eq!(execution.write_count, 0);

    Ok(())
}
2370
/// `ChunkStatus` values compare equal to themselves and unequal to each other.
#[test]
fn chunk_status_should_be_comparable() {
    // Distinct variants differ.
    assert_ne!(ChunkStatus::Finished, ChunkStatus::Full);

    // Identical variants match.
    assert_eq!(ChunkStatus::Full, ChunkStatus::Full);
    assert_eq!(ChunkStatus::Finished, ChunkStatus::Finished);
}
2377
/// `StepStatus` values compare equal to themselves and unequal to other variants.
#[test]
fn step_status_should_be_comparable() {
    // Distinct variants differ.
    assert_ne!(StepStatus::ProcessorError, StepStatus::WriteError);
    assert_ne!(StepStatus::Success, StepStatus::ReadError);

    // Identical variants match.
    assert_eq!(StepStatus::Starting, StepStatus::Starting);
    assert_eq!(StepStatus::WriteError, StepStatus::WriteError);
    assert_eq!(StepStatus::ProcessorError, StepStatus::ProcessorError);
    assert_eq!(StepStatus::ReadError, StepStatus::ReadError);
    assert_eq!(StepStatus::Success, StepStatus::Success);
}
2389
/// `RepeatStatus` values compare equal to themselves and unequal to each other.
#[test]
fn repeat_status_should_be_comparable() {
    // Distinct variants differ.
    assert_ne!(RepeatStatus::Continuable, RepeatStatus::Finished);

    // Identical variants match.
    assert_eq!(RepeatStatus::Finished, RepeatStatus::Finished);
    assert_eq!(RepeatStatus::Continuable, RepeatStatus::Continuable);
}
2396
/// `StepBuilder` with chunk size, reader, processor, writer and skip limit
/// yields a step that executes successfully and reports the configured name.
#[test]
fn step_builder_should_create_chunk_oriented_step() -> Result<()> {
    // Writer is opened and closed once but never receives a batch.
    let mut item_writer = MockTestItemWriter::default();
    item_writer.expect_open().times(1).returning(|| Ok(()));
    item_writer.expect_close().times(1).returning(|| Ok(()));
    item_writer.expect_write().never();

    let mut item_processor = MockTestProcessor::default();
    item_processor.expect_process().never();

    // Reader answers its single call with `Ok(None)`.
    let mut item_reader = MockTestItemReader::default();
    item_reader.expect_read().return_once(|| Ok(None));

    let built_step = StepBuilder::new("builder_test")
        .chunk(5)
        .reader(&item_reader)
        .processor(&item_processor)
        .writer(&item_writer)
        .skip_limit(10)
        .build();

    let mut execution = StepExecution::new(&built_step.name);

    assert!(built_step.execute(&mut execution).is_ok());
    assert_eq!(built_step.get_name(), "builder_test");

    Ok(())
}
2426
/// With a chunk size (100) larger than the input, the writer is expected to
/// receive a single batch of five items; five items are read and written.
#[test]
fn step_should_handle_large_chunk_size() -> Result<()> {
    let mut read_cursor = 0;
    let mut item_reader = MockTestItemReader::default();
    item_reader
        .expect_read()
        .returning(move || mock_read(&mut read_cursor, 0, 5));

    let mut process_cursor = 0;
    let mut item_processor = MockTestProcessor::default();
    item_processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_cursor, &[]));

    let mut item_writer = MockTestItemWriter::default();
    item_writer.expect_open().times(1).returning(|| Ok(()));
    item_writer.expect_close().times(1).returning(|| Ok(()));
    // Everything must arrive in one write call.
    item_writer.expect_write().times(1).returning(|batch| {
        assert_eq!(batch.len(), 5);
        Ok(())
    });
    item_writer.expect_flush().times(1).returning(|| Ok(()));

    let chunk_step = StepBuilder::new("test")
        .chunk(100)
        .reader(&item_reader)
        .processor(&item_processor)
        .writer(&item_writer)
        .build();

    let mut execution = StepExecution::new(&chunk_step.name);

    assert!(chunk_step.execute(&mut execution).is_ok());
    assert_eq!(execution.status, StepStatus::Success);
    assert_eq!(execution.read_count, 5);
    assert_eq!(execution.write_count, 5);

    Ok(())
}
2468
/// One read error plus one processing error under `skip_limit(2)`: the step is
/// expected to succeed and record exactly one error of each kind.
#[test]
fn step_should_handle_mixed_errors_within_skip_limit() -> Result<()> {
    use std::cell::Cell;

    // The reader fails when its counter equals 2; otherwise it delegates to
    // `mock_read` and stores the counter value that helper leaves behind.
    let read_counter = Cell::new(0u16);
    let mut item_reader = MockTestItemReader::default();
    item_reader.expect_read().returning(move || {
        let position = read_counter.get();
        if position != 2 {
            let mut next = position;
            let outcome = mock_read(&mut next, 0, 6);
            read_counter.set(next);
            outcome
        } else {
            read_counter.set(position + 1);
            Err(BatchError::ItemReader("read error".to_string()))
        }
    });

    let mut process_cursor = 0;
    let mut item_processor = MockTestProcessor::default();
    item_processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_cursor, &[2]));

    let mut item_writer = MockTestItemWriter::default();
    item_writer.expect_open().times(1).returning(|| Ok(()));
    item_writer.expect_close().times(1).returning(|| Ok(()));
    item_writer.expect_write().times(2).returning(|_| Ok(()));
    item_writer.expect_flush().times(2).returning(|| Ok(()));

    let chunk_step = StepBuilder::new("test")
        .chunk(3)
        .reader(&item_reader)
        .processor(&item_processor)
        .writer(&item_writer)
        .skip_limit(2)
        .build();

    let mut execution = StepExecution::new(&chunk_step.name);

    assert!(chunk_step.execute(&mut execution).is_ok());
    assert_eq!(execution.status, StepStatus::Success);
    assert_eq!(execution.read_error_count, 1);
    assert_eq!(execution.process_error_count, 1);

    Ok(())
}
2519
/// Cloning a fresh `StepExecution` preserves id, name, status and counters.
#[test]
fn step_execution_should_be_cloneable() -> Result<()> {
    let original = StepExecution::new("test_step");
    let copy = original.clone();

    assert_eq!(original.id, copy.id);
    assert_eq!(original.name, copy.name);
    assert_eq!(original.status, copy.status);
    assert_eq!(original.read_count, copy.read_count);
    assert_eq!(original.write_count, copy.write_count);

    Ok(())
}
2533
/// A step over an empty input with the smallest chunk size used here is
/// expected to finish with `StepStatus::Success`.
///
/// NOTE(review): despite the test name, the step is configured with
/// `chunk(1)`, not `chunk(0)` — confirm whether a zero chunk size is
/// deliberately avoided or should be covered separately.
#[test]
fn step_should_handle_zero_chunk_size() -> Result<()> {
    // Writer is opened and closed once but never receives a batch.
    let mut item_writer = MockTestItemWriter::default();
    item_writer.expect_open().times(1).returning(|| Ok(()));
    item_writer.expect_close().times(1).returning(|| Ok(()));
    item_writer.expect_write().never();

    let mut item_processor = MockTestProcessor::default();
    item_processor.expect_process().never();

    // Reader answers its single call with `Ok(None)`.
    let mut item_reader = MockTestItemReader::default();
    item_reader.expect_read().return_once(|| Ok(None));

    let chunk_step = StepBuilder::new("test")
        .chunk(1)
        .reader(&item_reader)
        .processor(&item_processor)
        .writer(&item_writer)
        .build();

    let mut execution = StepExecution::new(&chunk_step.name);

    assert!(chunk_step.execute(&mut execution).is_ok());
    assert_eq!(execution.status, StepStatus::Success);

    Ok(())
}
2564
/// Three consecutive read errors under `skip_limit(2)`: the step is expected
/// to fail with `StepStatus::ReadError` and a read error count of three.
#[test]
fn step_should_handle_continuous_read_errors_until_skip_limit() -> Result<()> {
    use std::cell::Cell;

    // The first three reads fail; any later read would yield `Ok(None)`.
    let attempts = Cell::new(0u16);
    let mut item_reader = MockTestItemReader::default();
    item_reader.expect_read().returning(move || {
        let attempt = attempts.get();
        attempts.set(attempt + 1);
        if attempt >= 3 {
            Ok(None)
        } else {
            Err(BatchError::ItemReader("continuous read error".to_string()))
        }
    });

    let mut item_processor = MockTestProcessor::default();
    item_processor.expect_process().never();

    let mut item_writer = MockTestItemWriter::default();
    item_writer.expect_open().times(1).returning(|| Ok(()));
    item_writer.expect_close().times(1).returning(|| Ok(()));
    item_writer.expect_write().never();

    let chunk_step = StepBuilder::new("test")
        .chunk(3)
        .reader(&item_reader)
        .processor(&item_processor)
        .writer(&item_writer)
        .skip_limit(2)
        .build();

    let mut execution = StepExecution::new(&chunk_step.name);

    assert!(chunk_step.execute(&mut execution).is_err());
    assert_eq!(execution.status, StepStatus::ReadError);
    assert_eq!(execution.read_error_count, 3);

    Ok(())
}
2607
/// A failing `write` under `skip_limit(0)`: the step is expected to fail with
/// `StepStatus::WriteError` and a write error count of three.
#[test]
fn step_should_handle_write_error_with_skip_limit() -> Result<()> {
    let mut read_cursor = 0;
    let mut item_reader = MockTestItemReader::default();
    item_reader
        .expect_read()
        .returning(move || mock_read(&mut read_cursor, 0, 4));

    let mut process_cursor = 0;
    let mut item_processor = MockTestProcessor::default();
    item_processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_cursor, &[]));

    // The only write attempt fails; the writer is still closed afterwards.
    let mut item_writer = MockTestItemWriter::default();
    item_writer.expect_open().times(1).returning(|| Ok(()));
    item_writer.expect_close().times(1).returning(|| Ok(()));
    item_writer
        .expect_write()
        .times(1)
        .returning(|_| Err(BatchError::ItemWriter("write error".to_string())));

    let chunk_step = StepBuilder::new("test")
        .chunk(3)
        .reader(&item_reader)
        .processor(&item_processor)
        .writer(&item_writer)
        .skip_limit(0)
        .build();

    let mut execution = StepExecution::new(&chunk_step.name);

    assert!(chunk_step.execute(&mut execution).is_err());
    assert_eq!(execution.status, StepStatus::WriteError);
    assert_eq!(execution.write_error_count, 3);

    Ok(())
}
2648
/// A failing `write` under `skip_limit(3)`: the step is expected to succeed,
/// recording three write errors and a write count of zero.
#[test]
fn step_should_succeed_when_write_error_within_skip_limit() -> Result<()> {
    let mut read_cursor = 0;
    let mut item_reader = MockTestItemReader::default();
    item_reader
        .expect_read()
        .returning(move || mock_read(&mut read_cursor, 0, 3));

    let mut process_cursor = 0;
    let mut item_processor = MockTestProcessor::default();
    item_processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_cursor, &[]));

    // The only write attempt fails; the writer is still closed afterwards.
    let mut item_writer = MockTestItemWriter::default();
    item_writer.expect_open().times(1).returning(|| Ok(()));
    item_writer.expect_close().times(1).returning(|| Ok(()));
    item_writer
        .expect_write()
        .times(1)
        .returning(|_| Err(BatchError::ItemWriter("write error".to_string())));

    let chunk_step = StepBuilder::new("test")
        .chunk(3)
        .reader(&item_reader)
        .processor(&item_processor)
        .writer(&item_writer)
        .skip_limit(3)
        .build();

    let mut execution = StepExecution::new(&chunk_step.name);

    assert!(chunk_step.execute(&mut execution).is_ok());
    assert_eq!(execution.status, StepStatus::Success);
    assert_eq!(execution.write_error_count, 3);
    assert_eq!(execution.write_count, 0);

    Ok(())
}
2690
/// With fewer items than the chunk size (3), the writer is expected to receive
/// one batch of two items; two items are read and written.
#[test]
fn step_should_handle_partial_chunk_at_end() -> Result<()> {
    let mut read_cursor = 0;
    let mut item_reader = MockTestItemReader::default();
    item_reader
        .expect_read()
        .returning(move || mock_read(&mut read_cursor, 0, 2));

    let mut process_cursor = 0;
    let mut item_processor = MockTestProcessor::default();
    item_processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_cursor, &[]));

    let mut item_writer = MockTestItemWriter::default();
    item_writer.expect_open().times(1).returning(|| Ok(()));
    item_writer.expect_close().times(1).returning(|| Ok(()));
    // The single write call carries the incomplete chunk.
    item_writer.expect_write().times(1).returning(|batch| {
        assert_eq!(batch.len(), 2);
        Ok(())
    });
    item_writer.expect_flush().times(1).returning(|| Ok(()));

    let chunk_step = StepBuilder::new("test")
        .chunk(3)
        .reader(&item_reader)
        .processor(&item_processor)
        .writer(&item_writer)
        .build();

    let mut execution = StepExecution::new(&chunk_step.name);

    assert!(chunk_step.execute(&mut execution).is_ok());
    assert_eq!(execution.status, StepStatus::Success);
    assert_eq!(execution.read_count, 2);
    assert_eq!(execution.write_count, 2);

    Ok(())
}
2732
/// Compile-time check that every listed `BatchStatus` variant exists.
#[test]
fn batch_status_should_have_all_variants() {
    // Naming each variant is the whole test; the values themselves are unused.
    let _variants = [
        BatchStatus::COMPLETED,
        BatchStatus::STARTING,
        BatchStatus::STARTED,
        BatchStatus::STOPPING,
        BatchStatus::STOPPED,
        BatchStatus::FAILED,
        BatchStatus::ABANDONED,
        BatchStatus::UNKNOWN,
    ];
}
2745
/// `TaskletBuilder::build` succeeds once a tasklet has been supplied and
/// panics when none was set.
#[test]
fn tasklet_builder_should_require_tasklet() {
    let mut tasklet = MockTestTasklet::default();
    tasklet.expect_execute().never();

    // With a tasklet configured, building must not panic, and building alone
    // must never execute the tasklet.
    let builder = TaskletBuilder::new("test").tasklet(&tasklet);
    let _step = builder.build();

    // `build` unwraps the optional tasklet with `expect`, so omitting the
    // tasklet has to surface as a panic.
    let missing_tasklet = std::panic::catch_unwind(|| TaskletBuilder::new("test").build());
    assert!(missing_tasklet.is_err());
}
2756
/// `ChunkOrientedStepBuilder` with reader, processor, writer, chunk size and
/// skip limit yields a step that executes successfully and keeps its name.
#[test]
fn chunk_oriented_step_builder_should_require_all_components() -> Result<()> {
    // Writer is opened and closed once but never receives a batch.
    let mut item_writer = MockTestItemWriter::default();
    item_writer.expect_open().times(1).returning(|| Ok(()));
    item_writer.expect_close().times(1).returning(|| Ok(()));
    item_writer.expect_write().never();

    let mut item_processor = MockTestProcessor::default();
    item_processor.expect_process().never();

    // Reader answers its single call with `Ok(None)`.
    let mut item_reader = MockTestItemReader::default();
    item_reader.expect_read().return_once(|| Ok(None));

    let built_step = ChunkOrientedStepBuilder::new("test")
        .reader(&item_reader)
        .processor(&item_processor)
        .writer(&item_writer)
        .chunk_size(10)
        .skip_limit(5)
        .build();

    let mut execution = StepExecution::new(&built_step.name);

    assert!(built_step.execute(&mut execution).is_ok());
    assert_eq!(built_step.get_name(), "test");

    Ok(())
}
2787
/// With `skip_limit(u16::MAX)`, three processing errors are expected to be
/// absorbed: the step succeeds and nothing is written.
#[test]
fn step_should_handle_maximum_skip_limit() -> Result<()> {
    let mut read_cursor = 0;
    let mut item_reader = MockTestItemReader::default();
    item_reader
        .expect_read()
        .returning(move || mock_read(&mut read_cursor, 0, 3));

    let mut process_cursor = 0;
    let mut item_processor = MockTestProcessor::default();
    item_processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_cursor, &[1, 2, 3]));

    // No batch may ever be handed to the writer.
    let mut item_writer = MockTestItemWriter::default();
    item_writer.expect_open().times(1).returning(|| Ok(()));
    item_writer.expect_close().times(1).returning(|| Ok(()));
    item_writer.expect_write().never();

    let chunk_step = StepBuilder::new("test")
        .chunk(3)
        .reader(&item_reader)
        .processor(&item_processor)
        .writer(&item_writer)
        .skip_limit(u16::MAX)
        .build();

    let mut execution = StepExecution::new(&chunk_step.name);

    assert!(chunk_step.execute(&mut execution).is_ok());
    assert_eq!(execution.status, StepStatus::Success);
    assert_eq!(execution.process_error_count, 3);

    Ok(())
}
2825
/// A successful tasklet step is expected to populate `start_time`, `end_time`
/// and a non-zero `duration` on the execution.
#[test]
fn step_should_handle_tasklet_step_timing() -> Result<()> {
    // Tasklet finishes on its first and only invocation.
    let mut one_shot = MockTestTasklet::default();
    one_shot
        .expect_execute()
        .times(1)
        .returning(|_| Ok(RepeatStatus::Finished));

    let tasklet_step = StepBuilder::new("timing_test").tasklet(&one_shot).build();

    let mut execution = StepExecution::new(&tasklet_step.name);

    assert!(tasklet_step.execute(&mut execution).is_ok());

    // All timing fields must be filled in after a successful run.
    assert!(execution.start_time.is_some());
    assert!(execution.end_time.is_some());
    assert!(execution.duration.is_some());
    assert!(execution.duration.unwrap().as_nanos() > 0);

    Ok(())
}
2848
/// Status progression of a tasklet step: `Starting` before execution,
/// `Started` while the tasklet runs, `Success` afterwards.
#[test]
fn step_should_handle_tasklet_step_status_transitions() -> Result<()> {
    // The tasklet itself verifies the status it observes while running.
    let mut observing_tasklet = MockTestTasklet::default();
    observing_tasklet
        .expect_execute()
        .times(1)
        .returning(|running| {
            assert_eq!(running.status, StepStatus::Started);
            Ok(RepeatStatus::Finished)
        });

    let tasklet_step = StepBuilder::new("status_test")
        .tasklet(&observing_tasklet)
        .build();

    let mut execution = StepExecution::new(&tasklet_step.name);
    assert_eq!(execution.status, StepStatus::Starting);

    assert!(tasklet_step.execute(&mut execution).is_ok());
    assert_eq!(execution.status, StepStatus::Success);

    Ok(())
}
2873
/// A tasklet returning an error is expected to fail the step with
/// `StepStatus::Failed` while still recording `end_time` and `duration`.
#[test]
fn step_should_handle_tasklet_step_failed_status() -> Result<()> {
    // Tasklet fails on its first and only invocation.
    let mut failing_tasklet = MockTestTasklet::default();
    failing_tasklet
        .expect_execute()
        .times(1)
        .returning(|_| Err(BatchError::Step("tasklet failure".to_string())));

    let tasklet_step = StepBuilder::new("failed_test")
        .tasklet(&failing_tasklet)
        .build();

    let mut execution = StepExecution::new(&tasklet_step.name);

    assert!(tasklet_step.execute(&mut execution).is_err());
    assert_eq!(execution.status, StepStatus::Failed);
    assert!(execution.end_time.is_some());
    assert!(execution.duration.is_some());

    Ok(())
}
2895
/// Building a chunk-oriented step without a reader is expected to panic.
#[test]
fn chunk_oriented_step_builder_should_panic_without_reader() {
    // Neither mock may be touched: the builder must fail before any execution.
    let mut item_writer = MockTestItemWriter::default();
    item_writer.expect_open().never();

    let mut item_processor = MockTestProcessor::default();
    item_processor.expect_process().never();

    let build_outcome = std::panic::catch_unwind(|| {
        ChunkOrientedStepBuilder::new("test")
            .processor(&item_processor)
            .writer(&item_writer)
            .build()
    });

    assert!(build_outcome.is_err());
}
2913
/// Building a chunk-oriented step without a processor is expected to panic.
#[test]
fn chunk_oriented_step_builder_should_panic_without_processor() {
    // Neither mock may be touched: the builder must fail before any execution.
    let mut item_writer = MockTestItemWriter::default();
    item_writer.expect_open().never();

    let mut item_reader = MockTestItemReader::default();
    item_reader.expect_read().never();

    let build_outcome = std::panic::catch_unwind(|| {
        ChunkOrientedStepBuilder::new("test")
            .reader(&item_reader)
            .writer(&item_writer)
            .build()
    });

    assert!(build_outcome.is_err());
}
2931
/// Building a chunk-oriented step without a writer is expected to panic.
#[test]
fn chunk_oriented_step_builder_should_panic_without_writer() {
    // Neither mock may be touched: the builder must fail before any execution.
    let mut item_processor = MockTestProcessor::default();
    item_processor.expect_process().never();

    let mut item_reader = MockTestItemReader::default();
    item_reader.expect_read().never();

    let build_outcome = std::panic::catch_unwind(|| {
        ChunkOrientedStepBuilder::new("test")
            .reader(&item_reader)
            .processor(&item_processor)
            .build()
    });

    assert!(build_outcome.is_err());
}
2949
/// Four items with chunk size 3: the writer is expected to be called twice,
/// never with more than three items, for four items read and written.
#[test]
fn step_should_handle_read_chunk_with_full_chunk() -> Result<()> {
    let mut read_cursor = 0;
    let mut item_reader = MockTestItemReader::default();
    item_reader
        .expect_read()
        .returning(move || mock_read(&mut read_cursor, 0, 4));

    let mut process_cursor = 0;
    let mut item_processor = MockTestProcessor::default();
    item_processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_cursor, &[]));

    let mut item_writer = MockTestItemWriter::default();
    item_writer.expect_open().times(1).returning(|| Ok(()));
    item_writer.expect_close().times(1).returning(|| Ok(()));
    // No batch may exceed the configured chunk size.
    item_writer.expect_write().times(2).returning(|batch| {
        assert!(batch.len() <= 3);
        Ok(())
    });
    item_writer.expect_flush().times(2).returning(|| Ok(()));

    let chunk_step = StepBuilder::new("test")
        .chunk(3)
        .reader(&item_reader)
        .processor(&item_processor)
        .writer(&item_writer)
        .build();

    let mut execution = StepExecution::new(&chunk_step.name);

    assert!(chunk_step.execute(&mut execution).is_ok());
    assert_eq!(execution.status, StepStatus::Success);
    assert_eq!(execution.read_count, 4);
    assert_eq!(execution.write_count, 4);

    Ok(())
}
2992
/// Three processing errors under `skip_limit(5)`: the step is expected to
/// succeed with three processing errors and a write count of zero.
#[test]
fn step_should_handle_process_chunk_with_all_errors() -> Result<()> {
    let mut read_cursor = 0;
    let mut item_reader = MockTestItemReader::default();
    item_reader
        .expect_read()
        .returning(move || mock_read(&mut read_cursor, 0, 3));

    let mut process_cursor = 0;
    let mut item_processor = MockTestProcessor::default();
    item_processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_cursor, &[1, 2, 3]));

    // No batch may ever be handed to the writer.
    let mut item_writer = MockTestItemWriter::default();
    item_writer.expect_open().times(1).returning(|| Ok(()));
    item_writer.expect_close().times(1).returning(|| Ok(()));
    item_writer.expect_write().never();

    let chunk_step = StepBuilder::new("test")
        .chunk(3)
        .reader(&item_reader)
        .processor(&item_processor)
        .writer(&item_writer)
        .skip_limit(5)
        .build();

    let mut execution = StepExecution::new(&chunk_step.name);

    assert!(chunk_step.execute(&mut execution).is_ok());
    assert_eq!(execution.status, StepStatus::Success);
    assert_eq!(execution.process_error_count, 3);
    assert_eq!(execution.write_count, 0);

    Ok(())
}
3031
/// With a reader that immediately returns `Ok(None)`, the writer must never be
/// asked to write; the step succeeds with zero reads and zero writes.
#[test]
fn step_should_handle_write_chunk_with_empty_items() -> Result<()> {
    // Writer is opened and closed once but never receives a batch.
    let mut item_writer = MockTestItemWriter::default();
    item_writer.expect_open().times(1).returning(|| Ok(()));
    item_writer.expect_close().times(1).returning(|| Ok(()));
    item_writer.expect_write().never();

    let mut item_processor = MockTestProcessor::default();
    item_processor.expect_process().never();

    let mut item_reader = MockTestItemReader::default();
    item_reader.expect_read().return_once(|| Ok(None));

    let chunk_step = StepBuilder::new("test")
        .chunk(3)
        .reader(&item_reader)
        .processor(&item_processor)
        .writer(&item_writer)
        .build();

    let mut execution = StepExecution::new(&chunk_step.name);

    assert!(chunk_step.execute(&mut execution).is_ok());
    assert_eq!(execution.status, StepStatus::Success);
    assert_eq!(execution.read_count, 0);
    assert_eq!(execution.write_count, 0);

    Ok(())
}
3063
/// Two processing errors under `skip_limit(2)`: the step is expected to
/// succeed, with two write/flush rounds and two recorded processing errors.
#[test]
fn step_should_handle_is_skip_limit_reached_boundary_conditions() -> Result<()> {
    let mut read_cursor = 0;
    let mut item_reader = MockTestItemReader::default();
    item_reader
        .expect_read()
        .returning(move || mock_read(&mut read_cursor, 0, 4));

    let mut process_cursor = 0;
    let mut item_processor = MockTestProcessor::default();
    item_processor
        .expect_process()
        .returning(move |_| mock_process(&mut process_cursor, &[1, 2]));

    let mut item_writer = MockTestItemWriter::default();
    item_writer.expect_open().times(1).returning(|| Ok(()));
    item_writer.expect_close().times(1).returning(|| Ok(()));
    item_writer.expect_write().times(2).returning(|_| Ok(()));
    item_writer.expect_flush().times(2).returning(|| Ok(()));

    let chunk_step = StepBuilder::new("test")
        .chunk(3)
        .reader(&item_reader)
        .processor(&item_processor)
        .writer(&item_writer)
        .skip_limit(2)
        .build();

    let mut execution = StepExecution::new(&chunk_step.name);

    assert!(chunk_step.execute(&mut execution).is_ok());
    assert_eq!(execution.status, StepStatus::Success);
    assert_eq!(execution.process_error_count, 2);

    Ok(())
}
3102
/// Both writer `open` and `close` fail: the step is still expected to return
/// `Ok` and finish with `StepStatus::Success`.
#[test]
fn step_should_handle_manage_error_with_various_errors() -> Result<()> {
    // Writer fails on both lifecycle calls and must never be asked to write.
    let mut item_writer = MockTestItemWriter::default();
    item_writer
        .expect_close()
        .times(1)
        .returning(|| Err(BatchError::ItemWriter("close error".to_string())));
    item_writer
        .expect_open()
        .times(1)
        .returning(|| Err(BatchError::ItemWriter("open error".to_string())));
    item_writer.expect_write().never();

    let mut item_processor = MockTestProcessor::default();
    item_processor.expect_process().never();

    // Reader answers its single call with `Ok(None)`.
    let mut item_reader = MockTestItemReader::default();
    item_reader.expect_read().return_once(|| Ok(None));

    let chunk_step = StepBuilder::new("test")
        .chunk(3)
        .reader(&item_reader)
        .processor(&item_processor)
        .writer(&item_writer)
        .build();

    let mut execution = StepExecution::new(&chunk_step.name);

    assert!(chunk_step.execute(&mut execution).is_ok());
    assert_eq!(execution.status, StepStatus::Success);

    Ok(())
}
3139
/// Two independently created `StepExecution`s must not share an id.
#[test]
fn step_execution_should_have_unique_ids() -> Result<()> {
    let first = StepExecution::new("test1");
    let second = StepExecution::new("test2");

    assert_ne!(first.id, second.id);

    Ok(())
}
3149
/// Cloning a `StepExecution` whose fields were modified preserves id, name,
/// status and the read/write counters.
#[test]
fn step_execution_should_clone_with_same_values() -> Result<()> {
    // Move the execution away from its freshly constructed state first.
    let mut original = StepExecution::new("test_step");
    original.status = StepStatus::Success;
    original.write_count = 8;
    original.read_count = 10;

    let copy = original.clone();

    assert_eq!(original.id, copy.id);
    assert_eq!(original.name, copy.name);
    assert_eq!(original.status, copy.status);
    assert_eq!(original.read_count, copy.read_count);
    assert_eq!(original.write_count, copy.write_count);

    Ok(())
}
3167
/// `StepStatus` stays usable after being assigned to another binding.
#[test]
fn step_status_should_support_copy_trait() {
    let source = StepStatus::Success;
    // Plain assignment; `source` is read again below, which only compiles if
    // the value was copied rather than moved.
    let duplicate = source;

    assert_eq!(source, duplicate);
    assert_eq!(source, StepStatus::Success);
}
3176
/// The `Debug` rendering of a `StepStatus` contains the variant name.
#[test]
fn step_status_should_support_debug_trait() {
    let rendered = format!("{:?}", StepStatus::ProcessorError);

    assert!(rendered.contains("ProcessorError"));
}
3184
/// The `Debug` rendering of a `ChunkStatus` contains the variant name.
#[test]
fn chunk_status_should_support_debug_trait() {
    let rendered = format!("{:?}", ChunkStatus::Full);

    assert!(rendered.contains("Full"));
}
3192
/// The `Debug` rendering of a `RepeatStatus` contains the variant name.
#[test]
fn repeat_status_should_support_debug_trait() {
    let rendered = format!("{:?}", RepeatStatus::Continuable);

    assert!(rendered.contains("Continuable"));
}
3200
/// A processor returning `Ok(None)` for one of four items: the step is
/// expected to count one filtered item, three processed and three written.
#[test]
fn step_should_count_filtered_items() -> Result<()> {
    let mut read_cursor = 0u16;
    let mut item_reader = MockTestItemReader::default();
    item_reader
        .expect_read()
        .returning(move || mock_read(&mut read_cursor, 0, 4));

    // The second processor call yields `Ok(None)`; every other call yields a
    // fixed `Car`.
    let mut calls = 0u16;
    let mut item_processor = MockTestProcessor::default();
    item_processor.expect_process().returning(move |_| {
        calls += 1;
        if calls == 2 {
            Ok(None)
        } else {
            Ok(Some(Car {
                year: 1979,
                make: "make".to_owned(),
                model: "model".to_owned(),
                description: "description".to_owned(),
            }))
        }
    });

    let mut item_writer = MockTestItemWriter::default();
    item_writer.expect_open().times(1).returning(|| Ok(()));
    item_writer.expect_close().times(1).returning(|| Ok(()));
    item_writer.expect_flush().returning(|| Ok(()));
    // Only the three surviving items may reach the writer, in one batch.
    item_writer.expect_write().times(1).returning(|batch| {
        assert_eq!(batch.len(), 3, "expected 3 items written after filtering");
        Ok(())
    });

    let chunk_step = StepBuilder::new("test")
        .chunk(10)
        .reader(&item_reader)
        .processor(&item_processor)
        .writer(&item_writer)
        .build();

    let mut execution = StepExecution::new(&chunk_step.name);

    assert!(chunk_step.execute(&mut execution).is_ok());
    assert_eq!(execution.read_count, 4, "should have read 4 items");
    assert_eq!(execution.filter_count, 1, "should have filtered 1 item");
    assert_eq!(
        execution.process_count, 3,
        "should have processed 3 items"
    );
    assert_eq!(execution.write_count, 3, "should have written 3 items");

    Ok(())
}
3260
/// A processor returning `Ok(None)` for every item: the writer must never be
/// asked to write; three items are counted as filtered, none as processed.
#[test]
fn step_should_not_call_writer_when_all_items_filtered() -> Result<()> {
    let mut read_cursor = 0u16;
    let mut item_reader = MockTestItemReader::default();
    item_reader
        .expect_read()
        .returning(move || mock_read(&mut read_cursor, 0, 3));

    // Every processor call yields `Ok(None)`.
    let mut item_processor = MockTestProcessor::default();
    item_processor.expect_process().returning(|_| Ok(None));

    // Writer is opened and closed once but never receives a batch.
    let mut item_writer = MockTestItemWriter::default();
    item_writer.expect_open().times(1).returning(|| Ok(()));
    item_writer.expect_close().times(1).returning(|| Ok(()));
    item_writer.expect_write().never();

    let chunk_step = StepBuilder::new("test")
        .chunk(10)
        .reader(&item_reader)
        .processor(&item_processor)
        .writer(&item_writer)
        .build();

    let mut execution = StepExecution::new(&chunk_step.name);

    assert!(chunk_step.execute(&mut execution).is_ok());
    assert_eq!(
        execution.filter_count, 3,
        "all 3 items should be filtered"
    );
    assert_eq!(
        execution.process_count, 0,
        "no items should reach process_count"
    );
    assert_eq!(execution.write_count, 0, "nothing should be written");

    Ok(())
}
3300}