use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time::{timeout, Instant};
use crate::RunError;
#[derive(Clone)]
pub struct CancellationToken {
cancelled: Arc<RwLock<bool>>,
}
impl CancellationToken {
pub fn new() -> Self {
CancellationToken {
cancelled: Arc::new(RwLock::new(false)),
}
}
pub async fn is_cancelled(&self) -> bool {
*self.cancelled.read().await
}
pub async fn cancel(&self) {
*self.cancelled.write().await = true;
}
pub async fn reset(&self) {
*self.cancelled.write().await = false;
}
}
impl Default for CancellationToken {
fn default() -> Self {
Self::new()
}
}
pub struct ExecutionController {
cancellation_token: CancellationToken,
start_time: Instant,
timeout_duration: Option<Duration>,
}
impl ExecutionController {
pub fn new() -> Self {
ExecutionController {
cancellation_token: CancellationToken::new(),
start_time: Instant::now(),
timeout_duration: None,
}
}
pub fn with_timeout(duration: Duration) -> Self {
ExecutionController {
cancellation_token: CancellationToken::new(),
start_time: Instant::now(),
timeout_duration: Some(duration),
}
}
pub async fn is_cancelled(&self) -> bool {
self.cancellation_token.is_cancelled().await
}
pub async fn cancel(&self) {
self.cancellation_token.cancel().await;
}
pub fn is_timed_out(&self) -> bool {
if let Some(duration) = self.timeout_duration {
self.start_time.elapsed() >= duration
} else {
false
}
}
pub fn elapsed(&self) -> Duration {
self.start_time.elapsed()
}
pub fn remaining(&self) -> Option<Duration> {
self.timeout_duration.map(|d| {
let elapsed = self.start_time.elapsed();
if elapsed >= d {
Duration::ZERO
} else {
d - elapsed
}
})
}
pub async fn wait_with_timeout<F, T>(&self, future: F) -> Result<T, RunError>
where
F: std::future::Future<Output = Result<T, RunError>>,
{
if let Some(duration) = self.timeout_duration {
match timeout(duration, future).await {
Ok(result) => result,
Err(_) => Err(RunError::Timeout { after: duration }),
}
} else {
future.await
}
}
pub fn cancellation_token(&self) -> CancellationToken {
self.cancellation_token.clone()
}
}
impl Default for ExecutionController {
fn default() -> Self {
Self::new()
}
}
pub struct CleanupManager {
cleanup_count: std::sync::atomic::AtomicUsize,
}
impl CleanupManager {
pub fn new() -> Self {
CleanupManager {
cleanup_count: std::sync::atomic::AtomicUsize::new(0),
}
}
pub fn register(&self) {
self.cleanup_count
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}
pub async fn cleanup(&self) -> usize {
self.cleanup_count
.swap(0, std::sync::atomic::Ordering::SeqCst)
}
pub fn pending(&self) -> usize {
self.cleanup_count.load(std::sync::atomic::Ordering::SeqCst)
}
}
impl Default for CleanupManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_cancellation_token() {
let token = CancellationToken::new();
assert!(!token.is_cancelled().await);
token.cancel().await;
assert!(token.is_cancelled().await);
token.reset().await;
assert!(!token.is_cancelled().await);
}
#[tokio::test]
async fn test_execution_controller() {
let controller = ExecutionController::new();
assert!(!controller.is_timed_out());
assert!(controller.remaining().is_none());
}
#[tokio::test]
async fn test_execution_controller_with_timeout() {
let controller = ExecutionController::with_timeout(Duration::from_millis(100));
assert!(!controller.is_timed_out());
tokio::time::sleep(Duration::from_millis(150)).await;
assert!(controller.is_timed_out());
assert_eq!(controller.remaining(), Some(Duration::ZERO));
}
#[tokio::test]
async fn test_execution_controller_cancel() {
let controller = ExecutionController::new();
assert!(!controller.is_cancelled().await);
controller.cancel().await;
assert!(controller.is_cancelled().await);
}
#[tokio::test]
async fn test_wait_with_timeout_success() {
let controller = ExecutionController::with_timeout(Duration::from_secs(10));
let future = async { Ok::<_, RunError>(42) };
let result = controller.wait_with_timeout(future).await;
assert_eq!(result.unwrap(), 42);
}
#[tokio::test]
async fn test_wait_with_timeout_exceeded() {
let controller = ExecutionController::with_timeout(Duration::from_millis(10));
let future = async {
tokio::time::sleep(Duration::from_millis(100)).await;
Ok::<_, RunError>(42)
};
let result = controller.wait_with_timeout(future).await;
assert!(matches!(result, Err(RunError::Timeout { .. })));
}
#[test]
fn test_elapsed_time() {
let controller = ExecutionController::new();
let before = controller.elapsed();
std::thread::sleep(Duration::from_millis(10));
let after = controller.elapsed();
assert!(after > before);
}
#[test]
fn test_cleanup_manager() {
let manager = CleanupManager::new();
assert_eq!(manager.pending(), 0);
manager.register();
manager.register();
assert_eq!(manager.pending(), 2);
assert_eq!(manager.pending(), 2);
}
}