server/bulk_operations/
resource_guard.rs1use std::error::Error;
2use std::sync::Arc;
3use std::time::Duration;
4use tokio::sync::Mutex;
5use tokio::time::timeout;
6use tokio_util::sync::CancellationToken;
7
8pub struct ServiceBusResourceGuard<T: 'static> {
10 resource: Option<tokio::sync::MutexGuard<'static, T>>,
11 cleanup_fn: Option<Box<dyn FnOnce() + Send>>,
12}
13
14impl<T: 'static> ServiceBusResourceGuard<T> {
15 pub fn new(
17 resource: tokio::sync::MutexGuard<'static, T>,
18 cleanup_fn: Option<Box<dyn FnOnce() + Send>>,
19 ) -> Self {
20 Self {
21 resource: Some(resource),
22 cleanup_fn,
23 }
24 }
25
26 pub fn get(&self) -> Option<&T> {
28 self.resource.as_deref()
29 }
30
31 pub fn get_mut(&mut self) -> Option<&mut T> {
33 self.resource.as_deref_mut()
34 }
35}
36
37impl<T: 'static> Drop for ServiceBusResourceGuard<T> {
38 fn drop(&mut self) {
39 self.resource.take();
41
42 if let Some(cleanup) = self.cleanup_fn.take() {
44 cleanup();
45 }
46
47 log::debug!("ServiceBusResourceGuard: Resource cleanup completed");
48 }
49}
50
51pub async fn acquire_lock_with_timeout<'a, T>(
53 mutex: &'a Arc<Mutex<T>>,
54 operation_name: &str,
55 timeout_duration: Duration,
56 cancel_token: Option<&CancellationToken>,
57) -> Result<tokio::sync::MutexGuard<'a, T>, Box<dyn Error + Send + Sync>> {
58 log::debug!("Attempting to acquire lock for {operation_name}");
59
60 let lock_future = mutex.lock();
61
62 if let Some(token) = cancel_token {
64 tokio::select! {
65 guard = timeout(timeout_duration, lock_future) => {
66 match guard {
67 Ok(guard) => {
68 log::debug!("Successfully acquired lock for {operation_name}");
69 Ok(guard)
70 }
71 Err(_) => {
72 let error_msg = format!(
73 "Timeout acquiring lock for {operation_name} after {timeout_duration:?}"
74 );
75 log::error!("{error_msg}");
76 Err(Box::new(std::io::Error::new(std::io::ErrorKind::TimedOut, error_msg)))
77 }
78 }
79 }
80 _ = token.cancelled() => {
81 let error_msg = format!("Lock acquisition for {operation_name} was cancelled");
82 log::warn!("{error_msg}");
83 Err(Box::new(std::io::Error::new(std::io::ErrorKind::Interrupted, error_msg)))
84 }
85 }
86 } else {
87 match timeout(timeout_duration, lock_future).await {
89 Ok(guard) => {
90 log::debug!("Successfully acquired lock for {operation_name}");
91 Ok(guard)
92 }
93 Err(_) => {
94 let error_msg = format!(
95 "Timeout acquiring lock for {operation_name} after {timeout_duration:?}"
96 );
97 log::error!("{error_msg}");
98 Err(Box::new(std::io::Error::new(
99 std::io::ErrorKind::TimedOut,
100 error_msg,
101 )))
102 }
103 }
104 }
105}