server/bulk_operations/
resource_guard.rs

1use std::error::Error;
2use std::sync::Arc;
3use std::time::Duration;
4use tokio::sync::Mutex;
5use tokio::time::timeout;
6use tokio_util::sync::CancellationToken;
7
8/// RAII guard for managing service bus resources with automatic cleanup
9pub struct ServiceBusResourceGuard<T: 'static> {
10    resource: Option<tokio::sync::MutexGuard<'static, T>>,
11    cleanup_fn: Option<Box<dyn FnOnce() + Send>>,
12}
13
14impl<T: 'static> ServiceBusResourceGuard<T> {
15    /// Create a new resource guard with optional cleanup function
16    pub fn new(
17        resource: tokio::sync::MutexGuard<'static, T>,
18        cleanup_fn: Option<Box<dyn FnOnce() + Send>>,
19    ) -> Self {
20        Self {
21            resource: Some(resource),
22            cleanup_fn,
23        }
24    }
25
26    /// Get a reference to the guarded resource
27    pub fn get(&self) -> Option<&T> {
28        self.resource.as_deref()
29    }
30
31    /// Get a mutable reference to the guarded resource
32    pub fn get_mut(&mut self) -> Option<&mut T> {
33        self.resource.as_deref_mut()
34    }
35}
36
37impl<T: 'static> Drop for ServiceBusResourceGuard<T> {
38    fn drop(&mut self) {
39        // Drop the resource first
40        self.resource.take();
41
42        // Then run cleanup if provided
43        if let Some(cleanup) = self.cleanup_fn.take() {
44            cleanup();
45        }
46
47        log::debug!("ServiceBusResourceGuard: Resource cleanup completed");
48    }
49}
50
51/// Safe lock acquisition with timeout
52pub async fn acquire_lock_with_timeout<'a, T>(
53    mutex: &'a Arc<Mutex<T>>,
54    operation_name: &str,
55    timeout_duration: Duration,
56    cancel_token: Option<&CancellationToken>,
57) -> Result<tokio::sync::MutexGuard<'a, T>, Box<dyn Error + Send + Sync>> {
58    log::debug!("Attempting to acquire lock for {operation_name}");
59
60    let lock_future = mutex.lock();
61
62    // Handle cancellation if token is provided
63    if let Some(token) = cancel_token {
64        tokio::select! {
65            guard = timeout(timeout_duration, lock_future) => {
66                match guard {
67                    Ok(guard) => {
68                        log::debug!("Successfully acquired lock for {operation_name}");
69                        Ok(guard)
70                    }
71                    Err(_) => {
72                        let error_msg = format!(
73                            "Timeout acquiring lock for {operation_name} after {timeout_duration:?}"
74                        );
75                        log::error!("{error_msg}");
76                        Err(Box::new(std::io::Error::new(std::io::ErrorKind::TimedOut, error_msg)))
77                    }
78                }
79            }
80            _ = token.cancelled() => {
81                let error_msg = format!("Lock acquisition for {operation_name} was cancelled");
82                log::warn!("{error_msg}");
83                Err(Box::new(std::io::Error::new(std::io::ErrorKind::Interrupted, error_msg)))
84            }
85        }
86    } else {
87        // No cancellation token, just use timeout
88        match timeout(timeout_duration, lock_future).await {
89            Ok(guard) => {
90                log::debug!("Successfully acquired lock for {operation_name}");
91                Ok(guard)
92            }
93            Err(_) => {
94                let error_msg = format!(
95                    "Timeout acquiring lock for {operation_name} after {timeout_duration:?}"
96                );
97                log::error!("{error_msg}");
98                Err(Box::new(std::io::Error::new(
99                    std::io::ErrorKind::TimedOut,
100                    error_msg,
101                )))
102            }
103        }
104    }
105}