1use std::sync::Arc;
2use std::time::Instant;
3
4use crate::{
5 error::{error_utils, ForgeResult},
6 event::{Event, EventBus},
7 extension_manager::ExtensionManager,
8 helpers::create_doc,
9 history_manager::HistoryManager,
10 metrics,
11 sync_flow::FlowEngine,
12 types::{HistoryEntryWithMeta, ProcessorResult, RuntimeOptions},
13};
14
15use moduforge_model::{node_pool::NodePool, schema::Schema};
16use moduforge_state::{
17 debug, error, info,
18 ops::GlobalResourceManager,
19 state::{State, StateConfig, TransactionResult},
20 transaction::{Command, Transaction},
21};
22
/// Time budget, in milliseconds, granted to each individual middleware hook
/// (`before_dispatch` / `after_dispatch`) before the call is abandoned and a
/// timeout error is returned.
const DEFAULT_MIDDLEWARE_TIMEOUT_MS: u64 = 500;
25
/// Runtime that owns one editor instance: its current state, the event bus,
/// the flow engine transactions are submitted to, the extension manager, the
/// state history and the options it was created from.
pub struct ForgeRuntime {
    /// Bus on which lifecycle and transaction [`Event`]s are broadcast.
    event_bus: EventBus<Event>,
    /// Current state snapshot; replaced wholesale whenever the state changes.
    state: Arc<State>,
    /// Engine that `(state, transaction)` pairs are submitted to for processing.
    flow_engine: Arc<FlowEngine>,
    /// Provides the schema, plugins and op functions used to build the state.
    extension_manager: ExtensionManager,
    /// History of state snapshots that backs `undo`, `redo` and `jump`.
    history_manager: HistoryManager<HistoryEntryWithMeta>,
    /// Options supplied at creation time (extensions, content, middleware
    /// stack, event handlers, history limit).
    options: RuntimeOptions,
}
// NOTE(review): these manual impls bypass the compiler's auto-trait check.
// The invariant that makes them sound is not visible in this file — TODO:
// document it here as a `SAFETY:` note, or drop the impls if every field is
// already `Send + Sync`.
unsafe impl Send for ForgeRuntime {}
unsafe impl Sync for ForgeRuntime {}
38impl ForgeRuntime {
39 pub async fn create(options: RuntimeOptions) -> ForgeResult<Self> {
42 let start_time = Instant::now();
43 info!("正在创建新的编辑器实例");
44 let extension_manager =
45 ExtensionManager::new(&options.get_extensions())?;
46 debug!("已初始化扩展管理器");
47
48 let event_bus = EventBus::new();
49 debug!("已创建文档和事件总线");
50 let op_state = GlobalResourceManager::new();
51 for op_fn in extension_manager.get_op_fns() {
52 op_fn(&op_state)?;
53 }
54
55 let mut config = StateConfig {
56 schema: Some(extension_manager.get_schema()),
57 doc: None,
58 stored_marks: None,
59 plugins: Some(extension_manager.get_plugins().clone()),
60 resource_manager: Some(Arc::new(op_state)),
61 };
62 create_doc::create_doc(&options.get_content(), &mut config).await?;
63 let state: State = State::create(config).await?;
64
65 let state: Arc<State> = Arc::new(state);
66 debug!("已创建编辑器状态");
67
68 let mut runtime = ForgeRuntime {
69 event_bus,
70 state: state.clone(),
71 flow_engine: Arc::new(FlowEngine::new()?),
72 extension_manager,
73 history_manager: HistoryManager::new(
74 HistoryEntryWithMeta::new(
75 state.clone(),
76 "创建工程项目".to_string(),
77 serde_json::Value::Null,
78 ),
79 options.get_history_limit(),
80 ),
81 options,
82 };
83 runtime.init().await?;
84 info!("编辑器实例创建成功");
85 metrics::editor_creation_duration(start_time.elapsed());
86 Ok(runtime)
87 }
88
89 async fn init(&mut self) -> ForgeResult<()> {
91 debug!("正在初始化编辑器");
92 self.event_bus.add_event_handlers(self.options.get_event_handlers())?;
93 self.event_bus.start_event_loop();
94 debug!("事件总线已启动");
95
96 self.event_bus
97 .broadcast_blocking(Event::Create(self.state.clone()))
98 .map_err(|e| {
99 error!("广播创建事件失败: {}", e);
100 error_utils::event_error(format!(
101 "Failed to broadcast create event: {}",
102 e
103 ))
104 })?;
105 debug!("已广播创建事件");
106 Ok(())
107 }
108
109 pub async fn destroy(&mut self) -> ForgeResult<()> {
111 debug!("正在销毁编辑器实例");
112 self.event_bus.broadcast(Event::Destroy).await?;
114 self.event_bus.broadcast(Event::Stop).await?;
116 debug!("编辑器实例销毁成功");
117 Ok(())
118 }
119
120 pub async fn emit_event(
121 &mut self,
122 event: Event,
123 ) -> ForgeResult<()> {
124 metrics::event_emitted(event.name());
125 self.event_bus.broadcast(event).await?;
126 Ok(())
127 }
128 pub async fn run_before_middleware(
129 &mut self,
130 transaction: &mut Transaction,
131 ) -> ForgeResult<()> {
132 debug!("执行前置中间件链");
133 for middleware in &self.options.get_middleware_stack().middlewares {
134 let start_time = Instant::now();
135 let timeout =
136 std::time::Duration::from_millis(DEFAULT_MIDDLEWARE_TIMEOUT_MS);
137 match tokio::time::timeout(
138 timeout,
139 middleware.before_dispatch(transaction),
140 )
141 .await
142 {
143 Ok(Ok(())) => {
144 metrics::middleware_execution_duration(
146 start_time.elapsed(),
147 "before",
148 middleware.name().as_str(),
149 );
150 continue;
151 },
152 Ok(Err(e)) => {
153 return Err(error_utils::middleware_error(format!(
154 "前置中间件执行失败: {}",
155 e
156 )));
157 },
158 Err(_) => {
159 return Err(error_utils::middleware_error(format!(
160 "前置中间件执行超时({}ms)",
161 DEFAULT_MIDDLEWARE_TIMEOUT_MS
162 )));
163 },
164 }
165 }
166 transaction.commit();
167 Ok(())
168 }
169 pub async fn run_after_middleware(
170 &mut self,
171 state: &mut Option<Arc<State>>,
172 transactions: &mut Vec<Transaction>,
173 ) -> ForgeResult<()> {
174 debug!("执行后置中间件链");
175 for middleware in &self.options.get_middleware_stack().middlewares {
176 let start_time = Instant::now();
177 let timeout =
178 std::time::Duration::from_millis(DEFAULT_MIDDLEWARE_TIMEOUT_MS);
179 let middleware_result = match tokio::time::timeout(
180 timeout,
181 middleware.after_dispatch(state.clone(), transactions),
182 )
183 .await
184 {
185 Ok(Ok(result)) => {
186 metrics::middleware_execution_duration(
187 start_time.elapsed(),
188 "after",
189 middleware.name().as_str(),
190 );
191 result
192 },
193 Ok(Err(e)) => {
194 return Err(error_utils::middleware_error(format!(
195 "后置中间件执行失败: {}",
196 e
197 )));
198 },
199 Err(_) => {
200 return Err(error_utils::middleware_error(format!(
201 "后置中间件执行超时({}ms)",
202 DEFAULT_MIDDLEWARE_TIMEOUT_MS
203 )));
204 },
205 };
206
207 if let Some(mut transaction) = middleware_result {
208 transaction.commit();
209 let TransactionResult { state: new_state, transactions: trs } =
210 self.state.apply(transaction).await.map_err(|e| {
211 error_utils::state_error(format!(
212 "附加事务应用失败: {}",
213 e
214 ))
215 })?;
216 *state = Some(Arc::new(new_state));
217 transactions.extend(trs);
218 }
219 }
220 Ok(())
221 }
222 pub async fn command(
223 &mut self,
224 command: Arc<dyn Command>,
225 ) -> ForgeResult<()> {
226 debug!("正在执行命令: {}", command.name());
227 metrics::command_executed(command.name().as_str());
228 let mut tr = self.get_tr();
229 command.execute(&mut tr).await?;
230 tr.commit();
231 self.dispatch(tr).await
232 }
233
234 pub async fn command_with_meta(
235 &mut self,
236 command: Arc<dyn Command>,
237 description: String,
238 meta: serde_json::Value,
239 ) -> ForgeResult<()> {
240 debug!("正在执行命令: {}", command.name());
241 metrics::command_executed(command.name().as_str());
242 let mut tr = self.get_tr();
243 command.execute(&mut tr).await?;
244 tr.commit();
245 self.dispatch_with_meta(tr, description, meta).await
246 }
247
248 pub async fn dispatch(
256 &mut self,
257 transaction: Transaction,
258 ) -> ForgeResult<()> {
259 self.dispatch_with_meta(
260 transaction,
261 "".to_string(),
262 serde_json::Value::Null,
263 )
264 .await
265 }
266 pub async fn dispatch_with_meta(
268 &mut self,
269 transaction: Transaction,
270 description: String,
271 meta: serde_json::Value,
272 ) -> ForgeResult<()> {
273 metrics::transaction_dispatched();
274 let old_id = self.get_state().version;
275 let mut current_transaction = transaction;
277 self.run_before_middleware(&mut current_transaction).await?;
278
279 let task_result = self
281 .flow_engine
282 .submit((self.state.clone(), current_transaction.clone()))
283 .await;
284 let Some(ProcessorResult { result: Some(result), .. }) =
285 task_result.output
286 else {
287 return Err(error_utils::state_error(
288 "任务处理结果无效".to_string(),
289 ));
290 };
291 let mut state_update = None;
293 let mut transactions = Vec::new();
294 transactions.extend(result.transactions);
295 if let Some(_) = transactions.last() {
297 state_update = Some(Arc::new(result.state));
298 }
299 self.run_after_middleware(&mut state_update, &mut transactions).await?;
301
302 if let Some(state) = state_update {
304 self.update_state_with_meta(state.clone(), description, meta)
305 .await?;
306 self.emit_event(Event::TrApply(
307 old_id,
308 Arc::new(transactions),
309 state,
310 ))
311 .await?;
312 }
313 Ok(())
314 }
315 pub async fn update_state(
317 &mut self,
318 state: Arc<State>,
319 ) -> ForgeResult<()> {
320 self.update_state_with_meta(
321 state,
322 "".to_string(),
323 serde_json::Value::Null,
324 )
325 .await
326 }
327 pub async fn update_state_with_meta(
329 &mut self,
330 state: Arc<State>,
331 description: String,
332 meta: serde_json::Value,
333 ) -> ForgeResult<()> {
334 self.state = state.clone();
335 self.history_manager.insert(HistoryEntryWithMeta::new(
336 state,
337 description,
338 meta,
339 ));
340 Ok(())
341 }
342
343 pub async fn register_plugin(&mut self) -> ForgeResult<()> {
344 info!("正在注册新插件");
345 let state = self
346 .get_state()
347 .reconfigure(StateConfig {
348 schema: Some(self.get_schema()),
349 doc: Some(self.get_state().doc()),
350 stored_marks: None,
351 plugins: Some(self.get_state().plugins().clone()),
352 resource_manager: Some(
353 self.get_state().resource_manager().clone(),
354 ),
355 })
356 .await?;
357 self.update_state(Arc::new(state)).await?;
358 info!("插件注册成功");
359 Ok(())
360 }
361
362 pub async fn unregister_plugin(
363 &mut self,
364 plugin_key: String,
365 ) -> ForgeResult<()> {
366 info!("正在注销插件: {}", plugin_key);
367 let ps = self
368 .get_state()
369 .plugins()
370 .iter()
371 .filter(|p| p.key != plugin_key)
372 .cloned()
373 .collect();
374 let state = self
375 .get_state()
376 .reconfigure(StateConfig {
377 schema: Some(self.get_schema().clone()),
378 doc: Some(self.get_state().doc()),
379 stored_marks: None,
380 plugins: Some(ps),
381 resource_manager: Some(
382 self.get_state().resource_manager().clone(),
383 ),
384 })
385 .await?;
386 self.update_state(Arc::new(state)).await?;
387 info!("插件注销成功");
388 Ok(())
389 }
390
/// Returns the document (node pool) of the current state.
pub fn doc(&self) -> Arc<NodePool> {
    self.state.doc()
}
395
/// Returns the options this runtime was created with.
pub fn get_options(&self) -> &RuntimeOptions {
    &self.options
}
399
/// Returns a reference to the current state.
pub fn get_state(&self) -> &Arc<State> {
    &self.state
}
403
/// Returns the schema provided by the extension manager.
pub fn get_schema(&self) -> Arc<Schema> {
    self.extension_manager.get_schema()
}
407
/// Returns the event bus used by this runtime.
pub fn get_event_bus(&self) -> &EventBus<Event> {
    &self.event_bus
}
411
412 pub fn get_tr(&self) -> Transaction {
413 let tr = self.get_state().tr();
414 tr
415 }
416
417 pub fn undo(&mut self) {
418 self.history_manager.jump(-1);
419 self.state = self.history_manager.get_present().state;
420 metrics::history_operation("undo");
421 }
422
423 pub fn redo(&mut self) {
424 self.history_manager.jump(1);
425 self.state = self.history_manager.get_present().state;
426 metrics::history_operation("redo");
427 }
428
429 pub fn jump(
430 &mut self,
431 n: isize,
432 ) {
433 self.history_manager.jump(n);
434 self.state = self.history_manager.get_present().state;
435 metrics::history_operation("jump");
436 }
/// Returns the history manager that backs `undo`, `redo` and `jump`.
pub fn get_history_manager(&self) -> &HistoryManager<HistoryEntryWithMeta> {
    &self.history_manager
}
440}
/// Calls `destroy` on the event bus when the runtime is dropped.
impl Drop for ForgeRuntime {
    fn drop(&mut self) {
        self.event_bus.destroy();
    }
}