spring_batch_rs/item/rdbc/rdbc_writer.rs
1use serde::Serialize;
2use sqlx::{query_builder::Separated, Any, Pool, QueryBuilder};
3
4use crate::core::item::{ItemWriter, ItemWriterResult};
5
6// The number of parameters in MySQL must fit in a `u16`.
7const BIND_LIMIT: usize = 65535;
8
9pub trait RdbcItemBinder<T> {
10 fn bind(&self, item: &T, query_builder: Separated<Any, &str>);
11}
12
13pub struct RdbcItemWriter<'a, W> {
14 pool: &'a Pool<Any>,
15 table: &'a str,
16 columns: Vec<&'a str>,
17 item_binder: &'a dyn RdbcItemBinder<W>,
18}
19
20impl<'a, W> RdbcItemWriter<'a, W> {
21 /// Creates a new instance of `RdbcItemWriter`.
22 ///
23 /// # Arguments
24 ///
25 /// * `pool` - A reference to the connection pool.
26 /// * `table` - The name of the database table.
27 /// * `columns` - A vector of column names.
28 /// * `item_binder` - A reference to the item binder.
29 ///
30 /// # Returns
31 ///
32 /// A new instance of `RdbcItemWriter`.
33 pub fn new(
34 pool: &'a Pool<Any>,
35 table: &'a str,
36 columns: Vec<&'a str>,
37 item_binder: &'a dyn RdbcItemBinder<W>,
38 ) -> Self {
39 Self {
40 pool,
41 table,
42 columns,
43 item_binder,
44 }
45 }
46}
47
48impl<'a, W: Serialize + Clone> ItemWriter<W> for RdbcItemWriter<'a, W> {
49 /// Writes the items to the database.
50 ///
51 /// # Arguments
52 ///
53 /// * `items` - A slice of items to be written.
54 ///
55 /// # Returns
56 ///
57 /// An `ItemWriterResult` indicating the result of the write operation.
58 fn write(&self, items: &[W]) -> ItemWriterResult {
59 let mut query_builder = QueryBuilder::new("INSERT INTO ");
60
61 query_builder.push(self.table);
62 query_builder.push(" (");
63 query_builder.push(self.columns.join(","));
64 query_builder.push(") ");
65
66 query_builder.push_values(
67 items.iter().take(BIND_LIMIT / self.columns.len()),
68 |b: sqlx::query_builder::Separated<'_, '_, Any, &str>, item| {
69 self.item_binder.bind(item, b);
70 },
71 );
72
73 let query = query_builder.build();
74
75 let _result = tokio::task::block_in_place(|| {
76 tokio::runtime::Runtime::new()
77 .unwrap()
78 .block_on(async { query.execute(self.pool).await.unwrap() })
79 });
80
81 Ok(())
82 }
83}
84
85#[derive(Default)]
86pub struct RdbcItemWriterBuilder<'a, T> {
87 pool: Option<&'a Pool<Any>>,
88 table: Option<&'a str>,
89 columns: Vec<&'a str>,
90 item_binder: Option<&'a dyn RdbcItemBinder<T>>,
91}
92
93impl<'a, T> RdbcItemWriterBuilder<'a, T> {
94 /// Creates a new instance of `RdbcItemWriterBuilder`.
95 ///
96 /// # Returns
97 ///
98 /// A new instance of `RdbcItemWriterBuilder`.
99 pub fn new() -> Self {
100 Self {
101 pool: None,
102 table: None,
103 columns: Vec::new(),
104 item_binder: None,
105 }
106 }
107
108 /// Sets the table name for the item writer.
109 ///
110 /// # Arguments
111 ///
112 /// * `table` - The name of the database table.
113 ///
114 /// # Returns
115 ///
116 /// The updated `RdbcItemWriterBuilder` instance.
117 pub fn table(mut self, table: &'a str) -> Self {
118 self.table = Some(table);
119 self
120 }
121
122 /// Sets the connection pool for the item writer.
123 ///
124 /// # Arguments
125 ///
126 /// * `pool` - A reference to the connection pool.
127 ///
128 /// # Returns
129 ///
130 /// The updated `RdbcItemWriterBuilder` instance.
131 pub fn pool(mut self, pool: &'a Pool<Any>) -> Self {
132 self.pool = Some(pool);
133 self
134 }
135
136 /// Sets the item binder for the item writer.
137 ///
138 /// # Arguments
139 ///
140 /// * `item_binder` - A reference to the item binder.
141 ///
142 /// # Returns
143 ///
144 /// The updated `RdbcItemWriterBuilder` instance.
145 pub fn item_binder(mut self, item_binder: &'a dyn RdbcItemBinder<T>) -> Self {
146 self.item_binder = Some(item_binder);
147 self
148 }
149
150 /// Adds a column to the item writer.
151 ///
152 /// # Arguments
153 ///
154 /// * `column` - The name of the column to add.
155 ///
156 /// # Returns
157 ///
158 /// The updated `RdbcItemWriterBuilder` instance.
159 pub fn add_column(mut self, column: &'a str) -> Self {
160 self.columns.push(column);
161 self
162 }
163
164 /// Builds an instance of `RdbcItemWriter` based on the configured parameters.
165 ///
166 /// # Panics
167 ///
168 /// This method will panic if the table name is not set or if no columns are added.
169 ///
170 /// # Returns
171 ///
172 /// An instance of `RdbcItemWriter`.
173 pub fn build(self) -> RdbcItemWriter<'a, T> {
174 if self.table.is_none() {
175 panic!("Table name is mandatory");
176 }
177
178 if self.columns.is_empty() {
179 panic!("One or more columns are required");
180 }
181
182 RdbcItemWriter::new(
183 self.pool.unwrap(),
184 self.table.unwrap(),
185 self.columns.clone(),
186 self.item_binder.unwrap(),
187 )
188 }
189}