use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use crate::error::ExecutorError;
/// A handle on one concurrency slot backed by a tokio [`Semaphore`].
///
/// The slot is represented by an [`OwnedSemaphorePermit`]. The permit can be
/// given up with [`SlotToken::release`] and obtained again with
/// [`SlotToken::reclaim`]. When the token is dropped while still holding a
/// permit, the permit is dropped with it.
pub struct SlotToken {
    /// The permit currently held, or `None` once it has been released.
    permit: Option<OwnedSemaphorePermit>,
    /// Semaphore that `reclaim` acquires a replacement permit from.
    // NOTE(review): presumably the same semaphore `permit` was acquired from;
    // the constructor does not check this — confirm at call sites.
    semaphore: Arc<Semaphore>,
}
impl SlotToken {
pub(crate) fn new(permit: OwnedSemaphorePermit, semaphore: Arc<Semaphore>) -> Self {
Self {
permit: Some(permit),
semaphore,
}
}
pub fn release(&mut self) -> bool {
self.permit.take().is_some()
}
pub async fn reclaim(&mut self) -> Result<(), ExecutorError> {
if self.permit.is_some() {
return Ok(());
}
let permit = self.semaphore.clone().acquire_owned().await.map_err(|_| {
ExecutorError::TaskExecution(crate::TaskError::ExecutionFailed {
message: "semaphore closed during slot reclaim".into(),
task_id: String::new(),
timestamp: chrono::Utc::now(),
})
})?;
self.permit = Some(permit);
Ok(())
}
pub fn is_held(&self) -> bool {
self.permit.is_some()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Creates a one-permit semaphore together with a token that already
    /// holds that single permit.
    async fn single_slot() -> (Arc<Semaphore>, SlotToken) {
        let semaphore = Arc::new(Semaphore::new(1));
        let permit = Arc::clone(&semaphore).acquire_owned().await.unwrap();
        let token = SlotToken::new(permit, Arc::clone(&semaphore));
        (semaphore, token)
    }

    /// Releasing hands the permit back exactly once; a second release is a no-op.
    #[tokio::test]
    async fn test_slot_token_release_frees_permit() {
        let (semaphore, mut token) = single_slot().await;

        assert!(token.is_held());
        assert_eq!(semaphore.available_permits(), 0);

        assert!(token.release());
        assert!(!token.is_held());
        assert_eq!(semaphore.available_permits(), 1);

        assert!(!token.release());
    }

    /// After a release, reclaiming takes the permit back from the semaphore.
    #[tokio::test]
    async fn test_slot_token_reclaim_reacquires_permit() {
        let (semaphore, mut token) = single_slot().await;

        token.release();
        assert_eq!(semaphore.available_permits(), 1);

        token.reclaim().await.unwrap();
        assert!(token.is_held());
        assert_eq!(semaphore.available_permits(), 0);
    }

    /// Reclaiming while a permit is still held changes nothing.
    #[tokio::test]
    async fn test_slot_token_reclaim_when_already_held_is_noop() {
        let (semaphore, mut token) = single_slot().await;

        token.reclaim().await.unwrap();

        assert!(token.is_held());
        assert_eq!(semaphore.available_permits(), 0);
    }

    /// Dropping a token that still holds its permit frees the slot.
    #[tokio::test]
    async fn test_slot_token_drop_releases_permit() {
        let (semaphore, token) = single_slot().await;
        assert_eq!(semaphore.available_permits(), 0);

        drop(token);

        assert_eq!(semaphore.available_permits(), 1);
    }

    /// Reclaiming blocks while another holder owns the only permit and
    /// completes once that holder lets go.
    #[tokio::test]
    async fn test_slot_token_reclaim_waits_for_availability() {
        let (semaphore, mut token) = single_slot().await;
        token.release();

        // Someone else grabs the only slot before the token can reclaim it.
        let blocker = Arc::clone(&semaphore).acquire_owned().await.unwrap();
        assert_eq!(semaphore.available_permits(), 0);

        let reclaiming = tokio::spawn(async move {
            token.reclaim().await.unwrap();
            assert!(token.is_held());
            token
        });

        // Give the spawned task a chance to start waiting on the semaphore.
        tokio::time::sleep(Duration::from_millis(10)).await;
        assert_eq!(semaphore.available_permits(), 0);

        drop(blocker);

        let token = reclaiming.await.unwrap();
        assert!(token.is_held());
        assert_eq!(semaphore.available_permits(), 0);
    }
}