1use std::sync::Arc;
7use async_trait::async_trait;
8use alun_core::{Plugin, Result as CoreResult};
9use tokio::task::JoinHandle;
10use tracing::{info, error};
11
12use crate::storage::TaskStorage;
13use crate::HandlerRegistry;
14use crate::TaskWorker;
15use crate::RetryScanner;
16use crate::TaskMetrics;
17use crate::TaskWorkerConfig;
18
19pub struct TaskPlugin {
36 config: TaskWorkerConfig,
38 storage: Arc<dyn TaskStorage>,
40 registry: HandlerRegistry,
42 handles: parking_lot::Mutex<Vec<JoinHandle<()>>>,
44 running: Arc<std::sync::atomic::AtomicBool>,
46 metrics: Arc<TaskMetrics>,
48 topics: Vec<String>,
50}
51
52impl TaskPlugin {
53 pub fn new(
59 config: TaskWorkerConfig,
60 storage: Arc<dyn TaskStorage>,
61 registry: HandlerRegistry,
62 ) -> Result<Self, String> {
63 let topics: Vec<String> = registry
64 .task_types()
65 .iter()
66 .filter_map(|tt| registry.get_config(*tt).map(|c| c.topic.clone()))
67 .collect();
68
69 let metrics = Arc::new(TaskMetrics::new());
70
71 Ok(Self {
72 config,
73 storage,
74 registry,
75 handles: parking_lot::Mutex::new(Vec::new()),
76 running: Arc::new(std::sync::atomic::AtomicBool::new(true)),
77 metrics,
78 topics,
79 })
80 }
81
82 pub fn metrics(&self) -> Arc<TaskMetrics> {
84 Arc::clone(&self.metrics)
85 }
86
87 pub fn topics(&self) -> &[String] {
89 &self.topics
90 }
91
92 fn signal_stop(&self) {
93 self.running.store(false, std::sync::atomic::Ordering::Relaxed);
94 }
95
96 async fn wait_for_tasks(&self) {
97 let handles: Vec<JoinHandle<()>> = {
98 let mut guard = self.handles.lock();
99 std::mem::take(&mut *guard)
100 };
101 for handle in handles {
102 let _ = handle.await;
103 }
104 }
105}
106
107#[async_trait]
108impl Plugin for TaskPlugin {
109 fn name(&self) -> &str {
110 "task"
111 }
112
113 fn depends_on(&self) -> &[&str] {
114 &[]
115 }
116
117 async fn start(&self) -> CoreResult<()> {
119 self.running.store(true, std::sync::atomic::Ordering::Relaxed);
120
121 let topics = self.topics.clone();
122 let running = Arc::clone(&self.running);
123 let metrics = Arc::clone(&self.metrics);
124
125 let worker = {
126 let w_config = self.config.clone();
127 let w_storage = Arc::clone(&self.storage);
128 let w_registry = self.registry.clone();
129 let w_metrics = Arc::clone(&metrics);
130 let w_running = Arc::clone(&running);
131 let w_topics = topics.clone();
132
133 tokio::spawn(async move {
134 let w_config_clone = w_config.clone();
135 let w_storage_clone = Arc::clone(&w_storage);
136 let w_registry_clone = w_registry.clone();
137 let worker = match tokio::task::spawn_blocking(move || {
138 TaskWorker::new(w_config_clone, w_storage_clone, w_registry_clone, w_metrics)
139 }).await
140 {
141 Ok(Ok(w)) => Arc::new(w),
142 Ok(Err(e)) => {
143 error!("TaskWorker 创建失败: {}", e);
144 return;
145 }
146 Err(e) => {
147 error!("TaskWorker 创建失败 (join error): {}", e);
148 return;
149 }
150 };
151
152 let worker_ref = Arc::clone(&worker);
153 let run_fut = worker_ref.run(&w_topics);
154
155 tokio::select! {
156 result = run_fut => {
157 if let Err(e) = result {
158 error!("TaskWorker 运行异常: {}", e);
159 }
160 }
161 _ = async {
162 while w_running.load(std::sync::atomic::Ordering::Relaxed) {
163 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
164 }
165 } => {
166 worker_ref.stop();
167 }
168 }
169 })
170 };
171
172 let scanner = {
173 let s_brokers = self.config.brokers.clone();
174 let s_storage = Arc::clone(&self.storage);
175 let s_registry = self.registry.clone();
176 let s_interval = self.config.scan_interval_secs;
177 let s_batch = self.config.max_batch_size;
178 let s_running = Arc::clone(&running);
179
180 tokio::spawn(async move {
181 let scanner = match RetryScanner::new(
182 &s_brokers, s_storage, s_registry, s_interval, s_batch,
183 ) {
184 Ok(s) => Arc::new(s),
185 Err(e) => {
186 error!("RetryScanner 创建失败: {}", e);
187 return;
188 }
189 };
190
191 let scanner_ref = Arc::clone(&scanner);
192
193 tokio::select! {
194 _ = scanner_ref.run() => {}
195 _ = async {
196 while s_running.load(std::sync::atomic::Ordering::Relaxed) {
197 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
198 }
199 } => {
200 scanner_ref.stop();
201 }
202 }
203 })
204 };
205
206 self.handles.lock().push(worker);
207 self.handles.lock().push(scanner);
208
209 info!(
210 "TaskPlugin 启动: topics={:?}, brokers={}, group={}",
211 self.topics, self.config.brokers, self.config.group_id
212 );
213 Ok(())
214 }
215
216 async fn stop(&self) -> CoreResult<()> {
218 info!("TaskPlugin 收到停止信号");
219 self.signal_stop();
220 self.wait_for_tasks().await;
221 info!("TaskPlugin 已停止");
222 Ok(())
223 }
224}