Skip to main content

spring_batch_rs/item/rdbc/
postgres_writer.rs

1use serde::Serialize;
2use sqlx::{Pool, Postgres, QueryBuilder};
3
4use crate::core::item::{ItemWriter, ItemWriterResult};
5use crate::item::rdbc::DatabaseItemBinder;
6
7use super::writer_common::{
8    create_write_error, log_write_success, max_items_per_batch, validate_config,
9};
10
11/// A writer for inserting items into a PostgreSQL database using SQLx.
12///
13/// This writer provides an implementation of the `ItemWriter` trait for PostgreSQL operations.
14/// It supports batch inserting items into a specified table with the provided columns.
15///
16/// # PostgreSQL-Specific Features
17///
18/// - Supports PostgreSQL's advanced data types (JSON, arrays, custom types)
19/// - Handles PostgreSQL's SERIAL and BIGSERIAL for auto-incrementing columns
20/// - Supports PostgreSQL's UPSERT operations with ON CONFLICT clauses
21/// - Leverages PostgreSQL's efficient bulk insert capabilities
22/// - Compatible with PostgreSQL's connection pooling and prepared statements
23///
24/// # Construction
25///
26/// This writer can only be created through `RdbcItemWriterBuilder`.
27/// Direct construction is not available to ensure proper configuration.
28///
29/// # Examples
30///
31/// ```no_run
32/// use spring_batch_rs::item::rdbc::{RdbcItemWriterBuilder, DatabaseItemBinder};
33/// use spring_batch_rs::core::item::ItemWriter;
34/// use sqlx::{PgPool, query_builder::Separated, Postgres};
35/// use serde::Serialize;
36///
37/// #[derive(Clone, Serialize)]
38/// struct User {
39///     id: i32,
40///     name: String,
41/// }
42///
43/// struct UserBinder;
44/// impl DatabaseItemBinder<User, Postgres> for UserBinder {
45///     fn bind(&self, item: &User, mut query_builder: Separated<Postgres, &str>) {
46///         let _ = (item, query_builder); // Placeholder to avoid unused warnings
47///         // In real usage: query_builder.push_bind(item.id);
48///         // In real usage: query_builder.push_bind(&item.name);
49///     }
50/// }
51///
52/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
53/// let pool = PgPool::connect("postgresql://user:pass@localhost/db").await?;
54/// let binder = UserBinder;
55///
56/// let writer = RdbcItemWriterBuilder::<User>::new()
57///     .postgres(&pool)
58///     .table("users")
59///     .add_column("id")
60///     .add_column("name")
61///     .postgres_binder(&binder)
62///     .build_postgres();
63///
64/// let users = vec![
65///     User { id: 1, name: "Alice".to_string() },
66///     User { id: 2, name: "Bob".to_string() },
67/// ];
68///
69/// writer.write(&users)?;
70/// # Ok(())
71/// # }
72/// ```
73pub struct PostgresItemWriter<'a, O> {
74    pub(crate) pool: Option<&'a Pool<Postgres>>,
75    pub(crate) table: Option<&'a str>,
76    pub(crate) columns: Vec<&'a str>,
77    pub(crate) item_binder: Option<&'a dyn DatabaseItemBinder<O, Postgres>>,
78}
79
80impl<'a, O> PostgresItemWriter<'a, O> {
81    /// Creates a new `PostgresItemWriter` with default configuration.
82    ///
83    /// This constructor is only accessible within the crate to enforce the use
84    /// of `RdbcItemWriterBuilder` for creating writer instances.
85    pub(crate) fn new() -> Self {
86        Self {
87            pool: None,
88            table: None,
89            columns: Vec::new(),
90            item_binder: None,
91        }
92    }
93
94    /// Sets the database connection pool for the writer.
95    pub(crate) fn pool(mut self, pool: &'a Pool<Postgres>) -> Self {
96        self.pool = Some(pool);
97        self
98    }
99
100    /// Sets the table name for the writer.
101    pub(crate) fn table(mut self, table: &'a str) -> Self {
102        self.table = Some(table);
103        self
104    }
105
106    /// Adds a column to the writer.
107    pub(crate) fn add_column(mut self, column: &'a str) -> Self {
108        self.columns.push(column);
109        self
110    }
111
112    /// Sets the item binder for the writer.
113    pub(crate) fn item_binder(
114        mut self,
115        item_binder: &'a dyn DatabaseItemBinder<O, Postgres>,
116    ) -> Self {
117        self.item_binder = Some(item_binder);
118        self
119    }
120}
121
122impl<'a, O> Default for PostgresItemWriter<'a, O> {
123    fn default() -> Self {
124        Self::new()
125    }
126}
127
128impl<O: Serialize + Clone> ItemWriter<O> for PostgresItemWriter<'_, O> {
129    fn write(&self, items: &[O]) -> ItemWriterResult {
130        if items.is_empty() {
131            return Ok(());
132        }
133
134        // Validate configuration
135        let (pool, table, item_binder) =
136            validate_config(self.pool, self.table, &self.columns, self.item_binder)?;
137
138        // Build INSERT query
139        let mut query_builder = QueryBuilder::new("INSERT INTO ");
140        query_builder.push(table);
141        query_builder.push(" (");
142        query_builder.push(self.columns.join(","));
143        query_builder.push(") ");
144
145        // Calculate max items per batch and add values
146        let max_items = max_items_per_batch(self.columns.len());
147        let items_to_write = items.iter().take(max_items);
148        let items_count = items_to_write.len();
149
150        query_builder.push_values(items_to_write, |b, item| {
151            item_binder.bind(item, b);
152        });
153
154        // Execute query inline (QueryBuilder lifetime requires this to be in same scope)
155        let query = query_builder.build();
156        let result = tokio::task::block_in_place(|| {
157            tokio::runtime::Handle::current().block_on(async { query.execute(pool).await })
158        });
159
160        match result {
161            Ok(_) => {
162                log_write_success(items_count, table, "PostgreSQL");
163                Ok(())
164            }
165            Err(e) => Err(create_write_error(table, "PostgreSQL", e)),
166        }
167    }
168}
169
170#[cfg(test)]
171mod tests {
172    use super::*;
173    use crate::core::item::ItemWriter;
174
175    #[test]
176    fn test_new_creates_default_writer() {
177        let writer = PostgresItemWriter::<String>::new();
178
179        assert!(writer.pool.is_none());
180        assert!(writer.table.is_none());
181        assert!(writer.columns.is_empty());
182        assert!(writer.item_binder.is_none());
183    }
184
185    #[test]
186    fn test_builder_pattern_configuration() {
187        let writer = PostgresItemWriter::<String>::new()
188            .table("users")
189            .add_column("id")
190            .add_column("name")
191            .add_column("email");
192
193        assert_eq!(writer.table, Some("users"));
194        assert_eq!(writer.columns, vec!["id", "name", "email"]);
195    }
196
197    #[test]
198    fn test_write_empty_items() {
199        use crate::item::rdbc::DatabaseItemBinder;
200        use sqlx::query_builder::Separated;
201
202        struct DummyBinder;
203        impl DatabaseItemBinder<String, Postgres> for DummyBinder {
204            fn bind(&self, _item: &String, _query_builder: Separated<Postgres, &str>) {}
205        }
206
207        let binder = DummyBinder;
208        let writer = PostgresItemWriter::<String>::new()
209            .table("test")
210            .add_column("value")
211            .item_binder(&binder);
212
213        let result = writer.write(&[]);
214        assert!(result.is_ok());
215    }
216
217    #[test]
218    fn should_return_error_when_columns_missing_and_items_given() {
219        use crate::{item::rdbc::DatabaseItemBinder, BatchError};
220        use sqlx::query_builder::Separated;
221
222        struct DummyBinder;
223        impl DatabaseItemBinder<String, Postgres> for DummyBinder {
224            fn bind(&self, _: &String, _: Separated<Postgres, &str>) {}
225        }
226        let binder = DummyBinder;
227        let writer = PostgresItemWriter::<String>::new()
228            .table("t")
229            .item_binder(&binder); // no columns
230
231        let result = writer.write(&["x".to_string()]);
232        assert!(result.is_err(), "expected error for missing columns");
233        match result.unwrap_err() {
234            BatchError::ItemWriter(msg) => assert!(msg.contains("columns"), "{msg}"),
235            e => panic!("expected ItemWriter, got {e:?}"),
236        }
237    }
238
239    #[test]
240    fn should_return_error_when_pool_not_configured_and_items_given() {
241        use crate::{item::rdbc::DatabaseItemBinder, BatchError};
242        use sqlx::query_builder::Separated;
243
244        struct DummyBinder;
245        impl DatabaseItemBinder<String, Postgres> for DummyBinder {
246            fn bind(&self, _: &String, _: Separated<Postgres, &str>) {}
247        }
248        let binder = DummyBinder;
249        let writer = PostgresItemWriter::<String>::new()
250            .table("t")
251            .add_column("v")
252            .item_binder(&binder); // no pool
253
254        let result = writer.write(&["x".to_string()]);
255        assert!(result.is_err(), "expected error for missing pool");
256        match result.unwrap_err() {
257            BatchError::ItemWriter(msg) => assert!(msg.contains("pool"), "{msg}"),
258            e => panic!("expected ItemWriter, got {e:?}"),
259        }
260    }
261}