mf_core/runtime/
sync_processor.rs

1use std::{
2    fmt::Display,
3    sync::{Arc},
4    time::{Duration, Instant},
5    thread,
6    marker::PhantomData,
7};
8
9use async_trait::async_trait;
10
11use crate::metrics;
12
13/// 任务处理的结果状态
14#[derive(Debug, Clone, PartialEq)]
15pub enum TaskStatus {
16    Completed,
17    Failed(String),
18}
19
20impl From<&TaskStatus> for &'static str {
21    fn from(status: &TaskStatus) -> Self {
22        match status {
23            TaskStatus::Completed => "completed",
24            TaskStatus::Failed(_) => "failed",
25        }
26    }
27}
28
29/// 任务处理器的错误类型
30#[derive(Debug)]
31pub enum ProcessorError {
32    TaskFailed(String),
33    InternalError(String),
34}
35
36impl Display for ProcessorError {
37    fn fmt(
38        &self,
39        f: &mut std::fmt::Formatter<'_>,
40    ) -> std::fmt::Result {
41        match self {
42            ProcessorError::TaskFailed(msg) => {
43                write!(f, "任务执行失败: {}", msg)
44            },
45            ProcessorError::InternalError(msg) => {
46                write!(f, "内部错误: {}", msg)
47            },
48        }
49    }
50}
51
52impl std::error::Error for ProcessorError {}
53
54/// 任务处理的结果结构
55#[derive(Debug)]
56pub struct TaskResult<T, O>
57where
58    T: Send + Sync,
59    O: Send + Sync,
60{
61    pub status: TaskStatus,
62    pub task: Option<T>,
63    pub output: Option<O>,
64    pub error: Option<String>,
65    pub processing_time: Duration,
66}
67
68/// 任务处理器特征
69#[async_trait]
70pub trait TaskProcessor<T, O>: Send + Sync + 'static
71where
72    T: Clone + Send + Sync + 'static,
73    O: Clone + Send + Sync + 'static,
74{
75    async fn process(
76        &self,
77        task: T,
78    ) -> Result<O, ProcessorError>;
79}
80
81/// 同步任务处理器
82pub struct SyncProcessor<T, O, P>
83where
84    T: Clone + Send + Sync + 'static,
85    O: Clone + Send + Sync + 'static,
86    P: TaskProcessor<T, O>,
87{
88    processor: Arc<P>,
89    max_retries: u32,
90    retry_delay: Duration,
91    _phantom: PhantomData<(T, O)>,
92}
93
94impl<T, O, P> SyncProcessor<T, O, P>
95where
96    T: Clone + Send + Sync + 'static,
97    O: Clone + Send + Sync + 'static,
98    P: TaskProcessor<T, O>,
99{
100    pub fn new(
101        processor: P,
102        max_retries: u32,
103        retry_delay: Duration,
104    ) -> Self {
105        Self {
106            processor: Arc::new(processor),
107            max_retries,
108            retry_delay,
109            _phantom: PhantomData,
110        }
111    }
112
113    pub async fn process_task(
114        &self,
115        task: T,
116    ) -> TaskResult<T, O> {
117        metrics::task_submitted();
118        let start_time = Instant::now();
119        let mut current_retry = 0;
120
121        loop {
122            match self.processor.process(task.clone()).await {
123                Ok(output) => {
124                    let result = TaskResult {
125                        status: TaskStatus::Completed,
126                        task: Some(task),
127                        output: Some(output),
128                        error: None,
129                        processing_time: start_time.elapsed(),
130                    };
131                    metrics::task_processing_duration(result.processing_time);
132                    metrics::task_processed((&result.status).into());
133                    return result;
134                },
135                Err(e) => {
136                    if current_retry < self.max_retries {
137                        current_retry += 1;
138                        metrics::task_retried();
139                        thread::sleep(self.retry_delay);
140                        continue;
141                    }
142                    let result = TaskResult {
143                        status: TaskStatus::Failed(e.to_string()),
144                        task: Some(task),
145                        output: None,
146                        error: Some(e.to_string()),
147                        processing_time: start_time.elapsed(),
148                    };
149                    metrics::task_processing_duration(result.processing_time);
150                    metrics::task_processed((&result.status).into());
151                    return result;
152                },
153            }
154        }
155    }
156
157    pub async fn process_task_with_retry(
158        &self,
159        task: T,
160        max_retries: u32,
161        retry_delay: Duration,
162    ) -> TaskResult<T, O> {
163        metrics::task_submitted();
164        let start_time = Instant::now();
165        let mut current_retry = 0;
166
167        loop {
168            match self.processor.process(task.clone()).await {
169                Ok(output) => {
170                    let result = TaskResult {
171                        status: TaskStatus::Completed,
172                        task: Some(task),
173                        output: Some(output),
174                        error: None,
175                        processing_time: start_time.elapsed(),
176                    };
177                    metrics::task_processing_duration(result.processing_time);
178                    metrics::task_processed((&result.status).into());
179                    return result;
180                },
181                Err(e) => {
182                    if current_retry < max_retries {
183                        current_retry += 1;
184                        metrics::task_retried();
185                        thread::sleep(retry_delay);
186                        continue;
187                    }
188                    let result = TaskResult {
189                        status: TaskStatus::Failed(e.to_string()),
190                        task: Some(task),
191                        output: None,
192                        error: Some(e.to_string()),
193                        processing_time: start_time.elapsed(),
194                    };
195                    metrics::task_processing_duration(result.processing_time);
196                    metrics::task_processed((&result.status).into());
197                    return result;
198                },
199            }
200        }
201    }
202}