spring_batch_rs/item/rdbc/
mysql_reader.rs1use std::cell::{Cell, RefCell};
2
3use sqlx::{mysql::MySqlRow, Execute, FromRow, MySql, Pool, QueryBuilder};
4
5use super::reader_common::{calculate_page_index, should_load_page};
6use crate::core::item::{ItemReader, ItemReaderResult};
7use crate::BatchError;
8
9pub struct MySqlRdbcItemReader<'a, I>
16where
17 for<'r> I: FromRow<'r, MySqlRow> + Send + Unpin + Clone,
18{
19 pub(crate) pool: Pool<MySql>,
20 pub(crate) query: &'a str,
21 pub(crate) page_size: Option<i32>,
22 pub(crate) offset: Cell<i32>,
23 pub(crate) buffer: RefCell<Vec<I>>,
24}
25
26impl<'a, I> MySqlRdbcItemReader<'a, I>
27where
28 for<'r> I: FromRow<'r, MySqlRow> + Send + Unpin + Clone,
29{
30 pub(crate) fn new(pool: Pool<MySql>, query: &'a str, page_size: Option<i32>) -> Self {
35 Self {
36 pool,
37 query,
38 page_size,
39 offset: Cell::new(0),
40 buffer: RefCell::new(vec![]),
41 }
42 }
43
44 fn read_page(&self) -> Result<(), BatchError> {
50 let mut query_builder = QueryBuilder::<MySql>::new(self.query);
51
52 if let Some(page_size) = self.page_size {
53 query_builder.push(format!(" LIMIT {} OFFSET {}", page_size, self.offset.get()));
54 }
55
56 let query = query_builder.build();
57
58 let items = tokio::task::block_in_place(|| {
59 tokio::runtime::Handle::current().block_on(async {
60 sqlx::query_as::<_, I>(query.sql())
61 .fetch_all(&self.pool)
62 .await
63 .map_err(|e| BatchError::ItemReader(e.to_string()))
64 })
65 })?;
66
67 self.buffer.borrow_mut().clear();
68 self.buffer.borrow_mut().extend(items);
69 Ok(())
70 }
71}
72
73impl<I> ItemReader<I> for MySqlRdbcItemReader<'_, I>
74where
75 for<'r> I: FromRow<'r, MySqlRow> + Send + Unpin + Clone,
76{
77 fn read(&self) -> ItemReaderResult<I> {
79 let index = calculate_page_index(self.offset.get(), self.page_size);
80
81 if should_load_page(index) {
82 self.read_page()?;
83 }
84
85 let buffer = self.buffer.borrow();
86 let result = buffer.get(index as usize);
87
88 self.offset.set(self.offset.get() + 1);
89
90 Ok(result.cloned())
91 }
92}