use anyhow::{Context, Result, bail};
use serde_json::{Value, json};
use std::sync::Arc;
use super::protocol::JsonRpcRequest;
use super::protocol::{
ClientCapabilities, ClientInfo, InitializeParams, InitializeResult, McpPrompt, McpResource,
McpToolCallResult, McpToolDefinition, PREFERRED_PROTOCOL_VERSION, PromptGetParams,
PromptGetResult, PromptsListResult, ResourceReadParams, ResourceReadResult,
ResourcesListResult, ToolCallParams, ToolsListResult, is_known_protocol_version,
};
use super::transport::McpTransport;
/// Protocol revision this client sends in its `initialize` request.
///
/// This is a re-export of [`PREFERRED_PROTOCOL_VERSION`] under a client-facing name.
pub const MCP_PROTOCOL_VERSION: &str = PREFERRED_PROTOCOL_VERSION;
/// Client for a single MCP server, generic over the transport used to reach it.
///
/// The two `Option` fields hold handshake state and stay `None` until
/// [`McpClient::initialize`] succeeds.
pub struct McpClient<T: McpTransport> {
/// Shared handle to the transport that carries every request and notification.
transport: Arc<T>,
/// Caller-supplied server name; used in log messages and telemetry attributes.
server_name: String,
/// The server's reply to `initialize`, stored once the handshake has completed.
server_info: Option<InitializeResult>,
/// Protocol revision the server reported in its `initialize` reply.
negotiated_version: Option<String>,
}
impl<T: McpTransport> McpClient<T> {
pub async fn new(transport: Arc<T>, server_name: String) -> Result<Self> {
let mut client = Self {
transport,
server_name,
server_info: None,
negotiated_version: None,
};
client.initialize().await?;
Ok(client)
}
/// Builds a client without contacting the server.
///
/// Both handshake fields start as `None`, so [`McpClient::supports_resources`]
/// and [`McpClient::supports_prompts`] report `false` until
/// [`McpClient::initialize`] has succeeded.
#[must_use]
pub const fn new_uninitialized(transport: Arc<T>, server_name: String) -> Self {
    Self {
        negotiated_version: None,
        server_info: None,
        server_name,
        transport,
    }
}
/// Performs the MCP `initialize` handshake and returns the server's reply.
///
/// With the `otel` feature enabled the handshake is wrapped in an
/// `mcp.initialize` client span and its duration is recorded by
/// `finish_mcp_span`.
///
/// # Errors
///
/// Returns the error produced by the handshake itself, or
/// "Server info not available" if no server info is stored afterwards.
pub async fn initialize(&mut self) -> Result<&InitializeResult> {
    #[cfg(feature = "otel")]
    let (clock, mut otel_span) = {
        use crate::observability::langfuse;
        // Take the timestamp first so span setup is included in the measured duration.
        let clock = std::time::Instant::now();
        let mut fresh = start_mcp_span("mcp.initialize", &self.server_name);
        langfuse::tag_observation(&mut fresh, langfuse::ObservationType::Chain);
        (clock, fresh)
    };

    let handshake = self.initialize_inner().await;

    #[cfg(feature = "otel")]
    finish_mcp_span(
        &mut otel_span,
        &handshake,
        "initialize",
        &self.server_name,
        clock,
    );

    handshake?;
    let stored = self.server_info.as_ref();
    stored.context("Server info not available")
}
async fn initialize_inner(&mut self) -> Result<()> {
let params = InitializeParams {
protocol_version: PREFERRED_PROTOCOL_VERSION.to_string(),
capabilities: ClientCapabilities::default(),
client_info: ClientInfo {
name: "agent-sdk".to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
},
};
let request = JsonRpcRequest::new("initialize", Some(serde_json::to_value(¶ms)?), 0);
let response = self.transport.send(request).await?;
let result: InitializeResult = response
.result
.map(serde_json::from_value)
.transpose()
.context("Failed to parse initialize response")?
.context("Initialize response missing result")?;
let negotiated = result.protocol_version.clone();
if !is_known_protocol_version(&negotiated) {
log::warn!(
"MCP server '{}' negotiated unknown protocol revision '{}' (advertised '{}')",
self.server_name,
negotiated,
PREFERRED_PROTOCOL_VERSION,
);
}
self.transport.set_protocol_version(&negotiated).await;
self.negotiated_version = Some(negotiated);
let notification = JsonRpcRequest::new("notifications/initialized", None, 0);
let _ = self.transport.send_notification(notification).await;
self.server_info = Some(result);
Ok(())
}
/// Returns the server name supplied when the client was constructed.
#[must_use]
pub fn server_name(&self) -> &str {
    self.server_name.as_str()
}
/// Returns the server's `initialize` reply, or `None` if no handshake has
/// completed successfully yet.
#[must_use]
pub const fn server_info(&self) -> Option<&InitializeResult> {
self.server_info.as_ref()
}
/// Returns the protocol revision the server reported during `initialize`,
/// or `None` if no handshake has completed successfully yet.
#[must_use]
pub fn protocol_version(&self) -> Option<&str> {
    let version = self.negotiated_version.as_ref()?;
    Some(version.as_str())
}
/// Fetches the tool definitions exposed by the server via `tools/list`.
///
/// With the `otel` feature enabled the call is wrapped in an `mcp.tools/list`
/// client span that carries the number of tools returned as
/// `mcp.tools.count`, and its duration is recorded by `finish_mcp_span`.
///
/// # Errors
///
/// Returns whatever error the underlying `tools/list` request produces.
pub async fn list_tools(&self) -> Result<Vec<McpToolDefinition>> {
    #[cfg(feature = "otel")]
    let started_at = std::time::Instant::now();
    #[cfg(feature = "otel")]
    let mut span = {
        use crate::observability::langfuse;
        let mut span = start_mcp_span("mcp.tools/list", &self.server_name);
        langfuse::tag_observation(&mut span, langfuse::ObservationType::Chain);
        span
    };

    let result = self.list_tools_inner().await;

    #[cfg(feature = "otel")]
    {
        use opentelemetry::KeyValue;
        use opentelemetry::trace::Span;
        if let Ok(ref tools) = result {
            // Saturate on overflow: falling back to 0 would make an oversized
            // list look like an empty one in telemetry.
            let count = i64::try_from(tools.len()).unwrap_or(i64::MAX);
            span.set_attribute(KeyValue::new("mcp.tools.count", count));
        }
        finish_mcp_span(
            &mut span,
            &result,
            "tools/list",
            &self.server_name,
            started_at,
        );
    }

    result
}
async fn list_tools_inner(&self) -> Result<Vec<McpToolDefinition>> {
let request = JsonRpcRequest::new("tools/list", None, 0);
let response = self.transport.send(request).await?;
let result: ToolsListResult = response
.result
.map(serde_json::from_value)
.transpose()
.context("Failed to parse tools/list response")?
.context("tools/list response missing result")?;
Ok(result.tools)
}
/// Invokes the tool `name` on the server with the given JSON `arguments`.
///
/// With the `otel` feature enabled the call is wrapped in an `mcp.tools/call`
/// client span tagged with the server and tool names, and is finished by
/// `finish_mcp_call_tool_span`.
///
/// # Errors
///
/// Returns whatever error the underlying `tools/call` request produces.
pub async fn call_tool(&self, name: &str, arguments: Value) -> Result<McpToolCallResult> {
    #[cfg(feature = "otel")]
    let (clock, mut otel_span) = {
        use crate::observability::langfuse;
        use opentelemetry::KeyValue;
        // Take the timestamp first so span setup is included in the measured duration.
        let clock = std::time::Instant::now();
        let span_attrs = vec![
            KeyValue::new("mcp.server.name", self.server_name.clone()),
            KeyValue::new("gen_ai.tool.name", name.to_string()),
        ];
        let mut fresh = start_mcp_span_with_attrs("mcp.tools/call", span_attrs);
        langfuse::tag_observation(&mut fresh, langfuse::ObservationType::Tool);
        (clock, fresh)
    };

    let outcome = self.call_tool_inner(name, arguments).await;

    #[cfg(feature = "otel")]
    finish_mcp_call_tool_span(
        &mut otel_span,
        &outcome,
        "tools/call",
        &self.server_name,
        clock,
    );

    outcome
}
/// Sends `tools/call` for `name` with `arguments` and decodes the reply.
///
/// `arguments` is always sent, wrapped in `Some`.
///
/// # Errors
///
/// Fails if the parameters cannot be serialized, if the transport fails, if
/// the server answers with a JSON-RPC error object ("Tool call failed: ..."),
/// or if the reply has no parsable `result`.
async fn call_tool_inner(&self, name: &str, arguments: Value) -> Result<McpToolCallResult> {
    let params = ToolCallParams {
        name: name.to_string(),
        arguments: Some(arguments),
    };
    let request = JsonRpcRequest::new("tools/call", Some(serde_json::to_value(&params)?), 0);
    let response = self.transport.send(request).await?;

    // A JSON-RPC error object takes precedence over any `result` payload.
    if let Some(ref error) = response.error {
        bail!("Tool call failed: {} (code {})", error.message, error.code);
    }

    response
        .result
        .map(serde_json::from_value)
        .transpose()
        .context("Failed to parse tools/call response")?
        .context("tools/call response missing result")
}
/// Like [`McpClient::call_tool`], but with optional arguments.
///
/// When `arguments` is `None` an empty JSON object is sent in its place.
///
/// # Errors
///
/// Returns whatever error [`McpClient::call_tool`] produces.
pub async fn call_tool_raw(
    &self,
    name: &str,
    arguments: Option<Value>,
) -> Result<McpToolCallResult> {
    self.call_tool(name, arguments.unwrap_or_else(|| json!({})))
        .await
}
/// Lists the resources exposed by the server via `resources/list`.
///
/// If the stored server info does not advertise the resources capability
/// (including when no handshake has completed), an empty list is returned
/// without contacting the server and without emitting telemetry.
///
/// # Errors
///
/// Returns whatever error the underlying `resources/list` request produces.
pub async fn list_resources(&self) -> Result<Vec<McpResource>> {
    let advertised = self.supports_resources();
    if !advertised {
        return Ok(Vec::new());
    }

    #[cfg(feature = "otel")]
    let (clock, mut otel_span) = {
        use crate::observability::langfuse;
        let clock = std::time::Instant::now();
        let mut fresh = start_mcp_span("mcp.resources/list", &self.server_name);
        langfuse::tag_observation(&mut fresh, langfuse::ObservationType::Chain);
        (clock, fresh)
    };

    let outcome = self.list_resources_inner().await;

    #[cfg(feature = "otel")]
    finish_mcp_span(
        &mut otel_span,
        &outcome,
        "resources/list",
        &self.server_name,
        clock,
    );

    outcome
}
async fn list_resources_inner(&self) -> Result<Vec<McpResource>> {
let request = JsonRpcRequest::new("resources/list", None, 0);
let response = self.transport.send(request).await?;
let result: ResourcesListResult = response
.result
.map(serde_json::from_value)
.transpose()
.context("Failed to parse resources/list response")?
.context("resources/list response missing result")?;
Ok(result.resources)
}
/// Reads the resource identified by `uri` via `resources/read`.
///
/// Unlike [`McpClient::list_resources`], this does not check the advertised
/// capabilities first; the request is always sent.
///
/// With the `otel` feature enabled the call is wrapped in an
/// `mcp.resources/read` client span and timed by `finish_mcp_span`.
///
/// # Errors
///
/// Returns whatever error the underlying `resources/read` request produces.
pub async fn read_resource(&self, uri: &str) -> Result<ResourceReadResult> {
    #[cfg(feature = "otel")]
    let (clock, mut otel_span) = {
        use crate::observability::langfuse;
        let clock = std::time::Instant::now();
        let mut fresh = start_mcp_span("mcp.resources/read", &self.server_name);
        langfuse::tag_observation(&mut fresh, langfuse::ObservationType::Chain);
        (clock, fresh)
    };

    let outcome = self.read_resource_inner(uri).await;

    #[cfg(feature = "otel")]
    finish_mcp_span(
        &mut otel_span,
        &outcome,
        "resources/read",
        &self.server_name,
        clock,
    );

    outcome
}
async fn read_resource_inner(&self, uri: &str) -> Result<ResourceReadResult> {
let params = ResourceReadParams {
uri: uri.to_string(),
};
let request =
JsonRpcRequest::new("resources/read", Some(serde_json::to_value(¶ms)?), 0);
let response = self.transport.send(request).await?;
let result: ResourceReadResult = response
.result
.map(serde_json::from_value)
.transpose()
.context("Failed to parse resources/read response")?
.context("resources/read response missing result")?;
Ok(result)
}
/// Lists the prompts exposed by the server via `prompts/list`.
///
/// If the stored server info does not advertise the prompts capability
/// (including when no handshake has completed), an empty list is returned
/// without contacting the server and without emitting telemetry.
///
/// # Errors
///
/// Returns whatever error the underlying `prompts/list` request produces.
pub async fn list_prompts(&self) -> Result<Vec<McpPrompt>> {
    let advertised = self.supports_prompts();
    if !advertised {
        return Ok(Vec::new());
    }

    #[cfg(feature = "otel")]
    let (clock, mut otel_span) = {
        use crate::observability::langfuse;
        let clock = std::time::Instant::now();
        let mut fresh = start_mcp_span("mcp.prompts/list", &self.server_name);
        langfuse::tag_observation(&mut fresh, langfuse::ObservationType::Chain);
        (clock, fresh)
    };

    let outcome = self.list_prompts_inner().await;

    #[cfg(feature = "otel")]
    finish_mcp_span(
        &mut otel_span,
        &outcome,
        "prompts/list",
        &self.server_name,
        clock,
    );

    outcome
}
async fn list_prompts_inner(&self) -> Result<Vec<McpPrompt>> {
let request = JsonRpcRequest::new("prompts/list", None, 0);
let response = self.transport.send(request).await?;
let result: PromptsListResult = response
.result
.map(serde_json::from_value)
.transpose()
.context("Failed to parse prompts/list response")?
.context("prompts/list response missing result")?;
Ok(result.prompts)
}
/// Fetches the prompt `name` via `prompts/get`, passing `arguments` through
/// unchanged.
///
/// Unlike [`McpClient::list_prompts`], this does not check the advertised
/// capabilities first; the request is always sent.
///
/// With the `otel` feature enabled the call is wrapped in an
/// `mcp.prompts/get` client span and timed by `finish_mcp_span`.
///
/// # Errors
///
/// Returns whatever error the underlying `prompts/get` request produces.
pub async fn get_prompt(
    &self,
    name: &str,
    arguments: Option<Value>,
) -> Result<PromptGetResult> {
    #[cfg(feature = "otel")]
    let (clock, mut otel_span) = {
        use crate::observability::langfuse;
        let clock = std::time::Instant::now();
        let mut fresh = start_mcp_span("mcp.prompts/get", &self.server_name);
        langfuse::tag_observation(&mut fresh, langfuse::ObservationType::Chain);
        (clock, fresh)
    };

    let outcome = self.get_prompt_inner(name, arguments).await;

    #[cfg(feature = "otel")]
    finish_mcp_span(
        &mut otel_span,
        &outcome,
        "prompts/get",
        &self.server_name,
        clock,
    );

    outcome
}
async fn get_prompt_inner(
&self,
name: &str,
arguments: Option<Value>,
) -> Result<PromptGetResult> {
let params = PromptGetParams {
name: name.to_string(),
arguments,
};
let request = JsonRpcRequest::new("prompts/get", Some(serde_json::to_value(¶ms)?), 0);
let response = self.transport.send(request).await?;
let result: PromptGetResult = response
.result
.map(serde_json::from_value)
.transpose()
.context("Failed to parse prompts/get response")?
.context("prompts/get response missing result")?;
Ok(result)
}
/// Reports whether the stored server info advertises the resources capability.
///
/// Returns `false` when no server info is stored.
#[must_use]
pub fn supports_resources(&self) -> bool {
    matches!(
        &self.server_info,
        Some(info) if info.capabilities.resources.is_some()
    )
}
/// Reports whether the stored server info advertises the prompts capability.
///
/// Returns `false` when no server info is stored.
#[must_use]
pub fn supports_prompts(&self) -> bool {
    matches!(
        &self.server_info,
        Some(info) if info.capabilities.prompts.is_some()
    )
}
pub async fn close(&self) -> Result<()> {
self.transport.close().await
}
}
/// Starts an MCP client span called `name` whose only attribute is
/// `mcp.server.name`.
#[cfg(feature = "otel")]
fn start_mcp_span(
    name: impl Into<std::borrow::Cow<'static, str>>,
    server_name: &str,
) -> opentelemetry::global::BoxedSpan {
    let server_attr = opentelemetry::KeyValue::new("mcp.server.name", server_name.to_string());
    start_mcp_span_with_attrs(name, vec![server_attr])
}
/// Starts a client span called `name` with the given attributes, then lets
/// the baggage helper copy baggage onto it before handing it back.
#[cfg(feature = "otel")]
fn start_mcp_span_with_attrs(
    name: impl Into<std::borrow::Cow<'static, str>>,
    attrs: Vec<opentelemetry::KeyValue>,
) -> opentelemetry::global::BoxedSpan {
    let mut client_span = crate::observability::spans::start_client_span(name, attrs);
    crate::observability::baggage::copy_baggage_to_active_span(&mut client_span);
    client_span
}
/// Finalizes an MCP span: marks it as failed when `result` is an error,
/// records the elapsed time since `started_at` in the `mcp_requests_duration`
/// metric, and ends the span.
///
/// The metric is labelled with `mcp.method` and `mcp.server.name`, plus an
/// error-type label of `"mcp_error"` on failure.
#[cfg(feature = "otel")]
fn finish_mcp_span<T>(
    span: &mut opentelemetry::global::BoxedSpan,
    result: &Result<T>,
    method: &'static str,
    server_name: &str,
    started_at: std::time::Instant,
) {
    use crate::observability::{metrics, spans};
    use opentelemetry::KeyValue;
    use opentelemetry::trace::Span;

    // Two labels are always present; a third is added on failure.
    let mut labels = Vec::with_capacity(3);
    labels.push(KeyValue::new("mcp.method", method));
    labels.push(KeyValue::new("mcp.server.name", server_name.to_string()));

    if let Err(failure) = result {
        let description = failure.to_string();
        spans::set_span_error(span, "mcp_error", &description);
        labels.push(KeyValue::new(
            crate::observability::attrs::ERROR_TYPE,
            "mcp_error",
        ));
    }

    let duration_secs = started_at.elapsed().as_secs_f64();
    metrics::Metrics::global()
        .mcp_requests_duration
        .record(duration_secs, &labels);
    span.end();
}
/// Finalizes a `tools/call` span, distinguishing two kinds of failure.
///
/// * `"mcp_error"` — the request itself failed (`result` is `Err`).
/// * `"tool_error"` — the request succeeded but the tool result has
///   `is_error` set; the span error text is the first text content item, or
///   "MCP tool returned error" when there is none.
///
/// In every case the elapsed time since `started_at` is recorded in the
/// `mcp_requests_duration` metric and the span is ended.
#[cfg(feature = "otel")]
fn finish_mcp_call_tool_span(
    span: &mut opentelemetry::global::BoxedSpan,
    result: &Result<super::protocol::McpToolCallResult>,
    method: &'static str,
    server_name: &str,
    started_at: std::time::Instant,
) {
    use crate::observability::{metrics, spans};
    use opentelemetry::KeyValue;
    use opentelemetry::trace::Span;

    let mut labels = Vec::with_capacity(3);
    labels.push(KeyValue::new("mcp.method", method));
    labels.push(KeyValue::new("mcp.server.name", server_name.to_string()));

    let failure_kind: Option<&'static str> = match result {
        Err(failure) => {
            spans::set_span_error(span, "mcp_error", &failure.to_string());
            Some("mcp_error")
        }
        Ok(reply) if !reply.is_error => None,
        Ok(reply) => {
            // Use the first text item as the error description.
            let description = reply
                .content
                .iter()
                .find_map(|item| {
                    if let super::protocol::McpContent::Text { text } = item {
                        Some(text.as_str())
                    } else {
                        None
                    }
                })
                .unwrap_or("MCP tool returned error");
            spans::set_span_error(span, "tool_error", description);
            Some("tool_error")
        }
    };

    if let Some(kind) = failure_kind {
        labels.push(KeyValue::new(crate::observability::attrs::ERROR_TYPE, kind));
    }

    let duration_secs = started_at.elapsed().as_secs_f64();
    metrics::Metrics::global()
        .mcp_requests_duration
        .record(duration_secs, &labels);
    span.end();
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The advertised protocol revision must never be blank.
    #[test]
    fn test_mcp_protocol_version() {
        let advertised = MCP_PROTOCOL_VERSION;
        assert!(!advertised.is_empty());
    }

    /// Serializing `ClientInfo` must carry both the name and the version.
    #[test]
    fn test_client_info() {
        let client = ClientInfo {
            name: "test".to_string(),
            version: "1.0.0".to_string(),
        };
        let encoded = serde_json::to_string(&client).expect("serialize");
        for needle in ["test", "1.0.0"] {
            assert!(encoded.contains(needle));
        }
    }
}