spring_batch_rs/item/rdbc/
postgres_writer.rs1use serde::Serialize;
2use sqlx::{Pool, Postgres, QueryBuilder};
3
4use crate::core::item::{ItemWriter, ItemWriterResult};
5use crate::item::rdbc::DatabaseItemBinder;
6
7use super::writer_common::{
8 create_write_error, log_write_success, max_items_per_batch, validate_config,
9};
10
11pub struct PostgresItemWriter<'a, O> {
74 pub(crate) pool: Option<&'a Pool<Postgres>>,
75 pub(crate) table: Option<&'a str>,
76 pub(crate) columns: Vec<&'a str>,
77 pub(crate) item_binder: Option<&'a dyn DatabaseItemBinder<O, Postgres>>,
78}
79
80impl<'a, O> PostgresItemWriter<'a, O> {
81 pub(crate) fn new() -> Self {
86 Self {
87 pool: None,
88 table: None,
89 columns: Vec::new(),
90 item_binder: None,
91 }
92 }
93
94 pub(crate) fn pool(mut self, pool: &'a Pool<Postgres>) -> Self {
96 self.pool = Some(pool);
97 self
98 }
99
100 pub(crate) fn table(mut self, table: &'a str) -> Self {
102 self.table = Some(table);
103 self
104 }
105
106 pub(crate) fn add_column(mut self, column: &'a str) -> Self {
108 self.columns.push(column);
109 self
110 }
111
112 pub(crate) fn item_binder(
114 mut self,
115 item_binder: &'a dyn DatabaseItemBinder<O, Postgres>,
116 ) -> Self {
117 self.item_binder = Some(item_binder);
118 self
119 }
120}
121
122impl<'a, O> Default for PostgresItemWriter<'a, O> {
123 fn default() -> Self {
124 Self::new()
125 }
126}
127
128impl<O: Serialize + Clone> ItemWriter<O> for PostgresItemWriter<'_, O> {
129 fn write(&self, items: &[O]) -> ItemWriterResult {
130 if items.is_empty() {
131 return Ok(());
132 }
133
134 let (pool, table, item_binder) =
136 validate_config(self.pool, self.table, &self.columns, self.item_binder)?;
137
138 let mut query_builder = QueryBuilder::new("INSERT INTO ");
140 query_builder.push(table);
141 query_builder.push(" (");
142 query_builder.push(self.columns.join(","));
143 query_builder.push(") ");
144
145 let max_items = max_items_per_batch(self.columns.len());
147 let items_to_write = items.iter().take(max_items);
148 let items_count = items_to_write.len();
149
150 query_builder.push_values(items_to_write, |b, item| {
151 item_binder.bind(item, b);
152 });
153
154 let query = query_builder.build();
156 let result = tokio::task::block_in_place(|| {
157 tokio::runtime::Handle::current().block_on(async { query.execute(pool).await })
158 });
159
160 match result {
161 Ok(_) => {
162 log_write_success(items_count, table, "PostgreSQL");
163 Ok(())
164 }
165 Err(e) => Err(create_write_error(table, "PostgreSQL", e)),
166 }
167 }
168}
169
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::item::ItemWriter;
    use crate::{item::rdbc::DatabaseItemBinder, BatchError};
    use sqlx::query_builder::Separated;

    /// Binder stub that binds nothing; the tests below never reach the database.
    struct DummyBinder;

    impl DatabaseItemBinder<String, Postgres> for DummyBinder {
        fn bind(&self, _item: &String, _query_builder: Separated<Postgres, &str>) {}
    }

    /// Asserts that `result` is a `BatchError::ItemWriter` whose message
    /// contains `fragment`; `failure_note` is reported if `result` is `Ok`.
    fn assert_item_writer_error(result: ItemWriterResult, fragment: &str, failure_note: &str) {
        assert!(result.is_err(), "{failure_note}");
        match result.unwrap_err() {
            BatchError::ItemWriter(msg) => assert!(msg.contains(fragment), "{msg}"),
            e => panic!("expected ItemWriter, got {e:?}"),
        }
    }

    #[test]
    fn test_new_creates_default_writer() {
        let writer = PostgresItemWriter::<String>::new();

        assert!(writer.pool.is_none());
        assert!(writer.table.is_none());
        assert!(writer.columns.is_empty());
        assert!(writer.item_binder.is_none());
    }

    #[test]
    fn test_builder_pattern_configuration() {
        let writer = PostgresItemWriter::<String>::new()
            .table("users")
            .add_column("id")
            .add_column("name")
            .add_column("email");

        assert_eq!(writer.table, Some("users"));
        assert_eq!(writer.columns, vec!["id", "name", "email"]);
    }

    #[test]
    fn test_write_empty_items() {
        let binder = DummyBinder;
        let writer = PostgresItemWriter::<String>::new()
            .table("test")
            .add_column("value")
            .item_binder(&binder);

        // No pool is configured, yet an empty slice must still succeed.
        let result = writer.write(&[]);
        assert!(result.is_ok());
    }

    #[test]
    fn should_return_error_when_columns_missing_and_items_given() {
        let binder = DummyBinder;
        let writer = PostgresItemWriter::<String>::new()
            .table("t")
            .item_binder(&binder);

        let result = writer.write(&["x".to_string()]);

        assert_item_writer_error(result, "columns", "expected error for missing columns");
    }

    #[test]
    fn should_return_error_when_pool_not_configured_and_items_given() {
        let binder = DummyBinder;
        let writer = PostgresItemWriter::<String>::new()
            .table("t")
            .add_column("v")
            .item_binder(&binder);

        let result = writer.write(&["x".to_string()]);

        assert_item_writer_error(result, "pool", "expected error for missing pool");
    }
}