1use rdkafka::producer::{FutureProducer, FutureRecord};
4use rdkafka::ClientConfig;
5use std::sync::Arc;
6use std::time::Duration;
7use tracing::{info, warn};
8
9use crate::storage::TaskStorage;
10use crate::types::*;
11use crate::HandlerRegistry;
12
13const MSG_TIMEOUT: Duration = Duration::from_secs(5);
14
15pub struct TaskProducer {
20 producer: FutureProducer,
22 storage: Arc<dyn TaskStorage>,
24 registry: HandlerRegistry,
26}
27
28impl TaskProducer {
29 pub fn new(
37 brokers: &str,
38 storage: Arc<dyn TaskStorage>,
39 registry: HandlerRegistry,
40 ) -> Result<Self, String> {
41 let producer: FutureProducer = ClientConfig::new()
42 .set("bootstrap.servers", brokers)
43 .set("message.timeout.ms", "5000")
44 .create()
45 .map_err(|e| format!("Kafka Producer 创建失败: {}", e))?;
46 Ok(Self {
47 producer,
48 storage,
49 registry,
50 })
51 }
52
53 pub async fn submit(&self, params: SubmitTaskParams) -> Result<String, String> {
59 let config = self
60 .registry
61 .get_config(params.task_type)
62 .ok_or_else(|| format!("未注册的 task_type: {}", params.task_type))?;
63
64 let task_id = uuid::Uuid::new_v4().to_string();
65 let priority = params.priority.unwrap_or(config.priority).to_i16();
66 let now = chrono::Utc::now().to_rfc3339();
67
68 self.storage
69 .save_task_log(&task_id, params.task_type, priority, &config, ¶ms)
70 .await
71 .map_err(|e| format!("持久化 task_logs 失败: {}", e))?;
72
73 self.storage
74 .save_task_queue(&task_id, &config.topic, priority)
75 .await
76 .map_err(|e| format!("持久化 task_queue 失败: {}", e))?;
77
78 let msg = TaskMessage {
79 task_id: task_id.clone(),
80 task_type: params.task_type,
81 payload: params.payload.clone(),
82 priority,
83 user_id: params.user_id,
84 resource_id: params.resource_id,
85 resource_type: params.resource_type.map(|r| r.to_i16()),
86 submitted_at: now,
87 };
88
89 let payload = serde_json::to_vec(&msg).map_err(|e| format!("序列化失败: {}", e))?;
90 let record = FutureRecord::to(&config.topic)
91 .key(&task_id)
92 .payload(&payload);
93
94 self.producer
95 .send(record, MSG_TIMEOUT)
96 .await
97 .map_err(|(e, _)| format!("Kafka 发送失败: {}", e))?;
98
99 info!(task_id = %task_id, task_type = params.task_type, "任务已提交");
100 Ok(task_id)
101 }
102
103 pub async fn submit_batch(&self, params: SubmitBatchParams) -> (usize, Vec<(usize, String)>) {
108 let mut succeeded = 0usize;
109 let mut failures = Vec::new();
110
111 for (idx, task) in params.tasks.into_iter().enumerate() {
112 match self.submit(task).await {
113 Ok(_) => succeeded += 1,
114 Err(e) => {
115 warn!(index = idx, error = %e, "批量提交任务失败");
116 failures.push((idx, e));
117 }
118 }
119 }
120
121 info!(succeeded = succeeded, failed = failures.len(), "批量提交完成");
122 (succeeded, failures)
123 }
124
125 pub async fn send_to_dlq(
129 &self,
130 msg: &TaskMessage,
131 dead_letter_topic: &str,
132 reason: &str,
133 ) -> Result<(), String> {
134 let payload = serde_json::to_vec(msg).map_err(|e| format!("DLQ 序列化失败: {}", e))?;
135 let record = FutureRecord::to(dead_letter_topic)
136 .key(&msg.task_id)
137 .payload(&payload);
138
139 self.producer
140 .send(record, MSG_TIMEOUT)
141 .await
142 .map_err(|(e, _)| format!("DLQ 发送失败: {}", e))?;
143
144 let _ = self
145 .storage
146 .update_task_status(&msg.task_id, TaskStatus::DeadLetter)
147 .await;
148
149 info!(task_id = %msg.task_id, reason = reason, "任务已转入死信队列");
150 Ok(())
151 }
152}