Skip to main content

alun_plugin/
async_task.rs

1//! 异步任务插件:后台任务队列
2
3use async_trait::async_trait;
4use alun_core::{Plugin, Result};
5use std::sync::Arc;
6use tokio::sync::Semaphore;
7
8/// 异步任务插件:基于 Semaphore 的并发控制后台任务队列
9///
10/// `stop()` 时会等待所有正在执行的任务完成后才返回。
11pub struct AsyncTaskPlugin {
12    /// 并发工作线程数
13    workers: usize,
14    /// Semaphore 用于控制并发数
15    semaphore: Arc<Semaphore>,
16    /// 运行状态标志
17    running: Arc<parking_lot::Mutex<bool>>,
18}
19
20impl AsyncTaskPlugin {
21    /// 创建异步任务插件
22    ///
23    /// `workers` = 0 时自动设为 4(CPU 核数的合理默认值)。
24    pub fn new(workers: usize) -> Self {
25        let w = if workers == 0 { 4 } else { workers };
26        Self {
27            workers: w,
28            semaphore: Arc::new(Semaphore::new(w)),
29            running: Arc::new(parking_lot::Mutex::new(false)),
30        }
31    }
32
33    /// 提交异步任务
34    pub async fn submit<F>(&self, task: F)
35    where
36        F: std::future::Future<Output = ()> + Send + 'static,
37    {
38        let sem = self.semaphore.clone();
39        tokio::spawn(async move {
40            let _permit = sem.acquire().await.expect("AsyncTask semaphore 异常");
41            task.await;
42        });
43    }
44
45    /// 工作线程数
46    pub fn worker_count(&self) -> usize { self.workers }
47}
48
49#[async_trait]
50impl Plugin for AsyncTaskPlugin {
51    fn name(&self) -> &str { "async-task" }
52
53    async fn start(&self) -> Result<()> {
54        *self.running.lock() = true;
55        tracing::info!("异步任务插件: workers={} 已就绪", self.workers);
56        Ok(())
57    }
58
59    async fn stop(&self) -> Result<()> {
60        *self.running.lock() = false;
61        tracing::info!("异步任务插件: 等待任务完成...");
62        // 等待所有 semaphore permit 回收
63        let _permits = self.semaphore.acquire_many(self.workers as u32).await;
64        Ok(())
65    }
66}