use std::collections::BTreeMap;
use std::fmt::Write as _;
use std::path::Path;
use std::process::Stdio;
use std::time::Duration;
use rmcp::model::{CallToolRequestParams, RawContent, ResourceContents};
use rmcp::service::{RoleClient, RunningService, serve_client};
use serde_json::Value;
use tokio::process::Child;
use crate::config::{EnvValue, McpServerSpec};
use crate::container::{Container, embedded::McpDeclarationSource};
use crate::error::{OutrigError, Result};
const SHUTDOWN_GRACE: Duration = Duration::from_secs(2);
#[derive(Debug, Clone)]
pub struct McpTool {
pub name: String,
pub description: Option<String>,
pub input_schema: Value,
}
#[derive(Debug, Clone)]
pub struct McpToolResult {
pub content_text: String,
pub is_error: bool,
}
#[derive(Debug)]
pub struct McpClient {
name: String,
service: RunningService<RoleClient, ()>,
child: Child,
}
impl McpClient {
pub async fn connect_via_podman_exec(
container: &Container,
server_cfg: &McpServerSpec,
name: &str,
log_dir: &Path,
extra_env: &BTreeMap<String, EnvValue>,
) -> Result<Self> {
Self::connect_via_podman_exec_inner(container, server_cfg, name, None, log_dir, extra_env)
.await
}
pub(crate) async fn connect_via_podman_exec_with_source(
container: &Container,
server_cfg: &McpServerSpec,
name: &str,
declaration_source: McpDeclarationSource,
log_dir: &Path,
extra_env: &BTreeMap<String, EnvValue>,
) -> Result<Self> {
Self::connect_via_podman_exec_inner(
container,
server_cfg,
name,
Some(declaration_source.description()),
log_dir,
extra_env,
)
.await
}
async fn connect_via_podman_exec_inner(
container: &Container,
server_cfg: &McpServerSpec,
name: &str,
declaration_source: Option<&'static str>,
log_dir: &Path,
extra_env: &BTreeMap<String, EnvValue>,
) -> Result<Self> {
let (command, mut env_spec) = server_cfg.normalize();
for (key, value) in extra_env {
env_spec.insert(key.clone(), value.clone());
}
let mut env: BTreeMap<String, String> = BTreeMap::new();
for (key, value) in env_spec {
let resolved = value
.resolve()
.map_err(|source| OutrigError::McpEnvResolveFailed {
name: name.to_string(),
key: key.clone(),
source,
})?;
env.insert(key, resolved);
}
tokio::fs::create_dir_all(log_dir).await?;
let stderr_path = log_dir.join(format!("{name}.stderr"));
let stderr_file = tokio::fs::File::create(&stderr_path).await?;
let stderr_std = stderr_file.into_std().await;
let exec_cmd = container.build_exec_argv(&command, &env);
if let Some(transcript) = container.transcript() {
transcript
.line("podman", &format!("$ {}", exec_cmd.render()))
.await?;
}
let mut cmd = exec_cmd.to_tokio_command();
cmd.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::from(stderr_std))
.kill_on_drop(true);
let mut child = cmd.spawn()?;
let stdin = child.stdin.take().expect("stdin was piped above");
let stdout = child.stdout.take().expect("stdout was piped above");
let service = match serve_client((), (stdout, stdin)).await {
Ok(s) => s,
Err(source) => {
return Err(enrich_startup_error(
name,
declaration_source,
&command,
&stderr_path,
&mut child,
source,
)
.await);
}
};
Ok(Self {
name: name.to_string(),
service,
child,
})
}
pub fn name(&self) -> &str {
&self.name
}
pub async fn list_tools(&self) -> Result<Vec<McpTool>> {
let tools = self.service.list_all_tools().await.map_err(|source| {
OutrigError::McpToolsListFailed {
name: self.name.clone(),
source: Box::new(source),
}
})?;
Ok(tools
.into_iter()
.map(|t| {
let input_schema = t.schema_as_json_value();
let description = t.description.and_then(|description| {
(!description.is_empty()).then(|| description.into_owned())
});
McpTool {
name: t.name.into_owned(),
description,
input_schema,
}
})
.collect())
}
pub async fn call_tool(&self, name: &str, args: Value) -> Result<McpToolResult> {
let arguments = match args {
Value::Object(map) => Some(map),
Value::Null => None,
other => {
return Err(OutrigError::McpArgsNotObject {
kind: kind_of(&other),
});
}
};
let mut request = CallToolRequestParams::new(name.to_string());
if let Some(arguments) = arguments {
request = request.with_arguments(arguments);
}
let result = self.service.call_tool(request).await?;
let mut content_text = String::new();
for (i, content) in result.content.iter().enumerate() {
if i > 0 {
content_text.push('\n');
}
match &content.raw {
RawContent::Text(t) => content_text.push_str(&t.text),
RawContent::Image(img) => {
let _ = write!(
content_text,
"[image: {}, {} base64 bytes]",
img.mime_type,
img.data.len()
);
}
RawContent::Resource(r) => match &r.resource {
ResourceContents::TextResourceContents { text, .. } => {
content_text.push_str(text)
}
ResourceContents::BlobResourceContents {
mime_type, blob, ..
} => {
let mime = mime_type.as_deref().unwrap_or("application/octet-stream");
let _ = write!(content_text, "[blob: {mime}, {} base64 bytes]", blob.len());
}
},
RawContent::Audio(audio) => {
let _ = write!(
content_text,
"[audio: {}, {} base64 bytes]",
audio.mime_type,
audio.data.len()
);
}
RawContent::ResourceLink(link) => {
let _ = write!(content_text, "[resource link: {}]", link.uri);
}
}
}
Ok(McpToolResult {
content_text,
is_error: result.is_error.unwrap_or(false),
})
}
pub async fn shutdown(self) -> Result<()> {
let Self {
service, mut child, ..
} = self;
let _ = tokio::time::timeout(SHUTDOWN_GRACE, service.cancel()).await;
match tokio::time::timeout(SHUTDOWN_GRACE, child.wait()).await {
Ok(Ok(_)) => Ok(()),
Ok(Err(e)) => Err(e.into()),
Err(_) => {
let _ = child.start_kill();
let _ = tokio::time::timeout(SHUTDOWN_GRACE, child.wait()).await;
Ok(())
}
}
}
}
async fn enrich_startup_error(
name: &str,
declaration_source: Option<&str>,
command: &[String],
stderr_path: &Path,
child: &mut Child,
source: rmcp::service::ClientInitializeError,
) -> OutrigError {
let exit_status = match tokio::time::timeout(Duration::from_millis(250), child.wait()).await {
Ok(Ok(status)) => Some(status),
_ => None,
};
let exit = format_exit(exit_status);
let stderr_tail = read_stderr_tail(stderr_path).await;
if !exit_status.is_some_and(|s| s.success()) {
tracing::error!(
target: "outrig::mcp",
server = name,
"mcp server {name:?} terminated before initialize ({exit}); \
see {} for details",
stderr_path.display()
);
}
OutrigError::McpStartupFailed(Box::new(crate::error::McpStartupFailure {
name: name.to_string(),
declaration_source: declaration_source.map(str::to_string),
command: render_command(command),
exit,
stderr_path: stderr_path.to_path_buf(),
stderr_tail,
source: Box::new(source),
}))
}
fn format_exit(status: Option<std::process::ExitStatus>) -> String {
let Some(status) = status else {
return "still running (wait timed out)".to_string();
};
if let Some(code) = status.code() {
return format!("code {code}");
}
#[cfg(unix)]
{
use std::os::unix::process::ExitStatusExt;
if let Some(sig) = status.signal() {
return format!("signal {sig}");
}
}
"terminated".to_string()
}
async fn read_stderr_tail(path: &Path) -> String {
use tokio::io::{AsyncReadExt, AsyncSeekExt, SeekFrom};
const MAX: u64 = 2048;
async fn inner(path: &Path) -> std::io::Result<Vec<u8>> {
let mut file = tokio::fs::File::open(path).await?;
let len = file.metadata().await?.len();
if len > MAX {
file.seek(SeekFrom::End(-(MAX as i64))).await?;
}
let mut buf = Vec::with_capacity(len.min(MAX) as usize);
file.read_to_end(&mut buf).await?;
Ok(buf)
}
match inner(path).await {
Ok(bytes) if bytes.is_empty() => "(empty)".to_string(),
Ok(bytes) => String::from_utf8_lossy(&bytes).trim_end().to_string(),
Err(_) => "(could not read stderr file)".to_string(),
}
}
fn render_command(argv: &[String]) -> String {
argv.iter()
.map(|a| {
if a.is_empty()
|| a.chars()
.any(|c| c.is_whitespace() || matches!(c, '\'' | '"' | '$' | '`' | '\\'))
{
format!("'{}'", a.replace('\'', "'\\''"))
} else {
a.clone()
}
})
.collect::<Vec<_>>()
.join(" ")
}
pub(crate) fn kind_of(v: &Value) -> &'static str {
match v {
Value::Null => "null",
Value::Bool(_) => "bool",
Value::Number(_) => "number",
Value::String(_) => "string",
Value::Array(_) => "array",
Value::Object(_) => "object",
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn render_command_passes_through_simple_argv() {
let argv = vec![
"mcp-server-git".to_string(),
"--repository".to_string(),
"/workspace".to_string(),
];
assert_eq!(
render_command(&argv),
"mcp-server-git --repository /workspace"
);
}
#[test]
fn render_command_quotes_args_with_whitespace_or_specials() {
let argv = vec![
"echo".to_string(),
"hello world".to_string(),
"it's".to_string(),
"".to_string(),
];
assert_eq!(render_command(&argv), "echo 'hello world' 'it'\\''s' ''");
}
#[tokio::test]
async fn read_stderr_tail_handles_empty_missing_and_long() {
let dir = tempfile::tempdir().unwrap();
let missing = dir.path().join("nope.stderr");
assert_eq!(
read_stderr_tail(&missing).await,
"(could not read stderr file)"
);
let empty = dir.path().join("empty.stderr");
tokio::fs::write(&empty, b"").await.unwrap();
assert_eq!(read_stderr_tail(&empty).await, "(empty)");
let normal = dir.path().join("normal.stderr");
tokio::fs::write(&normal, b"line one\nline two\n")
.await
.unwrap();
assert_eq!(read_stderr_tail(&normal).await, "line one\nline two");
let big = dir.path().join("big.stderr");
let payload: Vec<u8> = (0..10_000).map(|i| b'A' + (i % 26) as u8).collect();
tokio::fs::write(&big, &payload).await.unwrap();
let tail = read_stderr_tail(&big).await;
assert!(tail.len() <= 2048, "tail was {} bytes", tail.len());
assert!(payload.ends_with(tail.trim_end().as_bytes()));
}
#[tokio::test]
async fn enrich_startup_error_carries_name_command_and_stderr() {
let dir = tempfile::tempdir().unwrap();
let stderr_path = dir.path().join("svc.stderr");
let stderr_file = tokio::fs::File::create(&stderr_path).await.unwrap();
let mut child = tokio::process::Command::new("sh")
.args(["-c", "echo boom 1>&2; exit 7"])
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::from(stderr_file.into_std().await))
.kill_on_drop(true)
.spawn()
.unwrap();
let argv = vec![
"sh".to_string(),
"-c".to_string(),
"echo boom 1>&2; exit 7".to_string(),
];
let source = rmcp::service::ClientInitializeError::ConnectionClosed(
"expect initialize response".to_string(),
);
let err = enrich_startup_error(
"svc",
Some("launch spec"),
&argv,
&stderr_path,
&mut child,
source,
)
.await;
let OutrigError::McpStartupFailed(payload) = &err else {
panic!("expected McpStartupFailed, got {err:?}");
};
assert_eq!(payload.name, "svc");
assert_eq!(payload.declaration_source.as_deref(), Some("launch spec"));
assert!(payload.command.contains("sh"));
assert_eq!(payload.exit, "code 7");
assert!(
payload.stderr_tail.contains("boom"),
"stderr_tail={:?}",
payload.stderr_tail
);
let display = err.to_string();
assert!(display.contains("svc"));
assert!(display.contains("from launch spec"));
assert!(display.contains("expect initialize response"));
assert!(display.contains("code 7"));
assert!(display.contains("boom"));
}
#[test]
fn format_exit_renders_codes_signals_and_unknown() {
use std::os::unix::process::ExitStatusExt;
use std::process::ExitStatus;
assert_eq!(format_exit(Some(ExitStatus::from_raw(0))), "code 0");
assert_eq!(format_exit(Some(ExitStatus::from_raw(2 << 8))), "code 2");
assert_eq!(format_exit(Some(ExitStatus::from_raw(9))), "signal 9");
assert_eq!(format_exit(None), "still running (wait timed out)");
}
}