Skip to main content

mcp_context_server/
server.rs

1use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
2
3use crate::config::ServerConfig;
4use crate::handlers;
5use crate::protocol::{JsonRpcError, JsonRpcRequest, JsonRpcResponse};
6
7/// Maximum bytes per JSON-RPC message (1 MiB).
8const MAX_MESSAGE_BYTES: usize = 1024 * 1024;
9
10/// MCP server that communicates over stdio using newline-delimited JSON-RPC 2.0.
11pub struct McpServer {
12    config: ServerConfig,
13    initialized: bool,
14}
15
16impl McpServer {
17    pub fn new(config: ServerConfig) -> Self {
18        Self {
19            config,
20            initialized: false,
21        }
22    }
23
24    pub async fn run(&mut self) -> Result<(), Box<dyn std::error::Error>> {
25        let stdin = tokio::io::stdin();
26        let mut stdout = tokio::io::stdout();
27        let mut reader = BufReader::new(stdin);
28        let mut raw = Vec::new();
29
30        loop {
31            raw.clear();
32            let n = reader.read_until(b'\n', &mut raw).await?;
33            if n == 0 {
34                break;
35            }
36
37            if n > MAX_MESSAGE_BYTES {
38                eprintln!("Message too large: {n} bytes (limit {MAX_MESSAGE_BYTES})");
39                write_response(
40                    &mut stdout,
41                    &JsonRpcResponse::error(None, JsonRpcError::parse_error()),
42                ).await?;
43                continue;
44            }
45
46            let trimmed = match std::str::from_utf8(&raw) {
47                Ok(s) => s.trim(),
48                Err(_) => {
49                    write_response(
50                        &mut stdout,
51                        &JsonRpcResponse::error(None, JsonRpcError::parse_error()),
52                    ).await?;
53                    continue;
54                }
55            };
56
57            if trimmed.is_empty() {
58                continue;
59            }
60
61            let req: JsonRpcRequest = match serde_json::from_str(trimmed) {
62                Ok(r) => r,
63                Err(e) => {
64                    eprintln!("Parse error: {e}");
65                    write_response(
66                        &mut stdout,
67                        &JsonRpcResponse::error(None, JsonRpcError::parse_error()),
68                    ).await?;
69                    continue;
70                }
71            };
72
73            // Validate jsonrpc version
74            if req.jsonrpc != "2.0" {
75                write_response(
76                    &mut stdout,
77                    &JsonRpcResponse::error(req.id.clone(), JsonRpcError::invalid_request()),
78                ).await?;
79                continue;
80            }
81
82            // Initialization gate: only `initialize` is allowed before handshake completes
83            if !self.initialized && req.method != "initialize" {
84                if req.id.is_none() {
85                    continue;
86                }
87                write_response(
88                    &mut stdout,
89                    &JsonRpcResponse::error(
90                        req.id.clone(),
91                        JsonRpcError::invalid_request_with("Server not initialized"),
92                    ),
93                ).await?;
94                continue;
95            }
96
97            if let Some(resp) = handlers::dispatch(&req, &self.config).await {
98                write_response(&mut stdout, &resp).await?;
99            }
100
101            if req.method == "initialize" {
102                self.initialized = true;
103            }
104        }
105
106        Ok(())
107    }
108}
109
110async fn write_response(
111    stdout: &mut tokio::io::Stdout,
112    resp: &JsonRpcResponse,
113) -> Result<(), Box<dyn std::error::Error>> {
114    let out = serde_json::to_string(resp)?;
115    stdout.write_all(out.as_bytes()).await?;
116    stdout.write_all(b"\n").await?;
117    stdout.flush().await?;
118    Ok(())
119}