1use futures::{FutureExt, future::BoxFuture};
2use thiserror::Error;
3
4use crate::{
5 error::Error as McpError,
6 model::{
7 CancelledNotification, CancelledNotificationParam, Extensions, GetExtensions, GetMeta,
8 JsonRpcBatchRequestItem, JsonRpcBatchResponseItem, JsonRpcError, JsonRpcMessage,
9 JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, Meta, NumberOrString, ProgressToken,
10 RequestId, ServerJsonRpcMessage,
11 },
12 transport::{IntoTransport, Transport},
13};
14#[cfg(feature = "client")]
15#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
16mod client;
17#[cfg(feature = "client")]
18#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
19pub use client::*;
20#[cfg(feature = "server")]
21#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
22mod server;
23#[cfg(feature = "server")]
24#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
25pub use server::*;
26#[cfg(feature = "tower")]
27#[cfg_attr(docsrs, doc(cfg(feature = "tower")))]
28mod tower;
29use tokio_util::sync::{CancellationToken, DropGuard};
30#[cfg(feature = "tower")]
31#[cfg_attr(docsrs, doc(cfg(feature = "tower")))]
32pub use tower::*;
33use tracing::instrument;
/// Errors surfaced to callers of [`Peer`] request/notification APIs.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum ServiceError {
    /// A protocol-level error object returned by the remote peer.
    #[error("Mcp error: {0}")]
    McpError(McpError),
    /// The transport failed while sending a message; carries the transport's error.
    #[error("Transport send error: {0}")]
    TransportSend(Box<dyn std::error::Error + Send + Sync>),
    /// The transport (or the task driving it) has shut down, so the
    /// responder channel was dropped before a reply arrived.
    #[error("Transport closed")]
    TransportClosed,
    /// The peer replied with a response of a type the caller did not expect.
    #[error("Unexpected response type")]
    UnexpectedResponse,
    /// The in-flight request was cancelled, optionally with a reason string.
    #[error("task cancelled for reason {}", reason.as_deref().unwrap_or("<unknown>"))]
    Cancelled { reason: Option<String> },
    /// The request did not complete within the timeout configured in
    /// [`PeerRequestOptions::timeout`].
    #[error("request timeout after {}", chrono::Duration::from_std(*timeout).unwrap_or_default())]
    Timeout { timeout: Duration },
}

// Empty placeholder impl; kept for future inherent methods.
impl ServiceError {}
/// Marker trait for values that may cross the transport boundary:
/// debuggable, cloneable, (de)serializable, thread-safe, and owned (`'static`).
trait TransferObject:
    std::fmt::Debug + Clone + serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static
{
}

// Blanket impl: anything satisfying the bounds is automatically a `TransferObject`,
// so downstream types never implement it by hand.
impl<T> TransferObject for T where
    T: std::fmt::Debug
        + serde::Serialize
        + serde::de::DeserializeOwned
        + Send
        + Sync
        + 'static
        + Clone
{
}
67
/// Describes one side of an MCP conversation (client or server) by fixing the
/// message types flowing in each direction. The `Req`/`Resp`/`Not` family is
/// what *we* send; the `Peer*` family is what the remote side sends to us.
#[allow(private_bounds, reason = "there's no the third implementation")]
pub trait ServiceRole: std::fmt::Debug + Send + Sync + 'static + Copy + Clone {
    /// Request type this role sends.
    type Req: TransferObject + GetMeta + GetExtensions;
    /// Response type this role sends.
    type Resp: TransferObject;
    /// Notification type this role sends; must round-trip with
    /// `CancelledNotification` (TryInto returns the original on mismatch)
    /// so the serve loop can detect outgoing cancellations.
    type Not: TryInto<CancelledNotification, Error = Self::Not>
        + From<CancelledNotification>
        + TransferObject;
    /// Request type received from the peer.
    type PeerReq: TransferObject + GetMeta + GetExtensions;
    /// Response type received from the peer.
    type PeerResp: TransferObject;
    /// Notification type received from the peer; same cancellation
    /// round-trip requirement as `Not`, plus meta/extension access.
    type PeerNot: TryInto<CancelledNotification, Error = Self::PeerNot>
        + From<CancelledNotification>
        + TransferObject
        + GetMeta
        + GetExtensions;
    /// Error type produced while initializing over a transport with error `E`.
    type InitializeError<E>;
    /// True for the client role; used for logging/formatting only in this module.
    const IS_CLIENT: bool;
    /// Info advertised by this side.
    type Info: TransferObject;
    /// Info advertised by the remote side.
    type PeerInfo: TransferObject;
}
87
/// JSON-RPC message type sent *to* the peer for role `R`.
pub type TxJsonRpcMessage<R> =
    JsonRpcMessage<<R as ServiceRole>::Req, <R as ServiceRole>::Resp, <R as ServiceRole>::Not>;
/// JSON-RPC message type received *from* the peer for role `R`.
pub type RxJsonRpcMessage<R> = JsonRpcMessage<
    <R as ServiceRole>::PeerReq,
    <R as ServiceRole>::PeerResp,
    <R as ServiceRole>::PeerNot,
>;
95
/// A handler for incoming peer messages for role `R`. Implementations are
/// driven by the serve loop: each peer request/notification is dispatched on
/// its own spawned task with a per-message context.
pub trait Service<R: ServiceRole>: Send + Sync + 'static {
    /// Handle one incoming request and produce the response (or an MCP error
    /// that will be sent back as a JSON-RPC error).
    fn handle_request(
        &self,
        request: R::PeerReq,
        context: RequestContext<R>,
    ) -> impl Future<Output = Result<R::Resp, McpError>> + Send + '_;
    /// Handle one incoming notification; errors are logged, not propagated.
    fn handle_notification(
        &self,
        notification: R::PeerNot,
        context: NotificationContext<R>,
    ) -> impl Future<Output = Result<(), McpError>> + Send + '_;
    /// Info describing this service (e.g. advertised during initialization).
    fn get_info(&self) -> R::Info;
}
109
/// Extension methods for starting a [`Service`] on a transport.
pub trait ServiceExt<R: ServiceRole>: Service<R> + Sized {
    /// Type-erase this service behind a trait object.
    fn into_dyn(self) -> Box<dyn DynService<R>> {
        Box::new(self)
    }
    /// Serve on `transport` with a fresh (default) cancellation token.
    fn serve<T, E, A>(
        self,
        transport: T,
    ) -> impl Future<Output = Result<RunningService<R, Self>, R::InitializeError<E>>> + Send
    where
        T: IntoTransport<R, E, A>,
        E: std::error::Error + From<std::io::Error> + Send + Sync + 'static,
        Self: Sized,
    {
        Self::serve_with_ct(self, transport, Default::default())
    }
    /// Serve on `transport`, tying the serve loop's lifetime to `ct`.
    /// Implemented by the role-specific client/server modules.
    fn serve_with_ct<T, E, A>(
        self,
        transport: T,
        ct: CancellationToken,
    ) -> impl Future<Output = Result<RunningService<R, Self>, R::InitializeError<E>>> + Send
    where
        T: IntoTransport<R, E, A>,
        E: std::error::Error + From<std::io::Error> + Send + Sync + 'static,
        Self: Sized;
}
138
// A boxed `DynService` is itself a `Service`, so type-erased services can be
// served through the same generic machinery. Each method delegates to the
// object-safe `DynService` counterpart (disambiguated via UFCS to avoid
// recursing into this impl).
impl<R: ServiceRole> Service<R> for Box<dyn DynService<R>> {
    fn handle_request(
        &self,
        request: R::PeerReq,
        context: RequestContext<R>,
    ) -> impl Future<Output = Result<R::Resp, McpError>> + Send + '_ {
        DynService::handle_request(self.as_ref(), request, context)
    }

    fn handle_notification(
        &self,
        notification: R::PeerNot,
        context: NotificationContext<R>,
    ) -> impl Future<Output = Result<(), McpError>> + Send + '_ {
        DynService::handle_notification(self.as_ref(), notification, context)
    }

    fn get_info(&self) -> R::Info {
        DynService::get_info(self.as_ref())
    }
}
160
/// Object-safe mirror of [`Service`]: futures are boxed so the trait can be
/// used as `dyn DynService<R>` (see [`ServiceExt::into_dyn`]).
pub trait DynService<R: ServiceRole>: Send + Sync {
    /// Object-safe variant of [`Service::handle_request`].
    fn handle_request(
        &self,
        request: R::PeerReq,
        context: RequestContext<R>,
    ) -> BoxFuture<Result<R::Resp, McpError>>;
    /// Object-safe variant of [`Service::handle_notification`].
    fn handle_notification(
        &self,
        notification: R::PeerNot,
        context: NotificationContext<R>,
    ) -> BoxFuture<Result<(), McpError>>;
    /// Object-safe variant of [`Service::get_info`].
    fn get_info(&self) -> R::Info;
}
174
// Blanket impl: every `Service` is a `DynService` by pinning/boxing its
// futures, so `into_dyn` works for any concrete service.
impl<R: ServiceRole, S: Service<R>> DynService<R> for S {
    fn handle_request(
        &self,
        request: R::PeerReq,
        context: RequestContext<R>,
    ) -> BoxFuture<Result<R::Resp, McpError>> {
        Box::pin(self.handle_request(request, context))
    }
    fn handle_notification(
        &self,
        notification: R::PeerNot,
        context: NotificationContext<R>,
    ) -> BoxFuture<Result<(), McpError>> {
        Box::pin(self.handle_notification(notification, context))
    }
    fn get_info(&self) -> R::Info {
        self.get_info()
    }
}
194
195use std::{
196 collections::{HashMap, VecDeque},
197 ops::Deref,
198 sync::{Arc, atomic::AtomicU32},
199 time::Duration,
200};
201
202use tokio::sync::mpsc;
203
/// Source of unique request ids for outgoing requests.
pub trait RequestIdProvider: Send + Sync + 'static {
    fn next_request_id(&self) -> RequestId;
}

/// Source of unique progress tokens attached to outgoing requests' meta.
pub trait ProgressTokenProvider: Send + Sync + 'static {
    fn next_progress_token(&self) -> ProgressToken;
}
211
// Both providers share one counter-based implementation.
pub type AtomicU32RequestIdProvider = AtomicU32Provider;
pub type AtomicU32ProgressTokenProvider = AtomicU32Provider;

/// Monotonic `u32` counter used to mint request ids and progress tokens.
/// Starts at 0 (via `Default`) and increments on every call.
#[derive(Debug, Default)]
pub struct AtomicU32Provider {
    id: AtomicU32,
}

impl RequestIdProvider for AtomicU32Provider {
    fn next_request_id(&self) -> RequestId {
        RequestId::Number(self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst))
    }
}

impl ProgressTokenProvider for AtomicU32Provider {
    fn next_progress_token(&self) -> ProgressToken {
        ProgressToken(NumberOrString::Number(
            self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst),
        ))
    }
}
233
/// One-shot channel sender used to deliver the result for a single operation.
type Responder<T> = tokio::sync::oneshot::Sender<T>;

/// Handle for one in-flight outgoing request: await the response
/// (with optional timeout) or cancel it.
#[derive(Debug)]
pub struct RequestHandle<R: ServiceRole> {
    // Receives the peer's response (or a ServiceError) from the serve loop.
    pub rx: tokio::sync::oneshot::Receiver<Result<R::PeerResp, ServiceError>>,
    pub options: PeerRequestOptions,
    // Peer clone used to send a CancelledNotification on cancel/timeout.
    pub peer: Peer<R>,
    pub id: RequestId,
    pub progress_token: ProgressToken,
}
249
impl<R: ServiceRole> RequestHandle<R> {
    /// Cancellation reason sent to the peer when a request times out.
    pub const REQUEST_TIMEOUT_REASON: &str = "request timeout";
    /// Wait for the peer's response. If `options.timeout` is set and elapses
    /// first, a `CancelledNotification` is sent (best-effort) and
    /// `ServiceError::Timeout` is returned.
    pub async fn await_response(self) -> Result<R::PeerResp, ServiceError> {
        if let Some(timeout) = self.options.timeout {
            // NOTE: the async block captures only `self.rx` (Rust 2021
            // disjoint capture), leaving `self.id` / `self.peer` usable
            // in the timeout branch below.
            let timeout_result = tokio::time::timeout(timeout, async move {
                self.rx.await.map_err(|_e| ServiceError::TransportClosed)?
            })
            .await;
            match timeout_result {
                Ok(response) => response,
                Err(_) => {
                    let error = Err(ServiceError::Timeout { timeout });
                    // Tell the peer we gave up, so it can stop working on it.
                    let notification = CancelledNotification {
                        params: CancelledNotificationParam {
                            request_id: self.id,
                            reason: Some(Self::REQUEST_TIMEOUT_REASON.to_owned()),
                        },
                        method: crate::model::CancelledNotificationMethod,
                        extensions: Default::default(),
                    };
                    // Best-effort: a failed cancel must not mask the timeout.
                    let _ = self.peer.send_notification(notification.into()).await;
                    error
                }
            }
        } else {
            // No timeout configured: wait indefinitely for the response.
            self.rx.await.map_err(|_e| ServiceError::TransportClosed)?
        }
    }

    /// Cancel this request by notifying the peer, with an optional reason.
    /// Dropping the handle's `rx` also releases the local responder slot.
    pub async fn cancel(self, reason: Option<String>) -> Result<(), ServiceError> {
        let notification = CancelledNotification {
            params: CancelledNotificationParam {
                request_id: self.id,
                reason,
            },
            method: crate::model::CancelledNotificationMethod,
            extensions: Default::default(),
        };
        self.peer.send_notification(notification.into()).await?;
        Ok(())
    }
}
294
/// Messages sent from a [`Peer`] handle to the serve loop for transmission.
#[derive(Debug)]
pub(crate) enum PeerSinkMessage<R: ServiceRole> {
    /// An outgoing request; `responder` receives the peer's eventual response.
    Request {
        request: R::Req,
        id: RequestId,
        responder: Responder<Result<R::PeerResp, ServiceError>>,
    },
    /// An outgoing notification; `responder` reports whether the send succeeded.
    Notification {
        notification: R::Not,
        responder: Responder<Result<(), ServiceError>>,
    },
}
307
/// Cheaply-cloneable handle for talking to the remote peer. All sends are
/// funneled through `tx` into the serve loop, which owns the transport.
#[derive(Clone)]
pub struct Peer<R: ServiceRole> {
    tx: mpsc::Sender<PeerSinkMessage<R>>,
    request_id_provider: Arc<dyn RequestIdProvider>,
    progress_token_provider: Arc<dyn ProgressTokenProvider>,
    // Peer info, set at most once (either at construction or via set_peer_info).
    info: Arc<tokio::sync::OnceCell<R::PeerInfo>>,
}
320
321impl<R: ServiceRole> std::fmt::Debug for Peer<R> {
322 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
323 f.debug_struct("PeerSink")
324 .field("tx", &self.tx)
325 .field("is_client", &R::IS_CLIENT)
326 .finish()
327 }
328}
329
/// Receiving end of the peer-to-serve-loop channel returned by [`Peer::new`].
type ProxyOutbound<R> = mpsc::Receiver<PeerSinkMessage<R>>;

/// Per-request options: an optional timeout and extra meta to merge into
/// the outgoing request's `_meta`.
#[derive(Debug, Default)]
pub struct PeerRequestOptions {
    pub timeout: Option<Duration>,
    pub meta: Option<Meta>,
}

impl PeerRequestOptions {
    /// Convenience constructor for "no timeout, no extra meta".
    pub fn no_options() -> Self {
        Self::default()
    }
}
343
impl<R: ServiceRole> Peer<R> {
    // Capacity of the peer -> serve-loop channel; sends back-pressure past this.
    const CLIENT_CHANNEL_BUFFER_SIZE: usize = 1024;
    /// Build a peer handle plus the receiver the serve loop will drain.
    /// `peer_info`, when `Some`, pre-initializes the `OnceCell`.
    pub(crate) fn new(
        request_id_provider: Arc<dyn RequestIdProvider>,
        peer_info: Option<R::PeerInfo>,
    ) -> (Peer<R>, ProxyOutbound<R>) {
        let (tx, rx) = mpsc::channel(Self::CLIENT_CHANNEL_BUFFER_SIZE);
        (
            Self {
                tx,
                request_id_provider,
                progress_token_provider: Arc::new(AtomicU32ProgressTokenProvider::default()),
                info: Arc::new(tokio::sync::OnceCell::new_with(peer_info)),
            },
            rx,
        )
    }
    /// Send a notification and wait until the serve loop reports the
    /// transport-level send result.
    pub async fn send_notification(&self, notification: R::Not) -> Result<(), ServiceError> {
        let (responder, receiver) = tokio::sync::oneshot::channel();
        self.tx
            .send(PeerSinkMessage::Notification {
                notification,
                responder,
            })
            .await
            .map_err(|_m| ServiceError::TransportClosed)?;
        // Outer Err = responder dropped (loop gone); inner Result = send outcome.
        receiver.await.map_err(|_e| ServiceError::TransportClosed)?
    }
    /// Send a request with default options and await the response.
    pub async fn send_request(&self, request: R::Req) -> Result<R::PeerResp, ServiceError> {
        self.send_request_with_option(request, PeerRequestOptions::no_options())
            .await?
            .await_response()
            .await
    }

    /// Send a request and return a [`RequestHandle`] so the caller can
    /// await, time out, or cancel it. Alias of `send_request_with_option`.
    pub async fn send_cancellable_request(
        &self,
        request: R::Req,
        options: PeerRequestOptions,
    ) -> Result<RequestHandle<R>, ServiceError> {
        self.send_request_with_option(request, options).await
    }

    /// Assign a request id and progress token, merge any extra meta from
    /// `options`, enqueue the request, and return its handle.
    pub async fn send_request_with_option(
        &self,
        mut request: R::Req,
        options: PeerRequestOptions,
    ) -> Result<RequestHandle<R>, ServiceError> {
        let id = self.request_id_provider.next_request_id();
        let progress_token = self.progress_token_provider.next_progress_token();
        // Attach the progress token so peer progress notifications can be correlated.
        request
            .get_meta_mut()
            .set_progress_token(progress_token.clone());
        if let Some(meta) = options.meta.clone() {
            request.get_meta_mut().extend(meta);
        }
        let (responder, receiver) = tokio::sync::oneshot::channel();
        self.tx
            .send(PeerSinkMessage::Request {
                request,
                id: id.clone(),
                responder,
            })
            .await
            .map_err(|_m| ServiceError::TransportClosed)?;
        Ok(RequestHandle {
            id,
            rx: receiver,
            progress_token,
            options,
            peer: self.clone(),
        })
    }
    /// Peer info, if it has been initialized.
    pub fn peer_info(&self) -> Option<&R::PeerInfo> {
        self.info.get()
    }

    /// Initialize peer info; a second call is ignored with a warning.
    pub fn set_peer_info(&self, info: R::PeerInfo) {
        if self.info.initialized() {
            tracing::warn!("trying to set peer info, which is already initialized");
        } else {
            let _ = self.info.set(info);
        }
    }

    /// True once the serve loop has dropped its receiver (transport gone).
    pub fn is_transport_closed(&self) -> bool {
        self.tx.is_closed()
    }
}
433
/// A spawned serve loop plus the handles needed to talk to it, wait for it,
/// or shut it down. Dropping this cancels the loop via the `DropGuard`.
#[derive(Debug)]
pub struct RunningService<R: ServiceRole, S: Service<R>> {
    service: Arc<S>,
    peer: Peer<R>,
    // Join handle of the spawned serve loop; resolves to why it stopped.
    handle: tokio::task::JoinHandle<QuitReason>,
    cancellation_token: CancellationToken,
    // Cancels `cancellation_token` when RunningService is dropped.
    dg: DropGuard,
}
// Deref to the peer so request/notification methods are callable directly.
impl<R: ServiceRole, S: Service<R>> Deref for RunningService<R, S> {
    type Target = Peer<R>;

    fn deref(&self) -> &Self::Target {
        &self.peer
    }
}
449
impl<R: ServiceRole, S: Service<R>> RunningService<R, S> {
    /// Handle for sending requests/notifications to the remote peer.
    #[inline]
    pub fn peer(&self) -> &Peer<R> {
        &self.peer
    }
    /// The service instance being driven by this loop.
    #[inline]
    pub fn service(&self) -> &S {
        self.service.as_ref()
    }
    /// A cancel-only wrapper around this service's cancellation token.
    #[inline]
    pub fn cancellation_token(&self) -> RunningServiceCancellationToken {
        RunningServiceCancellationToken(self.cancellation_token.clone())
    }
    /// Wait for the serve loop to finish on its own.
    #[inline]
    pub async fn waiting(self) -> Result<QuitReason, tokio::task::JoinError> {
        self.handle.await
    }
    /// Cancel the serve loop and wait for it to finish.
    pub async fn cancel(self) -> Result<QuitReason, tokio::task::JoinError> {
        // Disarm the drop guard and cancel explicitly, keeping `handle` joinable.
        let RunningService { dg, handle, .. } = self;
        dg.disarm().cancel();
        handle.await
    }
}
473
/// Cancel-only view of a running service's token: holders can stop the
/// serve loop but cannot otherwise inspect or clone the underlying token.
pub struct RunningServiceCancellationToken(CancellationToken);

impl RunningServiceCancellationToken {
    /// Cancel the associated serve loop; consumes the wrapper.
    pub fn cancel(self) {
        self.0.cancel();
    }
}
482
/// Why the serve loop stopped.
#[derive(Debug)]
pub enum QuitReason {
    /// The cancellation token fired.
    Cancelled,
    /// The transport's input stream terminated.
    Closed,
    /// An internal send task panicked/was aborted (tokio join error).
    JoinError(tokio::task::JoinError),
}
489
/// Per-request context handed to [`Service::handle_request`].
#[derive(Debug, Clone)]
pub struct RequestContext<R: ServiceRole> {
    /// Cancelled when the peer cancels this request (or the loop shuts down).
    pub ct: CancellationToken,
    pub id: RequestId,
    // Meta and extensions moved out of the incoming request.
    pub meta: Meta,
    pub extensions: Extensions,
    /// Handle for replying to / calling back the remote peer.
    pub peer: Peer<R>,
}
501
/// Per-notification context handed to [`Service::handle_notification`].
/// No id or cancellation token: notifications are fire-and-forget.
#[derive(Debug, Clone)]
pub struct NotificationContext<R: ServiceRole> {
    pub meta: Meta,
    pub extensions: Extensions,
    /// Handle for calling back the remote peer.
    pub peer: Peer<R>,
}
510
/// Start serving `service` on `transport` immediately, skipping any
/// initialization handshake, with a fresh cancellation token.
/// `peer_info` pre-populates the peer's info cell when already known.
pub fn serve_directly<R, S, T, E, A>(
    service: S,
    transport: T,
    peer_info: Option<R::PeerInfo>,
) -> RunningService<R, S>
where
    R: ServiceRole,
    S: Service<R>,
    T: IntoTransport<R, E, A>,
    E: std::error::Error + Send + Sync + 'static,
{
    serve_directly_with_ct(service, transport, peer_info, Default::default())
}
525
/// Like [`serve_directly`], but ties the serve loop's lifetime to `ct`.
/// Builds the peer handle (with a fresh atomic id provider) and spawns
/// the loop via [`serve_inner`].
pub fn serve_directly_with_ct<R, S, T, E, A>(
    service: S,
    transport: T,
    peer_info: Option<R::PeerInfo>,
    ct: CancellationToken,
) -> RunningService<R, S>
where
    R: ServiceRole,
    S: Service<R>,
    T: IntoTransport<R, E, A>,
    E: std::error::Error + Send + Sync + 'static,
{
    let (peer, peer_rx) = Peer::new(Arc::new(AtomicU32RequestIdProvider::default()), peer_info);
    serve_inner(service, transport, peer, peer_rx, ct)
}
542
/// Spawn the serve loop: a single task that owns the transport and multiplexes
/// (1) outbound requests/notifications from [`Peer`] handles, (2) inbound peer
/// messages dispatched to `service`, (3) responses flowing back via an internal
/// sink proxy, and (4) completions of spawned send tasks. Returns immediately
/// with a [`RunningService`] wrapping the spawned task.
#[instrument(skip_all)]
fn serve_inner<R, S, T, E, A>(
    service: S,
    transport: T,
    peer: Peer<R>,
    mut peer_rx: tokio::sync::mpsc::Receiver<PeerSinkMessage<R>>,
    ct: CancellationToken,
) -> RunningService<R, S>
where
    R: ServiceRole,
    S: Service<R>,
    T: IntoTransport<R, E, A>,
    E: std::error::Error + Send + Sync + 'static,
{
    // Channel through which request-handler tasks push their responses
    // back into the loop for transmission.
    const SINK_PROXY_BUFFER_SIZE: usize = 64;
    let (sink_proxy_tx, mut sink_proxy_rx) =
        tokio::sync::mpsc::channel::<TxJsonRpcMessage<R>>(SINK_PROXY_BUFFER_SIZE);
    let peer_info = peer.peer_info();
    if R::IS_CLIENT {
        tracing::info!(?peer_info, "Service initialized as client");
    } else {
        tracing::info!(?peer_info, "Service initialized as server");
    }

    // Responders for our in-flight outgoing requests, keyed by request id.
    let mut local_responder_pool =
        HashMap::<RequestId, Responder<Result<R::PeerResp, ServiceError>>>::new();
    // Cancellation tokens for in-flight *incoming* requests, keyed by id.
    let mut local_ct_pool = HashMap::<RequestId, CancellationToken>::new();
    let shared_service = Arc::new(service);
    let service = shared_service.clone();

    // Child token: cancelling `ct` stops the loop and all per-request tokens.
    let serve_loop_ct = ct.child_token();
    let peer_return: Peer<R> = peer.clone();
    let handle = tokio::spawn(async move {
        let mut transport = transport.into_transport();
        // Queue of unbatched items produced by inbound batch messages;
        // drained before polling any other event source.
        let mut batch_messages = VecDeque::<RxJsonRpcMessage<R>>::new();
        // Outgoing sends run as tasks so a slow transport can't block the loop.
        let mut send_task_set = tokio::task::JoinSet::<SendTaskResult<E>>::new();
        #[derive(Debug)]
        enum SendTaskResult<E> {
            Request {
                id: RequestId,
                result: Result<(), E>,
            },
            Notification {
                responder: Responder<Result<(), ServiceError>>,
                // Set when the notification was a cancellation, so the local
                // responder for the cancelled request can be resolved.
                cancellation_param: Option<CancelledNotificationParam>,
                result: Result<(), E>,
            },
        }
        // Unified event type for the select! below.
        #[derive(Debug)]
        enum Event<R: ServiceRole, E> {
            ProxyMessage(PeerSinkMessage<R>),
            PeerMessage(RxJsonRpcMessage<R>),
            ToSink(TxJsonRpcMessage<R>),
            SendTaskResult(SendTaskResult<E>),
        }

        let quit_reason = loop {
            let evt = if let Some(m) = batch_messages.pop_front() {
                Event::PeerMessage(m)
            } else {
                tokio::select! {
                    m = sink_proxy_rx.recv(), if !sink_proxy_rx.is_closed() => {
                        if let Some(m) = m {
                            Event::ToSink(m)
                        } else {
                            continue
                        }
                    }
                    m = transport.receive() => {
                        if let Some(m) = m {
                            Event::PeerMessage(m)
                        } else {
                            // Transport input ended: shut the loop down.
                            tracing::info!("input stream terminated");
                            break QuitReason::Closed
                        }
                    }
                    m = peer_rx.recv(), if !peer_rx.is_closed() => {
                        if let Some(m) = m {
                            Event::ProxyMessage(m)
                        } else {
                            continue
                        }
                    }
                    m = send_task_set.join_next(), if !send_task_set.is_empty() => {
                        let Some(result) = m else {
                            continue
                        };
                        match result {
                            Err(e) => {
                                // A send task panicked or was aborted.
                                tracing::error!(%e, "send request task encounter a tokio join error");
                                break QuitReason::JoinError(e)
                            }
                            Ok(result) => {
                                Event::SendTaskResult(result)
                            }
                        }
                    }
                    _ = serve_loop_ct.cancelled() => {
                        tracing::info!("task cancelled");
                        break QuitReason::Cancelled
                    }
                }
            };

            tracing::trace!(?evt, "new event");
            match evt {
                // An outgoing request failed to send: fail its local responder.
                Event::SendTaskResult(SendTaskResult::Request { id, result }) => {
                    if let Err(e) = result {
                        if let Some(responder) = local_responder_pool.remove(&id) {
                            let _ = responder.send(Err(ServiceError::TransportSend(Box::new(e))));
                        }
                    }
                }
                // An outgoing notification send finished: report the outcome,
                // and if it was a cancellation, resolve the cancelled
                // request's responder with ServiceError::Cancelled.
                Event::SendTaskResult(SendTaskResult::Notification {
                    responder,
                    result,
                    cancellation_param,
                }) => {
                    let response = if let Err(e) = result {
                        Err(ServiceError::TransportSend(Box::new(e)))
                    } else {
                        Ok(())
                    };
                    let _ = responder.send(response);
                    if let Some(param) = cancellation_param {
                        if let Some(responder) = local_responder_pool.remove(&param.request_id) {
                            tracing::info!(id = %param.request_id, reason = param.reason, "cancelled");
                            let _response_result = responder.send(Err(ServiceError::Cancelled {
                                reason: param.reason.clone(),
                            }));
                        }
                    }
                }
                // A handler produced a response/error to send to the peer.
                // Note: only Response/Error variants are forwarded here; the
                // sink proxy is only ever fed those (see request dispatch below).
                Event::ToSink(m) => {
                    if let Some(id) = match &m {
                        JsonRpcMessage::Response(response) => Some(&response.id),
                        JsonRpcMessage::Error(error) => Some(&error.id),
                        _ => None,
                    } {
                        // The request is done: drop and fire its cancellation token.
                        if let Some(ct) = local_ct_pool.remove(id) {
                            ct.cancel();
                        }
                        let send = transport.send(m);
                        tokio::spawn(async move {
                            let send_result = send.await;
                            if let Err(error) = send_result {
                                tracing::error!(%error, "fail to response message");
                            }
                        });
                    }
                }
                // A Peer handle wants to send a request: register its
                // responder, then ship the send as a tracked task.
                Event::ProxyMessage(PeerSinkMessage::Request {
                    request,
                    id,
                    responder,
                }) => {
                    local_responder_pool.insert(id.clone(), responder);
                    let send = transport.send(JsonRpcMessage::request(request, id.clone()));
                    {
                        let id = id.clone();
                        send_task_set
                            .spawn(send.map(move |r| SendTaskResult::Request { id, result: r }));
                    }
                }
                // A Peer handle wants to send a notification. Cancellations
                // are detected (try_into round-trips the value on mismatch)
                // so the cancelled request can be resolved after the send.
                Event::ProxyMessage(PeerSinkMessage::Notification {
                    notification,
                    responder,
                }) => {
                    let mut cancellation_param = None;
                    let notification = match notification.try_into() {
                        Ok::<CancelledNotification, _>(cancelled) => {
                            cancellation_param.replace(cancelled.params.clone());
                            cancelled.into()
                        }
                        Err(notification) => notification,
                    };
                    let send = transport.send(JsonRpcMessage::notification(notification));
                    send_task_set.spawn(send.map(move |result| SendTaskResult::Notification {
                        responder,
                        cancellation_param,
                        result,
                    }));
                }
                // Inbound request: register a cancellation token, move meta
                // and extensions into the context, and dispatch the handler
                // on its own task; its response comes back via the sink proxy.
                Event::PeerMessage(JsonRpcMessage::Request(JsonRpcRequest {
                    id,
                    mut request,
                    ..
                })) => {
                    tracing::debug!(%id, ?request, "received request");
                    {
                        let service = shared_service.clone();
                        let sink = sink_proxy_tx.clone();
                        let request_ct = serve_loop_ct.child_token();
                        let context_ct = request_ct.child_token();
                        local_ct_pool.insert(id.clone(), request_ct);
                        let mut extensions = Extensions::new();
                        let mut meta = Meta::new();
                        // Take ownership of meta/extensions without cloning.
                        std::mem::swap(&mut extensions, request.extensions_mut());
                        std::mem::swap(&mut meta, request.get_meta_mut());
                        let context = RequestContext {
                            ct: context_ct,
                            id: id.clone(),
                            peer: peer.clone(),
                            meta,
                            extensions,
                        };
                        tokio::spawn(async move {
                            let result = service.handle_request(request, context).await;
                            let response = match result {
                                Ok(result) => {
                                    tracing::debug!(%id, ?result, "response message");
                                    JsonRpcMessage::response(result, id)
                                }
                                Err(error) => {
                                    tracing::warn!(%id, ?error, "response error");
                                    JsonRpcMessage::error(error, id)
                                }
                            };
                            let _send_result = sink.send(response).await;
                        });
                    }
                }
                // Inbound notification: if it's a cancellation for one of our
                // in-flight incoming requests, fire that request's token;
                // either way, dispatch the handler on its own task.
                Event::PeerMessage(JsonRpcMessage::Notification(JsonRpcNotification {
                    notification,
                    ..
                })) => {
                    tracing::info!(?notification, "received notification");
                    let mut notification = match notification.try_into() {
                        Ok::<CancelledNotification, _>(cancelled) => {
                            if let Some(ct) = local_ct_pool.remove(&cancelled.params.request_id) {
                                tracing::info!(id = %cancelled.params.request_id, reason = cancelled.params.reason, "cancelled");
                                ct.cancel();
                            }
                            cancelled.into()
                        }
                        Err(notification) => notification,
                    };
                    {
                        let service = shared_service.clone();
                        let mut extensions = Extensions::new();
                        let mut meta = Meta::new();
                        // Take ownership of meta/extensions without cloning.
                        std::mem::swap(&mut extensions, notification.extensions_mut());
                        std::mem::swap(&mut meta, notification.get_meta_mut());
                        let context = NotificationContext {
                            peer: peer.clone(),
                            meta,
                            extensions,
                        };
                        tokio::spawn(async move {
                            let result = service.handle_notification(notification, context).await;
                            if let Err(error) = result {
                                tracing::warn!(%error, "Error sending notification");
                            }
                        });
                    }
                }
                // Inbound response to one of our requests: deliver it.
                Event::PeerMessage(JsonRpcMessage::Response(JsonRpcResponse {
                    result,
                    id,
                    ..
                })) => {
                    if let Some(responder) = local_responder_pool.remove(&id) {
                        let response_result = responder.send(Ok(result));
                        if let Err(_error) = response_result {
                            tracing::warn!(%id, "Error sending response");
                        }
                    }
                }
                // Inbound error for one of our requests: deliver it as McpError.
                Event::PeerMessage(JsonRpcMessage::Error(JsonRpcError { error, id, .. })) => {
                    if let Some(responder) = local_responder_pool.remove(&id) {
                        let _response_result = responder.send(Err(ServiceError::McpError(error)));
                        if let Err(_error) = _response_result {
                            tracing::warn!(%id, "Error sending response");
                        }
                    }
                }
                // Batches are flattened and re-processed one item at a time.
                Event::PeerMessage(JsonRpcMessage::BatchRequest(batch)) => {
                    batch_messages.extend(
                        batch
                            .into_iter()
                            .map(JsonRpcBatchRequestItem::into_non_batch_message),
                    );
                }
                Event::PeerMessage(JsonRpcMessage::BatchResponse(batch)) => {
                    batch_messages.extend(
                        batch
                            .into_iter()
                            .map(JsonRpcBatchResponseItem::into_non_batch_message),
                    );
                }
            }
        };
        // Best-effort transport close on the way out.
        let sink_close_result = transport.close().await;
        if let Err(e) = sink_close_result {
            tracing::error!(%e, "fail to close sink");
        }
        tracing::info!(?quit_reason, "serve finished");
        quit_reason
    });
    RunningService {
        service,
        peer: peer_return,
        handle,
        cancellation_token: ct.clone(),
        // The drop guard cancels `ct` if the RunningService is dropped.
        dg: ct.drop_guard(),
    }
}