use std::error::Error;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
pub struct ServiceBusResourceGuard<T: 'static> {
resource: Option<tokio::sync::MutexGuard<'static, T>>,
cleanup_fn: Option<Box<dyn FnOnce() + Send>>,
}
impl<T: 'static> ServiceBusResourceGuard<T> {
pub fn new(
resource: tokio::sync::MutexGuard<'static, T>,
cleanup_fn: Option<Box<dyn FnOnce() + Send>>,
) -> Self {
Self {
resource: Some(resource),
cleanup_fn,
}
}
pub fn get(&self) -> Option<&T> {
self.resource.as_deref()
}
pub fn get_mut(&mut self) -> Option<&mut T> {
self.resource.as_deref_mut()
}
}
impl<T: 'static> Drop for ServiceBusResourceGuard<T> {
fn drop(&mut self) {
self.resource.take();
if let Some(cleanup) = self.cleanup_fn.take() {
cleanup();
}
log::debug!("ServiceBusResourceGuard: Resource cleanup completed");
}
}
pub async fn acquire_lock_with_timeout<'a, T>(
mutex: &'a Arc<Mutex<T>>,
operation_name: &str,
timeout_duration: Duration,
cancel_token: Option<&CancellationToken>,
) -> Result<tokio::sync::MutexGuard<'a, T>, Box<dyn Error + Send + Sync>> {
log::debug!("Attempting to acquire lock for {operation_name}");
let lock_future = mutex.lock();
if let Some(token) = cancel_token {
tokio::select! {
guard = timeout(timeout_duration, lock_future) => {
match guard {
Ok(guard) => {
log::debug!("Successfully acquired lock for {operation_name}");
Ok(guard)
}
Err(_) => {
let error_msg = format!(
"Timeout acquiring lock for {operation_name} after {timeout_duration:?}"
);
log::error!("{error_msg}");
Err(Box::new(std::io::Error::new(std::io::ErrorKind::TimedOut, error_msg)))
}
}
}
_ = token.cancelled() => {
let error_msg = format!("Lock acquisition for {operation_name} was cancelled");
log::warn!("{error_msg}");
Err(Box::new(std::io::Error::new(std::io::ErrorKind::Interrupted, error_msg)))
}
}
} else {
match timeout(timeout_duration, lock_future).await {
Ok(guard) => {
log::debug!("Successfully acquired lock for {operation_name}");
Ok(guard)
}
Err(_) => {
let error_msg = format!(
"Timeout acquiring lock for {operation_name} after {timeout_duration:?}"
);
log::error!("{error_msg}");
Err(Box::new(std::io::Error::new(
std::io::ErrorKind::TimedOut,
error_msg,
)))
}
}
}
}