use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;
use tokio::time::{Instant, timeout};
use crate::core::platform::container::arsenal::ArsenalError;
/// Applies a fixed time limit to futures run through [`TimeoutWrapper::execute`].
///
/// The wrapper only stores the limit, so cloning it yields an independent
/// value with the same duration.
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct TimeoutWrapper {
/// Maximum time a wrapped future may take before it is abandoned.
duration: Duration,
}
impl TimeoutWrapper {
    /// Creates a wrapper that limits every wrapped future to `duration`.
    pub fn new(duration: Duration) -> Self {
        Self { duration }
    }

    /// Awaits `future`, giving up once the configured duration has elapsed.
    ///
    /// # Errors
    ///
    /// * If the future completes in time with `Err`, that error is converted
    ///   into [`ArsenalError`] via `Into`.
    /// * If the limit elapses first, [`ArsenalError::Timeout`] is returned
    ///   carrying the configured limit in whole seconds. Sub-second limits are
    ///   truncated, so a 100 ms limit is reported as `0`.
    pub async fn execute<F, T, E>(&self, future: F) -> Result<T, ArsenalError>
    where
        F: Future<Output = Result<T, E>>,
        E: Into<ArsenalError>,
    {
        match timeout(self.duration, future).await {
            Ok(outcome) => outcome.map_err(Into::into),
            Err(_elapsed) => Err(ArsenalError::Timeout(self.duration.as_secs())),
        }
    }

    /// Runs `future` through [`TimeoutWrapper::execute`] and also reports how
    /// long the call took.
    ///
    /// Returns the result of `execute` paired with the elapsed time in
    /// milliseconds. The time is measured around the whole call, so it is
    /// reported for successes, errors, and timeouts alike.
    pub async fn execute_with_timing<F, T, E>(&self, future: F) -> (Result<T, ArsenalError>, u64)
    where
        F: Future<Output = Result<T, E>>,
        E: Into<ArsenalError>,
    {
        let start = Instant::now();
        let result = self.execute(future).await;
        // `as_millis` yields a `u128`; saturate instead of silently truncating with `as`.
        let elapsed_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
        (result, elapsed_ms)
    }
}
/// Caps how many holders may run at once by handing out semaphore permits.
///
/// The semaphore lives behind an `Arc`, so every clone of the limiter draws
/// from the same pool of permits.
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct ConcurrencyLimiter {
/// Shared semaphore whose permit count is the concurrency cap.
semaphore: Arc<Semaphore>,
}
impl ConcurrencyLimiter {
    /// Builds a limiter backed by a semaphore holding `max_concurrent` permits.
    pub fn new(max_concurrent: usize) -> Self {
        let semaphore = Arc::new(Semaphore::new(max_concurrent));
        Self { semaphore }
    }

    /// Waits until a permit is free and returns it.
    ///
    /// The permit borrows from this limiter; dropping it makes the slot
    /// available again.
    ///
    /// # Panics
    ///
    /// Panics if acquiring from the semaphore fails, which is expected only
    /// when the semaphore has been closed.
    pub async fn acquire(&self) -> tokio::sync::SemaphorePermit<'_> {
        let acquired = self.semaphore.acquire().await;
        acquired.expect("Semaphore should not be closed")
    }

    /// Reports how many permits are currently free.
    pub fn available_permits(&self) -> usize {
        let Self { semaphore } = self;
        semaphore.available_permits()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::sleep;

    /// A future that finishes before the limit yields its own `Ok` value.
    #[tokio::test]
    async fn test_timeout_within_limit() {
        let wrapper = TimeoutWrapper::new(Duration::from_secs(1));
        let result = wrapper
            .execute(async {
                sleep(Duration::from_millis(100)).await;
                Ok::<_, ArsenalError>("success".to_string())
            })
            .await;
        // `unwrap` already reports the error on failure, so no separate `is_ok` check is needed.
        assert_eq!(result.unwrap(), "success");
    }

    /// A future that outlives the limit is reported as `ArsenalError::Timeout`.
    #[tokio::test]
    async fn test_timeout_exceeds_limit() {
        let wrapper = TimeoutWrapper::new(Duration::from_millis(100));
        let result = wrapper
            .execute(async {
                sleep(Duration::from_secs(1)).await;
                Ok::<_, ArsenalError>("success".to_string())
            })
            .await;
        match result {
            // `Timeout` carries whole seconds, so the 100 ms limit truncates to 0.
            Err(ArsenalError::Timeout(seconds)) => assert_eq!(seconds, 0),
            other => panic!("Expected Timeout error, got {other:?}"),
        }
    }

    /// The reported elapsed time brackets the 100 ms the future sleeps for.
    #[tokio::test]
    async fn test_execute_with_timing() {
        let wrapper = TimeoutWrapper::new(Duration::from_secs(1));
        let (result, elapsed_ms) = wrapper
            .execute_with_timing(async {
                sleep(Duration::from_millis(100)).await;
                Ok::<_, ArsenalError>("success".to_string())
            })
            .await;
        assert!(result.is_ok());
        assert!(elapsed_ms >= 100, "Elapsed time: {}ms", elapsed_ms);
        // NOTE(review): wall-clock upper bound; may be sensitive to a heavily loaded machine.
        assert!(
            elapsed_ms < 500,
            "Elapsed time should be less than 500ms, was {}ms",
            elapsed_ms
        );
    }

    /// Permits are consumed on acquire and handed back when dropped.
    #[tokio::test]
    async fn test_concurrency_limit_enforced() {
        let limiter = ConcurrencyLimiter::new(2);
        let permit1 = limiter.acquire().await;
        let permit2 = limiter.acquire().await;
        assert_eq!(limiter.available_permits(), 0);
        drop(permit1);
        assert_eq!(limiter.available_permits(), 1);
        drop(permit2);
        assert_eq!(limiter.available_permits(), 2);
    }

    /// A second acquirer waits until the only permit is released.
    #[tokio::test]
    async fn test_concurrency_queuing() {
        let limiter = ConcurrencyLimiter::new(1);
        let permit = limiter.acquire().await;
        let limiter_clone = limiter.clone();
        let handle = tokio::spawn(async move {
            let _p = limiter_clone.acquire().await;
            "acquired".to_string()
        });
        // Give the spawned task time to reach `acquire` and block on it.
        sleep(Duration::from_millis(50)).await;
        assert!(!handle.is_finished());
        drop(permit);
        let result = handle.await.unwrap();
        assert_eq!(result, "acquired");
    }

    /// Cloning a wrapper copies its configured duration.
    #[tokio::test]
    async fn test_timeout_wrapper_clone() {
        let wrapper1 = TimeoutWrapper::new(Duration::from_secs(30));
        let wrapper2 = wrapper1.clone();
        assert_eq!(wrapper1.duration, wrapper2.duration);
    }

    /// Clones of a limiter share one pool of permits.
    #[tokio::test]
    async fn test_concurrency_limiter_clone() {
        let limiter1 = ConcurrencyLimiter::new(5);
        let limiter2 = limiter1.clone();
        let _permit1 = limiter1.acquire().await;
        assert_eq!(limiter2.available_permits(), 4);
    }
}