server/bulk_operations/
types.rs

1//! Types and data structures for bulk operations.
2//!
3//! This module defines the core types used throughout the bulk operations system,
4//! including result tracking, message identification, configuration, and operation contexts.
5
6use crate::consumer::Consumer;
7use azservicebus::ServiceBusClient;
8use azservicebus::core::BasicRetryPolicy;
9use serde::Deserialize;
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::Mutex;
13use tokio_util::sync::CancellationToken;
14
15/// Result of a bulk operation with detailed statistics and error tracking.
16///
17/// Provides comprehensive information about the outcome of bulk operations,
18/// including success counts, failure details, and lists of processed messages.
19/// Used to report operation results to callers and for UI feedback.
20///
21/// # Examples
22///
23/// ```no_run
24/// use quetty_server::bulk_operations::BulkOperationResult;
25///
26/// let mut result = BulkOperationResult::new(100);
27/// result.add_success();
28/// result.add_failure("Connection timeout".to_string());
29///
30/// if result.is_complete_success() {
31///     println!("All operations completed successfully");
32/// } else {
33///     println!("Partial success: {} of {} succeeded",
34///              result.successful, result.total_requested);
35/// }
36/// ```
37#[derive(Debug, Clone)]
38pub struct BulkOperationResult {
39    /// Total number of operations requested
40    pub total_requested: usize,
41    /// Number of operations that completed successfully
42    pub successful: usize,
43    /// Number of operations that failed
44    pub failed: usize,
45    /// Number of target items that were not found
46    pub not_found: usize,
47    /// Detailed error messages for failed operations
48    pub error_details: Vec<String>,
49    /// Identifiers of messages that were processed successfully
50    pub successful_message_ids: Vec<MessageIdentifier>,
51}
52
53impl BulkOperationResult {
54    /// Creates a new BulkOperationResult for the specified number of operations.
55    ///
56    /// # Arguments
57    ///
58    /// * `total_requested` - The total number of operations that will be attempted
59    ///
60    /// # Returns
61    ///
62    /// A new result tracker with zero counts and empty collections
63    pub fn new(total_requested: usize) -> Self {
64        Self {
65            total_requested,
66            successful: 0,
67            failed: 0,
68            not_found: 0,
69            error_details: Vec::new(),
70            successful_message_ids: Vec::new(),
71        }
72    }
73
74    pub fn add_success(&mut self) {
75        self.successful += 1;
76    }
77
78    pub fn add_failure(&mut self, error: String) {
79        self.failed += 1;
80        self.error_details.push(error);
81    }
82
83    pub fn add_successful_message(&mut self, message_id: MessageIdentifier) {
84        self.successful += 1;
85        self.successful_message_ids.push(message_id.clone());
86        log::debug!(
87            "SUCCESS COUNT: Incremented to {} (added message: {})",
88            self.successful,
89            message_id.id
90        );
91    }
92
93    pub fn add_not_found(&mut self) {
94        self.not_found += 1;
95    }
96
97    /// Checks if all requested operations completed successfully.
98    ///
99    /// # Returns
100    ///
101    /// `true` if all operations succeeded with no failures or missing items
102    pub fn is_complete_success(&self) -> bool {
103        self.successful == self.total_requested && self.failed == 0 && self.not_found == 0
104    }
105}
106
107/// Identifier for targeting specific messages in bulk operations.
108///
109/// Combines message ID and sequence number for precise message targeting.
110/// The sequence number is used for optimization during bulk scanning operations
111/// to minimize the number of messages that need to be processed.
112///
113/// # Examples
114///
115/// ```no_run
116/// use quetty_server::bulk_operations::MessageIdentifier;
117///
118/// let msg_id = MessageIdentifier::new("msg-123".to_string(), 4567);
119/// println!("Message: {} at sequence {}", msg_id.id, msg_id.sequence);
120///
121/// let composite = msg_id.composite_key();
122/// println!("Composite key: {}", composite);
123/// ```
124#[derive(Debug, Clone, PartialEq, Eq, Hash)]
125pub struct MessageIdentifier {
126    /// The unique message identifier
127    pub id: String,
128    /// The message sequence number for optimization
129    pub sequence: i64,
130}
131
132impl std::fmt::Display for MessageIdentifier {
133    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
134        write!(f, "{}", self.id)
135    }
136}
137
138impl MessageIdentifier {
139    /// Creates a new MessageIdentifier with the specified ID and sequence.
140    ///
141    /// # Arguments
142    ///
143    /// * `id` - The unique message identifier
144    /// * `sequence` - The message sequence number
145    ///
146    /// # Returns
147    ///
148    /// A new MessageIdentifier instance
149    pub fn new(id: String, sequence: i64) -> Self {
150        Self { id, sequence }
151    }
152
153    pub fn from_message(message: &crate::model::MessageModel) -> Self {
154        Self {
155            id: message.id.clone(),
156            sequence: message.sequence,
157        }
158    }
159
160    pub fn from_string(id: String) -> Self {
161        Self { id, sequence: 0 }
162    }
163
164    /// Creates a composite key for exact matching.
165    ///
166    /// Combines the message ID and sequence number into a single string
167    /// that can be used for precise identification in hash maps or other
168    /// data structures that require string keys.
169    ///
170    /// # Returns
171    ///
172    /// A string in the format "id:sequence"
173    pub fn composite_key(&self) -> String {
174        format!("{}:{}", self.id, self.sequence)
175    }
176}
177
178impl From<String> for MessageIdentifier {
179    fn from(id: String) -> Self {
180        Self::from_string(id)
181    }
182}
183
184impl From<&str> for MessageIdentifier {
185    fn from(id: &str) -> Self {
186        Self::from_string(id.to_string())
187    }
188}
189
190impl From<MessageIdentifier> for String {
191    fn from(val: MessageIdentifier) -> Self {
192        val.id
193    }
194}
195
196impl PartialEq<String> for MessageIdentifier {
197    fn eq(&self, other: &String) -> bool {
198        &self.id == other
199    }
200}
201
202impl PartialEq<MessageIdentifier> for String {
203    fn eq(&self, other: &MessageIdentifier) -> bool {
204        self == &other.id
205    }
206}
207
208/// Configuration for bulk operation batching and limits.
209///
210/// Controls various aspects of bulk operations including batch sizes,
211/// timeouts, processing limits, and UI behavior. Provides sensible defaults
212/// for all configuration values.
213///
214/// # Examples
215///
216/// ```no_run
217/// use quetty_server::bulk_operations::BatchConfig;
218///
219/// // Use default configuration
220/// let config = BatchConfig::default();
221///
222/// // Create custom configuration
223/// let config = BatchConfig::new(100, 600);
224///
225/// // Access configuration values
226/// println!("Max batch size: {}", config.max_batch_size());
227/// println!("Timeout: {}s", config.operation_timeout_secs());
228/// ```
229#[derive(Debug, Deserialize, Default, Clone)]
230pub struct BatchConfig {
231    /// Maximum batch size for bulk operations (default: 200)
232    max_batch_size: Option<u32>,
233    /// Timeout for bulk operations in seconds (default: 300)
234    operation_timeout_secs: Option<u64>,
235    /// Chunk size for bulk processing operations (default: 200, same as max_batch_size)
236    bulk_chunk_size: Option<usize>,
237    /// Processing time limit for bulk operations in seconds (default: 30)
238    bulk_processing_time_secs: Option<u64>,
239    /// Timeout for lock operations in seconds (default: 10)
240    lock_timeout_secs: Option<u64>,
241    /// Maximum messages to process in bulk operations (default: 10,000)
242    max_messages_to_process: Option<usize>,
243    /// Auto-reload threshold for UI refresh after bulk operations (default: 50)
244    auto_reload_threshold: Option<usize>,
245    /// Timeout for individual receive message operations in seconds (default: 5)
246    receive_timeout_secs: Option<u64>,
247}
248
249impl BatchConfig {
250    /// Creates a new BatchConfig with specified batch size and timeout.
251    ///
252    /// Other configuration values will use their defaults when accessed.
253    ///
254    /// # Arguments
255    ///
256    /// * `max_batch_size` - Maximum number of messages per batch
257    /// * `operation_timeout_secs` - Timeout for bulk operations in seconds
258    ///
259    /// # Returns
260    ///
261    /// A new BatchConfig with the specified values
262    pub fn new(max_batch_size: u32, operation_timeout_secs: u64) -> Self {
263        Self {
264            max_batch_size: Some(max_batch_size),
265            operation_timeout_secs: Some(operation_timeout_secs),
266            bulk_chunk_size: None,
267            bulk_processing_time_secs: None,
268            lock_timeout_secs: None,
269            max_messages_to_process: None,
270            auto_reload_threshold: None,
271            receive_timeout_secs: None,
272        }
273    }
274
275    /// Get the maximum batch size for bulk operations
276    pub fn max_batch_size(&self) -> u32 {
277        self.max_batch_size.unwrap_or(500)
278    }
279
280    /// Get the timeout for bulk operations
281    pub fn operation_timeout_secs(&self) -> u64 {
282        self.operation_timeout_secs.unwrap_or(600)
283    }
284
285    /// Get the chunk size for bulk processing operations
286    pub fn bulk_chunk_size(&self) -> usize {
287        self.bulk_chunk_size.unwrap_or(500)
288    }
289
290    /// Get the processing time limit for bulk operations in seconds
291    pub fn bulk_processing_time_secs(&self) -> u64 {
292        self.bulk_processing_time_secs.unwrap_or(300)
293    }
294
295    /// Get the timeout for lock operations in seconds
296    pub fn lock_timeout_secs(&self) -> u64 {
297        self.lock_timeout_secs.unwrap_or(10)
298    }
299
300    /// Get the maximum messages to process in bulk operations
301    pub fn max_messages_to_process(&self) -> usize {
302        self.max_messages_to_process.unwrap_or(10_000)
303    }
304
305    /// Get the threshold for triggering auto-reload after bulk operations
306    pub fn auto_reload_threshold(&self) -> usize {
307        self.auto_reload_threshold.unwrap_or(50)
308    }
309
310    /// Get the timeout for individual receive message operations in seconds
311    pub fn receive_timeout_secs(&self) -> u64 {
312        self.receive_timeout_secs.unwrap_or(5)
313    }
314}
315
316/// Context for Service Bus operations containing shared resources
317#[derive(Debug, Clone)]
318pub struct ServiceBusOperationContext {
319    pub consumer: Arc<Mutex<Consumer>>,
320    pub service_bus_client: Arc<Mutex<ServiceBusClient<BasicRetryPolicy>>>,
321    pub main_queue_name: String,
322    pub cancel_token: CancellationToken,
323}
324
325impl ServiceBusOperationContext {
326    /// Create a new ServiceBusOperationContext
327    pub fn new(
328        consumer: Arc<Mutex<Consumer>>,
329        service_bus_client: Arc<Mutex<ServiceBusClient<BasicRetryPolicy>>>,
330        main_queue_name: String,
331    ) -> Self {
332        Self {
333            consumer,
334            service_bus_client,
335            main_queue_name,
336            cancel_token: CancellationToken::new(),
337        }
338    }
339}
340
341/// Parameters for bulk send operations
342#[derive(Debug, Clone)]
343pub struct BulkSendParams {
344    pub target_queue: String,
345    pub should_delete: bool,
346    pub message_identifiers: Vec<MessageIdentifier>,
347    pub messages_data: Option<Vec<(MessageIdentifier, Vec<u8>)>>, // For peek-based operations
348    pub max_position: usize,                                      // For dynamic processing limits
349}
350
351impl BulkSendParams {
352    /// Create parameters for operations that retrieve messages from the queue
353    pub fn with_retrieval(
354        target_queue: String,
355        should_delete: bool,
356        message_identifiers: Vec<MessageIdentifier>,
357        max_position: usize,
358    ) -> Self {
359        Self {
360            target_queue,
361            should_delete,
362            message_identifiers,
363            messages_data: None,
364            max_position,
365        }
366    }
367
368    /// Create parameters for operations with pre-fetched message data
369    pub fn with_message_data(
370        target_queue: String,
371        should_delete: bool,
372        messages_data: Vec<(MessageIdentifier, Vec<u8>)>,
373        max_position: usize,
374    ) -> Self {
375        // Extract identifiers from the data
376        let message_identifiers = messages_data.iter().map(|(id, _)| id.clone()).collect();
377
378        Self {
379            target_queue,
380            should_delete,
381            message_identifiers,
382            messages_data: Some(messages_data),
383            max_position,
384        }
385    }
386
387    /// Create parameters with max position for better processing limits
388    pub fn with_max_position(
389        target_queue: String,
390        should_delete: bool,
391        message_identifiers: Vec<MessageIdentifier>,
392        max_position: usize,
393    ) -> Self {
394        Self {
395            target_queue,
396            should_delete,
397            message_identifiers,
398            messages_data: None,
399            max_position,
400        }
401    }
402}
403
404/// Queue operation type determination
405#[derive(Debug, Clone)]
406pub enum QueueOperationType {
407    /// Send to regular queue (copy message content)
408    SendToQueue,
409    /// Send to dead letter queue (use dead_letter_message operation)
410    SendToDLQ,
411}
412
413impl QueueOperationType {
414    /// Determine operation type based on target queue name
415    pub fn from_queue_name(queue_name: &str) -> Self {
416        if queue_name.ends_with("/$deadletterqueue") {
417            Self::SendToDLQ
418        } else {
419            Self::SendToQueue
420        }
421    }
422}
423
424/// Bulk operation context containing shared resources
425#[derive(Debug, Clone)]
426pub struct BulkOperationContext {
427    pub consumer: Arc<Mutex<crate::consumer::Consumer>>,
428    pub cancel_token: CancellationToken,
429    /// Name of the queue this operation is targeting (used for deferred message persistence)
430    pub queue_name: String,
431}
432
433/// Parameters for process_target_messages method
434pub struct ProcessTargetMessagesParams<'a> {
435    pub messages: Vec<azservicebus::ServiceBusReceivedMessage>,
436    pub context: &'a BulkOperationContext,
437    pub params: &'a BulkSendParams,
438    pub target_map: &'a HashMap<String, MessageIdentifier>,
439    pub result: &'a mut BulkOperationResult,
440}
441
442impl<'a> ProcessTargetMessagesParams<'a> {
443    pub fn new(
444        messages: Vec<azservicebus::ServiceBusReceivedMessage>,
445        context: &'a BulkOperationContext,
446        params: &'a BulkSendParams,
447        target_map: &'a HashMap<String, MessageIdentifier>,
448        result: &'a mut BulkOperationResult,
449    ) -> Self {
450        Self {
451            messages,
452            context,
453            params,
454            target_map,
455            result,
456        }
457    }
458}