server/bulk_operations/handler.rs
1use super::deleter::BulkDeleter;
2use super::types::{
3 BatchConfig, BulkOperationContext, BulkOperationResult, BulkSendParams, MessageIdentifier,
4};
5use std::error::Error;
6use std::sync::Arc;
7use tokio::sync::Mutex;
8use tokio_util::sync::CancellationToken;
9
10/// High-level handler for bulk operations on Azure Service Bus messages.
11///
12/// The BulkOperationHandler provides a simplified interface for performing
13/// bulk operations like deleting multiple messages, sending messages to dead letter queues,
14/// and resending messages from dead letter queues. It orchestrates the underlying
15/// bulk deletion and processing logic.
16///
17/// # Features
18///
19/// - **Bulk Delete** - Efficiently delete multiple messages from queues
20/// - **Dead Letter Operations** - Move messages to/from dead letter queues
21/// - **Batch Processing** - Configurable batch sizes for optimal performance
22/// - **Error Handling** - Comprehensive error reporting with operation results
23/// - **Cancellation Support** - Graceful cancellation of long-running operations
24///
25/// # Examples
26///
27/// ```no_run
28/// use quetty_server::bulk_operations::{BulkOperationHandler, BatchConfig, MessageIdentifier};
29/// use quetty_server::consumer::Consumer;
30/// use std::sync::Arc;
31/// use tokio::sync::Mutex;
32///
33/// async fn example(consumer: Arc<Mutex<Consumer>>) -> Result<(), Box<dyn std::error::Error>> {
34/// let config = BatchConfig::default();
35/// let handler = BulkOperationHandler::new(config);
36///
37/// let message_ids = vec![
38/// MessageIdentifier::SequenceNumber(12345),
39/// MessageIdentifier::SequenceNumber(12346),
40/// ];
41///
42/// let result = handler.delete_messages(
43/// consumer,
44/// "my-queue".to_string(),
45/// message_ids,
46/// 100, // max_position
47/// ).await?;
48///
49/// println!("Deleted {} messages", result.successful_count);
50/// Ok(())
51/// }
52/// ```
53pub struct BulkOperationHandler {
54 deleter: BulkDeleter,
55}
56
57impl BulkOperationHandler {
58 /// Creates a new BulkOperationHandler with the specified configuration.
59 ///
60 /// # Arguments
61 ///
62 /// * `config` - Batch configuration controlling operation behavior
63 ///
64 /// # Examples
65 ///
66 /// ```no_run
67 /// use quetty_server::bulk_operations::{BulkOperationHandler, BatchConfig};
68 ///
69 /// let config = BatchConfig {
70 /// batch_size: 50,
71 /// timeout: std::time::Duration::from_secs(30),
72 /// ..Default::default()
73 /// };
74 /// let handler = BulkOperationHandler::new(config);
75 /// ```
76 pub fn new(config: BatchConfig) -> Self {
77 Self {
78 deleter: BulkDeleter::new(config),
79 }
80 }
81
82 /// Executes a bulk delete operation on the specified messages.
83 ///
84 /// This method deletes multiple messages from a Service Bus queue efficiently
85 /// by processing them in batches. It provides comprehensive error reporting
86 /// and handles partial failures gracefully.
87 ///
88 /// # Arguments
89 ///
90 /// * `consumer` - Service Bus consumer for message operations
91 /// * `queue_name` - Name of the queue containing the messages
92 /// * `targets` - List of message identifiers to delete
93 /// * `max_position` - Maximum position limit for message processing
94 ///
95 /// # Returns
96 ///
97 /// [`BulkOperationResult`] containing the count of successful and failed operations
98 ///
99 /// # Errors
100 ///
101 /// Returns an error if:
102 /// - The consumer is unavailable or disposed
103 /// - Service Bus operations fail
104 /// - The operation is cancelled
105 /// - Invalid message identifiers are provided
106 ///
107 /// # Examples
108 ///
109 /// ```no_run
110 /// use quetty_server::bulk_operations::{BulkOperationHandler, MessageIdentifier};
111 /// use quetty_server::consumer::Consumer;
112 /// use std::sync::Arc;
113 /// use tokio::sync::Mutex;
114 ///
115 /// async fn delete_example(
116 /// handler: &BulkOperationHandler,
117 /// consumer: Arc<Mutex<Consumer>>
118 /// ) -> Result<(), Box<dyn std::error::Error>> {
119 /// let messages_to_delete = vec![
120 /// MessageIdentifier::SequenceNumber(100),
121 /// MessageIdentifier::SequenceNumber(101),
122 /// MessageIdentifier::SequenceNumber(102),
123 /// ];
124 ///
125 /// let result = handler.delete_messages(
126 /// consumer,
127 /// "orders-queue".to_string(),
128 /// messages_to_delete,
129 /// 1000,
130 /// ).await?;
131 ///
132 /// println!("Successfully deleted: {}", result.successful_count);
133 /// println!("Failed to delete: {}", result.failed_count);
134 ///
135 /// Ok(())
136 /// }
137 /// ```
138 pub async fn delete_messages(
139 &self,
140 consumer: Arc<Mutex<crate::consumer::Consumer>>,
141 queue_name: String,
142 targets: Vec<MessageIdentifier>,
143 max_position: usize,
144 ) -> Result<BulkOperationResult, Box<dyn Error + Send + Sync>> {
145 let context = BulkOperationContext {
146 consumer,
147 cancel_token: CancellationToken::new(),
148 queue_name: queue_name.clone(),
149 };
150
151 // Create BulkSendParams with max position
152 let params = BulkSendParams {
153 target_queue: queue_name,
154 should_delete: true,
155 message_identifiers: targets,
156 messages_data: None,
157 max_position,
158 };
159
160 self.deleter.delete_messages(context, params).await
161 }
162}
163
164impl Default for BulkOperationHandler {
165 /// Creates a BulkOperationHandler with default configuration.
166 ///
167 /// Uses the default [`BatchConfig`] settings for batch size, timeouts,
168 /// and other operation parameters.
169 ///
170 /// # Examples
171 ///
172 /// ```no_run
173 /// use quetty_server::bulk_operations::BulkOperationHandler;
174 ///
175 /// let handler = BulkOperationHandler::default();
176 /// ```
177 fn default() -> Self {
178 Self::new(BatchConfig::default())
179 }
180}