Skip to main content

alun_task/
producer.rs

1//! 任务生产者 —— 提交任务到 Kafka 并委托给 TaskStorage 持久化
2
3use rdkafka::producer::{FutureProducer, FutureRecord};
4use rdkafka::ClientConfig;
5use std::sync::Arc;
6use std::time::Duration;
7use tracing::{info, warn};
8
9use crate::storage::TaskStorage;
10use crate::types::*;
11use crate::HandlerRegistry;
12
13const MSG_TIMEOUT: Duration = Duration::from_secs(5);
14
15/// 任务生产者
16///
17/// 将任务提交到 Kafka 主题,并委托 `TaskStorage` 持久化任务日志与队列记录。
18/// 不持有任何 SQL 语句或数据库表名——完全由 `TaskStorage` 实现方控制。
19pub struct TaskProducer {
20    /// Kafka 生产者
21    producer: FutureProducer,
22    /// 任务持久化接口
23    storage: Arc<dyn TaskStorage>,
24    /// 处理器注册中心(用于获取 topic 等配置)
25    registry: HandlerRegistry,
26}
27
28impl TaskProducer {
29    /// 创建任务生产者
30    ///
31    /// - `brokers`: Kafka broker 地址
32    /// - `storage`: 由业务方实现的持久化接口
33    /// - `registry`: 已注册 handler 的注册中心
34    ///
35    /// 返回 `Err` 而非 panic,便于外部处理 Kafka 连接失败。
36    pub fn new(
37        brokers: &str,
38        storage: Arc<dyn TaskStorage>,
39        registry: HandlerRegistry,
40    ) -> Result<Self, String> {
41        let producer: FutureProducer = ClientConfig::new()
42            .set("bootstrap.servers", brokers)
43            .set("message.timeout.ms", "5000")
44            .create()
45            .map_err(|e| format!("Kafka Producer 创建失败: {}", e))?;
46        Ok(Self {
47            producer,
48            storage,
49            registry,
50        })
51    }
52
53    /// 提交任务
54    ///
55    /// 1. 生成 task_id
56    /// 2. 委托 storage 持久化任务日志与队列记录
57    /// 3. 发送 Kafka 消息
58    pub async fn submit(&self, params: SubmitTaskParams) -> Result<String, String> {
59        let config = self
60            .registry
61            .get_config(params.task_type)
62            .ok_or_else(|| format!("未注册的 task_type: {}", params.task_type))?;
63
64        let task_id = uuid::Uuid::new_v4().to_string();
65        let priority = params.priority.unwrap_or(config.priority).to_i16();
66        let now = chrono::Utc::now().to_rfc3339();
67
68        self.storage
69            .save_task_log(&task_id, params.task_type, priority, &config, &params)
70            .await
71            .map_err(|e| format!("持久化 task_logs 失败: {}", e))?;
72
73        self.storage
74            .save_task_queue(&task_id, &config.topic, priority)
75            .await
76            .map_err(|e| format!("持久化 task_queue 失败: {}", e))?;
77
78        let msg = TaskMessage {
79            task_id: task_id.clone(),
80            task_type: params.task_type,
81            payload: params.payload.clone(),
82            priority,
83            user_id: params.user_id,
84            resource_id: params.resource_id,
85            resource_type: params.resource_type.map(|r| r.to_i16()),
86            submitted_at: now,
87        };
88
89        let payload = serde_json::to_vec(&msg).map_err(|e| format!("序列化失败: {}", e))?;
90        let record = FutureRecord::to(&config.topic)
91            .key(&task_id)
92            .payload(&payload);
93
94        self.producer
95            .send(record, MSG_TIMEOUT)
96            .await
97            .map_err(|(e, _)| format!("Kafka 发送失败: {}", e))?;
98
99        info!(task_id = %task_id, task_type = params.task_type, "任务已提交");
100        Ok(task_id)
101    }
102
103    /// 批量提交任务
104    ///
105    /// 逐个提交,部分失败不影响其他任务。
106    /// 返回 `(成功数, 失败详情列表)`。
107    pub async fn submit_batch(&self, params: SubmitBatchParams) -> (usize, Vec<(usize, String)>) {
108        let mut succeeded = 0usize;
109        let mut failures = Vec::new();
110
111        for (idx, task) in params.tasks.into_iter().enumerate() {
112            match self.submit(task).await {
113                Ok(_) => succeeded += 1,
114                Err(e) => {
115                    warn!(index = idx, error = %e, "批量提交任务失败");
116                    failures.push((idx, e));
117                }
118            }
119        }
120
121        info!(succeeded = succeeded, failed = failures.len(), "批量提交完成");
122        (succeeded, failures)
123    }
124
125    /// 发送消息到死信队列
126    ///
127    /// 将失败的 TaskMessage 转发到 dead_letter_topic,同时委托 storage 更新状态为 DeadLetter。
128    pub async fn send_to_dlq(
129        &self,
130        msg: &TaskMessage,
131        dead_letter_topic: &str,
132        reason: &str,
133    ) -> Result<(), String> {
134        let payload = serde_json::to_vec(msg).map_err(|e| format!("DLQ 序列化失败: {}", e))?;
135        let record = FutureRecord::to(dead_letter_topic)
136            .key(&msg.task_id)
137            .payload(&payload);
138
139        self.producer
140            .send(record, MSG_TIMEOUT)
141            .await
142            .map_err(|(e, _)| format!("DLQ 发送失败: {}", e))?;
143
144        let _ = self
145            .storage
146            .update_task_status(&msg.task_id, TaskStatus::DeadLetter)
147            .await;
148
149        info!(task_id = %msg.task_id, reason = reason, "任务已转入死信队列");
150        Ok(())
151    }
152}