1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
use std::{
collections::HashMap,
net::SocketAddr,
sync::{
Arc, RwLock,
atomic::{AtomicUsize, Ordering},
},
};
use futures::{SinkExt, StreamExt};
use tokio::{
io::{AsyncRead, AsyncWrite},
net::{TcpListener, ToSocketAddrs},
runtime::Builder,
sync::{Mutex, mpsc},
task::JoinHandle,
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use crate::{
connection::ServerHandler,
context::ServerCtx,
error::{Error, Result},
http::HttpServerTransport,
jsonrpc::create_jsonrpc_notification,
schema::{self, *},
transport::{GenericDuplex, StdioTransport, StreamTransport, Transport},
};
/// MCP Server implementation.
///
/// `F` is a factory closure invoked once per incoming connection to produce
/// that connection's handler (see [`Server::new`] / `Server::from_factory`).
pub struct Server<F> {
    /// Factory for creating per-connection handlers.
    connection_factory: F,
}
impl Server<()> {
    /// Create a new server with a handler factory.
    ///
    /// The factory function is called once for each incoming connection, so
    /// every connection gets its own handler instance with independent state.
    ///
    /// Server capabilities are specified by returning them from the handler's
    /// [`ServerHandler::initialize`] method, which makes the handler the
    /// single source of truth for what the server advertises to clients.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use tmcp::{Server, ServerHandler, ServerCtx, Result};
    /// use tmcp::schema::{ClientCapabilities, Implementation, InitializeResult};
    ///
    /// struct MyHandler;
    ///
    /// #[async_trait::async_trait]
    /// impl ServerHandler for MyHandler {
    ///     async fn initialize(
    ///         &self,
    ///         _ctx: &ServerCtx,
    ///         _protocol_version: String,
    ///         _capabilities: ClientCapabilities,
    ///         _client_info: Implementation,
    ///     ) -> Result<InitializeResult> {
    ///         // Specify server capabilities here
    ///         Ok(InitializeResult::new("my-server")
    ///             .with_version("1.0.0")
    ///             .with_tools(true) // Enable tools capability
    ///             .with_resources(true, true) // Enable resources with subscribe and list_changed
    ///             .with_prompts(true) // Enable prompts capability
    ///             .with_logging() // Enable logging capability
    ///             .with_instructions("A helpful MCP server"))
    ///     }
    /// }
    ///
    /// let server = Server::new(|| MyHandler);
    /// server.serve_stdio().await?;
    /// ```
    pub fn new<C, G>(
        factory: G,
    ) -> Server<impl Fn() -> Box<dyn ServerHandler> + Clone + Send + Sync + 'static>
    where
        C: ServerHandler + 'static,
        G: Fn() -> C + Clone + Send + Sync + 'static,
    {
        // Adapt the concrete-handler factory into one yielding boxed trait
        // objects, so the rest of the crate works with a single factory shape.
        let boxed_factory = move || -> Box<dyn ServerHandler> { Box::new(factory()) };
        Server {
            connection_factory: boxed_factory,
        }
    }
}
impl<F> Server<F>
where
    F: Fn() -> Box<dyn ServerHandler> + Send + Sync + 'static,
{
    /// Create a server from a pre-boxed handler factory.
    ///
    /// This is for internal use when the factory already returns `Box<dyn ServerHandler>`.
    pub(crate) fn from_factory(factory: F) -> Self {
        Self {
            connection_factory: factory,
        }
    }
    /// Serve a single connection using the provided transport.
    ///
    /// This is a convenience method that starts the server and waits for completion.
    pub(crate) async fn serve(self, transport: Box<dyn Transport>) -> Result<()> {
        let handle = ServerHandle::new(self, transport).await?;
        handle
            .handle
            .await
            .map_err(|e| Error::InternalError(format!("Server task failed: {e}")))
    }
    /// Serve connections from stdin/stdout.
    ///
    /// This is a convenience method for the common stdio use case.
    ///
    /// `stdout` is reserved for JSON-RPC traffic while this server is running. Do not print human
    /// logs to `stdout` or install a tracing/logging subscriber that writes there; route
    /// diagnostics to `stderr`, a file, or another sink instead.
    pub async fn serve_stdio(self) -> Result<()> {
        let transport = Box::new(StdioTransport);
        self.serve(transport).await
    }
    /// Serve connections from stdin/stdout using an internal Tokio runtime.
    ///
    /// This is a convenience for binaries that aren't already running within a Tokio runtime.
    pub fn serve_stdio_blocking(self) -> Result<()> {
        let rt = Builder::new_multi_thread().enable_all().build()?;
        rt.block_on(self.serve_stdio())
    }
    /// Serve using generic AsyncRead and AsyncWrite streams.
    ///
    /// This is a convenience method that creates a StreamTransport from the provided streams.
    pub async fn serve_stream<R, W>(self, reader: R, writer: W) -> Result<()>
    where
        R: AsyncRead + Send + Sync + Unpin + 'static,
        W: AsyncWrite + Send + Sync + Unpin + 'static,
    {
        let duplex = GenericDuplex::new(reader, writer);
        let transport = Box::new(StreamTransport::new(duplex));
        self.serve(transport).await
    }
    /// Serve TCP connections by accepting them in a loop.
    ///
    /// Returns a [`TcpServerHandle`] that can be used to stop accepting new connections.
    /// Existing connections will continue until they complete or their clients disconnect.
    pub async fn serve_tcp(self, addr: impl ToSocketAddrs) -> Result<TcpServerHandle>
    where
        F: Clone,
    {
        let listener = TcpListener::bind(addr).await?;
        let bound_addr = listener.local_addr()?;
        info!("MCP server listening on {}", bound_addr);
        // Convert connection factory to Arc for sharing across tasks
        let connection_factory = Arc::new(self.connection_factory);
        // Create shutdown token for coordinating shutdown
        let shutdown_token = CancellationToken::new();
        let shutdown_token_loop = shutdown_token.clone();
        // Spawn the accept loop
        let handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    // Check for shutdown signal
                    _ = shutdown_token_loop.cancelled() => {
                        info!("TCP server shutting down");
                        break;
                    }
                    // Accept new connections
                    result = listener.accept() => {
                        match result {
                            Ok((stream, peer_addr)) => {
                                info!("New connection from {}", peer_addr);
                                // Clone Arc reference for the spawned task
                                let factory = connection_factory.clone();
                                // Handle each connection in a separate task
                                tokio::spawn(async move {
                                    // Create a new server with cloned factory
                                    let server = Self {
                                        connection_factory: factory.as_ref().clone(),
                                    };
                                    let transport = Box::new(StreamTransport::new(stream));
                                    match server.serve(transport).await {
                                        Ok(()) => info!("Connection from {} closed", peer_addr),
                                        Err(e) => error!("Error handling connection from {}: {}", peer_addr, e),
                                    }
                                });
                            }
                            Err(e) => {
                                error!("Failed to accept connection: {}", e);
                                // Back off briefly before retrying: a persistent
                                // accept failure (e.g. fd exhaustion / EMFILE)
                                // would otherwise spin this loop at full speed,
                                // flooding the log and starving the runtime.
                                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                            }
                        }
                    }
                }
            }
        });
        Ok(TcpServerHandle {
            handle,
            shutdown_token,
            bound_addr,
        })
    }
    /// Serve HTTP connections.
    ///
    /// This is a convenience method for the common HTTP server use case.
    /// Returns a ServerHandle that can be used to stop the server.
    pub async fn serve_http(self, addr: impl AsRef<str>) -> Result<ServerHandle> {
        let mut http_transport = HttpServerTransport::new(addr.as_ref());
        http_transport.start().await?;
        // Capture the actual bound address before the transport is boxed
        // (the requested addr may have used port 0).
        let bound_addr = http_transport.bind_addr.clone();
        let mut handle = ServerHandle::from_transport(self, Box::new(http_transport)).await?;
        handle.bound_addr = Some(bound_addr);
        Ok(handle)
    }
}
/// Handle for controlling a running MCP server instance.
///
/// Call [`ServerHandle::stop`] to signal shutdown and wait for the background
/// task to finish; [`ServerHandle::send_server_notification`] forwards
/// notifications to the connected client, gated by advertised capabilities.
pub struct ServerHandle {
    /// Join handle for the server task.
    pub handle: JoinHandle<()>,
    /// Sender for outbound server notifications.
    notification_tx: mpsc::UnboundedSender<ServerNotification>,
    /// Token used to signal shutdown to the server loop.
    shutdown_token: CancellationToken,
    /// The actual bound address (for servers that bind to a network port)
    pub bound_addr: Option<String>,
    /// Capabilities from the handler's initialize response.
    /// This is set when the client initializes and is used to gate notifications.
    capabilities: Arc<RwLock<ServerCapabilities>>,
}
impl ServerHandle {
    /// Start serving connections using the provided transport, returning a handle for runtime operations.
    ///
    /// Spawns the connection's main event loop as a background task. The loop:
    /// * intercepts the first `initialize` request to capture the handler's
    ///   advertised capabilities,
    /// * dispatches other requests to per-request spawned tasks,
    /// * forwards internal notifications and queued responses to the client,
    /// * exits on shutdown signal, on a transport read/write error, or once a
    ///   disconnected client's in-flight work has drained.
    pub(crate) async fn new<F>(server: Server<F>, mut transport: Box<dyn Transport>) -> Result<Self>
    where
        F: Fn() -> Box<dyn ServerHandler> + Send + Sync + 'static,
    {
        transport.connect().await?;
        let remote_addr = transport.remote_addr();
        let stream = transport.framed()?;
        // Split the framed transport into independent send and receive halves.
        let (sink_tx, mut stream_rx) = stream.split();
        info!("MCP server started");
        let (notification_tx, mut notification_rx) = mpsc::unbounded_channel();
        // Channel for queueing responses to be sent
        let (response_tx, mut response_rx) = mpsc::unbounded_channel::<JSONRPCMessage>();
        // Wrap the sink in an Arc<Mutex> for sharing
        let sink_tx = Arc::new(Mutex::new(sink_tx));
        // Clone notification_tx for the handle
        let notification_tx_handle = notification_tx.clone();
        // Create connection instance wrapped in Arc for shared access
        let connection = Arc::new((server.connection_factory)());
        // Create a single ServerCtx instance that will be used throughout the connection
        let server_ctx = ServerCtx::new(notification_tx, Some(sink_tx.clone()));
        // Create shutdown token for coordinating shutdown
        let shutdown_token = CancellationToken::new();
        let shutdown_token_task = shutdown_token.clone();
        // Shared capabilities - updated when client initializes
        let capabilities = Arc::new(RwLock::new(ServerCapabilities::default()));
        let capabilities_task = capabilities.clone();
        // Track whether we've called on_connect after initialization
        let mut initialized = false;
        let mut client_disconnected = false;
        // Count of requests currently executing in spawned tasks; used to drain
        // outstanding work before exiting after a client disconnect.
        let in_flight_requests = Arc::new(AtomicUsize::new(0));
        // Start the main server loop in a background task
        let handle = tokio::spawn(async move {
            loop {
                tokio::select! {
                    // Check for shutdown signal
                    _ = shutdown_token_task.cancelled() => {
                        info!("Server received shutdown signal");
                        break;
                    }
                    // Handle incoming messages from client. The arm is disabled
                    // after disconnect so the loop only drains remaining work.
                    result = stream_rx.next(), if !client_disconnected => {
                        match result {
                            Some(Ok(message)) => {
                                match message {
                                    // The first `initialize` request is handled inline (not
                                    // dispatched like other requests) so the capabilities in
                                    // the handler's response can be captured for gating.
                                    JSONRPCMessage::Request(request)
                                        if !initialized && request.request.method == "initialize" =>
                                    {
                                        // Handle initialize specially to capture capabilities
                                        let (response, init_caps) =
                                            handle_initialize_request(
                                                connection.as_ref().as_ref(),
                                                request,
                                                &server_ctx
                                            ).await;
                                        // init_caps is Some only when the handler accepted
                                        // the initialize request.
                                        let should_connect = init_caps.is_some();
                                        // Store capabilities from the handler's response
                                        if let Some(caps) = init_caps
                                            && let Ok(mut guard) = capabilities_task.write() {
                                            *guard = caps;
                                        }
                                        {
                                            let mut sink = sink_tx.lock().await;
                                            if let Err(e) = sink.send(response).await {
                                                error!("Error sending initialize response: {}", e);
                                                break;
                                            }
                                        }
                                        // on_connect runs only after a successful initialize
                                        // response has been sent to the client.
                                        if should_connect {
                                            if let Err(e) = connection.on_connect(&server_ctx, &remote_addr).await {
                                                error!("Error during on_connect: {}", e);
                                                break;
                                            }
                                            initialized = true;
                                        }
                                    }
                                    // Responses to server-initiated requests are routed back
                                    // through the context to whoever is awaiting them.
                                    JSONRPCMessage::Response(response) => {
                                        let response_id = match &response {
                                            JSONRPCResponse::Result(result) => {
                                                Some(result.id.clone())
                                            }
                                            JSONRPCResponse::Error(error) => error.id.clone(),
                                        };
                                        tracing::info!(
                                            "Server received response from client: {:?}",
                                            response_id
                                        );
                                        server_ctx.handle_client_response(response).await;
                                    }
                                    other => {
                                        if let Err(e) = handle_message_with_connection(
                                            connection.clone(),
                                            other,
                                            response_tx.clone(),
                                            in_flight_requests.clone(),
                                            &server_ctx,
                                        )
                                        .await
                                        {
                                            error!("Error handling message: {}", e);
                                        }
                                    }
                                }
                            }
                            Some(Err(e)) => {
                                error!("Error reading message: {}", e);
                                break;
                            }
                            None => {
                                // EOF from the client: stop reading, but keep looping so
                                // in-flight requests can finish and their responses flush.
                                info!("Client disconnected");
                                client_disconnected = true;
                            }
                        }
                    }
                    // Forward internal notifications to client
                    Some(notification) = notification_rx.recv() => {
                        let jsonrpc_notification = create_jsonrpc_notification(&notification);
                        {
                            let mut sink = sink_tx.lock().await;
                            if let Err(e) = sink.send(JSONRPCMessage::Notification(jsonrpc_notification)).await {
                                error!("Error sending notification to client: {}", e);
                                break;
                            }
                        }
                    }
                    // Send queued responses to client
                    Some(response) = response_rx.recv() => {
                        let mut sink = sink_tx.lock().await;
                        if let Err(e) = sink.send(response).await {
                            error!("Error sending response to client: {}", e);
                            break;
                        }
                    }
                }
                // After a disconnect, exit once no request tasks remain and all
                // queued responses have been flushed.
                // NOTE(review): spawn_request_handler decrements the counter only
                // after queueing its response, so this snapshot can observe a
                // non-zero count for a request whose response was just consumed;
                // in that window no further wakeup is guaranteed — confirm this
                // cannot stall the drain path.
                if client_disconnected
                    && in_flight_requests.load(Ordering::SeqCst) == 0
                    && response_rx.is_empty()
                {
                    break;
                }
            }
            // Clean up connection
            if let Err(e) = connection.on_shutdown().await {
                error!("Error during server shutdown: {}", e);
            }
            info!("MCP server stopped");
        });
        Ok(Self {
            handle,
            notification_tx: notification_tx_handle,
            shutdown_token,
            bound_addr: None,
            capabilities,
        })
    }
    /// Create a ServerHandle using generic AsyncRead and AsyncWrite streams.
    /// This is a convenience method that creates a StreamTransport from the provided streams.
    pub async fn from_stream<F, R, W>(server: Server<F>, reader: R, writer: W) -> Result<Self>
    where
        F: Fn() -> Box<dyn ServerHandler> + Send + Sync + 'static,
        R: AsyncRead + Send + Sync + Unpin + 'static,
        W: AsyncWrite + Send + Sync + Unpin + 'static,
    {
        let duplex = GenericDuplex::new(reader, writer);
        let transport = Box::new(StreamTransport::new(duplex));
        Self::new(server, transport).await
    }
    /// Create a ServerHandle from a transport.
    /// This allows using any transport implementation.
    pub async fn from_transport<F>(server: Server<F>, transport: Box<dyn Transport>) -> Result<Self>
    where
        F: Fn() -> Box<dyn ServerHandler> + Send + Sync + 'static,
    {
        Self::new(server, transport).await
    }
    /// Stop the server and wait for the background task to finish.
    pub async fn stop(self) -> Result<()> {
        // Signal shutdown
        self.shutdown_token.cancel();
        // Wait for the server task to complete
        self.handle
            .await
            .map_err(|e| Error::InternalError(format!("Server task failed: {e}")))?;
        Ok(())
    }
    /// Send a server notification to connected clients.
    ///
    /// Notifications not covered by the advertised capabilities are dropped
    /// (with a debug log) rather than sent.
    pub fn send_server_notification(&self, notification: &ServerNotification) {
        if !self.can_forward_notification(notification) {
            debug!(
                "Skipping server notification {:?} due to missing capability",
                notification
            );
            return;
        }
        if let Err(e) = self.notification_tx.send(notification.clone()) {
            error!(
                "Failed to send server notification {:?}: {}",
                notification, e
            );
        }
    }
    /// Check whether a notification is supported by the configured capabilities.
    fn can_forward_notification(&self, notification: &ServerNotification) -> bool {
        // Recover the guard even if the lock was poisoned by a panicked writer;
        // capabilities are plain data, so reading a poisoned value is safe here.
        let caps = self.capabilities.read().unwrap_or_else(|e| e.into_inner());
        match notification {
            ServerNotification::LoggingMessage { .. } => caps.logging.is_some(),
            ServerNotification::ResourceUpdated { .. } => caps
                .resources
                .as_ref()
                .and_then(|c| c.subscribe)
                .unwrap_or(false),
            ServerNotification::ResourceListChanged { .. } => caps
                .resources
                .as_ref()
                .and_then(|c| c.list_changed)
                .unwrap_or(false),
            ServerNotification::ToolListChanged { .. } => caps
                .tools
                .as_ref()
                .and_then(|c| c.list_changed)
                .unwrap_or(false),
            ServerNotification::PromptListChanged { .. } => caps
                .prompts
                .as_ref()
                .and_then(|c| c.list_changed)
                .unwrap_or(false),
            // Not gated by any capability flag: always forwarded.
            ServerNotification::ElicitationComplete { .. } => true,
            ServerNotification::TaskStatus { .. } => caps.tasks.is_some(),
            ServerNotification::Progress { .. } | ServerNotification::Cancelled { .. } => true,
        }
    }
}
/// Handle for controlling a running TCP MCP server.
///
/// Unlike [`ServerHandle`] which manages a single connection, `TcpServerHandle`
/// manages an accept loop that spawns handlers for multiple connections.
pub struct TcpServerHandle {
    /// Join handle for the accept loop task.
    handle: JoinHandle<()>,
    /// Token used to signal shutdown to the accept loop.
    shutdown_token: CancellationToken,
    /// The actual bound address (useful when binding with port 0).
    pub bound_addr: SocketAddr,
}
impl TcpServerHandle {
/// Stop accepting new connections and wait for the accept loop to finish.
///
/// Note: This stops accepting new connections but does not terminate
/// existing connections - they will continue until they complete or
/// their clients disconnect.
pub async fn stop(self) -> Result<()> {
// Signal shutdown
self.shutdown_token.cancel();
// Wait for the accept loop to complete
self.handle
.await
.map_err(|e| Error::InternalError(format!("TCP accept loop failed: {e}")))?;
Ok(())
}
}
/// Handle a message using the Connection trait.
///
/// Notifications are awaited inline (they produce no response); every other
/// message kind is dispatched without awaiting the handler.
async fn handle_message_with_connection(
    connection: Arc<Box<dyn ServerHandler>>,
    message: JSONRPCMessage,
    response_tx: mpsc::UnboundedSender<JSONRPCMessage>,
    in_flight_requests: Arc<AtomicUsize>,
    context: &ServerCtx,
) -> Result<()> {
    match message {
        // Notifications have no response; run the handler to completion here.
        JSONRPCMessage::Notification(notification) => {
            handle_notification(&**connection, notification, context).await
        }
        // Requests (and anything else) are handled without blocking this call.
        other => {
            handle_message_without_await(
                &connection,
                other,
                response_tx,
                in_flight_requests,
                context,
            );
            Ok(())
        }
    }
}
/// Handle messages that do not require awaiting on the connection.
///
/// Requests are counted and handed off to a spawned task; responses are a
/// no-op here because the main loop routes them.
#[allow(clippy::cognitive_complexity)]
fn handle_message_without_await(
    connection: &Arc<Box<dyn ServerHandler>>,
    message: JSONRPCMessage,
    response_tx: mpsc::UnboundedSender<JSONRPCMessage>,
    in_flight_requests: Arc<AtomicUsize>,
    context: &ServerCtx,
) {
    match message {
        JSONRPCMessage::Request(request) => {
            // Count the request before spawning so the main loop's drain check
            // never observes a spawned-but-uncounted request.
            in_flight_requests.fetch_add(1, Ordering::SeqCst);
            spawn_request_handler(
                connection,
                request,
                response_tx,
                in_flight_requests,
                context,
            );
        }
        JSONRPCMessage::Response(_) => {
            // Response handling is done in the main message loop
            debug!("Response handling delegated to main loop");
        }
        _ => {}
    }
}
/// Spawn a task to handle a request and send its response.
///
/// The in-flight counter has already been incremented by the caller; this
/// task decrements it once its response has been queued.
fn spawn_request_handler(
    connection: &Arc<Box<dyn ServerHandler>>,
    request: JSONRPCRequest,
    response_tx: mpsc::UnboundedSender<JSONRPCMessage>,
    in_flight_requests: Arc<AtomicUsize>,
    context: &ServerCtx,
) {
    // Clone the shared pieces so the spawned future is 'static.
    let conn = connection.clone();
    let ctx = context.clone();
    let tx = response_tx;
    tokio::spawn(async move {
        let response_message = handle_request(&**conn, request.clone(), &ctx).await;
        tracing::info!("Server sending response: {:?}", response_message);
        if let Err(e) = tx.send(response_message) {
            error!("Failed to queue response: {}", e);
        }
        // Decrement only after the response is queued so the drain check in the
        // main loop doesn't see an empty queue while a response is pending.
        // NOTE(review): the main loop may consume the queued response before
        // this decrement lands — confirm the drain check cannot stall on that.
        in_flight_requests.fetch_sub(1, Ordering::SeqCst);
    });
}
/// Handle an initialize request specially, returning both the response and capabilities.
///
/// This is needed so the `ServerHandle` can capture the capabilities from the handler's
/// response rather than from a separate configuration.
///
/// Returns the JSON-RPC response to send, plus `Some(capabilities)` when the
/// handler accepted the request; on any error path capabilities are `None`.
async fn handle_initialize_request(
    connection: &dyn ServerHandler,
    request: JSONRPCRequest,
    context: &ServerCtx,
) -> (JSONRPCMessage, Option<ServerCapabilities>) {
    // Scope the context to this request so replies are correlated to its ID.
    let ctx_with_request = context.with_request_id(request.id.clone());
    // Parse the initialize parameters
    let params = match &request.request.params {
        Some(params) => params,
        None => {
            // No params at all: reject with INVALID_PARAMS rather than guessing.
            return (
                JSONRPCMessage::Response(JSONRPCResponse::Error(JSONRPCErrorResponse {
                    jsonrpc: JSONRPC_VERSION.to_string(),
                    id: Some(request.id),
                    error: ErrorObject {
                        code: INVALID_PARAMS,
                        message: "Missing initialize parameters".to_string(),
                        data: None,
                    },
                })),
                None,
            );
        }
    };
    // Extract initialize parameters. Missing or malformed fields fall back to
    // lenient defaults instead of failing the request.
    let protocol_version = params
        .other
        .get("protocolVersion")
        .and_then(|v| v.as_str())
        .unwrap_or("")
        .to_string();
    let capabilities: ClientCapabilities = params
        .other
        .get("capabilities")
        .and_then(|v| serde_json::from_value(v.clone()).ok())
        .unwrap_or_default();
    let client_info: Implementation = params
        .other
        .get("clientInfo")
        .and_then(|v| serde_json::from_value(v.clone()).ok())
        .unwrap_or_else(|| Implementation::new("unknown", "0.0.0"));
    // Call the handler's initialize method
    match connection
        .initialize(
            &ctx_with_request,
            protocol_version,
            capabilities,
            client_info,
        )
        .await
    {
        Ok(result) => {
            // Capture the capabilities from the result
            let caps = result.capabilities.clone();
            // Serialize the result
            match serde_json::to_value(&result) {
                Ok(value) => {
                    let response =
                        JSONRPCMessage::Response(JSONRPCResponse::Result(JSONRPCResultResponse {
                            jsonrpc: JSONRPC_VERSION.to_string(),
                            id: request.id,
                            result: schema::JSONRpcResult {
                                _meta: None,
                                // Flatten the serialized result's fields into the
                                // JSON-RPC result object.
                                other: if let Some(obj) = value.as_object() {
                                    obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
                                } else {
                                    HashMap::new()
                                },
                            },
                        }));
                    (response, Some(caps))
                }
                Err(e) => (
                    JSONRPCMessage::Response(JSONRPCResponse::Error(JSONRPCErrorResponse {
                        jsonrpc: JSONRPC_VERSION.to_string(),
                        id: Some(request.id),
                        error: ErrorObject {
                            code: INTERNAL_ERROR,
                            message: format!("Failed to serialize initialize result: {e}"),
                            data: None,
                        },
                    })),
                    None,
                ),
            }
        }
        Err(e) => {
            // Prefer the error's own JSON-RPC mapping; fall back to INTERNAL_ERROR.
            let response = if let Some(jsonrpc_error) = e.to_jsonrpc_response(request.id.clone()) {
                JSONRPCMessage::Response(JSONRPCResponse::Error(jsonrpc_error))
            } else {
                JSONRPCMessage::Response(JSONRPCResponse::Error(JSONRPCErrorResponse {
                    jsonrpc: JSONRPC_VERSION.to_string(),
                    id: Some(request.id),
                    error: ErrorObject {
                        code: INTERNAL_ERROR,
                        message: e.to_string(),
                        data: None,
                    },
                }))
            };
            (response, None)
        }
    }
}
/// Handle a request using the Connection trait and convert result to JSONRPCMessage.
///
/// Always produces a response message: either the handler's result or a
/// JSON-RPC error carrying the original request ID.
async fn handle_request(
    connection: &dyn ServerHandler,
    request: JSONRPCRequest,
    context: &ServerCtx,
) -> JSONRPCMessage {
    tracing::info!(
        "Server handling request: {:?} method: {}",
        request.id,
        request.request.method
    );
    // Create a context with the request ID
    let ctx_with_request = context.with_request_id(request.id.clone());
    let result = handle_request_inner(connection, request.clone(), &ctx_with_request).await;
    match result {
        Ok(value) => {
            // Create a successful response
            JSONRPCMessage::Response(JSONRPCResponse::Result(JSONRPCResultResponse {
                jsonrpc: JSONRPC_VERSION.to_string(),
                id: request.id,
                result: schema::JSONRpcResult {
                    _meta: None,
                    // Object results are flattened into the JSON-RPC result; any
                    // non-object value is wrapped under a "result" key instead.
                    other: if let Some(obj) = value.as_object() {
                        obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
                    } else {
                        let mut map = HashMap::new();
                        map.insert("result".to_string(), value);
                        map
                    },
                },
            }))
        }
        Err(e) => {
            // Check if error has a specific JSONRPC response
            if let Some(jsonrpc_error) = e.to_jsonrpc_response(request.id.clone()) {
                JSONRPCMessage::Response(JSONRPCResponse::Error(jsonrpc_error))
            } else {
                // For all other errors, use INTERNAL_ERROR
                JSONRPCMessage::Response(JSONRPCResponse::Error(JSONRPCErrorResponse {
                    jsonrpc: JSONRPC_VERSION.to_string(),
                    id: Some(request.id),
                    error: ErrorObject {
                        code: INTERNAL_ERROR,
                        message: e.to_string(),
                        data: None,
                    },
                }))
            }
        }
    }
}
/// Inner handler that returns Result<serde_json::Value>.
///
/// Unwraps the JSON-RPC envelope, converts it into a typed [`ClientRequest`],
/// and delegates to the connection's request handler.
async fn handle_request_inner(
    conn: &dyn ServerHandler,
    request: JSONRPCRequest,
    ctx: &ServerCtx,
) -> Result<serde_json::Value> {
    // Pull the method name and params out of the envelope.
    let Request { method, params } = request.request;
    let typed_request = parse_client_request(method, params)?;
    conn.handle_request(ctx, typed_request).await
}
/// Parse a client request from the JSON-RPC method and params payload.
fn parse_client_request(method: String, params: Option<RequestParams>) -> Result<ClientRequest> {
let mut request_obj = serde_json::Map::new();
request_obj.insert(
"method".to_string(),
serde_json::Value::String(method.clone()),
);
if let Some(params) = params {
if let Some(meta) = params._meta {
request_obj.insert("_meta".to_string(), serde_json::to_value(meta)?);
}
for (key, value) in params.other {
request_obj.insert(key, value);
}
}
match serde_json::from_value::<ClientRequest>(serde_json::Value::Object(request_obj)) {
Ok(req) => Ok(req),
Err(err) => {
let err_str = err.to_string();
if err_str.contains("unknown variant") {
Err(Error::MethodNotFound(method))
} else {
Err(Error::InvalidParams(format!(
"Invalid parameters for {}: {}",
method, err
)))
}
}
}
}
/// Handle a notification using the Connection trait.
///
/// Rebuilds a JSON value in the shape expected by `ClientNotification` and
/// dispatches it to the handler. Unrecognized or malformed notifications are
/// logged and ignored rather than failing the connection.
async fn handle_notification(
    connection: &dyn ServerHandler,
    notification: JSONRPCNotification,
    context: &ServerCtx,
) -> Result<()> {
    debug!(
        "Received notification: {}",
        notification.notification.method
    );
    // Reassemble {"method": ..., "_meta"?: ..., ...params} for deserialization.
    let mut object = serde_json::Map::new();
    object.insert(
        "method".to_string(),
        serde_json::Value::String(notification.notification.method.clone()),
    );
    if let Some(params) = notification.notification.params {
        if let Some(meta) = params._meta {
            object.insert("_meta".to_string(), serde_json::to_value(meta)?);
        }
        // Flatten the remaining params fields into the object.
        object.extend(params.other);
    }
    match serde_json::from_value::<ClientNotification>(serde_json::Value::Object(object)) {
        Ok(typed) => connection.notification(context, typed).await,
        Err(e) => {
            // Best-effort: unknown notifications are not an error for the connection.
            warn!("Failed to deserialize client notification: {}", e);
            Ok(())
        }
    }
}