Skip to main content

alun_task/
retry.rs

1//! 重试机制 —— 后台定期扫描待重试任务并重新推入 Kafka
2
3use std::sync::Arc;
4use std::time::Duration;
5use tracing::{info, warn};
6
7use rdkafka::producer::{FutureProducer, FutureRecord};
8
9use crate::storage::TaskStorage;
10use crate::types::*;
11use crate::HandlerRegistry;
12
13/// 重试扫描器
14///
15/// 定期从 `TaskStorage` 扫描可重试任务,计算延迟后重新推入 Kafka。
16/// 不持有 SQL——扫描逻辑完全由 `TaskStorage::scan_retryable_tasks()` 实现。
17pub struct RetryScanner {
18    /// 任务持久化接口
19    storage: Arc<dyn TaskStorage>,
20    /// 处理器注册中心
21    registry: HandlerRegistry,
22    /// Kafka 生产者(用于重新发送任务消息)
23    producer: FutureProducer,
24    /// 扫描间隔(秒)
25    interval_secs: u64,
26    /// 每批次扫描最大任务数
27    max_batch_size: usize,
28    /// 运行状态标志
29    running: Arc<std::sync::atomic::AtomicBool>,
30}
31
32impl RetryScanner {
33    /// 创建重试扫描器
34    pub fn new(
35        brokers: &str,
36        storage: Arc<dyn TaskStorage>,
37        registry: HandlerRegistry,
38        interval_secs: u64,
39        max_batch_size: usize,
40    ) -> Result<Self, String> {
41        let producer: FutureProducer = rdkafka::ClientConfig::new()
42            .set("bootstrap.servers", brokers)
43            .set("message.timeout.ms", "5000")
44            .create()
45            .map_err(|e| format!("Kafka Producer 创建失败(RetryScanner): {}", e))?;
46
47        Ok(Self {
48            storage,
49            registry,
50            producer,
51            interval_secs,
52            max_batch_size,
53            running: Arc::new(std::sync::atomic::AtomicBool::new(true)),
54        })
55    }
56
57    /// 启动扫描循环
58    pub async fn run(&self) {
59        info!("RetryScanner 启动,扫描间隔: {}s", self.interval_secs);
60
61        while self.running.load(std::sync::atomic::Ordering::Relaxed) {
62            if let Err(e) = self.scan().await {
63                warn!("RetryScanner 扫描出错: {}", e);
64            }
65            tokio::time::sleep(Duration::from_secs(self.interval_secs)).await;
66        }
67
68        info!("RetryScanner 已停止");
69    }
70
71    /// 停止扫描循环
72    pub fn stop(&self) {
73        self.running.store(false, std::sync::atomic::Ordering::Relaxed);
74    }
75
76    /// 执行一轮扫描:将可重试任务重新推入 Kafka
77    async fn scan(&self) -> Result<(), String> {
78        let task_types: Vec<i16> = self.registry.task_types();
79        if task_types.is_empty() {
80            return Ok(());
81        }
82
83        let tasks = self
84            .storage
85            .scan_retryable_tasks(&task_types, self.max_batch_size)
86            .await?;
87
88        let mut retried = 0usize;
89        for task in &tasks {
90            let config = match self.registry.get_config(task.task_type) {
91                Some(c) => c,
92                None => continue,
93            };
94
95            if task.retry_count >= task.max_retries {
96                continue;
97            }
98
99            let delay = compute_retry_delay(
100                &config.retry_strategy,
101                config.retry_delay_seconds,
102                config.max_retry_delay_seconds,
103                task.retry_count as u32,
104            );
105
106            let _ = self
107                .storage
108                .update_retry(&task.task_id, task.retry_count)
109                .await;
110
111            let msg = TaskMessage {
112                task_id: task.task_id.clone(),
113                task_type: task.task_type,
114                payload: task.payload.clone(),
115                priority: 2i16,
116                user_id: None,
117                resource_id: None,
118                resource_type: None,
119                submitted_at: chrono::Utc::now().to_rfc3339(),
120            };
121
122            let payload = match serde_json::to_vec(&msg) {
123                Ok(p) => p,
124                Err(e) => {
125                    warn!(task_id = %task.task_id, "重试消息序列化失败: {}", e);
126                    continue;
127                }
128            };
129            let record = FutureRecord::to(&config.topic)
130                .key(&task.task_id)
131                .payload(&payload);
132            if let Err((e, _)) = self.producer.send(record, Duration::from_secs(5)).await {
133                warn!(task_id = %task.task_id, "重试 Kafka 推送失败: {}", e);
134                continue;
135            }
136
137            retried += 1;
138            info!(task_id = %task.task_id, retry_count = task.retry_count, delay_s = delay, "重试任务推送成功");
139        }
140
141        if retried > 0 {
142            info!("本轮 RetryScanner 重试 {} 个任务", retried);
143        }
144        Ok(())
145    }
146}
147
148/// 计算重试延迟(秒)
149pub fn compute_retry_delay(strategy: &RetryStrategy, base: u64, max: u64, attempt: u32) -> u64 {
150    match strategy {
151        RetryStrategy::Fixed => base,
152        RetryStrategy::Linear => (base * (attempt + 1) as u64).min(max),
153        RetryStrategy::Exponential => {
154            let delay = base * 2u64.pow(attempt);
155            delay.min(max)
156        }
157    }
158}