1use std::{
39 ops::{Deref, DerefMut},
40 sync::Arc,
41 time::Duration,
42};
43
44use crate::runtime::Editor;
45use crate::{
46 error_utils,
47 event::Event,
48 flow::{FlowEngine, ProcessorResult},
49 types::EditorOptions,
50 EditorResult,
51};
52use moduforge_state::{
53 debug,
54 state::TransactionResult,
55 transaction::{Command, Transaction},
56 State,
57};
58
59#[derive(Debug, Clone)]
61pub struct PerformanceConfig {
62 pub enable_monitoring: bool,
64 pub middleware_timeout_ms: u64,
67 pub log_threshold_ms: u64,
70 pub task_receive_timeout_ms: u64,
74
75}
76
77impl Default for PerformanceConfig {
78 fn default() -> Self {
79 Self {
80 enable_monitoring: false,
81 middleware_timeout_ms: 500,
82 log_threshold_ms: 50,
83 task_receive_timeout_ms: 5000, }
85 }
86}
87
88pub struct AsyncEditor {
91 base: Editor,
92 flow_engine: FlowEngine,
93 perf_config: PerformanceConfig,
94}
95unsafe impl Send for AsyncEditor {}
96unsafe impl Sync for AsyncEditor {}
97
98impl Deref for AsyncEditor {
99 type Target = Editor;
100
101 fn deref(&self) -> &Self::Target {
102 &self.base
103 }
104}
105
106impl DerefMut for AsyncEditor {
107 fn deref_mut(&mut self) -> &mut Self::Target {
108 &mut self.base
109 }
110}
111impl AsyncEditor {
112 pub async fn create(options: EditorOptions) -> EditorResult<Self> {
115 let base = Editor::create(options).await?;
116 Ok(AsyncEditor {
117 base,
118 flow_engine: FlowEngine::new()?,
119 perf_config: PerformanceConfig::default(),
120 })
121 }
122
123 pub fn set_performance_config(
125 &mut self,
126 config: PerformanceConfig,
127 ) {
128 self.perf_config = config;
129 }
130
131 fn log_performance(
133 &self,
134 operation: &str,
135 duration: Duration,
136 ) {
137 if self.perf_config.enable_monitoring
138 && duration.as_millis() > self.perf_config.log_threshold_ms as u128
139 {
140 debug!("{} 耗时: {}ms", operation, duration.as_millis());
141 }
142 }
143 pub async fn command(
144 &mut self,
145 command: Arc<dyn Command>,
146 ) -> EditorResult<()> {
147 self.command_with_meta(command, "".to_string(), serde_json::Value::Null).await
148 }
149
150 pub async fn command_with_meta(
161 &mut self,
162 command: Arc<dyn Command>,
163 description: String,
164 meta: serde_json::Value,
165 ) -> EditorResult<()> {
166 let cmd_name = command.name();
167 debug!("正在执行命令: {}", cmd_name);
168
169 let mut tr = self.get_tr();
171 command.execute(&mut tr).await?;
172 tr.commit();
173 match self.dispatch_flow_with_meta(tr, description, meta).await {
175 Ok(_) => {
176 debug!("命令 '{}' 执行成功", cmd_name);
177 Ok(())
178 },
179 Err(e) => {
180 debug!("命令 '{}' 执行失败: {}", cmd_name, e);
181 Err(e)
182 },
183 }
184 }
185 pub async fn dispatch_flow(
186 &mut self,
187 transaction: Transaction,
188 ) -> EditorResult<()> {
189 self.dispatch_flow_with_meta(transaction, "".to_string(), serde_json::Value::Null).await
190 }
191 pub async fn dispatch_flow_with_meta(
205 &mut self,
206 transaction: Transaction,
207 description: String,
208 meta: serde_json::Value,
209 ) -> EditorResult<()> {
210 let start_time = std::time::Instant::now();
211 let mut current_transaction = transaction;
212 let old_id = self.get_state().version;
213 let middleware_start = std::time::Instant::now();
215 self.run_before_middleware(&mut current_transaction).await?;
216 self.log_performance("前置中间件处理", middleware_start.elapsed());
217
218 let (_id, mut rx) = self
220 .flow_engine
221 .submit_transaction((
222 self.base.get_state().clone(),
223 current_transaction,
224 ))
225 .await?;
226
227 let recv_start = std::time::Instant::now();
229 let task_receive_timeout = Duration::from_millis(self.perf_config.task_receive_timeout_ms);
230 let task_result = match tokio::time::timeout(task_receive_timeout, rx.recv()).await {
231 Ok(Some(result)) => result,
232 Ok(None) => {
233 return Err(error_utils::state_error(
234 "任务接收通道已关闭".to_string(),
235 ));
236 },
237 Err(_) => {
238 return Err(error_utils::state_error(
239 format!("任务接收超时({}ms)", self.perf_config.task_receive_timeout_ms),
240 ));
241 },
242 };
243 self.log_performance("接收任务结果", recv_start.elapsed());
244
245 let Some(ProcessorResult { result: Some(result), .. }) =
247 task_result.output
248 else {
249 return Err(error_utils::state_error(
250 "任务处理结果无效".to_string(),
251 ));
252 };
253
254 let mut current_state = None;
256 let mut transactions = Vec::new();
257 transactions.extend(result.transactions);
258
259 if let Some(_) = transactions.last() {
261 current_state = Some(Arc::new(result.state));
262 }
263
264 let after_start = std::time::Instant::now();
266 self.run_after_middleware(&mut current_state, &mut transactions)
267 .await?;
268 self.log_performance("后置中间件处理", after_start.elapsed());
269
270 if let Some(state) = current_state {
272 self.base.update_state_with_meta(state.clone(), description, meta).await?;
273
274 let event_start = std::time::Instant::now();
275 self.base
276 .emit_event(Event::TrApply(
277 old_id,
278 Arc::new(transactions),
279 state,
280 ))
281 .await?;
282 self.log_performance("事件广播", event_start.elapsed());
283 }
284
285 self.log_performance("事务处理总耗时", start_time.elapsed());
286 Ok(())
287 }
288
289 pub async fn run_before_middleware(
290 &mut self,
291 transaction: &mut Transaction,
292 ) -> EditorResult<()> {
293 debug!("执行前置中间件链");
294 for middleware in
295 &self.base.get_options().get_middleware_stack().middlewares
296 {
297 let timeout = Duration::from_millis(self.perf_config.middleware_timeout_ms);
298 match tokio::time::timeout(
299 timeout,
300 middleware.before_dispatch(transaction),
301 )
302 .await
303 {
304 Ok(Ok(())) => {
305 continue;
307 },
308 Ok(Err(e)) => {
309 return Err(error_utils::middleware_error(format!(
310 "前置中间件执行失败: {}",
311 e
312 )));
313 },
314 Err(_) => {
315 return Err(error_utils::middleware_error(format!(
316 "前置中间件执行超时({}ms)",
317 self.perf_config.middleware_timeout_ms
318 )));
319 },
320 }
321 }
322 transaction.commit();
323 Ok(())
324 }
325 pub async fn run_after_middleware(
326 &mut self,
327 state: &mut Option<Arc<State>>,
328 transactions: &mut Vec<Transaction>,
329 ) -> EditorResult<()> {
330 debug!("执行后置中间件链");
331 for middleware in
332 &self.base.get_options().get_middleware_stack().middlewares
333 {
334 let timeout = std::time::Duration::from_millis(
337 self.perf_config.middleware_timeout_ms,
338 );
339
340 let start_time = std::time::Instant::now();
342
343 let middleware_result = match tokio::time::timeout(
344 timeout,
345 middleware.after_dispatch(state.clone(), transactions),
346 )
347 .await
348 {
349 Ok(result) => match result {
350 Ok(r) => r,
351 Err(e) => {
352 debug!("中间件执行失败: {}", e);
354 return Err(error_utils::middleware_error(format!(
355 "中间件执行失败: {}",
356 e
357 )));
358 },
359 },
360 Err(e) => {
361 debug!("中间件执行超时: {}", e);
362 return Err(error_utils::middleware_error(format!(
363 "中间件执行超时: {}",
364 e
365 )));
366 },
367 };
368
369 let elapsed = start_time.elapsed();
371 if elapsed.as_millis() > 100 {
372 debug!("中间件执行时间较长: {}ms", elapsed.as_millis());
373 }
374
375 if let Some(mut transaction) = middleware_result.additional_transaction
376 {
377 transaction.commit();
378 let tx_start_time = std::time::Instant::now();
380
381 let result = match self
382 .flow_engine
383 .submit_transaction((
384 self.base.get_state().clone(),
385 transaction,
386 ))
387 .await
388 {
389 Ok(result) => result,
390 Err(e) => {
391 debug!("附加事务提交失败: {}", e);
392 return Err(error_utils::state_error(format!(
393 "附加事务提交失败: {}",
394 e
395 )));
396 },
397 };
398
399 let (_id, mut rx) = result;
400
401 let task_receive_timeout = Duration::from_millis(self.perf_config.task_receive_timeout_ms);
403 let task_result = match tokio::time::timeout(task_receive_timeout, rx.recv()).await {
404 Ok(Some(result)) => result,
405 Ok(None) => {
406 debug!("附加事务接收通道已关闭");
407 return Ok(());
408 },
409 Err(_) => {
410 debug!("附加事务接收超时");
411 return Err(error_utils::state_error(format!(
412 "附加事务接收超时({}ms)",
413 self.perf_config.task_receive_timeout_ms
414 )));
415 },
416 };
417
418 let Some(ProcessorResult { result: Some(result), .. }) =
419 task_result.output
420 else {
421 debug!("附加事务处理结果无效");
422 return Ok(());
423 };
424
425 let TransactionResult { state: new_state, transactions: trs } =
426 result;
427 *state = Some(Arc::new(new_state));
428 transactions.extend(trs);
429
430 let tx_elapsed = tx_start_time.elapsed();
432 if tx_elapsed.as_millis() > 50 {
433 debug!(
434 "附加事务处理时间较长: {}ms",
435 tx_elapsed.as_millis()
436 );
437 }
438 }
439 }
440 Ok(())
441 }
442}