Skip to main content

alun_task/
plugin.rs

//! TaskPlugin —— 任务插件入口
//!
//! 实现 `alun_core::Plugin` trait,通过 `PluginManager` 管理生命周期。
//! 不持有任何数据库连接——持久化通过 `TaskStorage` trait 委托给业务方。
5
6use std::sync::Arc;
7use async_trait::async_trait;
8use alun_core::{Plugin, Result as CoreResult};
9use tokio::task::JoinHandle;
10use tracing::{info, error};
11
12use crate::storage::TaskStorage;
13use crate::HandlerRegistry;
14use crate::TaskWorker;
15use crate::RetryScanner;
16use crate::TaskMetrics;
17use crate::TaskWorkerConfig;
18
19/// 任务插件
20///
21/// 管理 TaskWorker 和 RetryScanner 的生命周期。
22/// 持久化通过 `TaskStorage` trait 委托给业务方。
23///
24/// 实现 `alun_core::Plugin`,可通过 `PluginManager` 或 `App::plugin()` 统一管理。
25///
26/// # 使用示例
27///
28/// ```ignore
29/// // 配置从 config.toml 的 [task] section 读取
30/// let task_cfg: TaskWorkerConfig = app.config().get_section("task")?;
31/// let storage = Arc::new(DbTaskStorage::new());
32/// let task_plugin = TaskPlugin::new(task_cfg, storage, registry)?;
33/// app.plugin(task_plugin).scan().start().await
34/// ```
35pub struct TaskPlugin {
36    /// TaskWorker 运行时配置
37    config: TaskWorkerConfig,
38    /// 任务持久化接口(由业务方实现)
39    storage: Arc<dyn TaskStorage>,
40    /// 处理器注册中心
41    registry: HandlerRegistry,
42    /// tokio 任务句柄(用于停止时等待任务完成)
43    handles: parking_lot::Mutex<Vec<JoinHandle<()>>>,
44    /// 运行状态标志
45    running: Arc<std::sync::atomic::AtomicBool>,
46    /// 任务执行指标
47    metrics: Arc<TaskMetrics>,
48    /// 已注册的 Kafka topic 列表
49    topics: Vec<String>,
50}
51
52impl TaskPlugin {
53    /// 创建任务插件
54    ///
55    /// - `config`: TaskWorker 运行时配置(建议从 `[task]` section 读取,支持 Deserialize)
56    /// - `storage`: 由业务方实现的持久化接口
57    /// - `registry`: 已注册 handler 的注册中心
58    pub fn new(
59        config: TaskWorkerConfig,
60        storage: Arc<dyn TaskStorage>,
61        registry: HandlerRegistry,
62    ) -> Result<Self, String> {
63        let topics: Vec<String> = registry
64            .task_types()
65            .iter()
66            .filter_map(|tt| registry.get_config(*tt).map(|c| c.topic.clone()))
67            .collect();
68
69        let metrics = Arc::new(TaskMetrics::new());
70
71        Ok(Self {
72            config,
73            storage,
74            registry,
75            handles: parking_lot::Mutex::new(Vec::new()),
76            running: Arc::new(std::sync::atomic::AtomicBool::new(true)),
77            metrics,
78            topics,
79        })
80    }
81
82    /// 返回任务指标(供外部查询)
83    pub fn metrics(&self) -> Arc<TaskMetrics> {
84        Arc::clone(&self.metrics)
85    }
86
87    /// 返回已注册的 topic 列表
88    pub fn topics(&self) -> &[String] {
89        &self.topics
90    }
91
92    fn signal_stop(&self) {
93        self.running.store(false, std::sync::atomic::Ordering::Relaxed);
94    }
95
96    async fn wait_for_tasks(&self) {
97        let handles: Vec<JoinHandle<()>> = {
98            let mut guard = self.handles.lock();
99            std::mem::take(&mut *guard)
100        };
101        for handle in handles {
102            let _ = handle.await;
103        }
104    }
105}
106
107#[async_trait]
108impl Plugin for TaskPlugin {
109    fn name(&self) -> &str {
110        "task"
111    }
112
113    fn depends_on(&self) -> &[&str] {
114        &[]
115    }
116
117    /// 启动插件:在后台 tokio task 中启动 Worker 和 RetryScanner
118    async fn start(&self) -> CoreResult<()> {
119        self.running.store(true, std::sync::atomic::Ordering::Relaxed);
120
121        let topics = self.topics.clone();
122        let running = Arc::clone(&self.running);
123        let metrics = Arc::clone(&self.metrics);
124
125        let worker = {
126            let w_config = self.config.clone();
127            let w_storage = Arc::clone(&self.storage);
128            let w_registry = self.registry.clone();
129            let w_metrics = Arc::clone(&metrics);
130            let w_running = Arc::clone(&running);
131            let w_topics = topics.clone();
132
133            tokio::spawn(async move {
134                let w_config_clone = w_config.clone();
135                let w_storage_clone = Arc::clone(&w_storage);
136                let w_registry_clone = w_registry.clone();
137                let worker = match tokio::task::spawn_blocking(move || {
138                    TaskWorker::new(w_config_clone, w_storage_clone, w_registry_clone, w_metrics)
139                }).await
140                {
141                    Ok(Ok(w)) => Arc::new(w),
142                    Ok(Err(e)) => {
143                        error!("TaskWorker 创建失败: {}", e);
144                        return;
145                    }
146                    Err(e) => {
147                        error!("TaskWorker 创建失败 (join error): {}", e);
148                        return;
149                    }
150                };
151
152                let worker_ref = Arc::clone(&worker);
153                let run_fut = worker_ref.run(&w_topics);
154
155                tokio::select! {
156                    result = run_fut => {
157                        if let Err(e) = result {
158                            error!("TaskWorker 运行异常: {}", e);
159                        }
160                    }
161                    _ = async {
162                        while w_running.load(std::sync::atomic::Ordering::Relaxed) {
163                            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
164                        }
165                    } => {
166                        worker_ref.stop();
167                    }
168                }
169            })
170        };
171
172        let scanner = {
173            let s_brokers = self.config.brokers.clone();
174            let s_storage = Arc::clone(&self.storage);
175            let s_registry = self.registry.clone();
176            let s_interval = self.config.scan_interval_secs;
177            let s_batch = self.config.max_batch_size;
178            let s_running = Arc::clone(&running);
179
180            tokio::spawn(async move {
181                let scanner = match RetryScanner::new(
182                    &s_brokers, s_storage, s_registry, s_interval, s_batch,
183                ) {
184                    Ok(s) => Arc::new(s),
185                    Err(e) => {
186                        error!("RetryScanner 创建失败: {}", e);
187                        return;
188                    }
189                };
190
191                let scanner_ref = Arc::clone(&scanner);
192
193                tokio::select! {
194                    _ = scanner_ref.run() => {}
195                    _ = async {
196                        while s_running.load(std::sync::atomic::Ordering::Relaxed) {
197                            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
198                        }
199                    } => {
200                        scanner_ref.stop();
201                    }
202                }
203            })
204        };
205
206        self.handles.lock().push(worker);
207        self.handles.lock().push(scanner);
208
209        info!(
210            "TaskPlugin 启动: topics={:?}, brokers={}, group={}",
211            self.topics, self.config.brokers, self.config.group_id
212        );
213        Ok(())
214    }
215
216    /// 停止插件:发送停止信号并等待后台任务完成
217    async fn stop(&self) -> CoreResult<()> {
218        info!("TaskPlugin 收到停止信号");
219        self.signal_stop();
220        self.wait_for_tasks().await;
221        info!("TaskPlugin 已停止");
222        Ok(())
223    }
224}