mod operator_client;
mod resources;
mod server_control;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use mlua_swarm::application::{BlueprintRef, TaskApplication, TaskApplicationInput};
use mlua_swarm::blueprint::store::{BlueprintId, BlueprintStore, InMemoryBlueprintStore};
use mlua_swarm::blueprint::Blueprint;
use mlua_swarm::{Application, Compiler, Engine, EngineCfg, OperatorKind, Role, TaskLaunchService};
use operator_client::{ClientError, OperatorClientState};
use rmcp::handler::server::wrapper::Parameters;
use rmcp::model::{
AnnotateAble, CallToolResult, Content, ListResourcesResult, PaginatedRequestParams,
RawResource, ReadResourceRequestParams, ReadResourceResult, ResourceContents,
ServerCapabilities, ServerInfo,
};
use rmcp::service::RequestContext;
use rmcp::{
tool, tool_handler, tool_router, ErrorData as McpError, RoleServer, ServerHandler, ServiceExt,
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use tokio::sync::RwLock;
use uuid::Uuid;
#[allow(dead_code)]
#[derive(Clone, Debug)]
struct RunHandle {
run_id: String,
status: RunStatus,
}
#[allow(dead_code)]
#[derive(Clone, Debug, Serialize)]
#[serde(rename_all = "snake_case")]
enum RunStatus {
Pending,
Running,
Done,
Cancelled,
Failed,
}
struct Inner {
runs: HashMap<String, RunHandle>,
task_app: Arc<TaskApplication>,
store: Arc<dyn BlueprintStore>,
}
#[derive(Clone)]
struct MseServer {
state: Arc<RwLock<Inner>>,
op_client: Arc<OperatorClientState>,
}
impl MseServer {
fn new() -> Self {
let engine = Engine::new(EngineCfg::default());
let registry = mlua_swarm_server::default_registry();
let store: Arc<dyn BlueprintStore> = Arc::new(InMemoryBlueprintStore::new());
let compiler = Compiler::new(registry);
let launch = Arc::new(TaskLaunchService::new(engine, compiler));
let task_app = Arc::new(TaskApplication::new(launch, store.clone()));
Self {
state: Arc::new(RwLock::new(Inner {
runs: HashMap::new(),
task_app,
store,
})),
op_client: Arc::new(OperatorClientState::new()),
}
}
}
fn client_error_to_mcp(e: ClientError) -> McpError {
match e {
ClientError::UnknownSid(_) | ClientError::InvalidAckKind(_) => {
McpError::invalid_params(e.to_string(), None)
}
ClientError::Http(_) | ClientError::Ws(_) => McpError::internal_error(e.to_string(), None),
}
}
#[derive(Deserialize, JsonSchema)]
struct DoctorReq {
#[serde(default)]
bind: Option<String>,
}
fn default_true_bool() -> bool {
true
}
#[derive(Deserialize, JsonSchema)]
struct BpArchiveReq {
id: String,
#[serde(default)]
bind: Option<String>,
#[serde(default)]
confirm: bool,
}
#[derive(Deserialize, JsonSchema)]
struct BpSchemaReq {}
#[derive(Deserialize, JsonSchema)]
struct BpUnarchiveReq {
id: String,
#[serde(default)]
bind: Option<String>,
}
#[derive(Deserialize, JsonSchema)]
struct ServerStartReq {
#[serde(default)]
bind: Option<String>,
}
#[derive(Deserialize, JsonSchema)]
struct ServerStatusReq {
#[serde(default)]
bind: Option<String>,
}
#[derive(Deserialize, JsonSchema)]
struct ServerShutdownReq {
#[serde(default)]
bind: Option<String>,
}
#[derive(Deserialize, JsonSchema)]
struct ServerRestartReq {
#[serde(default)]
bind: Option<String>,
}
#[derive(Deserialize, JsonSchema)]
struct OperatorJoinReq {
#[serde(default)]
roles: Option<Vec<String>>,
}
#[derive(Deserialize, JsonSchema)]
struct OperatorPendingWaitReq {
sid: String,
#[serde(default)]
timeout_ms: Option<u64>,
}
#[derive(Deserialize, JsonSchema)]
struct OperatorAckReq {
sid: String,
req_id: String,
kind: String,
#[serde(default)]
value: Option<JsonValue>,
#[serde(default = "default_true_bool")]
ok: bool,
#[serde(default)]
error: Option<String>,
}
#[derive(Deserialize, JsonSchema)]
struct OperatorLeaveReq {
sid: String,
}
#[derive(Deserialize, JsonSchema)]
struct SwarmRunReq {
blueprint: JsonValue,
#[serde(default)]
init_ctx: Option<JsonValue>,
#[serde(default)]
timeout_secs: Option<u64>,
#[serde(default)]
operator_id: Option<String>,
#[serde(default)]
operator_kind: Option<String>,
#[serde(default)]
operator_kind_overrides: Option<HashMap<String, String>>,
}
fn parse_operator_kind_str(s: &str) -> Result<OperatorKind, McpError> {
match s {
"main_ai" => Ok(OperatorKind::MainAi),
"composite" => Ok(OperatorKind::Composite),
"automate" => Ok(OperatorKind::Automate),
other => Err(McpError::invalid_params(
format!("operator_kind: unknown value '{other}' (expected main_ai|automate|composite)"),
None,
)),
}
}
#[derive(Deserialize, JsonSchema)]
struct SwarmStatusReq {
run_id: String,
}
#[derive(Deserialize, JsonSchema)]
struct SwarmCancelReq {
run_id: String,
}
#[tool_router]
impl MseServer {
#[tool(
description = "Join as an Operator session: POST /v1/operators (mint sid+token) then connect WS /v1/operators/:sid/ws with the returned Bearer token. The token stays process-local (never returned to the caller). Returns {sid, roles}. Use `sid` with mse_pending_wait / mse_ack / mse_operator_leave."
)]
async fn mse_operator_join(
&self,
Parameters(req): Parameters<OperatorJoinReq>,
) -> Result<CallToolResult, McpError> {
let roles = req.roles.unwrap_or_default();
let (sid, roles) = self
.op_client
.join(roles)
.await
.map_err(client_error_to_mcp)?;
json_result(&serde_json::json!({ "sid": sid, "roles": roles }))
}
#[tool(
description = "Pop one pending server frame (ask / hook_before / hook_after / spawn) for `sid`, waiting up to `timeout_ms` (default 30000) if the queue is empty. Returns {timed_out, req_id?, type?, payload?} on delivery — `type` mirrors the server's ServerMsg discriminant, `payload` carries the remaining frame fields verbatim. Returns {timed_out: true} on timeout. Reply via mse_ack with a matching `kind`."
)]
async fn mse_pending_wait(
&self,
Parameters(req): Parameters<OperatorPendingWaitReq>,
) -> Result<CallToolResult, McpError> {
let timeout_ms = req.timeout_ms.unwrap_or(30_000);
let frame = self
.op_client
.pending_wait(&req.sid, timeout_ms)
.await
.map_err(client_error_to_mcp)?;
match frame {
Some(f) => json_result(&serde_json::json!({
"timed_out": false,
"req_id": f.req_id,
"type": f.kind,
"payload": f.payload,
})),
None => json_result(&serde_json::json!({ "timed_out": true })),
}
}
#[tool(
description = "Ack a pending frame popped via mse_pending_wait. kind=\"answer\" (SeniorBridge.ask reply, pass `value`), kind=\"hook_ack\" (SpawnHook.before OK/NG, pass `ok` + optional `error` as the rejection reason), kind=\"spawn_ack\" (Operator.execute result, pass `value` + `ok` + optional `error`). Sends the corresponding ClientMsg over the sid's WS connection. Returns {sent: true}."
)]
async fn mse_ack(
&self,
Parameters(req): Parameters<OperatorAckReq>,
) -> Result<CallToolResult, McpError> {
self.op_client
.ack(
&req.sid, req.req_id, &req.kind, req.value, req.ok, req.error,
)
.await
.map_err(client_error_to_mcp)?;
json_result(&serde_json::json!({ "sent": true }))
}
#[tool(
description = "Leave an Operator session: DELETE /v1/operators/:sid (Bearer), abort the WS reader task, and drop the local sid entry. Returns {removed: true}."
)]
async fn mse_operator_leave(
&self,
Parameters(req): Parameters<OperatorLeaveReq>,
) -> Result<CallToolResult, McpError> {
self.op_client
.leave(&req.sid)
.await
.map_err(client_error_to_mcp)?;
json_result(&serde_json::json!({ "removed": true }))
}
#[tool(
description = "Run a Blueprint to completion via TaskApplication.handle. Blocking. Returns run_id + final_ctx + bound_version."
)]
async fn swarm_run(
&self,
Parameters(req): Parameters<SwarmRunReq>,
) -> Result<CallToolResult, McpError> {
let run_id = Uuid::new_v4().to_string();
let ttl = Duration::from_secs(req.timeout_secs.unwrap_or(300));
let task_app = {
let mut inner = self.state.write().await;
inner.runs.insert(
run_id.clone(),
RunHandle {
run_id: run_id.clone(),
status: RunStatus::Running,
},
);
inner.task_app.clone()
};
let blueprint: Blueprint = match serde_json::from_value(req.blueprint) {
Ok(b) => b,
Err(e) => {
let body = serde_json::json!({
"run_id": run_id,
"status": "failed",
"error": format!(
"blueprint decode failed: {} (hint: call the bp_schema tool for the Blueprint JSON Schema)",
e
),
});
let mut inner = self.state.write().await;
if let Some(h) = inner.runs.get_mut(&run_id) {
h.status = RunStatus::Failed;
}
drop(inner);
return json_result(&body);
}
};
let blueprint_id_str = blueprint.id.clone();
let operator_kind = req
.operator_kind
.as_deref()
.map(parse_operator_kind_str)
.transpose()?;
let mut operator_kind_overrides: HashMap<String, OperatorKind> = HashMap::new();
for (agent, kind_str) in req.operator_kind_overrides.unwrap_or_default() {
operator_kind_overrides.insert(agent, parse_operator_kind_str(&kind_str)?);
}
let input = TaskApplicationInput {
blueprint: BlueprintRef::Inline {
value: Box::new(blueprint),
},
operator_id: req.operator_id.unwrap_or_else(|| "mcp-run".into()),
role: Role::Operator,
ttl,
init_ctx: req.init_ctx.unwrap_or_else(|| serde_json::json!({})),
operator_kind,
bridge_id: None,
hook_id: None,
operator_backend_id: None,
operator_kind_overrides,
};
let exec = task_app.handle(input);
let result = tokio::time::timeout(ttl, exec).await;
let bp_id = BlueprintId::new(blueprint_id_str.clone());
let store = {
let inner = self.state.read().await;
inner.store.clone()
};
let head_id: Option<String> = match store.read_head(&bp_id).await {
Ok(_traced) => Some(blueprint_id_str.clone()),
Err(_) => None,
};
let history_len: usize = store
.history(&bp_id, 100)
.await
.map(|v| v.len())
.unwrap_or(0);
let log_tail: Vec<JsonValue> = Vec::new();
let (status, body) = match result {
Ok(Ok(out)) => (
RunStatus::Done,
serde_json::json!({
"run_id": run_id,
"status": "done",
"final_ctx": out.final_ctx,
"bound_version": out.bound_version.map(|v| format!("{:?}", v)),
"head": head_id,
"history_len": history_len,
"log_tail": log_tail,
}),
),
Ok(Err(e)) => (
RunStatus::Failed,
serde_json::json!({
"run_id": run_id,
"status": "failed",
"error": e.to_string(),
"head": head_id,
"history_len": history_len,
"log_tail": log_tail,
}),
),
Err(_) => (
RunStatus::Failed,
serde_json::json!({
"run_id": run_id,
"status": "failed",
"error": format!("timeout after {}s", ttl.as_secs()),
"head": head_id,
"history_len": history_len,
"log_tail": log_tail,
}),
),
};
{
let mut inner = self.state.write().await;
if let Some(h) = inner.runs.get_mut(&run_id) {
h.status = status;
}
}
json_result(&body)
}
#[tool(description = "Peek at a known run by run_id. Returns status snapshot.")]
async fn swarm_status(
&self,
Parameters(req): Parameters<SwarmStatusReq>,
) -> Result<CallToolResult, McpError> {
let inner = self.state.read().await;
match inner.runs.get(&req.run_id) {
Some(h) => json_result(&serde_json::json!({
"run_id": h.run_id,
"status": h.status,
})),
None => Err(McpError::invalid_params(
format!("run_id not found: {}", req.run_id),
None,
)),
}
}
#[tool(
description = "Archive a Blueprint (logical soft-delete via marker commit; reversible via bp_unarchive). Appends `archive: true` marker to head, filters id from list_ids default, and hard-rejects downstream resolvers with Archived. Safety: pass confirm=true to execute, otherwise returns dry-run report. Wraps DELETE /v1/blueprints/:id (path preserved for client compat; behavior is archive)."
)]
async fn bp_archive(
&self,
Parameters(req): Parameters<BpArchiveReq>,
) -> Result<CallToolResult, McpError> {
let bind = req
.bind
.unwrap_or_else(|| server_control::DEFAULT_BIND.to_string());
if !req.confirm {
return json_result(&serde_json::json!({
"status": "dry_run",
"id": req.id,
"bind": bind,
"note": "Pass confirm=true to archive. Reversible via bp_unarchive (marker commit; audit-trail preserved).",
}));
}
let url = format!("http://{bind}/v1/blueprints/{}", req.id);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()
.map_err(|e| McpError::internal_error(format!("client build: {e}"), None))?;
let resp = client
.delete(&url)
.send()
.await
.map_err(|e| McpError::internal_error(format!("archive: {e}"), None))?;
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
json_result(&serde_json::json!({
"status": if status.is_success() { "archived" } else { "error" },
"http_status": status.as_u16(),
"id": req.id,
"bind": bind,
"body": body,
}))
}
#[tool(
description = "Return the Blueprint JSON Schema (schemars-generated from mlua-swarm-blueprint-schema types). Use it before authoring / registering a BP, or when a register / swarm_run parse error points here. Note: the `flow` field is opaque in the schema (flow.ir Node grammar is owned by the mlua-flow-ir crate). Identical body to the `mse://api/blueprint-schema` resource."
)]
async fn bp_schema(
&self,
Parameters(_req): Parameters<BpSchemaReq>,
) -> Result<CallToolResult, McpError> {
let body = resources::blueprint_schema_value()
.map_err(|e| McpError::internal_error(format!("schema serialize: {e}"), None))?;
json_result(&body)
}
#[tool(
description = "Unarchive a Blueprint — reverse of bp_archive. Appends `archive: false` marker commit to head, re-exposing the id to list_ids / read_head / swarm_run. Wraps POST /v1/blueprints/:id/unarchive."
)]
async fn bp_unarchive(
&self,
Parameters(req): Parameters<BpUnarchiveReq>,
) -> Result<CallToolResult, McpError> {
let bind = req
.bind
.unwrap_or_else(|| server_control::DEFAULT_BIND.to_string());
let url = format!("http://{bind}/v1/blueprints/{}/unarchive", req.id);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()
.map_err(|e| McpError::internal_error(format!("client build: {e}"), None))?;
let resp = client
.post(&url)
.send()
.await
.map_err(|e| McpError::internal_error(format!("unarchive: {e}"), None))?;
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
json_result(&serde_json::json!({
"status": if status.is_success() { "unarchived" } else { "error" },
"http_status": status.as_u16(),
"id": req.id,
"bind": bind,
"body": body,
}))
}
#[tool(
description = "Doctor snapshot: mse mcp self state (in-process store = InMemory ephemeral) + server-side config (backend / store root / ref_base / registered BP list) fetched from GET /v1/doctor. Answers 'where is the store?' and 'how many BPs are registered?' in a single call."
)]
async fn mse_doctor(
&self,
Parameters(req): Parameters<DoctorReq>,
) -> Result<CallToolResult, McpError> {
let bind = req
.bind
.unwrap_or_else(|| server_control::DEFAULT_BIND.to_string());
let server_status = server_control::status(&bind).await;
let server_up = server_status.up;
let server_info: JsonValue = if server_up {
let url = format!("http://{bind}/v1/doctor");
match reqwest::Client::builder()
.timeout(Duration::from_secs(3))
.build()
{
Ok(client) => match client.get(&url).send().await {
Ok(r) => r.json::<JsonValue>().await.unwrap_or_else(
|e| serde_json::json!({"error": format!("doctor decode: {e}")}),
),
Err(e) => serde_json::json!({"error": format!("doctor fetch: {e}")}),
},
Err(e) => serde_json::json!({"error": format!("client build: {e}")}),
}
} else {
serde_json::json!({"note": "mse serve down; start via mlua_swarm_server_start"})
};
let run_count = self.state.read().await.runs.len();
let body = serde_json::json!({
"mse_mcp": {
"in_process_blueprint_store": "InMemory (ephemeral, mse mcp process-local)",
"in_flight_run_count": run_count,
"note": "The mse mcp in-process store is dedicated to swarm_run(Inline). The register path uses a separate store on the HTTP server side (POST /v1/blueprints/:id).",
},
"mlua_swarm_server": {
"bind": bind,
"up": server_up,
"launchd_state": server_status.launchd_state,
"launchd_pid": server_status.launchd_pid,
"doctor": server_info,
},
});
json_result(&body)
}
#[tool(
description = "Start mse serve via `launchctl kickstart gui/<uid>/com.mse.server`, then healthz-polls up to 30s. No-op if healthz is already up. Server settings come from ~/.mse/config.toml, not this call. Returns {status: already_running|started, bind}. Errors with install instructions if the launchd job is not bootstrapped yet."
)]
async fn mlua_swarm_server_start(
&self,
Parameters(req): Parameters<ServerStartReq>,
) -> Result<CallToolResult, McpError> {
let bind = req
.bind
.unwrap_or_else(|| server_control::DEFAULT_BIND.to_string());
match server_control::start(&bind).await {
Ok(outcome) => json_result(&outcome),
Err(e) => Err(McpError::internal_error(e, None)),
}
}
#[tool(
description = "Report mse serve state: healthz + a `launchctl print gui/<uid>/com.mse.server` summary (state / pid / last exit code). Returns {bind, up, launchd_state, launchd_pid, launchd_last_exit_code}."
)]
async fn mlua_swarm_server_status(
&self,
Parameters(req): Parameters<ServerStatusReq>,
) -> Result<CallToolResult, McpError> {
let bind = req
.bind
.unwrap_or_else(|| server_control::DEFAULT_BIND.to_string());
let out = server_control::status(&bind).await;
json_result(&out)
}
#[tool(
description = "Fully stop mse serve via `launchctl bootout gui/<uid>/com.mse.server` (unloads the job; KeepAlive will not restart it until the next `mlua_swarm_server_start` / `mlua_swarm_server_restart`). Returns {bind, stopped}."
)]
async fn mlua_swarm_server_shutdown(
&self,
Parameters(req): Parameters<ServerShutdownReq>,
) -> Result<CallToolResult, McpError> {
let bind = req
.bind
.unwrap_or_else(|| server_control::DEFAULT_BIND.to_string());
match server_control::shutdown(&bind).await {
Ok(out) => json_result(&out),
Err(e) => Err(McpError::internal_error(e, None)),
}
}
#[tool(
description = "Kill + restart mse serve via `launchctl kickstart -k gui/<uid>/com.mse.server`, then healthz-polls up to 30s. Use after editing ~/.mse/config.toml to pick up the new settings. Returns {status: started, bind}."
)]
async fn mlua_swarm_server_restart(
&self,
Parameters(req): Parameters<ServerRestartReq>,
) -> Result<CallToolResult, McpError> {
let bind = req
.bind
.unwrap_or_else(|| server_control::DEFAULT_BIND.to_string());
match server_control::restart(&bind).await {
Ok(outcome) => json_result(&outcome),
Err(e) => Err(McpError::internal_error(e, None)),
}
}
#[tool(
description = "Mark a run as cancelled in the local registry. Note: in-flight handle abort is v3 carry."
)]
async fn swarm_cancel(
&self,
Parameters(req): Parameters<SwarmCancelReq>,
) -> Result<CallToolResult, McpError> {
let mut inner = self.state.write().await;
match inner.runs.get_mut(&req.run_id) {
Some(h) => {
h.status = RunStatus::Cancelled;
json_result(&serde_json::json!({ "ok": true, "run_id": req.run_id }))
}
None => Err(McpError::invalid_params(
format!("run_id not found: {}", req.run_id),
None,
)),
}
}
}
#[tool_handler]
impl ServerHandler for MseServer {
fn get_info(&self) -> ServerInfo {
let mut info = ServerInfo::default();
info.instructions = Some(
"mse mcp: MCP server for mlua-swarm-engine (stdio, sibling of mse serve). Bundled \
guides, Blueprint samples, and the live Blueprint JSON Schema are exposed as MCP \
resources under mse://."
.into(),
);
info.capabilities = ServerCapabilities::builder()
.enable_tools()
.enable_resources()
.build();
info
}
async fn list_resources(
&self,
_request: Option<PaginatedRequestParams>,
_context: RequestContext<RoleServer>,
) -> Result<ListResourcesResult, McpError> {
let resources = resources::RESOURCES
.iter()
.map(|r| {
RawResource::new(r.uri.to_string(), r.title.to_string())
.with_description(r.description.to_string())
.with_mime_type(r.mime_type.to_string())
.no_annotation()
})
.collect();
Ok(ListResourcesResult {
resources,
next_cursor: None,
meta: None,
})
}
async fn read_resource(
&self,
request: ReadResourceRequestParams,
_context: RequestContext<RoleServer>,
) -> Result<ReadResourceResult, McpError> {
let Some(entry) = resources::find_by_uri(&request.uri) else {
return Err(McpError::resource_not_found(
format!("unknown resource: {}", request.uri),
None,
));
};
let body = resources::body_for(entry).map_err(|e| McpError::internal_error(e, None))?;
Ok(ReadResourceResult::new(vec![ResourceContents::text(
body,
request.uri,
)]))
}
}
fn json_result<T: Serialize>(value: &T) -> Result<CallToolResult, McpError> {
let text = serde_json::to_string_pretty(value)
.map_err(|e| McpError::internal_error(e.to_string(), None))?;
Ok(CallToolResult::success(vec![Content::text(text)]))
}
pub async fn run() -> Result<()> {
tracing::info!("mse mcp starting (stdio transport)");
let server = MseServer::new();
let service = server.serve(rmcp::transport::io::stdio()).await?;
service.waiting().await?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use mlua_flow_ir::{Expr, Node as FlowNode};
use mlua_swarm::blueprint::{
current_schema_version, AgentDef, AgentKind, AgentMeta, BlueprintMetadata, CompilerHints,
CompilerStrategy,
};
fn identity_blueprint() -> Blueprint {
use mlua_swarm::worker::baseline::AG_IDENTITY;
Blueprint {
schema_version: current_schema_version(),
id: "mse mcp-l2-identity".into(),
flow: FlowNode::Step {
ref_: AG_IDENTITY.into(),
in_: Expr::Path { at: "$.in".into() },
out: Expr::Path { at: "$.out".into() },
},
agents: vec![AgentDef {
name: AG_IDENTITY.into(),
kind: AgentKind::RustFn,
spec: serde_json::json!({"fn_id": AG_IDENTITY}),
profile: None,
meta: Some(AgentMeta::default()),
}],
operators: vec![],
hints: CompilerHints::default(),
strategy: CompilerStrategy::default(),
metadata: BlueprintMetadata {
description: Some("mse mcp L2 fixture".into()),
origin: Default::default(),
tags: vec![],
version_label: Some("0.1.0".into()),
project_name_alias: None,
default_run_ttl_secs: None,
},
spawner_hints: Default::default(),
default_agent_kind: AgentKind::Operator,
default_operator_kind: None,
}
}
fn extract_text_payload(result: &rmcp::model::CallToolResult) -> String {
match &result.content.first().expect("content").raw {
rmcp::model::RawContent::Text(t) => t.text.clone(),
_ => panic!("expected text content"),
}
}
#[tokio::test]
async fn swarm_run_registers_handle_and_returns_status() {
let server = MseServer::new();
let req = SwarmRunReq {
blueprint: serde_json::json!({}),
init_ctx: None,
timeout_secs: Some(5),
operator_id: None,
operator_kind: None,
operator_kind_overrides: None,
};
let res = server.swarm_run(Parameters(req)).await.unwrap();
assert!(!res.content.is_empty());
let inner = server.state.read().await;
assert_eq!(inner.runs.len(), 1);
}
#[tokio::test]
async fn swarm_status_unknown_run_id_returns_invalid_params() {
let server = MseServer::new();
let err = server
.swarm_status(Parameters(SwarmStatusReq {
run_id: "nope".into(),
}))
.await
.unwrap_err();
let _ = format!("{:?}", err);
}
#[tokio::test]
async fn swarm_run_with_valid_identity_blueprint_completes_done() {
let server = MseServer::new();
let bp_json = serde_json::to_value(identity_blueprint()).expect("serialize blueprint");
let req = SwarmRunReq {
blueprint: bp_json,
init_ctx: Some(serde_json::json!({"in": "hello"})),
timeout_secs: Some(10),
operator_id: None,
operator_kind: None,
operator_kind_overrides: None,
};
let result = server.swarm_run(Parameters(req)).await.expect("swarm_run");
let text = extract_text_payload(&result);
let parsed: serde_json::Value = serde_json::from_str(&text).expect("parse json");
assert_eq!(parsed["status"], "done", "payload: {text}");
let out = &parsed["final_ctx"]["out"];
assert_eq!(out["by"], "baseline-identity", "payload: {text}");
assert_eq!(out["agent"], "identity", "payload: {text}");
assert!(parsed.get("head").is_some(), "payload: {text}");
assert!(parsed.get("history_len").is_some(), "payload: {text}");
assert!(parsed.get("log_tail").is_some(), "payload: {text}");
assert_eq!(parsed["history_len"], 0, "Inline mode -> 0");
}
#[tokio::test]
async fn mse_pending_wait_unknown_sid_returns_invalid_params() {
let server = MseServer::new();
let err = server
.mse_pending_wait(Parameters(OperatorPendingWaitReq {
sid: "no-such-sid".into(),
timeout_ms: Some(10),
}))
.await
.unwrap_err();
let msg = format!("{err:?}");
assert!(msg.contains("no-such-sid"), "err: {msg}");
}
#[tokio::test]
async fn mse_ack_invalid_kind_returns_invalid_params() {
let server = MseServer::new();
let err = server
.mse_ack(Parameters(OperatorAckReq {
sid: "whatever".into(),
req_id: "r1".into(),
kind: "bogus".into(),
value: None,
ok: true,
error: None,
}))
.await
.unwrap_err();
let msg = format!("{err:?}");
assert!(msg.contains("bogus"), "err: {msg}");
}
#[tokio::test]
async fn mse_ack_unknown_sid_returns_invalid_params_for_valid_kind() {
let server = MseServer::new();
let err = server
.mse_ack(Parameters(OperatorAckReq {
sid: "no-such-sid".into(),
req_id: "r1".into(),
kind: "answer".into(),
value: Some(serde_json::json!({"v": 1})),
ok: true,
error: None,
}))
.await
.unwrap_err();
let msg = format!("{err:?}");
assert!(msg.contains("no-such-sid"), "err: {msg}");
}
#[tokio::test]
async fn mse_operator_leave_unknown_sid_returns_invalid_params() {
let server = MseServer::new();
let err = server
.mse_operator_leave(Parameters(OperatorLeaveReq {
sid: "no-such-sid".into(),
}))
.await
.unwrap_err();
let msg = format!("{err:?}");
assert!(msg.contains("no-such-sid"), "err: {msg}");
}
#[tokio::test]
async fn swarm_cancel_marks_handle_cancelled() {
let server = MseServer::new();
let _ = server
.swarm_run(Parameters(SwarmRunReq {
blueprint: serde_json::json!({}),
init_ctx: None,
timeout_secs: Some(5),
operator_id: None,
operator_kind: None,
operator_kind_overrides: None,
}))
.await
.unwrap();
let run_id = {
let inner = server.state.read().await;
inner.runs.keys().next().cloned().unwrap()
};
let _ = server
.swarm_cancel(Parameters(SwarmCancelReq {
run_id: run_id.clone(),
}))
.await
.unwrap();
let inner = server.state.read().await;
assert!(matches!(
inner.runs.get(&run_id).unwrap().status,
RunStatus::Cancelled
));
}
}