Skip to main content

ormkit/
batch.rs

1//! Batch operations for efficient bulk database operations
2//!
3//! This module provides utilities for performing bulk insert, update, and delete
4//! operations with automatic batching and transaction management.
5//!
6//! # Example
7//!
8//! ```rust,no_run
9//! use ormkit::batch::{insert_many, update_many, BatchOptions};
10//! # use ormkit::Entity;
11//! # use sqlx::PgPool;
12//! # use uuid::Uuid;
13//! #
14//! # #[derive(Debug, Clone)]
15//! # struct User;
16//! # impl Entity for User { type Id = Uuid; fn table_name() -> &'static str { "users" } fn id(&self) -> Self::Id { unimplemented!() } }
17//!
18//! # async fn test(pool: &PgPool) -> Result<(), Box<dyn std::error::Error>> {
19//! // Bulk insert with automatic batching
//! let users: Vec<User> = vec![/* ... */];
21//! let inserted = insert_many(&pool, &users, None).await?;
22//!
23//! // Bulk insert with custom batch size
24//! let options = BatchOptions::new().batch_size(100);
25//! let inserted = insert_many(&pool, &users, Some(options)).await?;
26//!
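//! // Bulk update: mark every 'active' user as 'verified'.
//! // SET values are bound first ($1..$n), so placeholders in the condition
//! // continue from $n + 1 (here: $2).
//! let updated = update_many(
//!     &pool,
//!     "users",
//!     &[("status", "verified")],
//!     "status = $2",
//!     &["active"],
//!     None,
//! ).await?;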
30//! # Ok(())
31//! # }
32//! ```
33
34use crate::entity::Entity;
35use sqlx::PgPool;
36use std::fmt::Debug;
37
/// Number of items per batch used when the caller does not choose a size.
pub const DEFAULT_BATCH_SIZE: usize = 100;

/// Largest batch size that can be configured; keeps a single statement from
/// growing too large.
pub const MAX_BATCH_SIZE: usize = 1_000;
43
44/// Errors that can occur during batch operations
45#[derive(Debug, thiserror::Error)]
46pub enum BatchError {
47    /// Database error
48    #[error("Database error: {0}")]
49    Database(#[from] sqlx::Error),
50
51    /// Empty batch provided
52    #[error("Empty batch provided")]
53    EmptyBatch,
54
55    /// Batch size exceeds maximum
56    #[error("Batch size {count} exceeds maximum {max}", count = count, max = max)]
57    BatchSizeExceeded { count: usize, max: usize },
58
59    /// Serialization error
60    #[error("Serialization error: {0}")]
61    Serialization(String),
62}
63
64/// Result type for batch operations
65pub type BatchResult<T> = Result<T, BatchError>;
66
67/// Options for batch operations
68#[derive(Debug, Clone)]
69pub struct BatchOptions {
70    /// Number of items to process per batch
71    batch_size: usize,
72    /// Whether to wrap each batch in a transaction
73    transaction_per_batch: bool,
74    /// Whether to wrap the entire operation in a transaction
75    wrap_in_transaction: bool,
76    /// Whether to continue on error (only valid with transaction_per_batch)
77    continue_on_error: bool,
78}
79
80impl BatchOptions {
81    /// Create new batch options with defaults
82    pub fn new() -> Self {
83        Self {
84            batch_size: DEFAULT_BATCH_SIZE,
85            transaction_per_batch: true,
86            wrap_in_transaction: true,
87            continue_on_error: false,
88        }
89    }
90
91    /// Set the batch size
92    pub fn batch_size(mut self, size: usize) -> Self {
93        let size = size.min(MAX_BATCH_SIZE).max(1);
94        self.batch_size = size;
95        self
96    }
97
98    /// Set whether to wrap each batch in a transaction
99    pub fn transaction_per_batch(mut self, value: bool) -> Self {
100        self.transaction_per_batch = value;
101        self
102    }
103
104    /// Set whether to wrap the entire operation in a transaction
105    pub fn wrap_in_transaction(mut self, value: bool) -> Self {
106        self.wrap_in_transaction = value;
107        self
108    }
109
110    /// Set whether to continue on error
111    pub fn continue_on_error(mut self, value: bool) -> Self {
112        self.continue_on_error = value;
113        self
114    }
115
116    /// Get the batch size
117    pub fn get_batch_size(&self) -> usize {
118        self.batch_size
119    }
120
121    /// Check if transactions should be used per batch
122    pub fn is_transaction_per_batch(&self) -> bool {
123        self.transaction_per_batch
124    }
125
126    /// Check if the entire operation should be wrapped in a transaction
127    pub fn is_wrap_in_transaction(&self) -> bool {
128        self.wrap_in_transaction
129    }
130
131    /// Check if operation should continue on error
132    pub fn is_continue_on_error(&self) -> bool {
133        self.continue_on_error
134    }
135}
136
137impl Default for BatchOptions {
138    fn default() -> Self {
139        Self::new()
140    }
141}
142
/// Counters describing the outcome of a batch operation.
#[derive(Debug)]
pub struct BatchResultInfo {
    /// How many items were attempted in total.
    pub total_processed: usize,
    /// How many of the attempted items succeeded.
    pub successful: usize,
    /// How many of the attempted items failed.
    pub failed: usize,
    /// How many batches were started.
    pub batches: usize,
}

impl BatchResultInfo {
    /// Bundle the four counters into a `BatchResultInfo`.
    pub fn new(total_processed: usize, successful: usize, failed: usize, batches: usize) -> Self {
        BatchResultInfo {
            total_processed,
            successful,
            failed,
            batches,
        }
    }

    /// `true` when no item failed.
    pub fn is_success(&self) -> bool {
        self.failed == 0
    }

    /// Share of successful items, as a percentage in `0.0..=100.0`.
    ///
    /// When nothing was processed the rate is reported as `100.0`, which also
    /// avoids dividing by zero.
    pub fn success_rate(&self) -> f64 {
        match self.total_processed {
            0 => 100.0,
            total => (self.successful as f64 / total as f64) * 100.0,
        }
    }
}
180
181/// Insert multiple entities in batches
182///
183/// This function inserts multiple entities in batches for better performance
184/// and to avoid statement size limits.
185///
186/// # Arguments
187///
188/// * `pool` - The database connection pool
189/// * `entities` - The entities to insert
190/// * `options` - Optional batch operation settings
191///
192/// # Returns
193///
194/// Information about the batch operation
195///
196/// # Example
197///
198/// ```rust,no_run
199/// use ormkit::batch::insert_many;
200/// # use ormkit::Entity;
201/// # use sqlx::PgPool;
202/// # use uuid::Uuid;
203/// # #[derive(Debug, Clone)]
204/// # struct User;
205/// # impl Entity for User { type Id = Uuid; fn table_name() -> &'static str { "users" } fn id(&self) -> Self::Id { unimplemented!() } }
206///
207/// # async fn test(pool: &PgPool) -> Result<(), Box<dyn std::error::Error>> {
208/// let users = vec![/* ... */];
209/// let result = insert_many(&pool, &users, None).await?;
210/// println!("Inserted {} users", result.successful);
211/// # Ok(())
212/// # }
213/// ```
214pub async fn insert_many<E>(
215    _pool: &PgPool,
216    entities: &[E],
217    options: Option<BatchOptions>,
218) -> BatchResult<BatchResultInfo>
219where
220    E: Entity + Debug + Send + Sync,
221{
222    if entities.is_empty() {
223        return Err(BatchError::EmptyBatch);
224    }
225
226    let options = options.unwrap_or_default();
227    let _batch_size = options.get_batch_size();
228
229    // TODO: Implement batch insert with proper reflection/macro codegen
230    // This requires either:
231    // 1. Reflection to extract field values from entities
232    // 2. Procedural macro to generate insert code for each entity type
233    //
234    // For now, return an error indicating this is not yet implemented
235    return Err(BatchError::Serialization(
236        "Batch insert not yet implemented. Use individual inserts or implement reflection.".to_string()
237    ));
238
239}
240
241/// Update multiple entities in batches
242///
243/// This function updates multiple entities in batches for better performance.
244///
245/// # Arguments
246///
247/// * `pool` - The database connection pool
248/// * `updates` - Tuple of (column, value, condition) updates
249/// * `options` - Optional batch operation settings
250///
251/// # Returns
252///
253/// Information about the batch operation
254///
255/// # Example
256///
257/// ```rust,no_run
258/// use ormkit::batch::update_many;
259/// # use sqlx::PgPool;
260///
261/// # async fn test(pool: &PgPool) -> Result<(), Box<dyn std::error::Error>> {
262/// // Update all active users to have status='verified'
263/// let result = update_many(
264///     &pool,
265///     "users",
266///     &[("status", "verified")],
267///     "status = $1",
268///     &[&"active" as &(dyn sqlx::Encode<sqlx::Postgres> + sqlx::Type<sqlx::Postgres> + Sync + Send)],
269///     None
270/// ).await?;
271/// # Ok(())
272/// # }
273/// ```
274pub async fn update_many<P>(
275    pool: &PgPool,
276    table: &str,
277    updates: &[(&str, &str)],
278    condition: &str,
279    params: &[P],
280    _options: Option<BatchOptions>,
281) -> BatchResult<u64>
282where
283    P: for<'r> sqlx::Encode<'r, sqlx::Postgres> + sqlx::Type<sqlx::Postgres> + Clone + Sync + Send,
284{
285    // Build SET clause
286    let set_clause: String = updates
287        .iter()
288        .enumerate()
289        .map(|(i, (col, _))| format!(r#""{}" = ${}"#, col, i + 1))
290        .collect::<Vec<_>>()
291        .join(", ");
292
293    // Build the query
294    let query = format!(
295        r#"UPDATE "{}" SET {} WHERE {}"#,
296        table, set_clause, condition
297    );
298
299    // Execute
300    let mut query_builder = sqlx::query(&*query);
301
302    // Bind update values
303    for (_, val) in updates {
304        query_builder = query_builder.bind(*val);
305    }
306
307    // Bind condition parameters
308    for param in params {
309        query_builder = query_builder.bind(param.clone());
310    }
311
312    let result = query_builder.execute(pool).await?;
313
314    Ok(result.rows_affected())
315}
316
317/// Delete multiple entities in batches
318///
319/// This function deletes multiple entities in batches.
320///
321/// # Arguments
322///
323/// * `pool` - The database connection pool
324/// * `ids` - The IDs of entities to delete
325/// * `options` - Optional batch operation settings
326///
327/// # Returns
328///
329/// Information about the batch operation
330pub async fn delete_many<E>(
331    pool: &PgPool,
332    ids: &[E::Id],
333    options: Option<BatchOptions>,
334) -> BatchResult<u64>
335where
336    E: Entity,
337    for<'a> E::Id: sqlx::Encode<'a, sqlx::Postgres> + sqlx::Type<sqlx::Postgres> + Send + Sync,
338{
339    if ids.is_empty() {
340        return Ok(0);
341    }
342
343    let options = options.unwrap_or_default();
344    let batch_size = options.get_batch_size();
345
346    let mut total_deleted = 0;
347
348    for chunk in ids.chunks(batch_size) {
349        let placeholders: Vec<String> = chunk
350            .iter()
351            .enumerate()
352            .map(|(i, _)| format!("${}", i + 1))
353            .collect();
354
355        let query = format!(
356            r#"DELETE FROM "{}" WHERE "id" IN ({})"#,
357            E::table_name(),
358            placeholders.join(", ")
359        );
360
361        let mut query_builder = sqlx::query(&query);
362
363        for id in chunk {
364            query_builder = query_builder.bind(id);
365        }
366
367        let result = query_builder.execute(pool).await?;
368
369        total_deleted += result.rows_affected();
370    }
371
372    Ok(total_deleted)
373}
374
375/// Execute a batch of custom queries
376///
377/// This function executes multiple queries in batches with optional
378/// transaction wrapping.
379///
380/// # Arguments
381///
382/// * `pool` - The database connection pool
383/// * `queries` - The SQL queries to execute
384/// * `options` - Optional batch operation settings
385///
386/// # Returns
387///
388/// Information about the batch operation
389pub async fn execute_batch(
390    pool: &PgPool,
391    queries: &[&str],
392    options: Option<BatchOptions>,
393) -> BatchResult<BatchResultInfo> {
394    if queries.is_empty() {
395        return Err(BatchError::EmptyBatch);
396    }
397
398    let options = options.unwrap_or_default();
399    let batch_size = options.get_batch_size();
400
401    let mut total_processed = 0;
402    let mut successful = 0;
403    let mut failed = 0;
404    let mut batches = 0;
405
406    for chunk in queries.chunks(batch_size) {
407        batches += 1;
408
409        for query in chunk {
410            total_processed += 1;
411
412            let result = sqlx::query(query).execute(pool).await;
413
414            match result {
415                Ok(_) => successful += 1,
416                Err(e) => {
417                    failed += 1;
418                    if !options.is_continue_on_error() {
419                        return Err(BatchError::Database(e));
420                    }
421                }
422            }
423        }
424    }
425
426    Ok(BatchResultInfo::new(total_processed, successful, failed, batches))
427}
428
#[cfg(test)]
mod tests {
    use super::*;

    // ---- BatchOptions ----

    /// `new()` yields the documented defaults.
    #[test]
    fn test_batch_options_new() {
        let opts = BatchOptions::new();
        assert_eq!(opts.get_batch_size(), DEFAULT_BATCH_SIZE);
        assert!(opts.is_transaction_per_batch());
        assert!(opts.is_wrap_in_transaction());
        assert!(!opts.is_continue_on_error());
    }

    /// An in-range batch size is stored unchanged.
    #[test]
    fn test_batch_options_batch_size() {
        let opts = BatchOptions::new().batch_size(500);
        assert_eq!(opts.get_batch_size(), 500);
    }

    /// Oversized requests are capped at `MAX_BATCH_SIZE`.
    #[test]
    fn test_batch_options_max_batch_size() {
        let opts = BatchOptions::new().batch_size(10000);
        assert_eq!(opts.get_batch_size(), MAX_BATCH_SIZE);
    }

    /// A zero batch size is raised to one.
    #[test]
    fn test_batch_options_min_batch_size() {
        let opts = BatchOptions::new().batch_size(0);
        assert_eq!(opts.get_batch_size(), 1);
    }

    /// The per-batch transaction flag can be switched off.
    #[test]
    fn test_batch_options_transaction_per_batch() {
        let opts = BatchOptions::new().transaction_per_batch(false);
        assert!(!opts.is_transaction_per_batch());
    }

    /// The continue-on-error flag can be switched on.
    #[test]
    fn test_batch_options_continue_on_error() {
        let opts = BatchOptions::new().continue_on_error(true);
        assert!(opts.is_continue_on_error());
    }

    /// `Default` uses the default batch size.
    #[test]
    fn test_batch_options_default() {
        let opts = BatchOptions::default();
        assert_eq!(opts.get_batch_size(), DEFAULT_BATCH_SIZE);
    }

    // ---- BatchResultInfo ----

    /// The constructor stores every counter in the matching field.
    #[test]
    fn test_batch_result_info_new() {
        let summary = BatchResultInfo::new(100, 95, 5, 10);
        assert_eq!(summary.total_processed, 100);
        assert_eq!(summary.successful, 95);
        assert_eq!(summary.failed, 5);
        assert_eq!(summary.batches, 10);
    }

    /// Success means zero failures.
    #[test]
    fn test_batch_result_info_is_success() {
        let all_ok = BatchResultInfo::new(100, 100, 0, 10);
        assert!(all_ok.is_success());

        let some_failed = BatchResultInfo::new(100, 95, 5, 10);
        assert!(!some_failed.is_success());
    }

    /// The success rate is expressed as a percentage.
    #[test]
    fn test_batch_result_info_success_rate() {
        let summary = BatchResultInfo::new(100, 75, 25, 10);
        assert_eq!(summary.success_rate(), 75.0);
    }

    /// With nothing processed the rate is reported as 100%.
    #[test]
    fn test_batch_result_info_success_rate_empty() {
        let summary = BatchResultInfo::new(0, 0, 0, 0);
        assert_eq!(summary.success_rate(), 100.0);
    }
}