use futures::{FutureExt, future::BoxFuture};
use thiserror::Error;

use crate::{
    error::ErrorData as McpError,
    model::{
        CancelledNotification, CancelledNotificationParam, Extensions, GetExtensions, GetMeta,
        JsonRpcError, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, Meta,
        NumberOrString, ProgressToken, RequestId, ServerJsonRpcMessage,
    },
    transport::{DynamicTransportError, IntoTransport, Transport},
};
#[cfg(feature = "client")]
#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
mod client;
#[cfg(feature = "client")]
#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
pub use client::*;
#[cfg(feature = "server")]
#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
mod server;
#[cfg(feature = "server")]
#[cfg_attr(docsrs, doc(cfg(feature = "server")))]
pub use server::*;
#[cfg(feature = "tower")]
#[cfg_attr(docsrs, doc(cfg(feature = "tower")))]
mod tower;
use tokio_util::sync::{CancellationToken, DropGuard};
#[cfg(feature = "tower")]
#[cfg_attr(docsrs, doc(cfg(feature = "tower")))]
pub use tower::*;
use tracing::{Instrument as _, instrument};
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum ServiceError {
    #[error("Mcp error: {0}")]
    McpError(McpError),
    #[error("Transport send error: {0}")]
    TransportSend(DynamicTransportError),
    #[error("Transport closed")]
    TransportClosed,
    #[error("Unexpected response type")]
    UnexpectedResponse,
    #[error("task cancelled for reason {}", reason.as_deref().unwrap_or("<unknown>"))]
    Cancelled { reason: Option<String> },
    #[error("request timeout after {}", chrono::Duration::from_std(*timeout).unwrap_or_default())]
    Timeout { timeout: Duration },
}

trait TransferObject:
    std::fmt::Debug + Clone + serde::Serialize + serde::de::DeserializeOwned + Send + Sync + 'static
{
}

impl<T> TransferObject for T where
    T: std::fmt::Debug
        + serde::Serialize
        + serde::de::DeserializeOwned
        + Send
        + Sync
        + 'static
        + Clone
{
}

#[allow(private_bounds, reason = "there is no third implementation of this trait")]
pub trait ServiceRole: std::fmt::Debug + Send + Sync + 'static + Copy + Clone {
    type Req: TransferObject + GetMeta + GetExtensions;
    type Resp: TransferObject;
    type Not: TryInto<CancelledNotification, Error = Self::Not>
        + From<CancelledNotification>
        + TransferObject;
    type PeerReq: TransferObject + GetMeta + GetExtensions;
    type PeerResp: TransferObject;
    type PeerNot: TryInto<CancelledNotification, Error = Self::PeerNot>
        + From<CancelledNotification>
        + TransferObject
        + GetMeta
        + GetExtensions;
    type InitializeError;
    const IS_CLIENT: bool;
    type Info: TransferObject;
    type PeerInfo: TransferObject;
}

pub type TxJsonRpcMessage<R> =
    JsonRpcMessage<<R as ServiceRole>::Req, <R as ServiceRole>::Resp, <R as ServiceRole>::Not>;
pub type RxJsonRpcMessage<R> = JsonRpcMessage<
    <R as ServiceRole>::PeerReq,
    <R as ServiceRole>::PeerResp,
    <R as ServiceRole>::PeerNot,
>;

pub trait Service<R: ServiceRole>: Send + Sync + 'static {
    fn handle_request(
        &self,
        request: R::PeerReq,
        context: RequestContext<R>,
    ) -> impl Future<Output = Result<R::Resp, McpError>> + Send + '_;
    fn handle_notification(
        &self,
        notification: R::PeerNot,
        context: NotificationContext<R>,
    ) -> impl Future<Output = Result<(), McpError>> + Send + '_;
    fn get_info(&self) -> R::Info;
}

pub trait ServiceExt<R: ServiceRole>: Service<R> + Sized {
    /// Convert this service into a dynamic, boxed service.
    ///
    /// This is helpful when you want to store several services in a single collection.
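    ///
    /// A minimal sketch of storing two boxed services together, assuming the `server`
    /// feature so that `RoleServer` is available; `service_a` and `service_b` are
    /// hypothetical values implementing [`Service<RoleServer>`]:
    ///
    /// ```rust,ignore
    /// // Boxing erases the concrete service types so both fit in one Vec.
    /// let services: Vec<Box<dyn DynService<RoleServer>>> =
    ///     vec![service_a.into_dyn(), service_b.into_dyn()];
    /// ```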
    fn into_dyn(self) -> Box<dyn DynService<R>> {
        Box::new(self)
    }
    fn serve<T, E, A>(
        self,
        transport: T,
    ) -> impl Future<Output = Result<RunningService<R, Self>, R::InitializeError>> + Send
    where
        T: IntoTransport<R, E, A>,
        E: std::error::Error + Send + Sync + 'static,
        Self: Sized,
    {
        Self::serve_with_ct(self, transport, Default::default())
    }
    fn serve_with_ct<T, E, A>(
        self,
        transport: T,
        ct: CancellationToken,
    ) -> impl Future<Output = Result<RunningService<R, Self>, R::InitializeError>> + Send
    where
        T: IntoTransport<R, E, A>,
        E: std::error::Error + Send + Sync + 'static,
        Self: Sized;
}

impl<R: ServiceRole> Service<R> for Box<dyn DynService<R>> {
    fn handle_request(
        &self,
        request: R::PeerReq,
        context: RequestContext<R>,
    ) -> impl Future<Output = Result<R::Resp, McpError>> + Send + '_ {
        DynService::handle_request(self.as_ref(), request, context)
    }

    fn handle_notification(
        &self,
        notification: R::PeerNot,
        context: NotificationContext<R>,
    ) -> impl Future<Output = Result<(), McpError>> + Send + '_ {
        DynService::handle_notification(self.as_ref(), notification, context)
    }

    fn get_info(&self) -> R::Info {
        DynService::get_info(self.as_ref())
    }
}

pub trait DynService<R: ServiceRole>: Send + Sync {
    fn handle_request(
        &self,
        request: R::PeerReq,
        context: RequestContext<R>,
    ) -> BoxFuture<'_, Result<R::Resp, McpError>>;
    fn handle_notification(
        &self,
        notification: R::PeerNot,
        context: NotificationContext<R>,
    ) -> BoxFuture<'_, Result<(), McpError>>;
    fn get_info(&self) -> R::Info;
}

impl<R: ServiceRole, S: Service<R>> DynService<R> for S {
    fn handle_request(
        &self,
        request: R::PeerReq,
        context: RequestContext<R>,
    ) -> BoxFuture<'_, Result<R::Resp, McpError>> {
        Box::pin(self.handle_request(request, context))
    }
    fn handle_notification(
        &self,
        notification: R::PeerNot,
        context: NotificationContext<R>,
    ) -> BoxFuture<'_, Result<(), McpError>> {
        Box::pin(self.handle_notification(notification, context))
    }
    fn get_info(&self) -> R::Info {
        self.get_info()
    }
}

use std::{
    collections::{HashMap, VecDeque},
    ops::Deref,
    sync::{Arc, atomic::AtomicU64},
    time::Duration,
};

use tokio::sync::mpsc;

pub trait RequestIdProvider: Send + Sync + 'static {
    fn next_request_id(&self) -> RequestId;
}

pub trait ProgressTokenProvider: Send + Sync + 'static {
    fn next_progress_token(&self) -> ProgressToken;
}

pub type AtomicU32RequestIdProvider = AtomicU32Provider;
pub type AtomicU32ProgressTokenProvider = AtomicU32Provider;

#[derive(Debug, Default)]
pub struct AtomicU32Provider {
    id: AtomicU64,
}

impl RequestIdProvider for AtomicU32Provider {
    fn next_request_id(&self) -> RequestId {
        let id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        // Safe conversion: we start at 0 and increment by 1, so we won't overflow i64::MAX in practice
        RequestId::Number(id as i64)
    }
}

impl ProgressTokenProvider for AtomicU32Provider {
    fn next_progress_token(&self) -> ProgressToken {
        let id = self.id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        ProgressToken(NumberOrString::Number(id as i64))
    }
}

type Responder<T> = tokio::sync::oneshot::Sender<T>;

/// A handle to an in-flight request sent to the remote peer.
///
/// You can cancel it by calling [`RequestHandle::cancel`] with a reason,
/// or wait for the response by calling [`RequestHandle::await_response`].
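///
/// A sketch of both paths, assuming a `peer: Peer<R>` and a `request` built elsewhere
/// (both hypothetical here):
///
/// ```rust,ignore
/// // Issue the request without awaiting the response yet.
/// let handle = peer
///     .send_request_with_option(request, PeerRequestOptions::no_options())
///     .await?;
///
/// // Either wait for the peer's response...
/// let response = handle.await_response().await?;
/// // ...or cancel it instead, with an optional reason:
/// // handle.cancel(Some("user aborted".into())).await?;
/// ```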
#[derive(Debug)]
pub struct RequestHandle<R: ServiceRole> {
    pub rx: tokio::sync::oneshot::Receiver<Result<R::PeerResp, ServiceError>>,
    pub options: PeerRequestOptions,
    pub peer: Peer<R>,
    pub id: RequestId,
    pub progress_token: ProgressToken,
}

impl<R: ServiceRole> RequestHandle<R> {
    pub const REQUEST_TIMEOUT_REASON: &str = "request timeout";
    pub async fn await_response(self) -> Result<R::PeerResp, ServiceError> {
        if let Some(timeout) = self.options.timeout {
            let timeout_result = tokio::time::timeout(timeout, async move {
                self.rx.await.map_err(|_e| ServiceError::TransportClosed)?
            })
            .await;
            match timeout_result {
                Ok(response) => response,
                Err(_) => {
                    let error = Err(ServiceError::Timeout { timeout });
                    // cancel this request
                    let notification = CancelledNotification {
                        params: CancelledNotificationParam {
                            request_id: self.id,
                            reason: Some(Self::REQUEST_TIMEOUT_REASON.to_owned()),
                        },
                        method: crate::model::CancelledNotificationMethod,
                        extensions: Default::default(),
                    };
                    let _ = self.peer.send_notification(notification.into()).await;
                    error
                }
            }
        } else {
            self.rx.await.map_err(|_e| ServiceError::TransportClosed)?
        }
    }

    /// Cancel this request
    pub async fn cancel(self, reason: Option<String>) -> Result<(), ServiceError> {
        let notification = CancelledNotification {
            params: CancelledNotificationParam {
                request_id: self.id,
                reason,
            },
            method: crate::model::CancelledNotificationMethod,
            extensions: Default::default(),
        };
        self.peer.send_notification(notification.into()).await?;
        Ok(())
    }
}

#[derive(Debug)]
pub(crate) enum PeerSinkMessage<R: ServiceRole> {
    Request {
        request: R::Req,
        id: RequestId,
        responder: Responder<Result<R::PeerResp, ServiceError>>,
    },
    Notification {
        notification: R::Not,
        responder: Responder<Result<(), ServiceError>>,
    },
}

/// An interface for communicating with the remote client or server.
///
/// For most purposes, call [`Peer::send_request`] or [`Peer::send_notification`] to send a message to the remote peer.
///
/// To create a cancellable request, call [`Peer::send_request_with_option`].
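///
/// A minimal sketch, assuming a running client or server handed out this `peer` and a
/// `request`/`notification` built elsewhere (all hypothetical):
///
/// ```rust,ignore
/// // Fire-and-forget notification to the remote peer.
/// peer.send_notification(notification).await?;
///
/// // Request/response round trip.
/// let response = peer.send_request(request).await?;
/// ```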
#[derive(Clone)]
pub struct Peer<R: ServiceRole> {
    tx: mpsc::Sender<PeerSinkMessage<R>>,
    request_id_provider: Arc<dyn RequestIdProvider>,
    progress_token_provider: Arc<dyn ProgressTokenProvider>,
    info: Arc<tokio::sync::OnceCell<R::PeerInfo>>,
}

impl<R: ServiceRole> std::fmt::Debug for Peer<R> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PeerSink")
            .field("tx", &self.tx)
            .field("is_client", &R::IS_CLIENT)
            .finish()
    }
}

type ProxyOutbound<R> = mpsc::Receiver<PeerSinkMessage<R>>;

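/// Options attached to an outgoing peer request.
///
/// A sketch of a request with a 10-second timeout; the `peer` and `request` values are
/// hypothetical:
///
/// ```rust,ignore
/// let options = PeerRequestOptions {
///     timeout: Some(Duration::from_secs(10)),
///     meta: None,
/// };
/// let handle = peer.send_request_with_option(request, options).await?;
/// // `await_response` returns `Err(ServiceError::Timeout { .. })` if the timeout elapses,
/// // and a cancellation notification is sent to the peer on our behalf.
/// let response = handle.await_response().await?;
/// ```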
#[derive(Debug, Default)]
pub struct PeerRequestOptions {
    pub timeout: Option<Duration>,
    pub meta: Option<Meta>,
}

impl PeerRequestOptions {
    pub fn no_options() -> Self {
        Self::default()
    }
}

impl<R: ServiceRole> Peer<R> {
    const CLIENT_CHANNEL_BUFFER_SIZE: usize = 1024;
    pub(crate) fn new(
        request_id_provider: Arc<dyn RequestIdProvider>,
        peer_info: Option<R::PeerInfo>,
    ) -> (Peer<R>, ProxyOutbound<R>) {
        let (tx, rx) = mpsc::channel(Self::CLIENT_CHANNEL_BUFFER_SIZE);
        (
            Self {
                tx,
                request_id_provider,
                progress_token_provider: Arc::new(AtomicU32ProgressTokenProvider::default()),
                info: Arc::new(tokio::sync::OnceCell::new_with(peer_info)),
            },
            rx,
        )
    }
    pub async fn send_notification(&self, notification: R::Not) -> Result<(), ServiceError> {
        let (responder, receiver) = tokio::sync::oneshot::channel();
        self.tx
            .send(PeerSinkMessage::Notification {
                notification,
                responder,
            })
            .await
            .map_err(|_m| ServiceError::TransportClosed)?;
        receiver.await.map_err(|_e| ServiceError::TransportClosed)?
    }
    pub async fn send_request(&self, request: R::Req) -> Result<R::PeerResp, ServiceError> {
        self.send_request_with_option(request, PeerRequestOptions::no_options())
            .await?
            .await_response()
            .await
    }

    pub async fn send_cancellable_request(
        &self,
        request: R::Req,
        options: PeerRequestOptions,
    ) -> Result<RequestHandle<R>, ServiceError> {
        self.send_request_with_option(request, options).await
    }

    pub async fn send_request_with_option(
        &self,
        mut request: R::Req,
        options: PeerRequestOptions,
    ) -> Result<RequestHandle<R>, ServiceError> {
        let id = self.request_id_provider.next_request_id();
        let progress_token = self.progress_token_provider.next_progress_token();
        request
            .get_meta_mut()
            .set_progress_token(progress_token.clone());
        if let Some(meta) = options.meta.clone() {
            request.get_meta_mut().extend(meta);
        }
        let (responder, receiver) = tokio::sync::oneshot::channel();
        self.tx
            .send(PeerSinkMessage::Request {
                request,
                id: id.clone(),
                responder,
            })
            .await
            .map_err(|_m| ServiceError::TransportClosed)?;
        Ok(RequestHandle {
            id,
            rx: receiver,
            progress_token,
            options,
            peer: self.clone(),
        })
    }
    pub fn peer_info(&self) -> Option<&R::PeerInfo> {
        self.info.get()
    }

    pub fn set_peer_info(&self, info: R::PeerInfo) {
        if self.info.initialized() {
            tracing::warn!("peer info is already initialized; ignoring the new value");
        } else {
            let _ = self.info.set(info);
        }
    }

    pub fn is_transport_closed(&self) -> bool {
        self.tx.is_closed()
    }
}

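/// A service that has been bound to a transport and is being driven by a background task.
///
/// A sketch of the typical lifecycle, assuming a hypothetical `handler` implementing
/// [`Service`] and some `transport`:
///
/// ```rust,ignore
/// let running = handler.serve(transport).await?;
/// // `RunningService` derefs to `Peer`, so requests can be sent on it directly.
/// // ...
/// // Either wait for the connection to finish, or cancel it explicitly:
/// let quit_reason = running.cancel().await?;
/// ```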
#[derive(Debug)]
pub struct RunningService<R: ServiceRole, S: Service<R>> {
    service: Arc<S>,
    peer: Peer<R>,
    handle: tokio::task::JoinHandle<QuitReason>,
    cancellation_token: CancellationToken,
    dg: DropGuard,
}
impl<R: ServiceRole, S: Service<R>> Deref for RunningService<R, S> {
    type Target = Peer<R>;

    fn deref(&self) -> &Self::Target {
        &self.peer
    }
}

impl<R: ServiceRole, S: Service<R>> RunningService<R, S> {
    #[inline]
    pub fn peer(&self) -> &Peer<R> {
        &self.peer
    }
    #[inline]
    pub fn service(&self) -> &S {
        self.service.as_ref()
    }
    #[inline]
    pub fn cancellation_token(&self) -> RunningServiceCancellationToken {
        RunningServiceCancellationToken(self.cancellation_token.clone())
    }
    #[inline]
    pub async fn waiting(self) -> Result<QuitReason, tokio::task::JoinError> {
        self.handle.await
    }
    pub async fn cancel(self) -> Result<QuitReason, tokio::task::JoinError> {
        let RunningService { dg, handle, .. } = self;
        dg.disarm().cancel();
        handle.await
    }
}

// use a wrapper type so we can tweak the implementation if needed
pub struct RunningServiceCancellationToken(CancellationToken);

impl RunningServiceCancellationToken {
    pub fn cancel(self) {
        self.0.cancel();
    }
}

#[derive(Debug)]
pub enum QuitReason {
    Cancelled,
    Closed,
    JoinError(tokio::task::JoinError),
}

/// Request execution context
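///
/// A sketch of honoring cancellation inside a request handler; the `do_work` future and
/// the specific error constructor are assumptions for illustration:
///
/// ```rust,ignore
/// async fn handle_request(&self, request: R::PeerReq, context: RequestContext<R>)
///     -> Result<R::Resp, McpError>
/// {
///     tokio::select! {
///         result = do_work(request) => result,
///         // `ct` is cancelled when the peer sends a `CancelledNotification` for this request.
///         _ = context.ct.cancelled() => Err(McpError::internal_error("request cancelled", None)),
///     }
/// }
/// ```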
#[derive(Debug, Clone)]
pub struct RequestContext<R: ServiceRole> {
    /// This token will be cancelled when a [`CancelledNotification`] for this request is received.
    pub ct: CancellationToken,
    pub id: RequestId,
    pub meta: Meta,
    pub extensions: Extensions,
    /// An interface for communicating with the remote client or server
    pub peer: Peer<R>,
}

/// Notification execution context
#[derive(Debug, Clone)]
pub struct NotificationContext<R: ServiceRole> {
    pub meta: Meta,
    pub extensions: Extensions,
    /// An interface for communicating with the remote client or server
    pub peer: Peer<R>,
}

/// Use this function to skip the initialization process
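///
/// A sketch, assuming a hypothetical `service` and `transport`, with no cached peer info:
///
/// ```rust,ignore
/// let running = serve_directly(service, transport, None);
/// let quit_reason = running.waiting().await?;
/// ```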
pub fn serve_directly<R, S, T, E, A>(
    service: S,
    transport: T,
    peer_info: Option<R::PeerInfo>,
) -> RunningService<R, S>
where
    R: ServiceRole,
    S: Service<R>,
    T: IntoTransport<R, E, A>,
    E: std::error::Error + Send + Sync + 'static,
{
    serve_directly_with_ct(service, transport, peer_info, Default::default())
}

/// Use this function to skip the initialization process
pub fn serve_directly_with_ct<R, S, T, E, A>(
    service: S,
    transport: T,
    peer_info: Option<R::PeerInfo>,
    ct: CancellationToken,
) -> RunningService<R, S>
where
    R: ServiceRole,
    S: Service<R>,
    T: IntoTransport<R, E, A>,
    E: std::error::Error + Send + Sync + 'static,
{
    let (peer, peer_rx) = Peer::new(Arc::new(AtomicU32RequestIdProvider::default()), peer_info);
    serve_inner(service, transport.into_transport(), peer, peer_rx, ct)
}

#[instrument(skip_all)]
fn serve_inner<R, S, T>(
    service: S,
    transport: T,
    peer: Peer<R>,
    mut peer_rx: tokio::sync::mpsc::Receiver<PeerSinkMessage<R>>,
    ct: CancellationToken,
) -> RunningService<R, S>
where
    R: ServiceRole,
    S: Service<R>,
    T: Transport<R> + 'static,
{
    const SINK_PROXY_BUFFER_SIZE: usize = 64;
    let (sink_proxy_tx, mut sink_proxy_rx) =
        tokio::sync::mpsc::channel::<TxJsonRpcMessage<R>>(SINK_PROXY_BUFFER_SIZE);
    let peer_info = peer.peer_info();
    if R::IS_CLIENT {
        tracing::info!(?peer_info, "Service initialized as client");
    } else {
        tracing::info!(?peer_info, "Service initialized as server");
    }

    let mut local_responder_pool =
        HashMap::<RequestId, Responder<Result<R::PeerResp, ServiceError>>>::new();
    let mut local_ct_pool = HashMap::<RequestId, CancellationToken>::new();
    let shared_service = Arc::new(service);
    // keep a clone of the service to hand back to the caller
    let service = shared_service.clone();

    let serve_loop_ct = ct.child_token();
    let peer_return: Peer<R> = peer.clone();
    let current_span = tracing::Span::current();
    let handle = tokio::spawn(async move {
        let mut transport = transport.into_transport();
        let mut batch_messages = VecDeque::<RxJsonRpcMessage<R>>::new();
        let mut send_task_set = tokio::task::JoinSet::<SendTaskResult>::new();
        #[derive(Debug)]
        enum SendTaskResult {
            Request {
                id: RequestId,
                result: Result<(), DynamicTransportError>,
            },
            Notification {
                responder: Responder<Result<(), ServiceError>>,
                cancellation_param: Option<CancelledNotificationParam>,
                result: Result<(), DynamicTransportError>,
            },
        }
        #[derive(Debug)]
        enum Event<R: ServiceRole> {
            ProxyMessage(PeerSinkMessage<R>),
            PeerMessage(RxJsonRpcMessage<R>),
            ToSink(TxJsonRpcMessage<R>),
            SendTaskResult(SendTaskResult),
        }

        let quit_reason = loop {
            let evt = if let Some(m) = batch_messages.pop_front() {
                Event::PeerMessage(m)
            } else {
                tokio::select! {
                    m = sink_proxy_rx.recv(), if !sink_proxy_rx.is_closed() => {
                        if let Some(m) = m {
                            Event::ToSink(m)
                        } else {
                            continue
                        }
                    }
                    m = transport.receive() => {
                        if let Some(m) = m {
                            Event::PeerMessage(m)
                        } else {
                            // input stream closed
                            tracing::info!("input stream terminated");
                            break QuitReason::Closed
                        }
                    }
                    m = peer_rx.recv(), if !peer_rx.is_closed() => {
                        if let Some(m) = m {
                            Event::ProxyMessage(m)
                        } else {
                            continue
                        }
                    }
                    m = send_task_set.join_next(), if !send_task_set.is_empty() => {
                        let Some(result) = m else {
                            continue
                        };
                        match result {
                            Err(e) => {
                                // a join error is serious; quit the serve loop
                                tracing::error!(%e, "send task encountered a tokio join error");
                                break QuitReason::JoinError(e)
                            }
                            Ok(result) => {
                                Event::SendTaskResult(result)
                            }
                        }
                    }
                    _ = serve_loop_ct.cancelled() => {
                        tracing::info!("task cancelled");
                        break QuitReason::Cancelled
                    }
                }
            };

            tracing::trace!(?evt, "new event");
            match evt {
                Event::SendTaskResult(SendTaskResult::Request { id, result }) => {
                    if let Err(e) = result {
                        if let Some(responder) = local_responder_pool.remove(&id) {
                            let _ = responder.send(Err(ServiceError::TransportSend(e)));
                        }
                    }
                }
                Event::SendTaskResult(SendTaskResult::Notification {
                    responder,
                    result,
                    cancellation_param,
                }) => {
                    let response = if let Err(e) = result {
                        Err(ServiceError::TransportSend(e))
                    } else {
                        Ok(())
                    };
                    let _ = responder.send(response);
                    if let Some(param) = cancellation_param {
                        if let Some(responder) = local_responder_pool.remove(&param.request_id) {
                            tracing::info!(id = %param.request_id, reason = param.reason, "cancelled");
                            let _response_result = responder.send(Err(ServiceError::Cancelled {
                                reason: param.reason.clone(),
                            }));
                        }
                    }
                }
                // response and error
                Event::ToSink(m) => {
                    if let Some(id) = match &m {
                        JsonRpcMessage::Response(response) => Some(&response.id),
                        JsonRpcMessage::Error(error) => Some(&error.id),
                        _ => None,
                    } {
                        if let Some(ct) = local_ct_pool.remove(id) {
                            ct.cancel();
                        }
                        let send = transport.send(m);
                        let current_span = tracing::Span::current();
                        tokio::spawn(async move {
                            let send_result = send.await;
                            if let Err(error) = send_result {
                                tracing::error!(%error, "failed to send response message");
                            }
                        }.instrument(current_span));
                    }
                }
                Event::ProxyMessage(PeerSinkMessage::Request {
                    request,
                    id,
                    responder,
                }) => {
                    local_responder_pool.insert(id.clone(), responder);
                    let send = transport.send(JsonRpcMessage::request(request, id.clone()));
                    {
                        let id = id.clone();
                        let current_span = tracing::Span::current();
                        send_task_set.spawn(send.map(move |r| SendTaskResult::Request {
                            id,
                            result: r.map_err(DynamicTransportError::new::<T, R>),
                        }).instrument(current_span));
                    }
                }
                Event::ProxyMessage(PeerSinkMessage::Notification {
                    notification,
                    responder,
                }) => {
                    // catch cancellation notification
                    let mut cancellation_param = None;
                    let notification = match notification.try_into() {
                        Ok::<CancelledNotification, _>(cancelled) => {
                            cancellation_param.replace(cancelled.params.clone());
                            cancelled.into()
                        }
                        Err(notification) => notification,
                    };
                    let send = transport.send(JsonRpcMessage::notification(notification));
                    let current_span = tracing::Span::current();
                    send_task_set.spawn(send.map(move |result| SendTaskResult::Notification {
                        responder,
                        cancellation_param,
                        result: result.map_err(DynamicTransportError::new::<T, R>),
                    }).instrument(current_span));
                }
                Event::PeerMessage(JsonRpcMessage::Request(JsonRpcRequest {
                    id,
                    mut request,
                    ..
                })) => {
                    tracing::debug!(%id, ?request, "received request");
                    {
                        let service = shared_service.clone();
                        let sink = sink_proxy_tx.clone();
                        let request_ct = serve_loop_ct.child_token();
                        let context_ct = request_ct.child_token();
                        local_ct_pool.insert(id.clone(), request_ct);
                        let mut extensions = Extensions::new();
                        let mut meta = Meta::new();
                        // avoid cloning
                        // swap out meta first, otherwise the progress token will be lost
                        std::mem::swap(&mut meta, request.get_meta_mut());
                        std::mem::swap(&mut extensions, request.extensions_mut());
                        let context = RequestContext {
                            ct: context_ct,
                            id: id.clone(),
                            peer: peer.clone(),
                            meta,
                            extensions,
                        };
                        let current_span = tracing::Span::current();
                        tokio::spawn(async move {
                            let result = service
                                .handle_request(request, context)
                                .await;
                            let response = match result {
                                Ok(result) => {
                                    tracing::debug!(%id, ?result, "response message");
                                    JsonRpcMessage::response(result, id)
                                }
                                Err(error) => {
                                    tracing::warn!(%id, ?error, "response error");
                                    JsonRpcMessage::error(error, id)
                                }
                            };
                            let _send_result = sink.send(response).await;
                        }.instrument(current_span));
                    }
                }
                Event::PeerMessage(JsonRpcMessage::Notification(JsonRpcNotification {
                    notification,
                    ..
                })) => {
                    tracing::info!(?notification, "received notification");
                    // catch cancelled notification
                    let mut notification = match notification.try_into() {
                        Ok::<CancelledNotification, _>(cancelled) => {
                            if let Some(ct) = local_ct_pool.remove(&cancelled.params.request_id) {
                                tracing::info!(id = %cancelled.params.request_id, reason = cancelled.params.reason, "cancelled");
                                ct.cancel();
                            }
                            cancelled.into()
                        }
                        Err(notification) => notification,
                    };
                    {
                        let service = shared_service.clone();
                        let mut extensions = Extensions::new();
                        let mut meta = Meta::new();
                        // avoid clone
                        std::mem::swap(&mut extensions, notification.extensions_mut());
                        std::mem::swap(&mut meta, notification.get_meta_mut());
                        let context = NotificationContext {
                            peer: peer.clone(),
                            meta,
                            extensions,
                        };
                        let current_span = tracing::Span::current();
                        tokio::spawn(async move {
                            let result = service.handle_notification(notification, context).await;
                            if let Err(error) = result {
                                tracing::warn!(%error, "Error handling notification");
                            }
                        }.instrument(current_span));
                    }
                }
                Event::PeerMessage(JsonRpcMessage::Response(JsonRpcResponse {
                    result,
                    id,
                    ..
                })) => {
                    if let Some(responder) = local_responder_pool.remove(&id) {
                        let response_result = responder.send(Ok(result));
                        if let Err(_error) = response_result {
                            tracing::warn!(%id, "Error sending response");
                        }
                    }
                }
                Event::PeerMessage(JsonRpcMessage::Error(JsonRpcError { error, id, .. })) => {
                    if let Some(responder) = local_responder_pool.remove(&id) {
                        let _response_result = responder.send(Err(ServiceError::McpError(error)));
                        if let Err(_error) = _response_result {
                            tracing::warn!(%id, "Error sending response");
                        }
                    }
                }
            }
        };
        let sink_close_result = transport.close().await;
        if let Err(e) = sink_close_result {
            tracing::error!(%e, "failed to close transport");
        }
        tracing::info!(?quit_reason, "serve finished");
        quit_reason
    }.instrument(current_span));
    RunningService {
        service,
        peer: peer_return,
        handle,
        cancellation_token: ct.clone(),
        dg: ct.drop_guard(),
    }
}