use std::sync::Arc;
use std::time::Instant;
use async_trait::async_trait;
use crate::{
config::ForgeConfig,
debug::{debug, info},
error::{error_utils, ForgeResult},
event::{Event, EventBus},
extension_manager::ExtensionManager,
helpers::{
create_doc, event_helper::EventHelper, history_helper::HistoryHelper,
middleware_helper::MiddlewareHelper,
},
history_manager::HistoryManager,
metrics,
runtime::{runtime_trait::RuntimeTrait, sync_flow::FlowEngine},
types::{HistoryEntryWithMeta, ProcessorResult, RuntimeOptions},
};
use mf_model::{node_pool::NodePool, schema::Schema};
use mf_state::{
ops::GlobalResourceManager,
state::{State, StateConfig},
transaction::{Command, Transaction},
};
/// Runtime that owns the current editor [`State`] and pushes transactions
/// through middleware, the flow engine, the history manager and the event bus.
pub struct ForgeRuntime {
/// Bus on which `TrApply`, `Undo`, `Redo` and `Jump` events are emitted.
event_bus: EventBus<Event>,
/// State snapshot the runtime currently considers committed.
state: Arc<State>,
/// Engine that `(state, transaction)` pairs are submitted to for processing.
flow_engine: Arc<FlowEngine>,
/// Source of the schema, plugins and op functions used to build the state.
extension_manager: ExtensionManager,
/// Undo/redo/jump history of state entries with description and metadata.
history_manager: HistoryManager<HistoryEntryWithMeta>,
/// Options the runtime was created with (content, extensions, middleware stack).
options: RuntimeOptions,
/// Runtime configuration; replaceable through `update_config`.
config: ForgeConfig,
}
impl ForgeRuntime {
/// Creates a runtime from `options`, paired with the default [`ForgeConfig`].
///
/// # Errors
/// Propagates any error returned by [`Self::create_with_config`].
#[cfg_attr(
    feature = "dev-tracing",
    tracing::instrument(
        skip(options),
        fields(crate_name = "core", runtime_type = "sync")
    )
)]
pub async fn create(options: RuntimeOptions) -> ForgeResult<Self> {
    let default_config = ForgeConfig::default();
    Self::create_with_config(options, default_config).await
}
/// Creates a runtime whose configuration points at a single XML schema file.
///
/// `options` and `config` fall back to their `Default` values when `None`.
/// Any `xml_schema_paths` already present in `config` are replaced by the
/// single entry `xml_schema_path`.
///
/// # Errors
/// Propagates any error returned by [`Self::create_with_config`].
#[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(options, config), fields(
    crate_name = "core",
    schema_path = xml_schema_path
)))]
pub async fn from_xml_schema_path(
    xml_schema_path: &str,
    options: Option<RuntimeOptions>,
    config: Option<ForgeConfig>,
) -> ForgeResult<Self> {
    let mut resolved_config = config.unwrap_or_default();
    resolved_config.extension.xml_schema_paths = vec![xml_schema_path.to_owned()];
    let resolved_options = options.unwrap_or_default();
    Self::create_with_config(resolved_options, resolved_config).await
}
fn merge_options_with_extensions(
options: Option<RuntimeOptions>,
extension_manager: ExtensionManager,
) -> RuntimeOptions {
match options {
Some(opts) => {
let schema = extension_manager.get_schema();
let mut xml_extensions = Vec::new();
let factory = schema.factory();
let (nodes, marks) = factory.definitions();
for (name, node_type) in nodes {
let node =
crate::node::Node::create(name, node_type.spec.clone());
xml_extensions.push(crate::types::Extensions::N(node));
}
for (name, mark_type) in marks {
let mark =
crate::mark::Mark::new(name, mark_type.spec.clone());
xml_extensions.push(crate::types::Extensions::M(mark));
}
let existing_extensions = opts.get_extensions();
xml_extensions.extend(existing_extensions);
opts.set_extensions(xml_extensions)
},
None => {
RuntimeOptions::from_extension_manager(extension_manager)
},
}
}
/// Creates a runtime whose configuration points at several XML schema files.
///
/// `options` and `config` fall back to their `Default` values when `None`.
/// Any `xml_schema_paths` already present in `config` are replaced by the
/// given paths, in the same order.
///
/// # Errors
/// Propagates any error returned by [`Self::create_with_config`].
#[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(xml_schema_paths, options, config), fields(
    crate_name = "core",
    schema_count = xml_schema_paths.len(),
    runtime_type = "sync"
)))]
pub async fn from_xml_schemas(
    xml_schema_paths: &[&str],
    options: Option<RuntimeOptions>,
    config: Option<ForgeConfig>,
) -> ForgeResult<Self> {
    let mut resolved_config = config.unwrap_or_default();
    resolved_config.extension.xml_schema_paths = xml_schema_paths
        .iter()
        .map(|path| (*path).to_owned())
        .collect();
    let resolved_options = options.unwrap_or_default();
    Self::create_with_config(resolved_options, resolved_config).await
}
/// Creates a runtime from an in-memory XML schema document.
///
/// An [`ExtensionManager`] is parsed from `xml_content`, merged with
/// `options` via `merge_options_with_extensions`, and the result is handed to
/// [`Self::create_with_config`]. `config` falls back to its `Default` value
/// when `None`.
///
/// # Errors
/// Returns the error from `ExtensionManager::from_xml_string` if parsing
/// fails, or any error from [`Self::create_with_config`].
#[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(xml_content, options, config), fields(
    crate_name = "core",
    content_size = xml_content.len(),
    runtime_type = "sync"
)))]
pub async fn from_xml_content(
    xml_content: &str,
    options: Option<RuntimeOptions>,
    config: Option<ForgeConfig>,
) -> ForgeResult<Self> {
    let xml_extension_manager = ExtensionManager::from_xml_string(xml_content)?;
    let merged_options =
        Self::merge_options_with_extensions(options, xml_extension_manager);
    let resolved_config = config.unwrap_or_default();
    Self::create_with_config(merged_options, resolved_config).await
}
#[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(options, config), fields(
crate_name = "core",
runtime_type = "sync",
has_middleware = !options.get_middleware_stack().is_empty()
)))]
pub async fn create_with_config(
options: RuntimeOptions,
config: ForgeConfig,
) -> ForgeResult<Self> {
let start_time = Instant::now();
info!("正在创建新的编辑器实例");
let extension_manager =
Self::create_extension_manager(&options, &config)?;
debug!("已初始化扩展管理器");
let op_state = GlobalResourceManager::new();
for op_fn in extension_manager.get_op_fns() {
op_fn(&op_state)?;
}
let mut state_config = StateConfig {
schema: Some(extension_manager.get_schema()),
doc: None,
stored_marks: None,
plugins: Some(extension_manager.get_plugins().clone()),
resource_manager: Some(Arc::new(op_state)),
};
create_doc::create_doc(&options.get_content(), &mut state_config)
.await?;
let state: State = State::create(state_config).await?;
let state: Arc<State> = Arc::new(state);
debug!("已创建编辑器状态");
let event_bus = EventHelper::create_and_init_event_bus(
&config,
&options,
state.clone(),
)
.await?;
let initial_transaction = state.tr();
let runtime = ForgeRuntime {
event_bus,
state: state.clone(),
flow_engine: Arc::new(FlowEngine::new()?),
extension_manager,
history_manager: HistoryManager::with_config(
HistoryEntryWithMeta::new(
Arc::new(initial_transaction),
state.clone(),
"创建工程项目".to_string(),
serde_json::Value::Null,
),
config.history.clone(),
),
options,
config,
};
info!("编辑器实例创建成功");
metrics::editor_creation_duration(start_time.elapsed());
Ok(runtime)
}
/// Builds the [`ExtensionManager`] for `options` and `config` by delegating
/// to the shared `ExtensionManagerHelper`.
///
/// # Errors
/// Propagates the helper's error unchanged.
fn create_extension_manager(
    options: &RuntimeOptions,
    config: &ForgeConfig,
) -> ForgeResult<ExtensionManager> {
    use crate::helpers::runtime_common::ExtensionManagerHelper;

    ExtensionManagerHelper::create_extension_manager(options, config)
}
/// Shuts the runtime down by destroying its event bus.
///
/// # Errors
/// Propagates the error from `EventHelper::destroy_event_bus`; the success
/// message is only logged when teardown succeeded.
#[cfg_attr(
    feature = "dev-tracing",
    tracing::instrument(skip(self), fields(crate_name = "core"))
)]
pub async fn destroy(&mut self) -> ForgeResult<()> {
    debug!("正在销毁编辑器实例");
    let teardown = EventHelper::destroy_event_bus(&mut self.event_bus).await;
    teardown?;
    debug!("编辑器实例销毁成功");
    Ok(())
}
/// Emits `event` on the runtime's event bus via `EventHelper::emit_event`.
///
/// # Errors
/// Propagates the helper's error unchanged.
#[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, event), fields(
    crate_name = "core",
    event_type = std::any::type_name_of_val(&event)
)))]
pub async fn emit_event(
    &mut self,
    event: Event,
) -> ForgeResult<()> {
    let bus = &mut self.event_bus;
    EventHelper::emit_event(bus, event).await
}
/// Runs the "before" middleware chain over `transaction`.
///
/// The middleware stack comes from the runtime options; the runtime config
/// is passed along to `MiddlewareHelper::run_before_middleware`. The
/// transaction is handed over mutably, so middleware may modify it.
///
/// # Errors
/// Propagates the helper's error unchanged.
#[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
    crate_name = "core",
    tr_id = %transaction.id,
    middleware_count = self.options.get_middleware_stack().middlewares.len()
)))]
pub async fn run_before_middleware(
    &mut self,
    transaction: &mut Transaction,
) -> ForgeResult<()> {
    let middleware_stack = self.options.get_middleware_stack();
    MiddlewareHelper::run_before_middleware(
        transaction,
        &middleware_stack,
        &self.config,
    )
    .await
}
/// Runs every middleware's `after_dispatch` hook, in stack order, over the
/// outcome of a dispatch.
///
/// `state` and `transactions` are in/out parameters: when a middleware
/// returns a follow-up transaction, that transaction is committed and
/// submitted to the flow engine, `state` is replaced by the resulting state
/// and the resulting transactions are appended to `transactions`.
///
/// # Errors
/// * a middleware error if a hook fails or exceeds
///   `config.performance.middleware_timeout_ms` milliseconds;
/// * the commit error of a follow-up transaction;
/// * a state error if the flow engine yields no usable result for a
///   follow-up transaction.
///
/// The loop stops at the first error; later middleware are not run.
#[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, state, transactions), fields(
crate_name = "core",
has_state = state.is_some(),
tr_count = transactions.len(),
middleware_count = self.options.get_middleware_stack().middlewares.len()
)))]
pub async fn run_after_middleware(
&mut self,
state: &mut Option<Arc<State>>,
transactions: &mut Vec<Arc<Transaction>>,
) -> ForgeResult<()> {
debug!("执行后置中间件链");
for middleware in &self.options.get_middleware_stack().middlewares {
let start_time = Instant::now();
// Each hook gets its own full timeout budget.
let timeout = std::time::Duration::from_millis(
self.config.performance.middleware_timeout_ms,
);
// Outer `Result` is the timeout outcome, inner `Result` is the hook's own.
let middleware_result = match tokio::time::timeout(
timeout,
middleware.after_dispatch(state.clone(), transactions),
)
.await
{
Ok(Ok(result)) => {
// The duration metric is recorded for successful hooks only.
metrics::middleware_execution_duration(
start_time.elapsed(),
"after",
middleware.name().as_str(),
);
result
},
Ok(Err(e)) => {
return Err(error_utils::middleware_error(format!(
"后置中间件执行失败: {e}"
)));
},
Err(_) => {
return Err(error_utils::middleware_error(format!(
"后置中间件执行超时({}ms)",
self.config.performance.middleware_timeout_ms
)));
},
};
// A hook may hand back a follow-up transaction to be applied as well.
if let Some(mut transaction) = middleware_result {
transaction.commit()?;
// NOTE(review): the follow-up transaction is submitted together with
// `self.state` (the runtime's stored state), not with the `state`
// argument produced by the dispatch - confirm this base state is intended.
let task_result = self
.flow_engine
.submit((self.state.clone(), transaction))
.await;
let Some(ProcessorResult { result: Some(result), .. }) =
task_result.output
else {
return Err(error_utils::state_error(
"附加事务处理结果无效".to_string(),
));
};
// Later hooks observe the updated state and the extended transaction list.
*state = Some(result.state);
transactions.extend(result.transactions);
}
}
Ok(())
}
/// Executes `command` against a fresh transaction and dispatches the result.
///
/// The transaction is obtained from the current state, filled in by
/// `command.execute`, committed, and then passed to [`Self::dispatch`].
///
/// # Errors
/// Returns the first error from executing the command, committing the
/// transaction, or dispatching it.
#[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, command), fields(
    crate_name = "core",
    command_name = %command.name()
)))]
pub async fn command(
    &mut self,
    command: Arc<dyn Command>,
) -> ForgeResult<()> {
    debug!("正在执行命令: {}", command.name());
    metrics::command_executed(command.name().as_str());

    let mut transaction = self.get_tr();
    command.execute(&mut transaction).await?;
    transaction.commit()?;

    self.dispatch(transaction).await
}
/// Executes `command` against a fresh transaction and dispatches the result
/// together with a history `description` and `meta` value.
///
/// Same flow as `command`, except that the committed transaction is passed
/// to [`Self::dispatch_with_meta`].
///
/// # Errors
/// Returns the first error from executing the command, committing the
/// transaction, or dispatching it.
#[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, command, meta), fields(
    crate_name = "core",
    command_name = %command.name(),
    description = %description
)))]
pub async fn command_with_meta(
    &mut self,
    command: Arc<dyn Command>,
    description: String,
    meta: serde_json::Value,
) -> ForgeResult<()> {
    debug!("正在执行命令: {}", command.name());
    metrics::command_executed(command.name().as_str());

    let mut transaction = self.get_tr();
    command.execute(&mut transaction).await?;
    transaction.commit()?;

    self.dispatch_with_meta(transaction, description, meta).await
}
/// Dispatches `transaction` with an empty description and `Null` metadata.
///
/// # Errors
/// Propagates any error returned by [`Self::dispatch_with_meta`].
#[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
    crate_name = "core",
    tr_id = %transaction.id,
    runtime_type = "sync"
)))]
pub async fn dispatch(
    &mut self,
    transaction: Transaction,
) -> ForgeResult<()> {
    let no_description = String::new();
    let no_meta = serde_json::Value::Null;
    self.dispatch_with_meta(transaction, no_description, no_meta).await
}
/// Dispatches `transaction` and, when it produced any applied transactions,
/// stores the resulting state together with `description` and `meta`.
///
/// Steps, in order:
/// 1. count the dispatch via `metrics::transaction_dispatched`;
/// 2. run the "before" middleware chain, which may modify the transaction;
/// 3. submit the transaction with the current state to the flow engine;
/// 4. run the "after" middleware chain, which may replace the candidate
///    state and append further transactions;
/// 5. if a candidate state exists, store it through
///    [`Self::update_state_with_meta`] and emit `Event::TrApply` carrying the
///    old state, the new state and all transactions.
///
/// When the flow engine reports no transactions and no middleware supplies a
/// state, the runtime state is left untouched and no event is emitted.
///
/// # Errors
/// Returns a middleware error from either chain, a state error when the flow
/// engine yields no usable result, or any error from updating the state or
/// emitting the event.
#[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction, meta), fields(
    crate_name = "core",
    tr_id = %transaction.id,
    description = %description,
    runtime_type = "sync"
)))]
pub async fn dispatch_with_meta(
    &mut self,
    transaction: Transaction,
    description: String,
    meta: serde_json::Value,
) -> ForgeResult<()> {
    metrics::transaction_dispatched();

    let mut transaction = transaction;
    self.run_before_middleware(&mut transaction).await?;

    // The transaction is not needed afterwards, so it is moved, not cloned.
    let task_result = self
        .flow_engine
        .submit((self.state.clone(), transaction))
        .await;
    let Some(ProcessorResult { result: Some(result), .. }) = task_result.output
    else {
        return Err(error_utils::state_error(
            "任务处理结果无效".to_string(),
        ));
    };

    let mut transactions = Vec::new();
    transactions.extend(result.transactions);

    // Only a dispatch that applied at least one transaction yields a
    // candidate state; "after" middleware may still provide one later.
    let mut state_update = if transactions.is_empty() {
        None
    } else {
        Some(result.state)
    };

    self.run_after_middleware(&mut state_update, &mut transactions).await?;

    if let Some(new_state) = state_update {
        let old_state = self.state.clone();
        self.update_state_with_meta(
            new_state.clone(),
            transactions.clone(),
            description,
            meta,
        )
        .await?;
        self.emit_event(Event::TrApply {
            old_state,
            new_state,
            transactions,
        })
        .await?;
    }
    Ok(())
}
/// Replaces the runtime state, recording it in history with no transactions,
/// an empty description and `Null` metadata.
///
/// # Errors
/// Propagates any error returned by [`Self::update_state_with_meta`].
pub async fn update_state(
    &mut self,
    state: Arc<State>,
) -> ForgeResult<()> {
    let no_transactions = Vec::new();
    let no_description = String::new();
    let no_meta = serde_json::Value::Null;
    self.update_state_with_meta(state, no_transactions, no_description, no_meta)
        .await
}
/// Makes `state` the runtime's current state and inserts it into the history
/// manager together with `transactions`, `description` and `meta`.
///
/// # Errors
/// Always returns `Ok(())` as written; the `Result` return type is part of
/// the method's signature.
pub async fn update_state_with_meta(
    &mut self,
    state: Arc<State>,
    transactions: Vec<Arc<mf_state::Transaction>>,
    description: String,
    meta: serde_json::Value,
) -> ForgeResult<()> {
    // Keep one handle for the runtime, hand the other to the history.
    self.state = Arc::clone(&state);
    let history = &mut self.history_manager;
    HistoryHelper::insert(history, state, transactions, description, meta);
    Ok(())
}
/// Reconfigures the current state with its own schema, document, plugins and
/// resource manager, then stores the reconfigured state.
///
/// The method takes no plugin argument: the plugin list passed to
/// `reconfigure` is the one the current state already reports.
///
/// # Errors
/// Returns the error from `reconfigure` or from [`Self::update_state`].
pub async fn register_plugin(&mut self) -> ForgeResult<()> {
    info!("正在注册新插件");

    let new_config = StateConfig {
        schema: Some(self.get_schema()),
        doc: Some(self.get_state().doc()),
        stored_marks: None,
        plugins: Some(self.get_state().plugins().await),
        resource_manager: Some(self.get_state().resource_manager().clone()),
    };
    let reconfigured = self.get_state().reconfigure(new_config).await?;
    self.update_state(Arc::new(reconfigured)).await?;

    info!("插件注册成功");
    Ok(())
}
/// Removes every plugin whose `key` equals `plugin_key` by reconfiguring the
/// current state with the remaining plugins, then stores the new state.
///
/// Schema, document and resource manager are carried over from the current
/// state. If no plugin matches, the state is still reconfigured and stored.
///
/// # Errors
/// Returns the error from `reconfigure` or from [`Self::update_state`].
pub async fn unregister_plugin(
    &mut self,
    plugin_key: String,
) -> ForgeResult<()> {
    info!("正在注销插件: {}", plugin_key);

    let remaining_plugins = self
        .get_state()
        .plugins()
        .await
        .iter()
        .filter(|plugin| plugin.key != plugin_key)
        .cloned()
        .collect();

    let new_config = StateConfig {
        schema: Some(self.get_schema()),
        doc: Some(self.get_state().doc()),
        stored_marks: None,
        plugins: Some(remaining_plugins),
        resource_manager: Some(self.get_state().resource_manager().clone()),
    };
    let reconfigured = self.get_state().reconfigure(new_config).await?;
    self.update_state(Arc::new(reconfigured)).await?;

    info!("插件注销成功");
    Ok(())
}
/// Returns the document of the current state.
pub fn doc(&self) -> Arc<NodePool> {
    self.get_state().doc()
}
/// Returns the options the runtime was created with.
pub fn get_options(&self) -> &RuntimeOptions {
&self.options
}
/// Returns the runtime's current configuration.
pub fn get_config(&self) -> &ForgeConfig {
&self.config
}
/// Replaces the runtime's configuration with `config`.
///
/// Only the stored value is swapped; nothing else is rebuilt here.
pub fn update_config(
&mut self,
config: ForgeConfig,
) {
self.config = config;
}
/// Returns a reference to the shared handle of the current state.
pub fn get_state(&self) -> &Arc<State> {
&self.state
}
/// Returns the schema provided by the extension manager.
pub fn get_schema(&self) -> Arc<Schema> {
self.extension_manager.get_schema()
}
/// Returns the runtime's event bus.
pub fn get_event_bus(&self) -> &EventBus<Event> {
&self.event_bus
}
/// Creates a new transaction from the current state.
pub fn get_tr(&self) -> Transaction {
    self.state.tr()
}
pub fn undo(&mut self) {
if let Some(result) =
HistoryHelper::undo(&mut self.history_manager, self.state.clone())
{
self.state = result.new_state.clone();
let _ = self.event_bus.broadcast_blocking(Event::Undo {
old_state: result.old_state,
new_state: result.new_state,
transactions: result.transactions,
});
}
}
pub fn redo(&mut self) {
if let Some(result) =
HistoryHelper::redo(&mut self.history_manager, self.state.clone())
{
self.state = result.new_state.clone();
let _ = self.event_bus.broadcast_blocking(Event::Redo {
old_state: result.old_state,
new_state: result.new_state,
transactions: result.transactions,
});
}
}
pub fn jump(
&mut self,
n: isize,
) {
if let Some(result) = HistoryHelper::jump(
&mut self.history_manager,
self.state.clone(),
n,
) {
self.state = result.new_state.clone();
let _ = self.event_bus.broadcast_blocking(Event::Jump {
old_state: result.old_state,
new_state: result.new_state,
transactions: result.transactions,
steps: n,
});
}
}
/// Returns the history manager holding the runtime's undo/redo entries.
pub fn get_history_manager(&self) -> &HistoryManager<HistoryEntryWithMeta> {
&self.history_manager
}
}
/// Tears down the event bus when the runtime goes out of scope.
impl Drop for ForgeRuntime {
fn drop(&mut self) {
// `drop` is synchronous, so the blocking teardown helper is used here.
EventHelper::destroy_event_bus_blocking(&mut self.event_bus);
}
}
#[async_trait]
impl RuntimeTrait for ForgeRuntime {
async fn dispatch(
&mut self,
transaction: Transaction,
) -> ForgeResult<()> {
self.dispatch(transaction).await
}
async fn dispatch_with_meta(
&mut self,
transaction: Transaction,
description: String,
meta: serde_json::Value,
) -> ForgeResult<()> {
self.dispatch_with_meta(transaction, description, meta).await
}
async fn command(
&mut self,
command: Arc<dyn Command>,
) -> ForgeResult<()> {
self.command(command).await
}
async fn command_with_meta(
&mut self,
command: Arc<dyn Command>,
description: String,
meta: serde_json::Value,
) -> ForgeResult<()> {
self.command_with_meta(command, description, meta).await
}
async fn get_state(&self) -> ForgeResult<Arc<State>> {
Ok(self.get_state().clone())
}
async fn get_tr(&self) -> ForgeResult<Transaction> {
Ok(self.get_tr())
}
async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
Ok(self.get_schema())
}
async fn undo(&mut self) -> ForgeResult<()> {
self.undo();
Ok(())
}
async fn redo(&mut self) -> ForgeResult<()> {
self.redo();
Ok(())
}
async fn jump(
&mut self,
steps: isize,
) -> ForgeResult<()> {
self.jump(steps);
Ok(())
}
fn get_config(&self) -> &ForgeConfig {
self.get_config()
}
fn update_config(
&mut self,
config: ForgeConfig,
) {
self.update_config(config);
}
fn get_options(&self) -> &RuntimeOptions {
self.get_options()
}
async fn destroy(&mut self) -> ForgeResult<()> {
self.destroy().await
}
}