1use crate::entity::Entity;
35use sqlx::PgPool;
36use std::fmt::Debug;
37
38pub const DEFAULT_BATCH_SIZE: usize = 100;
40
41pub const MAX_BATCH_SIZE: usize = 1000;
43
44#[derive(Debug, thiserror::Error)]
46pub enum BatchError {
47 #[error("Database error: {0}")]
49 Database(#[from] sqlx::Error),
50
51 #[error("Empty batch provided")]
53 EmptyBatch,
54
55 #[error("Batch size {count} exceeds maximum {max}", count = count, max = max)]
57 BatchSizeExceeded { count: usize, max: usize },
58
59 #[error("Serialization error: {0}")]
61 Serialization(String),
62}
63
64pub type BatchResult<T> = Result<T, BatchError>;
66
67#[derive(Debug, Clone)]
69pub struct BatchOptions {
70 batch_size: usize,
72 transaction_per_batch: bool,
74 wrap_in_transaction: bool,
76 continue_on_error: bool,
78}
79
80impl BatchOptions {
81 pub fn new() -> Self {
83 Self {
84 batch_size: DEFAULT_BATCH_SIZE,
85 transaction_per_batch: true,
86 wrap_in_transaction: true,
87 continue_on_error: false,
88 }
89 }
90
91 pub fn batch_size(mut self, size: usize) -> Self {
93 let size = size.min(MAX_BATCH_SIZE).max(1);
94 self.batch_size = size;
95 self
96 }
97
98 pub fn transaction_per_batch(mut self, value: bool) -> Self {
100 self.transaction_per_batch = value;
101 self
102 }
103
104 pub fn wrap_in_transaction(mut self, value: bool) -> Self {
106 self.wrap_in_transaction = value;
107 self
108 }
109
110 pub fn continue_on_error(mut self, value: bool) -> Self {
112 self.continue_on_error = value;
113 self
114 }
115
116 pub fn get_batch_size(&self) -> usize {
118 self.batch_size
119 }
120
121 pub fn is_transaction_per_batch(&self) -> bool {
123 self.transaction_per_batch
124 }
125
126 pub fn is_wrap_in_transaction(&self) -> bool {
128 self.wrap_in_transaction
129 }
130
131 pub fn is_continue_on_error(&self) -> bool {
133 self.continue_on_error
134 }
135}
136
137impl Default for BatchOptions {
138 fn default() -> Self {
139 Self::new()
140 }
141}
142
143#[derive(Debug)]
145pub struct BatchResultInfo {
146 pub total_processed: usize,
148 pub successful: usize,
150 pub failed: usize,
152 pub batches: usize,
154}
155
156impl BatchResultInfo {
157 pub fn new(total_processed: usize, successful: usize, failed: usize, batches: usize) -> Self {
159 Self {
160 total_processed,
161 successful,
162 failed,
163 batches,
164 }
165 }
166
167 pub fn is_success(&self) -> bool {
169 self.failed == 0
170 }
171
172 pub fn success_rate(&self) -> f64 {
174 if self.total_processed == 0 {
175 return 100.0;
176 }
177 (self.successful as f64 / self.total_processed as f64) * 100.0
178 }
179}
180
181pub async fn insert_many<E>(
215 _pool: &PgPool,
216 entities: &[E],
217 options: Option<BatchOptions>,
218) -> BatchResult<BatchResultInfo>
219where
220 E: Entity + Debug + Send + Sync,
221{
222 if entities.is_empty() {
223 return Err(BatchError::EmptyBatch);
224 }
225
226 let options = options.unwrap_or_default();
227 let _batch_size = options.get_batch_size();
228
229 return Err(BatchError::Serialization(
236 "Batch insert not yet implemented. Use individual inserts or implement reflection.".to_string()
237 ));
238
239}
240
241pub async fn update_many<P>(
275 pool: &PgPool,
276 table: &str,
277 updates: &[(&str, &str)],
278 condition: &str,
279 params: &[P],
280 _options: Option<BatchOptions>,
281) -> BatchResult<u64>
282where
283 P: for<'r> sqlx::Encode<'r, sqlx::Postgres> + sqlx::Type<sqlx::Postgres> + Clone + Sync + Send,
284{
285 let set_clause: String = updates
287 .iter()
288 .enumerate()
289 .map(|(i, (col, _))| format!(r#""{}" = ${}"#, col, i + 1))
290 .collect::<Vec<_>>()
291 .join(", ");
292
293 let query = format!(
295 r#"UPDATE "{}" SET {} WHERE {}"#,
296 table, set_clause, condition
297 );
298
299 let mut query_builder = sqlx::query(&*query);
301
302 for (_, val) in updates {
304 query_builder = query_builder.bind(*val);
305 }
306
307 for param in params {
309 query_builder = query_builder.bind(param.clone());
310 }
311
312 let result = query_builder.execute(pool).await?;
313
314 Ok(result.rows_affected())
315}
316
317pub async fn delete_many<E>(
331 pool: &PgPool,
332 ids: &[E::Id],
333 options: Option<BatchOptions>,
334) -> BatchResult<u64>
335where
336 E: Entity,
337 for<'a> E::Id: sqlx::Encode<'a, sqlx::Postgres> + sqlx::Type<sqlx::Postgres> + Send + Sync,
338{
339 if ids.is_empty() {
340 return Ok(0);
341 }
342
343 let options = options.unwrap_or_default();
344 let batch_size = options.get_batch_size();
345
346 let mut total_deleted = 0;
347
348 for chunk in ids.chunks(batch_size) {
349 let placeholders: Vec<String> = chunk
350 .iter()
351 .enumerate()
352 .map(|(i, _)| format!("${}", i + 1))
353 .collect();
354
355 let query = format!(
356 r#"DELETE FROM "{}" WHERE "id" IN ({})"#,
357 E::table_name(),
358 placeholders.join(", ")
359 );
360
361 let mut query_builder = sqlx::query(&query);
362
363 for id in chunk {
364 query_builder = query_builder.bind(id);
365 }
366
367 let result = query_builder.execute(pool).await?;
368
369 total_deleted += result.rows_affected();
370 }
371
372 Ok(total_deleted)
373}
374
375pub async fn execute_batch(
390 pool: &PgPool,
391 queries: &[&str],
392 options: Option<BatchOptions>,
393) -> BatchResult<BatchResultInfo> {
394 if queries.is_empty() {
395 return Err(BatchError::EmptyBatch);
396 }
397
398 let options = options.unwrap_or_default();
399 let batch_size = options.get_batch_size();
400
401 let mut total_processed = 0;
402 let mut successful = 0;
403 let mut failed = 0;
404 let mut batches = 0;
405
406 for chunk in queries.chunks(batch_size) {
407 batches += 1;
408
409 for query in chunk {
410 total_processed += 1;
411
412 let result = sqlx::query(query).execute(pool).await;
413
414 match result {
415 Ok(_) => successful += 1,
416 Err(e) => {
417 failed += 1;
418 if !options.is_continue_on_error() {
419 return Err(BatchError::Database(e));
420 }
421 }
422 }
423 }
424 }
425
426 Ok(BatchResultInfo::new(total_processed, successful, failed, batches))
427}
428
429#[cfg(test)]
430mod tests {
431 use super::*;
432
433 #[test]
434 fn test_batch_options_new() {
435 let options = BatchOptions::new();
436 assert_eq!(options.get_batch_size(), DEFAULT_BATCH_SIZE);
437 assert!(options.is_transaction_per_batch());
438 assert!(options.is_wrap_in_transaction());
439 assert!(!options.is_continue_on_error());
440 }
441
442 #[test]
443 fn test_batch_options_batch_size() {
444 let options = BatchOptions::new().batch_size(500);
445 assert_eq!(options.get_batch_size(), 500);
446 }
447
448 #[test]
449 fn test_batch_options_max_batch_size() {
450 let options = BatchOptions::new().batch_size(10000);
451 assert_eq!(options.get_batch_size(), MAX_BATCH_SIZE);
452 }
453
454 #[test]
455 fn test_batch_options_min_batch_size() {
456 let options = BatchOptions::new().batch_size(0);
457 assert_eq!(options.get_batch_size(), 1);
458 }
459
460 #[test]
461 fn test_batch_options_transaction_per_batch() {
462 let options = BatchOptions::new().transaction_per_batch(false);
463 assert!(!options.is_transaction_per_batch());
464 }
465
466 #[test]
467 fn test_batch_options_continue_on_error() {
468 let options = BatchOptions::new().continue_on_error(true);
469 assert!(options.is_continue_on_error());
470 }
471
472 #[test]
473 fn test_batch_options_default() {
474 let options = BatchOptions::default();
475 assert_eq!(options.get_batch_size(), DEFAULT_BATCH_SIZE);
476 }
477
478 #[test]
479 fn test_batch_result_info_new() {
480 let info = BatchResultInfo::new(100, 95, 5, 10);
481 assert_eq!(info.total_processed, 100);
482 assert_eq!(info.successful, 95);
483 assert_eq!(info.failed, 5);
484 assert_eq!(info.batches, 10);
485 }
486
487 #[test]
488 fn test_batch_result_info_is_success() {
489 let info = BatchResultInfo::new(100, 100, 0, 10);
490 assert!(info.is_success());
491
492 let info = BatchResultInfo::new(100, 95, 5, 10);
493 assert!(!info.is_success());
494 }
495
496 #[test]
497 fn test_batch_result_info_success_rate() {
498 let info = BatchResultInfo::new(100, 75, 25, 10);
499 assert_eq!(info.success_rate(), 75.0);
500 }
501
502 #[test]
503 fn test_batch_result_info_success_rate_empty() {
504 let info = BatchResultInfo::new(0, 0, 0, 0);
505 assert_eq!(info.success_rate(), 100.0);
506 }
507}