spring_batch_rs/item/rdbc/
sqlite_writer.rs1use serde::Serialize;
2use sqlx::{Pool, QueryBuilder, Sqlite};
3
4use crate::core::item::{ItemWriter, ItemWriterResult};
5use crate::item::rdbc::DatabaseItemBinder;
6
7use super::writer_common::{
8 create_write_error, log_write_success, max_items_per_batch, validate_config,
9};
10
11pub struct SqliteItemWriter<'a, O> {
72 pool: Option<&'a Pool<Sqlite>>,
73 table: Option<&'a str>,
74 columns: Vec<&'a str>,
75 item_binder: Option<&'a dyn DatabaseItemBinder<O, Sqlite>>,
76}
77
78impl<'a, O> SqliteItemWriter<'a, O> {
79 pub(crate) fn new() -> Self {
81 Self {
82 pool: None,
83 table: None,
84 columns: Vec::new(),
85 item_binder: None,
86 }
87 }
88
89 pub(crate) fn pool(mut self, pool: &'a Pool<Sqlite>) -> Self {
91 self.pool = Some(pool);
92 self
93 }
94
95 pub(crate) fn table(mut self, table: &'a str) -> Self {
97 self.table = Some(table);
98 self
99 }
100
101 pub(crate) fn add_column(mut self, column: &'a str) -> Self {
103 self.columns.push(column);
104 self
105 }
106
107 pub(crate) fn item_binder(
109 mut self,
110 item_binder: &'a dyn DatabaseItemBinder<O, Sqlite>,
111 ) -> Self {
112 self.item_binder = Some(item_binder);
113 self
114 }
115}
116
117impl<'a, O> Default for SqliteItemWriter<'a, O> {
118 fn default() -> Self {
119 Self::new()
120 }
121}
122
123impl<O: Serialize + Clone> ItemWriter<O> for SqliteItemWriter<'_, O> {
124 fn write(&self, items: &[O]) -> ItemWriterResult {
125 if items.is_empty() {
126 return Ok(());
127 }
128
129 let (pool, table, item_binder) =
131 validate_config(self.pool, self.table, &self.columns, self.item_binder)?;
132
133 let mut query_builder = QueryBuilder::new("INSERT INTO ");
135 query_builder.push(table);
136 query_builder.push(" (");
137 query_builder.push(self.columns.join(","));
138 query_builder.push(") ");
139
140 let max_items = max_items_per_batch(self.columns.len());
142 let items_to_write = items.iter().take(max_items);
143 let items_count = items_to_write.len();
144
145 query_builder.push_values(items_to_write, |b, item| {
146 item_binder.bind(item, b);
147 });
148
149 let query = query_builder.build();
151 let result = tokio::task::block_in_place(|| {
152 tokio::runtime::Handle::current().block_on(async { query.execute(pool).await })
153 });
154
155 match result {
156 Ok(_) => {
157 log_write_success(items_count, table, "SQLite");
158 Ok(())
159 }
160 Err(e) => Err(create_write_error(table, "SQLite", e)),
161 }
162 }
163}
164
165#[cfg(test)]
166mod tests {
167 use super::*;
168 use crate::core::item::ItemWriter;
169
170 #[test]
171 fn test_new_creates_default_writer() {
172 let writer = SqliteItemWriter::<String>::new();
173
174 assert!(writer.pool.is_none());
175 assert!(writer.table.is_none());
176 assert!(writer.columns.is_empty());
177 assert!(writer.item_binder.is_none());
178 }
179
180 #[test]
181 fn test_builder_pattern_configuration() {
182 let writer = SqliteItemWriter::<String>::new()
183 .table("tasks")
184 .add_column("id")
185 .add_column("title")
186 .add_column("completed");
187
188 assert_eq!(writer.table, Some("tasks"));
189 assert_eq!(writer.columns, vec!["id", "title", "completed"]);
190 }
191
192 #[test]
193 fn test_write_empty_items() {
194 use crate::item::rdbc::DatabaseItemBinder;
195 use sqlx::query_builder::Separated;
196
197 struct DummyBinder;
198 impl DatabaseItemBinder<String, Sqlite> for DummyBinder {
199 fn bind(&self, _item: &String, _query_builder: Separated<Sqlite, &str>) {}
200 }
201
202 let binder = DummyBinder;
203 let writer = SqliteItemWriter::<String>::new()
204 .table("test")
205 .add_column("value")
206 .item_binder(&binder);
207
208 let result = writer.write(&[]);
209 assert!(result.is_ok());
210 }
211
212 #[test]
213 fn should_create_via_default() {
214 let writer = SqliteItemWriter::<String>::default();
215 assert!(writer.pool.is_none());
216 assert!(writer.table.is_none());
217 assert!(writer.columns.is_empty());
218 assert!(writer.item_binder.is_none());
219 }
220
221 #[test]
222 fn should_return_error_when_columns_missing_and_items_given() {
223 use crate::{BatchError, item::rdbc::DatabaseItemBinder};
224 use sqlx::query_builder::Separated;
225
226 struct DummyBinder;
227 impl DatabaseItemBinder<String, Sqlite> for DummyBinder {
228 fn bind(&self, _: &String, _: Separated<Sqlite, &str>) {}
229 }
230 let binder = DummyBinder;
231 let writer = SqliteItemWriter::<String>::new()
232 .table("t")
233 .item_binder(&binder); let result = writer.write(&["x".to_string()]);
236 assert!(result.is_err(), "expected error for missing columns");
237 match result.err().unwrap() {
238 BatchError::ItemWriter(msg) => assert!(msg.contains("columns"), "{msg}"),
239 e => panic!("expected ItemWriter, got {e:?}"),
240 }
241 }
242
243 #[test]
244 fn should_return_error_when_pool_not_configured_and_items_given() {
245 use crate::{BatchError, item::rdbc::DatabaseItemBinder};
246 use sqlx::query_builder::Separated;
247
248 struct DummyBinder;
249 impl DatabaseItemBinder<String, Sqlite> for DummyBinder {
250 fn bind(&self, _: &String, _: Separated<Sqlite, &str>) {}
251 }
252 let binder = DummyBinder;
253 let writer = SqliteItemWriter::<String>::new()
254 .table("t")
255 .add_column("v")
256 .item_binder(&binder); let result = writer.write(&["x".to_string()]);
259 assert!(result.is_err(), "expected error for missing pool");
260 match result.err().unwrap() {
261 BatchError::ItemWriter(msg) => assert!(msg.contains("pool"), "{msg}"),
262 e => panic!("expected ItemWriter, got {e:?}"),
263 }
264 }
265
266 #[tokio::test(flavor = "multi_thread")]
267 async fn should_write_items_to_in_memory_sqlite() {
268 use crate::item::rdbc::DatabaseItemBinder;
269 use sqlx::{SqlitePool, query_builder::Separated};
270
271 struct StringBinder;
272 impl DatabaseItemBinder<String, Sqlite> for StringBinder {
273 fn bind(&self, item: &String, mut q: Separated<Sqlite, &str>) {
274 q.push_bind(item.clone());
275 }
276 }
277
278 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
279 sqlx::query("CREATE TABLE t (v TEXT NOT NULL)")
280 .execute(&pool)
281 .await
282 .unwrap();
283
284 let binder = StringBinder;
285 let writer = SqliteItemWriter::<String>::new()
286 .pool(&pool)
287 .table("t")
288 .add_column("v")
289 .item_binder(&binder);
290
291 let items = vec!["hello".to_string(), "world".to_string()];
292 writer.write(&items).unwrap();
293
294 let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM t")
295 .fetch_one(&pool)
296 .await
297 .unwrap();
298 assert_eq!(count.0, 2, "both items should have been written");
299 }
300
301 #[tokio::test(flavor = "multi_thread")]
302 async fn should_return_error_when_sqlite_query_fails() {
303 use crate::{BatchError, item::rdbc::DatabaseItemBinder};
304 use sqlx::{SqlitePool, query_builder::Separated};
305
306 struct StringBinder;
307 impl DatabaseItemBinder<String, Sqlite> for StringBinder {
308 fn bind(&self, item: &String, mut q: Separated<Sqlite, &str>) {
309 q.push_bind(item.clone());
310 }
311 }
312
313 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
314 let binder = StringBinder;
316 let writer = SqliteItemWriter::<String>::new()
317 .pool(&pool)
318 .table("nonexistent_table")
319 .add_column("v")
320 .item_binder(&binder);
321
322 let result = writer.write(&["x".to_string()]);
323 match result.err().unwrap() {
324 BatchError::ItemWriter(msg) => {
325 assert!(
326 msg.contains("SQLite"),
327 "error should mention SQLite, got: {msg}"
328 )
329 }
330 e => panic!("expected ItemWriter error, got {e:?}"),
331 }
332 }
333}