1use std::sync::Arc;
6use std::time::Instant;
7use async_trait::async_trait;
8use tokio::sync::oneshot;
9
10use crate::{
11 actors::{
12 system::{ForgeActorSystem, ForgeActorSystemHandle, ActorSystemConfig},
13 transaction_processor::TransactionMessage,
14 state_actor::StateMessage,
15 event_bus::EventBusMessage,
16 },
17 config::ForgeConfig,
18 debug::debug,
19 error::{error_utils, ForgeResult},
20 event::Event,
21 runtime::runtime_trait::RuntimeTrait,
22 types::RuntimeOptions,
23 metrics,
24};
25
26use mf_model::schema::Schema;
27use mf_state::{
28 state::State,
29 transaction::{Command, Transaction},
30};
31
32pub struct ForgeActorRuntime {
37 actor_system: Option<ForgeActorSystemHandle>,
39 config: ForgeConfig,
41 started: bool,
43}
44
45impl ForgeActorRuntime {
46 fn actor_system(&self) -> ForgeResult<&ForgeActorSystemHandle> {
48 self.actor_system.as_ref().ok_or_else(|| {
49 error_utils::engine_error("Actor系统未初始化".to_string())
50 })
51 }
52
53 pub async fn create(options: RuntimeOptions) -> ForgeResult<Self> {
61 Self::create_with_config(options, ForgeConfig::default()).await
62 }
63
64 pub async fn create_with_config(
73 options: RuntimeOptions,
74 config: ForgeConfig,
75 ) -> ForgeResult<Self> {
76 let start_time = Instant::now();
77 debug!("正在创建Actor运行时实例");
78
79 let actor_system = ForgeActorSystem::start(
81 options,
82 config.clone(),
83 ActorSystemConfig::default(),
84 )
85 .await
86 .map_err(|e| {
87 error_utils::engine_error(format!("启动Actor系统失败: {e}"))
88 })?;
89
90 debug!("Actor运行时实例创建成功");
91 metrics::editor_creation_duration(start_time.elapsed());
92
93 Ok(ForgeActorRuntime {
94 actor_system: Some(actor_system),
95 config,
96 started: true,
97 })
98 }
99
100 pub async fn dispatch(
104 &mut self,
105 transaction: Transaction,
106 ) -> ForgeResult<()> {
107 self.dispatch_with_meta(
108 transaction,
109 "".to_string(),
110 serde_json::Value::Null,
111 )
112 .await
113 }
114
115 pub async fn dispatch_with_meta(
119 &mut self,
120 transaction: Transaction,
121 description: String,
122 meta: serde_json::Value,
123 ) -> ForgeResult<()> {
124 if !self.started {
125 return Err(error_utils::engine_error("运行时未启动".to_string()));
126 }
127
128 let (tx, rx) = oneshot::channel();
130
131 self.actor_system()?
132 .transaction_processor
133 .send_message(TransactionMessage::ProcessTransaction {
134 transaction,
135 description,
136 meta,
137 reply: tx,
138 })
139 .map_err(|e| {
140 error_utils::engine_error(format!("发送事务消息失败: {e}"))
141 })?;
142
143 rx.await.map_err(|e| {
144 error_utils::engine_error(format!("等待事务处理结果失败: {e}"))
145 })?
146 }
147
148 pub async fn command(
152 &mut self,
153 command: Arc<dyn Command>,
154 ) -> ForgeResult<()> {
155 debug!("正在执行命令: {}", command.name());
156 metrics::command_executed(command.name().as_str());
157
158 let mut tr = self.get_tr().await?;
159 command.execute(&mut tr).await?;
160 tr.commit()?;
161 self.dispatch(tr).await
162 }
163
164 pub async fn command_with_meta(
168 &mut self,
169 command: Arc<dyn Command>,
170 description: String,
171 meta: serde_json::Value,
172 ) -> ForgeResult<()> {
173 debug!("正在执行命令: {}", command.name());
174 metrics::command_executed(command.name().as_str());
175
176 let mut tr = self.get_tr().await?;
177 command.execute(&mut tr).await?;
178 tr.commit()?;
179 self.dispatch_with_meta(tr, description, meta).await
180 }
181
182 pub async fn get_state(&self) -> ForgeResult<Arc<State>> {
186 let (tx, rx) = oneshot::channel();
187
188 self.actor_system()?
189 .state_actor
190 .send_message(StateMessage::GetState { reply: tx })
191 .map_err(|e| {
192 error_utils::state_error(format!("发送获取状态消息失败: {e}"))
193 })?;
194
195 rx.await.map_err(|e| {
196 error_utils::state_error(format!("接收状态响应失败: {e}"))
197 })
198 }
199
200 pub async fn get_tr(&self) -> ForgeResult<Transaction> {
204 let state = self.get_state().await?;
205 Ok(state.tr())
206 }
207
208 pub async fn undo(&mut self) -> ForgeResult<()> {
212 let (tx, rx) = oneshot::channel();
213
214 self.actor_system()?
215 .state_actor
216 .send_message(StateMessage::Undo { reply: tx })
217 .map_err(|e| {
218 error_utils::state_error(format!("发送撤销消息失败: {e}"))
219 })?;
220
221 rx.await
222 .map_err(|e| {
223 error_utils::state_error(format!("接收撤销响应失败: {e}"))
224 })?
225 .map(|_| ())
226 }
227
228 pub async fn redo(&mut self) -> ForgeResult<()> {
232 let (tx, rx) = oneshot::channel();
233
234 self.actor_system()?
235 .state_actor
236 .send_message(StateMessage::Redo { reply: tx })
237 .map_err(|e| {
238 error_utils::state_error(format!("发送重做消息失败: {e}"))
239 })?;
240
241 rx.await
242 .map_err(|e| {
243 error_utils::state_error(format!("接收重做响应失败: {e}"))
244 })?
245 .map(|_| ())
246 }
247
248 pub async fn jump(
252 &mut self,
253 steps: isize,
254 ) -> ForgeResult<()> {
255 let (tx, rx) = oneshot::channel();
256
257 self.actor_system()?
258 .state_actor
259 .send_message(StateMessage::Jump { steps, reply: tx })
260 .map_err(|e| {
261 error_utils::state_error(format!("发送跳转消息失败: {e}"))
262 })?;
263
264 rx.await
265 .map_err(|e| {
266 error_utils::state_error(format!("接收跳转响应失败: {e}"))
267 })?
268 .map(|_| ())
269 }
270
271 pub async fn emit_event(
275 &mut self,
276 event: Event,
277 ) -> ForgeResult<()> {
278 metrics::event_emitted(event.name());
279
280 self.actor_system()?
281 .event_bus
282 .send_message(EventBusMessage::PublishEvent { event })
283 .map_err(|e| {
284 error_utils::event_error(format!("发送事件消息失败: {e}"))
285 })?;
286
287 Ok(())
288 }
289
290 pub fn get_config(&self) -> &ForgeConfig {
294 &self.config
295 }
296
297 pub fn update_config(
301 &mut self,
302 config: ForgeConfig,
303 ) {
304 self.config = config;
305 }
307
308 pub async fn destroy(&mut self) -> ForgeResult<()> {
312 debug!("正在销毁Actor运行时实例");
313
314 if self.started {
315 let _ = self.emit_event(Event::Destroy).await;
317
318 if let Some(actor_system) = self.actor_system.take() {
320 ForgeActorSystem::shutdown(actor_system).await.map_err(
321 |e| {
322 error_utils::engine_error(format!(
323 "关闭Actor系统失败: {e}"
324 ))
325 },
326 )?;
327 }
328
329 self.started = false;
330 }
331
332 debug!("Actor运行时实例销毁成功");
333 Ok(())
334 }
335
336 pub fn is_started(&self) -> bool {
338 self.started
339 }
340
341 pub async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
343 let state = self.get_state().await?;
344 Ok(state.schema())
345 }
346
347 pub fn get_options(&self) -> RuntimeOptions {
349 RuntimeOptions::default()
350 }
351}
352
353impl Drop for ForgeActorRuntime {
355 fn drop(&mut self) {
356 if self.started {
357 debug!("ForgeActorRuntime Drop: 检测到未正确关闭的运行时");
358 }
361 }
362}
363
364#[async_trait]
367impl RuntimeTrait for ForgeActorRuntime {
368 async fn dispatch(
369 &mut self,
370 transaction: Transaction,
371 ) -> ForgeResult<()> {
372 self.dispatch(transaction).await
373 }
374
375 async fn dispatch_with_meta(
376 &mut self,
377 transaction: Transaction,
378 description: String,
379 meta: serde_json::Value,
380 ) -> ForgeResult<()> {
381 self.dispatch_with_meta(transaction, description, meta).await
382 }
383
384 async fn command(
385 &mut self,
386 command: Arc<dyn Command>,
387 ) -> ForgeResult<()> {
388 self.command(command).await
389 }
390
391 async fn command_with_meta(
392 &mut self,
393 command: Arc<dyn Command>,
394 description: String,
395 meta: serde_json::Value,
396 ) -> ForgeResult<()> {
397 self.command_with_meta(command, description, meta).await
398 }
399
400 async fn get_state(&self) -> ForgeResult<Arc<State>> {
401 self.get_state().await
402 }
403
404 async fn get_tr(&self) -> ForgeResult<Transaction> {
405 self.get_tr().await
406 }
407
408 async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
409 self.get_schema().await
410 }
411
412 async fn undo(&mut self) -> ForgeResult<()> {
413 self.undo().await
414 }
415
416 async fn redo(&mut self) -> ForgeResult<()> {
417 self.redo().await
418 }
419
420 async fn jump(
421 &mut self,
422 steps: isize,
423 ) -> ForgeResult<()> {
424 self.jump(steps).await
425 }
426
427 fn get_config(&self) -> &ForgeConfig {
428 self.get_config()
429 }
430
431 fn update_config(
432 &mut self,
433 config: ForgeConfig,
434 ) {
435 self.update_config(config);
436 }
437
438 fn get_options(&self) -> &RuntimeOptions {
439 thread_local! {
442 static DEFAULT_OPTIONS: RuntimeOptions = RuntimeOptions::default();
443 }
444 DEFAULT_OPTIONS.with(|opts| unsafe {
445 std::mem::transmute::<&RuntimeOptions, &'static RuntimeOptions>(
447 opts,
448 )
449 })
450 }
451
452 async fn destroy(&mut self) -> ForgeResult<()> {
453 self.destroy().await
454 }
455}
456
457#[cfg(test)]
458mod tests {
459 use super::*;
460
461 #[tokio::test]
462 async fn test_actor_runtime_creation() {
463 let options = RuntimeOptions::default();
464 let result = ForgeActorRuntime::create(options).await;
465
466 assert!(result.is_ok() || result.is_err()); }
470
471 #[tokio::test]
472 async fn test_actor_runtime_api_compatibility() {
473 let options = RuntimeOptions::default();
477 if let Ok(mut runtime) = ForgeActorRuntime::create(options).await {
478 let _ = runtime.get_config();
480 let _ = runtime.is_started();
481
482 let _ = runtime.destroy().await;
484 }
485 }
486}