Skip to main content

spring_batch_rs/item/rdbc/
mysql_reader.rs

1use std::cell::{Cell, RefCell};
2
3use sqlx::{mysql::MySqlRow, Execute, FromRow, MySql, Pool, QueryBuilder};
4
5use super::reader_common::{calculate_page_index, should_load_page};
6use crate::core::item::{ItemReader, ItemReaderResult};
7use crate::BatchError;
8
9/// MySQL RDBC Item Reader for batch processing
10///
11/// # Construction
12///
13/// This reader can only be created through `RdbcItemReaderBuilder`.
14/// Direct construction is not available to ensure proper configuration.
15pub struct MySqlRdbcItemReader<'a, I>
16where
17    for<'r> I: FromRow<'r, MySqlRow> + Send + Unpin + Clone,
18{
19    pub(crate) pool: Pool<MySql>,
20    pub(crate) query: &'a str,
21    pub(crate) page_size: Option<i32>,
22    pub(crate) offset: Cell<i32>,
23    pub(crate) buffer: RefCell<Vec<I>>,
24}
25
26impl<'a, I> MySqlRdbcItemReader<'a, I>
27where
28    for<'r> I: FromRow<'r, MySqlRow> + Send + Unpin + Clone,
29{
30    /// Creates a new MySqlRdbcItemReader with the specified parameters
31    ///
32    /// This constructor is only accessible within the crate to enforce the use
33    /// of `RdbcItemReaderBuilder` for creating reader instances.
34    pub(crate) fn new(pool: Pool<MySql>, query: &'a str, page_size: Option<i32>) -> Self {
35        Self {
36            pool,
37            query,
38            page_size,
39            offset: Cell::new(0),
40            buffer: RefCell::new(vec![]),
41        }
42    }
43
44    /// Reads a page of data from the database and stores it in the internal buffer.
45    ///
46    /// # Errors
47    ///
48    /// Returns [`BatchError::ItemReader`] if the database query fails.
49    fn read_page(&self) -> Result<(), BatchError> {
50        let mut query_builder = QueryBuilder::<MySql>::new(self.query);
51
52        if let Some(page_size) = self.page_size {
53            query_builder.push(format!(" LIMIT {} OFFSET {}", page_size, self.offset.get()));
54        }
55
56        let query = query_builder.build();
57
58        let items = tokio::task::block_in_place(|| {
59            tokio::runtime::Handle::current().block_on(async {
60                sqlx::query_as::<_, I>(query.sql())
61                    .fetch_all(&self.pool)
62                    .await
63                    .map_err(|e| BatchError::ItemReader(e.to_string()))
64            })
65        })?;
66
67        self.buffer.borrow_mut().clear();
68        self.buffer.borrow_mut().extend(items);
69        Ok(())
70    }
71}
72
73impl<I> ItemReader<I> for MySqlRdbcItemReader<'_, I>
74where
75    for<'r> I: FromRow<'r, MySqlRow> + Send + Unpin + Clone,
76{
77    /// Reads the next item from the MySQL database
78    fn read(&self) -> ItemReaderResult<I> {
79        let index = calculate_page_index(self.offset.get(), self.page_size);
80
81        if should_load_page(index) {
82            self.read_page()?;
83        }
84
85        let buffer = self.buffer.borrow();
86        let result = buffer.get(index as usize);
87
88        self.offset.set(self.offset.get() + 1);
89
90        Ok(result.cloned())
91    }
92}