moduforge_core/
sync_processor.rs

1use std::{
2    fmt::Display,
3    sync::{Arc},
4    time::{Duration, Instant},
5    thread,
6    marker::PhantomData,
7};
8
9use crate::metrics;
10
11/// 任务处理的结果状态
12#[derive(Debug, Clone, PartialEq)]
13pub enum TaskStatus {
14    Completed,
15    Failed(String),
16}
17
18impl From<&TaskStatus> for &'static str {
19    fn from(status: &TaskStatus) -> Self {
20        match status {
21            TaskStatus::Completed => "completed",
22            TaskStatus::Failed(_) => "failed",
23        }
24    }
25}
26
27/// 任务处理器的错误类型
28#[derive(Debug)]
29pub enum ProcessorError {
30    TaskFailed(String),
31    InternalError(String),
32}
33
34impl Display for ProcessorError {
35    fn fmt(
36        &self,
37        f: &mut std::fmt::Formatter<'_>,
38    ) -> std::fmt::Result {
39        match self {
40            ProcessorError::TaskFailed(msg) => {
41                write!(f, "任务执行失败: {}", msg)
42            },
43            ProcessorError::InternalError(msg) => {
44                write!(f, "内部错误: {}", msg)
45            },
46        }
47    }
48}
49
50impl std::error::Error for ProcessorError {}
51
52/// 任务处理的结果结构
53#[derive(Debug)]
54pub struct TaskResult<T, O>
55where
56    T: Send + Sync,
57    O: Send + Sync,
58{
59    pub status: TaskStatus,
60    pub task: Option<T>,
61    pub output: Option<O>,
62    pub error: Option<String>,
63    pub processing_time: Duration,
64}
65
66/// 任务处理器特征
67pub trait TaskProcessor<T, O>: Send + Sync + 'static
68where
69    T: Clone + Send + Sync + 'static,
70    O: Clone + Send + Sync + 'static,
71{
72    fn process(
73        &self,
74        task: T,
75    ) -> Result<O, ProcessorError>;
76}
77
78/// 同步任务处理器
79pub struct SyncProcessor<T, O, P>
80where
81    T: Clone + Send + Sync + 'static,
82    O: Clone + Send + Sync + 'static,
83    P: TaskProcessor<T, O>,
84{
85    processor: Arc<P>,
86    max_retries: u32,
87    retry_delay: Duration,
88    _phantom: PhantomData<(T, O)>,
89}
90
91impl<T, O, P> SyncProcessor<T, O, P>
92where
93    T: Clone + Send + Sync + 'static,
94    O: Clone + Send + Sync + 'static,
95    P: TaskProcessor<T, O>,
96{
97    pub fn new(
98        processor: P,
99        max_retries: u32,
100        retry_delay: Duration,
101    ) -> Self {
102        Self {
103            processor: Arc::new(processor),
104            max_retries,
105            retry_delay,
106            _phantom: PhantomData,
107        }
108    }
109
110    pub fn process_task(
111        &self,
112        task: T,
113    ) -> TaskResult<T, O> {
114        metrics::task_submitted();
115        let start_time = Instant::now();
116        let mut current_retry = 0;
117
118        loop {
119            match self.processor.process(task.clone()) {
120                Ok(output) => {
121                    let result = TaskResult {
122                        status: TaskStatus::Completed,
123                        task: Some(task),
124                        output: Some(output),
125                        error: None,
126                        processing_time: start_time.elapsed(),
127                    };
128                    metrics::task_processing_duration(result.processing_time);
129                    metrics::task_processed((&result.status).into());
130                    return result;
131                },
132                Err(e) => {
133                    if current_retry < self.max_retries {
134                        current_retry += 1;
135                        metrics::task_retried();
136                        thread::sleep(self.retry_delay);
137                        continue;
138                    }
139                    let result = TaskResult {
140                        status: TaskStatus::Failed(e.to_string()),
141                        task: Some(task),
142                        output: None,
143                        error: Some(e.to_string()),
144                        processing_time: start_time.elapsed(),
145                    };
146                    metrics::task_processing_duration(result.processing_time);
147                    metrics::task_processed((&result.status).into());
148                    return result;
149                },
150            }
151        }
152    }
153
154    pub fn process_task_with_retry(
155        &self,
156        task: T,
157        max_retries: u32,
158        retry_delay: Duration,
159    ) -> TaskResult<T, O> {
160        metrics::task_submitted();
161        let start_time = Instant::now();
162        let mut current_retry = 0;
163
164        loop {
165            match self.processor.process(task.clone()) {
166                Ok(output) => {
167                    let result = TaskResult {
168                        status: TaskStatus::Completed,
169                        task: Some(task),
170                        output: Some(output),
171                        error: None,
172                        processing_time: start_time.elapsed(),
173                    };
174                    metrics::task_processing_duration(result.processing_time);
175                    metrics::task_processed((&result.status).into());
176                    return result;
177                },
178                Err(e) => {
179                    if current_retry < max_retries {
180                        current_retry += 1;
181                        metrics::task_retried();
182                        thread::sleep(retry_delay);
183                        continue;
184                    }
185                    let result = TaskResult {
186                        status: TaskStatus::Failed(e.to_string()),
187                        task: Some(task),
188                        output: None,
189                        error: Some(e.to_string()),
190                        processing_time: start_time.elapsed(),
191                    };
192                    metrics::task_processing_duration(result.processing_time);
193                    metrics::task_processed((&result.status).into());
194                    return result;
195                },
196            }
197        }
198    }
199}
200
201#[cfg(test)]
202mod tests {
203    use super::*;
204
205    struct TestProcessor;
206
207    impl TaskProcessor<i32, String> for TestProcessor {
208        fn process(
209            &self,
210            task: i32,
211        ) -> Result<String, ProcessorError> {
212            if task < 0 {
213                return Err(ProcessorError::TaskFailed("负数任务".to_string()));
214            }
215            thread::sleep(Duration::from_millis(100));
216            Ok(format!("Processed: {}", task))
217        }
218    }
219
220    #[test]
221    fn test_sync_processor() {
222        let processor =
223            SyncProcessor::new(TestProcessor, 3, Duration::from_millis(100));
224
225        // 测试成功的情况
226        let result = processor.process_task(42);
227        assert_eq!(result.status, TaskStatus::Completed);
228        assert!(result.error.is_none());
229        assert_eq!(result.output, Some("Processed: 42".to_string()));
230
231        // 测试失败的情况
232        let result = processor.process_task(-1);
233        assert_eq!(
234            result.status,
235            TaskStatus::Failed("任务执行失败: 负数任务".to_string())
236        );
237        assert!(result.output.is_none());
238        assert!(result.error.is_some());
239    }
240
241    #[test]
242    fn test_processor_with_retry() {
243        let processor =
244            SyncProcessor::new(TestProcessor, 3, Duration::from_millis(100));
245
246        // 测试自定义重试参数
247        let result =
248            processor.process_task_with_retry(42, 2, Duration::from_millis(50));
249        assert_eq!(result.status, TaskStatus::Completed);
250        assert!(result.error.is_none());
251        assert_eq!(result.output, Some("Processed: 42".to_string()));
252    }
253}