server/bulk_operations/
handler.rs

1use super::deleter::BulkDeleter;
2use super::types::{
3    BatchConfig, BulkOperationContext, BulkOperationResult, BulkSendParams, MessageIdentifier,
4};
5use std::error::Error;
6use std::sync::Arc;
7use tokio::sync::Mutex;
8use tokio_util::sync::CancellationToken;
9
10/// High-level handler for bulk operations on Azure Service Bus messages.
11///
12/// The BulkOperationHandler provides a simplified interface for performing
13/// bulk operations like deleting multiple messages, sending messages to dead letter queues,
14/// and resending messages from dead letter queues. It orchestrates the underlying
15/// bulk deletion and processing logic.
16///
17/// # Features
18///
19/// - **Bulk Delete** - Efficiently delete multiple messages from queues
20/// - **Dead Letter Operations** - Move messages to/from dead letter queues
21/// - **Batch Processing** - Configurable batch sizes for optimal performance
22/// - **Error Handling** - Comprehensive error reporting with operation results
23/// - **Cancellation Support** - Graceful cancellation of long-running operations
24///
25/// # Examples
26///
27/// ```no_run
28/// use quetty_server::bulk_operations::{BulkOperationHandler, BatchConfig, MessageIdentifier};
29/// use quetty_server::consumer::Consumer;
30/// use std::sync::Arc;
31/// use tokio::sync::Mutex;
32///
33/// async fn example(consumer: Arc<Mutex<Consumer>>) -> Result<(), Box<dyn std::error::Error>> {
34///     let config = BatchConfig::default();
35///     let handler = BulkOperationHandler::new(config);
36///
37///     let message_ids = vec![
38///         MessageIdentifier::SequenceNumber(12345),
39///         MessageIdentifier::SequenceNumber(12346),
40///     ];
41///
42///     let result = handler.delete_messages(
43///         consumer,
44///         "my-queue".to_string(),
45///         message_ids,
46///         100, // max_position
47///     ).await?;
48///
49///     println!("Deleted {} messages", result.successful_count);
50///     Ok(())
51/// }
52/// ```
53pub struct BulkOperationHandler {
54    deleter: BulkDeleter,
55}
56
57impl BulkOperationHandler {
58    /// Creates a new BulkOperationHandler with the specified configuration.
59    ///
60    /// # Arguments
61    ///
62    /// * `config` - Batch configuration controlling operation behavior
63    ///
64    /// # Examples
65    ///
66    /// ```no_run
67    /// use quetty_server::bulk_operations::{BulkOperationHandler, BatchConfig};
68    ///
69    /// let config = BatchConfig {
70    ///     batch_size: 50,
71    ///     timeout: std::time::Duration::from_secs(30),
72    ///     ..Default::default()
73    /// };
74    /// let handler = BulkOperationHandler::new(config);
75    /// ```
76    pub fn new(config: BatchConfig) -> Self {
77        Self {
78            deleter: BulkDeleter::new(config),
79        }
80    }
81
82    /// Executes a bulk delete operation on the specified messages.
83    ///
84    /// This method deletes multiple messages from a Service Bus queue efficiently
85    /// by processing them in batches. It provides comprehensive error reporting
86    /// and handles partial failures gracefully.
87    ///
88    /// # Arguments
89    ///
90    /// * `consumer` - Service Bus consumer for message operations
91    /// * `queue_name` - Name of the queue containing the messages
92    /// * `targets` - List of message identifiers to delete
93    /// * `max_position` - Maximum position limit for message processing
94    ///
95    /// # Returns
96    ///
97    /// [`BulkOperationResult`] containing the count of successful and failed operations
98    ///
99    /// # Errors
100    ///
101    /// Returns an error if:
102    /// - The consumer is unavailable or disposed
103    /// - Service Bus operations fail
104    /// - The operation is cancelled
105    /// - Invalid message identifiers are provided
106    ///
107    /// # Examples
108    ///
109    /// ```no_run
110    /// use quetty_server::bulk_operations::{BulkOperationHandler, MessageIdentifier};
111    /// use quetty_server::consumer::Consumer;
112    /// use std::sync::Arc;
113    /// use tokio::sync::Mutex;
114    ///
115    /// async fn delete_example(
116    ///     handler: &BulkOperationHandler,
117    ///     consumer: Arc<Mutex<Consumer>>
118    /// ) -> Result<(), Box<dyn std::error::Error>> {
119    ///     let messages_to_delete = vec![
120    ///         MessageIdentifier::SequenceNumber(100),
121    ///         MessageIdentifier::SequenceNumber(101),
122    ///         MessageIdentifier::SequenceNumber(102),
123    ///     ];
124    ///
125    ///     let result = handler.delete_messages(
126    ///         consumer,
127    ///         "orders-queue".to_string(),
128    ///         messages_to_delete,
129    ///         1000,
130    ///     ).await?;
131    ///
132    ///     println!("Successfully deleted: {}", result.successful_count);
133    ///     println!("Failed to delete: {}", result.failed_count);
134    ///
135    ///     Ok(())
136    /// }
137    /// ```
138    pub async fn delete_messages(
139        &self,
140        consumer: Arc<Mutex<crate::consumer::Consumer>>,
141        queue_name: String,
142        targets: Vec<MessageIdentifier>,
143        max_position: usize,
144    ) -> Result<BulkOperationResult, Box<dyn Error + Send + Sync>> {
145        let context = BulkOperationContext {
146            consumer,
147            cancel_token: CancellationToken::new(),
148            queue_name: queue_name.clone(),
149        };
150
151        // Create BulkSendParams with max position
152        let params = BulkSendParams {
153            target_queue: queue_name,
154            should_delete: true,
155            message_identifiers: targets,
156            messages_data: None,
157            max_position,
158        };
159
160        self.deleter.delete_messages(context, params).await
161    }
162}
163
164impl Default for BulkOperationHandler {
165    /// Creates a BulkOperationHandler with default configuration.
166    ///
167    /// Uses the default [`BatchConfig`] settings for batch size, timeouts,
168    /// and other operation parameters.
169    ///
170    /// # Examples
171    ///
172    /// ```no_run
173    /// use quetty_server::bulk_operations::BulkOperationHandler;
174    ///
175    /// let handler = BulkOperationHandler::default();
176    /// ```
177    fn default() -> Self {
178        Self::new(BatchConfig::default())
179    }
180}