moduforge_core/
sync_processor.rs

1use std::{
2    fmt::Display,
3    sync::{Arc},
4    time::{Duration, Instant},
5    thread,
6    marker::PhantomData,
7};
8
/// Final outcome of running a task through a processor.
///
/// `Eq` is derived alongside `PartialEq` because both variants only hold
/// totally comparable data; this lets the status be used wherever a full
/// equivalence relation is required.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TaskStatus {
    /// The processor returned a successful output for the task.
    Completed,
    /// The task failed; the payload is the rendered error message.
    Failed(String),
}
15
/// Error type returned by task processors.
///
/// Besides `Debug`, the type derives `Clone`, `PartialEq` and `Eq` so that
/// errors can be duplicated and compared directly (for example in
/// `assert_eq!`) instead of only through their rendered text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProcessorError {
    /// The task itself could not be executed; the payload is the reason.
    TaskFailed(String),
    /// An internal error occurred; the payload is the reason.
    InternalError(String),
}
22
23impl Display for ProcessorError {
24    fn fmt(
25        &self,
26        f: &mut std::fmt::Formatter<'_>,
27    ) -> std::fmt::Result {
28        match self {
29            ProcessorError::TaskFailed(msg) => {
30                write!(f, "任务执行失败: {}", msg)
31            },
32            ProcessorError::InternalError(msg) => {
33                write!(f, "内部错误: {}", msg)
34            },
35        }
36    }
37}
38
39impl std::error::Error for ProcessorError {}
40
41/// 任务处理的结果结构
42#[derive(Debug)]
43pub struct TaskResult<T, O>
44where
45    T: Send + Sync,
46    O: Send + Sync,
47{
48    pub status: TaskStatus,
49    pub task: Option<T>,
50    pub output: Option<O>,
51    pub error: Option<String>,
52    pub processing_time: Duration,
53}
54
55/// 任务处理器特征
56pub trait TaskProcessor<T, O>: Send + Sync + 'static
57where
58    T: Clone + Send + Sync + 'static,
59    O: Clone + Send + Sync + 'static,
60{
61    fn process(
62        &self,
63        task: T,
64    ) -> Result<O, ProcessorError>;
65}
66
67/// 同步任务处理器
68pub struct SyncProcessor<T, O, P>
69where
70    T: Clone + Send + Sync + 'static,
71    O: Clone + Send + Sync + 'static,
72    P: TaskProcessor<T, O>,
73{
74    processor: Arc<P>,
75    max_retries: u32,
76    retry_delay: Duration,
77    _phantom: PhantomData<(T, O)>,
78}
79
80impl<T, O, P> SyncProcessor<T, O, P>
81where
82    T: Clone + Send + Sync + 'static,
83    O: Clone + Send + Sync + 'static,
84    P: TaskProcessor<T, O>,
85{
86    pub fn new(
87        processor: P,
88        max_retries: u32,
89        retry_delay: Duration,
90    ) -> Self {
91        Self {
92            processor: Arc::new(processor),
93            max_retries,
94            retry_delay,
95            _phantom: PhantomData,
96        }
97    }
98
99    pub fn process_task(
100        &self,
101        task: T,
102    ) -> TaskResult<T, O> {
103        let start_time = Instant::now();
104        let mut current_retry = 0;
105
106        loop {
107            match self.processor.process(task.clone()) {
108                Ok(output) => {
109                    return TaskResult {
110                        status: TaskStatus::Completed,
111                        task: Some(task),
112                        output: Some(output),
113                        error: None,
114                        processing_time: start_time.elapsed(),
115                    };
116                },
117                Err(e) => {
118                    if current_retry < self.max_retries {
119                        current_retry += 1;
120                        thread::sleep(self.retry_delay);
121                        continue;
122                    }
123                    return TaskResult {
124                        status: TaskStatus::Failed(e.to_string()),
125                        task: Some(task),
126                        output: None,
127                        error: Some(e.to_string()),
128                        processing_time: start_time.elapsed(),
129                    };
130                },
131            }
132        }
133    }
134
135    pub fn process_task_with_retry(
136        &self,
137        task: T,
138        max_retries: u32,
139        retry_delay: Duration,
140    ) -> TaskResult<T, O> {
141        let start_time = Instant::now();
142        let mut current_retry = 0;
143
144        loop {
145            match self.processor.process(task.clone()) {
146                Ok(output) => {
147                    return TaskResult {
148                        status: TaskStatus::Completed,
149                        task: Some(task),
150                        output: Some(output),
151                        error: None,
152                        processing_time: start_time.elapsed(),
153                    };
154                },
155                Err(e) => {
156                    if current_retry < max_retries {
157                        current_retry += 1;
158                        thread::sleep(retry_delay);
159                        continue;
160                    }
161                    return TaskResult {
162                        status: TaskStatus::Failed(e.to_string()),
163                        task: Some(task),
164                        output: None,
165                        error: Some(e.to_string()),
166                        processing_time: start_time.elapsed(),
167                    };
168                },
169            }
170        }
171    }
172}
173
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{
        atomic::{AtomicU32, Ordering},
        Arc,
    };
    use std::time::Duration;

    /// Succeeds for non-negative tasks and always fails for negative ones.
    struct TestProcessor;

    impl TaskProcessor<i32, String> for TestProcessor {
        fn process(
            &self,
            task: i32,
        ) -> Result<String, ProcessorError> {
            if task < 0 {
                return Err(ProcessorError::TaskFailed("负数任务".to_string()));
            }
            Ok(format!("Processed: {}", task))
        }
    }

    /// Fails the first `failures_before_success` calls, then succeeds.
    /// Every call is counted so tests can assert how many attempts were made.
    struct FlakyProcessor {
        attempts: Arc<AtomicU32>,
        failures_before_success: u32,
    }

    impl TaskProcessor<i32, String> for FlakyProcessor {
        fn process(
            &self,
            task: i32,
        ) -> Result<String, ProcessorError> {
            let attempt = self.attempts.fetch_add(1, Ordering::SeqCst);
            if attempt < self.failures_before_success {
                return Err(ProcessorError::InternalError("transient".to_string()));
            }
            Ok(format!("Processed: {}", task))
        }
    }

    /// Builds a flaky processor plus a handle to its attempt counter.
    fn flaky(failures_before_success: u32) -> (FlakyProcessor, Arc<AtomicU32>) {
        let attempts = Arc::new(AtomicU32::new(0));
        let processor = FlakyProcessor {
            attempts: Arc::clone(&attempts),
            failures_before_success,
        };
        (processor, attempts)
    }

    /// A zero delay keeps the suite fast; the retry logic does not depend on
    /// the length of the pause.
    fn no_delay() -> Duration {
        Duration::from_millis(0)
    }

    #[test]
    fn test_sync_processor() {
        let processor = SyncProcessor::new(TestProcessor, 3, no_delay());

        // Success path.
        let result = processor.process_task(42);
        assert_eq!(result.status, TaskStatus::Completed);
        assert!(result.error.is_none());
        assert_eq!(result.output, Some("Processed: 42".to_string()));
        assert_eq!(result.task, Some(42));

        // Failure path: the task never succeeds, so all retries are used up.
        let result = processor.process_task(-1);
        assert_eq!(
            result.status,
            TaskStatus::Failed("任务执行失败: 负数任务".to_string())
        );
        assert!(result.output.is_none());
        assert_eq!(result.error.as_deref(), Some("任务执行失败: 负数任务"));
        assert_eq!(result.task, Some(-1));
    }

    #[test]
    fn test_processor_with_retry() {
        let processor = SyncProcessor::new(TestProcessor, 3, no_delay());

        // Custom retry parameters on a task that succeeds immediately.
        let result = processor.process_task_with_retry(42, 2, no_delay());
        assert_eq!(result.status, TaskStatus::Completed);
        assert!(result.error.is_none());
        assert_eq!(result.output, Some("Processed: 42".to_string()));
    }

    #[test]
    fn test_retries_until_success() {
        let (inner, attempts) = flaky(2);
        let processor = SyncProcessor::new(inner, 0, no_delay());

        // Two failures followed by a success need exactly two retries.
        let result = processor.process_task_with_retry(5, 2, no_delay());
        assert_eq!(result.status, TaskStatus::Completed);
        assert_eq!(result.output, Some("Processed: 5".to_string()));
        assert_eq!(attempts.load(Ordering::SeqCst), 3);
    }

    #[test]
    fn test_gives_up_after_max_retries() {
        let (inner, attempts) = flaky(5);
        // The constructor policy (10 retries) must be overridden by the
        // explicit argument (1 retry).
        let processor = SyncProcessor::new(inner, 10, no_delay());

        let result = processor.process_task_with_retry(5, 1, no_delay());
        assert_eq!(
            result.status,
            TaskStatus::Failed("内部错误: transient".to_string())
        );
        assert_eq!(result.error.as_deref(), Some("内部错误: transient"));
        assert!(result.output.is_none());
        assert_eq!(result.task, Some(5));
        assert_eq!(attempts.load(Ordering::SeqCst), 2);
    }

    #[test]
    fn test_process_task_uses_constructor_policy() {
        let (inner, attempts) = flaky(1);
        let processor = SyncProcessor::new(inner, 1, no_delay());

        let result = processor.process_task(9);
        assert_eq!(result.status, TaskStatus::Completed);
        assert_eq!(attempts.load(Ordering::SeqCst), 2);
    }
}