spring_batch_rs/item/rdbc/
mysql_writer.rs1use serde::Serialize;
2use sqlx::{MySql, Pool, QueryBuilder};
3
4use crate::core::item::{ItemWriter, ItemWriterResult};
5use crate::item::rdbc::DatabaseItemBinder;
6
7use super::writer_common::{
8 create_write_error, log_write_success, max_items_per_batch, validate_config,
9};
10
11pub struct MySqlItemWriter<'a, O> {
72 pub(crate) pool: Option<&'a Pool<MySql>>,
73 pub(crate) table: Option<&'a str>,
74 pub(crate) columns: Vec<&'a str>,
75 pub(crate) item_binder: Option<&'a dyn DatabaseItemBinder<O, MySql>>,
76}
77
78impl<'a, O> MySqlItemWriter<'a, O> {
79 pub(crate) fn new() -> Self {
81 Self {
82 pool: None,
83 table: None,
84 columns: Vec::new(),
85 item_binder: None,
86 }
87 }
88
89 pub(crate) fn pool(mut self, pool: &'a Pool<MySql>) -> Self {
91 self.pool = Some(pool);
92 self
93 }
94
95 pub(crate) fn table(mut self, table: &'a str) -> Self {
97 self.table = Some(table);
98 self
99 }
100
101 pub(crate) fn add_column(mut self, column: &'a str) -> Self {
103 self.columns.push(column);
104 self
105 }
106
107 pub(crate) fn item_binder(mut self, item_binder: &'a dyn DatabaseItemBinder<O, MySql>) -> Self {
109 self.item_binder = Some(item_binder);
110 self
111 }
112}
113
114impl<'a, O> Default for MySqlItemWriter<'a, O> {
115 fn default() -> Self {
116 Self::new()
117 }
118}
119
120impl<O: Serialize + Clone> ItemWriter<O> for MySqlItemWriter<'_, O> {
121 fn write(&self, items: &[O]) -> ItemWriterResult {
122 if items.is_empty() {
123 return Ok(());
124 }
125
126 let (pool, table, item_binder) =
128 validate_config(self.pool, self.table, &self.columns, self.item_binder)?;
129
130 let mut query_builder = QueryBuilder::new("INSERT INTO ");
132 query_builder.push(table);
133 query_builder.push(" (");
134 query_builder.push(self.columns.join(","));
135 query_builder.push(") ");
136
137 let max_items = max_items_per_batch(self.columns.len());
139 let items_to_write = items.iter().take(max_items);
140 let items_count = items_to_write.len();
141
142 query_builder.push_values(items_to_write, |b, item| {
143 item_binder.bind(item, b);
144 });
145
146 let query = query_builder.build();
148 let result = tokio::task::block_in_place(|| {
149 tokio::runtime::Handle::current().block_on(async { query.execute(pool).await })
150 });
151
152 match result {
153 Ok(_) => {
154 log_write_success(items_count, table, "MySQL");
155 Ok(())
156 }
157 Err(e) => Err(create_write_error(table, "MySQL", e)),
158 }
159 }
160}
161
162#[cfg(test)]
163mod tests {
164 use super::*;
165 use crate::core::item::ItemWriter;
166
167 #[test]
168 fn test_new_creates_default_writer() {
169 let writer = MySqlItemWriter::<String>::new();
170
171 assert!(writer.pool.is_none());
172 assert!(writer.table.is_none());
173 assert!(writer.columns.is_empty());
174 assert!(writer.item_binder.is_none());
175 }
176
177 #[test]
178 fn test_builder_pattern_configuration() {
179 let writer = MySqlItemWriter::<String>::new()
180 .table("products")
181 .add_column("id")
182 .add_column("name")
183 .add_column("price");
184
185 assert_eq!(writer.table, Some("products"));
186 assert_eq!(writer.columns, vec!["id", "name", "price"]);
187 }
188
189 #[test]
190 fn test_write_empty_items() {
191 use crate::item::rdbc::DatabaseItemBinder;
192 use sqlx::query_builder::Separated;
193
194 struct DummyBinder;
195 impl DatabaseItemBinder<String, MySql> for DummyBinder {
196 fn bind(&self, _item: &String, _query_builder: Separated<MySql, &str>) {}
197 }
198
199 let binder = DummyBinder;
200 let writer = MySqlItemWriter::<String>::new()
201 .table("test")
202 .add_column("value")
203 .item_binder(&binder);
204
205 let result = writer.write(&[]);
206 assert!(result.is_ok());
207 }
208
209 #[test]
210 fn should_return_error_when_columns_missing_and_items_given() {
211 use crate::{BatchError, item::rdbc::DatabaseItemBinder};
212 use sqlx::query_builder::Separated;
213
214 struct DummyBinder;
215 impl DatabaseItemBinder<String, MySql> for DummyBinder {
216 fn bind(&self, _: &String, _: Separated<MySql, &str>) {}
217 }
218 let binder = DummyBinder;
219 let writer = MySqlItemWriter::<String>::new()
220 .table("t")
221 .item_binder(&binder); let result = writer.write(&["x".to_string()]);
224 assert!(result.is_err(), "expected error for missing columns");
225 match result.unwrap_err() {
226 BatchError::ItemWriter(msg) => assert!(msg.contains("columns"), "{msg}"),
227 e => panic!("expected ItemWriter, got {e:?}"),
228 }
229 }
230
231 #[test]
232 fn should_return_error_when_pool_not_configured_and_items_given() {
233 use crate::{BatchError, item::rdbc::DatabaseItemBinder};
234 use sqlx::query_builder::Separated;
235
236 struct DummyBinder;
237 impl DatabaseItemBinder<String, MySql> for DummyBinder {
238 fn bind(&self, _: &String, _: Separated<MySql, &str>) {}
239 }
240 let binder = DummyBinder;
241 let writer = MySqlItemWriter::<String>::new()
242 .table("t")
243 .add_column("v")
244 .item_binder(&binder); let result = writer.write(&["x".to_string()]);
247 assert!(result.is_err(), "expected error for missing pool");
248 match result.unwrap_err() {
249 BatchError::ItemWriter(msg) => assert!(msg.contains("pool"), "{msg}"),
250 e => panic!("expected ItemWriter, got {e:?}"),
251 }
252 }
253}