server/service_bus_manager/
consumer_manager.rs

1use super::errors::{ServiceBusError, ServiceBusResult};
2use super::types::QueueInfo;
3use crate::bulk_operations::types::BatchConfig;
4use crate::consumer::{Consumer, ServiceBusClientExt};
5use crate::model::MessageModel;
6use azservicebus::{ServiceBusClient, ServiceBusReceiverOptions, core::BasicRetryPolicy};
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::Mutex;
10
11pub struct ConsumerManager {
12    current_consumer: Option<Arc<Mutex<Consumer>>>,
13    current_queue: Option<QueueInfo>,
14    service_bus_client: Arc<Mutex<ServiceBusClient<BasicRetryPolicy>>>,
15    batch_config: BatchConfig,
16}
17
18impl ConsumerManager {
19    pub fn new(
20        service_bus_client: Arc<Mutex<ServiceBusClient<BasicRetryPolicy>>>,
21        batch_config: BatchConfig,
22    ) -> Self {
23        Self {
24            current_consumer: None,
25            current_queue: None,
26            service_bus_client,
27            batch_config,
28        }
29    }
30
31    /// Switch to a different queue, disposing current consumer if needed
32    pub async fn switch_queue(&mut self, queue_info: QueueInfo) -> ServiceBusResult<()> {
33        log::info!(
34            "Switching to queue: {} (type: {:?})",
35            queue_info.name,
36            queue_info.queue_type
37        );
38
39        // Check if we're already connected to this queue
40        if let Some(current_queue) = &self.current_queue {
41            if current_queue.name == queue_info.name
42                && current_queue.queue_type == queue_info.queue_type
43            {
44                log::debug!("Already connected to queue: {}", queue_info.name);
45                return Ok(());
46            }
47        }
48
49        // Dispose current consumer if exists
50        if let Some(consumer) = &self.current_consumer {
51            log::debug!("Disposing existing consumer");
52            if let Err(e) = consumer.lock().await.dispose().await {
53                log::error!("Failed to dispose existing consumer: {e}");
54                // Continue anyway - we'll create a new one
55            }
56        }
57
58        // Create new consumer
59        log::debug!("Creating new consumer for queue: {}", queue_info.name);
60        let mut client = self.service_bus_client.lock().await;
61        let consumer = client
62            .create_consumer_for_queue(
63                queue_info.name.clone(),
64                ServiceBusReceiverOptions::default(),
65            )
66            .await
67            .map_err(|e| {
68                ServiceBusError::ConsumerCreationFailed(format!(
69                    "Failed to create consumer for queue {}: {}",
70                    queue_info.name, e
71                ))
72            })?;
73
74        // Update state
75        self.current_consumer = Some(Arc::new(Mutex::new(consumer)));
76        self.current_queue = Some(queue_info);
77
78        if let Some(queue) = self.current_queue.as_ref() {
79            log::info!("Successfully switched to queue: {}", queue.name);
80        }
81        Ok(())
82    }
83
84    /// Get current queue information
85    pub fn current_queue(&self) -> Option<&QueueInfo> {
86        self.current_queue.as_ref()
87    }
88
89    /// Check if consumer is available and ready
90    pub fn is_consumer_ready(&self) -> bool {
91        self.current_consumer.is_some() && self.current_queue.is_some()
92    }
93
94    /// Peek messages from the current queue
95    pub async fn peek_messages(
96        &self,
97        max_count: u32,
98        from_sequence: Option<i64>,
99    ) -> ServiceBusResult<Vec<MessageModel>> {
100        let consumer = self.get_consumer()?;
101        let mut consumer_guard = consumer.lock().await;
102
103        consumer_guard
104            .peek_messages(max_count, from_sequence)
105            .await
106            .map_err(|e| ServiceBusError::MessageReceiveFailed(e.to_string()))
107    }
108
109    /// Receive messages from the current queue (for processing)
110    pub async fn receive_messages(
111        &self,
112        max_count: u32,
113    ) -> ServiceBusResult<Vec<azservicebus::ServiceBusReceivedMessage>> {
114        let consumer = self.get_consumer()?;
115        let mut consumer_guard = consumer.lock().await;
116
117        // Use timeout-based receive for consistency and to prevent indefinite blocking
118        let timeout = Duration::from_secs(self.batch_config.receive_timeout_secs());
119        consumer_guard
120            .receive_messages_with_timeout(max_count, timeout)
121            .await
122            .map_err(|e| ServiceBusError::MessageReceiveFailed(e.to_string()))
123    }
124
125    /// Complete a single message
126    pub async fn complete_message(
127        &self,
128        message: &azservicebus::ServiceBusReceivedMessage,
129    ) -> ServiceBusResult<()> {
130        let consumer = self.get_consumer()?;
131        let mut consumer_guard = consumer.lock().await;
132
133        consumer_guard
134            .complete_message(message)
135            .await
136            .map_err(|e| ServiceBusError::MessageCompleteFailed(e.to_string()))
137    }
138
139    /// Complete multiple messages
140    pub async fn complete_messages(
141        &self,
142        messages: &[azservicebus::ServiceBusReceivedMessage],
143    ) -> ServiceBusResult<()> {
144        let consumer = self.get_consumer()?;
145        let mut consumer_guard = consumer.lock().await;
146
147        consumer_guard
148            .complete_messages(messages)
149            .await
150            .map_err(|e| ServiceBusError::MessageCompleteFailed(e.to_string()))
151    }
152
153    /// Abandon a single message
154    pub async fn abandon_message(
155        &self,
156        message: &azservicebus::ServiceBusReceivedMessage,
157    ) -> ServiceBusResult<()> {
158        let consumer = self.get_consumer()?;
159        let mut consumer_guard = consumer.lock().await;
160
161        consumer_guard
162            .abandon_message(message)
163            .await
164            .map_err(|e| ServiceBusError::MessageAbandonFailed(e.to_string()))
165    }
166
167    /// Abandon multiple messages
168    pub async fn abandon_messages(
169        &self,
170        messages: &[azservicebus::ServiceBusReceivedMessage],
171    ) -> ServiceBusResult<()> {
172        let consumer = self.get_consumer()?;
173        let mut consumer_guard = consumer.lock().await;
174
175        consumer_guard
176            .abandon_messages(messages)
177            .await
178            .map_err(|e| ServiceBusError::MessageAbandonFailed(e.to_string()))
179    }
180
181    /// Dead letter a single message
182    pub async fn dead_letter_message(
183        &self,
184        message: &azservicebus::ServiceBusReceivedMessage,
185        reason: Option<String>,
186        error_description: Option<String>,
187    ) -> ServiceBusResult<()> {
188        let consumer = self.get_consumer()?;
189        let mut consumer_guard = consumer.lock().await;
190
191        consumer_guard
192            .dead_letter_message(message, reason, error_description)
193            .await
194            .map_err(|e| ServiceBusError::MessageDeadLetterFailed(e.to_string()))
195    }
196
197    /// Find a specific message by ID and sequence number (used for targeted operations)
198    pub async fn find_message(
199        &self,
200        message_id: &str,
201        sequence_number: i64,
202        max_position: Option<usize>,
203    ) -> ServiceBusResult<Option<azservicebus::ServiceBusReceivedMessage>> {
204        let consumer = self.get_consumer()?;
205        let batch_size = self.batch_config.bulk_chunk_size() as u32;
206        let timeout = Duration::from_secs(self.batch_config.bulk_processing_time_secs());
207        let max_position = max_position.unwrap_or(self.batch_config.max_messages_to_process());
208
209        log::info!(
210            "Searching for message {message_id} (sequence: {sequence_number}) in batches of {batch_size} up to position {max_position}"
211        );
212
213        let mut processed_count = 0;
214
215        while processed_count < max_position {
216            log::debug!(
217                "Search batch: fetching {batch_size} messages (processed: {processed_count}/{max_position})"
218            );
219
220            let messages = {
221                let mut consumer_guard = consumer.lock().await;
222                consumer_guard
223                    .receive_messages_with_timeout(batch_size, timeout)
224                    .await
225                    .map_err(|e| ServiceBusError::MessageReceiveFailed(e.to_string()))?
226            };
227
228            if messages.is_empty() {
229                log::debug!("No more messages available - stopping search");
230                break;
231            }
232
233            processed_count += messages.len();
234
235            // Look for the target message in this batch
236            for message in messages.into_iter() {
237                let msg_id = message.message_id().unwrap_or_default();
238                let msg_seq = message.sequence_number();
239
240                if msg_id == message_id && msg_seq == sequence_number {
241                    log::info!(
242                        "Found target message {message_id} (sequence: {sequence_number}) after processing {processed_count} messages"
243                    );
244                    return Ok(Some(message));
245                }
246
247                // Abandon non-target messages to keep the queue flowing
248                let mut consumer_guard = consumer.lock().await;
249                if let Err(e) = consumer_guard.abandon_message(&message).await {
250                    log::warn!("Failed to abandon non-target message: {e}");
251                }
252            }
253        }
254
255        log::info!(
256            "Message {message_id} (sequence: {sequence_number}) not found after processing {processed_count} messages (max: {max_position})"
257        );
258        Ok(None)
259    }
260
261    /// Dispose current consumer
262    pub async fn dispose_consumer(&mut self) -> ServiceBusResult<()> {
263        if let Some(consumer) = self.current_consumer.take() {
264            log::info!("Disposing consumer for queue: {:?}", self.current_queue);
265            consumer.lock().await.dispose().await.map_err(|e| {
266                ServiceBusError::InternalError(format!("Failed to dispose consumer: {e}"))
267            })?;
268        }
269        self.current_queue = None;
270        Ok(())
271    }
272
273    /// Get the current consumer, returning an error if not available
274    fn get_consumer(&self) -> ServiceBusResult<Arc<Mutex<Consumer>>> {
275        self.current_consumer
276            .clone()
277            .ok_or(ServiceBusError::ConsumerNotFound)
278    }
279
280    /// Get raw consumer for advanced operations (used by bulk operations)
281    pub fn get_raw_consumer(&self) -> Option<Arc<Mutex<Consumer>>> {
282        self.current_consumer.clone()
283    }
284
285    /// Reset the ServiceBusClient reference after connection reset
286    pub async fn reset_client(
287        &mut self,
288        new_client: Arc<Mutex<ServiceBusClient<BasicRetryPolicy>>>,
289    ) -> ServiceBusResult<()> {
290        log::info!("Resetting ServiceBusClient reference in ConsumerManager");
291
292        // Dispose existing consumer if any
293        self.dispose_consumer().await?;
294
295        // Update the client reference
296        self.service_bus_client = new_client;
297
298        log::info!("ConsumerManager client reference updated successfully");
299        Ok(())
300    }
301}