use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::ClientConfig;
use std::sync::Arc;
use std::time::Duration;
use tracing::{info, warn};
use crate::storage::TaskStorage;
use crate::types::*;
use crate::HandlerRegistry;
const MSG_TIMEOUT: Duration = Duration::from_secs(5);
pub struct TaskProducer {
producer: FutureProducer,
storage: Arc<dyn TaskStorage>,
registry: HandlerRegistry,
}
impl TaskProducer {
pub fn new(
brokers: &str,
storage: Arc<dyn TaskStorage>,
registry: HandlerRegistry,
) -> Result<Self, String> {
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("message.timeout.ms", "5000")
.create()
.map_err(|e| format!("Kafka Producer 创建失败: {}", e))?;
Ok(Self {
producer,
storage,
registry,
})
}
pub async fn submit(&self, params: SubmitTaskParams) -> Result<String, String> {
let config = self
.registry
.get_config(params.task_type)
.ok_or_else(|| format!("未注册的 task_type: {}", params.task_type))?;
let task_id = uuid::Uuid::new_v4().to_string();
let priority = params.priority.unwrap_or(config.priority).to_i16();
let now = chrono::Utc::now().to_rfc3339();
self.storage
.save_task_log(&task_id, params.task_type, priority, &config, ¶ms)
.await
.map_err(|e| format!("持久化 task_logs 失败: {}", e))?;
self.storage
.save_task_queue(&task_id, &config.topic, priority)
.await
.map_err(|e| format!("持久化 task_queue 失败: {}", e))?;
let msg = TaskMessage {
task_id: task_id.clone(),
task_type: params.task_type,
payload: params.payload.clone(),
priority,
user_id: params.user_id,
resource_id: params.resource_id,
resource_type: params.resource_type.map(|r| r.to_i16()),
submitted_at: now,
};
let payload = serde_json::to_vec(&msg).map_err(|e| format!("序列化失败: {}", e))?;
let record = FutureRecord::to(&config.topic)
.key(&task_id)
.payload(&payload);
self.producer
.send(record, MSG_TIMEOUT)
.await
.map_err(|(e, _)| format!("Kafka 发送失败: {}", e))?;
info!(task_id = %task_id, task_type = params.task_type, "任务已提交");
Ok(task_id)
}
pub async fn submit_batch(&self, params: SubmitBatchParams) -> (usize, Vec<(usize, String)>) {
let mut succeeded = 0usize;
let mut failures = Vec::new();
for (idx, task) in params.tasks.into_iter().enumerate() {
match self.submit(task).await {
Ok(_) => succeeded += 1,
Err(e) => {
warn!(index = idx, error = %e, "批量提交任务失败");
failures.push((idx, e));
}
}
}
info!(succeeded = succeeded, failed = failures.len(), "批量提交完成");
(succeeded, failures)
}
pub async fn send_to_dlq(
&self,
msg: &TaskMessage,
dead_letter_topic: &str,
reason: &str,
) -> Result<(), String> {
let payload = serde_json::to_vec(msg).map_err(|e| format!("DLQ 序列化失败: {}", e))?;
let record = FutureRecord::to(dead_letter_topic)
.key(&msg.task_id)
.payload(&payload);
self.producer
.send(record, MSG_TIMEOUT)
.await
.map_err(|(e, _)| format!("DLQ 发送失败: {}", e))?;
let _ = self
.storage
.update_task_status(&msg.task_id, TaskStatus::DeadLetter)
.await;
info!(task_id = %msg.task_id, reason = reason, "任务已转入死信队列");
Ok(())
}
}