server/service_bus_manager/
command_handlers.rs

1use super::consumer_manager::ConsumerManager;
2use super::producer_manager::ProducerManager;
3use super::queue_statistics_service::QueueStatisticsService;
4use super::types::{QueueInfo, QueueType};
5
6use crate::bulk_operations::BulkOperationResult;
7use crate::bulk_operations::{BulkOperationHandler, MessageIdentifier, types::BatchConfig};
8use crate::consumer::Consumer;
9use crate::service_bus_manager::{
10    errors::ServiceBusError, responses::ServiceBusResponse, types::MessageData,
11};
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::sync::Mutex;
16
17/// Parameters for target message processing in bulk send
18#[derive(Debug)]
19struct TargetMessageParams<'a> {
20    consumer: &'a mut Consumer,
21    msg: &'a azservicebus::ServiceBusReceivedMessage,
22    is_dlq_operation: bool,
23    should_delete_source: bool,
24    message_bytes: &'a mut Vec<Vec<u8>>,
25    successful_count: &'a mut usize,
26    failed_count: &'a mut usize,
27}
28
29/// Parameters for bulk send result finalization
30#[derive(Debug)]
31struct BulkSendResultParams {
32    message_ids: Vec<MessageIdentifier>,
33    successful_count: usize,
34    failed_count: usize,
35}
36
37/// Result type for service bus operations
38type ServiceBusResult<T> = Result<T, ServiceBusError>;
39
40// Error message constants
41const ERROR_INDIVIDUAL_MSG_OPERATIONS: &str =
42    "Individual message operations by ID require message to be received first";
43const ERROR_BULK_OPERATIONS: &str = "Bulk operations require message to be received first";
44
45/// Handles queue-related commands including queue switching and statistics.
46///
47/// Provides functionality for managing queue connections, retrieving queue
48/// information, and gathering comprehensive statistics from Azure Service Bus.
49///
50/// # Examples
51///
52/// ```no_run
53/// use quetty_server::service_bus_manager::command_handlers::QueueCommandHandler;
54///
55/// let handler = QueueCommandHandler::new(consumer_manager, statistics_service);
56///
57/// // Switch to a different queue
58/// let response = handler.handle_switch_queue(
59///     "orders".to_string(),
60///     QueueType::Main
61/// ).await?;
62///
63/// // Get current queue information
64/// let response = handler.handle_get_current_queue().await?;
65/// ```
66pub struct QueueCommandHandler {
67    consumer_manager: Arc<Mutex<ConsumerManager>>,
68    statistics_service: Arc<QueueStatisticsService>,
69}
70
71impl QueueCommandHandler {
72    pub fn new(
73        consumer_manager: Arc<Mutex<ConsumerManager>>,
74        statistics_service: Arc<QueueStatisticsService>,
75    ) -> Self {
76        Self {
77            consumer_manager,
78            statistics_service,
79        }
80    }
81
82    pub async fn handle_switch_queue(
83        &self,
84        queue_name: String,
85        queue_type: QueueType,
86    ) -> ServiceBusResult<ServiceBusResponse> {
87        let queue_info = QueueInfo::new(queue_name, queue_type);
88        let mut manager = self.consumer_manager.lock().await;
89        manager.switch_queue(queue_info.clone()).await?;
90        Ok(ServiceBusResponse::QueueSwitched { queue_info })
91    }
92
93    pub async fn handle_get_current_queue(&self) -> ServiceBusResult<ServiceBusResponse> {
94        let manager = self.consumer_manager.lock().await;
95        let queue_info = manager.current_queue().cloned();
96        Ok(ServiceBusResponse::CurrentQueue { queue_info })
97    }
98
99    pub async fn handle_get_queue_statistics(
100        &self,
101        queue_name: String,
102        queue_type: QueueType,
103    ) -> ServiceBusResult<ServiceBusResponse> {
104        log::debug!("Getting real statistics for queue: {queue_name} (type: {queue_type:?})");
105
106        let retrieved_at = chrono::Utc::now();
107
108        // Get both active and dead letter counts from Azure Management API
109        let (active_count, dlq_count) = self
110            .statistics_service
111            .get_both_queue_counts(&queue_name)
112            .await;
113
114        log::debug!(
115            "Retrieved stats for queue '{queue_name}': active={active_count:?}, dlq={dlq_count:?}"
116        );
117
118        Ok(ServiceBusResponse::QueueStatistics {
119            queue_name,
120            queue_type,
121            active_message_count: active_count,
122            dead_letter_message_count: dlq_count,
123            retrieved_at,
124        })
125    }
126}
127
128/// Handles message retrieval commands including peek and receive operations.
129///
130/// Provides functionality for retrieving messages from Service Bus queues
131/// in different modes - peeking (non-destructive) and receiving (with locks).
132/// Individual message operations are handled differently and require special setup.
133///
134/// # Examples
135///
136/// ```no_run
137/// use quetty_server::service_bus_manager::command_handlers::MessageCommandHandler;
138///
139/// let handler = MessageCommandHandler::new(consumer_manager);
140///
141/// // Peek at messages without removing them
142/// let response = handler.handle_peek_messages(10, None).await?;
143///
144/// // Receive messages with locks for processing
145/// let response = handler.handle_receive_messages(5).await?;
146/// ```
147pub struct MessageCommandHandler {
148    consumer_manager: Arc<Mutex<ConsumerManager>>,
149}
150
151impl MessageCommandHandler {
152    pub fn new(consumer_manager: Arc<Mutex<ConsumerManager>>) -> Self {
153        Self { consumer_manager }
154    }
155
156    pub async fn handle_peek_messages(
157        &self,
158        max_count: u32,
159        from_sequence: Option<i64>,
160    ) -> ServiceBusResult<ServiceBusResponse> {
161        let manager = self.consumer_manager.lock().await;
162        let messages = manager.peek_messages(max_count, from_sequence).await?;
163        Ok(ServiceBusResponse::MessagesReceived { messages })
164    }
165
166    pub async fn handle_receive_messages(
167        &self,
168        max_count: u32,
169    ) -> ServiceBusResult<ServiceBusResponse> {
170        let manager = self.consumer_manager.lock().await;
171        let messages = manager.receive_messages(max_count).await?;
172        Ok(ServiceBusResponse::ReceivedMessages { messages })
173    }
174
175    pub async fn handle_complete_message(
176        &self,
177        _message_id: String,
178    ) -> ServiceBusResult<ServiceBusResponse> {
179        Err(ServiceBusError::InternalError(
180            ERROR_INDIVIDUAL_MSG_OPERATIONS.to_string(),
181        ))
182    }
183
184    pub async fn handle_abandon_message(
185        &self,
186        _message_id: String,
187    ) -> ServiceBusResult<ServiceBusResponse> {
188        Err(ServiceBusError::InternalError(
189            ERROR_INDIVIDUAL_MSG_OPERATIONS.to_string(),
190        ))
191    }
192
193    pub async fn handle_dead_letter_message(
194        &self,
195        _message_id: String,
196        _reason: Option<String>,
197        _error_description: Option<String>,
198    ) -> ServiceBusResult<ServiceBusResponse> {
199        Err(ServiceBusError::InternalError(
200            ERROR_INDIVIDUAL_MSG_OPERATIONS.to_string(),
201        ))
202    }
203}
204
205/// Handles bulk operation commands for efficient processing of multiple messages.
206///
207/// Provides functionality for bulk message operations including delete, send,
208/// complete, abandon, and dead letter operations. Uses optimized batching
209/// strategies to handle large numbers of messages efficiently.
210///
211/// # Features
212///
213/// - **Bulk Delete** - Efficient deletion of multiple messages
214/// - **Bulk Send** - Send multiple messages to target queues
215/// - **Bulk Complete/Abandon** - Process multiple received messages
216/// - **Batch Optimization** - Smart batching based on operation size
217///
218/// # Examples
219///
220/// ```no_run
221/// use quetty_server::service_bus_manager::command_handlers::BulkCommandHandler;
222///
223/// let handler = BulkCommandHandler::new(
224///     bulk_handler,
225///     consumer_manager,
226///     producer_manager,
227///     batch_config
228/// );
229///
230/// // Bulk delete messages
231/// let response = handler.handle_bulk_delete(
232///     message_ids,
233///     1000  // max position
234/// ).await?;
235/// ```
236pub struct BulkCommandHandler {
237    bulk_handler: Arc<BulkOperationHandler>,
238    consumer_manager: Arc<Mutex<ConsumerManager>>,
239    producer_manager: Arc<Mutex<ProducerManager>>,
240    batch_config: BatchConfig,
241}
242
243impl BulkCommandHandler {
244    pub fn new(
245        bulk_handler: Arc<BulkOperationHandler>,
246        consumer_manager: Arc<Mutex<ConsumerManager>>,
247        producer_manager: Arc<Mutex<ProducerManager>>,
248        batch_config: BatchConfig,
249    ) -> Self {
250        Self {
251            bulk_handler,
252            consumer_manager,
253            producer_manager,
254            batch_config,
255        }
256    }
257
258    pub async fn handle_bulk_complete(
259        &self,
260        _message_ids: Vec<MessageIdentifier>,
261    ) -> ServiceBusResult<ServiceBusResponse> {
262        Err(ServiceBusError::InternalError(
263            ERROR_BULK_OPERATIONS.to_string(),
264        ))
265    }
266
267    pub async fn handle_bulk_delete(
268        &self,
269        message_ids: Vec<MessageIdentifier>,
270        max_position: usize,
271    ) -> ServiceBusResult<ServiceBusResponse> {
272        log::info!(
273            "Starting bulk delete operation for {} messages",
274            message_ids.len()
275        );
276
277        let (consumer, queue_name) = {
278            let manager = self.consumer_manager.lock().await;
279            let consumer_arc = manager
280                .get_raw_consumer()
281                .ok_or(ServiceBusError::ConsumerNotFound)?
282                .clone();
283            let queue = manager
284                .current_queue()
285                .ok_or(ServiceBusError::ConsumerNotFound)?
286                .name
287                .clone();
288            (consumer_arc, queue)
289        };
290
291        // Validate that we have messages to work with
292        if message_ids.is_empty() {
293            log::warn!("Bulk delete called with no message IDs");
294            let result = BulkOperationResult::new(0);
295            return Ok(ServiceBusResponse::BulkOperationCompleted { result });
296        }
297
298        // Log which queue we're deleting from for debugging
299        log::info!("Bulk delete operating on queue: {queue_name}");
300
301        match self
302            .bulk_handler
303            .delete_messages(consumer, queue_name, message_ids, max_position)
304            .await
305        {
306            Ok(result) => {
307                log::info!(
308                    "Bulk delete completed: {} successful, {} failed, {} not found",
309                    result.successful,
310                    result.failed,
311                    result.not_found
312                );
313                Ok(ServiceBusResponse::BulkOperationCompleted { result })
314            }
315            Err(e) => {
316                log::error!("Bulk delete failed: {e}");
317                Err(ServiceBusError::BulkOperationFailed(format!(
318                    "Bulk delete failed: {e}"
319                )))
320            }
321        }
322    }
323
324    pub async fn handle_bulk_abandon(
325        &self,
326        _message_ids: Vec<MessageIdentifier>,
327    ) -> ServiceBusResult<ServiceBusResponse> {
328        Err(ServiceBusError::InternalError(
329            ERROR_BULK_OPERATIONS.to_string(),
330        ))
331    }
332
333    pub async fn handle_bulk_dead_letter(
334        &self,
335        _message_ids: Vec<MessageIdentifier>,
336        _reason: Option<String>,
337        _error_description: Option<String>,
338    ) -> ServiceBusResult<ServiceBusResponse> {
339        Err(ServiceBusError::InternalError(
340            ERROR_BULK_OPERATIONS.to_string(),
341        ))
342    }
343
344    pub async fn handle_bulk_send(
345        &self,
346        message_ids: Vec<MessageIdentifier>,
347        target_queue: String,
348        should_delete_source: bool,
349        repeat_count: usize,
350        _max_position: usize,
351    ) -> ServiceBusResult<ServiceBusResponse> {
352        // Wrap the entire operation in a timeout
353        let operation_timeout = Duration::from_secs(self.batch_config.operation_timeout_secs());
354
355        match tokio::time::timeout(
356            operation_timeout,
357            self.handle_bulk_send_internal(
358                message_ids,
359                target_queue,
360                should_delete_source,
361                repeat_count,
362                _max_position,
363            ),
364        )
365        .await
366        {
367            Ok(result) => result,
368            Err(_) => {
369                log::error!(
370                    "Bulk send operation timed out after {} seconds",
371                    self.batch_config.operation_timeout_secs()
372                );
373                Err(ServiceBusError::OperationTimeout(format!(
374                    "Bulk send operation timed out after {} seconds",
375                    self.batch_config.operation_timeout_secs()
376                )))
377            }
378        }
379    }
380
381    async fn handle_bulk_send_internal(
382        &self,
383        message_ids: Vec<MessageIdentifier>,
384        target_queue: String,
385        should_delete_source: bool,
386        repeat_count: usize,
387        _max_position: usize,
388    ) -> ServiceBusResult<ServiceBusResponse> {
389        log::info!(
390            "Starting bulk send: {} -> {}, delete_source={}, repeat={}",
391            message_ids.len(),
392            target_queue,
393            should_delete_source,
394            repeat_count
395        );
396
397        // Check if this is a DLQ operation
398        let is_dlq_operation = target_queue.ends_with("/$deadletterqueue");
399
400        // Setup operation state
401        let (
402            consumer_arc,
403            mut remaining,
404            mut message_bytes,
405            mut successful_count,
406            mut failed_count,
407        ) = self.setup_bulk_send_operation(&message_ids).await?;
408
409        // Main processing loop
410        let batch_size = self.batch_config.bulk_chunk_size() as u32;
411        let mut processed_count = 0;
412        let mut highest_sequence_seen = 0i64;
413        let target_max_sequence = message_ids
414            .iter()
415            .map(|msg_id| msg_id.sequence)
416            .max()
417            .unwrap_or(0);
418        let mut pending_non_targets: Vec<azservicebus::ServiceBusReceivedMessage> = Vec::new();
419        let mut consecutive_empty_batches = 0;
420        let max_empty_batches = 3; // Exit after 3 consecutive empty batches
421
422        while self.should_continue_bulk_send(
423            &remaining,
424            target_max_sequence,
425            highest_sequence_seen,
426            consecutive_empty_batches,
427            max_empty_batches,
428        ) {
429            // Acquire consumer lock per batch instead of holding it for entire operation
430            let mut consumer = consumer_arc.lock().await;
431            let batch = match consumer
432                .receive_messages_with_timeout(
433                    batch_size,
434                    Duration::from_secs(self.batch_config.receive_timeout_secs()),
435                )
436                .await
437            {
438                Ok(msgs) => msgs,
439                Err(e) => {
440                    log::error!("Receive error during bulk send: {e}");
441                    drop(consumer); // Release lock before breaking
442                    break;
443                }
444            };
445
446            if batch.is_empty() {
447                consecutive_empty_batches += 1;
448                log::debug!(
449                    "Receive batch empty after processing {processed_count} messages (highest_sequence: {highest_sequence_seen}), consecutive empty: {consecutive_empty_batches}"
450                );
451                drop(consumer); // Release lock before continuing
452                continue;
453            } else {
454                consecutive_empty_batches = 0; // Reset counter on successful batch
455            }
456
457            let batch_len = batch.len();
458            for msg in batch.into_iter() {
459                let msg_id = msg.message_id().map(|s| s.to_string()).unwrap_or_default();
460                let msg_sequence = msg.sequence_number();
461                if msg_sequence > highest_sequence_seen {
462                    highest_sequence_seen = msg_sequence;
463                }
464                if remaining.remove(&msg_id).is_some() {
465                    let params = TargetMessageParams {
466                        consumer: &mut consumer,
467                        msg: &msg,
468                        is_dlq_operation,
469                        should_delete_source,
470                        message_bytes: &mut message_bytes,
471                        successful_count: &mut successful_count,
472                        failed_count: &mut failed_count,
473                    };
474                    self.process_target_message(params).await;
475                } else {
476                    pending_non_targets.push(msg);
477                }
478            }
479            processed_count += batch_len;
480
481            // Release consumer lock after processing batch
482            drop(consumer);
483
484            if processed_count % (batch_size as usize * 10) == 0 {
485                log::info!(
486                    "Bulk send progress: processed {} messages, highest_sequence: {}, remaining targets: {}",
487                    processed_count,
488                    highest_sequence_seen,
489                    remaining.len()
490                );
491            }
492            if target_max_sequence > 0
493                && highest_sequence_seen > target_max_sequence + 1000
494                && !remaining.is_empty()
495            {
496                log::warn!(
497                    "Safety break: highest_sequence {} exceeds target {} by 1000+, {} targets still remaining",
498                    highest_sequence_seen,
499                    target_max_sequence,
500                    remaining.len()
501                );
502                break;
503            }
504        }
505
506        log::info!(
507            "Bulk send scan completed: processed {} messages, highest_sequence: {}, targets found: {}, remaining: {}",
508            processed_count,
509            highest_sequence_seen,
510            successful_count,
511            remaining.len()
512        );
513
514        // Abandon non-target messages (acquire lock one final time)
515        if !pending_non_targets.is_empty() {
516            let mut consumer = consumer_arc.lock().await;
517            self.abandon_pending_non_targets(&mut consumer, pending_non_targets)
518                .await;
519        }
520        let params = BulkSendResultParams {
521            message_ids,
522            successful_count,
523            failed_count,
524        };
525        self.finalize_bulk_send_result(params)
526    }
527
528    async fn setup_bulk_send_operation(
529        &self,
530        message_ids: &[MessageIdentifier],
531    ) -> ServiceBusResult<(
532        Arc<Mutex<Consumer>>,
533        HashMap<String, MessageIdentifier>,
534        Vec<Vec<u8>>,
535        usize,
536        usize,
537    )> {
538        let consumer_arc = {
539            let manager = self.consumer_manager.lock().await;
540            manager
541                .get_raw_consumer()
542                .ok_or(ServiceBusError::ConsumerNotFound)?
543                .clone()
544        };
545        let remaining: HashMap<String, MessageIdentifier> = message_ids
546            .iter()
547            .map(|m| (m.id.clone(), m.clone()))
548            .collect();
549        let message_bytes: Vec<Vec<u8>> = Vec::new();
550        let successful_count: usize = 0;
551        let failed_count: usize = 0;
552        Ok((
553            consumer_arc,
554            remaining,
555            message_bytes,
556            successful_count,
557            failed_count,
558        ))
559    }
560
561    fn should_continue_bulk_send(
562        &self,
563        remaining: &HashMap<String, MessageIdentifier>,
564        target_max_sequence: i64,
565        highest_sequence_seen: i64,
566        consecutive_empty_batches: u32,
567        max_empty_batches: u32,
568    ) -> bool {
569        // Exit if no more targets remain
570        if remaining.is_empty() {
571            return false;
572        }
573
574        // Exit if too many consecutive empty batches (likely no more messages available)
575        if consecutive_empty_batches >= max_empty_batches {
576            log::warn!(
577                "Stopping bulk send after {} consecutive empty batches, {} targets still remaining",
578                consecutive_empty_batches,
579                remaining.len()
580            );
581            return false;
582        }
583
584        // Exit if we've gone far beyond the target sequence range
585        if target_max_sequence > 0 && highest_sequence_seen > target_max_sequence {
586            return false;
587        }
588
589        true
590    }
591
592    async fn process_target_message(&self, params: TargetMessageParams<'_>) {
593        if params.is_dlq_operation {
594            if let Err(e) = params
595                .consumer
596                .dead_letter_message(params.msg, Some("Bulk moved to DLQ".to_string()), None)
597                .await
598            {
599                *params.failed_count += 1;
600                log::error!(
601                    "Failed to dead letter message {:?}: {}",
602                    params.msg.message_id(),
603                    e
604                );
605                return;
606            }
607            *params.successful_count += 1;
608        } else {
609            if let Ok(body) = params.msg.body() {
610                params.message_bytes.push(body.to_vec());
611            }
612            let res = if params.should_delete_source {
613                params.consumer.complete_message(params.msg).await
614            } else {
615                params.consumer.abandon_message(params.msg).await
616            };
617            if let Err(e) = res {
618                *params.failed_count += 1;
619                log::error!(
620                    "Failed to finalise original message {:?}: {}",
621                    params.msg.message_id(),
622                    e
623                );
624                return;
625            }
626            *params.successful_count += 1;
627        }
628    }
629
630    async fn abandon_pending_non_targets(
631        &self,
632        consumer: &mut Consumer,
633        pending_non_targets: Vec<azservicebus::ServiceBusReceivedMessage>,
634    ) {
635        if !pending_non_targets.is_empty() {
636            log::info!(
637                "Abandoning {} non-target messages accumulated during scan",
638                pending_non_targets.len()
639            );
640            for msg in pending_non_targets.into_iter() {
641                if let Err(e) = consumer.abandon_message(&msg).await {
642                    log::warn!("Failed to abandon non-target message after scan: {e}");
643                }
644            }
645        }
646    }
647
648    fn finalize_bulk_send_result(
649        &self,
650        params: BulkSendResultParams,
651    ) -> ServiceBusResult<ServiceBusResponse> {
652        let mut result = BulkOperationResult::new(params.message_ids.len());
653        result.successful = params.successful_count;
654        result.failed = params.failed_count;
655        result.not_found = params
656            .message_ids
657            .len()
658            .saturating_sub(params.successful_count + params.failed_count);
659        Ok(ServiceBusResponse::BulkOperationCompleted { result })
660    }
661
662    pub async fn handle_bulk_send_peeked(
663        &self,
664        messages_data: Vec<(MessageIdentifier, Vec<u8>)>,
665        target_queue: String,
666        repeat_count: usize,
667    ) -> ServiceBusResult<ServiceBusResponse> {
668        log::info!(
669            "Bulk send (peeked) {} messages to {} (repeat={})",
670            messages_data.len(),
671            target_queue,
672            repeat_count
673        );
674
675        // Extract raw bytes
676        let raw_vec: Vec<Vec<u8>> = messages_data
677            .iter()
678            .map(|(_id, data)| data.clone())
679            .collect();
680
681        let mut producer_mgr = self.producer_manager.lock().await;
682        let stats = producer_mgr
683            .send_raw_messages(&target_queue, raw_vec, repeat_count)
684            .await
685            .map_err(|e| ServiceBusError::BulkOperationFailed(format!("Bulk send failed: {e}")))?;
686
687        Ok(ServiceBusResponse::MessagesSent {
688            queue_name: target_queue,
689            count: stats.total,
690            stats,
691        })
692    }
693}
694
695/// Handles message sending commands for single and multiple message operations.
696///
697/// Provides functionality for sending messages to Service Bus queues using
698/// the producer manager. Supports both single message sends and batch sending
699/// operations with comprehensive error handling and statistics tracking.
700///
701/// # Examples
702///
703/// ```no_run
704/// use quetty_server::service_bus_manager::command_handlers::SendCommandHandler;
705///
706/// let handler = SendCommandHandler::new(producer_manager);
707///
708/// // Send a single message
709/// let response = handler.handle_send_message(
710///     "target-queue".to_string(),
711///     message_data
712/// ).await?;
713///
714/// // Send multiple messages
715/// let response = handler.handle_send_messages(
716///     "target-queue".to_string(),
717///     vec![message1, message2, message3]
718/// ).await?;
719/// ```
720pub struct SendCommandHandler {
721    producer_manager: Arc<Mutex<ProducerManager>>,
722}
723
724impl SendCommandHandler {
725    pub fn new(producer_manager: Arc<Mutex<ProducerManager>>) -> Self {
726        Self { producer_manager }
727    }
728
729    pub async fn handle_send_message(
730        &self,
731        queue_name: String,
732        message: MessageData,
733    ) -> ServiceBusResult<ServiceBusResponse> {
734        let mut manager = self.producer_manager.lock().await;
735        manager.send_message(&queue_name, message).await?;
736        Ok(ServiceBusResponse::MessageSent {
737            queue_name: queue_name.clone(),
738        })
739    }
740
741    pub async fn handle_send_messages(
742        &self,
743        queue_name: String,
744        messages: Vec<MessageData>,
745    ) -> ServiceBusResult<ServiceBusResponse> {
746        let count = messages.len();
747        let mut manager = self.producer_manager.lock().await;
748        manager.send_messages(&queue_name, messages).await?;
749
750        let mut stats = super::types::OperationStats::new();
751        for _ in 0..count {
752            stats.add_success();
753        }
754
755        Ok(ServiceBusResponse::MessagesSent {
756            queue_name: queue_name.clone(),
757            count,
758            stats,
759        })
760    }
761}
762
763/// Handles status and health check commands for monitoring Service Bus connections.
764///
765/// Provides functionality for checking connection health, queue status, and
766/// overall system state. Monitors both consumer and producer managers to
767/// provide comprehensive status information.
768///
769/// # Examples
770///
771/// ```no_run
772/// use quetty_server::service_bus_manager::command_handlers::StatusCommandHandler;
773///
774/// let handler = StatusCommandHandler::new(consumer_manager, producer_manager);
775///
776/// // Check overall connection status
777/// let response = handler.handle_get_connection_status().await?;
778///
779/// // Get basic queue statistics
780/// let response = handler.handle_get_queue_stats(
781///     "orders".to_string()
782/// ).await?;
783/// ```
784pub struct StatusCommandHandler {
785    consumer_manager: Arc<Mutex<ConsumerManager>>,
786    producer_manager: Arc<Mutex<ProducerManager>>,
787}
788
789impl StatusCommandHandler {
790    pub fn new(
791        consumer_manager: Arc<Mutex<ConsumerManager>>,
792        producer_manager: Arc<Mutex<ProducerManager>>,
793    ) -> Self {
794        Self {
795            consumer_manager,
796            producer_manager,
797        }
798    }
799
800    pub async fn handle_get_connection_status(&self) -> ServiceBusResult<ServiceBusResponse> {
801        let consumer = self.consumer_manager.lock().await;
802        let producer = self.producer_manager.lock().await;
803
804        let connected = consumer.is_consumer_ready() || producer.producer_count() > 0;
805        let current_queue = consumer.current_queue().cloned();
806
807        Ok(ServiceBusResponse::ConnectionStatus {
808            connected,
809            current_queue,
810            last_error: None,
811        })
812    }
813
814    pub async fn handle_get_queue_stats(
815        &self,
816        queue_name: String,
817    ) -> ServiceBusResult<ServiceBusResponse> {
818        let consumer = self.consumer_manager.lock().await;
819        Ok(ServiceBusResponse::QueueStats {
820            queue_name: queue_name.clone(),
821            message_count: None,
822            active_consumer: consumer.is_consumer_ready(),
823        })
824    }
825}
826
827/// Handles resource management commands for cleanup and connection management.
828///
829/// Provides functionality for disposing of Service Bus resources, resetting
830/// connections, and performing cleanup operations. Essential for proper
831/// resource lifecycle management and error recovery.
832///
833/// # Examples
834///
835/// ```no_run
836/// use quetty_server::service_bus_manager::command_handlers::ResourceCommandHandler;
837///
838/// let handler = ResourceCommandHandler::new(consumer_manager, producer_manager);
839///
840/// // Dispose of current consumer
841/// let response = handler.handle_dispose_consumer().await?;
842///
843/// // Clean up all resources
844/// let response = handler.handle_dispose_all_resources().await?;
845///
846/// // Reset connection
847/// let response = handler.handle_reset_connection().await?;
848/// ```
849pub struct ResourceCommandHandler {
850    consumer_manager: Arc<Mutex<ConsumerManager>>,
851    producer_manager: Arc<Mutex<ProducerManager>>,
852}
853
854impl ResourceCommandHandler {
855    pub fn new(
856        consumer_manager: Arc<Mutex<ConsumerManager>>,
857        producer_manager: Arc<Mutex<ProducerManager>>,
858    ) -> Self {
859        Self {
860            consumer_manager,
861            producer_manager,
862        }
863    }
864
865    pub async fn handle_dispose_consumer(&self) -> ServiceBusResult<ServiceBusResponse> {
866        let mut manager = self.consumer_manager.lock().await;
867        manager.dispose_consumer().await?;
868        Ok(ServiceBusResponse::ConsumerDisposed)
869    }
870
871    pub async fn handle_dispose_all_resources(&self) -> ServiceBusResult<ServiceBusResponse> {
872        let mut consumer = self.consumer_manager.lock().await;
873        let mut producer = self.producer_manager.lock().await;
874        consumer.dispose_consumer().await?;
875        producer.dispose_all_producers().await?;
876        Ok(ServiceBusResponse::AllResourcesDisposed)
877    }
878}
879
#[cfg(test)]
mod tests {
    use super::*;
    use crate::service_bus_manager::types::QueueType;

    /// Both rejection messages must tell the caller to receive the message first.
    #[test]
    fn test_error_constants() {
        for message in [ERROR_INDIVIDUAL_MSG_OPERATIONS, ERROR_BULK_OPERATIONS] {
            assert!(message.contains("require message to be received"));
        }
    }

    /// `QueueInfo::new` stores the name and type it was given.
    #[test]
    fn test_queue_info_creation() {
        let name = "test_queue".to_string();
        let queue_info = QueueInfo::new(name, QueueType::Main);

        assert_eq!(queue_info.name, "test_queue");
        assert_eq!(queue_info.queue_type, QueueType::Main);
    }

    /// `MessageIdentifier::new` stores the id and sequence it was given.
    #[test]
    fn test_message_identifier_creation() {
        use crate::bulk_operations::MessageIdentifier;

        let id = "test_id".to_string();
        let msg_id = MessageIdentifier::new(id, 123);

        assert_eq!(msg_id.id, "test_id");
        assert_eq!(msg_id.sequence, 123);
    }

    /// The two messages are distinct and each longer than ten bytes.
    #[test]
    fn test_error_message_consistency() {
        assert_ne!(ERROR_INDIVIDUAL_MSG_OPERATIONS, ERROR_BULK_OPERATIONS);

        for message in [ERROR_INDIVIDUAL_MSG_OPERATIONS, ERROR_BULK_OPERATIONS] {
            assert!(message.len() > 10);
        }
    }
}