use crate::{
config::{ForgeConfig, RuntimeType, Environment},
debug::info,
runtime::{
adaptive::AdaptiveRuntimeSelector, actor_runtime::ForgeActorRuntime,
async_runtime::ForgeAsyncRuntime, runtime::ForgeRuntime,
runtime_trait::RuntimeTrait, system_detector::SystemResources,
},
types::{RuntimeOptions, Extensions, Content, EditorOptionsBuilder},
ForgeResult,
};
use std::sync::Arc;
/// Fluent builder that gathers runtime settings and turns them into an
/// [`AnyRuntime`].
///
/// Every field starts out unset (`None` or an empty `Vec`) through the derived
/// `Default`. Setters take the builder by value and hand it back, so dropping
/// the returned value loses the builder; the type is therefore `#[must_use]`.
#[derive(Default)]
#[must_use = "a builder does nothing until `build` is awaited"]
pub struct ForgeRuntimeBuilder {
    /// Explicit runtime type; when set it overwrites
    /// `config.runtime.runtime_type` at build time.
    runtime_type: Option<RuntimeType>,
    /// Environment used to derive the base configuration. Only consulted when
    /// no full configuration was supplied.
    environment: Option<Environment>,
    /// Initial content forwarded to the editor options builder.
    content: Option<Content>,
    /// Extensions forwarded to the editor options builder.
    extensions: Vec<Extensions>,
    /// History limit forwarded to the editor options builder when set.
    history_limit: Option<usize>,
    /// Event handlers forwarded to the editor options builder.
    event_handlers: Vec<
        Arc<dyn crate::event::EventHandler<crate::event::Event> + Send + Sync>,
    >,
    /// Overrides `config.processor.max_concurrent_tasks` when set.
    max_concurrent_tasks: Option<usize>,
    /// Overrides `config.processor.max_queue_size` when set.
    queue_size: Option<usize>,
    /// Overrides `config.performance.enable_monitoring` when set.
    enable_monitoring: Option<bool>,
    /// Overrides `config.performance.middleware_timeout_ms` when set.
    middleware_timeout_ms: Option<u64>,
    /// When non-empty, replaces `config.extension.xml_schema_paths`.
    schema_paths: Vec<String>,
    /// Complete base configuration; takes precedence over `environment` and
    /// over automatic detection.
    full_config: Option<ForgeConfig>,
}
impl ForgeRuntimeBuilder {
    /// Creates a builder with no settings applied.
    pub fn new() -> Self {
        Self::default()
    }

    /// Requests a specific runtime type.
    ///
    /// At build time the value overwrites `runtime.runtime_type` of whichever
    /// base configuration is in effect.
    pub fn runtime_type(
        mut self,
        runtime_type: RuntimeType,
    ) -> Self {
        self.runtime_type = Some(runtime_type);
        self
    }

    /// Sets the environment from which the base configuration is derived.
    ///
    /// The environment is ignored when a full configuration has been supplied
    /// through [`Self::with_config`].
    pub fn environment(
        mut self,
        environment: Environment,
    ) -> Self {
        self.environment = Some(environment);
        self
    }

    /// Sets the initial content passed to the editor options.
    pub fn content(
        mut self,
        content: Content,
    ) -> Self {
        self.content = Some(content);
        self
    }

    /// Appends a single extension.
    pub fn extension(
        mut self,
        extension: Extensions,
    ) -> Self {
        self.extensions.push(extension);
        self
    }

    /// Appends several extensions, keeping those added earlier.
    pub fn extensions(
        mut self,
        extensions: Vec<Extensions>,
    ) -> Self {
        self.extensions.extend(extensions);
        self
    }

    /// Appends a single schema path.
    pub fn schema_path(
        mut self,
        path: impl Into<String>,
    ) -> Self {
        self.schema_paths.push(path.into());
        self
    }

    /// Appends several schema paths, keeping those added earlier.
    pub fn schema_paths(
        mut self,
        paths: Vec<String>,
    ) -> Self {
        self.schema_paths.extend(paths);
        self
    }

    /// Overrides `processor.max_concurrent_tasks` in the final configuration.
    pub fn max_concurrent_tasks(
        mut self,
        count: usize,
    ) -> Self {
        self.max_concurrent_tasks = Some(count);
        self
    }

    /// Overrides `processor.max_queue_size` in the final configuration.
    pub fn queue_size(
        mut self,
        size: usize,
    ) -> Self {
        self.queue_size = Some(size);
        self
    }

    /// Overrides `performance.enable_monitoring` in the final configuration.
    pub fn enable_monitoring(
        mut self,
        enable: bool,
    ) -> Self {
        self.enable_monitoring = Some(enable);
        self
    }

    /// Overrides `performance.middleware_timeout_ms` in the final
    /// configuration.
    pub fn middleware_timeout_ms(
        mut self,
        timeout: u64,
    ) -> Self {
        self.middleware_timeout_ms = Some(timeout);
        self
    }

    /// Sets the history limit passed to the editor options.
    pub fn history_limit(
        mut self,
        limit: usize,
    ) -> Self {
        self.history_limit = Some(limit);
        self
    }

    /// Appends an event handler.
    pub fn event_handler(
        mut self,
        handler: Arc<
            dyn crate::event::EventHandler<crate::event::Event> + Send + Sync,
        >,
    ) -> Self {
        self.event_handlers.push(handler);
        self
    }

    /// Supplies a complete base configuration.
    ///
    /// Individual setters on this builder (runtime type, task and queue
    /// limits, monitoring, middleware timeout, schema paths) still override
    /// the matching fields of `config` at build time.
    pub fn with_config(
        mut self,
        config: ForgeConfig,
    ) -> Self {
        self.full_config = Some(config);
        self
    }

    /// Creates a builder whose base configuration is read from a JSON file.
    ///
    /// # Errors
    ///
    /// Returns a storage error when the file cannot be read and a config
    /// error when its contents are not valid JSON for [`ForgeConfig`]. Both
    /// messages name the offending path.
    pub async fn from_config_file(path: &str) -> ForgeResult<Self> {
        let content = tokio::fs::read_to_string(path).await.map_err(|e| {
            crate::error::error_utils::storage_error(format!(
                "Failed to read config file '{}': {}",
                path, e
            ))
        })?;
        let config: ForgeConfig =
            serde_json::from_str(&content).map_err(|e| {
                crate::error::error_utils::config_error(format!(
                    "Failed to parse JSON config '{}': {}",
                    path, e
                ))
            })?;
        Ok(Self::new().with_config(config))
    }

    /// Consumes the builder and creates the runtime.
    ///
    /// When the effective runtime type is `RuntimeType::Auto`, system
    /// resources are detected and `AdaptiveRuntimeSelector::select_runtime`
    /// picks the concrete type; otherwise the configured type is used as-is.
    /// The decision is logged either way.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the chosen runtime's
    /// `create_with_config`.
    ///
    /// # Panics
    ///
    /// Panics if the selector itself yields `RuntimeType::Auto`
    /// (NOTE(review): presumably it never does — confirm against
    /// `AdaptiveRuntimeSelector::select_runtime`).
    pub async fn build(self) -> ForgeResult<AnyRuntime> {
        let (config, options) = self.build_config_and_options().await?;
        let runtime_type = match config.runtime.runtime_type {
            RuntimeType::Auto => {
                let resources = SystemResources::detect();
                let selected =
                    AdaptiveRuntimeSelector::select_runtime(&resources);
                // Log the detected hardware (memory shown in whole GB).
                info!(
                    "🖥️ 系统资源: {} 核心 / {} 线程, {} GB 内存 ({})",
                    resources.cpu_cores,
                    resources.cpu_threads,
                    resources.total_memory_mb / 1024,
                    resources.tier_description()
                );
                // Log the automatically selected runtime.
                info!("⚡ 自动选择运行时: {:?}", selected);
                selected
            },
            rt => {
                // Log the explicitly configured runtime.
                info!("⚡ 使用指定运行时: {:?}", rt);
                rt
            },
        };
        Self::create_runtime(runtime_type, options, config).await
    }

    /// Resolves the final configuration and the runtime options.
    ///
    /// Base configuration precedence: the full config if one was supplied,
    /// else the environment preset, else a config generated from detected
    /// system resources. Builder-level overrides are applied on top.
    async fn build_config_and_options(
        self
    ) -> ForgeResult<(ForgeConfig, RuntimeOptions)> {
        // Take the builder apart once so each field is moved exactly once and
        // no closure has to capture part of `self`.
        let Self {
            runtime_type,
            environment,
            content,
            extensions,
            history_limit,
            event_handlers,
            max_concurrent_tasks,
            queue_size,
            enable_monitoring,
            middleware_timeout_ms,
            schema_paths,
            full_config,
        } = self;

        let mut config = full_config.unwrap_or_else(|| match environment {
            Some(env) => ForgeConfig::for_environment(env),
            None => {
                let resources = SystemResources::detect();
                AdaptiveRuntimeSelector::generate_config(&resources)
            },
        });

        if let Some(rt) = runtime_type {
            config.runtime.runtime_type = rt;
        }
        if let Some(tasks) = max_concurrent_tasks {
            config.processor.max_concurrent_tasks = tasks;
        }
        if let Some(size) = queue_size {
            config.processor.max_queue_size = size;
        }
        if let Some(enable) = enable_monitoring {
            config.performance.enable_monitoring = enable;
        }
        if let Some(timeout) = middleware_timeout_ms {
            config.performance.middleware_timeout_ms = timeout;
        }
        // A non-empty list replaces the configured paths wholesale; an empty
        // list leaves the base configuration untouched.
        if !schema_paths.is_empty() {
            config.extension.xml_schema_paths = schema_paths;
        }

        let mut options_builder = EditorOptionsBuilder::new();
        if let Some(content) = content {
            options_builder = options_builder.content(content);
        }
        options_builder = options_builder.extensions(extensions);
        if let Some(limit) = history_limit {
            options_builder = options_builder.history_limit(limit);
        }
        options_builder = options_builder.event_handlers(event_handlers);
        let options = options_builder.build();

        Ok((config, options))
    }

    /// Instantiates the concrete runtime for an already-resolved type.
    ///
    /// # Panics
    ///
    /// Panics when called with `RuntimeType::Auto`; callers must resolve
    /// `Auto` to a concrete type first.
    async fn create_runtime(
        runtime_type: RuntimeType,
        options: RuntimeOptions,
        config: ForgeConfig,
    ) -> ForgeResult<AnyRuntime> {
        match runtime_type {
            RuntimeType::Sync => {
                let runtime =
                    ForgeRuntime::create_with_config(options, config).await?;
                Ok(AnyRuntime::Sync(runtime))
            },
            RuntimeType::Async => {
                let runtime =
                    ForgeAsyncRuntime::create_with_config(options, config)
                        .await?;
                Ok(AnyRuntime::Async(runtime))
            },
            RuntimeType::Actor => {
                let runtime =
                    ForgeActorRuntime::create_with_config(options, config)
                        .await?;
                Ok(AnyRuntime::Actor(runtime))
            },
            RuntimeType::Auto => {
                unreachable!("Auto should be resolved before this point")
            },
        }
    }
}
/// A runtime created by `ForgeRuntimeBuilder::build`, wrapping exactly one of
/// the three concrete runtime implementations behind a single type.
pub enum AnyRuntime {
/// Wraps a `ForgeRuntime`; reported as `RuntimeType::Sync`.
Sync(ForgeRuntime),
/// Wraps a `ForgeAsyncRuntime`; reported as `RuntimeType::Async`.
Async(ForgeAsyncRuntime),
/// Wraps a `ForgeActorRuntime`; reported as `RuntimeType::Actor`.
Actor(ForgeActorRuntime),
}
impl AnyRuntime {
    /// Reports which concrete runtime this value wraps.
    pub fn runtime_type(&self) -> RuntimeType {
        match *self {
            Self::Sync(..) => RuntimeType::Sync,
            Self::Async(..) => RuntimeType::Async,
            Self::Actor(..) => RuntimeType::Actor,
        }
    }

    /// Borrows the wrapped `ForgeRuntime`, or `None` for any other variant.
    pub fn as_sync(&self) -> Option<&ForgeRuntime> {
        if let Self::Sync(inner) = self {
            Some(inner)
        } else {
            None
        }
    }

    /// Borrows the wrapped `ForgeAsyncRuntime`, or `None` for any other
    /// variant.
    pub fn as_async(&self) -> Option<&ForgeAsyncRuntime> {
        if let Self::Async(inner) = self {
            Some(inner)
        } else {
            None
        }
    }

    /// Borrows the wrapped `ForgeActorRuntime`, or `None` for any other
    /// variant.
    pub fn as_actor(&self) -> Option<&ForgeActorRuntime> {
        if let Self::Actor(inner) = self {
            Some(inner)
        } else {
            None
        }
    }

    /// Mutably borrows the wrapped `ForgeRuntime`, or `None` for any other
    /// variant.
    pub fn as_sync_mut(&mut self) -> Option<&mut ForgeRuntime> {
        if let Self::Sync(inner) = self {
            Some(inner)
        } else {
            None
        }
    }

    /// Mutably borrows the wrapped `ForgeAsyncRuntime`, or `None` for any
    /// other variant.
    pub fn as_async_mut(&mut self) -> Option<&mut ForgeAsyncRuntime> {
        if let Self::Async(inner) = self {
            Some(inner)
        } else {
            None
        }
    }

    /// Mutably borrows the wrapped `ForgeActorRuntime`, or `None` for any
    /// other variant.
    pub fn as_actor_mut(&mut self) -> Option<&mut ForgeActorRuntime> {
        if let Self::Actor(inner) = self {
            Some(inner)
        } else {
            None
        }
    }
}
impl AnyRuntime {
    /// Forwards a transaction to the wrapped runtime's `dispatch`.
    pub async fn dispatch(
        &mut self,
        transaction: mf_state::Transaction,
    ) -> ForgeResult<()> {
        match self {
            Self::Sync(runtime) => runtime.dispatch(transaction).await,
            Self::Async(runtime) => runtime.dispatch(transaction).await,
            Self::Actor(runtime) => runtime.dispatch(transaction).await,
        }
    }

    /// Forwards a transaction, its description and its metadata to the
    /// wrapped runtime's `dispatch_with_meta`.
    pub async fn dispatch_with_meta(
        &mut self,
        transaction: mf_state::Transaction,
        description: String,
        meta: serde_json::Value,
    ) -> ForgeResult<()> {
        match self {
            Self::Sync(runtime) => {
                runtime
                    .dispatch_with_meta(transaction, description, meta)
                    .await
            },
            Self::Async(runtime) => {
                runtime
                    .dispatch_with_meta(transaction, description, meta)
                    .await
            },
            Self::Actor(runtime) => {
                runtime
                    .dispatch_with_meta(transaction, description, meta)
                    .await
            },
        }
    }

    /// Forwards a command to the wrapped runtime's `command`.
    pub async fn command(
        &mut self,
        command: Arc<dyn mf_state::transaction::Command>,
    ) -> ForgeResult<()> {
        match self {
            Self::Sync(runtime) => runtime.command(command).await,
            Self::Async(runtime) => runtime.command(command).await,
            Self::Actor(runtime) => runtime.command(command).await,
        }
    }

    /// Forwards a command, its description and its metadata to the wrapped
    /// runtime's `command_with_meta`.
    pub async fn command_with_meta(
        &mut self,
        command: Arc<dyn mf_state::transaction::Command>,
        description: String,
        meta: serde_json::Value,
    ) -> ForgeResult<()> {
        match self {
            Self::Sync(runtime) => {
                runtime.command_with_meta(command, description, meta).await
            },
            Self::Async(runtime) => {
                runtime.command_with_meta(command, description, meta).await
            },
            Self::Actor(runtime) => {
                runtime.command_with_meta(command, description, meta).await
            },
        }
    }

    /// Returns a shared handle to the current state.
    ///
    /// The sync variant clones the `Arc` it already holds; the other two
    /// variants await their own `get_state`.
    pub async fn get_state(&self) -> ForgeResult<Arc<mf_state::State>> {
        match self {
            Self::Sync(runtime) => {
                let state = runtime.get_state();
                Ok(Arc::clone(state))
            },
            Self::Async(runtime) => runtime.get_state().await,
            Self::Actor(runtime) => runtime.get_state().await,
        }
    }

    /// Returns the transaction produced by the wrapped runtime's `get_tr`.
    pub async fn get_tr(&self) -> ForgeResult<mf_state::Transaction> {
        match self {
            Self::Sync(runtime) => {
                let transaction = runtime.get_tr();
                Ok(transaction)
            },
            Self::Async(runtime) => runtime.get_tr().await,
            Self::Actor(runtime) => runtime.get_tr().await,
        }
    }

    /// Returns the node pool produced by the wrapped runtime's `doc`.
    pub async fn doc(&self) -> ForgeResult<Arc<mf_model::NodePool>> {
        match self {
            Self::Sync(runtime) => {
                let pool = runtime.doc();
                Ok(pool)
            },
            Self::Async(runtime) => runtime.doc().await,
            Self::Actor(runtime) => runtime.doc().await,
        }
    }

    /// Returns the schema produced by the wrapped runtime's `get_schema`.
    pub async fn schema(&self) -> ForgeResult<Arc<mf_model::Schema>> {
        match self {
            Self::Sync(runtime) => {
                let schema = runtime.get_schema();
                Ok(schema)
            },
            Self::Async(runtime) => runtime.get_schema().await,
            Self::Actor(runtime) => runtime.get_schema().await,
        }
    }

    /// Calls `undo` on the wrapped runtime.
    ///
    /// For the sync variant the call's result is discarded and `Ok(())` is
    /// always returned; the other variants return what their `undo` yields.
    pub async fn undo(&mut self) -> ForgeResult<()> {
        match self {
            Self::Sync(runtime) => {
                runtime.undo();
                Ok(())
            },
            Self::Async(runtime) => runtime.undo().await,
            Self::Actor(runtime) => runtime.undo().await,
        }
    }

    /// Calls `redo` on the wrapped runtime.
    ///
    /// For the sync variant the call's result is discarded and `Ok(())` is
    /// always returned; the other variants return what their `redo` yields.
    pub async fn redo(&mut self) -> ForgeResult<()> {
        match self {
            Self::Sync(runtime) => {
                runtime.redo();
                Ok(())
            },
            Self::Async(runtime) => runtime.redo().await,
            Self::Actor(runtime) => runtime.redo().await,
        }
    }

    /// Calls `jump(steps)` on the wrapped runtime.
    ///
    /// For the sync variant the call's result is discarded and `Ok(())` is
    /// always returned; the other variants return what their `jump` yields.
    pub async fn jump(
        &mut self,
        steps: isize,
    ) -> ForgeResult<()> {
        match self {
            Self::Sync(runtime) => {
                runtime.jump(steps);
                Ok(())
            },
            Self::Async(runtime) => runtime.jump(steps).await,
            Self::Actor(runtime) => runtime.jump(steps).await,
        }
    }

    /// Borrows the configuration held by the wrapped runtime.
    pub fn get_config(&self) -> &crate::config::ForgeConfig {
        match self {
            Self::Sync(runtime) => runtime.get_config(),
            Self::Async(runtime) => runtime.get_config(),
            Self::Actor(runtime) => runtime.get_config(),
        }
    }

    /// Passes a new configuration to the wrapped runtime's `update_config`.
    pub fn update_config(
        &mut self,
        config: crate::config::ForgeConfig,
    ) {
        match self {
            Self::Sync(runtime) => runtime.update_config(config),
            Self::Async(runtime) => runtime.update_config(config),
            Self::Actor(runtime) => runtime.update_config(config),
        }
    }

    /// Returns an owned copy of the wrapped runtime's options.
    ///
    /// The sync and async variants clone what `get_options` returns; the
    /// actor variant's `get_options` result is returned directly.
    pub fn get_options(&self) -> crate::types::RuntimeOptions {
        match self {
            Self::Sync(runtime) => {
                let options = runtime.get_options();
                options.clone()
            },
            Self::Async(runtime) => {
                let options = runtime.get_options();
                options.clone()
            },
            Self::Actor(runtime) => runtime.get_options(),
        }
    }

    /// Forwards to the wrapped runtime's `destroy`.
    pub async fn destroy(&mut self) -> ForgeResult<()> {
        match self {
            Self::Sync(runtime) => runtime.destroy().await,
            Self::Async(runtime) => runtime.destroy().await,
            Self::Actor(runtime) => runtime.destroy().await,
        }
    }
}