use crate::BatchError;
use sqlx::{Database, Pool};
/// Bind-parameter budget for a single batch; `max_items_per_batch` divides it by the
/// number of columns to decide how many items fit in one batch.
// NOTE(review): 65535 presumably mirrors the 16-bit parameter count of the PostgreSQL
// wire protocol — confirm which database backends this limit is meant to cover.
pub const BIND_LIMIT: usize = 65535;
/// Success value of `validate_config`: the connection pool, the table name and the item
/// binder, in that order, all borrowed for the lifetime `'a`.
type ValidatedConfig<'a, O, DB> = (
&'a Pool<DB>,
&'a str,
&'a dyn super::DatabaseItemBinder<O, DB>,
);
/// Checks that everything needed for a database write has been configured and hands the
/// unwrapped references back to the caller.
///
/// # Errors
///
/// Returns [`BatchError::ItemWriter`] for the first problem found, checked in this order:
/// `columns` is empty, `pool` is `None`, `table` is `None`, `item_binder` is `None`.
pub fn validate_config<'a, O, DB: Database>(
    pool: Option<&'a Pool<DB>>,
    table: Option<&'a str>,
    columns: &[&'a str],
    item_binder: Option<&'a dyn super::DatabaseItemBinder<O, DB>>,
) -> Result<ValidatedConfig<'a, O, DB>, BatchError> {
    let config_error = |message: &str| BatchError::ItemWriter(message.to_string());

    // The column check comes first so an empty column list is reported even when
    // other settings are missing as well.
    if columns.is_empty() {
        return Err(config_error("No columns specified for database write"));
    }

    // Arms are ordered pool -> table -> binder so the first missing setting wins.
    match (pool, table, item_binder) {
        (None, _, _) => Err(config_error("Database pool not configured")),
        (Some(_), None, _) => Err(config_error("Table name not configured")),
        (Some(_), Some(_), None) => Err(config_error("Item binder not configured")),
        (Some(pool), Some(table), Some(item_binder)) => Ok((pool, table, item_binder)),
    }
}
/// Emits a debug-level log record stating that `items_count` items were written to
/// `table` in the database identified by `db_name`.
///
/// Note the argument order: the message prints `db_name` before `table`, although the
/// function takes `table` first.
#[inline]
pub fn log_write_success(items_count: usize, table: &str, db_name: &str) {
log::debug!(
"Successfully wrote {} items to {} table {}",
items_count,
db_name,
table
);
}
/// Logs a failed write at error level and builds the [`BatchError::ItemWriter`] describing it.
///
/// The log record names both `db_name` and `table`; the returned error message carries
/// only `db_name` and the rendered `error`.
pub fn create_write_error(table: &str, db_name: &str, error: impl std::fmt::Display) -> BatchError {
    // Log before building the error so the table name is recorded with the failure.
    log::error!(
        "Failed to write items to {} table {}: {}",
        db_name,
        table,
        error
    );

    let message = format!("{} write failed: {}", db_name, error);
    BatchError::ItemWriter(message)
}
/// Returns how many items can be written in one batch without exceeding [`BIND_LIMIT`]
/// bind parameters, given that each item binds `column_count` values.
///
/// The result is `BIND_LIMIT / column_count`, rounded down. Two boundary cases:
///
/// * `column_count == 0` is treated as one column, so the function returns `BIND_LIMIT`
///   instead of panicking on a division by zero.
/// * `column_count > BIND_LIMIT` yields `0`: not even a single item fits, and the caller
///   must handle that (for example, `slice::chunks(0)` panics).
#[inline]
pub fn max_items_per_batch(column_count: usize) -> usize {
    // Clamp the divisor: an item with no columns consumes no bind parameters, so the
    // limit itself is the only sensible cap.
    BIND_LIMIT / column_count.max(1)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::BatchError;
    use sqlx::Sqlite;

    /// Asserts that `err` is `BatchError::ItemWriter` and that its message contains `needle`.
    fn assert_item_writer_contains(err: BatchError, needle: &str) {
        match err {
            BatchError::ItemWriter(msg) => assert!(msg.contains(needle), "unexpected: {msg}"),
            e => panic!("expected ItemWriter, got {e:?}"),
        }
    }

    /// Opens an in-memory SQLite pool for the tests that need a real `Pool` value.
    async fn memory_pool() -> sqlx::SqlitePool {
        sqlx::SqlitePool::connect("sqlite::memory:").await.unwrap()
    }

    #[test]
    fn should_compute_max_items_per_batch() {
        // (column count, expected items per batch)
        let cases = [(1, 65535), (2, 32767), (10, 6553), (100, 655)];
        for (column_count, expected) in cases {
            assert_eq!(max_items_per_batch(column_count), expected);
        }
    }

    #[test]
    fn should_define_bind_limit_as_65535() {
        assert_eq!(BIND_LIMIT, 65535);
    }

    #[test]
    fn should_return_error_when_columns_is_empty() {
        // Pool and binder are missing too; the empty column list must be reported first.
        let result = validate_config::<String, Sqlite>(None, Some("tbl"), &[], None);
        assert_item_writer_contains(result.err().unwrap(), "columns");
    }

    #[test]
    fn should_return_error_when_pool_is_missing() {
        let result = validate_config::<String, Sqlite>(None, Some("tbl"), &["col"], None);
        assert_item_writer_contains(result.err().unwrap(), "pool");
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_return_error_when_table_is_missing() {
        let pool = memory_pool().await;
        let result = validate_config::<String, Sqlite>(Some(&pool), None, &["col"], None);
        assert_item_writer_contains(result.err().unwrap(), "Table");
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_return_error_when_binder_is_missing() {
        let pool = memory_pool().await;
        let result = validate_config::<String, Sqlite>(Some(&pool), Some("tbl"), &["col"], None);
        assert_item_writer_contains(result.err().unwrap(), "binder");
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn should_return_ok_when_all_config_provided() {
        use crate::item::rdbc::DatabaseItemBinder;
        use sqlx::query_builder::Separated;

        /// Binder that binds nothing; it only has to satisfy the trait object parameter.
        struct DummyBinder;

        impl DatabaseItemBinder<String, Sqlite> for DummyBinder {
            fn bind(&self, _: &String, _: Separated<Sqlite, &str>) {}
        }

        let pool = memory_pool().await;
        let binder = DummyBinder;
        let binder_ref: &dyn DatabaseItemBinder<String, Sqlite> = &binder;

        let result = validate_config::<String, Sqlite>(
            Some(&pool),
            Some("tbl"),
            &["col"],
            Some(binder_ref),
        );

        assert!(
            result.is_ok(),
            "should return Ok when all config is provided"
        );
    }

    #[test]
    fn should_call_log_write_success_without_panic() {
        log_write_success(42, "users", "PostgreSQL");
    }

    #[test]
    fn should_create_write_error_with_formatted_message() {
        let err = create_write_error("orders", "MySQL", "connection refused");
        let msg = match err {
            BatchError::ItemWriter(msg) => msg,
            e => panic!("expected ItemWriter, got {e:?}"),
        };
        assert!(msg.contains("MySQL"), "missing db name: {msg}");
        assert!(msg.contains("connection refused"), "missing cause: {msg}");
    }
}