1use std::sync::Arc;
6use std::time::Instant;
7use async_trait::async_trait;
8use tokio::sync::oneshot;
9
10use crate::{
11 actors::{
12 system::{ForgeActorSystem, ForgeActorSystemHandle, ActorSystemConfig},
13 transaction_processor::TransactionMessage,
14 state_actor::StateMessage,
15 event_bus::EventBusMessage,
16 },
17 config::ForgeConfig,
18 debug::debug,
19 error::{error_utils, ForgeResult},
20 event::Event,
21 runtime::runtime_trait::RuntimeTrait,
22 types::RuntimeOptions,
23 metrics,
24};
25
26use mf_model::schema::Schema;
27use mf_state::{
28 state::State,
29 transaction::{Command, Transaction},
30};
31
32pub struct ForgeActorRuntime {
37 actor_system: Option<ForgeActorSystemHandle>,
39 config: ForgeConfig,
41 started: bool,
43}
44
45impl ForgeActorRuntime {
46 fn actor_system(&self) -> ForgeResult<&ForgeActorSystemHandle> {
48 self.actor_system.as_ref().ok_or_else(|| {
49 error_utils::engine_error("Actor系统未初始化".to_string())
50 })
51 }
52
53 #[cfg_attr(
61 feature = "dev-tracing",
62 tracing::instrument(
63 skip(options),
64 fields(crate_name = "core", runtime_type = "actor")
65 )
66 )]
67 pub async fn create(options: RuntimeOptions) -> ForgeResult<Self> {
68 Self::create_with_config(options, ForgeConfig::default()).await
69 }
70
71 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(options, config), fields(
80 crate_name = "core",
81 runtime_type = "actor",
82 has_middleware = !options.get_middleware_stack().is_empty()
83 )))]
84 pub async fn create_with_config(
85 options: RuntimeOptions,
86 config: ForgeConfig,
87 ) -> ForgeResult<Self> {
88 let start_time = Instant::now();
89 debug!("正在创建Actor运行时实例");
90
91 let actor_system = ForgeActorSystem::start(
93 options,
94 config.clone(),
95 ActorSystemConfig::default(),
96 )
97 .await
98 .map_err(|e| {
99 error_utils::engine_error(format!("启动Actor系统失败: {e}"))
100 })?;
101
102 debug!("Actor运行时实例创建成功");
103 metrics::editor_creation_duration(start_time.elapsed());
104
105 Ok(ForgeActorRuntime {
106 actor_system: Some(actor_system),
107 config,
108 started: true,
109 })
110 }
111
112 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
116 crate_name = "core",
117 tr_id = %transaction.id,
118 runtime_type = "actor"
119 )))]
120 pub async fn dispatch(
121 &mut self,
122 transaction: Transaction,
123 ) -> ForgeResult<()> {
124 self.dispatch_with_meta(
125 transaction,
126 "".to_string(),
127 serde_json::Value::Null,
128 )
129 .await
130 }
131
132 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction, meta), fields(
136 crate_name = "core",
137 tr_id = %transaction.id,
138 description = %description,
139 runtime_type = "actor"
140 )))]
141 pub async fn dispatch_with_meta(
142 &mut self,
143 transaction: Transaction,
144 description: String,
145 meta: serde_json::Value,
146 ) -> ForgeResult<()> {
147 if !self.started {
148 return Err(error_utils::engine_error("运行时未启动".to_string()));
149 }
150
151 let (tx, rx) = oneshot::channel();
153
154 self.actor_system()?
155 .transaction_processor
156 .send_message(TransactionMessage::ProcessTransaction {
157 transaction,
158 description,
159 meta,
160 reply: tx,
161 })
162 .map_err(|e| {
163 error_utils::engine_error(format!("发送事务消息失败: {e}"))
164 })?;
165
166 rx.await.map_err(|e| {
167 error_utils::engine_error(format!("等待事务处理结果失败: {e}"))
168 })?
169 }
170
171 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, command), fields(
175 crate_name = "core",
176 command_name = %command.name(),
177 runtime_type = "actor"
178 )))]
179 pub async fn command(
180 &mut self,
181 command: Arc<dyn Command>,
182 ) -> ForgeResult<()> {
183 debug!("正在执行命令: {}", command.name());
184 metrics::command_executed(command.name().as_str());
185
186 let mut tr = self.get_tr().await?;
187 command.execute(&mut tr).await?;
188 tr.commit()?;
189 self.dispatch(tr).await
190 }
191
192 pub async fn command_with_meta(
196 &mut self,
197 command: Arc<dyn Command>,
198 description: String,
199 meta: serde_json::Value,
200 ) -> ForgeResult<()> {
201 debug!("正在执行命令: {}", command.name());
202 metrics::command_executed(command.name().as_str());
203
204 let mut tr = self.get_tr().await?;
205 command.execute(&mut tr).await?;
206 tr.commit()?;
207 self.dispatch_with_meta(tr, description, meta).await
208 }
209
210 pub async fn get_state(&self) -> ForgeResult<Arc<State>> {
214 let (tx, rx) = oneshot::channel();
215
216 self.actor_system()?
217 .state_actor
218 .send_message(StateMessage::GetState { reply: tx })
219 .map_err(|e| {
220 error_utils::state_error(format!("发送获取状态消息失败: {e}"))
221 })?;
222
223 rx.await.map_err(|e| {
224 error_utils::state_error(format!("接收状态响应失败: {e}"))
225 })
226 }
227
228 pub async fn get_tr(&self) -> ForgeResult<Transaction> {
232 let state = self.get_state().await?;
233 Ok(state.tr())
234 }
235
236 pub async fn undo(&mut self) -> ForgeResult<()> {
240 let (tx, rx) = oneshot::channel();
241
242 self.actor_system()?
243 .state_actor
244 .send_message(StateMessage::Undo { reply: tx })
245 .map_err(|e| {
246 error_utils::state_error(format!("发送撤销消息失败: {e}"))
247 })?;
248
249 rx.await
250 .map_err(|e| {
251 error_utils::state_error(format!("接收撤销响应失败: {e}"))
252 })?
253 .map(|_| ())
254 }
255
256 pub async fn redo(&mut self) -> ForgeResult<()> {
260 let (tx, rx) = oneshot::channel();
261
262 self.actor_system()?
263 .state_actor
264 .send_message(StateMessage::Redo { reply: tx })
265 .map_err(|e| {
266 error_utils::state_error(format!("发送重做消息失败: {e}"))
267 })?;
268
269 rx.await
270 .map_err(|e| {
271 error_utils::state_error(format!("接收重做响应失败: {e}"))
272 })?
273 .map(|_| ())
274 }
275
276 pub async fn jump(
280 &mut self,
281 steps: isize,
282 ) -> ForgeResult<()> {
283 let (tx, rx) = oneshot::channel();
284
285 self.actor_system()?
286 .state_actor
287 .send_message(StateMessage::Jump { steps, reply: tx })
288 .map_err(|e| {
289 error_utils::state_error(format!("发送跳转消息失败: {e}"))
290 })?;
291
292 rx.await
293 .map_err(|e| {
294 error_utils::state_error(format!("接收跳转响应失败: {e}"))
295 })?
296 .map(|_| ())
297 }
298
299 pub async fn emit_event(
303 &mut self,
304 event: Event,
305 ) -> ForgeResult<()> {
306 metrics::event_emitted(event.name());
307
308 self.actor_system()?
309 .event_bus
310 .send_message(EventBusMessage::PublishEvent { event })
311 .map_err(|e| {
312 error_utils::event_error(format!("发送事件消息失败: {e}"))
313 })?;
314
315 Ok(())
316 }
317
318 pub fn get_config(&self) -> &ForgeConfig {
322 &self.config
323 }
324
325 pub fn update_config(
329 &mut self,
330 config: ForgeConfig,
331 ) {
332 self.config = config;
333 }
335
336 #[cfg_attr(
340 feature = "dev-tracing",
341 tracing::instrument(
342 skip(self),
343 fields(crate_name = "core", runtime_type = "actor")
344 )
345 )]
346 pub async fn destroy(&mut self) -> ForgeResult<()> {
347 debug!("正在销毁Actor运行时实例");
348
349 if self.started {
350 let _ = self.emit_event(Event::Destroy).await;
352
353 if let Some(actor_system) = self.actor_system.take() {
355 ForgeActorSystem::shutdown(actor_system).await.map_err(
356 |e| {
357 error_utils::engine_error(format!(
358 "关闭Actor系统失败: {e}"
359 ))
360 },
361 )?;
362 }
363
364 self.started = false;
365 }
366
367 debug!("Actor运行时实例销毁成功");
368 Ok(())
369 }
370
371 pub fn is_started(&self) -> bool {
373 self.started
374 }
375
376 pub async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
378 let state = self.get_state().await?;
379 Ok(state.schema())
380 }
381
382 pub fn get_options(&self) -> RuntimeOptions {
384 RuntimeOptions::default()
385 }
386}
387
388impl Drop for ForgeActorRuntime {
390 fn drop(&mut self) {
391 if self.started {
392 debug!("ForgeActorRuntime Drop: 检测到未正确关闭的运行时");
393 }
396 }
397}
398
399#[async_trait]
402impl RuntimeTrait for ForgeActorRuntime {
403 async fn dispatch(
404 &mut self,
405 transaction: Transaction,
406 ) -> ForgeResult<()> {
407 self.dispatch(transaction).await
408 }
409
410 async fn dispatch_with_meta(
411 &mut self,
412 transaction: Transaction,
413 description: String,
414 meta: serde_json::Value,
415 ) -> ForgeResult<()> {
416 self.dispatch_with_meta(transaction, description, meta).await
417 }
418
419 async fn command(
420 &mut self,
421 command: Arc<dyn Command>,
422 ) -> ForgeResult<()> {
423 self.command(command).await
424 }
425
426 async fn command_with_meta(
427 &mut self,
428 command: Arc<dyn Command>,
429 description: String,
430 meta: serde_json::Value,
431 ) -> ForgeResult<()> {
432 self.command_with_meta(command, description, meta).await
433 }
434
435 async fn get_state(&self) -> ForgeResult<Arc<State>> {
436 self.get_state().await
437 }
438
439 async fn get_tr(&self) -> ForgeResult<Transaction> {
440 self.get_tr().await
441 }
442
443 async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
444 self.get_schema().await
445 }
446
447 async fn undo(&mut self) -> ForgeResult<()> {
448 self.undo().await
449 }
450
451 async fn redo(&mut self) -> ForgeResult<()> {
452 self.redo().await
453 }
454
455 async fn jump(
456 &mut self,
457 steps: isize,
458 ) -> ForgeResult<()> {
459 self.jump(steps).await
460 }
461
462 fn get_config(&self) -> &ForgeConfig {
463 self.get_config()
464 }
465
466 fn update_config(
467 &mut self,
468 config: ForgeConfig,
469 ) {
470 self.update_config(config);
471 }
472
473 fn get_options(&self) -> &RuntimeOptions {
474 thread_local! {
477 static DEFAULT_OPTIONS: RuntimeOptions = RuntimeOptions::default();
478 }
479 DEFAULT_OPTIONS.with(|opts| unsafe {
480 std::mem::transmute::<&RuntimeOptions, &'static RuntimeOptions>(
482 opts,
483 )
484 })
485 }
486
487 async fn destroy(&mut self) -> ForgeResult<()> {
488 self.destroy().await
489 }
490}
491
#[cfg(test)]
mod tests {
    use super::*;

    /// A successfully created runtime must report itself as started.
    ///
    /// Creation with default options is allowed to fail (the outcome depends
    /// on the actor system), so the assertions only run on success.
    #[tokio::test]
    async fn test_actor_runtime_creation() {
        let options = RuntimeOptions::default();

        if let Ok(mut runtime) = ForgeActorRuntime::create(options).await {
            assert!(runtime.is_started());

            // Shut down explicitly so the runtime is not dropped while
            // still marked as started.
            let _ = runtime.destroy().await;
        }
    }

    /// Exercises the basic accessors and checks that a successful `destroy`
    /// leaves the runtime stopped.
    #[tokio::test]
    async fn test_actor_runtime_api_compatibility() {
        let options = RuntimeOptions::default();

        if let Ok(mut runtime) = ForgeActorRuntime::create(options).await {
            let _ = runtime.get_config();
            assert!(runtime.is_started());

            if runtime.destroy().await.is_ok() {
                assert!(!runtime.is_started());
            }
        }
    }
}