mcp_context_server/
server.rs1use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
2
3use crate::config::ServerConfig;
4use crate::handlers;
5use crate::protocol::{JsonRpcError, JsonRpcRequest, JsonRpcResponse};
6
/// Largest accepted size, in bytes, of one line read from stdin, counting its
/// trailing newline (1 MiB). Longer lines are answered with a parse error.
const MAX_MESSAGE_BYTES: usize = 1 << 20;
9
10pub struct McpServer {
12 config: ServerConfig,
13 initialized: bool,
14}
15
16impl McpServer {
17 pub fn new(config: ServerConfig) -> Self {
18 Self {
19 config,
20 initialized: false,
21 }
22 }
23
24 pub async fn run(&mut self) -> Result<(), Box<dyn std::error::Error>> {
25 let stdin = tokio::io::stdin();
26 let mut stdout = tokio::io::stdout();
27 let mut reader = BufReader::new(stdin);
28 let mut raw = Vec::new();
29
30 loop {
31 raw.clear();
32 let n = reader.read_until(b'\n', &mut raw).await?;
33 if n == 0 {
34 break;
35 }
36
37 if n > MAX_MESSAGE_BYTES {
38 eprintln!("Message too large: {n} bytes (limit {MAX_MESSAGE_BYTES})");
39 write_response(
40 &mut stdout,
41 &JsonRpcResponse::error(None, JsonRpcError::parse_error()),
42 ).await?;
43 continue;
44 }
45
46 let trimmed = match std::str::from_utf8(&raw) {
47 Ok(s) => s.trim(),
48 Err(_) => {
49 write_response(
50 &mut stdout,
51 &JsonRpcResponse::error(None, JsonRpcError::parse_error()),
52 ).await?;
53 continue;
54 }
55 };
56
57 if trimmed.is_empty() {
58 continue;
59 }
60
61 let req: JsonRpcRequest = match serde_json::from_str(trimmed) {
62 Ok(r) => r,
63 Err(e) => {
64 eprintln!("Parse error: {e}");
65 write_response(
66 &mut stdout,
67 &JsonRpcResponse::error(None, JsonRpcError::parse_error()),
68 ).await?;
69 continue;
70 }
71 };
72
73 if req.jsonrpc != "2.0" {
75 write_response(
76 &mut stdout,
77 &JsonRpcResponse::error(req.id.clone(), JsonRpcError::invalid_request()),
78 ).await?;
79 continue;
80 }
81
82 if !self.initialized && req.method != "initialize" {
84 if req.id.is_none() {
85 continue;
86 }
87 write_response(
88 &mut stdout,
89 &JsonRpcResponse::error(
90 req.id.clone(),
91 JsonRpcError::invalid_request_with("Server not initialized"),
92 ),
93 ).await?;
94 continue;
95 }
96
97 if let Some(resp) = handlers::dispatch(&req, &self.config).await {
98 write_response(&mut stdout, &resp).await?;
99 }
100
101 if req.method == "initialize" {
102 self.initialized = true;
103 }
104 }
105
106 Ok(())
107 }
108}
109
110async fn write_response(
111 stdout: &mut tokio::io::Stdout,
112 resp: &JsonRpcResponse,
113) -> Result<(), Box<dyn std::error::Error>> {
114 let out = serde_json::to_string(resp)?;
115 stdout.write_all(out.as_bytes()).await?;
116 stdout.write_all(b"\n").await?;
117 stdout.flush().await?;
118 Ok(())
119}