use std::sync::Arc;
use async_trait::async_trait;
use alun_core::{Plugin, Result as CoreResult};
use tokio::task::JoinHandle;
use tracing::{info, error};
use crate::storage::TaskStorage;
use crate::HandlerRegistry;
use crate::TaskWorker;
use crate::RetryScanner;
use crate::TaskMetrics;
use crate::TaskWorkerConfig;
pub struct TaskPlugin {
config: TaskWorkerConfig,
storage: Arc<dyn TaskStorage>,
registry: HandlerRegistry,
handles: parking_lot::Mutex<Vec<JoinHandle<()>>>,
running: Arc<std::sync::atomic::AtomicBool>,
metrics: Arc<TaskMetrics>,
topics: Vec<String>,
}
impl TaskPlugin {
pub fn new(
config: TaskWorkerConfig,
storage: Arc<dyn TaskStorage>,
registry: HandlerRegistry,
) -> Result<Self, String> {
let topics: Vec<String> = registry
.task_types()
.iter()
.filter_map(|tt| registry.get_config(*tt).map(|c| c.topic.clone()))
.collect();
let metrics = Arc::new(TaskMetrics::new());
Ok(Self {
config,
storage,
registry,
handles: parking_lot::Mutex::new(Vec::new()),
running: Arc::new(std::sync::atomic::AtomicBool::new(true)),
metrics,
topics,
})
}
pub fn metrics(&self) -> Arc<TaskMetrics> {
Arc::clone(&self.metrics)
}
pub fn topics(&self) -> &[String] {
&self.topics
}
fn signal_stop(&self) {
self.running.store(false, std::sync::atomic::Ordering::Relaxed);
}
async fn wait_for_tasks(&self) {
let handles: Vec<JoinHandle<()>> = {
let mut guard = self.handles.lock();
std::mem::take(&mut *guard)
};
for handle in handles {
let _ = handle.await;
}
}
}
#[async_trait]
impl Plugin for TaskPlugin {
fn name(&self) -> &str {
"task"
}
fn depends_on(&self) -> &[&str] {
&[]
}
async fn start(&self) -> CoreResult<()> {
self.running.store(true, std::sync::atomic::Ordering::Relaxed);
let topics = self.topics.clone();
let running = Arc::clone(&self.running);
let metrics = Arc::clone(&self.metrics);
let worker = {
let w_config = self.config.clone();
let w_storage = Arc::clone(&self.storage);
let w_registry = self.registry.clone();
let w_metrics = Arc::clone(&metrics);
let w_running = Arc::clone(&running);
let w_topics = topics.clone();
tokio::spawn(async move {
let w_config_clone = w_config.clone();
let w_storage_clone = Arc::clone(&w_storage);
let w_registry_clone = w_registry.clone();
let worker = match tokio::task::spawn_blocking(move || {
TaskWorker::new(w_config_clone, w_storage_clone, w_registry_clone, w_metrics)
}).await
{
Ok(Ok(w)) => Arc::new(w),
Ok(Err(e)) => {
error!("TaskWorker 创建失败: {}", e);
return;
}
Err(e) => {
error!("TaskWorker 创建失败 (join error): {}", e);
return;
}
};
let worker_ref = Arc::clone(&worker);
let run_fut = worker_ref.run(&w_topics);
tokio::select! {
result = run_fut => {
if let Err(e) = result {
error!("TaskWorker 运行异常: {}", e);
}
}
_ = async {
while w_running.load(std::sync::atomic::Ordering::Relaxed) {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
} => {
worker_ref.stop();
}
}
})
};
let scanner = {
let s_brokers = self.config.brokers.clone();
let s_storage = Arc::clone(&self.storage);
let s_registry = self.registry.clone();
let s_interval = self.config.scan_interval_secs;
let s_batch = self.config.max_batch_size;
let s_running = Arc::clone(&running);
tokio::spawn(async move {
let scanner = match RetryScanner::new(
&s_brokers, s_storage, s_registry, s_interval, s_batch,
) {
Ok(s) => Arc::new(s),
Err(e) => {
error!("RetryScanner 创建失败: {}", e);
return;
}
};
let scanner_ref = Arc::clone(&scanner);
tokio::select! {
_ = scanner_ref.run() => {}
_ = async {
while s_running.load(std::sync::atomic::Ordering::Relaxed) {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
} => {
scanner_ref.stop();
}
}
})
};
self.handles.lock().push(worker);
self.handles.lock().push(scanner);
info!(
"TaskPlugin 启动: topics={:?}, brokers={}, group={}",
self.topics, self.config.brokers, self.config.group_id
);
Ok(())
}
async fn stop(&self) -> CoreResult<()> {
info!("TaskPlugin 收到停止信号");
self.signal_stop();
self.wait_for_tasks().await;
info!("TaskPlugin 已停止");
Ok(())
}
}