Skip to main content

spring_batch_rs/item/rdbc/
sqlite_writer.rs

1use serde::Serialize;
2use sqlx::{Pool, QueryBuilder, Sqlite};
3
4use crate::core::item::{ItemWriter, ItemWriterResult};
5use crate::item::rdbc::DatabaseItemBinder;
6
7use super::writer_common::{
8    create_write_error, log_write_success, max_items_per_batch, validate_config,
9};
10
11/// A writer for inserting items into a SQLite database using SQLx.
12///
13/// This writer provides an implementation of the `ItemWriter` trait for SQLite operations.
14/// It supports batch inserting items into a specified table with the provided columns.
15///
16/// # SQLite-Specific Features
17///
18/// - Supports SQLite's flexible type system
19/// - Handles SQLite's AUTOINCREMENT for auto-incrementing columns
20/// - Supports SQLite's INSERT OR REPLACE operations
21/// - Leverages SQLite's efficient bulk insert capabilities
22/// - Compatible with SQLite's connection pooling and prepared statements
23///
24/// # Examples
25///
26/// ```no_run
27/// use spring_batch_rs::item::rdbc::{RdbcItemWriterBuilder, DatabaseItemBinder};
28/// use spring_batch_rs::core::item::ItemWriter;
29/// use sqlx::{SqlitePool, query_builder::Separated, Sqlite};
30/// use serde::Serialize;
31///
32/// #[derive(Clone, Serialize)]
33/// struct Task {
34///     id: i32,
35///     title: String,
36///     completed: bool,
37/// }
38///
39/// struct TaskBinder;
40/// impl DatabaseItemBinder<Task, Sqlite> for TaskBinder {
41///     fn bind(&self, item: &Task, mut query_builder: Separated<Sqlite, &str>) {
42///         let _ = (item, query_builder); // Placeholder to avoid unused warnings
43///         // In real usage: query_builder.push_bind(item.id);
44///         // In real usage: query_builder.push_bind(&item.title);
45///         // In real usage: query_builder.push_bind(item.completed);
46///     }
47/// }
48///
49/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
50/// let pool = SqlitePool::connect("sqlite::memory:").await?;
51/// let binder = TaskBinder;
52///
53/// let writer = RdbcItemWriterBuilder::<Task>::new()
54///     .sqlite(&pool)
55///     .table("tasks")
56///     .add_column("id")
57///     .add_column("title")
58///     .add_column("completed")
59///     .sqlite_binder(&binder)
60///     .build_sqlite();
61///
62/// let tasks = vec![
63///     Task { id: 1, title: "Task 1".to_string(), completed: false },
64///     Task { id: 2, title: "Task 2".to_string(), completed: true },
65/// ];
66///
67/// writer.write(&tasks)?;
68/// # Ok(())
69/// # }
70/// ```
71pub struct SqliteItemWriter<'a, O> {
72    pool: Option<&'a Pool<Sqlite>>,
73    table: Option<&'a str>,
74    columns: Vec<&'a str>,
75    item_binder: Option<&'a dyn DatabaseItemBinder<O, Sqlite>>,
76}
77
78impl<'a, O> SqliteItemWriter<'a, O> {
79    /// Creates a new `SqliteItemWriter` with default configuration.
80    pub(crate) fn new() -> Self {
81        Self {
82            pool: None,
83            table: None,
84            columns: Vec::new(),
85            item_binder: None,
86        }
87    }
88
89    /// Sets the database connection pool for the writer.
90    pub(crate) fn pool(mut self, pool: &'a Pool<Sqlite>) -> Self {
91        self.pool = Some(pool);
92        self
93    }
94
95    /// Sets the table name for the writer.
96    pub(crate) fn table(mut self, table: &'a str) -> Self {
97        self.table = Some(table);
98        self
99    }
100
101    /// Adds a column to the writer.
102    pub(crate) fn add_column(mut self, column: &'a str) -> Self {
103        self.columns.push(column);
104        self
105    }
106
107    /// Sets the item binder for the writer.
108    pub(crate) fn item_binder(
109        mut self,
110        item_binder: &'a dyn DatabaseItemBinder<O, Sqlite>,
111    ) -> Self {
112        self.item_binder = Some(item_binder);
113        self
114    }
115}
116
117impl<'a, O> Default for SqliteItemWriter<'a, O> {
118    fn default() -> Self {
119        Self::new()
120    }
121}
122
123impl<O: Serialize + Clone> ItemWriter<O> for SqliteItemWriter<'_, O> {
124    fn write(&self, items: &[O]) -> ItemWriterResult {
125        if items.is_empty() {
126            return Ok(());
127        }
128
129        // Validate configuration
130        let (pool, table, item_binder) =
131            validate_config(self.pool, self.table, &self.columns, self.item_binder)?;
132
133        // Build INSERT query
134        let mut query_builder = QueryBuilder::new("INSERT INTO ");
135        query_builder.push(table);
136        query_builder.push(" (");
137        query_builder.push(self.columns.join(","));
138        query_builder.push(") ");
139
140        // Calculate max items per batch and add values
141        let max_items = max_items_per_batch(self.columns.len());
142        let items_to_write = items.iter().take(max_items);
143        let items_count = items_to_write.len();
144
145        query_builder.push_values(items_to_write, |b, item| {
146            item_binder.bind(item, b);
147        });
148
149        // Execute query inline (QueryBuilder lifetime requires this to be in same scope)
150        let query = query_builder.build();
151        let result = tokio::task::block_in_place(|| {
152            tokio::runtime::Handle::current().block_on(async { query.execute(pool).await })
153        });
154
155        match result {
156            Ok(_) => {
157                log_write_success(items_count, table, "SQLite");
158                Ok(())
159            }
160            Err(e) => Err(create_write_error(table, "SQLite", e)),
161        }
162    }
163}
164
165#[cfg(test)]
166mod tests {
167    use super::*;
168    use crate::core::item::ItemWriter;
169
170    #[test]
171    fn test_new_creates_default_writer() {
172        let writer = SqliteItemWriter::<String>::new();
173
174        assert!(writer.pool.is_none());
175        assert!(writer.table.is_none());
176        assert!(writer.columns.is_empty());
177        assert!(writer.item_binder.is_none());
178    }
179
180    #[test]
181    fn test_builder_pattern_configuration() {
182        let writer = SqliteItemWriter::<String>::new()
183            .table("tasks")
184            .add_column("id")
185            .add_column("title")
186            .add_column("completed");
187
188        assert_eq!(writer.table, Some("tasks"));
189        assert_eq!(writer.columns, vec!["id", "title", "completed"]);
190    }
191
192    #[test]
193    fn test_write_empty_items() {
194        use crate::item::rdbc::DatabaseItemBinder;
195        use sqlx::query_builder::Separated;
196
197        struct DummyBinder;
198        impl DatabaseItemBinder<String, Sqlite> for DummyBinder {
199            fn bind(&self, _item: &String, _query_builder: Separated<Sqlite, &str>) {}
200        }
201
202        let binder = DummyBinder;
203        let writer = SqliteItemWriter::<String>::new()
204            .table("test")
205            .add_column("value")
206            .item_binder(&binder);
207
208        let result = writer.write(&[]);
209        assert!(result.is_ok());
210    }
211
212    #[test]
213    fn should_create_via_default() {
214        let writer = SqliteItemWriter::<String>::default();
215        assert!(writer.pool.is_none());
216        assert!(writer.table.is_none());
217        assert!(writer.columns.is_empty());
218        assert!(writer.item_binder.is_none());
219    }
220
221    #[test]
222    fn should_return_error_when_columns_missing_and_items_given() {
223        use crate::{BatchError, item::rdbc::DatabaseItemBinder};
224        use sqlx::query_builder::Separated;
225
226        struct DummyBinder;
227        impl DatabaseItemBinder<String, Sqlite> for DummyBinder {
228            fn bind(&self, _: &String, _: Separated<Sqlite, &str>) {}
229        }
230        let binder = DummyBinder;
231        let writer = SqliteItemWriter::<String>::new()
232            .table("t")
233            .item_binder(&binder); // no columns
234
235        let result = writer.write(&["x".to_string()]);
236        assert!(result.is_err(), "expected error for missing columns");
237        match result.err().unwrap() {
238            BatchError::ItemWriter(msg) => assert!(msg.contains("columns"), "{msg}"),
239            e => panic!("expected ItemWriter, got {e:?}"),
240        }
241    }
242
243    #[test]
244    fn should_return_error_when_pool_not_configured_and_items_given() {
245        use crate::{BatchError, item::rdbc::DatabaseItemBinder};
246        use sqlx::query_builder::Separated;
247
248        struct DummyBinder;
249        impl DatabaseItemBinder<String, Sqlite> for DummyBinder {
250            fn bind(&self, _: &String, _: Separated<Sqlite, &str>) {}
251        }
252        let binder = DummyBinder;
253        let writer = SqliteItemWriter::<String>::new()
254            .table("t")
255            .add_column("v")
256            .item_binder(&binder); // no pool
257
258        let result = writer.write(&["x".to_string()]);
259        assert!(result.is_err(), "expected error for missing pool");
260        match result.err().unwrap() {
261            BatchError::ItemWriter(msg) => assert!(msg.contains("pool"), "{msg}"),
262            e => panic!("expected ItemWriter, got {e:?}"),
263        }
264    }
265
266    #[tokio::test(flavor = "multi_thread")]
267    async fn should_write_items_to_in_memory_sqlite() {
268        use crate::item::rdbc::DatabaseItemBinder;
269        use sqlx::{SqlitePool, query_builder::Separated};
270
271        struct StringBinder;
272        impl DatabaseItemBinder<String, Sqlite> for StringBinder {
273            fn bind(&self, item: &String, mut q: Separated<Sqlite, &str>) {
274                q.push_bind(item.clone());
275            }
276        }
277
278        let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
279        sqlx::query("CREATE TABLE t (v TEXT NOT NULL)")
280            .execute(&pool)
281            .await
282            .unwrap();
283
284        let binder = StringBinder;
285        let writer = SqliteItemWriter::<String>::new()
286            .pool(&pool)
287            .table("t")
288            .add_column("v")
289            .item_binder(&binder);
290
291        let items = vec!["hello".to_string(), "world".to_string()];
292        writer.write(&items).unwrap();
293
294        let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM t")
295            .fetch_one(&pool)
296            .await
297            .unwrap();
298        assert_eq!(count.0, 2, "both items should have been written");
299    }
300
301    #[tokio::test(flavor = "multi_thread")]
302    async fn should_return_error_when_sqlite_query_fails() {
303        use crate::{BatchError, item::rdbc::DatabaseItemBinder};
304        use sqlx::{SqlitePool, query_builder::Separated};
305
306        struct StringBinder;
307        impl DatabaseItemBinder<String, Sqlite> for StringBinder {
308            fn bind(&self, item: &String, mut q: Separated<Sqlite, &str>) {
309                q.push_bind(item.clone());
310            }
311        }
312
313        let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
314        // Table is NOT created → INSERT will fail
315        let binder = StringBinder;
316        let writer = SqliteItemWriter::<String>::new()
317            .pool(&pool)
318            .table("nonexistent_table")
319            .add_column("v")
320            .item_binder(&binder);
321
322        let result = writer.write(&["x".to_string()]);
323        match result.err().unwrap() {
324            BatchError::ItemWriter(msg) => {
325                assert!(
326                    msg.contains("SQLite"),
327                    "error should mention SQLite, got: {msg}"
328                )
329            }
330            e => panic!("expected ItemWriter error, got {e:?}"),
331        }
332    }
333}