use std::future::Future;
use anyhow::Result;
use clap::Args;
use rmcp::handler::server::router::tool::ToolRouter;
use rmcp::handler::server::tool::Parameters;
use rmcp::model::{CallToolResult, Content, ServerCapabilities, ServerInfo};
use rmcp::{tool, tool_handler, tool_router, Error as McpError, ServerHandler, ServiceExt};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use super::batch;
use super::identity::{get_or_create_identity, parse_relays};
use super::spawn::{nostr_spawn_round_trip, NostrSpawnOutcome};
use super::status::{nostr_status_round_trip, NostrStatusOutcome};
use super::topup::{nostr_topup_round_trip, NostrTopupOutcome};
use crate::exec_client;
use paygress::discovery::DiscoveryClient;
use paygress::nostr::ProviderFilter;
// Command-line arguments accepted by the MCP server entry point (`execute`).
//
// NOTE: plain `//` comments are used on purpose. With `#[derive(Args)]`, clap
// picks up `///` doc comments as help text, which would change `--help` output.
#[derive(Args, Default, Debug, Clone)]
pub struct McpArgs {
// Optional Nostr key override (`--nostr-key`); later handed to
// `get_or_create_identity` by the server.
#[arg(long)]
pub nostr_key: Option<String>,
// Optional relay override (`--relays`); later handed to `parse_relays`.
// NOTE(review): the exact string format is defined by `parse_relays` — confirm there.
#[arg(long)]
pub relays: Option<String>,
}
/// Runs the Paygress MCP server over the process's stdio transport.
///
/// Builds a [`PaygressMcpServer`] from `args`, starts serving it on
/// `rmcp::transport::stdio()`, and then waits on the running service.
/// Errors from starting the service or from waiting on it are propagated.
/// `_verbose` is accepted but not used by this function.
pub async fn execute(args: McpArgs, _verbose: bool) -> Result<()> {
    let transport = rmcp::transport::stdio();
    let service = PaygressMcpServer::new(args).serve(transport).await?;
    service.waiting().await?;
    Ok(())
}
/// MCP server state: the optional Nostr key / relay overrides taken from
/// [`McpArgs`], plus the tool router built by `Self::tool_router()`.
#[derive(Clone)]
pub struct PaygressMcpServer {
/// Optional Nostr key override; passed to `get_or_create_identity` on each tool call.
nostr_key: Option<String>,
/// Optional relay override; passed to `parse_relays` on each tool call.
relays_override: Option<String>,
/// Router over this type's tool methods, created in `new`.
tool_router: ToolRouter<Self>,
}
// Parameters of the `list_providers` tool. Both fields may be omitted.
//
// NOTE: plain `//` comments are used on purpose. With `#[derive(JsonSchema)]`,
// `///` doc comments are emitted as schema descriptions, which would change
// the tool's advertised input schema.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct ListProvidersParams {
// Optional capability filter; copied into `ProviderFilter::capability`.
#[serde(default)]
pub capability: Option<String>,
// Optional minimum-uptime filter; copied into `ProviderFilter::min_uptime`.
// NOTE(review): unit/range is defined by `ProviderFilter` — confirm there.
#[serde(default)]
pub min_uptime: Option<f32>,
}
// Parameters of the `spawn_workload` tool.
//
// NOTE: plain `//` comments are used on purpose. With `#[derive(JsonSchema)]`,
// `///` doc comments are emitted as schema descriptions, which would change
// the tool's advertised input schema.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct SpawnParams {
// Provider to spawn on (required); passed to `nostr_spawn_round_trip`.
pub provider: String,
// Payment token (required); the tool description calls it a Cashu token.
pub token: String,
// Tier name; defaults to "basic" when omitted.
#[serde(default = "default_tier")]
pub tier: String,
// Optional template name (the tool description mentions e.g. `agent-sandbox`).
#[serde(default)]
pub template: Option<String>,
// Image to run; defaults to "ubuntu:22.04" when omitted.
#[serde(default = "default_image")]
pub image: String,
// SSH user name; defaults to "user" when omitted.
#[serde(default = "default_ssh_user")]
pub ssh_user: String,
// Optional SSH password; `spawn_workload` generates a random 16-character
// one when this is absent.
#[serde(default)]
pub ssh_pass: Option<String>,
// Timeout in seconds handed to the spawn round trip; defaults to 120.
#[serde(default = "default_timeout_secs")]
pub timeout_secs: u64,
// Optional isolation-level slug; validated by `parse_iso_param`.
#[serde(default)]
pub isolation_level: Option<String>,
}
/// Serde default for the `tier` field of the spawn/batch parameters: `"basic"`.
fn default_tier() -> String {
    String::from("basic")
}
/// Default container image: `"ubuntu:22.04"`. Used as the serde default for
/// `SpawnParams::image` and directly by `batch_spawn`.
fn default_image() -> String {
    String::from("ubuntu:22.04")
}
/// Serde default for `SpawnParams::ssh_user`: `"user"`.
fn default_ssh_user() -> String {
    String::from("user")
}
/// Serde default for the `timeout_secs` field of the spawn, batch and top-up
/// parameters: 120 seconds.
fn default_timeout_secs() -> u64 {
    const DEFAULT_TIMEOUT_SECS: u64 = 120;
    DEFAULT_TIMEOUT_SECS
}
// Parameters of the `batch_spawn` tool.
//
// Per the tool description, callers supply either `tokens` or
// `split_token` + `shards`.
//
// NOTE: plain `//` comments are used on purpose. With `#[derive(JsonSchema)]`,
// `///` doc comments are emitted as schema descriptions, which would change
// the tool's advertised input schema.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct BatchParams {
// Provider to spawn every shard on (required).
pub provider: String,
// Optional list of tokens, one per shard; `batch_spawn` joins them with ","
// before handing them to `batch::materialize_tokens`.
#[serde(default)]
pub tokens: Option<Vec<String>>,
// Optional single token to be split into shards.
#[serde(default)]
pub split_token: Option<String>,
// Optional shard count, used together with `split_token`.
#[serde(default)]
pub shards: Option<usize>,
// Tier name; defaults to "basic" when omitted.
#[serde(default = "default_tier")]
pub tier: String,
// Template name; defaults to "agent-sandbox" when omitted.
#[serde(default = "default_batch_template")]
pub template: String,
// Per-shard timeout in seconds; defaults to 120.
#[serde(default = "default_timeout_secs")]
pub timeout_secs: u64,
// Optional isolation-level slug; validated by `parse_iso_param`.
#[serde(default)]
pub isolation_level: Option<String>,
}
/// Serde default for `BatchParams::template`: `"agent-sandbox"`.
fn default_batch_template() -> String {
    String::from("agent-sandbox")
}
/// Parses an optional isolation-level slug into an optional `IsolationLevel`.
///
/// * `None` passes straight through as `Ok(None)`.
/// * `Some(slug)` is looked up with `IsolationLevel::from_slug`; a slug it does
///   not recognise yields an `invalid_params` MCP error naming the value.
fn parse_iso_param(s: Option<&str>) -> Result<Option<paygress::nostr::IsolationLevel>, McpError> {
    s.map(|slug| {
        paygress::nostr::IsolationLevel::from_slug(slug).ok_or_else(|| {
            McpError::invalid_params(
                format!(
                    "isolation_level: unknown value `{}` (expected: shared-kernel, dedicated-host, attested-research-tier)",
                    slug
                ),
                None,
            )
        })
    })
    .transpose()
}
// Parameters of the `workload_status` tool.
//
// NOTE: plain `//` comments are used on purpose. With `#[derive(JsonSchema)]`,
// `///` doc comments are emitted as schema descriptions, which would change
// the tool's advertised input schema.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct StatusParams {
// Provider that hosts the workload (required).
pub provider: String,
// Identifier of the workload to query (required).
pub pod_id: String,
// Timeout in seconds handed to the status round trip; defaults to 30.
#[serde(default = "default_status_timeout_secs")]
pub timeout_secs: u64,
}
/// Serde default for `StatusParams::timeout_secs`: 30 seconds.
fn default_status_timeout_secs() -> u64 {
    const DEFAULT_STATUS_TIMEOUT_SECS: u64 = 30;
    DEFAULT_STATUS_TIMEOUT_SECS
}
// Parameters of the `topup_workload` tool.
//
// NOTE: plain `//` comments are used on purpose. With `#[derive(JsonSchema)]`,
// `///` doc comments are emitted as schema descriptions, which would change
// the tool's advertised input schema.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct TopupParams {
// Provider that hosts the workload (required).
pub provider: String,
// Identifier of the workload whose lease is extended (required).
pub pod_id: String,
// Payment token for the extension (required); the tool description calls
// it a Cashu token.
pub token: String,
// Timeout in seconds handed to the top-up round trip; defaults to 120.
#[serde(default = "default_timeout_secs")]
pub timeout_secs: u64,
}
// Parameters of the `run_command` tool; forwarded to `exec_client::call_exec`.
//
// NOTE: plain `//` comments are used on purpose. With `#[derive(JsonSchema)]`,
// `///` doc comments are emitted as schema descriptions, which would change
// the tool's advertised input schema.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct RunCommandParams {
// Host of the workload's exec server (required).
pub host: String,
// Port of the exec server (required); the tool description refers to the
// spawn response's `sandbox-exec` port.
pub port: u16,
// User name for the exec call; defaults to "root" when omitted.
#[serde(default = "default_exec_user")]
pub user: String,
// Password for the exec call (required); per the tool description this is
// the SSH password from the spawn.
pub pass: String,
// Shell command to run (required).
pub command: String,
// Command timeout in seconds; defaults to 60. `run_command` adds 5 seconds
// on top of this for the overall call timeout.
#[serde(default = "default_exec_timeout_secs")]
pub timeout_secs: u64,
// Optional working directory for the command.
#[serde(default)]
pub working_dir: Option<String>,
}
/// Serde default for `RunCommandParams::user`: `"root"`.
fn default_exec_user() -> String {
    String::from("root")
}
/// Serde default for `RunCommandParams::timeout_secs`: 60 seconds.
fn default_exec_timeout_secs() -> u64 {
    const DEFAULT_EXEC_TIMEOUT_SECS: u64 = 60;
    DEFAULT_EXEC_TIMEOUT_SECS
}
#[tool_router]
impl PaygressMcpServer {
pub fn new(args: McpArgs) -> Self {
Self {
nostr_key: args.nostr_key,
relays_override: args.relays,
tool_router: Self::tool_router(),
}
}
/// Resolves the Nostr key for a tool call by passing the optional override to
/// `get_or_create_identity`; a failure is reported as an MCP internal error.
fn resolve_nostr_key(&self) -> Result<String, McpError> {
    match get_or_create_identity(self.nostr_key.clone()) {
        Ok(key) => Ok(key),
        Err(e) => Err(McpError::internal_error(
            format!("failed to load Nostr identity: {}", e),
            None,
        )),
    }
}
/// Resolves the relay list for a tool call by passing the optional override
/// to `parse_relays`.
fn resolve_relays(&self) -> Vec<String> {
    let relay_override = self.relays_override.clone();
    parse_relays(relay_override)
}
#[tool(
description = "List Paygress providers currently advertising on the Nostr network. Returns a JSON array of providers with their capabilities, advertised tiers, prices in msats/sec, and last heartbeat timestamps."
)]
async fn list_providers(
&self,
Parameters(params): Parameters<ListProvidersParams>,
) -> Result<CallToolResult, McpError> {
let relays = self.resolve_relays();
let nostr_key = self.resolve_nostr_key()?;
let client = DiscoveryClient::new_with_key(relays, nostr_key)
.await
.map_err(|e| McpError::internal_error(format!("nostr connect: {}", e), None))?;
let filter = if params.capability.is_some() || params.min_uptime.is_some() {
Some(ProviderFilter {
capability: params.capability.clone(),
min_uptime: params.min_uptime,
min_memory_mb: None,
min_cpu: None,
isolation_level: None,
})
} else {
None
};
let providers = client
.list_providers(filter)
.await
.map_err(|e| McpError::internal_error(format!("discovery: {}", e), None))?;
let json = serde_json::to_string_pretty(&providers)
.map_err(|e| McpError::internal_error(format!("serialize: {}", e), None))?;
Ok(CallToolResult::success(vec![Content::text(json)]))
}
// `spawn_workload` tool: performs one spawn round trip against a provider and
// reports the outcome as a JSON text payload.
//
// An invalid `isolation_level`, an identity failure, or an error returned by
// `nostr_spawn_round_trip` becomes an MCP error. Every `NostrSpawnOutcome`
// variant, including provider errors and timeouts, is returned as a
// *successful* tool result whose `status` field tells the caller what happened.
#[tool(
    description = "Spawn a single Paygress workload. Pays the provider with the supplied Cashu token, optionally materializes from a template (e.g. `agent-sandbox`, `inference-endpoint`), and returns the workload's access details (host, ssh credentials, expiry, template ports). The agent then connects to the host:port itself to do the actual work."
)]
async fn spawn_workload(
    &self,
    Parameters(params): Parameters<SpawnParams>,
) -> Result<CallToolResult, McpError> {
    let relays = self.resolve_relays();
    let nostr_key = self.resolve_nostr_key()?;
    // Fall back to a random 16-character password when the caller gave none;
    // it is echoed back in the success payload so the caller can log in.
    let ssh_pass = params
        .ssh_pass
        .clone()
        .unwrap_or_else(|| generate_password(16));
    let iso = parse_iso_param(params.isolation_level.as_deref())?;
    let outcome = nostr_spawn_round_trip(
        &params.provider,
        &params.tier,
        &params.token,
        params.image.clone(),
        params.ssh_user.clone(),
        ssh_pass.clone(),
        params.template.clone(),
        None,
        None,
        None,
        None,
        iso,
        relays,
        nostr_key,
        params.timeout_secs,
    )
    .await
    .map_err(|e| McpError::internal_error(format!("spawn: {}", e), None))?;
    let body = match outcome {
        NostrSpawnOutcome::Success(access) => serde_json::json!({
            "status": "spawned",
            "ssh_user": params.ssh_user,
            "ssh_pass": ssh_pass,
            "access": access,
        }),
        NostrSpawnOutcome::ProviderOffline => serde_json::json!({
            "status": "offline",
            "message": "provider's heartbeat did not appear within the live window",
        }),
        NostrSpawnOutcome::ProviderError(err) => serde_json::json!({
            "status": "provider_error",
            "error_type": err.error_type,
            "message": err.message,
            "details": err.details,
        }),
        NostrSpawnOutcome::UnknownResponse(content) => serde_json::json!({
            "status": "unknown_response",
            "content": content,
        }),
        NostrSpawnOutcome::Timeout => serde_json::json!({
            "status": "timeout",
            "message": "no response within timeout; token may have been spent",
        }),
    };
    Ok(CallToolResult::success(vec![Content::text(
        serde_json::to_string_pretty(&body).unwrap_or_else(|_| "{}".to_string()),
    )]))
}
#[tool(
description = "Fan out N Paygress workloads in parallel for map-reduce shards, CI matrices, or batch jobs. Either pass `tokens` (a list of N pre-minted Cashu tokens) or `split_token` + `shards` (one big token split into N shards before fan-out). Returns the same shard manifest as `paygress-cli batch` — index, status, host, ssh credentials, expiry, error per shard."
)]
async fn batch_spawn(
&self,
Parameters(params): Parameters<BatchParams>,
) -> Result<CallToolResult, McpError> {
let cli_args = batch::BatchArgs {
provider: params.provider.clone(),
tokens: params.tokens.as_ref().map(|v| v.join(",")),
tokens_file: None,
split_token: params.split_token.clone(),
shards: params.shards,
tier: params.tier.clone(),
template: params.template.clone(),
output: std::path::PathBuf::from("/tmp/paygress-mcp-batch-unused"),
timeout_secs: params.timeout_secs,
image: default_image(),
nostr_key: self.nostr_key.clone(),
relays: self.relays_override.clone(),
isolation_level: None,
};
let tokens = batch::materialize_tokens(&cli_args)
.await
.map_err(|e| McpError::invalid_params(format!("token resolution: {}", e), None))?;
let n = tokens.len();
let relays = self.resolve_relays();
let nostr_key = self.resolve_nostr_key()?;
let iso = parse_iso_param(params.isolation_level.as_deref())?;
let mut handles = Vec::with_capacity(n);
for (i, token) in tokens.into_iter().enumerate() {
let provider = params.provider.clone();
let tier = params.tier.clone();
let image = default_image();
let template = Some(params.template.clone());
let relays = relays.clone();
let nostr_key = nostr_key.clone();
let timeout = params.timeout_secs;
let ssh_user = "user".to_string();
let ssh_pass = generate_password(16);
let iso_per_shard = iso;
handles.push(tokio::spawn(async move {
let outcome = nostr_spawn_round_trip(
&provider,
&tier,
&token,
image,
ssh_user.clone(),
ssh_pass.clone(),
template,
None,
None,
None,
None,
iso_per_shard,
relays,
nostr_key,
timeout,
)
.await;
(i, ssh_user, ssh_pass, outcome)
}));
}
let mut shards: Vec<serde_json::Value> = Vec::with_capacity(n);
for h in handles {
let (i, ssh_user, ssh_pass, outcome) = match h.await {
Ok(v) => v,
Err(e) => {
shards.push(serde_json::json!({
"index": 0,
"status": "join_error",
"error": e.to_string(),
}));
continue;
}
};
let entry = match outcome {
Ok(NostrSpawnOutcome::Success(access)) => serde_json::json!({
"index": i,
"status": "spawned",
"ssh_user": ssh_user,
"ssh_pass": ssh_pass,
"access": access,
}),
Ok(NostrSpawnOutcome::ProviderOffline) => serde_json::json!({
"index": i,
"status": "offline",
}),
Ok(NostrSpawnOutcome::ProviderError(err)) => serde_json::json!({
"index": i,
"status": "provider_error",
"error_type": err.error_type,
"message": err.message,
}),
Ok(NostrSpawnOutcome::UnknownResponse(content)) => serde_json::json!({
"index": i,
"status": "unknown_response",
"content": content,
}),
Ok(NostrSpawnOutcome::Timeout) => serde_json::json!({
"index": i,
"status": "timeout",
}),
Err(e) => serde_json::json!({
"index": i,
"status": "transport_error",
"error": e.to_string(),
}),
};
shards.push(entry);
}
shards.sort_by_key(|v| v["index"].as_u64().unwrap_or(0));
let spawned_count = shards
.iter()
.filter(|s| s["status"].as_str() == Some("spawned"))
.count();
let manifest = serde_json::json!({
"provider": params.provider,
"template": params.template,
"tier": params.tier,
"shard_count": n,
"spawned_count": spawned_count,
"shards": shards,
});
Ok(CallToolResult::success(vec![Content::text(
serde_json::to_string_pretty(&manifest).unwrap_or_else(|_| "{}".to_string()),
)]))
}
// `workload_status` tool: performs one status round trip for `pod_id` against
// the provider and reports the outcome as a JSON text payload.
//
// An identity failure or an error returned by `nostr_status_round_trip`
// becomes an MCP internal error. Unparseable responses and timeouts are
// returned as successful tool results with a descriptive `status` field.
#[tool(
    description = "Get the current status of an existing Paygress workload by pod_id. Returns expiry, time-remaining, ssh host/port, and resource allocation. Use this to monitor a lease before it expires so you can call `topup_workload` proactively."
)]
async fn workload_status(
    &self,
    Parameters(params): Parameters<StatusParams>,
) -> Result<CallToolResult, McpError> {
    let relays = self.resolve_relays();
    let nostr_key = self.resolve_nostr_key()?;
    let outcome = nostr_status_round_trip(
        &params.pod_id,
        &params.provider,
        relays,
        nostr_key,
        params.timeout_secs,
    )
    .await
    .map_err(|e| McpError::internal_error(format!("status: {}", e), None))?;
    let body = match outcome {
        // The provider's own status string is exposed as `lease_status`;
        // the top-level `status` describes the round trip itself.
        NostrStatusOutcome::Success(s) => serde_json::json!({
            "status": "ok",
            "pod_id": s.pod_id,
            "lease_status": s.status,
            "expires_at": s.expires_at,
            "time_remaining_seconds": s.time_remaining_seconds,
            "cpu_millicores": s.cpu_millicores,
            "memory_mb": s.memory_mb,
            "ssh_host": s.ssh_host,
            "ssh_port": s.ssh_port,
            "ssh_username": s.ssh_username,
        }),
        NostrStatusOutcome::UnparseableResponse(content) => serde_json::json!({
            "status": "unknown_response",
            "content": content,
        }),
        NostrStatusOutcome::Timeout => serde_json::json!({
            "status": "timeout",
            "message": "provider did not respond within the timeout window",
        }),
    };
    Ok(CallToolResult::success(vec![Content::text(
        serde_json::to_string_pretty(&body).unwrap_or_else(|_| "{}".to_string()),
    )]))
}
// `topup_workload` tool: performs one top-up round trip for `pod_id` with the
// supplied token and reports the outcome as a JSON text payload.
//
// An identity failure or an error returned by `nostr_topup_round_trip` becomes
// an MCP internal error. Provider errors, unknown responses and timeouts are
// returned as successful tool results with a descriptive `status` field.
#[tool(
    description = "Extend an existing Paygress workload's lease by paying the provider with another Cashu token. The provider redeems the token at the mint and adds `redeemed_amount / rate_msats_per_sec` seconds to the lease. Returns the new expiry on success or a structured provider error (lease_expired, not_owner, insufficient_payment, race_lost, etc.) on failure."
)]
async fn topup_workload(
    &self,
    Parameters(params): Parameters<TopupParams>,
) -> Result<CallToolResult, McpError> {
    let relays = self.resolve_relays();
    let nostr_key = self.resolve_nostr_key()?;
    let outcome = nostr_topup_round_trip(
        &params.pod_id,
        &params.token,
        &params.provider,
        relays,
        nostr_key,
        params.timeout_secs,
    )
    .await
    .map_err(|e| McpError::internal_error(format!("topup: {}", e), None))?;
    let body = match outcome {
        // The response's `pod_npub` field is reported under the `pod_id` key.
        NostrTopupOutcome::Success(r) => serde_json::json!({
            "status": "ok",
            "pod_id": r.pod_npub,
            "extended_duration_seconds": r.extended_duration_seconds,
            "new_expires_at": r.new_expires_at,
            "message": r.message,
        }),
        NostrTopupOutcome::ProviderError(err) => serde_json::json!({
            "status": "provider_error",
            "error_type": err.error_type,
            "message": err.message,
            "details": err.details,
        }),
        NostrTopupOutcome::UnknownResponse(content) => serde_json::json!({
            "status": "unknown_response",
            "content": content,
        }),
        NostrTopupOutcome::Timeout => serde_json::json!({
            "status": "timeout",
            "message": "provider did not respond within the timeout window — token MAY have been spent; call workload_status to verify before retrying"
        }),
    };
    Ok(CallToolResult::success(vec![Content::text(
        serde_json::to_string_pretty(&body).unwrap_or_else(|_| "{}".to_string()),
    )]))
}
// `run_command` tool: forwards a shell command to `exec_client::call_exec` and
// reports the result as a JSON text payload.
//
// This method never returns an MCP error itself: a failed exec call is
// reported as a successful tool result with `status: "transport_error"`.
#[tool(
    description = "Run a shell command inside an agent-sandbox workload via its baked-in HTTP exec server. Pass the host and port from the spawn response (port label `sandbox-exec`), plus the SSH password (the provider reuses it as EXEC_PASS). Returns stdout, stderr, exit_code, duration_ms, timed_out. Use this immediately after `spawn_workload` to actually run code inside the paid sandbox — Paygress is the spawn fabric, this is the exec channel."
)]
async fn run_command(
    &self,
    Parameters(params): Parameters<RunCommandParams>,
) -> Result<CallToolResult, McpError> {
    // The overall call timeout is the command timeout plus 5 seconds;
    // `saturating_add` keeps a huge `timeout_secs` from overflowing.
    let total_timeout = std::time::Duration::from_secs(params.timeout_secs.saturating_add(5));
    let outcome = exec_client::call_exec(
        &params.host,
        params.port,
        &params.user,
        &params.pass,
        &params.command,
        Some(params.timeout_secs),
        params.working_dir.as_deref(),
        total_timeout,
    )
    .await;
    let body = match outcome {
        Ok(resp) => serde_json::json!({
            "status": "ok",
            "stdout": resp.stdout,
            "stderr": resp.stderr,
            "exit_code": resp.exit_code,
            "duration_ms": resp.duration_ms,
            "timed_out": resp.timed_out,
        }),
        Err(e) => serde_json::json!({
            "status": "transport_error",
            "message": e.to_string(),
        }),
    };
    Ok(CallToolResult::success(vec![Content::text(
        serde_json::to_string_pretty(&body).unwrap_or_else(|_| "{}".to_string()),
    )]))
}
}
#[tool_handler]
impl ServerHandler for PaygressMcpServer {
    /// Describes this server to MCP clients: tool support is enabled and the
    /// instructions text walks through the intended tool lifecycle. All other
    /// `ServerInfo` fields keep their defaults.
    fn get_info(&self) -> ServerInfo {
        let capabilities = ServerCapabilities::builder().enable_tools().build();
        let instructions = String::from(
            "Paygress: pay-per-use compute marketplace using Cashu ecash and Nostr. \
             Full lifecycle: `list_providers` (discover) → `spawn_workload` / \
             `batch_spawn` (pay + provision) → `run_command` (exec inside the \
             sandbox via its baked-in HTTP server) → `workload_status` (monitor) \
             → `topup_workload` (extend before expiry). For the canonical agent \
             flow: spawn an `agent-sandbox` template, then call `run_command` \
             with the returned host + sandbox-exec port + ssh_pass — that's the \
             paid box's stdout in your hand.",
        );
        ServerInfo {
            instructions: Some(instructions),
            capabilities,
            ..Default::default()
        }
    }
}
/// Generates a random password of `len` characters drawn from ASCII letters
/// and digits (62 symbols), using `rand::thread_rng()`.
///
/// `len == 0` yields an empty string.
fn generate_password(len: usize) -> String {
    use rand::Rng;
    const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
    let mut rng = rand::thread_rng();
    let mut password = String::with_capacity(len);
    for _ in 0..len {
        let pick = rng.gen_range(0..CHARSET.len());
        password.push(char::from(CHARSET[pick]));
    }
    password
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The server must advertise tool support and mention the key tools in
    /// its instructions text.
    #[test]
    fn server_advertises_tool_capability() {
        let info = PaygressMcpServer::new(McpArgs::default()).get_info();
        assert!(info.capabilities.tools.is_some());
        let instructions = info.instructions.as_deref().unwrap_or("");
        for needle in ["Paygress", "workload_status", "topup_workload", "run_command"] {
            assert!(instructions.contains(needle));
        }
    }

    /// Omitted `run_command` fields fall back to their serde defaults.
    #[test]
    fn run_command_params_defaults_round_trip() {
        let input = serde_json::json!({
            "host": "1.2.3.4",
            "port": 30043,
            "pass": "hunter2",
            "command": "ls /workspace",
        });
        let parsed: RunCommandParams = serde_json::from_value(input).unwrap();
        assert_eq!(parsed.user, "root");
        assert_eq!(parsed.timeout_secs, 60);
        assert!(parsed.working_dir.is_none());
        assert_eq!(parsed.command, "ls /workspace");
    }

    /// A minimal spawn request picks up every documented default.
    #[test]
    fn spawn_params_default_tier_is_basic() {
        let input = serde_json::json!({
            "provider": "npub1abc",
            "token": "tok",
        });
        let parsed: SpawnParams = serde_json::from_value(input).unwrap();
        assert_eq!(parsed.tier, "basic");
        assert_eq!(parsed.image, "ubuntu:22.04");
        assert_eq!(parsed.ssh_user, "user");
        assert_eq!(parsed.timeout_secs, 120);
        assert!(parsed.template.is_none());
    }

    /// Batch requests default to the `agent-sandbox` template and `basic` tier.
    #[test]
    fn batch_params_default_template_is_agent_sandbox() {
        let input = serde_json::json!({
            "provider": "npub1abc",
            "tokens": ["t1", "t2"],
        });
        let parsed: BatchParams = serde_json::from_value(input).unwrap();
        assert_eq!(parsed.template, "agent-sandbox");
        assert_eq!(parsed.tier, "basic");
        assert_eq!(parsed.tokens.as_ref().map(Vec::len), Some(2));
    }

    /// Status requests default to a 30-second timeout.
    #[test]
    fn status_params_defaults_round_trip() {
        let input = serde_json::json!({
            "provider": "npub1abc",
            "pod_id": "container-7",
        });
        let parsed: StatusParams = serde_json::from_value(input).unwrap();
        assert_eq!(parsed.timeout_secs, 30);
        assert_eq!(parsed.pod_id, "container-7");
    }

    /// Top-up requests default to a 120-second timeout.
    #[test]
    fn topup_params_defaults_round_trip() {
        let input = serde_json::json!({
            "provider": "npub1abc",
            "pod_id": "container-7",
            "token": "tok",
        });
        let parsed: TopupParams = serde_json::from_value(input).unwrap();
        assert_eq!(parsed.timeout_secs, 120);
        assert_eq!(parsed.token, "tok");
    }

    /// Split mode carries `split_token` and `shards` through while `tokens`
    /// stays unset.
    #[test]
    fn batch_params_split_mode_carries_through() {
        let input = serde_json::json!({
            "provider": "npub1abc",
            "split_token": "big-token",
            "shards": 5,
        });
        let parsed: BatchParams = serde_json::from_value(input).unwrap();
        assert!(parsed.tokens.is_none());
        assert_eq!(parsed.split_token.as_deref(), Some("big-token"));
        assert_eq!(parsed.shards, Some(5));
    }
}