1use std::sync::Arc;
4use std::time::Duration;
5use tracing::{info, warn};
6
7use rdkafka::producer::{FutureProducer, FutureRecord};
8
9use crate::storage::TaskStorage;
10use crate::types::*;
11use crate::HandlerRegistry;
12
/// Background scanner that periodically re-publishes retryable tasks to Kafka.
pub struct RetryScanner {
    /// Persistent task store scanned for retryable tasks and updated on retry.
    storage: Arc<dyn TaskStorage>,
    /// Registry mapping task types to handler config (topic, retry policy).
    registry: HandlerRegistry,
    /// Kafka producer used to re-publish retry messages.
    producer: FutureProducer,
    /// Seconds to sleep between scan rounds.
    interval_secs: u64,
    /// Maximum number of tasks fetched from storage per scan round.
    max_batch_size: usize,
    /// Cooperative shutdown flag; `run` exits once this is set to false.
    running: Arc<std::sync::atomic::AtomicBool>,
}
31
impl RetryScanner {
    /// Creates a scanner with its own dedicated Kafka producer.
    ///
    /// * `brokers` - Kafka bootstrap servers string.
    /// * `storage` - task store scanned for retryable tasks.
    /// * `registry` - per-type retry configuration and target topics.
    /// * `interval_secs` - sleep between scan rounds.
    /// * `max_batch_size` - max tasks fetched per round.
    ///
    /// Returns a human-readable error string if the producer cannot be built.
    pub fn new(
        brokers: &str,
        storage: Arc<dyn TaskStorage>,
        registry: HandlerRegistry,
        interval_secs: u64,
        max_batch_size: usize,
    ) -> Result<Self, String> {
        let producer: FutureProducer = rdkafka::ClientConfig::new()
            .set("bootstrap.servers", brokers)
            // Fail a produce attempt after 5s instead of queueing indefinitely.
            .set("message.timeout.ms", "5000")
            .create()
            .map_err(|e| format!("Kafka Producer 创建失败(RetryScanner): {}", e))?;

        Ok(Self {
            storage,
            registry,
            producer,
            interval_secs,
            max_batch_size,
            // Starts in the running state; `stop` flips this flag to end `run`.
            running: Arc::new(std::sync::atomic::AtomicBool::new(true)),
        })
    }

    /// Main loop: scan, log errors, sleep, repeat until `stop` is called.
    ///
    /// A failing scan round is logged and does not terminate the loop.
    /// NOTE(review): the flag is checked only once per round, so a pending
    /// `stop` can take up to `interval_secs` to be observed.
    pub async fn run(&self) {
        info!("RetryScanner 启动,扫描间隔: {}s", self.interval_secs);

        while self.running.load(std::sync::atomic::Ordering::Relaxed) {
            if let Err(e) = self.scan().await {
                warn!("RetryScanner 扫描出错: {}", e);
            }
            tokio::time::sleep(Duration::from_secs(self.interval_secs)).await;
        }

        info!("RetryScanner 已停止");
    }

    /// Signals the run loop to exit after its current iteration.
    pub fn stop(&self) {
        self.running.store(false, std::sync::atomic::Ordering::Relaxed);
    }

    /// One scan round: fetch retryable tasks and re-publish each to Kafka.
    ///
    /// Skips tasks whose type has no registered config and tasks that have
    /// exhausted their retry budget. Returns `Err` only when the storage scan
    /// itself fails; per-task failures are logged and skipped.
    async fn scan(&self) -> Result<(), String> {
        // Only scan types we can actually handle; nothing registered => no-op.
        let task_types: Vec<i16> = self.registry.task_types();
        if task_types.is_empty() {
            return Ok(());
        }

        let tasks = self
            .storage
            .scan_retryable_tasks(&task_types, self.max_batch_size)
            .await?;

        let mut retried = 0usize;
        for task in &tasks {
            let config = match self.registry.get_config(task.task_type) {
                Some(c) => c,
                None => continue,
            };

            // Retry budget exhausted: leave the task alone.
            if task.retry_count >= task.max_retries {
                continue;
            }

            // NOTE(review): `delay` is only logged below — the message is
            // published immediately. Presumably `scan_retryable_tasks` already
            // filters on a next-retry timestamp; confirm, otherwise backoff is
            // not actually enforced here.
            let delay = compute_retry_delay(
                &config.retry_strategy,
                config.retry_delay_seconds,
                config.max_retry_delay_seconds,
                task.retry_count as u32,
            );

            // Best-effort bookkeeping: the result is deliberately discarded.
            // NOTE(review): the current `retry_count` is passed unchanged —
            // verify the storage layer increments it internally, or tasks may
            // be rescanned at the same count indefinitely.
            let _ = self
                .storage
                .update_retry(&task.task_id, task.retry_count)
                .await;

            // Rebuild a message shaped like a fresh submission.
            let msg = TaskMessage {
                task_id: task.task_id.clone(),
                task_type: task.task_type,
                payload: task.payload.clone(),
                // Retries always use priority 2 — TODO confirm the intended
                // semantics of this constant against the consumer side.
                priority: 2i16,
                user_id: None,
                resource_id: None,
                resource_type: None,
                submitted_at: chrono::Utc::now().to_rfc3339(),
            };

            let payload = match serde_json::to_vec(&msg) {
                Ok(p) => p,
                Err(e) => {
                    warn!(task_id = %task.task_id, "重试消息序列化失败: {}", e);
                    continue;
                }
            };
            // The task id doubles as the Kafka message key.
            let record = FutureRecord::to(&config.topic)
                .key(&task.task_id)
                .payload(&payload);
            if let Err((e, _)) = self.producer.send(record, Duration::from_secs(5)).await {
                warn!(task_id = %task.task_id, "重试 Kafka 推送失败: {}", e);
                continue;
            }

            retried += 1;
            info!(task_id = %task.task_id, retry_count = task.retry_count, delay_s = delay, "重试任务推送成功");
        }

        if retried > 0 {
            info!("本轮 RetryScanner 重试 {} 个任务", retried);
        }
        Ok(())
    }
}
147
148pub fn compute_retry_delay(strategy: &RetryStrategy, base: u64, max: u64, attempt: u32) -> u64 {
150 match strategy {
151 RetryStrategy::Fixed => base,
152 RetryStrategy::Linear => (base * (attempt + 1) as u64).min(max),
153 RetryStrategy::Exponential => {
154 let delay = base * 2u64.pow(attempt);
155 delay.min(max)
156 }
157 }
158}