1use super::consumer_manager::ConsumerManager;
2use super::producer_manager::ProducerManager;
3use super::queue_statistics_service::QueueStatisticsService;
4use super::types::{QueueInfo, QueueType};
5
6use crate::bulk_operations::BulkOperationResult;
7use crate::bulk_operations::{BulkOperationHandler, MessageIdentifier, types::BatchConfig};
8use crate::consumer::Consumer;
9use crate::service_bus_manager::{
10 errors::ServiceBusError, responses::ServiceBusResponse, types::MessageData,
11};
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::sync::Mutex;
16
17#[derive(Debug)]
19struct TargetMessageParams<'a> {
20 consumer: &'a mut Consumer,
21 msg: &'a azservicebus::ServiceBusReceivedMessage,
22 is_dlq_operation: bool,
23 should_delete_source: bool,
24 message_bytes: &'a mut Vec<Vec<u8>>,
25 successful_count: &'a mut usize,
26 failed_count: &'a mut usize,
27}
28
29#[derive(Debug)]
31struct BulkSendResultParams {
32 message_ids: Vec<MessageIdentifier>,
33 successful_count: usize,
34 failed_count: usize,
35}
36
37type ServiceBusResult<T> = Result<T, ServiceBusError>;
39
/// Error text for single-message operations addressed by ID, which need a
/// previously received message.
const ERROR_INDIVIDUAL_MSG_OPERATIONS: &str =
    "Individual message operations by ID require message to be received first";

/// Error text for bulk operations that need previously received messages.
const ERROR_BULK_OPERATIONS: &str = "Bulk operations require message to be received first";
44
45pub struct QueueCommandHandler {
67 consumer_manager: Arc<Mutex<ConsumerManager>>,
68 statistics_service: Arc<QueueStatisticsService>,
69}
70
71impl QueueCommandHandler {
72 pub fn new(
73 consumer_manager: Arc<Mutex<ConsumerManager>>,
74 statistics_service: Arc<QueueStatisticsService>,
75 ) -> Self {
76 Self {
77 consumer_manager,
78 statistics_service,
79 }
80 }
81
82 pub async fn handle_switch_queue(
83 &self,
84 queue_name: String,
85 queue_type: QueueType,
86 ) -> ServiceBusResult<ServiceBusResponse> {
87 let queue_info = QueueInfo::new(queue_name, queue_type);
88 let mut manager = self.consumer_manager.lock().await;
89 manager.switch_queue(queue_info.clone()).await?;
90 Ok(ServiceBusResponse::QueueSwitched { queue_info })
91 }
92
93 pub async fn handle_get_current_queue(&self) -> ServiceBusResult<ServiceBusResponse> {
94 let manager = self.consumer_manager.lock().await;
95 let queue_info = manager.current_queue().cloned();
96 Ok(ServiceBusResponse::CurrentQueue { queue_info })
97 }
98
99 pub async fn handle_get_queue_statistics(
100 &self,
101 queue_name: String,
102 queue_type: QueueType,
103 ) -> ServiceBusResult<ServiceBusResponse> {
104 log::debug!("Getting real statistics for queue: {queue_name} (type: {queue_type:?})");
105
106 let retrieved_at = chrono::Utc::now();
107
108 let (active_count, dlq_count) = self
110 .statistics_service
111 .get_both_queue_counts(&queue_name)
112 .await;
113
114 log::debug!(
115 "Retrieved stats for queue '{queue_name}': active={active_count:?}, dlq={dlq_count:?}"
116 );
117
118 Ok(ServiceBusResponse::QueueStatistics {
119 queue_name,
120 queue_type,
121 active_message_count: active_count,
122 dead_letter_message_count: dlq_count,
123 retrieved_at,
124 })
125 }
126}
127
128pub struct MessageCommandHandler {
148 consumer_manager: Arc<Mutex<ConsumerManager>>,
149}
150
151impl MessageCommandHandler {
152 pub fn new(consumer_manager: Arc<Mutex<ConsumerManager>>) -> Self {
153 Self { consumer_manager }
154 }
155
156 pub async fn handle_peek_messages(
157 &self,
158 max_count: u32,
159 from_sequence: Option<i64>,
160 ) -> ServiceBusResult<ServiceBusResponse> {
161 let manager = self.consumer_manager.lock().await;
162 let messages = manager.peek_messages(max_count, from_sequence).await?;
163 Ok(ServiceBusResponse::MessagesReceived { messages })
164 }
165
166 pub async fn handle_receive_messages(
167 &self,
168 max_count: u32,
169 ) -> ServiceBusResult<ServiceBusResponse> {
170 let manager = self.consumer_manager.lock().await;
171 let messages = manager.receive_messages(max_count).await?;
172 Ok(ServiceBusResponse::ReceivedMessages { messages })
173 }
174
175 pub async fn handle_complete_message(
176 &self,
177 _message_id: String,
178 ) -> ServiceBusResult<ServiceBusResponse> {
179 Err(ServiceBusError::InternalError(
180 ERROR_INDIVIDUAL_MSG_OPERATIONS.to_string(),
181 ))
182 }
183
184 pub async fn handle_abandon_message(
185 &self,
186 _message_id: String,
187 ) -> ServiceBusResult<ServiceBusResponse> {
188 Err(ServiceBusError::InternalError(
189 ERROR_INDIVIDUAL_MSG_OPERATIONS.to_string(),
190 ))
191 }
192
193 pub async fn handle_dead_letter_message(
194 &self,
195 _message_id: String,
196 _reason: Option<String>,
197 _error_description: Option<String>,
198 ) -> ServiceBusResult<ServiceBusResponse> {
199 Err(ServiceBusError::InternalError(
200 ERROR_INDIVIDUAL_MSG_OPERATIONS.to_string(),
201 ))
202 }
203}
204
205pub struct BulkCommandHandler {
237 bulk_handler: Arc<BulkOperationHandler>,
238 consumer_manager: Arc<Mutex<ConsumerManager>>,
239 producer_manager: Arc<Mutex<ProducerManager>>,
240 batch_config: BatchConfig,
241}
242
243impl BulkCommandHandler {
244 pub fn new(
245 bulk_handler: Arc<BulkOperationHandler>,
246 consumer_manager: Arc<Mutex<ConsumerManager>>,
247 producer_manager: Arc<Mutex<ProducerManager>>,
248 batch_config: BatchConfig,
249 ) -> Self {
250 Self {
251 bulk_handler,
252 consumer_manager,
253 producer_manager,
254 batch_config,
255 }
256 }
257
258 pub async fn handle_bulk_complete(
259 &self,
260 _message_ids: Vec<MessageIdentifier>,
261 ) -> ServiceBusResult<ServiceBusResponse> {
262 Err(ServiceBusError::InternalError(
263 ERROR_BULK_OPERATIONS.to_string(),
264 ))
265 }
266
267 pub async fn handle_bulk_delete(
268 &self,
269 message_ids: Vec<MessageIdentifier>,
270 max_position: usize,
271 ) -> ServiceBusResult<ServiceBusResponse> {
272 log::info!(
273 "Starting bulk delete operation for {} messages",
274 message_ids.len()
275 );
276
277 let (consumer, queue_name) = {
278 let manager = self.consumer_manager.lock().await;
279 let consumer_arc = manager
280 .get_raw_consumer()
281 .ok_or(ServiceBusError::ConsumerNotFound)?
282 .clone();
283 let queue = manager
284 .current_queue()
285 .ok_or(ServiceBusError::ConsumerNotFound)?
286 .name
287 .clone();
288 (consumer_arc, queue)
289 };
290
291 if message_ids.is_empty() {
293 log::warn!("Bulk delete called with no message IDs");
294 let result = BulkOperationResult::new(0);
295 return Ok(ServiceBusResponse::BulkOperationCompleted { result });
296 }
297
298 log::info!("Bulk delete operating on queue: {queue_name}");
300
301 match self
302 .bulk_handler
303 .delete_messages(consumer, queue_name, message_ids, max_position)
304 .await
305 {
306 Ok(result) => {
307 log::info!(
308 "Bulk delete completed: {} successful, {} failed, {} not found",
309 result.successful,
310 result.failed,
311 result.not_found
312 );
313 Ok(ServiceBusResponse::BulkOperationCompleted { result })
314 }
315 Err(e) => {
316 log::error!("Bulk delete failed: {e}");
317 Err(ServiceBusError::BulkOperationFailed(format!(
318 "Bulk delete failed: {e}"
319 )))
320 }
321 }
322 }
323
324 pub async fn handle_bulk_abandon(
325 &self,
326 _message_ids: Vec<MessageIdentifier>,
327 ) -> ServiceBusResult<ServiceBusResponse> {
328 Err(ServiceBusError::InternalError(
329 ERROR_BULK_OPERATIONS.to_string(),
330 ))
331 }
332
333 pub async fn handle_bulk_dead_letter(
334 &self,
335 _message_ids: Vec<MessageIdentifier>,
336 _reason: Option<String>,
337 _error_description: Option<String>,
338 ) -> ServiceBusResult<ServiceBusResponse> {
339 Err(ServiceBusError::InternalError(
340 ERROR_BULK_OPERATIONS.to_string(),
341 ))
342 }
343
344 pub async fn handle_bulk_send(
345 &self,
346 message_ids: Vec<MessageIdentifier>,
347 target_queue: String,
348 should_delete_source: bool,
349 repeat_count: usize,
350 _max_position: usize,
351 ) -> ServiceBusResult<ServiceBusResponse> {
352 let operation_timeout = Duration::from_secs(self.batch_config.operation_timeout_secs());
354
355 match tokio::time::timeout(
356 operation_timeout,
357 self.handle_bulk_send_internal(
358 message_ids,
359 target_queue,
360 should_delete_source,
361 repeat_count,
362 _max_position,
363 ),
364 )
365 .await
366 {
367 Ok(result) => result,
368 Err(_) => {
369 log::error!(
370 "Bulk send operation timed out after {} seconds",
371 self.batch_config.operation_timeout_secs()
372 );
373 Err(ServiceBusError::OperationTimeout(format!(
374 "Bulk send operation timed out after {} seconds",
375 self.batch_config.operation_timeout_secs()
376 )))
377 }
378 }
379 }
380
381 async fn handle_bulk_send_internal(
382 &self,
383 message_ids: Vec<MessageIdentifier>,
384 target_queue: String,
385 should_delete_source: bool,
386 repeat_count: usize,
387 _max_position: usize,
388 ) -> ServiceBusResult<ServiceBusResponse> {
389 log::info!(
390 "Starting bulk send: {} -> {}, delete_source={}, repeat={}",
391 message_ids.len(),
392 target_queue,
393 should_delete_source,
394 repeat_count
395 );
396
397 let is_dlq_operation = target_queue.ends_with("/$deadletterqueue");
399
400 let (
402 consumer_arc,
403 mut remaining,
404 mut message_bytes,
405 mut successful_count,
406 mut failed_count,
407 ) = self.setup_bulk_send_operation(&message_ids).await?;
408
409 let batch_size = self.batch_config.bulk_chunk_size() as u32;
411 let mut processed_count = 0;
412 let mut highest_sequence_seen = 0i64;
413 let target_max_sequence = message_ids
414 .iter()
415 .map(|msg_id| msg_id.sequence)
416 .max()
417 .unwrap_or(0);
418 let mut pending_non_targets: Vec<azservicebus::ServiceBusReceivedMessage> = Vec::new();
419 let mut consecutive_empty_batches = 0;
420 let max_empty_batches = 3; while self.should_continue_bulk_send(
423 &remaining,
424 target_max_sequence,
425 highest_sequence_seen,
426 consecutive_empty_batches,
427 max_empty_batches,
428 ) {
429 let mut consumer = consumer_arc.lock().await;
431 let batch = match consumer
432 .receive_messages_with_timeout(
433 batch_size,
434 Duration::from_secs(self.batch_config.receive_timeout_secs()),
435 )
436 .await
437 {
438 Ok(msgs) => msgs,
439 Err(e) => {
440 log::error!("Receive error during bulk send: {e}");
441 drop(consumer); break;
443 }
444 };
445
446 if batch.is_empty() {
447 consecutive_empty_batches += 1;
448 log::debug!(
449 "Receive batch empty after processing {processed_count} messages (highest_sequence: {highest_sequence_seen}), consecutive empty: {consecutive_empty_batches}"
450 );
451 drop(consumer); continue;
453 } else {
454 consecutive_empty_batches = 0; }
456
457 let batch_len = batch.len();
458 for msg in batch.into_iter() {
459 let msg_id = msg.message_id().map(|s| s.to_string()).unwrap_or_default();
460 let msg_sequence = msg.sequence_number();
461 if msg_sequence > highest_sequence_seen {
462 highest_sequence_seen = msg_sequence;
463 }
464 if remaining.remove(&msg_id).is_some() {
465 let params = TargetMessageParams {
466 consumer: &mut consumer,
467 msg: &msg,
468 is_dlq_operation,
469 should_delete_source,
470 message_bytes: &mut message_bytes,
471 successful_count: &mut successful_count,
472 failed_count: &mut failed_count,
473 };
474 self.process_target_message(params).await;
475 } else {
476 pending_non_targets.push(msg);
477 }
478 }
479 processed_count += batch_len;
480
481 drop(consumer);
483
484 if processed_count % (batch_size as usize * 10) == 0 {
485 log::info!(
486 "Bulk send progress: processed {} messages, highest_sequence: {}, remaining targets: {}",
487 processed_count,
488 highest_sequence_seen,
489 remaining.len()
490 );
491 }
492 if target_max_sequence > 0
493 && highest_sequence_seen > target_max_sequence + 1000
494 && !remaining.is_empty()
495 {
496 log::warn!(
497 "Safety break: highest_sequence {} exceeds target {} by 1000+, {} targets still remaining",
498 highest_sequence_seen,
499 target_max_sequence,
500 remaining.len()
501 );
502 break;
503 }
504 }
505
506 log::info!(
507 "Bulk send scan completed: processed {} messages, highest_sequence: {}, targets found: {}, remaining: {}",
508 processed_count,
509 highest_sequence_seen,
510 successful_count,
511 remaining.len()
512 );
513
514 if !pending_non_targets.is_empty() {
516 let mut consumer = consumer_arc.lock().await;
517 self.abandon_pending_non_targets(&mut consumer, pending_non_targets)
518 .await;
519 }
520 let params = BulkSendResultParams {
521 message_ids,
522 successful_count,
523 failed_count,
524 };
525 self.finalize_bulk_send_result(params)
526 }
527
528 async fn setup_bulk_send_operation(
529 &self,
530 message_ids: &[MessageIdentifier],
531 ) -> ServiceBusResult<(
532 Arc<Mutex<Consumer>>,
533 HashMap<String, MessageIdentifier>,
534 Vec<Vec<u8>>,
535 usize,
536 usize,
537 )> {
538 let consumer_arc = {
539 let manager = self.consumer_manager.lock().await;
540 manager
541 .get_raw_consumer()
542 .ok_or(ServiceBusError::ConsumerNotFound)?
543 .clone()
544 };
545 let remaining: HashMap<String, MessageIdentifier> = message_ids
546 .iter()
547 .map(|m| (m.id.clone(), m.clone()))
548 .collect();
549 let message_bytes: Vec<Vec<u8>> = Vec::new();
550 let successful_count: usize = 0;
551 let failed_count: usize = 0;
552 Ok((
553 consumer_arc,
554 remaining,
555 message_bytes,
556 successful_count,
557 failed_count,
558 ))
559 }
560
561 fn should_continue_bulk_send(
562 &self,
563 remaining: &HashMap<String, MessageIdentifier>,
564 target_max_sequence: i64,
565 highest_sequence_seen: i64,
566 consecutive_empty_batches: u32,
567 max_empty_batches: u32,
568 ) -> bool {
569 if remaining.is_empty() {
571 return false;
572 }
573
574 if consecutive_empty_batches >= max_empty_batches {
576 log::warn!(
577 "Stopping bulk send after {} consecutive empty batches, {} targets still remaining",
578 consecutive_empty_batches,
579 remaining.len()
580 );
581 return false;
582 }
583
584 if target_max_sequence > 0 && highest_sequence_seen > target_max_sequence {
586 return false;
587 }
588
589 true
590 }
591
592 async fn process_target_message(&self, params: TargetMessageParams<'_>) {
593 if params.is_dlq_operation {
594 if let Err(e) = params
595 .consumer
596 .dead_letter_message(params.msg, Some("Bulk moved to DLQ".to_string()), None)
597 .await
598 {
599 *params.failed_count += 1;
600 log::error!(
601 "Failed to dead letter message {:?}: {}",
602 params.msg.message_id(),
603 e
604 );
605 return;
606 }
607 *params.successful_count += 1;
608 } else {
609 if let Ok(body) = params.msg.body() {
610 params.message_bytes.push(body.to_vec());
611 }
612 let res = if params.should_delete_source {
613 params.consumer.complete_message(params.msg).await
614 } else {
615 params.consumer.abandon_message(params.msg).await
616 };
617 if let Err(e) = res {
618 *params.failed_count += 1;
619 log::error!(
620 "Failed to finalise original message {:?}: {}",
621 params.msg.message_id(),
622 e
623 );
624 return;
625 }
626 *params.successful_count += 1;
627 }
628 }
629
630 async fn abandon_pending_non_targets(
631 &self,
632 consumer: &mut Consumer,
633 pending_non_targets: Vec<azservicebus::ServiceBusReceivedMessage>,
634 ) {
635 if !pending_non_targets.is_empty() {
636 log::info!(
637 "Abandoning {} non-target messages accumulated during scan",
638 pending_non_targets.len()
639 );
640 for msg in pending_non_targets.into_iter() {
641 if let Err(e) = consumer.abandon_message(&msg).await {
642 log::warn!("Failed to abandon non-target message after scan: {e}");
643 }
644 }
645 }
646 }
647
648 fn finalize_bulk_send_result(
649 &self,
650 params: BulkSendResultParams,
651 ) -> ServiceBusResult<ServiceBusResponse> {
652 let mut result = BulkOperationResult::new(params.message_ids.len());
653 result.successful = params.successful_count;
654 result.failed = params.failed_count;
655 result.not_found = params
656 .message_ids
657 .len()
658 .saturating_sub(params.successful_count + params.failed_count);
659 Ok(ServiceBusResponse::BulkOperationCompleted { result })
660 }
661
662 pub async fn handle_bulk_send_peeked(
663 &self,
664 messages_data: Vec<(MessageIdentifier, Vec<u8>)>,
665 target_queue: String,
666 repeat_count: usize,
667 ) -> ServiceBusResult<ServiceBusResponse> {
668 log::info!(
669 "Bulk send (peeked) {} messages to {} (repeat={})",
670 messages_data.len(),
671 target_queue,
672 repeat_count
673 );
674
675 let raw_vec: Vec<Vec<u8>> = messages_data
677 .iter()
678 .map(|(_id, data)| data.clone())
679 .collect();
680
681 let mut producer_mgr = self.producer_manager.lock().await;
682 let stats = producer_mgr
683 .send_raw_messages(&target_queue, raw_vec, repeat_count)
684 .await
685 .map_err(|e| ServiceBusError::BulkOperationFailed(format!("Bulk send failed: {e}")))?;
686
687 Ok(ServiceBusResponse::MessagesSent {
688 queue_name: target_queue,
689 count: stats.total,
690 stats,
691 })
692 }
693}
694
695pub struct SendCommandHandler {
721 producer_manager: Arc<Mutex<ProducerManager>>,
722}
723
724impl SendCommandHandler {
725 pub fn new(producer_manager: Arc<Mutex<ProducerManager>>) -> Self {
726 Self { producer_manager }
727 }
728
729 pub async fn handle_send_message(
730 &self,
731 queue_name: String,
732 message: MessageData,
733 ) -> ServiceBusResult<ServiceBusResponse> {
734 let mut manager = self.producer_manager.lock().await;
735 manager.send_message(&queue_name, message).await?;
736 Ok(ServiceBusResponse::MessageSent {
737 queue_name: queue_name.clone(),
738 })
739 }
740
741 pub async fn handle_send_messages(
742 &self,
743 queue_name: String,
744 messages: Vec<MessageData>,
745 ) -> ServiceBusResult<ServiceBusResponse> {
746 let count = messages.len();
747 let mut manager = self.producer_manager.lock().await;
748 manager.send_messages(&queue_name, messages).await?;
749
750 let mut stats = super::types::OperationStats::new();
751 for _ in 0..count {
752 stats.add_success();
753 }
754
755 Ok(ServiceBusResponse::MessagesSent {
756 queue_name: queue_name.clone(),
757 count,
758 stats,
759 })
760 }
761}
762
763pub struct StatusCommandHandler {
785 consumer_manager: Arc<Mutex<ConsumerManager>>,
786 producer_manager: Arc<Mutex<ProducerManager>>,
787}
788
789impl StatusCommandHandler {
790 pub fn new(
791 consumer_manager: Arc<Mutex<ConsumerManager>>,
792 producer_manager: Arc<Mutex<ProducerManager>>,
793 ) -> Self {
794 Self {
795 consumer_manager,
796 producer_manager,
797 }
798 }
799
800 pub async fn handle_get_connection_status(&self) -> ServiceBusResult<ServiceBusResponse> {
801 let consumer = self.consumer_manager.lock().await;
802 let producer = self.producer_manager.lock().await;
803
804 let connected = consumer.is_consumer_ready() || producer.producer_count() > 0;
805 let current_queue = consumer.current_queue().cloned();
806
807 Ok(ServiceBusResponse::ConnectionStatus {
808 connected,
809 current_queue,
810 last_error: None,
811 })
812 }
813
814 pub async fn handle_get_queue_stats(
815 &self,
816 queue_name: String,
817 ) -> ServiceBusResult<ServiceBusResponse> {
818 let consumer = self.consumer_manager.lock().await;
819 Ok(ServiceBusResponse::QueueStats {
820 queue_name: queue_name.clone(),
821 message_count: None,
822 active_consumer: consumer.is_consumer_ready(),
823 })
824 }
825}
826
827pub struct ResourceCommandHandler {
850 consumer_manager: Arc<Mutex<ConsumerManager>>,
851 producer_manager: Arc<Mutex<ProducerManager>>,
852}
853
854impl ResourceCommandHandler {
855 pub fn new(
856 consumer_manager: Arc<Mutex<ConsumerManager>>,
857 producer_manager: Arc<Mutex<ProducerManager>>,
858 ) -> Self {
859 Self {
860 consumer_manager,
861 producer_manager,
862 }
863 }
864
865 pub async fn handle_dispose_consumer(&self) -> ServiceBusResult<ServiceBusResponse> {
866 let mut manager = self.consumer_manager.lock().await;
867 manager.dispose_consumer().await?;
868 Ok(ServiceBusResponse::ConsumerDisposed)
869 }
870
871 pub async fn handle_dispose_all_resources(&self) -> ServiceBusResult<ServiceBusResponse> {
872 let mut consumer = self.consumer_manager.lock().await;
873 let mut producer = self.producer_manager.lock().await;
874 consumer.dispose_consumer().await?;
875 producer.dispose_all_producers().await?;
876 Ok(ServiceBusResponse::AllResourcesDisposed)
877 }
878}
879
#[cfg(test)]
mod tests {
    use super::*;
    use crate::service_bus_manager::types::QueueType;

    /// Both error texts mention that the message must be received first.
    #[test]
    fn test_error_constants() {
        let expected_fragment = "require message to be received";
        for text in [ERROR_INDIVIDUAL_MSG_OPERATIONS, ERROR_BULK_OPERATIONS] {
            assert!(text.contains(expected_fragment));
        }
    }

    /// `QueueInfo::new` stores the name and queue type it was given.
    #[test]
    fn test_queue_info_creation() {
        let name = "test_queue";
        let queue_info = QueueInfo::new(name.to_string(), QueueType::Main);
        assert_eq!(queue_info.name, name);
        assert_eq!(queue_info.queue_type, QueueType::Main);
    }

    /// `MessageIdentifier::new` stores the ID and sequence it was given.
    #[test]
    fn test_message_identifier_creation() {
        use crate::bulk_operations::MessageIdentifier;

        let (id, sequence) = ("test_id", 123);
        let msg_id = MessageIdentifier::new(id.to_string(), sequence);
        assert_eq!(msg_id.id, id);
        assert_eq!(msg_id.sequence, sequence);
    }

    /// The two error texts are distinct and each longer than ten bytes.
    #[test]
    fn test_error_message_consistency() {
        assert_ne!(ERROR_INDIVIDUAL_MSG_OPERATIONS, ERROR_BULK_OPERATIONS);

        let min_len = 10;
        assert!(ERROR_INDIVIDUAL_MSG_OPERATIONS.len() > min_len);
        assert!(ERROR_BULK_OPERATIONS.len() > min_len);
    }
}