server/bulk_operations/types.rs
1//! Types and data structures for bulk operations.
2//!
3//! This module defines the core types used throughout the bulk operations system,
4//! including result tracking, message identification, configuration, and operation contexts.
5
6use crate::consumer::Consumer;
7use azservicebus::ServiceBusClient;
8use azservicebus::core::BasicRetryPolicy;
9use serde::Deserialize;
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::Mutex;
13use tokio_util::sync::CancellationToken;
14
15/// Result of a bulk operation with detailed statistics and error tracking.
16///
17/// Provides comprehensive information about the outcome of bulk operations,
18/// including success counts, failure details, and lists of processed messages.
19/// Used to report operation results to callers and for UI feedback.
20///
21/// # Examples
22///
23/// ```no_run
24/// use quetty_server::bulk_operations::BulkOperationResult;
25///
26/// let mut result = BulkOperationResult::new(100);
27/// result.add_success();
28/// result.add_failure("Connection timeout".to_string());
29///
30/// if result.is_complete_success() {
31/// println!("All operations completed successfully");
32/// } else {
33/// println!("Partial success: {} of {} succeeded",
34/// result.successful, result.total_requested);
35/// }
36/// ```
37#[derive(Debug, Clone)]
38pub struct BulkOperationResult {
39 /// Total number of operations requested
40 pub total_requested: usize,
41 /// Number of operations that completed successfully
42 pub successful: usize,
43 /// Number of operations that failed
44 pub failed: usize,
45 /// Number of target items that were not found
46 pub not_found: usize,
47 /// Detailed error messages for failed operations
48 pub error_details: Vec<String>,
49 /// Identifiers of messages that were processed successfully
50 pub successful_message_ids: Vec<MessageIdentifier>,
51}
52
53impl BulkOperationResult {
54 /// Creates a new BulkOperationResult for the specified number of operations.
55 ///
56 /// # Arguments
57 ///
58 /// * `total_requested` - The total number of operations that will be attempted
59 ///
60 /// # Returns
61 ///
62 /// A new result tracker with zero counts and empty collections
63 pub fn new(total_requested: usize) -> Self {
64 Self {
65 total_requested,
66 successful: 0,
67 failed: 0,
68 not_found: 0,
69 error_details: Vec::new(),
70 successful_message_ids: Vec::new(),
71 }
72 }
73
74 pub fn add_success(&mut self) {
75 self.successful += 1;
76 }
77
78 pub fn add_failure(&mut self, error: String) {
79 self.failed += 1;
80 self.error_details.push(error);
81 }
82
83 pub fn add_successful_message(&mut self, message_id: MessageIdentifier) {
84 self.successful += 1;
85 self.successful_message_ids.push(message_id.clone());
86 log::debug!(
87 "SUCCESS COUNT: Incremented to {} (added message: {})",
88 self.successful,
89 message_id.id
90 );
91 }
92
93 pub fn add_not_found(&mut self) {
94 self.not_found += 1;
95 }
96
97 /// Checks if all requested operations completed successfully.
98 ///
99 /// # Returns
100 ///
101 /// `true` if all operations succeeded with no failures or missing items
102 pub fn is_complete_success(&self) -> bool {
103 self.successful == self.total_requested && self.failed == 0 && self.not_found == 0
104 }
105}
106
/// Identifies a single message targeted by a bulk operation.
///
/// Pairs the message ID with its sequence number so a message can be targeted
/// precisely. The sequence number is intended as an optimization aid for bulk
/// scanning, reducing how many messages have to be processed.
///
/// # Examples
///
/// ```no_run
/// use quetty_server::bulk_operations::MessageIdentifier;
///
/// let msg_id = MessageIdentifier::new("msg-123".to_string(), 4567);
/// println!("Message: {} at sequence {}", msg_id.id, msg_id.sequence);
///
/// let composite = msg_id.composite_key();
/// println!("Composite key: {}", composite);
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct MessageIdentifier {
    /// The message ID.
    pub id: String,
    /// The sequence number of the message.
    pub sequence: i64,
}

/// Displays the identifier as its message ID only; the sequence is not shown.
impl std::fmt::Display for MessageIdentifier {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.id)
    }
}
137
138impl MessageIdentifier {
139 /// Creates a new MessageIdentifier with the specified ID and sequence.
140 ///
141 /// # Arguments
142 ///
143 /// * `id` - The unique message identifier
144 /// * `sequence` - The message sequence number
145 ///
146 /// # Returns
147 ///
148 /// A new MessageIdentifier instance
149 pub fn new(id: String, sequence: i64) -> Self {
150 Self { id, sequence }
151 }
152
153 pub fn from_message(message: &crate::model::MessageModel) -> Self {
154 Self {
155 id: message.id.clone(),
156 sequence: message.sequence,
157 }
158 }
159
160 pub fn from_string(id: String) -> Self {
161 Self { id, sequence: 0 }
162 }
163
164 /// Creates a composite key for exact matching.
165 ///
166 /// Combines the message ID and sequence number into a single string
167 /// that can be used for precise identification in hash maps or other
168 /// data structures that require string keys.
169 ///
170 /// # Returns
171 ///
172 /// A string in the format "id:sequence"
173 pub fn composite_key(&self) -> String {
174 format!("{}:{}", self.id, self.sequence)
175 }
176}
177
178impl From<String> for MessageIdentifier {
179 fn from(id: String) -> Self {
180 Self::from_string(id)
181 }
182}
183
184impl From<&str> for MessageIdentifier {
185 fn from(id: &str) -> Self {
186 Self::from_string(id.to_string())
187 }
188}
189
190impl From<MessageIdentifier> for String {
191 fn from(val: MessageIdentifier) -> Self {
192 val.id
193 }
194}
195
196impl PartialEq<String> for MessageIdentifier {
197 fn eq(&self, other: &String) -> bool {
198 &self.id == other
199 }
200}
201
202impl PartialEq<MessageIdentifier> for String {
203 fn eq(&self, other: &MessageIdentifier) -> bool {
204 self == &other.id
205 }
206}
207
208/// Configuration for bulk operation batching and limits.
209///
210/// Controls various aspects of bulk operations including batch sizes,
211/// timeouts, processing limits, and UI behavior. Provides sensible defaults
212/// for all configuration values.
213///
214/// # Examples
215///
216/// ```no_run
217/// use quetty_server::bulk_operations::BatchConfig;
218///
219/// // Use default configuration
220/// let config = BatchConfig::default();
221///
222/// // Create custom configuration
223/// let config = BatchConfig::new(100, 600);
224///
225/// // Access configuration values
226/// println!("Max batch size: {}", config.max_batch_size());
227/// println!("Timeout: {}s", config.operation_timeout_secs());
228/// ```
229#[derive(Debug, Deserialize, Default, Clone)]
230pub struct BatchConfig {
231 /// Maximum batch size for bulk operations (default: 200)
232 max_batch_size: Option<u32>,
233 /// Timeout for bulk operations in seconds (default: 300)
234 operation_timeout_secs: Option<u64>,
235 /// Chunk size for bulk processing operations (default: 200, same as max_batch_size)
236 bulk_chunk_size: Option<usize>,
237 /// Processing time limit for bulk operations in seconds (default: 30)
238 bulk_processing_time_secs: Option<u64>,
239 /// Timeout for lock operations in seconds (default: 10)
240 lock_timeout_secs: Option<u64>,
241 /// Maximum messages to process in bulk operations (default: 10,000)
242 max_messages_to_process: Option<usize>,
243 /// Auto-reload threshold for UI refresh after bulk operations (default: 50)
244 auto_reload_threshold: Option<usize>,
245 /// Timeout for individual receive message operations in seconds (default: 5)
246 receive_timeout_secs: Option<u64>,
247}
248
249impl BatchConfig {
250 /// Creates a new BatchConfig with specified batch size and timeout.
251 ///
252 /// Other configuration values will use their defaults when accessed.
253 ///
254 /// # Arguments
255 ///
256 /// * `max_batch_size` - Maximum number of messages per batch
257 /// * `operation_timeout_secs` - Timeout for bulk operations in seconds
258 ///
259 /// # Returns
260 ///
261 /// A new BatchConfig with the specified values
262 pub fn new(max_batch_size: u32, operation_timeout_secs: u64) -> Self {
263 Self {
264 max_batch_size: Some(max_batch_size),
265 operation_timeout_secs: Some(operation_timeout_secs),
266 bulk_chunk_size: None,
267 bulk_processing_time_secs: None,
268 lock_timeout_secs: None,
269 max_messages_to_process: None,
270 auto_reload_threshold: None,
271 receive_timeout_secs: None,
272 }
273 }
274
275 /// Get the maximum batch size for bulk operations
276 pub fn max_batch_size(&self) -> u32 {
277 self.max_batch_size.unwrap_or(500)
278 }
279
280 /// Get the timeout for bulk operations
281 pub fn operation_timeout_secs(&self) -> u64 {
282 self.operation_timeout_secs.unwrap_or(600)
283 }
284
285 /// Get the chunk size for bulk processing operations
286 pub fn bulk_chunk_size(&self) -> usize {
287 self.bulk_chunk_size.unwrap_or(500)
288 }
289
290 /// Get the processing time limit for bulk operations in seconds
291 pub fn bulk_processing_time_secs(&self) -> u64 {
292 self.bulk_processing_time_secs.unwrap_or(300)
293 }
294
295 /// Get the timeout for lock operations in seconds
296 pub fn lock_timeout_secs(&self) -> u64 {
297 self.lock_timeout_secs.unwrap_or(10)
298 }
299
300 /// Get the maximum messages to process in bulk operations
301 pub fn max_messages_to_process(&self) -> usize {
302 self.max_messages_to_process.unwrap_or(10_000)
303 }
304
305 /// Get the threshold for triggering auto-reload after bulk operations
306 pub fn auto_reload_threshold(&self) -> usize {
307 self.auto_reload_threshold.unwrap_or(50)
308 }
309
310 /// Get the timeout for individual receive message operations in seconds
311 pub fn receive_timeout_secs(&self) -> u64 {
312 self.receive_timeout_secs.unwrap_or(5)
313 }
314}
315
316/// Context for Service Bus operations containing shared resources
317#[derive(Debug, Clone)]
318pub struct ServiceBusOperationContext {
319 pub consumer: Arc<Mutex<Consumer>>,
320 pub service_bus_client: Arc<Mutex<ServiceBusClient<BasicRetryPolicy>>>,
321 pub main_queue_name: String,
322 pub cancel_token: CancellationToken,
323}
324
325impl ServiceBusOperationContext {
326 /// Create a new ServiceBusOperationContext
327 pub fn new(
328 consumer: Arc<Mutex<Consumer>>,
329 service_bus_client: Arc<Mutex<ServiceBusClient<BasicRetryPolicy>>>,
330 main_queue_name: String,
331 ) -> Self {
332 Self {
333 consumer,
334 service_bus_client,
335 main_queue_name,
336 cancel_token: CancellationToken::new(),
337 }
338 }
339}
340
341/// Parameters for bulk send operations
342#[derive(Debug, Clone)]
343pub struct BulkSendParams {
344 pub target_queue: String,
345 pub should_delete: bool,
346 pub message_identifiers: Vec<MessageIdentifier>,
347 pub messages_data: Option<Vec<(MessageIdentifier, Vec<u8>)>>, // For peek-based operations
348 pub max_position: usize, // For dynamic processing limits
349}
350
351impl BulkSendParams {
352 /// Create parameters for operations that retrieve messages from the queue
353 pub fn with_retrieval(
354 target_queue: String,
355 should_delete: bool,
356 message_identifiers: Vec<MessageIdentifier>,
357 max_position: usize,
358 ) -> Self {
359 Self {
360 target_queue,
361 should_delete,
362 message_identifiers,
363 messages_data: None,
364 max_position,
365 }
366 }
367
368 /// Create parameters for operations with pre-fetched message data
369 pub fn with_message_data(
370 target_queue: String,
371 should_delete: bool,
372 messages_data: Vec<(MessageIdentifier, Vec<u8>)>,
373 max_position: usize,
374 ) -> Self {
375 // Extract identifiers from the data
376 let message_identifiers = messages_data.iter().map(|(id, _)| id.clone()).collect();
377
378 Self {
379 target_queue,
380 should_delete,
381 message_identifiers,
382 messages_data: Some(messages_data),
383 max_position,
384 }
385 }
386
387 /// Create parameters with max position for better processing limits
388 pub fn with_max_position(
389 target_queue: String,
390 should_delete: bool,
391 message_identifiers: Vec<MessageIdentifier>,
392 max_position: usize,
393 ) -> Self {
394 Self {
395 target_queue,
396 should_delete,
397 message_identifiers,
398 messages_data: None,
399 max_position,
400 }
401 }
402}
403
/// The kind of send operation required for a given target queue.
#[derive(Debug, Clone)]
pub enum QueueOperationType {
    /// Send to regular queue (copy message content)
    SendToQueue,
    /// Send to dead letter queue (use dead_letter_message operation)
    SendToDLQ,
}

impl QueueOperationType {
    /// Path suffix that marks a dead-letter sub-queue.
    const DLQ_SUFFIX: &'static str = "/$deadletterqueue";

    /// Determines the operation type from the target queue name.
    ///
    /// A name ending in `/$deadletterqueue` selects [`Self::SendToDLQ`]; any
    /// other name, including the empty string, selects [`Self::SendToQueue`].
    /// The suffix is compared ASCII case-insensitively, so spellings such as
    /// `/$DeadLetterQueue` are recognised as well.
    pub fn from_queue_name(queue_name: &str) -> Self {
        let name = queue_name.as_bytes();
        let suffix = Self::DLQ_SUFFIX.as_bytes();

        // Compare raw bytes: the suffix is pure ASCII, so slicing the tail by
        // byte length cannot split a multi-byte character of the queue name in
        // a way that would produce a false match.
        let is_dead_letter = name
            .len()
            .checked_sub(suffix.len())
            .map_or(false, |start| name[start..].eq_ignore_ascii_case(suffix));

        if is_dead_letter {
            Self::SendToDLQ
        } else {
            Self::SendToQueue
        }
    }
}
423
424/// Bulk operation context containing shared resources
425#[derive(Debug, Clone)]
426pub struct BulkOperationContext {
427 pub consumer: Arc<Mutex<crate::consumer::Consumer>>,
428 pub cancel_token: CancellationToken,
429 /// Name of the queue this operation is targeting (used for deferred message persistence)
430 pub queue_name: String,
431}
432
433/// Parameters for process_target_messages method
434pub struct ProcessTargetMessagesParams<'a> {
435 pub messages: Vec<azservicebus::ServiceBusReceivedMessage>,
436 pub context: &'a BulkOperationContext,
437 pub params: &'a BulkSendParams,
438 pub target_map: &'a HashMap<String, MessageIdentifier>,
439 pub result: &'a mut BulkOperationResult,
440}
441
442impl<'a> ProcessTargetMessagesParams<'a> {
443 pub fn new(
444 messages: Vec<azservicebus::ServiceBusReceivedMessage>,
445 context: &'a BulkOperationContext,
446 params: &'a BulkSendParams,
447 target_map: &'a HashMap<String, MessageIdentifier>,
448 result: &'a mut BulkOperationResult,
449 ) -> Self {
450 Self {
451 messages,
452 context,
453 params,
454 target_map,
455 result,
456 }
457 }
458}