use super::errors::{ServiceBusError, ServiceBusResult};
use super::types::QueueInfo;
use crate::bulk_operations::types::BatchConfig;
use crate::consumer::{Consumer, ServiceBusClientExt};
use crate::model::MessageModel;
use azservicebus::{ServiceBusClient, ServiceBusReceiverOptions, core::BasicRetryPolicy};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
pub struct ConsumerManager {
current_consumer: Option<Arc<Mutex<Consumer>>>,
current_queue: Option<QueueInfo>,
service_bus_client: Arc<Mutex<ServiceBusClient<BasicRetryPolicy>>>,
batch_config: BatchConfig,
}
impl ConsumerManager {
pub fn new(
service_bus_client: Arc<Mutex<ServiceBusClient<BasicRetryPolicy>>>,
batch_config: BatchConfig,
) -> Self {
Self {
current_consumer: None,
current_queue: None,
service_bus_client,
batch_config,
}
}
pub async fn switch_queue(&mut self, queue_info: QueueInfo) -> ServiceBusResult<()> {
log::info!(
"Switching to queue: {} (type: {:?})",
queue_info.name,
queue_info.queue_type
);
if let Some(current_queue) = &self.current_queue {
if current_queue.name == queue_info.name
&& current_queue.queue_type == queue_info.queue_type
{
log::debug!("Already connected to queue: {}", queue_info.name);
return Ok(());
}
}
if let Some(consumer) = &self.current_consumer {
log::debug!("Disposing existing consumer");
if let Err(e) = consumer.lock().await.dispose().await {
log::error!("Failed to dispose existing consumer: {e}");
}
}
log::debug!("Creating new consumer for queue: {}", queue_info.name);
let mut client = self.service_bus_client.lock().await;
let consumer = client
.create_consumer_for_queue(
queue_info.name.clone(),
ServiceBusReceiverOptions::default(),
)
.await
.map_err(|e| {
ServiceBusError::ConsumerCreationFailed(format!(
"Failed to create consumer for queue {}: {}",
queue_info.name, e
))
})?;
self.current_consumer = Some(Arc::new(Mutex::new(consumer)));
self.current_queue = Some(queue_info);
if let Some(queue) = self.current_queue.as_ref() {
log::info!("Successfully switched to queue: {}", queue.name);
}
Ok(())
}
pub fn current_queue(&self) -> Option<&QueueInfo> {
self.current_queue.as_ref()
}
pub fn is_consumer_ready(&self) -> bool {
self.current_consumer.is_some() && self.current_queue.is_some()
}
pub async fn peek_messages(
&self,
max_count: u32,
from_sequence: Option<i64>,
) -> ServiceBusResult<Vec<MessageModel>> {
let consumer = self.get_consumer()?;
let mut consumer_guard = consumer.lock().await;
consumer_guard
.peek_messages(max_count, from_sequence)
.await
.map_err(|e| ServiceBusError::MessageReceiveFailed(e.to_string()))
}
pub async fn receive_messages(
&self,
max_count: u32,
) -> ServiceBusResult<Vec<azservicebus::ServiceBusReceivedMessage>> {
let consumer = self.get_consumer()?;
let mut consumer_guard = consumer.lock().await;
let timeout = Duration::from_secs(self.batch_config.receive_timeout_secs());
consumer_guard
.receive_messages_with_timeout(max_count, timeout)
.await
.map_err(|e| ServiceBusError::MessageReceiveFailed(e.to_string()))
}
pub async fn complete_message(
&self,
message: &azservicebus::ServiceBusReceivedMessage,
) -> ServiceBusResult<()> {
let consumer = self.get_consumer()?;
let mut consumer_guard = consumer.lock().await;
consumer_guard
.complete_message(message)
.await
.map_err(|e| ServiceBusError::MessageCompleteFailed(e.to_string()))
}
pub async fn complete_messages(
&self,
messages: &[azservicebus::ServiceBusReceivedMessage],
) -> ServiceBusResult<()> {
let consumer = self.get_consumer()?;
let mut consumer_guard = consumer.lock().await;
consumer_guard
.complete_messages(messages)
.await
.map_err(|e| ServiceBusError::MessageCompleteFailed(e.to_string()))
}
pub async fn abandon_message(
&self,
message: &azservicebus::ServiceBusReceivedMessage,
) -> ServiceBusResult<()> {
let consumer = self.get_consumer()?;
let mut consumer_guard = consumer.lock().await;
consumer_guard
.abandon_message(message)
.await
.map_err(|e| ServiceBusError::MessageAbandonFailed(e.to_string()))
}
pub async fn abandon_messages(
&self,
messages: &[azservicebus::ServiceBusReceivedMessage],
) -> ServiceBusResult<()> {
let consumer = self.get_consumer()?;
let mut consumer_guard = consumer.lock().await;
consumer_guard
.abandon_messages(messages)
.await
.map_err(|e| ServiceBusError::MessageAbandonFailed(e.to_string()))
}
pub async fn dead_letter_message(
&self,
message: &azservicebus::ServiceBusReceivedMessage,
reason: Option<String>,
error_description: Option<String>,
) -> ServiceBusResult<()> {
let consumer = self.get_consumer()?;
let mut consumer_guard = consumer.lock().await;
consumer_guard
.dead_letter_message(message, reason, error_description)
.await
.map_err(|e| ServiceBusError::MessageDeadLetterFailed(e.to_string()))
}
pub async fn find_message(
&self,
message_id: &str,
sequence_number: i64,
max_position: Option<usize>,
) -> ServiceBusResult<Option<azservicebus::ServiceBusReceivedMessage>> {
let consumer = self.get_consumer()?;
let batch_size = self.batch_config.bulk_chunk_size() as u32;
let timeout = Duration::from_secs(self.batch_config.bulk_processing_time_secs());
let max_position = max_position.unwrap_or(self.batch_config.max_messages_to_process());
log::info!(
"Searching for message {message_id} (sequence: {sequence_number}) in batches of {batch_size} up to position {max_position}"
);
let mut processed_count = 0;
while processed_count < max_position {
log::debug!(
"Search batch: fetching {batch_size} messages (processed: {processed_count}/{max_position})"
);
let messages = {
let mut consumer_guard = consumer.lock().await;
consumer_guard
.receive_messages_with_timeout(batch_size, timeout)
.await
.map_err(|e| ServiceBusError::MessageReceiveFailed(e.to_string()))?
};
if messages.is_empty() {
log::debug!("No more messages available - stopping search");
break;
}
processed_count += messages.len();
for message in messages.into_iter() {
let msg_id = message.message_id().unwrap_or_default();
let msg_seq = message.sequence_number();
if msg_id == message_id && msg_seq == sequence_number {
log::info!(
"Found target message {message_id} (sequence: {sequence_number}) after processing {processed_count} messages"
);
return Ok(Some(message));
}
let mut consumer_guard = consumer.lock().await;
if let Err(e) = consumer_guard.abandon_message(&message).await {
log::warn!("Failed to abandon non-target message: {e}");
}
}
}
log::info!(
"Message {message_id} (sequence: {sequence_number}) not found after processing {processed_count} messages (max: {max_position})"
);
Ok(None)
}
pub async fn dispose_consumer(&mut self) -> ServiceBusResult<()> {
if let Some(consumer) = self.current_consumer.take() {
log::info!("Disposing consumer for queue: {:?}", self.current_queue);
consumer.lock().await.dispose().await.map_err(|e| {
ServiceBusError::InternalError(format!("Failed to dispose consumer: {e}"))
})?;
}
self.current_queue = None;
Ok(())
}
fn get_consumer(&self) -> ServiceBusResult<Arc<Mutex<Consumer>>> {
self.current_consumer
.clone()
.ok_or(ServiceBusError::ConsumerNotFound)
}
pub fn get_raw_consumer(&self) -> Option<Arc<Mutex<Consumer>>> {
self.current_consumer.clone()
}
pub async fn reset_client(
&mut self,
new_client: Arc<Mutex<ServiceBusClient<BasicRetryPolicy>>>,
) -> ServiceBusResult<()> {
log::info!("Resetting ServiceBusClient reference in ConsumerManager");
self.dispose_consumer().await?;
self.service_bus_client = new_client;
log::info!("ConsumerManager client reference updated successfully");
Ok(())
}
}