Skip to main content

spring_batch_rs/item/rdbc/
mysql_writer.rs

1use serde::Serialize;
2use sqlx::{MySql, Pool, QueryBuilder};
3
4use crate::core::item::{ItemWriter, ItemWriterResult};
5use crate::item::rdbc::DatabaseItemBinder;
6
7use super::writer_common::{
8    create_write_error, log_write_success, max_items_per_batch, validate_config,
9};
10
11/// A writer for inserting items into a MySQL database using SQLx.
12///
13/// This writer provides an implementation of the `ItemWriter` trait for MySQL operations.
14/// It supports batch inserting items into a specified table with the provided columns.
15///
16/// # MySQL-Specific Features
17///
18/// - Supports MySQL's data types and character sets
19/// - Handles MySQL's AUTO_INCREMENT for auto-incrementing columns
20/// - Supports MySQL's INSERT ... ON DUPLICATE KEY UPDATE operations
21/// - Leverages MySQL's efficient bulk insert capabilities
22/// - Compatible with MySQL's connection pooling and prepared statements
23///
24/// # Examples
25///
26/// ```no_run
27/// use spring_batch_rs::item::rdbc::{RdbcItemWriterBuilder, DatabaseItemBinder};
28/// use spring_batch_rs::core::item::ItemWriter;
29/// use sqlx::{MySqlPool, query_builder::Separated, MySql};
30/// use serde::Serialize;
31///
32/// #[derive(Clone, Serialize)]
33/// struct Product {
34///     id: i32,
35///     name: String,
36///     price: f64,
37/// }
38///
39/// struct ProductBinder;
40/// impl DatabaseItemBinder<Product, MySql> for ProductBinder {
41///     fn bind(&self, item: &Product, mut query_builder: Separated<MySql, &str>) {
42///         let _ = (item, query_builder); // Placeholder to avoid unused warnings
43///         // In real usage: query_builder.push_bind(item.id);
44///         // In real usage: query_builder.push_bind(&item.name);
45///         // In real usage: query_builder.push_bind(item.price);
46///     }
47/// }
48///
49/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
50/// let pool = MySqlPool::connect("mysql://user:pass@localhost/db").await?;
51/// let binder = ProductBinder;
52///
53/// let writer = RdbcItemWriterBuilder::<Product>::new()
54///     .mysql(&pool)
55///     .table("products")
56///     .add_column("id")
57///     .add_column("name")
58///     .add_column("price")
59///     .mysql_binder(&binder)
60///     .build_mysql();
61///
62/// let products = vec![
63///     Product { id: 1, name: "Laptop".to_string(), price: 999.99 },
64///     Product { id: 2, name: "Mouse".to_string(), price: 29.99 },
65/// ];
66///
67/// writer.write(&products)?;
68/// # Ok(())
69/// # }
70/// ```
71pub struct MySqlItemWriter<'a, O> {
72    pub(crate) pool: Option<&'a Pool<MySql>>,
73    pub(crate) table: Option<&'a str>,
74    pub(crate) columns: Vec<&'a str>,
75    pub(crate) item_binder: Option<&'a dyn DatabaseItemBinder<O, MySql>>,
76}
77
78impl<'a, O> MySqlItemWriter<'a, O> {
79    /// Creates a new `MySqlItemWriter` with default configuration.
80    pub(crate) fn new() -> Self {
81        Self {
82            pool: None,
83            table: None,
84            columns: Vec::new(),
85            item_binder: None,
86        }
87    }
88
89    /// Sets the database connection pool for the writer.
90    pub(crate) fn pool(mut self, pool: &'a Pool<MySql>) -> Self {
91        self.pool = Some(pool);
92        self
93    }
94
95    /// Sets the table name for the writer.
96    pub(crate) fn table(mut self, table: &'a str) -> Self {
97        self.table = Some(table);
98        self
99    }
100
101    /// Adds a column to the writer.
102    pub(crate) fn add_column(mut self, column: &'a str) -> Self {
103        self.columns.push(column);
104        self
105    }
106
107    /// Sets the item binder for the writer.
108    pub(crate) fn item_binder(mut self, item_binder: &'a dyn DatabaseItemBinder<O, MySql>) -> Self {
109        self.item_binder = Some(item_binder);
110        self
111    }
112}
113
114impl<'a, O> Default for MySqlItemWriter<'a, O> {
115    fn default() -> Self {
116        Self::new()
117    }
118}
119
120impl<O: Serialize + Clone> ItemWriter<O> for MySqlItemWriter<'_, O> {
121    fn write(&self, items: &[O]) -> ItemWriterResult {
122        if items.is_empty() {
123            return Ok(());
124        }
125
126        // Validate configuration
127        let (pool, table, item_binder) =
128            validate_config(self.pool, self.table, &self.columns, self.item_binder)?;
129
130        // Build INSERT query
131        let mut query_builder = QueryBuilder::new("INSERT INTO ");
132        query_builder.push(table);
133        query_builder.push(" (");
134        query_builder.push(self.columns.join(","));
135        query_builder.push(") ");
136
137        // Calculate max items per batch and add values
138        let max_items = max_items_per_batch(self.columns.len());
139        let items_to_write = items.iter().take(max_items);
140        let items_count = items_to_write.len();
141
142        query_builder.push_values(items_to_write, |b, item| {
143            item_binder.bind(item, b);
144        });
145
146        // Execute query inline (QueryBuilder lifetime requires this to be in same scope)
147        let query = query_builder.build();
148        let result = tokio::task::block_in_place(|| {
149            tokio::runtime::Handle::current().block_on(async { query.execute(pool).await })
150        });
151
152        match result {
153            Ok(_) => {
154                log_write_success(items_count, table, "MySQL");
155                Ok(())
156            }
157            Err(e) => Err(create_write_error(table, "MySQL", e)),
158        }
159    }
160}
161
#[cfg(test)]
mod tests {
    use super::*;
    use crate::BatchError;
    use crate::core::item::ItemWriter;
    use crate::item::rdbc::DatabaseItemBinder;
    use sqlx::query_builder::Separated;

    /// Binder shared by the tests below; it binds nothing because no test reaches the
    /// point where a query is actually executed.
    struct NoopBinder;

    impl DatabaseItemBinder<String, MySql> for NoopBinder {
        fn bind(&self, _item: &String, _query_builder: Separated<MySql, &str>) {}
    }

    /// A freshly created writer has every setting unset.
    #[test]
    fn test_new_creates_default_writer() {
        let fresh = MySqlItemWriter::<String>::new();

        assert!(fresh.pool.is_none());
        assert!(fresh.table.is_none());
        assert!(fresh.columns.is_empty());
        assert!(fresh.item_binder.is_none());
    }

    /// Builder calls record the table and keep columns in insertion order.
    #[test]
    fn test_builder_pattern_configuration() {
        let configured = MySqlItemWriter::<String>::new()
            .table("products")
            .add_column("id")
            .add_column("name")
            .add_column("price");

        assert_eq!(configured.table, Some("products"));
        assert_eq!(configured.columns, vec!["id", "name", "price"]);
    }

    /// Writing an empty slice succeeds even though no pool is configured.
    #[test]
    fn test_write_empty_items() {
        let binder = NoopBinder;
        let writer = MySqlItemWriter::<String>::new()
            .table("test")
            .add_column("value")
            .item_binder(&binder);

        let outcome = writer.write(&[]);
        assert!(outcome.is_ok());
    }

    /// A non-empty write without any column is rejected with a message naming "columns".
    #[test]
    fn should_return_error_when_columns_missing_and_items_given() {
        let binder = NoopBinder;
        // Deliberately no `add_column` call.
        let writer = MySqlItemWriter::<String>::new()
            .table("t")
            .item_binder(&binder);

        let outcome = writer.write(&["x".to_string()]);
        assert!(outcome.is_err(), "expected error for missing columns");

        let failure = outcome.unwrap_err();
        match failure {
            BatchError::ItemWriter(msg) => assert!(msg.contains("columns"), "{msg}"),
            e => panic!("expected ItemWriter, got {e:?}"),
        }
    }

    /// A non-empty write without a pool is rejected with a message naming "pool".
    #[test]
    fn should_return_error_when_pool_not_configured_and_items_given() {
        let binder = NoopBinder;
        // Deliberately no `pool` call.
        let writer = MySqlItemWriter::<String>::new()
            .table("t")
            .add_column("v")
            .item_binder(&binder);

        let outcome = writer.write(&["x".to_string()]);
        assert!(outcome.is_err(), "expected error for missing pool");

        let failure = outcome.unwrap_err();
        match failure {
            BatchError::ItemWriter(msg) => assert!(msg.contains("pool"), "{msg}"),
            e => panic!("expected ItemWriter, got {e:?}"),
        }
    }
}
253}