use async_trait::async_trait;
use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, CONTENT_TYPE};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use crate::context::ExecutionContext;
use crate::error::ToolError;
use crate::registry::{Tool, ToolConfig};
use crate::result::ToolResult;
use crate::template::TemplateEngine;
/// Per-request timeout (seconds) used by `resolve_timeout` when the
/// `NOETL_MCP_REQUEST_TIMEOUT_SECONDS` environment variable is unset or invalid.
const DEFAULT_MCP_TIMEOUT_SECS: f64 = 60.0;
/// Upper bound (seconds) applied to every resolved timeout when the
/// `NOETL_WORKER_COMMAND_TIMEOUT_SECONDS` environment variable is unset or invalid.
const DEFAULT_COMMAND_TIMEOUT_SECS: f64 = 180.0;
/// Smallest timeout (seconds) an explicitly requested value is raised to.
const MIN_TIMEOUT_SECS: f64 = 0.1;
/// Rendered configuration for a single MCP tool invocation.
///
/// Every field is optional and omitted from serialized output when `None`.
/// Several fields are aliases of one another; the resolvers in this file
/// consult them in the order documented on each field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpConfig {
/// Explicit server endpoint URL; first candidate consulted by `resolve_endpoint`.
#[serde(skip_serializing_if = "Option::is_none")]
pub endpoint: Option<String>,
/// Endpoint alias, consulted after `endpoint`.
#[serde(skip_serializing_if = "Option::is_none")]
pub url: Option<String>,
/// Endpoint alias, consulted after `url`.
#[serde(skip_serializing_if = "Option::is_none")]
pub server_url: Option<String>,
/// Endpoint alias, consulted after `server_url`.
#[serde(skip_serializing_if = "Option::is_none")]
pub base_url: Option<String>,
/// Logical server name; used for logging, result payloads and to derive the
/// per-server endpoint environment variable. Defaults to `"kubernetes"`.
#[serde(skip_serializing_if = "Option::is_none")]
pub server: Option<String>,
/// Alias for `server`, used when `server` is absent.
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
/// JSON-RPC method to invoke (or the special value `"health"`).
/// Defaults to `"tools/call"`.
#[serde(skip_serializing_if = "Option::is_none")]
pub method: Option<String>,
/// Alias for `method`, used when `method` is absent.
#[serde(skip_serializing_if = "Option::is_none")]
pub action: Option<String>,
/// Tool name for `tools/call`; required for that method (here or in `tool_name`).
#[serde(skip_serializing_if = "Option::is_none")]
pub tool: Option<String>,
/// Alias for `tool`, used when `tool` is absent.
#[serde(skip_serializing_if = "Option::is_none")]
pub tool_name: Option<String>,
/// Arguments for `tools/call`: a JSON object, or a string containing JSON.
#[serde(skip_serializing_if = "Option::is_none")]
pub arguments: Option<serde_json::Value>,
/// Alias for `arguments`, used when `arguments` is absent.
#[serde(skip_serializing_if = "Option::is_none")]
pub args: Option<serde_json::Value>,
/// Params for methods other than `tools/call` and `tools/list`:
/// a JSON object, or a string containing JSON.
#[serde(skip_serializing_if = "Option::is_none")]
pub params: Option<serde_json::Value>,
/// Requested timeout in seconds, given as a JSON number or a numeric string;
/// interpreted by `resolve_timeout`.
#[serde(skip_serializing_if = "Option::is_none")]
pub timeout: Option<serde_json::Value>,
/// JSON-RPC id for the `initialize` request (the follow-up call uses the
/// next id). Defaults to `1`.
#[serde(skip_serializing_if = "Option::is_none")]
pub request_id: Option<u64>,
/// Protocol version sent in `initialize`. Defaults to `"2025-03-26"`.
#[serde(skip_serializing_if = "Option::is_none")]
pub protocol_version: Option<String>,
/// Client name sent in `initialize`. Defaults to `"noetl-worker"`.
#[serde(skip_serializing_if = "Option::is_none")]
pub client_name: Option<String>,
/// Client version sent in `initialize`. Defaults to `"0"`.
#[serde(skip_serializing_if = "Option::is_none")]
pub client_version: Option<String>,
/// Client capabilities sent in `initialize`. Defaults to an empty object.
#[serde(skip_serializing_if = "Option::is_none")]
pub capabilities: Option<serde_json::Value>,
}
/// Tool implementation registered under the name `"mcp"`; it renders its
/// configuration through a template engine and then performs the MCP request.
pub struct McpTool {
// Renders templated values in the raw tool config before deserialization.
template_engine: TemplateEngine,
}
impl McpTool {
    /// Creates a tool backed by a freshly constructed template engine.
    pub fn new() -> Self {
        let template_engine = TemplateEngine::new();
        Self { template_engine }
    }

    /// Renders the raw tool configuration against the execution context and
    /// deserializes the rendered value into an [`McpConfig`].
    ///
    /// # Errors
    /// Propagates template rendering failures, and returns
    /// `ToolError::Configuration` when the rendered value does not
    /// deserialize into an `McpConfig`.
    fn parse_config(
        &self,
        config: &ToolConfig,
        ctx: &ExecutionContext,
    ) -> Result<McpConfig, ToolError> {
        let rendered = self
            .template_engine
            .render_value(&config.config, &ctx.to_template_context())?;
        match serde_json::from_value::<McpConfig>(rendered) {
            Ok(parsed) => Ok(parsed),
            Err(e) => Err(ToolError::Configuration(format!(
                "Invalid mcp config: {}",
                e
            ))),
        }
    }
}
impl Default for McpTool {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl Tool for McpTool {
    /// Name under which this tool is registered.
    fn name(&self) -> &'static str {
        "mcp"
    }

    /// Renders the tool config, resolves server/endpoint/method/timeout and
    /// performs the MCP request.
    ///
    /// # Returns
    /// A successful `ToolResult` in both outcomes of the network call:
    /// * on success, the payload produced by `execute_mcp`;
    /// * on failure, a payload with `"status": "error"` plus the server,
    ///   endpoint, method and error text. The failure is logged at `warn`.
    ///
    /// # Errors
    /// Only configuration problems detected before the request is made
    /// (template rendering, config deserialization, endpoint resolution)
    /// are returned as `Err`.
    async fn execute(
        &self,
        config: &ToolConfig,
        ctx: &ExecutionContext,
    ) -> Result<ToolResult, ToolError> {
        let mcp_cfg = self.parse_config(config, ctx)?;
        let server = mcp_cfg
            .server
            .clone()
            .or_else(|| mcp_cfg.name.clone())
            .unwrap_or_else(|| "kubernetes".to_string());
        let endpoint = resolve_endpoint(&mcp_cfg)?;
        let method = mcp_cfg
            .method
            .clone()
            .or_else(|| mcp_cfg.action.clone())
            .unwrap_or_else(|| "tools/call".to_string());
        let timeout_secs = resolve_timeout(&mcp_cfg.timeout);
        let request_id = mcp_cfg.request_id.unwrap_or(1);

        tracing::debug!(
            method = %method,
            server = %server,
            endpoint = %endpoint,
            execution_id = ctx.execution_id,
            "MCP tool dispatch"
        );

        let start = std::time::Instant::now();
        let span = tracing::info_span!(
            "mcp.op",
            method = %method,
            server = %server,
            execution_id = ctx.execution_id,
        );

        // Attach the span to the future instead of holding an `enter()` guard
        // across the `.await`: a guard held over a suspension point leaks the
        // span onto whatever else the executor polls on this thread, and the
        // guard type is not `Send`.
        let result = tracing::Instrument::instrument(
            execute_mcp(
                &mcp_cfg,
                &server,
                &endpoint,
                &method,
                timeout_secs,
                request_id,
            ),
            span.clone(),
        )
        .await;
        let duration_ms = start.elapsed().as_millis() as u64;

        // No further await points below, so entering the span synchronously is
        // safe and keeps the completion/failure events inside `mcp.op`.
        let _entered = span.enter();
        match result {
            Ok(data) => {
                tracing::debug!(
                    method = %method,
                    server = %server,
                    duration_ms,
                    "MCP request complete"
                );
                Ok(ToolResult::success(data).with_duration(duration_ms))
            }
            Err(e) => {
                tracing::warn!(
                    method = %method,
                    server = %server,
                    endpoint = %endpoint,
                    duration_ms,
                    error = %e,
                    execution_id = ctx.execution_id,
                    "MCP request failed"
                );
                // The failure is surfaced to the caller as data rather than as
                // an `Err`, so downstream steps can inspect `status`/`error`.
                let payload = serde_json::json!({
                    "status": "error",
                    "server": server,
                    "endpoint": endpoint,
                    "method": method,
                    "error": e.to_string(),
                    "text": e.to_string(),
                });
                Ok(ToolResult::success(payload).with_duration(duration_ms))
            }
        }
    }
}
/// Performs one MCP operation against `endpoint`.
///
/// * `method == "health"`: issues a `GET` to the URL derived by
///   `resolve_health_url` and reports `"healthy": true` on a success status.
/// * any other method: validates the method parameters, sends a JSON-RPC
///   `initialize` request, then sends the actual call, forwarding the session
///   id returned by `initialize` when there is one.
///
/// # Parameters
/// * `cfg` - rendered tool configuration (client info, tool name, arguments, params).
/// * `server` - logical server name, echoed in the result payload.
/// * `endpoint` - URL the JSON-RPC requests are posted to.
/// * `method` - JSON-RPC method, or `"health"`.
/// * `timeout_secs` - per-request HTTP timeout in seconds.
/// * `request_id` - JSON-RPC id of the `initialize` request; the call uses the next id.
///
/// # Returns
/// A JSON object with `status`, `server`, `endpoint`, `method` and, for
/// JSON-RPC calls, `tool`, `arguments`, `text`, `result` and `initialize`.
///
/// # Errors
/// * `McpError::Config` - invalid timeout, missing tool name, or malformed arguments/params.
/// * `McpError::Http` - client construction, transport or HTTP status failures.
/// * `McpError::JsonRpc` / `McpError::Envelope` - as reported by `post_jsonrpc`.
async fn execute_mcp(
    cfg: &McpConfig,
    server: &str,
    endpoint: &str,
    method: &str,
    timeout_secs: f64,
    request_id: u64,
) -> Result<serde_json::Value, McpError> {
    // `Duration::from_secs_f64` panics on negative, NaN or overflowing input;
    // convert fallibly so a bad value becomes a configuration error instead.
    let timeout = Duration::try_from_secs_f64(timeout_secs).map_err(|e| {
        McpError::Config(format!(
            "mcp timeout {timeout_secs} is not a valid duration: {e}"
        ))
    })?;
    let client = build_client(timeout)?;

    if method == "health" {
        let health_url = resolve_health_url(endpoint);
        tracing::debug!(url = %health_url, "MCP health probe");
        let response = client
            .get(&health_url)
            .send()
            .await
            .map_err(|e| McpError::Http(e.to_string()))?;
        response
            .error_for_status_ref()
            .map_err(|e| McpError::Http(e.to_string()))?;
        // The body is informational only; the status code already decided health.
        let text = response.text().await.unwrap_or_default();
        return Ok(serde_json::json!({
            "status": "ok",
            "server": server,
            "endpoint": endpoint,
            "method": method,
            "healthy": true,
            "text": text,
        }));
    }

    // Validate the call parameters before any network traffic so that a
    // configuration mistake fails fast and does not cost an `initialize`
    // round trip.
    let (params, tool_name, arguments) = build_method_params(cfg, method)?;

    let protocol_version = cfg
        .protocol_version
        .clone()
        .unwrap_or_else(|| "2025-03-26".to_string());
    let client_name = cfg
        .client_name
        .clone()
        .unwrap_or_else(|| "noetl-worker".to_string());
    let client_version = cfg
        .client_version
        .clone()
        .unwrap_or_else(|| "0".to_string());
    let capabilities = cfg
        .capabilities
        .clone()
        .unwrap_or_else(|| serde_json::json!({}));
    let init_payload = serde_json::json!({
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "initialize",
        "params": {
            "protocolVersion": protocol_version,
            "capabilities": capabilities,
            "clientInfo": {
                "name": client_name,
                "version": client_version,
            },
        },
    });
    let (init_envelope, session_id) =
        post_jsonrpc(&client, endpoint, &init_payload, None).await?;
    if session_id.is_none() {
        tracing::debug!(
            server = %server,
            "MCP server did not return session id; treating as stateless"
        );
    }
    // NOTE(review): no `notifications/initialized` message is sent between
    // `initialize` and the call; confirm the target servers tolerate that.

    let call_payload = serde_json::json!({
        "jsonrpc": "2.0",
        // Wrapping keeps `request_id == u64::MAX` from overflowing.
        "id": request_id.wrapping_add(1),
        "method": method,
        "params": params,
    });
    let (envelope, _) =
        post_jsonrpc(&client, endpoint, &call_payload, session_id.as_deref()).await?;

    // Normalize the result to an object: a missing result becomes `{}` and a
    // non-object result is wrapped as `{"value": ...}`.
    let result = envelope
        .get("result")
        .cloned()
        .unwrap_or_else(|| serde_json::json!({}));
    let result = if result.is_object() {
        result
    } else {
        serde_json::json!({ "value": result })
    };
    let text = extract_text(&result);
    Ok(serde_json::json!({
        "status": "ok",
        "server": server,
        "endpoint": endpoint,
        "method": method,
        "tool": tool_name,
        "arguments": arguments,
        "text": text,
        "result": result,
        "initialize": init_envelope.get("result"),
    }))
}
/// Builds the JSON-RPC `params` for `method` from the tool configuration.
///
/// Returns the triple `(params, tool_name, arguments)`:
/// * `"tools/call"` - params are `{"name", "arguments"}`; the tool name comes
///   from `tool` (then `tool_name`), the arguments from `arguments` (then
///   `args`, then `{}`).
/// * `"tools/list"` - params are `{}`; tool name and arguments are `null`.
/// * anything else - params come from `params` (or `{}`); tool name and
///   arguments are `null`.
///
/// # Errors
/// `McpError::Config` when `tools/call` has no tool name, or when the
/// arguments/params are rejected by `coerce_to_object`.
fn build_method_params(
    cfg: &McpConfig,
    method: &str,
) -> Result<(serde_json::Value, serde_json::Value, serde_json::Value), McpError> {
    if method == "tools/list" {
        return Ok((
            serde_json::json!({}),
            serde_json::Value::Null,
            serde_json::Value::Null,
        ));
    }

    if method != "tools/call" {
        let raw_params = match &cfg.params {
            Some(value) => value.clone(),
            None => serde_json::json!({}),
        };
        let params = coerce_to_object(raw_params, "params")?;
        return Ok((params, serde_json::Value::Null, serde_json::Value::Null));
    }

    // From here on the method is `tools/call`.
    let tool_name = match cfg.tool.as_ref().or(cfg.tool_name.as_ref()) {
        Some(name) => name.clone(),
        None => {
            return Err(McpError::Config(
                "mcp tool name is required for tools/call".into(),
            ))
        }
    };
    let raw_arguments = cfg
        .arguments
        .as_ref()
        .or(cfg.args.as_ref())
        .cloned()
        .unwrap_or_else(|| serde_json::json!({}));
    let arguments = coerce_to_object(raw_arguments, "arguments")?;
    let params = serde_json::json!({
        "name": tool_name,
        "arguments": arguments,
    });
    Ok((params, serde_json::Value::String(tool_name), arguments))
}
/// Ensures `v` is a JSON object, accepting either an object or a string that
/// contains a JSON-encoded object.
///
/// `field` is the config field name used in error messages.
///
/// # Errors
/// `McpError::Config` when `v` is neither an object nor a string, when the
/// string is not valid JSON, or when the string decodes to something other
/// than an object (for example an array or a number).
fn coerce_to_object(v: serde_json::Value, field: &str) -> Result<serde_json::Value, McpError> {
    match v {
        serde_json::Value::Object(_) => Ok(v),
        serde_json::Value::String(s) => {
            let parsed: serde_json::Value = serde_json::from_str(&s).map_err(|e| {
                McpError::Config(format!("mcp {field} must be a JSON object: {e}"))
            })?;
            // A string such as "[1,2]" or "42" is valid JSON but not an
            // object; reject it here rather than forwarding it to the server.
            if parsed.is_object() {
                Ok(parsed)
            } else {
                Err(McpError::Config(format!("mcp {field} must be an object")))
            }
        }
        _ => Err(McpError::Config(format!("mcp {field} must be an object"))),
    }
}
/// Builds a `reqwest` client whose requests are bounded by `timeout`.
///
/// # Errors
/// `McpError::Http` when the client cannot be constructed.
fn build_client(timeout: Duration) -> Result<reqwest::Client, McpError> {
    let builder = reqwest::Client::builder().timeout(timeout);
    match builder.build() {
        Ok(client) => Ok(client),
        Err(e) => Err(McpError::Http(format!(
            "Failed to build HTTP client: {e}"
        ))),
    }
}
/// Posts one JSON-RPC payload to `endpoint` and returns the decoded response
/// envelope together with the session id the server returned, if any.
///
/// The request advertises both `application/json` and `text/event-stream`
/// responses; the body is decoded by `parse_mcp_envelope`. When `session_id`
/// is given it is sent in the `Mcp-Session-Id` header.
///
/// # Errors
/// * `McpError::Http` - invalid session id header value, transport failure,
///   non-success HTTP status, or failure to read the response body.
/// * `McpError::Envelope` - the body is not a decodable envelope.
/// * `McpError::JsonRpc` - the envelope carries a non-null `error` member;
///   the message is its `message` field when present, otherwise the
///   serialized error value.
async fn post_jsonrpc(
    client: &reqwest::Client,
    endpoint: &str,
    payload: &serde_json::Value,
    session_id: Option<&str>,
) -> Result<(serde_json::Value, Option<String>), McpError> {
    // Header names are case-insensitive and stored lowercased, so a single
    // lowercase spelling serves for both sending and reading.
    const SESSION_HEADER: &str = "mcp-session-id";

    let mut headers = HeaderMap::new();
    headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
    headers.insert(
        ACCEPT,
        HeaderValue::from_static("application/json, text/event-stream"),
    );
    if let Some(sid) = session_id {
        let hv = HeaderValue::from_str(sid)
            .map_err(|e| McpError::Http(format!("Invalid session id header value: {e}")))?;
        headers.insert(SESSION_HEADER, hv);
    }

    let response = client
        .post(endpoint)
        .headers(headers)
        .json(payload)
        .send()
        .await
        .map_err(|e| McpError::Http(e.to_string()))?;
    response
        .error_for_status_ref()
        .map_err(|e| McpError::Http(e.to_string()))?;

    let returned_session = response
        .headers()
        .get(SESSION_HEADER)
        .and_then(|v| v.to_str().ok())
        .map(str::to_string);

    // Report a failed body read as the transport error it is, instead of
    // substituting an empty body and failing later with a misleading
    // "Invalid MCP response" message.
    let body = response
        .text()
        .await
        .map_err(|e| McpError::Http(format!("Failed to read MCP response body: {e}")))?;

    let method_hint = payload
        .get("method")
        .and_then(|m| m.as_str())
        .unwrap_or("request");
    let envelope = parse_mcp_envelope(&body, method_hint)?;

    // An explicit `"error": null` alongside a result is not a failure.
    if let Some(err) = envelope.get("error").filter(|e| !e.is_null()) {
        let msg = err
            .get("message")
            .and_then(|m| m.as_str())
            .map(str::to_string)
            .unwrap_or_else(|| err.to_string());
        return Err(McpError::JsonRpc(msg));
    }
    Ok((envelope, returned_session))
}
pub(crate) fn parse_mcp_envelope(body: &str, context: &str) -> Result<serde_json::Value, McpError> {
if let Ok(v) = serde_json::from_str(body) {
return Ok(v);
}
let data_lines: Vec<&str> = body
.lines()
.filter(|l| l.starts_with("data:"))
.map(|l| l["data:".len()..].trim())
.collect();
if data_lines.is_empty() {
let preview: String = body
.split_whitespace()
.take(40)
.collect::<Vec<_>>()
.join(" ");
return Err(McpError::Envelope(format!(
"Invalid MCP response for {context}: {preview}"
)));
}
let joined = data_lines.join("\n");
serde_json::from_str(&joined).map_err(|e| {
let preview: String = joined
.split_whitespace()
.take(40)
.collect::<Vec<_>>()
.join(" ");
McpError::Envelope(format!(
"Invalid MCP response for {context}: {preview} ({e})"
))
})
}
/// Produces a plain-text rendering of an MCP result.
///
/// When `result["content"]` is an array, the `text` of every item whose
/// `type` is `"text"` is collected and joined with newlines. If that yields
/// nothing (no `content`, not an array, or no text items), the whole result
/// is serialized to a JSON string instead.
pub(crate) fn extract_text(result: &serde_json::Value) -> String {
    let text_of = |item: &'_ serde_json::Value| -> bool {
        item.is_object() && item.get("type").and_then(|t| t.as_str()) == Some("text")
    };

    let pieces: Vec<&str> = match result.get("content").and_then(|c| c.as_array()) {
        Some(items) => items
            .iter()
            .filter(|item| text_of(item))
            .filter_map(|item| item.get("text").and_then(|t| t.as_str()))
            .collect(),
        None => Vec::new(),
    };

    if pieces.is_empty() {
        serde_json::to_string(result).unwrap_or_default()
    } else {
        pieces.join("\n")
    }
}
/// Determines the MCP endpoint URL, with trailing slashes removed.
///
/// Sources, in priority order:
/// 1. the first non-empty value among the config fields `endpoint`, `url`,
///    `server_url` and `base_url`;
/// 2. the per-server environment variable named by `server_env_name`
///    (server name taken from `server`, then `name`, then `"kubernetes"`);
/// 3. the `NOETL_MCP_URL` environment variable.
///
/// # Errors
/// `McpError::Config` when none of the sources provides a non-empty value.
pub(crate) fn resolve_endpoint(cfg: &McpConfig) -> Result<String, McpError> {
    // Check every candidate for emptiness individually, so that an empty
    // `endpoint` (e.g. a template that rendered to "") does not hide a valid
    // `url`, `server_url` or `base_url`.
    let explicit = [&cfg.endpoint, &cfg.url, &cfg.server_url, &cfg.base_url]
        .iter()
        .filter_map(|candidate| candidate.as_deref())
        .find(|s| !s.is_empty());
    if let Some(ep) = explicit {
        return Ok(trim_slash(ep));
    }

    let server = cfg
        .server
        .as_deref()
        .or(cfg.name.as_deref())
        .unwrap_or("kubernetes");
    let env_name = server_env_name(server);

    // The per-server variable wins over the global fallback.
    for var in [env_name.as_str(), "NOETL_MCP_URL"].iter() {
        if let Ok(ep) = std::env::var(var) {
            if !ep.is_empty() {
                return Ok(trim_slash(&ep));
            }
        }
    }

    Err(McpError::Config(format!(
        "mcp endpoint is required for server '{server}'. \
Set 'endpoint' in config or set the '{env_name}' env var."
    )))
}
/// Derives the per-server endpoint environment variable name:
/// `NOETL_MCP_<SERVER>_ENDPOINT`.
///
/// ASCII letters are upper-cased and ASCII digits kept; every other
/// character (punctuation, whitespace, non-ASCII letters) becomes `_`, so the
/// result always consists of `[A-Z0-9_]` only.
pub(crate) fn server_env_name(server: &str) -> String {
    let safe: String = server
        .chars()
        .map(|c| {
            // `is_alphanumeric` would also accept non-ASCII letters, which
            // `to_ascii_uppercase` leaves untouched and which do not belong
            // in a portable environment variable name.
            if c.is_ascii_alphanumeric() {
                c.to_ascii_uppercase()
            } else {
                '_'
            }
        })
        .collect();
    format!("NOETL_MCP_{safe}_ENDPOINT")
}
/// Returns `s` as an owned string with every trailing `/` removed.
fn trim_slash(s: &str) -> String {
    let mut remaining = s;
    while let Some(shorter) = remaining.strip_suffix('/') {
        remaining = shorter;
    }
    remaining.to_owned()
}
/// Derives the health-probe URL for an MCP endpoint.
///
/// For a parseable URL the result is rebuilt from scheme, host, explicit port
/// (if any) and a health path: `/healthz` when the endpoint path (ignoring
/// trailing slashes) is empty, `/mcp`, `/sse` or `/message`; otherwise the
/// endpoint path with `/healthz` appended. An endpoint that does not parse as
/// a URL simply gets `/healthz` appended.
pub(crate) fn resolve_health_url(endpoint: &str) -> String {
    let parsed = match reqwest::Url::parse(endpoint) {
        Ok(url) => url,
        Err(_) => return format!("{endpoint}/healthz"),
    };

    let health_path = match parsed.path().trim_end_matches('/') {
        "" | "/mcp" | "/sse" | "/message" => String::from("/healthz"),
        other => format!("{other}/healthz"),
    };

    let scheme = parsed.scheme();
    let host = parsed.host_str().unwrap_or("");
    match parsed.port() {
        Some(port) => format!("{}://{}:{}{}", scheme, host, port, health_path),
        None => format!("{}://{}{}", scheme, host, health_path),
    }
}
/// Resolves the effective request timeout in seconds.
///
/// The default comes from `NOETL_MCP_REQUEST_TIMEOUT_SECONDS` (falling back
/// to `DEFAULT_MCP_TIMEOUT_SECS`) and the ceiling from
/// `NOETL_WORKER_COMMAND_TIMEOUT_SECONDS` (falling back to
/// `DEFAULT_COMMAND_TIMEOUT_SECS`, never below one second).
///
/// * Absent, `null`, blank-string or non-number/non-string values yield the
///   default, capped by the ceiling.
/// * A number, or a string that parses as one, is used when it is finite and
///   positive: raised to at least `MIN_TIMEOUT_SECS` and capped by the ceiling.
/// * An unparsable string is treated as if the default had been requested.
/// * Non-finite or non-positive requests yield the capped default.
pub(crate) fn resolve_timeout(timeout_value: &Option<serde_json::Value>) -> f64 {
    let fallback = read_float_env(
        "NOETL_MCP_REQUEST_TIMEOUT_SECONDS",
        DEFAULT_MCP_TIMEOUT_SECS,
    );
    let ceiling = read_float_env(
        "NOETL_WORKER_COMMAND_TIMEOUT_SECONDS",
        DEFAULT_COMMAND_TIMEOUT_SECS,
    )
    .max(1.0);
    let capped_fallback = fallback.min(ceiling);

    // `None` means "nothing usable was requested".
    let candidate: Option<f64> = match timeout_value.as_ref() {
        Some(serde_json::Value::Number(n)) => Some(n.as_f64().unwrap_or(fallback)),
        Some(serde_json::Value::String(s)) => {
            let text = s.trim();
            if text.is_empty() {
                None
            } else {
                Some(text.parse::<f64>().unwrap_or(fallback))
            }
        }
        _ => None,
    };

    match candidate {
        Some(secs) if secs.is_finite() && secs > 0.0 => secs.max(MIN_TIMEOUT_SECS).min(ceiling),
        _ => capped_fallback,
    }
}
/// Reads the environment variable `name` as a positive, finite `f64`.
///
/// Returns `default` when the variable is unset or unreadable, blank, not a
/// number, non-finite, or not greater than zero. Surrounding whitespace in
/// the value is ignored.
fn read_float_env(name: &str, default: f64) -> f64 {
    let raw = match std::env::var(name) {
        Ok(value) => value,
        Err(_) => return default,
    };
    let text = raw.trim();
    if text.is_empty() {
        return default;
    }
    match text.parse::<f64>() {
        Ok(parsed) if parsed.is_finite() && parsed > 0.0 => parsed,
        _ => default,
    }
}
/// Failure categories of an MCP operation; each variant carries a
/// human-readable message.
#[derive(Debug, thiserror::Error)]
pub(crate) enum McpError {
/// Invalid or incomplete tool configuration (missing endpoint or tool name,
/// malformed arguments/params).
#[error("MCP config error: {0}")]
Config(String),
/// HTTP-level failure: client construction, transport error, bad status,
/// or an invalid session id header value.
#[error("MCP HTTP error: {0}")]
Http(String),
/// The server answered with a JSON-RPC `error` member.
#[error("MCP JSON-RPC error: {0}")]
JsonRpc(String),
/// The response body could not be decoded as JSON or as an SSE `data:` payload.
#[error("MCP envelope parse error: {0}")]
Envelope(String),
}
/// Maps MCP failures onto the generic tool error type: configuration
/// problems stay configuration errors, everything else is an execution
/// failure. The message is carried over unchanged.
impl From<McpError> for ToolError {
    fn from(e: McpError) -> Self {
        match e {
            McpError::Config(detail) => Self::Configuration(detail),
            McpError::Http(detail) => Self::ExecutionFailed(detail),
            McpError::JsonRpc(detail) => Self::ExecutionFailed(detail),
            McpError::Envelope(detail) => Self::ExecutionFailed(detail),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    /// Serializes tests that read or mutate process environment variables;
    /// the test harness runs tests on several threads and the environment is
    /// process-global.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Acquires the environment lock, ignoring poisoning left behind by a
    /// previously failed test.
    fn env_guard() -> MutexGuard<'static, ()> {
        ENV_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Removes the variables that influence `resolve_timeout`.
    fn clear_timeout_env() {
        std::env::remove_var("NOETL_MCP_REQUEST_TIMEOUT_SECONDS");
        std::env::remove_var("NOETL_WORKER_COMMAND_TIMEOUT_SECONDS");
    }

    /// A config with every field unset; tests override what they need with
    /// struct-update syntax.
    fn base_cfg() -> McpConfig {
        McpConfig {
            endpoint: None,
            url: None,
            server_url: None,
            base_url: None,
            server: None,
            name: None,
            method: None,
            action: None,
            tool: None,
            tool_name: None,
            arguments: None,
            args: None,
            params: None,
            timeout: None,
            request_id: None,
            protocol_version: None,
            client_name: None,
            client_version: None,
            capabilities: None,
        }
    }

    #[test]
    fn test_envelope_plain_json() {
        let body = r#"{"jsonrpc":"2.0","id":1,"result":{"tools":[]}}"#;
        let v = parse_mcp_envelope(body, "tools/list").unwrap();
        assert_eq!(v["id"], 1);
        assert!(v["result"]["tools"].is_array());
    }

    #[test]
    fn test_envelope_sse_single_data_line() {
        let body = "data: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"ok\":true}}\n\n";
        let v = parse_mcp_envelope(body, "tools/call").unwrap();
        assert_eq!(v["result"]["ok"], true);
    }

    #[test]
    fn test_envelope_sse_multi_data_lines() {
        let body = concat!(
            "data: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"ok\":true}}\n",
            "data: \n\n",
        );
        let v = parse_mcp_envelope(body, "tools/call").unwrap();
        assert_eq!(v["result"]["ok"], true);
    }

    #[test]
    fn test_envelope_sse_valid_split() {
        let body = "data: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"count\":42}}\ndata: \n\n";
        let v = parse_mcp_envelope(body, "count").unwrap();
        assert_eq!(v["result"]["count"], 42);
    }

    #[test]
    fn test_envelope_empty_body_error() {
        let r = parse_mcp_envelope("", "test");
        assert!(r.is_err());
    }

    #[test]
    fn test_extract_text_content_array() {
        let result = serde_json::json!({
            "content": [
                { "type": "text", "text": "hello" },
                { "type": "image", "url": "http://x" },
                { "type": "text", "text": "world" },
            ]
        });
        assert_eq!(extract_text(&result), "hello\nworld");
    }

    #[test]
    fn test_extract_text_no_content_fallback() {
        let result = serde_json::json!({ "answer": 42 });
        let t = extract_text(&result);
        assert!(t.contains("42"));
    }

    #[test]
    fn test_extract_text_empty_content() {
        let result = serde_json::json!({ "content": [] });
        let t = extract_text(&result);
        assert!(t.contains("content"));
    }

    #[test]
    fn test_resolve_endpoint_direct() {
        let cfg = McpConfig {
            endpoint: Some("http://localhost:8080/mcp".to_string()),
            ..base_cfg()
        };
        assert_eq!(resolve_endpoint(&cfg).unwrap(), "http://localhost:8080/mcp");
    }

    #[test]
    fn test_resolve_endpoint_trailing_slash_stripped() {
        let cfg = McpConfig {
            endpoint: Some("http://localhost:8080/mcp/".to_string()),
            ..base_cfg()
        };
        assert_eq!(resolve_endpoint(&cfg).unwrap(), "http://localhost:8080/mcp");
    }

    #[test]
    fn test_resolve_endpoint_env_var() {
        let _env = env_guard();
        let env_name = server_env_name("my-server");
        std::env::set_var(&env_name, "http://my-server:9000/mcp");
        let cfg = McpConfig {
            server: Some("my-server".to_string()),
            ..base_cfg()
        };
        let resolved = resolve_endpoint(&cfg);
        // Clean up before asserting so a failure does not leak the variable.
        std::env::remove_var(&env_name);
        assert_eq!(resolved.unwrap(), "http://my-server:9000/mcp");
    }

    #[test]
    fn test_resolve_endpoint_missing_error() {
        let _env = env_guard();
        std::env::remove_var("NOETL_MCP_KUBERNETES_ENDPOINT");
        std::env::remove_var("NOETL_MCP_URL");
        let cfg = base_cfg();
        assert!(resolve_endpoint(&cfg).is_err());
    }

    #[test]
    fn test_server_env_name_simple() {
        assert_eq!(
            server_env_name("kubernetes"),
            "NOETL_MCP_KUBERNETES_ENDPOINT"
        );
    }

    #[test]
    fn test_server_env_name_dashes() {
        assert_eq!(server_env_name("my-server"), "NOETL_MCP_MY_SERVER_ENDPOINT");
    }

    #[test]
    fn test_server_env_name_dots() {
        assert_eq!(
            server_env_name("my.server.v2"),
            "NOETL_MCP_MY_SERVER_V2_ENDPOINT"
        );
    }

    #[test]
    fn test_timeout_null_uses_default() {
        let _env = env_guard();
        clear_timeout_env();
        let t = resolve_timeout(&None);
        assert_eq!(t, 60.0_f64.min(180.0));
    }

    #[test]
    fn test_timeout_explicit_value() {
        let _env = env_guard();
        clear_timeout_env();
        let t = resolve_timeout(&Some(serde_json::json!(30)));
        assert_eq!(t, 30.0);
    }

    #[test]
    fn test_timeout_clamped_by_command_budget() {
        // Exercise the real resolver: a request above the command budget must
        // be capped at the budget.
        let _env = env_guard();
        clear_timeout_env();
        std::env::set_var("NOETL_WORKER_COMMAND_TIMEOUT_SECONDS", "45");
        let t = resolve_timeout(&Some(serde_json::json!(120)));
        std::env::remove_var("NOETL_WORKER_COMMAND_TIMEOUT_SECONDS");
        assert_eq!(t, 45.0);
    }

    #[test]
    fn test_timeout_string_value() {
        let _env = env_guard();
        clear_timeout_env();
        let t = resolve_timeout(&Some(serde_json::json!("25")));
        assert_eq!(t, 25.0);
    }

    #[test]
    fn test_timeout_invalid_string_falls_back_to_default() {
        let _env = env_guard();
        clear_timeout_env();
        let t = resolve_timeout(&Some(serde_json::json!("not-a-number")));
        assert_eq!(t, 60.0_f64.min(180.0));
    }

    #[test]
    fn test_health_url_mcp_path() {
        assert_eq!(
            resolve_health_url("http://localhost:8080/mcp"),
            "http://localhost:8080/healthz"
        );
    }

    #[test]
    fn test_health_url_sse_path() {
        assert_eq!(
            resolve_health_url("http://localhost:8080/sse"),
            "http://localhost:8080/healthz"
        );
    }

    #[test]
    fn test_health_url_message_path() {
        assert_eq!(
            resolve_health_url("http://localhost:8080/message"),
            "http://localhost:8080/healthz"
        );
    }

    #[test]
    fn test_health_url_other_path() {
        assert_eq!(
            resolve_health_url("http://localhost:8080/api/v1"),
            "http://localhost:8080/api/v1/healthz"
        );
    }

    #[test]
    fn test_health_url_root() {
        assert_eq!(
            resolve_health_url("http://localhost:8080"),
            "http://localhost:8080/healthz"
        );
    }

    #[test]
    fn test_build_method_params_tools_call() {
        let cfg = McpConfig {
            tool: Some("get_pods".to_string()),
            arguments: Some(serde_json::json!({ "namespace": "default" })),
            ..base_cfg()
        };
        let (params, tool_name, args) = build_method_params(&cfg, "tools/call").unwrap();
        assert_eq!(params["name"], "get_pods");
        assert_eq!(params["arguments"]["namespace"], "default");
        assert_eq!(tool_name, "get_pods");
        assert_eq!(args["namespace"], "default");
    }

    #[test]
    fn test_build_method_params_tools_call_missing_tool_errors() {
        let cfg = base_cfg();
        let r = build_method_params(&cfg, "tools/call");
        assert!(r.is_err());
    }

    #[test]
    fn test_build_method_params_tools_list() {
        let cfg = base_cfg();
        let (params, tool, args) = build_method_params(&cfg, "tools/list").unwrap();
        assert!(params.as_object().unwrap().is_empty());
        assert!(tool.is_null());
        assert!(args.is_null());
    }

    #[test]
    fn test_build_method_params_passthrough() {
        let cfg = McpConfig {
            params: Some(serde_json::json!({ "cursor": "abc" })),
            ..base_cfg()
        };
        let (params, _, _) = build_method_params(&cfg, "resources/list").unwrap();
        assert_eq!(params["cursor"], "abc");
    }

    #[test]
    fn test_mcp_tool_name() {
        let tool = McpTool::new();
        assert_eq!(tool.name(), "mcp");
    }

    /// Integration test; skipped unless `NOETL_TEST_MCP_ENDPOINT` is set.
    #[tokio::test]
    async fn test_mcp_integration_health() {
        let endpoint = match std::env::var("NOETL_TEST_MCP_ENDPOINT") {
            Ok(ep) => ep,
            Err(_) => return,
        };
        let cfg = McpConfig {
            endpoint: Some(endpoint),
            server: Some("test".to_string()),
            method: Some("health".to_string()),
            timeout: Some(serde_json::json!(10)),
            ..base_cfg()
        };
        let result = execute_mcp(
            &cfg,
            "test",
            cfg.endpoint.as_deref().unwrap(),
            "health",
            10.0,
            1,
        )
        .await;
        assert!(result.is_ok(), "health probe failed: {:?}", result);
        let v = result.unwrap();
        assert_eq!(v["status"], "ok");
        assert_eq!(v["method"], "health");
    }

    /// Integration test; skipped unless `NOETL_TEST_MCP_ENDPOINT` is set.
    #[tokio::test]
    async fn test_mcp_integration_tools_list() {
        let endpoint = match std::env::var("NOETL_TEST_MCP_ENDPOINT") {
            Ok(ep) => ep,
            Err(_) => return,
        };
        let cfg = McpConfig {
            endpoint: Some(endpoint.clone()),
            server: Some("test".to_string()),
            method: Some("tools/list".to_string()),
            timeout: Some(serde_json::json!(15)),
            ..base_cfg()
        };
        let result = execute_mcp(&cfg, "test", &endpoint, "tools/list", 15.0, 1).await;
        assert!(result.is_ok(), "tools/list failed: {:?}", result);
        let v = result.unwrap();
        assert_eq!(v["status"], "ok");
    }
}