1use ractor::ActorRef;
6use std::sync::Arc;
7use tokio::sync::oneshot;
8
9use crate::{
10 config::ForgeConfig,
11 debug::debug,
12 extension_manager::ExtensionManager,
13 history_manager::HistoryManager,
14 runtime::sync_flow::FlowEngine,
15 types::{RuntimeOptions, HistoryEntryWithMeta},
16};
17
18use mf_state::state::State;
19
20use super::{
21 event_bus::{EventBusActorManager, EventBusMessage},
22 extension_manager::{ExtensionManagerActorManager, ExtensionMessage},
23 state_actor::{StateActorManager, StateMessage},
24 transaction_processor::{TransactionProcessorManager, TransactionMessage},
25 ActorSystemError, ActorSystemResult,
26};
27
28#[derive(Debug, Clone)]
30pub struct ActorSystemConfig {
31 pub system_name: String,
33 pub enable_supervision: bool,
35 pub shutdown_timeout_ms: u64,
37 pub enable_metrics: bool,
39}
40
41impl Default for ActorSystemConfig {
42 fn default() -> Self {
43 Self {
44 system_name: "ForgeActorSystem".to_string(),
45 enable_supervision: true,
46 shutdown_timeout_ms: 5000,
47 enable_metrics: true,
48 }
49 }
50}
51
52pub struct ForgeActorSystemHandle {
54 pub transaction_processor: ActorRef<TransactionMessage>,
56 pub state_actor: ActorRef<StateMessage>,
58 pub event_bus: ActorRef<EventBusMessage>,
60 pub extension_manager: ActorRef<ExtensionMessage>,
62 pub config: ActorSystemConfig,
64}
65
66pub struct ForgeActorSystem;
68
69impl ForgeActorSystem {
70 pub async fn start(
80 runtime_options: RuntimeOptions,
81 forge_config: ForgeConfig,
82 system_config: ActorSystemConfig,
83 ) -> ActorSystemResult<ForgeActorSystemHandle> {
84 debug!("启动ForgeActorSystem: {}", system_config.system_name);
85
86 let extension_manager =
88 Self::create_extension_manager(&runtime_options, &forge_config)?;
89 let extension_manager_actor =
90 ExtensionManagerActorManager::start(extension_manager).await?;
91
92 let (initial_state, history_manager) = Self::create_state_and_history(
94 &runtime_options,
95 &forge_config,
96 &extension_manager_actor,
97 )
98 .await?;
99
100 let state_actor =
102 StateActorManager::start(initial_state, history_manager).await?;
103
104 let event_bus =
106 EventBusActorManager::start(forge_config.event.clone()).await?;
107
108 if !runtime_options.get_event_handlers().is_empty() {
110 EventBusActorManager::add_handlers(
111 &event_bus,
112 runtime_options.get_event_handlers(),
113 )
114 .await
115 .map_err(|e| ActorSystemError::ConfigurationError {
116 message: format!("添加事件处理器失败: {e}"),
117 })?;
118 }
119
120 let flow_engine = Arc::new(FlowEngine::new().map_err(|e| {
122 ActorSystemError::ConfigurationError {
123 message: format!("创建流引擎失败: {e}"),
124 }
125 })?);
126
127 let transaction_processor = TransactionProcessorManager::start(
129 state_actor.clone(),
130 event_bus.clone(),
131 runtime_options.get_middleware_stack(),
132 flow_engine,
133 forge_config,
134 )
135 .await?;
136
137 debug!("ForgeActorSystem启动完成");
138
139 Ok(ForgeActorSystemHandle {
140 transaction_processor,
141 state_actor,
142 event_bus,
143 extension_manager: extension_manager_actor,
144 config: system_config,
145 })
146 }
147
148 pub async fn shutdown(
156 handle: ForgeActorSystemHandle
157 ) -> ActorSystemResult<()> {
158 debug!("关闭ForgeActorSystem: {}", handle.config.system_name);
159
160 let shutdown_timeout = tokio::time::Duration::from_millis(
161 handle.config.shutdown_timeout_ms,
162 );
163
164 let _ = tokio::time::timeout(shutdown_timeout, async {
167 handle.transaction_processor.stop(None);
168 })
169 .await;
170
171 let _ = tokio::time::timeout(shutdown_timeout, async {
173 handle.event_bus.stop(None);
174 })
175 .await;
176
177 let _ = tokio::time::timeout(shutdown_timeout, async {
179 handle.state_actor.stop(None);
180 })
181 .await;
182
183 let _ = tokio::time::timeout(shutdown_timeout, async {
185 handle.extension_manager.stop(None);
186 })
187 .await;
188
189 debug!("ForgeActorSystem关闭完成");
190 Ok(())
191 }
192
193 fn create_extension_manager(
195 runtime_options: &RuntimeOptions,
196 forge_config: &ForgeConfig,
197 ) -> ActorSystemResult<ExtensionManager> {
198 crate::helpers::runtime_common::ExtensionManagerHelper::create_extension_manager(
199 runtime_options,
200 forge_config,
201 )
202 .map_err(|e| ActorSystemError::ConfigurationError {
203 message: format!("创建扩展管理器失败: {e}"),
204 })
205 }
206
207 async fn create_state_and_history(
209 runtime_options: &RuntimeOptions,
210 forge_config: &ForgeConfig,
211 extension_manager_actor: &ActorRef<ExtensionMessage>,
212 ) -> ActorSystemResult<(Arc<State>, HistoryManager<HistoryEntryWithMeta>)>
213 {
214 let (tx, rx) = oneshot::channel();
216 extension_manager_actor
217 .send_message(ExtensionMessage::GetSchema { reply: tx })
218 .map_err(|e| ActorSystemError::CommunicationFailed {
219 message: format!("获取Schema失败: {e}"),
220 })?;
221
222 let schema =
223 rx.await.map_err(|e| ActorSystemError::CommunicationFailed {
224 message: format!("接收Schema失败: {e}"),
225 })?;
226
227 let (tx, rx) = oneshot::channel();
229 extension_manager_actor
230 .send_message(ExtensionMessage::GetPlugins { reply: tx })
231 .map_err(|e| ActorSystemError::CommunicationFailed {
232 message: format!("获取插件失败: {e}"),
233 })?;
234
235 let plugins =
236 rx.await.map_err(|e| ActorSystemError::CommunicationFailed {
237 message: format!("接收插件失败: {e}"),
238 })?;
239 println!("获取插件: {:?}", plugins.len());
240 let (tx, rx) = oneshot::channel();
242 extension_manager_actor
243 .send_message(ExtensionMessage::GetOpFns { reply: tx })
244 .map_err(|e| ActorSystemError::CommunicationFailed {
245 message: format!("获取操作函数失败: {e}"),
246 })?;
247
248 let op_fns =
249 rx.await.map_err(|e| ActorSystemError::CommunicationFailed {
250 message: format!("接收操作函数失败: {e}"),
251 })?;
252
253 let op_state = mf_state::ops::GlobalResourceManager::new();
255 for op_fn in &op_fns {
256 op_fn(&op_state).map_err(|e| {
257 ActorSystemError::ConfigurationError {
258 message: format!("执行操作函数失败: {e}"),
259 }
260 })?;
261 }
262
263 let mut state_config = mf_state::state::StateConfig {
265 schema: Some(schema),
266 doc: None,
267 stored_marks: None,
268 plugins: Some(plugins),
269 resource_manager: Some(Arc::new(op_state)),
270 };
271
272 crate::helpers::create_doc::create_doc(
274 &runtime_options.get_content(),
275 &mut state_config,
276 )
277 .await
278 .map_err(|e| ActorSystemError::ConfigurationError {
279 message: format!("创建文档失败: {e}"),
280 })?;
281
282 let state = State::create(state_config).await.map_err(|e| {
284 ActorSystemError::ConfigurationError {
285 message: format!("创建状态失败: {e}"),
286 }
287 })?;
288
289 let state = Arc::new(state);
290
291 let history_manager = HistoryManager::with_config(
293 HistoryEntryWithMeta::new(
294 state.clone(),
295 "创建工程项目".to_string(),
296 serde_json::Value::Null,
297 ),
298 forge_config.history.clone(),
299 );
300
301 Ok((state, history_manager))
302 }
303}