use std::collections::HashMap;
use std::io::{self, BufRead, Write};
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::sync::{Mutex, oneshot};
use crate::context::{
ChannelClientRequester, ClientRequesterHandle, NotificationReceiver, OutgoingRequest,
OutgoingRequestReceiver, ServerNotification, notification_channel, outgoing_request_channel,
};
use tower_service::Service;
use crate::error::{Error, Result};
use crate::jsonrpc::JsonRpcService;
use crate::protocol::{
JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, JsonRpcResponseMessage,
McpNotification, RequestId, notifications,
};
use crate::router::{McpRouter, RouterRequest, RouterResponse};
use crate::transport::service::{CatchError, InjectAnnotations};
/// Normalizes one raw line of input before it is parsed as JSON.
///
/// A single leading byte-order mark (`U+FEFF`) is removed when present, then
/// surrounding whitespace is trimmed. A BOM that is not the very first
/// character of `line` is left untouched.
fn clean_input_line(line: &str) -> &str {
    let without_bom = match line.strip_prefix('\u{feff}') {
        Some(rest) => rest,
        None => line,
    };
    without_bom.trim()
}
/// Parses one cleaned line of input and dispatches it.
///
/// * If the JSON value has no `"id"` member and deserializes as a
///   [`JsonRpcNotification`], it is passed to `handle_notification` and
///   `Ok(None)` is returned (nothing is written back for it).
/// * Otherwise the line is deserialized as a [`JsonRpcMessage`], sent through
///   `service`, and the resulting response is returned as `Ok(Some(_))`.
///
/// # Errors
/// Fails when the line is not valid JSON, when it does not deserialize as a
/// JSON-RPC message, when the notification handler fails, or when the service
/// call returns an error.
async fn process_line(
    service: &mut JsonRpcService<McpRouter>,
    router: &McpRouter,
    line: &str,
) -> Result<Option<JsonRpcResponseMessage>> {
    // First pass is only used to look for the presence of an `id` member.
    let value: serde_json::Value = serde_json::from_str(line)?;
    let lacks_id = value.get("id").is_none();

    if lacks_id
        && let Ok(notification) = serde_json::from_str::<JsonRpcNotification>(line)
    {
        handle_notification(router, notification)?;
        return Ok(None);
    }

    // Either an id is present, or the id-less value was not a notification:
    // both cases go through the regular request path.
    let message = serde_json::from_str::<JsonRpcMessage>(line)?;
    let response = service.call_message(message).await?;
    Ok(Some(response))
}
/// Converts a raw JSON-RPC notification into an [`McpNotification`] and hands
/// it to the router.
///
/// # Errors
/// Returns the error from `McpNotification::from_jsonrpc` when the conversion
/// fails; the router is not called in that case.
fn handle_notification(router: &McpRouter, notification: JsonRpcNotification) -> Result<()> {
    let mcp_notification = McpNotification::from_jsonrpc(&notification)?;
    router.handle_notification(mcp_notification);
    Ok(())
}
pub(crate) fn serialize_notification(notification: &ServerNotification) -> Option<String> {
match notification {
ServerNotification::Progress(params) => {
let notif = JsonRpcNotification::new(notifications::PROGRESS)
.with_params(serde_json::to_value(params).unwrap_or_default());
serde_json::to_string(¬if).ok()
}
ServerNotification::LogMessage(params) => {
let notif = JsonRpcNotification::new(notifications::MESSAGE)
.with_params(serde_json::to_value(params).unwrap_or_default());
serde_json::to_string(¬if).ok()
}
ServerNotification::ResourceUpdated { uri } => {
let notif = JsonRpcNotification::new(notifications::RESOURCE_UPDATED)
.with_params(serde_json::json!({ "uri": uri }));
serde_json::to_string(¬if).ok()
}
ServerNotification::ResourcesListChanged => {
let notif = JsonRpcNotification::new(notifications::RESOURCES_LIST_CHANGED);
serde_json::to_string(¬if).ok()
}
ServerNotification::ToolsListChanged => {
let notif = JsonRpcNotification::new(notifications::TOOLS_LIST_CHANGED);
serde_json::to_string(¬if).ok()
}
ServerNotification::PromptsListChanged => {
let notif = JsonRpcNotification::new(notifications::PROMPTS_LIST_CHANGED);
serde_json::to_string(¬if).ok()
}
ServerNotification::TaskStatusChanged(params) => {
let notif = JsonRpcNotification::new(notifications::TASK_STATUS_CHANGED)
.with_params(serde_json::to_value(params).unwrap_or_default());
serde_json::to_string(¬if).ok()
}
}
}
async fn write_line_to_stdout(stdout: &mut tokio::io::Stdout, line: &str) -> Result<()> {
stdout
.write_all(line.as_bytes())
.await
.map_err(|e| Error::Transport(format!("Failed to write to stdout: {}", e)))?;
stdout
.write_all(b"\n")
.await
.map_err(|e| Error::Transport(format!("Failed to write newline: {}", e)))?;
stdout
.flush()
.await
.map_err(|e| Error::Transport(format!("Failed to flush stdout: {}", e)))?;
Ok(())
}
/// Asynchronous stdio transport: reads newline-delimited JSON-RPC messages
/// from stdin, dispatches them to an [`McpRouter`], and writes responses and
/// server notifications to stdout.
pub struct StdioTransport {
/// JSON-RPC service built around a clone of the router.
service: JsonRpcService<McpRouter>,
/// Router used directly for incoming notifications.
router: McpRouter,
/// Receiving end of the notification channel whose sender is attached to the router.
notification_rx: NotificationReceiver,
}
impl StdioTransport {
    /// Creates a stdio transport for `router`.
    ///
    /// A notification channel with capacity 256 is created. Its sender is
    /// attached to the router and its receiver is kept so that [`Self::run`]
    /// can forward server notifications to stdout.
    pub fn new(router: McpRouter) -> Self {
        let (notif_tx, notification_rx) = notification_channel(256);
        let router = router.with_notification_sender(notif_tx);
        let service = JsonRpcService::new(router.clone());
        Self {
            service,
            router,
            notification_rx,
        }
    }

    /// Applies a tower `layer` on top of the router and returns a
    /// [`GenericStdioTransport`] driving the layered service.
    ///
    /// The layered service is wrapped in `CatchError` and then in
    /// `InjectAnnotations`, which is given the router's tool annotations map.
    /// The returned transport reuses this transport's notification receiver.
    pub fn layer<L>(
        self,
        layer: L,
    ) -> GenericStdioTransport<InjectAnnotations<CatchError<L::Service>>>
    where
        L: tower::Layer<McpRouter>,
        L::Service: Service<RouterRequest, Response = RouterResponse> + Clone + Send + 'static,
        <L::Service as Service<RouterRequest>>::Error: std::fmt::Display + Send,
        <L::Service as Service<RouterRequest>>::Future: Send,
    {
        // Read the annotations before the router is moved into the layer.
        let annotations = self.router.tool_annotations_map();
        let wrapped = layer.layer(self.router);
        let service = InjectAnnotations::new(CatchError::new(wrapped), annotations);
        GenericStdioTransport::with_notifications(service, self.notification_rx)
    }

    /// Runs the transport until stdin reaches end-of-file.
    ///
    /// Each non-empty input line is handled by `process_line`; responses and
    /// queued server notifications are written to stdout, one JSON document
    /// per line. A line that fails processing produces a JSON-RPC parse-error
    /// response with no id and the loop continues.
    ///
    /// # Errors
    /// Returns `Error::Transport` when reading stdin, serializing a response,
    /// or writing to stdout fails.
    pub async fn run(&mut self) -> Result<()> {
        let mut stdout = tokio::io::stdout();
        // `Lines::next_line` is cancellation safe, whereas `read_line` is not:
        // with `read_line`, a notification arriving while a line was only
        // partially read would drop the bytes read so far.
        let mut lines = BufReader::new(tokio::io::stdin()).lines();
        tracing::info!("Stdio transport started, waiting for input");
        loop {
            tokio::select! {
                result = lines.next_line() => {
                    let next = result.map_err(|e| {
                        Error::Transport(format!("Failed to read from stdin: {}", e))
                    })?;
                    let Some(line) = next else {
                        tracing::info!("Stdin closed, shutting down");
                        break;
                    };
                    let trimmed = clean_input_line(&line);
                    if trimmed.is_empty() {
                        continue;
                    }
                    tracing::debug!(input = %trimmed, "Received message");
                    match process_line(&mut self.service, &self.router, trimmed).await {
                        Ok(Some(response)) => {
                            let response_json = serde_json::to_string(&response).map_err(|e| {
                                Error::Transport(format!("Failed to serialize response: {}", e))
                            })?;
                            tracing::debug!(output = %response_json, "Sending response");
                            write_line_to_stdout(&mut stdout, &response_json).await?;
                        }
                        // Notifications produce no response.
                        Ok(None) => {}
                        Err(e) => {
                            tracing::error!(error = %e, "Error processing message");
                            let error_response = JsonRpcResponse::error(
                                None,
                                crate::error::JsonRpcError::parse_error(e.to_string()),
                            );
                            let response_json = serde_json::to_string(&error_response).map_err(|e| {
                                Error::Transport(format!("Failed to serialize error: {}", e))
                            })?;
                            write_line_to_stdout(&mut stdout, &response_json).await?;
                        }
                    }
                }
                Some(notification) = self.notification_rx.recv() => {
                    if let Some(json) = serialize_notification(&notification) {
                        tracing::debug!(output = %json, "Sending notification");
                        write_line_to_stdout(&mut stdout, &json).await?;
                    }
                }
            }
        }
        Ok(())
    }
}
/// Stdio transport that drives an arbitrary service `S` handling
/// `RouterRequest`s instead of a bare `McpRouter`.
///
/// `S` must produce `RouterResponse` values and be infallible
/// (`Error = Infallible`), as well as `Clone + Send + 'static`.
pub struct GenericStdioTransport<S>
where
S: Service<RouterRequest, Response = RouterResponse, Error = std::convert::Infallible>
+ Clone
+ Send
+ 'static,
S::Future: Send,
{
/// JSON-RPC wrapper around the supplied service.
service: JsonRpcService<S>,
/// Optional receiver for server notifications to forward to stdout.
notification_rx: Option<NotificationReceiver>,
}
impl<S> GenericStdioTransport<S>
where
    S: Service<RouterRequest, Response = RouterResponse, Error = std::convert::Infallible>
        + Clone
        + Send
        + 'static,
    S::Future: Send,
{
    /// Creates a transport around `service` without a notification receiver.
    pub fn new(service: S) -> Self {
        Self {
            service: JsonRpcService::new(service),
            notification_rx: None,
        }
    }

    /// Creates a transport around `service` that also forwards notifications
    /// received on `notification_rx` to stdout.
    pub fn with_notifications(service: S, notification_rx: NotificationReceiver) -> Self {
        Self {
            service: JsonRpcService::new(service),
            notification_rx: Some(notification_rx),
        }
    }

    /// Runs the transport until stdin reaches end-of-file.
    ///
    /// When a notification receiver is present, input lines and notifications
    /// are multiplexed with `tokio::select!`; otherwise only stdin is read.
    ///
    /// # Errors
    /// Returns `Error::Transport` when reading stdin or writing stdout fails,
    /// or when a response cannot be serialized.
    pub async fn run(&mut self) -> Result<()> {
        let mut stdout = tokio::io::stdout();
        // `Lines::next_line` is cancellation safe, whereas `read_line` is not:
        // inside `select!`, a notification arriving while a line was only
        // partially read would drop the bytes read so far.
        let mut lines = BufReader::new(tokio::io::stdin()).lines();
        tracing::info!("Generic stdio transport started, waiting for input");
        loop {
            if let Some(ref mut notif_rx) = self.notification_rx {
                tokio::select! {
                    result = lines.next_line() => {
                        let next = result.map_err(|e| {
                            Error::Transport(format!("Failed to read from stdin: {}", e))
                        })?;
                        let Some(line) = next else {
                            tracing::info!("Stdin closed, shutting down");
                            break;
                        };
                        self.process_input(&line, &mut stdout).await?;
                    }
                    Some(notification) = notif_rx.recv() => {
                        if let Some(json) = serialize_notification(&notification) {
                            tracing::debug!(output = %json, "Sending notification");
                            write_line_to_stdout(&mut stdout, &json).await?;
                        }
                    }
                }
            } else {
                let next = lines
                    .next_line()
                    .await
                    .map_err(|e| Error::Transport(format!("Failed to read from stdin: {}", e)))?;
                let Some(line) = next else {
                    tracing::info!("Stdin closed, shutting down");
                    break;
                };
                self.process_input(&line, &mut stdout).await?;
            }
        }
        Ok(())
    }

    /// Handles one raw input line: cleans it, parses it, calls the service and
    /// writes the response (or a parse-error response) to `stdout`.
    ///
    /// Blank lines are skipped. Messages without an `"id"` member are treated
    /// as notifications and only logged; this transport does not dispatch them.
    async fn process_input(&mut self, line: &str, stdout: &mut tokio::io::Stdout) -> Result<()> {
        let trimmed = clean_input_line(line);
        if trimmed.is_empty() {
            return Ok(());
        }
        tracing::debug!(input = %trimmed, "Received message");
        let parsed: serde_json::Value = match serde_json::from_str(trimmed) {
            Ok(v) => v,
            Err(e) => {
                self.write_error(stdout, None, &e.to_string()).await?;
                return Ok(());
            }
        };
        if parsed.get("id").is_none() {
            tracing::debug!(
                method = parsed.get("method").and_then(|m| m.as_str()),
                "Received notification (ignored in generic transport)"
            );
            return Ok(());
        }
        let message: JsonRpcMessage = match serde_json::from_str(trimmed) {
            Ok(m) => m,
            Err(e) => {
                self.write_error(stdout, None, &e.to_string()).await?;
                return Ok(());
            }
        };
        match self.service.call_message(message).await {
            Ok(response) => {
                let response_json = serde_json::to_string(&response).map_err(|e| {
                    Error::Transport(format!("Failed to serialize response: {}", e))
                })?;
                tracing::debug!(output = %response_json, "Sending response");
                write_line_to_stdout(stdout, &response_json).await?;
            }
            Err(e) => {
                tracing::error!(error = %e, "Error processing message");
                self.write_error(stdout, None, &e.to_string()).await?;
            }
        }
        Ok(())
    }

    /// Writes a JSON-RPC parse-error response carrying `message` and the
    /// given `id` to `stdout`.
    async fn write_error(
        &self,
        stdout: &mut tokio::io::Stdout,
        id: Option<crate::protocol::RequestId>,
        message: &str,
    ) -> Result<()> {
        let error_response =
            JsonRpcResponse::error(id, crate::error::JsonRpcError::parse_error(message));
        let response_json = serde_json::to_string(&error_response)
            .map_err(|e| Error::Transport(format!("Failed to serialize error: {}", e)))?;
        write_line_to_stdout(stdout, &response_json).await
    }
}
/// Blocking stdio transport that reads stdin with `std::io` and drives the
/// async request handling on its own tokio runtime.
pub struct SyncStdioTransport {
/// JSON-RPC service built around a clone of the router.
service: JsonRpcService<McpRouter>,
/// Router used directly for incoming notifications.
router: McpRouter,
}
impl SyncStdioTransport {
pub fn new(router: McpRouter) -> Self {
let service = JsonRpcService::new(router.clone());
Self { service, router }
}
pub fn run_blocking(&mut self) -> Result<()> {
let rt = tokio::runtime::Runtime::new()
.map_err(|e| Error::Transport(format!("Failed to create runtime: {}", e)))?;
let stdin = io::stdin();
let mut stdout = io::stdout();
tracing::info!("Sync stdio transport started");
for line in stdin.lock().lines() {
let line =
line.map_err(|e| Error::Transport(format!("Failed to read from stdin: {}", e)))?;
let trimmed = clean_input_line(&line);
if trimmed.is_empty() {
continue;
}
tracing::debug!(input = %trimmed, "Received message");
match rt.block_on(process_line(&mut self.service, &self.router, trimmed)) {
Ok(Some(response)) => {
let response_json = serde_json::to_string(&response).map_err(|e| {
Error::Transport(format!("Failed to serialize response: {}", e))
})?;
tracing::debug!(output = %response_json, "Sending response");
writeln!(stdout, "{}", response_json).map_err(|e| {
Error::Transport(format!("Failed to write to stdout: {}", e))
})?;
stdout
.flush()
.map_err(|e| Error::Transport(format!("Failed to flush stdout: {}", e)))?;
}
Ok(None) => {
}
Err(e) => {
tracing::error!(error = %e, "Error processing message");
let error_response = JsonRpcResponse::error(
None,
crate::error::JsonRpcError::parse_error(e.to_string()),
);
let response_json = serde_json::to_string(&error_response).map_err(|e| {
Error::Transport(format!("Failed to serialize error: {}", e))
})?;
writeln!(stdout, "{}", response_json)
.map_err(|e| Error::Transport(format!("Failed to write error: {}", e)))?;
stdout
.flush()
.map_err(|e| Error::Transport(format!("Failed to flush stdout: {}", e)))?;
}
}
}
tracing::info!("Stdin closed, shutting down");
Ok(())
}
}
/// Bookkeeping entry for a request that is still waiting for its response.
struct PendingRequest {
/// One-shot sender through which the result (or error) is delivered.
response_tx: oneshot::Sender<Result<serde_json::Value>>,
}
/// Stdio transport that, besides answering incoming requests, can send
/// requests of its own over stdout and match the responses arriving on stdin.
pub struct BidirectionalStdioTransport {
/// JSON-RPC service built around a clone of the router.
service: JsonRpcService<McpRouter>,
/// Router used directly for incoming notifications.
router: McpRouter,
/// Receiving end of the outgoing-request channel.
request_rx: OutgoingRequestReceiver,
/// Handle through which outgoing requests are submitted to that channel.
client_requester: ClientRequesterHandle,
/// Outgoing requests awaiting a response, keyed by request id.
pending_requests: Arc<Mutex<HashMap<RequestId, PendingRequest>>>,
/// Receiving end of the notification channel whose sender is attached to the router.
notification_rx: NotificationReceiver,
}
impl BidirectionalStdioTransport {
pub fn new(router: McpRouter) -> Self {
let (request_tx, request_rx) = outgoing_request_channel(32);
let client_requester: ClientRequesterHandle =
Arc::new(ChannelClientRequester::new(request_tx));
let (notif_tx, notification_rx) = notification_channel(256);
let router = router.with_notification_sender(notif_tx);
let service = JsonRpcService::new(router.clone());
Self {
service,
router,
request_rx,
client_requester,
pending_requests: Arc::new(Mutex::new(HashMap::new())),
notification_rx,
}
}
pub fn client_requester(&self) -> ClientRequesterHandle {
self.client_requester.clone()
}
pub async fn run(&mut self) -> Result<()> {
let stdin = tokio::io::stdin();
let stdout = Arc::new(Mutex::new(tokio::io::stdout()));
let mut reader = BufReader::new(stdin);
tracing::info!("Bidirectional stdio transport started, waiting for input");
loop {
let mut line = String::new();
tokio::select! {
result = reader.read_line(&mut line) => {
let bytes_read = result.map_err(|e| {
Error::Transport(format!("Failed to read from stdin: {}", e))
})?;
if bytes_read == 0 {
tracing::info!("Stdin closed, shutting down");
break;
}
let trimmed = clean_input_line(&line);
if trimmed.is_empty() {
continue;
}
self.handle_incoming_message(trimmed, stdout.clone()).await?;
}
Some(outgoing) = self.request_rx.recv() => {
self.send_outgoing_request(outgoing, stdout.clone()).await?;
}
Some(notification) = self.notification_rx.recv() => {
if let Some(json) = serialize_notification(¬ification) {
tracing::debug!(output = %json, "Sending notification");
self.write_line(&json, stdout.clone()).await?;
}
}
}
}
Ok(())
}
async fn handle_incoming_message(
&mut self,
line: &str,
stdout: Arc<Mutex<tokio::io::Stdout>>,
) -> Result<()> {
tracing::debug!(input = %line, "Received message");
let parsed: serde_json::Value = match serde_json::from_str(line) {
Ok(v) => v,
Err(e) => {
tracing::warn!(error = %e, "Malformed JSON on stdin");
return self.write_parse_error(&e.to_string(), stdout).await;
}
};
if parsed.get("method").is_none()
&& (parsed.get("result").is_some() || parsed.get("error").is_some())
{
return self.handle_response(&parsed).await;
}
if parsed.get("id").is_none() {
if let Ok(notification) = serde_json::from_str::<JsonRpcNotification>(line) {
handle_notification(&self.router, notification)?;
}
return Ok(());
}
let message: JsonRpcMessage = match serde_json::from_str(line) {
Ok(m) => m,
Err(e) => {
tracing::warn!(error = %e, "JSON did not match JSON-RPC request shape");
return self.write_parse_error(&e.to_string(), stdout).await;
}
};
match self.service.call_message(message).await {
Ok(response) => {
let response_json = serde_json::to_string(&response).map_err(|e| {
Error::Transport(format!("Failed to serialize response: {}", e))
})?;
tracing::debug!(output = %response_json, "Sending response");
self.write_line(&response_json, stdout).await?;
}
Err(e) => {
tracing::error!(error = %e, "Error processing message");
let error_response = JsonRpcResponse::error(
None,
crate::error::JsonRpcError::parse_error(e.to_string()),
);
let response_json = serde_json::to_string(&error_response)
.map_err(|e| Error::Transport(format!("Failed to serialize error: {}", e)))?;
self.write_line(&response_json, stdout).await?;
}
}
Ok(())
}
async fn write_parse_error(
&self,
message: &str,
stdout: Arc<Mutex<tokio::io::Stdout>>,
) -> Result<()> {
let error_response =
JsonRpcResponse::error(None, crate::error::JsonRpcError::parse_error(message));
let response_json = serde_json::to_string(&error_response)
.map_err(|e| Error::Transport(format!("Failed to serialize error: {}", e)))?;
self.write_line(&response_json, stdout).await
}
async fn handle_response(&self, parsed: &serde_json::Value) -> Result<()> {
let id = match parsed.get("id") {
Some(id) => {
if let Some(n) = id.as_i64() {
RequestId::Number(n)
} else if let Some(s) = id.as_str() {
RequestId::String(s.to_string())
} else {
tracing::warn!("Response has invalid id type");
return Ok(());
}
}
None => {
tracing::warn!("Response missing id field");
return Ok(());
}
};
let pending = {
let mut pending_requests = self.pending_requests.lock().await;
pending_requests.remove(&id)
};
match pending {
Some(pending) => {
let result = if let Some(error) = parsed.get("error") {
let code = error.get("code").and_then(|c| c.as_i64()).unwrap_or(-1);
let message = error
.get("message")
.and_then(|m| m.as_str())
.unwrap_or("Unknown error");
Err(Error::Internal(format!(
"Client error ({}): {}",
code, message
)))
} else if let Some(result) = parsed.get("result") {
Ok(result.clone())
} else {
Err(Error::Internal(
"Response has neither result nor error".to_string(),
))
};
let _ = pending.response_tx.send(result);
}
None => {
tracing::warn!(id = ?id, "Received response for unknown request");
}
}
Ok(())
}
async fn send_outgoing_request(
&mut self,
outgoing: OutgoingRequest,
stdout: Arc<Mutex<tokio::io::Stdout>>,
) -> Result<()> {
let request = JsonRpcRequest {
jsonrpc: "2.0".to_string(),
id: outgoing.id.clone(),
method: outgoing.method,
params: Some(outgoing.params),
};
let request_json = serde_json::to_string(&request)
.map_err(|e| Error::Transport(format!("Failed to serialize request: {}", e)))?;
tracing::debug!(output = %request_json, "Sending request to client");
{
let mut pending_requests = self.pending_requests.lock().await;
pending_requests.insert(
outgoing.id,
PendingRequest {
response_tx: outgoing.response_tx,
},
);
}
self.write_line(&request_json, stdout).await?;
Ok(())
}
async fn write_line(&self, line: &str, stdout: Arc<Mutex<tokio::io::Stdout>>) -> Result<()> {
let mut stdout = stdout.lock().await;
stdout
.write_all(line.as_bytes())
.await
.map_err(|e| Error::Transport(format!("Failed to write to stdout: {}", e)))?;
stdout
.write_all(b"\n")
.await
.map_err(|e| Error::Transport(format!("Failed to write newline: {}", e)))?;
stdout
.flush()
.await
.map_err(|e| Error::Transport(format!("Failed to flush stdout: {}", e)))?;
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::ServerNotification;
    use crate::protocol::{
        LogLevel, LoggingMessageParams, ProgressParams, ProgressToken, TaskStatus, TaskStatusParams,
    };

    // ---- serialize_notification: one test per `ServerNotification` variant ----

    #[test]
    fn test_serialize_progress_notification() {
        let notification = ServerNotification::Progress(ProgressParams {
            progress_token: ProgressToken::String("tok-1".to_string()),
            progress: 50.0,
            total: Some(100.0),
            message: Some("Halfway there".to_string()),
            meta: None,
        });
        let json = serialize_notification(&notification).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed["jsonrpc"], "2.0");
        assert_eq!(parsed["method"], "notifications/progress");
        assert_eq!(parsed["params"]["progressToken"], "tok-1");
        assert_eq!(parsed["params"]["progress"], 50.0);
        assert_eq!(parsed["params"]["total"], 100.0);
        // Notifications never carry an id.
        assert!(parsed.get("id").is_none());
    }

    #[test]
    fn test_serialize_log_message_notification() {
        let notification = ServerNotification::LogMessage(LoggingMessageParams {
            level: LogLevel::Warning,
            logger: Some("test-logger".to_string()),
            data: serde_json::json!("something happened"),
            meta: None,
        });
        let json = serialize_notification(&notification).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed["method"], "notifications/message");
        assert_eq!(parsed["params"]["level"], "warning");
        assert_eq!(parsed["params"]["logger"], "test-logger");
    }

    #[test]
    fn test_serialize_resource_updated_notification() {
        let notification = ServerNotification::ResourceUpdated {
            uri: "file:///data.json".to_string(),
        };
        let json = serialize_notification(&notification).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed["method"], "notifications/resources/updated");
        assert_eq!(parsed["params"]["uri"], "file:///data.json");
    }

    #[test]
    fn test_serialize_resources_list_changed_notification() {
        let notification = ServerNotification::ResourcesListChanged;
        let json = serialize_notification(&notification).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed["method"], "notifications/resources/list_changed");
        assert!(parsed.get("params").is_none());
    }

    #[test]
    fn test_serialize_tools_list_changed_notification() {
        let notification = ServerNotification::ToolsListChanged;
        let json = serialize_notification(&notification).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed["method"], "notifications/tools/list_changed");
    }

    #[test]
    fn test_serialize_prompts_list_changed_notification() {
        let notification = ServerNotification::PromptsListChanged;
        let json = serialize_notification(&notification).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed["method"], "notifications/prompts/list_changed");
    }

    #[test]
    fn test_serialize_task_status_changed_notification() {
        let notification = ServerNotification::TaskStatusChanged(TaskStatusParams {
            task_id: "task-42".to_string(),
            status: TaskStatus::Working,
            status_message: Some("Processing...".to_string()),
            created_at: "2025-01-01T00:00:00Z".to_string(),
            last_updated_at: "2025-01-01T00:01:00Z".to_string(),
            ttl: None,
            poll_interval: None,
            meta: None,
        });
        let json = serialize_notification(&notification).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed["method"], "notifications/tasks/status");
        assert_eq!(parsed["params"]["taskId"], "task-42");
        assert_eq!(parsed["params"]["status"], "working");
    }

    // ---- helpers ----

    /// Builds a minimal router with server info set.
    fn make_router() -> McpRouter {
        McpRouter::new().server_info("test-server", "1.0.0")
    }

    /// Returns a service for `router` after sending `initialize` and the
    /// `notifications/initialized` notification.
    async fn init_service(router: &McpRouter) -> JsonRpcService<McpRouter> {
        let mut service = JsonRpcService::new(router.clone());
        let init_msg = serde_json::json!({
            "jsonrpc": "2.0",
            "id": 0,
            "method": "initialize",
            "params": {
                "protocolVersion": "2025-11-25",
                "capabilities": {},
                "clientInfo": { "name": "test-client", "version": "1.0.0" }
            }
        });
        let msg: JsonRpcMessage = serde_json::from_value(init_msg).unwrap();
        let _ = service.call_message(msg).await.unwrap();
        let notif_line = r#"{"jsonrpc":"2.0","method":"notifications/initialized"}"#;
        let notif = serde_json::from_str::<JsonRpcNotification>(notif_line).unwrap();
        handle_notification(router, notif).unwrap();
        service
    }

    // ---- process_line ----

    #[tokio::test]
    async fn test_process_line_valid_request() {
        let router = make_router();
        let mut service = init_service(&router).await;
        let line = r#"{"jsonrpc":"2.0","id":1,"method":"ping"}"#;
        let result = process_line(&mut service, &router, line).await;
        let response = result.unwrap().unwrap();
        let json = serde_json::to_value(&response).unwrap();
        assert_eq!(json["jsonrpc"], "2.0");
        assert_eq!(json["id"], 1);
        assert!(json.get("result").is_some());
    }

    #[tokio::test]
    async fn test_process_line_notification_returns_none() {
        let router = make_router();
        let mut service = init_service(&router).await;
        let line = r#"{"jsonrpc":"2.0","method":"notifications/initialized"}"#;
        let result = process_line(&mut service, &router, line).await;
        assert!(result.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_process_line_malformed_json() {
        let router = make_router();
        let mut service = init_service(&router).await;
        let line = r#"not valid json at all"#;
        let result = process_line(&mut service, &router, line).await;
        assert!(result.is_err());
    }

    // ---- clean_input_line ----

    #[test]
    fn test_clean_input_line_no_bom() {
        assert_eq!(
            clean_input_line(r#"{"jsonrpc":"2.0"}"#),
            r#"{"jsonrpc":"2.0"}"#
        );
    }

    #[test]
    fn test_clean_input_line_strips_leading_bom() {
        let with_bom = "\u{feff}{\"jsonrpc\":\"2.0\"}";
        assert_eq!(clean_input_line(with_bom), r#"{"jsonrpc":"2.0"}"#);
    }

    #[test]
    fn test_clean_input_line_strips_bom_then_trims() {
        let input = "\u{feff} {\"id\":1}\n";
        assert_eq!(clean_input_line(input), r#"{"id":1}"#);
    }

    #[test]
    fn test_clean_input_line_does_not_strip_internal_bom() {
        let input = "{\"text\":\"hi\u{feff}there\"}";
        assert_eq!(clean_input_line(input), input);
    }

    #[test]
    fn test_clean_input_line_empty() {
        assert_eq!(clean_input_line(""), "");
        assert_eq!(clean_input_line("\u{feff}"), "");
        assert_eq!(clean_input_line(" \n\t"), "");
    }

    #[tokio::test]
    async fn test_process_line_with_bom_stripped_input_parses() {
        let router = make_router();
        let mut service = init_service(&router).await;
        let raw = "\u{feff}{\"jsonrpc\":\"2.0\",\"id\":7,\"method\":\"tools/list\",\"params\":{}}";
        let cleaned = clean_input_line(raw);
        let result = process_line(&mut service, &router, cleaned).await;
        let response = result.unwrap().unwrap();
        let json = serde_json::to_value(&response).unwrap();
        assert_eq!(json["id"], 7);
        assert!(json["result"]["tools"].is_array());
    }

    #[tokio::test]
    async fn test_process_line_tools_list() {
        let router = make_router();
        let mut service = init_service(&router).await;
        let line = r#"{"jsonrpc":"2.0","id":2,"method":"tools/list","params":{}}"#;
        let result = process_line(&mut service, &router, line).await;
        let response = result.unwrap().unwrap();
        let json = serde_json::to_value(&response).unwrap();
        assert_eq!(json["id"], 2);
        assert!(json["result"]["tools"].is_array());
    }

    #[tokio::test]
    async fn test_process_line_unknown_method() {
        let router = make_router();
        let mut service = init_service(&router).await;
        let line = r#"{"jsonrpc":"2.0","id":3,"method":"nonexistent/method"}"#;
        let result = process_line(&mut service, &router, line).await;
        let response = result.unwrap().unwrap();
        let json = serde_json::to_value(&response).unwrap();
        assert!(json.get("error").is_some());
        // -32601 is the JSON-RPC "method not found" code.
        assert_eq!(json["error"]["code"], -32601);
    }

    // ---- handle_notification ----

    #[test]
    fn test_handle_notification_initialized() {
        let router = make_router();
        let notif_json = r#"{"jsonrpc":"2.0","method":"notifications/initialized"}"#;
        let notif: JsonRpcNotification = serde_json::from_str(notif_json).unwrap();
        let result = handle_notification(&router, notif);
        assert!(result.is_ok());
    }

    #[test]
    fn test_handle_notification_cancelled() {
        let router = make_router();
        let notif_json = r#"{"jsonrpc":"2.0","method":"notifications/cancelled","params":{"requestId":1,"reason":"timeout"}}"#;
        let notif: JsonRpcNotification = serde_json::from_str(notif_json).unwrap();
        let result = handle_notification(&router, notif);
        assert!(result.is_ok());
    }
}