spring_batch_rs/item/rdbc/
rdbc_writer.rs

1use serde::Serialize;
2use sqlx::{query_builder::Separated, Any, Pool, QueryBuilder};
3
4use crate::core::item::{ItemWriter, ItemWriterResult};
5
6// The number of parameters in MySQL must fit in a `u16`.
7const BIND_LIMIT: usize = 65535;
8
9pub trait RdbcItemBinder<T> {
10    fn bind(&self, item: &T, query_builder: Separated<Any, &str>);
11}
12
13pub struct RdbcItemWriter<'a, W> {
14    pool: &'a Pool<Any>,
15    table: &'a str,
16    columns: Vec<&'a str>,
17    item_binder: &'a dyn RdbcItemBinder<W>,
18}
19
20impl<'a, W> RdbcItemWriter<'a, W> {
21    /// Creates a new instance of `RdbcItemWriter`.
22    ///
23    /// # Arguments
24    ///
25    /// * `pool` - A reference to the connection pool.
26    /// * `table` - The name of the database table.
27    /// * `columns` - A vector of column names.
28    /// * `item_binder` - A reference to the item binder.
29    ///
30    /// # Returns
31    ///
32    /// A new instance of `RdbcItemWriter`.
33    pub fn new(
34        pool: &'a Pool<Any>,
35        table: &'a str,
36        columns: Vec<&'a str>,
37        item_binder: &'a dyn RdbcItemBinder<W>,
38    ) -> Self {
39        Self {
40            pool,
41            table,
42            columns,
43            item_binder,
44        }
45    }
46}
47
48impl<'a, W: Serialize + Clone> ItemWriter<W> for RdbcItemWriter<'a, W> {
49    /// Writes the items to the database.
50    ///
51    /// # Arguments
52    ///
53    /// * `items` - A slice of items to be written.
54    ///
55    /// # Returns
56    ///
57    /// An `ItemWriterResult` indicating the result of the write operation.
58    fn write(&self, items: &[W]) -> ItemWriterResult {
59        let mut query_builder = QueryBuilder::new("INSERT INTO ");
60
61        query_builder.push(self.table);
62        query_builder.push(" (");
63        query_builder.push(self.columns.join(","));
64        query_builder.push(") ");
65
66        query_builder.push_values(
67            items.iter().take(BIND_LIMIT / self.columns.len()),
68            |b: sqlx::query_builder::Separated<'_, '_, Any, &str>, item| {
69                self.item_binder.bind(item, b);
70            },
71        );
72
73        let query = query_builder.build();
74
75        let _result = tokio::task::block_in_place(|| {
76            tokio::runtime::Runtime::new()
77                .unwrap()
78                .block_on(async { query.execute(self.pool).await.unwrap() })
79        });
80
81        Ok(())
82    }
83}
84
85#[derive(Default)]
86pub struct RdbcItemWriterBuilder<'a, T> {
87    pool: Option<&'a Pool<Any>>,
88    table: Option<&'a str>,
89    columns: Vec<&'a str>,
90    item_binder: Option<&'a dyn RdbcItemBinder<T>>,
91}
92
93impl<'a, T> RdbcItemWriterBuilder<'a, T> {
94    /// Creates a new instance of `RdbcItemWriterBuilder`.
95    ///
96    /// # Returns
97    ///
98    /// A new instance of `RdbcItemWriterBuilder`.
99    pub fn new() -> Self {
100        Self {
101            pool: None,
102            table: None,
103            columns: Vec::new(),
104            item_binder: None,
105        }
106    }
107
108    /// Sets the table name for the item writer.
109    ///
110    /// # Arguments
111    ///
112    /// * `table` - The name of the database table.
113    ///
114    /// # Returns
115    ///
116    /// The updated `RdbcItemWriterBuilder` instance.
117    pub fn table(mut self, table: &'a str) -> Self {
118        self.table = Some(table);
119        self
120    }
121
122    /// Sets the connection pool for the item writer.
123    ///
124    /// # Arguments
125    ///
126    /// * `pool` - A reference to the connection pool.
127    ///
128    /// # Returns
129    ///
130    /// The updated `RdbcItemWriterBuilder` instance.
131    pub fn pool(mut self, pool: &'a Pool<Any>) -> Self {
132        self.pool = Some(pool);
133        self
134    }
135
136    /// Sets the item binder for the item writer.
137    ///
138    /// # Arguments
139    ///
140    /// * `item_binder` - A reference to the item binder.
141    ///
142    /// # Returns
143    ///
144    /// The updated `RdbcItemWriterBuilder` instance.
145    pub fn item_binder(mut self, item_binder: &'a dyn RdbcItemBinder<T>) -> Self {
146        self.item_binder = Some(item_binder);
147        self
148    }
149
150    /// Adds a column to the item writer.
151    ///
152    /// # Arguments
153    ///
154    /// * `column` - The name of the column to add.
155    ///
156    /// # Returns
157    ///
158    /// The updated `RdbcItemWriterBuilder` instance.
159    pub fn add_column(mut self, column: &'a str) -> Self {
160        self.columns.push(column);
161        self
162    }
163
164    /// Builds an instance of `RdbcItemWriter` based on the configured parameters.
165    ///
166    /// # Panics
167    ///
168    /// This method will panic if the table name is not set or if no columns are added.
169    ///
170    /// # Returns
171    ///
172    /// An instance of `RdbcItemWriter`.
173    pub fn build(self) -> RdbcItemWriter<'a, T> {
174        if self.table.is_none() {
175            panic!("Table name is mandatory");
176        }
177
178        if self.columns.is_empty() {
179            panic!("One or more columns are required");
180        }
181
182        RdbcItemWriter::new(
183            self.pool.unwrap(),
184            self.table.unwrap(),
185            self.columns.clone(),
186            self.item_binder.unwrap(),
187        )
188    }
189}