use clap::Parser;
use mlua_swarm::blueprint::store::{
blueprint_version, BlueprintId, BlueprintStore, CommitMetadata, Git2BlueprintStore,
InMemoryBlueprintStore,
};
use mlua_swarm::blueprint::{
current_schema_version, AgentDef, AgentKind, Blueprint, BlueprintMetadata, BlueprintOrigin,
CompilerHints, CompilerStrategy,
};
use mlua_swarm::store::enhance_setting::{
EnhanceSettingId, EnhanceSettingStore, InMemoryEnhanceSettingStore,
};
use mlua_swarm::store::issue::{InMemoryIssueStore, IssueStore};
use mlua_swarm::{
Compiler, Engine, EngineCfg, EnhanceApplication, EnhanceApplicationConfig, Role,
TaskLaunchService,
};
use mlua_swarm::{
OperatorSpawnerFactory, RustFnInProcessSpawnerFactory, SpawnerRegistry,
SubprocessProcessSpawnerFactory,
};
use mlua_swarm_server::{
build_blueprints_router_with_refs, build_enhance_log_router, build_enhance_settings_router,
build_issues_router, build_router_with_ws_factory, default_layer_registry,
default_registry_with_enhance_flow,
doctor::{build_doctor_router, DoctorInfo},
};
use serde_json::json;
use std::sync::Arc;
use std::time::Duration;
// Command-line arguments for `mse serve`.
//
// NOTE: these are deliberately plain `//` comments rather than `///` doc
// comments. clap's derive turns doc comments into help text, so `///` here
// would change the `--help` output.
//
// Every option is forwarded to `mlua_swarm_server::config::CliOverrides` in
// `run` and merged with the config file by `config::resolve`; the precedence
// between CLI and file values is decided there (not visible in this file).
#[derive(Parser, Debug)]
#[command(about = "Run the HTTP server (mse serve).")]
pub struct Args {
// Path to the config file. When omitted, `run` falls back to
// `mlua_swarm_server::config::default_config_path`.
#[arg(long)]
config: Option<std::path::PathBuf>,
// Override for the listen address; the resolved value is what the TCP
// listener binds to.
#[arg(long)]
bind: Option<String>,
// Override for the engine token secret. `run` hex-decodes the resolved value
// into `EngineCfg::token_secret`, so it must be a valid hex string.
#[arg(long)]
token_secret: Option<String>,
// Override for the id of the blueprint that `run` seeds at startup when the
// store has no readable head for that id.
#[arg(long)]
seed_blueprint_id: Option<String>,
// Override for the Git-backed blueprint store root. When the resolved config
// has a path, blueprints live under `<path>/blueprints`; otherwise an
// in-memory store is used.
#[arg(long)]
git_store_path: Option<std::path::PathBuf>,
// Flag form only: when present it is forwarded as `Some(true)`, when absent as
// `None` (i.e. "no override"), so this flag by itself cannot force the
// feature off.
#[arg(long)]
enable_enhance_flow: bool,
// Override for the base path handed to `build_blueprints_router_with_refs`.
#[arg(long)]
blueprint_ref_base: Option<std::path::PathBuf>,
// Override for the default agent kind. The resolved string is parsed by
// `parse_agent_kind_cli`, i.e. through `AgentKind`'s serde representation.
#[arg(long)]
default_agent_kind: Option<String>,
}
/// Parses the `--default-agent-kind` value into an `AgentKind`.
///
/// The input is wrapped in a JSON string value and deserialized with serde, so
/// the accepted spellings are exactly those of `AgentKind`'s serde
/// representation.
///
/// Returns a human-readable message naming the offending value when the
/// string does not deserialize.
fn parse_agent_kind_cli(s: &str) -> Result<mlua_swarm::blueprint::AgentKind, String> {
    let as_json = serde_json::Value::String(s.to_string());
    match serde_json::from_value(as_json) {
        Ok(kind) => Ok(kind),
        Err(e) => Err(format!("invalid --default-agent-kind {s:?}: {e}")),
    }
}
/// Runs the `mse serve` HTTP server until it fails or a shutdown signal
/// (Ctrl-C or SIGTERM) arrives.
///
/// Startup sequence:
/// 1. Load the config file and merge it with the CLI overrides.
/// 2. Build the engine and open the blueprint store (Git-backed when a store
///    path is configured, in-memory otherwise).
/// 3. Seed the configured blueprint id when the store has no readable head.
/// 4. Build the combined router (task + enhance + operator + doctor) and
///    spawn the background enhance loop.
/// 5. Bind the listener and serve.
///
/// # Errors
///
/// Startup and runtime failures are returned instead of panicking: config
/// load/resolve errors, an invalid `--default-agent-kind`, a non-hex token
/// secret, blueprint store open/seed failures, a failed bind, and a server
/// I/O error. The background enhance loop is aborted on every exit path taken
/// after it has been spawned.
pub async fn run(args: Args) -> anyhow::Result<()> {
    // `args` is owned and not needed afterwards, so its fields are moved
    // rather than cloned.
    let config_path = args
        .config
        .unwrap_or_else(mlua_swarm_server::config::default_config_path);
    let file_config = mlua_swarm_server::config::load_file_config(&config_path)
        .map_err(|e| anyhow::anyhow!("mse serve: config load failed: {}", e))?;
    let cli_overrides = mlua_swarm_server::config::CliOverrides {
        bind: args.bind,
        // The flag can only switch the feature on; absent means "no override".
        enable_enhance_flow: if args.enable_enhance_flow {
            Some(true)
        } else {
            None
        },
        blueprint_ref_base: args.blueprint_ref_base,
        git_store_path: args.git_store_path,
        seed_blueprint_id: args.seed_blueprint_id,
        default_agent_kind: args.default_agent_kind,
        token_secret: args.token_secret,
    };
    let cfg = mlua_swarm_server::config::resolve(cli_overrides, file_config)
        .map_err(|e| anyhow::anyhow!("mse serve: config resolve failed: {}", e))?;
    let default_agent_kind: Option<mlua_swarm::blueprint::AgentKind> = cfg
        .default_agent_kind
        .as_ref()
        .map(|s| parse_agent_kind_cli(s))
        .transpose()
        .map_err(|e| anyhow::anyhow!("mse serve: {}", e))?;
    eprintln!("mse serve: config loaded from {}", config_path.display());

    // Engine configuration: defaults plus the optional hex-encoded secret.
    let mut engine_cfg = EngineCfg::default();
    if let Some(hex_secret) = &cfg.token_secret {
        engine_cfg.token_secret = hex::decode(hex_secret)
            .map_err(|e| anyhow::anyhow!("mse serve: token-secret must be hex: {}", e))?;
    }
    let engine = Engine::new_with_layers(engine_cfg, default_layer_registry());

    // Blueprint store: Git-backed under `<root>/blueprints`, or volatile.
    let store: Arc<dyn BlueprintStore> = match &cfg.git_store_path {
        Some(root) => {
            let bp_root = root.join("blueprints");
            let git_store = Git2BlueprintStore::open_or_init(&bp_root).map_err(|e| {
                anyhow::anyhow!(
                    "mse serve: git store open_or_init at {} failed: {:?}",
                    bp_root.display(),
                    e
                )
            })?;
            eprintln!(
                "mse serve: blueprint store = Git2 root={} (per-id repos)",
                bp_root.display()
            );
            Arc::new(git_store)
        }
        None => {
            eprintln!("mse serve: blueprint store = InMemory (volatile)");
            Arc::new(InMemoryBlueprintStore::new())
        }
    };

    // Seed only when the head cannot be read.
    // NOTE(review): any `read_head` error is treated as "no head yet", not
    // just a missing id — confirm that is intended for transient failures.
    let id = BlueprintId::new(cfg.seed_blueprint_id.clone());
    if store.read_head(&id).await.is_err() {
        let bp = seed_blueprint(&cfg.seed_blueprint_id);
        let v0 = blueprint_version(&bp)
            .map_err(|e| anyhow::anyhow!("mse serve: blueprint_version failed: {:?}", e))?;
        // A clock set before the Unix epoch degrades to timestamp 0.
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as i64)
            .unwrap_or(0);
        store
            .write_new(&id, &bp, &[], CommitMetadata::seed(id.clone(), v0, now_ms))
            .await
            .map_err(|e| anyhow::anyhow!("mse serve: seed write failed: {:?}", e))?;
        eprintln!("mse serve: seeded blueprint_id={}", id.as_str());
    } else {
        eprintln!("mse serve: existing head found, skip seed");
    }

    // One operator factory instance is shared by every registry built below
    // and by the websocket-enabled router.
    let op_factory = Arc::new(OperatorSpawnerFactory::new());
    let make_registry = || -> SpawnerRegistry {
        let mut reg = if cfg.enable_enhance_flow {
            default_registry_with_enhance_flow()
        } else {
            let rustfn_factory = mlua_swarm::worker::baseline::extend_with_baseline(
                RustFnInProcessSpawnerFactory::new(),
            );
            let mut r = SpawnerRegistry::new();
            r.register::<SubprocessProcessSpawnerFactory>(Arc::new(
                SubprocessProcessSpawnerFactory,
            ));
            r.register::<RustFnInProcessSpawnerFactory>(Arc::new(rustfn_factory));
            r.register::<OperatorSpawnerFactory>(op_factory.clone());
            r
        };
        // Registered for both branches so the shared operator factory is the
        // one in use. NOTE(review): this repeats the registration made in the
        // non-enhance branch above; presumably `register` replaces by type.
        reg.register::<OperatorSpawnerFactory>(op_factory.clone());
        reg
    };
    let app = build_router_with_ws_factory(
        engine.clone(),
        make_registry(),
        Some(store.clone()),
        Some(op_factory.clone()),
    );

    // Settings, issues and enhance logs are always held in memory, regardless
    // of the blueprint store backend.
    let setting_store: Arc<dyn EnhanceSettingStore> = Arc::new(InMemoryEnhanceSettingStore::new());
    let issue_store: Arc<dyn IssueStore> = Arc::new(InMemoryIssueStore::new());
    let log_store: Arc<dyn mlua_swarm::store::enhance_log::EnhanceLogStore> =
        Arc::new(mlua_swarm::store::enhance_log::InMemoryEnhanceLogStore::new());
    let compiler = Compiler::new(make_registry());
    let launch_enhance = Arc::new(TaskLaunchService::new(engine.clone(), compiler));
    let enhance_app = Arc::new(EnhanceApplication::new(
        EnhanceApplicationConfig {
            name: "enhance".into(),
            setting_id: EnhanceSettingId::default_id(),
            operator_id: "mse-enhance".into(),
            role: Role::Operator,
        },
        issue_store.clone(),
        setting_store.clone(),
        store.clone(),
        log_store.clone(),
        launch_enhance,
    ));

    // Rendered once: reused for the doctor endpoint, the log line and the
    // bind error message.
    let bind_label = cfg.bind.to_string();
    let doctor_info = DoctorInfo {
        bind: bind_label.clone(),
        blueprint_backend: if cfg.git_store_path.is_some() {
            "git2".into()
        } else {
            "in_memory".into()
        },
        blueprint_store_root: cfg
            .git_store_path
            .as_ref()
            .map(|p| p.join("blueprints").display().to_string()),
        blueprint_ref_base: cfg
            .blueprint_ref_base
            .as_ref()
            .map(|p| p.display().to_string()),
        enhance_flow_enabled: cfg.enable_enhance_flow,
        seed_blueprint_id: cfg.seed_blueprint_id.clone(),
    };
    let app = app
        .merge(build_issues_router(issue_store.clone()))
        .merge(build_blueprints_router_with_refs(
            store.clone(),
            cfg.blueprint_ref_base.clone(),
            default_agent_kind,
        ))
        .merge(build_enhance_log_router(log_store.clone()))
        .merge(build_enhance_settings_router(
            setting_store.clone(),
            store.clone(),
        ))
        .merge(build_doctor_router(doctor_info, store.clone()));

    // Background enhance loop; the 100 ms duration is handed to
    // `run_forever` (presumably its polling interval — confirm).
    let enhance_loop = tokio::spawn(enhance_app.clone().run_forever(Duration::from_millis(100)));

    let listener = match tokio::net::TcpListener::bind(cfg.bind).await {
        Ok(listener) => listener,
        Err(e) => {
            // Do not leave the background task running when startup fails.
            enhance_loop.abort();
            return Err(anyhow::anyhow!(
                "mse serve: bind {} failed: {}",
                bind_label,
                e
            ));
        }
    };
    // Logged only after the bind has actually succeeded.
    eprintln!(
        "mse serve: combined mode (task+enhance+operator) listening on http://{}",
        bind_label
    );

    let serve = axum::serve(listener, app);
    let outcome = tokio::select! {
        r = serve => r.map_err(|e| anyhow::anyhow!("mse serve: server error: {}", e)),
        _ = tokio::signal::ctrl_c() => {
            eprintln!("mse serve: ctrl-c, shutting down");
            Ok(())
        }
        _ = wait_sigterm() => {
            eprintln!("mse serve: SIGTERM, shutting down");
            Ok(())
        }
    };
    // Stop the enhance loop on success and on server error.
    enhance_loop.abort();
    outcome
}
/// Resolves once the process receives SIGTERM.
///
/// If the SIGTERM handler cannot be installed, the failure is logged to
/// stderr and the future then stays pending forever, so a `select!` that
/// includes it is never completed by this branch.
async fn wait_sigterm() {
    use tokio::signal::unix::{signal, SignalKind};

    let mut term_stream = match signal(SignalKind::terminate()) {
        Ok(stream) => stream,
        Err(e) => {
            eprintln!("mse serve: failed to install SIGTERM handler: {e}");
            // Never resolves: without a handler there is nothing to wait on.
            return std::future::pending::<()>().await;
        }
    };
    term_stream.recv().await;
}
/// Builds the blueprint written to the store when no head exists for `id`.
///
/// The blueprint contains a single `RustFn` agent named after
/// `baseline::AG_IDENTITY` and a one-step flow that references that agent
/// with the literal input `"hello"` and the output path `$.out`.
///
/// # Panics
///
/// Panics if the hard-coded flow JSON does not deserialize into the
/// blueprint's flow type.
fn seed_blueprint(id: &str) -> Blueprint {
    use mlua_swarm::worker::baseline::AG_IDENTITY;

    let schema_version = current_schema_version();

    // Single step: feed a literal into the identity agent, read `$.out`.
    let flow_spec = json!({
        "kind": "step",
        "ref": AG_IDENTITY,
        "in": {"op": "lit", "value": "hello"},
        "out": {"op": "path", "at": "$.out"},
    });

    let identity_agent = AgentDef {
        name: AG_IDENTITY.into(),
        kind: AgentKind::RustFn,
        spec: json!({"fn_id": AG_IDENTITY}),
        profile: None,
        meta: None,
    };

    let metadata = BlueprintMetadata {
        description: Some("mse serve enhance seed".into()),
        origin: BlueprintOrigin::Inline,
        tags: vec![],
        version_label: Some("0.1.0".into()),
        project_name_alias: None,
        default_run_ttl_secs: None,
    };

    Blueprint {
        schema_version,
        id: id.into(),
        flow: serde_json::from_value(flow_spec).unwrap(),
        agents: vec![identity_agent],
        operators: vec![],
        hints: CompilerHints::default(),
        strategy: CompilerStrategy::default(),
        metadata,
        spawner_hints: Default::default(),
        default_agent_kind: AgentKind::Operator,
        default_operator_kind: None,
    }
}