1use std::sync::Arc;
6use std::time::Instant;
7use async_trait::async_trait;
8use tokio::sync::oneshot;
9
10use crate::{
11 actors::{
12 system::{ForgeActorSystem, ForgeActorSystemHandle, ActorSystemConfig},
13 transaction_processor::TransactionMessage,
14 state_actor::StateMessage,
15 event_bus::EventBusMessage,
16 },
17 config::ForgeConfig,
18 debug::debug,
19 error::{error_utils, ForgeResult},
20 event::Event,
21 runtime::runtime_trait::RuntimeTrait,
22 types::RuntimeOptions,
23 metrics,
24};
25
26use mf_model::schema::Schema;
27use mf_state::{
28 state::State,
29 transaction::{Command, Transaction},
30};
31
32pub struct ForgeActorRuntime {
37 actor_system: Option<ForgeActorSystemHandle>,
39 config: ForgeConfig,
41 started: bool,
43}
44
45impl ForgeActorRuntime {
46 fn actor_system(&self) -> ForgeResult<&ForgeActorSystemHandle> {
48 self.actor_system.as_ref().ok_or_else(|| {
49 error_utils::engine_error("Actor系统未初始化".to_string())
50 })
51 }
52
53 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(options), fields(
61 crate_name = "core",
62 runtime_type = "actor"
63 )))]
64 pub async fn create(options: RuntimeOptions) -> ForgeResult<Self> {
65 Self::create_with_config(options, ForgeConfig::default()).await
66 }
67
68 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(options, config), fields(
77 crate_name = "core",
78 runtime_type = "actor",
79 has_middleware = !options.get_middleware_stack().is_empty()
80 )))]
81 pub async fn create_with_config(
82 options: RuntimeOptions,
83 config: ForgeConfig,
84 ) -> ForgeResult<Self> {
85 let start_time = Instant::now();
86 debug!("正在创建Actor运行时实例");
87
88 let actor_system = ForgeActorSystem::start(
90 options,
91 config.clone(),
92 ActorSystemConfig::default(),
93 )
94 .await
95 .map_err(|e| {
96 error_utils::engine_error(format!("启动Actor系统失败: {e}"))
97 })?;
98
99 debug!("Actor运行时实例创建成功");
100 metrics::editor_creation_duration(start_time.elapsed());
101
102 Ok(ForgeActorRuntime {
103 actor_system: Some(actor_system),
104 config,
105 started: true,
106 })
107 }
108
109 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
113 crate_name = "core",
114 tr_id = %transaction.id,
115 runtime_type = "actor"
116 )))]
117 pub async fn dispatch(
118 &mut self,
119 transaction: Transaction,
120 ) -> ForgeResult<()> {
121 self.dispatch_with_meta(
122 transaction,
123 "".to_string(),
124 serde_json::Value::Null,
125 )
126 .await
127 }
128
129 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction, meta), fields(
133 crate_name = "core",
134 tr_id = %transaction.id,
135 description = %description,
136 runtime_type = "actor"
137 )))]
138 pub async fn dispatch_with_meta(
139 &mut self,
140 transaction: Transaction,
141 description: String,
142 meta: serde_json::Value,
143 ) -> ForgeResult<()> {
144 if !self.started {
145 return Err(error_utils::engine_error("运行时未启动".to_string()));
146 }
147
148 let (tx, rx) = oneshot::channel();
150
151 self.actor_system()?
152 .transaction_processor
153 .send_message(TransactionMessage::ProcessTransaction {
154 transaction,
155 description,
156 meta,
157 reply: tx,
158 })
159 .map_err(|e| {
160 error_utils::engine_error(format!("发送事务消息失败: {e}"))
161 })?;
162
163 rx.await.map_err(|e| {
164 error_utils::engine_error(format!("等待事务处理结果失败: {e}"))
165 })?
166 }
167
168 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, command), fields(
172 crate_name = "core",
173 command_name = %command.name(),
174 runtime_type = "actor"
175 )))]
176 pub async fn command(
177 &mut self,
178 command: Arc<dyn Command>,
179 ) -> ForgeResult<()> {
180 debug!("正在执行命令: {}", command.name());
181 metrics::command_executed(command.name().as_str());
182
183 let mut tr = self.get_tr().await?;
184 command.execute(&mut tr).await?;
185 tr.commit()?;
186 self.dispatch(tr).await
187 }
188
189 pub async fn command_with_meta(
193 &mut self,
194 command: Arc<dyn Command>,
195 description: String,
196 meta: serde_json::Value,
197 ) -> ForgeResult<()> {
198 debug!("正在执行命令: {}", command.name());
199 metrics::command_executed(command.name().as_str());
200
201 let mut tr = self.get_tr().await?;
202 command.execute(&mut tr).await?;
203 tr.commit()?;
204 self.dispatch_with_meta(tr, description, meta).await
205 }
206
207 pub async fn get_state(&self) -> ForgeResult<Arc<State>> {
211 let (tx, rx) = oneshot::channel();
212
213 self.actor_system()?
214 .state_actor
215 .send_message(StateMessage::GetState { reply: tx })
216 .map_err(|e| {
217 error_utils::state_error(format!("发送获取状态消息失败: {e}"))
218 })?;
219
220 rx.await.map_err(|e| {
221 error_utils::state_error(format!("接收状态响应失败: {e}"))
222 })
223 }
224
225 pub async fn get_tr(&self) -> ForgeResult<Transaction> {
229 let state = self.get_state().await?;
230 Ok(state.tr())
231 }
232
233 pub async fn undo(&mut self) -> ForgeResult<()> {
237 let (tx, rx) = oneshot::channel();
238
239 self.actor_system()?
240 .state_actor
241 .send_message(StateMessage::Undo { reply: tx })
242 .map_err(|e| {
243 error_utils::state_error(format!("发送撤销消息失败: {e}"))
244 })?;
245
246 rx.await
247 .map_err(|e| {
248 error_utils::state_error(format!("接收撤销响应失败: {e}"))
249 })?
250 .map(|_| ())
251 }
252
253 pub async fn redo(&mut self) -> ForgeResult<()> {
257 let (tx, rx) = oneshot::channel();
258
259 self.actor_system()?
260 .state_actor
261 .send_message(StateMessage::Redo { reply: tx })
262 .map_err(|e| {
263 error_utils::state_error(format!("发送重做消息失败: {e}"))
264 })?;
265
266 rx.await
267 .map_err(|e| {
268 error_utils::state_error(format!("接收重做响应失败: {e}"))
269 })?
270 .map(|_| ())
271 }
272
273 pub async fn jump(
277 &mut self,
278 steps: isize,
279 ) -> ForgeResult<()> {
280 let (tx, rx) = oneshot::channel();
281
282 self.actor_system()?
283 .state_actor
284 .send_message(StateMessage::Jump { steps, reply: tx })
285 .map_err(|e| {
286 error_utils::state_error(format!("发送跳转消息失败: {e}"))
287 })?;
288
289 rx.await
290 .map_err(|e| {
291 error_utils::state_error(format!("接收跳转响应失败: {e}"))
292 })?
293 .map(|_| ())
294 }
295
296 pub async fn emit_event(
300 &mut self,
301 event: Event,
302 ) -> ForgeResult<()> {
303 metrics::event_emitted(event.name());
304
305 self.actor_system()?
306 .event_bus
307 .send_message(EventBusMessage::PublishEvent { event })
308 .map_err(|e| {
309 error_utils::event_error(format!("发送事件消息失败: {e}"))
310 })?;
311
312 Ok(())
313 }
314
315 pub fn get_config(&self) -> &ForgeConfig {
319 &self.config
320 }
321
322 pub fn update_config(
326 &mut self,
327 config: ForgeConfig,
328 ) {
329 self.config = config;
330 }
332
333 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self), fields(
337 crate_name = "core",
338 runtime_type = "actor"
339 )))]
340 pub async fn destroy(&mut self) -> ForgeResult<()> {
341 debug!("正在销毁Actor运行时实例");
342
343 if self.started {
344 let _ = self.emit_event(Event::Destroy).await;
346
347 if let Some(actor_system) = self.actor_system.take() {
349 ForgeActorSystem::shutdown(actor_system).await.map_err(
350 |e| {
351 error_utils::engine_error(format!(
352 "关闭Actor系统失败: {e}"
353 ))
354 },
355 )?;
356 }
357
358 self.started = false;
359 }
360
361 debug!("Actor运行时实例销毁成功");
362 Ok(())
363 }
364
365 pub fn is_started(&self) -> bool {
367 self.started
368 }
369
370 pub async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
372 let state = self.get_state().await?;
373 Ok(state.schema())
374 }
375
376 pub fn get_options(&self) -> RuntimeOptions {
378 RuntimeOptions::default()
379 }
380}
381
382impl Drop for ForgeActorRuntime {
384 fn drop(&mut self) {
385 if self.started {
386 debug!("ForgeActorRuntime Drop: 检测到未正确关闭的运行时");
387 }
390 }
391}
392
393#[async_trait]
396impl RuntimeTrait for ForgeActorRuntime {
397 async fn dispatch(
398 &mut self,
399 transaction: Transaction,
400 ) -> ForgeResult<()> {
401 self.dispatch(transaction).await
402 }
403
404 async fn dispatch_with_meta(
405 &mut self,
406 transaction: Transaction,
407 description: String,
408 meta: serde_json::Value,
409 ) -> ForgeResult<()> {
410 self.dispatch_with_meta(transaction, description, meta).await
411 }
412
413 async fn command(
414 &mut self,
415 command: Arc<dyn Command>,
416 ) -> ForgeResult<()> {
417 self.command(command).await
418 }
419
420 async fn command_with_meta(
421 &mut self,
422 command: Arc<dyn Command>,
423 description: String,
424 meta: serde_json::Value,
425 ) -> ForgeResult<()> {
426 self.command_with_meta(command, description, meta).await
427 }
428
429 async fn get_state(&self) -> ForgeResult<Arc<State>> {
430 self.get_state().await
431 }
432
433 async fn get_tr(&self) -> ForgeResult<Transaction> {
434 self.get_tr().await
435 }
436
437 async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
438 self.get_schema().await
439 }
440
441 async fn undo(&mut self) -> ForgeResult<()> {
442 self.undo().await
443 }
444
445 async fn redo(&mut self) -> ForgeResult<()> {
446 self.redo().await
447 }
448
449 async fn jump(
450 &mut self,
451 steps: isize,
452 ) -> ForgeResult<()> {
453 self.jump(steps).await
454 }
455
456 fn get_config(&self) -> &ForgeConfig {
457 self.get_config()
458 }
459
460 fn update_config(
461 &mut self,
462 config: ForgeConfig,
463 ) {
464 self.update_config(config);
465 }
466
467 fn get_options(&self) -> &RuntimeOptions {
468 thread_local! {
471 static DEFAULT_OPTIONS: RuntimeOptions = RuntimeOptions::default();
472 }
473 DEFAULT_OPTIONS.with(|opts| unsafe {
474 std::mem::transmute::<&RuntimeOptions, &'static RuntimeOptions>(
476 opts,
477 )
478 })
479 }
480
481 async fn destroy(&mut self) -> ForgeResult<()> {
482 self.destroy().await
483 }
484}
485
#[cfg(test)]
mod tests {
    use super::*;

    /// A successfully created runtime reports itself as started, and as
    /// stopped again after a successful `destroy`.
    #[tokio::test]
    async fn test_actor_runtime_creation() {
        let options = RuntimeOptions::default();

        // NOTE(review): creation is tolerated to fail here, presumably
        // because the actor system may need setup that a bare test
        // environment lacks; confirm, and tighten to `expect` if creation
        // is supposed to always succeed with default options.
        if let Ok(mut runtime) = ForgeActorRuntime::create(options).await {
            assert!(runtime.is_started());

            if runtime.destroy().await.is_ok() {
                assert!(!runtime.is_started());
            }
        }
    }

    /// Smoke test: the basic accessors and `destroy` can be called on a
    /// freshly created runtime.
    #[tokio::test]
    async fn test_actor_runtime_api_compatibility() {
        let options = RuntimeOptions::default();

        if let Ok(mut runtime) = ForgeActorRuntime::create(options).await {
            let _ = runtime.get_config();
            let _ = runtime.is_started();

            let _ = runtime.destroy().await;
        }
    }
}