1use std::{
2 ops::{Deref, DerefMut},
3 sync::Arc,
4 time::Duration,
5};
6
7use crate::runtime::Editor;
8use crate::{
9 error_utils,
10 event::Event,
11 flow::{FlowEngine, ProcessorResult},
12 types::EditorOptions,
13 EditorResult,
14};
15use moduforge_state::{
16 debug,
17 state::TransactionResult,
18 transaction::{Command, Transaction},
19 State,
20};
21
/// Timing-diagnostics and middleware time-limit settings used by `AsyncEditor`.
#[derive(Debug, Clone)]
pub struct PerformanceConfig {
    /// Master switch for timing logs: while `false`,
    /// `AsyncEditor::log_performance` never emits a message.
    pub enable_monitoring: bool,
    /// Time limit in milliseconds for a single middleware call; read by
    /// `AsyncEditor::run_after_middleware`.
    pub middleware_timeout_ms: u64,
    /// A measured duration must be strictly greater than this many
    /// milliseconds before `AsyncEditor::log_performance` reports it.
    pub log_threshold_ms: u64,
}
29
30impl Default for PerformanceConfig {
31 fn default() -> Self {
32 Self {
33 enable_monitoring: false,
34 middleware_timeout_ms: 500,
35 log_threshold_ms: 50,
36 }
37 }
38}
39
40pub struct AsyncEditor {
43 base: Editor,
44 flow_engine: FlowEngine,
45 perf_config: PerformanceConfig,
46}
47unsafe impl Send for AsyncEditor {}
48unsafe impl Sync for AsyncEditor {}
49
50impl Deref for AsyncEditor {
51 type Target = Editor;
52
53 fn deref(&self) -> &Self::Target {
54 &self.base
55 }
56}
57
58impl DerefMut for AsyncEditor {
59 fn deref_mut(&mut self) -> &mut Self::Target {
60 &mut self.base
61 }
62}
63impl AsyncEditor {
64 pub async fn create(
67 options: EditorOptions
68 ) -> Result<Self, Box<dyn std::error::Error>> {
69 let base = Editor::create(options).await?;
70 Ok(AsyncEditor {
71 base,
72 flow_engine: FlowEngine::new()?,
73 perf_config: PerformanceConfig::default(),
74 })
75 }
76
77 pub fn set_performance_config(
79 &mut self,
80 config: PerformanceConfig,
81 ) {
82 self.perf_config = config;
83 }
84
85 fn log_performance(
87 &self,
88 operation: &str,
89 duration: Duration,
90 ) {
91 if self.perf_config.enable_monitoring
92 && duration.as_millis() > self.perf_config.log_threshold_ms as u128
93 {
94 debug!("{} 耗时: {}ms", operation, duration.as_millis());
95 }
96 }
97
98 pub async fn command(
109 &mut self,
110 command: Arc<dyn Command>,
111 ) -> EditorResult<()> {
112 let cmd_name = command.name();
113 debug!("正在执行命令: {}", cmd_name);
114
115 let mut tr = self.get_tr();
117 command.execute(&mut tr).await.map_err(|e| {
118 error_utils::state_error(format!("命令执行失败: {}", e))
119 })?;
120
121 match self.dispatch_flow(tr).await {
123 Ok(_) => {
124 debug!("命令 '{}' 执行成功", cmd_name);
125 Ok(())
126 },
127 Err(e) => {
128 debug!("命令 '{}' 执行失败: {}", cmd_name, e);
129 Err(e)
130 },
131 }
132 }
133
134 pub async fn dispatch_flow(
148 &mut self,
149 transaction: Transaction,
150 ) -> EditorResult<()> {
151 let start_time = std::time::Instant::now();
152 let mut current_transaction = transaction;
153 let old_id = self.get_state().version;
154 let middleware_start = std::time::Instant::now();
156 self.run_before_middleware(&mut current_transaction).await?;
157 self.log_performance("前置中间件处理", middleware_start.elapsed());
158
159 let flow_start = std::time::Instant::now();
161 let (_id, mut rx) = self
162 .flow_engine
163 .submit_transaction((
164 self.base.get_state().clone(),
165 current_transaction,
166 ))
167 .await?;
168 self.log_performance("提交事务", flow_start.elapsed());
169
170 let recv_start = std::time::Instant::now();
172 let Some(task_result) = rx.recv().await else {
173 return Err(error_utils::state_error(
174 "无法接收任务结果".to_string(),
175 ));
176 };
177 self.log_performance("接收任务结果", recv_start.elapsed());
178
179 let Some(ProcessorResult { result: Some(result), .. }) =
181 task_result.output
182 else {
183 return Err(error_utils::state_error(
184 "任务处理结果无效".to_string(),
185 ));
186 };
187
188 let mut current_state = None;
190 let mut transactions = Vec::new();
191 transactions.extend(result.transactions);
192
193 if let Some(tr) = transactions.last() {
195 if tr.doc_changed() {
196 current_state = Some(Arc::new(result.state));
197 }
198 }
199
200 let after_start = std::time::Instant::now();
202 self.run_after_middleware(&mut current_state, &mut transactions)
203 .await?;
204 self.log_performance("后置中间件处理", after_start.elapsed());
205
206 if let Some(state) = current_state {
208 let update_start = std::time::Instant::now();
209 self.base.update_state(state.clone()).await?;
210 self.log_performance("状态更新", update_start.elapsed());
211
212 let event_start = std::time::Instant::now();
213 self.base
214 .emit_event(Event::TrApply(
215 old_id,
216 Arc::new(transactions),
217 state,
218 ))
219 .await?;
220 self.log_performance("事件广播", event_start.elapsed());
221 }
222
223 self.log_performance("事务处理总耗时", start_time.elapsed());
224 Ok(())
225 }
226
227 pub async fn run_before_middleware(
228 &mut self,
229 transaction: &mut Transaction,
230 ) -> EditorResult<()> {
231 debug!("执行前置中间件链");
232 for middleware in
233 &self.base.get_options().get_middleware_stack().middlewares
234 {
235 let timeout = std::time::Duration::from_millis(500);
236 if let Err(e) = tokio::time::timeout(
237 timeout,
238 middleware.before_dispatch(transaction),
239 )
240 .await
241 {
242 return Err(error_utils::middleware_error(format!(
243 "中间件执行超时: {}",
244 e
245 )));
246 }
247 }
248 Ok(())
249 }
250 pub async fn run_after_middleware(
251 &mut self,
252 state: &mut Option<Arc<State>>,
253 transactions: &mut Vec<Transaction>,
254 ) -> EditorResult<()> {
255 debug!("执行后置中间件链");
256 for middleware in
257 &self.base.get_options().get_middleware_stack().middlewares
258 {
259 let timeout = std::time::Duration::from_millis(
262 self.perf_config.middleware_timeout_ms,
263 );
264
265 let start_time = std::time::Instant::now();
267
268 let middleware_result = match tokio::time::timeout(
269 timeout,
270 middleware.after_dispatch(state.clone(), transactions),
271 )
272 .await
273 {
274 Ok(result) => match result {
275 Ok(r) => r,
276 Err(e) => {
277 debug!("中间件执行失败: {}", e);
279 return Err(error_utils::middleware_error(format!(
280 "中间件执行失败: {}",
281 e
282 )));
283 },
284 },
285 Err(e) => {
286 debug!("中间件执行超时: {}", e);
287 return Err(error_utils::middleware_error(format!(
288 "中间件执行超时: {}",
289 e
290 )));
291 },
292 };
293
294 let elapsed = start_time.elapsed();
296 if elapsed.as_millis() > 100 {
297 debug!("中间件执行时间较长: {}ms", elapsed.as_millis());
298 }
299
300 if let Some(transaction) = middleware_result.additional_transaction
301 {
302 let tx_start_time = std::time::Instant::now();
304
305 let result = match self
306 .flow_engine
307 .submit_transaction((
308 self.base.get_state().clone(),
309 transaction,
310 ))
311 .await
312 {
313 Ok(result) => result,
314 Err(e) => {
315 debug!("附加事务提交失败: {}", e);
316 return Err(error_utils::state_error(format!(
317 "附加事务提交失败: {}",
318 e
319 )));
320 },
321 };
322
323 let (_id, mut rx) = result;
324
325 let Some(task_result) = rx.recv().await else {
326 debug!("接收事务处理结果失败");
327 return Ok(());
328 };
329
330 let Some(ProcessorResult { result: Some(result), .. }) =
331 task_result.output
332 else {
333 debug!("处理结果无效");
334 return Ok(());
335 };
336
337 let TransactionResult { state: new_state, transactions: trs } =
338 result;
339 *state = Some(Arc::new(new_state));
340 transactions.extend(trs);
341
342 let tx_elapsed = tx_start_time.elapsed();
344 if tx_elapsed.as_millis() > 50 {
345 debug!(
346 "附加事务处理时间较长: {}ms",
347 tx_elapsed.as_millis()
348 );
349 }
350 }
351 }
352 Ok(())
353 }
354}