use serde::Serialize;
use sqlx::{MySql, Pool, QueryBuilder};
use crate::core::item::{ItemWriter, ItemWriterResult};
use crate::item::rdbc::DatabaseItemBinder;
use super::writer_common::{
create_write_error, log_write_success, max_items_per_batch, validate_config,
};
pub struct MySqlItemWriter<'a, O> {
pub(crate) pool: Option<&'a Pool<MySql>>,
pub(crate) table: Option<&'a str>,
pub(crate) columns: Vec<&'a str>,
pub(crate) item_binder: Option<&'a dyn DatabaseItemBinder<O, MySql>>,
}
impl<'a, O> MySqlItemWriter<'a, O> {
pub(crate) fn new() -> Self {
Self {
pool: None,
table: None,
columns: Vec::new(),
item_binder: None,
}
}
pub(crate) fn pool(mut self, pool: &'a Pool<MySql>) -> Self {
self.pool = Some(pool);
self
}
pub(crate) fn table(mut self, table: &'a str) -> Self {
self.table = Some(table);
self
}
pub(crate) fn add_column(mut self, column: &'a str) -> Self {
self.columns.push(column);
self
}
pub(crate) fn item_binder(mut self, item_binder: &'a dyn DatabaseItemBinder<O, MySql>) -> Self {
self.item_binder = Some(item_binder);
self
}
}
impl<'a, O> Default for MySqlItemWriter<'a, O> {
fn default() -> Self {
Self::new()
}
}
impl<O: Serialize + Clone> ItemWriter<O> for MySqlItemWriter<'_, O> {
fn write(&self, items: &[O]) -> ItemWriterResult {
if items.is_empty() {
return Ok(());
}
let (pool, table, item_binder) =
validate_config(self.pool, self.table, &self.columns, self.item_binder)?;
let mut query_builder = QueryBuilder::new("INSERT INTO ");
query_builder.push(table);
query_builder.push(" (");
query_builder.push(self.columns.join(","));
query_builder.push(") ");
let max_items = max_items_per_batch(self.columns.len());
let items_to_write = items.iter().take(max_items);
let items_count = items_to_write.len();
query_builder.push_values(items_to_write, |b, item| {
item_binder.bind(item, b);
});
let query = query_builder.build();
let result = tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async { query.execute(pool).await })
});
match result {
Ok(_) => {
log_write_success(items_count, table, "MySQL");
Ok(())
}
Err(e) => Err(create_write_error(table, "MySQL", e)),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::item::ItemWriter;
#[test]
fn test_new_creates_default_writer() {
let writer = MySqlItemWriter::<String>::new();
assert!(writer.pool.is_none());
assert!(writer.table.is_none());
assert!(writer.columns.is_empty());
assert!(writer.item_binder.is_none());
}
#[test]
fn test_builder_pattern_configuration() {
let writer = MySqlItemWriter::<String>::new()
.table("products")
.add_column("id")
.add_column("name")
.add_column("price");
assert_eq!(writer.table, Some("products"));
assert_eq!(writer.columns, vec!["id", "name", "price"]);
}
#[test]
fn test_write_empty_items() {
use crate::item::rdbc::DatabaseItemBinder;
use sqlx::query_builder::Separated;
struct DummyBinder;
impl DatabaseItemBinder<String, MySql> for DummyBinder {
fn bind(&self, _item: &String, _query_builder: Separated<MySql, &str>) {}
}
let binder = DummyBinder;
let writer = MySqlItemWriter::<String>::new()
.table("test")
.add_column("value")
.item_binder(&binder);
let result = writer.write(&[]);
assert!(result.is_ok());
}
#[test]
fn should_return_error_when_columns_missing_and_items_given() {
use crate::{BatchError, item::rdbc::DatabaseItemBinder};
use sqlx::query_builder::Separated;
struct DummyBinder;
impl DatabaseItemBinder<String, MySql> for DummyBinder {
fn bind(&self, _: &String, _: Separated<MySql, &str>) {}
}
let binder = DummyBinder;
let writer = MySqlItemWriter::<String>::new()
.table("t")
.item_binder(&binder);
let result = writer.write(&["x".to_string()]);
assert!(result.is_err(), "expected error for missing columns");
match result.unwrap_err() {
BatchError::ItemWriter(msg) => assert!(msg.contains("columns"), "{msg}"),
e => panic!("expected ItemWriter, got {e:?}"),
}
}
#[test]
fn should_return_error_when_pool_not_configured_and_items_given() {
use crate::{BatchError, item::rdbc::DatabaseItemBinder};
use sqlx::query_builder::Separated;
struct DummyBinder;
impl DatabaseItemBinder<String, MySql> for DummyBinder {
fn bind(&self, _: &String, _: Separated<MySql, &str>) {}
}
let binder = DummyBinder;
let writer = MySqlItemWriter::<String>::new()
.table("t")
.add_column("v")
.item_binder(&binder);
let result = writer.write(&["x".to_string()]);
assert!(result.is_err(), "expected error for missing pool");
match result.unwrap_err() {
BatchError::ItemWriter(msg) => assert!(msg.contains("pool"), "{msg}"),
e => panic!("expected ItemWriter, got {e:?}"),
}
}
}